Engineering12 min readMarian Engineering

Postgresだけで作るジョブキュー — リース、冪等キー、指数バックオフの実装

埋め込み・転写・コネクタ同期を支える非同期基盤をmarian_jobsテーブル1枚で

Key Takeaways

  • Postgresテーブル1枚+インデックス4本で冪等投入・優先度クレーム・クラッシュ回復・バックオフ・デッドレターを実装できる
  • 冪等性は部分ユニークインデックス(where status <> dead)でDB層に沈め、dead後の再投入も両立する
  • クラッシュ回復はリース方式(30秒)で、ロックもハートビートも不要
  • バックオフはrun_afterを未来に置くだけ(1s→2倍→5分cap)。スケジューリングの語彙が1つに統一される
  • 状態遷移は時刻を引数に取る純粋関数で、DBなしでテストできる

課題: キューは欲しい、インフラは増やしたくない

AIアプリの裏側は非同期ジョブだらけです。埋め込み生成(embed)、音声転写(transcribe)、外部コネクタ同期(connector-sync)、再インデックス(reindex)。これらをリクエスト内で実行すればタイムアウトし、fire-and-forgetにすれば失敗が闇に消えます。

かといってRedis+BullMQやSQSを足すと、運用するシステムとデータの整合性境界が増えます。Marianの選択はPostgres(Supabase)のテーブル1枚をキューにすることでした。ジョブのペイロードが参照するデータと同じDBにあるため、トランザクション整合性が自然に手に入ります。

スキーマ: 列の1本1本が設計判断

create table public.marian_jobs (
  id              uuid primary key default gen_random_uuid(),
  user_id         uuid references auth.users (id) on delete cascade,
  type            text not null,
  status          text not null default 'queued'
                    check (status in ('queued','running','retrying','succeeded','dead')),
  payload         jsonb not null default '{}'::jsonb,
  idempotency_key text,
  attempts        integer not null default 0,
  max_attempts    integer not null default 3,
  priority        integer not null default 0,
  run_after       timestamptz not null default now(),
  lease_until     timestamptz,
  error           text,
  created_at      timestamptz not null default now(),
  updated_at      timestamptz not null default now()
);

状態は5つ。queued → running → succeededが正常系、失敗時はretrying(run_afterを未来に置いて再投入)を経由し、attempts >= max_attemptsdead(デッドレター)に落ちます。

冪等性: 部分ユニークインデックスに任せる

「同じジョブを二度積まない」はアプリのチェックではなく、DBの部分ユニークインデックスで保証します。

create unique index marian_jobs_idem_active_idx
  on public.marian_jobs (idempotency_key)
  where status <> 'dead';

ポイントはwhere status <> deadの部分性です。生きているジョブ(待機・実行・成功)の間は同じキーの再投入が既存行にdedupされ、デッドレター化した後は同じキーで再投入できる。「失敗しきったジョブのやり直し」と「二重投入の防止」が1本のインデックスで両立します。enqueue側は同キーの既存ジョブが見つかればそれを返すだけです。

リース: ロックを持たないクラッシュ回復

ワーカーがジョブを掴んだままクラッシュしたとき、そのジョブを誰がいつ救出するか。Marianは行ロックを保持し続けるのではなく、可視性タイムアウト(リース)方式を取ります。

const DEFAULT_LEASE_MS = 30_000 // 30秒

// claim時: lease_until = now + leaseMs を刻んで running へ
// reclaimExpired: running かつ lease_until <= now の行を retrying に戻す

ワーカーはジョブを取るときにlease_untilを30秒先に設定します。正常なら完了時にsucceededへ。クラッシュしたらリースが切れ、次のtickのreclaimExpiredretryingへ戻します。ワーカー間の調整も、ハートビートも、分散ロックも不要です。SQSのvisibility timeoutと同じモデルをテーブルで再現しています。

クレームの競合は(status, run_after, priority desc, created_at)のインデックスに沿った取得とFOR UPDATE SKIP LOCKED相当の挙動で捌きます。run_after <= nowが「実行可能」の定義なので、バックオフは単にrun_afterを未来に置くことです。

バックオフ: 1秒から5分まで

export function backoffMs(attempt: number, options: BackoffOptions = {}): number {
  const base = options.baseMs ?? 1000
  const factor = options.factor ?? 2
  const max = options.maxMs ?? 5 * 60_000
  return Math.min(base * Math.pow(factor, Math.max(0, attempt - 1)), max)
}
// スケジュール: 1s, 2s, 4s, 8s, 16s, ... 上限5分

対象ジョブ(埋め込みAPI、外部コネクタ)の失敗は大半がレート制限か一時障害なので、序盤は密に・後半は粗く再試行し、5分でcapします。max_attempts(デフォルト3)を使い切るとdeadへ移り、error列に最後の失敗理由が残ります。

状態遷移は純粋関数、永続化はアダプタ

実装の分層も特徴的です。状態遷移(claim、complete、fail、reclaim)はlib/marian-data/job-queue.ts純粋関数として書かれ、現在時刻も常に引数で渡されます。Supabaseへの永続化はstore-supabase.tsのアダプタが担い、ワーカーは「1 tick = 実行可能ジョブを1件claimして処理」のループです。

  • 純粋層: 遷移ロジックをクロックなし・DBなしでユニットテスト可能
  • アダプタ層: Supabase固有のクエリとservice-roleキーの扱いを隔離
  • ワーカー層: tick単位の実行。テストではtickを手動で回す
インデックス担当パス
idem_active_idx(idempotency_key) where status <> dead二重投入防止
runnable_idx(status, run_after, priority desc, created_at)claim(ホットパス)
lease_idx(status, lease_until)リース切れ回収
user_status_idx(user_id, status)ユーザー別の一覧表示

RLS(Row Level Security)は「ユーザーは自分のジョブだけ見える」を強制し、ワーカーはservice-roleキーでRLSをバイパスします。ジョブ一覧UIはanonキーで安全に同じテーブルを読めます。

このアプローチの限界

Postgresキューはスループットの上限が比較的早く来ます(ポーリング間隔とクレーム競合)。毎秒数千ジョブを捌く規模になったら専用キューに移すべきです。ただしMarianのジョブは「ユーザー操作起点の数十〜数百件バースト」が中心で、整合性とインフラの少なさがスループットより価値が高い領域です。純粋層とアダプタ層が分離されているため、将来ストアを差し替えても状態機械は再利用できます。

まとめ

テーブル1枚+インデックス4本で、冪等な投入・優先度付きクレーム・クラッシュ回復・指数バックオフ・デッドレターを備えたキューになります。鍵は(1)部分ユニークインデックスによる「生きている間だけ」の冪等性、(2)リース方式のクラッシュ回復、(3)run_afterに統一されたスケジューリング、(4)純粋関数とアダプタの分離、です。

FAQ

RedisやSQSではなくPostgresでジョブキューを作る理由は?
ジョブが参照するデータと同じDBにキューがあるとトランザクション整合性が自然に手に入り、運用するインフラも増えないためです。Marianのワークロードはユーザー操作起点のバースト(数十〜数百件)が中心で、Postgresキューのスループット上限が問題になりません。状態機械は純粋関数として分離してあるため、規模が変わればストアだけ差し替えられます。
ワーカーがクラッシュしたジョブはどう回復されるのか?
リース(可視性タイムアウト)方式です。claim時にlease_untilを30秒先に設定し、完了しないままリースが切れた running ジョブは reclaimExpired が retrying に戻します。分散ロックやハートビートは不要で、(status, lease_until)のインデックスを見るだけで回収できます。
同じジョブの二重投入はどう防いでいるのか?
idempotency_keyの部分ユニークインデックス(where status <> dead)で防ぎます。アクティブな同キージョブがある間の再投入は既存ジョブにdedupされ、デッドレター化後は同じキーで再投入できます。アプリのチェックではなくDB制約なので、並行実行でも破れません。

Related Articles