فهرست منبع

项目内容改进原有MQ实现方式,目前改用Spring事件+异步实现。

xian 5 سال پیش
والد
کامیت
7607fc5b6e
17فایلهای تغییر یافته به همراه110 افزوده شده و 96 حذف شده
  1. 4 5
      watero-common-tool/src/main/java/com/iamberry/app/tool/log/RatFWLogger.java
  2. 1 0
      watero-common-tool/src/main/java/com/iamberry/zk/SpringContextHolder.java
  3. 5 1
      watero-wechat-interface/src/main/java/com/iamberry/wechat/face/mq/MQSerivce.java
  4. 11 16
      watero-wechat-service/src/main/java/com/iamberry/wechat/service/mq/MQSerivceImpl.java
  5. 2 1
      watero-wechat-web/src/main/java/com/iamberry/wechat/handles/admin/AdminOrderHandlers.java
  6. 5 0
      watero-wechat-web/src/main/java/com/iamberry/wechat/handles/admin/LoginController.java
  7. 2 1
      watero-wechat-web/src/main/java/com/iamberry/wechat/handles/agent/AgentHandler.java
  8. 2 1
      watero-wechat-web/src/main/java/com/iamberry/wechat/handles/cart/ProbationOrderHandler.java
  9. 8 8
      watero-wechat-web/src/main/java/com/iamberry/wechat/handles/mq/MQServiceProxy.java
  10. 4 2
      watero-wechat-web/src/main/java/com/iamberry/wechat/handles/mq/MQTask.java
  11. 49 33
      watero-wechat-web/src/main/java/com/iamberry/wechat/handles/mq/MQTimer.java
  12. 2 1
      watero-wechat-web/src/main/java/com/iamberry/wechat/handles/order/OrderHandler.java
  13. 2 1
      watero-wechat-web/src/main/java/com/iamberry/wechat/handles/pay/ResponseWechatPayHandler.java
  14. 2 1
      watero-wechat-web/src/main/java/com/iamberry/wechat/handles/wechat/template/Template.java
  15. 4 1
      watero-wechat-web/src/main/java/com/iamberry/wechat/listener/MessageQueueListener.java
  16. 5 22
      watero-wechat-web/src/main/resources/ioc.xml
  17. 2 2
      watero-wechat-web/src/main/webapp/WEB-INF/web.xml

+ 4 - 5
watero-common-tool/src/main/java/com/iamberry/app/tool/log/RatFWLogger.java

@@ -1,17 +1,18 @@
 package com.iamberry.app.tool.log;
 
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
 /**
  * desc:平台日志
  */
-@Component
+@Component("ratFWLogger")
 public class RatFWLogger {
 	
 	private static String delimiter = " >> ";
 	
-	private Logger logger = null;
+	private Logger logger = LoggerFactory.getLogger(RatFWLogger.class);
 	
 	private String appId;
 
@@ -21,7 +22,6 @@ public class RatFWLogger {
 	
 	public RatFWLogger(String appId) {
 		this.appId = appId;
-		logger = Logger.getLogger(appId);
 	}
 	
 	/** 通过Spring IoC注入 
@@ -29,7 +29,6 @@ public class RatFWLogger {
 	 **/
 	public void setAppId(String appId) {
 		this.appId = appId;
-		logger = Logger.getLogger(appId);		
 	}	
 	
 	public void info(String msg) {

+ 1 - 0
watero-common-tool/src/main/java/com/iamberry/zk/SpringContextHolder.java

@@ -15,6 +15,7 @@ public class SpringContextHolder implements ApplicationContextAware {
     /**
      * 实现ApplicationContextAware接口的context注入函数, 将其存入静态变量.
      */
+    @Override
     public void setApplicationContext(ApplicationContext applicationContext) {
         SpringContextHolder.applicationContext = applicationContext;
     }

+ 5 - 1
watero-wechat-interface/src/main/java/com/iamberry/wechat/face/mq/MQSerivce.java

@@ -15,6 +15,7 @@ public interface MQSerivce {
 	 * select wait handler message.
 	 * @return
 	 */
+	@Deprecated
 	public List<MQMessage> selectWaitHandlerMessage();
 
 	/**
@@ -22,6 +23,7 @@ public interface MQSerivce {
 	 * @param messages
 	 * @return
 	 */
+	@Deprecated
 	public Integer updateBatchMessage(List<MQMessage> messages);
 	
 	/**
@@ -29,13 +31,14 @@ public interface MQSerivce {
 	 * @param message
 	 * @return
 	 */
-	public boolean insertMQMessage(MQMessage message);
+	public void insertMQMessage(MQMessage message);
 	
 	/**
 	 * update One Message
 	 * @param message
 	 * @return
 	 */
+	@Deprecated
 	public boolean updateMessageOne(MQMessage message);
 	
 	/**
@@ -43,5 +46,6 @@ public interface MQSerivce {
 	 * @param message
 	 * @return
 	 */
+	@Deprecated
 	public Integer insertOneMQMessageContainError(MQMessage message);
 }

+ 11 - 16
watero-wechat-service/src/main/java/com/iamberry/wechat/service/mq/MQSerivceImpl.java

@@ -19,7 +19,8 @@ import com.iamberry.zk.SpringContextHolder;
  * @description: 具体实现
  * @createDate:2016年5月25日
  */
-@Service
+//@Service
+@Deprecated
 public class MQSerivceImpl implements MQSerivce {
 	
 	@Autowired
@@ -46,19 +47,11 @@ public class MQSerivceImpl implements MQSerivce {
 	}
 
 	@Override
-	public boolean insertMQMessage(MQMessage message) {
+	@Deprecated
+	public void insertMQMessage(MQMessage message) {
 		// TODO Auto-generated method stub
-		
 		// 首先进入队列, 如果存储队列装满,同步数据库
-		boolean flag = false;
-		if (!flag) {
-			// 如果serviceToMessage、serivceOtherMessage、serviceStatus相同时,不允许发送第二次请求
-			/*if (mQDao.judgeMessageIsExist(message) != null) {
-				return true;
-			}*/
-			return mQDao.insertOneMQMessage(message) >= 1 ? true : false;
-		}
-		return flag;
+		mQDao.insertOneMQMessage(message);
 	}
 
 	@Override
@@ -71,7 +64,7 @@ public class MQSerivceImpl implements MQSerivce {
 			@SuppressWarnings("unchecked")
 			
 			// get Bean to Method.
-			Method method = classes.getMethod(mqMessage.getServiceHandlerMethodName(), MQMessage.class);;
+			Method method = classes.getMethod(mqMessage.getServiceHandlerMethodName(), MQMessage.class);
 
 			if (method == null) {
 				mqMessage.setServiceErrorMessage("Method is not defined:" + mqMessage.getServiceHandlerMethodName());
@@ -82,8 +75,10 @@ public class MQSerivceImpl implements MQSerivce {
 				Object resultObject = null;
 				DataSourceTransactionManager transactionManager = (DataSourceTransactionManager) SpringContextHolder.getBean("transactionManager");
 				DefaultTransactionDefinition def = new DefaultTransactionDefinition();
-				def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); // 事物隔离级别,开启新事务
-				TransactionStatus status = transactionManager.getTransaction(def); // 获得事务状态
+				// 事物隔离级别,开启新事务
+				def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
+				// 获得事务状态
+				TransactionStatus status = transactionManager.getTransaction(def);
 				try {
 					//逻辑代码
 					resultObject = method.invoke(object, mqMessage);
@@ -109,7 +104,7 @@ public class MQSerivceImpl implements MQSerivce {
 			mqMessage.setServiceIsSend(1);
 		}
 		// step 3, update Message status and serviceIsSend.
-		return mQDao.insertOneMQMessageContainError(mqMessage) >= 1 ? true : false;
+		return mQDao.insertOneMQMessageContainError(mqMessage) >= 1;
 	}
 
 	@Override

+ 2 - 1
watero-wechat-web/src/main/java/com/iamberry/wechat/handles/admin/AdminOrderHandlers.java

@@ -11,6 +11,7 @@ import javax.servlet.http.HttpServletResponse;
 
 import com.iamberry.wechat.core.entity.cart.ShopOrderItemDto;
 import com.iamberry.wechat.face.cart.CartService;
+import com.iamberry.wechat.face.mq.MQSerivce;
 import com.iamberry.wechat.tools.ObjectExcelView;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -88,7 +89,7 @@ public class AdminOrderHandlers {
 	private RatFWLogger logger;
 	
 	@Autowired
-	private MQServiceProxy mQservice;
+	private MQSerivce mQservice;
 	
 	@Autowired
 	private RebackServices rebackServices;

+ 5 - 0
watero-wechat-web/src/main/java/com/iamberry/wechat/handles/admin/LoginController.java

@@ -1,6 +1,8 @@
 package com.iamberry.wechat.handles.admin;
 
 import com.iamberry.sys.Admin;
+import com.iamberry.wechat.core.entity.mq.MQMessage;
+import com.iamberry.wechat.face.mq.MQSerivce;
 import com.iamberry.wechat.face.sys.SysService;
 import com.iamberry.wechat.realm.IamberryRealm;
 import com.iamberry.wechat.tools.ResponseJson;
@@ -45,6 +47,9 @@ public class LoginController {
     @Autowired
     private IamberryRealm iamberryRealm;
 
+    @Autowired
+    private MQSerivce mQSerivce;
+
     @RequestMapping("/_login")
     public ModelAndView loginUI(HttpServletRequest request) {
         // 判断用户是否登录,如果已经登录,则跳转

+ 2 - 1
watero-wechat-web/src/main/java/com/iamberry/wechat/handles/agent/AgentHandler.java

@@ -9,6 +9,7 @@ import javax.servlet.http.HttpSession;
 import com.iamberry.wechat.core.entity.activity.ActivityDate;
 import com.iamberry.wechat.core.entity.coupon.CouponItemDto;
 import com.iamberry.wechat.face.coupon.CouponItemService;
+import com.iamberry.wechat.face.mq.MQSerivce;
 import com.iamberry.wechat.face.order.CodeService;
 import com.iamberry.wechat.handles.thanksgiving.ThanksGivingHandler;
 import com.iamberry.wechat.service.ActivityUtil;
@@ -56,7 +57,7 @@ public class AgentHandler {
 	@Autowired
 	private CartService cartService;
 	@Autowired
-	private MQServiceProxy mQservice;
+	private MQSerivce mQservice;
 	@Autowired
 	private ActivityUtil activityUtil;
 	@Autowired

+ 2 - 1
watero-wechat-web/src/main/java/com/iamberry/wechat/handles/cart/ProbationOrderHandler.java

@@ -7,6 +7,7 @@ import java.util.Map;
 
 import javax.servlet.http.HttpServletRequest;
 
+import com.iamberry.wechat.face.mq.MQSerivce;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Controller;
 import org.springframework.web.bind.annotation.RequestMapping;
@@ -43,7 +44,7 @@ public class ProbationOrderHandler {
 	@Autowired
 	private SendMessageUtil sendMessageUtil;
 	@Autowired
-	private MQServiceProxy mQservice;
+	private MQSerivce mQservice;
 	@Autowired
 	private PlaceInfoService placeInfoService;
 	@Autowired

+ 8 - 8
watero-wechat-web/src/main/java/com/iamberry/wechat/handles/mq/MQServiceProxy.java

@@ -2,6 +2,9 @@ package com.iamberry.wechat.handles.mq;
 
 import java.util.List;
 
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
+import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
 
 import com.iamberry.wechat.core.entity.mq.MQMessage;
@@ -19,6 +22,9 @@ import com.iamberry.zk.SpringContextHolder;
 @Service
 public class MQServiceProxy implements MQSerivce {
 
+	@Autowired
+	private ApplicationContext context;
+
 	@Override
 	public List<MQMessage> selectWaitHandlerMessage() {
 		// TODO Auto-generated method stub
@@ -34,14 +40,8 @@ public class MQServiceProxy implements MQSerivce {
 	}
 
 	@Override
-	public boolean insertMQMessage(MQMessage message) {
-		// TODO Auto-generated method stub
-		boolean flag = MQTimer.messageStaticQueue.offer(message);;
-		if (!flag) {
-			MQSerivce mqSerivce = SpringContextHolder.getBean("MQSerivceImpl");
-			return mqSerivce.insertMQMessage(message);
-		}
-		return false;
+	public void insertMQMessage(MQMessage message) {
+		context.publishEvent(message);
 	}
 
 	@Override

+ 4 - 2
watero-wechat-web/src/main/java/com/iamberry/wechat/handles/mq/MQTask.java

@@ -17,6 +17,7 @@ import com.iamberry.wechat.face.coupon.CouponItemService;
 import com.iamberry.wechat.face.coupon.CouponTypeService;
 import com.iamberry.wechat.face.ism.IntegralLogService;
 import com.iamberry.wechat.face.member.MemberService;
+import com.iamberry.wechat.face.mq.MQSerivce;
 import com.iamberry.wechat.service.ism.IntegralLogServiceImpl;
 import com.iamberry.wechat.service.mapper.MemberMapper;
 import com.iamberry.wechat.tools.*;
@@ -54,7 +55,7 @@ import com.iamberry.zk.SpringContextHolder;
 public class MQTask implements InitializingBean {
 	
 	@Autowired
-	private MQServiceProxy mQSerivce;
+	private MQSerivce mQSerivce;
 	@Autowired
 	private RatFWLogger logger;
 	@Autowired
@@ -79,7 +80,8 @@ public class MQTask implements InitializingBean {
 	private Lock lock = new ReentrantLock();
 
 	@SuppressWarnings(value = {"unchecked", "rawtypes"})
-	@Scheduled(cron = "0/30 * * * * ?")
+	@Deprecated
+//	@Scheduled(cron = "0/30 * * * * ?")
 	public void sendOrderService() {
 		logger.info("------------task start-----------");
 		// If the timer has not stopped, then the next time the timer can not start.

+ 49 - 33
watero-wechat-web/src/main/java/com/iamberry/wechat/handles/mq/MQTimer.java

@@ -1,48 +1,64 @@
 package com.iamberry.wechat.handles.mq;
 
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-
-import com.iamberry.app.tool.log.RatFWLogger;
 import com.iamberry.wechat.core.entity.mq.MQMessage;
 import com.iamberry.wechat.face.mq.MQSerivce;
+import com.iamberry.zk.SpringContextHolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Lazy;
+import org.springframework.context.event.EventListener;
+import org.springframework.jdbc.datasource.DataSourceTransactionManager;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.scheduling.annotation.EnableAsync;
+import org.springframework.stereotype.Component;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.TransactionDefinition;
+import org.springframework.transaction.TransactionStatus;
+import org.springframework.transaction.support.DefaultTransactionDefinition;
 
-public class MQTimer extends TimerTask {
+import java.lang.reflect.Method;
 
-	// 队列
-	public static BlockingQueue<MQMessage> messageStaticQueue = new ArrayBlockingQueue<MQMessage>(500);
-	
-	private MQSerivce mQSerivce;
-	
-	private RatFWLogger ratFWLogger;
-	
-	public MQTimer(MQSerivce mqSerivce, RatFWLogger logger) {
-		this.mQSerivce = mqSerivce;
-		this.ratFWLogger = logger;
-	}
+@Component
+@EnableAsync
+public class MQTimer {
+
+	@Autowired
+	private DataSourceTransactionManager transactionManager;
 
-	@Override
-	public void run() {
-		ratFWLogger.info("启动订阅程序...");
+	private static final Logger LOGGER = LoggerFactory.getLogger(MQTimer.class);
+
+	@Async
+	@EventListener
+	public void run(MQMessage mqMessage) {
 		// 订阅
 		try {
-			while (true) {
-				// 如果空闲,阻塞,让出系统资源
-				MQMessage mqMessage = messageStaticQueue.take();
-				ratFWLogger.info("收到订阅请求...");
-				// 远程调用 Service
-				boolean flag = mQSerivce.updateMessageOne(mqMessage);
-				if (flag) {
-					ratFWLogger.info(mqMessage + "   execution true");
-				} else {
-					ratFWLogger.error(this, mqMessage + "   execution error");
+			LOGGER.info("收到订阅请求...");
+			// 远程调用 Service
+			Object object = SpringContextHolder.getBean(mqMessage.getServiceHandlerObjectName());
+			if (object == null) {
+				throw new RuntimeException(mqMessage.getServiceHandlerObjectName() + "{} 对象不存在,请设置");
+			}
+			Class classes = object.getClass();
+			// 获取方法
+			Method method = classes.getMethod(mqMessage.getServiceHandlerMethodName(), MQMessage.class);
+			if (method == null) {
+				throw new RuntimeException(mqMessage.getServiceHandlerMethodName() + "{} 方法不存在,请设置");
+			} else {
+				// 执行方法,对方法启动事务
+				DefaultTransactionDefinition def = new DefaultTransactionDefinition();
+				def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
+				TransactionStatus status = transactionManager.getTransaction(def);
+				try {
+					//逻辑代码
+					method.invoke(object, mqMessage);
+					transactionManager.commit(status);
+				} catch (Exception e) {
+					transactionManager.rollback(status);
 				}
 			}
 		} catch (Exception e) {
-			ratFWLogger.error(this, e.getMessage());
-			new Timer().schedule(new MQTimer(mQSerivce, ratFWLogger), 1000);
+			LOGGER.error("{}", e.getMessage());
 		}
 	}
 

+ 2 - 1
watero-wechat-web/src/main/java/com/iamberry/wechat/handles/order/OrderHandler.java

@@ -27,6 +27,7 @@ import com.iamberry.wechat.face.home.HomeService;
 import com.iamberry.wechat.face.ism.IntegralLogService;
 import com.iamberry.wechat.face.member.CashLogService;
 import com.iamberry.wechat.face.member.MemberService;
+import com.iamberry.wechat.face.mq.MQSerivce;
 import com.iamberry.wechat.face.pay.PayService;
 import com.iamberry.wechat.face.porduct.ProductColorService;
 import com.iamberry.wechat.face.qrcode.TemporaryQrcodeService;
@@ -92,7 +93,7 @@ public class OrderHandler {
 	@Autowired
 	private RebackServices rebackServices;
 	@Autowired
-	private MQServiceProxy mQservice;
+	private MQSerivce mQservice;
 	@Autowired
 	private ProductInfoService productInfoService;
 	@Autowired

+ 2 - 1
watero-wechat-web/src/main/java/com/iamberry/wechat/handles/pay/ResponseWechatPayHandler.java

@@ -17,6 +17,7 @@ import com.iamberry.wechat.core.entity.eo.Promotions;
 import com.iamberry.wechat.core.entity.ism.IntegralConfig;
 import com.iamberry.wechat.core.entity.rent.ReserveOrder;
 import com.iamberry.wechat.face.ism.IntegralLogService;
+import com.iamberry.wechat.face.mq.MQSerivce;
 import com.iamberry.wechat.face.order.AdminOrderService;
 import com.iamberry.wechat.face.reserve.ReserveService;
 import com.iamberry.wechat.service.ActivityUtil;
@@ -98,7 +99,7 @@ public class ResponseWechatPayHandler {
 	private MemberService memberService;
 	
 	@Autowired
-	private MQServiceProxy mQservice;
+	private MQSerivce mQservice;
 
 	@Autowired
 	private ReserveService reserveService;

+ 2 - 1
watero-wechat-web/src/main/java/com/iamberry/wechat/handles/wechat/template/Template.java

@@ -2,6 +2,7 @@ package com.iamberry.wechat.handles.wechat.template;
 
 import java.util.Date;
 
+import com.iamberry.wechat.face.mq.MQSerivce;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
@@ -24,7 +25,7 @@ import com.iamberry.wechat.tools.WeixinUtil;
 public class Template {
 	
 	@Autowired
-	private MQServiceProxy mqSerivce;
+	private MQSerivce mqSerivce;
 	
 	@Autowired
 	private RatFWLogger fwLogger;

+ 4 - 1
watero-wechat-web/src/main/java/com/iamberry/wechat/listener/MessageQueueListener.java

@@ -18,15 +18,18 @@ import com.iamberry.zk.SpringContextHolder;
  * @date	2016年10月18日
  * @explain	通过监听器启动定时器,定时器内永久循环
  */
+@Deprecated
 public class MessageQueueListener implements ServletContextListener {
 
+	@Override
 	public void contextDestroyed(ServletContextEvent arg0) {
 	}
 
+	@Override
 	public void contextInitialized(ServletContextEvent arg0) {
 		MQSerivce mQSerivce = SpringContextHolder.getBean("MQServiceProxy");
 	    RatFWLogger ratFWLogger = SpringContextHolder.getBean("logger");
 	    ratFWLogger.info("订阅队列消息 监听器 启动....");
-	    new Timer().schedule(new MQTimer(mQSerivce, ratFWLogger), 5000);
+	    //new Timer().schedule(new MQTimer(mQSerivce, ratFWLogger), 5000);
 	}
 }

+ 5 - 22
watero-wechat-web/src/main/resources/ioc.xml

@@ -26,31 +26,14 @@
     
     <!-- 配置springContextHolder -->
     <bean class="com.iamberry.zk.SpringContextHolder" lazy-init="false" />
-    
-	<!-- 平台LOG配置,立即加载,拒绝懒加载 -->
-	<bean id="logger" class="com.iamberry.app.tool.log.RatFWLogger" lazy-init="false">
-		<property name="appId" value="iamberry" />
-	</bean>
 
-	<!-- 开启注解启动定时器 -->
-    <task:annotation-driven/>
-    <!-- 将30天后奖励积分的定时任务的xml引入 -->
+	<!-- 开启注解启动 -->
+    <task:annotation-driven executor="asyncExecutor" scheduler="schedulerExecutor"/>
+	<task:executor id="asyncExecutor" pool-size="20" queue-capacity="10"/>
+	<task:scheduler id="schedulerExecutor" pool-size="10"/>
 
+	<!-- DB -->
 	<import resource="classpath:db.xml"/>
-
 	<!-- Shiro -->
 	<import resource="classpath:shiro.xml"/>
-	<!-- spring监控
-    <bean id="druid-stat-interceptor" class="com.alibaba.druid.support.spring.stat.DruidStatInterceptor">
-	</bean>
-	<bean id="druid-stat-pointcut" class="org.springframework.aop.support.JdkRegexpMethodPointcut" scope="prototype">
-		<property name="patterns">
-			<list>
-				<value>com.iamberry.wechat.*</value>
-			</list>
-		</property>
-	</bean>
-	<aop:config proxy-target-class="true">
-		<aop:advisor advice-ref="druid-stat-interceptor" pointcut-ref="druid-stat-pointcut" />
-	</aop:config> -->
 </beans>

+ 2 - 2
watero-wechat-web/src/main/webapp/WEB-INF/web.xml

@@ -95,10 +95,10 @@
             <location>/view/system_view/500.jsp</location>
         </error-page>
 -->
-
+<!--
 	<listener>
 		<listener-class>com.iamberry.wechat.listener.MessageQueueListener</listener-class>
-	</listener>
+	</listener>-->
 
 	<!-- 数据库监控
 	<servlet>