Dagster 排隊一大堆 job 時,怎麼讓緊急任務插隊先跑?
情境
某天下午我手賤點了個歷史 backfill,一口氣塞了 20 個 partition 進 Dagster 的 queue。我的 QueuedRunCoordinator 設了 max_concurrent_runs: 10,所以最多同時跑 10 個,剩下 10 個排隊。
然後業務端突然丟訊息進來:
欸,剛剛那份月底結算的報表能不能現在重跑一次?老闆等著看。
我看了一眼 Dagster UI,20 個 backfill run 正在排隊處理中。我以為只能:
- 選項 A:把 backfill 全部 terminate,跑緊急那個,再重新 launch backfill(手動操作 20 次,還要記哪些跑過了)
- 選項 B:等 backfill 跑完(每個 5 分鐘 × 10 batch = 50 分鐘以上)
結果都不對。Dagster 內建就有「插隊」機制 —— dagster/priority tag。
QueuedRunCoordinator 是怎麼決定誰先跑的?
預設 QueuedRunCoordinator 從 queue 取 run 的順序:
- 先看
dagster/prioritytag(整數,越大越優先) - 同 priority 看提交時間(早提交的先跑)
- 檢查
tag_concurrency_limits(不違反並行上限才放行)
預設所有 run 的 priority 都是 0,所以變成「先進先出」。把新 run 的 priority 設成 100,它就會排到 queue 最前面。
三種設 priority 的方式
方式 1:從 UI launchpad 加 tag
最快、最即興。Launch 那個緊急 job 時,在 Tags 那一欄加:
dagster/priority: 100
Code language: HTTP (http)按 Launch 之後它就會跳到 queue 最前面(但不會搶斷已經在跑的 10 個)。
方式 2:CLI 用 --tags 帶進去
如果你習慣 CLI / 自動化腳本:
uv run dg launch \
--assets 'reports/monthly_settlement' \
--tags '{"dagster/priority":"100"}'
Code language: JavaScript (javascript)注意 value 是字串(JSON),不是 int —— coordinator 內部會 int(value) 轉一次。
方式 3:在 code 裡寫死(整個 job / asset 永遠優先)
如果這類 job 本來就應該永遠優先(例如即時告警、面向使用者的互動式報表),直接寫進 definition:
import dagster as dg
# job 級別
urgent_alert_job = dg.define_asset_job(
name="urgent_alert_job",
selection=dg.AssetSelection.assets(alert_signal),
tags={"dagster/priority": "100"}, # ← 這
)
# asset 級別(會 propagate 到 run)
@dg.asset(
tags={"dagster/priority": "100"},
)
def alert_signal() -> pd.DataFrame:
...
Code language: PHP (php)這樣不管是 schedule 觸發、sensor 觸發、還是手動 launch,這個 job 進 queue 一律 priority=100。
兩個重要的「不會」
不會搶斷正在跑的 run
dagster/priority 只決定 QUEUED → STARTED 的順序。已經在跑的那 10 個 backfill run 不會因為你塞一個 priority=100 進去就被砍掉。
如果你真的等不及,得自己手動去 Runs page terminate 一個 running 的,空出 slot,priority 高的這個才會立刻進去。
不會放寬 tag_concurrency_limits
很多人(包括我)一開始會混淆這兩件事。看這個 dagster.yaml:
run_coordinator:
module: dagster._core.run_coordinator
class: QueuedRunCoordinator
config:
max_concurrent_runs: 10
tag_concurrency_limits:
- key: db
value: warehouse
limit: 1 # 同時只允許 1 個帶 db=warehouse 的 run
假設 backfill 都帶 db=warehouse,緊急 job 也帶 db=warehouse,那即使緊急 job priority=100:
- 它會被 dequeue 嘗試啟動
- 但 coordinator 一看現在已經有 1 個
db=warehouse在跑 → 塞回去 - 等那 1 個跑完,下次掃 queue 時它仍然會是「最優先」被挑出來的
所以 priority 只改順序,不改並行上限。如果你的瓶頸是 tag concurrency=1,priority 救不了你 —— 它只能保證「下一個出來的會是你」。
設多少數值才合理?
- 預設
0 - 想優先:給個
10、100都行,Dagster 內部就是當 int 排序 - 想壓後(例如批次 backfill 讓道給 daily schedule):給
-10也可以 - 不要用
inf、MAX_INT那種特殊值,沒額外效果還容易讓未來的你混淆
我自己的慣例:
| 場景 | priority |
|---|---|
| 面向使用者的互動式 job(緊急報表、ad-hoc 查詢) | 100 |
| 正常 daily schedule | 10 |
| 一般 ingest | 0(不設) |
| 歷史 backfill / 慢的重算任務 | -10 |
這樣 daily schedule 永遠插得到 backfill 前面,緊急 job 又能插得到 schedule 前面。
小結
- Dagster queue 內建「插隊」機制:
dagster/prioritytag - 三種設法:UI launchpad tag、CLI
--tags、code 裡tags={...}寫死 - 只改 queue 出隊順序,不會搶斷 running run、不會放寬 tagconcurrencylimits
- 想立刻跑:terminate 一個 running 的空出 slot,priority 高的會自動進去
下次再遇到「業務端臨時插單」,不用慌 —— 加個 tag 推進 queue 就好。