基于阿里云函数计算的云工作流CloudFlow设计与体验
基本要求
- 理解掌握阿里云CloudFlow的基本概念,及其与函数计算的可视化编排工具,能够按需设计CloudFlow,通过编排多个函数计算的自动执行工作流,从而构建服务接口
实验原理
阿里云云工作流CloudFlow
本小节主要介绍 CloudFlow的工作流模式、工作流状态及工作流中调用FC的过程。
云工作流模式
阿里云的云工作流服务提供了两种不同的执行模式,以适应不同的业务需求:标准模式和快速模式。标准模式适合于需要长时间运行的离线业务流程,它提供了执行步骤状态的持久化存储和支持长时间工作流执行状态流转的能力。而快速模式则适用于在线业务流程编排和准实时业务流程,它以低延迟和大负载处理能力为特点,适合处理高并发和快速响应的业务场景。两种模式差异对比如下图所示:
工作流状态
在工作流(Flow)的设计中,状态(State)是构成流程的基本单元。状态分为两大类:简单的原子状态和复杂的控制状态。本小节仅介绍工作流状态的基本概念,不同状态配置的差异参见阿里云云工作流官方文档 - 状态流转(https://help.aliyun.com/document_detail/2402078.html)。
原子状态是构成流程的最小单位,包括:
- 集成任务(Task):执行特定任务的操作,任务是编排工作流时使用频率最高的,也是功能最丰富的一种状态。例如 InvokeFunction、Request 等。
- 成功(Succeed):表示流程执行成功的状态。
- 失败(Fail):表示流程执行失败的状态。
- 等待(Wait):流程在此处暂停,直到满足特定条件。
- 传递(Pass):一个简单的状态,直接将控制权传递给下一个状态。
控制状态则用于实现更复杂的流程控制逻辑,包括:
- 选择(Choice):根据条件判断,选择不同的执行路径。
- 并行(Parallel):同时执行多个分支,直到所有分支完成。
- 循环(Map):对集合中的每个元素重复执行相同的操作。
工作流状态间数据传递
[!NOTE]
云工作流的各种状态(State)之间需要传递数据,每个状态(State)将接受前一状态的输入(Input),并返回输出(Output)并将此输出传递给下一个状态(State)。按输入输出时机不同可以分为以下四类:
- 执行输入:也被称为流程输入,指调用工作流时传入的数据。在工作流中需要使用执行输入数据时,可以通过系统表达式
$Context.Execution.Input
进行调用。 - 执行输出:也被称为流程输出,指工作流调用结束后的输出数据。当前流程编排规范设计中,在流程的编排过程中,无法通过系统变量访问流程执行的输出;
- 状态输入:执行到指定工作流状态时的由上一状态输入的数据。配置时可以使用系统表达式
$Context.Current.Input
访问当前状态的输入,也可简写为$Input
。 - 状态输出:指定状态执行结束后的输出数据。配置时可以使用系统表达式
$Context.Current.Output
访问当前状态的输入,也可简写为$OutPut
。
工作流调用FC过程
在云工作流中,当执行 InvokeFunction 任务以调用 FC Web 函数时,是 CloudFlow 在发起 InvokeFunction API 的调用过程。该过程中由 CloudFlow 和 FC 系统自动化完整 API 与 HTTP 请求的相互转换过程。转换流程如下图所示:
a. CloudFlow 作为 Client,向 FC System 发起 InvokeFunction API 调用。
b. FC 系统接收到这个调用,将 API 调用转换为 HTTP 请求(POST /invoke
),并将该请求发送到用户编写的 HTTP Server(FC Web 函数)。
c. HTTP Server(FC Web 函数)处理这个 HTTP 请求,执行业务逻辑,并将结果封装在 HTTP 响应中返回给 FC 系统。
d. FC 系统将 HTTP 响应转换为 Result 对象,并将这个对象作为调用结果返回给 Client(云工作流 CloudFlow)。
e. Client(云工作流 CloudFlow)接收到 Result 对象,可以根据这个结果继续执行工作流中的后续步骤。
[!NOTE]
图片来源:https://help.aliyun.com/zh/functioncompute/web-functions
阿里云CloudFlow工作流调度
工作流调度是事件驱动计算模型中的关键组成部分,它定义了一组规则用以触发工作流的执行。当事件源产生的事件满足这些预设规则时,工作流调度便会自动调用关联的工作流。这种机制允许开发者通过集中和统一的方式管理不同的事件源,无需直接调用工作流,从而简化了开发流程。
工作流调度类型
工作流调度支持多种调度类型,例如HTTP调度和定时调度,每种类型都针对特定的事件源和触发条件进行了优化。此外,工作流调度还提供了同步和异步两种调用模式。同步调用模式在事件处理完成后立即返回结果,而异步调用模式则是在将事件写入内部队列后即返回结果。不同的工作流模式可能会采用不同的调度策略,具体的差异如图所示。
[!NOTE]
阿里云事件总线EventBridge
[!NOTE]
事件是一种包含特定数据和上下文的信号,它描述了某个特定动作或状态的变化。例如,在电商平台中,一个事件可能表示一个订单的创建或支付成功。事件总线则是一个基础设施,它接收来自事件源的事件,并根据定义好的规则将这些事件路由到不同的目标,如函数计算、消息队列等。EventBridge 有两种类型的事件总线:
- 云服务专用事件总线,用于接收阿里云官方事件源的事件;
- 自定义事件总线,用于接收自定义应用或存量消息数据的事件。
阿里云事件总线 EventBridge 在 CloudFlow 工作流调度中扮演着重要角色,将工作流调度使用事件总线的好处在于其解耦和自动化的特性。通过事件驱动的方式,工作流可以在特定的事件发生时自动触发,无需轮询或定时任务。这种方式提高了系统的响应性和可扩展性,因为工作流可以根据事件的实际发生来动态调整资源和处理流程。
实验过程
实验环境准备
获取云工作流免费额度
点击链接进入云工作流产品界面(https://www.aliyun.com/product/aliware/fnf),云工作流 CloudFlow 提供每月前 5000 次步骤免费转换的额度。超出此免费额度,将按照实际使用量进行计费,标准为 1.7 元/万次步骤转换。
创建用于执行云工作流的角色
进入 RAM 访问控制台角色管理界面(https://ram.console.aliyun.com/roles),点击创建按钮。
该角色是为云工作流提供的,故角色类型勾选阿里云服务
。
根据页面提示填写角色名称,如cloud-flow-role
,受信服务选择函数工作流
,填写完成后点击完成
按钮,完成角色创建。
进入角色详情页面,点击新增授权按钮为该角色分配权限,分配的权限包括:AliyunFCFullAccess
(管理函数计算,用于调用FC)、AliyunFnFFullAccess
(管理函数工作流,用于操作工作流)和 AliyunEventBridgeFullAccess
(管理事件总线,用于后续创建工作流调度),勾选好后点击确认新增授权
。
开通事件总线EventBridge
后续步骤中创建的工作流调度需要利用 EventBridge 的事件调度能力,根据预设的事件规则自动触发关联的工作流执行,因此需要开通事件总线 EventBridge。
进入事件总线控制台 (https://eventbridge.console.aliyun.com/overview) 进行免费开通。
开通后将要求对EventBridge进行授权,才能正常收取事件。点击页面底部的一键授权
按钮完成授权。
构建并部署数据上报接口
首先设计部署一个新的 FC,用于接收上报的数据,参见上文《基于阿里云函数计算的简单邮件发送服务设计与体验》。鼓励自行实现数据上报接口的代码。
在此提供一份数据上报接口的 Python Sanic 示例代码。
# -*- coding: utf-8 -*-
import datetime
from sanic import Sanic
from sanic.response import json
app = Sanic("MyApp")
# 温度阈值
t_threshold = (25, 28)
# 湿度阈值
h_threshold = (30, 33)
@app.route("/upload", methods=["POST"])
def data_upload(request):
try:
data = request.json
sn = data.get("sn")
temperature = data.get("temperature")
humidity = data.get("humidity")
if not all([sn, temperature, humidity]):
return json({"error": "Missing required fields"}, status=400)
# 判断温湿度是否超出阈值
t_out_flag = not (t_threshold[0] <= temperature <= t_threshold[1])
h_out_flag = not (h_threshold[0] <= humidity <= h_threshold[1])
email_body = generate_email_body(
sn, temperature, humidity, t_threshold, h_threshold
)
res = {
"status": 1 if t_out_flag or h_out_flag else 0,
"message": "异常" if t_out_flag or h_out_flag else "正常",
"data": {
# 需要自定替换成收件人邮箱地址
"recipient": "@qq.com",
"subject": "告警邮件 - 温湿度异常",
"body": email_body
} if t_out_flag or h_out_flag else None
}
return json(res)
except Exception as e:
return json({"error": str(e)}, status=500)
def generate_email_body(sn, temperature, humidity, t_threshold, h_threshold):
return (
f"告警通知: \n\n"
f"当前设备{sn}的温湿度数据超出正常范围。\n\n"
f"设备温度: {temperature}°C\n"
f"温度阈值: {t_threshold[0]}°C - {t_threshold[1]}°C\n\n"
f"设备湿度: {humidity}%\n"
f"湿度阈值: {h_threshold[0]}% - {h_threshold[1]}%\n\n"
f"请尽快检查设备并采取相应措施。\n\n"
f"时间: {datetime.datetime.now().isoformat()}"
)
if __name__ == "__main__":
app.run(host="0.0.0.0", port=9000)
构建完成后,此时 FC 函数列表中应当有两个函数:fun-temperature-and-humidity-data-upload 和 fun-alarm-email-send。
编排云工作流
创建云工作流
进入云工作流控制台,地域需与 FC 所在地域保持一致。点击创建工作流
按钮,在弹窗中勾选使用空白画布
创建工作流。
本实验不涉及耗时较长的任务,因此推荐采用快速模式
来设置工作流。这种模式简化了工作流的配置步骤,有助于快速理解工作流的基本概念。填写好工作流信息后点击创建
按钮完成工作流的创建。
编排工作流
- 配置执行工作流角色
工作流创建完成后将自动跳转至工作流编辑页面,点击页面中上部的工作流配置
按钮,在执行角色输入框选择之前创建的角色,配置执行工作流时所使用的角色,从而使工作流可以获得角色所拥有的权限。配置完成后,点击右上角保存
按钮。
- 编排数据上报FC
点击页面中上部的 CloudFlow Studio
按钮回到画布编辑页面。删除工作流默认创建的 Hello World,并从左侧操作中拖拽 InvokeFunction
任务状态到画布上。
在右侧的属性栏中基本配置
中设置 InvokeFunction
任务状态的相关配置,推荐步骤名称与 FC 函数名保持一致。函数调用方式选择同步调用
。
在基本配置页面点击切换 YAML 编辑
,在 YAML 框内添加一行 body.$: $Input.body
(后续通过工作流调度来请求工作流时,请求的参数会放在请求参数的 body 字段,故此处需要从输入中取出 body 部分),填写完后点击右上角保存
按钮进行保存。
点击属性配置栏的输出配置
按钮,对当前的 InvokeFunction 的执行输出进行配置。勾选使用 JsonPath 选择部分参数
,填入 $Output.Body
(将数据上报 FC 的执行 JSON 返回值会被携带上 Header 部分,只需要 Body 作为 InvokeFunction 状态输出)。配置完成后点击保存
按钮。
- 编排告警邮件发送 FC
从左侧操作栏新拖拽一个 InvokeFunction 任务状态至 fun-temperature-and-humidity-data-upload
任务与 End 状态
的箭头上,工作流画布将自动链接前驱和后继。
同编排数据上报 FC 配置相似。但配置 YAML 数据负载时,需要填写为 body.$: $Input.data
。因为上一个 InvokeFunction 的输出格式中的 data 部分才是发送告警邮件所需的参数,若数据上报接口代码为自行编写的,则按实际返回值情况进行填写。
- 编排选择步骤
为实现当温湿度异常时才调用发送告警邮件 FC,还需要在现有工作流基础上增加 选择(Choice)
步骤,当温湿度正常时,直接运行至 Succeed 流程
;当温湿度异常时,运行 fun-alarm-email-send
任务。
从左侧流程栏中拖拽 Choice
放置在两个 InvokeFunction 之间,此时工作流将形成如下图所示分支。
从左侧流程中将 Succeed
拖拽至右侧分支进行步骤补全。
在右侧 Choice 的属性设置中配置状态规则:默认规则卡片的下个状态设置为 Succeed
,自定义规则 #1
卡片添加条件 $Input.status == 1
(因为数据上报 FC 返回值中 status 字段为 1 则表示异常),并设置下个状态设置为 fun-alarm-email-send
,设置完成后点击保存按钮。
至此本实验完整的工作流构建完成。
修改现有FC以适配云工作流调用
云工作流的 InvokeFunction 调用 FC Web 函数时,本质上是发起 InvokeFunction API 的调用,由 FC System 将 invoke
调用转换为 POST /invoke
请求,因此需要为现有的两个 FC 新增 /invoke
路由。
fun-alarm-email-send
FC 代码修订,修订完成后点击代码部署
按钮。
@app.route("/send", methods=["POST"])
修订为:
@app.route("/send", methods=["POST"], name='send')
@app.route("/invoke", methods=["POST"], name='invoke')
fun-temperature-and-humidity-data-upload
FC 代码修订,修订完成后点击代码部署
按钮。
@app.route("/upload", methods=["POST"])
修订为:
@app.route("/upload", methods=["POST"], name='upload')
@app.route("/invoke", methods=["POST"], name='invoke')
云工作流测试
创建工作流调度
返回工作流详情
页面,点击工作流调度
标签页,点击创建工作流调度
按钮,在弹窗中选择调度类型为 HTTP/HTTPS 触发
。
选择调度类型后会弹出调度角色授权弹窗,点击授权即可。
填写调度类型名称,请求类型勾选 HTTPS
,请求方法勾选 POST
,设置完成后点击确认
按钮完成创建。
返回工作流详情页面,可以查看当前的工作流调度列表。点击工作流调度的 详情
按钮查看详情信息,并复制 公网访问地址
备用。
模拟温湿度数据上报
利用 Apifox 对上一步骤中复制的公网访问地址发起 POST 请求,参数即为数据上报 FC 的请求参数,且温度在阈值范围内,模拟设备日常情况。
{
"sn": "sn45N2K",
"temperature": 25.5,
"humidity": 30
}
返回工作流详情
页面,点击执行记录
标签,将出现一条执行记录。
点击执行记录详情
按钮,可查看详细的执行过程,包括执行的基本信息,图形视图的执行过程,执行的输入输出数据等信息,此处仅以图形视图为例进行解释,其余数据请自行摸索。
左侧工作流中被标记为绿色的步骤即为实际运行的路径,且执行步骤成功。点击 Choice 步骤观察右侧的输入数据,该数据为上一步骤的状态输出,且此时的 status 为 0,则执行默认分支至 Succeed
步骤。同理可以点击其他步骤查看相应的输出输入。
再次发起模拟数据上报请求,将温度调整至40度,模拟设备出现异常情况。进入新工作流执行记录详情页面,查看调用情况,由于问题超出阈值,数据上报 FC 返回的 status 为1,运行右侧分支进行告警信息发送。
阿里云 CloudFlow 功能十分强大,鼓励进一步探索 CloudFlow 的容错处理、自动重试等高级功能。