基于 netty 的 rpc 框架

本贴最后更新于 1810 天前,其中的信息可能已经事过境迁

自主实现

==simplerpc==

实现思路

netty 消息定义

请求参数

/**
 * RPC request message: identifies the target service method and carries its arguments.
 * Accessors are generated by the Lombok {@code @Getter}/{@code @Setter} annotations.
 */
@Getter
@Setter
public class RpcRequest {
    /**
     * Unique request id; the response carries the same id back.
     */
    private String requestId;
    /**
     * Name of the service interface being invoked.
     */
    private String interfaceName;
    /**
     * Version of the service interface.
     */
    private String serviceVersion;
    /**
     * Name of the method to invoke.
     */
    private String methodName;
    /**
     * Parameter types of the method to invoke.
     */
    private Class<?>[] parameterTypes;
    /**
     * Argument values for the invocation.
     */
    private Object[] parameters;
}

响应参数

/**
 * RPC response message: carries either the invocation result or the exception raised.
 * Accessors are generated by the Lombok {@code @Getter}/{@code @Setter} annotations.
 */
@Getter
@Setter
public class RpcResponse {
    /**
     * Id of the request this response answers.
     */
    private String requestId;
    /**
     * Exception raised while handling the request; {@code null} when none was set.
     */
    private Exception exception;
    /**
     * Value returned by the invoked method.
     */
    private Object result;
}

消息解析

解码器

/**
 * Decodes length-prefixed frames into objects of a fixed target class.
 *
 * <p>Wire format: a 4-byte signed length followed by that many bytes of serialized payload.
 * Incomplete frames are left in the buffer until more bytes arrive.
 */
public class RpcDecoder extends ByteToMessageDecoder {

    /** Number of bytes used by the length prefix. */
    private static final int LENGTH_FIELD_SIZE = 4;

    /** Class the payload bytes are deserialized into. */
    private final Class<?> genericClass;

    /**
     * @param genericClass class every decoded payload is deserialized into
     */
    public RpcDecoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    /**
     * Reads one complete frame from {@code in}, if available, and appends the deserialized
     * object to {@code out}.
     *
     * @param ctx channel context; closed when a corrupted (negative) length is read
     * @param in  accumulated inbound bytes
     * @param out receives the decoded message
     */
    @Override
    public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if (in.readableBytes() < LENGTH_FIELD_SIZE) {
            // Length prefix not fully received yet.
            return;
        }
        in.markReaderIndex();
        int dataLength = in.readInt();
        if (dataLength < 0) {
            // A negative length would make the array allocation below throw
            // NegativeArraySizeException. The stream cannot be re-synchronised, so drop
            // what is buffered and close the connection.
            in.skipBytes(in.readableBytes());
            ctx.close();
            return;
        }
        if (in.readableBytes() < dataLength) {
            // Payload incomplete: rewind so the length prefix is read again next time.
            in.resetReaderIndex();
            return;
        }
        byte[] data = new byte[dataLength];
        in.readBytes(data);
        out.add(SerializationUtil.deserialize(data, genericClass));
    }
}

编码器

/**
 * Encodes messages of one configured class as a 4-byte length prefix followed by the
 * serialized payload. Messages of any other class produce no output.
 */
public class RpcEncoder extends MessageToByteEncoder {

    /** Only instances of this class are written to the wire. */
    private Class<?> genericClass;

    /**
     * @param genericClass class of the messages this encoder serializes
     */
    public RpcEncoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    /**
     * Serializes {@code in} into {@code out} when it is an instance of the configured class.
     */
    @Override
    public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception {
        if (!genericClass.isInstance(in)) {
            // Not a message type handled by this encoder: write nothing.
            return;
        }
        final byte[] payload = SerializationUtil.serialize(in);
        out.writeInt(payload.length).writeBytes(payload);
    }
}

服务端实现

注解,用来声明服务

/**
 * Marks a class as an RPC service implementation. Being meta-annotated with
 * {@code @Component}, the annotation is available on types and retained at runtime.
 */
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcService {

    /**
     * The service interface class implemented by the annotated type.
     */
    Class<?> value();

    /**
     * Service version; defaults to the empty string.
     */
    String version() default "";
}

handler 处理通讯, 然后反射调用实际对应的 bean

/**
 * Server-side inbound handler: looks up the service bean for each {@link RpcRequest},
 * invokes the requested method reflectively and writes back an {@link RpcResponse}.
 */
@Slf4j
public class RpcServerHandler extends SimpleChannelInboundHandler<RpcRequest> {

    private static final Logger LOGGER = LoggerFactory.getLogger(RpcServerHandler.class);

    /** Service key (interface name, optionally suffixed with "-version") to service bean. */
    private final Map<String, Object> handlerMap;

    /**
     * @param handlerMap mapping from service key to the bean that serves it
     */
    public RpcServerHandler(Map<String, Object> handlerMap) {
        this.handlerMap = handlerMap;
    }

    /**
     * Handles one request and always answers with a response carrying the same request id.
     * Failures are reported through {@link RpcResponse#setException} instead of propagating.
     */
    @Override
    public void channelRead0(final ChannelHandlerContext ctx, RpcRequest request) throws Exception {
        log.info("requestId{}", request.getRequestId());
        RpcResponse response = new RpcResponse();
        response.setRequestId(request.getRequestId());
        try {
            response.setResult(handle(request));
        } catch (java.lang.reflect.InvocationTargetException e) {
            // The reflective call wraps whatever the service method threw. Send the real
            // cause so the caller sees the service's own exception rather than the wrapper.
            Throwable cause = e.getCause();
            LOGGER.error("handle result failure", cause != null ? cause : e);
            // RpcResponse can only carry an Exception; keep the wrapper for Errors.
            response.setException(cause instanceof Exception ? (Exception) cause : e);
        } catch (Exception e) {
            LOGGER.error("handle result failure", e);
            response.setException(e);
        }
        ctx.writeAndFlush(response);
    }

    /**
     * Resolves the service bean for the request and invokes the target method on it.
     *
     * @param request the incoming request
     * @return the value returned by the service method
     * @throws Exception if no bean is registered for the service key or the invocation fails
     */
    private Object handle(RpcRequest request) throws Exception {
        // Build the lookup key the same way the server registers beans.
        String serviceName = request.getInterfaceName();
        String serviceVersion = request.getServiceVersion();
        if (StrUtil.isNotEmpty(serviceVersion)) {
            serviceName += "-" + serviceVersion;
        }
        Object serviceBean = handlerMap.get(serviceName);
        if (serviceBean == null) {
            throw new RuntimeException(String.format("can not find service bean by key: %s", serviceName));
        }
        // Gather what the reflective invocation needs.
        Class<?> serviceClass = serviceBean.getClass();
        String methodName = request.getMethodName();
        Class<?>[] parameterTypes = request.getParameterTypes();
        Object[] parameters = request.getParameters();

        FastClass serviceFastClass = FastClass.create(serviceClass);
        FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
        return serviceFastMethod.invoke(serviceBean, parameters);
    }

    /**
     * Logs the failure and closes the channel.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        LOGGER.error("server caught exception", cause);
        ctx.close();
    }
}

服务端实现,主要获取 serviceBean 集合,启动 netty 服务,其中需要将服务注册到 zk 上,方便客户端消费

@Slf4j
public class RpcServer implements ApplicationContextAware, InitializingBean, DisposableBean {

    private static final Logger LOGGER = LoggerFactory.getLogger(RpcServer.class);

    private final EventLoopGroup bossGroup = new NioEventLoopGroup();
    private final EventLoopGroup workerGroup = new NioEventLoopGroup();
    private String serviceAddress;

    private ServiceRegistry serviceRegistry;

    /**
     * 存放 服务名 与 服务对象 之间的映射关系
     */
    private Map<String, Object> handlerMap = new HashMap<>();

    public RpcServer(String serviceAddress) {
        this.serviceAddress = serviceAddress;
    }

    public RpcServer(String serviceAddress, ServiceRegistry serviceRegistry) {
        this.serviceAddress = serviceAddress;
        this.serviceRegistry = serviceRegistry;
    }

    @Override
    public void setApplicationContext(ApplicationContext ctx) throws BeansException {
        // 扫描带有 RpcService 注解的类并初始化 handlerMap 对象
        Map<String, Object> serviceBeanMap = ctx.getBeansWithAnnotation(RpcService.class);
        if (CollUtil.isNotEmpty(serviceBeanMap)) {
            for (Object serviceBean : serviceBeanMap.values()) {
                RpcService rpcService = serviceBean.getClass().getAnnotation(RpcService.class);
                String serviceName = rpcService.value().getName();
                String serviceVersion = rpcService.version();
                if (StrUtil.isNotEmpty(serviceVersion)) {
                    serviceName += "-" + serviceVersion;
                }
                handlerMap.put(serviceName, serviceBean);
            }
        }
        log.info("handlerMap " +  handlerMap.size());
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        try {
            // 创建并初始化 Netty 服务端 Bootstrap 对象
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel channel) throws Exception {
                    ChannelPipeline pipeline = channel.pipeline();
                    //pipeline.addLast(new LengthFieldBasedFrameDecoder(65536, 0, 4, 0, 0));
                    pipeline.addLast(new RpcDecoder(RpcRequest.class)); // 解码 RPC 请求
                    pipeline.addLast(new RpcEncoder(RpcResponse.class)); // 编码 RPC 响应
                    pipeline.addLast(new RpcServerHandler(handlerMap)); // 处理 RPC 请求
                }
            });
            bootstrap.option(ChannelOption.SO_BACKLOG, 128);
            bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
            // 获取 RPC 服务器的 IP 地址与端口号
            String[] addressArray = StrUtil.split(serviceAddress, ":");
            String ip = addressArray[0];
            int port = Integer.parseInt(addressArray[1]);
            // 启动 RPC 服务器
            ChannelFuture future = bootstrap.bind(ip, port).sync();
            // 注册 RPC 服务地址
            if (serviceRegistry != null) {
                for (String interfaceName : handlerMap.keySet()) {
                    log.info(serviceAddress);
                    serviceRegistry.register(interfaceName, serviceAddress);
                    LOGGER.debug("register service: {} => {}", interfaceName, serviceAddress);
                }
            }
            LOGGER.debug("server started on port {}", port);
            // 关闭 RPC 服务器
            future.channel().closeFuture().sync();
        }catch (Exception e){
            LOGGER.error("开启服务异常",e);
        }
    }

    @Override
    public void destroy() throws Exception {
        workerGroup.shutdownGracefully();
        bossGroup.shutdownGracefully();
    }
}

客户端实现

注解,声明哪里需要代理

/**
 * Marks a field that should receive an RPC client proxy.
 * Applicable to fields and retained at runtime.
 */
@Target({ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcReference {

    /**
     * Service version; defaults to the empty string.
     */
    String version() default "";
}

网络通讯处理,通过 requestId 来匹配请求并接收对应的返回,其中使用 CompletableFuture 实现异步

@Slf4j
public class RpcClientHandler extends SimpleChannelInboundHandler<RpcResponse> {

    private ConcurrentHashMap<String, CompletableFuture<RpcResponse>> responses = new ConcurrentHashMap<>();

    private volatile Channel channel;

    public Channel getChannel() {
        return channel;
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        super.channelRegistered(ctx);
        this.channel = ctx.channel();
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws Exception {
        String requestId = response.getRequestId();
        log.info("requestId {}", requestId);
        //异步处理消息返回,有消息就更新 然后get才能够获取到
        CompletableFuture<RpcResponse> rpcFuture = responses.get(requestId);
        if (rpcFuture != null) {
            responses.remove(requestId);
            rpcFuture.complete(response);
            log.info("read done {}" , rpcFuture.isDone());
        }else{
            log.error("没有对应的CompletableFuture");
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("api caught exception", cause);
        ctx.close();
    }

    public CompletableFuture<RpcResponse> sendRequest(RpcRequest rpcRequest){
        try {
            log.info("sendRequest {}" , JSON.toJSONString(rpcRequest));
            CompletableFuture<RpcResponse> responseFuture = new CompletableFuture<>();
            String name = rpcRequest.getInterfaceName() + "-" + rpcRequest.getServiceVersion();
            responses.put(rpcRequest.getRequestId(), responseFuture);
            //发送请求
            channel.writeAndFlush(rpcRequest).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) {
                    log.info("发送请求 " + rpcRequest.getRequestId());
                }
            });;
            log.info("isActive {}", channel.isActive());
            return responseFuture;
        }catch(Exception e){
            log.info(e.getMessage());
        }
        return null;
    }
}

缓存 handler(其本质是 channel),在代理的时候获取并发起请求 (sendRequest)

/**
 * Process-wide registry of known server addresses and connected client handlers,
 * both grouped by service name.
 */
@Slf4j
public class ConnectManager {

    private static volatile ConnectManager connectManager;

    /**
     * Key: interface name + version.
     * Value: handlers (one per connected channel) serving that service.
     */
    private Map<String, List<RpcClientHandler>> rpcClientHandlerMap = new ConcurrentHashMap<>();

    /**
     * Key: service name.
     * Value: server addresses already recorded for that service.
     */
    private Map<String,List<String>> rpcServerMap = new ConcurrentHashMap<>();

    /**
     * Returns the shared instance, creating it on first use.
     */
    public static ConnectManager getInstance() {
        if (connectManager == null) {
            synchronized (ConnectManager.class) {
                if (connectManager == null) {
                    connectManager = new ConnectManager();
                }
            }
        }
        return connectManager;
    }

    /**
     * Records a server address for the given service name.
     */
    public void addServer(String name,String server){
        // computeIfAbsent creates the list atomically; a copy-on-write list keeps
        // concurrent add/read safe (fully qualified to avoid touching the imports).
        List<String> servers = rpcServerMap.computeIfAbsent(
                name, k -> new java.util.concurrent.CopyOnWriteArrayList<>());
        servers.add(server);
        log.info("servers {} , size {}", name ,servers.size());
    }

    /**
     * @return the addresses recorded for {@code name}, or {@code null} when none were added
     */
    public List<String> getServers(String name){
        return rpcServerMap.get(name);
    }


    /**
     * Registers a connected handler for the given service name.
     */
    public void addRpcClientHandler(RpcClientHandler rpcClientHandler,String name){
        List<RpcClientHandler> handlers = rpcClientHandlerMap.computeIfAbsent(
                name, k -> new java.util.concurrent.CopyOnWriteArrayList<>());
        handlers.add(rpcClientHandler);
        log.info("handlers {} , size {}", name ,handlers.size());
    }

    /**
     * Picks one handler at random among those registered for the service.
     *
     * @return a handler, or {@code null} when none is registered for {@code name}
     */
    public RpcClientHandler getRpcClientHandler(String name){
        log.info("getRpcClientHandler name {}" , name);
        RpcClientHandler handler = null;
        List<RpcClientHandler> handlers = rpcClientHandlerMap.get(name);
        // No entry exists until the first connection for this service has been established.
        if(handlers != null && !handlers.isEmpty()){
            handler = handlers.get(RandomUtil.randomInt(handlers.size()));
        }
        log.info("handler== {}", handler);
        return handler;
    }

}

代理工厂,替换实际调用,无感知使用 netty,sendRequest 获取 CompletableFuture,然后使用 get()获取返回
需要注意的是如果想要异步返回,目前是没有实现的。

可行方案:

  1. 将接口的返回类型声明为 CompletableFuture,然后在代理中判断:若返回类型为 CompletableFuture,则直接返回该 future,不再调用 get() 阻塞。

    问题:是否可以直接返回 CompletableFuture< 具体对象 >

  2. 放在 threadLocal 中?是否可行

/**
 * Factory for client-side dynamic proxies. Each call on a proxy is turned into an
 * {@link RpcRequest}, sent through a handler obtained from {@link ConnectManager},
 * and the calling thread blocks until the response arrives.
 */
@Slf4j
public class RpcProxy {
    private static final Logger LOGGER = LoggerFactory.getLogger(RpcProxy.class);

    private String serviceAddress;

    private ServiceDiscovery serviceDiscovery;

    public RpcProxy(String serviceAddress) {
        this.serviceAddress = serviceAddress;
    }

    public RpcProxy(ServiceDiscovery serviceDiscovery) {
        this.serviceDiscovery = serviceDiscovery;
    }

    /**
     * Creates a proxy for the interface with an empty service version.
     */
    @SuppressWarnings("unchecked")
    public <T> T create(final Class<?> interfaceClass) {
        return create(interfaceClass, "");
    }

    /**
     * Creates a proxy that forwards every interface method call as an RPC request.
     *
     * @param interfaceClass service interface to proxy
     * @param serviceVersion service version sent with every request
     * @return a proxy implementing {@code interfaceClass}
     */
    @SuppressWarnings("unchecked")
    public <T> T create(final Class<?> interfaceClass, final String serviceVersion) {
        return (T) Proxy.newProxyInstance(
                interfaceClass.getClassLoader(),
                new Class<?>[]{interfaceClass},
                new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        // equals/hashCode/toString are dispatched to this handler too. They
                        // must be answered locally, otherwise merely logging the proxy would
                        // issue a remote call for interface "java.lang.Object".
                        if (method.getDeclaringClass() == Object.class) {
                            return invokeObjectMethod(proxy, method, args, interfaceClass, serviceVersion);
                        }
                        // Build the request describing this call.
                        RpcRequest request = new RpcRequest();
                        request.setRequestId(UUID.randomUUID().toString());
                        request.setInterfaceName(method.getDeclaringClass().getName());
                        request.setServiceVersion(serviceVersion);
                        request.setMethodName(method.getName());
                        request.setParameterTypes(method.getParameterTypes());
                        request.setParameters(args);
                        // Find a connected handler for the service and send the request.
                        String name = ServiceUtil.buildServiceName(request.getInterfaceName(),request.getServiceVersion());

                        log.info("invoke {}" , name);
                        RpcClientHandler handler = ConnectManager.getInstance().getRpcClientHandler(name);

                        log.info("handler {}" , handler != null );
                        if (handler == null) {
                            throw new IllegalStateException("no available connection for service: " + name);
                        }
                        CompletableFuture<RpcResponse> rpcResponse = handler.sendRequest(request);
                        if (rpcResponse == null) {
                            throw new IllegalStateException("failed to send request for service: " + name);
                        }

                        // Block until the response for this request id arrives.
                        RpcResponse response = rpcResponse.get();
                        if(response.getException() == null) {
                            return response.getResult();
                        }else{
                            throw response.getException();
                        }
                    }

                }
        );
    }

    /**
     * Answers the {@code java.lang.Object} methods a proxy routes to its handler,
     * using identity semantics.
     */
    private static Object invokeObjectMethod(Object proxy, Method method, Object[] args,
                                             Class<?> interfaceClass, String serviceVersion) {
        String methodName = method.getName();
        if ("equals".equals(methodName)) {
            return proxy == args[0];
        }
        if ("hashCode".equals(methodName)) {
            return System.identityHashCode(proxy);
        }
        if ("toString".equals(methodName)) {
            return "RpcProxy[" + interfaceClass.getName() + ", version=" + serviceVersion + "]";
        }
        throw new UnsupportedOperationException(method.toString());
    }
}

根据注解,注入代理 bean

/**
 * Bean post-processor that injects RPC proxies into fields annotated with
 * {@link RpcReference} and opens a client connection to every discovered server address.
 */
@Slf4j
public class ClientFactory extends InstantiationAwareBeanPostProcessorAdapter implements ApplicationContextAware {

    private ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    /**
     * For each {@link RpcReference} field of the bean: creates the proxy, discovers the
     * service addresses, connects to those not connected yet and sets the field.
     *
     * @throws IllegalArgumentException if an annotated field is not declared with an
     *         interface type, or a discovered address is not in {@code host:port} form
     */
    @Override
    public boolean postProcessAfterInstantiation(final Object bean, final String beanName) throws BeansException {

        log.info("test inject");
        ReflectionUtils.doWithFields(bean.getClass(), new ReflectionUtils.FieldCallback() {
            @Override
            public void doWith(Field field) throws IllegalArgumentException, IllegalAccessException {
                if (field.isAnnotationPresent(RpcReference.class)) {
                    Class<?> interfaceName = field.getType();
                    if (!interfaceName.isInterface()) {
                        // Fail here with a message naming the field; continuing would only
                        // fail later, when the dynamic proxy rejects a non-interface type.
                        log.error("声明类不是接口异常");
                        throw new IllegalArgumentException(
                                "@RpcReference field must be declared with an interface type: " + field);
                    }

                    RpcReference rpcReference = field.getAnnotation(RpcReference.class);
                    String version = rpcReference.version();
                    String inName = interfaceName.getName();
                    String name = ServiceUtil.buildServiceName(inName,version);

                    log.info("interfaceName {}" , inName);

                    // Build the proxy that will be injected into the field.
                    RpcProxy proxy = applicationContext.getBean(RpcProxy.class);
                    Object serviceProxy = proxy.create(interfaceName,version);

                    ServiceDiscovery serviceDiscovery = applicationContext.getBean(ServiceDiscovery.class);

                    List<String> addressList = null;
                    // Look up the addresses serving this interface/version.
                    if (serviceDiscovery != null) {
                        String serviceName = interfaceName.getName();
                        if (StrUtil.isNotEmpty(version)) {
                            serviceName += "-" + version;
                        }
                        addressList = serviceDiscovery.discover(serviceName);
                        log.info("serviceName {} addressList {}",name,addressList);
                    }
                    if (CollectionUtil.isEmpty(addressList)) {
                        log.error("server address is empty");
                        throw new RuntimeException("server address is empty");
                    }
                    // Connect to every address not recorded for this service yet.
                    for(String serviceAddress:addressList) {
                        log.info("serviceAddress {}",serviceAddress);
                        String[] array = StrUtil.split(serviceAddress, ":");
                        if (array == null || array.length < 2) {
                            throw new IllegalArgumentException(
                                    "service address must be host:port but was: " + serviceAddress);
                        }
                        String host = array[0];
                        int port = Integer.parseInt(array[1]);

                        ConnectManager connectManager = ConnectManager.getInstance();
                        List<String> servers = connectManager.getServers(name);
                        if(CollectionUtil.contains(servers,serviceAddress)){
                            // A client for this address was already started.
                            continue;
                        }
                        connectManager.addServer(name,serviceAddress);
                        // host + port must stay unique so that only one client connects
                        // to a given server.
                        RpcClient client = new RpcClient(host, port, name);
                        client.start();
                    }

                    // Inject the proxy into the annotated field.
                    field.setAccessible(true);
                    field.set(bean, serviceProxy);

                }
            }
        });

        return super.postProcessAfterInstantiation(bean, beanName);
    }

}

启动连接,也就是 rpcClient

/**
 * Opens one Netty client connection to a server and, once connected, registers its
 * {@link RpcClientHandler} with {@link ConnectManager} under the given service name.
 */
@Slf4j
public class RpcClient {

    private final String host;
    private final int port;
    /** Service name the connected handler is registered under. */
    private final String name;

    /**
     * @param host server host
     * @param port server port
     * @param name service name used as the registration key in {@link ConnectManager}
     */
    public RpcClient(String host, int port, String name) {
        this.host = host;
        this.port = port;
        this.name = name;
    }

    /**
     * Connects to the server, waiting for the connection attempt to finish. A failure is
     * logged and the event loop group created for this attempt is released.
     */
    public void start(){

        EventLoopGroup group = new NioEventLoopGroup();
        try {
            // Create and configure the Netty client bootstrap.
            Bootstrap bootstrap = new Bootstrap();
            RpcClientHandler rpcClientHandler = new RpcClientHandler();
            bootstrap.group(group);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel channel) throws Exception {
                    ChannelPipeline pipeline = channel.pipeline();
                    pipeline.addLast(new RpcEncoder(RpcRequest.class)); // encode RPC requests
                    pipeline.addLast(new RpcDecoder(RpcResponse.class)); // decode RPC responses
                    pipeline.addLast(rpcClientHandler); // handle RPC responses
                }
            });
            bootstrap.option(ChannelOption.TCP_NODELAY, true);

            log.info("host{}:port{}", host , port);
            ChannelFuture future = bootstrap.connect(host, port).sync();

            // Share the connected handler so proxies can send requests through it.
            future.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(final ChannelFuture channelFuture) throws Exception {
                    if (channelFuture.isSuccess()) {
                        log.info("连接成功");
                        ConnectManager.getInstance().addRpcClientHandler(rpcClientHandler,name);
                    }
                }
            });
        } catch(Exception e){
            log.error("开启连接异常",e);
            // Nothing uses the group after a failed connect; release its threads.
            group.shutdownGracefully();
            if (e instanceof InterruptedException) {
                // Preserve the interrupt for whoever owns this thread.
                Thread.currentThread().interrupt();
            }
        }
    }

}

待扩展点

服务注册与服务发现(完善)

负载均衡,如何选取 server(目前单纯的随机)

项目 starter 化,更加简单的使用

netty 线程模型

  1. 单线程模型
  2. 多线程模型
  3. 主从多线程模型

客户端

// Client side: a 4-thread event loop group drives the connection.
EventLoopGroup eventLoopGroup = new NioEventLoopGroup(4);
Bootstrap b = new Bootstrap();
b.group(eventLoopGroup);
b.channel(NioSocketChannel.class);
b.handler(new RpcClientInitializer());

// Connect without blocking; the handler is published only after a successful connect.
ChannelFuture channelFuture = b.connect(remotePeer);
channelFuture.addListener((ChannelFutureListener) connected -> {
    if (!connected.isSuccess()) {
        return;
    }
    logger.debug("Successfully connect to remote server. remote peer = " + remotePeer);
    RpcClientHandler handler = connected.channel().pipeline().get(RpcClientHandler.class);
    addHandler(handler);
});

服务端

// Server side: one group accepts connections, the other serves the accepted channels.
bossGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup();

ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel channel) throws Exception {
        // Frame decoder, then request decoder, response encoder and request handler.
        ChannelPipeline pipeline = channel.pipeline();
        pipeline.addLast(new LengthFieldBasedFrameDecoder(65536, 0, 4, 0, 0));
        pipeline.addLast(new RpcDecoder(RpcRequest.class));
        pipeline.addLast(new RpcEncoder(RpcResponse.class));
        pipeline.addLast(new RpcHandler(handlerMap));
    }
});
bootstrap.option(ChannelOption.SO_BACKLOG, 128);
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);

// The address is given as host:port.
String[] array = serverAddress.split(":");
String host = array[0];
int port = Integer.parseInt(array[1]);

// Wait until the server socket is bound.
ChannelFuture future = bootstrap.bind(host, port).sync();
logger.info("Server started on port {}", port);

if (serviceRegistry != null) {
    serviceRegistry.register(serverAddress);
}

// Block until the server channel is closed.
future.channel().closeFuture().sync();
  • Netty

    Netty 是一个基于 NIO 的客户端-服务器编程框架,使用 Netty 可以让你快速、简单地开发出一个可维护、高性能的网络应用,例如实现了某种协议的客户、服务端应用。

    49 引用 • 33 回帖 • 24 关注
  • RPC
    16 引用 • 22 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...