rabbitmq里面有死信队列的功能,但是《rabbitmq实战》书里面好像没有提到。下面稍微提一下。我对死信的理解通俗的讲就是,将一些由于意外没有被正常消费的消息重新发配到一个单独的队列中,后续可以对这些死信队列的消息做处理。
配置死信有两种方式。然后官方文档是如下http://www.rabbitmq.com/dlx.html
第一种是用queue_declare里的arguments参数指定。第二种是可以开启rabbitmq的web manage插件,然后在web页面里面做policy配置,如果需要将消息发往死信队列只需要本应该消息确认的那里改为不确认basic_nack(delivery_tag=method.delivery_tag,multiple=False,requeue=False)就行了
对了还有需要注意的是 rabbitmq镜像队列的设置问题。书上提到的是在queue_declare里的arguments参数中添加"x-ha-policy":"all"键值对。但是我网上查了下,目前3.0版本以上后,就不支持这种方式了。需要在rabbitmq中添加policy.这个可以开启rabbitmq的webmanage插件,然后在web页面里面方便的做配置.
What changed? In RabbitMQ 3.0, queue mirroring is no longer controlled by the x-ha-policy argument when declaring a queue. Your applications can continue to declare this argument, but it won't cause queues to be mirrored. Instead you can declare one or more policies which control which queues are mirrored, and how.
下面是具体的consumer脚本已做参考
#!/usr/bin/env python #_*_ coding:utf-8 _*_ import pika import time def msg_consumer(channel,method,properties,body): #if send msg is quit then consumer script exist if body == "quit": channel.basic_cancel(consumer_tag="pt-consumer") channel.stop_consuming() else: try: raise Exception print(" [x] %r:%r" % (method.routing_key, body)) #time.sleep(1) #确认消息 这个放在后面 channel.basic_ack(delivery_tag=method.delivery_tag) except: print("asdasdfa") channel.basic_nack(delivery_tag=method.delivery_tag,multiple=False,requeue=False) return if __name__ == "__main__": ####用户认证 credentials = pika.PlainCredentials("test","test111") conn_params = pika.ConnectionParameters(host="192.168.1.215",port=5672,credentials = credentials) connection = pika.BlockingConnection(conn_params) channel = connection.channel() #声明交换器和队列 channel.exchange_declare(exchange='pt-exchange', exchange_type='direct', passive=False, durable=True, auto_delete=False) #创建业务队列并且添加死信设置 channel.queue_declare(queue="pt-queue",arguments={"x-ha-policy":"all","x-dead-letter-exchange":"pt-dlx-exchange","x-dead-letter-routing-key":"pt-fail"}) channel.queue_bind(queue="pt-queue",exchange='pt-exchange',routing_key='hola') #创建死信交换机和队列 channel.exchange_declare(exchange='pt-dlx-exchange', exchange_type='direct', passive=False, durable=True, auto_delete=False) channel.queue_declare(queue="pt-dlx-queue") channel.queue_bind(queue="pt-dlx-queue",exchange='pt-dlx-exchange',routing_key='pt-fail') #设置消费程序方法 channel.basic_consume(msg_consumer, queue="pt-queue", consumer_tag="pt-consumer",) #开始消费 channel.start_consuming()
Cloudhu 个人随笔|built by django|
沪ICP备16019452号-1