一,rabbitmq简介
1. Apache ActiveMQ曝光率最高,但是可能会丢消息。
2. ZeroMQ延迟很低、支持灵活拓扑,但是不支持消息持久化和崩溃恢复。
1.2、几个概念说明
producer&Consumer
producer:指的是消息生产者,消息生产者连接RabbitMQ服务器然后将消息投递到Exchange。
consumer:消息的消费者。消息消费者订阅队列,RabbitMQ将Queue中的消息发送到消息消费者。
Exchange:类似于数据通信网络中的交换机,提供消息路由策略。rabbitmq中,producer不是通过信道直接将消息发送给queue,而是先发送给Exchange。一个Exchange可以和多个Queue进行绑定,producer在传递消息的时候,会传递一个ROUTING_KEY,Exchange会根据这个ROUTING_KEY按照特定的路由算法,将消息路由给指定的queue。和Queue一样,Exchange也可设置为持久化,临时或者自动删除。
Queue:消息队列,提供了FIFO的处理机制,具有缓存消息的能力。rabbitmq中,队列消息可以设置为持久化,临时或者自动删除。设置为持久化的队列,queue中的消息会在server本地硬盘存储一份,防止系统crash,数据丢失。设置为临时队列,queue中的数据在系统重启之后就会丢失。设置为自动删除的队列,当不存在用户连接到server,队列中的数据会被自动删除Exchange
RoutingKey:生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则,而这个routing key需要与Exchange Type及binding key联合使用才能最终生效。在Exchange Type与binding key固定的情况下(在正常使用时一般这些内容都是固定配置好的),我们的生产者就可以在发送消息给Exchange时,通过指定routing key来决定消息流向哪里。RabbitMQ为routing key设定的长度限制为255 bytes。
Connection: (连接)。Producer和Consumer都是通过TCP连接到RabbitMQ Server的。以后我们可以看到,程序的起始处就是建立这个TCP连接。
Channels: (信道)。它建立在上述的TCP连接中。数据流动都是在Channel中进行的。也就是说,一般情况是程序起始建立TCP连接,第二步就是建立这个Channel。
Exchange有4种类型
:direct(默认),fanout, topic, 和headers,不同类型的Exchange转发消息的策略有所区别:
Direct:
直接交换器,工作方式类似于单播,Exchange会将消息发送完全匹配ROUTING_KEY的Queue
fanout:
广播是式交换器,不管消息的ROUTING_KEY设置为什么,Exchange都会将消息转发给所有绑定的Queue。
topic:
主题交换器,工作方式类似于组播,每个消费者会受到队列中所有的消息,比如一个生产者生产10条数据,那么A,B两个消费者会没人受到10条数据。Exchange会将消息转发和ROUTING_KEY匹配模式相同的所有队列,比如,
ROUTING_KEY为user.stock的Message会转发给绑定匹配模式为 * .stock,user.stock, * . * 和#.user.stock.#的队列。
( * 表是匹配一个任意词组,#表示匹配0个或多个词组)
headers:
消息体的header匹配(ignore)
Binding:
所谓绑定就是将一个特定的 Exchange 和一个特定的 Queue 绑定起来。Exchange 和Queue的绑定可以是多对多
的关系。
virtual host:
在rabbitmq server上可以创建多个虚拟的message broker,又叫做virtual hosts (vhosts)。每一个vhost本质
上是一个mini-rabbitmq server,分别管理各自的exchange,和bindings。vhost相当于物理的server,可以
为不同app提供边界隔离,使得应用安全的运行在不同的vhost实例上,相互之间不会干扰。producer和consumer
连接rabbit server需要指定一个vhost。
1.3、消息队列的使用过程
1. 客户端连接到消息队列服务器,打开一个channel。
2. 客户端声明一个exchange,并设置相关属性。
3. 客户端声明一个queue,并设置相关属性。
4. 客户端使用routing key,在exchange和queue之间建立好绑定关系。
5. 客户端投递消息到exchange。
6. exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里
1.4 ,rabbitmq架构图
从图中可以看出RabbitMQ主要由Exchange和Queue两部分组成,然后通过RoutingKey关联起来,消息投递到Exchange然后通过Queue接收。此外,消费者可以从Queue中获取消息并消费。多个消费者可以订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。
1.5 为什么要使用mq?
2,松耦合 如今项目都采用分布式架构,因此送耦合是十分必要的,当一个应用的变化会强制其他应用也跟着改变,那么这些应用就是紧耦合;反之就是松耦合,很明显松耦合比紧耦合更适应时代发展的需求。
COM,CORBA,DCE和EJB等使用RPC的技术,它们是紧耦合的。使用RPC,当一个应用调用另一个应用,调用者将被阻塞知道被调用者返回结果。
传统的rpc通讯要求当请求从生产者发送到消费者是要求双方都必须正常工作,这样就降低了系统的稳定性;此外,一个很小的变更需求就要很高的维护代价。要解除生产者与消费者之间的这种耦合,就可以使用消息通道模式(Massage Channel)。在生产者与消费者之间加上消息通道就可以解除二者的依赖,同时该模式可以支持多个生产者与消费者。但它又同时引入了各自对消息通道的依赖,因为它们必须知道通道资源的位置。要解除这种对通道的依赖,可以考虑引入Lookup服务来查找该通道资源。若要做到充分的灵活性,可以将与通道相关的信息存储到配置文件中,Lookup服务首先通过读取配置文件来获得通道。
3,安全性 消息通道通常以队列的形式存在,这种先进先出的数据结构无疑最为适合这种处理消息的场景。而且消息通道也可暂时存储堆积等待的消息,在极端断电的情况下也不至于让消息丢失。在rabbitmq中对应的功能就是queue队列。
1.操作系统环境
操作系统:CentOS6.5 / 64bit用户:root
2.RabbitMQ编译安装
RabbitMQ是使用Erlang开发,所以安装RabbitMQ前需要先安装Erlang。
2.1.获取Erlang和RabbitMQ的源文件
http://erlang.org/download/下查找最新的源文件,我选择的是版本17找到后,执行以下命令直接在Linux下获取源码
[root@iZ250x18mnzZ ~]# wget http://erlang.org/download/otp_src_17.0.tar.gz
http://www.rabbitmq.com/releases/rabbitmq-server查找到最新的源码版本,我选择v3.6.0找到后,执行以下命令直接在Linux下获取源码[root@iZ250x18mnzZ ~]# wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.0/rabbitmq-server-3.6.0-1.noarch.rpm
2.2.编译安装Erlang
2.2.1.解压otp_src_17.0.tar.gz
[root@iZ250x18mnzZ ~]# tar -zxvf otp_src_17.0.tar.gz
编译安装Erlang对环境有要求,为防止在编译的时候提示某些软件包未安装之类的错误,所以我将Erlang需要的软件提前安装,直接使用yum进行安装即可
2.2.2.利用yum安装erlang编译环境
$ yum -y install make ncurses-devel gcc gcc-c++ unixODBC unixODBC-devel openssl openssl-devel
2.2.3.编译安装Erlang
[root@iZ250x18mnzZ ~]# cd otp_src_17.0 [root@iZ250x18mnzZ ~]# ./configure --prefix=/usr/local/erlang --enable-smp-support --enable-threads --enable-sctp --enable-kernel-poll --enable-hipe --with-ssl
编译和安装
[root@iZ250x18mnzZ otp_src_17.0]# make && make install
2.3.设置环境变量
环境变量为追加
[root@iZ250x18mnzZ ~]# vi /etc/profile ERL_HOME=/usr/local/erlang PATH=$ERL_HOME/bin:$PATH export ERL_HOME PATH
保存后,重新激活环境变量生效
[root@iZ250x18mnzZ ~]# source /etc/profile
验证是否已经追加成功
[root@iZ250x18mnzZ ~]# echo $ERL_HOME /usr/local/erlang [root@iZ250x18mnzZ ~]# echo $PATH /usr/local/erlang/bin:/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin:/root/bin
2.4.安装RabbitMQ
[root@iZ250x18mnzZ ~]# rpm -i rabbitmq-server-3.6.0-1.noarch.rpm warning: rabbitmq-server-3.6.0-1.noarch.rpm: Header V4 DSA/SHA1 Signature, key ID 056e8e56: NOKEY error: Failed dependencies: erlang >= R16B-03 is needed by rabbitmq-server-3.6.0-1.noarch
上述错误信息显示安装失败,因为rabbigmq的依赖关系所导致,所以要忽略依赖,执行以下命令
[root@iZ250x18mnzZ ~]# rpm -i --nodeps rabbitmq-server-3.6.0-1.noarch.rpm
2.5.启停RabbitMQ
官网提供启动方式
使用root用户启动和停止服务[root@iZ250x18mnzZ ~]#service rabbitmq-server start 启动服务 [root@iZ250x18mnzZ ~]#service rabbitmq-server etc 查看哪些命令可以使用 [root@iZ250x18mnzZ ~]#service rabbitmq-server stop 停止服务 [root@iZ250x18mnzZ ~]#service rabbitmq-server status查看服务状态
如果出现类似以下错误/usr/lib/rabbitmq/bin/rabbitmq-server: line 50: erl: command not found
是因为环境变量不同,导致无法找到相应命令,按照指引
将erlang的erl软连接到/usr/bin目录下ln -s /usr/local/erlang/bin/erl /usr/bin/erl
到此算是安装完毕
下面启动
执行命令:
到/usr/lib/rabbitmq目录下执行命令:service rabbitmq-server start
如果是在虚拟机上启动的,可在虚拟机的浏览器上输入localhost:15672 启动,默认的登陆用户和密码都是guest如果要在windows浏览器上启动,要关闭windows和虚拟机上的防火墙。然后在浏览器上输入“192.168.182.137:15672”访问 192,168.182.137是虚拟机的ip不过,guest用户不能登陆,因为该默认用户只能在安装rabbitmq本地启动。
这里我们可以通过配置文件来实现从远程登录管理界面,只要编辑/etc/rabbitmq/rabbitmq.config文件(没有就新增),添加以下配置就可以了。
执行命令:#vim /etc/rabbitmq/rabbitmq.config
进入编辑模式,加入以下内容:
[ {rabbit, [{tcp_listeners, [5672]}, {loopback_users, ["asdf"]}]} ].
注意上面有个点号
现在添加了一个新授权用户asdf,可以远程使用这个用户名。记得要先用命令添加这个命令才行:
- cd /usr/lib/rabbitmq/bin/
#用户名与密码
- sudo rabbitmqctl add_user asdf 123456
用户设置为administrator才能远程访问
- sudo rabbitmqctl set_user_tags asdf administrator
- sudo rabbitmqctl set_permissions -p / asdf “.*” “.*” “.*”
其实也可以通过管理平台页面直接添加用户和密码等信息。如果还不能远程访问或远程登录检查是不是5672, 15672端口没有开放!!!!!!
第一次访问一般会访问成功,当关闭虚拟机再次启动rabbitmq时会报错,如下
[root@controller ~]# rabbitmqctl change_password guest Rabbit123Changing password for user "guest" ...Error: unable to connect to node rabbit@controller: nodedownDIAGNOSTICS===========nodes in question: [rabbit@controller]hosts, their running nodes and ports:- controller: [{rabbitmqctl3435,40060}]current node details:- node name: rabbitmqctl3435@controller- home dir: /var/lib/rabbitmq- cookie hash: KkWnl06AR+v86hEhVTp8/g==解决方法:进入到/usr目录下,执行命令:
# /sbin/service rabbitmq-server stop
# /sbin/service rabbitmq-server start也可以直接执行命令:rabbitmqctl status
rabbitmqctl start
rabbitmqctl stop
发现正常关闭启动,浏览器也能正常访问
3,简单java实例
下面来演示一个使用java的简单实例:
随便创建一个maven工程,然后
1、首先是消息生产者和提供者的基类
package test.s; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * * 功能概要: EndPoint类型的队列 * * @author linbingwen * @since 2016年1月11日 */ public abstract class EndPoint{ protected Channel channel; protected Connection connection; protected String endPointName; public EndPoint(String endpointName) throws IOException{ this.endPointName = endpointName; //Create a connection factory ConnectionFactory factory = new ConnectionFactory(); //hostname of your rabbitmq server factory.setHost("192.168.182.137"); factory.setPort(5672); //5672是 默认开放端口,不用改,只改上面的host就行了 factory.setUsername("asdf"); factory.setPassword("123456"); //getting a connection connection = factory.newConnection(); //creating a channel channel = connection.createChannel(); //declaring a queue for this channel. If queue does not exist, //it will be created on the server. channel.queueDeclare(endpointName, false, false, false, null); } /** * 关闭channel和connection。并非必须,因为隐含是自动调用的。 * @throws IOException */ public void close() throws IOException{ this.channel.close(); this.connection.close(); } }
2、消息提供者
package test.s; import java.io.IOException; import java.io.Serializable; import org.apache.commons.lang.SerializationUtils; /** * * 功能概要:消息生产者 * * @author linbingwen * @since 2016年1月11日 */ public class Producer extends EndPoint{ public Producer(String endPointName) throws IOException{ super(endPointName); } public void sendMessage(Serializable object) throws IOException { channel.basicPublish("",endPointName, null, SerializationUtils.serialize(object)); } }
3、消息消费者
package test.s; import java.io.IOException; import java.util.HashMap; import java.util.Map; import org.apache.commons.lang.SerializationUtils; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.ShutdownSignalException; /** * * 功能概要:读取队列的程序端,实现了Runnable接口 * * @author linbingwen * @since 2016年1月11日 */ public class QueueConsumer extends EndPoint implements Runnable, Consumer{ public QueueConsumer(String endPointName) throws IOException{ super(endPointName); } public void run() { try { //start consuming messages. Auto acknowledge messages. channel.basicConsume(endPointName, true,this); } catch (IOException e) { e.printStackTrace(); } } /** * Called when consumer is registered. */ public void handleConsumeOk(String consumerTag) { System.out.println("Consumer "+consumerTag +" registered"); } /** * Called when new message is available. */ public void handleDelivery(String consumerTag, Envelope env, BasicProperties props, byte[] body) throws IOException { Map map = (HashMap)SerializationUtils.deserialize(body); System.out.println("Message Number "+ map.get("message number") + " received."); } public void handleCancel(String consumerTag) {} public void handleCancelOk(String consumerTag) {} public void handleRecoverOk(String consumerTag) {} public void handleShutdownSignal(String consumerTag, ShutdownSignalException arg1) {} }
4、测试
package test.s; import java.io.IOException; import java.sql.SQLException; import java.util.HashMap; public class Test { public Test() throws Exception{ QueueConsumer consumer = new QueueConsumer("queue"); Thread consumerThread = new Thread(consumer); consumerThread.start(); Producer producer = new Producer("queue"); for (int i = 0; i < 10; i++) { HashMap message = new HashMap(); message.put("message number", i); producer.sendMessage(message); System.out.println("Message Number "+ i +" sent."); } } /** * @param args * @throws SQLException * @throws IOException */ public static void main(String[] args) throws Exception{ new Test(); } }
其中引入的jar包:
<!-- rabbitmq客户端 --> <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.0.4</version> </dependency> <dependency> <groupId>commons-lang</groupId> <artifactId>commons-lang</artifactId> <version>2.6</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.1</version> </dependency> </dependencies>
测试结果:
然后同时打开rabbitmq的服务端,输入如下:
- rabbitmqctl list_queues
这个命令是用来查看服务端中有多少个消息队列的。
可以看到有个名为queue的消息队列,如下
同时在web监控界面也可观察相应的波动。
文章转自:http://blog.csdn.net/younger_z/article/details/53243990
和
文章转自http://www.cnblogs.com/balaamwe/p/3678527.html
发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/106290.html原文链接:https://javaforall.cn
【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛
【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...