using EasyNetQ;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace XYY.TaskTrack.Standard
{
    /// <summary>
    /// Thin wrapper around an EasyNetQ <see cref="IBus"/> that publishes messages,
    /// registers consumers and forwards dead messages to a dedicated exchange.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the generic parameter lists in this file were reconstructed from
    /// how the type parameter <c>T</c> is used; confirm they match <see cref="IMQManager"/>.
    /// </remarks>
    public class MQManager : IMQManager
    {
        private readonly IBus _bus;
        private readonly ILogger _log;

        /// <summary>
        /// Creates a manager that publishes through <paramref name="bus"/> and reports
        /// failures to <paramref name="log"/>.
        /// </summary>
        /// <param name="bus">The EasyNetQ bus used for every operation.</param>
        /// <param name="log">Logger that receives any exception raised by the bus.</param>
        public MQManager(IBus bus, ILogger log)
        {
            _bus = bus;
            _log = log;
        }

        /// <summary>
        /// Sends a message.
        /// </summary>
        /// <typeparam name="T">Message type.</typeparam>
        /// <param name="message">The message to publish.</param>
        /// <param name="configure">
        /// Optional publish configuration callback; when <c>null</c> the message is
        /// published without extra configuration.
        /// </param>
        /// <returns>A task that completes when the publish call has completed.</returns>
        /// <exception cref="Exception">
        /// Any exception raised while publishing is logged and then rethrown.
        /// </exception>
        public async Task Publish<T>(T message, Action<IPublishConfiguration> configure = null) where T : class
        {
            try
            {
                if (configure != null)
                {
                    await _bus.PubSub.PublishAsync(message, configure);
                }
                else
                {
                    // Await the async overload instead of blocking the calling thread
                    // with the synchronous Publish inside an async method.
                    await _bus.PubSub.PublishAsync(message);
                }
            }
            catch (Exception ex)
            {
                // Pass the exception object so the log keeps the stack trace.
                _log.LogError(ex, ex.Message);

                // "throw;" preserves the original stack trace; "throw ex;" would reset it.
                throw;
            }
        }

        /// <summary>
        /// Subscribes <paramref name="consumer"/> to messages of type <typeparamref name="T"/>,
        /// using an empty subscription id.
        /// </summary>
        /// <typeparam name="T">Message type.</typeparam>
        /// <param name="consumer">Consumer whose <c>Consume</c> method handles each message.</param>
        /// <returns>A task that completes once the subscribe call has completed.</returns>
        /// <remarks>Failures are logged and not rethrown.</remarks>
        public async Task Subscribe<T>(IBaseConsumer<T> consumer) where T : class
        {
            try
            {
                await _bus.PubSub.SubscribeAsync<T>(string.Empty, consumer.Consume);
            }
            catch (Exception ex)
            {
                // Best effort: the failure is only logged, the caller is not notified.
                _log.LogError(ex, ex.Message);
            }
        }

        /// <summary>
        /// Sends a dead message.
        /// </summary>
        /// <typeparam name="T">Type of the message that the dead message wraps.</typeparam>
        /// <param name="deadMessage">The dead message to publish.</param>
        /// <returns>A task that completes once the publish call has completed.</returns>
        /// <remarks>
        /// Declares the topic exchange <c>Dead</c> and a queue named <c>dead-{full type name}</c>,
        /// binds them and publishes with routing key <c>dead.{full type name}</c>.
        /// Failures are logged and not rethrown.
        /// </remarks>
        public async Task PublishDeadMessage<T>(DeadMessage<T> deadMessage) where T : class
        {
            try
            {
                string typeName = typeof(T).FullName;
                string queueName = $"dead-{typeName}";
                string routingKey = $"dead.{typeName}";

                var exchange = _bus.Advanced.ExchangeDeclare("Dead", "topic");
                var queue = _bus.Advanced.QueueDeclare(queueName);

                // NOTE(review): the "dead.#" pattern matches every "dead.*" routing key, so each
                // per-type queue receives the dead messages of all types - confirm this is intended.
                _bus.Advanced.Bind(exchange, queue, "dead.#");

                var msg = new Message<DeadMessage<T>>(deadMessage);

                // NOTE(review): the third argument is presumably the "mandatory" flag - confirm.
                await _bus.Advanced.PublishAsync<DeadMessage<T>>(exchange, routingKey, true, msg);
            }
            catch (Exception ex)
            {
                // Best effort: the failure is only logged, the caller is not notified.
                _log.LogError(ex, ex.Message);
            }
        }
    }
}