// MQManager.cs (2.3 KB)

  1. using EasyNetQ;
  2. using Microsoft.Extensions.Logging;
  3. using System;
  4. using System.Collections.Generic;
  5. using System.Text;
  6. using System.Threading.Tasks;
  7. namespace XYY.TaskTrack.Standard
  8. {
  /// <summary>
  /// Wraps an EasyNetQ <see cref="IBus"/> to publish messages, register consumers,
  /// and publish dead-letter messages, logging any failure through the injected logger.
  /// </summary>
  public class MQManager : IMQManager
  {
      // Both dependencies are assigned once in the constructor and never replaced.
      private readonly IBus _bus;
      private readonly ILogger<MQManager> _log;

      /// <summary>
      /// Creates a manager that publishes and subscribes through <paramref name="bus"/>.
      /// </summary>
      /// <param name="bus">The EasyNetQ bus used for all broker operations.</param>
      /// <param name="log">Logger that receives every exception caught by this class.</param>
      public MQManager(IBus bus, ILogger<MQManager> log)
      {
          _bus = bus;
          _log = log;
      }

      /// <summary>
      /// Publishes a single message.
      /// </summary>
      /// <typeparam name="T">The message type.</typeparam>
      /// <param name="message">The message to publish.</param>
      /// <param name="configure">
      /// Optional callback that customises the publish configuration. When omitted,
      /// a no-op callback is passed so the configuration is left untouched.
      /// </param>
      /// <returns>A task that completes when the asynchronous publish call completes.</returns>
      /// <remarks>Any exception is logged and then rethrown to the caller.</remarks>
      public async Task Publish<T>(T message, Action<IPublishConfiguration> configure = null) where T : class
      {
          try
          {
              // Always take the asynchronous path: calling the blocking Publish from
              // inside an async method ties up the calling thread for the whole call.
              await _bus.PubSub.PublishAsync(message, configure ?? (_ => { }));
          }
          catch (Exception ex)
          {
              // Pass the exception object so the stack trace reaches the log sink.
              _log.LogError(ex, ex.Message);

              // Bare rethrow keeps the original stack trace; "throw ex;" would reset it.
              throw;
          }
      }

      /// <summary>
      /// Subscribes <paramref name="consumer"/> to messages of type <typeparamref name="T"/>,
      /// using an empty subscription id.
      /// </summary>
      /// <typeparam name="T">The message type to consume.</typeparam>
      /// <param name="consumer">The consumer whose <c>Consume</c> method handles each message.</param>
      /// <returns>A task that completes once the subscription attempt has finished.</returns>
      /// <remarks>
      /// Failures are logged and NOT rethrown, so the returned task completes successfully
      /// even when the subscription could not be created.
      /// </remarks>
      public async Task Subscribe<T>(IBaseConsumer<T> consumer) where T : class
      {
          try
          {
              await _bus.PubSub.SubscribeAsync<T>(string.Empty, consumer.Consume);
          }
          catch (Exception ex)
          {
              _log.LogError(ex, ex.Message);
          }
      }

      /// <summary>
      /// Publishes a dead-letter message.
      /// </summary>
      /// <typeparam name="T">The type of the original message wrapped by the dead letter.</typeparam>
      /// <param name="deadMessage">The dead-letter envelope to publish.</param>
      /// <returns>A task that completes once the publish attempt has finished.</returns>
      /// <remarks>
      /// Declares the topic exchange <c>Dead</c> and the queue <c>dead-{FullName of T}</c>,
      /// binds them with the pattern <c>dead.#</c>, then publishes with the routing key
      /// <c>dead.{FullName of T}</c>. Failures are logged and NOT rethrown.
      /// </remarks>
      public async Task PublishDeadMessage<T>(DeadMessage<T> deadMessage) where T : class
      {
          try
          {
              string typeName = typeof(T).FullName;
              string queueName = $"dead-{typeName}";
              string routingKey = $"dead.{typeName}";

              var exchange = _bus.Advanced.ExchangeDeclare("Dead", "topic");
              var queue = _bus.Advanced.QueueDeclare(queueName);

              // NOTE(review): "dead.#" matches every routing key that starts with "dead.",
              // so this per-type queue would also receive dead letters of other types
              // published to the same exchange - confirm this is intended.
              _bus.Advanced.Bind(exchange, queue, "dead.#");

              var envelope = new Message<DeadMessage<T>>(deadMessage);

              // The literal true is presumably EasyNetQ's "mandatory" publish flag - verify
              // against the installed EasyNetQ version.
              await _bus.Advanced.PublishAsync<DeadMessage<T>>(exchange, routingKey, true, envelope);
          }
          catch (Exception ex)
          {
              _log.LogError(ex, ex.Message);
          }
      }
  }
  73. }