依赖配置pom.xml
<!-- clickhouse4j client library. The Java snippets in this note load the driver class
     cc.blynk.clickhouse.ClickHouseDriver, presumably shipped in this artifact. -->
<dependency>
<groupId>cc.blynk.clickhouse</groupId>
<artifactId>clickhouse4j</artifactId>
<version>1.4.4</version>
</dependency>
CK基本信息
// Fully-qualified name of the JDBC driver class; loaded later with Class.forName(driver).
String driver = "cc.blynk.clickhouse.ClickHouseDriver";
// Host, port and database name that are concatenated into the JDBC URL (placeholder values).
String ip = "xxx.xxx.xxx.xxx";
String port = "8123";
String db = "db";
// Credentials handed to dataSource.getConnection(user, pwd) (placeholder values).
String user = "user";
String pwd = "pwd";
// Gzip-compressed data file. Despite the original "output file" wording, this file is the
// INPUT of the import: it is opened with FileInputStream/GZIPInputStream and read line by line.
String fileName = "/data/table1_cols123_20211125.txt.gz";
创建ClickHouse连接
// Load the JDBC driver class named by `driver`.
Class.forName(driver);
// JDBC URL layout: jdbc:clickhouse://<ip>:<port>/<db>?<query parameters>
String jdbcUrl = "jdbc:clickhouse://" + ip + ":" + port + "/" + db
        + "?characterEncoding=utf8&useSSL=false";
// Open a connection through the data source and create the statement used for streaming inserts.
BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource(jdbcUrl);
ClickHouseConnection connection = dataSource.getConnection(user, pwd);
ClickHouseStatement statement = connection.createStatement();
数据推送模式
// INSERT header only: target table plus explicit column list, followed by the input format.
// The row data itself is supplied separately as a stream when the statement is sent.
String query = "insert into cq_report_db.xxx (x1, x2, x3) FORMAT TabSeparated";
(1)可以指定导入数据到表的指定列
(2)数据的列数必须与指定的列数一致,否则导入会报错
(3)支持多线程并发导入
批量推送数据
// Read the gzip-compressed input file line by line and push it to ClickHouse in batches of
// at most `maxSize` rows, using the INSERT header in `query`.
//
// Text is decoded and re-encoded as UTF-8 explicitly instead of relying on the platform
// default charset. NOTE(review): this assumes the data file is UTF-8 encoded - confirm.
InputStream inputStream = new GZIPInputStream(new FileInputStream(fileName));
Scanner scanner = new Scanner(inputStream, "UTF-8");
StringBuilder data = new StringBuilder();
int size = 0;        // number of rows currently buffered in `data`
int maxSize = 20000; // maximum number of rows sent per batch
while (scanner.hasNextLine()) {
    // Separate rows with a newline. The check is on the row count, not on the buffer length,
    // so a first row that is empty or a single character is still kept apart from the next row.
    if (size > 0) {
        data.append('\n');
    }
    data.append(scanner.nextLine());
    size++;
    // Send the batch when it is full, or when the input is exhausted (remaining rows).
    if (size >= maxSize || !scanner.hasNextLine()) {
        byte[] payload = data.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
        try (InputStream dataStream = new ByteArrayInputStream(payload)) {
            statement.sendStreamSQL(dataStream, query);
        }
        // Reset the buffer for the next batch.
        size = 0;
        data.setLength(0);
    }
}
// Scanner treats an IOException from the underlying stream (e.g. a truncated gzip file) as
// end of input; re-throw it so a partial import does not pass silently.
if (scanner.ioException() != null) {
    throw scanner.ioException();
}
关闭相关连接
// Release resources in reverse order of acquisition: the file reader first, then the
// statement, and only then the connection the statement was created from.
// NOTE(review): in production code these calls belong in a finally block (or the resources
// should be opened with try-with-resources) so they also run when the import throws.
if (scanner != null) {
    scanner.close();
}
if (inputStream != null) {
    inputStream.close();
}
if (statement != null && !statement.isClosed()) {
    statement.close();
}
if (connection != null && !connection.isClosed()) {
    connection.close();
}