基本介绍
RabbitMQ
, 是一个使用erlang
编写的AMQP(高级消息队列协议)
的服务实现. 简单来说, 就是一个功能强大的消息队列服务.流程上来说,是发消息者(producer)把消息放到队列(queue)中去,然后收消息者(consumer)从队列中取出消息.
RabbitMQ
在这个基本概念之上, 多做了一层抽象, 在发消息者
和队列
之间, 加入了交换器 (Exchange)
. 这样发消息者
和队列
就没有直接联系, 转而变成发消息者
把消息给交换器
, 交换器
根据调度策略再把消息再给队列
。
rabbitmq
中几个比价重要的概念如下:
- Producer,生产者,创建消息,然后发布到rabbitmq
中,消息一般可以包含两个部分:消息体(payload)和标签(label),消息体一般是带有业务逻辑结构的数据。
- Consumer,消费者,接收消息。消费者连接到rabbitmq服务器,并订阅到队列上。当消费者消费一条消息时,只是消费消息体,存入到队列中的消息只有消息体。
- Broker 消费中间件的服务节点。
- 交换机 生产者将消息发送到Exchange(交换器,通常也可以用大写的“X”来表示),由交换器将消息路由到一个或者多个队列中。如果路由不到,或许会返回给生产者,或许直接丢弃。
消息队列运转过程:
使用示例
下面介绍下python
如何使用rabbitmq
,这里假定你已经有了rabbitmq
的环境并且已经配置好了,下面只介绍如何使用。
生产者:
import pika
user_pwd = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost', credentials=user_pwd))
# 从连接上获取channel
channel = connection.channel()
# 定义名为testexchange的交换机类型是fanout,交换机支持持久化
channel.exchange_declare(exchange='testexchange', exchange_type='fanout', durable=True)
# 定义名为hello的队列,设置其支持持久化
channel.queue_declare(queue='hello', durable=True)
# 将hello队列绑定到我们定义的testexchange交换机上
channel.queue_bind(exchange='testexchange', queue='hello')
for i in range(10):
channel.basic_publish(exchange='testexchange', routing_key='hello', body='Hello World!{}'.format(i),
properties=pika.BasicProperties(
delivery_mode=2, # 消息持久化
))
print(" [x] Sent 'Hello World!{}'".format(i))
connection.close()
消费者:
import pika
from time import sleep
user_pwd = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost', credentials=user_pwd))
channel = connection.channel()
channel.queue_declare(queue='hello', durable=True)
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
sleep(1)
channel.basic_consume(
queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
这里在消费者和生产者都定义了同样的队列,这样做是因为你不知道消费者还是生产者哪个会先启动起来。
我们这里为交换机,队列,消息都设置了durable=True
使其支持持久化,这样在当rabbitmq异常退出之后,你的消息不至于丢失。
交换机类型
参考在上面的生产者中定义交换机的代码:
channel.exchange_declare(exchange='testexchange', exchange_type='fanout', durable=True)
其中exchange_type='fanout'
就是设置交换机的类型,RabbitMQ
常用的交换器类型有fanout
、direct
、topic
、headers
这四种,下面分别介绍下。
fanout
它会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中。 参考如下代码:
import pika
user_pwd = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost', credentials=user_pwd))
channel = connection.channel()
channel.exchange_declare(exchange='testexchange', exchange_type='fanout', durable=True)
channel.queue_declare(queue='hello', durable=True)
channel.queue_declare(queue='hello1', durable=True)
channel.queue_declare(queue='hello2', durable=True)
channel.queue_bind(exchange='testexchange', queue='hello')
channel.queue_bind(exchange='testexchange', queue='hello1')
for i in range(10):
channel.basic_publish(exchange='testexchange', routing_key='hello', body='Hello World!{}'.format(i),
properties=pika.BasicProperties(
delivery_mode=2, # 消息持久化
))
print(" [x] Sent 'Hello World!{}'".format(i))
connection.close()
运行结果如下(这里我都没有启动consumer):
这里可以看到,
hello
和hello1
队列中都有消息进入,而hello2
没有,因为他没有绑定。
direct
direct类型的交换器路由规则也很简单,它会把消息路由到那些RoutingKey完全匹配的队列中。 参考如下代码:
import pika
user_pwd = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost', credentials=user_pwd))
channel = connection.channel()
channel.exchange_declare(exchange='testexchange', exchange_type='direct', durable=True)
channel.queue_declare(queue='hello', durable=True)
channel.queue_declare(queue='hello1', durable=True)
channel.queue_declare(queue='hello2', durable=True)
channel.queue_bind(exchange='testexchange', queue='hello', routing_key='hello')
channel.queue_bind(exchange='testexchange', queue='hello1', routing_key='hello1')
channel.queue_bind(exchange='testexchange', queue='hello2', routing_key='hello')
for i in range(10):
channel.basic_publish(exchange='testexchange', routing_key='hello', body='Hello World!{}'.format(i),
properties=pika.BasicProperties(
delivery_mode=2, # 消息持久化
))
print(" [x] Sent 'Hello World!{}'".format(i))
connection.close()
运行结果如下:
这里可以看到,
hello
和hello3
队列中都有消息进入,而hello2
没有,因为他绑定的routing_key
不是hello
。
topic
topic类型的交换器在匹配规则上进行了扩展,它与direct类型的交换器相似,也是将消息路由到routing_key相匹配的队列中。routing_key中可以存在两种特殊字符串*
和#
,用于做模糊匹配,其中*
用于匹配一个单词,#
用于匹配多规格单词(可以是零个)。
参考如下代码:
import pika
user_pwd = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost', credentials=user_pwd))
channel = connection.channel()
channel.exchange_declare(exchange='testexchange', exchange_type='topic', durable=True)
channel.queue_declare(queue='hello', durable=True)
channel.queue_declare(queue='hello1', durable=True)
channel.queue_declare(queue='hello2', durable=True)
channel.queue_bind(exchange='testexchange', queue='hello', routing_key='hello_1.*.*')
channel.queue_bind(exchange='testexchange', queue='hello1', routing_key='hello_1.#')
channel.queue_bind(exchange='testexchange', queue='hello2', routing_key='hello')
for i in range(10):
channel.basic_publish(exchange='testexchange', routing_key='hello_1', body='Hello World!{}'.format(i),
properties=pika.BasicProperties(
delivery_mode=2, # 消息持久化
))
print(" [x] Sent 'Hello World!{}'".format(i))
channel.basic_publish(exchange='testexchange', routing_key='hello_1.a.b', body='Hello World!{}'.format(i),
properties=pika.BasicProperties(
delivery_mode=2, # 消息持久化
))
connection.close()
运行结果如下:
我们发出了两条消息,
hello_1
只会被hello_1.#
匹配到,而hello_1.a.b
会被两个都匹配到。
headers
headers类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。在队列与交换器绑定时, 会设定一组键值对规则, 消息中也包括一组键值对( headers 属性), 当这些键值对有一对, 或全部匹配时, 消息被投送到对应队列.
import pika
user_pwd = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost', credentials=user_pwd))
channel = connection.channel()
channel.exchange_declare(exchange='testexchange', exchange_type='headers', durable=True)
channel.queue_declare(queue='hello', durable=True)
channel.queue_declare(queue='hello1', durable=True)
channel.queue_declare(queue='hello2', durable=True)
channel.queue_bind(exchange='testexchange', queue='hello', arguments={'a': '1'})
channel.queue_bind(exchange='testexchange', queue='hello1', arguments={'b': '2', 'c': '3', 'x-match': 'all'})
channel.queue_bind(exchange='testexchange', queue='hello2', arguments={'a': '1', 'b': '4', 'c': '5', 'x-match': 'any'})
for i in range(10):
channel.basic_publish(exchange='testexchange', routing_key='', body='Hello World!{}'.format(i),
properties=pika.BasicProperties(
delivery_mode=2, # 消息持久化
headers={'a': '1'}
))
print(" [x] Sent 'Hello World!{}'".format(i))
channel.basic_publish(exchange='testexchange', routing_key='', body='Hello World!{}'.format(i),
properties=pika.BasicProperties(
delivery_mode=2, # 消息持久化
headers={'a': '1', 'b': '2'}
))
connection.close()
运行结果如下:
另外还有消费者的确认机制,我们下篇文章介绍。
lihai
赞!
good job
<div class="codehilite"> </div>nice
good job!!!!!!
good