文 / Kenyon,资深软件架构师,15年软件开发和技术管理经验,从程序员做到企业技术高管,专注技术管理、架构设计、AI技术应用和落地。
由于公众号推流的原因,请在关注页右上角加星标,这样才能及时收到新文章的推送。
引言
在上一篇文章中,我们基于架构设计原则设计了RPC框架的基础架构。今天,我们将进入实战阶段,实现RPC框架的核心功能,包括服务代理、序列化、网络通信等模块。在实现过程中,我们将重点展示如何将SOLID原则、高内聚低耦合、KISS等架构设计原则应用到实际代码中。
一、核心组件的实现
1. 序列化模块(Serializer)
遵循开闭原则,我们设计了Serializer接口,并提供了JSON实现:
// 序列化接口,支持扩展不同的序列化方式publicinterfaceSerializer{/** * 将对象序列化为字节数组 * * @param obj 要序列化的对象 * @param <T> 对象类型 * @return 序列化后的字节数组 * @throws Exception 序列化异常 */<T>byte[]serialize(Tobj)throwsException;/** * 将字节数组反序列化为对象 * * @param bytes 序列化后的字节数组 * @param clazz 对象类型 * @param <T> 对象类型 * @return 反序列化后的对象 * @throws Exception 反序列化异常 */<T>Tdeserialize(byte[]bytes,Class<T>clazz)throwsException;/** * 序列化类型枚举 */enumType{//目前暂时只是支持JSON,后续可以在这里添加要支持的其他序列化方式JSON(1);privatefinalintcode;Type(intcode){this.code=code;}publicintgetCode(){returncode;}/** * 根据code查找对应的序列化类型 * * @param code 序列化类型码 * @return 序列化类型 */publicstaticTypefindByCode(intcode){for(Typetype:Type.values()){if(type.code==code){returntype;}}returnJSON;// 默认使用JSON}}}// JSON序列化实现publicclassJsonSerializerimplementsSerializer{privatestaticfinalObjectMapperobjectMapper=newObjectMapper();@Overridepublic<T>byte[]serialize(Tobj)throwsException{if(obj==null){returnnewbyte[0];}returnobjectMapper.writeValueAsBytes(obj);}@Overridepublic<T>Tdeserialize(byte[]bytes,Class<T>clazz)throwsException{if(bytes==null||bytes.length==0){returnnull;}returnobjectMapper.readValue(bytes,clazz);}}2. 网络传输模块(Transport)
基于单一职责原则,我们将网络传输模块拆分为客户端和服务端:
// 网络传输客户端接口publicinterfaceTransportClient{voidconnect(InetSocketAddressaddress);byte[]send(byte[]data)throwsException;voidclose();}// 网络传输服务端接口publicinterfaceTransportServer{voidstart(intport,RequestHandlerhandler);voidstop();intgetPort();}// 请求处理器接口publicinterfaceRequestHandler{byte[]handle(byte[]request);}// 使用Netty实现的传输客户端publicclassNettyTransportClientimplementsTransportClient{privatestaticfinalLoggerlogger=LoggerFactory.getLogger(NettyTransportClient.class);privatestaticfinalintDEFAULT_CONNECT_TIMEOUT=5000;privateChannelchannel;privateEventLoopGroupgroup;privateResponseHandlerresponseHandler;@Overridepublicvoidconnect(InetSocketAddressaddress){group=newNioEventLoopGroup();Bootstrapbootstrap=newBootstrap();try{bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.CONNECT_TIMEOUT_MILLIS,DEFAULT_CONNECT_TIMEOUT).option(ChannelOption.TCP_NODELAY,true).handler(newChannelInitializer<SocketChannel>(){@OverrideprotectedvoidinitChannel(SocketChannelch)throwsException{ChannelPipelinepipeline=ch.pipeline();// 处理粘包问题pipeline.addLast(newLengthFieldBasedFrameDecoder(65535,0,4,0,4));pipeline.addLast(newLengthFieldPrepender(4));// 字节数组编解码器pipeline.addLast(newByteArrayDecoder());pipeline.addLast(newByteArrayEncoder());// 客户端处理器NettyClientHandlerclientHandler=newNettyClientHandler();pipeline.addLast(clientHandler);}});// 连接服务端ChannelFuturefuture=bootstrap.connect(address).sync();this.channel=future.channel();// 初始化响应处理器responseHandler=newResponseHandler();// 设置客户端处理器的响应处理器((NettyClientHandler)channel.pipeline().last()).setResponseHandler(responseHandler);}catch(Exceptione){logger.error("Failed to connect to server: {}",address,e);thrownewRuntimeException("Failed to connect to server: "+address,e);}}@Overridepublicbyte[]send(byte[]data)throwsException{if(channel==null||!channel.isActive()){thrownewIllegalStateException("Channel is not connected");}// 发送数据channel.writeAndFlush(data).addListener((ChannelFutureListener)future->{if(!future.isSuccess()){Throwablecause=future.cause();if(causeinstanceofException){responseHandler.setException((Exception)cause);}else{responseHandler.setException(newRuntimeException(cause));}}});// 等待响应returnresponseHandler.waitForResponse();}@Overridepublicvoidclose(){if(channel!=null){channel.close();}if(group!=null){group.shutdownGracefully();}}/** * 客户端处理器 */privatestaticclassNettyClientHandlerextendsSimpleChannelInboundHandler<byte[]>{privatestaticfinalLoggerlogger=LoggerFactory.getLogger(NettyClientHandler.class);privateResponseHandlerresponseHandler;@OverrideprotectedvoidchannelRead0(ChannelHandlerContextctx,byte[]msg)throwsException{if(responseHandler!=null){responseHandler.setResponse(msg);}}@OverridepublicvoidexceptionCaught(ChannelHandlerContextctx,Throwablecause)throwsException{logger.error("Exception in Netty client handler",cause);if(responseHandler!=null){if(causeinstanceofException){responseHandler.setException((Exception)cause);}else{responseHandler.setException(newRuntimeException(cause));}}ctx.close();}publicvoidsetResponseHandler(ResponseHandlerresponseHandler){this.responseHandler=responseHandler;}}/** * 响应处理器 */privatestaticclassResponseHandler{privatefinalCountDownLatchlatch=newCountDownLatch(1);privatebyte[]response;privateExceptionexception;publicbyte[]waitForResponse()throwsException{if(!latch.await(30,TimeUnit.SECONDS)){thrownewRuntimeException("Request timeout");}if(exception!=null){throwexception;}returnresponse;}publicvoidsetResponse(byte[]response){this.response=response;latch.countDown();}publicvoidsetException(Exceptionexception){this.exception=exception;latch.countDown();}}}3. 服务代理模块(Service Proxy)
使用迪米特法则,代理模块只与必要的组件通信:
// RPC客户端核心类publicclassServiceProxyimplementsInvocationHandler{privatestaticfinalLoggerlogger=LoggerFactory.getLogger(ServiceProxy.class);// 服务接口类privatefinalClass<?>serviceClass;// 服务注册中心privatefinalRegistryCenterregistryCenter;// 负载均衡策略privatefinalLoadBalanceloadBalance;/** * 构造函数 * * @param serviceClass 服务接口类 * @param registryCenter 服务注册中心 */publicServiceProxy(Class<?>serviceClass,RegistryCenterregistryCenter){this(serviceClass,registryCenter,newRandomLoadBalance());}/** * 构造函数 * * @param serviceClass 服务接口类 * @param registryCenter 服务注册中心 * @param loadBalance 负载均衡策略 */publicServiceProxy(Class<?>serviceClass,RegistryCenterregistryCenter,LoadBalanceloadBalance){this.serviceClass=serviceClass;this.registryCenter=registryCenter;this.loadBalance=loadBalance;}@OverridepublicObjectinvoke(Objectproxy,Methodmethod,Object[]args)throwsThrowable{// 创建RPC请求的对象,把调用的服务接口类、方法名、参数类型、参数值等信息封装到请求对象中RpcRequestrequest=newRpcRequest();request.setRequestId(UUID.randomUUID().toString());request.setServiceName(serviceClass.getName());request.setMethodName(method.getName());request.setParameterTypes(method.getParameterTypes());request.setParameters(args);logger.debug("Sending RPC request: {}, service: {}, method: {}",request.getRequestId(),request.getServiceName(),request.getMethodName());// 从注册中心获取服务地址列表List<InetSocketAddress>addresses=registryCenter.discover(serviceClass.getName());if(addresses==null||addresses.isEmpty()){thrownewRuntimeException("No service available for: "+serviceClass.getName());}// 使用负载均衡策略选择服务地址InetSocketAddressaddress=loadBalance.select(serviceClass.getName(),addresses);if(address==null){thrownewRuntimeException("No service address selected for: "+serviceClass.getName());}logger.debug("Selected service address: {}",address);// 创建客户端并发送请求,这里暂时使用Netty作为网络传输组件TransportClientclient=newNettyTransportClient();try{// 创建序列化器,这里暂时使用JSON序列化,后续可以添加其他序列化方式,并且改成读取配置的方式来确定使用哪种序列化方式Serializerserializer=newJsonSerializer();// 连接到服务端client.connect(address);// 序列化请求byte[]requestData=serializer.serialize(request);// 发送请求并获取响应数据byte[]responseData=client.send(requestData);// 反序列化响应RpcResponseresponse=serializer.deserialize(responseData,RpcResponse.class);if(response.isSuccess()){returnresponse.getResult();}else{thrownewRuntimeException("RPC call failed: "+response.getError());}}finally{client.close();}}}二、架构设计原则的应用总结
在上面代码实现的过程中,我们分别应用了以下架构设计原则:
1. SOLID原则
- 比如每个组件都只是负责一个明确的功能,这就很好符合了单一职责原则;
- 然后涉及到后续可能要调整或者扩展到地方我们都是通过面向接口的编程,然后再通过实现接口的方式来实现组件的可扩展,这样就很好地应用了开闭原则和里氏替换原则;
- 在编写接口的时候,我们也遵循了接口隔离原则和里氏替换原则,即每个组件都有自己的接口,而且接口只包含必要的方法,然后组件接口的实现类可以随时替换父类的实现,而不会影响到程序的正常运行。
- 最后,我们使用其他模块的时候,依赖都是依赖接口,然后再通过构造函数的方式来注入具体的实现类,这样高层模块就不需要依赖底层模块,从而做到了依赖倒置原则。
2. 通用设计原则
在实现RPC框架的过程中,我们也应用了多个通用设计原则:
- 我们在代码的实现过程中非常注重代码的简洁,基本都是做最基础的设计和实现,避免了过度设计,实现的过程中也只实现当前必要的核心功能,确保代码的可读性和可维护性,同时也考虑到了代码的性能和扩展性,这就很好地体现了KISS原则。
- 我们设计和实现的每个组件内部的功能都紧密相关的(高内聚),而组件之间基本都是通过抽象接口来进行通信,减少跟实现模块或者代码的直接依赖(低耦合),这样的设计使得各组件可以独立演化和维护,这就是高内聚低耦合原则的应用。
- 在模块和组件之间,我们遵循"只与直接朋友通信"的原则,组件之间只与直接依赖的组件进行交互,避免形成复杂的依赖链,提高系统的稳定性,这就是迪米特法则的应用。
- 然后我们通过业务的抽象和代码复用的机制避免了出现大量代码重复的情况,例如通过统一的接口定义实现不同组件的复用,降低了维护成本,这个就是DRY原则的应用。
三、总结与下一步计划
因为篇幅的问题,在这篇文章就先写这么多,文章中我们实现了整个RPC框架里面最核心的组件,包括了序列化模块、网络传输模块和服务代理模块。在实现的过程中,我们重点展示了如何将架构设计原则应用到实际代码中,确保代码的可扩展性、可维护性和灵活性。
在下一篇文章中,我们将会完成这个RPC框架的剩余功能,像服务注册与发现、服务端核心的实现、客户端的负载均衡等模块,并编写相关的测试用例来进行完整的测试。同时,也会把项目的代码一起放上来给大家观摩和吐槽。
互动话题:在实现RPC框架的过程中,你认为哪个组件的设计最具挑战性?为什么?欢迎在评论区分享你的观点。
关于作者
Kenyon,资深软件架构师,15年的软件开发和技术管理经验,从程序员做到企业技术高管。多年企业数字化转型和软件架构设计经验,善于帮助企业构建高质量、可维护的软件系统,目前专注技术管理、架构设计、AI技术应用和落地;全网统一名称"六边形架构",欢迎关注交流。
原创不易,转载请联系授权,如果觉得有帮助,请点赞、收藏、转发三连支持!