/// <summary>
/// 大数据单表批量插入,带事务
/// </summary>
/// <param name="keepID"></param>
/// <param name="tableName"></param>
/// <param name="dt"></param>
/// <param name="connectionString"></param>
/// <returns></returns>
public static bool MsSqlBulkCopyInsert(bool keepID, string tableName, DataTable dt, ref string msg, string connectionString = null)
{
if (dt == null || dt.Rows.Count <= 0)
{
msg = "找不到数据";
return true;
}
using (SqlConnection conn = string.IsNullOrWhiteSpace(connectionString) ? new SqlConnection(MSSQLHelper.connectionString) : new SqlConnection(connectionString))
{
conn.Open();
using (SqlTransaction trans = conn.BeginTransaction())
{
try
{
using (SqlBulkCopy sbc = new SqlBulkCopy(conn, (keepID ? SqlBulkCopyOptions.KeepIdentity : SqlBulkCopyOptions.Default) | SqlBulkCopyOptions.FireTriggers, trans))
{
sbc.BatchSize = dt.Rows.Count; //每一批次中的行数
//Bulkcopy 无法判断数据是否已经存在,如果在bulkcopy时发生数据库返回错误,此次的bulkcopy操作会rollback。建议在数据库中添加临时表,bulkcopy的destination设置为临时表,bulkcopy完成后运行从临时表到真正表的插入操作,筛选duplicate数据并删除
sbc.DestinationTableName = string.Concat("[", tableName + "]");
sbc.BulkCopyTimeout = 300;//超时之前操作完成所允许的秒数,大批量数量需要的时长5分钟。 报错:“超时时间已到。在操作完成之前超时时间已过或服务器未响应
foreach (DataColumn column in dt.Columns)
{
sbc.ColumnMappings.Add(column.ColumnName, column.ColumnName);
}
sbc.WriteToServer(dt);
trans.Commit();
return true;
}
}
catch (Exception err)
{
trans.Rollback();//回滚事务
msg = err.Message;
}
}
}
return false;
}
/// <summary>
/// 大数据多表批量插入,带事务
/// </summary>
/// <param name="keepID"></param>
/// <param name="tableName"></param>
/// <param name="dt"></param>
/// <param name="connectionString"></param>
/// <returns></returns>
public static bool MsSqlBulkCopyManyInsert(bool keepID, Dictionary<string, DataTable> dic, ref string msg, string connectionString = null)
{
if (dic == null || dic.Count <= 0)
{
msg = "找不到数据";
return true;
}
using (SqlConnection conn = string.IsNullOrWhiteSpace(connectionString) ? new SqlConnection(MSSQLHelper.connectionString) : new SqlConnection(connectionString))
{
conn.Open();
using (SqlTransaction trans = conn.BeginTransaction())
{
try
{
using (SqlBulkCopy sbc = new SqlBulkCopy(conn, (keepID ? SqlBulkCopyOptions.KeepIdentity : SqlBulkCopyOptions.Default) | SqlBulkCopyOptions.FireTriggers, trans))
{
foreach (var item in dic)
{
string tableName = item.Key;
DataTable dt = item.Value;
if (dt == null || dt.Rows.Count <= 0)
{
continue;
}
sbc.BatchSize = dt.Rows.Count; //每一批次中的行数
//Bulkcopy 无法判断数据是否已经存在,如果在bulkcopy时发生数据库返回错误,此次的bulkcopy操作会rollback。建议在数据库中添加临时表,bulkcopy的destination设置为临时表,bulkcopy完成后运行从临时表到真正表的插入操作,筛选duplicate数据并删除
sbc.DestinationTableName = string.Concat("[", tableName + "]");
sbc.BulkCopyTimeout = 300;//超时之前操作完成所允许的秒数,大批量数量需要的时长5分钟。 报错:“超时时间已到。在操作完成之前超时时间已过或服务器未响应
sbc.ColumnMappings.Clear();
foreach (DataColumn column in dt.Columns)
{
sbc.ColumnMappings.Add(column.ColumnName, column.ColumnName);
}
sbc.WriteToServer(dt);
}
trans.Commit();
return true;
}
}
catch (Exception err)
{
trans.Rollback();//回滚事务
msg = err.Message;
return false;
}
}
}
}
/// <summary>
/// 大数据批量更新
/// </summary>
/// <param name="table"></param>
/// <param name="tableName"></param>
/// <param name="limitWhere"></param>
/// <param name="onceUpdateNumber"></param>
/// <param name="connectionString"></param>
/// <returns></returns>
public static int MsSqlUpdate(DataTable table, string tableName, string limitWhere, int onceUpdateNumber = 5000, string connectionString = null)
{
int result = -1;
SqlConnection conn = new SqlConnection(string.IsNullOrWhiteSpace(connectionString) ? MSSQLHelper.connectionString :connectionString);
SqlCommand comm = conn.CreateCommand();
comm.CommandType = CommandType.Text;
SqlDataAdapter adapter = new SqlDataAdapter(comm);
SqlCommandBuilder commandBulider = new SqlCommandBuilder(adapter);
commandBulider.ConflictOption = ConflictOption.OverwriteChanges;
try
{
conn.Open();
//设置批量更新的每次处理条数
adapter.UpdateBatchSize = table.Rows.Count;
adapter.SelectCommand.Transaction = conn.BeginTransaction();
adapter.SelectCommand.CommandText = string.Format("select * from {0} where {1}", string.IsNullOrWhiteSpace(tableName) ? table.TableName : tableName, limitWhere);
result = adapter.Update(table);
adapter.SelectCommand.Transaction.Commit();/////提交事务
}
catch (Exception ex)
{
if (adapter.SelectCommand != null && adapter.SelectCommand.Transaction != null)
{
adapter.SelectCommand.Transaction.Rollback();
}
throw ex;
}
finally
{
conn.Close();
conn.Dispose();
}
return result;
}