Work.cs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. using Microsoft.Extensions.Hosting;
  2. using System;
  3. using System.Collections.Generic;
  4. using System.Linq;
  5. using System.Text;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. using XYY.TaskTrack.Standard;
  9. using XYY.gRpc.Tracking;
  10. using Grpc.Net.Client;
  11. using Microsoft.Extensions.Logging;
  12. using Grpc.Core;
  13. using System.Diagnostics;
  14. using GOrder = XYY.gRpc.Order;
  15. namespace XYY.WindowsService.ReshMQ.Works
  16. {
  17. public class Work : BackgroundService
  18. {
  19. ISeventeenGRPC SeventeenGRPC;
  20. TrackingGrpcChanel TrackingGrpcChanel;
  21. OrderGrpcChannel PushZipChannel;
  22. private readonly IMQManager _mQManager;
  23. private readonly ILogger<Work> _logger;
  24. IMoreThanOneTicketGrpc moreThanOneTicketGrpc;
  25. public Work(ISeventeenGRPC seventeenGRPC, IMQManager mQManager, TrackingGrpcChanel TrackingGrpcChanel, ILogger<Work> logger, OrderGrpcChannel pushZipChannel, IMoreThanOneTicketGrpc moreThanOneTicketGrpc)
  26. {
  27. SeventeenGRPC = seventeenGRPC;
  28. _mQManager = mQManager;
  29. this.TrackingGrpcChanel = TrackingGrpcChanel;
  30. _logger = logger;
  31. PushZipChannel = pushZipChannel;
  32. this.moreThanOneTicketGrpc = moreThanOneTicketGrpc;
  33. }
  34. protected override async Task ExecuteAsync(CancellationToken stoppingToken)
  35. {
  36. await LoopAutoUpdateServiceTrackingNumber(stoppingToken);
  37. await _mQManager.Subscribe(SeventeenGRPC);
  38. await LoopUpdate17(stoppingToken);
  39. await LoopPushZipCode(stoppingToken);
  40. await LoopAutoSupplementTracking(stoppingToken);
  41. await _mQManager.Subscribe(moreThanOneTicketGrpc);
  42. }
  43. protected async Task LoopPushZipCode(CancellationToken stoppingToken)
  44. {
  45. GOrder.ZipCodePush.ZipCodePushClient client = new GOrder.ZipCodePush.ZipCodePushClient(PushZipChannel.Channel);
  46. TaskFactory taskFactory = new TaskFactory();
  47. await taskFactory.StartNew(async () =>
  48. {
  49. while (!stoppingToken.IsCancellationRequested)
  50. {
  51. try
  52. {
  53. var result = await client.GetWaitPushAsync(new GOrder.Empty());
  54. if (result.Items.Count > 0)
  55. {
  56. var pushResult = await client.PushAsync(result);
  57. if (!pushResult.Success)
  58. {
  59. _logger.LogError("邮编分区上传数据失败,休眠1分钟{message}", pushResult.Message);
  60. System.Threading.Thread.Sleep(new TimeSpan(0, 1, 0));
  61. }
  62. }
  63. else
  64. {
  65. //没有需要上传的数据我们等待5分钟再次上传
  66. _logger.LogError("邮编分区暂无需上传数据,休眠5分钟");
  67. System.Threading.Thread.Sleep(new TimeSpan(0, 5, 0));
  68. }
  69. }
  70. catch (Exception ex)
  71. {
  72. _logger.LogError("邮编分区执行失败" + ex.Message);
  73. System.Threading.Thread.Sleep(new TimeSpan(1, 0, 0));
  74. }
  75. }
  76. });
  77. }
  78. private class trackingResult
  79. {
  80. public bool success { get; set; }
  81. public string message { get; set; }
  82. public bool data { get; set; }
  83. }
  84. private async Task LoopAutoUpdateServiceTrackingNumber(CancellationToken stoppingToken)
  85. {
  86. string url = "http://47.244.232.78:9518/api/order/AsyncServiceTrackingNumber?qty=10";
  87. System.Net.WebClient client = new System.Net.WebClient();
  88. client.Headers.Add("Authorization", "token 132A7468DE079C6CEB59F383A661E612");
  89. TaskFactory taskFactory = new TaskFactory();
  90. await taskFactory.StartNew(async () =>
  91. {
  92. while (!stoppingToken.IsCancellationRequested)
  93. {
  94. try
  95. {
  96. Console.WriteLine("开始执行-获取服务商转单号");
  97. string result = await client.DownloadStringTaskAsync(url);
  98. Console.WriteLine(result);
  99. var obj = Newtonsoft.Json.JsonConvert.DeserializeObject<trackingResult>(result);
  100. if (obj.success)
  101. {
  102. Console.WriteLine("执行完成-获取服务商转单号");
  103. if (obj.data)
  104. {
  105. //没有可执行的内容
  106. _logger.LogError("执行完成-获取服务商转单号,没有可执行的运单号,系统暂停30分钟后再次获取");
  107. System.Threading.Thread.Sleep(1000 * 60 * 30);
  108. }
  109. else
  110. {
  111. //还有未执行的内容
  112. _logger.LogError("执行完成-获取服务商转单号,还有待执行的运单号,系统3S后再次获取");
  113. System.Threading.Thread.Sleep(1000 * 3);
  114. }
  115. }
  116. else
  117. {
  118. _logger.LogError("执行失败(api)-获取服务商转单号" + obj.message + " 系统暂停5分钟后重试");
  119. System.Threading.Thread.Sleep(1000 * 60 * 5);
  120. }
  121. }
  122. catch (System.Net.WebException web)
  123. {
  124. var rs = web.Response != null ? new System.IO.StreamReader(web.Response.GetResponseStream()) : null;
  125. _logger.LogError($"执行失败-获取服务商转单号 {web.Message},{rs?.ReadToEnd()} 系统暂停5分钟后重试");
  126. System.Threading.Thread.Sleep(1000 * 5);
  127. }
  128. catch (Exception ex)
  129. {
  130. _logger.LogError("执行失败-获取服务商转单号" + ex.Message + " 系统暂停5分钟后重试");
  131. System.Threading.Thread.Sleep(1000 * 60 * 5);
  132. }
  133. }
  134. });
  135. }
  136. protected async Task LoopAutoSupplementTracking(CancellationToken stoppingToken)
  137. {
  138. Db.DbClient dbClient = new Db.DbClient(TrackingGrpcChanel.Channel);
  139. TaskFactory taskFactory = new TaskFactory();
  140. await taskFactory.StartNew(async () =>
  141. {
  142. while (!stoppingToken.IsCancellationRequested)
  143. {
  144. try
  145. {
  146. //获取待上传的数据
  147. var result = await dbClient.GetWaitSupplementTrackingAsync(new Google.Protobuf.WellKnownTypes.Empty());
  148. if (result.List.Count == 0)
  149. {
  150. //没有需要上传的数据我们等待5分钟再次上传
  151. _logger.LogError("轨迹补充-暂无需上传数据,休眠30分钟");
  152. Thread.Sleep(new TimeSpan(0, 30, 0));
  153. }
  154. else
  155. {
  156. Console.WriteLine("开始补充");
  157. var trajectoryResult = await dbClient.AddTrackingTrajectoryAsync(result);
  158. await dbClient.AddTrackingTrajectoryErrorsAsync(trajectoryResult);
  159. //休眠1S,防止想不到的错误
  160. Thread.Sleep(new TimeSpan(0, 0, 1));
  161. }
  162. }
  163. catch (Exception ex)
  164. {
  165. _logger.LogError("轨迹补充-执行失败" + ex.Message);
  166. Thread.Sleep(new TimeSpan(1, 0, 0));
  167. }
  168. }
  169. }
  170. );
  171. }
  172. protected async Task LoopUpdate17(CancellationToken stoppingToken)
  173. {
  174. SeventeenPush.SeventeenPushClient client = new SeventeenPush.SeventeenPushClient(TrackingGrpcChanel.Channel);
  175. Db.DbClient dbClient = new Db.DbClient(TrackingGrpcChanel.Channel);
  176. TaskFactory taskFactory = new TaskFactory();
  177. await taskFactory.StartNew(async () =>
  178. {
  179. while (!stoppingToken.IsCancellationRequested)
  180. {
  181. try
  182. {
  183. var result = await client.GetWaitPushDataAsync(new Empty());
  184. if (result.Success)
  185. {
  186. if (result.Items.Count == 0)
  187. {
  188. //没有需要上传的数据我们等待5分钟再次上传
  189. _logger.LogError("17Tracking-暂无需上传数据,休眠5分钟");
  190. Thread.Sleep(new TimeSpan(0, 5, 0));
  191. }
  192. else
  193. {
  194. var stopwatch = new Stopwatch();
  195. stopwatch.Start();
  196. var pushResult = await client.PostPushDataAsync(result);
  197. stopwatch.Stop();
  198. foreach (var item in pushResult.Items.Where(x => !x.IsSuccess))
  199. {
  200. _logger.LogWarning("17Tracking-上传数据失败:{trackingNumber} :{message}", item.TrackingNumber, item.FailMessage);
  201. }
  202. var dbAsync = await dbClient.AsyncDbPushStatusAsync(pushResult);
  203. if (!dbAsync.IsSuccess)
  204. {
  205. _logger.LogError("17Tracking-上传数据失败,休眠1分钟{message}", dbAsync.Message);
  206. Thread.Sleep(new TimeSpan(0, 1, 0));
  207. }
  208. else
  209. {
  210. //333毫秒一个请求
  211. int cz = 400 - (int)Math.Floor(stopwatch.Elapsed.TotalMilliseconds);
  212. if (cz > 0)
  213. System.Threading.Thread.Sleep(new TimeSpan(0, 0, 0, 0, cz));
  214. }
  215. }
  216. }
  217. else
  218. {
  219. //错误时我们等待10分钟再执行
  220. _logger.LogError("17Tracking-查找待执行的数据失败{message},休眠10分钟保护", result.Message);
  221. System.Threading.Thread.Sleep(new TimeSpan(0, 10, 0));
  222. }
  223. }
  224. catch (Exception ex)
  225. {
  226. _logger.LogError("17Tracking-执行失败" + ex.Message);
  227. System.Threading.Thread.Sleep(new TimeSpan(1, 0, 0));
  228. }
  229. }
  230. });
  231. }
  232. }
  233. }