本文记录了再python中,使用pika库与RabbitMQ进行工作,包括发消息,消费消息,以及一个封装的客户端类。

1.pika封装的RabbitMQClient类:RabbitMQClient.py

import pika
import logging


class RabbitMQClient:

    def __init__(self, conn_str):
        self.routing_key = "#"
        self.exchange_type = "topic"
        self.connection_string = conn_str
        self.connection = None
        self.channel = None
        self.queue_consumer_list = []

    def open_connection(self):
        self.connection = pika.BlockingConnection(pika.URLParameters(self.connection_string))
        self.channel = self.connection.channel()
        logging.debug("connection established")

    def close_connection(self):
        self.connection.close()
        logging.debug("connection closed")

    def prepare_consume_info(self):
        # 设置消费者一次只领取一条消息
        self.channel.basic_qos(prefetch_count=1)

        for q in self.queue_consumer_list:
            self.declare_exchange(q[1])
            self.declare_queue(q[0])
            self.queue_bind(q[0], q[1])

    def declare_exchange(self, exchange):
        self.channel.exchange_declare(exchange=exchange,
                                      exchange_type=self.exchange_type,
                                      durable=True)

    def declare_queue(self, queue):
        self.channel.queue_declare(queue=queue,
                                   durable=True,
                                   arguments={
                                       'x-dead-letter-exchange': 'RetryExchange'
                                   })

    def queue_bind(self, queue, exchange):
        self.channel.queue_bind(queue=queue,
                                exchange=exchange,
                                routing_key=self.routing_key)

    def do_publish(self, msg, exchange):
        self.channel.basic_publish(exchange=exchange,
                                   routing_key=self.routing_key,
                                   body=msg,
                                   properties=pika.BasicProperties(
                                       delivery_mode=2,
                                       type=exchange
                                   ))

    def declare_retry_queue(self):
        """
        创建异常交换器和队列,用于存放没有正常处理的消息。
        :return:
        """
        self.channel.exchange_declare(exchange='RetryExchange',
                                      exchange_type='fanout',
                                      durable=True)
        self.channel.queue_declare(queue='RetryQueue',
                                   durable=True)
        self.queue_bind('RetryQueue', 'RetryExchange')

    def publish_message(self, exchange, msg):
        """
        发送消息到指定的交换器
        :param exchange: RabbitMQ交换器
        :param msg: 消息实体,是一个序列化的JSON字符串
        :return:
        """
        self.open_connection()
        self.declare_retry_queue()
        self.declare_exchange(exchange)
        self.do_publish(msg, exchange)
        self.close_connection()
        logging.debug("message send out to %s" % exchange)

    def add_consumer(self, queue, exchange, callback):
        """
        注册其消费者
        :param queue: 要消费的队列
        :param exchange: 队列绑定的交换器
        :param callback: 消费者回调函数
        :return:
        """
        self.queue_consumer_list.append([queue, exchange, callback])

    def start_consume(self):
        """
        启动消费者,开始消费RabbitMQ中的消息
        :return:
        """
        self.open_connection()
        self.declare_retry_queue()
        self.prepare_consume_info()
        self.register_consumer()

        try:
            self.channel.start_consuming()
        except KeyboardInterrupt:
            self.stop_consuming()

    def register_consumer(self):
        for queue in self.queue_consumer_list:
            self.channel.basic_consume(queue[2], queue[0])

    def stop_consuming(self):
        self.channel.stop_consuming()
        self.close_connection()

    def message_handle_successfully(channel, method):
        """
        如果消息处理正常完成,必须调用此方法,
        否则RabbitMQ会认为消息处理不成功,重新将消息放回待执行队列中
        :param channel: 回调函数的channel参数
        :param method: 回调函数的method参数
        :return:
        """
        channel.basic_ack(delivery_tag=method.delivery_tag)

    def message_handle_failed(channel, method):
        """
        如果消息处理失败,应该调用此方法,会自动将消息放入异常队列
        :param channel: 回调函数的channel参数
        :param method: 回调函数的method参数
        :return:
        """
        channel.basic_reject(delivery_tag=method.delivery_tag, requeue=False)

上述代码中,注意声明queue时候的参数x-dead-letter-exchange,这表示拒绝的消息都会自动被放到此参数指定的交换器。

消息是发给某个交换器,而不是发给队列。

订阅消息,是订阅某个队列。

交换器负责按照路由绑定和路由key,以及转发模式(direct,fanout,topic,headers),将消息转发给队列。

2.消费者Consumer.py

from AccountActivedMsg import AccountActivedMsg
from AccountMainContactApplyApprovedMsg import AccountMainContactApplyApprovedMsg

from RabbitMQClient import RabbitMQClient

# logging.basicConfig(level=logging.DEBUG)
print("start program")

#conn_str = 'amqp://admin:admin@192.168.13.41:5672/%2F?heartbeat_interval=1'
conn_str = 'amqp://RightICMQ:ICHJ_678!@120.76.190.87:5672/%2F?heartbeat_interval=1&'
client = RabbitMQClient(conn_str)


def foo(ch, method, properties, body):
    msg = AccountActivedMsg(body.decode())
    print("******************AccountActived********************")
    print("ICHAccountId:%s" % msg.ICHAccountId)

    # 如果处理失败,则调用此方法拒绝消息,消息会被自动放到retryqueue队列。
    RabbitMQClient.message_handle_failed(ch, method)


def boo(ch, method, properties, body):
    msg = AccountMainContactApplyApprovedMsg(body.decode())
    print("******************AccountMainContactApplyApprovedMsg********************")
    print("DBMAccountId:%s" % msg.DBMAccountId)

    # 如果处理成功,则调用此消息回复ack,表示消息成功处理完成。
    RabbitMQClient.message_handle_successfully(ch, method)


queue_name = "EventBus.Msg.ICHUB.AllotProductManagerMsg:EventBus.Msg_Handler"
exchange = "EventBus.Msg.ICHUB.AllotProductManagerMsg:EventBus.Msg"

client.add_consumer(queue_name, exchange, foo)


client.start_consume()

注意,上述RabbitMQ连接字符串使用的是默认的virtualHost(/),下面的链接字符串连接到dev虚拟host上。

amqp://admin:admin@192.168.13.41:5672/dev/%2F?heartbeat_interval=1

3.发布者Publisher.py

from RabbitMQClient import RabbitMQClient

# logging.basicConfig(level=logging.DEBUG)
print("start program")

# 注意,heartbeat_interval设置太短,会导致链接被重置的错误:error(104, 'Connection reset by peer')
conn_str = 'amqp://dev:123456@192.168.13.237:5672/%2F?heartbeat_interval=20'
client = RabbitMQClient(conn_str)

exchange1 = "EventBus.Msg.ICHUB.AllotProductManagerMsg:EventBus.Msg"

msg1 = '{"IchubId":129,"Description":"测试消息","Code":"3A0222","ControlHK":"","UpdateAt":"2018-06-23 10:09:26","ControlMainland":"","CreateAt":"2018-06-23 10:09:26","CreateBy":1,"UpdateBy":1}'

client.publish_message(exchange1, msg1)

print("message send out")

另外,python里面需要进行消息实体的反序列化,所以一个特定的消息体在python里面定义如下:

import json


class AccountActivedMsg:
    def __init__(self, json_def):
        self.__dict__ = json.loads(json_def)

反序列化的时候这样使用:

def foo(ch, method, properties, body):
    msg = AccountActivedMsg(body.decode())
    print("******************AccountActived********************")
    print("ICHAccountId:%s" % msg.ICHAccountId)

    RabbitMQClient.message_handle_failed(ch, method)

发消息截图

消费者截图

RabbitMQ队列截图

 

发表评论

电子邮件地址不会被公开。 必填项已用*标注