BaseConsumer.cs 1.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041
  1. using System;
  2. using System.Threading.Tasks;
  3. namespace XYY.TaskTrack.Standard
  4. {
  5. /// <summary>
  6. /// 消费者基类,主要是用于统一消费者出现异常时,能通过发送死信队列的方式来捕获及善后处理,而不是因
  7. /// 消费者出现异常时没有ACK导致消息被持久化并重复消费。
  8. /// </summary>
  9. /// <typeparam name="T"></typeparam>
  10. public abstract class BaseConsumer<T> : IBaseConsumer<T> where T : class
  11. {
  12. public IMQManager _mQManager;
  13. public BaseConsumer(IMQManager mQManager)
  14. {
  15. mQManager = _mQManager;
  16. }
  17. /// <summary>
  18. /// 处理消费者出现的异常。当出现异常时,通过该方法发送一条死信消息供后续处理。
  19. /// </summary>
  20. /// <param name="ex"></param>
  21. /// <param name="message"></param>
  22. protected void PublishDeadMessage(Exception ex, T message)
  23. {
  24. DeadMessage<T> dealMessage = new DeadMessage<T>
  25. {
  26. Exception = ex,
  27. Message = message
  28. };
  29. _mQManager.PublishDeadMessage(dealMessage);
  30. }
  31. /// <summary>
  32. /// 消费消息
  33. /// </summary>
  34. /// <param name="message"></param>
  35. public abstract Task Consume(T message);
  36. }
  37. }