[Translation] SSIS: Using a Script Task to Watch a Directory and Process Arriving Files

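Introduction

In previous articles we demonstrated two ways to watch for and process arriving files: "Using the WMI Event Watcher Task in SSIS to Process Data Files" and "Using the Konesans File Watcher Task in SSIS to Process Data Files". Each approach has its own strengths and weaknesses, and neither suits every project.

This article uses the SSIS Script Task to call the .NET Framework 4.0 class System.IO.FileSystemWatcher, mainly to address the drawbacks of the other two approaches and to strike a balance between them. We will walk through the whole process and compare the three solutions at the end, which should help you choose the best fit for your project.

To restate our scenario: we want an SSIS package to wait for an Excel file and process it as soon as it arrives in a directory. As with the WMI Event Watcher Task and the Konesans File Watcher Task, the goal is to reduce system activity compared with a design that starts an SSIS package every minute, checks for a file, and exits if none exists. The Script Task tells the package when it can start processing a file, and handling files with minimal delay can greatly improve the user experience.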

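Requirements

This article assumes the following:

  • A directory called the 'drop directory', where users drop Excel files for us to process automatically.
  • Every Excel file contains a sheet named 'Products' with a fixed set of columns. Files may hold different numbers of rows, but the format is always the same.
  • The file name changes every day but follows a pattern such as BusinessData.YYYYMMDD.xlsx, where YYYYMMDD is the delivery date (for example, BusinessData.20120903.xlsx).
  • Between zero and one file is processed per day.
  • A data file must be processed as soon as it arrives.
  • The SSIS package waits indefinitely for a file to arrive.

The system environment is as follows:

  • Windows 7 (Translator's note: I used Windows 10)
  • SQL Server 2012 Evaluation Edition (Database Engine, Management Tools Complete, Integration Services and SQL Server Data Tools (SSDT) selected during install)
  • SQL Server 2012 Data Tools (SSDT) for SSIS development
  • ACE OLE DB Driver 12.0 (install Microsoft Access Database Engine 2010 -or- Access or Excel 2007/2010)

On the CPU side, the Script Task uses almost nothing (close to 0) while it waits for a file. On the memory side, the SSIS package has to keep running in order to watch for files, so it holds on to some memory the whole time. When running a package continuously, we need to weigh CPU usage against memory usage.

There is nothing wrong with an SSIS package running for a long time; the resources needed to watch for files being added to a directory are in fact small.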
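The file naming requirement above can be made concrete with a small sketch. This is not part of the original package: the class and method names (DataFileName, ForDate, TryParseDate) are hypothetical, and the prefix is assumed to be "BusinessData." to match the BusinessData*.xlsx file mask used later in the article.

```csharp
using System;
using System.Globalization;
using System.Text.RegularExpressions;

// Hypothetical helper (not from the article) for the BusinessData.YYYYMMDD.xlsx pattern.
public static class DataFileName
{
    // Builds the file name expected for a given delivery date.
    public static string ForDate(DateTime deliveryDate)
    {
        return "BusinessData." +
               deliveryDate.ToString("yyyyMMdd", CultureInfo.InvariantCulture) +
               ".xlsx";
    }

    // Returns true, and the delivery date, when fileName follows the pattern.
    public static bool TryParseDate(string fileName, out DateTime deliveryDate)
    {
        deliveryDate = default(DateTime);
        Match match = Regex.Match(fileName, @"^BusinessData\.([0-9]{8})\.xlsx$",
                                  RegexOptions.IgnoreCase);
        return match.Success &&
               DateTime.TryParseExact(match.Groups[1].Value, "yyyyMMdd",
                                      CultureInfo.InvariantCulture,
                                      DateTimeStyles.None, out deliveryDate);
    }
}

public static class Program
{
    public static void Main()
    {
        DateTime parsed;
        Console.WriteLine(DataFileName.ForDate(new DateTime(2012, 9, 3)));
        // prints "BusinessData.20120903.xlsx"
        Console.WriteLine(DataFileName.TryParseDate("BusinessData.20120903.xlsx", out parsed));
        // prints "True"
        Console.WriteLine(DataFileName.TryParseDate("BusinessData.2012-09-03.xlsx", out parsed));
        // prints "False"
    }
}
```

A check like this is stricter than a wildcard file mask, which only constrains the prefix and the extension.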

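Design

We decided to use SSIS 2012 to process our Excel files. Here is an outline of the process. The logic is:

  1. When a file lands in the 'drop directory', move it to the 'processing directory' to reduce the chance that it is modified while being processed.
  2. Clear the staging table before loading the Excel data.
  3. Load the file from the 'processing directory' into the staging table.
  4. Move the file from the 'processing directory' to the 'archive directory'.

In SSIS terms, that becomes:

  1. Use a Script Task to watch the 'drop directory'.
  2. Use a File System Task to move the file from the 'drop directory' to the 'processing directory'.
  3. Use an Execute SQL Task to clear the staging table that stores the Excel data in the database.
  4. Use a Data Flow to load the Excel file from the 'processing directory' into the staging table.
  5. Use a File System Task to move the file from the 'processing directory' to the 'archive directory'.
  6. Exit.

Here is a screenshot of the whole package:

[image]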
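The file movement in the outline above can also be sketched in plain C#. In the package itself these moves are done by File System Tasks, so this is only an illustration of the drop, processing, and archive stages; the FileStages class, the MoveTo method, and the temporary directory names are hypothetical.

```csharp
using System;
using System.IO;

// Hypothetical illustration of the drop -> processing -> archive flow.
public static class FileStages
{
    // Moves a file into targetDirectory, keeping its name, and returns the new full path.
    public static string MoveTo(string sourceFullName, string targetDirectory)
    {
        Directory.CreateDirectory(targetDirectory); // does nothing if it already exists
        string target = Path.Combine(targetDirectory, Path.GetFileName(sourceFullName));
        File.Move(sourceFullName, target); // throws IOException if the target already exists
        return target;
    }
}

public static class Program
{
    public static void Main()
    {
        // Assumed directory layout under the temp folder, for demonstration only.
        string root = Path.Combine(Path.GetTempPath(), "ExcelDemo_" + Guid.NewGuid().ToString("N"));
        string drop = Path.Combine(root, "Drop");
        string processing = Path.Combine(root, "Processing");
        string archive = Path.Combine(root, "Archive");

        Directory.CreateDirectory(drop);
        string dropped = Path.Combine(drop, "BusinessData.20120903.xlsx");
        File.WriteAllText(dropped, "placeholder"); // stands in for a real workbook

        // Step 1: take the file out of reach of the supplier before working on it.
        string inProcessing = FileStages.MoveTo(dropped, processing);

        // Steps 2 and 3 (clear the staging table, load the data) would run here.

        // Step 4: archive the processed file.
        string archived = FileStages.MoveTo(inProcessing, archive);

        Console.WriteLine(File.Exists(dropped));  // prints "False"
        Console.WriteLine(File.Exists(archived)); // prints "True"

        Directory.Delete(root, true);
    }
}
```

Moving the file first means a supplier who overwrites or deletes the dropped file cannot interfere with a load that is already under way.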

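Development

In this section we build the SSIS package that processes the Excel file, step by step.

Before we start, make sure your machine has an Excel driver. Check the Drivers tab in ODBC Data Sources for an ACE ODBC driver. If it is listed, SSIS will use it.

[image]

If Excel 2007 (or later) is not installed on your machine, install a driver that supports the *.xlsx extension, such as Microsoft Access Database Engine 2010, which is a free download from Microsoft.

Let's start developing. First, create the database and the staging table that will hold the Excel data: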

USE [master]
GO
CREATE DATABASE [Inventory]
GO
USE [Inventory]
GO
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
CREATE TABLE dbo.ProductStaging
(
    ProductStagingId INT IDENTITY(1,1) NOT NULL,
    ProductName NVARCHAR(255) NULL,
    PricePerUnit DECIMAL(12, 2) NULL,
    CONSTRAINT PK_ProductStaging
        PRIMARY KEY CLUSTERED (ProductStagingId ASC)
)
GO

The Script Task's C# script is as follows:

#region Help:  Introduction to the script task
/* The Script Task allows you to perform virtually any operation that can be accomplished in
 * a .Net application within the context of an Integration Services control flow. 
 * 
 * Expand the other regions which have "Help" prefixes for examples of specific ways to use
 * Integration Services features within this script task. */
#endregion

#region Namespaces
using System;
using System.Data;
using Microsoft.SqlServer.Dts.Runtime;
using System.Windows.Forms;
using System.IO;
using System.Threading;
#endregion

namespace SqlServerCentral
{
    /// <summary>
    /// ScriptMain is the entry point class of the script.  Do not change the name, attributes,
    /// or parent of this class.
    /// </summary>
    [Microsoft.SqlServer.Dts.Tasks.ScriptTask.SSISScriptTaskEntryPointAttribute]
    public partial class ScriptMain : Microsoft.SqlServer.Dts.Tasks.ScriptTask.VSTARTScriptObjectModelBase
    {
        #region Help:  Using Integration Services variables and parameters in a script
        /* To use a variable in this script, first ensure that the variable has been added to 
         * either the list contained in the ReadOnlyVariables property or the list contained in 
         * the ReadWriteVariables property of this script task, according to whether or not your
         * code needs to write to the variable.  To add the variable, save this script, close this instance of
         * Visual Studio, and update the ReadOnlyVariables and 
         * ReadWriteVariables properties in the Script Transformation Editor window.
         * To use a parameter in this script, follow the same steps. Parameters are always read-only.
         * 
         * Example of reading from a variable:
         *  DateTime startTime = (DateTime) Dts.Variables["System::StartTime"].Value;
         * 
         * Example of writing to a variable:
         *  Dts.Variables["User::myStringVariable"].Value = "new value";
         * 
         * Example of reading from a package parameter:
         *  int batchId = (int) Dts.Variables["$Package::batchId"].Value;
         *  
         * Example of reading from a project parameter:
         *  int batchId = (int) Dts.Variables["$Project::batchId"].Value;
         * 
         * Example of reading from a sensitive project parameter:
         *  int batchId = (int) Dts.Variables["$Project::batchId"].GetSensitiveValue();
         * */

        #endregion

        #region Help:  Firing Integration Services events from a script
        /* This script task can fire events for logging purposes.
         * 
         * Example of firing an error event:
         *  Dts.Events.FireError(18, "Process Values", "Bad value", "", 0);
         * 
         * Example of firing an information event:
         *  Dts.Events.FireInformation(3, "Process Values", "Processing has started", "", 0, ref fireAgain)
         * 
         * Example of firing a warning event:
         *  Dts.Events.FireWarning(14, "Process Values", "No values received for input", "", 0);
         * */
        #endregion

        #region Help:  Using Integration Services connection managers in a script
        /* Some types of connection managers can be used in this script task.  See the topic 
         * "Working with Connection Managers Programatically" for details.
         * 
         * Example of using an ADO.Net connection manager:
         *  object rawConnection = Dts.Connections["Sales DB"].AcquireConnection(Dts.Transaction);
         *  SqlConnection myADONETConnection = (SqlConnection)rawConnection;
         *  //Use the connection in some code here, then release the connection
         *  Dts.Connections["Sales DB"].ReleaseConnection(rawConnection);
         *
         * Example of using a File connection manager
         *  object rawConnection = Dts.Connections["Prices.zip"].AcquireConnection(Dts.Transaction);
         *  string filePath = (string)rawConnection;
         *  //Use the connection in some code here, then release the connection
         *  Dts.Connections["Prices.zip"].ReleaseConnection(rawConnection);
         * */
        #endregion

        #region Instance variables

        // we need access to the found file info from the FileSystemWatcher OnFileCreate event in our class 
        // scope. an instance variable may look odd but will do for our purposes
        private FileInfo foundFile = null;

        #endregion

        #region Method: void Main()
        /// <summary>
        /// This method is called when this script task executes in the control flow.
        /// Before returning from this method, set the value of Dts.TaskResult to indicate success or failure.
        /// To open Help, press F1.
        /// </summary>
        public void Main()
        {
            try
            {
                // initialize common variables from DTS variables collection
                string dropDirectory = Dts.Variables["User::WatcherInputDropPath"].Value.ToString();
                string fileMask = Dts.Variables["User::WatcherInputFileMask"].Value.ToString();
                bool includeSubdirectories = Convert.ToBoolean(Dts.Variables["User::WatcherInputIncludeSubdirectories"].Value);

                // look for existing files if configuration suggests we should
                bool findExistingFiles = Convert.ToBoolean(Dts.Variables["User::WatcherInputFindExistingFiles"].Value);
                if (findExistingFiles)
                {
                    FindExistingFile(dropDirectory, fileMask, includeSubdirectories);
                }

                // do we (still) need to look for a file?
                if (foundFile == null)
                {
                    // if we made it here there were no existing files to process (or we didn't check for them per the 
                    // configuration variables) so setup a FileSystemWatcher object per the configuration variables
                    bool timeoutAsWarning = Convert.ToBoolean(Dts.Variables["User::WatcherInputTimeoutAsWarning"].Value);
                    int timeoutSeconds = Convert.ToInt32(Dts.Variables["User::WatcherInputTimeoutSeconds"].Value);
                    int timeoutMilliseconds = (timeoutSeconds == 0 ? -1 : timeoutSeconds * 1000);
                    WatchForFileCreation(dropDirectory, fileMask, includeSubdirectories, timeoutAsWarning, timeoutMilliseconds);
                }

                Dts.TaskResult = (int)ScriptResults.Success;
            }
            catch (Exception e)
            {
                Dts.Events.FireError(0, null, e.Message, string.Empty, 0);
                Dts.TaskResult = (int)ScriptResults.Failure;
            }
        }
        #endregion

        #region Event: void OnFileCreate(object source, FileSystemEventArgs e)
        /// <summary>
        /// Event attached to FileSystemWatcher when a file is created.
        /// </summary>
        /// <param name="source">Event source.</param>
        /// <param name="e">Event arguments.</param>
        private void OnFileCreate(object source, FileSystemEventArgs e)
        {
            PreProcessFoundFile(new FileInfo(e.FullPath));
        }
        #endregion

        #region Method: WatchForFileCreation
        /// <summary>
        /// Sets up a FileSystemWatcher to watch for new files being created.
        /// </summary>
        /// <param name="dropDirectory">Directory to watch</param>
        /// <param name="fileMask">File pattern mask of files being watched for.</param>
        /// <param name="includeSubdirectories">If true all subdirectories are also watched.</param>
        /// <param name="timeoutAsWarning">If true then if watcher times out only a warning is raised, i.e. the Task succeeds.</param>
        /// <param name="timeoutMilliseconds">Number of milliseconds to wait for a file to be initially created. This timeout period 
        /// does not apply to the time spent waiting for exclusive access to be gained to the file.</param>
        private void WatchForFileCreation(string dropDirectory,
            string fileMask,
            bool includeSubdirectories,
            bool timeoutAsWarning,
            int timeoutMilliseconds)
        {
            // create a new FileSystemWatcher
            FileSystemWatcher fileSystemWatcher = new FileSystemWatcher();

            // set the path to watch to our 'drop directory'
            fileSystemWatcher.Path = dropDirectory;

            // set the option to watch subdirectories
            fileSystemWatcher.IncludeSubdirectories = includeSubdirectories;

            // set the filter of files to watch for to our 'file mask'
            fileSystemWatcher.Filter = fileMask;

            // add event handler to execute when new files are created
            fileSystemWatcher.Created += new FileSystemEventHandler(OnFileCreate);

            // begin watching
            fileSystemWatcher.WaitForChanged(WatcherChangeTypes.Created, timeoutMilliseconds);

            if (foundFile == null)
            {
                // the file watcher timed out waiting for a file  :-<
                string message = "Timeout waiting for file {Path='" + dropDirectory + "'; Filter='" + fileMask + 
                     "'; IncludeSubdirectories=" + includeSubdirectories.ToString() + "}.";
                if (timeoutAsWarning)
                {
                    // only raise a warning
                    Dts.Events.FireWarning(0, null, message, string.Empty, 0);
                }
                else
                {
                    // raise an error
                    throw new TimeoutException(message);
                }
            }
        }
        #endregion

        #region Method: void PreProcessFoundFile(FileInfo dataFile)
        /// <summary>
        /// Takes actions subsequent to locating a file that allow later processing of the file. This method
        /// reports information to the parent container by firing info events. This method also ensures exclusive 
        /// access to the file can be achieved before returning control to the parent container.
        /// </summary>
        /// <param name="dataFile">File to preprocess.</param>
        private void PreProcessFoundFile(FileInfo dataFile)
        {
            // set the instance variable value to the found file
            this.foundFile = dataFile;

            // local variable to pass to events that require parameters be passed by ref
            bool fireAgain = true;

            // raise an information event saying we found a file (not necessarily that it can be used)
            Dts.Events.FireInformation(0, null, "File found: " + dataFile.FullName, string.Empty, 0, ref fireAgain);

            // We know there is a new file that can be processed because
            // the FileSystemWatcher fired an event, however we do not know if the user or process
            // supplying the file has completed uploading it. We will loop over drop directory
            // looking for files that meet our criteria and once we find one we will make sure
            // the supplier has completed their upload process by checking to see if we can gain 
            // exclusive access to the file. Once we can gain exclusive access to the file we will know
            // the upload is complete and we can allow the rest of the SSIS package to continue.
            WaitForExclusiveAccess(dataFile);

            // store the full file name (includes path) in output variable
            Dts.Variables["User::WatcherOutputFileFullName"].Value = dataFile.FullName;

            // store the file name in output variable
            Dts.Variables["User::WatcherOutputFileName"].Value = dataFile.Name;

            // raise an information event saying we found a file -and- it can be used
            Dts.Events.FireInformation(0, null, "File ready for use: " + dataFile.FullName, string.Empty, 0, ref fireAgain);
        }
        #endregion

        #region Method: void WaitForExclusiveAccess(FileInfo dataFile)
        /// <summary>
        /// Waits until exclusive access to a file can be achieved.
        /// </summary>
        /// <param name="dataFile">File to access.</param>
        private void WaitForExclusiveAccess(FileInfo dataFile)
        {
            // local variable to say how many seconds to wait in between checking if we can gain 
            // exclusive access to the found file
            int secondsToWaitBetweenAttempts = 5;

            // local variable to pass to events that require parameters be passed by ref
            bool fireAgain = true;

            // Loop indefinitely checking if we can access the data file.
            while (true)
            {
                try
                {
                    // Attempt to gain access to the file.
                    using (Stream stream = new FileStream(dataFile.FullName, FileMode.Open))
                    {
                        // If we made it here no exception was thrown meaning we 
                        // could access the file. We will break out of the loop and allow 
                        // the rest of the package to continue processing.
                        break;
                    }
                }
                catch (IOException)
                {
                    // We are not interested in ending the program when an IOException
                    // occurs in this area. This type of exception means we could not 
                    // gain access to the file.

                    // In general, programming algorithms that leverage exceptions for 
                    // control flow are frowned upon. However in the case of file access 
                    // it is an acceptable pattern.
                }

                // raise an information event saying we could not gain exclusive access to the found file and will wait
                Dts.Events.FireInformation(0, null, "Could not gain exclusive access to file " + dataFile.FullName + 
                                                           ". Waiting " + secondsToWaitBetweenAttempts.ToString() + 
                                                           " seconds before trying again...", string.Empty, 0, ref fireAgain);

                // wait some time before checking whether the file can be used
                Thread.Sleep(secondsToWaitBetweenAttempts * 1000);
            }
        }
        #endregion

        #region Method: void FindExistingFile(string directoryName, string fileMask, bool includeSubdirectories)
        /// <summary>
        /// Check a directory for files that match a file mask.
        /// </summary>
        /// <param name="directoryName">Directory to look for files.</param>
        /// <param name="fileMask">File pattern mask matching files to look for.</param>
        /// <param name="includeSubdirectories">True if subdirectories should also be checked.</param>
        private void FindExistingFile(string directoryName, string fileMask, bool includeSubdirectories)
        {
            // local variable to pass to events that require parameters be passed by ref
            bool fireAgain = true;

            // get the list of files that qualify
            DirectoryInfo directoryInfo = new DirectoryInfo(directoryName);
            FileInfo[] fileInfos;
            if (includeSubdirectories)
            { fileInfos = directoryInfo.GetFiles(fileMask, SearchOption.AllDirectories); }
            else
            { fileInfos = directoryInfo.GetFiles(fileMask, SearchOption.TopDirectoryOnly); }

            // check to see if any files were found
            if (fileInfos.Length > 0)
            {
                // found a file!
                PreProcessFoundFile(fileInfos[0]);

                // raise an info message
                Dts.Events.FireInformation(0, null, "Existing files found: " + fileInfos.Length.ToString(), string.Empty, 0, ref fireAgain);
            }
            else
            {
                // no files found, raise a warning
                Dts.Events.FireWarning(0, null, "No existing files found.", string.Empty, 0);
            }
        }
        #endregion

        #region ScriptResults declaration
        /// <summary>
        /// This enum provides a convenient shorthand within the scope of this class for setting the
        /// result of the script.
        /// 
        /// This code was generated automatically.
        /// </summary>
        enum ScriptResults
        {
            Success = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Success,
            Failure = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Failure
        };
        #endregion
    }
}
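
The script above ignores the value returned by WaitForChanged and relies on the Created event handler plus an instance variable to learn which file arrived. As a side note, the blocking call itself can be tried outside SSIS. The following is a minimal console sketch, not the author's implementation: it reads the WaitForChangedResult directly and reuses the script's convention that a timeout of 0 seconds means "wait indefinitely". The WatcherDemo class, its method names, and the temporary drop directory are assumptions made for the demonstration.

```csharp
using System;
using System.IO;
using System.Threading;

// Hypothetical console demo of FileSystemWatcher.WaitForChanged (runs outside SSIS).
public static class WatcherDemo
{
    // Same convention as the script: 0 seconds means wait indefinitely (-1 ms).
    public static int ToTimeoutMilliseconds(int timeoutSeconds)
    {
        return timeoutSeconds == 0 ? Timeout.Infinite : timeoutSeconds * 1000;
    }

    // Blocks until a file matching fileMask is created in directory or the timeout
    // elapses. Returns the file name, or null when the wait timed out.
    public static string WaitForNewFile(string directory, string fileMask, int timeoutSeconds)
    {
        using (FileSystemWatcher watcher = new FileSystemWatcher(directory, fileMask))
        {
            WaitForChangedResult result = watcher.WaitForChanged(
                WatcherChangeTypes.Created, ToTimeoutMilliseconds(timeoutSeconds));
            return result.TimedOut ? null : result.Name;
        }
    }
}

public static class Program
{
    public static void Main()
    {
        // Assumed drop directory under the temp folder, for demonstration only.
        string drop = Path.Combine(Path.GetTempPath(), "ExcelDrop_" + Guid.NewGuid().ToString("N"));
        Directory.CreateDirectory(drop);

        // Simulate a user dropping a file half a second after watching starts.
        Thread producer = new Thread(() =>
        {
            Thread.Sleep(500);
            File.WriteAllText(Path.Combine(drop, "BusinessData.20120903.xlsx"), "placeholder");
        });
        producer.Start();

        string found = WatcherDemo.WaitForNewFile(drop, "BusinessData*.xlsx", 10);
        Console.WriteLine(found ?? "timed out");
        producer.Join();

        // Nothing else arrives, so this call gives up after one second.
        string none = WatcherDemo.WaitForNewFile(drop, "BusinessData*.xlsx", 1);
        Console.WriteLine(none ?? "timed out");

        Directory.Delete(drop, true);
    }
}
```

Unlike the script, this sketch does not wait for exclusive access to the file, so it only shows when a file was created, not when it is safe to read.
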
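Save the script and close the script editor, then click OK to save the changes to the Script Task. We have now configured the Script Task to watch for a file (BusinessData*.xlsx) arriving in the ExcelDrop directory, which completes step 1: use a Script Task to watch the 'drop directory'. Now for the remaining steps.
  1. For step 2 we need a File System Task. Drag one onto the Control Flow design surface. [image]
  2. Double-click the File System Task to open its editor. Set the name to Move Data File To Processing Directory. [image]
     For Operation, select Move file. [image]
     For the Source Connection, use the variable User::WatcherOutputFileFullName. [image]
     Set a variable that defines the destination directory for the file. [image]
     Click OK to finish the configuration.
     One more thing: set DelayValidation to True. This avoids a validation error, because the watcher output variable is still empty at this point. Right-click the File System Task, choose Properties, and set it as shown. [image]
  3. Connect the Script Task to the File System Task: [image]
     That completes step 2, using a File System Task to move the file from the 'drop directory' to the 'processing directory'. On to the rest.
  4. To clear the staging table we need an Execute SQL Task. Drag one onto the Control Flow as well. [image]
  5. Double-click it and rename it Clear Staging Table. [image]
     Create a new database connection. [image]
     Set SQLStatement to: TRUNCATE TABLE dbo.ProductStaging;
     The Execute SQL Task editor should now look like this. [image]
     Click OK to finish. [image]
  6. Now for step 4, using a Data Flow to load the Excel file from the 'processing directory' into the staging table. This needs a Data Flow Task; rename it Load Excel Data to Staging Table. It uses two connections, one for Excel and one for our database. [image]
     Source: drag a Source Assistant into the Data Flow and use the wizard to connect to the Excel file we prepared. [image] [image]
     After saving, an error icon appears next to the Excel Source because we have not defined the data source yet. Set DelayValidation to True on the Excel connection we just created. [image]
     Double-click the Excel Source and select the Products sheet, as shown. (Translator's note: if this fails, install the driver. My machine has a licensed Office 365, which does not install the driver automatically.) [image]
     Right-click the Excel Source component and choose Properties (or select it and press F4), then change ValidateExternalMetadata to False. [image]
     This step matters because, in production, the Excel file may not exist when the SSIS package runs. Setting the property to False lets SSIS skip the check.
     The Excel source is now configured, but we still need a dynamic value that supplies the Excel file name. Set it under Expressions in the properties, as shown. [image]
     That completes the source. Next, configure the destination. [image]
     Finally, set DelayValidation to True on the Data Flow Task as well. [image]
  7. Following the logic above, the file that has been loaded into the database must be moved from the 'processing directory' to the 'archive directory', so drag in one more File System Task. [image]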
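
Step 6 relies on a screenshot to show the expression that feeds the Excel file name to the connection. As a rough sketch of what such a dynamic value has to compute, the snippet below combines a processing directory with the file name reported by the watcher and builds a typical ACE OLE DB connection string for an .xlsx workbook with a header row. Everything here is an assumption for illustration: the ExcelConnection class, the "ExcelProcessing" directory name, and the choice to show a full connection string rather than whichever property the package's expression actually targets.

```csharp
using System;
using System.IO;

// Hypothetical helper showing what the dynamic Excel connection has to resolve to.
public static class ExcelConnection
{
    // Combines the processing directory with the file name reported by the watcher.
    public static string WorkbookFullName(string processingDirectory, string fileName)
    {
        return Path.Combine(processingDirectory, fileName);
    }

    // Builds an ACE OLE DB connection string for an .xlsx workbook whose first row
    // holds the column names (HDR=YES).
    public static string ConnectionString(string workbookFullName)
    {
        return "Provider=Microsoft.ACE.OLEDB.12.0;Data Source=" + workbookFullName +
               ";Extended Properties=\"Excel 12.0 XML;HDR=YES\";";
    }
}

public static class Program
{
    public static void Main()
    {
        // Assumed values; in the package they would come from SSIS variables such as
        // User::WatcherOutputFileName.
        string fullName = ExcelConnection.WorkbookFullName(
            "ExcelProcessing", "BusinessData.20120903.xlsx");
        Console.WriteLine(ExcelConnection.ConnectionString(fullName));
    }
}
```

Because the file name is only known at run time, the connection cannot be validated at design time, which is why the steps above set DelayValidation to True and ValidateExternalMetadata to False.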

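Testing

First, run the package.

[image]

Then drop a BusinessData.YYYYMMDD.xlsx file into C:@\ExcelDrop. The SSIS package detects it and immediately runs the remaining steps, loading the data into the database.

[image]

We can then check the staging table.

[image]

Summary

Compared with the other two approaches, the Script Task offers these benefits:

  • It shortens the time between a file arriving in the directory and SSIS being able to start processing it.
  • It reduces the number of times the SSIS package has to be executed.

The three solutions are compared below.

[image]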

Resources

ScriptTaskFileWatcherArticle.zip

Original article: http://www.sqlservercentral.com/articles/Integration+Services+(SSIS)/91665/
