Dagster 排隊一大堆 job 時,怎麼讓緊急任務插隊先跑?

已經塞了 20 個 backfill 進 queue,臨時想插一個緊急 job 先跑,是不是只能乾等?其實用 `dagster/priority` tag 就能插隊。

Dagster 排隊一大堆 job 時,怎麼讓緊急任務插隊先跑?

情境

某天下午我手賤點了個歷史 backfill,一口氣塞了 20 個 partition 進 Dagster 的 queue。我的 QueuedRunCoordinator 設了 max_concurrent_runs: 10,所以最多同時跑 10 個,剩下 10 個排隊。

然後業務端突然丟訊息進來:

欸,剛剛那份月底結算的報表能不能現在重跑一次?老闆等著看。

我看了一眼 Dagster UI,20 個 backfill run 正在排隊處理中。我以為只能:

  • 選項 A:把 backfill 全部 terminate,跑緊急那個,再重新 launch backfill(手動操作 20 次,還要記哪些跑過了)
  • 選項 B:等 backfill 跑完(每個 5 分鐘 × 10 batch = 50 分鐘以上)

結果都不對。Dagster 內建就有「插隊」機制 —— dagster/priority tag。


QueuedRunCoordinator 是怎麼決定誰先跑的?

預設 QueuedRunCoordinator 從 queue 取 run 的順序:

  1. 先看 dagster/priority tag(整數,越大越優先)
  2. 同 priority 看提交時間(早提交的先跑)
  3. 檢查 tag_concurrency_limits(不違反並行上限才放行)

預設所有 run 的 priority 都是 0,所以變成「先進先出」。把新 run 的 priority 設成 100,它就會排到 queue 最前面。


三種設 priority 的方式

方式 1:從 UI launchpad 加 tag

最快、最即興。Launch 那個緊急 job 時,在 Tags 那一欄加:

dagster/priority: 100
Code language: HTTP (http)

按 Launch 之後它就會跳到 queue 最前面(但不會搶斷已經在跑的 10 個)。

方式 2:CLI 用 --tags 帶進去

如果你習慣 CLI / 自動化腳本:

uv run dg launch \
  --assets 'reports/monthly_settlement' \
  --tags '{"dagster/priority":"100"}'
Code language: JavaScript (javascript)

注意 value 是字串(JSON),不是 int —— coordinator 內部會 int(value) 轉一次。

方式 3:在 code 裡寫死(整個 job / asset 永遠優先)

如果這類 job 本來就應該永遠優先(例如即時告警、面向使用者的互動式報表),直接寫進 definition:

import dagster as dg

# job 級別
urgent_alert_job = dg.define_asset_job(
    name="urgent_alert_job",
    selection=dg.AssetSelection.assets(alert_signal),
    tags={"dagster/priority": "100"},  # ← 這
)

# asset 級別(會 propagate 到 run)
@dg.asset(
    tags={"dagster/priority": "100"},
)
def alert_signal() -> pd.DataFrame:
    ...
Code language: PHP (php)

這樣不管是 schedule 觸發、sensor 觸發、還是手動 launch,這個 job 進 queue 一律 priority=100。


兩個重要的「不會」

不會搶斷正在跑的 run

dagster/priority 只決定 QUEUED → STARTED 的順序。已經在跑的那 10 個 backfill run 不會因為你塞一個 priority=100 進去就被砍掉。

如果你真的等不及,得自己手動去 Runs page terminate 一個 running 的,空出 slot,priority 高的這個才會立刻進去。

不會放寬 tag_concurrency_limits

很多人(包括我)一開始會混淆這兩件事。看這個 dagster.yaml

run_coordinator:
  module: dagster._core.run_coordinator
  class: QueuedRunCoordinator
  config:
    max_concurrent_runs: 10
    tag_concurrency_limits:
      - key: db
        value: warehouse
        limit: 1     # 同時只允許 1 個帶 db=warehouse 的 run

假設 backfill 都帶 db=warehouse,緊急 job 也帶 db=warehouse,那即使緊急 job priority=100:

  • 它會被 dequeue 嘗試啟動
  • 但 coordinator 一看現在已經有 1 個 db=warehouse 在跑 → 塞回去
  • 等那 1 個跑完,下次掃 queue 時它仍然會是「最優先」被挑出來的

所以 priority 只改順序,不改並行上限。如果你的瓶頸是 tag concurrency=1,priority 救不了你 —— 它只能保證「下一個出來的會是你」。


設多少數值才合理?

  • 預設 0
  • 想優先:給個 10100 都行,Dagster 內部就是當 int 排序
  • 想壓後(例如批次 backfill 讓道給 daily schedule):給 -10 也可以
  • 不要用 infMAX_INT 那種特殊值,沒額外效果還容易讓未來的你混淆

我自己的慣例:

場景 priority
面向使用者的互動式 job(緊急報表、ad-hoc 查詢) 100
正常 daily schedule 10
一般 ingest 0(不設)
歷史 backfill / 慢的重算任務 -10

這樣 daily schedule 永遠插得到 backfill 前面,緊急 job 又能插得到 schedule 前面。


小結

  • Dagster queue 內建「插隊」機制:dagster/priority tag
  • 三種設法:UI launchpad tag、CLI --tags、code 裡 tags={...} 寫死
  • 只改 queue 出隊順序,不會搶斷 running run、不會放寬 tagconcurrencylimits
  • 想立刻跑:terminate 一個 running 的空出 slot,priority 高的會自動進去

下次再遇到「業務端臨時插單」,不用慌 —— 加個 tag 推進 queue 就好。

發佈留言

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *