@yudesong
2018-02-12T03:01:01.000000Z
字数 3967
阅读 938
Java RPC
RPC,全称为Remote Procedure Call 即远程过程调用,它是一个计算机通信协议,它允许像调用本地服务一样调用远程服务。其核心思想就是:RPC能够让本地应用简单、高效地调用服务器中的过程(服务)。
要实现一个PRC框架,需要考虑一下几个问题:
(1)通信模型,在Java中一般基于BIO或NIO
(2)远程代理对象,对于Java而言,远程代理对象可以使用Java的动态对象实现,封装了调用远程方法调用
(3)序列化,将对象名称、方法名称、参数等对象信息进行网络传输需要转换成二进制传输
Java 实现RPC实例:
// Service 接口public interface HelloService {String sayHi(String name);}// 实现类public class HelloServiceImpl implements HelloService {public String sayHi(String name) {return "Hi, " + name;}}//Server接口public interface Server {void stop();void start() throws IOException;void register(Class serviceInterface, Class impl);boolean isRunning();int getPort();}//Server具体类public class ServiceCenter implements Server {private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());private static final HashMap<String, Class> serviceRegistry = new HashMap<String, Class>();private static boolean isRunning = false;private static int port;public ServiceCenter(int port) {this.port = port;}public void stop() {isRunning = false;executor.shutdown();}public void start() throws IOException {ServerSocket server = new ServerSocket();server.bind(new InetSocketAddress(port));System.out.println("start server");try {while (true) {// 1.监听客户端的TCP连接,接到TCP连接后将其封装成task,由线程池执行executor.execute(new ServiceTask(server.accept()));}} finally {server.close();}}public void register(Class serviceInterface, Class impl) {serviceRegistry.put(serviceInterface.getName(), impl);}public boolean isRunning() {return isRunning;}public int getPort() {return port;}private static class ServiceTask implements Runnable {Socket clent = null;public ServiceTask(Socket client) {this.clent = client;}public void run() {ObjectInputStream input = null;ObjectOutputStream output = null;try {// 2.将客户端发送的码流反序列化成对象,反射调用服务实现者,获取执行结果input = new ObjectInputStream(clent.getInputStream());String serviceName = input.readUTF();String methodName = input.readUTF();Class<?>[] parameterTypes = (Class<?>[]) input.readObject();Object[] arguments = (Object[]) input.readObject();Class serviceClass = serviceRegistry.get(serviceName);if (serviceClass == null) {throw new ClassNotFoundException(serviceName + " not found");}Method method = serviceClass.getMethod(methodName, parameterTypes);Object result = method.invoke(serviceClass.newInstance(), arguments);// 3.将执行结果反序列化,通过socket发送给客户端output = new ObjectOutputStream(clent.getOutputStream());output.writeObject(result);} catch (Exception e) {e.printStackTrace();} finally {if (output != null) {try {output.close();} catch (IOException e) {e.printStackTrace();}}if (input != null) {try {input.close();} catch (IOException e) {e.printStackTrace();}}if (clent != null) {try {clent.close();} catch (IOException e) {e.printStackTrace();}}}}}}// 远程代理对象public class RPCClient<T> {public static <T> T getRemoteProxyObj(final Class<?> serviceInterface, final InetSocketAddress addr) {// 1.将本地的接口调用转换成JDK的动态代理,在动态代理中实现接口的远程调用return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class<?>[]{serviceInterface},new InvocationHandler() {public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {Socket socket = null;ObjectOutputStream output = null;ObjectInputStream input = null;try {// 2.创建Socket客户端,根据指定地址连接远程服务提供者socket = new Socket();socket.connect(addr);// 3.将远程服务调用所需的接口类、方法名、参数列表等编码后发送给服务提供者output = new ObjectOutputStream(socket.getOutputStream());output.writeUTF(serviceInterface.getName());output.writeUTF(method.getName());output.writeObject(method.getParameterTypes());output.writeObject(args);// 4.同步阻塞等待服务器返回应答,获取应答后返回input = new ObjectInputStream(socket.getInputStream());return input.readObject();} finally {if (socket != null) socket.close();if (output != null) output.close();if (input != null) input.close();}}});}}// 测试public class RPCTest {public static void main(String[] args) throws IOException {new Thread(new Runnable() {public void run() {try {Server serviceServer = new ServiceCenter(8088);serviceServer.register(HelloService.class, HelloServiceImpl.class);serviceServer.start();} catch (IOException e) {e.printStackTrace();}}}).start();HelloService service = RPCClient.getRemoteProxyObj(HelloService.class, new InetSocketAddress("localhost", 8088));System.out.println(service.sayHi("test"));}}