站长网 语言 Java从零开启手写一 Reflect 反射实现通用调用之客户端

Java从零开启手写一 Reflect 反射实现通用调用之客户端

上一篇我们介绍了,如何实现基于反射的通用服务端。这一节我们来一起学习下如何实现通用客户端。因为内容较多,所以拆分为 2 个部分。基本思路所有的方法调用,基于反射进行相关处理实现。java 从零开始手写 RPC (06) reflect 反射实现通用调用之客户端核心

上一篇我们介绍了,如何实现基于反射的通用服务端。

 

这一节我们来一起学习下如何实现通用客户端。

 

因为内容较多,所以拆分为 2 个部分。

 

基本思路

所有的方法调用,基于反射进行相关处理实现。

 

java 从零开始手写 RPC (06) reflect 反射实现通用调用之客户端

核心类

为了便于拓展,我们把核心类调整如下:

 

package com.github.houbb.rpc.client.core; 

 

 

import com.github.houbb.heaven.annotation.ThreadSafe; 

import com.github.houbb.log.integration.core.Log; 

import com.github.houbb.log.integration.core.LogFactory; 

import com.github.houbb.rpc.client.core.context.RpcClientContext; 

import com.github.houbb.rpc.client.handler.RpcClientHandler; 

import com.github.houbb.rpc.common.constant.RpcConstant; 

import io.netty.bootstrap.Bootstrap; 

import io.netty.channel.*; 

import io.netty.channel.nio.NioEventLoopGroup; 

import io.netty.channel.socket.nio.NioSocketChannel; 

import io.netty.handler.codec.serialization.ClassResolvers; 

import io.netty.handler.codec.serialization.ObjectDecoder; 

import io.netty.handler.codec.serialization.ObjectEncoder; 

import io.netty.handler.logging.LogLevel; 

import io.netty.handler.logging.LoggingHandler; 

 

 

/** 

 * <p> rpc 客户端 </p> 

 * 

 * <pre> Created: 2019/10/16 11:21 下午  </pre> 

 * <pre> Project: rpc  </pre> 

 * 

 * @author houbinbin 

 * @since 0.0.2 

 */ 

@ThreadSafe 

public class RpcClient { 

 

 

    private static final Log log = LogFactory.getLog(RpcClient.class); 

 

 

    /** 

     * 地址信息 

     * @since 0.0.6 

     */ 

    private final String address; 

 

 

    /** 

     * 监听端口号 

     * @since 0.0.6 

     */ 

    private final int port; 

 

 

    /** 

     * 客户端处理 handler 

     * 作用:用于获取请求信息 

     * @since 0.0.4 

     */ 

    private final ChannelHandler channelHandler; 

 

 

    public RpcClient(final RpcClientContext clientContext) { 

        this.address = clientContext.address(); 

        this.port = clientContext.port(); 

        this.channelHandler = clientContext.channelHandler(); 

    } 

 

 

    /** 

     * 进行连接 

     * @since 0.0.6 

     */ 

    public ChannelFuture connect() { 

        // 启动服务端 

        log.info("RPC 服务开始启动客户端"); 

 

 

        EventLoopGroup workerGroup = new NioEventLoopGroup(); 

 

 

        /** 

         * channel future 信息 

         * 作用:用于写入请求信息 

         * @since 0.0.6 

         */ 

        ChannelFuture channelFuture; 

        try { 

            Bootstrap bootstrap = new Bootstrap(); 

            channelFuture = bootstrap.group(workerGroup) 

                    .channel(NioSocketChannel.class) 

                    .option(ChannelOption.SO_KEEPALIVE, true) 

                    .handler(new ChannelInitializer<Channel>(){ 

                        @Override 

                        protected void initChannel(Channel ch) throws Exception { 

                            ch.pipeline() 

                                    // 解码 bytes=>resp 

                                    .addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null))) 

                                    // request=>bytes 

                                    .addLast(new ObjectEncoder()) 

                                    // 日志输出 

                                    .addLast(new LoggingHandler(LogLevel.INFO)) 

                                    .addLast(channelHandler); 

                        } 

                    }) 

                    .connect(address, port) 

                    .syncUninterruptibly(); 

            log.info("RPC 服务启动客户端完成,监听地址 {}:{}", address, port); 

        } catch (Exception e) { 

            log.error("RPC 客户端遇到异常", e); 

            throw new RuntimeException(e); 

        } 

        // 不要关闭线程池!!! 

 

 

        return channelFuture; 

    } 

 

 

可以灵活指定对应的服务端地址、端口信息。

 

ChannelHandler 作为处理参数传入。

 

ObjectDecoder、ObjectEncoder、LoggingHandler 都和服务端类似,是 netty 的内置实现。

 

RpcClientHandler

客户端的 handler 实现如下:

 

/* 

 * Copyright (c)  2019. houbinbin Inc. 

 * rpc All rights reserved. 

 */ 

 

 

package com.github.houbb.rpc.client.handler; 

 

 

import com.github.houbb.log.integration.core.Log; 

import com.github.houbb.log.integration.core.LogFactory; 

import com.github.houbb.rpc.client.core.RpcClient; 

import com.github.houbb.rpc.client.invoke.InvokeService; 

import com.github.houbb.rpc.common.rpc.domain.RpcResponse; 

import io.netty.channel.ChannelHandlerContext; 

import io.netty.channel.SimpleChannelInboundHandler; 

 

 

/** 

 * <p> 客户端处理类 </p> 

 * 

 * <pre> Created: 2019/10/16 11:30 下午  </pre> 

 * <pre> Project: rpc  </pre> 

 * 

 * @author houbinbin 

 * @since 0.0.2 

 */ 

public class RpcClientHandler extends SimpleChannelInboundHandler { 

 

 

    private static final Log log = LogFactory.getLog(RpcClient.class); 

 

 

    /** 

     * 调用服务管理类 

     * 

     * @since 0.0.6 

     */ 

    private final InvokeService invokeService; 

 

 

    public RpcClientHandler(InvokeService invokeService) { 

        this.invokeService = invokeService; 

    } 

 

 

    @Override 

    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { 

        RpcResponse rpcResponse = (RpcResponse)msg; 

        invokeService.addResponse(rpcResponse.seqId(), rpcResponse); 

        log.info("[Client] response is :{}", rpcResponse); 

    } 

 

 

    @Override 

    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { 

        // 每次用完要关闭,不然拿不到response,我也不知道为啥(目测得了解netty才行) 

        // 个人理解:如果不关闭,则永远会被阻塞。 

        ctx.flush(); 

        ctx.close(); 

    } 

 

 

    @Override 

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 

        cause.printStackTrace(); 

        ctx.close(); 

    } 

只有 channelRead0 做了调整,基于 InvokeService 对结果进行处理。

 

InvokeService

接口

package com.github.houbb.rpc.client.invoke; 

 

 

import com.github.houbb.rpc.common.rpc.domain.RpcResponse; 

 

 

/** 

 * 调用服务接口 

 * @author binbin.hou 

 * @since 0.0.6 

 */ 

public interface InvokeService { 

 

 

    /** 

     * 添加请求信息 

     * @param seqId 序列号 

     * @return this 

     * @since 0.0.6 

     */ 

    InvokeService addRequest(final String seqId); 

 

 

    /** 

     * 放入结果 

     * @param seqId 唯一标识 

     * @param rpcResponse 响应结果 

     * @return this 

     * @since 0.0.6 

     */ 

    InvokeService addResponse(final String seqId, final RpcResponse rpcResponse); 

 

 

    /** 

     * 获取标志信息对应的结果 

     * @param seqId 序列号 

     * @return 结果 

     * @since 0.0.6 

     */ 

    RpcResponse getResponse(final String seqId); 

 

 

主要是对入参、出参的设置,以及出参的获取。

 

实现

package com.github.houbb.rpc.client.invoke.impl; 

 

 

import com.github.houbb.heaven.util.guava.Guavas; 

import com.github.houbb.heaven.util.lang.ObjectUtil; 

import com.github.houbb.log.integration.core.Log; 

import com.github.houbb.log.integration.core.LogFactory; 

import com.github.houbb.rpc.client.core.RpcClient; 

import com.github.houbb.rpc.client.invoke.InvokeService; 

import com.github.houbb.rpc.common.exception.RpcRuntimeException; 

import com.github.houbb.rpc.common.rpc.domain.RpcResponse; 

 

 

import java.util.Set; 

import java.util.concurrent.ConcurrentHashMap; 

 

 

/** 

 * 调用服务接口 

 * @author binbin.hou 

 * @since 0.0.6 

 */ 

public class DefaultInvokeService implements InvokeService { 

 

 

    private static final Log LOG = LogFactory.getLog(DefaultInvokeService.class); 

 

 

    /** 

     * 请求序列号集合 

     * (1)这里后期如果要添加超时检测,可以添加对应的超时时间。 

     * 可以把这里调整为 map 

     * @since 0.0.6 

     */ 

    private final Set<String> requestSet; 

 

 

    /** 

     * 响应结果 

     * @since 0.0.6 

     */ 

    private final ConcurrentHashMap<String, RpcResponse> responseMap; 

 

 

    public DefaultInvokeService() { 

        requestSet = Guavas.newHashSet(); 

        responseMap = new ConcurrentHashMap<>(); 

    } 

 

 

    @Override 

    public InvokeService addRequest(String seqId) { 

        LOG.info("[Client] start add request for seqId: {}", seqId); 

        requestSet.add(seqId); 

        return this; 

    } 

 

 

    @Override 

    public InvokeService addResponse(String seqId, RpcResponse rpcResponse) { 

        // 这里放入之前,可以添加判断。 

        // 如果 seqId 必须处理请求集合中,才允许放入。或者直接忽略丢弃。 

        LOG.info("[Client] 获取结果信息,seq: {}, rpcResponse: {}", seqId, rpcResponse); 

        responseMap.putIfAbsent(seqId, rpcResponse); 

 

 

        // 通知所有等待方 

        LOG.info("[Client] seq 信息已经放入,通知所有等待方", seqId); 

 

 

        synchronized (this) { 

            this.notifyAll(); 

        } 

 

 

        return this; 

    } 

 

 

    @Override 

    public RpcResponse getResponse(String seqId) { 

        try { 

            RpcResponse rpcResponse = this.responseMap.get(seqId); 

            if(ObjectUtil.isNotNull(rpcResponse)) { 

                LOG.info("[Client] seq {} 对应结果已经获取: {}", seqId, rpcResponse); 

                return rpcResponse; 

            } 

 

 

            // 进入等待 

            while (rpcResponse == null) { 

                LOG.info("[Client] seq {} 对应结果为空,进入等待", seqId); 

                // 同步等待锁 

                synchronized (this) { 

                    this.wait(); 

                } 

 

 

                rpcResponse = this.responseMap.get(seqId); 

                LOG.info("[Client] seq {} 对应结果已经获取: {}", seqId, rpcResponse); 

            } 

 

 

            return rpcResponse; 

        } catch (InterruptedException e) { 

            throw new RpcRuntimeException(e); 

        } 

    } 

使用 requestSet 存储对应的请求入参。

 

使用 responseMap 存储对应的请求出参,在获取的时候通过同步 while 循环等待,获取结果。

 

此处,通过 notifyAll() 和 wait() 进行等待和唤醒。

 

ReferenceConfig-服务端配置

说明

我们想调用服务端,首先肯定要定义好要调用的对象。

 

ReferenceConfig 就是要告诉 rpc 框架,调用的服务端信息。

 

接口

package com.github.houbb.rpc.client.config.reference; 

 

 

import com.github.houbb.rpc.common.config.component.RpcAddress; 

 

 

import java.util.List; 

 

 

/** 

 * 引用配置类 

 * 

 * 后期配置: 

 * (1)timeout 调用超时时间 

 * (2)version 服务版本处理 

 * (3)callType 调用方式 oneWay/sync/async 

 * (4)check 是否必须要求服务启动。 

 * 

 * spi: 

 * (1)codec 序列化方式 

 * (2)netty 网络通讯架构 

 * (3)load-balance 负载均衡 

 * (4)失败策略 fail-over/fail-fast 

 * 

 * filter: 

 * (1)路由 

 * (2)耗时统计 monitor 服务治理 

 * 

 * 优化思考: 

 * (1)对于唯一的 serviceId,其实其 interface 是固定的,是否可以省去? 

 * @author binbin.hou 

 * @since 0.0.6 

 * @param <T> 接口泛型 

 */ 

public interface ReferenceConfig<T> { 

 

 

    /** 

     * 设置服务标识 

     * @param serviceId 服务标识 

     * @return this 

     * @since 0.0.6 

     */ 

    ReferenceConfig<T> serviceId(final String serviceId); 

 

 

    /** 

     * 服务唯一标识 

     * @since 0.0.6 

     */ 

    String serviceId(); 

 

 

    /** 

     * 服务接口 

     * @since 0.0.6 

     * @return 接口信息 

     */ 

    Class<T> serviceInterface(); 

 

 

    /** 

     * 设置服务接口信息 

     * @param serviceInterface 服务接口信息 

     * @return this 

     * @since 0.0.6 

     */ 

    ReferenceConfig<T> serviceInterface(final Class<T> serviceInterface); 

 

 

    /** 

     * 设置服务地址信息 

     * (1)单个写法:ip:port:weight 

     * (2)集群写法:ip1:port1:weight1,ip2:port2:weight2 

     * 

     * 其中 weight 权重可以不写,默认为1. 

     * 

     * @param addresses 地址列表信息 

     * @return this 

     * @since 0.0.6 

     */ 

    ReferenceConfig<T> addresses(final String addresses); 

 

 

    /** 

     * 获取对应的引用实现 

     * @return 引用代理类 

     * @since 0.0.6 

     */ 

    T reference(); 

 

 

实现

package com.github.houbb.rpc.client.config.reference.impl; 

 

 

import com.github.houbb.heaven.constant.PunctuationConst; 

import com.github.houbb.heaven.util.common.ArgUtil; 

import com.github.houbb.heaven.util.guava.Guavas; 

import com.github.houbb.heaven.util.lang.NumUtil; 

import com.github.houbb.rpc.client.config.reference.ReferenceConfig; 

import com.github.houbb.rpc.client.core.RpcClient; 

import com.github.houbb.rpc.client.core.context.impl.DefaultRpcClientContext; 

import com.github.houbb.rpc.client.handler.RpcClientHandler; 

import com.github.houbb.rpc.client.invoke.InvokeService; 

import com.github.houbb.rpc.client.invoke.impl.DefaultInvokeService; 

import com.github.houbb.rpc.client.proxy.ReferenceProxy; 

import com.github.houbb.rpc.client.proxy.context.ProxyContext; 

import com.github.houbb.rpc.client.proxy.context.impl.DefaultProxyContext; 

import com.github.houbb.rpc.common.config.component.RpcAddress; 

import io.netty.channel.ChannelFuture; 

import io.netty.channel.ChannelHandler; 

 

 

import java.util.List; 

 

 

/** 

 * 引用配置类默认实现 

 * 

 * @author binbin.hou 

 * @since 0.0.6 

 * @param <T> 接口泛型 

 */ 

public class DefaultReferenceConfig<T> implements ReferenceConfig<T> { 

 

 

    /** 

     * 服务唯一标识 

     * @since 0.0.6 

     */ 

    private String serviceId; 

 

 

    /** 

     * 服务接口 

     * @since 0.0.6 

     */ 

    private Class<T> serviceInterface; 

 

 

    /** 

     * 服务地址信息 

     * (1)如果不为空,则直接根据地址获取 

     * (2)如果为空,则采用自动发现的方式 

     * 

     * TODO: 这里调整为 set 更加合理。 

     * 

     * 如果为 subscribe 可以自动发现,然后填充这个字段信息。 

     * @since 0.0.6 

     */ 

    private List<RpcAddress> rpcAddresses; 

 

 

    /** 

     * 用于写入信息 

     * (1)client 连接 server 端的 channel future 

     * (2)后期进行 Load-balance 路由等操作。可以放在这里执行。 

     * @since 0.0.6 

     */ 

    private List<ChannelFuture> channelFutures; 

 

 

    /** 

     * 客户端处理信息 

     * @since 0.0.6 

     */ 

    @Deprecated 

    private RpcClientHandler channelHandler; 

 

 

    /** 

     * 调用服务管理类 

     * @since 0.0.6 

     */ 

    private InvokeService invokeService; 

 

 

    public DefaultReferenceConfig() { 

        // 初始化信息 

        this.rpcAddresses = Guavas.newArrayList(); 

        this.channelFutures = Guavas.newArrayList(); 

        this.invokeService = new DefaultInvokeService(); 

    } 

 

 

    @Override 

    public String serviceId() { 

        return serviceId; 

    } 

 

 

    @Override 

    public DefaultReferenceConfig<T> serviceId(String serviceId) { 

        this.serviceId = serviceId; 

        return this; 

    } 

 

 

    @Override 

    public Class<T> serviceInterface() { 

        return serviceInterface; 

    } 

 

 

    @Override 

    public DefaultReferenceConfig<T> serviceInterface(Class<T> serviceInterface) { 

        this.serviceInterface = serviceInterface; 

        return this; 

    } 

 

 

    @Override 

    public ReferenceConfig<T> addresses(String addresses) { 

        ArgUtil.notEmpty(addresses, "addresses"); 

 

 

        String[] addressArray = addresses.split(PunctuationConst.COMMA); 

        ArgUtil.notEmpty(addressArray, "addresses"); 

 

 

        for(String address : addressArray) { 

            String[] addressSplits = address.split(PunctuationConst.COLON); 

            if(addressSplits.length < 2) { 

                throw new IllegalArgumentException("Address must be has ip and port, like 127.0.0.1:9527"); 

            } 

            String ip = addressSplits[0]; 

            int port = NumUtil.toIntegerThrows(addressSplits[1]); 

            // 包含权重信息 

            int weight = 1; 

            if(addressSplits.length >= 3) { 

                weight = NumUtil.toInteger(addressSplits[2], 1); 

            } 

 

 

            RpcAddress rpcAddress = new RpcAddress(ip, port, weight); 

            this.rpcAddresses.add(rpcAddress); 

        } 

 

 

        return this; 

    } 

 

 

    /** 

     * 获取对应的引用实现 

     * (1)处理所有的反射代理信息-方法可以抽离,启动各自独立即可。 

     * (2)启动对应的长连接 

     * @return 引用代理类 

     * @since 0.0.6 

     */ 

    @Override 

    public T reference() { 

        // 1. 启动 client 端到 server 端的连接信息 

        // 1.1 为了提升性能,可以将所有的 client=>server 的连接都调整为一个 thread。 

        // 1.2 初期为了简单,直接使用同步循环的方式。 

        // 创建 handler 

        // 循环连接 

        for(RpcAddress rpcAddress : rpcAddresses) { 

            final ChannelHandler channelHandler = new RpcClientHandler(invokeService); 

            final DefaultRpcClientContext context = new DefaultRpcClientContext(); 

            context.address(rpcAddress.address()).port(rpcAddress.port()).channelHandler(channelHandler); 

            ChannelFuture channelFuture = new RpcClient(context).connect(); 

            // 循环同步等待 

            // 如果出现异常,直接中断?捕获异常继续进行?? 

            channelFutures.add(channelFuture); 

        } 

 

 

        // 2. 接口动态代理 

        ProxyContext<T> proxyContext = buildReferenceProxyContext(); 

        return ReferenceProxy.newProxyInstance(proxyContext); 

    } 

 

 

    /** 

     * 构建调用上下文 

     * @return 引用代理上下文 

     * @since 0.0.6 

     */ 

    private ProxyContext<T> buildReferenceProxyContext() { 

        DefaultProxyContext<T> proxyContext = new DefaultProxyContext<>(); 

        proxyContext.serviceId(this.serviceId); 

        proxyContext.serviceInterface(this.serviceInterface); 

        proxyContext.channelFutures(this.channelFutures); 

        proxyContext.invokeService(this.invokeService); 

        return proxyContext; 

    } 

 

 

这里主要根据指定的服务端信息,初始化对应的代理实现。

 

这里还可以拓展指定权重,便于后期负载均衡拓展,本期暂时不做实现。

 

ReferenceProxy

说明

所有的 rpc 调用,客户端只有服务端的接口。

 

那么,怎么才能和调用本地方法一样调用远程方法呢?

 

答案就是动态代理。

本文来自网络,不代表站长网立场,转载请注明出处:https://www.tzzz.com.cn/html/biancheng/yuyan/2021/1103/19129.html

作者: dawei

【声明】:站长网内容转载自互联网,其相关言论仅代表作者个人观点绝非权威,不代表本站立场。如您发现内容存在版权问题,请提交相关链接至邮箱:bqsm@foxmail.com,我们将及时予以处理。
联系我们

联系我们

0577-28828765

在线咨询: QQ交谈

邮箱: xwei067@foxmail.com

工作时间:周一至周五,9:00-17:30,节假日休息

返回顶部