LogHub(SLS)数据源为您提供读取和写入LogHub(SLS)双向通道的功能,本文为您介绍DataWorks的LogHub(SLS)数据同步的能力支持情况。
使用限制
数据集成离线写LogHub(SLS)时,由于LogHub(SLS)无法实现幂等,Failover重跑任务时会引起数据重复。
支持的字段类型
数据集成支持读写的LogHub(SLS)字段类型如下。
字段类型 | 离线读(LogHub(SLS) Reader) | 离线写(LogHub(SLS) Writer) | 实时读 |
字段类型 | 离线读(LogHub(SLS) Reader) | 离线写(LogHub(SLS) Writer) | 实时读 |
STRING | 支持 | 支持 | 支持 |
其中:
离线写LogHub(SLS)时
会将支持同步的各类型数据均转换成STRING类型后写入LogHub(SLS)。LogHub(SLS) Writer针对LogHub(SLS)类型的转换列表,如下所示。
支持的数据集成内部类型
写入LogHub(SLS)时的数据类型
支持的数据集成内部类型
写入LogHub(SLS)时的数据类型
LONG
STRING
DOUBLE
STRING
STRING
STRING
DATE
STRING
BOOLEAN
STRING
BYTES
STRING
实时读LogHub(SLS)时
会自带以下元数据字段。
LogHub(SLS)实时同步字段
数据类型
说明
LogHub(SLS)实时同步字段
数据类型
说明
__time__
STRING
SLS保留字段:__time__写入日志数据时指定的日志时间,unix时间戳,单位为秒。
__source__
STRING
SLS保留字段:__source__日志来源设备。
__topic__
STRING
SLS保留字段:__topic__topic名称。
__tag__:__receive_time__
STRING
日志到达服务端的时间。开启记录外网IP功能后,服务端接收日志时为原始日志追加该字段。unix时间戳,单位为秒。
__tag__:__client_ip__
STRING
日志来源设备的公网IP。开启记录外网IP功能后,服务端接收日志时为原始日志追加该字段。
__tag__:__path__
STRING
Logtail采集的日志文件路径,Logtail会自动为日志追加该字段。
__tag__:__hostname__
STRING
Logtail采集数据的来源机器主机名,Logtail会自动为日志追加该字段。
创建数据源
在进行数据同步任务开发时,您需要在DataWorks上创建一个对应的数据源,操作流程请参见创建并管理数据源,详细的配置参数解释可在配置界面查看对应参数的文案提示。
数据同步任务开发
数据同步任务的配置入口和通用配置流程可参见下文的配置指导。
LogHub数据源作为数据来源端,在进行任务配置同步时支持通过LogHub的查询语法、SPL语句(SLS Processing Language是SLS处理日志的语法)对LogHub内的数据进行过滤,具体语法说明请参见附录二:LogHub SPL语法过滤说明。
单表离线同步任务配置指导
操作流程请参见通过向导模式配置离线同步任务、通过脚本模式配置离线同步任务。
脚本模式配置的全量参数和脚本Demo请参见下文的附录一:脚本Demo与参数说明。
单表实时同步任务配置指导
操作流程请参见配置单表增量数据实时同步、DataStudio侧实时同步任务配置。
整库离线、整库(实时)全增量、整库(实时)分库分表等整库级别同步配置指导
操作流程请参见数据集成侧同步任务配置。
常见问题
更多其他数据集成常见问题请参见数据集成常见问题。
附录一:脚本Demo与参数说明
离线任务脚本配置方式
如果您配置离线任务时使用脚本模式的方式进行配置,您需要按照统一的脚本格式要求,在任务脚本中编写相应的参数,详情请参见通过脚本模式配置离线同步任务,以下为您介绍脚本模式下数据源的参数配置详情。
Reader脚本Demo
{
"type":"job",
"version":"2.0",//版本号。
"steps":[
{
"stepType":"LogHub",//插件名。
"parameter":{
"datasource":"",//数据源。
"column":[//字段。
"col0",
"col1",
"col2",
"col3",
"col4",
"C_Category",
"C_Source",
"C_Topic",
"C_MachineUUID", //日志主题。
"C_HostName", //主机名。
"C_Path", //路径。
"C_LogTime" //事件时间。
],
"beginDateTime":"",//数据消费的开始时间位点。
"batchSize":"",//一次从日志服务查询的数据条数。
"endDateTime":"",//数据消费的结束时间位点。
"fieldDelimiter":",",//列分隔符。
"logstore":""//:目标日志库的名字。
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//错误记录数。
},
"speed":{
"throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
"concurrent":1 //作业并发数。
"mbps":"12",//限流,此处1mbps = 1MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
Reader脚本参数
参数 | 描述 | 是否必选 | 默认值 |
参数 | 描述 | 是否必选 | 默认值 |
endPoint | 日志服务入口endPoint是访问一个项目(Project)及其内部日志数据的URL。它和Project所在的阿里云地域(Region)及Project名称相关。各地域的服务入口请参见服务入口。 | 是 | 无 |
accessId | 访问日志服务的访问密钥,用于标识用户。 | 是 | 无 |
accessKey | 访问日志服务的访问密钥,用来验证用户的密钥。 | 是 | 无 |
project | 目标日志服务的项目名称,是日志服务中的资源管理单元,用于隔离和控制资源。 | 是 | 无 |
logstore | 目标日志库的名称,logstore是日志服务中日志数据的采集、存储和查询单元。 | 是 | 无 |
batchSize | 一次从日志服务查询的数据条数。 | 否 | 128 |
column | 每条数据中的列名,此处可以配置日志服务中的元数据作为同步列。日志服务支持日志主题、采集机器唯一标识、主机名、路径和日志时间等元数据。 列名区分大小写。元数据的写法请参见日志服务机器组。 | 是 | 无 |
beginDateTime | 数据消费的开始时间位点,即日志数据到达LogHub(SLS)的时间。该参数为时间范围(左闭右开)的左边界,yyyyMMddHHmmss格式的时间字符串(例如20180111013000),可以和DataWorks的调度时间参数配合使用。 例如,您在节点编辑页面右侧的调度配置,在参数中配置
| 是 | 无 |
endDateTime | 数据消费的结束时间位点,为时间范围(左闭右开)的右边界,yyyyMMddHHmmss格式的时间字符串(例如20180111013010),可以和DataWorks的调度时间参数配合使用。 例如,您在节点编辑页面右侧的调度配置,在参数中配置endDateTime=${yyyymmdd},则在日志结束时间处配置为${endDateTime}000000,表示获取的日志结束时间为业务日期后一天的0点0分0秒。详情请参见调度参数支持的格式。 上一周期的endDateTime需要和下一周期的beginDateTime保持一致,或晚于下一周期的beginDateTime。否则,可能无法拉取部分区域的数据。 | 是 | 无 |
若读取LogHub同步缺少数据,请在LogHub控制台检查数据元数据字段receive_time是否在任务配置的时间区间内。
Writer脚本Demo
{
"type": "job",
"version": "2.0",//版本号。
"steps": [
{
"stepType": "stream",
"parameter": {},
"name": "Reader",
"category": "reader"
},
{
"stepType": "LogHub",//插件名。
"parameter": {
"datasource": "",//数据源。
"column": [//字段。
"col0",
"col1",
"col2",
"col3",
"col4",
"col5"
],
"topic": "",//选取topic。
"batchSize": "1024",//一次性批量提交的记录数大小。
"logstore": ""//目标LogService LogStore的名称。
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {
"record": ""//错误记录数。
},
"speed": {
"throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
"concurrent":3, //作业并发数。
"mbps":"12"//限流,此处1mbps = 1MB/s。
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}
Writer脚本参数
LogHub(SLS) Writer通过数据集成框架获取Reader生成的数据,然后将数据集成支持的类型通过逐一判断转换成STRING类型。当达到您指定的batchSize时,会使用LogService Java SDK一次性推送至LogHub(SLS)。
参数 | 描述 | 是否必选 | 默认值 |
参数 | 描述 | 是否必选 | 默认值 |
endpoint | 日志服务入口endPoint是访问一个项目(Project)及其内部日志数据的URL。它和Project所在的阿里云地域(Region)及Project名称相关。各地域的服务入口请参见:服务入口。 | 是 | 无 |
accessKeyId | 访问日志服务的AccessKeyId。 | 是 | 无 |
accessKeySecret | 访问日志服务的AccessKeySecret。 | 是 | 无 |
project | 目标日志服务的项目名称。 | 是 | 无 |
logstore | 目标日志库的名称,logstore是日志服务中日志数据的采集、存储和查询单元。 | 是 | 无 |
topic | 目标日志服务的topic名称。 | 否 | 空字符串 |
batchSize | LogHub(SLS)一次同步的数据条数,默认1,024条,最大值为4,096。 一次性同步至LogHub(SLS)的数据大小不要超过5M,请根据您的单条数据量大小调整一次性推送的条数。 | 否 | 1,024 |
column | 每条数据中的column名称。 | 是 | 无 |
附录二:LogHub SPL语法过滤说明
LogHub数据源作为数据来源端,在进行任务配置同步时支持通过LogHub的查询语法、SPL语句(SLS Processing Language是SLS处理日志的语法)对LogHub内的数据进行过滤,具体语法说明如下:
SPL的更多详细信息,请参见SPL语法。
场景 | SQL语句 | SPL语句 |
场景 | SQL语句 | SPL语句 |
数据过滤 |
|
|
字段处理与筛选 | 精确选择字段,并将其重命名:
|
|
数据规整 (调用SQL函数) | 转换数据类型、时间解析等:
| 转换数据类型、时间解析等:
|
字段提取 | 正则提取:
JSON提取:
|
|
- 本页导读 (1)
- 使用限制
- 支持的字段类型
- 创建数据源
- 数据同步任务开发
- 单表离线同步任务配置指导
- 单表实时同步任务配置指导
- 整库离线、整库(实时)全增量、整库(实时)分库分表等整库级别同步配置指导
- 常见问题
- 附录一:脚本Demo与参数说明
- 离线任务脚本配置方式
- Reader脚本Demo
- Reader脚本参数
- Writer脚本Demo
- Writer脚本参数
- 附录二:LogHub SPL语法过滤说明