现在很多企业都在使用Dubbo或者SpringCloud做企业的微服务架构,其实对于Dubbo最核心的技术就是RPC调用,现在我们就来动手自己编写一个RPC框架,通过这篇文章的学习,你将学习到分布式系统的概念RPC远程方法调用的应用Dubbo的原理深入理解 当然,如果要完全自己编写一个RPC框架,我们需要掌握以下知识点网络编程(网络通信)本文将使用netty4网络通信框架多线程相关知识反射相关知识jdk的动态代理Spring框架的相关知识 如果对于上述的知识点有一部分不是很理解,也不会影响你阅读本文和对Dubbo的RPC调用原理的理解 好了,我们先来简单的描述一下整个RPC调用的业务流程图 rpc通信模型。png 为了可以实现上面的RPC调用,我们创建的RPC框架的模块之间的关系图如下: RPC框架流程图。png 对于上面的每个模块的具体作用,使用一个表格简单的进行描述 模块名称主要功能rpcregister主要完成可注册中心Zookeeper的交互RPC服务端使用该模块往注册中心注册地址和端口RPC客户端通过该模块获取实时已近注册的服务地址和端口rpccommon定义RPC通信的请求消息和响应消息的规则,以及消息的序列化和反序列化的帮助类rpcserverRPC服务端,启动RPC服务,扫描appserver中的所有可以提供的服务列表并保存接受RPC客户端的消息并且通过反射调用具体的方法 响应RPC客户端,把方法执行结果返回到RPC客户端rpcclientRPC客户端,通过网络通信往RPC服务端发送请求调用消息 接受服务端的响应消息 配置动态代理类,所有的方法调用都通过网络调用发送到RPC服务端appcommon具体的应用中的接口和JavaBean对象,类似于service模块和bean模块appserver通过Spring的配置启动SpringContext,并且配置RpcServer和RpcRegistryBean对象的创建实现appcommon中的接口,并且在接口上添加注解RpcService(IProductService。class)可以让RPCServer识别到该服务启动服务appclient通过Spring的配置创建RpcDiscover对象和RpcProxy对象,其中RpcDiscover用于从注册中心获取到服务的地址信息,RpcProxy用于创建类的动态代理对象 接下来我们来看一下具体的实现代码rpcregister这个模块用户和注册中心进行交互,主要包括三个类具体的实现代码packagecn。wolfcode。rpc。publicinterfaceConstant{定义客户端连接session会话超时时间,单位为毫秒,该值的设置和zkServer设置的心跳时间有关系 intSESSIONTIMEOUT4000;定义用于保存rpc通信服务端的地址信息的目录 StringREGISTRYPATH定义数据存放的具体目录 StringDATAPATHREGISTRYPATH } packagecn。wolfcode。rpc。importlombok。AllArgsCimportlombok。Gimportlombok。NoArgsCimportlombok。Simportorg。apache。zookeeper。;importorg。apache。zookeeper。data。Simportorg。slf4j。Limportorg。slf4j。LoggerFSetterGetterAllArgsConstructor()NoArgsConstructorpublicclassRpcRegistry{publicstaticfinalLoggerLOGGERLoggerFactory。getLogger(RpcRegistry。class);zkServer的地址信息 privateStringregistryAzk客户端程序 privateZooKeeperzooKpublicvoidcreateNode(Stringdata)throwsException{创建一个客户端程序,对于注册可以不用监听事件 zooKeepernewZooKeeper(registryAddress,Constant。SESSIONTIMEOUT,newWatcher(){Override publicvoidprocess(WatchedEventevent){ } });if(zooKeeper!null){try{判断注册的目录是否存在 StatstatzooKeeper。exists(Constant。REGISTRYPATH,false);if(statnull){如果不存在,创建一个持久的节点目录 zooKeeper。create(Constant。REGISTRYPATH,null,ZooDefs。Ids。OPENACLUNSAFE,CreateMode。PERSISTENT); }创建一个临时的序列节点,并且保存数据信息 zooKeeper。create(Constant。DATAPATH,data。getBytes(),ZooDefs。Ids。OPENACLUNSAFE,CreateMode。EPHEMERALSEQUENTIAL); }catch(Exceptione){ LOGGER。error(,e); e。printStackTrace(); } }else{ LOGGER。debug(zooKeeperconnectisnull); } }测试程序 publicstaticvoidmain(String〔〕args)throwsException{ RpcRegistryrpcRegistrynewRpcRegistry(); rpcRegistry。setRegistryAddress(192。168。158。151:2181); rpcRegistry。createNode(testdata);让程序等待输入,程序一直处于运行状态 System。in。read(); } } packagecn。wolfcode。rpc。importlombok。Gimportlombok。Simportorg。apache。zookeeper。WatchedEimportorg。apache。zookeeper。Wimportorg。apache。zookeeper。ZooKimportorg。slf4j。Limportorg。slf4j。LoggerFimportjava。util。ArrayLimportjava。util。Limportjava。util。RSetterGetter地址发现,用于实时的获取最新的RPC服务信息publicclassRpcDiscover{publicstaticfinalLoggerLOGGERLoggerFactory。getLogger(RpcRegistry。class);服务端地址zkServer的地址 privateStringregistryA获取到的所有提供服务的服务器列表 privatevolatileLSdataListnewArrayL();privateZooKeeperzooK初始化zkClient客户端 publicRpcDiscover(StringregistryAddress)throwsException{this。registryAddressregistryA zooKeepernewZooKeeper(registryAddress,Constant。SESSIONTIMEOUT,newWatcher(){Override publicvoidprocess(WatchedEventwatchedEvent){if(watchedEvent。getType()Event。EventType。NodeChildrenChanged){监听zkServer的服务器列表变化 watchNode(); } } });获取节点相关数据 watchNode(); }从dataList列表随机获取一个可用的服务端的地址信息给rpcclient publicStringdiscover(){intsizedataList。size();if(0){intindexnewRandom()。nextInt(size);returndataList。get(index); }thrownewRuntimeException(没有找到对应的服务器); }监听服务端的列表信息 privatevoidwatchNode(){try{获取子节点信息 LSnodeListzooKeeper。getChildren(Constant。REGISTRYPATH,true); LSdataListnewArrayL();for(Stringnode:nodeList){byte〔〕byteszooKeeper。getData(Constant。REGISTRYPATHnode,false,null); dataList。add(newString(bytes)); }this。dataListdataL }catch(Exceptione){ LOGGER。error(,e); e。printStackTrace(); } }测试程序 publicstaticvoidmain(String〔〕args)throwsException{打印获取到的连接地址信息 System。out。println(newRpcDiscover(192。168。158。151:2181)。discover()); System。in。read(); } } Constant常量定义,设置连接ZKServer的相关参数RpcRegistry:往注册中心ZKServer设置地址信息,RPCServer需要使用RpcDiscover:从注册中心ZKServer获取服务端的网络地址信息RPCclient需要使用rpccommon定义RPC通信的请求消息和响应消息的规则,以及消息的序列化和反序列化的帮助类,主要包括具体代码如下packagecn。wolfcode。rpc。importlombok。;SetterGetterNoArgsConstructorAllArgsConstructorToStringRPC通信的数据请求规则publicclassRpcRequest{请求消息的消息Id privateStringrequestId;请求的具体的类名(接口名称) privateStringclassN请求的具体的方法名称 privateStringmethodN请求的方法参数类型列表 privateC?〔〕parameterT请求的方法参数列表 privateObject〔〕 } packagecn。wolfcode。rpc。importlombok。;SetterGetterNoArgsConstructorAllArgsConstructorToStringRPC通信消息的响应数据规则publicclassRpcResponse{响应的消息id privateStringresponseId;请求的消息id privateStringrequestId;响应的消息是否成功 响应的数据结果 privateO如果有异常信息,在该对象中记录异常信息 privateT } packagecn。wolfcode。rpc。importcom。dyuproject。protostuff。LinkedBimportcom。dyuproject。protostuff。ProtostuffIOUimportcom。dyuproject。protostuff。Simportcom。dyuproject。protostuff。runtime。RuntimeSimportorg。objenesis。Oimportorg。objenesis。ObjenesisSimportjava。util。Mimportjava。util。concurrent。ConcurrentHashM 序列化工具类(基于Protostuff实现)用于把对象序列化字节数组,把字节数组反序列化对象 publicclassSerializationUtil{privatestaticMC?,S?cachedSchemanewConcurrentHashMC?,S?();privatestaticObjenesisobjenesisnewObjenesisStd(true);privateSerializationUtil(){ } 获取类的schema paramcls return SuppressWarnings(unchecked)TSTgetSchema(CTcls){ STschema(ST)cachedSchema。get(cls);if(schemanull){ schemaRuntimeSchema。createFrom(cls);if(schema!null){ cachedSchema。put(cls,schema); } } } 序列化(对象字节数组) SuppressWarnings(unchecked)Tbyte〔〕serialize(Tobj){ CTcls(CT)obj。getClass(); LinkedBufferbufferLinkedBuffer。allocate(LinkedBuffer。DEFAULTBUFFERSIZE);try{ STschemagetSchema(cls);returnProtostuffIOUtil。toByteArray(obj,schema,buffer);序列化 }catch(Exceptione){thrownewIllegalStateException(e。getMessage(),e); }finally{ buffer。clear(); } } 反序列化(字节数组对象) TTdeserialize(byte〔〕data,CTcls){try{ 如果一个类没有参数为空的构造方法时候,那么你直接调用newInstance方法试图得到一个实例对象的时候是会抛出异常的 通过ObjenesisStd可以完美的避开这个问题 Tmessage(T)objenesis。newInstance(cls);实例化 STschemagetSchema(cls);获取类的schema ProtostuffIOUtil。mergeFrom(data,message,schema); }catch(Exceptione){thrownewIllegalStateException(e。getMessage(),e); } } } packagecn。wolfcode。rpc。importio。netty。buffer。ByteBimportio。netty。channel。ChannelHandlerCimportio。netty。handler。codec。MessageToByteE对传递的消息进行编码,因为是请求响应对象的传递,先编码为字节数组在发送到服务器解码publicclassRpcEncoderextendsMessageToByteEncoder{传递的数据的对象类型 privateClassgenericCpublicRpcEncoder(ClassgenericClass){this。genericClassgenericC }Override protectedvoidencode(ChannelHandlerContextctx,Objectmsg,ByteBufout)throwsException{if(genericClass。isInstance(msg)){序列化请求消息为字节数组 byte〔〕bytesSerializationUtil。serialize(msg);把数据写入到下一个通道(channel)或者是发往服务端 out。writeBytes(bytes); } } } packagecn。wolfcode。rpc。importio。netty。buffer。ByteBimportio。netty。channel。ChannelHandlerCimportio。netty。handler。codec。ByteToMessageDimportjava。util。L对传递的消息进行解码,接受到的数据是字节数组,需要把数组转换为对应的请求响应消息对象publicclassRpcDecoderextendsByteToMessageDecoder{privateC?genericCpublicRpcDecoder(C?genericClass){this。genericClassgenericC }Override 解码方法,把字节数组转换为消息对象 protectedvoiddecode(ChannelHandlerContextctx,ByteBufin,LOout)throwsException{消息的长度 intsizein。readableBytes();if(4){保证所有的消息都完全接受完成 }byte〔〕bytesnewbyte〔size〕;把传递的字节数组读取到bytes中 in。readBytes(bytes);反序列化为对象(RPCRequestRPCResponse对象) ObjectobjectSerializationUtil。deserialize(bytes,genericClass);输出对象 out。add(object);刷新缓存 ctx。flush(); } } RpcRequest请求消息封装对象RpcResponse响应消息封装对象SerializationUtil消息的序列化,烦序列化帮助类RpcEncoder把消息对象转换为字节数组进行通信RpcDecoder把获取到的字节数组转换为对应的消息对象rpcserverRPC服务端,启动RPC服务,扫描appserver中的所有可以提供的服务列表并保存,接受RPC客户端的消息并且通过反射调用具体的方法,响应RPC客户端,把方法执行结果返回到RPC客户端主要包括:packagecn。wolfcode。rpc。importjava。lang。annotation。ElementTimportjava。lang。annotation。Rimportjava。lang。annotation。RetentionPimportjava。lang。annotation。T 这个注解用于贴在每个提供服务的实现类, 在Spring容器启动的时候,自动扫描到贴了该注解的所有的服务 Retention(RetentionPolicy。RUNTIME)Target({ElementType。TYPE})publicinterfaceRpcService{publicC?value(); } packagecn。wolfcode。rpc。importcn。wolfcode。rpc。common。RpcDimportcn。wolfcode。rpc。common。RpcEimportcn。wolfcode。rpc。common。RpcRimportcn。wolfcode。rpc。common。RpcRimportcn。wolfcode。rpc。register。RpcRimportio。netty。bootstrap。ServerBimportio。netty。channel。ChannelFimportio。netty。channel。ChannelIimportio。netty。channel。ChannelOimportio。netty。channel。nio。NioEventLoopGimportio。netty。channel。socket。SocketCimportio。netty。channel。socket。nio。NioServerSocketCimportlombok。AllArgsCimportlombok。Gimportlombok。NoArgsCimportlombok。Simportorg。apache。commons。collections4。MapUimportorg。springframework。beans。BeansEimportorg。springframework。beans。factory。InitializingBimportorg。springframework。context。ApplicationCimportorg。springframework。context。ApplicationContextAimportjava。util。HashMimportjava。util。MSetterGetterNoArgsConstructorAllArgsConstructorRPC服务端启动,实现Spring的感知接口publicclassRpcServerimplementsApplicationContextAware,InitializingBean{用于保存所有提供服务的方法,其中key为类的全路径名,value是所有的实现类 privatefinalMString,OserviceBeanMapnewHashM();rpcRegistry用于注册相关的地址信息 privateRpcRegistryrpcR提供服务的地址信息格式为192。168。158。151:9000类似 privateStringserverA在Spring容器启动完成后会执行该方法 Override publicvoidsetApplicationContext(ApplicationContextapplicationContext)throwsBeansException{获取到所有贴了RpcService注解的Bean对象 MString,OserviceBeanMapapplicationContext。getBeansWithAnnotation(RpcService。class);if(MapUtils。isNotEmpty(serviceBeanMap)){for(Objectobject:serviceBeanMap。values()){获取到类的路径名称 StringserviceNameobject。getClass()。getAnnotation(RpcService。class)。value()。getName();把获取到的信息保存到serviceBeanMap中 this。serviceBeanMap。put(serviceName,object); } } System。out。println(服务器:serverAddress提供的服务列表:serviceBeanMap); }初始化完成后执行 Override publicvoidafterPropertiesSet()throwsException{创建服务端的通信对象 ServerBootstrapservernewServerBootstrap();创建异步通信的事件组用于建立TCP连接的 NioEventLoopGroupbossGroupnewNioEventLoopGroup();创建异步通信的事件组用于处理Channel(通道)的IO事件 NioEventLoopGroupworkerGroupnewNioEventLoopGroup();try{开始设置server的相关参数 server。group(bossGroup,workerGroup)启动异步ServerSocket 。channel(NioServerSocketChannel。class)初始化通道信息 。childHandler(newChannelISocketC(){Override protectedvoidinitChannel(SocketChannelch)throwsException{ ch。pipeline()。addLast(newRpcDecoder(RpcRequest。class))1解码请求参数 。addLast(newRpcEncoder(RpcResponse。class))2编码响应信息 。addLast(newRpcServerHandler(serviceBeanMap));3请求处理 } })。option(ChannelOption。SOBACKLOG,128) 。childOption(ChannelOption。SOKEEPALIVE,true);; StringhostserverAddress。split(:)〔0〕;获取到主机地址 intportInteger。valueOf(serverAddress。split(:)〔1〕);端口 ChannelFuturefutureserver。bind(host,port)。sync();开启异步通信服务 System。out。println(服务器启动成功:future。channel()。localAddress()); rpcRegistry。createNode(serverAddress); System。out。println(向zkServer注册服务地址信息); future。channel()。closeFuture()。sync();等待通信完成 }catch(Exceptione){ e。printStackTrace(); }finally{优雅的关闭socket bossGroup。shutdownGracefully(); workerGroup。shutdownGracefully(); } } } packagecn。wolfcode。rpc。importcn。wolfcode。rpc。common。RpcRimportcn。wolfcode。rpc。common。RpcRimportio。netty。channel。ChannelFutureLimportio。netty。channel。ChannelHandlerCimportio。netty。channel。ChannelInboundHandlerAimportlombok。Gimportlombok。NoArgsCimportlombok。Simportjava。lang。reflect。Mimportjava。util。Mimportjava。util。UUID;SetterGetterNoArgsConstructorpublicclassRpcServerHandlerextendsChannelInboundHandlerAdapter{privateMString,OserviceBeanMpublicRpcServerHandler(MString,OserviceBeanMap){this。serviceBeanMapserviceBeanM }Override publicvoidchannelRead(ChannelHandlerContextctx,Objectmsg)throwsException{ System。out。println(RpcServerHandler。channelRead); System。out。println(msg); RpcRequestrpcRequest(RpcRequest) RpcResponserpcResponsehandler(rpcRequest);告诉客户端,关闭socket连接 ctx。writeAndFlush(rpcResponse)。addListener(ChannelFutureListener。CLOSE); }privateRpcResponsehandler(RpcRequestrpcRequest){创建一个响应消息对象 RpcResponserpcResponsenewRpcResponse();设置响应消息ID rpcResponse。setResponseId(UUID。randomUUID()。toString());请求消息ID rpcResponse。setRequestId(rpcRequest。getRequestId());try{获取到类名(接口名称) StringclassNamerpcRequest。getClassName();获取到方法名 StringmethodNamerpcRequest。getMethodName();获取到参数类型列表 C?〔〕parameterTypesrpcRequest。getParameterTypes();获取到参数列表 Object〔〕parametersrpcRequest。getParameters();获取到具字节码对象 C?clzClass。forName(className);获取到实现类 ObjectserviceBeanserviceBeanMap。get(className);if(serviceBeannull){thrownewRuntimeException(className没有找到对应的serviceBean:className:beanMap:serviceBeanMap); }反射调用方法 Methodmethodclz。getMethod(methodName,parameterTypes);if(methodnull)thrownewRuntimeException(没有找到对应的方法); Objectresultmethod。invoke(serviceBean,parameters); rpcResponse。setSuccess(true);设置方法调用的结果 rpcResponse。setResult(result); }catch(Exceptione){ rpcResponse。setSuccess(false); rpcResponse。setThrowable(e); e。printStackTrace(); }returnrpcR } } RpcService定义一个注解,用于标记服务程序的提供者,通过Spring扫描出所有的服务并且保存RpcServerHandler处理RPC客户端请求,调用服务提供者的具体方法,响应执行结果RpcServer扫描所有的服务(标记了RPCService的类),启动RPC服务rpcclientRPC客户端,通过网络通信往RPC服务端发送请求调用消息,接受服务端的响应消息,配置动态代理类,所有的方法调用都通过网络调用发送到RPC服务端其中包括的主要代码:具体实现代码:packagecn。wolfcode。rpc。importcn。wolfcode。rpc。common。RpcDimportcn。wolfcode。rpc。common。RpcEimportcn。wolfcode。rpc。common。RpcRimportcn。wolfcode。rpc。common。RpcRimportcn。wolfcode。rpc。register。RpcDimportio。netty。bootstrap。Bimportio。netty。channel。;importio。netty。channel。nio。NioEventLoopGimportio。netty。channel。socket。SocketCimportio。netty。channel。socket。nio。NioSocketCRPC通信客户端,往服务端发送请求,并且接受服务端的响应publicclassRpcClientextendsSimpleChannelInboundHRpcR{消息响应对象 privateRpcResponserpcR消息请求对象 privateRpcRequestrpcR同步锁资源对象 privateObjectobjectnewObject();用于获取服务地址列表信息 privateRpcDiscoverrpcD构造函数 publicRpcClient(RpcRequestrpcRequest,RpcDiscoverrpcDiscover){this。rpcDiscoverrpcDthis。rpcRequestrpcR }Override protectedvoidchannelRead0(ChannelHandlerContextctx,RpcResponsemsg)throwsException{this。rpcR响应消息 synchronized(object){ ctx。flush();刷新缓存 object。notifyAll();唤醒等待 } }发送消息 publicRpcResponsesend()throwsException{创建一个socket通信对象 BootstrapclientnewBootstrap();创建一个通信组,负责Channel(通道)的IO事件的处理 NioEventLoopGrouploopGroupnewNioEventLoopGroup();try{ client。group(loopGroup)设置参数 。channel(NioSocketChannel。class)使用异步socket通信 。handler(newChannelISocketC(){Override protectedvoidinitChannel(SocketChannelch)throwsException{ ch。pipeline()。addLast(newRpcEncoder(RpcRequest。class))编码请求对象 。addLast(newRpcDecoder(RpcResponse。class))解码响应对象 。addLast(RpcClient。this);发送请求对象 } })。option(ChannelOption。SOKEEPALIVE,true);; StringserverAddressrpcDiscover。discover();获取一个服务器地址 StringhostserverAddress。split(:)〔0〕;intportInteger。valueOf(serverAddress。split(:)〔1〕); ChannelFuturefutureclient。connect(host,port)。sync(); System。out。println(客户端准备发送数据:rpcRequest); future。channel()。writeAndFlush(rpcRequest)。sync();synchronized(object){ object。wait();线程等待,等待客户端响应 }if(rpcResponse!null){ future。channel()。closeFuture()。sync();等待服务端关闭socket }returnrpcR }finally{ loopGroup。shutdownGracefully();优雅关闭socket } } 异常处理 Override publicvoidexceptionCaught(ChannelHandlerContextctx,Throwablecause) throwsException{ ctx。close(); } } packagecn。wolfcode。rpc。importcn。wolfcode。rpc。common。RpcRimportcn。wolfcode。rpc。common。RpcRimportcn。wolfcode。rpc。register。RpcDimportlombok。Gimportlombok。Simportjava。lang。reflect。InvocationHimportjava。lang。reflect。Mimportjava。lang。reflect。Pimportjava。util。UUID;SetterGetter动态代理类,用于获取到每个类的代理对象对于被代理对象的所有的方法调用都会执行invoke方法publicclassRpcProxy{用于获取到RPCServer的地址信息 privateRpcDiscoverrpcDSuppressWarnings(all)TTgetInstance(CTinterfaceClass){ Tinstance(T)Proxy。newProxyInstance(interfaceClass。getClassLoader(),newC?〔〕{interfaceClass},newInvocationHandler(){Override publicObjectinvoke(Objectproxy,Methodmethod,Object〔〕args)throwsThrowable{创建请求对象 RpcRequestrpcRequestnewRpcRequest();获取到被调用的类名和RPCServer中的serviceMap中的key进行匹配 StringclassNamemethod。getDeclaringClass()。getName();获取到方法的参数列表 C?〔〕parameterTypesmethod。getParameterTypes();生成一个请求的id rpcRequest。setRequestId(UUID。randomUUID()。toString()); rpcRequest。setClassName(className);类名 rpcRequest。setParameterTypes(parameterTypes);参数类型列表 rpcRequest。setParameters(args);参数列表 rpcRequest。setMethodName(method。getName());调用的放方法名称 RpcResponserpcResponsenewRpcClient(rpcRequest,rpcDiscover)。send();创建一个RPCclient对象,并且发送消息到服务端 返回调用结果 returnrpcResponse。getResult(); } });返回一个代理对象 } } RpcProxy对于每一个类都创建一个动态代理对象,并且在invoke方法创建rpc客户端并且发送网络通信请求RpcClientRPC通信客户端,启动RPC通信服务,创建TCP连接,发送请求,接受响应appcommon这是具体应用的通用模块,和具体的项目结构有关系,这里主要包括接口定义和JavaBean对象的定义具体代码为:packagecn。wolfcode。app。publicinterfaceIProductService{ 保存产品 paramproduct voidsave(Productproduct); 根据产品id删除产品 paramproductId voiddeleteById(LongproductId); 修改产品信息 paramproduct voidupdate(Productproduct); 根据产品id获取到产品信息 paramproductId return Productget(LongproductId); } packagecn。wolfcode。app。importlombok。;importjava。math。BigD 产品信息 SetterGetterToStringAllArgsConstructorNoArgsConstructorpublicclassProduct{privateLid privateS产品编号 privateS产品名称 privateBigD产品价格} appserver这个模块主要是定义服务的具体实现和启动Spring容器,在启动Spring容器的时候需要创建RpcRegistry,RpcServer对象具体代码实现:packagecn。wolfcode。app。importcn。wolfcode。app。common。IProductSimportcn。wolfcode。app。common。Pimportcn。wolfcode。rpc。server。RpcSimportorg。springframework。stereotype。Cimportjava。math。BigDComponentRpcService(IProductService。class)publicclassProductServiceImplimplementsIProductService{Override publicvoidsave(Productproduct){ System。out。println(产品保存成功:product); }Override publicvoiddeleteById(LongproductId){ System。out。println(产品删除成功:productId); }Override publicvoidupdate(Productproduct){ System。out。println(产品修改成功:product); }Override publicProductget(LongproductId){ System。out。println(产品获取成功);returnnewProduct(1L,001,笔记本电脑,BigDecimal。TEN); } } packagecn。wolfcode。app。importorg。springframework。context。support。ClassPathXmlApplicationCpublicclassBootAppServer{publicstaticvoidmain(String〔〕args){启动Spring容器 newClassPathXmlApplicationContext(classpath:application。xml); } } 其中配置文件:?xmlversion1。0encodingUTF8?beansxmlnshttp:www。springframework。orgschemabeans xmlns:xsihttp:www。w3。org2001XMLSchemainstance xmlns:contexthttp:www。springframework。orgschemacontext xsi:schemaLocationhttp:www。springframework。orgschemabeans http:www。springframework。orgschemabeansspringbeans。xsd http:www。springframework。orgschemacontext http:www。springframework。orgschemacontextspringcontext。 context:componentscanbasepackagecn。wolfcode。app。 context:propertyplaceholderlocationclasspath:rpc。 beanidserviceRegistryclasscn。wolfcode。rpc。register。RpcR propertynameregistryAddressvalue{registry。address} beanidrpcServerclasscn。wolfcode。rpc。server。RpcS propertynameserverAddressvalue{server。address} propertynamerpcRegistryrefserviceR zookeeperserverregistry。address192。168。158。151:2181rpcserverserver。address192。168。158。1:9090 application。xmlSpring的配置文件log4j。properties日志配置文件rpc。properties服务提供者的地址和端口以及zkServer的连接地址和端口appclient通过Spring的配置创建RpcDiscover对象和RpcProxy对象,其中RpcDiscover用于从注册中心获取到服务的地址信息,RpcProxy用于创建类的动态代理对象测试类:使用Spring的Junit进行测试packagecn。wolfcode。app。importcn。wolfcode。app。common。IProductSimportcn。wolfcode。app。common。Pimportcn。wolfcode。rpc。client。RpcPimportorg。junit。Bimportorg。junit。Timportorg。junit。runner。RunWimportorg。springframework。beans。factory。annotation。Aimportorg。springframework。test。context。ContextCimportorg。springframework。test。context。junit4。SpringJUnit4ClassRimportjava。math。BigD模拟客户端启动RunWith(SpringJUnit4ClassRunner。class)ContextConfiguration(locationsclasspath:application。xml)publicclassAPP{Autowired privateRpcProxyrpcPprivateIProductServiceproductSBefore publicvoidinit(){ productServicerpcProxy。getInstance(IProductService。class); }Test publicvoidtestSave()throwsException{ productService。save(newProduct(2L,002,内衣,BigDecimal。TEN)); }Test publicvoidtestDelete()throwsException{ productService。deleteById(2L); }Test publicvoidtestUpdate()throwsException{ productService。update(newProduct(2L,002,内衣,BigDecimal。ONE)); }Test publicvoidtestGet()throwsException{ ProductproductproductService。get(1L); System。out。println(获取到的产品信息为:product); } } 配置文件信息 application。?xmlversion1。0encodingUTF8?beansxmlnshttp:www。springframework。orgschemabeans xmlns:xsihttp:www。w3。org2001XMLSchemainstance xmlns:contexthttp:www。springframework。orgschemacontext xsi:schemaLocationhttp:www。springframework。orgschemabeans http:www。springframework。orgschemabeansspringbeans。xsd http:www。springframework。orgschemacontext http:www。springframework。orgschemacontextspringcontext。 context:componentscanbasepackagecn。wolfcode。app。 context:propertyplaceholderlocationclasspath:rpc。 beanidserviceRpcDiscoverclasscn。wolfcode。rpc。register。RpcD constructorargnameregistryAddressvalue{registry。address} beanidrpcProxyclasscn。wolfcode。rpc。client。RpcP propertynamerpcDiscoverrefserviceRpcD log4j。propertieslog4j。rootLoggerERROR,consolelog4j。appender。consoleorg。apache。log4j。ConsoleAppender log4j。appender。console。targetSystem。out log4j。appender。console。layoutorg。apache。log4j。PatternLayout log4j。appender。console。layout。ConversionPatternmn log4j。logger。cn。wolfcode。rpcDEBUG rpc。propertieszookeeperserverregistry。address192。168。158。151:2181 对于本文的完整代码下载地址为https:gitee。comheshengjunrpcdemo。git 如果要正常运行,请部署一个zookeeper注册中心,修改rpc。properites的地址即可先运行appserver中的BootAppServer在运行appclient中的APP测试用例