123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251 |
- using Microsoft.Extensions.Hosting;
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Text;
- using System.Threading;
- using System.Threading.Tasks;
- using XYY.TaskTrack.Standard;
- using XYY.gRpc.Tracking;
- using Grpc.Net.Client;
- using Microsoft.Extensions.Logging;
- using Grpc.Core;
- using System.Diagnostics;
- using GOrder = XYY.gRpc.Order;
- namespace XYY.WindowsService.ReshMQ.Works
- {
/// <summary>
/// Hosted background worker. On start it launches four independent polling loops
/// (service tracking-number sync over HTTP, 17Tracking push, zip-code push and
/// tracking supplement) and subscribes two gRPC handlers to the MQ manager.
/// </summary>
/// <remarks>
/// Every loop runs until the host's stopping token is cancelled. All waits between
/// iterations are asynchronous and cancellable, so no thread-pool thread is parked
/// while a loop is idle and shutdown does not have to wait for a pending pause.
/// </remarks>
public class Work : BackgroundService
{
    // NOTE(review): the endpoint and, more importantly, the API token are committed to
    // source. They are kept here so behavior is unchanged, but the token should be moved
    // to configuration / a secret store and rotated.
    private const string ServiceTrackingNumberUrl = "http://47.244.232.78:9518/api/order/AsyncServiceTrackingNumber?qty=10";
    private const string ServiceTrackingNumberAuthorization = "token 132A7468DE079C6CEB59F383A661E612";

    private readonly ISeventeenGRPC SeventeenGRPC;
    private readonly TrackingGrpcChanel TrackingGrpcChanel;
    private readonly OrderGrpcChannel PushZipChannel;
    private readonly IMQManager _mQManager;
    private readonly ILogger<Work> _logger;
    private readonly IMoreThanOneTicketGrpc moreThanOneTicketGrpc;

    // Loop tasks started through StartLoop; ExecuteAsync awaits them so the service's
    // lifetime covers the loops instead of leaving them fire-and-forget.
    private readonly List<Task> _runningLoops = new List<Task>();

    /// <summary>
    /// Creates the worker with the collaborators it polls and subscribes.
    /// </summary>
    /// <param name="seventeenGRPC">Handler subscribed to the MQ manager first.</param>
    /// <param name="mQManager">MQ manager the handlers are subscribed to.</param>
    /// <param name="TrackingGrpcChanel">Channel used for the tracking gRPC clients.</param>
    /// <param name="logger">Logger for loop diagnostics.</param>
    /// <param name="pushZipChannel">Channel used for the zip-code push gRPC client.</param>
    /// <param name="moreThanOneTicketGrpc">Handler subscribed to the MQ manager last.</param>
    public Work(ISeventeenGRPC seventeenGRPC, IMQManager mQManager, TrackingGrpcChanel TrackingGrpcChanel, ILogger<Work> logger, OrderGrpcChannel pushZipChannel, IMoreThanOneTicketGrpc moreThanOneTicketGrpc)
    {
        SeventeenGRPC = seventeenGRPC;
        _mQManager = mQManager;
        this.TrackingGrpcChanel = TrackingGrpcChanel;
        _logger = logger;
        PushZipChannel = pushZipChannel;
        this.moreThanOneTicketGrpc = moreThanOneTicketGrpc;
    }

    /// <summary>
    /// Starts the loops and subscriptions in their original order, then stays pending
    /// until every started loop has finished (i.e. until shutdown).
    /// </summary>
    /// <param name="stoppingToken">Cancelled by the host when the service must stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await LoopAutoUpdateServiceTrackingNumber(stoppingToken);
        await _mQManager.Subscribe(SeventeenGRPC);
        await LoopUpdate17(stoppingToken);
        await LoopPushZipCode(stoppingToken);
        await LoopAutoSupplementTracking(stoppingToken);
        await _mQManager.Subscribe(moreThanOneTicketGrpc);

        // Keep the service "executing" while the loops run so the host waits for them
        // to observe cancellation during a graceful shutdown.
        await Task.WhenAll(_runningLoops);
    }

    /// <summary>
    /// Starts the zip-code push loop: fetches the pending items and pushes them,
    /// pausing 1 minute after a failed push, 5 minutes when nothing is pending and
    /// 1 hour after an exception.
    /// </summary>
    /// <param name="stoppingToken">Token that ends the loop.</param>
    /// <returns>A task that completes as soon as the loop has been started.</returns>
    protected Task LoopPushZipCode(CancellationToken stoppingToken)
    {
        var client = new GOrder.ZipCodePush.ZipCodePushClient(PushZipChannel.Channel);
        return StartLoop(async () =>
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    var pending = await client.GetWaitPushAsync(new GOrder.Empty());
                    if (pending.Items.Count > 0)
                    {
                        var pushResult = await client.PushAsync(pending);
                        if (!pushResult.Success)
                        {
                            _logger.LogError("邮编分区上传数据失败,休眠1分钟{message}", pushResult.Message);
                            await PauseAsync(TimeSpan.FromMinutes(1), stoppingToken);
                        }
                    }
                    else
                    {
                        // Nothing to upload: wait 5 minutes before asking again.
                        _logger.LogError("邮编分区暂无需上传数据,休眠5分钟");
                        await PauseAsync(TimeSpan.FromMinutes(5), stoppingToken);
                    }
                }
                catch (Exception ex)
                {
                    // Pass the exception itself so the stack trace is logged too.
                    _logger.LogError(ex, "邮编分区执行失败{message}", ex.Message);
                    await PauseAsync(TimeSpan.FromHours(1), stoppingToken);
                }
            }
        });
    }

    /// <summary>
    /// JSON payload returned by the service tracking-number endpoint. Property names
    /// are bound by the JSON deserializer and are therefore left as they are.
    /// </summary>
    private class trackingResult
    {
        /// <summary>Whether the API call succeeded.</summary>
        public bool success { get; set; }

        /// <summary>Message reported by the API; logged when the call fails.</summary>
        public string message { get; set; }

        /// <summary>
        /// When true the loop treats the backlog as empty and pauses 30 minutes;
        /// when false it asks again after 3 seconds.
        /// </summary>
        public bool data { get; set; }
    }

    /// <summary>
    /// Starts the loop that repeatedly calls the service tracking-number endpoint and
    /// chooses the pause before the next call from the response.
    /// </summary>
    /// <param name="stoppingToken">Token that ends the loop.</param>
    /// <returns>A task that completes as soon as the loop has been started.</returns>
    private Task LoopAutoUpdateServiceTrackingNumber(CancellationToken stoppingToken)
    {
        return StartLoop(async () =>
        {
            // One client for the lifetime of the loop; disposed when the loop ends.
            using (var client = new System.Net.WebClient())
            {
                client.Headers.Add("Authorization", ServiceTrackingNumberAuthorization);
                while (!stoppingToken.IsCancellationRequested)
                {
                    try
                    {
                        Console.WriteLine("开始执行-获取服务商转单号");
                        string json = await client.DownloadStringTaskAsync(ServiceTrackingNumberUrl);
                        Console.WriteLine(json);
                        var response = Newtonsoft.Json.JsonConvert.DeserializeObject<trackingResult>(json);
                        if (response.success)
                        {
                            Console.WriteLine("执行完成-获取服务商转单号");
                            if (response.data)
                            {
                                // Nothing left to process: back off for 30 minutes.
                                _logger.LogError("执行完成-获取服务商转单号,没有可执行的运单号,系统暂停30分钟后再次获取");
                                await PauseAsync(TimeSpan.FromMinutes(30), stoppingToken);
                            }
                            else
                            {
                                // More items are waiting: continue after 3 seconds.
                                _logger.LogError("执行完成-获取服务商转单号,还有待执行的运单号,系统3S后再次获取");
                                await PauseAsync(TimeSpan.FromSeconds(3), stoppingToken);
                            }
                        }
                        else
                        {
                            _logger.LogError("执行失败(api)-获取服务商转单号{message} 系统暂停5分钟后重试", response.message);
                            await PauseAsync(TimeSpan.FromMinutes(5), stoppingToken);
                        }
                    }
                    catch (System.Net.WebException web)
                    {
                        string errorBody = ReadErrorBody(web);
                        _logger.LogError(web, "执行失败-获取服务商转单号 {message},{response} 系统暂停5分钟后重试", web.Message, errorBody);
                        // NOTE(review): the log text announces a 5 minute pause but this
                        // branch has always waited 5 seconds; timing kept as-is — confirm
                        // which of the two is intended.
                        await PauseAsync(TimeSpan.FromSeconds(5), stoppingToken);
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex, "执行失败-获取服务商转单号{message} 系统暂停5分钟后重试", ex.Message);
                        await PauseAsync(TimeSpan.FromMinutes(5), stoppingToken);
                    }
                }
            }
        });
    }

    /// <summary>
    /// Starts the tracking supplement loop: fetches the items waiting for supplement,
    /// adds their trajectories and reports the result back; pauses 30 minutes when
    /// nothing is waiting, 1 second between batches and 1 hour after an exception.
    /// </summary>
    /// <param name="stoppingToken">Token that ends the loop.</param>
    /// <returns>A task that completes as soon as the loop has been started.</returns>
    protected Task LoopAutoSupplementTracking(CancellationToken stoppingToken)
    {
        var dbClient = new Db.DbClient(TrackingGrpcChanel.Channel);
        return StartLoop(async () =>
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    // Fetch the data waiting to be supplemented.
                    var waiting = await dbClient.GetWaitSupplementTrackingAsync(new Google.Protobuf.WellKnownTypes.Empty());
                    if (waiting.List.Count == 0)
                    {
                        _logger.LogError("轨迹补充-暂无需上传数据,休眠30分钟");
                        await PauseAsync(TimeSpan.FromMinutes(30), stoppingToken);
                    }
                    else
                    {
                        Console.WriteLine("开始补充");
                        var trajectoryResult = await dbClient.AddTrackingTrajectoryAsync(waiting);
                        await dbClient.AddTrackingTrajectoryErrorsAsync(trajectoryResult);
                        // Short breather between batches as a safety margin.
                        await PauseAsync(TimeSpan.FromSeconds(1), stoppingToken);
                    }
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "轨迹补充-执行失败{message}", ex.Message);
                    await PauseAsync(TimeSpan.FromHours(1), stoppingToken);
                }
            }
        });
    }

    /// <summary>
    /// Starts the 17Tracking loop: fetches the data waiting to be pushed, pushes it,
    /// logs each failed item and syncs the push status back through the Db client.
    /// </summary>
    /// <param name="stoppingToken">Token that ends the loop.</param>
    /// <returns>A task that completes as soon as the loop has been started.</returns>
    protected Task LoopUpdate17(CancellationToken stoppingToken)
    {
        var pushClient = new SeventeenPush.SeventeenPushClient(TrackingGrpcChanel.Channel);
        var dbClient = new Db.DbClient(TrackingGrpcChanel.Channel);
        return StartLoop(async () =>
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    var waiting = await pushClient.GetWaitPushDataAsync(new Empty());
                    if (!waiting.Success)
                    {
                        // Lookup failed: back off for 10 minutes before retrying.
                        _logger.LogError("17Tracking-查找待执行的数据失败{message},休眠10分钟保护", waiting.Message);
                        await PauseAsync(TimeSpan.FromMinutes(10), stoppingToken);
                        continue;
                    }

                    if (waiting.Items.Count == 0)
                    {
                        // Nothing to upload: wait 5 minutes before asking again.
                        _logger.LogError("17Tracking-暂无需上传数据,休眠5分钟");
                        await PauseAsync(TimeSpan.FromMinutes(5), stoppingToken);
                        continue;
                    }

                    var stopwatch = Stopwatch.StartNew();
                    var pushResult = await pushClient.PostPushDataAsync(waiting);
                    stopwatch.Stop();

                    foreach (var item in pushResult.Items.Where(x => !x.IsSuccess))
                    {
                        _logger.LogWarning("17Tracking-上传数据失败:{trackingNumber} :{message}", item.TrackingNumber, item.FailMessage);
                    }

                    var dbAsync = await dbClient.AsyncDbPushStatusAsync(pushResult);
                    if (!dbAsync.IsSuccess)
                    {
                        _logger.LogError("17Tracking-上传数据失败,休眠1分钟{message}", dbAsync.Message);
                        await PauseAsync(TimeSpan.FromMinutes(1), stoppingToken);
                    }
                    else
                    {
                        // Throttle: top the push call up to a 400 ms budget (the original
                        // note speaks of one request per 333 ms).
                        int remainingMs = 400 - (int)Math.Floor(stopwatch.Elapsed.TotalMilliseconds);
                        if (remainingMs > 0)
                        {
                            await PauseAsync(TimeSpan.FromMilliseconds(remainingMs), stoppingToken);
                        }
                    }
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "17Tracking-执行失败{message}", ex.Message);
                    await PauseAsync(TimeSpan.FromHours(1), stoppingToken);
                }
            }
        });
    }

    /// <summary>
    /// Runs <paramref name="loopBody"/> on the thread pool and remembers its task so
    /// <see cref="ExecuteAsync"/> can await it.
    /// </summary>
    /// <param name="loopBody">The asynchronous loop to run.</param>
    /// <returns>An already completed task: callers only wait for the loop to be started.</returns>
    private Task StartLoop(Func<Task> loopBody)
    {
        // Task.Run unwraps the async delegate, so the tracked task ends when the loop ends.
        _runningLoops.Add(Task.Run(loopBody));
        return Task.CompletedTask;
    }

    /// <summary>
    /// Waits for <paramref name="delay"/> or until <paramref name="stoppingToken"/> is
    /// cancelled, whichever comes first, without blocking a thread.
    /// </summary>
    /// <param name="delay">How long to wait.</param>
    /// <param name="stoppingToken">Token that cuts the wait short.</param>
    private static async Task PauseAsync(TimeSpan delay, CancellationToken stoppingToken)
    {
        try
        {
            await Task.Delay(delay, stoppingToken);
        }
        catch (OperationCanceledException)
        {
            // Shutdown requested: return normally and let the loop condition end the loop.
        }
    }

    /// <summary>
    /// Best-effort read of the response body attached to a failed web request, for
    /// diagnostics only.
    /// </summary>
    /// <param name="error">The exception raised by the web client.</param>
    /// <returns>The response body, or an empty string when there is none or it cannot be read.</returns>
    private static string ReadErrorBody(System.Net.WebException error)
    {
        if (error.Response == null)
        {
            return string.Empty;
        }

        try
        {
            using (var stream = error.Response.GetResponseStream())
            {
                if (stream == null)
                {
                    return string.Empty;
                }

                using (var reader = new System.IO.StreamReader(stream))
                {
                    return reader.ReadToEnd();
                }
            }
        }
        catch (Exception)
        {
            // This runs inside an exception handler of a polling loop; a failure to read
            // the diagnostic body must not escape and terminate that loop.
            return string.Empty;
        }
    }
}
- }
|