package com.iamberry.wechat.handles.mq; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import com.iamberry.app.tool.log.RatFWLogger; import com.iamberry.wechat.core.entity.mq.MQMessage; import com.iamberry.wechat.face.mq.MQSerivce; /** * @company 深圳爱贝源科技有限公司 * @website www.iamberry.com * @author 献 * @tel 18271840547 * @date 2016年11月3日 * @explain 定时器,定时处理队列消息 */ public class MQTimer extends TimerTask { // 队列 public static BlockingQueue messageStaticQueue = new ArrayBlockingQueue(500); private MQSerivce mQSerivce; private RatFWLogger ratFWLogger; public MQTimer(MQSerivce mqSerivce, RatFWLogger logger) { this.mQSerivce = mqSerivce; this.ratFWLogger = logger; } @Override public void run() { ratFWLogger.info("启动订阅程序..."); // 订阅,但是一旦发生异常,就会导致程序异常退出,可能造成数据丢失,需要再次使用 try { while (true) { // 如果空闲,阻塞,让出系统资源 MQMessage mqMessage = messageStaticQueue.take(); ratFWLogger.info("收到订阅请求..."); // 远程调用 Service boolean flag = mQSerivce.updateMessageOne(mqMessage); if (flag) { ratFWLogger.info(mqMessage + " execution true"); } else { ratFWLogger.error(this, mqMessage + " execution error"); } } } catch (InterruptedException e) { ratFWLogger.error(this, e.getMessage()); // 如果异常,当做继续启动调用,异常线程退出被回收,新线程继续 new Timer().schedule(new MQTimer(mQSerivce, ratFWLogger), 5000); } } }