在Java中使用RabbitMQ
2020-03-17 16:06:49来源:博客园 阅读 ()
在Java中使用RabbitMQ
依赖
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.8.0</version> </dependency>
生产者
public class Producer { private final static String QUEUE_NAME = "my_queue"; //队列名称 private final static String EXCHANGE_NAME = "my_exchange"; //要使用的exchange的名称 private final static String EXCHANGE_TYPE = "topic"; //要使用的exchange的名称 private final static String EXCHANGE_ROUTING_KEY = "my_routing_key.#"; //exchange使用的routing-key public static void send() throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.1.9"); //设置rabbitmq-server的地址 connectionFactory.setPort(5672); //使用的端口号 connectionFactory.setVirtualHost("/"); //使用的虚拟主机 //由连接工厂创建连接 Connection connection = connectionFactory.newConnection(); //通过连接创建信道 Channel channel = connection.createChannel(); //通过信道声明一个exchange,若已存在则直接使用,不存在会自动创建 //参数:name、type、是否支持持久化、此交换机没有绑定一个queue时是否自动删除、是否只在rabbitmq内部使用此交换机、此交换机的其它参数(map) channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, false, null); //通过信道声明一个queue,如果此队列已存在,则直接使用;如果不存在,会自动创建 //参数:name、是否支持持久化、是否是排它的、是否支持自动删除、其他参数(map) channel.queueDeclare(QUEUE_NAME, true, false, false, null); //将queue绑定至某个exchange。一个exchange可以绑定多个queue channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTING_KEY); //发送消息 String msg = "hello"; //消息内容 String routing_key = "my_routing_key.key1"; //发送消息使用的routing-key channel.basicPublish(EXCHANGE_NAME,routing_key,null,msg.getBytes()); //消息是byte[],可以传递所有类型(转换为byte[]),不局限于字符串 System.out.println("send message:"+msg); //关闭连接 channel.close(); connection.close(); } }
exchange详解
rabbitmq自带了7个交换机,都是以amq开头。我们可以使用自带的,也可以自己新建交换机。
交换机的参数
先看最下面的add a new exchange:
- name 交换机的name
- type 交换机的类型,说白了就是routing-key匹配queue使用哪种匹配规则
- durability 消息是否支持持久化,durable是支持持久化,重启rabbitmq,rabbitmq上的消息还在、不会丢失,上面features里的D就是durable;transient是不支持持久化,重启rabbitmq,rabbitmq上的消息丢失。因为消息是保存在内初中的,不持久化到硬盘,关闭rabbitmq消息直接就没了,持久化后再次启动时会从硬盘加载消息。transient关键字用于阻止序列化。
- auto delete 如果此交换机一个queue都没有绑定,是否自动删除此交换机
- internal 此交换机是否只在rabbitmq内部使用,大部分交换机都要暴露出来,给消息生产者用,只有少数(一般是rabbitmq自带的)是内部使用的。features里的I就是internal,表示只在内部使用,自带的amq.rabbitmq.trace用来跟踪rabbitmq内部的消息投递过程,只在内部使用。
- arguments 给此交换机设置一些其它参数
//通过信道声明一个exchange,若已存在则直接使用,不存在会自动创建 channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, false, null);
声明一个交换机,参数和上面控制台add a new exchange的参数顺序是一致的,arguments是用map表示,一般不必设置其它参数,写成null即可。
自带的7个交换机,第一个是(AMQP default),不是说这个交换机的名字是AMQP default。
这个交换机的名字没有名字(空),如果要使用这个交换机,代码里交换机的name要写成空串,括号是说明这个交换机是rabbitmq默认的交换机。
exchange的4种类型
我们绑定exchange、queue时使用了一个routing-key:
private final static String EXCHANGE_ROUTING_KEY = "my_routing_key.#"; //exchange使用的routing-key
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTING_KEY);
这个exchange使用routing-key来匹配绑定的queue,即要将消息发送到哪些队列。
在发送消息时又使用了一个routing-key:
String routing_key = "my_routing_key.key1"; //发送消息使用的routing-key channel.basicPublish(EXCHANGE_NAME,routing_key,null,msg.getBytes());
rabbitmq会将这个消息发送到指定的exchange,
如何匹配?是完全匹配还是模糊匹配?这就涉及到exchange的4种type:
(1)direct 完全匹配
发送消息使用的routing-key,要与交换机使用的routing-key完全相同。
比如exchange使用的routing-key是"my_routing_key",那发送消息使用的routing_key也要是"my_routing_key"
(2)topic 模糊匹配,可以使用通配符
*只能匹配一个词,#可以匹配一个或多个词。
注意是词,不是字符。"my_routing_key.key1",my_routing_key是一个词,key1是一个词,词之间用点号分隔。
比如exchange的routing-key是"my_routing_key.#" ,则发送消息使用的routing-key以"my_routing_key."开头即可
我在示例中用的就是这种。这种方式非常适合把一个消息投递到多个queue(应用)
(3)fanout 广播模式
不使用routing-key,一般把routing-key都设置为空串,当然设置为什么字符串都行,反正都不用。
exchange会把消息投递(广播)到此exchange绑定的所有的queue。
这种模式效率很高,因为不进行routing-key的匹配,大大减小了时间开销。
(4)headers 首部模式(了解即可)
不使用routing-key(路由键),根据header将消息投递到匹配的queue。
Map<String, Object> exchange_headers = new Hashtable<String, Object>(); headers.put("x-match", "any"); //指定键值对匹配模式any、all headers.put("key1", "value1"); //放入一些键值对 headers.put("key2", "value2"); //绑定queue时指定指定map。至于routing-key,设置为什么串都行,反正不使用 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME,"", exchange_headers); //发送消息时也要使用map Map<String,Object> headers = new Hashtable<String, Object>(); headers.put("key1", "value1"); //放入一些键值对 Builder properties = new BasicProperties.Builder(); properties.headers(headers); String msg="hello"; //指定消息要使用的header。要使用properties的形式,不能直接发送map。会放在http请求头中 channel.basicPublish(EXCHANGE_NAME, "",properties.build(),msg.getBytes());
x-match指定匹配模式,all:发送消息的header(map)中的键值对要和exchange的header中的所有的键值对都要相同,exchange的header有2个键值对key1、key2,发送消息的header中也要有这2个键值对(需要完全相同)。any:发送消息的header中只要有一个键值对和exchange中的键值对完全相同就行,比如key1、key2都行,只要有一个就行了。
header这种方式不常用,因为有点复杂。都要匹配queue,用routing-key它不简单,它不香吗?非要搞得这么复杂。
不过properties的使用还是需要了解一下:
一个消息由properties、body组成,也就是basicPublish()的后2个参数。
properties可以设置此消息的一些参数,比如延时投递、优先级,这些参数写成键值对放在map中,将map转换为properties,再将properties作为basicPublish()的参数。
Queue详解
type:指定queue类型,默认为classic(主队列),还可以设置为quorum(从队列),将主队列同步到从队列,主队列故障时还可以用从队列。
name:此queue的名称
durability:queue中的消息是否持久化到硬盘。exchange也有此属性,消息会先发给exchange保存,exchange再投递到某些queue,消费者还没处理此消息时,消息会保存在queue中。
auto delete:如果此queue没有绑定到任何一个exchange,是否自动删除此queue
arguments:设置一些其他参数
//声明一个queue //参数:name、是否支持持久化、是否是排它的、是否支持自动删除、其他参数(map) channel.queueDeclare(QUEUE_NAME, true, false, false, null);
排它:这个queue只能在当前连接中使用(拒绝在其他连接中使用),由当前连接声明|创建,断开连接会自动删除此queue。
一个exchange可以绑定多gueue,一个queue也可以绑定到多个exchange上。
消费者
public class Consumer { private final static String QUEUE_NAME = "my_queue"; //队列名称 public static void receive() throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.1.9"); //设置rabbitmq-server的地址 connectionFactory.setPort(5672); //使用的端口号 connectionFactory.setVirtualHost("/"); //使用的虚拟主机 //由连接工厂创建连接 Connection connection = connectionFactory.newConnection(); //通过连接创建信道 Channel channel = connection.createChannel(); //创建消费者,指定要使用的channel。QueueingConsume类已经弃用,使用DefaultConsumer代替 DefaultConsumer consumer = new DefaultConsumer(channel){ //监听的queue中有消息进来时,会自动调用此方法来处理消息。但此方法默认是空的,需要重写 @Override public void handleDelivery(java.lang.String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { java.lang.String msg = new java.lang.String(body); System.out.println("received msg: " + msg); } }; //监听指定的queue。会一直监听。 //参数:要监听的queue、是否自动确认消息、使用的Consumer channel.basicConsume(QUEUE_NAME, true, consumer); } }
这段代码表面上没有问题,监听queue就完事了。但有一个隐患:
我们是在生产者中声明|创建的exchange、queue,如果生产者尚未运行,并且rabbitmq上没有对应的exchange、queue(之前没有创建),启动消费者,消费者要监听指定的queue,根本就连接不上queue,指定的queue创都没创建,监听什么?直接报错。
更加健壮的写法是:在消费者中也声明exchange、queue,有就直接用,没有会自动创建。
public class Consumer { private final static String QUEUE_NAME = "my_queue"; //队列名称 private final static String EXCHANGE_NAME = "my_exchange"; //要使用的exchange的名称 private final static String EXCHANGE_TYPE = "topic"; //要使用的exchange的名称 private final static String EXCHANGE_ROUTING_KEY = "my_routing_key.#"; //exchange使用的routing-key public static void receive() throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.1.9"); //设置rabbitmq-server的地址 connectionFactory.setPort(5672); //使用的端口号 connectionFactory.setVirtualHost("/"); //使用的虚拟主机 //由连接工厂创建连接 Connection connection = connectionFactory.newConnection(); //通过连接创建信道 Channel channel = connection.createChannel(); //通过信道声明一个exchange,若已存在则直接使用,不存在会自动创建 //参数:name、type、是否支持持久化、此交换机没有绑定一个queue时是否自动删除、是否只在rabbitmq内部使用此交换机、此交换机的其它参数(map) channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, false, null); //通过信道声明一个queue,如果此队列已存在,则直接使用;如果不存在,会自动创建 //参数:name、是否支持持久化、是否是排它的、是否支持自动删除、其他参数(map) channel.queueDeclare(QUEUE_NAME, true, false, false, null); //将queue绑定至某个exchange。一个exchange可以绑定多个queue channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTING_KEY); //创建消费者,指定要使用的channel。QueueingConsume类已经弃用,使用DefaultConsumer代替 DefaultConsumer consumer = new DefaultConsumer(channel){ //监听的queue中有消息进来时,会自动调用此方法来处理消息。但此方法默认是空的,需要重写 @Override public void handleDelivery(java.lang.String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { java.lang.String msg = new java.lang.String(body); System.out.println("received msg: " + msg); } }; //监听指定的queue。会一直监听。 //参数:要监听的queue、是否自动确认消息、使用的Consumer channel.basicConsume(QUEUE_NAME, true, consumer); } }
rabbitmq控制台的queue:
ready是此queue中待投递的消息数,unacked是已投递、但消费者尚未确认的消息数(和快递已签收、未收货差不多),total是消息总数,即前面2个之和。
incoming是一个消息从exchange进入queue花的时间
deliver/get是一个消息从queue投递到消费者花的时间,
ack是一条消息投递给消费者后,过了多长时间queue才收到消费者的应答。
/s表示单位是秒
ack 确认、应答。
消费者收到queue投递的消息,然后处理消息,处理后发送一个数据包给queue作为确认、应答(相当于拿到包裹,试了下没问题,收货),
queue将消息投递给消费者后,queue中仍然保留此消息,要消费者应答后才会删除此消息。
//参数:要监听的queue、是否自动确认消息、使用的Consumer channel.basicConsume(QUEUE_NAME, true, consumer);
第二个参数:autoAck,是否自动应答。
true:自动应答,queue把消息投递给消费者,就认为消费者签收了,投递了一个消息就直接删除该消息。
这并不可靠,如果消费者还没处理完就出故障了,那这条消息就丢失了、没被处理到。
fasle:不使用自动应答,需要消费者自己应答。
消费者手动应答:
//创建消费者,指定要使用的channel。QueueingConsume类已经弃用,使用DefaultConsumer代替 DefaultConsumer consumer = new DefaultConsumer(channel){ //监听的queue中有消息进来时,会自动调用此方法来处理消息。但此方法默认是空的,需要重写 @Override public void handleDelivery(java.lang.String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { java.lang.String msg = new java.lang.String(body); System.out.println("received msg: " + msg); channel.basicAck(envelope.getDeliveryTag(), false); //处理完了,应答|签收 //channel.basicReject(envelope.getDeliveryTag(), true); //拒收 } }; //监听指定的queue。会一直监听。 //参数:要监听的queue、是否自动确认消息、使用的Consumer channel.basicConsume(QUEUE_NAME, false, consumer);
就算消费者处理消息时宕机,只要不应答,queue中的这条消息就一直存在,消费者再次启动时还会投递此消息。
basicAck()的第一个参数是DeliveryTag,在一个queue中唯一标识一条消息,相当于一条消息的id。
第二个参数是multiple,多个、批处理,是否将多个消息的应答放在一起、一次性发给queue,设置为true可减少网络流量、防止网络阻塞,但是之前消息的应答有时延。
如果处理消息时发生了异常(代码执行出了问题),在catch中拒收就是了:
catch (...){
channel.basicReject(envelope.getDeliveryTag(), true); //拒收,重新入队
//..... //记录日志
}
第二个参数是requeue,是否重新入队,设置为fasle,不再重新入队,queue会删除此消息;设置为true,重新入队,queue会将此消息重新投递给消费者。
没必要把消息的整个处理流程都放在try中,只把可能出现异常的代码块放在try中即可,在catch中拒收。
这就是rabbitmq提供的可靠投递机制。再加上消息的持久化,做到了rabbitmq的高可靠性。
但重新入队有一个问题:如果大量的消息重新入队,重新投递这些消息会占用资源,使其它消息的投递变慢。
开发过程中可能遇到的问题
1、exchange的name唯一标识一个exchange,调试时可能修改了exchange的类型,如果之前存在一个同名的exchange,会报错。
如果之前的同名的exchange不要了,到rabbitmq控制台删除同名的exchange即可;如果之前的同名的exchange还要用,就把现在的exchange改下name。
2、消费者要一直监听queue,所以消费者的channel、connection都没关闭,再次启动时可能连接不上,会报错,因为rabbitmq上还保持着这个连接。
等几分钟再运行,等连接超时被删除即可。
说明
1、可以在rabbitmq控制台设置queue的绑定、发送消息到queue
delivery 投递、交付
persistent 持续的、持久的、坚持不懈的。表示此消息会持久化到硬盘。
payload即消息的body。
点击publish message会将此消息投递到当前queue。
2、消息的有效期
有些消息对即时性要求很高,过了一些时间,如果这条消息还积压在queue中,这条消息可能就没有使用价值了,没必要再投递,需要删除这条消息。
可以设置消息的有效期,如果指定的时间内没有投递此消息,queue会自动删除此消息,不再投递。
具体代码参考:https://blog.csdn.net/liu0808/article/details/81356552
原文链接:https://www.cnblogs.com/chy18883701161/p/12501428.html
如有疑问请与原作者联系
标签:
版权申明:本站文章部分自网络,如有侵权,请联系:west999com@outlook.com
特别注意:本站所有转载文章言论不代表本站观点,本站所提供的摄影照片,插画,设计作品,如需使用,请与原作者联系,版权归原作者所有
- 国外程序员整理的Java资源大全(全部是干货) 2020-06-12
- 2020年深圳中国平安各部门Java中级面试真题合集(附答案) 2020-06-11
- 2020年java就业前景 2020-06-11
- 04.Java基础语法 2020-06-11
- Java--反射(框架设计的灵魂)案例 2020-06-11
IDC资讯: 主机资讯 注册资讯 托管资讯 vps资讯 网站建设
网站运营: 建站经验 策划盈利 搜索优化 网站推广 免费资源
网络编程: Asp.Net编程 Asp编程 Php编程 Xml编程 Access Mssql Mysql 其它
服务器技术: Web服务器 Ftp服务器 Mail服务器 Dns服务器 安全防护
软件技巧: 其它软件 Word Excel Powerpoint Ghost Vista QQ空间 QQ FlashGet 迅雷
网页制作: FrontPages Dreamweaver Javascript css photoshop fireworks Flash