日志服务(Log Service),简称LOG,原SLS。是针对实时数据的一站式服务,在阿里巴巴集团经历大量大数据场景锤炼而成。无需开发就能快捷完成数据采集、消费、投递以及查询分析等功能,帮助提升运维、运营效率,建立DT时代海量日志处理能力。
日志服务本身是流数据存储,实时计算 Flink能将其作为流式数据输入。对于日志服务而言,数据格式类似JSON,示例如下。
-
{
-
"a": 1000,
-
"b": 1234,
-
"c": "li"
-
}
对于实时计算而言,我们需要定义的DDL如下(sls即Log Service)。
-
create table sls_stream(
-
a int,
-
b int,
-
c varchar
-
) with (
-
type ='sls',
-
endPoint ='http://cXXXXXXXXyuncs.com',
-
accessId ='0iXXXXXXXAs',
-
accessKey ='yF60EXXXXXXXPJ2zhCfHU',
-
startTime = '2017-07-05 00:00:00',
-
project ='ali-XXXXX-streamtest',
-
logStore ='strXXXtest',
-
consumerGroup ='consXXXXroupTest1'
-
);
目前默认支持三个属性字段的获取,也支持其他自定义写入的字段。
字段名 | 注释说明 |
---|---|
| 消息源 |
| 消息主题 |
| 日志时间 |
举例
通过
HEADER
关键字获取属性字段。
测试数据
-
__topic__: ens_altar_flow
-
result: {"MsgID":"ems0a","Version":"0.0.1"}
测试代码
-
CREATE TABLE sls_log (
-
__topic__ varchar HEADER,
-
result varchar
-
)
-
WITH
-
(
-
type ='sls'
-
);
-
CREATE TABLE sls_out (
-
name varchar,
-
MsgID varchar,
-
Version varchar
-
)
-
WITH
-
(
-
type ='RDS'
-
);
-
INSERT INTO sls_out
-
SELECT
-
__topic__,
-
JSON_VALUE(result,'$.MsgID'),
-
JSON_VALUE(result,'$.Version')
-
FROM
-
sls_log
测试结果
name(VARCHAT) | MsgID(VARCHAT) | Version(VARCHAT) |
---|---|---|
ens_altar_flow | ems0a | 0.0.1 |
参数 | 备注 | |
---|---|---|
endPoint | 消费端点信息 | 日志服务的ENDPOINT地址 |
accessId | sls读取的accessKey | N/A |
accessKey | sls读取的密钥 | |
project | 读取的sls项目 | |
logStore | project下的具体的logStore | |
consumerGroup | 消费组名 | 用户可以自定义消费组名(没有固定格式) |
startTime | 消费日志开始的时间点 | |
heartBeatIntervalMills | 可选,消费客户端心跳间隔时间 | 默认为10s |
maxRetryTimes | 读取最大尝试次数 | 可选,默认为5 |
batchGetSize | 单次读取logGroup条数 | 可选,默认为10 |
lengthCheck | 单行字段条数检查策略 | 可选,默认为SKIP,其它可选值为EXCEPTION、PAD。SKIP:字段数目不符合时跳过 。EXCEPTION:字段数目不符合时抛出异常。PAD:按顺序填充,不存在的置为null。 |
columnErrorDebug | 是否打开调试开关,如果打开,会把解析异常的log打印出来 | 可选,默认为false |
注意:
- SLS暂不支持MAP类型的数据。
- 字段顺序支持无序(建议字段顺序和表中定义一致)。
- 输入数据源为Json形式时,注意定义分隔符,并且需要采用内置函数分析JSON_VALUE,否则就会解析失败。报错如下:
2017-12-25 15:24:43,467 WARN [Topology-0 (1/1)] com.alibaba.blink.streaming.connectors.common.source.parse.DefaultSourceCollector - Field missing error, table column number: 3, data column number: 3, data filed number: 1, data: [{"lg_order_code":"LP00000005","activity_code":"TEST_CODE1","occur_time":"2017-12-10 00:00:01"}]
- batchGetSize设置不能超过1000,否则会报错
- batchGetSize指明的是logGroup获取的数量,如果单条logItem的大小和 batchGetSize都很大,很有可能会导致频繁的GC,这种情况下该参数应注意调小。
日志服务和实时计算字段类型对应关系,建议您使用该对应关系进行DDL声明:
日志服务字段类型 | 流计算字段类型 |
---|---|
STRING | VARCHAR |
本文转自实时计算——
创建日志服务(Log Service)源表