using System;
using System.Threading.Tasks;
using EasyNetQ;
using EasyNetQ.FluentConfiguration;
using SMP.Message.Consumer.Config;

namespace SMP.Message.Consumer.MQConsumers
{
    /// <summary>
    /// Static facade over a single, lazily created EasyNetQ <see cref="IBus"/>.
    /// The bus is built from <c>RabbitMqConfig.MqConnection</c> the first time any member is used.
    /// </summary>
    /// <remarks>
    /// Error policy differs per member: <see cref="Publish{T}(T)"/>, both <c>Subscribe</c> overloads and
    /// <see cref="PublishDeadMessage{T}"/> log exceptions via <c>Log.Log.Error</c> and swallow them, while
    /// <see cref="PublishAsync{T}"/> and <see cref="Publish{T}(T, Action{IPublishConfiguration})"/>
    /// let exceptions reach the caller.
    /// </remarks>
    public class MQManager
    {
        private const string DeadMessageExchangeName = "Dead_Message_Exchange";
        private const string DeadMessageExchangeType = "direct";

        // volatile: the field is read outside the lock by the double-checked initialization below.
        private static volatile IBus _bus;
        private static readonly object syncRoot = new object();

        private MQManager()
        {
        }

        /// <summary>
        /// Gets the shared bus, creating it on first access.
        /// </summary>
        private static IBus Bus
        {
            get
            {
                if (_bus == null)
                {
                    lock (syncRoot)
                    {
                        if (_bus == null)
                        {
                            string connStr = RabbitMqConfig.MqConnection;
                            _bus = RabbitHutch.CreateBus(connStr);
                        }
                    }
                }

                return _bus;
            }
        }

        /// <summary>
        /// Publishes a single message.
        /// </summary>
        /// <typeparam name="T">Message type.</typeparam>
        /// <param name="message">The message to publish.</param>
        /// <remarks>Any exception is logged and swallowed; the caller is not notified of failure.</remarks>
        public static void Publish<T>(T message) where T : class
        {
            try
            {
                Bus.Publish<T>(message);
            }
            catch (Exception ex)
            {
                Log.Log.Error(ex, ex.Message);
            }
        }

        /// <summary>
        /// Publishes a message asynchronously.
        /// </summary>
        /// <typeparam name="T">Message type.</typeparam>
        /// <param name="message">The message to publish.</param>
        /// <returns>The task returned by the bus; failures are not caught or logged here.</returns>
        public static Task PublishAsync<T>(T message) where T : class
        {
            return Bus.PublishAsync<T>(message);
        }

        /// <summary>
        /// Publishes a message using the supplied publish configuration.
        /// </summary>
        /// <typeparam name="T">Message type.</typeparam>
        /// <param name="message">The message to publish.</param>
        /// <param name="configure">Callback that configures the publish operation.</param>
        /// <remarks>Exceptions are not caught here and propagate to the caller.</remarks>
        public static void Publish<T>(T message, Action<IPublishConfiguration> configure) where T : class
        {
            Bus.Publish<T>(message, configure);
        }

        /// <summary>
        /// Subscribes a consumer to messages of type <typeparamref name="T"/> using an empty subscription id.
        /// </summary>
        /// <typeparam name="T">Message type.</typeparam>
        /// <param name="consumer">Consumer whose <c>Consume</c> method handles each message.</param>
        /// <remarks>Any exception raised while subscribing is logged and swallowed.</remarks>
        public static void Subscribe<T>(BaseConsumer<T> consumer) where T : class
        {
            try
            {
                Bus.Subscribe<T>(string.Empty, consumer.Consume);
            }
            catch (Exception ex)
            {
                Log.Log.Error(ex, ex.Message);
            }
        }

        /// <summary>
        /// Subscribes a handler to messages of type <typeparamref name="T"/> with the supplied
        /// subscription configuration, using an empty subscription id.
        /// </summary>
        /// <typeparam name="T">Message type.</typeparam>
        /// <param name="action">Handler invoked for each message.</param>
        /// <param name="configure">Callback that configures the subscription.</param>
        /// <remarks>Any exception raised while subscribing is logged and swallowed.</remarks>
        public static void Subscribe<T>(Action<T> action, Action<ISubscriptionConfiguration> configure) where T : class
        {
            try
            {
                Bus.Subscribe<T>(string.Empty, action, configure);
            }
            catch (Exception ex)
            {
                Log.Log.Error(ex, ex.Message);
            }
        }

        /// <summary>
        /// Publishes a dead-letter message to the "Dead_Message_Exchange" direct exchange.
        /// The queue name and routing key are both <c>dead-{full name of T}</c>; the exchange, queue and
        /// binding are declared on every call before publishing.
        /// </summary>
        /// <typeparam name="T">Type of the original message carried by the dead message.</typeparam>
        /// <param name="deadMessage">The dead message to publish.</param>
        /// <remarks>
        /// The publish is started but not awaited. Synchronous failures are caught below; asynchronous
        /// failures are logged by a continuation on the publish task. Neither reaches the caller.
        /// </remarks>
        public static void PublishDeadMessage<T>(DeadMessage<T> deadMessage) where T : class
        {
            try
            {
                string queueName = $"dead-{typeof(T).FullName}";

                var exchange = Bus.Advanced.ExchangeDeclare(DeadMessageExchangeName, DeadMessageExchangeType);
                var queue = Bus.Advanced.QueueDeclare(queueName);
                Bus.Advanced.Bind(exchange, queue, queueName);

                var envelope = new Message<DeadMessage<T>>(deadMessage);

                // The surrounding try/catch cannot observe a fault of the returned task,
                // so log it from a faulted-only continuation instead of dropping it.
                Bus.Advanced
                    .PublishAsync<DeadMessage<T>>(exchange, queueName, true, envelope)
                    .ContinueWith(
                        publishTask =>
                        {
                            Exception error = publishTask.Exception.GetBaseException();
                            Log.Log.Error(error, error.Message);
                        },
                        TaskContinuationOptions.OnlyOnFaulted);
            }
            catch (Exception ex)
            {
                Log.Log.Error(ex, ex.Message);
            }
        }
    }
}