using Microsoft.Extensions.Hosting;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using XYY.TaskTrack.Standard;
using XYY.gRpc.Tracking;
using Grpc.Net.Client;
using Microsoft.Extensions.Logging;
using Grpc.Core;
using System.Diagnostics;
using GOrder = XYY.gRpc.Order;

namespace XYY.WindowsService.ReshMQ.Works
{
    /// <summary>
    /// Hosted background worker that subscribes two gRPC handlers to the MQ manager and runs
    /// four polling loops (service tracking numbers, 17Tracking push, zip-code push and
    /// tracking supplement) until the host requests shutdown.
    /// </summary>
    public class Work : BackgroundService
    {
        // NOTE(review): the endpoint and the credential below are hard-coded in source.
        // They should be moved to configuration / a secret store; values are kept unchanged here.
        private const string ServiceTrackingNumberUrl = "http://47.244.232.78:9518/api/order/AsyncServiceTrackingNumber?qty=10";
        private const string ServiceTrackingNumberAuthorization = "token 132A7468DE079C6CEB59F383A661E612";

        // Minimum spacing between two successful 17Tracking pushes, in milliseconds.
        // The original note mentions "333 ms per request"; the code has always paced at 400 ms.
        private const int SeventeenPushIntervalMilliseconds = 400;

        private readonly ISeventeenGRPC _seventeenGrpc;
        private readonly TrackingGrpcChanel _trackingChannel;
        private readonly OrderGrpcChannel _pushZipChannel;
        private readonly IMQManager _mQManager;
        private readonly ILogger _logger;
        private readonly IMoreThanOneTicketGrpc _moreThanOneTicketGrpc;

        /// <summary>Stores the injected collaborators; performs no work.</summary>
        public Work(ISeventeenGRPC seventeenGRPC, IMQManager mQManager, TrackingGrpcChanel TrackingGrpcChanel, ILogger logger, OrderGrpcChannel pushZipChannel, IMoreThanOneTicketGrpc moreThanOneTicketGrpc)
        {
            _seventeenGrpc = seventeenGRPC;
            _mQManager = mQManager;
            _trackingChannel = TrackingGrpcChanel;
            _logger = logger;
            _pushZipChannel = pushZipChannel;
            _moreThanOneTicketGrpc = moreThanOneTicketGrpc;
        }

        /// <summary>
        /// Starts the polling loops and MQ subscriptions in the same order as before, then
        /// stays alive until every loop has stopped.
        /// </summary>
        /// <param name="stoppingToken">Signalled by the host when the service must stop.</param>
        /// <remarks>
        /// Each Loop* call returns a task that represents the whole loop, so the loops are
        /// started without being awaited one by one (awaiting the first would never let the
        /// others start) and are awaited together at the end.
        /// </remarks>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            var loops = new List<Task>();

            loops.Add(LoopAutoUpdateServiceTrackingNumber(stoppingToken));
            await _mQManager.Subscribe(_seventeenGrpc);

            loops.Add(LoopUpdate17(stoppingToken));
            loops.Add(LoopPushZipCode(stoppingToken));
            loops.Add(LoopAutoSupplementTracking(stoppingToken));
            await _mQManager.Subscribe(_moreThanOneTicketGrpc);

            await Task.WhenAll(loops);
        }

        /// <summary>
        /// Runs <paramref name="iteration"/> repeatedly on the thread pool until cancellation,
        /// pausing between passes for the duration each pass returns.
        /// </summary>
        /// <param name="iteration">One pass of the loop; returns how long to wait before the next pass
        /// (zero or negative means "continue immediately"). It is expected to handle its own exceptions.</param>
        /// <param name="stoppingToken">Ends the loop; also interrupts a pause in progress.</param>
        /// <returns>A task that completes when the loop has exited.</returns>
        private static Task RunLoopAsync(Func<Task<TimeSpan>> iteration, CancellationToken stoppingToken)
        {
            // Task.Run unwraps the async delegate, so the returned task covers the whole loop
            // (TaskFactory.StartNew would hand back a Task<Task> that completes at the first await).
            return Task.Run(async () =>
            {
                while (!stoppingToken.IsCancellationRequested)
                {
                    TimeSpan pause = await iteration();
                    if (pause <= TimeSpan.Zero)
                    {
                        continue;
                    }

                    try
                    {
                        // Non-blocking and cancellable, unlike Thread.Sleep.
                        await Task.Delay(pause, stoppingToken);
                    }
                    catch (OperationCanceledException)
                    {
                        break;
                    }
                }
            });
        }

        /// <summary>
        /// Polls the zip-code push service: fetches pending items and pushes them, until cancelled.
        /// </summary>
        /// <param name="stoppingToken">Ends the loop.</param>
        /// <returns>A task that completes when the loop has exited.</returns>
        protected async Task LoopPushZipCode(CancellationToken stoppingToken)
        {
            var client = new GOrder.ZipCodePush.ZipCodePushClient(_pushZipChannel.Channel);
            await RunLoopAsync(() => PushZipCodeOnceAsync(client), stoppingToken);
        }

        /// <summary>One zip-code push pass; returns the pause before the next pass.</summary>
        private async Task<TimeSpan> PushZipCodeOnceAsync(GOrder.ZipCodePush.ZipCodePushClient client)
        {
            try
            {
                var pending = await client.GetWaitPushAsync(new GOrder.Empty());
                if (pending.Items.Count == 0)
                {
                    // Nothing to upload: wait 5 minutes before asking again.
                    _logger.LogError("邮编分区暂无需上传数据,休眠5分钟");
                    return TimeSpan.FromMinutes(5);
                }

                var pushResult = await client.PushAsync(pending);
                if (!pushResult.Success)
                {
                    _logger.LogError("邮编分区上传数据失败,休眠1分钟{message}", pushResult.Message);
                    return TimeSpan.FromMinutes(1);
                }

                // Successful push: fetch the next batch immediately.
                return TimeSpan.Zero;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "邮编分区执行失败{message}", ex.Message);
                return TimeSpan.FromHours(1);
            }
        }

        /// <summary>JSON payload returned by the service-tracking-number endpoint.</summary>
        private class trackingResult
        {
            public bool success { get; set; }
            public string message { get; set; }
            public bool data { get; set; }
        }

        /// <summary>
        /// Repeatedly calls the service-tracking-number HTTP endpoint until cancelled.
        /// </summary>
        /// <param name="stoppingToken">Ends the loop.</param>
        /// <returns>A task that completes when the loop has exited.</returns>
        private async Task LoopAutoUpdateServiceTrackingNumber(CancellationToken stoppingToken)
        {
            using (var client = new System.Net.WebClient())
            {
                client.Headers.Add("Authorization", ServiceTrackingNumberAuthorization);
                await RunLoopAsync(() => UpdateServiceTrackingNumberOnceAsync(client), stoppingToken);
            }
        }

        /// <summary>One call to the service-tracking-number endpoint; returns the pause before the next call.</summary>
        private async Task<TimeSpan> UpdateServiceTrackingNumberOnceAsync(System.Net.WebClient client)
        {
            try
            {
                Console.WriteLine("开始执行-获取服务商转单号");
                string body = await client.DownloadStringTaskAsync(ServiceTrackingNumberUrl);
                Console.WriteLine(body);

                // The generic overload is required: the non-generic one returns object,
                // which has no success/data/message members.
                var reply = Newtonsoft.Json.JsonConvert.DeserializeObject<trackingResult>(body);
                if (reply == null || !reply.success)
                {
                    _logger.LogError("执行失败(api)-获取服务商转单号{message} 系统暂停5分钟后重试", reply?.message ?? "empty response");
                    return TimeSpan.FromMinutes(5);
                }

                Console.WriteLine("执行完成-获取服务商转单号");
                if (reply.data)
                {
                    // Nothing left to process.
                    _logger.LogError("执行完成-获取服务商转单号,没有可执行的运单号,系统暂停30分钟后再次获取");
                    return TimeSpan.FromMinutes(30);
                }

                // More tracking numbers are still waiting.
                _logger.LogError("执行完成-获取服务商转单号,还有待执行的运单号,系统3S后再次获取");
                return TimeSpan.FromSeconds(3);
            }
            catch (System.Net.WebException web)
            {
                _logger.LogError(web, "执行失败-获取服务商转单号 {message},{response} 系统暂停5分钟后重试", web.Message, ReadResponseBody(web));
                // The message announces a 5-minute pause; the previous code slept only 5 seconds.
                return TimeSpan.FromMinutes(5);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "执行失败-获取服务商转单号{message} 系统暂停5分钟后重试", ex.Message);
                return TimeSpan.FromMinutes(5);
            }
        }

        /// <summary>
        /// Reads the body of the HTTP error response attached to <paramref name="error"/>.
        /// </summary>
        /// <returns>The response text, or an empty string when there is no response or no stream.</returns>
        private static string ReadResponseBody(System.Net.WebException error)
        {
            var stream = error.Response?.GetResponseStream();
            if (stream == null)
            {
                return string.Empty;
            }

            using (var reader = new System.IO.StreamReader(stream))
            {
                return reader.ReadToEnd();
            }
        }

        /// <summary>
        /// Polls the tracking database service for entries awaiting supplement and uploads them, until cancelled.
        /// </summary>
        /// <param name="stoppingToken">Ends the loop.</param>
        /// <returns>A task that completes when the loop has exited.</returns>
        protected async Task LoopAutoSupplementTracking(CancellationToken stoppingToken)
        {
            var dbClient = new Db.DbClient(_trackingChannel.Channel);
            await RunLoopAsync(() => SupplementTrackingOnceAsync(dbClient), stoppingToken);
        }

        /// <summary>One tracking-supplement pass; returns the pause before the next pass.</summary>
        private async Task<TimeSpan> SupplementTrackingOnceAsync(Db.DbClient dbClient)
        {
            try
            {
                // Fetch the entries waiting to be uploaded.
                var waiting = await dbClient.GetWaitSupplementTrackingAsync(new Google.Protobuf.WellKnownTypes.Empty());
                if (waiting.List.Count == 0)
                {
                    // Nothing to upload: wait 30 minutes before asking again.
                    _logger.LogError("轨迹补充-暂无需上传数据,休眠30分钟");
                    return TimeSpan.FromMinutes(30);
                }

                Console.WriteLine("开始补充");
                var trajectoryResult = await dbClient.AddTrackingTrajectoryAsync(waiting);
                await dbClient.AddTrackingTrajectoryErrorsAsync(trajectoryResult);

                // Short 1-second breather between batches, kept from the original as a safety margin.
                return TimeSpan.FromSeconds(1);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "轨迹补充-执行失败{message}", ex.Message);
                return TimeSpan.FromHours(1);
            }
        }

        /// <summary>
        /// Polls for data awaiting push to 17Tracking, pushes it and records the outcome, until cancelled.
        /// </summary>
        /// <param name="stoppingToken">Ends the loop.</param>
        /// <returns>A task that completes when the loop has exited.</returns>
        protected async Task LoopUpdate17(CancellationToken stoppingToken)
        {
            var client = new SeventeenPush.SeventeenPushClient(_trackingChannel.Channel);
            var dbClient = new Db.DbClient(_trackingChannel.Channel);
            await RunLoopAsync(() => Update17OnceAsync(client, dbClient), stoppingToken);
        }

        /// <summary>One 17Tracking push pass; returns the pause before the next pass.</summary>
        private async Task<TimeSpan> Update17OnceAsync(SeventeenPush.SeventeenPushClient client, Db.DbClient dbClient)
        {
            try
            {
                var waiting = await client.GetWaitPushDataAsync(new Empty());
                if (!waiting.Success)
                {
                    // Lookup failed: back off for 10 minutes.
                    _logger.LogError("17Tracking-查找待执行的数据失败{message},休眠10分钟保护", waiting.Message);
                    return TimeSpan.FromMinutes(10);
                }

                if (waiting.Items.Count == 0)
                {
                    // Nothing to upload: wait 5 minutes before asking again.
                    _logger.LogError("17Tracking-暂无需上传数据,休眠5分钟");
                    return TimeSpan.FromMinutes(5);
                }

                var stopwatch = Stopwatch.StartNew();
                var pushResult = await client.PostPushDataAsync(waiting);
                stopwatch.Stop();

                foreach (var item in pushResult.Items.Where(x => !x.IsSuccess))
                {
                    _logger.LogWarning("17Tracking-上传数据失败:{trackingNumber} :{message}", item.TrackingNumber, item.FailMessage);
                }

                var dbAsync = await dbClient.AsyncDbPushStatusAsync(pushResult);
                if (!dbAsync.IsSuccess)
                {
                    _logger.LogError("17Tracking-上传数据失败,休眠1分钟{message}", dbAsync.Message);
                    return TimeSpan.FromMinutes(1);
                }

                // Pace the pushes: wait out whatever is left of the interval after the push itself.
                int remaining = SeventeenPushIntervalMilliseconds - (int)Math.Floor(stopwatch.Elapsed.TotalMilliseconds);
                return remaining > 0 ? TimeSpan.FromMilliseconds(remaining) : TimeSpan.Zero;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "17Tracking-执行失败{message}", ex.Message);
                return TimeSpan.FromHours(1);
            }
        }
    }
}