RabbitMQ入门:远程过程调用(RPC)

假如我们想要调用远程的一个方法或函数并等待执行结果,也就是我们通常说的远程过程调用(RemoteProcedureCall)。怎么办?今天我们就用RabbitMQ来实现一个简单的RPC系统:客户

大家好,又见面了,我是全栈君。

假如我们想要调用远程的一个方法或函数并等待执行结果,也就是我们通常说的远程过程调用(Remote Procedure Call)。怎么办?

今天我们就用RabbitMQ来实现一个简单的RPC系统:客户端发送一个请求消息,服务端以一个响应消息回应。为了能够接收到响应,客户端在发送消息的同时发送一个回调队列用来告诉服务端响应消息发送到哪个队列里面。也就是说每个消息一个回调队列,在此基础上我们变下,将回调队列定义成类的属性,这个每个客户端一个队列,同一个客户端的请求共用一个队列。那么接下来有个问题,怎么知道这个队列里面的响应消息是属于哪个队列的呢?

我们会用到关联标识(correlationId),每个请求我们都会生成一个唯一的值作为correlationId,这样每次有响应消息来的时候,我们就去看correlationId来确定到底是哪个请求的响应消息,将请求和响应关联起来。如果收到一个不知道的correlationId,就可以确定不是这个客户端的请求的响应,可以直接丢弃掉。

一、工作模型

RabbitMQ入门:远程过程调用(RPC)

  1. 客户端发送启动后,会创建独特的回调队列。对于一个请求发送配置了两个属性的消息:一个是回调队列(图中的replay_to),一个是correlation。 这个请求会发送到rpc_queue队列,然后到达服务端处理。
  2. 服务端等待rpc_queue队列的请求。当有请求到来时,它就会开始干活并将结果通过发送消息来返回,该返回消息发送到replyTo指定的队列。
  3. 客户端将等待回调队列返回数据。当返回的消息到达时,它将检查correlation id属性。如果该属性值和请求匹配,就将响应返回给程序。

二、代码实现

接下来看代码实现:

  1.  客户端
    public class RpcClient {
    
        Connection connection = null;
        Channel channel = null;
        //回调队列:用来接收服务端的响应消息
        String queueName = "";
    
        // 定义RpcClient
        public RpcClient() throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            connection = factory.newConnection();
            channel = connection.createChannel();
            queueName = channel.queueDeclare().getQueue();
        }
    
        // 真正的处理逻辑
        public String call(String msg) throws IOException, InterruptedException {
            final String uuid = UUID.randomUUID().toString();
            //后续,服务端根据"replyTo"来指定将返回信息写入到哪个队列
            //后续,服务端根据关联标识"correlationId"来指定返回的响应是哪个请求的
            AMQP.BasicProperties prop = new AMQP.BasicProperties().builder().replyTo(queueName).correlationId(uuid).build();
    
            channel.basicPublish("", RpcServer.QUEUE_NAME, prop, msg.getBytes());
            final BlockingQueue<String> blockQueue = new ArrayBlockingQueue<String>(1);
            channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                        com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {
    
                    if (properties.getCorrelationId().equals(uuid)) {
                        String msg = new String(body, "UTF-8");
    
                        blockQueue.offer(msg);
                        System.out.println("**** rpc client reciver response :[" + msg + "]");
                    }
                }
    
            });
    
            return blockQueue.take();
        }
    
        //关闭连接
        public void close() throws IOException {
            connection.close();
        }
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            RpcClient client = new RpcClient();
            client.call("4");
            client.close();
        }
    }

    发送请求的时候,它是生产者;接受响应的时候,它是消费者。

  2. 服务端
    public class RpcServer {
    
        //RPC队列名
        public static final String QUEUE_NAME = "rpc_queue";
    
        //斐波那契数列,用来模拟工作任务
        public static int fib(int num) {
            if (num == 0)
                return 0;
            if (num == 1)
                return 1;
            return fib(num - 1) + fib(num - 2);
        }
    
        public static void main(String[] args) throws InterruptedException {
    
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = null;
            try {
                // 1.connection & channel
                connection = factory.newConnection();
                final Channel channel = connection.createChannel();
    
                // 2.declare queue
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
                System.out.println("****** rpc server waiting for client request ......");
    
                // 3.每次只接收一个消息(任务)
                channel.basicQos(1);
                //4.获取消费实例
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                            byte[] body) throws IOException {
                        BasicProperties prop = new BasicProperties().builder().correlationId(properties.getCorrelationId())
                                .build();
                        String resp = "";
                        try {
                            String msg = new String(body, "UTF-8");
                            resp = fib(Integer.valueOf(msg)) + "";
                            System.out.println("*** will response to rpc client :" + resp);
                        } catch (Exception ex) {
                            ex.printStackTrace();
                        } finally {
                            channel.basicPublish("", properties.getReplyTo(), prop, resp.getBytes());
                            channel.basicAck(envelope.getDeliveryTag(), false);
                        }
    
                    }
                };
                // 5.消费消息(处理任务)
                channel.basicConsume(QUEUE_NAME, false, consumer);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    
    }

    接受请求的时候,它是消费者;发送响应的时候,它是生产者。

  3. 运行服务端,开始等待请求RabbitMQ入门:远程过程调用(RPC)

     

  4. 然后运行客户端,控制台log:
    服务端(多了一条打印):
    ****** rpc server waiting for client request ......
    *** will response to rpc client :3
    
    客户端:
    **** rpc client reciver response :[3]

     

三、小插曲

刚开始我在写demo的时候,client中没有用到阻塞队列final BlockingQueue<String> blockQueue = new ArrayBlockingQueue<String>(1);,而是直接这样写:

@Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {

                if (properties.getCorrelationId().equals(uuid)) {
                    String msg = new String(body, "UTF-8");

                    //blockQueue.offer(msg);
                    System.out.println("**** rpc client reciver response :[" + msg + "]");
                }
            }

期望能打印出结果来,但是运行后发现并没有打印:**** rpc client reciver response :[” + msg + “]的值。

原因是handleDelivery()这个方法是在子线程中运行的,这个子线程运行的时候,主线程会继续往后执行直到执行了client.close();方法而结束了。

由于主线程终止了,导致没有打印出结果。加了阻塞队列之后将主线程阻塞不执行close()方法,问题就解决了。

 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/120894.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • 11. shell循环 for

    11. shell循环 forshell循环for一级目录二级目录三级目录一级目录二级目录三级目录

  • swagger2导出api为word文档(java实现)[通俗易懂]

    swagger2导出api为word文档(java实现)[通俗易懂]导出后的样式分析1,swagger2页面展示实际就是将返回的包含所有接口的json数据(在swagger界面,打开浏览器控制台即可看到该json数据)进行解析,并渲染到页面上。2,按照java面向对象思路分析,上述表格即为一个接口(一个单元),一共三个对象:Table.java、Request.java、Response.java。3,将原始swagger2的json数据进行…

  • 85℃蛋糕店_蛋糕吧

    85℃蛋糕店_蛋糕吧题目描述今天是路路的生日,生日蛋糕自然是少不了。路路的朋友们一起去蛋糕店来买蛋糕,可是等一行人到了蛋糕店之后,发现那里是人山人海啊-_-。这下可把店家给急坏了,因为人数过多,需求过大,所以人们要等好长时间才能拿到自己的蛋糕。老板为了最大限度的使每位客人尽快拿到蛋糕,因此他需要安排一个制作顺序,使每位客人的平均等待时间最少(如果制作时间相同的,先来的先做)。这使他发愁了,于是他请你来帮忙安排一个…

    2022年10月28日
  • SQL数据库Rownumber()的两种排序方式

    SQL数据库Rownumber()的两种排序方式提示。先按一个字段分组,再按一些字段排序,最后编号。代码如下:selectrow_number()over(partitionbyUserIporderbyinsertTime),*fromuseraccess以上就是SQLServer数据库row_number()over()来自动产生行号的过程,ROW_NUMBER()OVER(PARTITIO…

  • 【动画教程】真封神南极服务端2.52架设第五集「建议收藏」

    【动画教程】真封神南极服务端2.52架设第五集「建议收藏」官方网站www.zfs2014.com动画名称:真封神南极服务端2.52架设第五集主讲人:diablo2208教程下载地址:http://pan.baidu.com/s/1dDchMjJ

  • ebpf技术_EBM技术

    ebpf技术_EBM技术1.ebpf概述1.1ebpf发展历史BPF,及伯克利包过滤器BerkeleyPacketFilter,最初构想提出于1992年,其目的是为了提供一种过滤包的方法,并且要避免从内核空间到用户空间的无用的数据包复制行为。它最初是由从用户空间注入到内核的一个简单的字节码构成,它在那个位置利用一个校验器进行检查——以避免内核崩溃或者安全问题——并附着到一个套接字上,接着在每个接…

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号