AWS StepFunction

Gary Ng
11 min readAug 24, 2024

--

介紹

是由 AWS 提供的一個一系列 event-driven 步驟服務, 而 Step Function 也被稱為 state machine、workflow。簡單說就是透過 AWS 建構一個邏輯係的事件驅動。

有些會在搭配 EMR (大資料處理)、Glue (資料準備)、Batch (批次處理)、Athenna(資料分析)。

以下範例圖為 AWS 官網的流程圖

AWS 官網範例

Workflow Type

Standard Workflow:

適用於長時間運行的 task, 時間至多可以到 1 年

Express Workflow:

適用於有一定效能的 task, timeout 時間至多為 5 分鐘。且 log 需使用 CloudWatch Log。

ASL (Amazon State Language) 說明

Task:

State Machine 執行的一個單位,執行的方式需透過 Activity 或者是 AWS Lambda 甚至是第三方的 API。

Choice:

讓你的 Step Function 擁有條件式判斷

Map:

迭代用的 State, 分為 Inline Map 以及 Distributed Map。

Inline Map 最多可同時處理的數量為 40。

Distributed Map 主要用於大資料的處理,因為它可以同時處理的數量為 10000

Pass:

將資料傳給下一個 state, 用於資料的轉換以及 debug 使用

Parallel:

使你的 Step Function 可以平行處理。

Workflow State

各個資料流的處理順序流程圖

https://docs.aws.amazon.com/step-functions/latest/dg/concepts-input-output-filtering.html

InputPath:

是否只選取傳進來 payload 的一部分資訊,預設為: $, 代表是接收所有傳進來的 payload。

範例:

payload:

{
"result": "message",
"code" : 200
}

倘若只想要擷取 payload 的 result, 則 InputPath 設定為 $.result。

Parameter:

創建一對 key-value pair 的資料當作 input, 資料可以是靜態的,也可以是動態的,動態的話資料則是由 input 或者 context object 而來。假如要使用動態的話則 Key name 需以 .$ 結尾

例如:

Input Payload 內容

{
"result": "message",
"code": 200
}

倘若只想擷取 result 並且多設定一個靜態的資料叫做 http: get

Parameters 內容如下

"Parameters": {
"result.$": "$.result",
"http": "get"
}

ResultSelector:

從 Lambda or Activity 所回傳的結果截取部分的資料或者全部,倘若要完全使用回傳值的話則使用 $,預設為 $。而假如要沿用之前的 input 並且附加 Lambda 的結果,則使用 $.lambdaResult,這樣則會在原先的 input 多增加一個叫做 lambdaResult 的 key name。

範例:

Input Payload

{
"result": "message",
"code": 200
}

Lambda 結果

{
"res": "success"
}

在原先的 Input Payload 多增加 Lambda 的結果

{
"ResultSelector": $.lambdaResult
}

那結果會為

{
"result": "message",
"code": 200,
"lambdaResult": {
"res": "success"
}
}

ResultPath:

擷取 ResultSelector Output 的哪一個部分當作結果,倘若只想擷取 lambdaResult 的話則使用 $.lambdaResult

OutputPath:

擷取 ResultPath Output 當作輸出,預設為 $, 也就是 ResultPath 的所有 Output 都直接輸出。

Payload 過大建議做法:

建議將 payload 儲存到 S3, 然後透過 Distributed Map State 從 S3 讀取 Payload 資料。

因為 Step Function 有 payload 大小的限制,所以才建議使用 S3, 而可能資料過大所以也建議透過 Distributed Map State 進行平行的執行。

服務整合

呼叫整合的服務有三種方式。

  1. Request Response

等待 http response 的回傳不會等到 job 完成才進行到下一個 state

2. Wait for Job Complete (sync)

倘若要使用 sync 的話則在 Reosurce 的值結尾要加上 .sync

3. 等待 task token 被回傳 (Wait until token is returned)

會暫停 workflow 直到收到來自 SendTaskSuccess 或者 SendTaskFailure 的token。

Local Developement & Debugging

待續

Retry & Catch 處理

當 Step Function 執行失敗時,會觸發 Retry 機制而當 Retry 都重新試過,但是都失敗的話重會觸發 Catch 去捕捉錯誤訊息。

Retry 特點:

  1. 只有 Task, Map, Parallel 才有此設定
  2. IntervalSeconds: 針對第一次錯誤發生時,要等待多久才進行 retry
  3. BackoffRate: 每次失敗發生後,要間隔 n 倍才會進行下次的 retry, 例如倘若 BackoffRate 設定為 2, 當第一次 retry 發生時倘若是 3 秒, 則第二次 retry 則需要 6 秒後才會觸發
  4. MaxAttempts: 最多嘗試的次數

Catch 特點:

  1. 只有 Task, Parallel, Map 才有此設定
  2. States.All 捕捉所有的錯誤訊息
  3. States.TaskFailed 捕捉除了 States.Timeout 外的所有錯誤訊息

Practice:

  1. 簡單的 Lambda 實作

首先建立一個 Function Name 為 HelloFunction 的 Lambda

1–1 點選 Create Function

1–2 Function 輸入以下內容

export const handler = async (event, context) => {
// 到時 step function 內容會在 event 裡面
console.log('event => ', event);

const response = {
statusCode: 200,
body: JSON.stringify('Hello ' + event.who),
};
return response;
};

1–3 設定好之後點選 deploy

1–4 點選測試撰寫好的 lambda function 後點選 Invoke

Event Json 輸入

{
"who": "world"
}

1–5 開始新增 Step Function State Machine

1–6 拖拉 Lambda Invoke 至右邊的圖形中

1–7 點選 Execute

執行完後檢查 ASL Definition 可以看到 OutputPath 為 $.Payload, 意思是將 Lambda 執行完的結果放置 Payload 位置。

從 Payload 將資料取出來

所以看到輸出的結果為

2. Lambda Exception Catch

2–1設定要 catch 的 error type

2–2 當錯誤訊息發生的時候要轉到哪個 state

2–3 設定 catch 輸出的錯誤訊息, 這邊直接使用 Lambda Result 取代先前 input 的值

2–3 因為錯誤訊息是 escaped json string, 因此要透過 step funciton 內建的函示將 string 轉換成 json

此行設定到下個 state 的 input parameters

{
"errorMessage.$": "States.StringToJson($.Cause)"
}

3. Choice State

3–1 拖曳一個 Choice Flow

3–2 設定 choice 的條件, 因為前面的 lambda 回傳結構為

{
"statusCode": "xx",
"body": "xx"
}

所以這邊的條件是才會為 $.statusCode

3–3 設定假如成功的話觸發 Lambda

4.Lambda + SNS 發送簡訊

4–1 建立 Standard Topic

4–2 建置 subscription, 並且 endpoint 輸入目標的手機號碼

4–3 拖曳一個 SNS Publish

4–4 發布訊息

4–5 倘若要想 deubg 的話建議開啟 delivery status log

--

--

Gary Ng
Gary Ng

Written by Gary Ng

軟體工程師、後端工程師

No responses yet