天天看點

C#:幾種資料庫的大資料批量插入

在之前隻知道SqlServer支援資料批量插入,殊不知道Oracle、SQLite和MySql也是支援的,不過Oracle需要使用Orace.DataAccess驅動,今天就貼出幾種資料庫的批量插入解決方法。

首先說一下,IProvider裡有一個用于實作批量插入的插件服務接口IBatcherProvider,此接口在前一篇文章中已經提到過了。

/// <summary>

/// 提供資料批量處理的方法。

/// </summary>

public interface IBatcherProvider : IProviderService

{

/// 将 <see cref="DataTable"/> 的資料批量插入到資料庫中。

/// <param name="dataTable">要批量插入的 <see cref="DataTable"/>。</param>

/// <param name="batchSize">每批次寫入的資料量。</param>

void Insert(DataTable dataTable, int batchSize = 10000);

}

一、SqlServer資料批量插入

SqlServer的批量插入很簡單,使用SqlBulkCopy就可以,以下是該類的實作:

/// 為 System.Data.SqlClient 提供的用于批量操作的方法。

public sealed class MsSqlBatcher : IBatcherProvider

/// 擷取或設定提供者服務的上下文。

public ServiceContext ServiceContext { get; set; }

public void Insert(DataTable dataTable, int batchSize = 10000)

Checker.ArgumentNull(dataTable, "dataTable");

if (dataTable.Rows.Count == 0)

return;

using (var connection = (SqlConnection)ServiceContext.Database.CreateConnection())

try

connection.TryOpen();

//給表名加上前後導符

var tableName = DbUtility.FormatByQuote(ServiceContext.Database.Provider.GetService<ISyntaxProvider>(), dataTable.TableName);

using (var bulk = new SqlBulkCopy(connection, SqlBulkCopyOptions.KeepIdentity, null)

DestinationTableName = tableName,

BatchSize = batchSize

})

//循環所有列,為bulk添加映射

dataTable.EachColumn(c => bulk.ColumnMappings.Add(c.ColumnName, c.ColumnName), c => !c.AutoIncrement);

bulk.WriteToServer(dataTable);

bulk.Close();

catch (Exception exp)

throw new BatcherException(exp);

finally

connection.TryClose();

SqlBulkCopy的ColumnMappings中列的名稱受大小寫敏感限制,是以在構造DataTable的時候應請注意列名要與表一緻。

以上沒有使用事務,使用事務在性能上會有一定的影響,如果要使用事務,可以設定SqlBulkCopyOptions.UseInternalTransaction。

二、Oracle資料批量插入

System.Data.OracleClient不支援批量插入,是以隻能使用Oracle.DataAccess元件來作為提供者。

/// Oracle.Data.Access 元件提供的用于批量操作的方法。

public sealed class OracleAccessBatcher : IBatcherProvider

using (var connection = ServiceContext.Database.CreateConnection())

using (var command = ServiceContext.Database.Provider.DbProviderFactory.CreateCommand())

if (command == null)

throw new BatcherException(new ArgumentException("command"));

command.Connection = connection;

command.CommandText = GenerateInserSql(ServiceContext.Database, command, dataTable);

command.ExecuteNonQuery();

/// 生成插入資料的sql語句。

/// <param name="database"></param>

/// <param name="command"></param>

/// <param name="table"></param>

/// <returns></returns>

private string GenerateInserSql(IDatabase database, DbCommand command, DataTable table)

var names = new StringBuilder();

var values = new StringBuilder();

//将一個DataTable的資料轉換為數組的數組

var data = table.ToArray();

//設定ArrayBindCount屬性

command.GetType().GetProperty("ArrayBindCount").SetValue(command, table.Rows.Count, null);

var syntax = database.Provider.GetService<ISyntaxProvider>();

for (var i = 0; i < table.Columns.Count; i++)

var column = table.Columns[i];

var parameter = database.Provider.DbProviderFactory.CreateParameter();

if (parameter == null)

continue;

parameter.ParameterName = column.ColumnName;

parameter.Direction = ParameterDirection.Input;

parameter.DbType = column.DataType.GetDbType();

parameter.Value = data[i];

if (names.Length > 0)

names.Append(",");

values.Append(",");

names.AppendFormat("{0}", DbUtility.FormatByQuote(syntax, column.ColumnName));

values.AppendFormat("{0}{1}", syntax.ParameterPrefix, column.ColumnName);

command.Parameters.Add(parameter);

return string.Format("INSERT INTO {0}({1}) VALUES ({2})", DbUtility.FormatByQuote(syntax, table.TableName), names, values);

以上最重要的一步,就是将DataTable轉為數組的數組表示,即object[][],前數組的上标是列的個數,後數組是行的個數,是以循環Columns将後數組作為Parameter的值,也就是說,參數的值是一個數組。而insert語句與一般的插入語句沒有什麼不一樣。

三、SQLite資料批量插入

SQLite的批量插入隻需開啟事務就可以了,這個具體的原理不得而知。

public sealed class SQLiteBatcher : IBatcherProvider

DbTransaction transcation = null;

transcation = connection.BeginTransaction();

command.CommandText = GenerateInserSql(ServiceContext.Database, dataTable);

if (command.CommandText == string.Empty)

var flag = new AssertFlag();

dataTable.EachRow(row =>

var first = flag.AssertTrue();

ProcessCommandParameters(dataTable, command, row, first);

});

transcation.Commit();

if (transcation != null)

transcation.Rollback();

private void ProcessCommandParameters(DataTable dataTable, DbCommand command, DataRow row, bool first)

for (var c = 0; c < dataTable.Columns.Count; c++)

DbParameter parameter;

//首次建立參數,是為了使用緩存

if (first)

parameter = ServiceContext.Database.Provider.DbProviderFactory.CreateParameter();

parameter.ParameterName = dataTable.Columns[c].ColumnName;

else

parameter = command.Parameters[c];

parameter.Value = row[c];

private string GenerateInserSql(IDatabase database, DataTable table)

table.EachColumn(column =>

if (!flag.AssertTrue())

names.Append(DbUtility.FormatByQuote(syntax, column.ColumnName));

四、MySql資料批量插入

/// 為 MySql.Data 元件提供的用于批量操作的方法。

public sealed class MySqlBatcher : IBatcherProvider

var types = new List<DbType>();

var count = table.Columns.Count;

table.EachColumn(c =>

names.AppendFormat("{0}", DbUtility.FormatByQuote(syntax, c.ColumnName));

types.Add(c.DataType.GetDbType());

var i = 0;

foreach (DataRow row in table.Rows)

if (i > 0)

values.Append("(");

for (var j = 0; j < count; j++)

if (j > 0)

values.Append(", ");

var isStrType = IsStringType(types[j]);

var parameter = CreateParameter(database.Provider, isStrType, types[j], row[j], syntax.ParameterPrefix, i, j);

if (parameter != null)

values.Append(parameter.ParameterName);

else if (isStrType)

values.AppendFormat("'{0}'", row[j]);

values.Append(row[j]);

values.Append(")");

i++;

return string.Format("INSERT INTO {0}({1}) VALUES {2}", DbUtility.FormatByQuote(syntax, table.TableName), names, values);

/// 判斷是否為字元串類别。

/// <param name="dbType"></param>

private bool IsStringType(DbType dbType)

return dbType == DbType.AnsiString || dbType == DbType.AnsiStringFixedLength || dbType == DbType.String || dbType == DbType.StringFixedLength;

/// 建立參數。

/// <param name="provider"></param>

/// <param name="isStrType"></param>

/// <param name="value"></param>

/// <param name="parPrefix"></param>

/// <param name="row"></param>

/// <param name="col"></param>

private DbParameter CreateParameter(IProvider provider, bool isStrType, DbType dbType, object value, char parPrefix, int row, int col)

//如果生成全部的參數,則速度會很慢,是以,隻有資料類型為字元串(包含'号)和日期型時才添加參數

if ((isStrType && value.ToString().IndexOf('\'') != -1) || dbType == DbType.DateTime)

var name = string.Format("{0}p_{1}_{2}", parPrefix, row, col);

var parameter = provider.DbProviderFactory.CreateParameter();

parameter.ParameterName = name;

parameter.DbType = dbType;

parameter.Value = value;

return parameter;

return null;

MySql的批量插入,是将值全部寫在語句的values裡,例如,insert batcher(id, name) values(1, '1', 2, '2', 3, '3', ........ 10, '10')。

五、測試

接下來寫一個測試用例來看一下使用批量插入的效果。

[Test]

public void TestBatchInsert()

Console.WriteLine(TimeWatcher.Watch(() =>

InvokeTest(database =>

var table = new DataTable("Batcher");

table.Columns.Add("Id", typeof(int));

table.Columns.Add("Name1", typeof(string));

table.Columns.Add("Name2", typeof(string));

table.Columns.Add("Name3", typeof(string));

table.Columns.Add("Name4", typeof(string));

//構造100000條資料

for (var i = 0; i < 100000; i++)

table.Rows.Add(i, i.ToString(), i.ToString(), i.ToString(), i.ToString());

//擷取 IBatcherProvider

var batcher = database.Provider.GetService<IBatcherProvider>();

if (batcher == null)

Console.WriteLine("不支援批量插入。");

batcher.Insert(table);

//輸出batcher表的資料量

var sql = new SqlCommand("SELECT COUNT(1) FROM Batcher");

Console.WriteLine("目前共有 {0} 條資料", database.ExecuteScalar(sql));

})));

以下表中列出了四種資料庫生成10萬條資料各耗用的時間

資料庫

耗用時間

MsSql

00:00:02.9376300

Oracle

00:00:01.5155959

SQLite

00:00:01.6275634

MySql

00:00:05.4166891