ActiveMQ
2018-06-18 01:15:18来源:未知 阅读 ()
一、什么是消息中间件(MQ)
1.1 为什么会需要消息队列(MQ)?
主要原因是由于在高并发环境下,由于来不及同步处理,请求往往会发生堵塞,比如说,大量的insert,update之类的请求同时到达MySQL,直接导致无数的行锁表锁,甚至最后请求会堆积过多,从而触发too many connections错误。通过使用消息队列,我们可以异步处理请求,从而缓解系统的压力。
2.2 什么是消息中间件
消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。对于消息中间件,常见的角色大致也就有Producer(生产者)、Consumer(消费者)
常见的消息中间件产品:
(1)ActiveMQ
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。我们在本次课程中介绍 ActiveMQ的使用。
(2)RabbitMQ
AMQP协议的领导实现,支持多种场景。淘宝的MySQL集群内部有使用它进行通讯,OpenStack开源云平台的通信组件,最先在金融行业得到运用。
(3)ZeroMQ
史上最快的消息队列系统
(4)Kafka
Apache下的一个子项目 。特点:高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统。适合处理海量数据。
二、JMS简介
2.1 什么是JMS
JMS(Java?Messaging Service)是Java平台上有关面向消息中间件的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发。
JMS本身只定义了一系列的接口规范,是一种与厂商无关的 API,用来访问消息收发系统。它类似于 JDBC(java?Database Connectivity):这里,JDBC 是可以用来访问许多不同关系数据库的 API,而 JMS 则提供同样与厂商无关的访问方法,以访问消息收发服务。许多厂商目前都支持 JMS,包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ,这只是几个例子。 JMS 使您能够通过消息收发服务(有时称为消息中介程序或路由器)从一个 JMS 客户机向另一个 JML 客户机发送消息。消息是 JMS 中的一种类型对象,由两部分组成:报头和消息主体。报头由路由信息以及有关该消息的元数据组成。消息主体则携带着应用程序的数据或有效负载。
JMS 定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。
· TextMessage--一个字符串对象
· MapMessage--一套名称-值对
· ObjectMessage--一个序列化的 Java 对象
· BytesMessage--一个字节的数据流
· StreamMessage -- Java 原始值的数据流
2.2 JMS 消息传递类型
对于消息的传递有两种类型:
一种是点对点的,即一个生产者和一个消费者一一对应:
另一种是发布/ 订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收:
三、ActiveMQ下载与安装
3.1 下载
官方下载地址:http://activemq.apache.org/activemq-5153-release.html
3.2 安装(OSX,Linux类同)
解压下载文件,apache-activemq-5.12.0-bin.tar.gz
tar zxvf apache-activemq-5.12.0-bin.tar.gz
为apache-activemq-5.12.0目录赋权
chmod 777 apache-activemq-5.12.0
3.3 启动、访问与关闭
#切换至安装目录macosx下
macosx Mac$ pwd
/Users/Mac/JavaUtils/apache-activemq-5.15.3/bin/macosx
#启动activemq服务
macosx Mac$ ./activemq start
Starting ActiveMQ Broker...
#关闭activemq服务
:macosx Mac$ ./activemq stop
Stopping ActiveMQ Broker...
Stopped ActiveMQ Broker.
:macosx Mac$
四、JMS入门
4.1 点对点模式
一个生成者产生一个消息 只能被被一个消费者消费,消费完,消息就没有了。
4.1.1 消息生产者
(1)创建工程,引入依赖
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.13.4</version> </dependency>
(2)创建生产者
1 public class QueueProducer { 2 3 public static void main(String[] args) throws JMSException { 4 //1.创建连接工厂 5 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); 6 //2.获取连接 7 Connection connection = connectionFactory.createConnection(); 8 //3.启动连接 9 connection.start(); 10 /*4.获取session (参数1:是否启动事务, 11 参数2:消息确认模式[ 12 AUTO_ACKNOWLEDGE = 1 自动确认 13 CLIENT_ACKNOWLEDGE = 2 客户端手动确认 14 DUPS_OK_ACKNOWLEDGE = 3 自动批量确认 15 SESSION_TRANSACTED = 0 事务提交并确认 16 ])*/ 17 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 18 //5.创建队列对象 19 Queue queue = session.createQueue("test-queue"); 20 //6.创建消息生产者 21 MessageProducer producer = session.createProducer(queue); 22 //7.创建消息 23 TextMessage textMessage = session.createTextMessage("欢迎来到MQ世界"); 24 //8.发送消息 25 producer.send(textMessage); 26 //9.关闭资源 27 producer.close(); 28 session.close(); 29 connection.close(); 30 } 31 32 }
(3)运行通过界面查看
4.1.2 消息消费者
(1)创建消息消费者
1 public class QueueConsumer { 2 public static void main(String[] args) throws JMSException, IOException { 3 //1.创建连接工厂 4 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); 5 //2.获取连接 6 Connection connection = connectionFactory.createConnection(); 7 //3.启动连接 8 connection.start(); 9 //4.获取session (参数1:是否启动事务,参数2:消息确认模式) 10 Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); 11 //5.创建队列对象 12 Queue queue = session.createQueue("test-queue"); 13 //6.创建消息消费者 14 MessageConsumer consumer = session.createConsumer(queue); 15 //7.监听消息 16 consumer.setMessageListener(new MessageListener() { 17 @Override 18 public void onMessage(Message message) { 19 TextMessage textMessage = (TextMessage) message; 20 try { 21 System.out.println("接收到消息:"+textMessage.getText()); 22 } catch (JMSException e) { 23 e.printStackTrace(); 24 } 25 } 26 }); 27 //8.等待键盘输入 28 System.in.read(); 29 //9.关闭资源 30 consumer.close(); 31 session.close(); 32 connection.close(); 33 } 34 35 }
(2)运行查看控制台输出与通过界面查看
4.2 发布/订阅模式
4.2.1 消息生产者
1 public class TopicProducer { 2 3 public static void main(String[] args) throws JMSException { 4 //1.创建连接工厂 5 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); 6 //2.获取连接 7 Connection connection = connectionFactory.createConnection(); 8 //3.启动连接 9 connection.start(); 10 /*4.获取session (参数1:是否启动事务, 11 参数2:消息确认模式)*/ 12 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 13 //5.创建主题对象 14 Topic topic = session.createTopic("test-topic"); 15 //6.创建消息生产者 16 MessageProducer producer = session.createProducer(topic); 17 //7.创建消息 18 TextMessage textMessage = session.createTextMessage("欢迎来到MQ世界!"); 19 //8.发送消息 20 producer.send(textMessage); 21 //9.关闭资源 22 producer.close(); 23 session.close(); 24 connection.close(); 25 } 26 27 }
4.2.2 消息2个消费者(消费者2代码同消费者1)
1 public class TopicConsumer1 { 2 public static void main(String[] args) throws JMSException, IOException { 3 //1.创建连接工厂 4 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); 5 //2.获取连接 6 Connection connection = connectionFactory.createConnection(); 7 //3.启动连接 8 connection.start(); 9 //4.获取session (参数1:是否启动事务,参数2:消息确认模式) 10 Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); 11 //5.创建主题对象 12 Topic topic = session.createTopic("test-topic"); 13 //6.创建消息消费者 14 MessageConsumer consumer = session.createConsumer(topic); 15 //7.监听消息 16 consumer.setMessageListener(new MessageListener() { 17 @Override 18 public void onMessage(Message message) { 19 TextMessage textMessage = (TextMessage) message; 20 try { 21 System.out.println("消费1--接收到消息:"+textMessage.getText()); 22 } catch (JMSException e) { 23 e.printStackTrace(); 24 } 25 } 26 }); 27 //8.等待键盘输入 28 System.in.read(); 29 //9.关闭资源 30 consumer.close(); 31 session.close(); 32 connection.close(); 33 } 34 35 }
4.2.3 运行查看测试结果
同时开启2个以上的消费者,再次运行生产者,观察每个消费者控制台的输出,会发现每个消费者会接收到消息。
4.3 总结
发布订阅的模式 默认的请情况下:消息的内容不存在服务器,当生产者发送了一个消息,如果消费者之前没有订阅,就没了。
?点对点的方式:默认的请情况下:将消息存储在服务器上,消费者随时来取,但是一旦一个消费者获取到了消息,这个消息就没有了。
标签:
版权申明:本站文章部分自网络,如有侵权,请联系:west999com@outlook.com
特别注意:本站所有转载文章言论不代表本站观点,本站所提供的摄影照片,插画,设计作品,如需使用,请与原作者联系,版权归原作者所有
上一篇:IDEA安装插件
- 数据源管理 | Kafka集群环境搭建,消息存储机制详解 2020-06-11
- 5月到6月程序员到底经历了和什么,工资狂跌***元,你是否也 2020-06-10
- 为什么阿里巴巴Java开发手册中不允许魔法值出现在代码中? 2020-06-09
- 计算机基础到底是哪些基础?为什么很重要! 2020-06-08
- 为什么阿里巴巴Java开发手册中强制要求接口返回值不允许使用 2020-06-06
IDC资讯: 主机资讯 注册资讯 托管资讯 vps资讯 网站建设
网站运营: 建站经验 策划盈利 搜索优化 网站推广 免费资源
网络编程: Asp.Net编程 Asp编程 Php编程 Xml编程 Access Mssql Mysql 其它
服务器技术: Web服务器 Ftp服务器 Mail服务器 Dns服务器 安全防护
软件技巧: 其它软件 Word Excel Powerpoint Ghost Vista QQ空间 QQ FlashGet 迅雷
网页制作: FrontPages Dreamweaver Javascript css photoshop fireworks Flash