现在的位置: 首页 > 综合 > 正文

RabbitMQ学习(三).NET Client之Publish/Subscribe

2019年10月14日 ⁄ 综合 ⁄ 共 4786字 ⁄ 字号 评论关闭

转载请注明出处:jiq•钦's technical Blog

Publish/Subscribe(发布/订阅)

(using the .NET Client)

前面的教程我们已经学习了如何创建工作队列,工作队列背后的假设是每一个任务都被准确地递送给一个worker进行处理。这里我们将介绍完全不同的模式,即一个消息可以递送给多个consumer,这种模式叫做“发布/订阅”模式。

作为例子我们将构建一个简单的日志系统,这个系统由两个部分组成,一个部分产生日志消息,一个部分接收、打印日志消息。在我们的日志系统中,每一个receiver程序的副本都能够获得消息,这样的话我们能够运行一个receiver用于将日志存储到磁盘,同时运行另外一个receiver将日志打印到屏幕。

Exchanges(交换机)

前面的部分我们的producer都是直接和队列进行消息交互,现在是时候介绍一下RabbitMQ中的“全消息模型”(full messaging model)了。

让我们快速回顾一下前面涉及到的一些基本概念:

  • producer 是一个发送消息的用户程序
  • queue 是一个存储消息的缓冲区
  • consumer 是一个接收消息的用户程序

RabbitMQ的这种消息模型的核心思想就是: 生产者从来不直接发送任何消息到队列,事实上生产者压根就不知道消息是否会被发送,以及发送到哪个队列。生产者唯一能够做的事情就是将消息发送到“交换机(Exchange)”。

交换机所做的事情就是从生产者接收消息,然后将消息推送到对应的消息队列。

交换机需要知道它收到的消息的含义,是否应该将其放到某个特定队列中?是否应该将其放到多个队列中?这些规则由“交换类型(exchange type)”来定义。

常用的交换类型包括:directtopicheaders 以及 fanout.
我们这里要用到的是最后一种交换类型,即分列(fanout)
,接下来我们就创建一个这种类型的交换机,将其命名为logs:

channel.ExchangeDeclare("logs", "fanout");

备注: 分列交换类型所做的事情很简单,就是将它收到的消息广播给所有和它绑定的消息队列。

接下来生产者所要做的就是将消息发送给这个定义好的交换机,也就相当于发布者进行消息的发布:

var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish("logs", "", null, body);

Temporary queues(临时队列)

前面介绍的队列都是命名队列,这在生产者和消费者的情况下是至关重要的,因为需要指出队列的名称,才能让生产者将消息放到队列,消费者才能从这个队列中获取消息。

但是在这里的日志系统中命名队列并不适用,RabbitMQ 官网教程原文是这么说的:

But that's not the case for our logger. We want to hear about all log messages, not just a subset of them. We're also interested only in currently flowing messages not in the old ones. To solve that we need two things.

Firstly, whenever we connect to Rabbit we need a fresh, empty queue. To do this we could create a queue with a random name, or, even better - let the server choose a random queue name for us.

Secondly, once we disconnect the consumer the queue should be automatically deleted.

译者注:为什么在这里命名队列不适用,即为什么“发布/订阅”模式需要靠“临时队列”来实现?

我的理解是如果使用命名队列:首先不能够保证订阅者能够接收到所有订阅的消息,因为这个命名队列的名称其它进程也知道,可以来将部分消息取走;其次不能够保证订阅者接收到的所有消息都是自己感兴趣的,因为其他进程有可能将“脏消息”放进这个命名队列。

如果我们使用一个只有订阅者自己知道名字的临时队列,这个队列刚刚创建没有任何东西,这个临时队列别人也不知道,那么就能够保障消息的订阅者能够独占地连接到这个临时队列拿到自己感兴趣的所有消息。这就像一个只有消息发布者
和 消息订阅者 知道的一个私有邮箱。

当然如果消息订阅者对这个消息不感兴趣了,那么这个队列的存在也就没有必要了,所以需要提供连接断开时队列自动删除的功能。

针对我们这个日志系统的例子,一个receiver程序用于存储消息到硬盘,一个receiver程序用于打印消息到屏幕,它们两个都需要订阅同一种消息,所以各自都需要创建一个自己的临时队列,用于接收自己感兴趣的消息。

使用.NET客户端, 我们只需要调用无参的 queueDeclare() 方法就能够创建一个临时的、独占的、能够自动删除的并具备随机生成的名称的队列:

var queueName = channel.QueueDeclare().QueueName;

返回的queueName 是一个随机生成的队列名称,例如看起来就像 amq.gen-JzTY20BRgKO-HjmUJj0wLg.

Bindings(绑定exchange和temporary queue)

我们已经创建好了分列交换机(fanout exchange)以及一个临时队列,现在所需要做的事情就是告诉分列交换机将消息推送给我们定义的这个临时队列,这个将交换机和队列进行关联的操作叫做“绑定(binding)”:

channel.QueueBind(queueName, "logs", "");

从现在起名叫logs这个交换机会将它收到的所有消息都推送到我们创建好的、订阅者独享的这个临时队列。

Putting it all together(代码总览)

生产者进程发出日志消息,和以前教程中的生产者最大的不同是,这里先定义了一个“分列交换机”,然后将日志消息发送给这个交换机。

我们将这个程序称作 EmitLog.cs :

using System;
using RabbitMQ.Client;
using System.Text;

class EmitLog
{
    public static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare("logs", "fanout"); //申明交换机

            var message = GetMessage(args);
            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish("logs", "", null, body);  //将消息发送到交换机
            Console.WriteLine(" [x] Sent {0}", message);
        }
    }

    private static string GetMessage(string[] args)
    {
      return ((args.Length > 0) ? string.Join(" ", args) : "info: Hello World!");
    }
}

(EmitLog.cs source)

接下来的程序是接收者也声明一个和生产者同样的交换机,并创建一个接收者专用于订阅消息的临时消息队列,然后将两者绑定在一起。

然后就和之前的接收者一样开始等待接收从这个队列接收消息了。

如果没有任何队列绑定到交换机,那么消息就会被丢弃,这对我们来说没什么,因为既然没有订阅者监听,那么抛弃掉这些消息也无妨。

我们将这个订阅者程序称作 ReceiveLogs.cs:

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

class ReceiveLogs
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare("logs", "fanout");  //申明分列交换机

                var queueName = channel.QueueDeclare().QueueName;  //创建临时队列

                channel.QueueBind(queueName, "logs", "");  //绑定临时队列到交换机
                var consumer = new QueueingBasicConsumer(channel);
                channel.BasicConsume(queueName, true, consumer);  //开始接收消息

                Console.WriteLine(" [*] Waiting for logs." +
                                  "To exit press CTRL+C");
                while (true)
                {
                    var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();  //阻塞等待异步消息推送过来

                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine(" [x] {0}", message);
                }
            }
        }
    }
}

(ReceiveLogs.cs source)

和前面的方法一样进行编译.

$ csc /r:"RabbitMQ.Client.dll" EmitLogs.cs
$ csc /r:"RabbitMQ.Client.dll" ReceiveLogs.cs

假如你想要将接收到的日志信息保存到特定日志文件,只需要在终端输入:

$ ReceiveLogs.exe > logs_from_rabbit.log

假如你想要将接收到的日志信息打印到屏幕上,直接输入:

$ ReceiveLogs.exe

当然发出日志消息可以执行:

$ EmitLog.exe

使用 rabbitmqctl list_bindings 可以验证我们的代码确实创建了绑定以及临时队列,如果运行了两个 ReceiveLogs.cs 程序你就可以看到类似下面的输出:

$ sudo rabbitmqctl list_bindings
Listing bindings ...
logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
...done.

这个结果很直接地表明: 来自logs交换机的数据被推送到了两个特定的临时队列中,这正是我们期望看到的。

To find out how to listen for a subset of messages, let's move on to tutorial 4

抱歉!评论已关闭.