微软BI 之SSIS 系列 - 使用 Script Component Destination 和 ADO.NET 解析不规则文件并插入数据

开篇介绍

这一篇文章是 微软BI 之SSIS 系列 - 带有 Header 和 Trailer 的不规则的平面文件输出处理技巧 的续篇,在上篇文章中介绍到了对于这种不规则文件输出的处理方式。比如下图中的这种不规则文件,第一行,第二行 Header 部分,第三行的内容 Content 部分,最后一行的 Trailer 部分。

在前几个课程 微软BI SSIS 2012 ETL 控件与案例精讲 第43,44,45,46 课中,我分别讲解了如何使用 .Script Component Source 解析不规则文件(第43,44课),如何使用 Script Component 同步 Transformation 转换处理不规则文件(第45课),以及使用异步的 Transformation 转换不规则文件(第46课),今天我们讲解的是 Script Component Destination。即把 Script Component 作为一个 Destination 目标来使用,既然是 Destination 组件,那么就应该是以接受来自上游的 Input 输入并对数据进行清洗处理。并且,一般的 Destination 要么就是数据库表,要么就是文件,即我们是借用 Script Component 往表或者文件中插入数据。

使用 Script Component Destination

我们还是使用之前的不规则文件作为本篇的例子来讲解。这是 微软BI 之SSIS 系列 - 带有 Header 和 Trailer 的不规则的平面文件输出处理技巧 这篇文章中用到的文件格式描述。

和之前的几个课程的配置,文件链接等方式一样,略。添加一个新的 Script Component 并选择使用 Destination。

这里仍然选择 EMPLOYEE 作为 INPUT 输入项,文件源配置参照前几课的设置。

在 Script 中访问 SQL Server 数据库表,应通过 ADO.NET 链接方式访问数据库表。

链接到课程指定的 DEMO 数据库。

跟 Script Component - Source 中处理一样,取一个名称 - CON_ADO。或者先在链接管理器中先创建好一个 ADO.NET 链接管理器,然后再关联也是可以的。

将上方的文件源关联起来,注意到下方 ADO.NET 连接管理器已经创建了。

在 Script 脚本中使用 ADO.NET 编程方式需要引入这个 NameSpace 命名空间。

具体代码部分参照如下,具体讲解请参看视频:

#region Help:  Introduction to the Script Component
/* The Script Component allows you to perform virtually any operation that can be accomplished in
 * a .Net application within the context of an Integration Services data flow.
 *
 * Expand the other regions which have "Help" prefixes for examples of specific ways to use
 * Integration Services features within this script component. */
#endregion

#region Namespaces
using System;
using System.Data;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
using System.Data.SqlClient; // 需要引入的 ADO.NET 访问
#endregion

/// <summary>
/// This is the class to which to add your code.  Do not change the name, attributes, or parent
/// of this class.
/// </summary>
[Microsoft.SqlServer.Dts.Pipeline.SSISScriptComponentEntryPointAttribute]
public class ScriptMain : UserComponent
{
    //连接管理器接口
    IDTSConnectionManager100 con;

    //数据库访问对象
    SqlConnection sqlConn;
    SqlCommand sqlCommand1;
    SqlCommand sqlCommand2;
    SqlParameter sqlParameter; 

    // 需要重写的方法 初始化数据库连接
    public override void AcquireConnections(object Transaction)
    {
        // Script Component 关联的 ADO.NET 连接管理器
        con = this.Connections.CONADO;
        sqlConn = (SqlConnection)con.AcquireConnection(null);
    }

    // 初始化SQL语句与参数
    public override void PreExecute()
    {
        //INSERT INTO T046_EMPLOYEE_FILE_EXTRACTION VALUES(@FILE_CREATED_DATE,@TOTAL_EMPLOYEES)
        //INSERT INTO T046_EMPLOYEES VALUES(@EMP_NAME,@POSITION,@HIRED_DATE,@BIRTH_DATE,@EMAIL,@PHONE,@MARRIAGE)
        sqlCommand1 = new SqlCommand("INSERT INTO T046_EMPLOYEE_FILE_EXTRACTION VALUES(@FILE_CREATED_DATE,@TOTAL_EMPLOYEES)",sqlConn);
        sqlCommand2 = new SqlCommand("INSERT INTO T046_EMPLOYEES VALUES(@EMP_NAME,@POSITION,@HIRED_DATE,@BIRTH_DATE,@EMAIL,@PHONE,@MARRIAGE)", sqlConn);

        // For Table T046_EMPLOYEE_FILE_EXTRACTION
        sqlParameter = new SqlParameter("@FILE_CREATED_DATE", SqlDbType.NVarChar, 50);
        sqlCommand1.Parameters.Add(sqlParameter);
        sqlParameter = new SqlParameter("@TOTAL_EMPLOYEES", SqlDbType.Int);
        sqlCommand1.Parameters.Add(sqlParameter);

        // For Table T046_EMPLOYEES
        sqlParameter = new SqlParameter("@EMP_NAME", SqlDbType.NVarChar, 50);
        sqlCommand2.Parameters.Add(sqlParameter);
        sqlParameter = new SqlParameter("@POSITION", SqlDbType.NVarChar, 50);
        sqlCommand2.Parameters.Add(sqlParameter);
        sqlParameter = new SqlParameter("@HIRED_DATE", SqlDbType.NVarChar, 50);
        sqlCommand2.Parameters.Add(sqlParameter);
        sqlParameter = new SqlParameter("@BIRTH_DATE", SqlDbType.NVarChar, 50);
        sqlCommand2.Parameters.Add(sqlParameter);
        sqlParameter = new SqlParameter("@EMAIL", SqlDbType.NVarChar, 50);
        sqlCommand2.Parameters.Add(sqlParameter);
        sqlParameter = new SqlParameter("@PHONE", SqlDbType.NVarChar, 25);
        sqlCommand2.Parameters.Add(sqlParameter);
        sqlParameter = new SqlParameter("@MARRIAGE", SqlDbType.NVarChar, 2);
        sqlCommand2.Parameters.Add(sqlParameter);
    } 

    /// <summary>
    /// This method is called once for every row that passes through the component from Input0.
    ///
    /// Example of reading a value from a column in the the row:
    ///  string zipCode = Row.ZipCode
    ///
    /// Example of writing a value to a column in the row:
    ///  Row.ZipCode = zipCode
    /// </summary>
    /// <param name="Row">The row that is currently passing through the component</param>
    public override void Input0_ProcessInputRow(Input0Buffer Row)
    {
        if (Row.EMPLOYEE.StartsWith("FILE CREATED DATE"))
        {
            // 则只取文件日期部分
            sqlCommand1.Parameters["@FILE_CREATED_DATE"].Value = Row.EMPLOYEE.Substring(19, 10);
            sqlCommand1.Parameters["@TOTAL_EMPLOYEES"].Value = 0;
            sqlCommand1.ExecuteNonQuery();
        }
        // 如果达到第二行
        else if (Row.EMPLOYEE.StartsWith("TOTAL EMPLOYEES"))
        {
            sqlCommand1.Parameters["@FILE_CREATED_DATE"].Value = "";
            sqlCommand1.Parameters["@TOTAL_EMPLOYEES"].Value = int.Parse(Row.EMPLOYEE.Substring(16, 10));

            sqlCommand1.ExecuteNonQuery();
        }
        else if (Row.EMPLOYEE.StartsWith("*"))
        {
            //不做处理
        }
        else
        {
            // 剩下的部分是主体内容部分,直接按照固定的列位置描述截取字符串
            sqlCommand2.Parameters["@EMP_NAME"].Value = Row.EMPLOYEE.Substring(0, 50);
            sqlCommand2.Parameters["@POSITION"].Value = Row.EMPLOYEE.Substring(50, 50);
            sqlCommand2.Parameters["@HIRED_DATE"].Value = Row.EMPLOYEE.Substring(100, 12);
            sqlCommand2.Parameters["@BIRTH_DATE"].Value = Row.EMPLOYEE.Substring(112, 12);
            sqlCommand2.Parameters["@EMAIL"].Value = Row.EMPLOYEE.Substring(124, 50);
            sqlCommand2.Parameters["@PHONE"].Value = Row.EMPLOYEE.Substring(174, 25);
            sqlCommand2.Parameters["@MARRIAGE"].Value = Row.EMPLOYEE.Substring(199, 1);

            sqlCommand2.ExecuteNonQuery();
        }
    }

    // 直接重写连接释放
    public override void ReleaseConnections()
    {
        base.ReleaseConnections();
    }
}

执行结果。

查询结果如下:

对于第一张表,通过简单的 SQL 语句即可以解决这个合并问题。

总结

其实对比 Script Component Source,Script Component Transformation (同步或异步) 这三种不规则的文件解析方式来说,前面几种是最简单的,特别是 Script Component Source 可以非常直观的看到两个解析之后的 Output 操作。并且我们的这四个案例,不同的解决方式实际上就是把解析不规则文件的过程分别放在了 Source 端,同步的 Transformation 端,异步的 Transformation 端,Destination 端。 不同的位置,解析的过程不一样,对输出的处理不一样。并且通过这样的一个案例可以让我们对 Source, 同步转换,异步转换这些概念更加深刻了。

并且在这些个案例中,我们能够看到 Script Component 强大的自定义编程能力,文件访问,数据库访问,同步转换,异步遍历等不同的解决方案。在实际的 ETL 项目中,我们可以针对不同的场景灵活的使用 Script Component 来解决这些问题。

最后,还是要提醒一下就这种不规则的文件处理不要使用 Script Component Destination 来处理,关于它的效率问题从下图中就可以看得出来。以下测试环境为3G左右的虚机,包括磁盘空间也都很紧张,具体的测试数据在不同的环境下可能表现不同。

而原因其实可以分析出来:

  • Script Component Transformation 异步转换并不是一个完全阻塞组件,它是一个半阻塞组件。在拿到上游的全部或者部分 Buffer 的时候,ProcessInput() 方法 就已经开始工作了,处理一部分 Buffer 就往下输出一部分 Buffer。并且在通过 OLE DB Destination 组件的时候,也是批量插入,因此在本案例中效率最高。
  • Script Component Source 是我以前在项目中经常使用到的一种方式,也是比较喜欢的一种方式。但是通常的做法就一次从文件读取一行,然后输出一行导致效率没有 Transformation 异步转换高。由于文件一般都是200 - 500MB,因此没有遇到特别大的性能问题,所以就没有进一步的优化。
  • Script Component Transformation 同步转换是一个非阻塞组件,但是由于一次处理一行输出一行,这个过程略微花费一点时间。
  • Script Component Destination 并不是一个完整意义上的转换组件,在本案例中是作为一个 Destination 组件来处理,受 ProcessInputRow() 方法限制也是一行一行的通过 ADO.NET 方式插入,因此效率最低。

关于阻塞,半阻塞,同步和异步的文章可以参考 微软BI 之SSIS 系列 - 理解Data Flow Task 中的同步与异步, 阻塞,半阻塞和全阻塞以及Buffer 缓存概念 此文中提到的 Script Component 是以默认的同步 Transformation 转换组件为例,因此归在 Non-Blocking 部分。

更多 BI 文章请参看 BI 系列随笔列表 (SSIS, SSRS, SSAS, MDX, SQL Server)  如果觉得这篇文章看了对您有帮助,请帮助推荐,以方便他人在 BIWORK 博客推荐栏中快速看到这些文章。

时间: 2024-11-04 18:45:36

微软BI 之SSIS 系列 - 使用 Script Component Destination 和 ADO.NET 解析不规则文件并插入数据的相关文章

微软BI 之SSIS 系列 - 使用 Script Task 访问非 Windows 验证下的 SMTP 服务器发送邮件

开篇介绍 大多数情况下我们的 SSIS 包都会配置在 SQL Agent Job 中周期性的按计划执行,比如每天晚上调用 SSIS 包刷新数据,处理 Cube 等.一旦 SSIS 包中出现任何异常,报错,那么配置在 SQL Agent Job 中的通知,邮件提醒就会把这些错误信息发邮件到指定的用户或者系统维护者,这样就起到了一个错误监控的作用. 但是在有的情况下,有一些自定义的 SSIS 调度框架的计划调度都不是通过 SQL Agent Job 配置来完成的.比如我以前在一个小项目中设计过一个

微软BI 之SSIS 系列 - MVP 们也不解的 Scrip Task 脚本任务中的一个 Bug

开篇介绍 前些天自己在整理 SSIS 2012 资料的时候发现了一个功能设计上的疑似Bug,在 Script Task 中是可以给只读列表中的变量赋值.我记得以前在 2008 的版本中为了弄明白这个配置,还特意测试过这个细节,获取错误并理解了这个功能.但是现在回去再次测试 2008 的版本时,发现这个功能在 2008 中其实也是错误的,把我印象中的测试结果完全给推翻了,所以到现在已经搞不清楚我当时到底是如果得出这个错误的. 疑似功能 Bug 描述 在 SSIS 包中定义了用户自定义变量 - PV

微软BI 之SSIS 系列 - 带有 Header 和 Trailer 的不规则的平面文件输出处理技巧

案例背景与需求介绍 之前做过一个美国的医疗保险的项目,保险提供商有大量的文件需要发送给比如像银行,医疗协会,第三方服务商等.比如像与银行交互的 ACH 文件,传送给协会的 ACH Credit 等文件.这些文件格式在美国都是开放的,通用的,可以直接到相关网站下载.也就是说像银行,协会等他们接受这种固定格式的文件,读取数据,读取公司编号进行业务来往或者记录.我当时就是直接在网上搜索到一个 PDF 格式的文件说明,大概有10来页,就是告诉你这个格式是如何定义,应该如何来处理的. 那么这种文件并非像我

微软BI 之SSIS 系列 - 对于平面文件中 NULL 值处理过程中容易极易混淆的几个细节

最近有人问我 OLE DB Destination 中的 Keep Nulls 如何控制 NULL 值的显示,为什么选中了 Keep Nulls 但是数据库中没有 NULL 值? 为什么在 Flat File Source 中勾选上了 Retain null values from the source as null values in the data flow 但是为什么目标表上显示的是一个当前日期,而不是 NULL 值等等,单开此文来解释这些非常容易混淆的概念. 在比较纯粹的 ETL 项

微软BI 之SSIS 系列 - 在 SSIS 中导入 ACCESS 数据库中的数据

开篇介绍 来自 天善学院 一个学员的问题,如何在 SSIS 中导入 ACCESS 数据表中的数据. 在 SSIS 中导入 ACCESS 数据库数据 ACCESS 实际上是一个轻量级的桌面数据库,直接使用文件形式存储.在国内大量使用 ACCESS 作为 BI 数据源并不多,但是在国外特别是美国使用的还比较多,因为他们的 IT 基础起步比较早.在我的第一个美国的医疗保险项目中,就遇到过大量的 ACCESS 数据源,前后总共有 500 多个 ACCESS 表.而现在从国外一些朋友反馈的情况仍然还有在使

微软BI 之SSIS 系列 - 再谈Lookup 缓存

开篇介绍 关于 Lookup 的缓存其实在之前的一篇文章中已经提到了 微软BI 之SSIS 系列 - Lookup 组件的使用与它的几种缓存模式 - Full Cache, Partial Cache, NO Cache 但是还是可能遗漏的部分内容,因此在这里重新总结并补充一下.这是第一篇,还是从理论的角度来讨论 Lookup 缓存的问题:后面有空还会再写一篇,从后台 SQL 执行的情况来理解 Lookup 的工作过程. 并且关于 Lookup 缓存还有其它比较有意思的话题,比如我的这些帖子,大

微软BI 之SSIS 系列 - Lookup 中的字符串比较大小写处理 Case Sensitive or Insensitive

前几天碰到这样的一个问题,在 Lookup 中如何设置大小写不敏感比较,即如何在 Lookup 中的字符串比较时不区分大小写? 实际上就这个问题已经有很多人提给微软了,但是得到的结果就是 Closed and Won’t fix. 说白了,这个就是 By Design,包括到现在的 2012 也没有这个配置选项. https://connect.microsoft.com/SQLServer/feedback/details/339069/ssis-case-sensitive-data-flo

微软BI 之SSIS 系列 - 使用 SQL Profilling Task (数据探测) 检测数据源数据

开篇介绍 SQL Profilling Task 可能我们很多人都没有在 SSIS 中真正使用过,所以对于这个控件的用法可能也不太了解.那我们换一个讲法,假设我们有这样的一个需求 - 需要对数据库表中的一些数据做一些数据分析,比如统计一下数据表中各列中实际数据的长度,各长度区间范围:比如统计一下各数据列中非空字段的比例,表的行数,重复字段等等.那么如果不是专门做过这种数据源数据分析的话,可能不知道用什么方式能够非常快的得到这些信息.写 SQL 语句?我想这个过程也是非常耗费时间和精力的. 实际上

微软BI 之SSIS 系列 - Execute SQL Task 中的 Single Row 与 Full Result Set 的处理技巧

开篇介绍 Execute SQL Task 这个控件在微软BI ETL 项目中使用的频率还是非常高的,也是大部分入门 SSIS 初学者最早接触到的几个控制流控件. 我们通常使用 Execute SQL Task 的场景包含但不止于以下几类: 在从源端加载数据到 Staging 表之前使用 Execute SQL Task 执行一些 Truncate 操作. 执行一些 Log 的插入,更新操作. ETL 过程中的 Merge 语句操作. XML 的输出处理. 关于如何使用 Execute SQL