RabbitMQ路由模式多消费模拟真实环境

Z技术 2019年08月12日 524次浏览

环境介绍:

RabbitMQ版本:RabbitMQ 3.10.5 (docker 版本)

程序代码:.NetCore3.1

RabbitMQ.Client:Version=6.0.0.0

1、消息发送端

using RabbitMQ.Client;
using System;
using System.Text;
using System.Threading;

namespace DirectServer
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("DirectServer发布服务器启动...");

            //1.创建连接工厂
            var factory = new ConnectionFactory()
            {
                HostName = "192.168.1.105",
                UserName = "guest",
                Password = "guest"
            };
            using (var conn = factory.CreateConnection())
            {
                using (var channel = conn.CreateModel())
                {
                    //durable=true 交换机持久化
                    channel.ExchangeDeclare("directExchange", "direct",true);

                    durable=true 队列持久化
                    //channel.QueueDeclare("directQueue", true, false, false, null);
                    //channel.QueueBind("directQueue", "directExchange", "", null);

                    //消息持久化
                    IBasicProperties basicProperties = channel.CreateBasicProperties();
                    basicProperties.Persistent = true;
                    //basicProperties.DeliveryMode = 2;

                    string msg = "";
                    for (int i = 0; i < 4000; i++)
                    {
                        msg = $"发布消息{i}";
                        string ROUTE_KEY = "";
                        var body = Encoding.UTF8.GetBytes(msg);
                        if (i % 3 == 0)
                        {
                            ROUTE_KEY = "route1";
                        }
                        else
                        {
                            ROUTE_KEY = "route2";
                        }
                        //channel.BasicPublish("directExchange", ROUTE_KEY, null, body);
                        channel.BasicPublish("directExchange", ROUTE_KEY, basicProperties, body);
                        Console.WriteLine($"向{ROUTE_KEY}发布消息成功:{msg}");

                        Thread.Sleep(1000);
                    }
                    Console.ReadKey();
                }
            }
        }
    }
}

2,消费端一

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;

namespace DirectClient
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("DirectClient接收客户端启动...");
            //1.创建连接工厂
            var factory = new ConnectionFactory()
            {
                HostName = "192.168.1.105",
                UserName = "guest",
                Password = "guest"
            };
            using (var conn = factory.CreateConnection())
            {
                using (var channel = conn.CreateModel())
                {
                    //var queue = channel.QueueDeclare(durable:true).QueueName;
                    //exclusive:true 独占,只对首次声明它的连接(Connection)可见 会在其连接断开的时候自动删除
                    //durable: true 持久化
                    var queue = channel.QueueDeclare("ChatRoomNotice", durable: true, exclusive: false, autoDelete: false);
                    channel.QueueBind(queue, "directExchange", "route1");

                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                      {
                          //接收消息
                          var body = Encoding.UTF8.GetString(ea.Body.ToArray());
                          Console.WriteLine($"接收route1消息:{body.ToString()}");

                          //确认消息:deliveryTag参数是分发的标记,multiple表示是否确认多条
                          channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);

                          拒绝消息:deliveryTag参数也是分发的标记,requeue表示消息被拒绝后是否重新入队
                          //channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false);

                          Thread.Sleep(1000);
                      };
                    //autoAck=false 为防止信息丢失,处理完信息后再手动修改
                    channel.BasicConsume(queue, false, consumer);

                    Console.ReadKey();
                }
            }
        }
    }
}

3、消费端二,开启四个消费者,并模仿真实环境处理消息

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace DirectClientMulCusume
{
    public enum DealResult
    {
        success = 1,
        fail = 2,
        delete=3
    }

    delegate DealResult DealMessage(string message, int consumeId);

    internal class Program
    {
        static void Main(string[] args)
        {

            Console.WriteLine("DirectClient接收客户端启动...");
            //1.创建连接工厂
            var factory = new ConnectionFactory()
            {
                HostName = "192.168.1.105",
                UserName = "guest",
                Password = "guest",
                //设置端口后自动恢复连接属性
                AutomaticRecoveryEnabled = true,
                //心跳检测10s
                RequestedHeartbeat = TimeSpan.FromSeconds(10000)
            };

            factory.AutomaticRecoveryEnabled = true;

            using (var conn = factory.CreateConnection())
            {
                //启动4个消费端
                for (int i = 0; i < 4; i++)
                {
                    StartRece(conn, i);
                }

                Console.ReadKey();

            }
        }

        static void StartRece(IConnection conn, int consumeId)
        {
            var channel = conn.CreateModel();
            //var queue = channel.QueueDeclare(durable:true).QueueName;
            //exclusive:true 独占,只对首次声明它的连接(Connection)可见 会在其连接断开的时候自动删除
            //durable: true 持久化
            var queue = channel.QueueDeclare("OrderInfoNotice", durable: true, exclusive: false, autoDelete: false);
            channel.QueueBind(queue, "directExchange", "route2");

            //使用异步消费者
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
               {
                   //接收消息
                   var body = Encoding.UTF8.GetString(ea.Body.ToArray());

                   DealMessage showMessage = new DealMessage(Print);
                   var result = showMessage(body, consumeId);

                   if (result == DealResult.success)
                       //确认消息:deliveryTag参数是分发的标记,multiple表示是否确认多条
                       channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                   else if(result==DealResult.fail)
                   //拒绝消息:deliveryTag参数也是分发的标记,requeue表示消息被拒绝后是否重新入队
                   { 
                       channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: true);
                       showMessage(body+ "---异常消息", consumeId);
                   }
                   //不重新入队,丢弃
                   else
                   {
                       channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false);
                       showMessage(body + "---多次异常丢弃", consumeId);
                   }
               };
            //autoAck=false 为防止信息丢失,处理完信息后再手动修改
            channel.BasicConsume(queue, false, consumer);

            //param1:prefetchSize,预取大小服务器将传递的最大内容量(以八位字节为单位),如果不受限制,则为0 默认值:0
            //param2:prefetchCount,服务器一次请求将传递的最大邮件数,如果没有限制,则为0 调用此方法时,该值必填。默认值:0
            //param3:global,是否将设置应用于整个频道,而不是每个消费者
            //默认值:false,应用于本身(一个消费者)true:应用于整个频道
            //channel.BasicQos(0, 1, false); //分发机制为触发式; 
        }

        static DealResult Print(string msg, int consumeId)
        {
            //随机500ms-1s延迟,模拟真实处理
            Random rand = new Random();
            var r = rand.Next(5, 10);
            Thread.Sleep(r * 100);
            
            Console.WriteLine(msg + "--consumer:" + consumeId);

            //随机1/10的处理异常,从新入队,真实情况可以设置从新入队3次,临时标志可以放在redis中
            var rr = rand.Next(1, 10);
            if (rr >= 9)
                return DealResult.fail;

            //随机1/20丢弃,模仿真实情况三次入队还没处理成功则记录日志后丢弃
            var rrr = rand.Next(1, 20);
            if (rrr >= 19)
                return DealResult.delete;

            return DealResult.success;
        }
    }
}

4、启动发送端,消费端,打开mq管理端查看查看

17afc52fbaf345ea802b194e69993099.png

 可以看到消息按照route1和route2转发,一个消费端逐个消费,另一个消费端4个消费者消费

4.1队列信息如下:

f04a8c7aaf6243d7894fa00522535edf.png

4.2 OrderInfoNotice 四个消费者

df591c9dfabd4edeb1cf08d478617df7.png

更多信息请关注公众号:
20220401152838