MQTask.java 16 KB


  1. package com.iamberry.wechat.handles.mq;
  2. import java.lang.reflect.Method;
  3. import java.text.SimpleDateFormat;
  4. import java.util.Date;
  5. import java.util.HashMap;
  6. import java.util.List;
  7. import java.util.Map;
  8. import java.util.concurrent.locks.Lock;
  9. import java.util.concurrent.locks.ReentrantLock;
  10. import com.fasterxml.jackson.databind.ObjectMapper;
  11. import com.iamberry.wechat.core.entity.admin.ShopSystemRule;
  12. import com.iamberry.wechat.core.entity.order.Order;
  13. import com.iamberry.wechat.face.admin.SystemService;
  14. import com.iamberry.wechat.tools.HttpClient431Util;
  15. import com.iamberry.wechat.tools.NameUtils;
  16. import net.sf.json.JSONObject;
  17. import org.apache.commons.lang.StringUtils;
  18. import org.apache.commons.lang3.StringEscapeUtils;
  19. import org.springframework.beans.factory.InitializingBean;
  20. import org.springframework.beans.factory.annotation.Autowired;
  21. import org.springframework.context.annotation.Lazy;
  22. import org.springframework.scheduling.annotation.Scheduled;
  23. import org.springframework.stereotype.Component;
  24. import com.iamberry.app.tool.log.RatFWLogger;
  25. import com.iamberry.wechat.core.entity.mq.MQMessage;
  26. import com.iamberry.wechat.core.entity.order.ProbationOrderDto;
  27. import com.iamberry.wechat.core.entity.probation.ProbationAwardRelu;
  28. import com.iamberry.wechat.core.entity.probation.ProbationNewLogs;
  29. import com.iamberry.wechat.core.entity.task.TaskModel;
  30. import com.iamberry.wechat.core.entity.task.WechatTask;
  31. import com.iamberry.wechat.core.entity.task.WechatTaskLogs;
  32. import com.iamberry.wechat.face.order.CodeService;
  33. import com.iamberry.wechat.face.order.ProbationShopOrderService;
  34. import com.iamberry.wechat.face.reback.RebackServices;
  35. import com.iamberry.wechat.face.task.WechatTaskService;
  36. import com.iamberry.wechat.service.StaticCacheMemory;
  37. import com.iamberry.wechat.tools.DateTimeUtil;
  38. import com.iamberry.wechat.tools.ResultInfo;
  39. import com.iamberry.wechat.utils.SendMessageUtil;
  40. import com.iamberry.zk.SpringContextHolder;
  41. /**
  42. * @author:何秀刚
  43. * @description: MQ模拟实现,每天最多处理1382400条
  44. * @createDate:2016年5月25日
  45. */
  46. @Component(value="mqTask")
  47. @Lazy(false)
  48. public class MQTask implements InitializingBean {
  49. @Autowired
  50. private MQServiceProxy mQSerivce;
  51. @Autowired
  52. private RatFWLogger logger;
  53. @Autowired
  54. private WechatTaskService wechatTaskService;
  55. @Autowired
  56. private RebackServices rebackServices;
  57. @Autowired
  58. private ProbationShopOrderService probationShopOrderService;
  59. @Autowired
  60. private SendMessageUtil sendMessageUtil;
  61. @Autowired
  62. private CodeService codeService;
  63. @Autowired
  64. private SystemService systemService;
  65. private Lock lock = new ReentrantLock();
  66. @SuppressWarnings(value = {"unchecked", "rawtypes"})
  67. @Scheduled(cron = "0/30 * * * * ?")
  68. public void sendOrderService() {
  69. logger.info("------------task start-----------");
  70. // If the timer has not stopped, then the next time the timer can not start.
  71. lock.lock();
  72. try {
  73. // step 1, Gets the message that is not executed in the database.
  74. List<MQMessage> messages = mQSerivce.selectWaitHandlerMessage();
  75. if (messages == null || messages.size() <= 0) {
  76. return;
  77. }
  78. // step 2, Handle 10 messages at a time.
  79. for (MQMessage mqMessage : messages) {
  80. try {
  81. Object object = SpringContextHolder.getBean(mqMessage.getServiceHandlerObjectName());
  82. Class classes = object.getClass();
  83. Method method = classes.getMethod(mqMessage.getServiceHandlerMethodName(), MQMessage.class);
  84. if (method == null) {
  85. mqMessage.setServiceErrorMessage("Method is not defined:" + mqMessage.getServiceHandlerMethodName());
  86. mqMessage.setServiceStatus(2);
  87. continue;
  88. }
  89. // invoke Method.
  90. Object resultObject = method.invoke(object, mqMessage);
  91. if (!(resultObject instanceof Boolean)) {
  92. mqMessage.setServiceStatus(2);
  93. mqMessage.setServiceErrorMessage(mqMessage.getServiceHandlerObjectName() + "." + mqMessage.getServiceHandlerMethodName() + "()Return value is not type Boolean");
  94. continue;
  95. }
  96. if (!(Boolean) resultObject) {
  97. mqMessage.setServiceErrorMessage("invoke Method Error");
  98. mqMessage.setServiceStatus(2);
  99. continue;
  100. }
  101. mqMessage.setServiceStatus(1);
  102. continue;
  103. } catch (Exception e) {
  104. // TODO: handle exception
  105. mqMessage.setServiceErrorMessage(e.getMessage());
  106. mqMessage.setServiceStatus(2);
  107. continue;
  108. } finally {
  109. mqMessage.setServiceIsSend(1);
  110. }
  111. }
  112. // step 3, update Message status and serviceIsSend.
  113. mQSerivce.updateBatchMessage(messages);
  114. // step 4, is next ?
  115. if (messages.size() >= 10) {
  116. StaticCacheMemory.isStartTask = true;
  117. this.sendOrderService();
  118. }
  119. } catch (Exception e) {
  120. e.printStackTrace();
  121. logger.error(e, "MQ Task Error:" + e.getMessage());
  122. } finally {
  123. // reset lock
  124. lock.unlock();
  125. }
  126. logger.info("------------task invoke success-----------");
  127. }
  128. /**
  129. * 根据订单号拉取物流信息
  130. * @throws Exception
  131. */
  132. //@Scheduled(cron = "0 0 */1 * * ?")//每小时执行一次
  133. @Scheduled(cron = "0 0/2 * * * ?")//每2分钟执行一次
  134. //@Scheduled(cron = "*/10 * * * * ?")
  135. public void syncLgisticsInfoByOrderId() throws Exception {
  136. logger.info("---------------- 根据订单号拉取物流信息开始 ---------------");
  137. lock.lock();
  138. List<Order> orderList = null;
  139. String url = NameUtils.getConfig("rst_base_url");
  140. String sdId = NameUtils.getConfig("sd_id");
  141. Map<String, Object> orderData = new HashMap<String, Object>();
  142. Map<String, String> requestData = new HashMap<String, String>();
  143. com.fasterxml.jackson.databind.ObjectMapper mapper = new ObjectMapper();
  144. try {
  145. logger.info("---------------- selectProbationShopOrderList begin ---------------");
  146. orderList = probationShopOrderService.selectWateroPFOrderList();
  147. if(orderList != null && orderList.size() > 0){
  148. for (Order order:orderList) {
  149. String salesOrderid = order.getSalesOrderid();
  150. orderData.put("orderId", salesOrderid);
  151. orderData.put("sd_id", sdId);
  152. requestData.put("app_act", "rst.trade.logistics.get");
  153. String orderDataStr = mapper.writeValueAsString(orderData);
  154. requestData.put("info", orderDataStr);
  155. String result = HttpClient431Util.doPost(requestData, url);
  156. result = StringEscapeUtils.unescapeJava(result); // unicode 编码
  157. logger.info("拉取订单号为:" + order.getSalesOrderid() + "的订单,rst返回信息:" + result);
  158. JSONObject jsonObject = JSONObject.fromObject(result);
  159. String msg = jsonObject.getString("msg");
  160. logger.info("---------"+msg+"---------");
  161. if ("success".equals(msg)) {
  162. String salesPostFirm = jsonObject.getString("salesPostFirm");
  163. String salesPostNum = jsonObject.getString("salesPostNum");
  164. order.setSalesPostFirm(salesPostFirm);
  165. order.setSalesPostNum(salesPostNum);
  166. probationShopOrderService.updateWateroPFOrder(order);
  167. }
  168. }
  169. }
  170. }catch (Exception e){
  171. logger.error("拉取订单异常",e.getMessage());
  172. }finally {lock.unlock();
  173. }
  174. logger.info("---------------- 根据订单号拉取物流信息结束 ---------------");
  175. }
  176. /**
  177. * 订阅消息
  178. */
  179. public void subscribeMessageQueue() {
  180. }
  181. // 强迫线程可见
  182. private volatile boolean oldState = true;
  183. /**
  184. * 微信定期任务奖励-上个月的定时器
  185. */
  186. @SuppressWarnings("deprecation")
  187. // @Scheduled(cron = "0/59 * * * * ?")
  188. public void oldMonthTask() {
  189. if (!oldState) {
  190. logger.info("线程正忙...");
  191. return;
  192. }
  193. oldState = false;
  194. try {
  195. // 判断上一个月的数据是否奖励完成
  196. logger.info("上一个月的定时奖励任务奖励...");
  197. // 当前时间
  198. Date nowDate = new Date();
  199. // 回到上一个月
  200. nowDate.setMonth(nowDate.getMonth() - 1);
  201. // 上一个月的结束时间 类似 :2016-02-29 23:59:59
  202. Date oldMonthLastDay = DateTimeUtil.getLastDayOfYear(nowDate);
  203. // 上一个月的开始时间 类似 :2016-02-01 00:00:00
  204. Date oldMonthFirstDay = DateTimeUtil.getFirstDayOfYear(nowDate);
  205. List<WechatTask> tasks = wechatTaskService.getRunTask(oldMonthFirstDay, oldMonthLastDay);
  206. if (tasks == null || tasks.size() == 0) {
  207. logger.info("暂时没有上一个月的任务奖励...");
  208. oldState = true;
  209. return;
  210. }
  211. // 获取在上一个月销售达到X台的数据
  212. for (WechatTask wechatTask : tasks) {
  213. List<TaskModel> models = rebackServices.getByDate(wechatTask.getTaskProductType(), oldMonthFirstDay, oldMonthLastDay, wechatTask.getTaskSalesNum());
  214. if (models == null || models.size() == 0) {
  215. logger.info("暂时没有人完成上一个月的任务" + wechatTask.getTaskId() + "奖励...");
  216. continue;
  217. }
  218. // 达到的用户是否已经领取奖励
  219. for (TaskModel taskModel : models) {
  220. WechatTaskLogs logs = new WechatTaskLogs();
  221. logs.setLogsTaskId(wechatTask.getTaskId());
  222. logs.setLogsUserOpenId(taskModel.getOpenID());
  223. Integer logsID = wechatTaskService.getByTaskIdAndOpenId(logs);
  224. if (logsID != null) {
  225. // 如果当前用户已经领取了奖励,那么跳过
  226. continue;
  227. }
  228. // 若用没有领取,那么奖励
  229. try {
  230. wechatTaskService.handlerTask(taskModel.getOpenID(), wechatTask.getTaskMoney(), wechatTask.getTaskId(), wechatTask.getTaskProductType());
  231. } catch (Exception e) {
  232. logger.error(this, "对:" + taskModel.getOpenID() + ",奖励失败!error:" + e.getMessage());
  233. }
  234. }
  235. }
  236. } catch (Exception e) {
  237. logger.error(this, "发生异常,奖励失败!error:" + e.getMessage());
  238. } finally {
  239. // 归还锁
  240. oldState = true;
  241. }
  242. }
  243. // 强迫线程可见
  244. private volatile boolean nowState = true;
  245. /**
  246. * 微信定期任务奖励-本月的定时器
  247. */
  248. // @Scheduled(cron = "0/59 * * * * ?")
  249. public void nowMonthTask() {
  250. if (!nowState) {
  251. logger.info("线程正忙...");
  252. return;
  253. }
  254. nowState = false;
  255. try {
  256. // 判断本月是否有奖励数据
  257. logger.info("本月的定时奖励任务奖励...");
  258. // 当前时间
  259. Date nowDate = new Date();
  260. // 上一个月的结束时间 类似 :2016-02-29 23:59:59
  261. Date oldMonthLastDay = DateTimeUtil.getLastDayOfYear(nowDate);
  262. // 上一个月的开始时间 类似 :2016-02-01 00:00:00
  263. Date oldMonthFirstDay = DateTimeUtil.getFirstDayOfYear(nowDate);
  264. List<WechatTask> tasks = wechatTaskService.getRunTask(oldMonthFirstDay, oldMonthLastDay);
  265. if (tasks == null || tasks.size() == 0) {
  266. logger.info("暂时没有本月的任务奖励...");
  267. nowState = true;
  268. return;
  269. }
  270. // 获取在上一个月销售达到X台的数据
  271. for (WechatTask wechatTask : tasks) {
  272. List<TaskModel> models = rebackServices.getByDate(wechatTask.getTaskProductType(), oldMonthFirstDay, oldMonthLastDay, wechatTask.getTaskSalesNum());
  273. if (models == null || models.size() == 0) {
  274. logger.info("暂时没有人完成本月的任务" + wechatTask.getTaskId() + "奖励...");
  275. continue;
  276. }
  277. // 达到的用户是否已经领取奖励
  278. for (TaskModel taskModel : models) {
  279. WechatTaskLogs logs = new WechatTaskLogs();
  280. logs.setLogsTaskId(wechatTask.getTaskId());
  281. logs.setLogsUserOpenId(taskModel.getOpenID());
  282. Integer logsID = wechatTaskService.getByTaskIdAndOpenId(logs);
  283. if (logsID != null) {
  284. // 如果当前用户已经领取了奖励,那么跳过
  285. continue;
  286. }
  287. // 若用没有领取,那么奖励
  288. try {
  289. wechatTaskService.handlerTask(taskModel.getOpenID(), wechatTask.getTaskMoney(), wechatTask.getTaskId(), wechatTask.getTaskProductType());
  290. } catch (Exception e) {
  291. logger.error(this, "对:" + taskModel.getOpenID() + ",奖励失败!error:" + e.getMessage());
  292. }
  293. }
  294. }
  295. } catch (Exception e) {
  296. // TODO: handle exception
  297. logger.error(this, "发生异常:" + e.getMessage());
  298. } finally {
  299. nowState = true;
  300. }
  301. }
  302. public void afterPropertiesSet() throws Exception {
  303. }
  304. /**
  305. * 试用订单短信推送服务(满足XX天奖励XX奖励)
  306. */
  307. // @Scheduled(cron = "0 0 */1 * * ?")//每小时执行一次
  308. public void probationRewardPush(){
  309. logger.info("------------执行推送满足奖励条件用户 start-----------");
  310. //查询奖励推送规则
  311. List<ProbationAwardRelu> reluList = probationShopOrderService.selectProbationAwardRelu();
  312. SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  313. if(reluList!=null && !reluList.isEmpty()){
  314. for(ProbationAwardRelu awardRelu : reluList){
  315. //查询所有待支付订单
  316. List<ProbationOrderDto> orderDtoList = probationShopOrderService.selectProbationStartOrder();
  317. if(orderDtoList!=null && !orderDtoList.isEmpty()){
  318. for(ProbationOrderDto orderDto : orderDtoList ){
  319. //计算总试用期限(小时)
  320. Long totalDate = (orderDto.getProbationEndTime().getTime() - orderDto.getProbationStartTime().getTime()) / 1000 / 60 / 60;
  321. //计算试用剩余期限(小时)
  322. Long remainingDate = (orderDto.getProbationEndTime().getTime() - new Date().getTime()) / 1000 / 60 / 60;
  323. //暂停时间
  324. /*Long pauseDate = 0l;
  325. if(orderDto.getProbationPauseStartTime() != null && orderDto.getProbationPauseEndTime() != null){
  326. pauseDate = (orderDto.getProbationPauseEndTime().getTime() - orderDto.getProbationPauseStartTime().getTime()) / 1000 / 60 / 60;
  327. }*/
  328. //实际试用时间 (总试用时间 - 试用剩余期限 - 暂停时间)
  329. Long actualProbationDate = totalDate - remainingDate - orderDto.getProbationPauseTotalLength() / 1000 / 60 / 60;
  330. //奖励奖品天数转换为小时
  331. Long awardDate = Long.valueOf(awardRelu.getAwardNum()) * 24;
  332. //奖励条件时间 - 实际试用时间 (大于23小时或者小于26小时则推送)
  333. if( awardDate -actualProbationDate > 23 && awardDate -actualProbationDate < 26 ){
  334. ProbationNewLogs probationNewLogs = probationShopOrderService.selectProbationNewLogsByOrderId(orderDto.getProbationOrderid());
  335. //添加日志信息
  336. ProbationNewLogs newLogs = new ProbationNewLogs();
  337. newLogs.setLogsPorderid(orderDto.getProbationOrderid());
  338. newLogs.setLogsTime(awardRelu.getAwardNum());
  339. newLogs.setLogsCreateTime(new Date());
  340. if(probationNewLogs == null){
  341. //推送操作
  342. sendMessageUtil.probationPush(awardRelu.getAwardInfo(),
  343. orderDto.getProductName(),
  344. orderDto.getProbationOrderid(),
  345. formatter.format(orderDto.getProbationStartTime()),
  346. formatter.format(
  347. new Date(
  348. orderDto.getProbationStartTime().getTime() +
  349. (awardRelu.getAwardNum() * 24 + orderDto.getProbationPauseTotalLength()) * 60 * 60 * 1000)),
  350. awardRelu.getAwardInfoTwo(),
  351. orderDto.getProbationOpenid(),
  352. ResultInfo.TRY_ORDER_DETAIL+"?orderId="+orderDto.getProbationOrderid());
  353. //添加日志
  354. probationShopOrderService.insertProbationLogs(newLogs);
  355. //发送短信给收货人
  356. if(StringUtils.isNotEmpty(orderDto.getProbationReceiveTel())){
  357. codeService.sendTextToUser(orderDto.getProbationReceiveTel(), 6);
  358. }
  359. }else if(probationNewLogs.getLogsTime() < awardRelu.getAwardNum()){
  360. //执行推送
  361. sendMessageUtil.probationPush(
  362. awardRelu.getAwardInfo(),
  363. orderDto.getProductName(),
  364. orderDto.getProbationOrderid(),
  365. formatter.format(orderDto.getProbationStartTime()),
  366. formatter.format(
  367. new Date(orderDto.getProbationStartTime().getTime()
  368. + (awardRelu.getAwardNum() * 24 + orderDto.getProbationPauseTotalLength()) * 60 * 60 * 1000)),
  369. awardRelu.getAwardInfoTwo(),
  370. orderDto.getProbationOpenid(),
  371. ResultInfo.TRY_ORDER_DETAIL+"?orderId="+orderDto.getProbationOrderid());
  372. //修改日志
  373. probationShopOrderService.updatePauseLogsDate(newLogs);
  374. //发送短信给收货人
  375. if(StringUtils.isNotEmpty(orderDto.getProbationReceiveTel())){
  376. codeService.sendTextToUser(orderDto.getProbationReceiveTel(), awardRelu.getAwardNum().intValue());
  377. }
  378. }
  379. }
  380. }
  381. }
  382. }
  383. }
  384. }
  385. @SuppressWarnings(value = {"unchecked", "rawtypes"})
  386. @Scheduled(cron = "0 0/1 * * * ?")
  387. public void selectEfastOrder() {
  388. try{
  389. //获取cookie值
  390. ShopSystemRule rule = systemService.selectOneShopRuleById(244);
  391. String phone = "13590159563";
  392. String url = NameUtils.getConfig("efast_order_url") + phone;
  393. Map<String,String> params = new HashMap<String,String>();
  394. String cookie = rule.getRuleDesc();
  395. //查询百胜订单信息
  396. String str = HttpClient431Util.doGet(params,url,null,null,cookie);
  397. } catch(Exception e) {
  398. System.out.println("------------请求百胜接口失败-------------");
  399. }
  400. }
  401. }