1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859 |
- package com.iamberry.wechat.handles.mq;
- import java.util.Timer;
- import java.util.TimerTask;
- import java.util.concurrent.ArrayBlockingQueue;
- import java.util.concurrent.BlockingQueue;
- import com.iamberry.app.tool.log.RatFWLogger;
- import com.iamberry.wechat.core.entity.mq.MQMessage;
- import com.iamberry.wechat.face.mq.MQSerivce;
- /**
- * @company 深圳爱贝源科技有限公司
- * @website www.iamberry.com
- * @author 献
- * @tel 18271840547
- * @date 2016年11月3日
- * @explain 定时器,定时处理队列消息
- */
- public class MQTimer extends TimerTask {
- // 队列
- public static BlockingQueue<MQMessage> messageStaticQueue = new ArrayBlockingQueue<MQMessage>(500);
-
- private MQSerivce mQSerivce;
-
- private RatFWLogger ratFWLogger;
-
- public MQTimer(MQSerivce mqSerivce, RatFWLogger logger) {
- this.mQSerivce = mqSerivce;
- this.ratFWLogger = logger;
- }
- @Override
- public void run() {
- ratFWLogger.info("启动订阅程序...");
- // 订阅,但是一旦发生异常,就会导致程序异常退出,可能造成数据丢失,需要再次使用
- try {
- while (true) {
- // 如果空闲,阻塞,让出系统资源
- MQMessage mqMessage = messageStaticQueue.take();
- ratFWLogger.info("收到订阅请求...");
- // 远程调用 Service
- boolean flag = mQSerivce.updateMessageOne(mqMessage);
- if (flag) {
- ratFWLogger.info(mqMessage + " execution true");
- } else {
- ratFWLogger.error(this, mqMessage + " execution error");
- }
- }
- } catch (InterruptedException e) {
- ratFWLogger.error(this, e.getMessage());
- // 如果异常,当做继续启动调用,异常线程退出被回收,新线程继续
- new Timer().schedule(new MQTimer(mQSerivce, ratFWLogger), 5000);
- }
- }
- }
|