CloudEvents三部曲:实践篇

2021-02-01 12:12:31 浏览数 (1)

摘要

随着云原生的发展(云原生的下一个五年在哪里?),逐步进入深水区,业界需要一种统一的事件定义和描述规范,以提供跨服务、跨平台的交互能力。CloudEvents事件规范应运而生,并得到了行业的广泛关注,包括主要的云提供商和 SaaS 公司。

对于CloudEvent的介绍、规范说明及实践落地,将以三篇系列文章进行说明,zouyee的同事董琪今天带来《CloudEvent三部曲:实践篇》.

一、产品接入

1.1 场景介绍

Serverless是一项基于事件驱动的函数计算服务,通过使用函数计算产品,函数以弹性、免运维、高可靠的方式运行,用户可以无需采购和维护服务器等基础设施,可以更加专注于函数代码的编写。 目前 CloudEvents 在函数计算中已有广泛的应用,第三方服务接入函数计算服务,需要使用符合 CloudEvents 规范的消息传递数据,方便函数计算平台方对第三方服务的消息进行分发过滤,同时由于规范的通用性,第三方服务一次改造后可以无缝适配到各类符合 CloudEvents 规范的平台上。 此外消息类产品(例如:消息队列,消息服务,事件总线等)也可通过支持 CloudEvents 规范,统一云的事件标准,加速云原生生态集成。

1.2 开发实践

通常情况下,要构建一个CloudEvent,需要使用CloudEvents的软件开发工具包(SDK),利用SDK可以极大方便开发人员进行集成开发,截至 CloudEvents v1.0 规范的发布,CloudEvents 团队支持和维护以下6种SDK:

  • CSharp
  • Go SDK
  • Java SDK
  • JavaScript SDK
  • Python SDK
  • Ruby SDK

以下使用 Go,Python SDK构造符合CloudEvent 1.0 规范的消息接收发送,HTTP/JSON请求转化等功能的范例。

Golang

1)获取依赖 go get github.com/cloudevents/sdk-go/v2@v2.0.0

2)依赖引用 import cloudevents "github.com/cloudevents/sdk-go/v2"

3)发送事件

代码语言:javascript复制
package main

import (
    "log"

  cloudevents "github.com/cloudevents/sdk-go/v2"
)
func main() {
  // The default client is HTTP.
  c, err := cloudevents.NewDefaultClient()
  if err != nil {
    log.Fatalf("failed to create client, %v", err)
  }

  // Create an Event.
  event :=  cloudevents.NewEvent()
  event.SetSource("example/uri")
  event.SetType("example.type")
  event.SetData(cloudevents.ApplicationJSON, map[string]string{"hello": "world"})

  // Set a target.
  ctx := cloudevents.ContextWithTarget(context.Background(), "http://localhost:8080/")

  // Send that Event.
  if result := c.Send(ctx, event); !cloudevents.IsACK(result) {
    log.Fatalf("failed to send, %v", result)
  }
}

4)接受事件

代码语言:javascript复制
package main

import (
    "log"

  cloudevents "github.com/cloudevents/sdk-go/v2"
)

func receive(event cloudevents.Event) {
  // do something with event.
    fmt.Printf("%s", event)
}

func main() {
  // The default client is HTTP.
  c, err := cloudevents.NewDefaultClient()
  if err != nil {
    log.Fatalf("failed to create client, %v", err)
  }
  log.Fatal(c.StartReceiver(context.Background(), receive));
}

5)序列化

a. 序列化为JSON

代码语言:javascript复制
event := cloudevents.NewEvent()
event.SetSource("example/uri")
event.SetType("example.type")
event.SetData(cloudevents.ApplicationJSON, map[string]string{"hello": "world"})

bytes, err := json.Marshal(event)

b. 反序列化

代码语言:javascript复制
event :=  cloudevents.NewEvent()
err := json.Marshal(bytes, &event)

Python

1) 依赖包安装 pip install cloudevents

2) 事件发送者 通过 Python SDK 中的 CloudEvent 类型构造 CloudEvents 事件,再利用 to_binary函数将其序列化为 JSON 格式的数据,使用 requests框架发送该 JSON 请求。

代码语言:javascript复制
from cloudevents.http import CloudEvent, to_binary
import requests

# 构建CloudEvent结构体
# - The CloudEvent "id" is generated if omitted. "specversion" defaults to "1.0".
attributes = {
    "type": "com.example.sampletype1",
    "source": "https://example.com/event-producer",
}
data = {"message": "Hello World!"}
event = CloudEvent(attributes, data)

# 利用to_binary函数将其序列化为 JSON 格式的数据
headers, body = to_binary(event)

# POST
requests.post("<some-url>", data=body, headers=headers)

3) 接受事件处理

通过 Python SDK 中的 from_http 函数解析出 CloudEvents 事件,并打印事件内容

代码语言:javascript复制
from flask import Flask, request
from cloudevents.http import from_http

app = Flask(__name__)
# create an endpoint at http://localhost:/3000/
@app.route("/", methods=["POST"])
def home():
    # create a CloudEvent
    event = from_http(request.headers, request.get_data())
    # you can access cloudevent fields as seen below
    print(
        f"Found {event['id']} from {event['source']} with type "
        f"{event['type']} and specversion {event['specversion']}"
    )
    return "", 204

if __name__ == "__main__":
    app.run(port=3000)

二、接入方式

2.1 架构

基于事件驱动服务是函数计算的核心功能之一,平台使用 Knative Eventing 的Broker/Trigger 事件处理模型对事件进行过滤分发,此外为了确保跨平台和互操作性,将采用CNCF定义的标准数据格式CloudEvents 进行事件传输。

如上图所示,架构分为三块内容,从左到右分别为事件源,事件接收/转发者,事件消费者。

  1. 事件源

事件源是一种 Kubernetes 定制资源,它提供了一种机制,用于注册来自特定服务系统的一类事件。例如:对象存储事件源,Github事件源等等,因此不同的事件源需要的不同的自定义资源进行描述。

事件源负责获取特定服务系统的事件,并将事件转化为CloudEvents格式事件发送给 Knative Eventing 平台(即 Broker/Trigger事件处理模型)。

  1. 事件接受/转发者

引入Broker/Trigger事件处理模型的目的,是为了搭建一些黑盒子,将具体的实现隐藏起来,用户无需关心底层实现细节。

  • Broker如同事件桶,接收各种不同的事件,这些事件可以通过属性来过滤。
  • Trigger描述了一个过滤器,只有通过了过滤器选择的事件,可以被传送给事件消费者。

如图1所示,用户通过 filter指定感兴趣红色小球的事件,最终只有该类事件被传送给事件消费者(这里是Knative Service,即 KSvc函数)。

  1. 事件消费者

事件消费者可以是某个服务或系统,这里的事件消费者是用户编写的KSvc函数(即处理事件的逻辑代码)。

2.2 实现方式
  1. 第三方接入

第三方服务接入基于knative实现的serverless平台需要提供特定的事件源,Knative社区已维护部分事件源,具体列表请查看:https://github.com/knative/eventing-contrib,如果第三方服务不在社区提供的支持列表中,就需要自定义事件源,有如下常用的几种方式:

ContainerSource 实现简单,是目前大部分自定义事件源的实现方式,也是KNative平台推荐的方式。

ContainerSource 是 Kubernetes 中自定义的 CRD(Custom Resource Definition)资源类型,具体定义如下

主要看以下几个部分:

  1. sink:事件转发的目标对象,这里即图1中介绍的Borker
  2. image:需要开发的镜像,包括了监听具体数据源的事件和转发事件到sink的实现
  3. arg和env:开发者自定义的一些数据通过 arg 和 env 传入镜像

ContainerSource 中 image 镜像部分即需要自定义实现的部分,实现方式根据获取第三方服务事件的不同分为以下两种:

a. 消息队列方式 如下图 2所示,如果第三方服务已适配消息队列,可以将产生的事件发往消息队列,此时 ContainerSource 可以直接从消息队列中消费第三方服务的事件。

b. 直连方式 如下图所示,如果第三方服务未适配消息队列,但服务本身提供事件订阅能力(如 Redis 的键空间特性,Keyspace Notifications future),此时 ContainerSource 可以直接订阅第三方服务的事件,监听服务变化。

注意:无论采用以上哪种方式,ContainerSource 在生成 CloudEvents 事件时,都需要携带 KSVC 目标函数的唯一标识,以供平台侧分发事件时使用。例如:1. 消息队列方式,由于所有事件都从同一个消息队列中获取,此时需要在第三方服务生产事件时就携带目标函数的标识(对象存储产品接入时采用该方式);2. 直连方式,由于 ContainerSource 与第三方服务是一对一关系,所以可以在 ContainerSource 生成 CloudEvents 事件时添加目标函数的标识。

利用 Broker/Trigger 事件处理模型,极大简化了第三方服务接入函数计算的流程。无论使用消息队列方式还是直连方式,产品侧只需要提供适配第三方服务的 ContainerSource 镜像,以供平台侧使用。

  1. 平台侧纳管

平台侧的工作主要是纳管产品侧提供的 ContainerSource,并利用 Trigger 提供事件过滤的能力。

针对 ContainerSource 不同的实现方式,平台侧工作内容也有所区别:

a. 消息队列实现方式

平台侧会创建如下内容:

  1. 一组相同的ContainerSource(用于高可用)
  2. 一个 Broker 类型的资源,用于分发事件
  3. 多个 Trigger 类型资源,用于事件过滤

平台侧会预先创建好 ContainerSource 和 Broker 资源,并提供纳管 Trigger 的增删改查接口用于事件过滤,此时 ContainerSource,Broker,Trigger 对应关系如下图所示:

b. 直连方式

平台侧会创建如下内容:

  1. 多个 ContainerSource 订阅监听不同的服务实例
  2. 一个 Broker 类型的资源,用于分发事件
  3. 多个 Trigger 类型资源,用于事件过滤

平台侧会预先提供好 Broker 资源,并提供纳管 ContainerSource 和 Trigger 的增删改查接口,此时 ContainerSource,Broker,Trigger 对应关系如下图所示:

三、未来展望

通过使用Severless框架,可以大大减少开销和成本。新的架构打破了人们的习惯思维,它让服务器不可见,并提供了一个极具成本效益的服务。它给所有开发人员带来的是软件架构和应用程序部署新方式。

事件驱动通过采用统一的事件标准,CloudEvents来建立云上的事件枢纽,让Serverless开发集成云服务、云边端应用更简单。

云原生发展至今五年有余,如今大家应该跟我有着一样的疑问,云原生的下一个五年,在哪里?

四、参考资料

  • function
  • go sdk
  • sdk-go

END

0 人点赞