天天看点

创建日志服务(Log Service)源表

本页目录

日志服务(Log Service),简称LOG,原SLS。是针对实时数据的一站式服务,在阿里巴巴集团经历大量大数据场景锤炼而成。无需开发就能快捷完成数据采集、消费、投递以及查询分析等功能,帮助提升运维、运营效率,建立DT时代海量日志处理能力。

日志服务本身是流数据存储,实时计算 Flink能将其作为流式数据输入。对于日志服务而言,数据格式类似JSON,示例如下。

  1. {

  2. "a": 1000,

  3. "b": 1234,

  4. "c": "li"

  5. }

对于实时计算而言,我们需要定义的DDL如下(sls即Log Service)。

  1. create table sls_stream(

  2. a int,

  3. b int,

  4. c varchar

  5. ) with (

  6. type ='sls',

  7. endPoint ='http://cXXXXXXXXyuncs.com',

  8. accessId ='0iXXXXXXXAs',

  9. accessKey ='yF60EXXXXXXXPJ2zhCfHU',

  10. startTime = '2017-07-05 00:00:00',

  11. project ='ali-XXXXX-streamtest',

  12. logStore ='strXXXtest',

  13. consumerGroup ='consXXXXroupTest1'

  14. );

目前默认支持三个属性字段的获取,也支持其他自定义写入的字段。

字段名 注释说明

__source__

消息源

__topic__

消息主题

__timestamp__

日志时间

举例

通过 

HEADER

 关键字获取属性字段。

测试数据

  1. __topic__: ens_altar_flow

  2. result: {"MsgID":"ems0a","Version":"0.0.1"}

测试代码

  1. CREATE TABLE sls_log (

  2. __topic__ varchar HEADER,

  3. result varchar

  4. )

  5. WITH

  6. (

  7. type ='sls'

  8. );

  9. CREATE TABLE sls_out (

  10. name varchar,

  11. MsgID varchar,

  12. Version varchar

  13. )

  14. WITH

  15. (

  16. type ='RDS'

  17. );

  18. INSERT INTO sls_out

  19. SELECT

  20. __topic__,

  21. JSON_VALUE(result,'$.MsgID'),

  22. JSON_VALUE(result,'$.Version')

  23. FROM

  24. sls_log

测试结果

name(VARCHAT) MsgID(VARCHAT) Version(VARCHAT)
ens_altar_flow ems0a 0.0.1

参数 备注
endPoint 消费端点信息 日志服务的ENDPOINT地址
accessId sls读取的accessKey N/A
accessKey sls读取的密钥
project 读取的sls项目
logStore project下的具体的logStore
consumerGroup 消费组名 用户可以自定义消费组名(没有固定格式)
startTime 消费日志开始的时间点
heartBeatIntervalMills 可选,消费客户端心跳间隔时间 默认为10s
maxRetryTimes 读取最大尝试次数 可选,默认为5
batchGetSize 单次读取logGroup条数 可选,默认为10
lengthCheck 单行字段条数检查策略 可选,默认为SKIP,其它可选值为EXCEPTION、PAD。SKIP:字段数目不符合时跳过 。EXCEPTION:字段数目不符合时抛出异常。PAD:按顺序填充,不存在的置为null。
columnErrorDebug 是否打开调试开关,如果打开,会把解析异常的log打印出来 可选,默认为false
注意:
  • SLS暂不支持MAP类型的数据。
  • 字段顺序支持无序(建议字段顺序和表中定义一致)。
  • 输入数据源为Json形式时,注意定义分隔符,并且需要采用内置函数分析JSON_VALUE,否则就会解析失败。报错如下:
    1. 2017-12-25 15:24:43,467 WARN [Topology-0 (1/1)] com.alibaba.blink.streaming.connectors.common.source.parse.DefaultSourceCollector - Field missing error, table column number: 3, data column number: 3, data filed number: 1, data: [{"lg_order_code":"LP00000005","activity_code":"TEST_CODE1","occur_time":"2017-12-10 00:00:01"}]

  • batchGetSize设置不能超过1000,否则会报错
  • batchGetSize指明的是logGroup获取的数量,如果单条logItem的大小和 batchGetSize都很大,很有可能会导致频繁的GC,这种情况下该参数应注意调小。

日志服务和实时计算字段类型对应关系,建议您使用该对应关系进行DDL声明:

日志服务字段类型 流计算字段类型
STRING VARCHAR

本文转自实时计算——

创建日志服务(Log Service)源表