123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778 |
- using EasyNetQ;
- using Microsoft.Extensions.Logging;
- using System;
- using System.Collections.Generic;
- using System.Text;
- using System.Threading.Tasks;
- namespace XYY.TaskTrack.Standard
- {
/// <summary>
/// Thin wrapper around an EasyNetQ <see cref="IBus"/> that publishes messages,
/// registers consumers, and routes dead messages to a dedicated topic exchange.
/// </summary>
public class MQManager : IMQManager
{
    // Both dependencies are assigned once in the constructor and never replaced.
    private readonly IBus _bus;
    private readonly ILogger<MQManager> _log;

    /// <summary>
    /// Creates a manager that uses the given bus and logger.
    /// </summary>
    /// <param name="bus">The EasyNetQ bus used for every publish/subscribe call.</param>
    /// <param name="log">Logger that receives any exception raised by the bus.</param>
    public MQManager(IBus bus, ILogger<MQManager> log)
    {
        _bus = bus;
        _log = log;
    }

    /// <summary>
    /// Publishes a single message.
    /// </summary>
    /// <typeparam name="T">The message type.</typeparam>
    /// <param name="message">The message to publish.</param>
    /// <param name="configure">
    /// Optional callback that customises the publish configuration.
    /// When <c>null</c>, a no-op configurator is used.
    /// </param>
    /// <returns>A task that completes when the asynchronous publish call completes.</returns>
    /// <remarks>Any exception is logged and then rethrown to the caller.</remarks>
    public async Task Publish<T>(T message, Action<IPublishConfiguration> configure = null) where T : class
    {
        try
        {
            // Always take the asynchronous path: the previous fallback called the
            // blocking Publish(...) from inside this async method.
            await _bus.PubSub.PublishAsync(message, configure ?? (_ => { }));
        }
        catch (Exception ex)
        {
            // Constant template so the exception text is never parsed as a
            // logging template; the exception object itself is attached too.
            _log.LogError(ex, "{Message}", ex.Message);

            // "throw;" keeps the original stack trace ("throw ex;" would reset it).
            throw;
        }
    }

    /// <summary>
    /// Subscribes <paramref name="consumer"/> to messages of type <typeparamref name="T"/>,
    /// using an empty subscription id and <c>consumer.Consume</c> as the handler.
    /// </summary>
    /// <typeparam name="T">The message type to subscribe to.</typeparam>
    /// <param name="consumer">The consumer whose <c>Consume</c> method handles each message.</param>
    /// <returns>A task that completes when the subscribe call completes.</returns>
    /// <remarks>
    /// Exceptions are logged and NOT rethrown, so the returned task completes
    /// normally even when subscribing failed. The value returned by
    /// <c>SubscribeAsync</c> is discarded.
    /// </remarks>
    public async Task Subscribe<T>(IBaseConsumer<T> consumer) where T : class
    {
        try
        {
            await _bus.PubSub.SubscribeAsync<T>(string.Empty, consumer.Consume);
        }
        catch (Exception ex)
        {
            _log.LogError(ex, "{Message}", ex.Message);
        }
    }

    /// <summary>
    /// Publishes a dead message to the "Dead" topic exchange.
    /// </summary>
    /// <typeparam name="T">The payload type of the dead message.</typeparam>
    /// <param name="deadMessage">The dead message to publish.</param>
    /// <returns>A task that completes when the publish call completes.</returns>
    /// <remarks>
    /// Before publishing, this declares the "Dead" exchange (type "topic") and a
    /// queue named <c>dead-{full type name of T}</c>, and binds them with the
    /// routing pattern "dead.#". The message is then published with the routing
    /// key <c>dead.{full type name of T}</c>.
    /// Exceptions are logged and NOT rethrown.
    /// </remarks>
    public async Task PublishDeadMessage<T>(DeadMessage<T> deadMessage) where T : class
    {
        try
        {
            string typeName = typeof(T).FullName;
            string queueName = $"dead-{typeName}";
            string routingKey = $"dead.{typeName}";

            var exchange = _bus.Advanced.ExchangeDeclare("Dead", "topic");
            var queue = _bus.Advanced.QueueDeclare(queueName);

            // NOTE(review): "dead.#" matches every "dead.*" routing key, so each
            // per-type queue appears to receive dead messages of all types, not
            // only its own. Confirm this is intended before narrowing the binding.
            _bus.Advanced.Bind(exchange, queue, "dead.#");

            var envelope = new Message<DeadMessage<T>>(deadMessage);

            // The third argument is presumably EasyNetQ's "mandatory" flag - TODO confirm.
            await _bus.Advanced.PublishAsync<DeadMessage<T>>(exchange, routingKey, true, envelope);
        }
        catch (Exception ex)
        {
            _log.LogError(ex, "{Message}", ex.Message);
        }
    }
}
- }
|