天天看点

mysql大数量批量插入方案 LOAD DATA LOCAL INFILE实现及相关问题解决

mysql大数量批量插入方案 LOAD DATA LOCAL INFILE实现及相关问题解决

    • 业务场景
    • 过程
    • 代码
    • 性能对比
    • 问题及解决办法

业务场景

项目跟文件内容相关,上传文件将句子内容提取出来后,将每条句子信息插入到数据库,文件中提取出的句子数量较大,几k-几十w不等,为了将数据快速插入数据库,寻找方案。

过程

项目中使用mybatis做持久存储,首先尝试 批量sql的方式:

由于数据量大,遇到两个问题:

1.values后面拼接数据,本地机器内存不够

2.插入效率仍然很慢

思考:使用多线程分批插入

结果:

1.文件大及数量多,线程数量太多

2.效率仍提升不明显

方案寻自: https://blog.csdn.net/baidu_38083619/article/details/83378885,感谢作者!

借鉴前人肩膀,但是仍然遇到许多问题,将问题一一解决后,对此过程中做一些记录,方便大家查阅。

原理:

正如原创博客作者所说:

MySQL使用 LOAD DATA LOCAL INFILE 从文件中导入数据比insert语句要快,MySQL文档上说要快20倍左右。

但是这个方法有个缺点,就是导入数据之前,必须要有文件,也就是说从文件中导入。这样就需要去写文件,以及文件删除等维护。某些情况下,比如数据源并发的话,还会出现写文件并发问题,很难处理。

而且如果先写文件,又增加了IO操作,所以为了避免写文件:

MySQL社区提供这样一个方法:setLocalInfileInputStream(),此方法位于com.mysql.jdbc.PreparedStatement 类中。通过使用 MySQL JDBC 的setLocalInfileInputStream 方法实现从Java InputStream中load data local infile 到MySQL数据库中。可以达到同样的效率,直接从内存(IO流中)中导入数据,而不需要写文件

代码

代码(springboot),根据自己业务改造表及字段参数:

import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.time.DateFormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Date;

/**
 * @Description :大数量量插入数据库,快速插入方法
 * @Author : 
 * @Date : 2021/3/4
 **/
@Component
public class LoadDataInFileUtil {

    private static final Logger logger = LoggerFactory.getLogger(LoadDataInFileUtil.class);
    private Connection conn = null;
    @Value("${spring.shardingsphere.datasource.shardingdb.driver-class-name}")
    private String driverName;
    @Value("${spring.shardingsphere.datasource.shardingdb.url}")
    private String url;
    @Value("${spring.shardingsphere.datasource.shardingdb.username}")
    private String userName;
    @Value("${spring.shardingsphere.datasource.shardingdb.password}")
    private String password;
    @Value("${database-name}")
    private String database;


    /**
     * 获取原生jdbc连接
     *
     * @return
     * @throws ClassNotFoundException
     * @throws SQLException
     */
    private Connection getConnection() throws ClassNotFoundException, SQLException {
        Class.forName(driverName);
        conn = DriverManager.getConnection(url, userName, password);
        return conn;
    }

    /**
     * 将数据从输入流加载到MySQL。
     *
     * @param loadDataSql SQL语句。
     * @param dataStream  输入流。
     * @return int         成功插入的行数。
     */
    private int bulkLoadFromInputStream(String loadDataSql,
                                        InputStream dataStream) throws SQLException, ClassNotFoundException {
        if (null == dataStream) {
            logger.info("输入流为NULL,没有数据导入。");
            return 0;
        }
        //做测试发现连接池好像不支持,所以获取原生的jdbc连接
        conn = getConnection();
        PreparedStatement statement = null;
        int result = 0;
        try {
            statement = conn.prepareStatement(loadDataSql);
//            mysql8  使用下面的方式
        /*if (statement.isWrapperFor(com.mysql.cj.jdbc.JdbcStatement.class)) {
            com.mysql.cj.jdbc.ClientPreparedStatement mysqlStatement = statement.unwrap(com.mysql.cj.jdbc.ClientPreparedStatement.class);
            mysqlStatement.setLocalInfileInputStream(dataStream);
            mysqlStatement.executeUpdate();
        }*/
//        mysql5 使用下面的方式
            if (statement.isWrapperFor(com.mysql.jdbc.Statement.class)) {
                com.mysql.jdbc.PreparedStatement mysqlStatement = statement.unwrap(com.mysql.jdbc.PreparedStatement.class);
                mysqlStatement.setLocalInfileInputStream(dataStream);
                result = mysqlStatement.executeUpdate();
            }
        } finally {
            if (statement != null) {
                statement.close();
            }
        }
        return result;
    }

    /**
     * 组装 SQL 语句
     *
     * @param dataBaseName 数据库名。
     * @param tableName    表名。
     * @param columnName   要插入数据的列名。
     */
    public String assembleSql(String dataBaseName, String tableName, String[] columnName) {
        String insertColumnName = StringUtils.join(columnName, ",");
        //插入方案的核心就是下面这个语句
        String sql = "LOAD DATA LOCAL INFILE 'sql.csv' IGNORE INTO TABLE " + dataBaseName + "." + tableName + " (" + insertColumnName + ")";
        return sql;
    }

    /**
     * 通过 LOAD DATA LOCAL INFILE 大批量导入数据到 MySQL。
     *
     * 原理是使用 setLocalInfileInputStream 会忽略 sql.csv 文件名,不从文件读取,直接从输入流读取数据
     * @param sql     SQL语句。
     * @param builder 组装好的数据。
     */
    public int fastInsertData(String sql, StringBuilder builder) {
        int rows = 0;
        InputStream is = null;
        try {
            byte[] bytes = builder.toString().getBytes();
            is = new ByteArrayInputStream(bytes);
            //批量插入数据。
            long beginTime = System.currentTimeMillis();
            rows = bulkLoadFromInputStream(sql, is);
            long endTime = System.currentTimeMillis();
            logger.info(" LOAD DATA LOCAL INFILE :【插入" + rows + "行数据至MySql中,耗时" + (endTime - beginTime) + "ms。】");
        } catch (SQLException | ClassNotFoundException e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != is) {
                    is.close();
                }
                if (null != conn) {
                    conn.close();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return rows;
    }

    /**
     * 批量插入方法
     *
     * @return
     */
    public void insertData(String dataBaseName, String tableName, String[] columns, StringBuilder tableColumnValue, int size) {
    	
        //插入语句
        String sql = assembleSql(dataBaseName, tableName, columns);
        //待插入行数,为了判断是否全部插入
        int mainTableInsertRows = fastInsertData(sql, tableColumnValue);
        //插入数据未全部成功视为全部失败
        if (mainTableInsertRows != size)
            System.println.out("插入失败!");
    }
    
}

           

测试代码:

@RunWith(SpringRunner.class)
@SpringBootTest
public class LoadDataInFileUtilTest {
    @Autowired
    LoadDataInFileUtil loadDataInFileUtil;
 
 	@Test
    public void testInsert() {
        //表列值
        StringBuilder tableColumnValue = new StringBuilder();
        //数据库名
        String dataBaseName = "dataBaseName ";
        //待插入的表
        String tableName = "tableName";
        // 要插入数据的列名。(必须与插入的数据一一对应)
        String[] columns = new String[]{"id","name","age"};
        
 		int number = 500000;
        int id = 1;
        for (int i = 0; i < number; i++) {
            //顺序对应上面的列名
            builderAppend(tableColumnValue, id);
            builderAppend(tableColumnValue ,"测试名字");
            builderEnd(tableColumnValue, 10);
        }

        long startTime = System.currentTimeMillis();
        loadDataInFileUtil.insertData(dataBaseName , tableName , columns, tableColumnValue, number);
        long endTime = System.currentTimeMillis();
        System.out.println("插入数据,所用时间:" + (endTime - startTime));

    }
    
   /**
     * 往 StringBuilder 里追加数据。
     *
     * @param builder StringBuilder。
     * @param object  数据。
     */
    public void builderAppend(StringBuilder builder, Object object) {
        builder.append(object);
        builder.append("\t");
    }
 
    /**
     * 往 StringBuilder 里追加一条数据的最后一个字段。
     *
     * @param builder StringBuilder。
     * @param object  数据。
     */
    public void builderEnd(StringBuilder builder, Object object) {
        builder.append(object);
        builder.append("\n");
   }
}
           

执行顺序:

从 insertData() 方法进入,首先通过assembleSql() 方法组织导入数据的sql语句“LOAD DATA LOCAL INFILE ‘sql.csv’ IGNORE INTO TABLE tableName()”,然后通过fastInsertData() 方法传入sql语句及需要导入的数据转化为流,最后通过setLocalInfileInputStream(dataStream) 将数据插入数据库。

性能对比

自己做了一些测试来对比效率,对比的是拼装批量插入方式和此方式,对比结果:

mysql大数量批量插入方案 LOAD DATA LOCAL INFILE实现及相关问题解决

本机做的测试,若在服务器上,效率更佳。

此方案数据量越大,差距越明显。

问题及解决办法

1.mysql版本问题

一般我们使用 mysql5 或mysql8 版本的驱动包,对应的预编译 prepareStatement 对象为:

//            mysql8  使用下面的方式
        /*if (statement.isWrapperFor(com.mysql.cj.jdbc.JdbcStatement.class)) {
            com.mysql.cj.jdbc.ClientPreparedStatement mysqlStatement = statement.unwrap(com.mysql.cj.jdbc.ClientPreparedStatement.class);
            mysqlStatement.setLocalInfileInputStream(dataStream);
            mysqlStatement.executeUpdate();
        }*/
//        mysql5 使用下面的方式
           /* if (statement.isWrapperFor(com.mysql.jdbc.Statement.class)) {
                com.mysql.jdbc.PreparedStatement mysqlStatement = statement.unwrap(com.mysql.jdbc.PreparedStatement.class);
                mysqlStatement.setLocalInfileInputStream(dataStream);
                result = mysqlStatement.executeUpdate();
            }*/
           

代码中有注释说明

2.字段拼接问题

因为字段值是通过 “\t” 来区别字段和 “\n” 来区别一条数据的,所以在字段的值中如果含有 Tab 空格的数据无法通过此方法插入,会导致字段对应不上从而插入失败,我在项目中的处理方式是将含有这些字符的数据单独提出还是用insert语句去插入,因为我的项目中这种情况不会出现太多。

3.值得问题

因为拼接的字段值都是已字符串的形式识别的,所以不存在true/false 这种数据,如果model类默认将数据库的tinyint类型转化为boolean了,需要单独建立model,使用0/1插入,其他字段同理。

4.是否成功

一批数据,如有无法插入的会跳过,不会报错,但会返回插入的条数,只能根据返回结果判定是否全部插入,来处理数据是否

5.数据库连接

因为我自己测试的时候,从dataSource获取的连接,似乎无法执行,所以获取了原生的jdbc连接来操作,注意操作完成时正确的关闭,否则造成资源浪费。

6.字段为空

如果字段为空,需要转化为“”,否则插入数据库以 null形式存在、

7.权限

操作数据库的账户需要开启 Excute权限

8.无法引用 com.mysql.jdbc.ClientPreparedStatement问题

因为之前在项目中引入jar包时,mysql驱动是以运行时编译的状态引入的,所以修改pom文件的scope

<dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>${mysql.version}</version>
                <!--去掉运行时依赖,以便能使用com.mysql.jdbc.ClientPreparedStatement -->
<!--            <scope>runtime</scope>-->
            </dependency>
           

以上为整个过程的使用及问题梳理,欢迎补充!