MQTimer.java 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859
  1. package com.iamberry.wechat.handles.mq;
  2. import java.util.Timer;
  3. import java.util.TimerTask;
  4. import java.util.concurrent.ArrayBlockingQueue;
  5. import java.util.concurrent.BlockingQueue;
  6. import com.iamberry.app.tool.log.RatFWLogger;
  7. import com.iamberry.wechat.core.entity.mq.MQMessage;
  8. import com.iamberry.wechat.face.mq.MQSerivce;
  9. /**
  10. * @company 深圳爱贝源科技有限公司
  11. * @website www.iamberry.com
  12. * @author 献
  13. * @tel 18271840547
  14. * @date 2016年11月3日
  15. * @explain 定时器,定时处理队列消息
  16. */
  17. public class MQTimer extends TimerTask {
  18. // 队列
  19. public static BlockingQueue<MQMessage> messageStaticQueue = new ArrayBlockingQueue<MQMessage>(500);
  20. private MQSerivce mQSerivce;
  21. private RatFWLogger ratFWLogger;
  22. public MQTimer(MQSerivce mqSerivce, RatFWLogger logger) {
  23. this.mQSerivce = mqSerivce;
  24. this.ratFWLogger = logger;
  25. }
  26. @Override
  27. public void run() {
  28. ratFWLogger.info("启动订阅程序...");
  29. // 订阅,但是一旦发生异常,就会导致程序异常退出,可能造成数据丢失,需要再次使用
  30. try {
  31. while (true) {
  32. // 如果空闲,阻塞,让出系统资源
  33. MQMessage mqMessage = messageStaticQueue.take();
  34. ratFWLogger.info("收到订阅请求...");
  35. // 远程调用 Service
  36. boolean flag = mQSerivce.updateMessageOne(mqMessage);
  37. if (flag) {
  38. ratFWLogger.info(mqMessage + " execution true");
  39. } else {
  40. ratFWLogger.error(this, mqMessage + " execution error");
  41. }
  42. }
  43. } catch (InterruptedException e) {
  44. ratFWLogger.error(this, e.getMessage());
  45. // 如果异常,当做继续启动调用,异常线程退出被回收,新线程继续
  46. new Timer().schedule(new MQTimer(mQSerivce, ratFWLogger), 5000);
  47. }
  48. }
  49. }