The 5 RabbitMQ Modes in Python
2018-09-01 05:52:32 Source: 博客园 (Cnblogs)
Python version: 2.7.14
1. Message producer code:
# -*- coding: utf-8 -*-

import sys

import pika

HOST_NAME = "172.21.204.14"
USER_NAME = "xxx"
PASSWORD = "xxx"

# 1. "Hello World!"
def hello_world():
    credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
    connection = pika.BlockingConnection(pika.ConnectionParameters(HOST_NAME, 5672, '/', credentials))
    channel = connection.channel()

    channel.queue_declare(queue='hello')
    channel.basic_publish(exchange='',          # default exchange
                          routing_key='hello',  # with the default exchange, this is the queue name
                          body='Hello World!')
    print(" [x] Sent 'Hello World!'")
    connection.close()

# 2. "Work queues"
def new_task():
    credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
    connection = pika.BlockingConnection(pika.ConnectionParameters(HOST_NAME, 5672, '/', credentials))
    channel = connection.channel()

    channel.queue_declare(queue='task_queue', durable=True)  # make the queue durable
    message = ' '.join(sys.argv[1:]) or "Hello World!"
    channel.basic_publish(exchange='',
                          routing_key='task_queue',
                          body=message,
                          properties=pika.BasicProperties(
                              delivery_mode=2,  # make the message persistent
                          ))
    print(" [x] Sent %r" % message)
    connection.close()

# 3. "Publish/Subscribe"
def emit_log(message):
    credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
    connection = pika.BlockingConnection(pika.ConnectionParameters(HOST_NAME, 5672, '/', credentials))
    channel = connection.channel()

    channel.exchange_declare(exchange='logs',         # declare the logs exchange
                             exchange_type='fanout')  # exchange type: publish/subscribe

    channel.basic_publish(exchange='logs',
                          routing_key='',  # ignored by fanout exchanges
                          body=message)
    print(" [x] Sent %r" % message)
    connection.close()

# 4. "Routing"
def emit_log_direct(log_level, message):
    credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
    connection = pika.BlockingConnection(pika.ConnectionParameters(HOST_NAME, 5672, '/', credentials))
    channel = connection.channel()

    channel.exchange_declare(exchange='direct_logs',  # declare the direct_logs exchange
                             exchange_type='direct')  # exchange type: routing

    channel.basic_publish(exchange='direct_logs',
                          routing_key=log_level,
                          body=message)
    print(" [x] Sent %r:%r" % (log_level, message))
    connection.close()

emit_log_direct("info", "info log message:...")
emit_log_direct("error", "error log message:...")

# 5. "Topic"
# Similar to the Routing mode, except that the exchange type is 'topic' and
# binding keys may contain the wildcards "*" (exactly one word) and "#"
# (zero or more words), which makes routing more flexible.
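The Topic mode is only described in a comment, so here is a minimal sketch of what a topic producer might look like. The exchange name topic_logs and the helpers emit_log_topic and topic_matches are assumptions made for illustration and do not appear in the original. The channel is passed in rather than created, so it can be opened in the same way as in the functions above. topic_matches is a pure-Python illustration of the wildcard rules only; the broker does this matching itself, and this is not RabbitMQ's implementation.

```python
# -*- coding: utf-8 -*-
# Sketch only: 'topic_logs', emit_log_topic and topic_matches are
# illustrative names, not part of the original article.


def emit_log_topic(channel, routing_key, message):
    """Publish a message to a topic exchange through an open pika channel."""
    channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
    channel.basic_publish(exchange='topic_logs',
                          routing_key=routing_key,  # e.g. "kern.critical"
                          body=message)
    print(" [x] Sent %r:%r" % (routing_key, message))


def topic_matches(binding_key, routing_key):
    """Return True if a topic binding key would match a routing key.

    Words are separated by dots. "*" stands for exactly one word,
    "#" for zero or more words.
    """
    pattern = binding_key.split('.')
    words = routing_key.split('.')

    def match(i, j):
        if i == len(pattern):
            return j == len(words)
        if pattern[i] == '#':
            # Try letting '#' absorb 0, 1, 2, ... of the remaining words.
            return any(match(i + 1, n) for n in range(j, len(words) + 1))
        if j < len(words) and pattern[i] in ('*', words[j]):
            return match(i + 1, j + 1)
        return False

    return match(0, 0)
```

With these rules, a queue bound with "kern.*" would receive a message published with the routing key "kern.critical" but not one published with "kern.disk.critical", while a queue bound with "kern.#" would receive both.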
2. Message consumer code:
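A consumer for the two queue-based producer functions (hello_world and new_task) might look like the sketch below. It is not the author's code. The function and callback names are assumptions, and the keyword arguments follow pika 1.x; in pika 0.x, basic_consume takes consumer_callback and no_ack instead of on_message_callback and auto_ack. Each function expects a channel opened the same way as in the producer code.

```python
# -*- coding: utf-8 -*-
# Sketch only, assuming pika 1.x keyword names. All function names here
# are illustrative and do not come from the original article.


def on_hello(ch, method, properties, body):
    """Callback for the 'hello' queue: print the payload."""
    print(" [x] Received %r" % body)


def on_task(ch, method, properties, body):
    """Callback for 'task_queue': handle the task, then acknowledge it."""
    print(" [x] Received %r" % body)
    # ... process the task here ...
    # Acknowledge only after the work is done, so an unfinished task is
    # redelivered if this worker dies.
    ch.basic_ack(delivery_tag=method.delivery_tag)


def consume_hello(channel):
    """1. "Hello World!": consume with automatic acknowledgement."""
    channel.queue_declare(queue='hello')
    channel.basic_consume(queue='hello',
                          on_message_callback=on_hello,
                          auto_ack=True)
    channel.start_consuming()  # blocks until consuming is cancelled


def consume_tasks(channel):
    """2. "Work queues": durable queue, manual ack, one task at a time."""
    channel.queue_declare(queue='task_queue', durable=True)
    # Do not hand this worker a new message until it has acknowledged
    # the previous one.
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(queue='task_queue', on_message_callback=on_task)
    channel.start_consuming()
```

The queue declarations repeat the ones on the producer side on purpose: declaring a queue that already exists with the same arguments is harmless, and it lets the consumer start before the producer.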
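For the exchange-based modes (emit_log and emit_log_direct), a consumer needs a queue of its own bound to the exchange. The sketch below shows one way to do that. The names subscribe and print_log are assumptions made for illustration, and the keyword arguments again follow pika 1.x (in pika 0.x, basic_consume uses consumer_callback and no_ack).

```python
# -*- coding: utf-8 -*-
# Sketch only, assuming pika 1.x keyword names. subscribe and print_log
# are illustrative names, not part of the original article.


def print_log(ch, method, properties, body):
    """Callback: print the routing key and the payload."""
    print(" [x] %r:%r" % (method.routing_key, body))


def subscribe(channel, exchange, exchange_type, binding_keys, callback):
    """Bind a private queue to an exchange and register a consumer on it."""
    channel.exchange_declare(exchange=exchange, exchange_type=exchange_type)
    # An empty queue name asks the broker to generate one; exclusive=True
    # makes the queue private to this connection.
    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue
    for key in binding_keys:
        channel.queue_bind(exchange=exchange,
                           queue=queue_name,
                           routing_key=key)
    channel.basic_consume(queue=queue_name,
                          on_message_callback=callback,
                          auto_ack=True)
    return queue_name


# Possible usage, given an open channel:
#   subscribe(channel, 'logs', 'fanout', [''], print_log)
#   subscribe(channel, 'direct_logs', 'direct', ['info', 'error'], print_log)
#   channel.start_consuming()
```

A fanout exchange ignores the binding key, which is why an empty string is enough there. For a direct exchange, the queue receives only messages whose routing key equals one of its binding keys.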
3. Images
Official reference documentation: http://www.rabbitmq.com/getstarted.html