Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 2249f7ce36 |
+262
-337
@@ -5,6 +5,8 @@ const { EventEmitter } = require("node:events");
|
|||||||
const { pool } = require("./db");
|
const { pool } = require("./db");
|
||||||
const { refundTaskBillingOnFailure } = require("./billing");
|
const { refundTaskBillingOnFailure } = require("./billing");
|
||||||
const { putObject, isOssConfigured } = require("./ossClient");
|
const { putObject, isOssConfigured } = require("./ossClient");
|
||||||
|
const keyManager = require("./keyManager");
|
||||||
|
const { resolveImageProviderCandidates, resolveVideoProvider } = require("./aiProviderRouter");
|
||||||
|
|
||||||
const taskEvents = new EventEmitter();
|
const taskEvents = new EventEmitter();
|
||||||
taskEvents.setMaxListeners(200);
|
taskEvents.setMaxListeners(200);
|
||||||
@@ -13,15 +15,10 @@ const activePollers = new Map();
|
|||||||
const POLL_INTERVAL_MS = 3000;
|
const POLL_INTERVAL_MS = 3000;
|
||||||
const MAX_POLL_ATTEMPTS = 120;
|
const MAX_POLL_ATTEMPTS = 120;
|
||||||
const GRS_IMAGE_MAX_POLL_ATTEMPTS = Number(process.env.GRSAI_IMAGE_MAX_POLL_ATTEMPTS || 60);
|
const GRS_IMAGE_MAX_POLL_ATTEMPTS = Number(process.env.GRSAI_IMAGE_MAX_POLL_ATTEMPTS || 60);
|
||||||
const TASK_EVENT_CHANNEL = "generation_task_events";
|
const STALE_TASK_TIMEOUT_MINUTES = Math.max(10, Number(process.env.STALE_GENERATION_TASK_MINUTES || 120));
|
||||||
const TASK_EVENT_ORIGIN = `${process.pid}-${crypto.randomUUID()}`;
|
const RESULT_PERSIST_RETRY_LIMIT = Math.max(1, Number(process.env.RESULT_PERSIST_RETRY_LIMIT || 5));
|
||||||
const POLLER_OWNER_ID = `${process.pid}-${crypto.randomUUID()}`;
|
const RESULT_PERSIST_RETRY_BATCH_SIZE = Math.max(1, Number(process.env.RESULT_PERSIST_RETRY_BATCH_SIZE || 25));
|
||||||
const POLLER_OWNER_STALE_MS = Number(process.env.TASK_POLLER_OWNER_STALE_MS || 20_000);
|
const TASK_STARTUP_RECOVERY_LIMIT = Math.max(1, Number(process.env.TASK_STARTUP_RECOVERY_LIMIT || 50));
|
||||||
const POLLER_RECOVERY_INTERVAL_MS = Number(process.env.TASK_POLLER_RECOVERY_INTERVAL_MS || 30_000);
|
|
||||||
let taskEventListenerClient = null;
|
|
||||||
let taskEventListenerStarting = null;
|
|
||||||
let pollerStoreReady = null;
|
|
||||||
let pollerRecoveryTimer = null;
|
|
||||||
|
|
||||||
function normalizeTaskProgress(value) {
|
function normalizeTaskProgress(value) {
|
||||||
const numeric = Number(value);
|
const numeric = Number(value);
|
||||||
@@ -39,156 +36,6 @@ function formatTaskEvent(row) {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
function emitTaskEvent(event) {
|
|
||||||
if (!event?.taskId) return;
|
|
||||||
taskEvents.emit(`task:${event.taskId}`, event);
|
|
||||||
}
|
|
||||||
|
|
||||||
async function publishTaskEvent(event) {
|
|
||||||
if (!event?.taskId) return;
|
|
||||||
emitTaskEvent(event);
|
|
||||||
try {
|
|
||||||
await pool.query("SELECT pg_notify($1, $2)", [
|
|
||||||
TASK_EVENT_CHANNEL,
|
|
||||||
JSON.stringify({ origin: TASK_EVENT_ORIGIN, event }),
|
|
||||||
]);
|
|
||||||
} catch (err) {
|
|
||||||
console.error(`[aiTaskWorker] task event publish failed for task ${event.taskId}:`, err.message);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function serializeProviderConfig(providerConfig) {
|
|
||||||
if (!providerConfig || typeof providerConfig !== "object") return {};
|
|
||||||
const allowedKeys = [
|
|
||||||
"provider",
|
|
||||||
"transport",
|
|
||||||
"protocol",
|
|
||||||
"baseUrl",
|
|
||||||
"endpoint",
|
|
||||||
"resultEndpoint",
|
|
||||||
"model",
|
|
||||||
"requestedModel",
|
|
||||||
];
|
|
||||||
const result = {};
|
|
||||||
for (const key of allowedKeys) {
|
|
||||||
if (providerConfig[key] !== undefined) result[key] = providerConfig[key];
|
|
||||||
}
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
function parseProviderConfig(value) {
|
|
||||||
if (!value) return {};
|
|
||||||
if (typeof value === "object") return value;
|
|
||||||
try {
|
|
||||||
const parsed = JSON.parse(value);
|
|
||||||
return parsed && typeof parsed === "object" ? parsed : {};
|
|
||||||
} catch {
|
|
||||||
return {};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async function ensureTaskPollerStore() {
|
|
||||||
if (pollerStoreReady) return pollerStoreReady;
|
|
||||||
pollerStoreReady = pool.query(`
|
|
||||||
CREATE TABLE IF NOT EXISTS generation_task_pollers (
|
|
||||||
task_id INTEGER PRIMARY KEY REFERENCES generation_tasks(id) ON DELETE CASCADE,
|
|
||||||
provider_task_id TEXT NOT NULL,
|
|
||||||
task_type TEXT NOT NULL,
|
|
||||||
provider_config_json TEXT NOT NULL,
|
|
||||||
lease_token TEXT,
|
|
||||||
owner_id TEXT,
|
|
||||||
owner_heartbeat_at TIMESTAMPTZ,
|
|
||||||
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
|
||||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
|
||||||
);
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_generation_task_pollers_owner
|
|
||||||
ON generation_task_pollers(owner_heartbeat_at);
|
|
||||||
`).catch((err) => {
|
|
||||||
pollerStoreReady = null;
|
|
||||||
throw err;
|
|
||||||
});
|
|
||||||
return pollerStoreReady;
|
|
||||||
}
|
|
||||||
|
|
||||||
async function persistPollerState(taskDbId, { providerTaskId, type, providerConfig, leaseToken }) {
|
|
||||||
await ensureTaskPollerStore();
|
|
||||||
await pool.query(
|
|
||||||
`
|
|
||||||
INSERT INTO generation_task_pollers (
|
|
||||||
task_id, provider_task_id, task_type, provider_config_json, lease_token,
|
|
||||||
owner_id, owner_heartbeat_at, updated_at
|
|
||||||
)
|
|
||||||
VALUES ($1, $2, $3, $4, $5, $6, NOW(), NOW())
|
|
||||||
ON CONFLICT (task_id) DO UPDATE SET
|
|
||||||
provider_task_id = EXCLUDED.provider_task_id,
|
|
||||||
task_type = EXCLUDED.task_type,
|
|
||||||
provider_config_json = EXCLUDED.provider_config_json,
|
|
||||||
lease_token = EXCLUDED.lease_token,
|
|
||||||
owner_id = EXCLUDED.owner_id,
|
|
||||||
owner_heartbeat_at = NOW(),
|
|
||||||
updated_at = NOW()
|
|
||||||
`,
|
|
||||||
[
|
|
||||||
taskDbId,
|
|
||||||
providerTaskId,
|
|
||||||
type,
|
|
||||||
JSON.stringify(serializeProviderConfig(providerConfig)),
|
|
||||||
leaseToken || null,
|
|
||||||
POLLER_OWNER_ID,
|
|
||||||
],
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
async function refreshPollerHeartbeat(taskDbId) {
|
|
||||||
await ensureTaskPollerStore();
|
|
||||||
await pool.query(
|
|
||||||
"UPDATE generation_task_pollers SET owner_id = $1, owner_heartbeat_at = NOW(), updated_at = NOW() WHERE task_id = $2",
|
|
||||||
[POLLER_OWNER_ID, taskDbId],
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
async function clearPollerState(taskDbId) {
|
|
||||||
await ensureTaskPollerStore();
|
|
||||||
await pool.query("DELETE FROM generation_task_pollers WHERE task_id = $1", [taskDbId]);
|
|
||||||
}
|
|
||||||
|
|
||||||
async function getLeaseKey(leaseToken) {
|
|
||||||
if (!leaseToken) return null;
|
|
||||||
const { rows } = await pool.query(
|
|
||||||
`
|
|
||||||
SELECT k.api_key
|
|
||||||
FROM key_leases l
|
|
||||||
JOIN api_keys k ON k.id = l.key_id
|
|
||||||
WHERE l.lease_token = $1
|
|
||||||
AND l.released_at IS NULL
|
|
||||||
AND k.enabled = 1
|
|
||||||
LIMIT 1
|
|
||||||
`,
|
|
||||||
[leaseToken],
|
|
||||||
);
|
|
||||||
const apiKey = rows[0]?.api_key;
|
|
||||||
return apiKey === "pool-slot" ? "" : apiKey || null;
|
|
||||||
}
|
|
||||||
|
|
||||||
async function claimPoller(taskId) {
|
|
||||||
await ensureTaskPollerStore();
|
|
||||||
const staleInterval = `${Math.max(5, Math.ceil(POLLER_OWNER_STALE_MS / 1000))} seconds`;
|
|
||||||
const { rows } = await pool.query(
|
|
||||||
`
|
|
||||||
UPDATE generation_task_pollers
|
|
||||||
SET owner_id = $1, owner_heartbeat_at = NOW(), updated_at = NOW()
|
|
||||||
WHERE task_id = $2
|
|
||||||
AND (
|
|
||||||
owner_heartbeat_at IS NULL
|
|
||||||
OR owner_heartbeat_at < NOW() - ($3::text)::interval
|
|
||||||
)
|
|
||||||
RETURNING *
|
|
||||||
`,
|
|
||||||
[POLLER_OWNER_ID, taskId, staleInterval],
|
|
||||||
);
|
|
||||||
return rows[0] || null;
|
|
||||||
}
|
|
||||||
|
|
||||||
async function createTaskLifecycleNotification(task) {
|
async function createTaskLifecycleNotification(task) {
|
||||||
if (!task || !task.user_id || !task.id) return;
|
if (!task || !task.user_id || !task.id) return;
|
||||||
|
|
||||||
@@ -251,14 +98,15 @@ async function updateTaskInDb(taskId, updates) {
|
|||||||
|
|
||||||
if (fields.length === 0) return;
|
if (fields.length === 0) return;
|
||||||
values.push(taskId);
|
values.push(taskId);
|
||||||
|
const protectCancelled = nextUpdates.status !== "cancelled" ? " AND status <> 'cancelled'" : "";
|
||||||
const { rows } = await pool.query(
|
const { rows } = await pool.query(
|
||||||
`UPDATE generation_tasks SET ${fields.join(", ")} WHERE id = $${idx} RETURNING *`,
|
`UPDATE generation_tasks SET ${fields.join(", ")} WHERE id = $${idx}${protectCancelled} RETURNING *`,
|
||||||
values,
|
values,
|
||||||
);
|
);
|
||||||
let updatedTask = rows[0];
|
let updatedTask = rows[0];
|
||||||
|
|
||||||
if (updatedTask) {
|
if (updatedTask) {
|
||||||
await publishTaskEvent(formatTaskEvent(updatedTask));
|
taskEvents.emit(`task:${taskId}`, formatTaskEvent(updatedTask));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (nextUpdates.status === "completed" && updatedTask?.result_url) {
|
if (nextUpdates.status === "completed" && updatedTask?.result_url) {
|
||||||
@@ -283,20 +131,66 @@ function persistTaskResultUrlToOssInBackground(task) {
|
|||||||
|
|
||||||
Promise.resolve()
|
Promise.resolve()
|
||||||
.then(async () => {
|
.then(async () => {
|
||||||
const durableUrl = await persistResultUrlToOss(task);
|
await persistTaskResultUrlToOss(task);
|
||||||
if (!durableUrl || durableUrl === task.result_url) return;
|
|
||||||
|
|
||||||
await pool.query(
|
|
||||||
"UPDATE generation_tasks SET result_url = $1, updated_at = NOW() WHERE id = $2 AND result_url = $3",
|
|
||||||
[durableUrl, task.id, task.result_url],
|
|
||||||
);
|
|
||||||
console.info(`[aiTaskWorker] task ${task.id} result persisted to OSS after completion`);
|
|
||||||
})
|
})
|
||||||
.catch((error) => {
|
.catch((error) => {
|
||||||
console.warn(`[aiTaskWorker] background result persistence failed for task ${task.id}:`, error.message);
|
console.warn(`[aiTaskWorker] background result persistence failed for task ${task.id}:`, error.message);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async function markResultPersistence(taskId, status, error = null, durableUrl = null, previousUrl = null) {
|
||||||
|
const fields = [
|
||||||
|
"result_persist_status = $1",
|
||||||
|
"result_persist_attempts = result_persist_attempts + 1",
|
||||||
|
"result_persist_error = $2",
|
||||||
|
"updated_at = NOW()",
|
||||||
|
];
|
||||||
|
const values = [status, error ? String(error).slice(0, 1000) : null];
|
||||||
|
let idx = values.length + 1;
|
||||||
|
|
||||||
|
if (status === "succeeded") {
|
||||||
|
fields.push("result_persisted_at = NOW()");
|
||||||
|
}
|
||||||
|
if (durableUrl) {
|
||||||
|
fields.push(`result_url = $${idx++}`);
|
||||||
|
values.push(durableUrl);
|
||||||
|
}
|
||||||
|
|
||||||
|
values.push(taskId);
|
||||||
|
let where = `id = $${idx}`;
|
||||||
|
if (previousUrl) {
|
||||||
|
idx += 1;
|
||||||
|
values.push(previousUrl);
|
||||||
|
where += ` AND result_url = $${idx}`;
|
||||||
|
}
|
||||||
|
|
||||||
|
await pool.query(`UPDATE generation_tasks SET ${fields.join(", ")} WHERE ${where}`, values);
|
||||||
|
}
|
||||||
|
|
||||||
|
async function persistTaskResultUrlToOss(task) {
|
||||||
|
if (!task?.id || !task?.result_url) return null;
|
||||||
|
|
||||||
|
if (isOwnPersistedResultUrl(task.result_url)) {
|
||||||
|
await markResultPersistence(task.id, "succeeded", null, null);
|
||||||
|
return task.result_url;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!isOssConfigured()) {
|
||||||
|
await markResultPersistence(task.id, "failed", "OSS is not configured");
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
const durableUrl = await persistResultUrlToOss(task);
|
||||||
|
if (!durableUrl) {
|
||||||
|
await markResultPersistence(task.id, "failed", "Result URL could not be copied to OSS");
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
await markResultPersistence(task.id, "succeeded", null, durableUrl, task.result_url);
|
||||||
|
console.info(`[aiTaskWorker] task ${task.id} result persisted to OSS after completion`);
|
||||||
|
return durableUrl;
|
||||||
|
}
|
||||||
|
|
||||||
function asObject(value) {
|
function asObject(value) {
|
||||||
return value && typeof value === "object" && !Array.isArray(value) ? value : undefined;
|
return value && typeof value === "object" && !Array.isArray(value) ? value : undefined;
|
||||||
}
|
}
|
||||||
@@ -795,22 +689,27 @@ function getMaxPollAttempts(type, providerConfig) {
|
|||||||
return MAX_POLL_ATTEMPTS;
|
return MAX_POLL_ATTEMPTS;
|
||||||
}
|
}
|
||||||
|
|
||||||
function startPolling(taskDbId, { providerTaskId, apiKey, type, providerConfig, leaseToken, keyManager, onTaskFailed, skipPersist = false }) {
|
async function releasePollingLease(poller) {
|
||||||
if (activePollers.has(taskDbId)) return;
|
if (!poller?.leaseToken || !poller?.keyManager) return;
|
||||||
if (!skipPersist) {
|
await poller.keyManager.releaseKey(poller.leaseToken).catch((err) => {
|
||||||
persistPollerState(taskDbId, { providerTaskId, type, providerConfig, leaseToken }).catch((err) => {
|
console.warn(`[aiTaskWorker] release lease failed for task ${poller.taskDbId}:`, err.message);
|
||||||
console.error(`[aiTaskWorker] failed to persist poller state for task ${taskDbId}:`, err.message);
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function startPolling(taskDbId, { providerTaskId, apiKey, type, providerConfig, leaseToken, keyManager, onTaskFailed }) {
|
||||||
|
if (activePollers.has(taskDbId)) return;
|
||||||
|
|
||||||
let attempts = 0;
|
let attempts = 0;
|
||||||
|
let polling = false;
|
||||||
const maxPollAttempts = getMaxPollAttempts(type, providerConfig);
|
const maxPollAttempts = getMaxPollAttempts(type, providerConfig);
|
||||||
const interval = setInterval(async () => {
|
const interval = setInterval(async () => {
|
||||||
|
if (polling) return;
|
||||||
|
polling = true;
|
||||||
attempts++;
|
attempts++;
|
||||||
|
|
||||||
|
try {
|
||||||
if (attempts > maxPollAttempts) {
|
if (attempts > maxPollAttempts) {
|
||||||
clearInterval(interval);
|
await stopPolling(taskDbId, { releaseLease: true });
|
||||||
activePollers.delete(taskDbId);
|
|
||||||
if (leaseToken && keyManager) await keyManager.releaseKey(leaseToken).catch(() => {});
|
|
||||||
if (typeof onTaskFailed === "function") {
|
if (typeof onTaskFailed === "function") {
|
||||||
const handled = await onTaskFailed("Task timed out").catch((fallbackErr) => {
|
const handled = await onTaskFailed("Task timed out").catch((fallbackErr) => {
|
||||||
console.error(`[aiTaskWorker] fallback error for task ${taskDbId}:`, fallbackErr.message);
|
console.error(`[aiTaskWorker] fallback error for task ${taskDbId}:`, fallbackErr.message);
|
||||||
@@ -819,21 +718,14 @@ function startPolling(taskDbId, { providerTaskId, apiKey, type, providerConfig,
|
|||||||
if (handled) return;
|
if (handled) return;
|
||||||
}
|
}
|
||||||
await updateTaskInDb(taskDbId, { status: "failed", error: "Task timed out" });
|
await updateTaskInDb(taskDbId, { status: "failed", error: "Task timed out" });
|
||||||
await clearPollerState(taskDbId).catch(() => {});
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
|
||||||
// Check if task was cancelled by user
|
|
||||||
const { rows: [taskRow] } = await pool.query("SELECT status FROM generation_tasks WHERE id = $1", [taskDbId]);
|
const { rows: [taskRow] } = await pool.query("SELECT status FROM generation_tasks WHERE id = $1", [taskDbId]);
|
||||||
if (!taskRow || taskRow.status === "cancelled") {
|
if (!taskRow || taskRow.status === "cancelled") {
|
||||||
clearInterval(interval);
|
await stopPolling(taskDbId, { releaseLease: true });
|
||||||
activePollers.delete(taskDbId);
|
|
||||||
await clearPollerState(taskDbId).catch(() => {});
|
|
||||||
if (leaseToken && keyManager) await keyManager.releaseKey(leaseToken).catch(() => {});
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
await refreshPollerHeartbeat(taskDbId).catch(() => {});
|
|
||||||
|
|
||||||
let result;
|
let result;
|
||||||
if (type === "image") {
|
if (type === "image") {
|
||||||
@@ -847,9 +739,7 @@ function startPolling(taskDbId, { providerTaskId, apiKey, type, providerConfig,
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (result.status === "completed" || result.status === "failed") {
|
if (result.status === "completed" || result.status === "failed") {
|
||||||
clearInterval(interval);
|
await stopPolling(taskDbId, { releaseLease: true });
|
||||||
activePollers.delete(taskDbId);
|
|
||||||
if (leaseToken && keyManager) await keyManager.releaseKey(leaseToken).catch(() => {});
|
|
||||||
if (result.status === "failed" && typeof onTaskFailed === "function") {
|
if (result.status === "failed" && typeof onTaskFailed === "function") {
|
||||||
const handled = await onTaskFailed(result.error || "Task failed").catch((fallbackErr) => {
|
const handled = await onTaskFailed(result.error || "Task failed").catch((fallbackErr) => {
|
||||||
console.error(`[aiTaskWorker] fallback error for task ${taskDbId}:`, fallbackErr.message);
|
console.error(`[aiTaskWorker] fallback error for task ${taskDbId}:`, fallbackErr.message);
|
||||||
@@ -860,179 +750,228 @@ function startPolling(taskDbId, { providerTaskId, apiKey, type, providerConfig,
|
|||||||
}
|
}
|
||||||
|
|
||||||
await updateTaskInDb(taskDbId, result);
|
await updateTaskInDb(taskDbId, result);
|
||||||
if (result.status === "completed" || result.status === "failed") {
|
|
||||||
await clearPollerState(taskDbId).catch(() => {});
|
|
||||||
}
|
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
console.error(`[aiTaskWorker] poll error for task ${taskDbId}:`, err.message);
|
console.error(`[aiTaskWorker] poll error for task ${taskDbId}:`, err.message);
|
||||||
|
} finally {
|
||||||
|
polling = false;
|
||||||
}
|
}
|
||||||
}, POLL_INTERVAL_MS);
|
}, POLL_INTERVAL_MS);
|
||||||
|
|
||||||
activePollers.set(taskDbId, { interval, leaseToken });
|
activePollers.set(taskDbId, { taskDbId, interval, leaseToken, keyManager });
|
||||||
}
|
}
|
||||||
|
|
||||||
function stopPolling(taskDbId) {
|
async function stopPolling(taskDbId, options = {}) {
|
||||||
const poller = activePollers.get(taskDbId);
|
const poller = activePollers.get(taskDbId);
|
||||||
if (poller) {
|
if (!poller) return;
|
||||||
|
|
||||||
clearInterval(poller.interval);
|
clearInterval(poller.interval);
|
||||||
activePollers.delete(taskDbId);
|
activePollers.delete(taskDbId);
|
||||||
|
if (options.releaseLease) {
|
||||||
|
await releasePollingLease(poller);
|
||||||
}
|
}
|
||||||
clearPollerState(taskDbId).catch(() => {});
|
}
|
||||||
|
|
||||||
|
async function cancelTask(taskId, userId) {
|
||||||
|
const { rows } = await pool.query(
|
||||||
|
`UPDATE generation_tasks
|
||||||
|
SET status = 'cancelled', completed_at = NOW(), updated_at = NOW()
|
||||||
|
WHERE id = $1 AND user_id = $2 AND status IN ('pending', 'running')
|
||||||
|
RETURNING *`,
|
||||||
|
[taskId, userId],
|
||||||
|
);
|
||||||
|
const task = rows[0];
|
||||||
|
if (!task) return null;
|
||||||
|
|
||||||
|
await stopPolling(task.id, { releaseLease: true });
|
||||||
|
taskEvents.emit(`task:${task.id}`, formatTaskEvent(task));
|
||||||
|
return task;
|
||||||
}
|
}
|
||||||
|
|
||||||
function getActiveCount() {
|
function getActiveCount() {
|
||||||
return activePollers.size;
|
return activePollers.size;
|
||||||
}
|
}
|
||||||
|
|
||||||
async function recoverRunnablePollers() {
|
const STALE_TASK_CLEANUP_INTERVAL_MS = 5 * 60 * 1000;
|
||||||
await ensureTaskPollerStore();
|
let staleTaskCleanupTimer = null;
|
||||||
const staleInterval = `${Math.max(5, Math.ceil(POLLER_OWNER_STALE_MS / 1000))} seconds`;
|
const TASK_RESULT_PERSIST_RETRY_INTERVAL_MS = 5 * 60 * 1000;
|
||||||
const { rows } = await pool.query(
|
let taskResultPersistenceRetryTimer = null;
|
||||||
`
|
let taskStartupRecoveryTimer = null;
|
||||||
SELECT p.task_id
|
let taskStaleCleanupRunning = false;
|
||||||
FROM generation_task_pollers p
|
let taskResultPersistenceRetryRunning = false;
|
||||||
JOIN generation_tasks t ON t.id = p.task_id
|
let taskStartupRecoveryRunning = false;
|
||||||
WHERE t.status IN ('pending', 'running')
|
|
||||||
AND (
|
|
||||||
p.owner_heartbeat_at IS NULL
|
|
||||||
OR p.owner_heartbeat_at < NOW() - ($1::text)::interval
|
|
||||||
)
|
|
||||||
ORDER BY p.owner_heartbeat_at NULLS FIRST, p.updated_at ASC
|
|
||||||
LIMIT 20
|
|
||||||
`,
|
|
||||||
[staleInterval],
|
|
||||||
);
|
|
||||||
|
|
||||||
for (const row of rows) {
|
function parseTaskParams(paramsJson) {
|
||||||
const taskId = row.task_id;
|
if (!paramsJson) return {};
|
||||||
if (activePollers.has(taskId)) continue;
|
if (typeof paramsJson === "object") return paramsJson;
|
||||||
const poller = await claimPoller(taskId);
|
try {
|
||||||
if (!poller || activePollers.has(taskId)) continue;
|
const parsed = JSON.parse(paramsJson);
|
||||||
|
return parsed && typeof parsed === "object" ? parsed : {};
|
||||||
const apiKey = await getLeaseKey(poller.lease_token);
|
} catch {
|
||||||
if (apiKey == null) {
|
return {};
|
||||||
console.warn(`[aiTaskWorker] cannot recover task ${taskId}: active lease not found`);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
console.info(`[aiTaskWorker] recovering poller for task ${taskId}`);
|
|
||||||
startPolling(taskId, {
|
|
||||||
providerTaskId: poller.provider_task_id,
|
|
||||||
apiKey,
|
|
||||||
type: poller.task_type,
|
|
||||||
providerConfig: parseProviderConfig(poller.provider_config_json),
|
|
||||||
leaseToken: poller.lease_token,
|
|
||||||
keyManager: require("./keyManager"),
|
|
||||||
skipPersist: true,
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// --- Periodic stale task cleanup ---
|
function resolveProviderConfigForRecovery(task) {
|
||||||
// Runs every 5 minutes, marks tasks stuck in 'pending'/'running' for too long as 'failed'.
|
const params = parseTaskParams(task.params_json);
|
||||||
// This catches cases where the worker crashed, the provider API never responded,
|
|
||||||
// or the cancel request failed silently on the client side.
|
if (task.type === "video") {
|
||||||
const STALE_TASK_CLEANUP_INTERVAL_MS = 5 * 60 * 1000;
|
if (params.model === "video-style-transform" || params.operation === "video-style-super-resolution") {
|
||||||
let staleTaskCleanupTimer = null;
|
return { provider: "dashscope", protocol: "wan-i2v", baseUrl: "https://dashscope.aliyuncs.com" };
|
||||||
|
}
|
||||||
|
if (params.model === "aliyun-video-super-resolve" || params.model === "aliyun-erase-subtitles") {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return resolveVideoProvider(params.model);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (task.type === "image") {
|
||||||
|
if (params.operation === "image-super-resolution" || params.operation === "image-edit") {
|
||||||
|
return { provider: "dashscope", transport: "dashscope-image" };
|
||||||
|
}
|
||||||
|
const candidates = resolveImageProviderCandidates(params.model);
|
||||||
|
return candidates[0] || null;
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
function normalizeRecoveryUser(task) {
|
||||||
|
return {
|
||||||
|
id: task.user_id,
|
||||||
|
enterpriseId: task.enterprise_id ?? null,
|
||||||
|
accountType: task.enterprise_id ? "enterprise" : "personal",
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
async function runStaleTaskCleanup() {
|
async function runStaleTaskCleanup() {
|
||||||
|
if (taskStaleCleanupRunning) return;
|
||||||
|
taskStaleCleanupRunning = true;
|
||||||
try {
|
try {
|
||||||
const { rows } = await pool.query(
|
const { rows } = await pool.query(
|
||||||
`UPDATE generation_tasks
|
`UPDATE generation_tasks
|
||||||
SET status = 'failed', error = '任务超时自动释放', updated_at = NOW()
|
SET status = 'failed',
|
||||||
|
error = 'Task timed out and was released automatically',
|
||||||
|
completed_at = NOW(),
|
||||||
|
updated_at = NOW()
|
||||||
WHERE status IN ('pending', 'running')
|
WHERE status IN ('pending', 'running')
|
||||||
AND GREATEST(updated_at, COALESCE(last_poll_at, created_at)) < NOW() - INTERVAL '10 minutes'
|
AND GREATEST(updated_at, COALESCE(last_poll_at, created_at)) < NOW() - ($1::int * INTERVAL '1 minute')
|
||||||
RETURNING id`,
|
RETURNING *`,
|
||||||
|
[STALE_TASK_TIMEOUT_MINUTES],
|
||||||
);
|
);
|
||||||
|
|
||||||
for (const row of rows) {
|
for (const row of rows) {
|
||||||
await publishTaskEvent({
|
await stopPolling(row.id, { releaseLease: true });
|
||||||
taskId: row.id,
|
taskEvents.emit(`task:${row.id}`, formatTaskEvent(row));
|
||||||
status: "failed",
|
await refundTaskBillingOnFailure(row.id).catch((err) => {
|
||||||
progress: null,
|
console.error(`[aiTaskWorker] stale task refund error for task ${row.id}:`, err.message);
|
||||||
resultUrl: null,
|
|
||||||
error: "任务超时自动释放",
|
|
||||||
});
|
});
|
||||||
// Also stop any active poller for this task
|
|
||||||
const poller = activePollers.get(row.id);
|
|
||||||
if (poller) {
|
|
||||||
clearInterval(poller.interval);
|
|
||||||
activePollers.delete(row.id);
|
|
||||||
}
|
|
||||||
await clearPollerState(row.id).catch(() => {});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (rows.length > 0) {
|
if (rows.length > 0) {
|
||||||
console.log(`[aiTaskWorker] Cleaned up ${rows.length} stale task(s)`);
|
console.log(`[aiTaskWorker] Cleaned up ${rows.length} stale task(s)`);
|
||||||
}
|
}
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
console.error("[aiTaskWorker] Stale task cleanup failed:", err.message);
|
console.error("[aiTaskWorker] Stale task cleanup failed:", err.message);
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async function startTaskEventListener() {
|
|
||||||
if (taskEventListenerClient) return;
|
|
||||||
if (taskEventListenerStarting) return taskEventListenerStarting;
|
|
||||||
|
|
||||||
taskEventListenerStarting = (async () => {
|
|
||||||
const client = await pool.connect();
|
|
||||||
let released = false;
|
|
||||||
|
|
||||||
const releaseClient = () => {
|
|
||||||
if (released) return;
|
|
||||||
released = true;
|
|
||||||
taskEventListenerClient = null;
|
|
||||||
try {
|
|
||||||
client.release();
|
|
||||||
} catch {}
|
|
||||||
};
|
|
||||||
|
|
||||||
client.on("notification", (message) => {
|
|
||||||
if (message.channel !== TASK_EVENT_CHANNEL || !message.payload) return;
|
|
||||||
try {
|
|
||||||
const payload = JSON.parse(message.payload);
|
|
||||||
if (payload?.origin === TASK_EVENT_ORIGIN) return;
|
|
||||||
emitTaskEvent(payload?.event || payload);
|
|
||||||
} catch (err) {
|
|
||||||
console.error("[aiTaskWorker] task event notification parse failed:", err.message);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
client.on("error", (err) => {
|
|
||||||
console.error("[aiTaskWorker] task event listener error:", err.message);
|
|
||||||
releaseClient();
|
|
||||||
setTimeout(() => {
|
|
||||||
startTaskEventListener().catch((restartErr) => {
|
|
||||||
console.error("[aiTaskWorker] task event listener restart failed:", restartErr.message);
|
|
||||||
});
|
|
||||||
}, 5000).unref?.();
|
|
||||||
});
|
|
||||||
|
|
||||||
await client.query(`LISTEN ${TASK_EVENT_CHANNEL}`);
|
|
||||||
taskEventListenerClient = client;
|
|
||||||
console.log(`[aiTaskWorker] listening for task events on ${TASK_EVENT_CHANNEL}`);
|
|
||||||
})();
|
|
||||||
|
|
||||||
try {
|
|
||||||
await taskEventListenerStarting;
|
|
||||||
} finally {
|
} finally {
|
||||||
taskEventListenerStarting = null;
|
taskStaleCleanupRunning = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async function stopTaskEventListener() {
|
async function runResultPersistenceRetry() {
|
||||||
const client = taskEventListenerClient;
|
if (taskResultPersistenceRetryRunning) return;
|
||||||
taskEventListenerClient = null;
|
taskResultPersistenceRetryRunning = true;
|
||||||
if (!client) return;
|
|
||||||
try {
|
try {
|
||||||
await client.query(`UNLISTEN ${TASK_EVENT_CHANNEL}`);
|
const { rows } = await pool.query(
|
||||||
} catch {}
|
`SELECT *
|
||||||
client.release();
|
FROM generation_tasks
|
||||||
|
WHERE status = 'completed'
|
||||||
|
AND result_url IS NOT NULL
|
||||||
|
AND result_url ~* '^https?://'
|
||||||
|
AND result_url !~* '/users/[^/]+/generation-results/'
|
||||||
|
AND result_persist_status IN ('pending', 'failed')
|
||||||
|
AND result_persist_attempts < $1
|
||||||
|
ORDER BY updated_at ASC
|
||||||
|
LIMIT $2`,
|
||||||
|
[RESULT_PERSIST_RETRY_LIMIT, RESULT_PERSIST_RETRY_BATCH_SIZE],
|
||||||
|
);
|
||||||
|
|
||||||
|
for (const row of rows) {
|
||||||
|
await persistTaskResultUrlToOss(row);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (rows.length > 0) {
|
||||||
|
console.log(`[aiTaskWorker] Retried OSS result persistence for ${rows.length} task(s)`);
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
console.error("[aiTaskWorker] Result persistence retry failed:", err.message);
|
||||||
|
} finally {
|
||||||
|
taskResultPersistenceRetryRunning = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async function runTaskStartupRecovery() {
|
||||||
|
if (taskStartupRecoveryRunning) return;
|
||||||
|
taskStartupRecoveryRunning = true;
|
||||||
|
try {
|
||||||
|
const { rows } = await pool.query(
|
||||||
|
`SELECT gt.*, u.enterprise_id
|
||||||
|
FROM generation_tasks gt
|
||||||
|
JOIN users u ON u.id = gt.user_id
|
||||||
|
WHERE gt.status = 'running'
|
||||||
|
AND gt.provider_task_id IS NOT NULL
|
||||||
|
AND GREATEST(gt.updated_at, COALESCE(gt.last_poll_at, gt.created_at)) >= NOW() - ($1::int * INTERVAL '1 minute')
|
||||||
|
ORDER BY gt.updated_at DESC
|
||||||
|
LIMIT $2`,
|
||||||
|
[STALE_TASK_TIMEOUT_MINUTES, TASK_STARTUP_RECOVERY_LIMIT],
|
||||||
|
);
|
||||||
|
|
||||||
|
let recovered = 0;
|
||||||
|
for (const task of rows) {
|
||||||
|
if (activePollers.has(task.id)) continue;
|
||||||
|
|
||||||
|
let providerConfig;
|
||||||
|
try {
|
||||||
|
providerConfig = resolveProviderConfigForRecovery(task);
|
||||||
|
} catch (err) {
|
||||||
|
console.warn(`[aiTaskWorker] task ${task.id} recovery skipped: ${err.message}`);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!providerConfig?.provider) continue;
|
||||||
|
const slotResult = await keyManager.acquireKey(providerConfig.provider, normalizeRecoveryUser(task), null, { waitTimeoutMs: 0 });
|
||||||
|
if (!slotResult) {
|
||||||
|
console.warn(`[aiTaskWorker] task ${task.id} recovery waiting for provider capacity: ${providerConfig.provider}`);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
startPolling(task.id, {
|
||||||
|
providerTaskId: task.provider_task_id,
|
||||||
|
apiKey: slotResult.apiKey,
|
||||||
|
type: task.type,
|
||||||
|
providerConfig,
|
||||||
|
leaseToken: slotResult.leaseToken,
|
||||||
|
keyManager,
|
||||||
|
});
|
||||||
|
recovered += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (recovered > 0) {
|
||||||
|
console.log(`[aiTaskWorker] Recovered ${recovered} running task poller(s) after startup`);
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
console.error("[aiTaskWorker] Startup task recovery failed:", err.message);
|
||||||
|
} finally {
|
||||||
|
taskStartupRecoveryRunning = false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function startStaleTaskCleanup() {
|
function startStaleTaskCleanup() {
|
||||||
if (staleTaskCleanupTimer) return;
|
if (staleTaskCleanupTimer) return;
|
||||||
staleTaskCleanupTimer = setInterval(runStaleTaskCleanup, STALE_TASK_CLEANUP_INTERVAL_MS);
|
staleTaskCleanupTimer = setInterval(runStaleTaskCleanup, STALE_TASK_CLEANUP_INTERVAL_MS);
|
||||||
// Run once shortly after startup
|
taskResultPersistenceRetryTimer = setInterval(runResultPersistenceRetry, TASK_RESULT_PERSIST_RETRY_INTERVAL_MS);
|
||||||
|
taskStartupRecoveryTimer = setTimeout(runTaskStartupRecovery, 5_000);
|
||||||
setTimeout(runStaleTaskCleanup, 10_000);
|
setTimeout(runStaleTaskCleanup, 10_000);
|
||||||
|
setTimeout(runResultPersistenceRetry, 15_000);
|
||||||
}
|
}
|
||||||
|
|
||||||
function stopStaleTaskCleanup() {
|
function stopStaleTaskCleanup() {
|
||||||
@@ -1040,30 +979,20 @@ function stopStaleTaskCleanup() {
|
|||||||
clearInterval(staleTaskCleanupTimer);
|
clearInterval(staleTaskCleanupTimer);
|
||||||
staleTaskCleanupTimer = null;
|
staleTaskCleanupTimer = null;
|
||||||
}
|
}
|
||||||
}
|
if (taskResultPersistenceRetryTimer) {
|
||||||
|
clearInterval(taskResultPersistenceRetryTimer);
|
||||||
function startPollerRecovery() {
|
taskResultPersistenceRetryTimer = null;
|
||||||
if (pollerRecoveryTimer) return;
|
}
|
||||||
ensureTaskPollerStore()
|
if (taskStartupRecoveryTimer) {
|
||||||
.then(() => recoverRunnablePollers())
|
clearTimeout(taskStartupRecoveryTimer);
|
||||||
.catch((err) => console.error("[aiTaskWorker] initial poller recovery failed:", err.message));
|
taskStartupRecoveryTimer = null;
|
||||||
pollerRecoveryTimer = setInterval(() => {
|
|
||||||
recoverRunnablePollers().catch((err) => {
|
|
||||||
console.error("[aiTaskWorker] poller recovery failed:", err.message);
|
|
||||||
});
|
|
||||||
}, POLLER_RECOVERY_INTERVAL_MS);
|
|
||||||
}
|
|
||||||
|
|
||||||
function stopPollerRecovery() {
|
|
||||||
if (pollerRecoveryTimer) {
|
|
||||||
clearInterval(pollerRecoveryTimer);
|
|
||||||
pollerRecoveryTimer = null;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
module.exports = {
|
module.exports = {
|
||||||
startPolling,
|
startPolling,
|
||||||
stopPolling,
|
stopPolling,
|
||||||
|
cancelTask,
|
||||||
updateTaskInDb,
|
updateTaskInDb,
|
||||||
getActiveCount,
|
getActiveCount,
|
||||||
extractProviderTaskId,
|
extractProviderTaskId,
|
||||||
@@ -1073,10 +1002,6 @@ module.exports = {
|
|||||||
parseKlingCredential,
|
parseKlingCredential,
|
||||||
createKlingJwt,
|
createKlingJwt,
|
||||||
taskEvents,
|
taskEvents,
|
||||||
startTaskEventListener,
|
|
||||||
stopTaskEventListener,
|
|
||||||
startPollerRecovery,
|
|
||||||
stopPollerRecovery,
|
|
||||||
startStaleTaskCleanup,
|
startStaleTaskCleanup,
|
||||||
stopStaleTaskCleanup,
|
stopStaleTaskCleanup,
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -353,6 +353,18 @@ async function migrateGenerationTasksBillingColumns(client) {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async function migrateGenerationTaskResultPersistence(client) {
|
||||||
|
await addColumnIfMissing("generation_tasks", "result_persist_status TEXT NOT NULL DEFAULT 'pending'");
|
||||||
|
await addColumnIfMissing("generation_tasks", "result_persist_attempts INTEGER NOT NULL DEFAULT 0");
|
||||||
|
await addColumnIfMissing("generation_tasks", "result_persist_error TEXT");
|
||||||
|
await addColumnIfMissing("generation_tasks", "result_persisted_at TIMESTAMPTZ");
|
||||||
|
await client.query(`
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_generation_tasks_result_persist_retry
|
||||||
|
ON generation_tasks(result_persist_status, updated_at)
|
||||||
|
WHERE status = 'completed' AND result_url IS NOT NULL
|
||||||
|
`);
|
||||||
|
}
|
||||||
|
|
||||||
async function ensureModelPriceSeed() {
|
async function ensureModelPriceSeed() {
|
||||||
const columns = await getColumnNames("model_prices");
|
const columns = await getColumnNames("model_prices");
|
||||||
const useMills = columns.includes("input_price_mills");
|
const useMills = columns.includes("input_price_mills");
|
||||||
@@ -959,6 +971,7 @@ async function ensureSchema() {
|
|||||||
await runMigration("030_generation_tasks_user_status_index", migrateGenerationTasksUserStatusIndex);
|
await runMigration("030_generation_tasks_user_status_index", migrateGenerationTasksUserStatusIndex);
|
||||||
await runMigration("031_generation_tasks_billing_columns", migrateGenerationTasksBillingColumns);
|
await runMigration("031_generation_tasks_billing_columns", migrateGenerationTasksBillingColumns);
|
||||||
await runMigration("032_ecommerce_video_history", migrateEcommerceVideoHistorySchema);
|
await runMigration("032_ecommerce_video_history", migrateEcommerceVideoHistorySchema);
|
||||||
|
await runMigration("033_generation_task_result_persistence", migrateGenerationTaskResultPersistence);
|
||||||
await ensureModelPriceSeed();
|
await ensureModelPriceSeed();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
+1
-6
@@ -144,9 +144,7 @@ async function main() {
|
|||||||
startSettlementWorker()
|
startSettlementWorker()
|
||||||
startProviderHealthMonitor()
|
startProviderHealthMonitor()
|
||||||
|
|
||||||
const { startStaleTaskCleanup, startTaskEventListener, startPollerRecovery } = require('./aiTaskWorker')
|
const { startStaleTaskCleanup } = require('./aiTaskWorker')
|
||||||
await startTaskEventListener()
|
|
||||||
startPollerRecovery()
|
|
||||||
startStaleTaskCleanup()
|
startStaleTaskCleanup()
|
||||||
|
|
||||||
server = app.listen(PORT, HOST, () => {
|
server = app.listen(PORT, HOST, () => {
|
||||||
@@ -185,9 +183,6 @@ function gracefulShutdown(signal) {
|
|||||||
console.log('[shutdown] Server closed, cleaning up...')
|
console.log('[shutdown] Server closed, cleaning up...')
|
||||||
const { stopProviderHealthMonitor } = require('./providerHealthMonitor')
|
const { stopProviderHealthMonitor } = require('./providerHealthMonitor')
|
||||||
stopProviderHealthMonitor()
|
stopProviderHealthMonitor()
|
||||||
const { stopTaskEventListener, stopPollerRecovery } = require('./aiTaskWorker')
|
|
||||||
stopPollerRecovery()
|
|
||||||
void stopTaskEventListener()
|
|
||||||
const { pool } = require('./db')
|
const { pool } = require('./db')
|
||||||
pool.end().then(() => {
|
pool.end().then(() => {
|
||||||
console.log('[shutdown] Database pool closed')
|
console.log('[shutdown] Database pool closed')
|
||||||
|
|||||||
+3
-2
@@ -284,7 +284,7 @@ async function releaseLeaseInternal(leaseToken, user, options = {}) {
|
|||||||
const { rows } = await client.query(
|
const { rows } = await client.query(
|
||||||
`
|
`
|
||||||
WITH candidate AS (
|
WITH candidate AS (
|
||||||
SELECT l.id, l.key_id, k.provider
|
SELECT l.id, l.key_id, l.user_id, l.enterprise_id, k.provider
|
||||||
FROM key_leases l
|
FROM key_leases l
|
||||||
JOIN api_keys k ON k.id = l.key_id
|
JOIN api_keys k ON k.id = l.key_id
|
||||||
WHERE l.lease_token = $1 AND l.released_at IS NULL
|
WHERE l.lease_token = $1 AND l.released_at IS NULL
|
||||||
@@ -298,6 +298,7 @@ async function releaseLeaseInternal(leaseToken, user, options = {}) {
|
|||||||
RETURNING id, key_id
|
RETURNING id, key_id
|
||||||
)
|
)
|
||||||
SELECT r.id, r.key_id, c.provider
|
SELECT r.id, r.key_id, c.provider
|
||||||
|
, c.user_id, c.enterprise_id
|
||||||
FROM released r
|
FROM released r
|
||||||
JOIN candidate c ON c.key_id = r.key_id
|
JOIN candidate c ON c.key_id = r.key_id
|
||||||
`,
|
`,
|
||||||
@@ -339,7 +340,7 @@ async function releaseLeaseInternal(leaseToken, user, options = {}) {
|
|||||||
INSERT INTO usage_logs (user_id, enterprise_id, provider, key_id, action)
|
INSERT INTO usage_logs (user_id, enterprise_id, provider, key_id, action)
|
||||||
VALUES ($1, $2, (SELECT provider FROM api_keys WHERE id = $3), $4, $5)
|
VALUES ($1, $2, (SELECT provider FROM api_keys WHERE id = $3), $4, $5)
|
||||||
`,
|
`,
|
||||||
[userId, enterpriseId, lease.key_id, lease.key_id, "release"],
|
[userId ?? lease.user_id, enterpriseId ?? lease.enterprise_id, lease.key_id, lease.key_id, "release"],
|
||||||
);
|
);
|
||||||
|
|
||||||
return {
|
return {
|
||||||
|
|||||||
+4
-6
@@ -16,6 +16,7 @@ const {
|
|||||||
} = require("../enterpriseVideoBilling");
|
} = require("../enterpriseVideoBilling");
|
||||||
const {
|
const {
|
||||||
startPolling,
|
startPolling,
|
||||||
|
cancelTask,
|
||||||
updateTaskInDb,
|
updateTaskInDb,
|
||||||
extractProviderTaskId,
|
extractProviderTaskId,
|
||||||
extractImageUrl,
|
extractImageUrl,
|
||||||
@@ -1770,12 +1771,9 @@ function registerAiRoutes(router) {
|
|||||||
if (!Number.isFinite(taskId)) return res.status(400).json({ error: "Invalid task id" });
|
if (!Number.isFinite(taskId)) return res.status(400).json({ error: "Invalid task id" });
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const { rows } = await pool.query(
|
const task = await cancelTask(taskId, req.user.id);
|
||||||
"UPDATE generation_tasks SET status = 'cancelled', updated_at = NOW() WHERE id = $1 AND user_id = $2 AND status IN ('pending', 'running') RETURNING id, status",
|
if (!task) return res.status(404).json({ error: "Task not found or not in active state" });
|
||||||
[taskId, req.user.id],
|
res.json({ id: task.id, status: task.status });
|
||||||
);
|
|
||||||
if (rows.length === 0) return res.status(404).json({ error: "Task not found or not in active state" });
|
|
||||||
res.json({ id: rows[0].id, status: rows[0].status });
|
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
console.error("[ai/task-cancel] error:", err.message);
|
console.error("[ai/task-cancel] error:", err.message);
|
||||||
res.status(500).json({ error: "取消任务失败" });
|
res.status(500).json({ error: "取消任务失败" });
|
||||||
|
|||||||
Reference in New Issue
Block a user