创建一个 Windows 服务,用于将 SQL Server 数据同步到 Elasticsearch
新建 Elasticsearch 索引
PUT:http://localhost:9200/index_singer/
{ "settings": { "number_of_shards": 5, "number_of_replicas": 1 }, "mappings": { "singer_index": { "properties": { "id": { "type": "long" }, "SerialNumber": { "type": "integer" }, "RealName": { "type": "string" }, "NickName": { "type": "string" }, "Nationality": { "type": "string" }, "Birthplace": { "type": "string" }, "Birthday": { "format": "dateOptionalTime", "type": "date" }, "Constellation": { "type": "string" }, "BriefIntroduction": { "type": "string", "fields": { "no": { "index": "not_analyzed", "type": "string" }, "en": { "analyzer": "english", "type": "string" }, "cn": { "analyzer": "ik", "type": "string" } } }, "IsApprove": { "type": "boolean" }, "CreateBy": { "type": "string" }, "CreateDate": { "format": "dateOptionalTime", "type": "date" }, "ModifyBy": { "type": "string" }, "ModifyDate": { "format": "dateOptionalTime", "type": "date" } } } }, "aliases": { "singer": { } } }
1、创建控制台程序
2、安装Topshelf组件
Install-Package Topshelf
Program
using EsDataImporter.Worker;
using Topshelf;

namespace EsDataImporter
{
    /// <summary>
    /// Process entry point. Registers <see cref="DataWorker"/> with Topshelf so the
    /// executable can be installed and run as a Windows service.
    /// </summary>
    public class Program
    {
        /// <summary>
        /// Configures the Topshelf host and hands control over to it.
        /// </summary>
        public static void Main()
        {
            HostFactory.Run(host =>
            {
                host.Service<DataWorker>(service =>
                {
                    service.ConstructUsing(settings => new DataWorker());

                    // Invoked when the service is started.
                    service.WhenStarted(worker => worker.Start());

                    // Invoked when the service is stopped.
                    service.WhenStopped(worker => worker.Stop());
                });

                host.RunAsLocalSystem();

                host.SetServiceName("EsDataSync");
                host.SetDisplayName("EsDataSync");
                host.SetDescription("Data Synchronism For Elasticsearch");
            });
        }
    }
}
安装:EsDataImporter.exe install
启动:EsDataImporter.exe start
卸载:EsDataImporter.exe uninstall
DataWorker
using EsDataImporter.Common;
using EsDataImporter.Job;
using Quartz;
using System;
using System.Threading.Tasks;

namespace EsDataImporter.Worker
{
    /// <summary>
    /// Service body hosted by Topshelf. Starting it schedules the singer
    /// synchronization job on the shared Quartz scheduler; stopping it shuts
    /// that scheduler down.
    /// </summary>
    public class DataWorker
    {
        public DataWorker()
        {
        }

        /// <summary>
        /// Called when the Windows service starts. Scheduling is done on a
        /// background task so the start callback returns immediately.
        /// </summary>
        public void Start()
        {
            Task.Factory.StartNew(() =>
            {
                try
                {
                    if (CommonHelper.scheduler.IsStarted)
                    {
                        return;
                    }

                    // Job definition.
                    IJobDetail singerJob = JobBuilder.Create<ExIndexSinger>()
                        .WithIdentity("sJob")
                        .Build();

                    // Trigger: fire immediately, then every 5 seconds, forever.
                    ITrigger singerTrigger = TriggerBuilder.Create()
                        .WithIdentity("sTrigger")
                        .StartNow()
                        .WithSimpleSchedule(x => x
                            .WithIntervalInSeconds(5)
                            .RepeatForever())
                        .Build();

                    // Register the job with its trigger, then start the scheduler.
                    CommonHelper.scheduler.ScheduleJob(singerJob, singerTrigger);
                    CommonHelper.scheduler.Start();
                }
                catch (Exception ex)
                {
                    // The task is fire-and-forget: without this handler a scheduling
                    // failure would go unnoticed and the service would sit idle.
                    NLog.LogManager.GetLogger(typeof(DataWorker).FullName).Error(ex.ToString());
                }
            });
        }

        /// <summary>
        /// Called when the Windows service stops. Shuts the scheduler down and
        /// waits for a job that is currently executing to finish.
        /// </summary>
        public void Stop()
        {
            if (!CommonHelper.scheduler.IsShutdown)
            {
                CommonHelper.scheduler.Shutdown(true);
            }
        }
    }
}
CommonHelper
using Quartz;
using Quartz.Impl;
using ServiceStack.Orm.Extension.Imples;
using ServiceStack.Orm.Extension.Interface;
using ServiceStack.OrmLite;
using System.Configuration;

namespace EsDataImporter.Common
{
    /// <summary>
    /// Process-wide shared instances used by the worker and the jobs.
    /// </summary>
    public static class CommonHelper
    {
        /// <summary>
        /// ORM client, built from the "mssql" connection string in the
        /// application configuration using the SQL Server dialect.
        /// </summary>
        public static readonly IOrmClient dbClient =
            new OrmClient(ConfigurationManager.ConnectionStrings["mssql"].ConnectionString, SqlServerDialect.Provider);

        /// <summary>
        /// Default Quartz scheduler instance.
        /// </summary>
        public static readonly IScheduler scheduler = StdSchedulerFactory.GetDefaultScheduler();
    }
}
SqlServer实体模型
using System;
using ServiceStack.DataAnnotations;
using ServiceStack.Model;
using ServiceStack;
using ServiceStack.Orm.Extension.Interface;

namespace Datebase.Entity
{
    /// <summary>
    /// Relational (SQL Server) entity for a singer record. Its properties mirror,
    /// name for name, the Elasticsearch document model it is mapped to.
    /// </summary>
    public partial class T_Singer : IDBEntity
    {
        /// <summary>Primary key.</summary>
        [Required, PrimaryKey]
        public long ID { get; set; }

        /// <summary>Singer serial number.</summary>
        public int? SerialNumber { get; set; }

        /// <summary>Real name.</summary>
        public string RealName { get; set; }

        /// <summary>Nickname.</summary>
        public string NickName { get; set; }

        /// <summary>Nationality.</summary>
        public string Nationality { get; set; }

        /// <summary>Place of birth.</summary>
        public string Birthplace { get; set; }

        /// <summary>Date of birth.</summary>
        public DateTime? Birthday { get; set; }

        /// <summary>Zodiac constellation.</summary>
        public string Constellation { get; set; }

        /// <summary>Short biography.</summary>
        public string BriefIntroduction { get; set; }

        /// <summary>Approval flag.</summary>
        public bool? IsApprove { get; set; }

        /// <summary>Creator of the record.</summary>
        public string CreateBy { get; set; }

        /// <summary>Creation time.</summary>
        public DateTime? CreateDate { get; set; }

        /// <summary>Last modifier of the record.</summary>
        public string ModifyBy { get; set; }

        /// <summary>Last modification time; the synchronization job filters on this column.</summary>
        public DateTime? ModifyDate { get; set; }
    }
}
Elasticsearch实体模型
using Nest;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace EsDataImporter.Model
{
    /// <summary>
    /// Elasticsearch document model for a singer.
    /// </summary>
    public class ix_singer
    {
        /// <summary>Primary key ID.</summary>
        public long ID { get; set; }

        /// <summary>Singer serial number.</summary>
        public int? SerialNumber { get; set; }

        /// <summary>Real name.</summary>
        public string RealName { get; set; }

        /// <summary>Nickname.</summary>
        public string NickName { get; set; }

        /// <summary>Nationality.</summary>
        public string Nationality { get; set; }

        /// <summary>Place of birth.</summary>
        public string Birthplace { get; set; }

        /// <summary>Birthday.</summary>
        public DateTime? Birthday { get; set; }

        /// <summary>Zodiac constellation.</summary>
        public string Constellation { get; set; }

        /// <summary>Brief introduction.</summary>
        public string BriefIntroduction { get; set; }

        /// <summary>Validity flag.</summary>
        public bool? IsApprove { get; set; }

        /// <summary>Created by.</summary>
        public string CreateBy { get; set; }

        /// <summary>Creation time.</summary>
        public DateTime? CreateDate { get; set; }

        /// <summary>Modified by.</summary>
        public string ModifyBy { get; set; }

        /// <summary>Modification time.</summary>
        public DateTime? ModifyDate { get; set; }
    }
}
ExIndexSinger:定时任务
using Datebase.Entity;
using EmitMapper;
using EsDataImporter.Common;
using EsDataImporter.Model;
using Nest;
using Quartz;
using System;
using System.Collections.Generic;
using System.Linq;

namespace EsDataImporter.Job
{
    /// <summary>
    /// Scheduled job that copies new and modified singer rows from the relational
    /// database into Elasticsearch. Each run asks Elasticsearch for the newest
    /// ModifyDate it already holds and then indexes every row modified after it.
    /// </summary>
    // The trigger fires every few seconds; a slow run must not overlap the next one,
    // otherwise both would read the same high-water mark and index the same rows.
    [DisallowConcurrentExecution]
    public class ExIndexSinger : IJob
    {
        private static readonly NLog.Logger Logger = NLog.LogManager.GetCurrentClassLogger();

        // Elasticsearch service address.
        private const string SearchAddress = "http://127.0.0.1:9200/";

        // Index name.
        // NOTE(review): the index-creation request that accompanies this code targets
        // "index_singer" (mapping type "singer_index", alias "singer"); confirm which
        // name this job is really meant to write to.
        private const string SingerIndex = "singer_index";

        /// <summary>
        /// Quartz entry point. Exceptions are logged rather than rethrown.
        /// </summary>
        /// <param name="context">Execution context supplied by Quartz (unused).</param>
        public void Execute(IJobExecutionContext context)
        {
            try
            {
                JobIndex();
            }
            catch (Exception x)
            {
                Logger.Error(x.ToString());
            }
        }

        #region Singer index synchronization

        /// <summary>
        /// Synchronizes the singer index: determines the last synchronized
        /// modification time, loads the rows changed since then and bulk-indexes them.
        /// </summary>
        private void JobIndex()
        {
            // A default index is required: Search<T>() and Map<T>() below do not name
            // an index explicitly and would otherwise fail to resolve one.
            var settings = new ConnectionSettings(new Uri(SearchAddress)).DefaultIndex(SingerIndex);
            var client = new ElasticClient(settings);

            DateTime? lastModified = ResolveLastModifyDate(client);
            if (lastModified == null)
            {
                // Elasticsearch could not be queried or prepared; retry on the next trigger.
                return;
            }

            List<T_Singer> changedRows = QuerySinger(lastModified.Value);
            if (changedRows == null || changedRows.Count == 0)
            {
                return;
            }

            // Map the database entities to index documents with EmitMapper. The result
            // is materialized once so the mapping is not repeated on every enumeration.
            ObjectsMapper<T_Singer, ix_singer> mapper =
                ObjectMapperManager.DefaultInstance.GetMapper<T_Singer, ix_singer>();
            List<ix_singer> documents = changedRows.Select(row => mapper.Map(row)).ToList();

            // Push the documents to Elasticsearch in one bulk request.
            var bulkResponse = client.IndexMany<ix_singer>(documents, SingerIndex);
            if (!bulkResponse.IsValid)
            {
                Logger.Error("Bulk indexing into '" + SingerIndex + "' failed: " + bulkResponse.DebugInformation);
                return;
            }

            Logger.Info("Indexed " + documents.Count + " singer document(s) into '" + SingerIndex + "'.");
        }

        /// <summary>
        /// Returns the newest ModifyDate already stored in the index, creating the
        /// index (with an auto-generated mapping) when it does not exist yet.
        /// </summary>
        /// <returns>
        /// The high-water mark (1970-01-01 when the index is new or empty), or
        /// <c>null</c> when Elasticsearch reported a failure and the run should be skipped.
        /// </returns>
        private DateTime? ResolveLastModifyDate(ElasticClient client)
        {
            // The max aggregation on a date field is read as milliseconds since this epoch.
            // NOTE(review): presumably the aggregate is UTC-based while the database column
            // may hold local time - confirm both sides use the same time zone.
            DateTime epoch = new DateTime(1970, 1, 1);

            if (!client.IndexExists(SingerIndex).Exists)
            {
                var createResponse = client.CreateIndex(SingerIndex);
                if (!createResponse.IsValid)
                {
                    Logger.Error("Creating index '" + SingerIndex + "' failed: " + createResponse.DebugInformation);
                    return null;
                }

                var mapResponse = client.Map<ix_singer>(e => e.AutoMap());
                if (!mapResponse.IsValid)
                {
                    Logger.Error("Mapping index '" + SingerIndex + "' failed: " + mapResponse.DebugInformation);
                    return null;
                }

                return epoch;
            }

            // Ask Elasticsearch for the latest modification time it already holds.
            var searchResponse = client.Search<ix_singer>(s => s
                .Aggregations(a => a
                    .Max("MaxModifyDate", m => m
                        .Field(p => p.ModifyDate)))
                .From(0)
                .Size(1));

            if (!searchResponse.IsValid)
            {
                Logger.Error("Reading the last ModifyDate from '" + SingerIndex + "' failed: " + searchResponse.DebugInformation);
                return null;
            }

            var maxModifyDate = searchResponse.Aggs.Max("MaxModifyDate");
            double elapsedMilliseconds = 0;
            if (maxModifyDate != null && maxModifyDate.Value.HasValue)
            {
                elapsedMilliseconds = maxModifyDate.Value.Value;
            }

            return epoch.AddMilliseconds(elapsedMilliseconds);
        }

        #endregion

        #region Data query

        /// <summary>
        /// Loads the singers whose ModifyDate is later than <paramref name="modifyDate"/>.
        /// Rows with a null ModifyDate do not satisfy the comparison and are not returned.
        /// </summary>
        private List<T_Singer> QuerySinger(DateTime modifyDate)
        {
            var expre = CommonHelper.dbClient.From<T_Singer>();
            expre.And(e => e.ModifyDate > modifyDate);
            return CommonHelper.dbClient.Select(expre);
        }

        #endregion
    }
}
时间: 2024-10-20 03:20:05