天天看點

kafka maven 依賴_SpringMVC整合Kafka實戰1.SpringMVC整合生産者(Producer)2.SpringMVC整合消費者(Consumer)

1.SpringMVC整合生産者(Producer)

SpringMVC整合生産者比較簡單,我直接用一個單例對象來管理生産者,這樣保證生産者也是一個單例對象。

1.1 導入kafka的maven依賴

kafka maven 依賴_SpringMVC整合Kafka實戰1.SpringMVC整合生産者(Producer)2.SpringMVC整合消費者(Consumer)
kafka maven 依賴_SpringMVC整合Kafka實戰1.SpringMVC整合生産者(Producer)2.SpringMVC整合消費者(Consumer)

1.2 建立單例對象管理生産者

我使用靜态内部類的方式建立單例對象,保證單例對象的線程安全。直接上代碼

public class KafkaManager {    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaManager.class);    private static final String BOOTSTRAP_SERVERS = Config.getMqConfig("kafka.bootstrap.servers");    public static final String TOPIC_LOGIN = "Topic_login";    public static final String TOPIC_OLD_API = "Topic_api_use";    public static final String TOPIC_COURSE_VISIT = "Topic_course_visit";    private boolean kafkaAvailable = true;    private KafkaProducer producer;    private KafkaManager() {        this.initManger();    }    private static class SingletonManager {        private static final KafkaManager singletonObj = new KafkaManager();    }    public static KafkaManager getInstance() {        return KafkaManager.SingletonManager.singletonObj;    }    private void initManger() {        try {            Properties producerProperties = new Properties();            //設定接入點,請通過控制台擷取對應 Topic 的接入點            producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);            //消息隊列 Kafka 消息的序列化方式            producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");            producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");            //請求的最長等待時間            producerProperties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);            this.producer = new KafkaProducer(producerProperties);        } catch (Exception ex) {            LOGGER.error("建立kafka失敗",ex);            this.kafkaAvailable = false;        }    }    protected void finalize () {        if (null != this.producer) {            this.producer.close();        }    }    public boolean sendMessage(String topic, JSONObject body) {        if (false == this.kafkaAvailable) {            return false;        }        boolean succeed = true;        try {            ProducerRecord  kafkaMessage =  new ProducerRecord(topic, body.toJSONString());            //發送消息,并獲得一個 Future 對象            Future metadataFuture = this.producer.send(kafkaMessage);                    } catch (Exception e) {            LOGGER.error("發送kafka失敗",e);            succeed = false;        }        return succeed;    }}
           

1.3 生産者發送消息

kafka maven 依賴_SpringMVC整合Kafka實戰1.SpringMVC整合生産者(Producer)2.SpringMVC整合消費者(Consumer)
kafka maven 依賴_SpringMVC整合Kafka實戰1.SpringMVC整合生産者(Producer)2.SpringMVC整合消費者(Consumer)

2.SpringMVC整合消費者(Consumer)

相對于生産者,SpringMVC整合消費者就顯得複雜一點。

2.1 引入kafka消費者的maven依賴

org.apache.kafka      kafka-clients      0.10.0.0org.springframework.kafka      spring-kafka      1.1.1.RELEASE
           

如上所示,除了kafka用戶端的依賴外還需要spring-kafka的依賴。

這裡說明一下,我使用Spring配置檔案建立bean的方式來管理kafka的消費者。當然還有其他方式,這裡我選了比較簡單的一種,其他方式可以自行百度一下。

2.2 建立監聽類

import org.apache.kafka.clients.consumer.ConsumerRecord;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.kafka.listener.MessageListener;import java.util.ArrayList;import java.util.Date;import java.util.List;public class TopicApiUseKafkaListener implements MessageListener {    private static final Logger LOGGER = LoggerFactory.getLogger(TopicApiUseKafkaListener.class);    private static final String TABLE_PRE = "";    @Override    public void onMessage(ConsumerRecord stringStringConsumerRecord) {        IApiUseService apiUseService = SpringContextUtil.getBean("apiUseService");        List dataList = new ArrayList<>();        String tableName = TABLE_PRE + DateUtil.formatDate(new Date(),"yyyyMM");        String value = stringStringConsumerRecord.value();        JSONObject bodyJson = JSONObject.parseObject(value);        Integer platform = bodyJson.getInteger("platform");        String url = bodyJson.getString("url");        String apiVersion = bodyJson.getString("apiVersion");        String createDate = bodyJson.getString("createDate");        if (!StringUtils.isEmpty(url) && null != platform && !StringUtils.isEmpty(createDate) && !StringUtils.isEmpty(apiVersion)) {            ApiUse item = new ApiUse();            item.setPlatform(platform);            item.setUrl(url);            item.setApiVersion(apiVersion);            item.setCreateDate(createDate);            dataList.add(item);        }        if (dataList.size() > 0) {            try {                apiUseService.addApiUse(tableName, dataList);            } catch (Exception ex1) {                LOGGER.error("api使用插入錯誤",ex1);            }        }    }}
           

2.3 配置監聽類

kafka的監聽類的配置需要配置在dispatcher-servlet.xml檔案中,配置在spring-context.xml檔案中是無效的,這是由Spring的加載機制決定的。

先定義consumer的參數再建立一個全局的consumerFactory的bean對象,如下圖所示。這裡需要注意enable.auto.commit這個參數,這是kafka自動送出的參數,如果設定成true就有kafka服務端來自動送出,但是由于網絡等因素kafka服務端自動送出有時候會失敗導緻重複消費消息。是以這裡設定成false,雖然設定成false但是實際上不需要開發者來手動送出,送出的操作是由Spring-kafka這個sdk類處理的,有興趣的童鞋可以看一下spring-kafka的源碼就知道了。

kafka maven 依賴_SpringMVC整合Kafka實戰1.SpringMVC整合生産者(Producer)2.SpringMVC整合消費者(Consumer)
kafka maven 依賴_SpringMVC整合Kafka實戰1.SpringMVC整合生産者(Producer)2.SpringMVC整合消費者(Consumer)

接下來建立實際執行消費的類,就是我們剛才建立的那個類。

kafka maven 依賴_SpringMVC整合Kafka實戰1.SpringMVC整合生産者(Producer)2.SpringMVC整合消費者(Consumer)
kafka maven 依賴_SpringMVC整合Kafka實戰1.SpringMVC整合生産者(Producer)2.SpringMVC整合消費者(Consumer)

如上圖所示,原則上每個topic就要建立一個對應的監聽類,然後再配置監聽類需要監聽的topic。

内容就到這裡。更多精彩文章敬請期待。

繼續閱讀