Using Canal with Kafka for Real-Time MySQL Data Synchronization


canal-kafka is a new installation package recently released by Alibaba Cloud. Its job is to bridge Canal and Kafka so that large volumes of change messages can be transported and synchronized. Inside canal-kafka, messages travel as ByteString, and users can only tune Kafka behavior through configuration, which is somewhat limiting. Writing our own Canal client that produces to Kafka gives more flexibility, at the price of a higher maintenance burden, so choose whichever approach fits your situation.

Add the Maven dependencies

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.0.25</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.0</version>
</dependency>

SimpleCanalClient

package com.unigroup.client.canal;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.Message;

/**
 * @Title: SimpleCanalClient.java
 * @Package com.unigroup.client.canal
 * @Description: single-instance canal client
 */
public class SimpleCanalClient {

    private CanalConnector connector = null;

    public SimpleCanalClient(String ip, String port, String instance) {
        // Open a connection to a single canal server instance
        connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(ip, Integer.parseInt(port)), instance, "", "");
    }

    public List<Entry> execute(int batchSize, Class<?> clazz)
            throws InstantiationException, IllegalAccessException, NoSuchMethodException, SecurityException {
        int emptyCount = 0;
        // The handler class is instantiated reflectively and must expose
        // a public send(Message) method, invoked once per non-empty batch.
        Object obj = clazz.newInstance();
        Method method = clazz.getMethod("send", Message.class);
        try {
            connector.connect();
            // connector.subscribe(".*\\..*"); // subscribe to every schema and table
            connector.subscribe("test.test1"); // subscribe to a single table
            connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                } else {
                    emptyCount = 0;
                    method.invoke(obj, message);
                }
                connector.ack(batchId); // acknowledge the batch
                // connector.rollback(batchId); // on processing failure, roll back instead
            }
            System.out.println("empty too many times, exit");
        } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
            e.printStackTrace();
        } finally {
            connector.disconnect();
        }
        return null;
    }
}
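For reference, any class with a public send(Message) method can be plugged into execute. Below is a minimal, hypothetical handler (not part of the original post) that simply prints the schema, table, and entry type of each change:

package com.unigroup.client.canal;

import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.Message;

// Hypothetical test handler: satisfies the reflective send(Message)
// contract used by SimpleCanalClient.execute and prints each entry.
public class ConsolePrinter {

    public void send(Message message) {
        for (Entry entry : message.getEntries()) {
            System.out.println(entry.getHeader().getSchemaName() + "."
                    + entry.getHeader().getTableName() + " : " + entry.getEntryType());
        }
    }
}

With this in place, something like new SimpleCanalClient("127.0.0.1", "11111", "example").execute(1, ConsolePrinter.class) (host, port, and destination are example values) prints incoming changes to stdout.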

CanalKafkaProducer

package com.unigroup.kafka.producer;

import java.io.IOException;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.otter.canal.protocol.Message;
import com.unigroup.kafka.producer.KafkaProperties.Topic;
import com.unigroup.utils.MessageSerializer;

/**
 * @Title: CanalKafkaProducer.java
 * @Package com.unigroup.kafka.producer
 * @version V1.0
 */
public class CanalKafkaProducer {

    private static final Logger logger = LoggerFactory.getLogger(CanalKafkaProducer.class);

    private Producer<String, Message> producer;

    public void init(KafkaProperties kafkaProperties) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", kafkaProperties.getServers());
        properties.put("acks", "all"); // wait for the full ISR to acknowledge each write
        properties.put("retries", kafkaProperties.getRetries());
        properties.put("batch.size", kafkaProperties.getBatchSize());
        properties.put("linger.ms", kafkaProperties.getLingerMs());
        properties.put("buffer.memory", kafkaProperties.getBufferMemory());
        properties.put("key.serializer", StringSerializer.class.getName());
        properties.put("value.serializer", MessageSerializer.class.getName());
        producer = new KafkaProducer<String, Message>(properties);
    }

    public void stop() {
        try {
            logger.info("## stop the kafka producer");
            producer.close();
        } catch (Throwable e) {
            logger.warn("##something goes wrong when stopping kafka producer:", e);
        } finally {
            logger.info("## kafka producer is down.");
        }
    }

    public void send(Topic topic, Message message) throws IOException {
        ProducerRecord<String, Message> record;
        if (topic.getPartition() != null) {
            // Pin the record to an explicit partition (the key is left null)
            record = new ProducerRecord<String, Message>(topic.getTopic(), topic.getPartition(), null, message);
        } else {
            record = new ProducerRecord<String, Message>(topic.getTopic(), message);
        }
        producer.send(record);
        if (logger.isDebugEnabled()) {
            logger.debug("send message to kafka topic: {} \n {}", topic.getTopic(), message.toString());
        }
    }
}
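The producer above depends on a KafkaProperties holder (with a nested Topic class) that the post never shows. Here is a minimal sketch: its shape is inferred from the getters called above, and the default values are my assumptions, not from the original.

package com.unigroup.kafka.producer;

// Minimal sketch of the config holder used by CanalKafkaProducer.
// Field names are inferred from the getters used above; defaults are assumptions.
public class KafkaProperties {

    private String servers = "localhost:9092";
    private int retries = 0;
    private int batchSize = 16384;
    private int lingerMs = 1;
    private long bufferMemory = 33554432L;

    public String getServers() { return servers; }
    public void setServers(String servers) { this.servers = servers; }

    public int getRetries() { return retries; }
    public void setRetries(int retries) { this.retries = retries; }

    public int getBatchSize() { return batchSize; }
    public void setBatchSize(int batchSize) { this.batchSize = batchSize; }

    public int getLingerMs() { return lingerMs; }
    public void setLingerMs(int lingerMs) { this.lingerMs = lingerMs; }

    public long getBufferMemory() { return bufferMemory; }
    public void setBufferMemory(long bufferMemory) { this.bufferMemory = bufferMemory; }

    // A topic plus an optional fixed partition (null lets Kafka pick one).
    public static class Topic {
        private String topic;
        private Integer partition;

        public String getTopic() { return topic; }
        public void setTopic(String topic) { this.topic = topic; }

        public Integer getPartition() { return partition; }
        public void setPartition(Integer partition) { this.partition = partition; }
    }
}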

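The value serializer com.unigroup.utils.MessageSerializer is also referenced but not shown. One simple possibility (an assumption, not the original implementation; the official canal-kafka package writes the protobuf form instead) is plain Java serialization, which works if canal's Message is java.io.Serializable:

package com.unigroup.utils;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.Map;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import com.alibaba.otter.canal.protocol.Message;

// Sketch only: serializes a canal Message with plain Java serialization,
// assuming Message implements java.io.Serializable.
public class MessageSerializer implements Serializer<Message> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // no configuration needed
    }

    @Override
    public byte[] serialize(String topic, Message data) {
        if (data == null) {
            return null;
        }
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(data);
            oos.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new SerializationException("failed to serialize canal Message", e);
        }
    }

    @Override
    public void close() {
        // nothing to release
    }
}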

canalToKafkaServer

package com.unigroup.kafka.server;

import com.unigroup.client.canal.SimpleCanalClient;
import com.unigroup.kafka.producer.CanalKafkaProducer;
import com.unigroup.utils.GetProperties;

/**
 * @Title: canal.java
 * @Package com.unigroup.kafka.server
 * @version V1.0
 */
public class canalToKafkaServer {

    public static void execute() {
        SimpleCanalClient simpleCanalClient = new SimpleCanalClient(
                GetProperties.getValue("MYSQL_HOST"),
                GetProperties.getValue("MYSQL_PORT"),
                GetProperties.getValue("INSTANCE"));
        try {
            simpleCanalClient.execute(1, CanalKafkaProducer.class);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
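Note one wiring gap in the demo as posted: SimpleCanalClient.execute reflectively looks up a one-argument send(Message) method, while CanalKafkaProducer exposes send(Topic, Message) and also needs init(...) called before use, so passing CanalKafkaProducer.class directly would fail with NoSuchMethodException. A hypothetical adapter (my sketch, building on the KafkaProperties sketch above) bridges the two contracts:

package com.unigroup.kafka.server;

import java.io.IOException;

import com.alibaba.otter.canal.protocol.Message;
import com.unigroup.kafka.producer.CanalKafkaProducer;
import com.unigroup.kafka.producer.KafkaProperties;
import com.unigroup.kafka.producer.KafkaProperties.Topic;

// Hypothetical adapter: exposes the send(Message) signature that
// SimpleCanalClient looks up reflectively, initializes the producer,
// and routes every batch to one fixed topic.
public class CanalKafkaAdapter {

    private final CanalKafkaProducer producer = new CanalKafkaProducer();
    private final Topic topic = new Topic();

    public CanalKafkaAdapter() {
        producer.init(new KafkaProperties()); // defaults from the sketch above
        topic.setTopic("canal-test");         // example topic name
    }

    public void send(Message message) throws IOException {
        producer.send(topic, message);
    }
}

With this in place, simpleCanalClient.execute(1, CanalKafkaAdapter.class) runs end to end.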

At this point, a simple Canal-to-Kafka demo is complete. All of this is just test code; in a real application you can build out more functionality to fit your own requirements.
