天天看点

Clickhouse-Java使用JDBC连接大批量导入(本地文件2表)

目录:依赖配置pom.xml、CK基本信息、创建ClickHouse连接、数据推送模式、批量推送数据、关闭相关连接

依赖配置pom.xml

<!-- clickhouse4j: the ClickHouse JDBC driver dependency used by the Java snippets in this article. -->
<dependency>
    <groupId>cc.blynk.clickhouse</groupId>
    <artifactId>clickhouse4j</artifactId>
    <version>1.4.4</version>
</dependency>

CK基本信息

// Connection settings for the ClickHouse server (placeholder values; replace with real ones).
// Fully-qualified name of the JDBC driver class to load.
String driver = "cc.blynk.clickhouse.ClickHouseDriver";
String ip = "xxx.xxx.xxx.xxx";
// Port used in the JDBC URL (presumably ClickHouse's HTTP interface; verify against the server config).
String port = "8123";
String db = "db";
String user = "user";
String pwd = "pwd";
// Path of the gzip-compressed data file that is read and imported into ClickHouse.
String fileName = "/data/table1_cols123_20211125.txt.gz";

创建ClickHouse连接

// Load the JDBC driver class by name so it is available before a connection is requested.
Class.forName(driver);

// Assemble the JDBC URL: jdbc:clickhouse://<ip>:<port>/<db>?<params>.
// StringBuilder is used because the builder never leaves this thread, so the
// synchronization of StringBuffer buys nothing.
// NOTE(review): characterEncoding/useSSL look like MySQL-style URL parameters;
// confirm that the ClickHouse driver actually recognizes them.
StringBuilder urlSb = new StringBuilder()
        .append("jdbc:clickhouse://")
        .append(ip).append(":").append(port).append("/").append(db)
        .append("?characterEncoding=utf8&useSSL=false");

// Open a connection with the configured credentials and create the statement
// that is later used to stream data to the server.
BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource(urlSb.toString());
ClickHouseConnection connection = dataSource.getConnection(user, pwd);
ClickHouseStatement statement = connection.createStatement();

数据推送模式

// INSERT statement header: the target table with an explicit column list,
// followed by the format of the data that will be streamed after it.
String query = "insert into cq_report_db.xxx (x1, x2, x3)"
        + " FORMAT TabSeparated";

(1)可以指定导入数据到表的指定列

(2)数据的列数必须与指定的列数一致,否则导入时会报错

(3)支持多线程并发导入

批量推送数据

// Read the gzip-compressed file line by line and push the rows to ClickHouse
// in batches of at most maxSize rows, one sendStreamSQL call per batch.
InputStream inputStream = new GZIPInputStream(new FileInputStream(fileName));
// Decode with an explicit charset instead of the platform default so the result
// does not depend on the machine the import runs on.
// NOTE(review): assumes the data file is UTF-8 encoded; confirm.
Scanner scanner = new Scanner(inputStream, "UTF-8");
StringBuilder data = new StringBuilder();
int size = 0;
int maxSize = 20000;
while (scanner.hasNextLine()) {
    // Decide on the separator by row count, not by buffer length: a length-based
    // check glues the second row onto the first when the first row is a single
    // character (or empty).
    if (size > 0) {
        data.append('\n');
    }
    data.append(scanner.nextLine());
    size++;

    // Flush when the batch is full, or when the input is exhausted so the final
    // partial batch is sent as well (including a lone one-character row).
    if (size >= maxSize || !scanner.hasNextLine()) {
        // Skip the request entirely if the batch contains no bytes at all.
        if (data.length() > 0) {
            try (InputStream dataStream = new ByteArrayInputStream(
                    data.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8))) {
                statement.sendStreamSQL(dataStream, query);
            }
        }
        // Reset the batch state for the next round.
        size = 0;
        data.setLength(0);
    }
}

关闭相关连接

// Release resources in reverse order of acquisition.
// File side first: the scanner, then the stream it was reading from.
if (scanner != null) {
    scanner.close();
}
if (inputStream != null) {
    inputStream.close();
}
// Database side: close the statement before the connection that created it,
// so the statement is never queried or closed on an already-closed connection.
if (statement != null && !statement.isClosed()) {
    statement.close();
}
if (connection != null && !connection.isClosed()) {
    connection.close();
}