在Java中使用RabbitMQ

2020-03-17 16:06:49来源:博客园 阅读 ()

新老客户大回馈,云服务器低至5折

在Java中使用RabbitMQ

 

依赖

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.8.0</version>
        </dependency>

 

 

 

 生产者

public class Producer {
    private final static String QUEUE_NAME = "my_queue"; //队列名称
    private final static String EXCHANGE_NAME = "my_exchange"; //要使用的exchange的名称
    private final static String EXCHANGE_TYPE = "topic"; //要使用的exchange的名称
    private final static String EXCHANGE_ROUTING_KEY = "my_routing_key.#"; //exchange使用的routing-key

    public static void send() throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.1.9"); //设置rabbitmq-server的地址
        connectionFactory.setPort(5672);  //使用的端口号
        connectionFactory.setVirtualHost("/");  //使用的虚拟主机

        //由连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //通过连接创建信道
        Channel channel = connection.createChannel();

        //通过信道声明一个exchange,若已存在则直接使用,不存在会自动创建
        //参数:name、type、是否支持持久化、此交换机没有绑定一个queue时是否自动删除、是否只在rabbitmq内部使用此交换机、此交换机的其它参数(map)
        channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, false, null);

        //通过信道声明一个queue,如果此队列已存在,则直接使用;如果不存在,会自动创建
        //参数:name、是否支持持久化、是否是排它的、是否支持自动删除、其他参数(map)
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        //将queue绑定至某个exchange。一个exchange可以绑定多个queue
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTING_KEY);

        //发送消息
        String msg = "hello";  //消息内容
        String routing_key = "my_routing_key.key1";  //发送消息使用的routing-key
        channel.basicPublish(EXCHANGE_NAME,routing_key,null,msg.getBytes()); //消息是byte[],可以传递所有类型(转换为byte[]),不局限于字符串
        System.out.println("send message:"+msg);

        //关闭连接
        channel.close();
        connection.close();
    }

}

 

 

 

 

exchange详解

rabbitmq自带了7个交换机,都是以amq开头。我们可以使用自带的,也可以自己新建交换机。

 

 

交换机的参数

先看最下面的add a new exchange:

  • name   交换机的name
  • type  交换机的类型,说白了就是routing-key匹配queue使用哪种匹配规则
  • durability  消息是否支持持久化,durable是支持持久化,重启rabbitmq,rabbitmq上的消息还在、不会丢失,上面features里的D就是durable;transient是不支持持久化,重启rabbitmq,rabbitmq上的消息丢失。因为消息是保存在内初中的,不持久化到硬盘,关闭rabbitmq消息直接就没了,持久化后再次启动时会从硬盘加载消息。transient关键字用于阻止序列化。
  • auto delete   如果此交换机一个queue都没有绑定,是否自动删除此交换机
  • internal  此交换机是否只在rabbitmq内部使用,大部分交换机都要暴露出来,给消息生产者用,只有少数(一般是rabbitmq自带的)是内部使用的。features里的I就是internal,表示只在内部使用,自带的amq.rabbitmq.trace用来跟踪rabbitmq内部的消息投递过程,只在内部使用。
  • arguments  给此交换机设置一些其它参数

 

//通过信道声明一个exchange,若已存在则直接使用,不存在会自动创建
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, false, null);

声明一个交换机,参数和上面控制台add a new exchange的参数顺序是一致的,arguments是用map表示,一般不必设置其它参数,写成null即可。

 

自带的7个交换机,第一个是(AMQP default),不是说这个交换机的名字是AMQP default。

这个交换机的名字没有名字(空),如果要使用这个交换机,代码里交换机的name要写成空串,括号是说明这个交换机是rabbitmq默认的交换机。

 

 

 

 

exchange的4种类型

我们绑定exchange、queue时使用了一个routing-key:

private final static String EXCHANGE_ROUTING_KEY = "my_routing_key.#"; //exchange使用的routing-key
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTING_KEY);

这个exchange使用routing-key来匹配绑定的queue,即要将消息发送到哪些队列。

 

 

 在发送消息时又使用了一个routing-key:

 String routing_key = "my_routing_key.key1"; //发送消息使用的routing-key

 channel.basicPublish(EXCHANGE_NAME,routing_key,null,msg.getBytes());

rabbitmq会将这个消息发送到指定的exchange,



如何匹配?是完全匹配还是模糊匹配?这就涉及到exchange的4种type:

 

(1)direct  完全匹配

发送消息使用的routing-key,要与交换机使用的routing-key完全相同。

比如exchange使用的routing-key是"my_routing_key",那发送消息使用的routing_key也要是"my_routing_key"

 

 

(2)topic  模糊匹配,可以使用通配符

*只能匹配一个词,#可以匹配一个或多个词。

注意是词,不是字符。"my_routing_key.key1",my_routing_key是一个词,key1是一个词,词之间用点号分隔。

比如exchange的routing-key是"my_routing_key.#" ,则发送消息使用的routing-key以"my_routing_key."开头即可

我在示例中用的就是这种。这种方式非常适合把一个消息投递到多个queue(应用)

 

 

(3)fanout  广播模式

不使用routing-key,一般把routing-key都设置为空串,当然设置为什么字符串都行,反正都不用。

exchange会把消息投递(广播)到此exchange绑定的所有的queue。

这种模式效率很高,因为不进行routing-key的匹配,大大减小了时间开销。

 

 

(4)headers  首部模式(了解即可)

不使用routing-key(路由键),根据header将消息投递到匹配的queue。

Map<String, Object> exchange_headers = new Hashtable<String, Object>();
headers.put("x-match", "any"); //指定键值对匹配模式any、all
headers.put("key1", "value1");  //放入一些键值对
headers.put("key2", "value2");

//绑定queue时指定指定map。至于routing-key,设置为什么串都行,反正不使用
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME,"", exchange_headers);


//发送消息时也要使用map
Map<String,Object> headers = new Hashtable<String, Object>();
headers.put("key1", "value1");  //放入一些键值对


Builder properties = new BasicProperties.Builder();
properties.headers(headers);


String msg="hello";
//指定消息要使用的header。要使用properties的形式,不能直接发送map。会放在http请求头中
channel.basicPublish(EXCHANGE_NAME, "",properties.build(),msg.getBytes());

x-match指定匹配模式,all:发送消息的header(map)中的键值对要和exchange的header中的所有的键值对都要相同,exchange的header有2个键值对key1、key2,发送消息的header中也要有这2个键值对(需要完全相同)。any:发送消息的header中只要有一个键值对和exchange中的键值对完全相同就行,比如key1、key2都行,只要有一个就行了。

header这种方式不常用,因为有点复杂。都要匹配queue,用routing-key它不简单,它不香吗?非要搞得这么复杂。

 

不过properties的使用还是需要了解一下:

一个消息由properties、body组成,也就是basicPublish()的后2个参数。

properties可以设置此消息的一些参数,比如延时投递、优先级,这些参数写成键值对放在map中,将map转换为properties,再将properties作为basicPublish()的参数。

 

 

 

 

Queue详解

 

type:指定queue类型,默认为classic(主队列),还可以设置为quorum(从队列),将主队列同步到从队列,主队列故障时还可以用从队列。

name:此queue的名称

durability:queue中的消息是否持久化到硬盘。exchange也有此属性,消息会先发给exchange保存,exchange再投递到某些queue,消费者还没处理此消息时,消息会保存在queue中。

auto delete:如果此queue没有绑定到任何一个exchange,是否自动删除此queue

arguments:设置一些其他参数

 

//声明一个queue
//参数:name、是否支持持久化、是否是排它的、是否支持自动删除、其他参数(map)
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

排它:这个queue只能在当前连接中使用(拒绝在其他连接中使用),由当前连接声明|创建,断开连接会自动删除此queue。

 

一个exchange可以绑定多gueue,一个queue也可以绑定到多个exchange上。





 

消费者

public class Consumer {
    private final static String QUEUE_NAME = "my_queue"; //队列名称

    public static void receive() throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.1.9"); //设置rabbitmq-server的地址
        connectionFactory.setPort(5672);  //使用的端口号
        connectionFactory.setVirtualHost("/");  //使用的虚拟主机

        //由连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //通过连接创建信道
        Channel channel = connection.createChannel();

        //创建消费者,指定要使用的channel。QueueingConsume类已经弃用,使用DefaultConsumer代替
        DefaultConsumer consumer = new DefaultConsumer(channel){
            //监听的queue中有消息进来时,会自动调用此方法来处理消息。但此方法默认是空的,需要重写
            @Override
            public void handleDelivery(java.lang.String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                java.lang.String msg = new java.lang.String(body);
                System.out.println("received msg: " + msg);
            }
        };

        //监听指定的queue。会一直监听。
        //参数:要监听的queue、是否自动确认消息、使用的Consumer
        channel.basicConsume(QUEUE_NAME, true, consumer);

    }
}

这段代码表面上没有问题,监听queue就完事了。但有一个隐患:

我们是在生产者中声明|创建的exchange、queue,如果生产者尚未运行,并且rabbitmq上没有对应的exchange、queue(之前没有创建),启动消费者,消费者要监听指定的queue,根本就连接不上queue,指定的queue创都没创建,监听什么?直接报错。

 

 

更加健壮的写法是:在消费者中也声明exchange、queue,有就直接用,没有会自动创建。

public class Consumer {
    private final static String QUEUE_NAME = "my_queue"; //队列名称
    private final static String EXCHANGE_NAME = "my_exchange"; //要使用的exchange的名称
    private final static String EXCHANGE_TYPE = "topic"; //要使用的exchange的名称
    private final static String EXCHANGE_ROUTING_KEY = "my_routing_key.#"; //exchange使用的routing-key

    public static void receive() throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.1.9"); //设置rabbitmq-server的地址
        connectionFactory.setPort(5672);  //使用的端口号
        connectionFactory.setVirtualHost("/");  //使用的虚拟主机

        //由连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //通过连接创建信道
        Channel channel = connection.createChannel();

        //通过信道声明一个exchange,若已存在则直接使用,不存在会自动创建
        //参数:name、type、是否支持持久化、此交换机没有绑定一个queue时是否自动删除、是否只在rabbitmq内部使用此交换机、此交换机的其它参数(map)
        channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, false, null);

        //通过信道声明一个queue,如果此队列已存在,则直接使用;如果不存在,会自动创建
        //参数:name、是否支持持久化、是否是排它的、是否支持自动删除、其他参数(map)
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        //将queue绑定至某个exchange。一个exchange可以绑定多个queue
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTING_KEY);

        //创建消费者,指定要使用的channel。QueueingConsume类已经弃用,使用DefaultConsumer代替
        DefaultConsumer consumer = new DefaultConsumer(channel){
            //监听的queue中有消息进来时,会自动调用此方法来处理消息。但此方法默认是空的,需要重写
            @Override
            public void handleDelivery(java.lang.String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                java.lang.String msg = new java.lang.String(body);
                System.out.println("received msg: " + msg);
            }
        };

        //监听指定的queue。会一直监听。
        //参数:要监听的queue、是否自动确认消息、使用的Consumer
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }

}

 

 

 

 

rabbitmq控制台的queue:

ready是此queue中待投递的消息数,unacked是已投递、但消费者尚未确认的消息数(和快递已签收、未收货差不多),total是消息总数,即前面2个之和。

incoming是一个消息从exchange进入queue花的时间

deliver/get是一个消息从queue投递到消费者花的时间,

ack是一条消息投递给消费者后,过了多长时间queue才收到消费者的应答。

/s表示单位是秒

 

 

ack  确认、应答。

消费者收到queue投递的消息,然后处理消息,处理后发送一个数据包给queue作为确认、应答(相当于拿到包裹,试了下没问题,收货),

queue将消息投递给消费者后,queue中仍然保留此消息,要消费者应答后才会删除此消息。

 

 

//参数:要监听的queue、是否自动确认消息、使用的Consumer
channel.basicConsume(QUEUE_NAME, true, consumer);

第二个参数:autoAck,是否自动应答。

true:自动应答,queue把消息投递给消费者,就认为消费者签收了,投递了一个消息就直接删除该消息。

这并不可靠,如果消费者还没处理完就出故障了,那这条消息就丢失了、没被处理到。

fasle:不使用自动应答,需要消费者自己应答。

 

 

消费者手动应答:

     //创建消费者,指定要使用的channel。QueueingConsume类已经弃用,使用DefaultConsumer代替
        DefaultConsumer consumer = new DefaultConsumer(channel){
            //监听的queue中有消息进来时,会自动调用此方法来处理消息。但此方法默认是空的,需要重写
            @Override
            public void handleDelivery(java.lang.String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                java.lang.String msg = new java.lang.String(body);
                System.out.println("received msg: " + msg);
                channel.basicAck(envelope.getDeliveryTag(), false);  //处理完了,应答|签收
                //channel.basicReject(envelope.getDeliveryTag(), true); //拒收
            }
        };

        //监听指定的queue。会一直监听。
        //参数:要监听的queue、是否自动确认消息、使用的Consumer
        channel.basicConsume(QUEUE_NAME, false, consumer);

就算消费者处理消息时宕机,只要不应答,queue中的这条消息就一直存在,消费者再次启动时还会投递此消息。

basicAck()的第一个参数是DeliveryTag,在一个queue中唯一标识一条消息,相当于一条消息的id。

第二个参数是multiple,多个、批处理,是否将多个消息的应答放在一起、一次性发给queue,设置为true可减少网络流量、防止网络阻塞,但是之前消息的应答有时延。

 

 

如果处理消息时发生了异常(代码执行出了问题),在catch中拒收就是了:

catch (...){ 
  channel.basicReject(envelope.getDeliveryTag(), true); //拒收,重新入队
  //..... //记录日志
}

第二个参数是requeue,是否重新入队,设置为fasle,不再重新入队,queue会删除此消息;设置为true,重新入队,queue会将此消息重新投递给消费者。

没必要把消息的整个处理流程都放在try中,只把可能出现异常的代码块放在try中即可,在catch中拒收。

 

这就是rabbitmq提供的可靠投递机制。再加上消息的持久化,做到了rabbitmq的高可靠性。

 

但重新入队有一个问题:如果大量的消息重新入队,重新投递这些消息会占用资源,使其它消息的投递变慢。

 

 

 

 

开发过程中可能遇到的问题

1、exchange的name唯一标识一个exchange,调试时可能修改了exchange的类型,如果之前存在一个同名的exchange,会报错。

 如果之前的同名的exchange不要了,到rabbitmq控制台删除同名的exchange即可;如果之前的同名的exchange还要用,就把现在的exchange改下name。

 

2、消费者要一直监听queue,所以消费者的channel、connection都没关闭,再次启动时可能连接不上,会报错,因为rabbitmq上还保持着这个连接。

 等几分钟再运行,等连接超时被删除即可。

 

 

 

说明

1、可以在rabbitmq控制台设置queue的绑定、发送消息到queue

delivery  投递、交付

persistent  持续的、持久的、坚持不懈的。表示此消息会持久化到硬盘。

payload即消息的body。

点击publish message会将此消息投递到当前queue。

 

 

2、消息的有效期

有些消息对即时性要求很高,过了一些时间,如果这条消息还积压在queue中,这条消息可能就没有使用价值了,没必要再投递,需要删除这条消息。

可以设置消息的有效期,如果指定的时间内没有投递此消息,queue会自动删除此消息,不再投递。

具体代码参考:https://blog.csdn.net/liu0808/article/details/81356552

 


原文链接:https://www.cnblogs.com/chy18883701161/p/12501428.html
如有疑问请与原作者联系

标签:

版权申明:本站文章部分自网络,如有侵权,请联系:west999com@outlook.com
特别注意:本站所有转载文章言论不代表本站观点,本站所提供的摄影照片,插画,设计作品,如需使用,请与原作者联系,版权归原作者所有

上一篇:我去,同事居然用明文存储密码!!!

下一篇:一个线程池 bug 引发的 GC 思考!