using EasyNetQ;
using EasyNetQ.FluentConfiguration;
using SMP.Message.Consumer.Config;
namespace SMP.Message.Consumer.MQConsumers
{
/// <summary>
/// Static facade over a single, lazily created EasyNetQ <see cref="IBus"/>.
/// The bus is built on first use from <c>RabbitMqConfig.MqConnection</c> and then
/// reused by every publish/subscribe helper in this class.
/// </summary>
/// <remarks>
/// Error handling is intentionally left as the callers already see it:
/// <see cref="Publish{T}(T)"/>, both <c>Subscribe</c> overloads and
/// <see cref="PublishDeadMessage{T}"/> log and swallow exceptions, while
/// <see cref="PublishAsync{T}"/> and <see cref="Publish{T}(T, Action{IPublishConfiguration})"/>
/// let them propagate.
/// </remarks>
public class MQManager
{
    // volatile: the first null check in Bus happens outside the lock, so the read
    // must not be served from a stale cached value.
    private static volatile IBus _bus;

    private static readonly object SyncRoot = new object();

    // Not instantiable; all members are static.
    private MQManager()
    {
    }

    /// <summary>
    /// Gets the shared bus, creating it on first access.
    /// </summary>
    /// <remarks>
    /// If bus creation throws, nothing is cached and the next access tries again.
    /// </remarks>
    private static IBus Bus
    {
        get
        {
            if (_bus == null)
            {
                lock (SyncRoot)
                {
                    // Re-check under the lock: another thread may have created the
                    // bus while this one was waiting.
                    if (_bus == null)
                    {
                        string connStr = RabbitMqConfig.MqConnection;
                        _bus = RabbitHutch.CreateBus(connStr);
                    }
                }
            }

            return _bus;
        }
    }

    /// <summary>
    /// Publishes a single message. Any exception is logged and swallowed.
    /// </summary>
    /// <typeparam name="T">Message type.</typeparam>
    /// <param name="message">The message to publish.</param>
    public static void Publish<T>(T message) where T : class
    {
        try
        {
            Bus.Publish(message);
        }
        catch (Exception ex)
        {
            Log.Log.Error(ex, ex.Message);
        }
    }

    /// <summary>
    /// Publishes a message asynchronously.
    /// </summary>
    /// <typeparam name="T">Message type.</typeparam>
    /// <param name="message">The message to publish.</param>
    /// <returns>
    /// The task returned by the bus. Failures are not logged here; the caller is
    /// expected to observe the task.
    /// </returns>
    public static Task PublishAsync<T>(T message) where T : class
    {
        return Bus.PublishAsync(message);
    }

    /// <summary>
    /// Publishes a message using caller-supplied publish configuration.
    /// Exceptions propagate to the caller.
    /// </summary>
    /// <typeparam name="T">Message type.</typeparam>
    /// <param name="message">The message to publish.</param>
    /// <param name="configure">Callback that adjusts the publish configuration.</param>
    public static void Publish<T>(T message, Action<IPublishConfiguration> configure) where T : class
    {
        Bus.Publish(message, configure);
    }

    /// <summary>
    /// Subscribes <paramref name="consumer"/>'s <c>Consume</c> method to messages of
    /// type <typeparamref name="T"/>, using an empty subscription id.
    /// Any exception raised while subscribing is logged and swallowed.
    /// </summary>
    /// <typeparam name="T">Message type.</typeparam>
    /// <param name="consumer">Consumer whose <c>Consume</c> method handles each message.</param>
    public static void Subscribe<T>(BaseConsumer<T> consumer) where T : class
    {
        try
        {
            Bus.Subscribe<T>(string.Empty, consumer.Consume);
        }
        catch (Exception ex)
        {
            Log.Log.Error(ex, ex.Message);
        }
    }

    /// <summary>
    /// Subscribes a handler to messages of type <typeparamref name="T"/> with
    /// caller-supplied subscription configuration, using an empty subscription id.
    /// Any exception raised while subscribing is logged and swallowed.
    /// </summary>
    /// <typeparam name="T">Message type.</typeparam>
    /// <param name="action">Handler invoked for each received message.</param>
    /// <param name="configure">Callback that adjusts the subscription configuration.</param>
    public static void Subscribe<T>(Action<T> action, Action<ISubscriptionConfiguration> configure) where T : class
    {
        try
        {
            Bus.Subscribe<T>(string.Empty, action, configure);
        }
        catch (Exception ex)
        {
            Log.Log.Error(ex, ex.Message);
        }
    }

    /// <summary>
    /// Publishes a dead-letter message.
    /// </summary>
    /// <remarks>
    /// Declares the direct exchange <c>Dead_Message_Exchange</c> and a queue named
    /// <c>dead-{full type name of T}</c>, binds them with the queue name as routing
    /// key, then publishes with the third argument of the advanced publish call
    /// (the mandatory flag) set to <c>true</c>. The publish is not awaited;
    /// synchronous failures are caught and logged, and a faulted publish task is
    /// logged from a continuation.
    /// </remarks>
    /// <typeparam name="T">Type of the original message wrapped by the dead message.</typeparam>
    /// <param name="deadMessage">The dead-letter payload to publish.</param>
    public static void PublishDeadMessage<T>(DeadMessage<T> deadMessage) where T : class
    {
        try
        {
            string typeName = typeof(T).FullName;
            string queueName = $"dead-{typeName}";

            var exchange = Bus.Advanced.ExchangeDeclare("Dead_Message_Exchange", "direct");
            var queue = Bus.Advanced.QueueDeclare(queueName);
            Bus.Advanced.Bind(exchange, queue, queueName);

            // The queue name doubles as the routing key of the binding above.
            string routingKey = queueName;
            var msg = new Message<DeadMessage<T>>(deadMessage);

            // The surrounding catch only sees synchronous failures. Observe the task
            // so an asynchronous publish failure is logged instead of silently lost.
            Bus.Advanced
                .PublishAsync<DeadMessage<T>>(exchange, routingKey, true, msg)
                .ContinueWith(
                    t =>
                    {
                        var fault = t.Exception;
                        if (fault != null)
                        {
                            Exception error = fault.GetBaseException();
                            Log.Log.Error(error, error.Message);
                        }
                    },
                    TaskContinuationOptions.OnlyOnFaulted);
        }
        catch (Exception ex)
        {
            Log.Log.Error(ex, ex.Message);
        }
    }
}
}