MQTask.java 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379
  1. package com.iamberry.wechat.handles.mq;
  2. import java.lang.reflect.Method;
  3. import java.text.SimpleDateFormat;
  4. import java.util.Date;
  5. import java.util.HashMap;
  6. import java.util.List;
  7. import java.util.Map;
  8. import java.util.concurrent.locks.Lock;
  9. import java.util.concurrent.locks.ReentrantLock;
  10. import com.iamberry.wechat.core.entity.admin.ShopSystemRule;
  11. import com.iamberry.wechat.face.admin.SystemService;
  12. import com.iamberry.wechat.tools.HttpClient431Util;
  13. import com.iamberry.wechat.tools.NameUtils;
  14. import org.apache.commons.lang.StringUtils;
  15. import org.springframework.beans.factory.InitializingBean;
  16. import org.springframework.beans.factory.annotation.Autowired;
  17. import org.springframework.context.annotation.Lazy;
  18. import org.springframework.scheduling.annotation.Scheduled;
  19. import org.springframework.stereotype.Component;
  20. import com.iamberry.app.tool.log.RatFWLogger;
  21. import com.iamberry.wechat.core.entity.mq.MQMessage;
  22. import com.iamberry.wechat.core.entity.order.ProbationOrderDto;
  23. import com.iamberry.wechat.core.entity.probation.ProbationAwardRelu;
  24. import com.iamberry.wechat.core.entity.probation.ProbationNewLogs;
  25. import com.iamberry.wechat.core.entity.task.TaskModel;
  26. import com.iamberry.wechat.core.entity.task.WechatTask;
  27. import com.iamberry.wechat.core.entity.task.WechatTaskLogs;
  28. import com.iamberry.wechat.face.order.CodeService;
  29. import com.iamberry.wechat.face.order.ProbationShopOrderService;
  30. import com.iamberry.wechat.face.reback.RebackServices;
  31. import com.iamberry.wechat.face.task.WechatTaskService;
  32. import com.iamberry.wechat.service.StaticCacheMemory;
  33. import com.iamberry.wechat.tools.DateTimeUtil;
  34. import com.iamberry.wechat.tools.ResultInfo;
  35. import com.iamberry.wechat.utils.SendMessageUtil;
  36. import com.iamberry.zk.SpringContextHolder;
/**
 * Simulated message-queue (MQ) implementation; according to the original author it processes at
 * most 1,382,400 messages per day.
 *
 * @author 何秀刚
 * @since 2016-05-25
 */
  42. @Component(value="mqTask")
  43. @Lazy(false)
  44. public class MQTask implements InitializingBean {
  45. @Autowired
  46. private MQServiceProxy mQSerivce;
  47. @Autowired
  48. private RatFWLogger logger;
  49. @Autowired
  50. private WechatTaskService wechatTaskService;
  51. @Autowired
  52. private RebackServices rebackServices;
  53. @Autowired
  54. private ProbationShopOrderService probationShopOrderService;
  55. @Autowired
  56. private SendMessageUtil sendMessageUtil;
  57. @Autowired
  58. private CodeService codeService;
  59. @Autowired
  60. private SystemService systemService;
  61. private Lock lock = new ReentrantLock();
  62. @SuppressWarnings(value = {"unchecked", "rawtypes"})
  63. @Scheduled(cron = "0/30 * * * * ?")
  64. public void sendOrderService() {
  65. logger.info("------------task start-----------");
  66. // If the timer has not stopped, then the next time the timer can not start.
  67. lock.lock();
  68. try {
  69. // step 1, Gets the message that is not executed in the database.
  70. List<MQMessage> messages = mQSerivce.selectWaitHandlerMessage();
  71. if (messages == null || messages.size() <= 0) {
  72. return;
  73. }
  74. // step 2, Handle 10 messages at a time.
  75. for (MQMessage mqMessage : messages) {
  76. try {
  77. Object object = SpringContextHolder.getBean(mqMessage.getServiceHandlerObjectName());
  78. Class classes = object.getClass();
  79. Method method = classes.getMethod(mqMessage.getServiceHandlerMethodName(), MQMessage.class);
  80. if (method == null) {
  81. mqMessage.setServiceErrorMessage("Method is not defined:" + mqMessage.getServiceHandlerMethodName());
  82. mqMessage.setServiceStatus(2);
  83. continue;
  84. }
  85. // invoke Method.
  86. Object resultObject = method.invoke(object, mqMessage);
  87. if (!(resultObject instanceof Boolean)) {
  88. mqMessage.setServiceStatus(2);
  89. mqMessage.setServiceErrorMessage(mqMessage.getServiceHandlerObjectName() + "." + mqMessage.getServiceHandlerMethodName() + "()Return value is not type Boolean");
  90. continue;
  91. }
  92. if (!(Boolean) resultObject) {
  93. mqMessage.setServiceErrorMessage("invoke Method Error");
  94. mqMessage.setServiceStatus(2);
  95. continue;
  96. }
  97. mqMessage.setServiceStatus(1);
  98. continue;
  99. } catch (Exception e) {
  100. // TODO: handle exception
  101. mqMessage.setServiceErrorMessage(e.getMessage());
  102. mqMessage.setServiceStatus(2);
  103. continue;
  104. } finally {
  105. mqMessage.setServiceIsSend(1);
  106. }
  107. }
  108. // step 3, update Message status and serviceIsSend.
  109. mQSerivce.updateBatchMessage(messages);
  110. // step 4, is next ?
  111. if (messages.size() >= 10) {
  112. StaticCacheMemory.isStartTask = true;
  113. this.sendOrderService();
  114. }
  115. } catch (Exception e) {
  116. e.printStackTrace();
  117. logger.error(e, "MQ Task Error:" + e.getMessage());
  118. } finally {
  119. // reset lock
  120. lock.unlock();
  121. }
  122. logger.info("------------task invoke success-----------");
  123. }
  /**
   * Subscribes to the message queue.
   *
   * <p>Currently an empty placeholder: the method body contains no statements.
   */
  public void subscribeMessageQueue() {
  }
  129. // 强迫线程可见
  130. private volatile boolean oldState = true;
  131. /**
  132. * 微信定期任务奖励-上个月的定时器
  133. */
  134. @SuppressWarnings("deprecation")
  135. // @Scheduled(cron = "0/59 * * * * ?")
  136. public void oldMonthTask() {
  137. if (!oldState) {
  138. logger.info("线程正忙...");
  139. return;
  140. }
  141. oldState = false;
  142. try {
  143. // 判断上一个月的数据是否奖励完成
  144. logger.info("上一个月的定时奖励任务奖励...");
  145. // 当前时间
  146. Date nowDate = new Date();
  147. // 回到上一个月
  148. nowDate.setMonth(nowDate.getMonth() - 1);
  149. // 上一个月的结束时间 类似 :2016-02-29 23:59:59
  150. Date oldMonthLastDay = DateTimeUtil.getLastDayOfYear(nowDate);
  151. // 上一个月的开始时间 类似 :2016-02-01 00:00:00
  152. Date oldMonthFirstDay = DateTimeUtil.getFirstDayOfYear(nowDate);
  153. List<WechatTask> tasks = wechatTaskService.getRunTask(oldMonthFirstDay, oldMonthLastDay);
  154. if (tasks == null || tasks.size() == 0) {
  155. logger.info("暂时没有上一个月的任务奖励...");
  156. oldState = true;
  157. return;
  158. }
  159. // 获取在上一个月销售达到X台的数据
  160. for (WechatTask wechatTask : tasks) {
  161. List<TaskModel> models = rebackServices.getByDate(wechatTask.getTaskProductType(), oldMonthFirstDay, oldMonthLastDay, wechatTask.getTaskSalesNum());
  162. if (models == null || models.size() == 0) {
  163. logger.info("暂时没有人完成上一个月的任务" + wechatTask.getTaskId() + "奖励...");
  164. continue;
  165. }
  166. // 达到的用户是否已经领取奖励
  167. for (TaskModel taskModel : models) {
  168. WechatTaskLogs logs = new WechatTaskLogs();
  169. logs.setLogsTaskId(wechatTask.getTaskId());
  170. logs.setLogsUserOpenId(taskModel.getOpenID());
  171. Integer logsID = wechatTaskService.getByTaskIdAndOpenId(logs);
  172. if (logsID != null) {
  173. // 如果当前用户已经领取了奖励,那么跳过
  174. continue;
  175. }
  176. // 若用没有领取,那么奖励
  177. try {
  178. wechatTaskService.handlerTask(taskModel.getOpenID(), wechatTask.getTaskMoney(), wechatTask.getTaskId(), wechatTask.getTaskProductType());
  179. } catch (Exception e) {
  180. logger.error(this, "对:" + taskModel.getOpenID() + ",奖励失败!error:" + e.getMessage());
  181. }
  182. }
  183. }
  184. } catch (Exception e) {
  185. logger.error(this, "发生异常,奖励失败!error:" + e.getMessage());
  186. } finally {
  187. // 归还锁
  188. oldState = true;
  189. }
  190. }
  191. // 强迫线程可见
  192. private volatile boolean nowState = true;
  193. /**
  194. * 微信定期任务奖励-本月的定时器
  195. */
  196. // @Scheduled(cron = "0/59 * * * * ?")
  197. public void nowMonthTask() {
  198. if (!nowState) {
  199. logger.info("线程正忙...");
  200. return;
  201. }
  202. nowState = false;
  203. try {
  204. // 判断本月是否有奖励数据
  205. logger.info("本月的定时奖励任务奖励...");
  206. // 当前时间
  207. Date nowDate = new Date();
  208. // 上一个月的结束时间 类似 :2016-02-29 23:59:59
  209. Date oldMonthLastDay = DateTimeUtil.getLastDayOfYear(nowDate);
  210. // 上一个月的开始时间 类似 :2016-02-01 00:00:00
  211. Date oldMonthFirstDay = DateTimeUtil.getFirstDayOfYear(nowDate);
  212. List<WechatTask> tasks = wechatTaskService.getRunTask(oldMonthFirstDay, oldMonthLastDay);
  213. if (tasks == null || tasks.size() == 0) {
  214. logger.info("暂时没有本月的任务奖励...");
  215. nowState = true;
  216. return;
  217. }
  218. // 获取在上一个月销售达到X台的数据
  219. for (WechatTask wechatTask : tasks) {
  220. List<TaskModel> models = rebackServices.getByDate(wechatTask.getTaskProductType(), oldMonthFirstDay, oldMonthLastDay, wechatTask.getTaskSalesNum());
  221. if (models == null || models.size() == 0) {
  222. logger.info("暂时没有人完成本月的任务" + wechatTask.getTaskId() + "奖励...");
  223. continue;
  224. }
  225. // 达到的用户是否已经领取奖励
  226. for (TaskModel taskModel : models) {
  227. WechatTaskLogs logs = new WechatTaskLogs();
  228. logs.setLogsTaskId(wechatTask.getTaskId());
  229. logs.setLogsUserOpenId(taskModel.getOpenID());
  230. Integer logsID = wechatTaskService.getByTaskIdAndOpenId(logs);
  231. if (logsID != null) {
  232. // 如果当前用户已经领取了奖励,那么跳过
  233. continue;
  234. }
  235. // 若用没有领取,那么奖励
  236. try {
  237. wechatTaskService.handlerTask(taskModel.getOpenID(), wechatTask.getTaskMoney(), wechatTask.getTaskId(), wechatTask.getTaskProductType());
  238. } catch (Exception e) {
  239. logger.error(this, "对:" + taskModel.getOpenID() + ",奖励失败!error:" + e.getMessage());
  240. }
  241. }
  242. }
  243. } catch (Exception e) {
  244. // TODO: handle exception
  245. logger.error(this, "发生异常:" + e.getMessage());
  246. } finally {
  247. nowState = true;
  248. }
  249. }
  250. public void afterPropertiesSet() throws Exception {
  251. }
  252. /**
  253. * 试用订单短信推送服务(满足XX天奖励XX奖励)
  254. */
  255. // @Scheduled(cron = "0 0 */1 * * ?")//每小时执行一次
  256. public void probationRewardPush(){
  257. logger.info("------------执行推送满足奖励条件用户 start-----------");
  258. //查询奖励推送规则
  259. List<ProbationAwardRelu> reluList = probationShopOrderService.selectProbationAwardRelu();
  260. SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  261. if(reluList!=null && !reluList.isEmpty()){
  262. for(ProbationAwardRelu awardRelu : reluList){
  263. //查询所有待支付订单
  264. List<ProbationOrderDto> orderDtoList = probationShopOrderService.selectProbationStartOrder();
  265. if(orderDtoList!=null && !orderDtoList.isEmpty()){
  266. for(ProbationOrderDto orderDto : orderDtoList ){
  267. //计算总试用期限(小时)
  268. Long totalDate = (orderDto.getProbationEndTime().getTime() - orderDto.getProbationStartTime().getTime()) / 1000 / 60 / 60;
  269. //计算试用剩余期限(小时)
  270. Long remainingDate = (orderDto.getProbationEndTime().getTime() - new Date().getTime()) / 1000 / 60 / 60;
  271. //暂停时间
  272. /*Long pauseDate = 0l;
  273. if(orderDto.getProbationPauseStartTime() != null && orderDto.getProbationPauseEndTime() != null){
  274. pauseDate = (orderDto.getProbationPauseEndTime().getTime() - orderDto.getProbationPauseStartTime().getTime()) / 1000 / 60 / 60;
  275. }*/
  276. //实际试用时间 (总试用时间 - 试用剩余期限 - 暂停时间)
  277. Long actualProbationDate = totalDate - remainingDate - orderDto.getProbationPauseTotalLength() / 1000 / 60 / 60;
  278. //奖励奖品天数转换为小时
  279. Long awardDate = Long.valueOf(awardRelu.getAwardNum()) * 24;
  280. //奖励条件时间 - 实际试用时间 (大于23小时或者小于26小时则推送)
  281. if( awardDate -actualProbationDate > 23 && awardDate -actualProbationDate < 26 ){
  282. ProbationNewLogs probationNewLogs = probationShopOrderService.selectProbationNewLogsByOrderId(orderDto.getProbationOrderid());
  283. //添加日志信息
  284. ProbationNewLogs newLogs = new ProbationNewLogs();
  285. newLogs.setLogsPorderid(orderDto.getProbationOrderid());
  286. newLogs.setLogsTime(awardRelu.getAwardNum());
  287. newLogs.setLogsCreateTime(new Date());
  288. if(probationNewLogs == null){
  289. //推送操作
  290. sendMessageUtil.probationPush(awardRelu.getAwardInfo(),
  291. orderDto.getProductName(),
  292. orderDto.getProbationOrderid(),
  293. formatter.format(orderDto.getProbationStartTime()),
  294. formatter.format(
  295. new Date(
  296. orderDto.getProbationStartTime().getTime() +
  297. (awardRelu.getAwardNum() * 24 + orderDto.getProbationPauseTotalLength()) * 60 * 60 * 1000)),
  298. awardRelu.getAwardInfoTwo(),
  299. orderDto.getProbationOpenid(),
  300. ResultInfo.TRY_ORDER_DETAIL+"?orderId="+orderDto.getProbationOrderid());
  301. //添加日志
  302. probationShopOrderService.insertProbationLogs(newLogs);
  303. //发送短信给收货人
  304. if(StringUtils.isNotEmpty(orderDto.getProbationReceiveTel())){
  305. codeService.sendTextToUser(orderDto.getProbationReceiveTel(), 6);
  306. }
  307. }else if(probationNewLogs.getLogsTime() < awardRelu.getAwardNum()){
  308. //执行推送
  309. sendMessageUtil.probationPush(
  310. awardRelu.getAwardInfo(),
  311. orderDto.getProductName(),
  312. orderDto.getProbationOrderid(),
  313. formatter.format(orderDto.getProbationStartTime()),
  314. formatter.format(
  315. new Date(orderDto.getProbationStartTime().getTime()
  316. + (awardRelu.getAwardNum() * 24 + orderDto.getProbationPauseTotalLength()) * 60 * 60 * 1000)),
  317. awardRelu.getAwardInfoTwo(),
  318. orderDto.getProbationOpenid(),
  319. ResultInfo.TRY_ORDER_DETAIL+"?orderId="+orderDto.getProbationOrderid());
  320. //修改日志
  321. probationShopOrderService.updatePauseLogsDate(newLogs);
  322. //发送短信给收货人
  323. if(StringUtils.isNotEmpty(orderDto.getProbationReceiveTel())){
  324. codeService.sendTextToUser(orderDto.getProbationReceiveTel(), awardRelu.getAwardNum().intValue());
  325. }
  326. }
  327. }
  328. }
  329. }
  330. }
  331. }
  332. }
  333. @SuppressWarnings(value = {"unchecked", "rawtypes"})
  334. @Scheduled(cron = "0 0/1 * * * ?")
  335. public void selectEfastOrder() {
  336. try{
  337. //获取cookie值
  338. ShopSystemRule rule = systemService.selectOneShopRuleById(244);
  339. String phone = "13590159563";
  340. String url = NameUtils.getConfig("efast_order_url") + phone;
  341. Map<String,String> params = new HashMap<String,String>();
  342. String cookie = rule.getRuleDesc();
  343. //查询百胜订单信息
  344. String str = HttpClient431Util.doGet(params,url,null,null,cookie);
  345. } catch(Exception e) {
  346. System.out.println("------------请求百胜接口失败-------------");
  347. }
  348. }
  349. }