MQManager.cs 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. using EasyNetQ;
  2. using EasyNetQ.FluentConfiguration;
  3. using SMP.Message.Consumer.Config;
  4. namespace SMP.Message.Consumer.MQConsumers
  5. {
  6. public class MQManager
  7. {
  8. private static IBus _bus;
  9. private readonly static object syncRoot = new object();
  10. private MQManager()
  11. {
  12. }
  13. private static IBus bus
  14. {
  15. get
  16. {
  17. if (_bus == null)
  18. {
  19. lock (syncRoot)
  20. {
  21. if (_bus == null)
  22. {
  23. string connStr = RabbitMqConfig.MqConnection;
  24. _bus = RabbitHutch.CreateBus(connStr);
  25. }
  26. }
  27. }
  28. return _bus;
  29. }
  30. }
  31. /// <summary>
  32. /// 发送一条消息
  33. /// </summary>
  34. /// <typeparam name="T"></typeparam>
  35. /// <param name="message"></param>
  36. public static void Publish<T>(T message) where T : class
  37. {
  38. try
  39. {
  40. bus.Publish(message);
  41. }
  42. catch (Exception ex)
  43. {
  44. Log.Log.Error(ex, ex.Message);
  45. }
  46. }
  47. /// <summary>
  48. /// 异步发送消息
  49. /// </summary>
  50. /// <typeparam name="T"></typeparam>
  51. /// <param name="message"></param>
  52. /// <returns></returns>
  53. public static Task PublishAsync<T>(T message) where T : class
  54. {
  55. return bus.PublishAsync(message);
  56. }
  57. /// <summary>
  58. /// 指定配置项发送消息
  59. /// </summary>
  60. /// <typeparam name="T"></typeparam>
  61. /// <param name="message"></param>
  62. /// <param name="configure"></param>
  63. public static void Publish<T>(T message, Action<IPublishConfiguration> configure) where T : class
  64. {
  65. bus.Publish(message, configure);
  66. }
  67. /// <summary>
  68. /// 订阅消息
  69. /// </summary>
  70. /// <typeparam name="T"></typeparam>
  71. /// <param name="consumer"></param>
  72. public static void Subscribe<T>(BaseConsumer<T> consumer) where T : class
  73. {
  74. try
  75. {
  76. bus.Subscribe<T>(string.Empty, consumer.Consume);
  77. }
  78. catch (Exception ex)
  79. {
  80. Log.Log.Error(ex, ex.Message);
  81. }
  82. }
  83. /// <summary>
  84. /// 指定配置订阅消息
  85. /// </summary>
  86. /// <typeparam name="T"></typeparam>
  87. /// <param name="action"></param>
  88. /// <param name="configure"></param>
  89. public static void Subscribe<T>(Action<T> action, Action<ISubscriptionConfiguration> configure) where T : class
  90. {
  91. try
  92. {
  93. bus.Subscribe<T>(string.Empty, action, configure);
  94. }
  95. catch (Exception ex)
  96. {
  97. Log.Log.Error(ex, ex.Message);
  98. }
  99. }
  100. /// <summary>
  101. /// 发送一条死信
  102. /// </summary>
  103. /// <typeparam name="T"></typeparam>
  104. /// <param name="deadMessage"></param>
  105. public static void PublishDeadMessage<T>(DeadMessage<T> deadMessage) where T : class
  106. {
  107. try
  108. {
  109. string typeName = typeof(T).FullName;
  110. string queueName = $"dead-{typeName}";
  111. var x = bus.Advanced.ExchangeDeclare("Dead_Message_Exchange", "direct");
  112. var q = bus.Advanced.QueueDeclare(queueName);
  113. bus.Advanced.Bind(x, q, queueName);
  114. string routingKey = queueName;
  115. var msg = new Message<DeadMessage<T>>(deadMessage);
  116. bus.Advanced.PublishAsync<DeadMessage<T>>(x, routingKey, true, msg);
  117. }
  118. catch (Exception ex)
  119. {
  120. Log.Log.Error(ex, ex.Message);
  121. }
  122. }
  123. }
  124. }