概述
使用
全局富化函数做富化时, 需要传递一个字典或者
表格结构做富化. 参考
构建字典与表格做数据富化的各种途径比较.
本篇介绍从使用资源函数
res_log_logstore_pull
从其他logstore拉取数据的做富化的详细实践.关于
res_log_logstore_pull
的参数说明, 参考
这里. 该语法目前支持两种模式去logstore拉取数据,一种是拉取指定时间间隔内的logstore 的数据内容,另外一种是不设置结束时间,持续的拉取目标logstore内容
背景
这里我们有两个logstore,一个是存储个人信息的source_logstore,一个是酒店存储客人入住信息的target_logstore ,我们现在将酒店的入住信息拿来做富化。
注意: 这里采用pull_log接口拉取数据, 富化的logstore并不依赖索引.
个人信息 source_logstore
topic:xxx
city:xxx
cid:12345
name:maki
topic:xxx
city:xxx
cid:12346
name:vicky
topic:xxx
city:xxx
cid:12347
name:mary
酒店入住信息logstore
time:1567038284
status:check in
cid:12345
name:maki
room_number:1111
time:1567038284
status:check in
cid:12346
name:vicky
room_number:2222
time:1567038500
status:check in
cid:12347
name:mary
room_number:3333
time:1567038500
status:leave
cid:12345
name:maki
room_number:1111
基本语法
res_log_logstore_pull(
endpoint,
ak_id,
ak_secret,
project,
logstore,
fields,
from_time=None,
to_time=None,
fetch_include_data=None,
fetch_exclude_data=None,
primary_keys=None,
delete_data=None,
refresh_interval_max=60,
fetch_interval=2):
具体的参数说明请参考
res_log_logstore_pull,需要注意的地方是,res_log_logstore_pull 是一个单独的语法,只负责从目标logstore 拉取数据下来,本身自己并没有做任何富化的操作,所以请不要单独使用res_log_logstore_pull语法,结合e_table_map和e_search_table_map语句一起使用才是有意义的,本篇也会结合e_table_map和e_search_map_table的使用给出一些例子进行演示。
场景1: 获取指定时间内所有的数据
注意: 这里的时间是日志获取时间.
DSL编排语法
res_log_logstore_pull(..., ["cid","name","room_number"],from_time=1567038284,to_time=1567038500)
获取到的数据
#这里我们的语法中 field 填入了 cid,name,room_number 三个字段,,并且指定了时间范围,将会获取这个时间范围内的logstore的所有数据的这三个字段的值
cid:12345
name:maki
room_number:1111
cid:12346
name:vicky
room_number:2222
cid:12347
name:mary
room_number:3333
cid:12345
name:maki
room_number:1111
场景2: 设置黑白名单参数来过滤拉取的数据
1.DSL 编排语法(只设置白名单)
# 设置白名单,只有 room_number 值等于 1111的的数据会被拉去下来
res_log_logstore_pull(..., ["cid","name","room_number","status"],from_time=1567038284,to_time=1567038500,fetch_include_data="room_number:1111")
# 设置了 ferch_include_data 白名单,只有包含 room_numver:1111的数据会被拉去下来,其他数据不会被拉取。
status: check in
cid:12345
name:maki
room_number:1111
status:leave
cid:12345
name:maki
room_number:1111
2.DSL 编排语法(只设置黑名单)
res_log_logstore_pull(..., ["cid","name","room_number","status"],from_time=1567038284,to_time=1567038500,fetch_exclude_data="room_number:1111")
# 设置黑名单 fetch_exclude_data 当数据包含 room_number:1111的时候丢弃这条数据。
status:check in
cid:12346
name:vicky
room_number:2222
status:check in
cid:12347
name:mary
room_number:3333
3.DSL编排语法(同时设置黑白名单)
res_log_logstore_pull(..., ["cid","name","room_number","status"],from_time=1567038284,to_time=1567038500,fetch_exclude_data="status:leave",fetch_include_data="status:check in")
# 黑白名单同时存在的情况下,优先进行黑名单数据的匹配,这里我们填入的是 status:leave的值,当数据包含status:leave的值时候,数据会被直接丢弃,而后匹配白名单,白名单我们填入的是 status:check in 当数据包含 status: check in 的值时候,该数据才会被拉取下来.
status:check in
cid:12345
name:maki
room_number:1111
status:check in
cid:12346
name:vicky
room_number:2222
status:check in
cid:12347
name:mary
room_number:3333
场景3: 开通持续拉取目标logstore 数据
如果目标logstore 的数据是持续写入,我们需要持续的去拉取时候,设置 to_time 参数为None 就可以,同时可以设置fetch_interval 设置拉取的时间间隔,和refresh_interval_max 当拉取遇到错误的时候退火重试的最大时间间隔
res_log_logstore_pull(..., ["cid","name","room_number","status"],from_time=1567038284,to_time=None,fetch_interval=15,refresh_interval_max=60)
# 需要注意的是,在持续拉取的过程中,如果遇到错误,服务器会一直退火重试,直到成功为止,不会停止数据加工进程。
场景4: 开启主键维护拉取的目标logstore数据(暂时不推荐)
注意事项
目前该功能仅限使用所有数据存储在logstore的同一个shard中,所以暂时不推荐使用该功能。
以我们的个人信息logstore 和 酒店信息logstore的数据进行举例,因为logstore和数据库不同,logstore中的数据只能写入,无法删除,所以有的时候我们希望匹配的时候将已经删除的数据不要进行匹配,这时候就需要开启主键维护功能。
需求演示
现在我们想拉取酒店信息logstore中,所有入住还没有离开的客人信息,当status=leave的时候,表示客人已经离开酒店,所以不需要将该信息进行拉取。
res_log_logstore_pull(..., ["cid","name","room_number","status","time"],from_time=1567038284,to_time=None,primary_keys="cid",delete_data="status:leave")
得到的数据
## 可以看到 name为maki 的客人的最后更新status为leave ,已经离开酒店,所以并没有将 maki的数据拉取下来,
time:1567038284
status:check in
cid:12346
name:vicky
room_number:2222
time:1567038500
status:check in
cid:12347
name:mary
room_number:3333
注意
需要注意的是 primary_keys 目前只支持设置单字符串,这个需要设置logstore数据中 值为唯一的字段,比如样例中的cid , 类似数据库的唯一主键,并且当设置primary_keys的时候,delete_data 也必须不为 None,这样才有意义。
进一步参考
欢迎扫码加入官方钉钉群获得实时更新与阿里云工程师的及时直接的支持: