[关闭]
@yudesong 2018-02-12T03:01:01.000000Z 字数 3967 阅读 938

Java RPC简介

Java RPC


RPC,全称为Remote Procedure Call 即远程过程调用,它是一个计算机通信协议,它允许像调用本地服务一样调用远程服务。其核心思想就是:RPC能够让本地应用简单、高效地调用服务器中的过程(服务)。

要实现一个PRC框架,需要考虑一下几个问题:
(1)通信模型,在Java中一般基于BIO或NIO
(2)远程代理对象,对于Java而言,远程代理对象可以使用Java的动态对象实现,封装了调用远程方法调用
(3)序列化,将对象名称、方法名称、参数等对象信息进行网络传输需要转换成二进制传输

Java 实现RPC实例:

  1. // Service 接口
  2. public interface HelloService {
  3. String sayHi(String name);
  4. }
  5. // 实现类
  6. public class HelloServiceImpl implements HelloService {
  7. public String sayHi(String name) {
  8. return "Hi, " + name;
  9. }
  10. }
  11. //Server接口
  12. public interface Server {
  13. void stop();
  14. void start() throws IOException;
  15. void register(Class serviceInterface, Class impl);
  16. boolean isRunning();
  17. int getPort();
  18. }
  19. //Server具体类
  20. public class ServiceCenter implements Server {
  21. private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
  22. private static final HashMap<String, Class> serviceRegistry = new HashMap<String, Class>();
  23. private static boolean isRunning = false;
  24. private static int port;
  25. public ServiceCenter(int port) {
  26. this.port = port;
  27. }
  28. public void stop() {
  29. isRunning = false;
  30. executor.shutdown();
  31. }
  32. public void start() throws IOException {
  33. ServerSocket server = new ServerSocket();
  34. server.bind(new InetSocketAddress(port));
  35. System.out.println("start server");
  36. try {
  37. while (true) {
  38. // 1.监听客户端的TCP连接,接到TCP连接后将其封装成task,由线程池执行
  39. executor.execute(new ServiceTask(server.accept()));
  40. }
  41. } finally {
  42. server.close();
  43. }
  44. }
  45. public void register(Class serviceInterface, Class impl) {
  46. serviceRegistry.put(serviceInterface.getName(), impl);
  47. }
  48. public boolean isRunning() {
  49. return isRunning;
  50. }
  51. public int getPort() {
  52. return port;
  53. }
  54. private static class ServiceTask implements Runnable {
  55. Socket clent = null;
  56. public ServiceTask(Socket client) {
  57. this.clent = client;
  58. }
  59. public void run() {
  60. ObjectInputStream input = null;
  61. ObjectOutputStream output = null;
  62. try {
  63. // 2.将客户端发送的码流反序列化成对象,反射调用服务实现者,获取执行结果
  64. input = new ObjectInputStream(clent.getInputStream());
  65. String serviceName = input.readUTF();
  66. String methodName = input.readUTF();
  67. Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
  68. Object[] arguments = (Object[]) input.readObject();
  69. Class serviceClass = serviceRegistry.get(serviceName);
  70. if (serviceClass == null) {
  71. throw new ClassNotFoundException(serviceName + " not found");
  72. }
  73. Method method = serviceClass.getMethod(methodName, parameterTypes);
  74. Object result = method.invoke(serviceClass.newInstance(), arguments);
  75. // 3.将执行结果反序列化,通过socket发送给客户端
  76. output = new ObjectOutputStream(clent.getOutputStream());
  77. output.writeObject(result);
  78. } catch (Exception e) {
  79. e.printStackTrace();
  80. } finally {
  81. if (output != null) {
  82. try {
  83. output.close();
  84. } catch (IOException e) {
  85. e.printStackTrace();
  86. }
  87. }
  88. if (input != null) {
  89. try {
  90. input.close();
  91. } catch (IOException e) {
  92. e.printStackTrace();
  93. }
  94. }
  95. if (clent != null) {
  96. try {
  97. clent.close();
  98. } catch (IOException e) {
  99. e.printStackTrace();
  100. }
  101. }
  102. }
  103. }
  104. }
  105. }
  106. // 远程代理对象
  107. public class RPCClient<T> {
  108. public static <T> T getRemoteProxyObj(final Class<?> serviceInterface, final InetSocketAddress addr) {
  109. // 1.将本地的接口调用转换成JDK的动态代理,在动态代理中实现接口的远程调用
  110. return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class<?>[]{serviceInterface},
  111. new InvocationHandler() {
  112. public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  113. Socket socket = null;
  114. ObjectOutputStream output = null;
  115. ObjectInputStream input = null;
  116. try {
  117. // 2.创建Socket客户端,根据指定地址连接远程服务提供者
  118. socket = new Socket();
  119. socket.connect(addr);
  120. // 3.将远程服务调用所需的接口类、方法名、参数列表等编码后发送给服务提供者
  121. output = new ObjectOutputStream(socket.getOutputStream());
  122. output.writeUTF(serviceInterface.getName());
  123. output.writeUTF(method.getName());
  124. output.writeObject(method.getParameterTypes());
  125. output.writeObject(args);
  126. // 4.同步阻塞等待服务器返回应答,获取应答后返回
  127. input = new ObjectInputStream(socket.getInputStream());
  128. return input.readObject();
  129. } finally {
  130. if (socket != null) socket.close();
  131. if (output != null) output.close();
  132. if (input != null) input.close();
  133. }
  134. }
  135. });
  136. }
  137. }
  138. // 测试
  139. public class RPCTest {
  140. public static void main(String[] args) throws IOException {
  141. new Thread(new Runnable() {
  142. public void run() {
  143. try {
  144. Server serviceServer = new ServiceCenter(8088);
  145. serviceServer.register(HelloService.class, HelloServiceImpl.class);
  146. serviceServer.start();
  147. } catch (IOException e) {
  148. e.printStackTrace();
  149. }
  150. }
  151. }).start();
  152. HelloService service = RPCClient.getRemoteProxyObj(HelloService.class, new InetSocketAddress("localhost", 8088));
  153. System.out.println(service.sayHi("test"));
  154. }
  155. }
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注