|
-
- using CsvHelper;
- using Microsoft.AspNetCore.Mvc;
- using Nest;
- using NPOI.OpenXmlFormats.Spreadsheet;
- using NPOI.SS.Formula.Functions;
- using RabbitMQ.Client;
- using RabbitMQ.Client.Events;
- using RestSharp;
- using System;
- using System.Collections.Generic;
- using System.Data.SqlClient;
- using System.Linq;
- using System.Runtime.CompilerServices;
- using System.Text;
- using System.Threading.Tasks;
- using XYY.Core.Standard.Mvc;
- using XYY.Service.Strandard.EMailService;
- using XYY.TaskTrack.Standard;
- namespace XYY.Service.JobManage
- {
/// <summary>
/// Job envelope whose payload is carried as a plain string. This is the shape
/// accepted by <see cref="IJobDbService.AddWaitLog"/>.
/// </summary>
public class BaseJobModelWaitLog : BaseJobModel
{
    /// <summary>Job payload in string form.</summary>
    public string Data { get; set; }
}
/// <summary>
/// Job envelope carrying a strongly typed payload on top of the common
/// <see cref="BaseJobModel"/> identification fields.
/// </summary>
/// <typeparam name="T">Reference type of the payload.</typeparam>
public class BaseJobModel<T> : BaseJobModel
    where T : class
{
    /// <summary>Typed job payload.</summary>
    public T Data { get; set; }
}
/// <summary>
/// Identification fields shared by every job message and every job log record.
/// </summary>
public class BaseJobModel
{
    /// <summary>First caller-defined key for the job.</summary>
    public string Key1 { get; set; }

    /// <summary>Second caller-defined key for the job.</summary>
    public string Key2 { get; set; }

    /// <summary>Start time associated with the job.</summary>
    public DateTime StartTime { get; set; }

    /// <summary>Identifier of the job instance.</summary>
    public string JobModelId { get; set; }

    /// <summary>Type label of the job.</summary>
    public string JobType { get; set; }
}
/// <summary>
/// Outcome record for one executed job. This is the shape accepted by
/// <see cref="IJobDbService.AddResultLog"/>.
/// </summary>
public class ExecuteJobReturn : BaseJobModel
{
    /// <summary>Time at which the job was executed.</summary>
    public DateTime ExecuteJobTime { get; set; }

    /// <summary>Identifier of this result record.</summary>
    public string ResultId { get; set; }

    /// <summary>Whether the execution was considered successful.</summary>
    public bool IsSuccess { get; set; }

    /// <summary>Failure description for an unsuccessful execution.</summary>
    public string ErrorMsg { get; set; }

    /// <summary>Raw result text returned by the execution.</summary>
    public string Result { get; set; }

    /// <summary>Job payload in string form.</summary>
    public string Data { get; set; }
}
/// <summary>
/// Base class for a RabbitMQ-backed job pipeline. <see cref="StartJobs"/> publishes the
/// jobs returned by the job source to a durable queue, and <see cref="ClientReg"/>
/// registers a consumer that hands every received job to <see cref="ExecuteJob"/>.
/// </summary>
/// <typeparam name="T">Reference type of the job payload.</typeparam>
public abstract class JobManage<T> where T : class
{
    // Direct exchange shared by all job queues; also the prefix of every queue name.
    string exChangeName = "JobTask";

    IJobGetService<T> jobGetService;

    string jobName { get; set; }

    /// <summary>
    /// Creates the manager.
    /// </summary>
    /// <param name="mQManager">Not used by this class; kept so existing callers keep compiling.</param>
    /// <param name="jobGetService">Source of the jobs published by <see cref="StartJobs"/>.</param>
    /// <param name="jobName">Job name; appended to the exchange name to form the queue name and routing key.</param>
    public JobManage(IMQManager mQManager, IJobGetService<T> jobGetService, string jobName)
    {
        this.jobGetService = jobGetService;
        this.jobName = jobName;
    }

    /// <summary>
    /// Hook invoked by <see cref="StartJobs"/> after a job has been published and confirmed
    /// by the broker. The base implementation does nothing.
    /// </summary>
    /// <param name="t">The job that was just published.</param>
    /// <returns>A completed task.</returns>
    public virtual Task PublishEnd(BaseJobModel<T> t)
    {
        // No work to do, so avoid an async state machine that never awaits.
        return Task.CompletedTask;
    }

    /// <summary>
    /// Executes one job received from the queue.
    /// </summary>
    /// <param name="t">The deserialized job message.</param>
    public abstract void ExecuteJob(BaseJobModel<T> t);

    /// <summary>
    /// Publishes every job returned by the job source as a persistent JSON message,
    /// waits for the broker confirmation of each one and then calls <see cref="PublishEnd"/>.
    /// The connection and channel are closed when publishing finishes or fails.
    /// </summary>
    public virtual async Task StartJobs()
    {
        ConnectionFactory factory = CreateConnectionFactory("app:xyy component:jobService");
        string queueName = GetQueueName();

        // Dispose both connection and channel: each call used to leave a broker connection open.
        using (IConnection conn = factory.CreateConnection())
        {
            var list = jobGetService.GetAndSendJobs();
            using (IModel channel = conn.CreateModel())
            {
                DeclareTopology(channel, queueName);

                IBasicProperties props = channel.CreateBasicProperties();
                props.DeliveryMode = 2; // 2 = persistent delivery mode
                props.ContentType = "application/json";
                channel.ConfirmSelect();

                foreach (var item in list)
                {
                    string json = Newtonsoft.Json.JsonConvert.SerializeObject(item);
                    byte[] bytes = Encoding.UTF8.GetBytes(json);
                    channel.BasicPublish(exChangeName, queueName, props, bytes);

                    // Only report the job as published when the broker actually accepted it.
                    if (!channel.WaitForConfirms())
                    {
                        Console.WriteLine("Broker did not confirm a job published to queue " + queueName);
                        continue;
                    }

                    await PublishEnd(item);
                }
            }
        }
    }

    /// <summary>
    /// Opens a connection and registers a consumer on the job queue. Each received message is
    /// deserialized, passed to <see cref="ExecuteJob"/> and then acknowledged.
    /// </summary>
    public virtual void ClientReg()
    {
        ConnectionFactory factory = CreateConnectionFactory("app:xyy component:jobClient");
        string queueName = GetQueueName();

        // Intentionally not disposed: the consumer keeps receiving on this connection/channel.
        var conn = factory.CreateConnection();
        IModel channel = conn.CreateModel();
        DeclareTopology(channel, queueName);

        var consume = new EventingBasicConsumer(channel);
        consume.Received += (sender, e) =>
        {
            try
            {
                var td = e.Body.ToArray();
                string data = Encoding.UTF8.GetString(td);
                var m = Newtonsoft.Json.JsonConvert.DeserializeObject<BaseJobModel<T>>(data);
                ExecuteJob(m);
                channel.BasicAck(e.DeliveryTag, false);
            }
            catch (Exception ex)
            {
                // Make the failure visible, then rethrow so the outcome is unchanged:
                // the message is still left un-acknowledged.
                // NOTE(review): decide whether failed messages should be nacked or dead-lettered.
                Console.WriteLine(ex.Message);
                throw;
            }
        };
        channel.BasicConsume(queueName, false, "clinet", consume);
    }

    // Queue name, also used as the routing key: exchange name followed by the job name.
    private string GetQueueName()
    {
        return exChangeName + this.jobName;
    }

    // Builds the broker connection settings shared by the publisher and the consumer.
    // NOTE(review): host and credentials are hard-coded in source; they should be moved
    // to configuration or a secret store.
    private static ConnectionFactory CreateConnectionFactory(string clientProvidedName)
    {
        return new ConnectionFactory
        {
            // "guest"/"guest" by default, limited to localhost connections
            UserName = "xyy",
            Password = "xyy20200703",
            VirtualHost = "/",
            HostName = "120.24.149.148",
            ContinuationTimeout = new TimeSpan(0, 0, 60),
            AutomaticRecoveryEnabled = true,
            ClientProvidedName = clientProvidedName
        };
    }

    // Declares the durable exchange and queue and binds them with the queue name as routing key.
    private void DeclareTopology(IModel channel, string queueName)
    {
        channel.ExchangeDeclare(exChangeName, ExchangeType.Direct, durable: true, autoDelete: false);
        channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);
        channel.QueueBind(queueName, exChangeName, queueName, null);
    }
}
/// <summary>
/// Bookkeeping contract for jobs: one record when a job is queued and one when it has run.
/// </summary>
public interface IJobDbService
{
    /// <summary>Records a job that has been published and is waiting to be executed.</summary>
    /// <param name="model">The job with its payload in string form.</param>
    void AddWaitLog(BaseJobModelWaitLog model);

    /// <summary>Records the outcome of an executed job.</summary>
    /// <param name="executeJobReturn">The execution result to store.</param>
    void AddResultLog(ExecuteJobReturn executeJobReturn);
}
/// <summary>
/// Job manager that decouples job execution behind a web API: every consumed job is
/// POSTed as JSON to the configured action URL and the outcome is stored through
/// <see cref="IJobDbService"/>.
/// Failure e-mails are currently disabled (the send calls are commented out), so the
/// e-mail service and recipient list are accepted but not used.
/// Depends on the MQ service, and on the e-mail service once notifications are re-enabled.
/// </summary>
/// <typeparam name="T">Reference type of the job payload.</typeparam>
public class WebApiJobManage<T> : JobManage<T> where T : class
{
    readonly string actionUrl;
    readonly Dictionary<string, string> headers;
    readonly IEmailService emailService;
    readonly string[] toList;
    readonly string jobName;
    readonly IJobDbService jobDbService;

    // Subject template for failure e-mails; "@JobName" is the placeholder for the job name.
    string baseTitle = $"任务调度@JobName【异常】消息";

    /// <summary>
    /// Creates the manager.
    /// </summary>
    /// <param name="mQManager">Passed through to the base class.</param>
    /// <param name="ActionUrl">URL every job payload is POSTed to.</param>
    /// <param name="Headers">HTTP headers added to every request; may be null.</param>
    /// <param name="emailService">E-mail service; currently unused.</param>
    /// <param name="toList">E-mail recipients; currently unused.</param>
    /// <param name="jobName">Job name, also passed to the base class.</param>
    /// <param name="jobDbService">Store for wait logs and result logs.</param>
    /// <param name="jobGetService">Source of the jobs to publish, passed to the base class.</param>
    public WebApiJobManage(IMQManager mQManager, string ActionUrl, Dictionary<string, string> Headers
        , IEmailService emailService, string[] toList, string jobName, IJobDbService jobDbService, IJobGetService<T> jobGetService)
        : base(mQManager, jobGetService, jobName)
    {
        this.actionUrl = ActionUrl;
        this.headers = Headers;
        this.emailService = emailService;
        this.toList = toList;
        this.jobName = jobName;
        this.jobDbService = jobDbService;
    }

    /// <summary>
    /// Stores a wait-log record for a job that has just been published.
    /// </summary>
    /// <param name="t">The published job.</param>
    public override async Task PublishEnd(BaseJobModel<T> t)
    {
        BaseJobModelWaitLog executeJobReturn = new BaseJobModelWaitLog
        {
            Data = t.Data == null ? string.Empty : Newtonsoft.Json.JsonConvert.SerializeObject(t.Data),
            JobModelId = t.JobModelId,
            JobType = t.JobType,
            Key1 = t.Key1,
            Key2 = t.Key2,
            StartTime = t.StartTime
        };
        this.jobDbService.AddWaitLog(executeJobReturn);
    }

    /// <summary>
    /// POSTs the job payload as JSON to the action URL and records the outcome.
    /// A null job is ignored.
    /// </summary>
    /// <param name="t">The job to execute.</param>
    public override void ExecuteJob(BaseJobModel<T> t)
    {
        if (t == null)
        {
            return;
        }

        RestClient client = new RestClient(this.actionUrl);
        RestRequest request = new RestRequest();

        // Headers are optional; a null dictionary used to throw here.
        if (headers != null)
        {
            foreach (var item in headers)
            {
                request.AddHeader(item.Key, item.Value);
            }
        }

        request.AddJsonBody(t.Data);
        request.Method = Method.POST;
        var result = client.Execute(request);
        ConvertResult(t, result);
    }

    /// <summary>
    /// Turns the HTTP response into an <see cref="ExecuteJobReturn"/> and stores it.
    /// The call counts as successful only when the status is 200 and the response body
    /// deserializes to an <c>ApiJsonModel</c> that does not report failure.
    /// </summary>
    /// <param name="t">The job that was executed.</param>
    /// <param name="result">The HTTP response of the execution.</param>
    private void ConvertResult(BaseJobModel<T> t, IRestResponse result)
    {
        string req = Newtonsoft.Json.JsonConvert.SerializeObject(t.Data);
        ExecuteJobReturn executeJobReturn = new ExecuteJobReturn
        {
            ExecuteJobTime = DateTime.Now,
            ResultId = Guid.NewGuid().ToString(),
            JobModelId = t.JobModelId,
            JobType = t.JobType,
            Key1 = t.Key1,
            Key2 = t.Key2,
            StartTime = t.StartTime,
            // Fall back to the transport error when there is no response body.
            Result = string.IsNullOrEmpty(result.Content) ? result.ErrorMessage : result.Content,
            Data = req
        };

        try
        {
            if (result.StatusCode == System.Net.HttpStatusCode.OK)
            {
                ApiJsonModel apiJsonModel = Newtonsoft.Json.JsonConvert.DeserializeObject<ApiJsonModel>(result.Content);
                if (apiJsonModel == null || apiJsonModel.success == false)
                {
                    string error = BuildErrorMessage(result, req);
                    if (apiJsonModel != null && !string.IsNullOrEmpty(apiJsonModel.error))
                    {
                        error = apiJsonModel.error + error;
                    }

                    // Always record the failure, even when the API supplied no error text.
                    // (E-mail notification of this failure is currently disabled.)
                    executeJobReturn.ErrorMsg = error;
                }
                else
                {
                    executeJobReturn.IsSuccess = true;
                    executeJobReturn.ErrorMsg = string.Empty;
                }
            }
            else
            {
                // (E-mail notification of this failure is currently disabled.)
                executeJobReturn.ErrorMsg = BuildErrorMessage(result, req);
            }
        }
        catch (Exception ex)
        {
            // For example a 200 response whose body is not valid JSON: keep the reason
            // in the result log instead of silently dropping it.
            executeJobReturn.IsSuccess = false;
            executeJobReturn.ErrorMsg = ex.ToString();
        }
        finally
        {
            try
            {
                jobDbService.AddResultLog(executeJobReturn);
            }
            catch (Exception e)
            {
                // Storing the log is best-effort; a storage failure must not break the consumer.
                Console.WriteLine(e.Message);
            }
        }
    }

    // Formats the failure text: timestamp, response body (or transport error), then the request body.
    private static string BuildErrorMessage(IRestResponse result, string req)
    {
        string error = string.IsNullOrEmpty(result.Content) ? result.ErrorMessage : result.Content;
        return "发生于:" + DateTime.Now.ToString_yyyyMMddHHmmss() + "</br>返回值</br>" + error + "</br>"
            + "请求值</br>" + req;
    }
}
- }
|