news 2026/4/2 13:54:16

基于Netty的设备接入网关系统实战

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
基于Netty的设备接入网关系统实战

基于Netty的设备接入网关系统实战

作者:系统管理员

摘要

基于Netty的设备接入网关系统实战


基于Netty的设备接入网关系统实战

本文将完整实现一套企业级设备接入网关,涵盖Netty长连接核心特性(主从Reactor、粘包解包、心跳保活、设备认证)、网关集群、Redis/MySQL集群适配,提供可直接落地的Java代码及全流程说明。

一、整体架构设计

核心组件

  1. Netty网关层

    :主从Reactor模型,处理设备TCP长连接、粘包解包、心跳、认证

  2. 集群层

    :Netty网关集群+Redis集群(主从+哨兵)+MySQL主从

  3. 转发层

    :基于设备ID的请求路由转发

  4. 存储层

    :Redis缓存设备状态,MySQL持久化设备信息

技术栈

  • 核心:Netty 4.1.x

  • 存储:Redis 6.x(主从+哨兵)、MySQL 8.x(主从)

  • 序列化:Protobuf(高效二进制,适合物联网场景)

  • 工具:Lombok、HikariCP、Redisson

二、核心代码实现

1. 基础依赖(Maven)

<dependencies> <!-- Netty核心 --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.94.Final</version> </dependency> <!-- Protobuf --> <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>3.24.4</version> </dependency> <!-- Redis --> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.23.3</version> </dependency> <!-- MySQL --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.33</version> </dependency> <dependency> <groupId>com.zaxxer</groupId> <artifactId>HikariCP</artifactId> <version>5.0.1</version> </dependency> <!-- 工具 --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.30</version> </dependency> </dependencies>

2. 协议定义(Protobuf)

定义设备通信协议(解决粘包/解包、数据结构化问题)

syntax = "proto3"; package com.device.gateway.protocol; option java_outer_classname = "DeviceProtocol"; // 设备消息体 message DeviceMessage { string deviceId = 1; // 设备ID int32 messageType = 2; // 消息类型:1-认证 2-心跳 3-业务数据 4-响应 bytes data = 3; // 业务数据 int64 timestamp = 4; // 时间戳 string authCode = 5; // 认证码(仅认证消息使用) }

编译后生成DeviceProtocol类,用于Netty编解码。

3. Netty粘包/解包处理器

基于长度字段的拆包器(解决TCP粘包/半包问题)

package com.device.gateway.netty.codec; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; /** * 粘包解包器:基于长度字段 * 协议格式:长度字段(4字节) + protobuf数据 */ public class DeviceFrameDecoder extends LengthFieldBasedFrameDecoder { // 最大帧长度:1024*1024=1MB private static final int MAX_FRAME_LENGTH = 1024 * 1024; // 长度字段偏移量:0 private static final int LENGTH_FIELD_OFFSET = 0; // 长度字段长度:4字节(int) private static final int LENGTH_FIELD_LENGTH = 4; // 长度调整值:0 private static final int LENGTH_ADJUSTMENT = 0; // 跳过初始字节数:4(跳过长度字段) private static final int INITIAL_BYTES_TO_STRIP = 4; public DeviceFrameDecoder() { super(MAX_FRAME_LENGTH, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH, LENGTH_ADJUSTMENT, INITIAL_BYTES_TO_STRIP); } @Override protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { // 校验最小长度(长度字段+至少1字节数据) if (in.readableBytes() < LENGTH_FIELD_LENGTH + 1) { return null; } return super.decode(ctx, in); } } /** * Protobuf编码器:添加长度字段 */ public class DeviceFrameEncoder extends io.netty.handler.codec.MessageToByteEncoder<DeviceProtocol.DeviceMessage> { @Override protected void encode(ChannelHandlerContext ctx, DeviceProtocol.DeviceMessage msg, ByteBuf out) throws Exception { // 序列化Protobuf byte[] data = msg.toByteArray(); // 写入长度字段(4字节) out.writeInt(data.length); // 写入protobuf数据 out.writeBytes(data); } } /** * Protobuf编解码器 */ public class DeviceProtobufCodec { public static DeviceFrameDecoder decoder() { return new DeviceFrameDecoder(); } public static DeviceFrameEncoder encoder() { return new DeviceFrameEncoder(); } }

4. 心跳检测处理器

基于Netty的IdleStateHandler实现心跳保活

package com.device.gateway.netty.heartbeat; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import lombok.extern.slf4j.Slf4j; /** * 心跳处理器: * - 读空闲(设备30秒未发数据):发送心跳请求 * - 读空闲60秒:断开连接 */ @Slf4j public class HeartbeatHandler extends ChannelInboundHandlerAdapter { // 读空闲超时时间(秒) public static final int READER_IDLE_TIME = 30; // 写空闲超时时间(秒) public static final int WRITER_IDLE_TIME = 0; // 全空闲超时时间(秒) public static final int ALL_IDLE_TIME = 0; // 心跳失败次数 private int heartbeatFailCount = 0; // 最大心跳失败次数 private static final int MAX_HEARTBEAT_FAIL_COUNT = 2; @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { heartbeatFailCount++; log.warn("设备[{}]读空闲,心跳失败次数:{}", ctx.channel().id(), heartbeatFailCount); // 超过最大失败次数,断开连接 if (heartbeatFailCount >= MAX_HEARTBEAT_FAIL_COUNT) { log.error("设备[{}]心跳超时,断开连接", ctx.channel().id()); ctx.close(); return; } // 发送心跳请求 DeviceProtocol.DeviceMessage heartbeatMsg = DeviceProtocol.DeviceMessage.newBuilder() .setMessageType(2) // 2-心跳 .setTimestamp(System.currentTimeMillis()) .build(); ctx.writeAndFlush(heartbeatMsg); } } else { super.userEventTriggered(ctx, evt); } } // 收到心跳响应,重置失败次数 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof DeviceProtocol.DeviceMessage) { DeviceProtocol.DeviceMessage message = (DeviceProtocol.DeviceMessage) msg; if (message.getMessageType() == 2) { heartbeatFailCount = 0; log.debug("收到设备[{}]心跳响应", message.getDeviceId()); } } super.channelRead(ctx, msg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.error("心跳处理器异常", cause); ctx.close(); } }

5. 设备认证处理器

package com.device.gateway.netty.auth; import com.device.gateway.redis.DeviceRedisService; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * 设备认证处理器:首次连接必须认证,否则断开 */ @Slf4j @Component @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class DeviceAuthHandler extends ChannelInboundHandlerAdapter { private final DeviceRedisService deviceRedisService; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof DeviceProtocol.DeviceMessage) { DeviceProtocol.DeviceMessage message = (DeviceProtocol.DeviceMessage) msg; // 检查是否已认证 Boolean isAuth = deviceRedisService.getDeviceAuthState(message.getDeviceId()); if (isAuth != null && isAuth) { // 已认证,透传消息 ctx.fireChannelRead(msg); return; } // 未认证,检查是否是认证消息 if (message.getMessageType() != 1) { log.warn("设备[{}]未认证,拒绝非认证消息", message.getDeviceId()); ctx.close(); return; } // 执行认证 boolean authSuccess = authenticate(message); if (authSuccess) { // 认证成功,缓存认证状态 deviceRedisService.setDeviceAuthState(message.getDeviceId(), true, 3600 * 24); // 绑定设备ID到Channel ctx.channel().attr(AttributeKey.valueOf("DEVICE_ID")).set(message.getDeviceId()); // 记录连接信息 deviceRedisService.setDeviceChannel(message.getDeviceId(), ctx.channel().id().asLongText()); log.info("设备[{}]认证成功", message.getDeviceId()); // 发送认证成功响应 DeviceProtocol.DeviceMessage response = DeviceProtocol.DeviceMessage.newBuilder() .setDeviceId(message.getDeviceId()) .setMessageType(4) // 4-响应 .setData("auth_success".getBytes()) .setTimestamp(System.currentTimeMillis()) .build(); ctx.writeAndFlush(response); // 透传消息 ctx.fireChannelRead(msg); } else { log.warn("设备[{}]认证失败,断开连接", message.getDeviceId()); ctx.close(); } } else { ctx.fireChannelRead(msg); } } /** * 设备认证逻辑:校验设备ID和认证码 */ private boolean authenticate(DeviceProtocol.DeviceMessage message) { // 从MySQL/Redis获取设备认证信息 String storedAuthCode = deviceRedisService.getDeviceAuthCode(message.getDeviceId()); if (storedAuthCode == null) { // 从MySQL主库查询 storedAuthCode = deviceRedisService.loadDeviceAuthCodeFromDb(message.getDeviceId()); if (storedAuthCode == null) { return false; } // 缓存到Redis deviceRedisService.setDeviceAuthCode(message.getDeviceId(), storedAuthCode, 3600); } // 校验认证码 return storedAuthCode.equals(message.getAuthCode()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.error("设备认证异常", cause); ctx.close(); } }

6. 主从Reactor模型的Netty服务端

package com.device.gateway.netty.server; import com.device.gateway.netty.auth.DeviceAuthHandler; import com.device.gateway.netty.codec.DeviceProtobufCodec; import com.device.gateway.netty.heartbeat.HeartbeatHandler; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.timeout.IdleStateHandler; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.concurrent.TimeUnit; /** * Netty服务端:主从Reactor模型 * - BossGroup:处理连接请求(主Reactor) * - WorkerGroup:处理IO读写(从Reactor) */ @Slf4j @Component @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class NettyDeviceServer { // 监听端口 @Value("${netty.server.port:8888}") private int port; // Boss线程数:CPU核心数 private static final int BOSS_THREADS = Runtime.getRuntime().availableProcessors(); // Worker线程数:CPU核心数*2 private static final int WORKER_THREADS = Runtime.getRuntime().availableProcessors() * 2; // BossGroup:主Reactor private EventLoopGroup bossGroup; // WorkerGroup:从Reactor private EventLoopGroup workerGroup; private final DeviceAuthHandler deviceAuthHandler; private final DeviceMessageHandler deviceMessageHandler; /** * 启动Netty服务 */ @PostConstruct public void start() { bossGroup = new NioEventLoopGroup(BOSS_THREADS); workerGroup = new NioEventLoopGroup(WORKER_THREADS); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) // NIO服务端通道 .channel(NioServerSocketChannel.class) // 连接队列大小 .option(ChannelOption.SO_BACKLOG, 1024) // 开启TCP保活 .childOption(ChannelOption.SO_KEEPALIVE, true) // 禁用Nagle算法(低延迟) .childOption(ChannelOption.TCP_NODELAY, true) // 通道初始化 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // 1. 心跳检测:读空闲30秒 pipeline.addLast(new IdleStateHandler( HeartbeatHandler.READER_IDLE_TIME, HeartbeatHandler.WRITER_IDLE_TIME, HeartbeatHandler.ALL_IDLE_TIME, TimeUnit.SECONDS )); // 2. 心跳处理器 pipeline.addLast(new HeartbeatHandler()); // 3. 粘包解包器 pipeline.addLast(DeviceProtobufCodec.decoder()); pipeline.addLast(DeviceProtobufCodec.encoder()); // 4. 设备认证处理器 pipeline.addLast(deviceAuthHandler); // 5. 业务消息处理器 pipeline.addLast(deviceMessageHandler); } }); // 绑定端口,同步等待成功 ChannelFuture future = bootstrap.bind(port).sync(); log.info("Netty设备网关启动成功,监听端口:{}", port); // 等待服务端关闭 future.channel().closeFuture().sync(); } catch (InterruptedException e) { log.error("Netty服务启动异常", e); Thread.currentThread().interrupt(); } } /** * 停止Netty服务 */ @PreDestroy public void stop() { log.info("开始停止Netty设备网关"); if (bossGroup != null) { bossGroup.shutdownGracefully(); } if (workerGroup != null) { workerGroup.shutdownGracefully(); } log.info("Netty设备网关停止完成"); } }

7. 设备消息转发处理器

package com.device.gateway.netty.handler; import com.device.gateway.redis.DeviceRedisService; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * 设备消息转发处理器:根据设备ID路由消息 */ @Slf4j @Component @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class DeviceMessageHandler extends ChannelInboundHandlerAdapter { private final DeviceRedisService deviceRedisService; private final DeviceMessageRouter deviceMessageRouter; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof DeviceProtocol.DeviceMessage) { DeviceProtocol.DeviceMessage message = (DeviceProtocol.DeviceMessage) msg; String deviceId = message.getDeviceId(); log.debug("收到设备[{}]消息,类型:{}", deviceId, message.getMessageType()); // 1. 更新设备最后活跃时间 deviceRedisService.updateDeviceLastActiveTime(deviceId); // 2. 消息转发(根据业务类型路由) deviceMessageRouter.route(message); // 3. 响应处理(如果需要) if (message.getMessageType() == 3) { // 业务数据 DeviceProtocol.DeviceMessage response = DeviceProtocol.DeviceMessage.newBuilder() .setDeviceId(deviceId) .setMessageType(4) .setData("success".getBytes()) .setTimestamp(System.currentTimeMillis()) .build(); ctx.writeAndFlush(response); } } } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { // 设备断开连接,清理缓存 String deviceId = ctx.channel().attr(AttributeKey.valueOf("DEVICE_ID")).get(); if (deviceId != null) { log.info("设备[{}]断开连接", deviceId); deviceRedisService.clearDeviceState(deviceId); } super.channelInactive(ctx); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.error("消息处理器异常", cause); ctx.close(); } } /** * 消息路由器:集群内转发消息 */ @Component @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class DeviceMessageRouter { private final DeviceRedisService deviceRedisService; private final NettyClusterService nettyClusterService; /** * 路由消息到目标设备 */ public void route(DeviceProtocol.DeviceMessage message) { String deviceId = message.getDeviceId(); // 1. 获取设备所在的网关节点 String gatewayNode = deviceRedisService.getDeviceGatewayNode(deviceId); if (gatewayNode == null) { log.warn("设备[{}]未在线", deviceId); return; } // 2. 判断是否是当前节点 String currentNode = nettyClusterService.getCurrentNodeId(); if (currentNode.equals(gatewayNode)) { // 当前节点,直接处理 processLocalMessage(message); } else { // 其他节点,集群转发 nettyClusterService.forwardMessage(gatewayNode, message); } } /** * 处理本地消息 */ private void processLocalMessage(DeviceProtocol.DeviceMessage message) { // 业务处理逻辑:如保存数据、调用业务接口等 log.debug("处理本地设备[{}]消息", message.getDeviceId()); } }

8. Redis集群配置(主从+哨兵)

package com.device.gateway.redis; import org.redisson.Redisson; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * Redis集群配置:主从+哨兵 */ @Configuration public class RedisConfig { @Value("${redis.sentinel.master-name:mymaster}") private String masterName; @Value("${redis.sentinel.nodes:127.0.0.1:26379,127.0.0.1:26380,127.0.0.1:26381}") private String sentinelNodes; @Value("${redis.password:}") private String password; @Bean public RedissonClient redissonClient() { Config config = new Config(); // 哨兵模式配置 config.useSentinelServers() .setMasterName(masterName) .addSentinelAddress(parseSentinelNodes()) .setPassword(password.isEmpty() ? null : password) .setDatabase(0) .setConnectTimeout(3000) .setTimeout(3000) .setRetryAttempts(3) .setRetryInterval(1000); return Redisson.create(config); } /** * 解析哨兵节点为Redis地址格式 */ private String[] parseSentinelNodes() { String[] nodes = sentinelNodes.split(","); for (int i = 0; i < nodes.length; i++) { nodes[i] = "redis://" + nodes[i].trim(); } return nodes; } } /** * 设备Redis服务 */ @Component @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class DeviceRedisService { private final RedissonClient redissonClient; private final DeviceMapper deviceMapper; // 设备认证状态Key:DEVICE_AUTH:{deviceId} private static final String KEY_DEVICE_AUTH = "DEVICE_AUTH:%s"; // 设备认证码Key:DEVICE_AUTH_CODE:{deviceId} private static final String KEY_DEVICE_AUTH_CODE = "DEVICE_AUTH_CODE:%s"; // 设备通道Key:DEVICE_CHANNEL:{deviceId} private static final String KEY_DEVICE_CHANNEL = "DEVICE_CHANNEL:%s"; // 设备网关节点Key:DEVICE_GATEWAY:{deviceId} private static final String KEY_DEVICE_GATEWAY = "DEVICE_GATEWAY:%s"; // 设备最后活跃时间Key:DEVICE_LAST_ACTIVE:{deviceId} private static final String KEY_DEVICE_LAST_ACTIVE = "DEVICE_LAST_ACTIVE:%s"; /** * 设置设备认证状态 */ public void setDeviceAuthState(String deviceId, boolean state, long expireSeconds) { String key = String.format(KEY_DEVICE_AUTH, deviceId); redissonClient.getBucket(key).set(state, expireSeconds, TimeUnit.SECONDS); } /** * 获取设备认证状态 */ public Boolean getDeviceAuthState(String deviceId) { String key = String.format(KEY_DEVICE_AUTH, deviceId); return redissonClient.getBucket(key).get(); } /** * 设置设备认证码缓存 */ public void setDeviceAuthCode(String deviceId, String authCode, long expireSeconds) { String key = String.format(KEY_DEVICE_AUTH_CODE, deviceId); redissonClient.getBucket(key).set(authCode, expireSeconds, TimeUnit.SECONDS); } /** * 获取设备认证码缓存 */ public String getDeviceAuthCode(String deviceId) { String key = String.format(KEY_DEVICE_AUTH_CODE, deviceId); return redissonClient.getBucket(key).get(); } /** * 从MySQL加载设备认证码 */ public String loadDeviceAuthCodeFromDb(String deviceId) { // 从MySQL主库查询 return deviceMapper.getDeviceAuthCode(deviceId); } /** * 设置设备通道信息 */ public void setDeviceChannel(String deviceId, String channelId) { String key = String.format(KEY_DEVICE_CHANNEL, deviceId); redissonClient.getBucket(key).set(channelId); } /** * 设置设备网关节点 */ public void setDeviceGatewayNode(String deviceId, String nodeId) { String key = String.format(KEY_DEVICE_GATEWAY, deviceId); redissonClient.getBucket(key).set(nodeId); } /** * 获取设备网关节点 */ public String getDeviceGatewayNode(String deviceId) { String key = String.format(KEY_DEVICE_GATEWAY, deviceId); return redissonClient.getBucket(key).get(); } /** * 更新设备最后活跃时间 */ public void updateDeviceLastActiveTime(String deviceId) { String key = String.format(KEY_DEVICE_LAST_ACTIVE, deviceId); redissonClient.getBucket(key).set(System.currentTimeMillis()); } /** * 清理设备状态 */ public void clearDeviceState(String deviceId) { redissonClient.getBucket(String.format(KEY_DEVICE_AUTH, deviceId)).delete(); redissonClient.getBucket(String.format(KEY_DEVICE_CHANNEL, deviceId)).delete(); redissonClient.getBucket(String.format(KEY_DEVICE_GATEWAY, deviceId)).delete(); } }

9. MySQL主从配置

# application.yml spring: datasource: # 主库配置 master: jdbc-url: jdbc:mysql://master-mysql:3306/device_db?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai username: root password: 123456 driver-class-name: com.mysql.cj.jdbc.Driver # 从库配置 slave: jdbc-url: jdbc:mysql://slave-mysql:3306/device_db?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai username: root password: 123456 driver-class-name: com.mysql.cj.jdbc.Driver type: com.zaxxer.hikari.HikariDataSource hikari: maximum-pool-size: 10 minimum-idle: 5 idle-timeout: 300000 connection-timeout: 20000
package com.device.gateway.db; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.jdbc.DataSourceBuilder; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.jdbc.datasource.LazyConnectionDataSourceProxy; import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource; import javax.sql.DataSource; import java.util.HashMap; import java.util.Map; /** * MySQL主从数据源配置 */ @Configuration public class DataSourceConfig { /** * 主库数据源 */ @Bean(name = "masterDataSource") @ConfigurationProperties("spring.datasource.master") public DataSource masterDataSource() { return DataSourceBuilder.create().build(); } /** * 从库数据源 */ @Bean(name = "slaveDataSource") @ConfigurationProperties("spring.datasource.slave") public DataSource slaveDataSource() { return DataSourceBuilder.create().build(); } /** * 动态数据源路由 */ @Bean(name = "dynamicDataSource") public AbstractRoutingDataSource dynamicDataSource( @Qualifier("masterDataSource") DataSource masterDataSource, @Qualifier("slaveDataSource") DataSource slaveDataSource) { Map<Object, Object> targetDataSources = new HashMap<>(); targetDataSources.put("master", masterDataSource); targetDataSources.put("slave", slaveDataSource); DynamicRoutingDataSource routingDataSource = new DynamicRoutingDataSource(); routingDataSource.setTargetDataSources(targetDataSources); routingDataSource.setDefaultTargetDataSource(masterDataSource); return routingDataSource; } /** * 懒加载数据源(优化性能) */ @Bean @Primary public DataSource dataSource(@Qualifier("dynamicDataSource") AbstractRoutingDataSource dynamicDataSource) { return new LazyConnectionDataSourceProxy(dynamicDataSource); } } /** * 动态数据源路由 */ public class DynamicRoutingDataSource extends AbstractRoutingDataSource { @Override protected Object determineCurrentLookupKey() { return DataSourceContextHolder.getDataSourceType(); } } /** * 数据源上下文 Holder */ public class DataSourceContextHolder { private static final ThreadLocal<String> CONTEXT_HOLDER = new ThreadLocal<>(); public static void setDataSourceType(String type) { CONTEXT_HOLDER.set(type); } public static String getDataSourceType() { return CONTEXT_HOLDER.get() == null ? "master" : CONTEXT_HOLDER.get(); } public static void clearDataSourceType() { CONTEXT_HOLDER.remove(); } } /** * 数据源注解:标记读从库 */ @Target({ElementType.METHOD, ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) public @interface ReadOnly { } /** * 数据源切面:自动切换主从库 */ @Aspect @Component @Order(1) public class DataSourceAspect { @Around("@annotation(readOnly)") public Object around(ProceedingJoinPoint joinPoint, ReadOnly readOnly) throws Throwable { try { DataSourceContextHolder.setDataSourceType("slave"); return joinPoint.proceed(); } finally { DataSourceContextHolder.clearDataSourceType(); } } }

10. Netty网关集群服务

package com.device.gateway.cluster; import com.device.gateway.redis.DeviceRedisService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.UUID; /** * Netty网关集群服务 */ @Slf4j @Component @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class NettyClusterService { @Value("${netty.cluster.node-id:}") private String nodeId; @Value("${netty.server.port:8888}") private int port; private final DeviceRedisService deviceRedisService; @PostConstruct public void init() { // 生成节点ID:IP+端口 或 配置的节点ID if (nodeId == null || nodeId.isEmpty()) { try { String ip = InetAddress.getLocalHost().getHostAddress(); nodeId = ip + ":" + port; } catch (UnknownHostException e) { nodeId = UUID.randomUUID().toString().substring(0, 8); } } log.info("Netty网关集群节点ID:{}", nodeId); } /** * 获取当前节点ID */ public String getCurrentNodeId() { return nodeId; } /** * 转发消息到指定节点 * 实现方式: * 1. Redis Pub/Sub * 2. HTTP接口 * 3. Netty集群通信 */ public void forwardMessage(String targetNode, DeviceProtocol.DeviceMessage message) { // 示例:使用Redis Pub/Sub转发 String channel = "DEVICE_MESSAGE:" + targetNode; deviceRedisService.getRedissonClient().getTopic(channel).publish(message.toByteArray()); log.debug("转发消息到节点[{}],设备[{}]", targetNode, message.getDeviceId()); } }

三、核心功能说明

1. 主从Reactor模型

  • BossGroup(主Reactor)

    :处理TCP连接请求,默认CPU核心数线程

  • WorkerGroup(从Reactor)

    :处理IO读写,默认CPU核心数*2线程

  • 优势:高并发、高吞吐量,适合大量设备长连接场景

2. 粘包/解包处理

  • 采用LengthFieldBasedFrameDecoder,基于长度字段的拆包方式

  • 协议格式:4字节长度 + Protobuf数据

  • 解决TCP粘包/半包问题,保证数据完整性

3. 心跳保活

  • 基于IdleStateHandler检测读空闲(设备30秒未发数据)

  • 心跳失败2次(总计60秒)断开连接

  • 收到心跳响应重置失败计数,保证长连接稳定性

4. 设备认证

  • 首次连接必须发送认证消息(设备ID+认证码)

  • 认证信息缓存到Redis,避免频繁查询DB

  • 未认证设备拒绝所有非认证消息,保障安全性

5. 设备转发

  • 基于设备ID路由消息,通过Redis记录设备所在网关节点

  • 同节点直接处理,跨节点通过Redis Pub/Sub转发

  • 支持网关集群部署,负载均衡

6. Redis集群(主从+哨兵)

  • 哨兵模式实现Redis高可用,自动故障转移

  • 缓存设备状态、认证信息、连接信息,提升性能

  • 过期策略保证缓存有效性

7. MySQL集群(主从)

  • 主库写入,从库读取,读写分离提升性能

  • 注解+切面自动切换数据源,对业务透明

  • 设备基础信息持久化存储

四、部署与扩展

1. 单机部署

  • 启动Redis哨兵集群(1主2从3哨兵)

  • 启动MySQL主从复制

  • 启动Netty网关服务

2. 集群部署

  • 多台服务器部署Netty网关,使用相同Redis集群

  • 设备连接任意网关节点,通过Redis路由消息

  • 网关节点无状态,可水平扩展

3. 监控与运维

  • 监控Redis集群状态(主从切换、哨兵状态)

  • 监控MySQL主从同步延迟

  • 监控Netty网关连接数、消息吞吐量、心跳成功率

  • 设备离线告警、认证失败告警

五、总结

本方案完整实现了基于Netty的设备接入网关核心功能,解决了长连接管理、粘包处理、心跳保活、设备认证、集群转发等关键问题,并适配了Redis/MySQL集群保证高可用。代码结构清晰,可根据实际业务需求扩展:

  • 协议扩展:支持JSON、自定义二进制协议

  • 加密扩展:添加SSL/TLS加密通信

  • 负载均衡:接入Nginx/TCP负载均衡器

  • 监控扩展:集成Prometheus/Grafana监控指标


原文链接: https://1024bat.cn/article/33

来源: 淘书1024bat

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/3/31 0:48:19

BackgroundRemover终极指南:5大技巧让任何图片视频秒变透明背景

BackgroundRemover终极指南&#xff1a;5大技巧让任何图片视频秒变透明背景 【免费下载链接】backgroundremover Background Remover lets you Remove Background from images and video using AI with a simple command line interface that is free and open source. 项目地…

作者头像 李华
网站建设 2026/3/20 20:23:15

nanoMODBUS终极指南:嵌入式系统轻量级MODBUS通信完整解决方案

nanoMODBUS终极指南&#xff1a;嵌入式系统轻量级MODBUS通信完整解决方案 【免费下载链接】nanoMODBUS nanoMODBUS - 一个紧凑的MODBUS RTU/TCP C库&#xff0c;专为嵌入式系统和微控制器设计。 项目地址: https://gitcode.com/gh_mirrors/na/nanoMODBUS nanoMODBUS是一…

作者头像 李华
网站建设 2026/3/27 18:31:13

Qwen3-VL-WEBUI电商应用案例:智能图文生成系统搭建教程

Qwen3-VL-WEBUI电商应用案例&#xff1a;智能图文生成系统搭建教程 1. 引言 1.1 业务场景描述 在当前电商行业竞争日益激烈的背景下&#xff0c;商品内容的生产效率直接决定了平台的上新速度与用户体验。传统图文详情页依赖设计师和文案团队协作&#xff0c;平均耗时2-3小时…

作者头像 李华
网站建设 2026/3/27 13:33:48

Qwen3-VL-WEBUI实战案例:图文理解与GUI操作详细步骤

Qwen3-VL-WEBUI实战案例&#xff1a;图文理解与GUI操作详细步骤 1. 引言 随着多模态大模型的快速发展&#xff0c;视觉-语言理解能力已成为AI代理系统的核心竞争力。阿里云最新推出的 Qwen3-VL-WEBUI 正是这一趋势下的重要实践成果。该工具基于开源项目构建&#xff0c;内置了…

作者头像 李华
网站建设 2026/4/1 19:53:46

PCAN多通道同步配置操作指南

PCAN多通道同步配置实战指南&#xff1a;从原理到高精度时间对齐你有没有遇到过这样的情况&#xff1f;在测试一个双CAN网络的车载系统时&#xff0c;明明刹车信号先发出&#xff0c;记录下来的数据却显示警示灯动作更早。排查半天发现&#xff0c;不是ECU逻辑出错&#xff0c;…

作者头像 李华
网站建设 2026/3/30 13:52:11

Qwen2.5体验避坑指南:选对云端GPU,省下80%测试成本

Qwen2.5体验避坑指南&#xff1a;选对云端GPU&#xff0c;省下80%测试成本 引言&#xff1a;创业者的AI模型选择困境 作为创业者&#xff0c;你可能已经尝试过多个AI模型&#xff0c;结果发现测试成本像流水一样消耗。每次更换模型都意味着重新投入时间和金钱&#xff0c;而效…

作者头像 李华