spring boot 使用tcp协议接收客户端发送的xml数据,并处理数据
- 1.创建tcp 服务器,负责启动ServerSocket,接收连接
- 2.创建tcp 客户端,负责处理客户端连接请求数据
1.创建tcp 服务器,负责启动ServerSocket,接收连接
importlombok.extern.slf4j.Slf4j;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.context.ApplicationContext;importorg.springframework.stereotype.Component;importjavax.annotation.PostConstruct;importjavax.annotation.PreDestroy;importjava.net.InetAddress;importjava.net.ServerSocket;importjava.net.Socket;importjava.util.concurrent.ExecutorService;importjava.util.concurrent.Executors;/** * tcp 服务器,负责启动ServerSocket,接收连接 */@Slf4j@ComponentpublicclassTcpServerService{/** * 本tcp服务器端口 */privatestaticfinalintPORT=8090;privateServerSocketserverSocket;privatevolatilebooleanrunning=true;/** * 正式环境需要换成自定义线程池,这里为了方便演示 */privatefinalExecutorServicethreadPool=Executors.newCachedThreadPool();/** * spring 允许我们直接注入spring 应用上下文,方便获取Bean */@AutowiredprivateApplicationContextapplicationContext;/** * 项目启动就执行 */@PostConstructpublicvoidstart(){//在独立线程中启动服务器,避免阻塞 Spring启动newThread(()->{try{// 参考我的另一篇文章获取IpUtilInetAddressbindAddress=InetAddress.getByName(IpUtil.getLocalIp());serverSocket=newServerSocket(PORT,50,bindAddress);log.info("tcp服务器:tcp 服务器启动在 {}的{}端口",bindAddress.getHostName(),PORT);while(running){SocketclientSocket=serverSocket.accept();log.info("tcp服务器:新客户端连接请求:{}",clientSocket);//从Spring 容器中获取一个新的 clientHander实例 (原型Bean)ClientHanderclientHander=applicationContext.getBean(ClientHander.class);clientHander.setSocket(clientSocket);threadPool.submit(clientHander);}}catch(Exceptione){if(running){log.error("tcp服务器:tcp服务器异常",e);}}}).start();}@PreDestroypublicvoidstop(){running=false;try{if(serverSocket!=null&&!serverSocket.isClosed()){serverSocket.close();}}catch(Exceptione){log.error("tcp服务器:关闭server socket 异常",e);}}}
2.创建tcp 客户端,负责处理客户端连接请求数据
importio.swagger.annotations.Scope;importlombok.extern.slf4j.Slf4j;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Component;importjava.io.*;importjava.net.Socket;importjava.nio.charset.StandardCharsets;importjava.util.LinkedHashMap;importjava.util.regex.Matcher;importjava.util.regex.Pattern;@Slf4j@Component@Scope(name="prototype",description="原型Bean,处理单条连接,每次连接创建一个新的实例")publicclassClientHanderimplementsRunnable{/** * 由TcpServerService 注入 */privateSocketsocket;privatestaticfinalintBUFFER_SIZE=8192;//设置超时时间60秒privatestaticfinalintTIME_OUT=60_000;publicvoidsetSocket(Socketsocket){this.socket=socket;}/** * 接收客户端发过来的xml数据 * <pre>固定6位长度头如:长度是内容.length * {@code 000099<Message><Message>} * </pre> */@Overridepublicvoidrun(){log.info("ClientHander接收客户端发过来的xml数据");//使用 try-with-resource 自动关闭流和socket//使用转换流将字节流转换成字符流try(BufferedReaderreader=newBufferedReader(newInputStreamReader(socket.getInputStream(),StandardCharsets.UTF_8));PrintWriterwriter=newPrintWriter(newOutputStreamWriter(socket.getOutputStream(),StandardCharsets.UTF_8),true)){//设置超时时间socket.setSoTimeout(TIME_OUT);StringheadStr=readFiexLengthString(reader,6);intcontentLength;try{contentLength=Integer.parseInt(headStr);}catch(NumberFormatExceptione){thrownewIOException("无效的长度格式:"+headStr,e);}if(contentLength<=0||contentLength>1_000_000){thrownewIOException("非法长度:"+contentLength);}//读取指定长度内容StringrequestXml=readFiexLengthString(reader,contentLength);log.info("读取到接收到的数据长度及数据:{}{}",headStr,requestXml);//调用业务处理逻辑Stringresponse=porcessRequest(requestXml);if(response.length()<=0||response.length()>1_000_000){thrownewIOException("非法响应长度,无法用6位长度表示:"+contentLength);}//发送响应writer.write(response);writer.flush();if(writer.checkError()){log.error("响应发送失败,可能客户端已经断开");}else{log.info("响应发送成功,长度:{}",response.length());}//通知客户端响应结束,但不立即关闭socket.shutdownOutput();log.info("发送响应数据:{}",response);}catch(Exceptione){log.error("错误处理client",e);}}/** * 读取指定长度 * 为什么用循环,而不是一次 read? * Reader.read(char[], int, int) 方法不保证一次读完请求的全部字符(哪怕流还没有结束)。它可能因为内部缓冲区大小、系统调用限制等原因,每次只返回一部分数据。 * 因此,需要循环多次调用,并将每次读到的数据拼接到 buffer 的正确位置,直到凑满 length 个字符。 * @param reader * @param length * @return * @throws IOException */privateStringreadFiexLengthString(Readerreader,intlength)throwsIOException{char[]buffer=newchar[length];intoffset=0;while(offset<length){intread=reader.read(buffer,offset,length-offset);//当循环中读取的总字符数达到 length 时,循环结束,不再调用 read,所以不会看到 -1。// 只有当流中数据不足 length 时,最后一次 read 才会返回 -1,从而触发异常。// 所以代码不返回 -1 是因为流中数据足够,没有触发提前结束if(read==-1){thrownewIOException("流提前结束,预期"+length+"字符,实际收到"+offset);}offset+=read;}returnnewString(buffer);}/** * 业务处理:根据请求内容调用不同的Service<br/> * 有新的需求只需要加一个策略就行porcessRequest方法代码一行都不用动 * @param requestXml * @see ServiceCodeEnum * @return */publicStringporcessRequest(StringrequestXml){// todo 你们的业务处理逻辑}/** * 去除前6位数字,如果有的话 * * @param str * @return */privateStringremoveFirstSixNumber(Stringstr){Patternpattern=Pattern.compile("^\\d{6}");Matchermatcher=pattern.matcher(str);if(matcher.find()){//去掉前6位returnstr.substring(6);}returnstr;}}