"use strict"; const crypto = require("node:crypto"); const { pool } = require("./db"); const DEFAULT_MAX_CONCURRENCY = 8; const DEFAULT_SLOT_TTL_MS = 30_000; const POLL_SCOPE = "generation-provider-poll:global"; const OWNER_ID = `${process.pid}-${crypto.randomUUID()}`; let storeReady = null; function normalizePositiveInteger(value, fallback) { const numeric = Number(value); if (!Number.isFinite(numeric) || numeric <= 0) return fallback; return Math.max(1, Math.trunc(numeric)); } function getMaxConcurrency() { return normalizePositiveInteger(process.env.TASK_PROVIDER_POLL_MAX_CONCURRENCY, DEFAULT_MAX_CONCURRENCY); } function getSlotTtlInterval() { const ttlMs = normalizePositiveInteger(process.env.TASK_PROVIDER_POLL_SLOT_TTL_MS, DEFAULT_SLOT_TTL_MS); return `${Math.max(1, Math.ceil(ttlMs / 1000))} seconds`; } async function ensureProviderPollLimiterStore() { if (storeReady) return storeReady; storeReady = pool.query(` CREATE TABLE IF NOT EXISTS generation_provider_poll_slots ( scope TEXT NOT NULL, slot_no INTEGER NOT NULL, owner_id TEXT NOT NULL, task_id INTEGER, expires_at TIMESTAMPTZ NOT NULL, acquired_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), PRIMARY KEY (scope, slot_no) ); CREATE INDEX IF NOT EXISTS idx_generation_provider_poll_slots_expires ON generation_provider_poll_slots(expires_at); `).catch((err) => { storeReady = null; throw err; }); return storeReady; } async function acquireProviderPollSlot(taskId, options = {}) { await ensureProviderPollLimiterStore(); const scope = options.scope || POLL_SCOPE; const maxConcurrency = normalizePositiveInteger(options.maxConcurrency, getMaxConcurrency()); const ttlInterval = options.ttlInterval || getSlotTtlInterval(); const { rows } = await pool.query( ` WITH candidate AS ( SELECT s.slot_no FROM generate_series(1, $2::integer) AS s(slot_no) LEFT JOIN generation_provider_poll_slots l ON l.scope = $1 AND l.slot_no = s.slot_no WHERE l.scope IS NULL OR l.expires_at < NOW() ORDER BY s.slot_no ASC LIMIT 1 ), claimed AS ( INSERT INTO generation_provider_poll_slots ( scope, slot_no, owner_id, task_id, expires_at, acquired_at, updated_at ) SELECT $1, slot_no, $3, $4, NOW() + ($5::text)::interval, NOW(), NOW() FROM candidate ON CONFLICT (scope, slot_no) DO UPDATE SET owner_id = EXCLUDED.owner_id, task_id = EXCLUDED.task_id, expires_at = EXCLUDED.expires_at, acquired_at = NOW(), updated_at = NOW() WHERE generation_provider_poll_slots.expires_at < NOW() RETURNING scope, slot_no ) SELECT scope, slot_no FROM claimed `, [scope, maxConcurrency, OWNER_ID, taskId || null, ttlInterval], ); const slot = rows[0]; return slot ? { scope: slot.scope, slotNo: slot.slot_no, ownerId: OWNER_ID } : null; } async function releaseProviderPollSlot(slot) { if (!slot?.scope || !slot?.slotNo) return; await ensureProviderPollLimiterStore(); await pool.query( "DELETE FROM generation_provider_poll_slots WHERE scope = $1 AND slot_no = $2 AND owner_id = $3", [slot.scope, slot.slotNo, slot.ownerId || OWNER_ID], ); } async function withProviderPollSlot(taskId, fn, options = {}) { const slot = await acquireProviderPollSlot(taskId, options); if (!slot) return { acquired: false, value: undefined }; try { return { acquired: true, value: await fn() }; } finally { await releaseProviderPollSlot(slot).catch((err) => { console.error(`[providerPollLimiter] failed to release poll slot ${slot.scope}:${slot.slotNo}:`, err.message); }); } } module.exports = { acquireProviderPollSlot, ensureProviderPollLimiterStore, getMaxConcurrency, normalizePositiveInteger, releaseProviderPollSlot, withProviderPollSlot, };