基于Netty的设备接入网关系统实战
作者:系统管理员
摘要
基于Netty的设备接入网关系统实战
基于Netty的设备接入网关系统实战
本文将完整实现一套企业级设备接入网关,涵盖Netty长连接核心特性(主从Reactor、粘包解包、心跳保活、设备认证)、网关集群、Redis/MySQL集群适配,提供可直接落地的Java代码及全流程说明。
一、整体架构设计
核心组件
- Netty网关层
:主从Reactor模型,处理设备TCP长连接、粘包解包、心跳、认证
- 集群层
:Netty网关集群+Redis集群(主从+哨兵)+MySQL主从
- 转发层
:基于设备ID的请求路由转发
- 存储层
:Redis缓存设备状态,MySQL持久化设备信息
技术栈
- •
核心:Netty 4.1.x
- •
存储:Redis 6.x(主从+哨兵)、MySQL 8.x(主从)
- •
序列化:Protobuf(高效二进制,适合物联网场景)
- •
工具:Lombok、HikariCP、Redisson
二、核心代码实现
1. 基础依赖(Maven)
<dependencies> <!-- Netty核心 --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.94.Final</version> </dependency> <!-- Protobuf --> <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>3.24.4</version> </dependency> <!-- Redis --> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.23.3</version> </dependency> <!-- MySQL --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.33</version> </dependency> <dependency> <groupId>com.zaxxer</groupId> <artifactId>HikariCP</artifactId> <version>5.0.1</version> </dependency> <!-- 工具 --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.30</version> </dependency> </dependencies>2. 协议定义(Protobuf)
定义设备通信协议(解决粘包/解包、数据结构化问题)
syntax = "proto3"; package com.device.gateway.protocol; option java_outer_classname = "DeviceProtocol"; // 设备消息体 message DeviceMessage { string deviceId = 1; // 设备ID int32 messageType = 2; // 消息类型:1-认证 2-心跳 3-业务数据 4-响应 bytes data = 3; // 业务数据 int64 timestamp = 4; // 时间戳 string authCode = 5; // 认证码(仅认证消息使用) }编译后生成DeviceProtocol类,用于Netty编解码。
3. Netty粘包/解包处理器
基于长度字段的拆包器(解决TCP粘包/半包问题)
package com.device.gateway.netty.codec; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; /** * 粘包解包器:基于长度字段 * 协议格式:长度字段(4字节) + protobuf数据 */ public class DeviceFrameDecoder extends LengthFieldBasedFrameDecoder { // 最大帧长度:1024*1024=1MB private static final int MAX_FRAME_LENGTH = 1024 * 1024; // 长度字段偏移量:0 private static final int LENGTH_FIELD_OFFSET = 0; // 长度字段长度:4字节(int) private static final int LENGTH_FIELD_LENGTH = 4; // 长度调整值:0 private static final int LENGTH_ADJUSTMENT = 0; // 跳过初始字节数:4(跳过长度字段) private static final int INITIAL_BYTES_TO_STRIP = 4; public DeviceFrameDecoder() { super(MAX_FRAME_LENGTH, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH, LENGTH_ADJUSTMENT, INITIAL_BYTES_TO_STRIP); } @Override protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { // 校验最小长度(长度字段+至少1字节数据) if (in.readableBytes() < LENGTH_FIELD_LENGTH + 1) { return null; } return super.decode(ctx, in); } } /** * Protobuf编码器:添加长度字段 */ public class DeviceFrameEncoder extends io.netty.handler.codec.MessageToByteEncoder<DeviceProtocol.DeviceMessage> { @Override protected void encode(ChannelHandlerContext ctx, DeviceProtocol.DeviceMessage msg, ByteBuf out) throws Exception { // 序列化Protobuf byte[] data = msg.toByteArray(); // 写入长度字段(4字节) out.writeInt(data.length); // 写入protobuf数据 out.writeBytes(data); } } /** * Protobuf编解码器 */ public class DeviceProtobufCodec { public static DeviceFrameDecoder decoder() { return new DeviceFrameDecoder(); } public static DeviceFrameEncoder encoder() { return new DeviceFrameEncoder(); } }4. 心跳检测处理器
基于Netty的IdleStateHandler实现心跳保活
package com.device.gateway.netty.heartbeat; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import lombok.extern.slf4j.Slf4j; /** * 心跳处理器: * - 读空闲(设备30秒未发数据):发送心跳请求 * - 读空闲60秒:断开连接 */ @Slf4j public class HeartbeatHandler extends ChannelInboundHandlerAdapter { // 读空闲超时时间(秒) public static final int READER_IDLE_TIME = 30; // 写空闲超时时间(秒) public static final int WRITER_IDLE_TIME = 0; // 全空闲超时时间(秒) public static final int ALL_IDLE_TIME = 0; // 心跳失败次数 private int heartbeatFailCount = 0; // 最大心跳失败次数 private static final int MAX_HEARTBEAT_FAIL_COUNT = 2; @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { heartbeatFailCount++; log.warn("设备[{}]读空闲,心跳失败次数:{}", ctx.channel().id(), heartbeatFailCount); // 超过最大失败次数,断开连接 if (heartbeatFailCount >= MAX_HEARTBEAT_FAIL_COUNT) { log.error("设备[{}]心跳超时,断开连接", ctx.channel().id()); ctx.close(); return; } // 发送心跳请求 DeviceProtocol.DeviceMessage heartbeatMsg = DeviceProtocol.DeviceMessage.newBuilder() .setMessageType(2) // 2-心跳 .setTimestamp(System.currentTimeMillis()) .build(); ctx.writeAndFlush(heartbeatMsg); } } else { super.userEventTriggered(ctx, evt); } } // 收到心跳响应,重置失败次数 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof DeviceProtocol.DeviceMessage) { DeviceProtocol.DeviceMessage message = (DeviceProtocol.DeviceMessage) msg; if (message.getMessageType() == 2) { heartbeatFailCount = 0; log.debug("收到设备[{}]心跳响应", message.getDeviceId()); } } super.channelRead(ctx, msg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.error("心跳处理器异常", cause); ctx.close(); } }5. 设备认证处理器
package com.device.gateway.netty.auth; import com.device.gateway.redis.DeviceRedisService; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * 设备认证处理器:首次连接必须认证,否则断开 */ @Slf4j @Component @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class DeviceAuthHandler extends ChannelInboundHandlerAdapter { private final DeviceRedisService deviceRedisService; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof DeviceProtocol.DeviceMessage) { DeviceProtocol.DeviceMessage message = (DeviceProtocol.DeviceMessage) msg; // 检查是否已认证 Boolean isAuth = deviceRedisService.getDeviceAuthState(message.getDeviceId()); if (isAuth != null && isAuth) { // 已认证,透传消息 ctx.fireChannelRead(msg); return; } // 未认证,检查是否是认证消息 if (message.getMessageType() != 1) { log.warn("设备[{}]未认证,拒绝非认证消息", message.getDeviceId()); ctx.close(); return; } // 执行认证 boolean authSuccess = authenticate(message); if (authSuccess) { // 认证成功,缓存认证状态 deviceRedisService.setDeviceAuthState(message.getDeviceId(), true, 3600 * 24); // 绑定设备ID到Channel ctx.channel().attr(AttributeKey.valueOf("DEVICE_ID")).set(message.getDeviceId()); // 记录连接信息 deviceRedisService.setDeviceChannel(message.getDeviceId(), ctx.channel().id().asLongText()); log.info("设备[{}]认证成功", message.getDeviceId()); // 发送认证成功响应 DeviceProtocol.DeviceMessage response = DeviceProtocol.DeviceMessage.newBuilder() .setDeviceId(message.getDeviceId()) .setMessageType(4) // 4-响应 .setData("auth_success".getBytes()) .setTimestamp(System.currentTimeMillis()) .build(); ctx.writeAndFlush(response); // 透传消息 ctx.fireChannelRead(msg); } else { log.warn("设备[{}]认证失败,断开连接", message.getDeviceId()); ctx.close(); } } else { ctx.fireChannelRead(msg); } } /** * 设备认证逻辑:校验设备ID和认证码 */ private boolean authenticate(DeviceProtocol.DeviceMessage message) { // 从MySQL/Redis获取设备认证信息 String storedAuthCode = deviceRedisService.getDeviceAuthCode(message.getDeviceId()); if (storedAuthCode == null) { // 从MySQL主库查询 storedAuthCode = deviceRedisService.loadDeviceAuthCodeFromDb(message.getDeviceId()); if (storedAuthCode == null) { return false; } // 缓存到Redis deviceRedisService.setDeviceAuthCode(message.getDeviceId(), storedAuthCode, 3600); } // 校验认证码 return storedAuthCode.equals(message.getAuthCode()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.error("设备认证异常", cause); ctx.close(); } }6. 主从Reactor模型的Netty服务端
package com.device.gateway.netty.server; import com.device.gateway.netty.auth.DeviceAuthHandler; import com.device.gateway.netty.codec.DeviceProtobufCodec; import com.device.gateway.netty.heartbeat.HeartbeatHandler; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.timeout.IdleStateHandler; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.concurrent.TimeUnit; /** * Netty服务端:主从Reactor模型 * - BossGroup:处理连接请求(主Reactor) * - WorkerGroup:处理IO读写(从Reactor) */ @Slf4j @Component @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class NettyDeviceServer { // 监听端口 @Value("${netty.server.port:8888}") private int port; // Boss线程数:CPU核心数 private static final int BOSS_THREADS = Runtime.getRuntime().availableProcessors(); // Worker线程数:CPU核心数*2 private static final int WORKER_THREADS = Runtime.getRuntime().availableProcessors() * 2; // BossGroup:主Reactor private EventLoopGroup bossGroup; // WorkerGroup:从Reactor private EventLoopGroup workerGroup; private final DeviceAuthHandler deviceAuthHandler; private final DeviceMessageHandler deviceMessageHandler; /** * 启动Netty服务 */ @PostConstruct public void start() { bossGroup = new NioEventLoopGroup(BOSS_THREADS); workerGroup = new NioEventLoopGroup(WORKER_THREADS); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) // NIO服务端通道 .channel(NioServerSocketChannel.class) // 连接队列大小 .option(ChannelOption.SO_BACKLOG, 1024) // 开启TCP保活 .childOption(ChannelOption.SO_KEEPALIVE, true) // 禁用Nagle算法(低延迟) .childOption(ChannelOption.TCP_NODELAY, true) // 通道初始化 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // 1. 心跳检测:读空闲30秒 pipeline.addLast(new IdleStateHandler( HeartbeatHandler.READER_IDLE_TIME, HeartbeatHandler.WRITER_IDLE_TIME, HeartbeatHandler.ALL_IDLE_TIME, TimeUnit.SECONDS )); // 2. 心跳处理器 pipeline.addLast(new HeartbeatHandler()); // 3. 粘包解包器 pipeline.addLast(DeviceProtobufCodec.decoder()); pipeline.addLast(DeviceProtobufCodec.encoder()); // 4. 设备认证处理器 pipeline.addLast(deviceAuthHandler); // 5. 业务消息处理器 pipeline.addLast(deviceMessageHandler); } }); // 绑定端口,同步等待成功 ChannelFuture future = bootstrap.bind(port).sync(); log.info("Netty设备网关启动成功,监听端口:{}", port); // 等待服务端关闭 future.channel().closeFuture().sync(); } catch (InterruptedException e) { log.error("Netty服务启动异常", e); Thread.currentThread().interrupt(); } } /** * 停止Netty服务 */ @PreDestroy public void stop() { log.info("开始停止Netty设备网关"); if (bossGroup != null) { bossGroup.shutdownGracefully(); } if (workerGroup != null) { workerGroup.shutdownGracefully(); } log.info("Netty设备网关停止完成"); } }7. 设备消息转发处理器
package com.device.gateway.netty.handler; import com.device.gateway.redis.DeviceRedisService; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * 设备消息转发处理器:根据设备ID路由消息 */ @Slf4j @Component @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class DeviceMessageHandler extends ChannelInboundHandlerAdapter { private final DeviceRedisService deviceRedisService; private final DeviceMessageRouter deviceMessageRouter; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof DeviceProtocol.DeviceMessage) { DeviceProtocol.DeviceMessage message = (DeviceProtocol.DeviceMessage) msg; String deviceId = message.getDeviceId(); log.debug("收到设备[{}]消息,类型:{}", deviceId, message.getMessageType()); // 1. 更新设备最后活跃时间 deviceRedisService.updateDeviceLastActiveTime(deviceId); // 2. 消息转发(根据业务类型路由) deviceMessageRouter.route(message); // 3. 响应处理(如果需要) if (message.getMessageType() == 3) { // 业务数据 DeviceProtocol.DeviceMessage response = DeviceProtocol.DeviceMessage.newBuilder() .setDeviceId(deviceId) .setMessageType(4) .setData("success".getBytes()) .setTimestamp(System.currentTimeMillis()) .build(); ctx.writeAndFlush(response); } } } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { // 设备断开连接,清理缓存 String deviceId = ctx.channel().attr(AttributeKey.valueOf("DEVICE_ID")).get(); if (deviceId != null) { log.info("设备[{}]断开连接", deviceId); deviceRedisService.clearDeviceState(deviceId); } super.channelInactive(ctx); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.error("消息处理器异常", cause); ctx.close(); } } /** * 消息路由器:集群内转发消息 */ @Component @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class DeviceMessageRouter { private final DeviceRedisService deviceRedisService; private final NettyClusterService nettyClusterService; /** * 路由消息到目标设备 */ public void route(DeviceProtocol.DeviceMessage message) { String deviceId = message.getDeviceId(); // 1. 获取设备所在的网关节点 String gatewayNode = deviceRedisService.getDeviceGatewayNode(deviceId); if (gatewayNode == null) { log.warn("设备[{}]未在线", deviceId); return; } // 2. 判断是否是当前节点 String currentNode = nettyClusterService.getCurrentNodeId(); if (currentNode.equals(gatewayNode)) { // 当前节点,直接处理 processLocalMessage(message); } else { // 其他节点,集群转发 nettyClusterService.forwardMessage(gatewayNode, message); } } /** * 处理本地消息 */ private void processLocalMessage(DeviceProtocol.DeviceMessage message) { // 业务处理逻辑:如保存数据、调用业务接口等 log.debug("处理本地设备[{}]消息", message.getDeviceId()); } }8. Redis集群配置(主从+哨兵)
package com.device.gateway.redis; import org.redisson.Redisson; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * Redis集群配置:主从+哨兵 */ @Configuration public class RedisConfig { @Value("${redis.sentinel.master-name:mymaster}") private String masterName; @Value("${redis.sentinel.nodes:127.0.0.1:26379,127.0.0.1:26380,127.0.0.1:26381}") private String sentinelNodes; @Value("${redis.password:}") private String password; @Bean public RedissonClient redissonClient() { Config config = new Config(); // 哨兵模式配置 config.useSentinelServers() .setMasterName(masterName) .addSentinelAddress(parseSentinelNodes()) .setPassword(password.isEmpty() ? null : password) .setDatabase(0) .setConnectTimeout(3000) .setTimeout(3000) .setRetryAttempts(3) .setRetryInterval(1000); return Redisson.create(config); } /** * 解析哨兵节点为Redis地址格式 */ private String[] parseSentinelNodes() { String[] nodes = sentinelNodes.split(","); for (int i = 0; i < nodes.length; i++) { nodes[i] = "redis://" + nodes[i].trim(); } return nodes; } } /** * 设备Redis服务 */ @Component @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class DeviceRedisService { private final RedissonClient redissonClient; private final DeviceMapper deviceMapper; // 设备认证状态Key:DEVICE_AUTH:{deviceId} private static final String KEY_DEVICE_AUTH = "DEVICE_AUTH:%s"; // 设备认证码Key:DEVICE_AUTH_CODE:{deviceId} private static final String KEY_DEVICE_AUTH_CODE = "DEVICE_AUTH_CODE:%s"; // 设备通道Key:DEVICE_CHANNEL:{deviceId} private static final String KEY_DEVICE_CHANNEL = "DEVICE_CHANNEL:%s"; // 设备网关节点Key:DEVICE_GATEWAY:{deviceId} private static final String KEY_DEVICE_GATEWAY = "DEVICE_GATEWAY:%s"; // 设备最后活跃时间Key:DEVICE_LAST_ACTIVE:{deviceId} private static final String KEY_DEVICE_LAST_ACTIVE = "DEVICE_LAST_ACTIVE:%s"; /** * 设置设备认证状态 */ public void setDeviceAuthState(String deviceId, boolean state, long expireSeconds) { String key = String.format(KEY_DEVICE_AUTH, deviceId); redissonClient.getBucket(key).set(state, expireSeconds, TimeUnit.SECONDS); } /** * 获取设备认证状态 */ public Boolean getDeviceAuthState(String deviceId) { String key = String.format(KEY_DEVICE_AUTH, deviceId); return redissonClient.getBucket(key).get(); } /** * 设置设备认证码缓存 */ public void setDeviceAuthCode(String deviceId, String authCode, long expireSeconds) { String key = String.format(KEY_DEVICE_AUTH_CODE, deviceId); redissonClient.getBucket(key).set(authCode, expireSeconds, TimeUnit.SECONDS); } /** * 获取设备认证码缓存 */ public String getDeviceAuthCode(String deviceId) { String key = String.format(KEY_DEVICE_AUTH_CODE, deviceId); return redissonClient.getBucket(key).get(); } /** * 从MySQL加载设备认证码 */ public String loadDeviceAuthCodeFromDb(String deviceId) { // 从MySQL主库查询 return deviceMapper.getDeviceAuthCode(deviceId); } /** * 设置设备通道信息 */ public void setDeviceChannel(String deviceId, String channelId) { String key = String.format(KEY_DEVICE_CHANNEL, deviceId); redissonClient.getBucket(key).set(channelId); } /** * 设置设备网关节点 */ public void setDeviceGatewayNode(String deviceId, String nodeId) { String key = String.format(KEY_DEVICE_GATEWAY, deviceId); redissonClient.getBucket(key).set(nodeId); } /** * 获取设备网关节点 */ public String getDeviceGatewayNode(String deviceId) { String key = String.format(KEY_DEVICE_GATEWAY, deviceId); return redissonClient.getBucket(key).get(); } /** * 更新设备最后活跃时间 */ public void updateDeviceLastActiveTime(String deviceId) { String key = String.format(KEY_DEVICE_LAST_ACTIVE, deviceId); redissonClient.getBucket(key).set(System.currentTimeMillis()); } /** * 清理设备状态 */ public void clearDeviceState(String deviceId) { redissonClient.getBucket(String.format(KEY_DEVICE_AUTH, deviceId)).delete(); redissonClient.getBucket(String.format(KEY_DEVICE_CHANNEL, deviceId)).delete(); redissonClient.getBucket(String.format(KEY_DEVICE_GATEWAY, deviceId)).delete(); } }9. MySQL主从配置
# application.yml spring: datasource: # 主库配置 master: jdbc-url: jdbc:mysql://master-mysql:3306/device_db?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai username: root password: 123456 driver-class-name: com.mysql.cj.jdbc.Driver # 从库配置 slave: jdbc-url: jdbc:mysql://slave-mysql:3306/device_db?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai username: root password: 123456 driver-class-name: com.mysql.cj.jdbc.Driver type: com.zaxxer.hikari.HikariDataSource hikari: maximum-pool-size: 10 minimum-idle: 5 idle-timeout: 300000 connection-timeout: 20000package com.device.gateway.db; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.jdbc.DataSourceBuilder; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.jdbc.datasource.LazyConnectionDataSourceProxy; import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource; import javax.sql.DataSource; import java.util.HashMap; import java.util.Map; /** * MySQL主从数据源配置 */ @Configuration public class DataSourceConfig { /** * 主库数据源 */ @Bean(name = "masterDataSource") @ConfigurationProperties("spring.datasource.master") public DataSource masterDataSource() { return DataSourceBuilder.create().build(); } /** * 从库数据源 */ @Bean(name = "slaveDataSource") @ConfigurationProperties("spring.datasource.slave") public DataSource slaveDataSource() { return DataSourceBuilder.create().build(); } /** * 动态数据源路由 */ @Bean(name = "dynamicDataSource") public AbstractRoutingDataSource dynamicDataSource( @Qualifier("masterDataSource") DataSource masterDataSource, @Qualifier("slaveDataSource") DataSource slaveDataSource) { Map<Object, Object> targetDataSources = new HashMap<>(); targetDataSources.put("master", masterDataSource); targetDataSources.put("slave", slaveDataSource); DynamicRoutingDataSource routingDataSource = new DynamicRoutingDataSource(); routingDataSource.setTargetDataSources(targetDataSources); routingDataSource.setDefaultTargetDataSource(masterDataSource); return routingDataSource; } /** * 懒加载数据源(优化性能) */ @Bean @Primary public DataSource dataSource(@Qualifier("dynamicDataSource") AbstractRoutingDataSource dynamicDataSource) { return new LazyConnectionDataSourceProxy(dynamicDataSource); } } /** * 动态数据源路由 */ public class DynamicRoutingDataSource extends AbstractRoutingDataSource { @Override protected Object determineCurrentLookupKey() { return DataSourceContextHolder.getDataSourceType(); } } /** * 数据源上下文 Holder */ public class DataSourceContextHolder { private static final ThreadLocal<String> CONTEXT_HOLDER = new ThreadLocal<>(); public static void setDataSourceType(String type) { CONTEXT_HOLDER.set(type); } public static String getDataSourceType() { return CONTEXT_HOLDER.get() == null ? "master" : CONTEXT_HOLDER.get(); } public static void clearDataSourceType() { CONTEXT_HOLDER.remove(); } } /** * 数据源注解:标记读从库 */ @Target({ElementType.METHOD, ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) public @interface ReadOnly { } /** * 数据源切面:自动切换主从库 */ @Aspect @Component @Order(1) public class DataSourceAspect { @Around("@annotation(readOnly)") public Object around(ProceedingJoinPoint joinPoint, ReadOnly readOnly) throws Throwable { try { DataSourceContextHolder.setDataSourceType("slave"); return joinPoint.proceed(); } finally { DataSourceContextHolder.clearDataSourceType(); } } }10. Netty网关集群服务
package com.device.gateway.cluster; import com.device.gateway.redis.DeviceRedisService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.UUID; /** * Netty网关集群服务 */ @Slf4j @Component @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class NettyClusterService { @Value("${netty.cluster.node-id:}") private String nodeId; @Value("${netty.server.port:8888}") private int port; private final DeviceRedisService deviceRedisService; @PostConstruct public void init() { // 生成节点ID:IP+端口 或 配置的节点ID if (nodeId == null || nodeId.isEmpty()) { try { String ip = InetAddress.getLocalHost().getHostAddress(); nodeId = ip + ":" + port; } catch (UnknownHostException e) { nodeId = UUID.randomUUID().toString().substring(0, 8); } } log.info("Netty网关集群节点ID:{}", nodeId); } /** * 获取当前节点ID */ public String getCurrentNodeId() { return nodeId; } /** * 转发消息到指定节点 * 实现方式: * 1. Redis Pub/Sub * 2. HTTP接口 * 3. Netty集群通信 */ public void forwardMessage(String targetNode, DeviceProtocol.DeviceMessage message) { // 示例:使用Redis Pub/Sub转发 String channel = "DEVICE_MESSAGE:" + targetNode; deviceRedisService.getRedissonClient().getTopic(channel).publish(message.toByteArray()); log.debug("转发消息到节点[{}],设备[{}]", targetNode, message.getDeviceId()); } }三、核心功能说明
1. 主从Reactor模型
- •BossGroup(主Reactor)
:处理TCP连接请求,默认CPU核心数线程
- •WorkerGroup(从Reactor)
:处理IO读写,默认CPU核心数*2线程
- •
优势:高并发、高吞吐量,适合大量设备长连接场景
2. 粘包/解包处理
- •
采用
LengthFieldBasedFrameDecoder,基于长度字段的拆包方式 - •
协议格式:4字节长度 + Protobuf数据
- •
解决TCP粘包/半包问题,保证数据完整性
3. 心跳保活
- •
基于
IdleStateHandler检测读空闲(设备30秒未发数据) - •
心跳失败2次(总计60秒)断开连接
- •
收到心跳响应重置失败计数,保证长连接稳定性
4. 设备认证
- •
首次连接必须发送认证消息(设备ID+认证码)
- •
认证信息缓存到Redis,避免频繁查询DB
- •
未认证设备拒绝所有非认证消息,保障安全性
5. 设备转发
- •
基于设备ID路由消息,通过Redis记录设备所在网关节点
- •
同节点直接处理,跨节点通过Redis Pub/Sub转发
- •
支持网关集群部署,负载均衡
6. Redis集群(主从+哨兵)
- •
哨兵模式实现Redis高可用,自动故障转移
- •
缓存设备状态、认证信息、连接信息,提升性能
- •
过期策略保证缓存有效性
7. MySQL集群(主从)
- •
主库写入,从库读取,读写分离提升性能
- •
注解+切面自动切换数据源,对业务透明
- •
设备基础信息持久化存储
四、部署与扩展
1. 单机部署
- •
启动Redis哨兵集群(1主2从3哨兵)
- •
启动MySQL主从复制
- •
启动Netty网关服务
2. 集群部署
- •
多台服务器部署Netty网关,使用相同Redis集群
- •
设备连接任意网关节点,通过Redis路由消息
- •
网关节点无状态,可水平扩展
3. 监控与运维
- •
监控Redis集群状态(主从切换、哨兵状态)
- •
监控MySQL主从同步延迟
- •
监控Netty网关连接数、消息吞吐量、心跳成功率
- •
设备离线告警、认证失败告警
五、总结
本方案完整实现了基于Netty的设备接入网关核心功能,解决了长连接管理、粘包处理、心跳保活、设备认证、集群转发等关键问题,并适配了Redis/MySQL集群保证高可用。代码结构清晰,可根据实际业务需求扩展:
- •
协议扩展:支持JSON、自定义二进制协议
- •
加密扩展:添加SSL/TLS加密通信
- •
负载均衡:接入Nginx/TCP负载均衡器
- •
监控扩展:集成Prometheus/Grafana监控指标
原文链接: https://1024bat.cn/article/33
来源: 淘书1024bat