RabbitMQ学习笔记之(三) 发送方确认机制


上篇文章中,我们介绍了消费端的确认,那么在生产者有没有确认机制呢?答案是有的。 参考我们发送消息的代码段:

channel.basic_publish(exchange='testexchange', routing_key='hello', body='Hello World!{}'.format(i),
                          mandatory=True,
                          properties=pika.BasicProperties(
                              delivery_mode=2,  # 消息持久化
                          ))

如果在发送方发出消息后,如果exchange写错了,或者没有任何队列绑定我们发送的exchange,那么在这时候发送方是对此浑然不知的,而rabbitmq为了解决这个问题下面介绍下。

生产者将信道设置成confirm(确认)模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,RabbitMQ就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一ID),这就使得生产者知晓消息已经正确到达了目的地了。如果消息和队列是持久化的,那么确认消息会在消息写入磁盘之后发出。RabbitMQ回传给生产者的确认消息中的deliveryTag包含了确认消息的序号,此外RabbitMQ也可以设置channel.basicAck方法中的multiple参数,表示到这个序号之前的所有消息都已经得到了处理,如下图所示: 通过调用channel.confirm_delivery()就可以将信道设置为comfirm模式。

import pika

user_pwd = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost', credentials=user_pwd))
channel = connection.channel()

channel.exchange_declare(exchange='testexchange', exchange_type='fanout', durable=True)

channel.queue_declare(queue='hello', durable=True)
channel.queue_bind(exchange='testexchange', queue='hello')

channel.confirm_delivery()

for i in range(10):
    channel.basic_publish(exchange='testexchange123', routing_key='hello', body='Hello World!{}'.format(i),
                          mandatory=True,
                          properties=pika.BasicProperties(
                              delivery_mode=2,  # 消息持久化

                          ))
    print(" [x] Sent 'Hello World!{}'".format(i))
connection.close()

现在,如果我们写错了交换机,在发送的时候就会报错:pika.exceptions.ChannelClosedByBroker: (404, "NOT_FOUND - no exchange 'testexchange123' in vhost '/'")。 而mandatory=True参数来确认发出的消息是否有queue接收, 并且所有queue都成功接收,如果没有绑定队列在交换机上的话,在发送的时候就会报错:pika.exceptions.UnroutableError: 1 unroutable message(s) returned

发送方确认机制最大的好处在于它是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用程序便可以通过回调方法来处理该确认消息,如果rabbitmq因为自身内部错误导致消息丢失,就会发送一条nack(basic.nack)命令,生产者应用程序同样可以在回调方法中处理该nack命令。