MQTask.java 18 KB


  1. package com.iamberry.wechat.handles.mq;
  2. import java.lang.reflect.Method;
  3. import java.text.SimpleDateFormat;
  4. import java.util.*;
  5. import java.util.concurrent.locks.Lock;
  6. import java.util.concurrent.locks.ReentrantLock;
  7. import com.fasterxml.jackson.databind.ObjectMapper;
  8. import com.iamberry.wechat.core.entity.coupon.CouponItemDto;
  9. import com.iamberry.wechat.core.entity.order.Order;
  10. import com.iamberry.wechat.face.coupon.CouponItemService;
  11. import com.iamberry.wechat.tools.HttpClient431Util;
  12. import com.iamberry.wechat.tools.NameUtils;
  13. import net.sf.json.JSONObject;
  14. import org.apache.commons.lang.StringUtils;
  15. import org.apache.commons.lang3.StringEscapeUtils;
  16. import org.springframework.beans.factory.InitializingBean;
  17. import org.springframework.beans.factory.annotation.Autowired;
  18. import org.springframework.context.annotation.Lazy;
  19. import org.springframework.scheduling.annotation.Scheduled;
  20. import org.springframework.stereotype.Component;
  21. import com.iamberry.app.tool.log.RatFWLogger;
  22. import com.iamberry.wechat.core.entity.mq.MQMessage;
  23. import com.iamberry.wechat.core.entity.order.ProbationOrderDto;
  24. import com.iamberry.wechat.core.entity.probation.ProbationAwardRelu;
  25. import com.iamberry.wechat.core.entity.probation.ProbationNewLogs;
  26. import com.iamberry.wechat.core.entity.task.TaskModel;
  27. import com.iamberry.wechat.core.entity.task.WechatTask;
  28. import com.iamberry.wechat.core.entity.task.WechatTaskLogs;
  29. import com.iamberry.wechat.face.order.CodeService;
  30. import com.iamberry.wechat.face.order.ProbationShopOrderService;
  31. import com.iamberry.wechat.face.reback.RebackServices;
  32. import com.iamberry.wechat.face.task.WechatTaskService;
  33. import com.iamberry.wechat.service.StaticCacheMemory;
  34. import com.iamberry.wechat.tools.DateTimeUtil;
  35. import com.iamberry.wechat.tools.ResultInfo;
  36. import com.iamberry.wechat.utils.SendMessageUtil;
  37. import com.iamberry.zk.SpringContextHolder;
  38. /**
  39. * @author:何秀刚
  40. * @description: MQ模拟实现,每天最多处理1382400条
  41. * @createDate:2016年5月25日
  42. */
  43. @Component(value="mqTask")
  44. @Lazy(false)
  45. public class MQTask implements InitializingBean {
  46. @Autowired
  47. private MQServiceProxy mQSerivce;
  48. @Autowired
  49. private RatFWLogger logger;
  50. @Autowired
  51. private WechatTaskService wechatTaskService;
  52. @Autowired
  53. private RebackServices rebackServices;
  54. @Autowired
  55. private ProbationShopOrderService probationShopOrderService;
  56. @Autowired
  57. private SendMessageUtil sendMessageUtil;
  58. @Autowired
  59. private CodeService codeService;
  60. @Autowired
  61. private CouponItemService couponItemService;
  62. private Lock lock = new ReentrantLock();
  63. @SuppressWarnings(value = {"unchecked", "rawtypes"})
  64. @Scheduled(cron = "0/30 * * * * ?")
  65. public void sendOrderService() {
  66. logger.info("------------task start-----------");
  67. // If the timer has not stopped, then the next time the timer can not start.
  68. lock.lock();
  69. try {
  70. // step 1, Gets the message that is not executed in the database.
  71. List<MQMessage> messages = mQSerivce.selectWaitHandlerMessage();
  72. if (messages == null || messages.size() <= 0) {
  73. return;
  74. }
  75. // step 2, Handle 10 messages at a time.
  76. for (MQMessage mqMessage : messages) {
  77. try {
  78. Object object = SpringContextHolder.getBean(mqMessage.getServiceHandlerObjectName());
  79. Class classes = object.getClass();
  80. Method method = classes.getMethod(mqMessage.getServiceHandlerMethodName(), MQMessage.class);
  81. if (method == null) {
  82. mqMessage.setServiceErrorMessage("Method is not defined:" + mqMessage.getServiceHandlerMethodName());
  83. mqMessage.setServiceStatus(2);
  84. continue;
  85. }
  86. // invoke Method.
  87. Object resultObject = method.invoke(object, mqMessage);
  88. if (!(resultObject instanceof Boolean)) {
  89. mqMessage.setServiceStatus(2);
  90. mqMessage.setServiceErrorMessage(mqMessage.getServiceHandlerObjectName() + "." + mqMessage.getServiceHandlerMethodName() + "()Return value is not type Boolean");
  91. continue;
  92. }
  93. if (!(Boolean) resultObject) {
  94. mqMessage.setServiceErrorMessage("invoke Method Error");
  95. mqMessage.setServiceStatus(2);
  96. continue;
  97. }
  98. mqMessage.setServiceStatus(1);
  99. continue;
  100. } catch (Exception e) {
  101. // TODO: handle exception
  102. mqMessage.setServiceErrorMessage(e.getMessage());
  103. mqMessage.setServiceStatus(2);
  104. continue;
  105. } finally {
  106. mqMessage.setServiceIsSend(1);
  107. }
  108. }
  109. // step 3, update Message status and serviceIsSend.
  110. mQSerivce.updateBatchMessage(messages);
  111. // step 4, is next ?
  112. if (messages.size() >= 10) {
  113. StaticCacheMemory.isStartTask = true;
  114. this.sendOrderService();
  115. }
  116. } catch (Exception e) {
  117. e.printStackTrace();
  118. logger.error(e, "MQ Task Error:" + e.getMessage());
  119. } finally {
  120. // reset lock
  121. lock.unlock();
  122. }
  123. logger.info("------------task invoke success-----------");
  124. }
  125. /**
  126. * 根据订单号拉取物流信息
  127. * @throws Exception
  128. */
  129. @Scheduled(cron = "0 0 */1 * * ?")//每小时执行一次
  130. //@Scheduled(cron = "0 0/2 * * * ?")//每2分钟执行一次
  131. //@Scheduled(cron = "*/10 * * * * ?")
  132. public void syncLgisticsInfoByOrderId() throws Exception {
  133. System.out.println("---------------- 根据订单号拉取物流信息开始 ---------------");
  134. logger.info("---------------- 根据订单号拉取物流信息开始 ---------------");
  135. lock.lock();
  136. List<Order> orderList = null;
  137. String url = NameUtils.getConfig("rst_efast_base_url");
  138. String sdId = NameUtils.getConfig("sd_id");
  139. Map<String, Object> orderData = new HashMap<String, Object>();
  140. Map<String, String> requestData = new HashMap<String, String>();
  141. com.fasterxml.jackson.databind.ObjectMapper mapper = new ObjectMapper();
  142. try {
  143. logger.info("---------------- selectProbationShopOrderList begin ---------------");
  144. orderList = probationShopOrderService.selectProbationShopOrderList();
  145. if(orderList != null && orderList.size() > 0){
  146. for (Order order:orderList) {
  147. String salesOrderid = order.getSalesOrderid();
  148. orderData.put("orderId", salesOrderid);
  149. orderData.put("sd_id", sdId);
  150. requestData.put("app_act", "rst.trade.logistics.get");
  151. String orderDataStr = mapper.writeValueAsString(orderData);
  152. requestData.put("info", orderDataStr);
  153. logger.info("app_act==:" + requestData.get("app_act") + "info==:" + requestData.get("info"));
  154. String result = HttpClient431Util.doPost(requestData, url);
  155. result = StringEscapeUtils.unescapeJava(result); // unicode 编码
  156. logger.info("拉取订单号为:" + order.getSalesOrderid() + "的订单,rst返回信息:" + result);
  157. System.out.println("拉取订单号为:" + order.getSalesOrderid() + "的订单,rst返回信息:" + result);
  158. JSONObject jsonObject = JSONObject.fromObject(result);
  159. String msg = jsonObject.getString("msg");
  160. logger.info("---------"+msg+"---------");
  161. if ("success".equals(msg)) {
  162. Date date = new Date();
  163. String salesPostFirm = jsonObject.getString("salesPostFirm");
  164. String salesPostNum = jsonObject.getString("salesPostNum");
  165. order.setSalesPostFirm(salesPostFirm);
  166. order.setSalesPostNum(salesPostNum);
  167. order.setSalesSendDate(date);
  168. probationShopOrderService.updateProbationShopOrder(order);
  169. }
  170. }
  171. }
  172. }catch (Exception e){
  173. logger.error("拉取订单异常",e.getMessage());
  174. }finally {
  175. lock.unlock();
  176. }
  177. logger.info("---------------- 根据订单号拉取物流信息结束 ---------------");
  178. }
  179. /**
  180. * 订阅消息
  181. */
  182. public void subscribeMessageQueue() {
  183. }
  184. // 强迫线程可见
  185. private volatile boolean oldState = true;
  186. /**
  187. * 微信定期任务奖励-上个月的定时器
  188. */
  189. @SuppressWarnings("deprecation")
  190. // @Scheduled(cron = "0/59 * * * * ?")
  191. public void oldMonthTask() {
  192. if (!oldState) {
  193. logger.info("线程正忙...");
  194. return;
  195. }
  196. oldState = false;
  197. try {
  198. // 判断上一个月的数据是否奖励完成
  199. logger.info("上一个月的定时奖励任务奖励...");
  200. // 当前时间
  201. Date nowDate = new Date();
  202. // 回到上一个月
  203. nowDate.setMonth(nowDate.getMonth() - 1);
  204. // 上一个月的结束时间 类似 :2016-02-29 23:59:59
  205. Date oldMonthLastDay = DateTimeUtil.getLastDayOfYear(nowDate);
  206. // 上一个月的开始时间 类似 :2016-02-01 00:00:00
  207. Date oldMonthFirstDay = DateTimeUtil.getFirstDayOfYear(nowDate);
  208. List<WechatTask> tasks = wechatTaskService.getRunTask(oldMonthFirstDay, oldMonthLastDay);
  209. if (tasks == null || tasks.size() == 0) {
  210. logger.info("暂时没有上一个月的任务奖励...");
  211. oldState = true;
  212. return;
  213. }
  214. // 获取在上一个月销售达到X台的数据
  215. for (WechatTask wechatTask : tasks) {
  216. List<TaskModel> models = rebackServices.getByDate(wechatTask.getTaskProductType(), oldMonthFirstDay, oldMonthLastDay, wechatTask.getTaskSalesNum());
  217. if (models == null || models.size() == 0) {
  218. logger.info("暂时没有人完成上一个月的任务" + wechatTask.getTaskId() + "奖励...");
  219. continue;
  220. }
  221. // 达到的用户是否已经领取奖励
  222. for (TaskModel taskModel : models) {
  223. WechatTaskLogs logs = new WechatTaskLogs();
  224. logs.setLogsTaskId(wechatTask.getTaskId());
  225. logs.setLogsUserOpenId(taskModel.getOpenID());
  226. Integer logsID = wechatTaskService.getByTaskIdAndOpenId(logs);
  227. if (logsID != null) {
  228. // 如果当前用户已经领取了奖励,那么跳过
  229. continue;
  230. }
  231. // 若用没有领取,那么奖励
  232. try {
  233. wechatTaskService.handlerTask(taskModel.getOpenID(), wechatTask.getTaskMoney(), wechatTask.getTaskId(), wechatTask.getTaskProductType());
  234. } catch (Exception e) {
  235. logger.error(this, "对:" + taskModel.getOpenID() + ",奖励失败!error:" + e.getMessage());
  236. }
  237. }
  238. }
  239. } catch (Exception e) {
  240. logger.error(this, "发生异常,奖励失败!error:" + e.getMessage());
  241. } finally {
  242. // 归还锁
  243. oldState = true;
  244. }
  245. }
  246. // 强迫线程可见
  247. private volatile boolean nowState = true;
  248. /**
  249. * 微信定期任务奖励-本月的定时器
  250. */
  251. // @Scheduled(cron = "0/59 * * * * ?")
  252. public void nowMonthTask() {
  253. if (!nowState) {
  254. logger.info("线程正忙...");
  255. return;
  256. }
  257. nowState = false;
  258. try {
  259. // 判断本月是否有奖励数据
  260. logger.info("本月的定时奖励任务奖励...");
  261. // 当前时间
  262. Date nowDate = new Date();
  263. // 上一个月的结束时间 类似 :2016-02-29 23:59:59
  264. Date oldMonthLastDay = DateTimeUtil.getLastDayOfYear(nowDate);
  265. // 上一个月的开始时间 类似 :2016-02-01 00:00:00
  266. Date oldMonthFirstDay = DateTimeUtil.getFirstDayOfYear(nowDate);
  267. List<WechatTask> tasks = wechatTaskService.getRunTask(oldMonthFirstDay, oldMonthLastDay);
  268. if (tasks == null || tasks.size() == 0) {
  269. logger.info("暂时没有本月的任务奖励...");
  270. nowState = true;
  271. return;
  272. }
  273. // 获取在上一个月销售达到X台的数据
  274. for (WechatTask wechatTask : tasks) {
  275. List<TaskModel> models = rebackServices.getByDate(wechatTask.getTaskProductType(), oldMonthFirstDay, oldMonthLastDay, wechatTask.getTaskSalesNum());
  276. if (models == null || models.size() == 0) {
  277. logger.info("暂时没有人完成本月的任务" + wechatTask.getTaskId() + "奖励...");
  278. continue;
  279. }
  280. // 达到的用户是否已经领取奖励
  281. for (TaskModel taskModel : models) {
  282. WechatTaskLogs logs = new WechatTaskLogs();
  283. logs.setLogsTaskId(wechatTask.getTaskId());
  284. logs.setLogsUserOpenId(taskModel.getOpenID());
  285. Integer logsID = wechatTaskService.getByTaskIdAndOpenId(logs);
  286. if (logsID != null) {
  287. // 如果当前用户已经领取了奖励,那么跳过
  288. continue;
  289. }
  290. // 若用没有领取,那么奖励
  291. try {
  292. wechatTaskService.handlerTask(taskModel.getOpenID(), wechatTask.getTaskMoney(), wechatTask.getTaskId(), wechatTask.getTaskProductType());
  293. } catch (Exception e) {
  294. logger.error(this, "对:" + taskModel.getOpenID() + ",奖励失败!error:" + e.getMessage());
  295. }
  296. }
  297. }
  298. } catch (Exception e) {
  299. // TODO: handle exception
  300. logger.error(this, "发生异常:" + e.getMessage());
  301. } finally {
  302. nowState = true;
  303. }
  304. }
  305. public void afterPropertiesSet() throws Exception {
  306. }
  307. /**
  308. * 试用订单短信推送服务(满足XX天奖励XX奖励)
  309. */
  310. // @Scheduled(cron = "0 0 */1 * * ?")//每小时执行一次
  311. public void probationRewardPush(){
  312. logger.info("------------执行推送满足奖励条件用户 start-----------");
  313. //查询奖励推送规则
  314. List<ProbationAwardRelu> reluList = probationShopOrderService.selectProbationAwardRelu();
  315. SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  316. if(reluList!=null && !reluList.isEmpty()){
  317. for(ProbationAwardRelu awardRelu : reluList){
  318. //查询所有待支付订单
  319. List<ProbationOrderDto> orderDtoList = probationShopOrderService.selectProbationStartOrder();
  320. if(orderDtoList!=null && !orderDtoList.isEmpty()){
  321. for(ProbationOrderDto orderDto : orderDtoList ){
  322. //计算总试用期限(小时)
  323. Long totalDate = (orderDto.getProbationEndTime().getTime() - orderDto.getProbationStartTime().getTime()) / 1000 / 60 / 60;
  324. //计算试用剩余期限(小时)
  325. Long remainingDate = (orderDto.getProbationEndTime().getTime() - new Date().getTime()) / 1000 / 60 / 60;
  326. //暂停时间
  327. /*Long pauseDate = 0l;
  328. if(orderDto.getProbationPauseStartTime() != null && orderDto.getProbationPauseEndTime() != null){
  329. pauseDate = (orderDto.getProbationPauseEndTime().getTime() - orderDto.getProbationPauseStartTime().getTime()) / 1000 / 60 / 60;
  330. }*/
  331. //实际试用时间 (总试用时间 - 试用剩余期限 - 暂停时间)
  332. Long actualProbationDate = totalDate - remainingDate - orderDto.getProbationPauseTotalLength() / 1000 / 60 / 60;
  333. //奖励奖品天数转换为小时
  334. Long awardDate = Long.valueOf(awardRelu.getAwardNum()) * 24;
  335. //奖励条件时间 - 实际试用时间 (大于23小时或者小于26小时则推送)
  336. if( awardDate -actualProbationDate > 23 && awardDate -actualProbationDate < 26 ){
  337. ProbationNewLogs probationNewLogs = probationShopOrderService.selectProbationNewLogsByOrderId(orderDto.getProbationOrderid());
  338. //添加日志信息
  339. ProbationNewLogs newLogs = new ProbationNewLogs();
  340. newLogs.setLogsPorderid(orderDto.getProbationOrderid());
  341. newLogs.setLogsTime(awardRelu.getAwardNum());
  342. newLogs.setLogsCreateTime(new Date());
  343. if(probationNewLogs == null){
  344. //推送操作
  345. sendMessageUtil.probationPush(awardRelu.getAwardInfo(),
  346. orderDto.getProductName(),
  347. orderDto.getProbationOrderid(),
  348. formatter.format(orderDto.getProbationStartTime()),
  349. formatter.format(
  350. new Date(
  351. orderDto.getProbationStartTime().getTime() +
  352. (awardRelu.getAwardNum() * 24 + orderDto.getProbationPauseTotalLength()) * 60 * 60 * 1000)),
  353. awardRelu.getAwardInfoTwo(),
  354. orderDto.getProbationOpenid(),
  355. ResultInfo.TRY_ORDER_DETAIL+"?orderId="+orderDto.getProbationOrderid());
  356. //添加日志
  357. probationShopOrderService.insertProbationLogs(newLogs);
  358. //发送短信给收货人
  359. if(StringUtils.isNotEmpty(orderDto.getProbationReceiveTel())){
  360. codeService.sendTextToUser(orderDto.getProbationReceiveTel(), 6);
  361. }
  362. }else if(probationNewLogs.getLogsTime() < awardRelu.getAwardNum()){
  363. //执行推送
  364. sendMessageUtil.probationPush(
  365. awardRelu.getAwardInfo(),
  366. orderDto.getProductName(),
  367. orderDto.getProbationOrderid(),
  368. formatter.format(orderDto.getProbationStartTime()),
  369. formatter.format(
  370. new Date(orderDto.getProbationStartTime().getTime()
  371. + (awardRelu.getAwardNum() * 24 + orderDto.getProbationPauseTotalLength()) * 60 * 60 * 1000)),
  372. awardRelu.getAwardInfoTwo(),
  373. orderDto.getProbationOpenid(),
  374. ResultInfo.TRY_ORDER_DETAIL+"?orderId="+orderDto.getProbationOrderid());
  375. //修改日志
  376. probationShopOrderService.updatePauseLogsDate(newLogs);
  377. //发送短信给收货人
  378. if(StringUtils.isNotEmpty(orderDto.getProbationReceiveTel())){
  379. codeService.sendTextToUser(orderDto.getProbationReceiveTel(), awardRelu.getAwardNum().intValue());
  380. }
  381. }
  382. }
  383. }
  384. }
  385. }
  386. }
  387. }
  388. /**
  389. * 每天有过期的优惠券提醒
  390. * @throws Exception
  391. */
  392. // @Scheduled(cron = "0 0 */1 * * ?")//每小时执行一次
  393. // @Scheduled(cron = "0 0/2 * * * ?")//每2分钟执行一次
  394. @Scheduled(cron = "0 0 9 * * ?")//每天早上9点触发
  395. public void expiredCouponRemind() throws Exception {
  396. logger.info("---------------- 过期优惠券提醒开始 ---------------");
  397. CouponItemDto couponItemDto = new CouponItemDto();
  398. couponItemDto.setCouponUseStatus(1);
  399. Calendar calendar = Calendar.getInstance();
  400. calendar.set(Calendar.HOUR_OF_DAY,0);
  401. calendar.set(Calendar.MINUTE,0);
  402. calendar.set(Calendar.SECOND,5);
  403. couponItemDto.setBeginDate(calendar.getTime());
  404. Calendar calendarEnd = Calendar.getInstance();
  405. calendarEnd.set(Calendar.HOUR_OF_DAY,23);
  406. calendarEnd.set(Calendar.MINUTE,59);
  407. calendarEnd.set(Calendar.SECOND,55);
  408. couponItemDto.setEndDate(calendarEnd.getTime());
  409. List<CouponItemDto> couponItemDtoList = couponItemService.expiredAndNoTake(couponItemDto);
  410. for (CouponItemDto cid:couponItemDtoList) {
  411. String name = cid.getCouponName();
  412. try{
  413. sendMessageUtil.servicePush(
  414. "亲爱的用户,您的"+ name +"即将到期",
  415. name,
  416. "今日到期",
  417. "可在“服务中心—会员中心—券包”查看,请尽快使用。",
  418. cid.getUseropenid(),
  419. ResultInfo.SERVICE_PUSH_URL);
  420. }catch (Exception e){
  421. e.printStackTrace();
  422. logger.info("-------- 优惠券ID:"+ cid.getCouponItemId() + " 优惠券名称:" + name +"即将过期,推送提醒失败 ------");
  423. }
  424. logger.info("-------- 优惠券ID:"+ cid.getCouponItemId() + " 优惠券名称:" + name +"即将过期,推送提醒成功 ------");
  425. }
  426. logger.info("---------------- 过期优惠券提醒结束 ---------------");
  427. }
  428. }