Skip to content

2019 08 22 netty案例,netty4.1中级拓展篇七《Netty请求响应同步通信》

fuzhengwei edited this page May 16, 2020 · 1 revision

作者:小傅哥
博客:https://bugstack.cn - 原创系列专题

沉淀、分享、成长,让自己和他人都能有所收获!

前言介绍

在我们实现开发RPC框架的时候,需要选择socket的通信方式。而我们知道一般情况下socket通信类似与qq聊天,发过去消息,什么时候回复都可以。但是我们RPC框架通信,从感觉上类似http调用,需要在一定时间内返回,否则就会发生超时断开。这里我们选择netty作为我们的socket框架,采用future方式进行通信。

Dubbo:国内最早开源的 RPC 框架,由阿里巴巴公司开发并于 2011 年末对外开源,仅支持 Java 语言。 Motan:微博内部使用的 RPC 框架,于 2016 年对外开源,仅支持 Java 语言。 Tars:腾讯内部使用的 RPC 框架,于 2017 年对外开源,仅支持 C++ 语言。 Spring Cloud:国外 Pivotal 公司 2014 年对外开源的 RPC 框架,仅支持 Java 语言 gRPC:Google 于 2015 年对外开源的跨语言 RPC 框架,支持多种语言。 Thrift:最初是由 Facebook 开发的内部系统跨语言的 RPC 框架,2007 年贡献给了 Apache 基金,成为 Apache 开源项目之一,支持多种语言。 hprose:一个MIT开源许可的新型轻量级跨语言跨平台的面向对象的高性能远程动态通讯中间件。它支持众多语言:nodeJs, C++, .NET, Java, Delphi, Objective-C, ActionScript, JavaScript, ASP, PHP, Python, Ruby, Perl, Golang 。

环境准备

  1. jdk1.8【jdk1.7以下只能部分支持netty】
  2. Netty4.1.36.Final【netty3.x 4.x 5每次的变化较大,接口类名也随着变化】

代码示例

itstack-demo-rpc-2-07
└── src
    └── main
    │    └── java
    │        └── org.itstack.demo.netty
    │             ├── client
    │             │   ├── ClientSocket.java
    │             │   └── MyClientHandler.java  
    │             ├── codec
    │             │   ├── RpcDecoder.java
    │             │   └── RpcEncoder.java  
    │             ├── future
    │             │   ├── SyncWrite.java    
    │             │   ├── SyncWriteFuture.java  
    │             │   ├── SyncWriteMap.java 
    │             │   └── WriteFuture.java  
    │             ├── msg
    │             │   ├── Request.java
    │             │   └── Response.java 
    │             ├── server
    │             │   ├── MyServerHandler.java
    │             │   └── ServerSocket.java     
    │             └── util
    │                 └── SerializationUtil.java    
    └── test
         └── java
             └── org.itstack.demo.test
                 ├── StartClient.java
                 └── StartServer.java    

** 展示部分重要代码块,完整代码可以关注公众号获取;bugstack虫洞栈 **

MyClientHandler.java

/**
 * 虫洞栈:https://bugstack.cn
 * 公众号:bugstack虫洞栈  {关注获取学习源码}
 * 虫洞群:①群5398358 ②群5360692
 * Create by fuzhengwei on 2019
 */
public class MyClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object obj) throws Exception {
        Response msg = (Response) obj;
        String requestId = msg.getRequestId();
        SyncWriteFuture future = (SyncWriteFuture) SyncWriteMap.syncKey.get(requestId);
        if (future != null) {
            future.setResponse(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }

}

SyncWrite.java

/**
 * 虫洞栈:https://bugstack.cn
 * 公众号:bugstack虫洞栈  {关注获取学习源码}
 * 虫洞群:①群5398358 ②群5360692
 * Create by fuzhengwei on 2019
 */
public class SyncWrite {

    public Response writeAndSync(final Channel channel, final Request request, final long timeout) throws Exception {

        if (channel == null) {
            throw new NullPointerException("channel");
        }
        if (request == null) {
            throw new NullPointerException("request");
        }
        if (timeout <= 0) {
            throw new IllegalArgumentException("timeout <= 0");
        }

        String requestId = UUID.randomUUID().toString();
        request.setRequestId(requestId);

        WriteFuture<Response> future = new SyncWriteFuture(request.getRequestId());
        SyncWriteMap.syncKey.put(request.getRequestId(), future);

        Response response = doWriteAndSync(channel, request, timeout, future);

        SyncWriteMap.syncKey.remove(request.getRequestId());
        return response;
    }

    private Response doWriteAndSync(final Channel channel, final Request request, final long timeout, final WriteFuture<Response> writeFuture) throws Exception {

        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) throws Exception {
                writeFuture.setWriteResult(future.isSuccess());
                writeFuture.setCause(future.cause());
                //失败移除
                if (!writeFuture.isWriteSuccess()) {
                    SyncWriteMap.syncKey.remove(writeFuture.requestId());
                }
            }
        });

        Response response = writeFuture.get(timeout, TimeUnit.MILLISECONDS);
        if (response == null) {
            if (writeFuture.isTimeout()) {
                throw new TimeoutException();
            } else {
                // write exception
                throw new Exception(writeFuture.cause());
            }
        }
        return response;
    }

}

MyServerHandler.java

/**
 * 虫洞栈:https://bugstack.cn
 * 公众号:bugstack虫洞栈  {关注获取学习源码}
 * 虫洞群:①群5398358 ②群5360692
 * Create by fuzhengwei on 2019
 */
public class MyServerHandler extends ChannelInboundHandlerAdapter{

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object obj){
        Request msg = (Request) obj;
        //反馈
        Response request = new Response();
        request.setRequestId(msg.getRequestId());
        request.setParam(msg.getResult() + " 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。");
        ctx.writeAndFlush(request);
        //释放
        ReferenceCountUtil.release(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

}

StartClient.java

/**
 * 虫洞栈:https://bugstack.cn
 * 公众号:bugstack虫洞栈  {关注获取学习源码}
 * 虫洞群:①群5398358 ②群5360692
 * Create by fuzhengwei on 2019
 */
public class StartClient {

    private static ChannelFuture future;

    public static void main(String[] args) {
        ClientSocket client = new ClientSocket();
        new Thread(client).start();

        while (true) {
            try {
                //获取future,线程有等待处理时间
                if (null == future) {
                    future = client.getFuture();
                    Thread.sleep(500);
                    continue;
                }
                //构建发送参数
                Request request = new Request();
                request.setResult("查询{bugstack虫洞栈}用户信息");
                SyncWrite s = new SyncWrite();
                Response response = s.writeAndSync(future.channel(), request, 1000);
                System.out.println("调用结果:" + JSON.toJSON(response));
                Thread.sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

}

StartServer.java

/**
 * 虫洞栈:https://bugstack.cn
 * 公众号:bugstack虫洞栈  {关注获取学习源码}
 * 虫洞群:①群5398358 ②群5360692
 * Create by fuzhengwei on 2019
 */
public class StartServer {

    public static void main(String[] args) {
        new Thread(new ServerSocket()).start();
        System.out.println("itstack-demo-netty server start done. {关注公众号:bugstack虫洞栈,获取源码}");
    }

}

测试结果

启动StartServer

itstack-demo-netty server start done. {关注公众号:bugstack虫洞栈,获取源码}

启动StartClient

调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"188ba00e-3a0d-4094-9475-c7ee93104011"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"bc9d63d4-9d37-406a-9c0f-a68211ac466f"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"cceb510b-8179-46ab-abc6-eb7d5b6c0ac2"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"f65aafd0-07b7-4bdb-be80-a57b4c58ad2d"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"e6700074-380b-441f-ae0d-f71dcd7f84c9"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"25a0a9d5-46d5-4da6-ad3f-1496ca20bb17"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"052dce21-dfb9-42d7-bbcf-46137b9933df"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"4fd968fa-3171-4e4f-838d-4a215a90da00"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"97f6eb6e-8c5d-49f4-beba-2ba7e1ff953f"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"ffc48115-8e62-43a8-b3f7-035390427d37"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"4bafefc9-4beb-49f6-8126-29e0e03a55d1"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"10367786-24fd-4f11-9465-bcd2c87b4027"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"82293ad3-a694-405c-ab03-01624f38b1ad"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"0c163cb8-be5f-4697-931f-61f5bf487bae"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"0f1e7611-4fd0-43b4-86dc-fce09965046e"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"f22ac0dc-974c-4de1-bcdf-1566ca0b2305"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"1907456b-2507-4ddd-8c1e-a86c990e3e72"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"6a99b1f2-5859-4ed7-9d17-98229c13250f"}

Process finished with exit code -1


上一篇:netty案例,netty4.1中级拓展篇六《SpringBoot+Netty+Elasticsearch收集日志信息数据存储》

下一篇:netty案例,netty4.1中级拓展篇八《Netty心跳服务与断线重连》

微信搜索「bugstack虫洞栈」公众号,关注后回复「Netty专题案例」获取本文源码&更多原创专题案例!

📝 首页

🌏 知识星球码农会锁

实战项目:「DDD+RPC分布式抽奖系统」、专属小册、问题解答、简历指导、架构图稿、视频课程

🐲 头条

⛳ 目录

  1. 源码 - :octocat: 公众号:bugstack虫洞栈 文章所涉及到的全部开源代码
  2. Java
  3. Spring
  4. 面向对象
  5. 中间件
  6. Netty 4.x
  7. 字节码编程
  8. 💯实战项目
  9. 部署 Dev-Ops
  10. 📚PDF 下载
  11. 关于

💋 精选

🐾 友链

建立本开源项目的初衷是基于个人学习与工作中对 Java 相关技术栈的总结记录,在这里也希望能帮助一些在学习 Java 过程中遇到问题的小伙伴,如果您需要转载本仓库的一些文章到自己的博客,请按照以下格式注明出处,谢谢合作。

作者:小傅哥
链接:https://bugstack.cn
来源:bugstack虫洞栈

2021年10月24日,小傅哥 的文章全部开源到代码库 CodeGuide 中,与同好同行,一起进步,共同维护。

这里我提供 3 种方式:

  1. 提出 Issue :在 Issue 中指出你觉得需要改进/完善的地方(能够独立解决的话,可以在提出 Issue 后再提交 PR )。
  2. 处理 Issue : 帮忙处理一些待处理的 Issue
  3. 提交 PR: 对于错别字/笔误这类问题可以直接提交PR,无需提交Issue 确认。

详细参考:CodeGuide 贡献指南 - 非常感谢你的支持,这里会留下你的足迹

  • 加群交流 本群的宗旨是给大家提供一个良好的技术学习交流平台,所以杜绝一切广告!由于微信群人满 100 之后无法加入,请扫描下方二维码先添加作者 “小傅哥” 微信(fustack),备注:加群。
微信:fustack

  • 公众号(bugstack虫洞栈) - 沉淀、分享、成长,专注于原创专题案例,以最易学习编程的方式分享知识,让自己和他人都能有所收获。
公众号:bugstack虫洞栈

感谢以下人员对本仓库做出的贡献或者对小傅哥的赞赏,当然不仅仅只有这些贡献者,这里就不一一列举了。如果你希望被添加到这个名单中,并且提交过 Issue 或者 PR,请与我联系。

Clone this wiki locally