// MQTask.java


  package com.iamberry.wechat.handles.mq;

  import com.auth0.jwt.internal.org.apache.commons.lang3.StringEscapeUtils;
  import com.fasterxml.jackson.databind.ObjectMapper;
  import com.iamberry.app.config.ImberryConfig;
  import com.iamberry.app.tool.log.RatFWLogger;
  import com.iamberry.wechat.core.entity.coupon.CouponItem;
  import com.iamberry.wechat.core.entity.coupon.CouponType;
  import com.iamberry.wechat.core.entity.ism.IntegralLog;
  import com.iamberry.wechat.core.entity.member.Member;
  import com.iamberry.wechat.core.entity.mq.MQMessage;
  import com.iamberry.wechat.core.entity.order.NumberBacklogDto;
  import com.iamberry.wechat.core.entity.order.Order;
  import com.iamberry.wechat.core.entity.task.TaskModel;
  import com.iamberry.wechat.core.entity.task.WechatTask;
  import com.iamberry.wechat.core.entity.task.WechatTaskLogs;
  import com.iamberry.wechat.face.coupon.CouponItemService;
  import com.iamberry.wechat.face.coupon.CouponTypeService;
  import com.iamberry.wechat.face.ism.IntegralLogService;
  import com.iamberry.wechat.face.member.MemberService;
  import com.iamberry.wechat.face.order.AdminOrderService;
  import com.iamberry.wechat.face.reback.RebackServices;
  import com.iamberry.wechat.face.sendmsg.CodeService;
  import com.iamberry.wechat.face.task.WechatTaskService;
  import com.iamberry.wechat.tools.*;
  import com.iamberry.zk.SpringContextHolder;
  import java.lang.reflect.InvocationTargetException;
  import java.lang.reflect.Method;
  import java.text.MessageFormat;
  import java.util.*;
  import java.util.concurrent.locks.Lock;
  import java.util.concurrent.locks.ReentrantLock;
  import net.sf.json.JSONObject;
  import org.springframework.beans.factory.InitializingBean;
  import org.springframework.beans.factory.annotation.Autowired;
  import org.springframework.context.annotation.Lazy;
  import org.springframework.scheduling.annotation.Scheduled;
  import org.springframework.stereotype.Component;
  37. /**
  38. * @company 深圳爱贝源科技有限公司
  39. * @website www.iamberry.com
  40. * @author 献
  41. * @tel 18271840547
  42. * @date 2016年11月3日
  43. * @explain MQ模拟实现
  44. */
  45. @Component(value="mqTask")
  46. @Lazy(false)
  47. public class MQTask implements InitializingBean {
  48. @Autowired
  49. private AdminOrderService adminOrderService;
  50. @Autowired
  51. private MQServiceProxy mQSerivce;
  52. @Autowired
  53. private RatFWLogger logger;
  54. @Autowired
  55. private WechatTaskService wechatTaskService;
  56. @Autowired
  57. private RebackServices rebackServices;
  58. @Autowired
  59. private CodeService codeService;
  60. @Autowired
  61. private MemberService memberService;
  62. @Autowired
  63. private CouponTypeService couponTypeService;
  64. @Autowired
  65. private CouponItemService couponItemService;
  66. @Autowired
  67. private SendMessageUtil sendMessageUtil;
  68. @Autowired
  69. private IntegralLogService integralLogService;
  70. private Lock lock = new ReentrantLock();
  71. @SuppressWarnings(value = {"unchecked", "rawtypes"})
  72. @Scheduled(cron = "0/30 * * * * ?")
  73. public void sendOrderService() {
  74. logger.info("------------task start-----------");
  75. // If the timer has not stopped, then the next time the timer can not start.
  76. lock.lock();
  77. try {
  78. // step 1, Gets the message that is not executed in the database.
  79. List<MQMessage> messages = mQSerivce.selectWaitHandlerMessage();
  80. if (messages == null || messages.size() <= 0) {
  81. return;
  82. }
  83. // step 2, Handle 10 messages at a time.
  84. for (MQMessage mqMessage : messages) {
  85. try {
  86. Object object = SpringContextHolder.getBean(mqMessage.getServiceHandlerObjectName());
  87. Class classes = object.getClass();
  88. Method method = classes.getMethod(mqMessage.getServiceHandlerMethodName(), MQMessage.class);
  89. if (method == null) {
  90. mqMessage.setServiceErrorMessage("Method is not defined:" + mqMessage.getServiceHandlerMethodName());
  91. mqMessage.setServiceStatus(2);
  92. continue;
  93. }
  94. // invoke Method.
  95. Object resultObject = method.invoke(object, mqMessage);
  96. if (!(resultObject instanceof Boolean)) {
  97. mqMessage.setServiceStatus(2);
  98. mqMessage.setServiceErrorMessage(mqMessage.getServiceHandlerObjectName() + "." + mqMessage.getServiceHandlerMethodName() + "()Return value is not type Boolean");
  99. continue;
  100. }
  101. if (!(Boolean) resultObject) {
  102. mqMessage.setServiceErrorMessage("invoke Method Error");
  103. mqMessage.setServiceStatus(2);
  104. continue;
  105. }
  106. mqMessage.setServiceStatus(1);
  107. continue;
  108. } catch (Exception e) {
  109. // TODO: handle exception
  110. mqMessage.setServiceErrorMessage(e.getMessage());
  111. mqMessage.setServiceStatus(2);
  112. continue;
  113. } finally {
  114. mqMessage.setServiceIsSend(1);
  115. }
  116. }
  117. // step 3, update Message status and serviceIsSend.
  118. mQSerivce.updateBatchMessage(messages);
  119. // step 4, is next ?
  120. /*if (messages.size() >= 10) {
  121. StaticCacheMemory.isStartTask = true;
  122. this.sendOrderService();
  123. }*/
  124. } catch (Exception e) {
  125. e.printStackTrace();
  126. logger.error(e, "MQ Task Error:" + e.getMessage());
  127. } finally {
  128. // reset lock
  129. lock.unlock();
  130. }
  131. logger.info("------------task invoke success-----------");
  132. }
  133. /**
  134. * 根据订单号拉取物流信息
  135. * @throws Exception
  136. */
  137. @Scheduled(cron = "0 0 */1 * * ?")//每小时执行一次
  138. //@Scheduled(cron = "0 0/2 * * * ?")//每2分钟执行一次
  139. //@Scheduled(cron = "*/10 * * * * ?")
  140. public void syncLgisticsInfoByOrderId() throws Exception {
  141. System.out.println("---------------- 根据订单号拉取物流信息开始 ---------------");
  142. logger.info("---------------- 根据订单号拉取物流信息开始 ---------------");
  143. lock.lock();
  144. List<Order> orderList = null;
  145. String url = NameUtils.getConfig("rst_efast_base_url");
  146. String sdId = NameUtils.getConfig("sd_id");
  147. Map<String, Object> orderData = new HashMap<String, Object>();
  148. Map<String, String> requestData = new HashMap<String, String>();
  149. com.fasterxml.jackson.databind.ObjectMapper mapper = new ObjectMapper();
  150. try {
  151. logger.info("---------------- selectProbationShopOrderList begin ---------------");
  152. orderList = wechatTaskService.selectAiberleShopOrderList();
  153. if(orderList != null && orderList.size() > 0){
  154. for (Order order:orderList) {
  155. String salesOrderid = order.getSalesOrderid();
  156. orderData.put("orderId", salesOrderid);
  157. orderData.put("sd_id", sdId);
  158. requestData.put("app_act", "rst.trade.logistics.get");
  159. String orderDataStr = mapper.writeValueAsString(orderData);
  160. requestData.put("info", orderDataStr);
  161. String result = HttpClient431Util.doPost(requestData, url);
  162. result = StringEscapeUtils.unescapeJava(result); // unicode 编码
  163. logger.info("拉取订单号为:" + order.getSalesOrderid() + "的订单,rst返回信息:" + result);
  164. JSONObject jsonObject = JSONObject.fromObject(result);
  165. String msg = jsonObject.getString("msg");
  166. logger.info("---------"+msg+"---------");
  167. if ("success".equals(msg)) {
  168. Date date = new Date();
  169. String salesPostFirm = jsonObject.getString("salesPostFirm");
  170. String salesPostNum = jsonObject.getString("salesPostNum");
  171. order.setSalesPostFirm(salesPostFirm);
  172. order.setSalesPostNum(salesPostNum);
  173. order.setSalesSendDate(date);
  174. wechatTaskService.updateAiberleShopOrder(order);
  175. }
  176. }
  177. }
  178. }catch (Exception e){
  179. logger.error("拉取订单异常",e.getMessage());
  180. }finally {
  181. lock.unlock();
  182. }
  183. logger.info("---------------- 根据订单号拉取物流信息结束 ---------------");
  184. }
  185. /**
  186. * 订阅消息
  187. */
  188. public void subscribeMessageQueue() {
  189. }
  190. // 强迫线程可见
  191. private volatile boolean oldState = true;
  192. /**
  193. * 微信定期任务奖励-上个月的定时器
  194. */
  195. @SuppressWarnings("deprecation")
  196. @Scheduled(cron = "0/59 * * * * ?")
  197. public void oldMonthTask() {
  198. if (!oldState) {
  199. logger.info("线程正忙...");
  200. return;
  201. }
  202. oldState = false;
  203. try {
  204. // 判断上一个月的数据是否奖励完成
  205. logger.info("上一个月的定时奖励任务奖励...");
  206. // 当前时间
  207. Date nowDate = new Date();
  208. // 回到上一个月
  209. nowDate.setMonth(nowDate.getMonth() - 1);
  210. // 上一个月的结束时间 类似 :2016-02-29 23:59:59
  211. Date oldMonthLastDay = DateTimeUtil.getLastDayOfYear(nowDate);
  212. // 上一个月的开始时间 类似 :2016-02-01 00:00:00
  213. Date oldMonthFirstDay = DateTimeUtil.getFirstDayOfYear(nowDate);
  214. List<WechatTask> tasks = wechatTaskService.getRunTask(oldMonthFirstDay, oldMonthLastDay);
  215. if (tasks == null || tasks.size() == 0) {
  216. logger.info("暂时没有上一个月的任务奖励...");
  217. oldState = true;
  218. return;
  219. }
  220. // 获取在上一个月销售达到X台的数据
  221. for (WechatTask wechatTask : tasks) {
  222. List<TaskModel> models = rebackServices.getByDate(wechatTask.getTaskProductType(), oldMonthFirstDay, oldMonthLastDay, wechatTask.getTaskSalesNum());
  223. if (models == null || models.size() == 0) {
  224. logger.info("暂时没有人完成上一个月的任务" + wechatTask.getTaskId() + "奖励...");
  225. continue;
  226. }
  227. // 达到的用户是否已经领取奖励
  228. for (TaskModel taskModel : models) {
  229. WechatTaskLogs logs = new WechatTaskLogs();
  230. logs.setLogsTaskId(wechatTask.getTaskId());
  231. logs.setLogsUserOpenId(taskModel.getOpenID());
  232. Integer logsID = wechatTaskService.getByTaskIdAndOpenId(logs);
  233. if (logsID != null) {
  234. // 如果当前用户已经领取了奖励,那么跳过
  235. continue;
  236. }
  237. // 若用没有领取,那么奖励
  238. try {
  239. wechatTaskService.handlerTask(taskModel.getOpenID(), wechatTask.getTaskMoney(), wechatTask.getTaskId(), wechatTask.getTaskProductType());
  240. } catch (Exception e) {
  241. logger.error(this, "对:" + taskModel.getOpenID() + ",奖励失败!error:" + e.getMessage());
  242. }
  243. }
  244. }
  245. } catch (Exception e) {
  246. logger.error(this, "发生异常,奖励失败!error:" + e.getMessage());
  247. } finally {
  248. // 归还锁
  249. oldState = true;
  250. }
  251. }
  252. // 强迫线程可见
  253. private volatile boolean nowState = true;
  254. /**
  255. * 微信定期任务奖励-本月的定时器
  256. */
  257. @Scheduled(cron = "0/59 * * * * ?")
  258. public void nowMonthTask() {
  259. if (!nowState) {
  260. logger.info("线程正忙...");
  261. return;
  262. }
  263. nowState = false;
  264. try {
  265. // 判断本月是否有奖励数据
  266. logger.info("本月的定时奖励任务奖励...");
  267. // 当前时间
  268. Date nowDate = new Date();
  269. // 上一个月的结束时间 类似 :2016-02-29 23:59:59
  270. Date oldMonthLastDay = DateTimeUtil.getLastDayOfYear(nowDate);
  271. // 上一个月的开始时间 类似 :2016-02-01 00:00:00
  272. Date oldMonthFirstDay = DateTimeUtil.getFirstDayOfYear(nowDate);
  273. List<WechatTask> tasks = wechatTaskService.getRunTask(oldMonthFirstDay, oldMonthLastDay);
  274. if (tasks == null || tasks.size() == 0) {
  275. logger.info("暂时没有本月的任务奖励...");
  276. nowState = true;
  277. return;
  278. }
  279. // 获取在上一个月销售达到X台的数据
  280. for (WechatTask wechatTask : tasks) {
  281. List<TaskModel> models = rebackServices.getByDate(wechatTask.getTaskProductType(), oldMonthFirstDay, oldMonthLastDay, wechatTask.getTaskSalesNum());
  282. if (models == null || models.size() == 0) {
  283. logger.info("暂时没有人完成本月的任务" + wechatTask.getTaskId() + "奖励...");
  284. continue;
  285. }
  286. // 达到的用户是否已经领取奖励
  287. for (TaskModel taskModel : models) {
  288. WechatTaskLogs logs = new WechatTaskLogs();
  289. logs.setLogsTaskId(wechatTask.getTaskId());
  290. logs.setLogsUserOpenId(taskModel.getOpenID());
  291. Integer logsID = wechatTaskService.getByTaskIdAndOpenId(logs);
  292. if (logsID != null) {
  293. // 如果当前用户已经领取了奖励,那么跳过
  294. continue;
  295. }
  296. // 若用没有领取,那么奖励
  297. try {
  298. wechatTaskService.handlerTask(taskModel.getOpenID(), wechatTask.getTaskMoney(), wechatTask.getTaskId(), wechatTask.getTaskProductType());
  299. } catch (Exception e) {
  300. logger.error(this, "对:" + taskModel.getOpenID() + ",奖励失败!error:" + e.getMessage());
  301. }
  302. }
  303. }
  304. } catch (Exception e) {
  305. // TODO: handle exception
  306. logger.error(this, "发生异常:" + e.getMessage());
  307. } finally {
  308. nowState = true;
  309. }
  310. }
  311. public void afterPropertiesSet() throws Exception {
  312. }
  313. /**
  314. * 待处理订单提醒---每天下午2:50,短信提醒益霜、运营一次(15814645335、18271840547);
  315. */
  316. // @SuppressWarnings(value = {"unchecked", "rawtypes"})
  317. // @Scheduled(cron = "0 50 14 * * ?")//每天下午2点50分执行一次
  318. // public void pendingOrderReminder() {
  319. // logger.info("=======发送短信通知========");
  320. // //查询待发货事项、申请退款订单、申请退货订单、申请换货订单 的数量
  321. // NumberBacklogDto numberBacklogDto = adminOrderService.selectNumberBacklog();
  322. // Integer num = (numberBacklogDto.getExchange()+numberBacklogDto.getReturnGoods()+numberBacklogDto.getReturnRefund()+numberBacklogDto.getSendTheGoods());
  323. // String text = MessageFormat.format(ImberryConfig.PENDING_ORDER_REMINDER, num);
  324. // if(num > 0){
  325. // codeService.informShipping("15814645335", text);
  326. // codeService.informShipping("18271840547", text);
  327. // }
  328. // }
  329. /**
  330. * 生日提醒 - 赠送优惠券 - 需要在0点就赠送出去
  331. * @throws Exception
  332. */
  333. // @Scheduled(cron = "0 0 */1 * * ?")//每小时执行一次
  334. // @Scheduled(cron = "0 0 9 * * ?")//每天早上9点触发
  335. // @Scheduled(cron = "0 0/2 * * * ?")//每2分钟执行一次
  336. @Scheduled(cron = "0 5 0 * * ?")//每天早上0点过5分执行
  337. public void birthDatePromptCoupon() throws Exception {
  338. logger.info("---------------- 生日提醒-赠送优惠券-开始 ---------------");
  339. List<Member> memberList = memberService.getNowBirthDate();
  340. logger.info("---今日生日的用户数量:"+memberList.size());
  341. Integer couponId = 40000;
  342. for (Member member:memberList) {
  343. CouponType couponType = couponTypeService.getCouponTypeById(couponId);
  344. Calendar calendar = Calendar.getInstance();
  345. calendar.set(Calendar.HOUR_OF_DAY,23);
  346. calendar.set(Calendar.MINUTE,59);
  347. calendar.set(Calendar.SECOND,50);
  348. //创建优惠券
  349. CouponItem couponItem = new CouponItem();
  350. String uuidStr = StrUtils.getUUID();
  351. couponItem.setCouponItemId(uuidStr);
  352. couponItem.setCouponItemUseropenid(member.getUserOpenid());
  353. couponItem.setCouponId(couponType.getCouponId());
  354. couponItem.setCouponReceiveDate(new Date());
  355. couponItem.setCouponUseEndDate(calendar.getTime());
  356. couponItem.setCouponUseStatus(1);
  357. couponItem.setCouponItemRemark(couponType.getCouponRemark());
  358. Integer flag = couponItemService.insertCouponItem(couponItem);
  359. if(flag < 1){
  360. logger.info("---生日优惠券领取失败,userOpenId:"+member.getUserOpenid() + "");
  361. }else{
  362. logger.info("---生日优惠券领取成功,userOpenId:"+member.getUserOpenid() + "");
  363. }
  364. }
  365. logger.info("---------------- 生日提醒-赠送优惠券-结束 ---------------");
  366. }
  367. /**
  368. * 生日提醒-9点通知客户今天优惠券和积分三倍情况
  369. * @throws Exception
  370. */
  371. // @Scheduled(cron = "0 0 */1 * * ?")//每小时执行一次
  372. @Scheduled(cron = "0 0/2 * * * ?")//每2分钟执行一次
  373. // @Scheduled(cron = "0 0 9 * * ?")//每天早上9点触发
  374. public void birthDatePrompt() throws Exception {
  375. logger.info("---------------- 生日提醒-提醒用户-开始 ---------------");
  376. List<Member> memberList = memberService.getNowBirthDate();
  377. logger.info("---今日生日的用户数量:"+memberList.size());
  378. for (Member member:memberList) {
  379. //推送微信模板消息
  380. try {
  381. sendMessageUtil.memberActivation(
  382. "尊敬的会员,祝您生日快乐,特送上一份生日礼包。",
  383. member.getUserTel(),
  384. String.valueOf(member.getUserSurplusIntegral()),
  385. "1、免费领取50元代金券,满99元即可使用,在会员中心-券包即可查看使用。\\\\r\\\\n" +
  386. "2、会员生日当天下单购买产品,即可享受3倍积分。" +
  387. "生日福利仅限生日当天领取和使用,赶快享受你的专属生日礼包吧!",
  388. member.getUserOpenid(),
  389. ResultInfo.COUPON_URL);
  390. }catch (Exception e){
  391. logger.info("推送今日生日的用户消息失败,失败openId:" + member.getUserOpenid());
  392. e.printStackTrace();
  393. }
  394. }
  395. logger.info("---------------- 生日提醒-提醒用户-结束 ---------------");
  396. }
  397. /**
  398. * 将待入账转为已入账
  399. * @throws Exception
  400. */
  401. // @Scheduled(cron = "0 0/2 * * * ?")//每2分钟执行一次
  402. @Scheduled(cron = "0 0 */1 * * ?")//每小时执行一次
  403. public void accountEntry() throws Exception {
  404. logger.info("---------------- 待入账转为已入账-开始 ---------------");
  405. List<IntegralLog> integralLogList = integralLogService.getIntegralLogListByWaitUser();
  406. if(integralLogList!=null && integralLogList.size()>0){
  407. logger.info("---------------- 当前待入账的处理数量-"+ integralLogList.size() +" ---------------");
  408. for (IntegralLog integralLog:integralLogList) {
  409. try {
  410. integralLogService.waitUser(integralLog);
  411. }catch (Exception e){
  412. logger.info("--待入账转为已入账失败;待入账积分记录id:"+integralLog.getInteLogId()+";" +
  413. "订单编号:"+ integralLog.getInteLogWaitingOrderId() +";失败原因:"+e.getMessage());
  414. e.printStackTrace();
  415. }
  416. }
  417. logger.info("---------------- 待入账转为已入账-结束 ---------------");
  418. }
  419. }
  420. /**
  421. * 20分钟刷新-未支付订单自动取消订单
  422. * @throws Exception
  423. */
  424. // @Scheduled(cron = "0 0/2 * * * ?")//每2分钟执行一次
  425. @Scheduled(cron = "0 0/20 * * * ?")//每20分钟执行一次
  426. public void cancelOrder() throws Exception {
  427. logger.info("---------------- 取消订单-开始 ---------------");
  428. Calendar calendar = Calendar.getInstance();
  429. calendar.setTime(new Date());
  430. calendar.add(Calendar.MINUTE, -20);//20分钟前的时间
  431. Order order = new Order();
  432. order.setCancelDate(calendar.getTime());
  433. List<Order> orderList = adminOrderService.cancelOrderList(order);
  434. if(orderList!=null && orderList.size()>0){
  435. logger.info("---------------- 当前订单处理数量-"+ orderList.size() +" ---------------");
  436. for (Order ord:orderList) {
  437. Integer flag = adminOrderService.cancelOrder(ord.getSalesOrderid());
  438. if(flag < 1){
  439. logger.info("---------------- 订单取消失败-订单号:"+ ord.getSalesOrderid() +" ---------------");
  440. }
  441. }
  442. logger.info("---------------- 取消订单-结束 ---------------");
  443. }
  444. }
  445. }