初识RabbitMQ

2018-12-20 09:34:09来源:博客园 阅读 ()

新老客户大回馈,云服务器低至5折

  好久没有更新技术博客了,今天正好有点时间,自学了一下RabbitMQ,现在就将我所学到的东西分享给大家,让你们也能同我一起进步,在程序员的技术道路上,一直前进下去。

  首先学习技术,我觉得还是能在官网去学习,这样才能学到最权威,最新的技术。毕竟是初学者,如果就一味的通过百度去学习技术,有可能会遇到不断的坑,只看别人写的博客,对于初学者来说,又不知对否,只能一味的接纳,最终会遇到不可预料的坑,都不知道如何下手,所以我建议大家能去官网学习,虽然说官网都是英文的,对于一些英语能力不好的同学,可能看的会有点头疼,但是只要自己坚持下去,一直看英文文档,遇到不懂的词,可以查一下,这样日积月累,达到从量变到质变的过程,最终再看什么英文文档,都不在话下了。好了,接下来,进入正题。

一、安装RabbitMQ

链接: https://pan.baidu.com/s/1zsNlA1IB0o05AEHNjijxGA 提取码: 2sph 

可以在这里下载安装文档

二、6种队列模式

RabbitMQ的官网地址是http://www.rabbitmq.com,进入官网教程(RabbitMQ Tutorials),看到有6个模式:

1、简单的队列模式

这个图是官网上面的图,P是生产者,是发送消息的一方,C是消费者,是接收消息的一方,可以把RabbitMQ和邮局类比,当你想要寄信的时候,你会把信放在邮箱里,然后等待邮差把信收走,然后送到想要寄给的那个人,唯一不同点就是邮局是处理纸质信件,然而RabbitMQ是处理二进制数据。有三个术语,生产者(producer)是发送消息的一方,队列(queue)相当于邮箱,存储消息,消费者(consumer)是接收消息的一方,注意:生产者、消费者、队列可以不再同一台机器上,可以分布在不同的机器,一个应用既可以是生产者,也可以是消费者。

接下来开始用代码实现简单队列模式:

使用maven项目完成这个例子,先在pom.xml文件里添加rabbitmq的依赖

<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.4.4</version>
</dependency>

①建一个工具类RabbitMQConnection,方便连接RabbitMQ
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
* 连接rabbitmq
*/
public class RabbitMQConnection {

private static Connection connection;

static {
ConnectionFactory connectionFactory = new ConnectionFactory();
try{
/**
* rabbitmq服务器
*/
connectionFactory.setHost("127.0.0.1");
/**
* 虚拟主机名
*/
connectionFactory.setVirtualHost("/testRabbitMQ");
/**
* 用户名
*/
connectionFactory.setUsername("renruibin");
/**
* 密码
*/
connectionFactory.setPassword("111111");
/**
* 默认端口
*/
connectionFactory.setPort(5672);
connection = connectionFactory.newConnection();
} catch (Exception e){
e.printStackTrace();
}
}

/**
* 获取连接
* @return
*/
public static Connection getConnection(){
return connection;
}
}
②建立生产者,发送消息
/**
* 生产者
*/
public class SendMessage {

/**
* 定义队列名称
*/
private static final String QUEUE_NAME = "simple_queue";

public static void main(String[] args) throws Exception{
/**
* 1、获取连接,相当于数据库连接
*/
Connection connection = RabbitMQConnection.getConnection();
/**
* 2、创建通道,相当于statement
*/
Channel channel = connection.createChannel();
/**
* 3、声明队列,如果没有,就创建(Declaring a queue is idempotent - it will only be created if it doesn't exist already. )
* 第一个参数:队列名称
* 第二个参数:是否持久化
* 第三个参数:是否排外(设置了排外为true的队列只可以在本次的连接中被访问,新建的连接不可以访问)
* 第四个参数:是否自动删除(当最后一个连接断开时,就会删除)、第五个参数是一些配置
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
/**
* 4、发送消息
* 第一个参数:交换机名字
* 第二个参数:路由key,简单队列模式里,队列的名字就是路由key
* 第三个参数:可以添加额外的配置
* 第四个参数:二进制的消息数据
*/
String message = "第"+new Random().nextInt()+"条消息";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("发送"+message+"成功");
}
}
执行这段程序之后,会发送一条消息到RabbitMQ服务器的队列里,可以访问localhost:15672查看
目前有一个连接

目前有一个管道

目前有一个队列,而且有一条消息等待被消费

③建立消费者,去消费这条消息

public class ReceiveMessage {

/**
* 定义队列名称
*/
private static final String QUEUE_NAME = "simple_queue";

public static void main(String[] args) throws Exception{
/**
* 1、获取连接,相当于数据库连接
*/
Connection connection = RabbitMQConnection.getConnection();
/**
* 2、创建通道,相当于statement
*/
Channel channel = connection.createChannel();
/**
* 3、声明队列,如果没有,就创建(Declaring a queue is idempotent - it will only be created if it doesn't exist already. )
* 第一个参数:队列名称
* 第二个参数:是否持久化
* 第三个参数:是否排外(设置了排外为true的队列只可以在本次的连接中被访问,新建的连接不可以访问)
* 第四个参数:是否自动删除(当最后一个连接断开时,就会删除)、第五个参数是一些配置
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
/**
* 4、定义队列的消费者
*/
QueueingConsumer consumer = new QueueingConsumer(channel);
/**
* 5、监听队列,设置成自动确认接收成功
* 第一个参数:队列名称
* 第二个参数:是否自动ACK
* 第三个参数:消费者
*/
channel.basicConsume(QUEUE_NAME,true,consumer);
/**
* 6、开始消费数据
*/
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
System.out.println("收到"+new String(delivery.getBody())+"成功");
}
}
执行这段程序之后,会消费simple_queue队列里的一条消息,看一下控制台的情况

可以看到消息已经被消费了,simple_queue队列里的消息已经空了

总结:简单队列模式就是生产者发送一条消息到队列里,消费者从队列里获取消息,不会经过交换机,也没有路由规则,这种模式是P2P的,也就是点对点,一个生产者只能对应一个消费者,不支持一个生产者对应多个消费者。

2、工作模式

顾名思义:工作模式的意思是这个模式跟工作一样,员工分工合作,每人干的活都是一样多的,没有竞争,就相当于多个消费者都是获取的消息是不一样的,所有消费者消费的消息之和为总的消息,与kafka的consumer group很类似,也是同一个组内的消费者获取的消息互斥,之和为总的消息。

①建立生产者,去生产一批消息

/**
* 生产者
*/
public class SendMessage {

/**
* 定义队列名称
*/
private static final String QUEUE_NAME = "work_queue";

public static void main(String[] args) throws Exception{
/**
* 1、获取连接,相当于数据库连接
*/
Connection connection = RabbitMQConnection.getConnection();
/**
* 2、创建通道,相当于statement
*/
Channel channel = connection.createChannel();
/**
* 3、声明队列,如果没有,就创建(Declaring a queue is idempotent - it will only be created if it doesn't exist already. )
* 第一个参数:队列名称
* 第二个参数:是否持久化
* 第三个参数:是否排外(设置了排外为true的队列只可以在本次的连接中被访问,新建的连接不可以访问)
* 第四个参数:是否自动删除(当最后一个连接断开时,就会删除)、第五个参数是一些配置
*/
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
/**
* 4、发送消息
* 第一个参数:交换机名字
* 第二个参数:路由key,简单队列模式里,队列的名字就是路由key
* 第三个参数:可以添加额外的配置
* 第四个参数:二进制的消息数据
*/
for(int i=0;i<100;i++){
String message = "第"+i+"条消息";
channel.basicPublish("",QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
System.out.println("发送"+message+"成功");
}
channel.close();
connection.close();
}
}
②建立消费者1,睡眠10毫秒,模拟工作能力快
public class ReceiveMessage1 {

/**
* 定义队列名称
*/
private static final String QUEUE_NAME = "work_queue";

public static void main(String[] args) throws Exception{
/**
* 1、获取连接,相当于数据库连接
*/
Connection connection = RabbitMQConnection.getConnection();
/**
* 2、创建通道,相当于statement
*/
Channel channel = connection.createChannel();
/**
* 3、声明队列,如果没有,就创建(Declaring a queue is idempotent - it will only be created if it doesn't exist already. )
* 第一个参数:队列名称
* 第二个参数:是否持久化
* 第三个参数:是否排外(设置了排外为true的队列只可以在本次的连接中被访问,新建的连接不可以访问)
* 第四个参数:是否自动删除(当最后一个连接断开时,就会删除)、第五个参数是一些配置
*/
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
/**
* 4、定义队列的消费者
*/
QueueingConsumer consumer = new QueueingConsumer(channel);
/**
* 5、监听队列,设置成自动确认接收成功
* 第一个参数:队列名称
* 第二个参数:是否自动ACK
* 第三个参数:消费者
*/
channel.basicConsume(QUEUE_NAME,false,consumer);
/**
* 避免不公平,实现多劳多得
* 意思是每次获取一条消息,确认以后,才可以再次获取消息
*/
channel.basicQos(1);
/**
* 6、开始消费数据
*/
while (true){
Thread.sleep(10);
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
System.out.println("消费者1收到"+new String(delivery.getBody())+"成功");
/**
* 7、确认消息已被消费
* 第一个参数:deliveryTag
* 第二个参数:multiple
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
}
③建立消费者2,睡眠1000毫秒,模拟工作能力差
public class ReceiveMessage2 {

/**
* 定义队列名称
*/
private static final String QUEUE_NAME = "work_queue";

public static void main(String[] args) throws Exception{
/**
* 1、获取连接,相当于数据库连接
*/
Connection connection = RabbitMQConnection.getConnection();
/**
* 2、创建通道,相当于statement
*/
Channel channel = connection.createChannel();
/**
* 3、声明队列,如果没有,就创建(Declaring a queue is idempotent - it will only be created if it doesn't exist already. )
* 第一个参数:队列名称
* 第二个参数:是否持久化
* 第三个参数:是否排外(设置了排外为true的队列只可以在本次的连接中被访问,新建的连接不可以访问)
* 第四个参数:是否自动删除(当最后一个连接断开时,就会删除)、第五个参数是一些配置
*/
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
/**
* 4、定义队列的消费者
*/
QueueingConsumer consumer = new QueueingConsumer(channel);
/**
* 5、监听队列,设置成自动确认接收成功
* 第一个参数:队列名称
* 第二个参数:是否自动ACK
* 第三个参数:消费者
*/
channel.basicConsume(QUEUE_NAME,false,consumer);
/**
* 避免不公平,实现多劳多得
* 意思是每次获取一条消息,确认以后,才可以再次获取消息
*/
channel.basicQos(1);
/**
* 6、开始消费数据
*/
while (true){
Thread.sleep(1000);
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
System.out.println("消费者2收到"+new String(delivery.getBody())+"成功");
/**
* 7、确认消息已被消费
* 第一个参数:deliveryTag
* 第二个参数:multiple
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
}
3、发布订阅模式

此种模式是生产者把消息发送到交换机(X)上,不用管是发送那哪个队列上了,消费者得把自己消费的那个队列与生产者的交换机进行绑定,通过一定的路由规则,才能把消息获取到。

①建立生产者,发送消息

/**
* 生产者(发布订阅模式)
*/
public class SendMessage {

/**
* 定义交换机名称
*/
private static final String EXCHANGE_NAME = "ps_exchange";

public static void main(String[] args) throws Exception{
/**
* 1、获取连接,相当于数据库连接
*/
Connection connection = RabbitMQConnection.getConnection();
/**
* 2、创建通道,相当于statement
*/
Channel channel = connection.createChannel();
/**
* 3、声明交换机,如果没有,就创建(Declaring a queue is idempotent - it will only be created if it doesn't exist already. )
* 第一个参数:交换机名称
* 第二个参数:交换机类型 fanout direct headers topic
*/
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
/**
* 4、发送消息
* 第一个参数:交换机名字
* 第二个参数:路由key,简单队列模式里,队列的名字就是路由key
* 第三个参数:可以添加额外的配置
* 第四个参数:二进制的消息数据
*/
for(int i=0;i<100;i++){
String message = "第"+i+"条消息";
channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
System.out.println("发送"+message+"成功");
}
channel.close();
connection.close();
}
}
执行这段程序以后,会发送100条消息,但是该交换机没有任何队列绑定,所以消息是会丢失的
②建立消费者1
public class ReceiveMessage1 {

/**
* 定义交换机名称
*/
private static final String EXCHANGE_NAME = "ps_exchange";
/**
* 定义队列名称
*/
private static final String QUEUE_NAME = "ps_queue_1";

public static void main(String[] args) throws Exception{
/**
* 1、获取连接,相当于数据库连接
*/
Connection connection = RabbitMQConnection.getConnection();
/**
* 2、创建通道,相当于statement
*/
Channel channel = connection.createChannel();
/**
* 3、声明队列,如果没有,就创建(Declaring a queue is idempotent - it will only be created if it doesn't exist already. )
* 第一个参数:队列名称
* 第二个参数:是否持久化
* 第三个参数:是否排外(设置了排外为true的队列只可以在本次的连接中被访问,新建的连接不可以访问)
* 第四个参数:是否自动删除(当最后一个连接断开时,就会删除)、第五个参数是一些配置
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
/**
* 4、队列和交换机绑定
* 第一个参数:队列名称
* 第二个参数:交换机名称
* 第三个参数:路由key
*/
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
/**
* 4、定义队列的消费者
*/
QueueingConsumer consumer = new QueueingConsumer(channel);
/**
* 5、监听队列,设置成自动确认接收成功
* 第一个参数:队列名称
* 第二个参数:是否自动ACK
* 第三个参数:消费者
*/
channel.basicConsume(QUEUE_NAME,false,consumer);
/**
* 避免不公平,实现多劳多得
* 意思是每次获取一条消息,确认以后,才可以再次获取消息
*/
channel.basicQos(1);
/**
* 6、开始消费数据
*/
while (true){
Thread.sleep(10);
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
System.out.println("消费者1收到"+new String(delivery.getBody())+"成功");
/**
* 7、确认消息已被消费
* 第一个参数:deliveryTag
* 第二个参数:multiple
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
}
③建立消费者2public class ReceiveMessage2 {

/**
* 定义交换机名称
*/
private static final String EXCHANGE_NAME = "ps_exchange";
/**
* 定义队列名称
*/
private static final String QUEUE_NAME = "ps_queue_2";

public static void main(String[] args) throws Exception{
/**
* 1、获取连接,相当于数据库连接
*/
Connection connection = RabbitMQConnection.getConnection();
/**
* 2、创建通道,相当于statement
*/
Channel channel = connection.createChannel();
/**
* 3、声明队列,如果没有,就创建(Declaring a queue is idempotent - it will only be created if it doesn't exist already. )
* 第一个参数:队列名称
* 第二个参数:是否持久化
* 第三个参数:是否排外(设置了排外为true的队列只可以在本次的连接中被访问,新建的连接不可以访问)
* 第四个参数:是否自动删除(当最后一个连接断开时,就会删除)、第五个参数是一些配置
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
/**
* 4、队列和交换机绑定
* 第一个参数:队列名称
* 第二个参数:交换机名称
* 第三个参数:路由key
*/
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
/**
* 4、定义队列的消费者
*/
QueueingConsumer consumer = new QueueingConsumer(channel);
/**
* 5、监听队列,设置成自动确认接收成功
* 第一个参数:队列名称
* 第二个参数:是否自动ACK
* 第三个参数:消费者
*/
channel.basicConsume(QUEUE_NAME,false,consumer);
/**
* 避免不公平,实现多劳多得
* 意思是每次获取一条消息,确认以后,才可以再次获取消息
*/
channel.basicQos(1);
/**
* 6、开始消费数据
*/
while (true){
Thread.sleep(10);
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
System.out.println("消费者2收到"+new String(delivery.getBody())+"成功");
/**
* 7、确认消息已被消费
* 第一个参数:deliveryTag
* 第二个参数:multiple
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
}
先把两个消费者启动,然后再启动一次生产者,可以看到两个消费者消费了同样的消息。消费者这块的代码与之前的不同点就是,增加了队列的绑定 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
消费者1消费的消息:
                        

消费者1收到第0条消息成功
消费者1收到第1条消息成功
消费者1收到第2条消息成功
消费者1收到第3条消息成功
消费者1收到第4条消息成功
消费者1收到第5条消息成功
消费者1收到第6条消息成功
消费者1收到第7条消息成功
消费者1收到第8条消息成功
消费者1收到第9条消息成功
消费者1收到第10条消息成功

  

 消费者2消费的消息:

消费者2收到第0条消息成功
消费者2收到第1条消息成功
消费者2收到第2条消息成功
消费者2收到第3条消息成功
消费者2收到第4条消息成功
消费者2收到第5条消息成功
消费者2收到第6条消息成功
消费者2收到第7条消息成功
消费者2收到第8条消息成功
消费者2收到第9条消息成功
消费者2收到第10条消息成功

可以在管控台看到多了一个交换机

看到多了两个队列

4、路由模式

这种模式是路由模式,有一定的路由规则,只有符合这个路由规则的队列,才可以消费消息,可以达到消费者自主选择想要消费的消息,不用一味的获取生产者发送的所有消息。

①建立生产者,发送一条消息

/**
* 生产者(路由模式)
*/
public class SendMessage {

/**
* 定义交换机名称
*/
private static final String EXCHANGE_NAME = "direct_exchange";

public static void main(String[] args) throws Exception{
/**
* 1、获取连接,相当于数据库连接
*/
Connection connection = RabbitMQConnection.getConnection();
/**
* 2、创建通道,相当于statement
*/
Channel channel = connection.createChannel();
/**
* 3、声明交换机,如果没有,就创建(Declaring a queue is idempotent - it will only be created if it doesn't exist already. )
* 第一个参数:交换机名称
* 第二个参数:交换机类型 fanout direct headers topic
*/
channel.exchangeDeclare(EXCHANGE_NAME,"direct");

/**
* 4、发送消息
* 第一个参数:交换机名字
* 第二个参数:路由key
* 第三个参数:可以添加额外的配置
* 第四个参数:二进制的消息数据
*/
String message = "warn日志";
channel.basicPublish(EXCHANGE_NAME,"warn",null,message.getBytes());
System.out.println("发送"+message+"成功");
channel.close();
connection.close();
}
}
执行完这段程序,会建立一个direct类型的交换机,并且指定的路由key是warn,在管控台可以看到多了一个交换机

此时还没有队列绑定,所以发送的消息是会丢失的

②建立一个消费者1,消费error的消息

public class ReceiveMessage1 {

/**
* 定义交换机名称
*/
private static final String EXCHANGE_NAME = "direct_exchange";
/**
* 定义队列名称
*/
private static final String QUEUE_NAME = "direct_queue_1";

public static void main(String[] args) throws Exception{
/**
* 1、获取连接,相当于数据库连接
*/
Connection connection = RabbitMQConnection.getConnection();
/**
* 2、创建通道,相当于statement
*/
Channel channel = connection.createChannel();
/**
* 3、声明队列,如果没有,就创建(Declaring a queue is idempotent - it will only be created if it doesn't exist already. )
* 第一个参数:队列名称
* 第二个参数:是否持久化
* 第三个参数:是否排外(设置了排外为true的队列只可以在本次的连接中被访问,新建的连接不可以访问)
* 第四个参数:是否自动删除(当最后一个连接断开时,就会删除)、第五个参数是一些配置
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
/**
* 4、队列和交换机绑定
* 第一个参数:队列名称
* 第二个参数:交换机名称
* 第三个参数:路由key
*/
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
/**
* 4、定义队列的消费者
*/
QueueingConsumer consumer = new QueueingConsumer(channel);
/**
* 5、监听队列,设置成自动确认接收成功
* 第一个参数:队列名称
* 第二个参数:是否自动ACK
* 第三个参数:消费者
*/
channel.basicConsume(QUEUE_NAME,false,consumer);
/**
* 避免不公平,实现多劳多得
* 意思是每次获取一条消息,确认以后,才可以再次获取消息
*/
channel.basicQos(1);
while (true){
/**
* 6、开始消费数据
*/
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
System.out.println("消费者1收到"+new String(delivery.getBody())+"成功");
/**
* 7、确认消息已被消费
* 第一个参数:deliveryTag
* 第二个参数:multiple
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
}
③建立消费者2,消费info和warn的消息
public class ReceiveMessage2 {

/**
* 定义交换机名称
*/
private static final String EXCHANGE_NAME = "direct_exchange";
/**
* 定义队列名称
*/
private static final String QUEUE_NAME = "direct_queue_2";

public static void main(String[] args) throws Exception{
/**
* 1、获取连接,相当于数据库连接
*/
Connection connection = RabbitMQConnection.getConnection();
/**
* 2、创建通道,相当于statement
*/
Channel channel = connection.createChannel();
/**
* 3、声明队列,如果没有,就创建(Declaring a queue is idempotent - it will only be created if it doesn't exist already. )
* 第一个参数:队列名称
* 第二个参数:是否持久化
* 第三个参数:是否排外(设置了排外为true的队列只可以在本次的连接中被访问,新建的连接不可以访问)
* 第四个参数:是否自动删除(当最后一个连接断开时,就会删除)、第五个参数是一些配置
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
/**
* 4、队列和交换机绑定
* 第一个参数:队列名称
* 第二个参数:交换机名称
* 第三个参数:路由key
*/
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"warn");
/**
* 4、定义队列的消费者
*/
QueueingConsumer consumer = new QueueingConsumer(channel);
/**
* 5、监听队列,设置成自动确认接收成功
* 第一个参数:队列名称
* 第二个参数:是否自动ACK
* 第三个参数:消费者
*/
channel.basicConsume(QUEUE_NAME,false,consumer);
/**
* 避免不公平,实现多劳多得
* 意思是每次获取一条消息,确认以后,才可以再次获取消息
*/
channel.basicQos(1);
while (true){
/**
* 6、开始消费数据
*/
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
System.out.println("消费者2收到"+new String(delivery.getBody())+"成功");
/**
* 7、确认消息已被消费
* 第一个参数:deliveryTag
* 第二个参数:multiple
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
}
重新执行生产者,生产一条warn的消息
生产者:发送warn日志成功
消费者1:
消费者2:消费者2收到warn日志成功
重新执行生产者,生产一条info的消息
生产者:发送info日志成功
消费者1:
消费者2:消费者2收到info日志成功

重新执行生产者,生产一条error的消息
生产者:发送error日志成功
消费者1:消费者1收到error日志成功
消费者2:
5、主题模式

 此种模式是之前两种模式的一种升级,消费者可以很自由的获取到自己想要的消息

  • * (star) can substitute for exactly one word.      一个
  • # (hash) can substitute for zero or more words. 0个或多个

 

如果把绑定key设为#,此种模式就相当于fanout,如果在绑定key里没有使用*和#,此种模式就相当于direct,所以topic模式非常的灵活

①建立生产者,发送消息

/**
* 生产者(路由模式)
*/
public class SendMessage {

/**
* 定义交换机名称
*/
private static final String EXCHANGE_NAME = "topic_exchange";

public static void main(String[] args) throws Exception{
/**
* 1、获取连接,相当于数据库连接
*/
Connection connection = RabbitMQConnection.getConnection();
/**
* 2、创建通道,相当于statement
*/
Channel channel = connection.createChannel();
/**
* 3、声明交换机,如果没有,就创建(Declaring a queue is idempotent - it will only be created if it doesn't exist already. )
* 第一个参数:交换机名称
* 第二个参数:交换机类型 fanout direct headers topic
*/
channel.exchangeDeclare(EXCHANGE_NAME,"topic");

/**
* 4、发送消息
* 第一个参数:交换机名字
* 第二个参数:路由key
* 第三个参数:可以添加额外的配置
* 第四个参数:二进制的消息数据
*/
String message = "quick.orange.rabbit";
channel.basicPublish(EXCHANGE_NAME,"quick.orange.rabbit",null,message.getBytes());
System.out.println("发送"+message+"成功");
channel.close();
connection.close();
}
}
执行这段程序,会建立一个topic类型的交换机,在管控台可以看到多了一个交换机

但是此时没有队列绑定,所以消息会丢失。

②建立消费者1,路由key为*.orange.*

public class ReceiveMessage1 {

/**
* 定义交换机名称
*/
private static final String EXCHANGE_NAME = "topic_exchange";
/**
* 定义队列名称
*/
private static final String QUEUE_NAME = "topic_queue_1";

public static void main(String[] args) throws Exception{
/**
* 1、获取连接,相当于数据库连接
*/
Connection connection = RabbitMQConnection.getConnection();
/**
* 2、创建通道,相当于statement
*/
Channel channel = connection.createChannel();
/**
* 3、声明队列,如果没有,就创建(Declaring a queue is idempotent - it will only be created if it doesn't exist already. )
* 第一个参数:队列名称
* 第二个参数:是否持久化
* 第三个参数:是否排外(设置了排外为true的队列只可以在本次的连接中被访问,新建的连接不可以访问)
* 第四个参数:是否自动删除(当最后一个连接断开时,就会删除)、第五个参数是一些配置
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
/**
* 4、队列和交换机绑定
* 第一个参数:队列名称
* 第二个参数:交换机名称
* 第三个参数:路由key
*/
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.orange.*");
/**
* 4、定义队列的消费者
*/
QueueingConsumer consumer = new QueueingConsumer(channel);
/**
* 5、监听队列,设置成自动确认接收成功
* 第一个参数:队列名称
* 第二个参数:是否自动ACK
* 第三个参数:消费者
*/
channel.basicConsume(QUEUE_NAME,false,consumer);
/**
* 避免不公平,实现多劳多得
* 意思是每次获取一条消息,确认以后,才可以再次获取消息
*/
channel.basicQos(1);
while (true){
/**
* 6、开始消费数据
*/
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
System.out.println("消费者1收到"+new String(delivery.getBody())+"成功");
/**
* 7、确认消息已被消费
* 第一个参数:deliveryTag
* 第二个参数:multiple
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
}
③建立消费者2,路由key是*.*.rabbit,lazy.#
public class ReceiveMessage2 {

/**
* 定义交换机名称
*/
private static final String EXCHANGE_NAME = "topic_exchange";
/**
* 定义队列名称
*/
private static final String QUEUE_NAME = "topic_queue_2";

public static void main(String[] args) throws Exception{
/**
* 1、获取连接,相当于数据库连接
*/
Connection connection = RabbitMQConnection.getConnection();
/**
* 2、创建通道,相当于statement
*/
Channel channel = connection.createChannel();
/**
* 3、声明队列,如果没有,就创建(Declaring a queue is idempotent - it will only be created if it doesn't exist already. )
* 第一个参数:队列名称
* 第二个参数:是否持久化
* 第三个参数:是否排外(设置了排外为true的队列只可以在本次的连接中被访问,新建的连接不可以访问)
* 第四个参数:是否自动删除(当最后一个连接断开时,就会删除)、第五个参数是一些配置
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
/**
* 4、队列和交换机绑定
* 第一个参数:队列名称
* 第二个参数:交换机名称
* 第三个参数:路由key
*/
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.*.rabbit");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"lazy.#");
/**
* 4、定义队列的消费者
*/
QueueingConsumer consumer = new QueueingConsumer(channel);
/**
* 5、监听队列,设置成自动确认接收成功
* 第一个参数:队列名称
* 第二个参数:是否自动ACK
* 第三个参数:消费者
*/
channel.basicConsume(QUEUE_NAME,false,consumer);
/**
* 避免不公平,实现多劳多得
* 意思是每次获取一条消息,确认以后,才可以再次获取消息
*/
channel.basicQos(1);
while (true){
/**
* 6、开始消费数据
*/
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
System.out.println("消费者1收到"+new String(delivery.getBody())+"成功");
/**
* 7、确认消息已被消费
* 第一个参数:deliveryTag
* 第二个参数:multiple
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
}
重新执行生产者发送消息,生产一条quick.orange.rabbit消息
生产者:发送quick.orange.rabbit成功
消费者1:消费者1收到quick.orange.rabbit成功
消费者2:消费者2收到quick.orange.rabbit成功

重新执行生产者发送消息,生产一条lazy.orange.elephant消息
生产者:发送lazy.orange.elephant成功
消费者1:消费者1收到lazy.orange.elephant成功
消费者2:消费者2收到lazy.orange.elephant成功

重新执行生产者发送消息,生产一条quick.orange.fox消息
生产者:发送quick.orange.fox成功
消费者1:消费者1收到quick.orange.fox成功
消费者2:

重新执行生产者发送消息,生产一条lazy.pink.rabbit消息
生产者:发送lazy.pink.rabbit成功
消费者1:
消费者2:消费者2收到lazy.pink.rabbit成功

重新执行生产者发送消息,生产一条quick.brown.fox消息
生产者:发送quick.brown.fox成功
消费者1:
消费者2:
6、RPC模式

这种模式不常用,因为有专业的技术,去实现,所以就没有必要去学习这个模式了。过几天再研究一下RabbitMQ的集群模式,再来跟大家分享。

 

标签:

版权申明:本站文章部分自网络,如有侵权,请联系:west999com@outlook.com
特别注意:本站所有转载文章言论不代表本站观点,本站所提供的摄影照片,插画,设计作品,如需使用,请与原作者联系,版权归原作者所有

上一篇:springMVC_06数据的处理

下一篇:oracle数据库中将clob字段内容利用java提取出至文本文档中