转移,清洗,同步数据

最近看了看公司的导入,清洗,同步数据。想自己也实现下

首先用SqlBulkCopy批量导入,然后用Partition by对要删除的数据进行分组,然后删除ID>1的数据。同步数据就是对源数据进行查询,然后批量更新目标数据。

我用MVC实现了下,代码实现如下:

前台代码

@{
    Layout = null;
}
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
    <title>数据同步</title>
</head>
<body>
    <input type="button" id="btnCopy" value="复制数据"/>
    <input type="button" id="btnSync" value="同步已有变化的数据"/>
    <input type="button" id="btnClear" value="清洗数据"/>
</body>
</html>
<script src="~/Scripts/jquery-1.10.2.min.js"></script>
<script>
    $("#btnCopy").click(function () {
        $.ajax({
            type: "post",
            url: "/Home/CopyData",
            beforeSend: function () {
                // 禁用按钮防止重复提交
                $("#btnCopy").attr({ disabled: "disabled" });
            },
            success: function (data) {
                if (data == "ok") {
                    alert("恭喜你,复制成功~");
                } else {
                    alert("复制失败!");
                }
            },
            complete: function () {
                $("#btnCopy").removeAttr("disabled");
            },
            error: function (data) {
                console.info("error: " + data.responseText);
            }
        });
    });
    $("#btnSync").click(function () {
        $.ajax({
            type: "post",
            url: "/Home/SyncClick3",
            beforeSend: function () {
                // 禁用按钮防止重复提交
                $("#btnSync").attr({ disabled: "disabled" });
            },
            success: function (data) {
                if (data == "ok") {
                    alert("恭喜你,同步成功~");
                } else {
                    alert("同步失败!");
                }
            },
            complete: function () {
                $("#btnSync").removeAttr("disabled");
            },
            error: function (data) {
                console.info("error: " + data.responseText);
            }
        });
    });

    $("#btnClear").click(function () {
        $.ajax({
            type: "post",
            url: "/Home/SelectDistinct",
            beforeSend: function () {
                // 禁用按钮防止重复提交
                $("#btnClear").attr({ disabled: "disabled" });
            },
            success: function (data) {
                if (data == "ok") {
                    alert("恭喜你,清理成功~");
                } else {
                    alert("清理失败!");
                }
            },
            complete: function () {
                $("#btnClear").removeAttr("disabled");
            },
            error: function (data) {
                console.info("error: " + data.responseText);
            }
        });
    });
</script>

后台代码

using System;
using System.Collections.Generic;
using System.Configuration;
using System.Data;
using System.Linq;
using System.Web;
using System.Web.Mvc;
using 数据同步.Models;

namespace 数据同步.Controllers
{
    public class HomeController : Controller
    {
        /// <summary>
        /// 声明:使用本程序时候,要注意【原始表】与【要同步数据库表】结构一致,才能执行成功。并且【所有表的主键】相同才能同步成功。
        /// </summary>     

        public ActionResult Index()
        {
            return View();
        }
        /// <summary>
        /// 转移数据
        /// </summary>
        [HttpPost]
        public ActionResult CopyData()
        {
            string result = "ok";
            #region 简单调用一张表=======
            //DBUtility db = new DBUtility();
            //try {
            //db.BulkCopyTo("Users", "UserID");
            //}
            //catch(Exception ex)
            //{
            //    result = "no";
            //}
            #endregion
            #region 自动循环调用所有表=====
            try
            {
                string conStr = "DB data is syncing!";
                DBUtility db = new DBUtility();
                //DataSet可以比作内存中的数据库,DataTable比作内存中的数据表。DataSet可以存储多个DataTable
                DataSet ds = db.ExecuteDS("SELECT sobjects.name FROM sysobjects sobjects WHERE sobjects.xtype = ‘U‘");//求出所有表名,DataSet表示一个存放数据库中的数据缓存
                DataRowCollection drc = ds.Tables[0].Rows;//获取表名集合
                foreach (DataRow dr in drc)
                {
                    string tableName = dr[0].ToString();//获取数据表
                    conStr = conStr + Environment.NewLine + " syncing table:" + tableName + Environment.NewLine;
                    DataSet ds2 = db.ExecuteDS("SELECT * FROM sys.columns WHERE object_id = OBJECT_ID(‘dbo." + tableName + "‘)");//求出表里面的属性集合,DataSet表示一个存放数据库中的数据缓存
                    DataRowCollection drc2 = ds2.Tables[0].Rows;//获取属性集合
                    if (drc2 != null)
                    {
                        string primaryKeyName = drc2[0]["name"].ToString();//获取主键名称

                        db.BulkCopyTo(tableName, primaryKeyName);

                        conStr = conStr + "Done sync data for table:" + tableName + Environment.NewLine;
                    }
                }
            }
            catch (Exception exc)
            {
                result = "no";
                throw exc;
            }
            #endregion
            return Content(result);
        }

        /// <summary>
        /// 同步数据
        /// </summary>
        [HttpPost]
        public ActionResult SyncClick3()
        {
            string result = "ok";
            #region 简单调用一张表===
            //DBUtility db = new DBUtility();
            //try {
            //List<string> list1 = new List<string>();
            //list1.AddRange(new String[] { "UserID", "UserName","UserAge" });
            //List<string> list2 = new List<string>();
            //list2.Add("UserID");
            //db.BulkUpdateTo("Users", list1, list2);
            //}catch(Exception ex)
            //{
            //    result = "no";
            //}
            #endregion
            #region 自动循环调用所有表===
            try
            {
                string str = "DB data is syncing!";
                DBUtility db = new DBUtility();
                //查询数据库所有表
                DataSet ds = db.ExecuteDS("SELECT sobjects.name FROM sysobjects sobjects WHERE sobjects.xtype = ‘U‘");
                DataRowCollection drc = ds.Tables[0].Rows;//获取表名集合
                DateTime start = DateTime.Now;
                foreach (DataRow dr in drc)
                {
                    string tableName = dr[0].ToString();//获取一个表名
                    str = str + Environment.NewLine + " syncing table:" + tableName + Environment.NewLine;
                    DataSet ds2 = db.ExecuteDS("SELECT * FROM sys.columns WHERE object_id = OBJECT_ID(‘dbo." + tableName + "‘)");//获取表的相关属性信息
                    DataRowCollection drc2 = ds2.Tables[0].Rows;//获取属性集合
                    string primaryKeyName = drc2[0]["name"].ToString();//获取主键名
                    List<string> columns = new List<string>();
                    if (tableName == "Customers")//如果table名字为Customers,则执行下列操作
                    {
                        columns.Add("CustomerName");
                        columns.Add("CustomerId");
                        columns.Add("IsNewData");
                    }
                    else//将属性名称添加进集合
                    {
                        foreach (DataRow dr2 in drc2)
                        {
                            columns.Add(dr2["name"].ToString());//将此表的所有属性写进list集合
                        }
                    }
                    List<string> ignoreUpdateColumns = new List<string>();
                    ignoreUpdateColumns.Add("ID");//添加更新时要忽视的字段
                    db.BulkUpdateTo(tableName, columns, ignoreUpdateColumns);

                    str = str + "Done sync data for table:" + tableName + Environment.NewLine;
                }
                DateTime end = DateTime.Now;
                str = str + "cost total seconds:" + (end - start).TotalSeconds.ToString() + Environment.NewLine;

            }
            catch (Exception ex)
            {
                result = "no";
            }
            #endregion
            return Content(result);

        }

        /// <summary>
        /// 清洗数据
        /// </summary>
        public ActionResult SelectDistinct()
        {
            string result = "ok";
            DBUtility d = new DBUtility();
            bool b=d.CleartData();
            if (!b)
            {
                result = "no";
            }
            return Content(result);
        }
    }
}

DBUtility数据转移清洗同步帮助类

public class DBUtility
    {
        private string Server;
        private string Database;
        private string Uid;
        private string Password;
        public string connectionStr;
        private SqlConnection mySqlConn;
        private string targetServer;
        private string targetDatabase;
        private string targetUid;
        private string targetPassword;
        private string targetConnectionStr;
        private SqlConnection targetConn;
        public void EnsureConnectionIsOpen()
        {
            if (mySqlConn == null)
            {
                mySqlConn = new SqlConnection(this.connectionStr);
                mySqlConn.Open();
            }
            else if (mySqlConn.State == System.Data.ConnectionState.Closed)
            {
                mySqlConn.Open();
            }
            if (targetConn==null)
            {
                targetConn = new SqlConnection(this.targetConnectionStr);
                targetConn.Open();
            }
            else if(targetConn.State == System.Data.ConnectionState.Closed)
            {
                targetConn.Open();
            }
        }
        public DBUtility()
        {
            this.Server = ConfigurationManager.AppSettings["sourceServer"].ToString();
            this.Database = ConfigurationManager.AppSettings["sourceDatabase"].ToString();
            this.Uid = ConfigurationManager.AppSettings["sourceUid"].ToString();
            this.Password = ConfigurationManager.AppSettings["sourcePassword"].ToString();
            this.connectionStr = "Server=" + this.Server + ";Database=" + this.Database + ";User Id=" + this.Uid + ";Password=" + this.Password;
            this.targetServer = ConfigurationManager.AppSettings["targetServer"].ToString();
            this.targetDatabase = ConfigurationManager.AppSettings["targetDatabase"].ToString();
            this.targetUid = ConfigurationManager.AppSettings["targetUid"].ToString();
            this.targetPassword = ConfigurationManager.AppSettings["targetPassword"].ToString();
            this.targetConnectionStr = "Server=" + this.targetServer + ";Database=" + this.targetDatabase + ";User Id=" + this.targetUid + ";Password=" + this.targetPassword;
        }

        public int ExecuteNonQuery(string sqlStr,SqlConnection connStr)
        {
            this.EnsureConnectionIsOpen();
            SqlCommand cmd = new SqlCommand(sqlStr, connStr);
            cmd.CommandType = CommandType.Text;
            return cmd.ExecuteNonQuery();
        }
        public object ExecuteScalar(string sqlStr)
        {
            this.EnsureConnectionIsOpen();
            SqlCommand cmd = new SqlCommand(sqlStr, mySqlConn);
            cmd.CommandType = CommandType.Text;
            return cmd.ExecuteScalar();
        }
        public DataSet ExecuteDS(string sqlStr)
        {
            DataSet ds = new DataSet();
            this.EnsureConnectionIsOpen();
            SqlDataAdapter sda = new SqlDataAdapter(sqlStr, mySqlConn);
            sda.Fill(ds);
            return ds;
        }
        #region 转移数据
        /// <summary>
        /// 转移数据
        /// </summary>
        public void BulkCopyTo(string TableName, string PrimaryKeyName)
        {
            this.EnsureConnectionIsOpen();
            SqlConnection destinationConnector = new SqlConnection(targetConnectionStr);
            SqlCommand cmd = new SqlCommand("SELECT * FROM " + TableName, mySqlConn);//查询本地数据
            destinationConnector.Open();
            SqlDataReader readerSource = cmd.ExecuteReader();//执行查询
            bool isSourceContainsData = false;//是否有数据
            string whereClause = " where ";
            while (readerSource.Read()) //读取源数据
            {
                isSourceContainsData = true;
                whereClause += " " + PrimaryKeyName + "=" + readerSource[PrimaryKeyName].ToString() + " or ";
            }
            whereClause = whereClause.Remove(whereClause.Length - " or ".Length, " or ".Length);
            readerSource.Close();

            whereClause = isSourceContainsData ? whereClause : string.Empty;
            // Select data from Products table
            cmd = new SqlCommand("SELECT * FROM " + TableName + whereClause, mySqlConn);//根据条件查询本地数据库
            // Execute reader
            SqlDataReader reader = cmd.ExecuteReader();//执行查询
            // Create SqlBulkCopy
            SqlBulkCopy bulkData = new SqlBulkCopy(destinationConnector);
            // Set destination table name
            bulkData.DestinationTableName = TableName;
            // Write data
            bulkData.WriteToServer(reader);//将本地查询的数据写入远程
            // Close objects
            bulkData.Close();
            destinationConnector.Close();
            mySqlConn.Close();
        }
        #endregion

        #region 更新同步数据
        /// <summary>
        /// 更新同步数据
        /// </summary>
        public void BulkUpdateTo(string targetTableName, List<string> columns, List<string> ignoreUpdateColumns)
        {
            string primaryKeyName = columns[0];//从属性里面获取主键名称
            SqlConnection destinationConnector = new SqlConnection(targetConnectionStr);

            SqlCommand cmd = new SqlCommand("SELECT * FROM " + targetTableName, destinationConnector);//查询远程数据
            this.EnsureConnectionIsOpen();
            destinationConnector.Open();

            Dictionary<int, string> Index_PrimaryKeyValue = new Dictionary<int, string>();

            SqlDataReader readerSource = cmd.ExecuteReader();//执行查询
            Dictionary<string, Dictionary<string, string>> recordsDest = new Dictionary<string, Dictionary<string, string>>();
            int i = 0;
            while (readerSource.Read())//读取远程查询结果
            {
                Index_PrimaryKeyValue.Add(i, readerSource[primaryKeyName].ToString());//获取主键值
                string recordIndex = Index_PrimaryKeyValue[i];
                recordsDest[recordIndex] = new Dictionary<string, string>();
                foreach (string keyName in columns)
                {
                    recordsDest[recordIndex].Add(keyName, readerSource[keyName].ToString());
                }
                i++;
            }
            cmd = new SqlCommand("SELECT * FROM " + targetTableName, mySqlConn);//查询本地数据
            SqlDataReader reader = cmd.ExecuteReader();//执行查询
            Dictionary<string, Dictionary<string, string>> recordsSource = new Dictionary<string, Dictionary<string, string>>();

            Dictionary<int, string> Index_PrimaryKeyValue2 = new Dictionary<int, string>();
            int j = 0;
            while (reader.Read())//读取本地数据
            {
                Index_PrimaryKeyValue2.Add(j, reader[primaryKeyName].ToString());
                string recordIndex = Index_PrimaryKeyValue2[j];
                recordsSource[recordIndex] = new Dictionary<string, string>();
                foreach (string keyName in columns)
                {
                    recordsSource[recordIndex].Add(keyName, reader[keyName].ToString());
                }
                j++;
            }
            reader.Close();
            readerSource.Close();

            foreach (var record in recordsSource)
            {
                string setScripts = string.Empty;
                int setScriptsIndex = 0;
                string primaryKeyValue = record.Key;
                foreach (string keyName in columns)
                {
                    if (!ignoreUpdateColumns.Contains(keyName))
                    {
                        if (recordsDest == null)
                        {

                        }
                        else
                        {
                            if (setScriptsIndex == 0)
                            {
                                setScripts += keyName + "=‘" + recordsSource[primaryKeyValue][keyName] + "‘ ";
                            }
                            else
                            {
                                setScripts += "," + keyName + "=‘" + recordsSource[primaryKeyValue][keyName] + "‘ ";
                            }
                            setScriptsIndex++;
                        }
                    }
                }
                //update source to dest
                if (setScriptsIndex > 0)
                {
                    cmd = new SqlCommand("Update " + targetTableName + " set " + setScripts + " where " + primaryKeyName + "=‘" + recordsSource[primaryKeyValue][primaryKeyName] + "‘", destinationConnector);//根据主键进行更新
                    cmd.ExecuteNonQuery();
                }
            }
            destinationConnector.Close();
            mySqlConn.Close();
        }
        #endregion

        #region 清洗数据--(将name相同的去除掉)这个可以根据业务需要改成要清洗的表
        /// <summary>
        /// 清洗数据--(将name相同的去除掉)这个可以根据业务需要改成要清洗的表
        /// </summary>
        public bool CleartData()
        {
            this.EnsureConnectionIsOpen();
            string sql = @"select ROW_NUMBER() OVER(PARTITION BY Name ORDER BY ID) NameSource,ID into #1 from Test2
                           delete Test2 where ID in(select ID from #1 where NameSource>1)
                           drop table #1
                            ";
            int n = ExecuteNonQuery(sql, targetConn);
            return n > 0;
        }
        #endregion

        private bool ColumnEqual(object A, object B)
        {
            if (A == DBNull.Value && B == DBNull.Value) //     两个都是   DBNull.Value
                return true;
            if (A == DBNull.Value || B == DBNull.Value) //     只有一个是   DBNull.Value
                return false;
            return (A.Equals(B));//   正常比较
        }

        public void Dispose()
        {
            if (mySqlConn != null)
                mySqlConn.Close();
        }
    }
时间: 2024-11-09 00:05:18

转移,清洗,同步数据的相关文章

MySQL向redis同步数据

##创建表及数据 CREATE TABLE events_all_time ( id int(11) unsigned NOT NULL AUTO_INCREMENT, action varchar(255) NOT NULL, count int(11) NOT NULL DEFAULT 0, PRIMARY KEY (id), UNIQUE KEY uniq_action (action) ); insert into events_all_time(action,count) values

十八、Rsync 远程同步数据

在linux系统下数据备份的工具.Rsync不仅可以远程同步数据(类似于scp [1]),当然还可以本地同步数据(类似于cp),但不同于cp或scp的一点是,rsync不像cp/scp一样会覆盖以前的数据(如果数据已经存在),它会先判断已经存在的数据和新数据有什么不同,只有不同时才会把不同的部分覆盖掉.如果你的linux没有rsync命令请使用 yum install -y rsync 安装. 下面阿铭先举一个例子,然后再详细讲解rsync的用法: [[email protected] ~]#

zookeeper源码分析三LEADER与FOLLOWER同步数据流程

根据二)中的分析,如果一台zookeeper服务器成为集群中的leader,那么一定是当前所有服务器中保存数据最多的服务器,所以在这台服务器成为leader之后,首先要做的事情就是与集群中的其它服务器(现在是follower)同步数据,保证大家的数据一致,这个过程完毕了才开始正式处理来自客户端的连接请求. 首先来看Leader做的工作:二)中提到的同步数据时使用的逻辑时钟,它的初始值是0,每次选举过程都会递增的,在leader正式上任之后做的第一件事情,就是根据当前保存的数据id值,设置最新的逻

2-3-2 rsync+inotify备份同步数据

RSYNC = Remote Sync 远程同步 高效,一定要结合shell 官网:https://rsync.samba.org Author: Andrew Tridgell, Wayne Davison, and others Andrew Tridgell是Samba项目的领导者和主要开发人员,同时还在参与开发rsync\Linux Kernel. 与SCP的比较:scp=无法备份大量数据,类似windows的复制 rsync=边复制 ,边统计,边比较 Rsync特性和优点 可以镜像保存

Sql2008 r2 使用ftp 公布和订阅方式同步数据

Sql2008 r2使用公布和订阅方式同步数据 因为非常多图片 本篇没有图片 详情能够进入下载页  http://download.csdn.net/download/yefighter/7603741 1:公布服务器:公布方 sql2008 r2 iis7.5 windows server 2008 请登入服务器进行操作 不要用sqlserver远程连接 必须开启sqlserver agent服务以及开机自己主动启动 右键属性 打开sqlserver 点击新建本地公布 第一次公布的时候 会提示

(四)redis 主从同步数据

主从架构可以本机多实例数据库之间实现,也可以异机多实例之间实现. 主可读可写,备只读,这样就可以实现读写分离的架构. redis主从复制的特点: 1.一台master可以拥有多个slave(1对多的关系) 2.多个slave可以连接同一个master外,还可以连接到其他slave 这样做的原因是,如果master 挂掉之后,其中的一台slave立马可以充当master 的角色 整个服务流程可以不受影响 3.复制过程不会阻塞master,在同步数据的同时,master可以继续处理client请求.

多学一点(十四)——服务器间通过rsync和inotify-tools动态同步数据

Linux 下的 rsync 命令非常强大,多用来同步不同服务器上的数据同步.以前我们通常使用 crond 来实现,但 crond 很难做到实时同步.下面介绍一种方法,通过在脚本中结合使用 inotify 及 rsync实现数据实时同步. 1.安装 rsync: [[email protected] ~]# yum install -y openssh-clients rsync 这里要注意两点,一是进行同步的两台服务器上均需安装 rsync ,二是 rsync 需要依赖 openssh-cli

rsync + inotify 实时同步数据

简介 Rsync (remote sync)是一款开源.快速,多功能.可实现增量的本地或远程数据镜像同步备份优秀工具.它可通过 LAN/WAN 快速同步多台主机间的文件.rsync 当前由 rsync.samba.org 维护.rsync 使用所谓的"rsync演算法"来使本地和远程主机之间的文件达到同步,该算法只传送两个文件中的不同部分,而不是每次都整份传送,因此速度相当快. 特点:   1.可以镜像保存整个目录树或文件系统:   2.可实现增量同步,既只同步发生变化的数据,因此数据

”免“验证rsync-inotify同步数据

拓扑图: 实验中我先利用rsync在192.168.150.151下载192.168.150.150的源数据到本地,然后在192.168.150.150利用rsync和inotify实时同步数据到192.168.150.151. rsync [选项]源位置   目标位置 -r:递归模式,包含目录及子目录中的所有文件 -l:复制符号链接文件 -v:显示同步过程 -a:归档模式,保留文件的权限,属性等信息 -z:在传输过程中压缩 -p:保留权限标记 -t:保留文件的时间标记 -H:保留硬链接 -A: