环境介绍:
RabbitMQ版本:RabbitMQ 3.10.5 (docker 版本)
程序代码:.NetCore3.1
RabbitMQ.Client:Version=6.0.0.0
1、消息发送端
using RabbitMQ.Client;
using System;
using System.Text;
using System.Threading;
namespace DirectServer
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("DirectServer发布服务器启动...");
//1.创建连接工厂
var factory = new ConnectionFactory()
{
HostName = "192.168.1.105",
UserName = "guest",
Password = "guest"
};
using (var conn = factory.CreateConnection())
{
using (var channel = conn.CreateModel())
{
//durable=true 交换机持久化
channel.ExchangeDeclare("directExchange", "direct",true);
durable=true 队列持久化
//channel.QueueDeclare("directQueue", true, false, false, null);
//channel.QueueBind("directQueue", "directExchange", "", null);
//消息持久化
IBasicProperties basicProperties = channel.CreateBasicProperties();
basicProperties.Persistent = true;
//basicProperties.DeliveryMode = 2;
string msg = "";
for (int i = 0; i < 4000; i++)
{
msg = $"发布消息{i}";
string ROUTE_KEY = "";
var body = Encoding.UTF8.GetBytes(msg);
if (i % 3 == 0)
{
ROUTE_KEY = "route1";
}
else
{
ROUTE_KEY = "route2";
}
//channel.BasicPublish("directExchange", ROUTE_KEY, null, body);
channel.BasicPublish("directExchange", ROUTE_KEY, basicProperties, body);
Console.WriteLine($"向{ROUTE_KEY}发布消息成功:{msg}");
Thread.Sleep(1000);
}
Console.ReadKey();
}
}
}
}
}
2,消费端一
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;
namespace DirectClient
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("DirectClient接收客户端启动...");
//1.创建连接工厂
var factory = new ConnectionFactory()
{
HostName = "192.168.1.105",
UserName = "guest",
Password = "guest"
};
using (var conn = factory.CreateConnection())
{
using (var channel = conn.CreateModel())
{
//var queue = channel.QueueDeclare(durable:true).QueueName;
//exclusive:true 独占,只对首次声明它的连接(Connection)可见 会在其连接断开的时候自动删除
//durable: true 持久化
var queue = channel.QueueDeclare("ChatRoomNotice", durable: true, exclusive: false, autoDelete: false);
channel.QueueBind(queue, "directExchange", "route1");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
//接收消息
var body = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine($"接收route1消息:{body.ToString()}");
//确认消息:deliveryTag参数是分发的标记,multiple表示是否确认多条
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
拒绝消息:deliveryTag参数也是分发的标记,requeue表示消息被拒绝后是否重新入队
//channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false);
Thread.Sleep(1000);
};
//autoAck=false 为防止信息丢失,处理完信息后再手动修改
channel.BasicConsume(queue, false, consumer);
Console.ReadKey();
}
}
}
}
}
3、消费端二,开启四个消费者,并模仿真实环境处理消息
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace DirectClientMulCusume
{
public enum DealResult
{
success = 1,
fail = 2,
delete=3
}
delegate DealResult DealMessage(string message, int consumeId);
internal class Program
{
static void Main(string[] args)
{
Console.WriteLine("DirectClient接收客户端启动...");
//1.创建连接工厂
var factory = new ConnectionFactory()
{
HostName = "192.168.1.105",
UserName = "guest",
Password = "guest",
//设置端口后自动恢复连接属性
AutomaticRecoveryEnabled = true,
//心跳检测10s
RequestedHeartbeat = TimeSpan.FromSeconds(10000)
};
factory.AutomaticRecoveryEnabled = true;
using (var conn = factory.CreateConnection())
{
//启动4个消费端
for (int i = 0; i < 4; i++)
{
StartRece(conn, i);
}
Console.ReadKey();
}
}
static void StartRece(IConnection conn, int consumeId)
{
var channel = conn.CreateModel();
//var queue = channel.QueueDeclare(durable:true).QueueName;
//exclusive:true 独占,只对首次声明它的连接(Connection)可见 会在其连接断开的时候自动删除
//durable: true 持久化
var queue = channel.QueueDeclare("OrderInfoNotice", durable: true, exclusive: false, autoDelete: false);
channel.QueueBind(queue, "directExchange", "route2");
//使用异步消费者
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
//接收消息
var body = Encoding.UTF8.GetString(ea.Body.ToArray());
DealMessage showMessage = new DealMessage(Print);
var result = showMessage(body, consumeId);
if (result == DealResult.success)
//确认消息:deliveryTag参数是分发的标记,multiple表示是否确认多条
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
else if(result==DealResult.fail)
//拒绝消息:deliveryTag参数也是分发的标记,requeue表示消息被拒绝后是否重新入队
{
channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: true);
showMessage(body+ "---异常消息", consumeId);
}
//不重新入队,丢弃
else
{
channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false);
showMessage(body + "---多次异常丢弃", consumeId);
}
};
//autoAck=false 为防止信息丢失,处理完信息后再手动修改
channel.BasicConsume(queue, false, consumer);
//param1:prefetchSize,预取大小服务器将传递的最大内容量(以八位字节为单位),如果不受限制,则为0 默认值:0
//param2:prefetchCount,服务器一次请求将传递的最大邮件数,如果没有限制,则为0 调用此方法时,该值必填。默认值:0
//param3:global,是否将设置应用于整个频道,而不是每个消费者
//默认值:false,应用于本身(一个消费者)true:应用于整个频道
//channel.BasicQos(0, 1, false); //分发机制为触发式;
}
static DealResult Print(string msg, int consumeId)
{
//随机500ms-1s延迟,模拟真实处理
Random rand = new Random();
var r = rand.Next(5, 10);
Thread.Sleep(r * 100);
Console.WriteLine(msg + "--consumer:" + consumeId);
//随机1/10的处理异常,从新入队,真实情况可以设置从新入队3次,临时标志可以放在redis中
var rr = rand.Next(1, 10);
if (rr >= 9)
return DealResult.fail;
//随机1/20丢弃,模仿真实情况三次入队还没处理成功则记录日志后丢弃
var rrr = rand.Next(1, 20);
if (rrr >= 19)
return DealResult.delete;
return DealResult.success;
}
}
}
4、启动发送端,消费端,打开mq管理端查看查看
可以看到消息按照route1和route2转发,一个消费端逐个消费,另一个消费端4个消费者消费
4.1队列信息如下:
4.2 OrderInfoNotice 四个消费者
更多信息请关注公众号: