C# & SQL Server大数据量插入方式对比

以下内容大部分来自:

http://blog.csdn.net/tjvictor/article/details/4360030

部分内容出自互联网,实验结果为亲测。

最近自己开发一个向数据库中插入大量历史数据的函数库,需要解决一个大数据量插入的效率问题。不用分析,我知道如果采取逐条数据插入的方式,那么效率肯定很低,光是那么多循环就知道很慢了。于是乎,我找到了上篇博客,知道了BulkCopy和TVPs方式。为了更好的了解其效率,我自己动手亲测了一下效果,测试的数据库位于本机。

(1)方式1:循环插入

        public static void NormalInerst(String connString)
        {
            Console.WriteLine("使用NNormalInerst方式:");
            Stopwatch sw = new Stopwatch();
            SqlConnection sqlConn = new SqlConnection(connString);
            SqlCommand sqlCmd = new SqlCommand();
            sqlCmd.CommandText = String.Format("insert into BulkTestTable(Id,UserName,Pwd)values(@p0,@p1,@p2)");
            sqlCmd.Parameters.Add("@p0", SqlDbType.Int);
            sqlCmd.Parameters.Add("@p1", SqlDbType.NVarChar);
            sqlCmd.Parameters.Add("@p2", SqlDbType.VarChar);
            sqlCmd.CommandType = CommandType.Text;
            sqlCmd.Connection = sqlConn;
            sqlConn.Open();
            try
            {
                for (int i = 0, j = 0; i < 10; ++i )
                {
                    for (j = i * 10000; j < (i + 1) * 10000; ++j )
                    {
                        sqlCmd.Parameters["@p0"].Value = j;
                        sqlCmd.Parameters["@p1"].Value = String.Format("User-{0}", i * j);
                        sqlCmd.Parameters["@p2"].Value = String.Format("Pwd-{0}", i * j);
                        sw.Start();
                        sqlCmd.ExecuteNonQuery();
                        sw.Stop();
                    }

                    Console.WriteLine("第{0}次插入{1}条数据耗时:{2}", (i + 1), dataScale, sw.ElapsedMilliseconds);
                    sw.Reset();
                }
            }
            catch (System.Exception ex)
            {
                throw ex;
            }
            finally
            {
                sqlConn.Close();
            }
        }

该方式的效率极低,运行时间很长,我这里就不给出结果了,有兴趣可以自己粘贴试一下。PS:其中的数据规模应该是dataScale而不是10000,不过总是还是慢。

(2)方式2:使用BulkCopy

        public static void BulkInerst(String connString)
        {
            Console.WriteLine("使用BulkInerst方式:");
            Stopwatch sw = new Stopwatch();

            String strDel = "delete from BulkTestTable";
            float millTime = 0;
            for (int multiply = 0; multiply < 10; multiply++)
            {
                DataTable dt = GetTableSchema();
                for (int count = multiply * dataScale; count < (multiply + 1) * dataScale; count++)
                {
                    DataRow r = dt.NewRow();
                    r[0] = count;
                    r[1] = string.Format("User-{0}", count * multiply);
                    r[2] = string.Format("Pwd-{0}", count * multiply);
                    dt.Rows.Add(r);
                }

                SqlConnection sqlConn = new SqlConnection(connString);
                SqlBulkCopy bulkCopy = new SqlBulkCopy(sqlConn);
                bulkCopy.DestinationTableName = "BulkTestTable";
                bulkCopy.BatchSize = dt.Rows.Count;

                sw.Reset();
                sw.Start();
                try
                {
                    sqlConn.Open();
                    if (dt != null && dt.Rows.Count != 0)
                        bulkCopy.WriteToServer(dt);
                }
                catch (Exception ex)
                {
                    throw ex;
                }
                finally
                {
                    sqlConn.Close();
                    if (bulkCopy != null)
                        bulkCopy.Close();
                }
                sw.Stop();

                Console.WriteLine("第{0}次插入{1}条数据耗时:{2}", (multiply + 1), dataScale, sw.ElapsedMilliseconds);
                millTime += sw.ElapsedMilliseconds;
            }
            Console.WriteLine("总耗时:{0}毫秒,平均耗时:{1}毫秒", millTime, millTime / 10);
            SqlConnection sqlConn2 = new SqlConnection(connString);
            SqlCommand sqlCmd = new SqlCommand(strDel, sqlConn2);
            try
            {
                sqlConn2.Open();
                sqlCmd.ExecuteNonQuery();
            }
            catch (Exception ex)
            {
                throw ex;
            }
            finally
            {
                sqlConn2.Close();
            }
            Console.WriteLine("Done!");
        }

(3)方式3:使用TVPs

        public static void TVPsInerst(String connString)
        {
            Console.WriteLine("使用TVPsInerst方式:");
            Stopwatch sw = new Stopwatch();
            SqlConnection sqlConn = new SqlConnection(connString);
            String strSQL = "insert into BulkTestTable (Id,UserName,Pwd)" +
                " SELECT nc.Id, nc.UserName,nc.Pwd" +
                " FROM @NewBulkTestTvp AS nc";
            String strDel = "delete from BulkTestTable";
            float millTime = 0;

            for (int multiply = 0; multiply < 10; multiply++)
            {
                DataTable dt = GetTableSchema();
                for (int count = multiply * dataScale; count < (multiply + 1) * dataScale; count++)
                {
                    DataRow r = dt.NewRow();
                    r[0] = count;
                    r[1] = string.Format("User-{0}", count * multiply);
                    r[2] = string.Format("Pwd-{0}", count * multiply);
                    dt.Rows.Add(r);
                }

                sw.Reset();
                sw.Start();
                SqlCommand cmd = new SqlCommand(strSQL, sqlConn);
                SqlParameter catParam = cmd.Parameters.AddWithValue("@NewBulkTestTvp", dt);
                catParam.SqlDbType = SqlDbType.Structured;
                catParam.TypeName = "dbo.BulkUDT";
                try
                {
                    sqlConn.Open();
                    if (dt != null && dt.Rows.Count != 0)
                    {
                        cmd.ExecuteNonQuery();
                    }
                }
                catch (Exception ex)
                {
                    throw ex;
                }
                finally
                {
                    sqlConn.Close();
                }
                sw.Stop();

                Console.WriteLine("第{0}次插入{1}条数据耗时:{2}", (multiply + 1), dataScale, sw.ElapsedMilliseconds);
                millTime += sw.ElapsedMilliseconds;
            }
            Console.WriteLine("总耗时:{0}毫秒,平均耗时:{1}毫秒", millTime, millTime / 10);
            SqlCommand sqlCmd = new SqlCommand(strDel, sqlConn);
            try
            {
                sqlConn.Open();
                sqlCmd.ExecuteNonQuery();
            }
            catch (Exception ex)
            {
                throw ex;
            }
            finally
            {
                sqlConn.Close();
            }
            Console.WriteLine("Done!");
        }

这里TVPs方式需要利用Visual Studio 2008采用的自定义数据表类型,这是一个比较新的东西。这里补充几个类型和函数,主要是为了检测数据库中是否存在数据表和数据表类型,如果不存在则进行创建。补充代码如下:

        public enum CheckType
        {
            isTable = 0,
            isType
        }

        protected static int dataScale = 100000;

        public static bool CheckExistsObject(String connString, String objectName, CheckType type)
        {
            String strSQL = "select COUNT(1) from sys.sysobjects where name=‘" + objectName + "‘";
            switch (type)
            {
                case CheckType.isTable:
                    strSQL = "select COUNT(1) from sys.sysobjects where name=‘" + objectName + "‘";
                    break;
                case CheckType.isType:
                    strSQL = "select COUNT(1) from sys.types where name=‘" + objectName + "‘";
                    break;
                default:
                    break;
            }
            using (SqlConnection conn = new SqlConnection(connString))
            {
                conn.Open();
                SqlCommand cmd = new SqlCommand(strSQL, conn);
                int result = Convert.ToInt32(cmd.ExecuteScalar());
                if (0 == result)
                {
                    return false;
                }
            }

            return true;
        }

        public static bool CreateObject(String connString, String objectName, CheckType type)
        {
            String strSQL = "";
            switch (type)
            {
                case CheckType.isTable:
                    strSQL = "Create table " + objectName + " (Id int primary key, UserName nvarchar(32), Pwd varchar(16))";
                    break;
                case CheckType.isType:
                    strSQL = "CREATE TYPE " + objectName + " AS TABLE (Id int, UserName nvarchar(32), Pwd varchar(16))";
                    break;
                default:
                    break;
            }
            using (SqlConnection conn = new SqlConnection(connString))
            {
                conn.Open();
                SqlCommand cmd = new SqlCommand(strSQL, conn);
                cmd.ExecuteNonQuery();
            }

            return true;
        }
        public static DataTable GetTableSchema()
        {
            DataTable dt = new DataTable();
            dt.Columns.AddRange(new DataColumn[]{
                    new DataColumn("Id",typeof(int)),
                    new DataColumn("UserName",typeof(string)),
                    new DataColumn("Pwd",typeof(string))});

            return dt;
        }

调用的方式就很好说了,参见如下测试代码:

        public static void Main(string[] args)
        {
            String conString = "Persist Security Info=False;User ID=sa;[email protected]#;Initial Catalog=testGR;Server=KLH-PC";
            String strType = "BulkUDT";
            String strTable = "BulkTestTable";
            if (!CheckExistsObject(conString, strType, CheckType.isType))
            {
                Console.WriteLine("类型{0}不存在", strType);
                if (CreateObject(conString, strType, CheckType.isType))
                {
                    Console.WriteLine("类型{0}创建成功!", strType);
                }
            }

            if (!CheckExistsObject(conString, strTable, CheckType.isTable))
            {
                Console.WriteLine("表格{0}不存在", strTable);
                if (CreateObject(conString, strTable, CheckType.isTable))
                {
                    Console.WriteLine("表格{0}创建成功!", strTable);
                }
            }
            Console.WriteLine("==================================================");

            //NormalInerst(conString);
            BulkInerst(conString);
            TVPsInerst(conString);

            Console.ReadKey();
        }

-------------------------------------------------------------------------------------------------

直接看效果对比:

<1>第一次运行

<2>第二次和第三次运行

这里考虑到了SQL Server自身缓存的原因,所以进行了多次测试,不过数据量没有变。可以从上述结果中看出:TVPs方式不愧是新出的啊,一代更比一代强!

C# & SQL Server大数据量插入方式对比

时间: 2024-12-28 11:35:23

C# & SQL Server大数据量插入方式对比的相关文章

Sql server 大数据量插入速度慢或丢失数据解决办法

问题描述:我的设备每秒2000条数据插入数据库,2个设备总共4000条,当在程序里面直接用insert语句插入时,两个设备同时插入大概总共能插入约2800条左右,数据丢失约1200条左右,找了好多解决方法,整理了两种效果比较明显的解决办法: 第一种:使用Sql Server函数: 1.将数据组合成字串,使用函数将数据插入内存表,后将内存表数据复制到要插入的表. 2.组合成的字符换格式:'111|222|333|456,7894,7458|0|1|2014-01-01 12:15:16;1111|

SQL Server 大数据量批量插入

private void AddShuJu_Click(object sender, RoutedEventArgs e) { Stopwatch wath = new Stopwatch(); wath.Start(); for (int i = 0; i < 10; i++) { //创建datatable实例 DataTable data = new DataTable(); //填充字段 data = GetFiled(data); for (int count = i * 100000

SQL Server 大数据量分页建议方案

简单的说就是这个 select top(20) * from( select *, rowid = row_number() over(order by xxx) from tb with(nolock) ) data where rowid > 0 order by rowid 或者这样写 select * from( select *, rowid = row_number() over(order by xxx) from tb with(nolock) ) data where rowi

SQL Server 大数据搬迁之文件组备份还原实战

原文:SQL Server 大数据搬迁之文件组备份还原实战 一.本文所涉及的内容(Contents) 本文所涉及的内容(Contents) 背景(Contexts) 解决方案(Solution) 搬迁步骤(Procedure) 搬迁脚本(SQL Codes) 注意事项(Attention) 疑问(Questions) 参考文献(References) 二.背景(Contexts) 有一个数据库大概在700G左右,需要从服务器A搬迁到服务器B,两台服务器网络传输速度可以达到8MB/s,怎么做才能更

SQL SERVER大数据分页

select * from (select rownum r, a.* from (select * from  table_name order by ndatetime desc ) a where rownum <= currentPage * pageSize ) where r > (currentPage - 1) * pageSizeSQL SERVER大数据分页,布布扣,bubuko.com

sql server 大数据, 统计分组查询,数据量比较大计算十分钟内每秒钟执行次数

-- 数据量比较大的情况,统计十分钟内每秒钟执行次数 declare @begintime varchar(100); -- 开始时间 declare @endtime varchar(100); -- 结束时间 declare @num int; -- 结束时间 set @begintime = '2019-08-10 09:10:00' -- 开始时间 set @endtime = '2019-08-10 09:20:00' -- 结束时间 set @num = (select count(

oracle基于3种方法的大数据量插入更新

过程插入更新的3种方法: a.逐条检查插入或更新,同时执行插入或更新 b.逐条merge into(逐条是为了记录过程日志与错误信息) c.基于关联数组的检查插入.更新,通过forall批量sql执行 以下为模拟步骤: 1.创建模拟大表,数据量1亿不分区 create table big_table as SELECT ROWNUM RN,'A' A,'B' B,'C' C,'D' D FROM ( SELECT ROWNUM RN FROM DUAL CONNECT BY ROWNUM <=

SQL SERVER 大数据的分页【测】

我用五百万的数据来测试,有兴趣的大家可以试试千万级的: 1.先添加大量数据 SQL SERVER 百万级数据测试[测] 2.普通 分页 A.ROW_NUMBER()OVER SELECT * FROM ( SELECT ROW_NUMBER() OVER ( ORDER BY T.UserID ) AS row , * FROM Users AS T ) AS TT WHERE TT.row BETWEEN 999000 AND 1000000; 测:[百万级]第一页时间:126MS, 100W

SQL Server大数据导入导出:将一张表的数据导入到另一张表

今天下午休息的时候又被扔给一项任务:把全国的街道数据导入街道表.但是他们扔给我的SQL脚本是从网上down的一个,跟平台这边的数据库设计的完全不一样. 当时的思路是:先把扔给我的脚本在本地生成一个表,然后选出要的数据,批量插入到开发库所在服务器的表. 然后,按照这个思路做了个测试: INSERT INTO dbo.test_Street(Code,CountyCode,CnName,VersionNo,Creator,CreateTime,ValidStatus) /*要插入数据的表*/ SEL