b3d4e4b9b583161a83cb5a6676083f8e8d948ffb.svn-base 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. package com.iamberry.app.international.util.kafka;
  2. import java.net.InetAddress;
  3. import java.net.UnknownHostException;
  4. import java.util.Arrays;
  5. import java.util.HashMap;
  6. import java.util.Map;
  7. import java.util.Properties;
  8. import javax.security.auth.login.Configuration;
  9. import javax.servlet.ServletContextEvent;
  10. import javax.servlet.ServletContextListener;
  11. import net.sf.json.JSONArray;
  12. import net.sf.json.JSONObject;
  13. import org.apache.kafka.clients.CommonClientConfigs;
  14. import org.apache.kafka.clients.consumer.ConsumerRecord;
  15. import org.apache.kafka.clients.consumer.ConsumerRecords;
  16. import org.slf4j.Logger;
  17. import org.slf4j.LoggerFactory;
  18. import org.springframework.beans.factory.annotation.Autowired;
  19. import org.springframework.context.ApplicationContext;
  20. import org.springframework.stereotype.Component;
  21. import org.springframework.web.bind.annotation.RequestMapping;
  22. import org.springframework.web.context.WebApplicationContext;
  23. import org.springframework.web.context.support.WebApplicationContextUtils;
  24. import com.alibaba.fastjson.JSON;
  25. import com.iamberry.app.face.MachineService;
  26. import com.iamberry.app.face.MilkService;
  27. public class KafkaConsumer implements ServletContextListener {
  28. private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
  29. public void contextInitialized(ServletContextEvent sce) {
  30. WebApplicationContext webApplicationContext = WebApplicationContextUtils.getWebApplicationContext(sce.getServletContext());
  31. MachineService machineService = webApplicationContext.getBean(MachineService.class);
  32. MilkService milkService = webApplicationContext.getBean(MilkService.class);
  33. String appKey = "5kkyurvvtt58bbuxueee";//填APP KEY
  34. String secretKey = "rhj6na6u3y6uhy6qrbb3944mg5uqqpbb";//APP SECRET
  35. org.apache.kafka.clients.consumer.KafkaConsumer<String, String> consumer = null;
  36. Configuration configuration = Configuration.getConfiguration();
  37. Configuration.setConfiguration(null);
  38. try {
  39. java.security.Security.setProperty("login.configuration.provider", "com.iamberry.app.international.util.kafka.SaslConfiguration");
  40. Properties props = new Properties();
  41. //根据不同区域填写,不同区域URL参考文档 https://docs.tuya.com/cn/cloudapi/cloud_access/#kafka
  42. props.put("bootstrap.servers", "kafka.cloud.tuyaus.com:8092");
  43. String groupId = "aby-real-group";
  44. props.put("group.id", groupId);
  45. InetAddress netAddress = InetAddress.getLocalHost();
  46. String clientId = "cloud_" + appKey + "_" + netAddress.getHostAddress();
  47. props.put("client.id", clientId);
  48. props.put("enable.auto.commit", "true");
  49. props.put("auto.commit.interval.ms", "10000");
  50. props.put("session.timeout.ms", "30000");
  51. props.put("max.poll.records", 1000);
  52. props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  53. //可以自定义JSON格式的序列化
  54. props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  55. props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
  56. props.put("sasl.mechanism", "PLAIN");
  57. consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(props);
  58. consumer.subscribe(Arrays.asList("device-cloud-out-" + appKey));
  59. } catch (UnknownHostException e) {
  60. e.printStackTrace();
  61. } finally {
  62. Configuration.setConfiguration(configuration);
  63. }
  64. final org.apache.kafka.clients.consumer.KafkaConsumer<String, String> finalKafkaConsumer = consumer;
  65. if (consumer != null) {
  66. new Thread(){
  67. @Override
  68. public void run(){
  69. while (true) {
  70. ConsumerRecords<String, String> records = finalKafkaConsumer.poll(1000);
  71. //ConsumerRecord<String, String> record = new ConsumerRecord<String, String>("4", 1, 1, "data", "JD/i7iF1hWmH08X54IGwjQkOy2gSEPTryEPUGbopFa3tC0lAyPx3dHN8enyKEY1zBdZt+bu5RYHnK00TvWD07S+Ryi3vHIGtWhTh7JLSVMAI+PNQeZZ87ZCh+vQaLQletGi/2n9B/pp+GLDDWFEhnl+xaLeVgcR9oGhUxBTwRmQsM1gUrTyEjFxfEhSWDM7Y+oj8NKWtJKpSS2/dv6wxNv+OIs6LCVBq+R1cLb+c0WI=");
  72. boolean flog = false;
  73. String milkPowder = "";
  74. Long milkTime = 0l;
  75. try {
  76. for (ConsumerRecord<String, String> record : records) {
  77. /*logger.info("Received message: (" + record.key() + ", " + record.value() + ") at offset "
  78. + record.offset() + ",decrypt data="
  79. + AESBase64Util.decrypt(JSONObject.fromObject(record.value()).getString("data"), secretKey.substring(8, 24)));
  80. // + AESBase64Util.decrypt(record.value(), secretKey.substring(8, 24)));
  81. */ String data = AESBase64Util.decrypt(JSONObject.fromObject(record.value()).getString("data"), secretKey.substring(8, 24));//解析后的真正数据
  82. JSONObject jasonObject = JSONObject.fromObject(data);
  83. if(jasonObject.has("dps")){
  84. JSONArray dps = jasonObject.getJSONArray("dps");
  85. for(int i = 0; i < dps.size(); i++){
  86. Map maps = (Map) JSON.parse(dps.getString(i));
  87. for (Object obj : maps.keySet()){
  88. if(obj.equals("DEVECE_CONTROL_MILK")){
  89. milkPowder = maps.get(obj).toString();
  90. flog = true;//如果数据中包含当前字段,表示当前推送的数据为奶粉记录
  91. }
  92. if(obj.equals("t")){
  93. milkTime = (Long)maps.get((obj));
  94. }
  95. }
  96. }
  97. if(flog){
  98. // 根据机器ID 获取对应的用户id
  99. String devId = jasonObject.getString("devId");
  100. Long userId = machineService.selectUserIDByDevId(devId);
  101. milkService.addRecordData(devId, milkPowder, milkTime, userId);
  102. flog = false;
  103. }
  104. }
  105. }
  106. } catch (Exception e) {
  107. logger.error("", e);
  108. }
  109. }
  110. }
  111. }.start();;
  112. }
  113. }
  114. @Override
  115. public void contextDestroyed(ServletContextEvent sce) {
  116. // TODO Auto-generated method stub
  117. }
  118. }