123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133 |
- package com.iamberry.app.international.util.kafka;
- import java.net.InetAddress;
- import java.net.UnknownHostException;
- import java.util.Arrays;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.Properties;
- import javax.security.auth.login.Configuration;
- import javax.servlet.ServletContextEvent;
- import javax.servlet.ServletContextListener;
- import net.sf.json.JSONArray;
- import net.sf.json.JSONObject;
- import org.apache.kafka.clients.CommonClientConfigs;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.context.ApplicationContext;
- import org.springframework.stereotype.Component;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.context.WebApplicationContext;
- import org.springframework.web.context.support.WebApplicationContextUtils;
- import com.alibaba.fastjson.JSON;
- import com.iamberry.app.face.MachineService;
- import com.iamberry.app.face.MilkService;
- public class KafkaConsumer implements ServletContextListener {
- private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
-
- public void contextInitialized(ServletContextEvent sce) {
- WebApplicationContext webApplicationContext = WebApplicationContextUtils.getWebApplicationContext(sce.getServletContext());
- MachineService machineService = webApplicationContext.getBean(MachineService.class);
- MilkService milkService = webApplicationContext.getBean(MilkService.class);
-
- String appKey = "5kkyurvvtt58bbuxueee";//填APP KEY
- String secretKey = "rhj6na6u3y6uhy6qrbb3944mg5uqqpbb";//APP SECRET
- org.apache.kafka.clients.consumer.KafkaConsumer<String, String> consumer = null;
- Configuration configuration = Configuration.getConfiguration();
- Configuration.setConfiguration(null);
- try {
- java.security.Security.setProperty("login.configuration.provider", "com.iamberry.app.international.util.kafka.SaslConfiguration");
- Properties props = new Properties();
- //根据不同区域填写,不同区域URL参考文档 https://docs.tuya.com/cn/cloudapi/cloud_access/#kafka
- props.put("bootstrap.servers", "kafka.cloud.tuyaus.com:8092");
- String groupId = "aby-real-group";
- props.put("group.id", groupId);
- InetAddress netAddress = InetAddress.getLocalHost();
- String clientId = "cloud_" + appKey + "_" + netAddress.getHostAddress();
- props.put("client.id", clientId);
- props.put("enable.auto.commit", "true");
- props.put("auto.commit.interval.ms", "10000");
- props.put("session.timeout.ms", "30000");
- props.put("max.poll.records", 1000);
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- //可以自定义JSON格式的序列化
- props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
- props.put("sasl.mechanism", "PLAIN");
- consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(props);
- consumer.subscribe(Arrays.asList("device-cloud-out-" + appKey));
- } catch (UnknownHostException e) {
- e.printStackTrace();
- } finally {
- Configuration.setConfiguration(configuration);
- }
- final org.apache.kafka.clients.consumer.KafkaConsumer<String, String> finalKafkaConsumer = consumer;
- if (consumer != null) {
- new Thread(){
- @Override
- public void run(){
- while (true) {
- ConsumerRecords<String, String> records = finalKafkaConsumer.poll(1000);
- //ConsumerRecord<String, String> record = new ConsumerRecord<String, String>("4", 1, 1, "data", "JD/i7iF1hWmH08X54IGwjQkOy2gSEPTryEPUGbopFa3tC0lAyPx3dHN8enyKEY1zBdZt+bu5RYHnK00TvWD07S+Ryi3vHIGtWhTh7JLSVMAI+PNQeZZ87ZCh+vQaLQletGi/2n9B/pp+GLDDWFEhnl+xaLeVgcR9oGhUxBTwRmQsM1gUrTyEjFxfEhSWDM7Y+oj8NKWtJKpSS2/dv6wxNv+OIs6LCVBq+R1cLb+c0WI=");
- boolean flog = false;
- String milkPowder = "";
- Long milkTime = 0l;
- try {
- for (ConsumerRecord<String, String> record : records) {
- /*logger.info("Received message: (" + record.key() + ", " + record.value() + ") at offset "
- + record.offset() + ",decrypt data="
- + AESBase64Util.decrypt(JSONObject.fromObject(record.value()).getString("data"), secretKey.substring(8, 24)));
- // + AESBase64Util.decrypt(record.value(), secretKey.substring(8, 24)));
- */ String data = AESBase64Util.decrypt(JSONObject.fromObject(record.value()).getString("data"), secretKey.substring(8, 24));//解析后的真正数据
- JSONObject jasonObject = JSONObject.fromObject(data);
- if(jasonObject.has("dps")){
- JSONArray dps = jasonObject.getJSONArray("dps");
- for(int i = 0; i < dps.size(); i++){
- Map maps = (Map) JSON.parse(dps.getString(i));
- for (Object obj : maps.keySet()){
- if(obj.equals("DEVECE_CONTROL_MILK")){
- milkPowder = maps.get(obj).toString();
- flog = true;//如果数据中包含当前字段,表示当前推送的数据为奶粉记录
- }
- if(obj.equals("t")){
- milkTime = (Long)maps.get((obj));
- }
- }
- }
- if(flog){
- // 根据机器ID 获取对应的用户id
- String devId = jasonObject.getString("devId");
- Long userId = machineService.selectUserIDByDevId(devId);
- milkService.addRecordData(devId, milkPowder, milkTime, userId);
- flog = false;
- }
- }
- }
- } catch (Exception e) {
- logger.error("", e);
- }
- }
- }
- }.start();;
- }
- }
- @Override
- public void contextDestroyed(ServletContextEvent sce) {
- // TODO Auto-generated method stub
-
- }
- }
|