1.SpringMVC整合生産者(Producer)
SpringMVC整合生産者比較簡單,我直接用一個單例對象來管理生産者,這樣保證生産者也是一個單例對象。
1.1 導入kafka的maven依賴
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLicmbw5CMkJTZ5E2NxI2NhlzMmR2YxQmZhVjZ2EDOwgDNiNDMz8CX0JXZ252bj91Ztl2Lc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
1.2 建立單例對象管理生産者
我使用靜态内部類的方式建立單例對象,保證單例對象的線程安全。直接上代碼
public class KafkaManager { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaManager.class); private static final String BOOTSTRAP_SERVERS = Config.getMqConfig("kafka.bootstrap.servers"); public static final String TOPIC_LOGIN = "Topic_login"; public static final String TOPIC_OLD_API = "Topic_api_use"; public static final String TOPIC_COURSE_VISIT = "Topic_course_visit"; private boolean kafkaAvailable = true; private KafkaProducer producer; private KafkaManager() { this.initManger(); } private static class SingletonManager { private static final KafkaManager singletonObj = new KafkaManager(); } public static KafkaManager getInstance() { return KafkaManager.SingletonManager.singletonObj; } private void initManger() { try { Properties producerProperties = new Properties(); //設定接入點,請通過控制台擷取對應 Topic 的接入點 producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); //消息隊列 Kafka 消息的序列化方式 producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); //請求的最長等待時間 producerProperties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000); this.producer = new KafkaProducer(producerProperties); } catch (Exception ex) { LOGGER.error("建立kafka失敗",ex); this.kafkaAvailable = false; } } protected void finalize () { if (null != this.producer) { this.producer.close(); } } public boolean sendMessage(String topic, JSONObject body) { if (false == this.kafkaAvailable) { return false; } boolean succeed = true; try { ProducerRecord kafkaMessage = new ProducerRecord(topic, body.toJSONString()); //發送消息,并獲得一個 Future 對象 Future metadataFuture = this.producer.send(kafkaMessage); } catch (Exception e) { LOGGER.error("發送kafka失敗",e); succeed = false; } return succeed; }}
1.3 生産者發送消息
2.SpringMVC整合消費者(Consumer)
相對于生産者,SpringMVC整合消費者就顯得複雜一點。
2.1 引入kafka消費者的maven依賴
org.apache.kafka kafka-clients 0.10.0.0org.springframework.kafka spring-kafka 1.1.1.RELEASE
如上所示,除了kafka用戶端的依賴外還需要spring-kafka的依賴。
這裡說明一下,我使用Spring配置檔案建立bean的方式來管理kafka的消費者。當然還有其他方式,這裡我選了比較簡單的一種,其他方式可以自行百度一下。
2.2 建立監聽類
import org.apache.kafka.clients.consumer.ConsumerRecord;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.kafka.listener.MessageListener;import java.util.ArrayList;import java.util.Date;import java.util.List;public class TopicApiUseKafkaListener implements MessageListener { private static final Logger LOGGER = LoggerFactory.getLogger(TopicApiUseKafkaListener.class); private static final String TABLE_PRE = ""; @Override public void onMessage(ConsumerRecord stringStringConsumerRecord) { IApiUseService apiUseService = SpringContextUtil.getBean("apiUseService"); List dataList = new ArrayList<>(); String tableName = TABLE_PRE + DateUtil.formatDate(new Date(),"yyyyMM"); String value = stringStringConsumerRecord.value(); JSONObject bodyJson = JSONObject.parseObject(value); Integer platform = bodyJson.getInteger("platform"); String url = bodyJson.getString("url"); String apiVersion = bodyJson.getString("apiVersion"); String createDate = bodyJson.getString("createDate"); if (!StringUtils.isEmpty(url) && null != platform && !StringUtils.isEmpty(createDate) && !StringUtils.isEmpty(apiVersion)) { ApiUse item = new ApiUse(); item.setPlatform(platform); item.setUrl(url); item.setApiVersion(apiVersion); item.setCreateDate(createDate); dataList.add(item); } if (dataList.size() > 0) { try { apiUseService.addApiUse(tableName, dataList); } catch (Exception ex1) { LOGGER.error("api使用插入錯誤",ex1); } } }}
2.3 配置監聽類
kafka的監聽類的配置需要配置在dispatcher-servlet.xml檔案中,配置在spring-context.xml檔案中是無效的,這是由Spring的加載機制決定的。
先定義consumer的參數再建立一個全局的consumerFactory的bean對象,如下圖所示。這裡需要注意enable.auto.commit這個參數,這是kafka自動送出的參數,如果設定成true就有kafka服務端來自動送出,但是由于網絡等因素kafka服務端自動送出有時候會失敗導緻重複消費消息。是以這裡設定成false,雖然設定成false但是實際上不需要開發者來手動送出,送出的操作是由Spring-kafka這個sdk類處理的,有興趣的童鞋可以看一下spring-kafka的源碼就知道了。
接下來建立實際執行消費的類,就是我們剛才建立的那個類。
如上圖所示,原則上每個topic就要建立一個對應的監聽類,然後再配置監聽類需要監聽的topic。
内容就到這裡。更多精彩文章敬請期待。