数据集成(Data Integration)产品提供数据同步服务,有向导模式和脚本模式两种方式。向导模式更简单,脚本模式更灵活。
本章介绍如何将Table Store中的增量数据通过数据集成的脚本模式同步到OpenSearch中。
途径
数据集成脚本模式
- Reader:OTSStreamReader
- Writer:OSSWriter
配置Table Store
无需配置。
配置OSS
无需配置。
配置数据集成
- 创建Table Store数据源。
说明
- 如果已经创建了Table Store的数据源,可以跳过这一步。
- 如果您不希望创建数据源,也可以在后续的配置页面中配置相应的endpoint、instanceName、AccessKeyID和AccessKeySecret。
创建数据源的具体步骤,请参见创建Table Store数据源。
- 创建OSS数据源。
本操作与上一个步骤类似,只是选择OSS作为数据源。说明 配置OSS数据源的参数时,注意Endpoint不包括bucketName。
- 创建同步任务。
- 登录数据集成控制台。
- 在同步任务页面,选择脚本模式。
- 在弹出的导入模板对话框中,来源类型选择OTS Stream,目标类型选择OSS。
- 单击确认,进入配置页面。
- 完善配置项。
- 在配置界面,已经提前嵌入了OTSStreamReader和OSSWriter的模板,请参考以下解释完成配置。
{ "type": "job", "version": "1.0", "configuration": { "setting": { "errorLimit": { "record": "0" # 允许出错的个数,当错误超过这个数目的时候同步任务会失败。 }, "speed": { "mbps": "1", # 每次同步任务的最大流量。 "concurrent": "1" # 每次同步任务的并发度。 } }, "reader": { "plugin": "otsstream", # Reader插件的名称。 "parameter": { "datasource": "", # Table Store的数据源名称,如果有此项则不再需要配置endpoint,accessId,accessKey和instanceName。 "dataTable": "", # TableStore中的表名。 "statusTable": "TableStoreStreamReaderStatusTable", # 存储TableStore Stream状态的表,一般不需要修改。 "startTimestampMillis": "", # 开始导出的时间点,由于是增量导出,需要循环启动此任务,则这里每次启动的时候的时间都不一样,这里需要设置一个变量,比如${start_time}。 "endTimestampMillis": "", # 结束导出的时间点。这里也需要设置一个变量,比如${end_time}。 "date": "yyyyMMdd", # 导出哪一天的数据,功能和startTimestampMillis、endTimestampMillis重复,这一项需要删除。 "mode": "single_version_and_update_only", # TableStore Stream导出数据的格式,目前需要设置成:single_version_and_update_only。如果配置模板中没有则需要增加。 "column":[ # 需要导出TableStore中的哪些列到OSS中去,如果配置模板中没有则需要增加,具体配置个数由用户自定义设置 { "name": "uid" # 列名,这个是Table Store中的主键 }, { "name": "name" # 列名,这个是Table Store中的属性列。 }, ], "isExportSequenceInfo": false, # single_version_and_update_only 模式下只能是false。 "maxRetries": 30 # 最大重试次数。 } }, "writer": { "plugin": "oss", # Writer插件的名称 "parameter": { "datasource": "", # OSS的数据源名称 "object": "", # 最后备份到OSS的文件名的前缀,建议Table Store实例名/表名/date。比如"instance/table/{date}" "writeMode": "truncate", # 支持truncate|append|nonConflict,truncate会清理已存在的同名文件;append会加到已存在的同名文件内容后面;nonConflict会报错当同名文件存在时。 "fileFormat": "csv", # 文件类型 "encoding": "UTF-8", # 编码类型 "nullFormat": "null", # 当遇到控制时,在文本中如何表示 "dateFormat": "yyyy-MM-dd HH:mm:ss", # # 时间格式 "fieldDelimiter": "," # 每一列的分隔符 } } } }
说明 详细的配置项解释请参见配置OTSStreamReader和配置OSSWriter。
- 单击保存,保存任务。
- 在配置界面,已经提前嵌入了OTSStreamReader和OSSWriter的模板,请参考以下解释完成配置。
- 运行任务。
- 单击页面上方的运行。
- 在弹出的配置框中,配置变量参数。
- 单击确认后开始运行任务。
- 运行结束后登录OSS控制台检查是否成功备份文件。
- 配置调度。
- 单击提交。
- 在弹出的对话框中,配置各项参数。
参数说明如下:参数 描述 调度类型 选择周期调度。 自动重跑 如果勾选,则当失败的时候会默认重试3次,每次间隔2分钟。 生效日期 使用默认值。默认从1970-01-01到一百年后。 调度周期 选择分钟。 起始时间 选择00:00至23:59,表示全天24小时都需要调度。 时间间隔 选择5分钟。 start_time 输入$[yyyymmddhh24miss-10/24/60],表示调度时间减去10分钟。 end_time 输入$[yyyymmddhh24miss-5/24/60],表示调度时间减去5分钟。 date 输入${bdp.system.bizdate},表示调度日期。 依赖属性 如果有依赖则填写,没有则不用填。 跨周期依赖 选择自依赖,等待上一调度周期结束,才能继续运行。 - 单击确认。
周期性的同步任务配置完成,当前配置文件显示为只读状态。
- 查看任务。
- 单击页面上方的运维中心。
- 在左侧导航栏,选择任务列表 > 周期任务,可以查看新创建的同步任务。
- 新建的任务会从第二天00点开始执行。
- 在左侧导航栏中,选择任务运维 > 周期实例,查看每一个预创建的当天同步任务,每个任务相隔5分钟,每个任务处理过去10~5分钟的数据。
- 单击实例名称,可以查看详情。
- 单个任务在运行中或运行结束后,可以查看日志。
- 检查导出到OSS中的数据。
登录OSS控制台,查看是否生成了新的文件,文件内容是否正确。
至此,Table Store数据可以在延迟5~10分钟的基础上自动同步到OSS中了。
原文地址:https://www.cnblogs.com/syncnavigator/p/10193510.html
时间: 2024-11-07 13:46:07