现在的位置: 首页 > 综合 > 正文

Rabbitmq教程翻译(四)routing

2012年03月25日 ⁄ 综合 ⁄ 共 3761字 ⁄ 字号 评论关闭

路由

(使用Java客户端)

前面的教程中,我们建立了一个简单的日志记录系统。我们许多接收机能够播放日志消息。
在本教程中,我们将要给它添加一个功能
- 我们将使它有可能仅可认购的消息的一个子集。
例如,我们将能够直接记录严重错误到消息日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。

绑定

在前面的例子中,我们已经创建绑定。您可能还记得这样的代码:

channel.queueBind(queueName, EXCHANGE_NAME, "");

绑定是交换和队列之间的关系。这可以简单地理解为:这种交换的消息队列感兴趣。

绑定可以采取额外routingKey参数。为了避免混乱与basic_publish我们要调用它有约束力的关键参数 这是我们怎么可以创建一个绑定的关键:

channel.queueBind(queueName, EXCHANGE_NAME, "black");

这意味着绑定key取决于交换类型(exchange type)。 扇交换类型(the
fanout exchanges)
,这是我们以前用过,只是忽略了它的价值。

直接交换

我们的记录系统从以前的教程播放所有的消息给所有消费者。我们要扩展,为了让过滤消息依据其严重程度。例如,我们可能想要的脚本,这是日志消息写入到磁盘上只接收严重错误,警告或信息的日志消息,而不是浪费磁盘空间。

我们使用的是交换(fanout exchange),不给我们很大的灵活性-它是唯一能够盲目的广播。

我们将使用一个直接的交流代替。直接交换(direct
exchange)的路由算法背后很简单-一个消息的绑定键完全匹配的路由关键的消息队列 。

为了说明这一点,考虑以下设置:

在此设置中,我们可以看到两个队列绑定到它的直接交换(direct
exchange)
X。第一个队列具有约束力的关键橙色的约束,第二个有两个绑定,一个绑定的key的黑色另一个绿色

在这样的设置中一个消息会通过路由键(routing key)橙色发布给队列Q1。一些消息通过黑色 或绿色的路由键会去Q2。所有其他消息将被丢弃。

多个绑定

具有相同绑定键绑定多个队列,这是完全合法的。在我们的例子中,我们可以添加具有约束力的关键黑色的XQ1之间的绑定。在这种情况下,直接的减缓会像,将把该消息广播到所有匹配的队列。一个消息通过路由键黑色传递给Q1和Q2

发射日志

我们为我们的记录系统使用这个模型,。替代扇类型我们将消息发送到一个直接交换类型。我们将提供日志严重性作为路由的关键。这样,接收的脚本将能够选择想要接收的严重程度。让我们专注于发射日志。

与往常一样,我们需要首先创建一个交换:

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

我们已经准备好发送消息:

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

为了简单起见,我们假设'严重'是'信息','警告','错误'其中的一种。

订阅

接收消息将工作就像在前面的教程中,为一个异常 - 我们要创建一个新的绑定为我们感兴趣的每个严重。

String queueName = channel.queueDeclare().getQueue();

for(String severity : argv){    
  channel.queueBind(queueName, EXCHANGE_NAME, severity);
}

全部放在一起

EmitLogDirect.java的代码是

1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class EmitLogDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv)
                  throws java.io.IOException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        String severity = getSeverity(argv);
        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
        System.out.println(" [x] Sent '" + severity + "':'" + message + "'");

        channel.close();
        connection.close();
    }
    //..
}

 ReceiveLogsDirect.java的代码是:

1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class ReceiveLogsDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv)
                  throws java.io.IOException,
                  java.lang.InterruptedException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String queueName = channel.queueDeclare().getQueue();

        if (argv.length < 1){
            System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
            System.exit(1);
        }

        for(String severity : argv){
            channel.queueBind(queueName, EXCHANGE_NAME, severity);
        }

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            String routingKey = delivery.getEnvelope().getRoutingKey();

            System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
        }
    }
}

像往常一样编译(看教程一个编译和classpath意见)。为了方便起见,我们现在将使用环境变量$ CP(%CP%在Windows上)运行时的classpath的例子。

如果你想保存到一个文件中只有“警告”和“错误”(而不是“信息”)日志消息,只需打开一个控制台,然后键入:

$ java -cp $CP ReceiveLogsDirect warning error > logs_from_rabbit.log

如果你想在你的屏幕上看到所有的日志信息,打开一个新的终端,并做到:

$ java -cp $CP ReceiveLogsDirect info warning error
 [*] Waiting for logs. To exit press CTRL+C

例如,发出一个错误日志消息只需键入:

$ java -cp $CP EmitLogDirect error "Run. Run. Or it will explode."
 [x] Sent 'error':'Run. Run. Or it will explode.'

(完整的源代码(EmitLogDirect.java源) 和(ReceiveLogsDirect.java源)

移动教程5,找出如何监听消息基于模式。

rabbitmq教程地址:http://www.rabbitmq.com/tutorials/tutorial-four-java.html

抱歉!评论已关闭.