Postgresだけで作るジョブキュー — リース、冪等キー、指数バックオフの実装
埋め込み・転写・コネクタ同期を支える非同期基盤をmarian_jobsテーブル1枚で
Key Takeaways
- Postgresテーブル1枚+インデックス4本で冪等投入・優先度クレーム・クラッシュ回復・バックオフ・デッドレターを実装できる
- 冪等性は部分ユニークインデックス(where status <> dead)でDB層に沈め、dead後の再投入も両立する
- クラッシュ回復はリース方式(30秒)で、ロックもハートビートも不要
- バックオフはrun_afterを未来に置くだけ(1s→2倍→5分cap)。スケジューリングの語彙が1つに統一される
- 状態遷移は時刻を引数に取る純粋関数で、DBなしでテストできる
課題: キューは欲しい、インフラは増やしたくない
AIアプリの裏側は非同期ジョブだらけです。埋め込み生成(embed)、音声転写(transcribe)、外部コネクタ同期(connector-sync)、再インデックス(reindex)。これらをリクエスト内で実行すればタイムアウトし、fire-and-forgetにすれば失敗が闇に消えます。
かといってRedis+BullMQやSQSを足すと、運用するシステムとデータの整合性境界が増えます。Marianの選択はPostgres(Supabase)のテーブル1枚をキューにすることでした。ジョブのペイロードが参照するデータと同じDBにあるため、トランザクション整合性が自然に手に入ります。
スキーマ: 列の1本1本が設計判断
create table public.marian_jobs (
id uuid primary key default gen_random_uuid(),
user_id uuid references auth.users (id) on delete cascade,
type text not null,
status text not null default 'queued'
check (status in ('queued','running','retrying','succeeded','dead')),
payload jsonb not null default '{}'::jsonb,
idempotency_key text,
attempts integer not null default 0,
max_attempts integer not null default 3,
priority integer not null default 0,
run_after timestamptz not null default now(),
lease_until timestamptz,
error text,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now()
);状態は5つ。queued → running → succeededが正常系、失敗時はretrying(run_afterを未来に置いて再投入)を経由し、attempts >= max_attemptsでdead(デッドレター)に落ちます。
冪等性: 部分ユニークインデックスに任せる
「同じジョブを二度積まない」はアプリのチェックではなく、DBの部分ユニークインデックスで保証します。
create unique index marian_jobs_idem_active_idx
on public.marian_jobs (idempotency_key)
where status <> 'dead';ポイントはwhere status <> deadの部分性です。生きているジョブ(待機・実行・成功)の間は同じキーの再投入が既存行にdedupされ、デッドレター化した後は同じキーで再投入できる。「失敗しきったジョブのやり直し」と「二重投入の防止」が1本のインデックスで両立します。enqueue側は同キーの既存ジョブが見つかればそれを返すだけです。
リース: ロックを持たないクラッシュ回復
ワーカーがジョブを掴んだままクラッシュしたとき、そのジョブを誰がいつ救出するか。Marianは行ロックを保持し続けるのではなく、可視性タイムアウト(リース)方式を取ります。
const DEFAULT_LEASE_MS = 30_000 // 30秒
// claim時: lease_until = now + leaseMs を刻んで running へ
// reclaimExpired: running かつ lease_until <= now の行を retrying に戻すワーカーはジョブを取るときにlease_untilを30秒先に設定します。正常なら完了時にsucceededへ。クラッシュしたらリースが切れ、次のtickのreclaimExpiredがretryingへ戻します。ワーカー間の調整も、ハートビートも、分散ロックも不要です。SQSのvisibility timeoutと同じモデルをテーブルで再現しています。
クレームの競合は(status, run_after, priority desc, created_at)のインデックスに沿った取得とFOR UPDATE SKIP LOCKED相当の挙動で捌きます。run_after <= nowが「実行可能」の定義なので、バックオフは単にrun_afterを未来に置くことです。
バックオフ: 1秒から5分まで
export function backoffMs(attempt: number, options: BackoffOptions = {}): number {
const base = options.baseMs ?? 1000
const factor = options.factor ?? 2
const max = options.maxMs ?? 5 * 60_000
return Math.min(base * Math.pow(factor, Math.max(0, attempt - 1)), max)
}
// スケジュール: 1s, 2s, 4s, 8s, 16s, ... 上限5分対象ジョブ(埋め込みAPI、外部コネクタ)の失敗は大半がレート制限か一時障害なので、序盤は密に・後半は粗く再試行し、5分でcapします。max_attempts(デフォルト3)を使い切るとdeadへ移り、error列に最後の失敗理由が残ります。
状態遷移は純粋関数、永続化はアダプタ
実装の分層も特徴的です。状態遷移(claim、complete、fail、reclaim)はlib/marian-data/job-queue.tsの純粋関数として書かれ、現在時刻も常に引数で渡されます。Supabaseへの永続化はstore-supabase.tsのアダプタが担い、ワーカーは「1 tick = 実行可能ジョブを1件claimして処理」のループです。
- 純粋層: 遷移ロジックをクロックなし・DBなしでユニットテスト可能
- アダプタ層: Supabase固有のクエリとservice-roleキーの扱いを隔離
- ワーカー層: tick単位の実行。テストではtickを手動で回す
| インデックス | 列 | 担当パス |
|---|---|---|
| idem_active_idx | (idempotency_key) where status <> dead | 二重投入防止 |
| runnable_idx | (status, run_after, priority desc, created_at) | claim(ホットパス) |
| lease_idx | (status, lease_until) | リース切れ回収 |
| user_status_idx | (user_id, status) | ユーザー別の一覧表示 |
RLS(Row Level Security)は「ユーザーは自分のジョブだけ見える」を強制し、ワーカーはservice-roleキーでRLSをバイパスします。ジョブ一覧UIはanonキーで安全に同じテーブルを読めます。
このアプローチの限界
Postgresキューはスループットの上限が比較的早く来ます(ポーリング間隔とクレーム競合)。毎秒数千ジョブを捌く規模になったら専用キューに移すべきです。ただしMarianのジョブは「ユーザー操作起点の数十〜数百件バースト」が中心で、整合性とインフラの少なさがスループットより価値が高い領域です。純粋層とアダプタ層が分離されているため、将来ストアを差し替えても状態機械は再利用できます。
まとめ
テーブル1枚+インデックス4本で、冪等な投入・優先度付きクレーム・クラッシュ回復・指数バックオフ・デッドレターを備えたキューになります。鍵は(1)部分ユニークインデックスによる「生きている間だけ」の冪等性、(2)リース方式のクラッシュ回復、(3)run_afterに統一されたスケジューリング、(4)純粋関数とアダプタの分離、です。
FAQ
- RedisやSQSではなくPostgresでジョブキューを作る理由は?
- ジョブが参照するデータと同じDBにキューがあるとトランザクション整合性が自然に手に入り、運用するインフラも増えないためです。Marianのワークロードはユーザー操作起点のバースト(数十〜数百件)が中心で、Postgresキューのスループット上限が問題になりません。状態機械は純粋関数として分離してあるため、規模が変わればストアだけ差し替えられます。
- ワーカーがクラッシュしたジョブはどう回復されるのか?
- リース(可視性タイムアウト)方式です。claim時にlease_untilを30秒先に設定し、完了しないままリースが切れた running ジョブは reclaimExpired が retrying に戻します。分散ロックやハートビートは不要で、(status, lease_until)のインデックスを見るだけで回収できます。
- 同じジョブの二重投入はどう防いでいるのか?
- idempotency_keyの部分ユニークインデックス(where status <> dead)で防ぎます。アクティブな同キージョブがある間の再投入は既存ジョブにdedupされ、デッドレター化後は同じキーで再投入できます。アプリのチェックではなくDB制約なので、並行実行でも破れません。