开发者

使用java项目搭建一个netty服务

映入依赖,只要保证有这个依赖,就不需要单独引入依赖,支持多个端口直连,支持多个实现层解析数据,

  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-Redis</artifactId>
  <version>3.3.4</version>

yml配置

# TCP设备对接
iot:
  device:
    port1: 1883
    port2: 1885
package com.cqcloud.platform.handler;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import com.cqcloud.platform.service.IotNbIotMqttService;
import com.cqcloud.platform.service.IotPushService;
import com.cqcloud.platform.service.implwww.devze.com.IotNbIotServiceImpl;
import com.cqcloud.platform.service.impl.IotPushServiceImpl;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NIOServerSocketChannel;
import jakarta.annotation.PostConstruct;
/**
 * @author weimeilayer@gmail.com ✨
 * @date  2022年3月8日 
 */
@Component
public class NettyTcpServer {
	/**
	 * 用于自设备1协议端口
	 */
    private static int PORT1;
    /**
     * 来自设备2协议端口
     */
    private static int PORT2;

    @Value("${iot.device.port1}")
    public int port1Value;

    @Value("${iot.device.port2}")
    public int port2Value;
    
    @PostConstruct
    public void init() {
        PORT1 = port1Value;
        PORT2 = port2Value;
    }

	public void start() throws Exception {
		final NioEventLoopGroup bossGroup = new NioEventLoopGroup();
	    final NioEventLoopGroup workerGroup = new NioEventLoopGroup();
	    try {
	        ServerBootstrap bootstrap = new ServerBootstrap();
	        // 创建 MqttService 和 MqttPushService 实例
	        IotNbIotMqttService iotNbIotMqttService = new IotNbIotServiceImpl();
	        IotPushService iotPushService = new IotPushServiceImpl();

	        bootstrap.group(bossGroup, workerGroup)
	            .channel(NioServerSocketChannel.class)
	            .childHandler(new ChannelInitializer<SocketChannel>() {
	                @Override
	                protected void initChannel(SocketChannel ch) {
	                    ChannelPipeline pipeline = ch.pipeline();
	                    // 直接使用 ByteBuf,无需编码器和解码器
	                    // 根据端口注入不同的服务
	                    if (ch.localAddress().getPort() == PORT1) {
	                        pipeline.addLast(new TcpIotNbServerHandler(iotNbIotMqttService)); // 业务逻辑处理器
	                    } else if (ch.localAddress().getPort() == PORT2) {
	                        pipeline.addLast(new TcpIotServerHandler(iotPushService)); // 新处理器
	             编程客栈       }
	                }
	            });

	        // 绑定第一个端口并启动
	        ChannelFuture future1 = bootstrap.bind(PORT1).sync();
	        // 绑定第二个端口并启动
	        ChannelFuture future2 = bootstrap.bind(PORT2).sync();

	        // 等待服务器关闭
	        future1.channel().closeFuture().sync();
	        future2.channel().closeFuture().sync();
	    } finally {
	        // 优雅地关闭线程池
	        workerGroup.shutdownGracefully();
	        bossGroup.shutdownGracefully();
	    }
	}
}

启动类需要

public static void main(String[] args) throws IOException {
		ConfigurableEnvironment env = new SpringApplication(DynamicYearningApplication.class).run(args).getEnvironment();
		String envPort = env.getProperty("server.port");
		String port = Objects.isNull(envPort) ? "8000" : envPort;
		String envContext = env.getProperty("server.servlet.context-path");
		String contextPath = Objects.isNull(envContext) ? "" : envContext;
		String path = port + contextPath + "/doc.html";
		String externalAPI = InetAddress.getLocalHost().getHostAddress();
		Console.log("Access URLs:\n\t--------------------------------------------编程客栈-----------------------------\n\tLocal-swagger: \t\thttp://127.0.0.1:{}\n\tExternal-swagger: \thttp://{}:{}\n\t-------------------------------------------------------------------------",path, externalAPI, path);
		// 加上以下代码
		NettyTcpServer server = new NettyTcpServer();
		try {
			server.start();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

创建TcpIotServerHandler

package com.cqcloud.platform.handler;

import com.baomidou.myBATisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.StringPool;
import com.cqcloud.platform.entity.IotCommandRecords;
import com.cqcloud.platform.service.IotPushService;
import com.cqcloud.platform.utils.DeviceActionParser;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;

import Java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
 * 设备协议
 * @author weimeilayer@gmail.com ✨
 * @date  2022年3月8日  
 */
@Slf4j
public class TcpIotServerHandler extends SimpleChannelInboundHandler<ByteBuf>  {

	// 接口注入
	private final IotPushService iotPushService;

	public TcpIotServerHandler(IotPushService iotPushService) {
		this.iotPushService = iotPushService;
	}
	@Override
	protected void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
		byte[] byteArray;
		if (in.readableBytes() <= 0) {
			in.release();
			return;
		}
		byteArray = new byte[in.readableBytes()];
		in.readBytes(byteArray);
		if (byteArray.length <= 0) {
			in.release();
			return;
		}
		// 将消息传递给 iotPushService
		iotPushService.pushMessageArrived(byteArray);
	}
	// 发送响应的统一辅助方法
	private void sendResponse(ChannelHandlerContext ctx, String hexResponse) {
		byte[] responseBytes = hexStringToByteArray(hexResponse);
		ByteBuf responseBuffer = Unpooled.copiedBuffer(responseBytes);
		ctx.writeAndFlush(responseBuffer);
	}
	
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
		// 打印异常堆栈跟踪,便于调试和错误排查
		cause.printStackTrace();
		// 关闭当前的通道,释放相关资源
		ctx.close();
	}
}

创建 TcpIotNbServerHandler

package com.cqcloud.platform.handler;

import com.cqcloud.platform.service.IotNbIotMqttService;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * NB-IOT CAT1数据格协议
 * 
 * @author weimeilayer@gmail.com
 * @date 2022年3月8日
 */
public class TcpIotNbServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
	private final IotNbIotMqttService iotNbIotMqttService;

	// 构造函数注入 MqttService
	public TcpIotNbServerHandler(IotNbIotMqttService iotNbIotMqttService) {
		this.iotNbIotMqttService = iotNbIotMqttService;
	}

	@Override
	public void channelRead0(ChannelHandlerContext ctx,ByteBuf in) {
		byte[] byteArray;
		if (in.readableBytes() <= 0) {
			in.release();
			return;
		}
		byteArray = new byte[in.readableBytes()];
		in.readBytes(byteArray);
		if (byteArray.length <= 0) {
			in.release();
			return;
		}
	    // 将 byte[] 数据传递给 iotNbIotMqttService
	    iotNbIotMqttService.messageArrived(byteArray); 
		//发送固定事件默认回复
        sendResponse(ctx);
	}
	
	// 发送响应的统一辅助方法
    private void sendResponse(ChannelHandlerContext ctx) {
    	// 回复客户端--向设备回复AAAA8001(设备将保持20秒不休眠),平台尽量在10秒
    	byte[] responseBytes = new byte[] { (byte) 0xAA, (byte) 0xAA, (byte) 0x80, (byte) 0x01 };
        ByteBuf responseBuffer = Unpooled.copiedBuffer(responseBytes);
        ctx.writeAndFlush(responseBuffer);
    }
    
    //将响应消息转换为字节数组
    public static byte[] hexStringToByteArray(String s) {
        int len = s.length();
        byte[] data = new byte[len / 2];
        for (int i = 0; i < len; i += 2) {
            data[i / 2] = (byte) ((Character.digit(s.charAt(i), 16) << 4)
                    + Character.digit(s.charAt(i + 1), 16));
        }
        return data;
    }
	@Ovwww.devze.comerride
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable caQOhZaVIcuse) {
		cause.printStackTrace();
		ctx.close();
	}
}

创建接口类IotPushService

package com.cqcloud.platform.service;

/**
 * @author weimeilayer@gmail.com
 * @date 2022年3月8日
 */
public interface IotPushService {
	public void pushMessageArrived(byte[] message);
}

创建IotNbIotMqttService 类

package com.cqcloud.platform.service;

/**
 * @author weimeilayer@gmail.com
 * @date 2022年3月8日
 */
public interface IotNbIotMqttService {

	public void messageArrived(byte[] message);
}

创建实现类IotNbIotServiceImpl

package com.cqcloud.platform.service.impl;

import org.springframework.stereotype.Service;

import com.cqcloud.platform.service.IotNbIotMqttService;
import com.cqcloud.platform.utils.DataParser;

import lombok.AllArgsConstructor;

/**
 * @author weimeilayer@gmail.com
 * @date 2022年3月8日
 */
@Service
@AllArgsConstructor
public class IotNbIotServiceImpl implements IotNbIotMqttService {

	@Override
	public void messageArrived(byte[] message) {
		// 将 byte 数组转换为十六进制字符串  
		String convertData = printByteArray(message);
		// 打印字节数组内容
		System.out.println("来自于xxx数据格式协议的1883端口的数据字节数组内容:"+ convertData);
        //调用解析方法
        dispatchMessage(convertData);
	}
	
	// 将 byte[] 转换为十六进制字符串的辅助方法  
    public static String bytesToHex(byte[] bytes) {  
        StringBuilder hex = new StringBuilder();  
        for (byte b : bytes) {  
            // 将每个字节转换为两位的十六进制表示  
            hex.append(String.format("%02X", b));  
        }  
        return hex.toString();  
    }  

	public static String printByteArray(byte[] byteArray) {
		StringBuilder hexString = new StringBuilder();
		for (byte b : byteArray) {
			// 将字节转换为无符号的十六进制字符串,去掉空格
			hexString.append(String.format("%02X", b & 0xFF));
		}
		System.out.println("Byte Array (Hex): " + hexString.toString());
		return hexString.toString();
	}
	
	public void dispatchMessage(String byteArray) {
        String prefix = byteArray.substring(0, 2);  
        // 根据 messageID 进行判断
          System.out.println("来自于数据格式协议来自于1883端口的数据处理消息:" +byteArray);
	}
}

创建 IotPushServiceImpl

package com.cqcloud.platform.service.impl;

import org.springframework.stereotype.Service;

import com.cqcloud.platform.service.IotPushService;
import com.cqcloud.platform.utils.DeviceActionParser;

import lombok.AllArgsConstructor;

/**
 * 发送指令实现类
 * @author weimeilayer@gmail.com
 * @date 2022年3月8日
 */
@Service
@AllArgsConstructor
public class IotPushServiceImpl implements IotPushService {

	@Override
	public void pushMessageArrived(byte[] message) {
		// 解析字节数组
		System.out.println("来自物联网平台的设备协议于1885端口的数据设备返回的的内容处理");
		//打印数据
		printByteArray(message);
		//调用解析方法
		dispatchMessage(message);
	}

	//设备回复的接受内容
	public static void dispatchMessage(byte[] byteArray) {
       
	}
	
	public static void printByteArray(byte[] byteArray) {
		StringBuilder hexString = new StringBuilder();
		for (byte b : byteArray) {
			// 将字节转换为无符号的十六进制字符串,去掉空格
			hexString.append(String.format("%02X", b & 0xFF));
		}
		System.out.println("Byte Array (Hex): " + hexString.toString());
	}

	 // 将十六进制字符串转换为字节数组的实用方法
    public static byte[] stringToBytes(String s) {
        int len = s.length();
        byte[] data = new byte[len / 2];
        for (int i = 0; i < len; i += 2) {
            data[i / 2] = (byte) ((Character.digit(s.charAt(i), 16) << 4)
                                  + Character.digit(s.charAt(i+1), 16));
        }
        return data;
    }
	// 提取设备类型的十六进制字符串
    private static String extractDeviceTypeHex(byte[] byteArray) {
        // 转换为十六进制字符串
        String hexString = bytesToHex(byteArray);
        // 提取设备类型
        return hexString.substring(10, 12); // 设备类型的位数
    }

    // 辅助方法:将字节数组转换为十六进制字符串
    private static String bytesToHex(byte[] bytes) {
        StringBuilder hexString = new StringBuilder();
        for (byte b : bytes) {
            String hex = Integer.toHexString(0xFF & b);
            if (hex.length() == 1) {
                hexString.append('0'); // 确保每个字节都为两位
            }
            hexString.append(hex);
        }
        return hexString.toString().toUpperCase(); // 返回大写格式
    }

    // 将十六进制字符串转换为 byte
    private static byte hexStringToByte(String hex) {
        return (byte) Integer.parseInt(hex, 16);
    }
}

然后使用网络根据助手请求。

使用java项目搭建一个netty服务

以上就是使用java项目搭建一个netty服务的详细内容,更多关于java搭建netty服务的资料请关注编程客栈(www.devze.com)其它相关文章!

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新开发

开发排行榜