📉 前言:HTTP 为什么不适合物联网?
很多转行做 IoT 的朋友习惯用 HTTP 发送 JSON。但在嵌入式设备中:
- 开销大:HTTP 报头太长,对于只有几字节的传感器数据是巨大的浪费。
- 实时性差:服务器无法主动向设备推送指令(控制开灯、关阀门)。
- 不稳定:弱网环境下 HTTP 容易断连。
MQTT (Message Queuing Telemetry Transport)是 IoT 的事实标准。它是基于 TCP 的发布/订阅协议,头部最小只有 2 字节,极其轻量。而Netty则是处理 TCP 长连接的王者。
🏗️ 一、 架构设计:漏斗型流量处理
处理百万连接,核心思路是“接入与业务分离”。
系统架构图 (Mermaid):
🚀 二、 Netty 接入层:搞定 C1000K 问题
要实现百万连接,不能用 Tomcat 的“一请求一线程”模型,必须用 Netty 的Reactor 多路复用模型。
1. 操作系统内核调优 (Linux)
代码写得再好,文件描述符限制了也没用。
# 修改 /etc/sysctl.conffs.file-max=1000000net.ipv4.tcp_max_tw_buckets=6000net.ipv4.tcp_keepalive_time=1202. Netty 服务端启动代码
利用 Netty 官方提供的MqttDecoder,我们不需要自己解析二进制位。
@ComponentpublicclassMqttServer{@PostConstructpublicvoidstart(){NioEventLoopGroupbossGroup=newNioEventLoopGroup(1);// 接收连接NioEventLoopGroupworkerGroup=newNioEventLoopGroup();// 处理 IOServerBootstrapb=newServerBootstrap();b.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)// 核心优化:连接队列大小.option(ChannelOption.SO_BACKLOG,1024)// 核心优化:复用缓冲区.childOption(ChannelOption.ALLOCATOR,PooledByteBufAllocator.DEFAULT).childHandler(newChannelInitializer<SocketChannel>(){@OverrideprotectedvoidinitChannel(SocketChannelch){ChannelPipelinep=ch.pipeline();// 1. MQTT 解码器 (Netty 自带)p.addLast("decoder",newMqttDecoder());// 2. MQTT 编码器p.addLast("encoder",newMqttEncoder());// 3. 心跳检测 (60秒无读写则断开)p.addLast(newIdleStateHandler(60,0,0));// 4. 业务处理器p.addLast(newMqttBrokerHandler());}});b.bind(1883).sync();}}📡 三、 协议实战:处理 CONNECT 与 PUBLISH
在MqttBrokerHandler中,我们根据 MQTT 的报文类型(Packet Type)做不同处理。
@ChannelHandler.SharablepublicclassMqttBrokerHandlerextendsSimpleChannelInboundHandler<MqttMessage>{@OverrideprotectedvoidchannelRead0(ChannelHandlerContextctx,MqttMessagemsg){// 获取报文类型MqttMessageTypetype=msg.fixedHeader().messageType();switch(type){caseCONNECT:handleConnect(ctx,(MqttConnectMessage)msg);break;casePUBLISH:handlePublish(ctx,(MqttPublishMessage)msg);break;casePINGREQ:// 响应心跳 PINGRESPctx.writeAndFlush(newMqttMessage(newMqttFixedHeader(MqttMessageType.PINGRESP,false,MqttQoS.AT_MOST_ONCE,false,0)));break;default:break;}}privatevoidhandleConnect(ChannelHandlerContextctx,MqttConnectMessagemsg){// 1. 校验 ClientID、用户名密码StringclientId=msg.payload().clientIdentifier();// 2. 存储连接关系 (ClientID -> Channel) 到本地 Map 或 RedisChannelRepository.put(clientId,ctx.channel());// 3. 返回 CONNACK (连接成功)MqttConnAckMessageok=MqttMessageBuilders.connAck().returnCode(MqttConnectReturnCode.CONNECTION_ACCEPTED).build();ctx.writeAndFlush(ok);}privatevoidhandlePublish(ChannelHandlerContextctx,MqttPublishMessagemsg){Stringtopic=msg.variableHeader().topicName();byte[]payload=newbyte[msg.payload().readableBytes()];msg.payload().readBytes(payload);// ❗关键:不要在这里直接写库!会阻塞 Netty IO 线程// ✅ 正确做法:丢入 Kafka / RocketMQkafkaTemplate.send("iot-device-data",topic,newString(payload));}}💾 四、 数据存储:为什么不用 MySQL?
设备每秒上报一次数据,100 万台设备就是 100 万 TPS。
MySQL 根本扛不住这种写入压力,而且我们通常查询的是“过去 24 小时的温度变化曲线”。
这是典型的时序数据(Time-Series Data)。
技术选型:
- InfluxDB:老牌强者,生态好。
- TDengine:国产之光,写入性能极强,针对物联网优化。
Spring Boot 写入 InfluxDB 示例:
@ServicepublicclassDataStorageService{@AutowiredprivateInfluxDBClientinfluxDBClient;@KafkaListener(topics="iot-device-data")publicvoidconsume(Stringmessage){// 解析 JSONDeviceDatadata=JSON.parseObject(message,DeviceData.class);// 写入时序数据库Pointpoint=Point.measurement("sensor_data").addTag("device_id",data.getDeviceId()).addField("temperature",data.getTemp()).addField("humidity",data.getHumidity()).time(Instant.now(),WritePrecision.MS);influxDBClient.getWriteApiBlocking().writePoint(point);}}⚡ 五、 性能优化的深水区
当你真的面对百万连接时,坑才刚刚开始:
- 堆外内存泄漏 (Direct Memory Leak):Netty 大量使用堆外内存,如果
ByteBuf没有释放(ReferenceCountUtil.release(msg)),服务运行两天就会 OOM。 - GC 停顿:海量小对象(Message)会导致频繁 GC。可以使用对象池(Recycler)技术。
- 连接风暴:如果服务重启,100 万设备同时重连,会瞬间打挂 CPU。需要实现**指数退避(Exponential Backoff)**的重连策略,并限制每秒接入速率。
🎯 总结
搭建一个百万级 IoT 平台,不仅仅是写代码,更是对计算机网络、操作系统 IO、分布式架构的综合考量。
- 接入层:Netty + MQTT,做轻量级协议解析。
- 传输层:Kafka,做削峰填谷。
- 存储层:InfluxDB/TDengine,做高吞吐写入。
Next Step:
你可以下载一个 MQTT 压测工具(如JMeter MQTT Plugin或emqtt_bench),对着你的 Netty 服务发起 1 万个并发连接,看看 CPU 和内存的变化,那是检验真理的唯一标准。