天天看点

Knative Eventing 之 Sequence 介绍

在处理数据时,往往会涉及到一个数据需要进行多次加工,这时候我们一般是通过Pipeline的方式进行处理。那么在Knative Eventing中是否也能支持对一个事件进行分步骤多次处理? 这个还真有。从 0.7 版本开始,Knative Eventing中提供了一个 Sequence 资源模型,可用于事件Pipeline处理。

Sequence 定义

首先我们看一下Sequence Spec定义:

apiVersion: messaging.knative.dev/v1alpha1
kind: Sequence
metadata:
  name: test
spec:
  channelTemplate:
    apiVersion: messaging.knative.dev/v1alpha1
    kind: InMemoryChannel
  steps:
    - ref:
        apiVersion: serving.knative.dev/v1alpha1
        kind: Service
        name: test
  reply:
    kind: Broker
    apiVersion: eventing.knative.dev/v1alpha1
    name: test           

Sequence Spec包括3个部分:

  1. steps: 在step中定义了按照顺序执行的服务,每个服务会对应创建Subscription。
  2. channelTemplate:指定了使用具体的那个Channel
  3. reply:(可选)定义了最后一个step返回结果的响应目标

在 Broker/Trigger 模型中使用 Sequence

我们将创建以下逻辑配置。创建一个 cronjobsource,向 Broker 提供事件,然后创建一个 filter,将这些事件连接到由3个 step 组成的 Sequence 中。然后,我们获取最后的step返回结果事件发送给给Broker,并创建另一个 Trigger,该 Trigger 随后将显示事件结果。

对于这个例子,这里设置一个 Broker 程序、一个 InMemoryChannel 以及一个 Knative Service(用于显示事件结果)。示例使用 default namespace。

如果要使用不同类型的Channel,则需要修改

sequence.spec.channeltemplate

以创建对应的 Channel 资源。

Knative Eventing 之 Sequence 介绍

创建 Knative Service

首先创建3个Knative Service,用于 Sequence 中服务处理

apiVersion: serving.knative.dev/v1alpha1
kind: Service
metadata:
  name: first
spec:
  template:
    spec:
      containers:
      - image: us.gcr.io/probable-summer-223122/cmd-03315b715ae8f3e08e3a9378df706fbb@sha256:2656f39a7fcb6afd9fc79e7a4e215d14d651dc674f38020d1d18c6f04b220700
            env:
            - name: STEP
              value: "0"

---
apiVersion: serving.knative.dev/v1alpha1
kind: Service
metadata:
  name: second
spec:
  template:
    spec:
      containers:
      - image: us.gcr.io/probable-summer-223122/cmd-03315b715ae8f3e08e3a9378df706fbb@sha256:2656f39a7fcb6afd9fc79e7a4e215d14d651dc674f38020d1d18c6f04b220700
            env:
            - name: STEP
              value: "1"
---
apiVersion: serving.knative.dev/v1alpha1
kind: Service
metadata:
  name: third
spec:
  template:
    spec:
      containers:
      - image: us.gcr.io/probable-summer-223122/cmd-03315b715ae8f3e08e3a9378df706fbb@sha256:2656f39a7fcb6afd9fc79e7a4e215d14d651dc674f38020d1d18c6f04b220700
            env:
            - name: STEP
              value: "2"

---           

执行创建命令:

kubectl -n default create -f ./steps.yaml           

创建 Sequence

创建Sequence,这里依次顺序执行[first->second->third]这3个服务。将最终处理的结果发送到broker-test中。

apiVersion: messaging.knative.dev/v1alpha1
kind: Sequence
metadata:
  name: sequence
spec:
  channelTemplate:
    apiVersion: messaging.knative.dev/v1alpha1
    kind: InMemoryChannel
  steps:
    - ref:
        apiVersion: serving.knative.dev/v1alpha1
        kind: Service
        name: first
    - ref:
        apiVersion: serving.knative.dev/v1alpha1
        kind: Service
        name: second
    - ref:
        apiVersion: serving.knative.dev/v1alpha1
        kind: Service
        name: third
  reply:
    kind: Broker
    apiVersion: eventing.knative.dev/v1alpha1
    name: broker-test           

执行如下命令:

kubectl -n default create -f ./sequence.yaml           

创建CronJobSource指向Broker

这里将创建一个 cronjobsource,它将每2分钟发送一个{"message": "Hello world!"} 信息到 broker-test 中。

apiVersion: sources.eventing.knative.dev/v1alpha1
kind: CronJobSource
metadata:
  name: cronjob-source
spec:
  schedule: "*/2 * * * *"
  data: '{"message": "Hello world!"}'
  sink:
    apiVersion: eventing.knative.dev/v1alpha1
    kind: Broker
    name: broker-test           

执行命令如下:

kubectl -n default create -f ./cron-source.yaml           

为Sequence创建Trigger

创建订阅事件类型为:dev.knative.cronjob.event 的 Trigger, 用于Sequence 进行消费处理。

apiVersion: eventing.knative.dev/v1alpha1
kind: Trigger
metadata:
  name: sequence-trigger
spec:
  filter:
    sourceAndType:
      type: dev.knative.cronjob.event
  subscriber:
    ref:
      apiVersion: messaging.knative.dev/v1alpha1
      kind: Sequence
      name: sequence           
kubectl -n default create -f ./trigger.yaml           

创建结果订阅 Trigger

创建结果订阅 Trigger,订阅

samples.http.mod3

的事件类型,对 sequence 执行的结果进行显示

apiVersion: serving.knative.dev/v1alpha1
kind: Service
metadata:
  name: sequence-display
spec:
  template:
    spec:
      containers:
        - image: gcr.io/knative-releases/github.com/knative/eventing-sources/cmd/event_display
---
apiVersion: eventing.knative.dev/v1alpha1
kind: Trigger
metadata:
  name: sequence-trigger
spec:
  filter:
    sourceAndType:
      type: samples.http.mod3
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1alpha1
      kind: Service
      name: sequence-display
---           

结论

通过 Sequence 资源模型,我们很容易在 Knative Eventing 中实现事件处理的 Pipeline。对于需要多步骤处理的服务尤为适合。

欢迎加入 Knative 交流群

Knative Eventing 之 Sequence 介绍

继续阅读