上篇文章中,我们构建了一个简单的日志系统。接下来,我们将丰富它:能够使用不同的severity(严重程度)来监听不同等级的log。比如我们希望只有error的log才保存到磁盘上。
1. Bindings绑定
上篇文章中我们是这么做的绑定:
channel.QueueBind(queueName, EXCHANGE_NAME, ROUTING_KEY);//const string ROUTING_KEY = "";
绑定其实就是关联了exchange和queue。或者这么说:queue对exchange的内容感兴趣,exchange要把它的Message deliver(提供)到queue中。
实际上,绑定可以带routing key这个参数,空字符串也是一个routing key的名字。其实这个参数的名称和basic_publish的参数名是相同了。第二篇有介绍当exchange的名称为空字符串的时候,创建queue的时候用到queue的名字和Producer的BasicPublish方法或Consuner的BasicConsume方法的routing key的名字可以是相同的。即queue的名字和routing key的名字是相同的。
为了避免混淆,我们把这个routing key称为binding key(即在Exchange中的routing key)
使用一个binding key来创建binding :
channel.QueueBind(queueName, EXCHANGE_NAME, routingKey);//string routingKey = "指定RoutingKey的名称";
上一篇文章我们讲的是使用fanout类型的exchange,对于fanout的exchange来说,这个参数是被忽略的。
2. Direct exchange
Direct exchange的路由算法非常简单:通过binding key的完全匹配,可以通过下图来说明。
3. Multiple bindings
4. Emitting logs
首先是我们要创建一个direct的exchange:
const string EXCHANGE_NAME = "direct_logs";channel.ExchangeDeclare(EXCHANGE_NAME, "direct");
我们将使用log的severity(严重级别)作为routing key,这样Consumer可以针对不同severity(严重级别)的log进行不同的处理。
channel.BasicPublish(EXCHANGE_NAME, routingKey, null, body);
我们使用三种severity(严重级别):'info', 'warning', 'error'.
5. Subscribing
对于queue,我们需要绑定severity(严重级别):
const string EXCHANGE_NAME = "direct_logs";channel.ExchangeDeclare(EXCHANGE_NAME, "direct");string queueName = channel.QueueDeclare();channel.QueueBind(queueName, EXCHANGE_NAME, routingKey);
6. 最终版本
1 ///2 /// 多个routing key指定同一个queue 3 /// 接收端创建临时queue 4 /// 5 /// 6 /// SendDemo5.exe direct_custom_routing_key_hello1 7 /// SendDemo5.exe direct_custom_routing_key_hello2 8 /// 9 static void Main(string[] args)10 {11 if (args.Length < 1)12 {13 Console.Error.WriteLine("请指定一个新的Routing Key名称", Environment.GetCommandLineArgs()[0]);14 Environment.ExitCode = 1;15 return;16 }17 var factory = new ConnectionFactory() { HostName = "localhost" };18 using (var connection = factory.CreateConnection())19 {20 using (var channel = connection.CreateModel())21 {22 const string EXCHANGE_NAME = "direct_logs";23 channel.ExchangeDeclare(EXCHANGE_NAME, "direct");//Direct :如果 routing key 匹配, 那么Message就会被传递到相应的queue中。24 //Exchange在queue创建时,它会自动的以queue的名字作为routing key来绑定那个exchange。25 var routingKey = args[0];26 var message = "Hello World!";27 var body = Encoding.UTF8.GetBytes(message);28 29 channel.BasicPublish(EXCHANGE_NAME, routingKey, null, body);30 Console.WriteLine(" [x] Sent '{0}':'{1}'", routingKey, message);31 }32 }33 }
Consumer.cs
1 ///2 /// 多个routing key指定同一个queue 3 /// 接收端创建临时queue 4 /// 5 /// 6 /// ReceiveDemo5.exe direct_custom_routing_key_hello1 7 /// ReceiveDemo5.exe direct_custom_routing_key_hello2 8 /// 9 static void Main(string[] args)10 {11 if (args.Length < 1)12 {13 Console.Error.WriteLine("请指定一个新的Routing Key名称", Environment.GetCommandLineArgs()[0]);14 Environment.ExitCode = 1;15 return;16 }17 var factory = new ConnectionFactory() { HostName = "localhost" };18 using (var connection = factory.CreateConnection())19 {20 using (var channel = connection.CreateModel())21 {22 const string EXCHANGE_NAME = "direct_logs";23 channel.ExchangeDeclare(EXCHANGE_NAME, "direct");//接收端如果关闭之后,自动创建的Queue会自动被删除24 string queueName = channel.QueueDeclare();//获取临时创建的Queue的名称25 26 foreach (var routingKey in args)27 {28 channel.QueueBind(queueName, EXCHANGE_NAME, routingKey);29 }30 31 Console.WriteLine(" [*] Waiting for messages. " + "To exit press CTRL+C");32 33 var consumer = new QueueingBasicConsumer(channel);34 channel.BasicConsume(queueName, true, consumer);35 36 while (true)37 {38 var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();39 40 var body = ea.Body;41 var message = Encoding.UTF8.GetString(body);42 var routingKey = ea.RoutingKey;43 Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message);44 }45 }46 }47 }
必须先运行Consumer,然后在运行Producer.