消息中间件--ActiveMQ&JMS消息服务
2018-06-18 01:52:55来源:未知 阅读 ()
### 消息中间件 ###
**消息中间件**
### JMS消息服务 ###
**JMS的概述**
### 消息中间件:ActiveMQ ###
**ActiveMQ的下载与安装**
**ActiveMQ的消息队列方式入门**(P2P模式)
<!-- activemq start --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.2.0</version> </dependency> <!-- activemq end -->
<!-- spring 与 mq整合 start --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.2.4.RELEASE</version> </dependency> <dependency> <groupId>org.apache.xbean</groupId> <artifactId>xbean-spring</artifactId> <version>3.7</version> </dependency> <!-- spring 与 mq整合 end -->
@Test public void sendQueueMessage() throws JMSException { // 1 创建连接工厂 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); // 2 使用工厂,创建连接 Connection connection = factory.createConnection(); // 3 启动连接 connection.start(); // 4 使用连接,创建会话,true表示开始事务,代码执行后需要提供事务 Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // 5 创建队列队形(myQueue--队列的名字)/topic-----------session创建 Queue queue = session.createQueue("myQueue"); // 6 创建生产者-----------session创建 MessageProducer producer = session.createProducer(queue); // 7 创建消息----文本消息-------session创建 TextMessage message = session.createTextMessage(); message.setText("helloworld!!!"); // 8 发送消息 producer.send(message); // 9 提交事务 session.commit(); session.close(); connection.close(); }
@Test public void receiverQueueMessage() throws JMSException { // 1 创建连接工厂 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); // 2 使用工厂,创建连接 Connection connection = factory.createConnection(); // 3 启动连接 connection.start(); // 4 使用连接,创建会话,true表示开始事务,代码执行后需要提供事务 Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // 5 创建队列队形(hello--队列的名字)/topic-----------session创建 Queue queue = session.createQueue("myQueue"); // 6 创建消费者-----------session创建 MessageConsumer consumer = session.createConsumer(queue); // 7 接收消息----text格式 TextMessage receive = (TextMessage) consumer.receive(); String text = receive.getText(); System.out.println("接收到的消息====" + text); // 8 提交事务 session.commit(); session.close(); connection.close(); }
/** *异步方式 Queue接受用Listener方式接受,多用 如果有多个监听listener,则交替执行 * @throws Exception */ @Test public void receiverQueueListener() throws Exception{ // 1 创建连接工厂 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); // 2 使用工厂,创建连接 Connection connection = factory.createConnection(); // 3 启动连接 connection.start(); // 4 使用连接,创建会话,true表示开始事务,代码执行后需要提供事务//死循环的不能用事物 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5 创建队列队形(hello--队列的名字)/topic-----------session创建 Queue queue = session.createQueue("myQueue"); // 6 创建消费者-----------session创建 MessageConsumer consumer = session.createConsumer(queue); //7 // 给消费者添加监听器 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message msg) { TextMessage message = (TextMessage) msg; try { System.out.println("Listener1111111111接收到的消息是=="+message.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); while(true){} // 使用监听器的方式不能关闭,需要监听器一直工作 // session.commit(); // session.close(); // connection.close(); }
**ActiveMQ的消息订阅方式入门**(Pub/Sub模式)
/** * Topic发送 * @throws JMSException */ @Test public void sendTopicMessage() throws JMSException { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // 创建消息订阅 Topic topic = session.createTopic("myTopic"); // 创建生产者 MessageProducer producer = session.createProducer(topic); // 创建消息,一组可以存储key value的消息 MapMessage message = session.createMapMessage(); message.setString("username", "cgx"); message.setString("password", "123456"); // 发送消息 producer.send(message); // 提交事务 session.commit(); session.close(); connection.close(); }
/** * Topic接受 * * @throws JMSException */ @Test public void testReceiverMessage() throws JMSException { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // 创建消息订阅 Topic topic = session.createTopic("myTopic"); // 创建消费者 MessageConsumer consumer = session.createConsumer(topic); // 接收消息 MapMessage message = (MapMessage) consumer.receive(); System.out.println(message.getString("username")); System.out.println(message.getString("password")); session.commit(); session.close(); connection.close(); }
/** * Topic接受Listener监听方式 * * @throws Exception */ @Test public void receiverQueueListener() throws Exception { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建消息订阅 Topic topic = session.createTopic("myTopic"); // 创建消费者 MessageConsumer consumer = session.createConsumer(topic); // 给消费者添加监听器consumer添加监听 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message msg) { MapMessage message = (MapMessage) msg; try { System.out.println(message.getString("username")); System.out.println(message.getString("password")); } catch (JMSException e) { e.printStackTrace(); } } }); while (true) { } }
### Spring整合ActiveMQ ###★★★★★
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context" xmlns:jdbc="http://www.springframework.org/schema/jdbc" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:jpa="http://www.springframework.org/schema/data/jpa" xmlns:task="http://www.springframework.org/schema/task" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd http://www.springframework.org/schema/data/jpa http://www.springframework.org/schema/data/jpa/spring-jpa.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> </beans>
<!-- 配置连接工厂 --> <!-- ActiveMQ 连接工厂 --> <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供--> <!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码--> <amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://localhost:61616" userName="admin" password="admin" /> <!-- Spring Caching连接工厂 --> <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="amqConnectionFactory"></property> <!-- Session缓存数量和链接数有关 --> <property name="sessionCacheSize" value="100" /> </bean> <!-- 定义JmsTemplate的Queue类型★★★★★ --> <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 --> <constructor-arg ref="connectionFactory" /> <!-- 非pub/sub模型(发布/订阅),即队列模式 --> <property name="pubSubDomain" value="false" /> </bean> <!-- 定义JmsTemplate的Topic类型★★★★★ --> <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate" > <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 --> <constructor-arg ref="connectionFactory" /> <!-- pub/sub模型(发布/订阅) --> <property name="pubSubDomain" value="true" /> </bean>
@Autowired @Qualifier(value="jmsQueueTemplate") private JmsTemplate queueTemplate;//Queue /** * Queue发送消息---spring框架 */ @Test public void sendQueueMessage() { // 发送消息 构造参数指定目标,因为配置文件中的队列和订阅模式是通过id与false和true进行区分 queueTemplate.send("myQueue", new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { // 使用session创建消息,发送 TextMessage textMessage = session.createTextMessage("测试结合spring框架发送queue消息"); return textMessage; } }); }
@Autowired @Qualifier(value = "jmsTopicTemplate") private JmsTemplate topicTemplate;//Topic /** * Topic发送消息---spring框架 */ @Test public void sendTopicMessage() { topicTemplate.send("spring_topic", new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { MapMessage mapMessage = session.createMapMessage(); mapMessage.setString("username", "mdzz"); return mapMessage; } }); }
/** * Queue接收消息---spring框架 * 同步手动:不提倡 * receive("myQueue")要写目标,不写目标的话会报找不到目标的错误NO defaultDestination */ @Test public void receiverMessage() { //接收消息textMessage类型 TextMessage textMessage = (TextMessage) queueTemplate.receive("myQueue"); try { System.out.println(textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } }
@Component(value="queueConsumer1") public class QueueListener implements MessageListener { @Override public void onMessage(Message arg0) { // 把arg0强转 TextMessage textMessage = (TextMessage) arg0; try { // 输出消息 System.out.println(textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }
@Component public class TopicConsumer1 implements MessageListener { @Override public void onMessage(Message arg0) { MapMessage mapMessage = (MapMessage) arg0; try { System.out.println("TopicConsumer1===="+mapMessage.getString("username")); } catch (JMSException e) { e.printStackTrace(); } } } @Component public class TopicConsumer2 implements MessageListener { //... }
<!-- 扫描包 --> <context:component-scan base-package="com.my.jms.consumer" /> <!-- ActiveMQ 连接工厂 --> <amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://localhost:61616" userName="admin" password="admin" /> <!-- Spring Caching连接工厂 --> <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <property name="targetConnectionFactory" ref="amqConnectionFactory"></property> <property name="sessionCacheSize" value="100" /> </bean> <!-- Spring JmsTemplate 的消息生产者 start--> <jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory"> <jms:listener destination="myQueue" ref="queueConsumer1"/> </jms:listener-container> <jms:listener-container destination-type="topic" container-type="default" connection-factory="connectionFactory"> <jms:listener destination="spring_topic" ref="topicConsumer1"/> <jms:listener destination="spring_topic" ref="topicConsumer2" /> </jms:listener-container>
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration("classpath:applicationContext-mq-consumer.xml") public class SpringQueueListenerTest { @Test public void test(){ while(true); } }
spring整合总结:
消息发送
1. 创建spring容器
2. 从容器中获取JMSTemplate对象,发送消息
3. 定义Destination
4. 使用JMSTemplate对象发送消息
消息接受
1. 创建一个类实现MessageListener 接口。业务处理在此类中实现。
2.在spring容器中配置DefaultMessageListenerContainer对象,引用MessageListener 实现类对象接收消息。
项目整合ActiveMQ:
1. 消息生产者整合ActiveMQ
消息生产者只需要发送消息
需要把JMSTemplate和Destination交给spring进行管理
/**===========================activeMQ消息发送========================================*/ // 发送消息!!! this.send("save", item.getId()); } @Autowired private JmsTemplate jmsTemplate; @Autowired private Destination destination; /** * 此方法就是用来发送消息的 * 考虑:1、发送什么数据?2、我需要什么数据? * 在消息中需要:1、消息的标识:save,delete,update;2、商品的ID */ private void send(final String type, final Long itemId) { // TODO Auto-generated method stub jmsTemplate.send(destination, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { //创建消息体 TextMessage textMessage = new ActiveMQTextMessage(); //设置消息内容 Map<String, Object> map = new HashMap<>(); map.put("type", type); map.put("itemId", itemId); try { ObjectMapper mapper = new ObjectMapper(); textMessage.setText(mapper.writeValueAsString(map)); } catch (Exception e) { e.printStackTrace(); } return textMessage; } }); }
2. 消息消费改造
在search-service添加
ItemMessageListener:
/**===========================activeMQ消息发送========================================*/ @Autowired private SearchService searchService; @Override public void onMessage(Message message) { //先判断此消息类型是否是TextMessage if(message instanceof TextMessage){ //如果是,强转 TextMessage textMessage = (TextMessage)message; try { //获取消息:json String json = textMessage.getText(); //杰克逊第三作用:直接解析json数据 ObjectMapper mapper = new ObjectMapper(); JsonNode jsonNode = mapper.readTree(json); String type = jsonNode.get("type").asText(); Long itemId = jsonNode.get("itemId").asLong(); //根据解析出来的type,判断此type=save的时候我应该调用indexSearch方法 if("save".equals(type)){ searchService.indexItem(itemId); } } catch (Exception e) { e.printStackTrace(); } } }
索引库增加商品会触发mq:
SearchServiceImpl:
@Override public void indexItem(Long itemId) throws Exception { Item item = this.itemMapper.selectByPrimaryKey(itemId); SolrInputDocument doc = new SolrInputDocument(); doc.addField("id", item.getId()); doc.addField("item_title", item.getTitle()); doc.addField("item_image", item.getImage()); doc.addField("item_cid", item.getCid()); doc.addField("item_price", item.getPrice()); doc.addField("item_status", item.getStatus()); this.cloudSolrServer.add(doc); this.cloudSolrServer.commit(); }
标签:
版权申明:本站文章部分自网络,如有侵权,请联系:west999com@outlook.com
特别注意:本站所有转载文章言论不代表本站观点,本站所提供的摄影照片,插画,设计作品,如需使用,请与原作者联系,版权归原作者所有
下一篇:java整理1
- 数据源管理 | Kafka集群环境搭建,消息存储机制详解 2020-06-11
- 拼多多三面惨败,问题在于java中间件、数据库与spring框架. 2020-06-04
- 高吞吐量的分布式发布订阅消息系统Kafka之Producer源码分析 2020-05-30
- SpringBoot2.3整合RabbitMQ实现延迟消费消息 2020-05-26
- RedisTemplate实现消息队列并且批量插入数据。 2020-05-22
IDC资讯: 主机资讯 注册资讯 托管资讯 vps资讯 网站建设
网站运营: 建站经验 策划盈利 搜索优化 网站推广 免费资源
网络编程: Asp.Net编程 Asp编程 Php编程 Xml编程 Access Mssql Mysql 其它
服务器技术: Web服务器 Ftp服务器 Mail服务器 Dns服务器 安全防护
软件技巧: 其它软件 Word Excel Powerpoint Ghost Vista QQ空间 QQ FlashGet 迅雷
网页制作: FrontPages Dreamweaver Javascript css photoshop fireworks Flash