您现在的位置是:亿华云 > 系统运维

RPC好,还是HTTP好?不要选错了!

亿华云2025-10-04 03:13:15【系统运维】2人已围观

简介图片来自 包图网下面是 RPC 的演进历史,一开始是 RMI,但是局限 Java 与 Java 之前的通信,不能跨语言;接下来是 http+xml,即 webservice,可以跨语言调用,但是我们知

 

图片来自 包图网

下面是RPC好 RPC 的演进历史,一开始是还HP好 RMI,但是不选局限 Java 与 Java 之前的通信,不能跨语言;接下来是RPC好 http+xml,即 webservice,还HP好可以跨语言调用,不选但是RPC好我们知道 xml 是很大的,很占网络资源。还HP好

然后就是不选 http+json,很轻量级,RPC好很是还HP好要写很多重复的非业务代码;再接下来就是框架阶段了,Google 的不选 GRPC,Facebook 的RPC好 Thrift(现在交给了 Apache),阿里的还HP好 Dubbo,最后到 Spring Cloud 用到的不选 Restful。

这里补充说下,不要说 RPC 好,源码下载也不要说 HTTP 好,两者各有千秋。本质上,两者是可读性和效率之间的抉择,通用性和易用性之间的抉择。最终谁能发展更好,很难说。

RPC 流程图

下面是一个网上的通用流程图,当发起请求的时候,调用方通过动态代理,然后把请求的参数进行序列化,通过网络到达被调用方,被调用方拿到参数,进行反序列化。

然后在本地进行反射调用方法,最后再将计算出来的结果进行序列化返回给调用方,调用法反序列化取得值。

整体就是这样一个流程:

下面是本次手写 RPC 的一个流程图:

用户发起请求访问客户端 rpc-user-service 服务,rpc-user-service 再去调用服务端 rpc-order-service 服务查询订单信息。当中也会经过序列化和反序列化流程。

代码实现

①服务端 rpc-order-service

订单服务 rpc-order-service,这是云服务器一个 maven 项目,这是一个父 pom,然后创建两个子项目,order-api 和 order-provider。

这两个也是 maven 项目,项目结构如下:

②order-api

order-api 是契约,也就是定义接口的,order-provider 需要实现它。然后把它打成一个 jar 包,上传到 nexus 私服,因为 rpc-user-service 也需要引用它,调用 order 服务提供的契约。

RpcRequest 类就是定义 rpc-user-service 请求 rpc-order-service 时,告诉 order 调用哪个类里的哪个方法以及传入的参数是什么。

这里我没有搭建私服,一般公司是有私服的,在自己电脑上用 install 安装到maven 本地仓库即可:

@Data public class RpcRequest implements Serializable {      private String className;     private String methodName;     private Object[] args; } 

③order-provider

先看下项目中的类,类很多,然后我们接下来分别讲解。

首先是网站模板 service 层实现契约,既然是实现,先引用一下 order-api 的 pom:

<dependency>     <groupId>com.jack</groupId>     <artifactId>order-api</artifactId>     <version>1.0-SNAPSHOT</version> </dependency> 

实现类 OrderServiceImpl.class:

//该注解bean加载以后会将bean信息保存到哈希表 @JackRemoteService public class OrderServiceImpl implements IOrderService {      @Override     public String queryOrderList() {          return "this is rpc-order-service queryOrderList method";     }     @Override     public String orderById(String id) {          return "this is rpc-order-service orderById method,param  is " + id;     } } 

细心的小伙伴发现,这里打了一个自定义注解 @JackRemoteService,打这个注解的作用是当 bean 加载完以后把该 bean 的信息保存到哈希表,以供后面的反射调用。

@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Component public @interface JackRemoteService {  } 

注解就是一个打标记的作用,打了标记就需要有人去识别它。这里就需要实现 BeanPostProcessor 接口,重写里面的 postProcessAfterInitialization 方法。

这个方法里干的事就是检查加载的当前 bean 有没有打 JackRemoteService 这个注解,如果打了就把 bean 里面的所有方法添加到哈希表里。

/**  * @author jackxu  * bean加载以后将bean的信息保存到哈希表  */ @Component public class InitialMerdiator implements BeanPostProcessor {      @Override     public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {          if (bean.getClass().isAnnotationPresent(JackRemoteService.class)) {              Method[] methods = bean.getClass().getDeclaredMethods();             for (Method method : methods) {                  //接口名.方法名                 String key = bean.getClass().getInterfaces()[0].getName() + "." + method.getName();                 BeanInfo beanInfo = new BeanInfo();                 beanInfo.setBean(bean);                 beanInfo.setMethod(method);                 Mediator.getInstance().put(key, beanInfo);             }         }         return bean;     } } 

哈希表的定义是 Mediator.class,key 是类名.方法名:

public class Mediator {      public Map<String, BeanInfo> map = new ConcurrentHashMap<>();     private Mediator() {      }     private static volatile Mediator instance;     public static Mediator getInstance() {          if (instance == null) {              synchronized (Mediator.class) {                  if (instance == null) {                      instance = new Mediator();                 }             }         }         return instance;     }     public Map<String, BeanInfo> getMap() {          return map;     }     public void put(String key, BeanInfo beanInfo) {          map.put(key, beanInfo);     } } 

最后在所有 bean 都加载完以后,启动一个 socket 的监听,这样服务端就写好了,等待客户端的请求。

Spring 有一些内置的事件,当完成某种操作时会发出某些事件动作。

比如监听 ContextRefreshedEvent 事件,当所有的 bean 都初始化完成并被成功装载后会触发该事件。

实现 ApplicationListener < ContextRefreshedEvent >接口可以收到监听动作,然后写自己的逻辑。

SocketServerInitial.class:

//spring容器启动完成之后,会发布一个ContextRefreshedEvent @Component public class SocketServerInitial implements ApplicationListener<ContextRefreshedEvent> {      //线程池     private final ExecutorService executorService = new ThreadPoolExecutor(5, 10, 0L,             TimeUnit.MILLISECONDS,             new ArrayBlockingQueue<Runnable>(10), Executors.defaultThreadFactory(),             new ThreadPoolExecutor.AbortPolicy());     @Override     public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {          //启动服务         ServerSocket serverSocket = null;         try {              serverSocket = new ServerSocket(8888);             while (true) {                  Socket socket = serverSocket.accept();                 executorService.execute(new ProcessorHandler(socket));             }         } catch (Exception e) {              e.printStackTrace();         } finally {              //关闭socket             if (serverSocket != null) {                  try {                      serverSocket.close();                 } catch (IOException e) {                      e.printStackTrace();                 }             }         }     } } 

线程池里执行的方法,就是把接收到的 socket 请求,先把 RpcRequest 进行反序列化,然后按照传递过来的接口、方法在哈希表中找到该方法,然后通过反射进行调用,最终将结果返回去。

/**  * @author jack xu  */ public class ProcessorHandler implements Runnable {      private Socket socket;     public ProcessorHandler(Socket socket) {          this.socket = socket;     }     @Override     public void run() {          ObjectOutputStream outputStream = null;         ObjectInputStream inputStream = null;         try {              inputStream = new ObjectInputStream(socket.getInputStream());             //反序列化             RpcRequest request = (RpcRequest) inputStream.readObject();             //根据传过来的参数执行方法             System.out.println("request :" + request);             Object result = processor(request);             System.out.println("response :" + result);             //将计算结果写入输出流             outputStream = new ObjectOutputStream(socket.getOutputStream());             outputStream.writeObject(result);             outputStream.flush();         } catch (Exception e) {              e.printStackTrace();         } finally {              //关闭流             if (inputStream != null) {                  try {                      inputStream.close();                 } catch (IOException e) {                      e.printStackTrace();                 }             }             if (outputStream != null) {                  try {                      outputStream.close();                 } catch (IOException e) {                      e.printStackTrace();                 }             }         }     }     public Object processor(RpcRequest request) {          try {              Map<String, BeanInfo> map = Mediator.getInstance().getMap();             //接口名.方法名             String key = request.getClassName() + "." + request.getMethodName();             //取出方法             BeanInfo beanInfo = map.get(key);             if (beanInfo == null) {                  return null;             }             //bean对象             Object bean = beanInfo.getBean();             //方法             Method method = beanInfo.getMethod();             //反射             return method.invoke(bean, request.getArgs());         } catch (Exception e) {              e.printStackTrace();             return null;         }     } } 

采用 BIO 的传输方式,必须需要执行完毕一个请求后才可以执行下一个请求,这样就会导致效率很低,所以采用线程池的方式解决这个问题。

但是如果请求非常多,依然会出现堵塞,最好的方式是用 Netty 的方式来实现 RPC。

④客户端 rpc-user-service

rpc-user-service 是一个 spring boot 项目,因为最终我们要通过 restful 来调用的,如果用 ssm 搭建太慢了,还是先看下项目整体结构。

我们从 controller 层开始看,首先是引用了接口 order-api,因为我们已经安装到本地的 maven 仓库了,所以直接引用下 pom 即可。

<dependency>     <groupId>com.jack</groupId>     <artifactId>order-api</artifactId>     <version>1.0-SNAPSHOT</version> </dependency>  @RestController public class UserController {      //这里的作用是将接口封装成一个代理对象     @JackReference     private IOrderService orderService;     @JackReference     private IGoodService goodService;     @GetMapping("/test")     public String test() {          return orderService.queryOrderList();     }     @GetMapping("/get")     public String get() {          return goodService.getGoodInfoById(1L);     } } 

我们看到这里也有一个自定义注解 JackReference,它的作用是将打上该注解的接口变为代理对象。

@Target(ElementType.FIELD) @Retention(RetentionPolicy.RUNTIME) @Component public @interface JackReference {  } 

我们还是依葫芦画瓢,当 bean 加载前,这里是 postProcessBeforeInitialization 方法,将打上 JackReference 注解的接口设置为代理对象。

@Component public class ReferenceInvokeProxy implements BeanPostProcessor {      @Autowired     RemoteInvocationHandler invocationHandler;     @Override     public Object postProcessBeforeInitialization(Object bean, String beanName) {          //获取所有字段         Field[] fields = bean.getClass().getDeclaredFields();         for (Field field : fields) {              if (field.isAnnotationPresent(JackReference.class)) {                  field.setAccessible(true);                 Object proxy = Proxy.newProxyInstance(field.getType().getClassLoader(), new Class<?>[]{ field.getType()}, invocationHandler);                 try {                      field.set(bean, proxy);                 } catch (IllegalAccessException e) {                      e.printStackTrace();                 }             }         }         return bean;     } } 

我们知道 orderService.queryOrderList() 在本地我们是没有这个实例的,也执行不了,所以代理对象里干的就是把要执行的方法、参数封装成 RpcRequest。

然后通过 Socket 发送到服务端,然后拿到返回的数据,让我们看起来就像在本地执行一样,实际是代理对象帮我们干了很多事。

@Component public class RemoteInvocationHandler implements InvocationHandler {      @Value("${ rpc.host}")     private String host;     @Value("${ rpc.port}")     private int port;     @Override     public Object invoke(Object proxy, Method method, Object[] args) {          RpcRequest request = new RpcRequest();         request.setArgs(args);         request.setClassName(method.getDeclaringClass().getName());         request.setMethodName(method.getName());         return send(request);     }     public Object send(RpcRequest request) {          ObjectOutputStream outputStream = null;         ObjectInputStream inputStream = null;         try {              Socket socket = new Socket(host, port);             //IO操作             outputStream = new ObjectOutputStream(socket.getOutputStream());             outputStream.writeObject(request);             outputStream.flush();             inputStream = new ObjectInputStream(socket.getInputStream());             return inputStream.readObject();         } catch (Exception e) {              e.printStackTrace();             return null;         } finally {              //关闭流             if (inputStream != null) {                  try {                      inputStream.close();                 } catch (IOException e) {                      e.printStackTrace();                 }             }             if (outputStream != null) {                  try {                      outputStream.close();                 } catch (IOException e) {                      e.printStackTrace();                 }             }         }     } } 

测试

首先启动服务端,服务端的代码是这样写的,需要加上 ComponentScan 扫包:

/**  * @author jack xu  */ @Configuration @ComponentScan("com.jack") public class Bootstrap {      public static void main(String[] args) {          ApplicationContext applicationContext = new AnnotationConfigApplicationContext(Bootstrap.class);     } } 

已经跑起来了,等待客户端请求:

客户端是 spring boot 项目,正常启动即可:

@SpringBootApplication public class RpcUserServiceApplication {      public static void main(String[] args) {          SpringApplication.run(RpcUserServiceApplication.class, args);     } } 

也跑起来了:

然后打开浏览器访问一下,成功拿到结果了:

服务端也打印出来对应的日志,一次完整的 RPC 请求结束。

结尾

本文的源码在 Github 上:

rpc-user-service:

https://github.com/xuhaoj/rpc-user-service 

rpc-order-service:

https://github.com/xuhaoj/rpc-order-service 

最后总结下我们这用的是多线程+BIO 的模式,感兴趣的小伙伴可以改成 Netty 的方式。

另外请求的地址我们在这里也是写死的,也没有做负载均衡,一般是要搭配注册中心使用的,更完善的还会有监控等功能,真正的 Dubbo 做了很多东西,本文只是探讨研究两个服务间的通信!

作者:小杰博士

编辑:陶家龙

出处:juejin.cn/post/6994803207838892063

很赞哦!(357)