Class1.cs 10 KB


  1. 
  2. using CsvHelper;
  3. using Microsoft.AspNetCore.Mvc;
  4. using Nest;
  5. using NPOI.OpenXmlFormats.Spreadsheet;
  6. using NPOI.SS.Formula.Functions;
  7. using RabbitMQ.Client;
  8. using RabbitMQ.Client.Events;
  9. using RestSharp;
  10. using System;
  11. using System.Collections.Generic;
  12. using System.Data.SqlClient;
  13. using System.Linq;
  14. using System.Runtime.CompilerServices;
  15. using System.Text;
  16. using System.Threading.Tasks;
  17. using XYY.Core.Standard.Mvc;
  18. using XYY.Service.Strandard.EMailService;
  19. using XYY.TaskTrack.Standard;
  20. namespace XYY.Service.JobManage
  21. {
  22. public class BaseJobModelWaitLog : BaseJobModel
  23. {
  24. public string Data { get; set; }
  25. }
  26. public class BaseJobModel<T> : BaseJobModel where T : class
  27. {
  28. public T Data { get; set; }
  29. }
  30. public class BaseJobModel
  31. {
  32. public string Key1 { get; set; }
  33. public string Key2 { get; set; }
  34. public DateTime StartTime { get; set; }
  35. public string JobModelId { get; set; }
  36. public string JobType { get; set; }
  37. }
  38. public class ExecuteJobReturn : BaseJobModel
  39. {
  40. public DateTime ExecuteJobTime
  41. {
  42. get; set;
  43. }
  44. public string ResultId
  45. {
  46. get; set;
  47. }
  48. public bool IsSuccess
  49. {
  50. get; set;
  51. }
  52. public string ErrorMsg
  53. {
  54. get; set;
  55. }
  56. public string Result
  57. {
  58. get; set;
  59. }
  60. public string Data
  61. {
  62. get; set;
  63. }
  64. }
  65. public abstract class JobManage<T> where T : class
  66. {
  67. IJobGetService<T> jobGetService;
  68. string jobName { get; set; }
  69. public virtual async Task PublishEnd(BaseJobModel<T> t)
  70. {
  71. }
  72. public abstract void ExecuteJob(BaseJobModel<T> t);
  73. public JobManage(IMQManager mQManager, IJobGetService<T> jobGetService, string jobName)
  74. {
  75. this.jobGetService = jobGetService;
  76. this.jobName = jobName;
  77. }
  78. string exChangeName = "JobTask";
  79. public virtual async Task StartJobs()
  80. {
  81. ConnectionFactory factory = new ConnectionFactory();
  82. // "guest"/"guest" by default, limited to localhost connections
  83. factory.UserName = "xyy";
  84. factory.Password = "xyy20200703";
  85. factory.VirtualHost = "/";
  86. factory.HostName = "120.24.149.148";
  87. factory.ContinuationTimeout = new TimeSpan(0, 0, 60);
  88. factory.AutomaticRecoveryEnabled = true;
  89. factory.ClientProvidedName = "app:xyy component:jobService";
  90. IConnection conn;
  91. string queryName = exChangeName + this.jobName;
  92. string key = queryName;
  93. conn = factory.CreateConnection();
  94. var list = jobGetService.GetAndSendJobs();
  95. IModel channel = conn.CreateModel();
  96. channel.ExchangeDeclare(exChangeName, ExchangeType.Direct, durable: true, autoDelete: false);
  97. channel.QueueDeclare(queryName, durable: true, exclusive: false, autoDelete: false);
  98. channel.QueueBind(queryName, exChangeName, key, null);
  99. IBasicProperties props = channel.CreateBasicProperties();
  100. props.DeliveryMode = 2;
  101. props.ContentType = "application/json";
  102. channel.ConfirmSelect();
  103. foreach (var item in list)
  104. {
  105. string json = Newtonsoft.Json.JsonConvert.SerializeObject(item);
  106. byte[] bytes = Encoding.UTF8.GetBytes(json);
  107. channel.BasicPublish(exChangeName, key, props, bytes);
  108. var t = channel.WaitForConfirms();
  109. await PublishEnd(item);
  110. }
  111. }
  112. public virtual void ClientReg()
  113. {
  114. ConnectionFactory factory = new ConnectionFactory();
  115. // "guest"/"guest" by default, limited to localhost connections
  116. factory.UserName = "xyy";
  117. factory.Password = "xyy20200703";
  118. factory.VirtualHost = "/";
  119. factory.HostName = "120.24.149.148";
  120. factory.ContinuationTimeout = new TimeSpan(0, 0, 60);
  121. factory.AutomaticRecoveryEnabled = true;
  122. factory.ClientProvidedName = "app:xyy component:jobClient";
  123. string queryName = exChangeName + this.jobName;
  124. string key = queryName;
  125. var conn = factory.CreateConnection();
  126. IModel channel = conn.CreateModel();
  127. channel.ExchangeDeclare(exChangeName, ExchangeType.Direct, true, false);
  128. channel.QueueDeclare(queryName, durable: true, exclusive: false, autoDelete: false);
  129. channel.QueueBind(queryName, exChangeName, key, null);
  130. var consume = new EventingBasicConsumer(channel);
  131. consume.Received += (sender, e) =>
  132. {
  133. var td = e.Body.ToArray();
  134. string data = Encoding.UTF8.GetString(td);
  135. var m = Newtonsoft.Json.JsonConvert.DeserializeObject<BaseJobModel<T>>(data);
  136. ExecuteJob(m);
  137. channel.BasicAck(e.DeliveryTag, false);
  138. };
  139. channel.BasicConsume(queryName, false, "clinet", consume);
  140. }
  141. }
  142. public interface IJobDbService
  143. {
  144. void AddWaitLog(BaseJobModelWaitLog model);
  145. void AddResultLog(ExecuteJobReturn executeJobReturn);
  146. }
  147. /// <summary>
  148. /// 以web-api接口的方式解耦
  149. /// 每条API执行失败时,会以邮件的形式发生失败消息
  150. /// 注意依赖邮件服务与MQ服务
  151. /// </summary>
  152. /// <typeparam name="T"></typeparam>
  153. public class WebApiJobManage<T> : JobManage<T> where T : class
  154. {
  155. readonly string actionUrl;
  156. readonly Dictionary<string, string> headers;
  157. readonly IEmailService emailService;
  158. readonly string[] toList;
  159. readonly string jobName;
  160. readonly IJobDbService jobDbService;
  161. public WebApiJobManage(IMQManager mQManager, string ActionUrl, Dictionary<string, string> Headers
  162. , IEmailService emailService, string[] toList, string jobName, IJobDbService jobDbService, IJobGetService<T> jobGetService)
  163. : base(mQManager, jobGetService, jobName)
  164. {
  165. this.actionUrl = ActionUrl;
  166. this.headers = Headers;
  167. this.emailService = emailService;
  168. this.toList = toList;
  169. this.jobName = jobName;
  170. this.jobDbService = jobDbService;
  171. }
  172. string baseTitle = $"任务调度@JobName【异常】消息";
  173. public override async Task PublishEnd(BaseJobModel<T> t)
  174. {
  175. BaseJobModelWaitLog executeJobReturn = new BaseJobModelWaitLog
  176. {
  177. Data = t.Data == null ? string.Empty : Newtonsoft.Json.JsonConvert.SerializeObject(t.Data),
  178. JobModelId = t.JobModelId,
  179. JobType = t.JobType,
  180. Key1 = t.Key1,
  181. Key2 = t.Key2,
  182. StartTime = t.StartTime
  183. };
  184. this.jobDbService.AddWaitLog(executeJobReturn);
  185. }
  186. public override void ExecuteJob(BaseJobModel<T> t)
  187. {
  188. RestClient client = new RestClient(this.actionUrl);
  189. if (t != null)
  190. {
  191. RestRequest request = new RestRequest();
  192. foreach (var item in headers)
  193. {
  194. request.AddHeader(item.Key, item.Value);
  195. }
  196. request.AddJsonBody(t.Data);
  197. request.Method = Method.POST;
  198. var result = client.Execute(request);
  199. ConvertResult(t, result);
  200. }
  201. }
  202. private void ConvertResult(BaseJobModel<T> t, IRestResponse result)
  203. {
  204. string req = Newtonsoft.Json.JsonConvert.SerializeObject(t.Data);
  205. ExecuteJobReturn executeJobReturn = new ExecuteJobReturn();
  206. executeJobReturn.ExecuteJobTime = DateTime.Now;
  207. executeJobReturn.Result = result.Content ?? result.ErrorMessage;
  208. executeJobReturn.ResultId = Guid.NewGuid().ToString();
  209. executeJobReturn.JobModelId = t.JobModelId;
  210. executeJobReturn.Key1 = t.Key1;
  211. executeJobReturn.Key2 = t.Key2;
  212. executeJobReturn.StartTime = t.StartTime;
  213. executeJobReturn.Result = result.Content;
  214. executeJobReturn.Data = req;
  215. try
  216. {
  217. if ((result.StatusCode == System.Net.HttpStatusCode.OK))
  218. {
  219. ApiJsonModel apiJsonModel = Newtonsoft.Json.JsonConvert.DeserializeObject<ApiJsonModel>(result.Content);
  220. if (apiJsonModel == null || apiJsonModel.success == false)
  221. {
  222. string error = string.IsNullOrEmpty(result.Content) ? result.ErrorMessage : result.Content;
  223. error = "发生于:" + DateTime.Now.ToString_yyyyMMddHHmmss() + "</br>返回值</br>" + error + "</br>"
  224. + "请求值</br>" + req;
  225. if (apiJsonModel != null && !string.IsNullOrEmpty(apiJsonModel.error))
  226. {
  227. error = apiJsonModel.error + error;
  228. //emailService.Send(toList.ToList(),
  229. // baseTitle.Replace("@JobName", jobName),
  230. // error, true, null);
  231. executeJobReturn.ErrorMsg = error;
  232. }
  233. }
  234. else
  235. {
  236. executeJobReturn.IsSuccess = true;
  237. executeJobReturn.ErrorMsg = string.Empty;
  238. }
  239. }
  240. else
  241. {
  242. string error = string.IsNullOrEmpty(result.Content) ? result.ErrorMessage : result.Content;
  243. error = "发生于:" + DateTime.Now.ToString_yyyyMMddHHmmss() + "</br>返回值</br>" + error + "</br>" + "请求值</br>" + req;
  244. //emailService.Send(toList.ToList(),
  245. // baseTitle.Replace("@JobName", jobName)
  246. // , error, true, null);
  247. executeJobReturn.ErrorMsg = error;
  248. }
  249. }
  250. catch (Exception ex)
  251. {
  252. }
  253. finally
  254. {
  255. try
  256. {
  257. jobDbService.AddResultLog(executeJobReturn);
  258. }
  259. catch (Exception e)
  260. {
  261. Console.WriteLine(e.Message);
  262. }
  263. }
  264. }
  265. }
  266. }