站长网 语言 Java 从零开始手写 RPC-Netty4 达成客户端和服务端

Java 从零开始手写 RPC-Netty4 达成客户端和服务端

说明上一篇代码基于 socket 的实现非常简单,但是对于实际生产,一般使用 netty。至于 netty 的优点可以参考:为什么选择 netty?[1]http://houbb.github.io/2019/05/10/netty-definitive-gudie-04-why-nettyjava 从零开始手写 RPC (02)-netty4 实现客户端和

说明

上一篇代码基于 socket 的实现非常简单,但是对于实际生产,一般使用 netty。

 

至于 netty 的优点可以参考:

 

为什么选择 netty?[1]

 

http://houbb.github.io/2019/05/10/netty-definitive-gudie-04-why-netty

 

java 从零开始手写 RPC (02)-netty4 实现客户端和服务端

代码实现

maven 引入

 

 

<dependency> 

    <groupId>io.netty</groupId> 

    <artifactId>netty-all</artifactId> 

    <version>${netty.version}</version> 

</dependency> 

引入 netty 对应的 maven 包,此处为 4.1.17.Final。

 

服务端代码实现

netty 的服务端启动代码是比较固定的。

 

package com.github.houbb.rpc.server.core; 

 

 

import com.github.houbb.log.integration.core.Log; 

import com.github.houbb.log.integration.core.LogFactory; 

import com.github.houbb.rpc.server.constant.RpcServerConst; 

import com.github.houbb.rpc.server.handler.RpcServerHandler; 

import io.netty.bootstrap.ServerBootstrap; 

import io.netty.channel.*; 

import io.netty.channel.nio.NioEventLoopGroup; 

import io.netty.channel.socket.nio.NioServerSocketChannel; 

 

 

/** 

 * rpc 服务端 

 * @author binbin.hou 

 * @since 0.0.1 

 */ 

public class RpcServer extends Thread { 

 

 

    private static final Log log = LogFactory.getLog(RpcServer.class); 

 

 

    /** 

     * 端口号 

     */ 

    private final int port; 

 

 

    public RpcServer() { 

        this.port = RpcServerConst.DEFAULT_PORT; 

    } 

 

 

    public RpcServer(int port) { 

        this.port = port; 

    } 

 

 

    @Override 

    public void run() { 

        // 启动服务端 

        log.info("RPC 服务开始启动服务端"); 

 

 

        EventLoopGroup bossGroup = new NioEventLoopGroup(); 

        EventLoopGroup workerGroup = new NioEventLoopGroup(); 

 

 

        try { 

            ServerBootstrap serverBootstrap = new ServerBootstrap(); 

            serverBootstrap.group(workerGroup, bossGroup) 

                    .channel(NioServerSocketChannel.class) 

                    .childHandler(new ChannelInitializer<Channel>() { 

                        @Override 

                        protected void initChannel(Channel ch) throws Exception { 

                            ch.pipeline().addLast(new RpcServerHandler()); 

                        } 

                    }) 

                    // 这个参数影响的是还没有被accept 取出的连接 

                    .option(ChannelOption.SO_BACKLOG, 128) 

                    // 这个参数只是过一段时间内客户端没有响应,服务端会发送一个 ack 包,以判断客户端是否还活着。 

                    .childOption(ChannelOption.SO_KEEPALIVE, true); 

 

 

            // 绑定端口,开始接收进来的链接 

            ChannelFuture channelFuture = serverBootstrap.bind(port).syncUninterruptibly(); 

            log.info("RPC 服务端启动完成,监听【" + port + "】端口"); 

 

 

            channelFuture.channel().closeFuture().syncUninterruptibly(); 

            log.info("RPC 服务端关闭完成"); 

        } catch (Exception e) { 

            log.error("RPC 服务异常", e); 

        } finally { 

            workerGroup.shutdownGracefully(); 

            bossGroup.shutdownGracefully(); 

        } 

    } 

 

 

为了简单,服务端启动端口号固定,RpcServerConst 常量类内容如下:

 

public final class RpcServerConst { 

 

 

    private RpcServerConst(){} 

 

 

    /** 

     * 默认端口 

     * @since 0.0.1 

     */ 

    public static final int DEFAULT_PORT = 9627; 

 

 

RpcServerHandler

当然,还有一个比较核心的类就是 RpcServerHandler

 

public class RpcServerHandler extends SimpleChannelInboundHandler { 

    @Override 

    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { 

        // do nothing now 

    } 

目前是空实现,后续可以添加对应的日志输出及逻辑处理。

 

测试

启动测试的代码非常简单:

 

/** 

 * 服务启动代码测试 

 * @param args 参数 

 */ 

public static void main(String[] args) { 

    new RpcServer().start(); 

 

 

 

    public RpcClient(int port) { 

        this.port = port; 

    } 

 

 

    public RpcClient() { 

        this(9527); 

    } 

 

 

    @Override 

    public void run() { 

        // 启动服务端 

        log.info("RPC 服务开始启动客户端"); 

 

 

        EventLoopGroup workerGroup = new NioEventLoopGroup(); 

 

 

        try { 

            Bootstrap bootstrap = new Bootstrap(); 

            ChannelFuture channelFuture = bootstrap.group(workerGroup) 

                    .channel(NioSocketChannel.class) 

                    .option(ChannelOption.SO_KEEPALIVE, true) 

                    .handler(new ChannelInitializer<Channel>(){ 

                        @Override 

                        protected void initChannel(Channel ch) throws Exception { 

                            ch.pipeline() 

                                    .addLast(new LoggingHandler(LogLevel.INFO)) 

                                    .addLast(new RpcClientHandler()); 

                        } 

                    }) 

                    .connect("localhost", port) 

                    .syncUninterruptibly(); 

 

 

            log.info("RPC 服务启动客户端完成,监听端口:" + port); 

            channelFuture.channel().closeFuture().syncUninterruptibly(); 

            log.info("RPC 服务开始客户端已关闭"); 

        } catch (Exception e) { 

            log.error("RPC 客户端遇到异常", e); 

        } finally { 

            workerGroup.shutdownGracefully(); 

        } 

    } 

 

 

.connect("localhost", port) 声明了客户端需要连接的服务端,此处和服务端的端口保持一致。

 

RpcClientHandler

客户端处理类也比较简单,暂时留空。

本文来自网络,不代表站长网立场,转载请注明出处:https://www.tzzz.com.cn/html/biancheng/yuyan/2021/1104/19752.html

作者: dawei

【声明】:站长网内容转载自互联网,其相关言论仅代表作者个人观点绝非权威,不代表本站立场。如您发现内容存在版权问题,请提交相关链接至邮箱:bqsm@foxmail.com,我们将及时予以处理。
联系我们

联系我们

0577-28828765

在线咨询: QQ交谈

邮箱: xwei067@foxmail.com

工作时间:周一至周五,9:00-17:30,节假日休息

返回顶部