Python操作RabbitMQ的5种模式

2018-09-01 05:52:32来源:博客园 阅读 ()

新老客户大回馈,云服务器低至5折

python版本:   2.7.14

一 消息生产者代码:

 1 # -*- coding: utf-8 -*-
 2 
 3 import json
 4 import pika
 5 import urllib
 6 import urllib2
 7 import chardet
 8 import sys
 9 import json
10 from common import CommonMethod
11 import pika
12 import time
13 
14 HOST_NAME = "172.21.204.14"  # broker host handed to pika.ConnectionParameters below
15 USER_NAME = "xxx"  # login used for pika.PlainCredentials; "xxx" is presumably a redacted placeholder
16 PASSWORD = "xxx"  # password used for pika.PlainCredentials; presumably redacted as well
17 
18 # 1."Hello World!"
def hello_world():
    """Publish one 'Hello World!' message to the 'hello' queue.

    Connects to the broker at HOST_NAME:5672 (virtual host '/') using
    USER_NAME/PASSWORD, declares the queue, publishes through the default
    exchange and prints a confirmation line. The connection is closed even
    if declaring or publishing raises.
    """
    credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(HOST_NAME, 5672, '/', credentials))
    try:
        channel = connection.channel()
        channel.queue_declare(queue='hello')
        # With the default ('') exchange the routing key names the queue.
        channel.basic_publish(exchange='',
                              routing_key='hello',
                              body='Hello World!')
        print(" [x] Sent 'Hello World!'")
    finally:
        # Release the socket on every path; skip if the broker already
        # dropped the connection so the original error is not masked.
        if connection.is_open:
            connection.close()
30 
31 # 2."Work queues"
def new_task():
    """Publish a persistent task message to the durable 'task_queue' queue.

    The message body is the command-line arguments joined by spaces, or
    "Hello World!" when no arguments were given. The connection is closed
    even if declaring or publishing raises.
    """
    credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(HOST_NAME, 5672, '/', credentials))
    try:
        channel = connection.channel()
        # durable=True: declare the queue as persistent.
        channel.queue_declare(queue='task_queue', durable=True)
        message = ' '.join(sys.argv[1:]) or "Hello World!"
        channel.basic_publish(exchange='',
                              routing_key='task_queue',
                              body=message,
                              properties=pika.BasicProperties(
                                  delivery_mode=2,  # mark the message as persistent
                              ))
        print(" [x] Sent %r" % message)
    finally:
        # Release the socket on every path; skip if the broker already
        # dropped the connection so the original error is not masked.
        if connection.is_open:
            connection.close()
47 
48 # 3."Publish/Subscribe"
def emit_log(message):
    """Publish `message` to the 'logs' fanout exchange (publish/subscribe).

    Args:
        message: body to publish; it is also echoed in the confirmation line.

    The connection is closed even if declaring or publishing raises.
    """
    credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(HOST_NAME, 5672, '/', credentials))
    try:
        channel = connection.channel()
        # Declare the 'logs' exchange; 'fanout' is the publish/subscribe type.
        channel.exchange_declare(exchange='logs',
                                 exchange_type='fanout')
        channel.basic_publish(exchange='logs',
                              routing_key='',
                              body=message)
        print(" [x] Sent %r" % message)
    finally:
        # Release the socket on every path; skip if the broker already
        # dropped the connection so the original error is not masked.
        if connection.is_open:
            connection.close()
62 
63 # 4."Routing"
def emit_log_direct(log_level, message):
    """Publish `message` to the 'direct_logs' direct exchange (routing).

    Args:
        log_level: used as the routing key of the published message.
        message: body to publish.

    The connection is closed even if declaring or publishing raises.
    """
    credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(HOST_NAME, 5672, '/', credentials))
    try:
        channel = connection.channel()
        # Declare the 'direct_logs' exchange; 'direct' is the routing type.
        channel.exchange_declare(exchange='direct_logs',
                                 exchange_type='direct')
        channel.basic_publish(exchange='direct_logs',
                              routing_key=log_level,
                              body=message)
        print(" [x] Sent %r:%r" % (log_level, message))
    finally:
        # Release the socket on every path; skip if the broker already
        # dropped the connection so the original error is not masked.
        if connection.is_open:
            connection.close()
77 
# Demo: send one message per routing key, in the same order as before.
for _level, _text in (("info", "info log message:..."),
                      ("error", "error log message:...")):
    emit_log_direct(_level, _text)
80 
81 # 5."Topic"
82 # 与Routing模式类似,区别在于交换机类型为'topic',队列绑定时使用的binding key可以包含通配符"*"(匹配一个单词)和"#"(匹配零个或多个单词),路由更加灵活
View Code

 

二 消息消费者代码:(注:原文此处贴出的代码与上文生产者代码重复,并不是真正的消费者实现;消费者示例请参考文末的官网文档)

 1 # -*- coding: utf-8 -*-
 2 
 3 import json
 4 import pika
 5 import urllib
 6 import urllib2
 7 import chardet
 8 import sys
 9 import json
10 from common import CommonMethod
11 import pika
12 import time
13 
14 HOST_NAME = "172.21.204.14"  # broker host handed to pika.ConnectionParameters below
15 USER_NAME = "xxx"  # login used for pika.PlainCredentials; "xxx" is presumably a redacted placeholder
16 PASSWORD = "xxx"  # password used for pika.PlainCredentials; presumably redacted as well
17 
18 # 1."Hello World!"
def hello_world():
    """Publish one 'Hello World!' message to the 'hello' queue.

    Connects to the broker at HOST_NAME:5672 (virtual host '/') using
    USER_NAME/PASSWORD, declares the queue, publishes through the default
    exchange and prints a confirmation line. The connection is closed even
    if declaring or publishing raises.
    """
    credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(HOST_NAME, 5672, '/', credentials))
    try:
        channel = connection.channel()
        channel.queue_declare(queue='hello')
        # With the default ('') exchange the routing key names the queue.
        channel.basic_publish(exchange='',
                              routing_key='hello',
                              body='Hello World!')
        print(" [x] Sent 'Hello World!'")
    finally:
        # Release the socket on every path; skip if the broker already
        # dropped the connection so the original error is not masked.
        if connection.is_open:
            connection.close()
30 
31 # 2."Work queues"
def new_task():
    """Publish a persistent task message to the durable 'task_queue' queue.

    The message body is the command-line arguments joined by spaces, or
    "Hello World!" when no arguments were given. The connection is closed
    even if declaring or publishing raises.
    """
    credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(HOST_NAME, 5672, '/', credentials))
    try:
        channel = connection.channel()
        # durable=True: declare the queue as persistent.
        channel.queue_declare(queue='task_queue', durable=True)
        message = ' '.join(sys.argv[1:]) or "Hello World!"
        channel.basic_publish(exchange='',
                              routing_key='task_queue',
                              body=message,
                              properties=pika.BasicProperties(
                                  delivery_mode=2,  # mark the message as persistent
                              ))
        print(" [x] Sent %r" % message)
    finally:
        # Release the socket on every path; skip if the broker already
        # dropped the connection so the original error is not masked.
        if connection.is_open:
            connection.close()
47 
48 # 3."Publish/Subscribe"
def emit_log(message):
    """Publish `message` to the 'logs' fanout exchange (publish/subscribe).

    Args:
        message: body to publish; it is also echoed in the confirmation line.

    The connection is closed even if declaring or publishing raises.
    """
    credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(HOST_NAME, 5672, '/', credentials))
    try:
        channel = connection.channel()
        # Declare the 'logs' exchange; 'fanout' is the publish/subscribe type.
        channel.exchange_declare(exchange='logs',
                                 exchange_type='fanout')
        channel.basic_publish(exchange='logs',
                              routing_key='',
                              body=message)
        print(" [x] Sent %r" % message)
    finally:
        # Release the socket on every path; skip if the broker already
        # dropped the connection so the original error is not masked.
        if connection.is_open:
            connection.close()
62 
63 # 4."Routing"
def emit_log_direct(log_level, message):
    """Publish `message` to the 'direct_logs' direct exchange (routing).

    Args:
        log_level: used as the routing key of the published message.
        message: body to publish.

    The connection is closed even if declaring or publishing raises.
    """
    credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(HOST_NAME, 5672, '/', credentials))
    try:
        channel = connection.channel()
        # Declare the 'direct_logs' exchange; 'direct' is the routing type.
        channel.exchange_declare(exchange='direct_logs',
                                 exchange_type='direct')
        channel.basic_publish(exchange='direct_logs',
                              routing_key=log_level,
                              body=message)
        print(" [x] Sent %r:%r" % (log_level, message))
    finally:
        # Release the socket on every path; skip if the broker already
        # dropped the connection so the original error is not masked.
        if connection.is_open:
            connection.close()
77 
# Demo: send one message per routing key, in the same order as before.
for _level, _text in (("info", "info log message:..."),
                      ("error", "error log message:...")):
    emit_log_direct(_level, _text)
80 
81 # 5."Topic"
82 # 与Routing模式类似,区别在于交换机类型为'topic',队列绑定时使用的binding key可以包含通配符"*"(匹配一个单词)和"#"(匹配零个或多个单词),路由更加灵活
View Code

 

三 图片

 

 

 

官网参考文档: http://www.rabbitmq.com/getstarted.html

 

标签:

版权声明:本站文章部分来自网络,如有侵权,请联系:west999com@outlook.com
特别注意:本站所有转载文章言论不代表本站观点,本站所提供的摄影照片,插画,设计作品,如需使用,请与原作者联系,版权归原作者所有

上一篇:一封来自“Python”的信

下一篇:爬虫 Scrapy框架 爬取图虫图片并下载