Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 9a6e373181 | |||
| 00eba3e209 | |||
| ca84754bd2 | |||
| f9da506017 | |||
| 1166811ee4 |
@@ -20,6 +20,13 @@ JWT_EXPIRES_IN=7d
|
|||||||
# Connection pool
|
# Connection pool
|
||||||
PG_POOL_MAX=10
|
PG_POOL_MAX=10
|
||||||
|
|
||||||
|
# Provider polling reliability
|
||||||
|
# Shared across PM2 workers through Postgres-backed poll slots.
|
||||||
|
TASK_PROVIDER_POLL_MAX_CONCURRENCY=8
|
||||||
|
TASK_PROVIDER_POLL_SLOT_TTL_MS=30000
|
||||||
|
TASK_PROVIDER_POLL_REQUEST_TIMEOUT_MS=25000
|
||||||
|
GRSAI_IMAGE_SUBMIT_TIMEOUT_MS=30000
|
||||||
|
|
||||||
# CORS (comma separated allowed origins, * for all)
|
# CORS (comma separated allowed origins, * for all)
|
||||||
CORS_ORIGINS=*
|
CORS_ORIGINS=*
|
||||||
|
|
||||||
|
|||||||
+6
-1
@@ -14,7 +14,12 @@
|
|||||||
"audit-routes": "node src/cli/auditModelRoutes.js",
|
"audit-routes": "node src/cli/auditModelRoutes.js",
|
||||||
"import-config": "node src/cli/importConfig.js",
|
"import-config": "node src/cli/importConfig.js",
|
||||||
"init-pools": "node src/cli/initPools.js",
|
"init-pools": "node src/cli/initPools.js",
|
||||||
"test:community-routes": "node scripts/communityRouteContract.test.js"
|
"test:community-routes": "node scripts/communityRouteContract.test.js",
|
||||||
|
"test:enterprise-video-pricing": "node scripts/enterpriseVideoPricingContract.test.js",
|
||||||
|
"test:key-manager": "node scripts/keyManagerReleaseContract.test.js",
|
||||||
|
"test:provider-poll-limiter": "node scripts/providerPollLimiterContract.test.js",
|
||||||
|
"test:task-progress-contract": "node scripts/taskProgressContract.test.js",
|
||||||
|
"test": "npm run test:community-routes && npm run test:enterprise-video-pricing && npm run test:key-manager && npm run test:provider-poll-limiter && npm run test:task-progress-contract"
|
||||||
},
|
},
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"alipay-sdk": "^4.14.0",
|
"alipay-sdk": "^4.14.0",
|
||||||
|
|||||||
@@ -0,0 +1,54 @@
|
|||||||
|
const assert = require("node:assert/strict");
|
||||||
|
|
||||||
|
const {
|
||||||
|
calculateEnterpriseVideoCredits,
|
||||||
|
getEnterpriseVideoCreditRate,
|
||||||
|
getEnterpriseVideoPricingConfig,
|
||||||
|
} = require("../src/enterpriseVideoBilling");
|
||||||
|
|
||||||
|
function getRule(config, id) {
|
||||||
|
const rule = config.rules.find((item) => item.id === id);
|
||||||
|
assert(rule, `missing enterprise video pricing rule: ${id}`);
|
||||||
|
return rule;
|
||||||
|
}
|
||||||
|
|
||||||
|
const config = getEnterpriseVideoPricingConfig();
|
||||||
|
|
||||||
|
assert.equal(config.currency, "CNY");
|
||||||
|
assert.equal(config.creditsPerCny, 100);
|
||||||
|
assert.equal(config.billingUnit, "per_second");
|
||||||
|
assert.deepEqual(config.resolutions, ["720P", "1080P"]);
|
||||||
|
|
||||||
|
assert.equal(getRule(config, "happyhorse").rates["720P"], 0.72);
|
||||||
|
assert.equal(getRule(config, "happyhorse").rates["1080P"], 1.28);
|
||||||
|
assert.equal(getRule(config, "wanxiang-i2v").rates["720P"], 0.6);
|
||||||
|
assert.equal(getRule(config, "kling-muted").rates["1080P"], 0.8);
|
||||||
|
|
||||||
|
assert.equal(
|
||||||
|
getEnterpriseVideoCreditRate({
|
||||||
|
model: "happyhorse-1.0",
|
||||||
|
resolution: "1080P",
|
||||||
|
}),
|
||||||
|
getRule(config, "happyhorse").rates["1080P"],
|
||||||
|
);
|
||||||
|
|
||||||
|
assert.equal(
|
||||||
|
getEnterpriseVideoCreditRate({
|
||||||
|
model: "kling-3.0-dashscope",
|
||||||
|
resolution: "720P",
|
||||||
|
muted: true,
|
||||||
|
hasReferenceVideo: false,
|
||||||
|
}),
|
||||||
|
getRule(config, "kling-muted").rates["720P"],
|
||||||
|
);
|
||||||
|
|
||||||
|
assert.equal(
|
||||||
|
calculateEnterpriseVideoCredits({
|
||||||
|
model: "vidu-q3-turbo",
|
||||||
|
resolution: "1080P",
|
||||||
|
durationSeconds: 5,
|
||||||
|
}),
|
||||||
|
500,
|
||||||
|
);
|
||||||
|
|
||||||
|
console.log("enterprise video pricing contract tests passed");
|
||||||
@@ -0,0 +1,73 @@
|
|||||||
|
const assert = require("node:assert/strict");
|
||||||
|
const { createRequire } = require("node:module");
|
||||||
|
|
||||||
|
const nodeRequire = createRequire(__filename);
|
||||||
|
|
||||||
|
function loadKeyManagerWithPool(pool) {
|
||||||
|
const dbPath = nodeRequire.resolve("../src/db");
|
||||||
|
const keyManagerPath = nodeRequire.resolve("../src/keyManager");
|
||||||
|
const originalDbModule = nodeRequire.cache[dbPath];
|
||||||
|
const originalKeyManagerModule = nodeRequire.cache[keyManagerPath];
|
||||||
|
|
||||||
|
delete nodeRequire.cache[keyManagerPath];
|
||||||
|
nodeRequire.cache[dbPath] = {
|
||||||
|
id: dbPath,
|
||||||
|
filename: dbPath,
|
||||||
|
loaded: true,
|
||||||
|
exports: {
|
||||||
|
pool,
|
||||||
|
withTransaction: async (fn) => fn(pool),
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
return {
|
||||||
|
keyManager: nodeRequire("../src/keyManager"),
|
||||||
|
restore() {
|
||||||
|
delete nodeRequire.cache[keyManagerPath];
|
||||||
|
if (originalKeyManagerModule) nodeRequire.cache[keyManagerPath] = originalKeyManagerModule;
|
||||||
|
if (originalDbModule) nodeRequire.cache[dbPath] = originalDbModule;
|
||||||
|
else delete nodeRequire.cache[dbPath];
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
function createReleasePool() {
|
||||||
|
const calls = [];
|
||||||
|
return {
|
||||||
|
calls,
|
||||||
|
async query(sql, params) {
|
||||||
|
calls.push({ sql, params });
|
||||||
|
if (/WITH candidate AS/i.test(sql)) {
|
||||||
|
return {
|
||||||
|
rows: [{
|
||||||
|
id: 10,
|
||||||
|
key_id: 20,
|
||||||
|
lease_user_id: 30,
|
||||||
|
lease_enterprise_id: 40,
|
||||||
|
provider: "dashscope",
|
||||||
|
}],
|
||||||
|
};
|
||||||
|
}
|
||||||
|
if (/UPDATE api_keys SET active_count/i.test(sql)) return { rows: [] };
|
||||||
|
if (/INSERT INTO usage_logs/i.test(sql)) return { rows: [] };
|
||||||
|
throw new Error(`Unexpected SQL: ${sql}`);
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
(async () => {
|
||||||
|
const pool = createReleasePool();
|
||||||
|
const { keyManager, restore } = loadKeyManagerWithPool(pool);
|
||||||
|
try {
|
||||||
|
const result = await keyManager.releaseKey("lease-token-without-user-context");
|
||||||
|
|
||||||
|
assert.equal(result.released, true);
|
||||||
|
const usageLogCall = pool.calls.find((call) => /INSERT INTO usage_logs/i.test(call.sql));
|
||||||
|
assert.deepEqual(usageLogCall.params, [30, 40, 20, 20, "release"]);
|
||||||
|
} finally {
|
||||||
|
restore();
|
||||||
|
}
|
||||||
|
})().catch((error) => {
|
||||||
|
console.error(error);
|
||||||
|
process.exitCode = 1;
|
||||||
|
});
|
||||||
@@ -0,0 +1,96 @@
|
|||||||
|
const assert = require("node:assert/strict");
|
||||||
|
const { createRequire } = require("node:module");
|
||||||
|
|
||||||
|
const nodeRequire = createRequire(__filename);
|
||||||
|
|
||||||
|
function loadLimiterWithPool(pool) {
|
||||||
|
const dbPath = nodeRequire.resolve("../src/db");
|
||||||
|
const limiterPath = nodeRequire.resolve("../src/providerPollLimiter");
|
||||||
|
const originalDbModule = nodeRequire.cache[dbPath];
|
||||||
|
const originalLimiterModule = nodeRequire.cache[limiterPath];
|
||||||
|
|
||||||
|
delete nodeRequire.cache[limiterPath];
|
||||||
|
nodeRequire.cache[dbPath] = {
|
||||||
|
id: dbPath,
|
||||||
|
filename: dbPath,
|
||||||
|
loaded: true,
|
||||||
|
exports: { pool },
|
||||||
|
};
|
||||||
|
|
||||||
|
return {
|
||||||
|
limiter: nodeRequire("../src/providerPollLimiter"),
|
||||||
|
restore() {
|
||||||
|
delete nodeRequire.cache[limiterPath];
|
||||||
|
if (originalLimiterModule) nodeRequire.cache[limiterPath] = originalLimiterModule;
|
||||||
|
if (originalDbModule) nodeRequire.cache[dbPath] = originalDbModule;
|
||||||
|
else delete nodeRequire.cache[dbPath];
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
function createPool(options = {}) {
|
||||||
|
const calls = [];
|
||||||
|
return {
|
||||||
|
calls,
|
||||||
|
async query(sql, params = []) {
|
||||||
|
calls.push({ sql, params });
|
||||||
|
if (/CREATE TABLE IF NOT EXISTS generation_provider_poll_slots/i.test(sql)) return { rows: [] };
|
||||||
|
if (/WITH candidate AS/i.test(sql)) {
|
||||||
|
if (options.noAvailableSlot) return { rows: [] };
|
||||||
|
return { rows: [{ scope: params[0], slot_no: 2 }] };
|
||||||
|
}
|
||||||
|
if (/DELETE FROM generation_provider_poll_slots/i.test(sql)) return { rows: [] };
|
||||||
|
throw new Error(`Unexpected SQL: ${sql}`);
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
(async () => {
|
||||||
|
const previousLimit = process.env.TASK_PROVIDER_POLL_MAX_CONCURRENCY;
|
||||||
|
process.env.TASK_PROVIDER_POLL_MAX_CONCURRENCY = "3";
|
||||||
|
|
||||||
|
const pool = createPool();
|
||||||
|
const { limiter, restore } = loadLimiterWithPool(pool);
|
||||||
|
try {
|
||||||
|
const outcome = await limiter.withProviderPollSlot(101, async () => "polled");
|
||||||
|
|
||||||
|
assert.equal(outcome.acquired, true);
|
||||||
|
assert.equal(outcome.value, "polled");
|
||||||
|
|
||||||
|
const acquireCall = pool.calls.find((call) => /WITH candidate AS/i.test(call.sql));
|
||||||
|
assert.equal(acquireCall.params[1], 3);
|
||||||
|
assert.equal(acquireCall.params[3], 101);
|
||||||
|
|
||||||
|
const releaseCall = pool.calls.find((call) => /DELETE FROM generation_provider_poll_slots/i.test(call.sql));
|
||||||
|
assert.equal(releaseCall.params[0], acquireCall.params[0]);
|
||||||
|
assert.equal(releaseCall.params[1], 2);
|
||||||
|
assert.equal(releaseCall.params[2], acquireCall.params[2]);
|
||||||
|
} finally {
|
||||||
|
if (previousLimit === undefined) delete process.env.TASK_PROVIDER_POLL_MAX_CONCURRENCY;
|
||||||
|
else process.env.TASK_PROVIDER_POLL_MAX_CONCURRENCY = previousLimit;
|
||||||
|
restore();
|
||||||
|
}
|
||||||
|
|
||||||
|
const saturatedPool = createPool({ noAvailableSlot: true });
|
||||||
|
const { limiter: saturatedLimiter, restore: restoreSaturated } = loadLimiterWithPool(saturatedPool);
|
||||||
|
try {
|
||||||
|
let called = false;
|
||||||
|
const outcome = await saturatedLimiter.withProviderPollSlot(202, async () => {
|
||||||
|
called = true;
|
||||||
|
return "should-not-run";
|
||||||
|
});
|
||||||
|
|
||||||
|
assert.equal(outcome.acquired, false);
|
||||||
|
assert.equal(outcome.value, undefined);
|
||||||
|
assert.equal(called, false);
|
||||||
|
assert.equal(
|
||||||
|
saturatedPool.calls.some((call) => /DELETE FROM generation_provider_poll_slots/i.test(call.sql)),
|
||||||
|
false,
|
||||||
|
);
|
||||||
|
} finally {
|
||||||
|
restoreSaturated();
|
||||||
|
}
|
||||||
|
})().catch((error) => {
|
||||||
|
console.error(error);
|
||||||
|
process.exitCode = 1;
|
||||||
|
});
|
||||||
@@ -0,0 +1,90 @@
|
|||||||
|
const assert = require("node:assert/strict");
|
||||||
|
|
||||||
|
const {
|
||||||
|
DEFAULT_IMAGE_EXPECTED_DURATION_MS,
|
||||||
|
DEFAULT_VIDEO_EXPECTED_DURATION_MS,
|
||||||
|
PROGRESS_SOURCE_ESTIMATED,
|
||||||
|
PROGRESS_SOURCE_REAL,
|
||||||
|
formatTaskProgressPayload,
|
||||||
|
getExpectedDurationMs,
|
||||||
|
parseTaskParams,
|
||||||
|
} = require("../src/taskProgressContract");
|
||||||
|
|
||||||
|
const createdAt = "2026-06-10T08:00:00.000Z";
|
||||||
|
|
||||||
|
{
|
||||||
|
const payload = formatTaskProgressPayload({
|
||||||
|
id: 101,
|
||||||
|
status: "completed",
|
||||||
|
progress: 0,
|
||||||
|
created_at: createdAt,
|
||||||
|
params_json: "{}",
|
||||||
|
result_url: "https://oss.example/result.png",
|
||||||
|
});
|
||||||
|
|
||||||
|
assert.equal(payload.taskId, 101);
|
||||||
|
assert.equal(payload.progress, 100);
|
||||||
|
assert.equal(payload.progressSource, PROGRESS_SOURCE_REAL);
|
||||||
|
assert.equal(payload.stage, "\u5b8c\u6210");
|
||||||
|
assert.equal(payload.startedAt, createdAt);
|
||||||
|
assert.equal(payload.resultUrl, "https://oss.example/result.png");
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
const payload = formatTaskProgressPayload({
|
||||||
|
id: 102,
|
||||||
|
status: "running",
|
||||||
|
progress: 43,
|
||||||
|
progress_source: PROGRESS_SOURCE_REAL,
|
||||||
|
created_at: createdAt,
|
||||||
|
params_json: JSON.stringify({ model: "kling-3.0-dashscope", duration: 5 }),
|
||||||
|
type: "video",
|
||||||
|
});
|
||||||
|
|
||||||
|
assert.equal(payload.progress, 43);
|
||||||
|
assert.equal(payload.progressSource, PROGRESS_SOURCE_REAL);
|
||||||
|
assert.equal(payload.stage, "\u751f\u6210\u4e2d");
|
||||||
|
assert.equal(payload.expectedDurationMs, 300_000);
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
const payload = formatTaskProgressPayload({
|
||||||
|
id: 103,
|
||||||
|
status: "running",
|
||||||
|
progress: 0,
|
||||||
|
created_at: createdAt,
|
||||||
|
params_json: JSON.stringify({
|
||||||
|
requestedModel: "nano-banana-pro",
|
||||||
|
referenceUrls: ["https://oss.example/a.png", "https://oss.example/b.png"],
|
||||||
|
}),
|
||||||
|
type: "image",
|
||||||
|
});
|
||||||
|
|
||||||
|
assert.equal(payload.progressSource, PROGRESS_SOURCE_ESTIMATED);
|
||||||
|
assert.equal(payload.stage, "\u5df2\u63d0\u4ea4");
|
||||||
|
assert.equal(payload.expectedDurationMs, 250_000);
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
const expectedDurationMs = getExpectedDurationMs({
|
||||||
|
type: "video",
|
||||||
|
params_json: JSON.stringify({ model: "kling-3.0-dashscope", duration: 10 }),
|
||||||
|
});
|
||||||
|
|
||||||
|
assert.equal(expectedDurationMs, 400_000);
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
assert.deepEqual(parseTaskParams("{bad json"), {});
|
||||||
|
assert.deepEqual(parseTaskParams({ model: "gpt-image-2" }), { model: "gpt-image-2" });
|
||||||
|
assert.equal(
|
||||||
|
getExpectedDurationMs({ type: "image", params_json: "{}" }),
|
||||||
|
DEFAULT_IMAGE_EXPECTED_DURATION_MS,
|
||||||
|
);
|
||||||
|
assert.equal(
|
||||||
|
getExpectedDurationMs({ type: "video", params_json: "{}" }),
|
||||||
|
DEFAULT_VIDEO_EXPECTED_DURATION_MS,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
console.log("task progress contract tests passed");
|
||||||
+215
-55
@@ -5,6 +5,13 @@ const { EventEmitter } = require("node:events");
|
|||||||
const { pool } = require("./db");
|
const { pool } = require("./db");
|
||||||
const { refundTaskBillingOnFailure } = require("./billing");
|
const { refundTaskBillingOnFailure } = require("./billing");
|
||||||
const { putObject, isOssConfigured } = require("./ossClient");
|
const { putObject, isOssConfigured } = require("./ossClient");
|
||||||
|
const { withProviderPollSlot } = require("./providerPollLimiter");
|
||||||
|
const {
|
||||||
|
PROGRESS_SOURCE_ESTIMATED,
|
||||||
|
PROGRESS_SOURCE_REAL,
|
||||||
|
formatTaskProgressPayload,
|
||||||
|
normalizeProgressSource,
|
||||||
|
} = require("./taskProgressContract");
|
||||||
|
|
||||||
const taskEvents = new EventEmitter();
|
const taskEvents = new EventEmitter();
|
||||||
taskEvents.setMaxListeners(200);
|
taskEvents.setMaxListeners(200);
|
||||||
@@ -18,10 +25,12 @@ const TASK_EVENT_ORIGIN = `${process.pid}-${crypto.randomUUID()}`;
|
|||||||
const POLLER_OWNER_ID = `${process.pid}-${crypto.randomUUID()}`;
|
const POLLER_OWNER_ID = `${process.pid}-${crypto.randomUUID()}`;
|
||||||
const POLLER_OWNER_STALE_MS = Number(process.env.TASK_POLLER_OWNER_STALE_MS || 20_000);
|
const POLLER_OWNER_STALE_MS = Number(process.env.TASK_POLLER_OWNER_STALE_MS || 20_000);
|
||||||
const POLLER_RECOVERY_INTERVAL_MS = Number(process.env.TASK_POLLER_RECOVERY_INTERVAL_MS || 30_000);
|
const POLLER_RECOVERY_INTERVAL_MS = Number(process.env.TASK_POLLER_RECOVERY_INTERVAL_MS || 30_000);
|
||||||
|
const PROVIDER_POLL_REQUEST_TIMEOUT_MS = Number(process.env.TASK_PROVIDER_POLL_REQUEST_TIMEOUT_MS || 25_000);
|
||||||
let taskEventListenerClient = null;
|
let taskEventListenerClient = null;
|
||||||
let taskEventListenerStarting = null;
|
let taskEventListenerStarting = null;
|
||||||
let pollerStoreReady = null;
|
let pollerStoreReady = null;
|
||||||
let pollerRecoveryTimer = null;
|
let pollerRecoveryTimer = null;
|
||||||
|
let staleTaskCleanupStartupTimer = null;
|
||||||
|
|
||||||
function normalizeTaskProgress(value) {
|
function normalizeTaskProgress(value) {
|
||||||
const numeric = Number(value);
|
const numeric = Number(value);
|
||||||
@@ -30,13 +39,7 @@ function normalizeTaskProgress(value) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
function formatTaskEvent(row) {
|
function formatTaskEvent(row) {
|
||||||
return {
|
return formatTaskProgressPayload(row);
|
||||||
taskId: row.id,
|
|
||||||
status: row.status,
|
|
||||||
progress: row.progress,
|
|
||||||
resultUrl: row.result_url || null,
|
|
||||||
error: row.error || null,
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
function emitTaskEvent(event) {
|
function emitTaskEvent(event) {
|
||||||
@@ -152,6 +155,23 @@ async function clearPollerState(taskDbId) {
|
|||||||
await pool.query("DELETE FROM generation_task_pollers WHERE task_id = $1", [taskDbId]);
|
await pool.query("DELETE FROM generation_task_pollers WHERE task_id = $1", [taskDbId]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async function orphanOwnedPollerState() {
|
||||||
|
await ensureTaskPollerStore();
|
||||||
|
await pool.query(
|
||||||
|
"UPDATE generation_task_pollers SET owner_id = NULL, owner_heartbeat_at = NULL, updated_at = NOW() WHERE owner_id = $1",
|
||||||
|
[POLLER_OWNER_ID],
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
async function getPersistedLeaseToken(taskDbId) {
|
||||||
|
await ensureTaskPollerStore();
|
||||||
|
const { rows } = await pool.query(
|
||||||
|
"SELECT lease_token FROM generation_task_pollers WHERE task_id = $1 LIMIT 1",
|
||||||
|
[taskDbId],
|
||||||
|
);
|
||||||
|
return rows[0]?.lease_token || null;
|
||||||
|
}
|
||||||
|
|
||||||
async function getLeaseKey(leaseToken) {
|
async function getLeaseKey(leaseToken) {
|
||||||
if (!leaseToken) return null;
|
if (!leaseToken) return null;
|
||||||
const { rows } = await pool.query(
|
const { rows } = await pool.query(
|
||||||
@@ -241,6 +261,10 @@ async function updateTaskInDb(taskId, updates) {
|
|||||||
const progress = normalizeTaskProgress(nextUpdates.progress);
|
const progress = normalizeTaskProgress(nextUpdates.progress);
|
||||||
if (progress !== undefined) { fields.push(`progress = $${idx++}`); values.push(progress); }
|
if (progress !== undefined) { fields.push(`progress = $${idx++}`); values.push(progress); }
|
||||||
}
|
}
|
||||||
|
if (nextUpdates.progressSource !== undefined) {
|
||||||
|
const progressSource = normalizeProgressSource(nextUpdates.progressSource);
|
||||||
|
if (progressSource) { fields.push(`progress_source = $${idx++}`); values.push(progressSource); }
|
||||||
|
}
|
||||||
if (nextUpdates.resultUrl !== undefined) { fields.push(`result_url = $${idx++}`); values.push(nextUpdates.resultUrl); }
|
if (nextUpdates.resultUrl !== undefined) { fields.push(`result_url = $${idx++}`); values.push(nextUpdates.resultUrl); }
|
||||||
if (nextUpdates.error !== undefined) { fields.push(`error = $${idx++}`); values.push(nextUpdates.error); }
|
if (nextUpdates.error !== undefined) { fields.push(`error = $${idx++}`); values.push(nextUpdates.error); }
|
||||||
if (nextUpdates.providerTaskId !== undefined) { fields.push(`provider_task_id = $${idx++}`); values.push(nextUpdates.providerTaskId); }
|
if (nextUpdates.providerTaskId !== undefined) { fields.push(`provider_task_id = $${idx++}`); values.push(nextUpdates.providerTaskId); }
|
||||||
@@ -271,6 +295,12 @@ async function updateTaskInDb(taskId, updates) {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (nextUpdates.status === "completed") {
|
||||||
|
await markTaskBillingAccepted(taskId).catch((err) => {
|
||||||
|
console.error(`[aiTaskWorker] billing accept error for task ${taskId}:`, err.message);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
if (nextUpdates.status === "failed") {
|
if (nextUpdates.status === "failed") {
|
||||||
await refundTaskBillingOnFailure(taskId).catch((err) => {
|
await refundTaskBillingOnFailure(taskId).catch((err) => {
|
||||||
console.error(`[aiTaskWorker] refund error for task ${taskId}:`, err.message);
|
console.error(`[aiTaskWorker] refund error for task ${taskId}:`, err.message);
|
||||||
@@ -278,6 +308,13 @@ async function updateTaskInDb(taskId, updates) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async function markTaskBillingAccepted(taskId) {
|
||||||
|
await pool.query(
|
||||||
|
"UPDATE credit_ledger SET status = 'charged', updated_at = NOW() WHERE task_id = $1 AND status = 'reserved'",
|
||||||
|
[taskId],
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
function persistTaskResultUrlToOssInBackground(task) {
|
function persistTaskResultUrlToOssInBackground(task) {
|
||||||
if (!task?.id || !task?.result_url) return;
|
if (!task?.id || !task?.result_url) return;
|
||||||
|
|
||||||
@@ -632,9 +669,22 @@ function extractErrorMessage(json, fallback) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async function fetchJson(url, headers) {
|
async function fetchJson(url, headers) {
|
||||||
const res = await fetch(url, { method: "GET", headers });
|
const controller = new AbortController();
|
||||||
if (!res.ok) return { ok: false, json: null };
|
const timeoutMs = Number.isFinite(PROVIDER_POLL_REQUEST_TIMEOUT_MS) && PROVIDER_POLL_REQUEST_TIMEOUT_MS > 0
|
||||||
return { ok: true, json: await res.json() };
|
? PROVIDER_POLL_REQUEST_TIMEOUT_MS
|
||||||
|
: 25_000;
|
||||||
|
const timer = setTimeout(() => controller.abort(), timeoutMs);
|
||||||
|
timer.unref?.();
|
||||||
|
|
||||||
|
try {
|
||||||
|
const res = await fetch(url, { method: "GET", headers, signal: controller.signal });
|
||||||
|
if (!res.ok) return { ok: false, json: null };
|
||||||
|
return { ok: true, json: await res.json() };
|
||||||
|
} catch (err) {
|
||||||
|
return { ok: false, json: null, error: err };
|
||||||
|
} finally {
|
||||||
|
clearTimeout(timer);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async function pollGrsaiImage(_taskId, providerTaskId, apiKey, baseUrl, resultEndpoint) {
|
async function pollGrsaiImage(_taskId, providerTaskId, apiKey, baseUrl, resultEndpoint) {
|
||||||
@@ -645,7 +695,7 @@ async function pollGrsaiImage(_taskId, providerTaskId, apiKey, baseUrl, resultEn
|
|||||||
});
|
});
|
||||||
if (!ok) {
|
if (!ok) {
|
||||||
console.warn(`[grsai-poll] task ${_taskId} fetch not ok, url=${url}`);
|
console.warn(`[grsai-poll] task ${_taskId} fetch not ok, url=${url}`);
|
||||||
return { status: "running", progress: 50 };
|
return { status: "running", progressSource: PROGRESS_SOURCE_ESTIMATED };
|
||||||
}
|
}
|
||||||
|
|
||||||
const data = asObject(json?.data) || json;
|
const data = asObject(json?.data) || json;
|
||||||
@@ -663,17 +713,17 @@ async function pollGrsaiImage(_taskId, providerTaskId, apiKey, baseUrl, resultEn
|
|||||||
const resultUrl = extractImageUrl(json);
|
const resultUrl = extractImageUrl(json);
|
||||||
console.info(`[grsai-poll] task ${_taskId} status=${status} resultUrl=${resultUrl ? "yes" : "no"} raw=${JSON.stringify(json).slice(0, 300)}`);
|
console.info(`[grsai-poll] task ${_taskId} status=${status} resultUrl=${resultUrl ? "yes" : "no"} raw=${JSON.stringify(json).slice(0, 300)}`);
|
||||||
if (resultUrl) {
|
if (resultUrl) {
|
||||||
return { status: "completed", progress: 100, resultUrl };
|
return { status: "completed", progress: 100, progressSource: PROGRESS_SOURCE_REAL, resultUrl };
|
||||||
}
|
}
|
||||||
if (isCompletedStatus(status)) {
|
if (isCompletedStatus(status)) {
|
||||||
const completedUrl = extractImageUrl(json);
|
const completedUrl = extractImageUrl(json);
|
||||||
if (!completedUrl) return { status: "failed", error: "Image generation completed without a result url" };
|
if (!completedUrl) return { status: "failed", error: "Image generation completed without a result url" };
|
||||||
return { status: "completed", progress: 100, resultUrl: completedUrl };
|
return { status: "completed", progress: 100, progressSource: PROGRESS_SOURCE_REAL, resultUrl: completedUrl };
|
||||||
}
|
}
|
||||||
if (isFailedStatus(status)) {
|
if (isFailedStatus(status)) {
|
||||||
return { status: "failed", error: extractErrorMessage(json, "Image generation failed") };
|
return { status: "failed", error: extractErrorMessage(json, "Image generation failed") };
|
||||||
}
|
}
|
||||||
return { status: "running", progress: Math.min(90, 30 + Math.random() * 40) };
|
return { status: "running", progressSource: PROGRESS_SOURCE_ESTIMATED };
|
||||||
}
|
}
|
||||||
|
|
||||||
async function pollDashscopeImage(_taskId, providerTaskId, apiKey) {
|
async function pollDashscopeImage(_taskId, providerTaskId, apiKey) {
|
||||||
@@ -682,19 +732,19 @@ async function pollDashscopeImage(_taskId, providerTaskId, apiKey) {
|
|||||||
Authorization: `Bearer ${apiKey}`,
|
Authorization: `Bearer ${apiKey}`,
|
||||||
Accept: "application/json",
|
Accept: "application/json",
|
||||||
});
|
});
|
||||||
if (!ok) return { status: "running", progress: 50 };
|
if (!ok) return { status: "running", progressSource: PROGRESS_SOURCE_ESTIMATED };
|
||||||
|
|
||||||
const output = asObject(json?.output) || {};
|
const output = asObject(json?.output) || {};
|
||||||
const status = normalizeStatus(output.task_status || json?.task_status);
|
const status = normalizeStatus(output.task_status || json?.task_status);
|
||||||
const resultUrl = extractImageUrl(json);
|
const resultUrl = extractImageUrl(json);
|
||||||
if (isCompletedStatus(status)) {
|
if (isCompletedStatus(status)) {
|
||||||
if (!resultUrl) return { status: "failed", error: "DashScope image generation completed without a result url" };
|
if (!resultUrl) return { status: "failed", error: "DashScope image generation completed without a result url" };
|
||||||
return { status: "completed", progress: 100, resultUrl };
|
return { status: "completed", progress: 100, progressSource: PROGRESS_SOURCE_REAL, resultUrl };
|
||||||
}
|
}
|
||||||
if (isFailedStatus(status)) {
|
if (isFailedStatus(status)) {
|
||||||
return { status: "failed", error: extractErrorMessage(json, "DashScope image generation failed") };
|
return { status: "failed", error: extractErrorMessage(json, "DashScope image generation failed") };
|
||||||
}
|
}
|
||||||
return { status: "running", progress: Math.min(90, 30 + Math.random() * 40) };
|
return { status: "running", progressSource: PROGRESS_SOURCE_ESTIMATED };
|
||||||
}
|
}
|
||||||
|
|
||||||
function base64Url(input) {
|
function base64Url(input) {
|
||||||
@@ -762,7 +812,7 @@ function getPollRequest(providerTaskId, apiKey, providerConfig) {
|
|||||||
async function pollVideoTask(_taskId, providerTaskId, apiKey, providerConfig) {
|
async function pollVideoTask(_taskId, providerTaskId, apiKey, providerConfig) {
|
||||||
const { url, headers } = getPollRequest(providerTaskId, apiKey, providerConfig);
|
const { url, headers } = getPollRequest(providerTaskId, apiKey, providerConfig);
|
||||||
const { ok, json } = await fetchJson(url, headers);
|
const { ok, json } = await fetchJson(url, headers);
|
||||||
if (!ok) return { status: "running", progress: 50 };
|
if (!ok) return { status: "running", progressSource: PROGRESS_SOURCE_ESTIMATED };
|
||||||
|
|
||||||
const data = asObject(json?.data) || json;
|
const data = asObject(json?.data) || json;
|
||||||
const output = asObject(json?.output) || {};
|
const output = asObject(json?.output) || {};
|
||||||
@@ -776,13 +826,15 @@ async function pollVideoTask(_taskId, providerTaskId, apiKey, providerConfig) {
|
|||||||
|
|
||||||
const resultUrl = extractVideoUrl(json);
|
const resultUrl = extractVideoUrl(json);
|
||||||
if (isCompletedStatus(status) || resultUrl) {
|
if (isCompletedStatus(status) || resultUrl) {
|
||||||
return { status: "completed", progress: 100, resultUrl: resultUrl || null };
|
return { status: "completed", progress: 100, progressSource: PROGRESS_SOURCE_REAL, resultUrl: resultUrl || null };
|
||||||
}
|
}
|
||||||
if (isFailedStatus(status)) {
|
if (isFailedStatus(status)) {
|
||||||
return { status: "failed", error: extractErrorMessage(json, "Video generation failed") };
|
return { status: "failed", error: extractErrorMessage(json, "Video generation failed") };
|
||||||
}
|
}
|
||||||
const progress = Number(data.progress || output.progress);
|
const progress = Number(data.progress || output.progress);
|
||||||
return { status: "running", progress: Number.isFinite(progress) ? Math.min(95, progress) : Math.min(90, 30 + Math.random() * 30) };
|
return Number.isFinite(progress)
|
||||||
|
? { status: "running", progress: Math.min(95, progress), progressSource: PROGRESS_SOURCE_REAL }
|
||||||
|
: { status: "running", progressSource: PROGRESS_SOURCE_ESTIMATED };
|
||||||
}
|
}
|
||||||
|
|
||||||
function getMaxPollAttempts(type, providerConfig) {
|
function getMaxPollAttempts(type, providerConfig) {
|
||||||
@@ -804,26 +856,31 @@ function startPolling(taskDbId, { providerTaskId, apiKey, type, providerConfig,
|
|||||||
}
|
}
|
||||||
|
|
||||||
let attempts = 0;
|
let attempts = 0;
|
||||||
|
let polling = false;
|
||||||
|
let skippedPolls = 0;
|
||||||
const maxPollAttempts = getMaxPollAttempts(type, providerConfig);
|
const maxPollAttempts = getMaxPollAttempts(type, providerConfig);
|
||||||
const interval = setInterval(async () => {
|
const interval = setInterval(async () => {
|
||||||
attempts++;
|
if (polling) return;
|
||||||
if (attempts > maxPollAttempts) {
|
polling = true;
|
||||||
clearInterval(interval);
|
|
||||||
activePollers.delete(taskDbId);
|
|
||||||
if (leaseToken && keyManager) await keyManager.releaseKey(leaseToken).catch(() => {});
|
|
||||||
if (typeof onTaskFailed === "function") {
|
|
||||||
const handled = await onTaskFailed("Task timed out").catch((fallbackErr) => {
|
|
||||||
console.error(`[aiTaskWorker] fallback error for task ${taskDbId}:`, fallbackErr.message);
|
|
||||||
return false;
|
|
||||||
});
|
|
||||||
if (handled) return;
|
|
||||||
}
|
|
||||||
await updateTaskInDb(taskDbId, { status: "failed", error: "Task timed out" });
|
|
||||||
await clearPollerState(taskDbId).catch(() => {});
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
if (attempts >= maxPollAttempts) {
|
||||||
|
clearInterval(interval);
|
||||||
|
activePollers.delete(taskDbId);
|
||||||
|
if (leaseToken && keyManager) await keyManager.releaseKey(leaseToken).catch(() => {});
|
||||||
|
if (typeof onTaskFailed === "function") {
|
||||||
|
await clearPollerState(taskDbId).catch(() => {});
|
||||||
|
const handled = await onTaskFailed("Task timed out").catch((fallbackErr) => {
|
||||||
|
console.error(`[aiTaskWorker] fallback error for task ${taskDbId}:`, fallbackErr.message);
|
||||||
|
return false;
|
||||||
|
});
|
||||||
|
if (handled) return;
|
||||||
|
}
|
||||||
|
await updateTaskInDb(taskDbId, { status: "failed", error: "Task timed out" });
|
||||||
|
await clearPollerState(taskDbId).catch(() => {});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// Check if task was cancelled by user
|
// Check if task was cancelled by user
|
||||||
const { rows: [taskRow] } = await pool.query("SELECT status FROM generation_tasks WHERE id = $1", [taskDbId]);
|
const { rows: [taskRow] } = await pool.query("SELECT status FROM generation_tasks WHERE id = $1", [taskDbId]);
|
||||||
if (!taskRow || taskRow.status === "cancelled") {
|
if (!taskRow || taskRow.status === "cancelled") {
|
||||||
@@ -835,15 +892,29 @@ function startPolling(taskDbId, { providerTaskId, apiKey, type, providerConfig,
|
|||||||
}
|
}
|
||||||
await refreshPollerHeartbeat(taskDbId).catch(() => {});
|
await refreshPollerHeartbeat(taskDbId).catch(() => {});
|
||||||
|
|
||||||
let result;
|
const pollOutcome = await withProviderPollSlot(taskDbId, async () => {
|
||||||
if (type === "image") {
|
attempts++;
|
||||||
if (providerConfig.transport === "dashscope-image") {
|
if (type === "image") {
|
||||||
result = await pollDashscopeImage(taskDbId, providerTaskId, apiKey);
|
if (providerConfig.transport === "dashscope-image") {
|
||||||
} else {
|
return pollDashscopeImage(taskDbId, providerTaskId, apiKey);
|
||||||
result = await pollGrsaiImage(taskDbId, providerTaskId, apiKey, providerConfig.baseUrl, providerConfig.resultEndpoint || "/result");
|
}
|
||||||
|
return pollGrsaiImage(taskDbId, providerTaskId, apiKey, providerConfig.baseUrl, providerConfig.resultEndpoint || "/result");
|
||||||
}
|
}
|
||||||
} else {
|
return pollVideoTask(taskDbId, providerTaskId, apiKey, providerConfig);
|
||||||
result = await pollVideoTask(taskDbId, providerTaskId, apiKey, providerConfig);
|
});
|
||||||
|
|
||||||
|
if (!pollOutcome.acquired) {
|
||||||
|
skippedPolls++;
|
||||||
|
if (skippedPolls % 20 === 0) {
|
||||||
|
console.info(`[aiTaskWorker] task ${taskDbId} waiting for provider poll slot (skipped=${skippedPolls})`);
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
skippedPolls = 0;
|
||||||
|
const result = pollOutcome.value;
|
||||||
|
if (!result) {
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (result.status === "completed" || result.status === "failed") {
|
if (result.status === "completed" || result.status === "failed") {
|
||||||
@@ -851,6 +922,7 @@ function startPolling(taskDbId, { providerTaskId, apiKey, type, providerConfig,
|
|||||||
activePollers.delete(taskDbId);
|
activePollers.delete(taskDbId);
|
||||||
if (leaseToken && keyManager) await keyManager.releaseKey(leaseToken).catch(() => {});
|
if (leaseToken && keyManager) await keyManager.releaseKey(leaseToken).catch(() => {});
|
||||||
if (result.status === "failed" && typeof onTaskFailed === "function") {
|
if (result.status === "failed" && typeof onTaskFailed === "function") {
|
||||||
|
await clearPollerState(taskDbId).catch(() => {});
|
||||||
const handled = await onTaskFailed(result.error || "Task failed").catch((fallbackErr) => {
|
const handled = await onTaskFailed(result.error || "Task failed").catch((fallbackErr) => {
|
||||||
console.error(`[aiTaskWorker] fallback error for task ${taskDbId}:`, fallbackErr.message);
|
console.error(`[aiTaskWorker] fallback error for task ${taskDbId}:`, fallbackErr.message);
|
||||||
return false;
|
return false;
|
||||||
@@ -865,6 +937,8 @@ function startPolling(taskDbId, { providerTaskId, apiKey, type, providerConfig,
|
|||||||
}
|
}
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
console.error(`[aiTaskWorker] poll error for task ${taskDbId}:`, err.message);
|
console.error(`[aiTaskWorker] poll error for task ${taskDbId}:`, err.message);
|
||||||
|
} finally {
|
||||||
|
polling = false;
|
||||||
}
|
}
|
||||||
}, POLL_INTERVAL_MS);
|
}, POLL_INTERVAL_MS);
|
||||||
|
|
||||||
@@ -880,6 +954,29 @@ function stopPolling(taskDbId) {
|
|||||||
clearPollerState(taskDbId).catch(() => {});
|
clearPollerState(taskDbId).catch(() => {});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async function cancelTaskRuntimeState(taskDbId, keyManager) {
|
||||||
|
const poller = activePollers.get(taskDbId);
|
||||||
|
if (poller) {
|
||||||
|
clearInterval(poller.interval);
|
||||||
|
activePollers.delete(taskDbId);
|
||||||
|
}
|
||||||
|
|
||||||
|
const leaseToken = poller?.leaseToken || await getPersistedLeaseToken(taskDbId).catch(() => null);
|
||||||
|
await clearPollerState(taskDbId).catch(() => {});
|
||||||
|
if (leaseToken && keyManager) {
|
||||||
|
await keyManager.releaseKey(leaseToken).catch((err) => {
|
||||||
|
console.error(`[aiTaskWorker] failed to release lease for cancelled task ${taskDbId}:`, err.message);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
await publishTaskEvent({
|
||||||
|
taskId: taskDbId,
|
||||||
|
status: "cancelled",
|
||||||
|
progress: 100,
|
||||||
|
resultUrl: null,
|
||||||
|
error: "任务已取消",
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
function getActiveCount() {
|
function getActiveCount() {
|
||||||
return activePollers.size;
|
return activePollers.size;
|
||||||
}
|
}
|
||||||
@@ -889,7 +986,7 @@ async function recoverRunnablePollers() {
|
|||||||
const staleInterval = `${Math.max(5, Math.ceil(POLLER_OWNER_STALE_MS / 1000))} seconds`;
|
const staleInterval = `${Math.max(5, Math.ceil(POLLER_OWNER_STALE_MS / 1000))} seconds`;
|
||||||
const { rows } = await pool.query(
|
const { rows } = await pool.query(
|
||||||
`
|
`
|
||||||
SELECT p.task_id
|
SELECT p.task_id, p.updated_at
|
||||||
FROM generation_task_pollers p
|
FROM generation_task_pollers p
|
||||||
JOIN generation_tasks t ON t.id = p.task_id
|
JOIN generation_tasks t ON t.id = p.task_id
|
||||||
WHERE t.status IN ('pending', 'running')
|
WHERE t.status IN ('pending', 'running')
|
||||||
@@ -912,6 +1009,7 @@ async function recoverRunnablePollers() {
|
|||||||
const apiKey = await getLeaseKey(poller.lease_token);
|
const apiKey = await getLeaseKey(poller.lease_token);
|
||||||
if (apiKey == null) {
|
if (apiKey == null) {
|
||||||
console.warn(`[aiTaskWorker] cannot recover task ${taskId}: active lease not found`);
|
console.warn(`[aiTaskWorker] cannot recover task ${taskId}: active lease not found`);
|
||||||
|
await releaseUnrecoverableTask(taskId, "任务执行状态已失效,已自动释放");
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -923,11 +1021,51 @@ async function recoverRunnablePollers() {
|
|||||||
providerConfig: parseProviderConfig(poller.provider_config_json),
|
providerConfig: parseProviderConfig(poller.provider_config_json),
|
||||||
leaseToken: poller.lease_token,
|
leaseToken: poller.lease_token,
|
||||||
keyManager: require("./keyManager"),
|
keyManager: require("./keyManager"),
|
||||||
|
onTaskFailed: async (failureMessage) => {
|
||||||
|
await updateTaskInDb(taskId, { status: "failed", error: failureMessage || "Task failed" });
|
||||||
|
return true;
|
||||||
|
},
|
||||||
skipPersist: true,
|
skipPersist: true,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async function releaseUnrecoverableTask(taskId, message) {
|
||||||
|
const { rows } = await pool.query(
|
||||||
|
`
|
||||||
|
UPDATE generation_tasks t
|
||||||
|
SET status = 'failed', error = $2, completed_at = NOW(), updated_at = NOW()
|
||||||
|
FROM generation_task_pollers p
|
||||||
|
WHERE t.id = $1
|
||||||
|
AND p.task_id = t.id
|
||||||
|
AND p.owner_id = $3
|
||||||
|
AND t.status IN ('pending', 'running')
|
||||||
|
RETURNING t.*
|
||||||
|
`,
|
||||||
|
[taskId, message, POLLER_OWNER_ID],
|
||||||
|
);
|
||||||
|
|
||||||
|
const task = rows[0];
|
||||||
|
if (!task) return false;
|
||||||
|
|
||||||
|
const leaseToken = await getPersistedLeaseToken(taskId).catch(() => null);
|
||||||
|
await clearPollerState(taskId).catch(() => {});
|
||||||
|
if (leaseToken) {
|
||||||
|
await require("./keyManager").releaseKey(leaseToken).catch((err) => {
|
||||||
|
console.error(`[aiTaskWorker] failed to release lease for unrecoverable task ${taskId}:`, err.message);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
await publishTaskEvent(formatTaskEvent(task));
|
||||||
|
await createTaskLifecycleNotification(task).catch((err) => {
|
||||||
|
console.error(`[aiTaskWorker] notification error for unrecoverable task ${taskId}:`, err.message);
|
||||||
|
});
|
||||||
|
await refundTaskBillingOnFailure(taskId).catch((err) => {
|
||||||
|
console.error(`[aiTaskWorker] refund error for unrecoverable task ${taskId}:`, err.message);
|
||||||
|
});
|
||||||
|
console.warn(`[aiTaskWorker] released unrecoverable task ${taskId}: ${message}`);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
// --- Periodic stale task cleanup ---
|
// --- Periodic stale task cleanup ---
|
||||||
// Runs every 5 minutes, marks tasks stuck in 'pending'/'running' for too long as 'failed'.
|
// Runs every 5 minutes, marks tasks stuck in 'pending'/'running' for too long as 'failed'.
|
||||||
// This catches cases where the worker crashed, the provider API never responded,
|
// This catches cases where the worker crashed, the provider API never responded,
|
||||||
@@ -939,26 +1077,32 @@ async function runStaleTaskCleanup() {
|
|||||||
try {
|
try {
|
||||||
const { rows } = await pool.query(
|
const { rows } = await pool.query(
|
||||||
`UPDATE generation_tasks
|
`UPDATE generation_tasks
|
||||||
SET status = 'failed', error = '任务超时自动释放', updated_at = NOW()
|
SET status = 'failed', error = '任务超时自动释放', completed_at = NOW(), updated_at = NOW()
|
||||||
WHERE status IN ('pending', 'running')
|
WHERE status IN ('pending', 'running')
|
||||||
AND GREATEST(updated_at, COALESCE(last_poll_at, created_at)) < NOW() - INTERVAL '10 minutes'
|
AND GREATEST(updated_at, COALESCE(last_poll_at, created_at)) < NOW() - INTERVAL '10 minutes'
|
||||||
RETURNING id`,
|
RETURNING *`,
|
||||||
);
|
);
|
||||||
for (const row of rows) {
|
for (const row of rows) {
|
||||||
await publishTaskEvent({
|
|
||||||
taskId: row.id,
|
|
||||||
status: "failed",
|
|
||||||
progress: null,
|
|
||||||
resultUrl: null,
|
|
||||||
error: "任务超时自动释放",
|
|
||||||
});
|
|
||||||
// Also stop any active poller for this task
|
// Also stop any active poller for this task
|
||||||
const poller = activePollers.get(row.id);
|
const poller = activePollers.get(row.id);
|
||||||
if (poller) {
|
if (poller) {
|
||||||
clearInterval(poller.interval);
|
clearInterval(poller.interval);
|
||||||
activePollers.delete(row.id);
|
activePollers.delete(row.id);
|
||||||
}
|
}
|
||||||
|
const leaseToken = poller?.leaseToken || await getPersistedLeaseToken(row.id).catch(() => null);
|
||||||
await clearPollerState(row.id).catch(() => {});
|
await clearPollerState(row.id).catch(() => {});
|
||||||
|
if (leaseToken) {
|
||||||
|
await require("./keyManager").releaseKey(leaseToken).catch((err) => {
|
||||||
|
console.error(`[aiTaskWorker] failed to release lease for stale task ${row.id}:`, err.message);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
await publishTaskEvent(formatTaskEvent(row));
|
||||||
|
await createTaskLifecycleNotification(row).catch((err) => {
|
||||||
|
console.error(`[aiTaskWorker] notification error for stale task ${row.id}:`, err.message);
|
||||||
|
});
|
||||||
|
await refundTaskBillingOnFailure(row.id).catch((err) => {
|
||||||
|
console.error(`[aiTaskWorker] refund error for stale task ${row.id}:`, err.message);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
if (rows.length > 0) {
|
if (rows.length > 0) {
|
||||||
console.log(`[aiTaskWorker] Cleaned up ${rows.length} stale task(s)`);
|
console.log(`[aiTaskWorker] Cleaned up ${rows.length} stale task(s)`);
|
||||||
@@ -1032,10 +1176,14 @@ function startStaleTaskCleanup() {
|
|||||||
if (staleTaskCleanupTimer) return;
|
if (staleTaskCleanupTimer) return;
|
||||||
staleTaskCleanupTimer = setInterval(runStaleTaskCleanup, STALE_TASK_CLEANUP_INTERVAL_MS);
|
staleTaskCleanupTimer = setInterval(runStaleTaskCleanup, STALE_TASK_CLEANUP_INTERVAL_MS);
|
||||||
// Run once shortly after startup
|
// Run once shortly after startup
|
||||||
setTimeout(runStaleTaskCleanup, 10_000);
|
staleTaskCleanupStartupTimer = setTimeout(runStaleTaskCleanup, 10_000);
|
||||||
}
|
}
|
||||||
|
|
||||||
function stopStaleTaskCleanup() {
|
function stopStaleTaskCleanup() {
|
||||||
|
if (staleTaskCleanupStartupTimer) {
|
||||||
|
clearTimeout(staleTaskCleanupStartupTimer);
|
||||||
|
staleTaskCleanupStartupTimer = null;
|
||||||
|
}
|
||||||
if (staleTaskCleanupTimer) {
|
if (staleTaskCleanupTimer) {
|
||||||
clearInterval(staleTaskCleanupTimer);
|
clearInterval(staleTaskCleanupTimer);
|
||||||
staleTaskCleanupTimer = null;
|
staleTaskCleanupTimer = null;
|
||||||
@@ -1061,9 +1209,21 @@ function stopPollerRecovery() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async function stopAllPollers() {
|
||||||
|
for (const [taskId, poller] of activePollers.entries()) {
|
||||||
|
clearInterval(poller.interval);
|
||||||
|
activePollers.delete(taskId);
|
||||||
|
}
|
||||||
|
await orphanOwnedPollerState().catch((err) => {
|
||||||
|
console.error("[aiTaskWorker] failed to orphan owned poller state:", err.message);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
module.exports = {
|
module.exports = {
|
||||||
startPolling,
|
startPolling,
|
||||||
stopPolling,
|
stopPolling,
|
||||||
|
stopAllPollers,
|
||||||
|
cancelTaskRuntimeState,
|
||||||
updateTaskInDb,
|
updateTaskInDb,
|
||||||
getActiveCount,
|
getActiveCount,
|
||||||
extractProviderTaskId,
|
extractProviderTaskId,
|
||||||
|
|||||||
@@ -353,6 +353,10 @@ async function migrateGenerationTasksBillingColumns(client) {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async function migrateGenerationTaskProgressContract() {
|
||||||
|
await addColumnIfMissing("generation_tasks", "progress_source TEXT");
|
||||||
|
}
|
||||||
|
|
||||||
async function ensureModelPriceSeed() {
|
async function ensureModelPriceSeed() {
|
||||||
const columns = await getColumnNames("model_prices");
|
const columns = await getColumnNames("model_prices");
|
||||||
const useMills = columns.includes("input_price_mills");
|
const useMills = columns.includes("input_price_mills");
|
||||||
@@ -519,6 +523,7 @@ async function migrateGenerationTasksSchema(client) {
|
|||||||
params_json TEXT NOT NULL DEFAULT '{}',
|
params_json TEXT NOT NULL DEFAULT '{}',
|
||||||
result_url VARCHAR(2000),
|
result_url VARCHAR(2000),
|
||||||
progress INTEGER NOT NULL DEFAULT 0,
|
progress INTEGER NOT NULL DEFAULT 0,
|
||||||
|
progress_source TEXT,
|
||||||
error TEXT,
|
error TEXT,
|
||||||
dedupe_key VARCHAR(256),
|
dedupe_key VARCHAR(256),
|
||||||
source_device_id VARCHAR(128),
|
source_device_id VARCHAR(128),
|
||||||
@@ -959,6 +964,7 @@ async function ensureSchema() {
|
|||||||
await runMigration("030_generation_tasks_user_status_index", migrateGenerationTasksUserStatusIndex);
|
await runMigration("030_generation_tasks_user_status_index", migrateGenerationTasksUserStatusIndex);
|
||||||
await runMigration("031_generation_tasks_billing_columns", migrateGenerationTasksBillingColumns);
|
await runMigration("031_generation_tasks_billing_columns", migrateGenerationTasksBillingColumns);
|
||||||
await runMigration("032_ecommerce_video_history", migrateEcommerceVideoHistorySchema);
|
await runMigration("032_ecommerce_video_history", migrateEcommerceVideoHistorySchema);
|
||||||
|
await runMigration("033_generation_task_progress_contract", migrateGenerationTaskProgressContract);
|
||||||
await ensureModelPriceSeed();
|
await ensureModelPriceSeed();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -21,6 +21,53 @@ const ENTERPRISE_VIDEO_ALLOWED_MODELS = new Set([
|
|||||||
|
|
||||||
const CREDITS_PER_CNY = 100;
|
const CREDITS_PER_CNY = 100;
|
||||||
const CREDIT_UNITS_PER_CREDIT = 100;
|
const CREDIT_UNITS_PER_CREDIT = 100;
|
||||||
|
const ENTERPRISE_VIDEO_RESOLUTIONS = ["720P", "1080P"];
|
||||||
|
const ENTERPRISE_VIDEO_DEFAULT_RESOLUTION = "1080P";
|
||||||
|
|
||||||
|
const ENTERPRISE_VIDEO_PRICING_RULES = [
|
||||||
|
{
|
||||||
|
id: "happyhorse",
|
||||||
|
modelIncludes: ["happyhorse"],
|
||||||
|
rates: { "720P": 0.72, "1080P": 1.28 },
|
||||||
|
},
|
||||||
|
{
|
||||||
|
id: "wanxiang-i2v",
|
||||||
|
modelIncludes: ["wan2.7-i2v", "wanxiang"],
|
||||||
|
rates: { "720P": 0.6, "1080P": 1 },
|
||||||
|
},
|
||||||
|
{
|
||||||
|
id: "wan-animate-s2v",
|
||||||
|
modelIncludes: ["animate-mix", "s2v"],
|
||||||
|
rates: { "720P": 0.6, "1080P": 1 },
|
||||||
|
},
|
||||||
|
{
|
||||||
|
id: "kling-muted-reference",
|
||||||
|
modelIncludes: ["kling"],
|
||||||
|
when: { muted: true, hasReferenceVideo: true },
|
||||||
|
rates: { "720P": 0.9, "1080P": 1.2 },
|
||||||
|
},
|
||||||
|
{
|
||||||
|
id: "kling-muted",
|
||||||
|
modelIncludes: ["kling"],
|
||||||
|
when: { muted: true, hasReferenceVideo: false },
|
||||||
|
rates: { "720P": 0.6, "1080P": 0.8 },
|
||||||
|
},
|
||||||
|
{
|
||||||
|
id: "kling-default",
|
||||||
|
modelIncludes: ["kling"],
|
||||||
|
rates: { "720P": 0.9, "1080P": 1.2 },
|
||||||
|
},
|
||||||
|
{
|
||||||
|
id: "vidu",
|
||||||
|
modelIncludes: ["vidu"],
|
||||||
|
rates: { "720P": 0.6, "1080P": 1 },
|
||||||
|
},
|
||||||
|
{
|
||||||
|
id: "pixverse",
|
||||||
|
modelIncludes: ["pixverse"],
|
||||||
|
rates: { "720P": 0.6, "1080P": 1 },
|
||||||
|
},
|
||||||
|
];
|
||||||
|
|
||||||
function normalizeModel(value) {
|
function normalizeModel(value) {
|
||||||
return String(value || "").trim().toLowerCase();
|
return String(value || "").trim().toLowerCase();
|
||||||
@@ -36,6 +83,21 @@ function normalizeEnterpriseVideoDuration(value) {
|
|||||||
return Math.max(1, Math.ceil(numeric));
|
return Math.max(1, Math.ceil(numeric));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function enterpriseVideoPricingRuleMatches(rule, input, model) {
|
||||||
|
if (!rule.modelIncludes.some((pattern) => model.includes(pattern))) return false;
|
||||||
|
if (!rule.when) return true;
|
||||||
|
if (Object.prototype.hasOwnProperty.call(rule.when, "muted") && Boolean(input.muted) !== rule.when.muted) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (
|
||||||
|
Object.prototype.hasOwnProperty.call(rule.when, "hasReferenceVideo") &&
|
||||||
|
Boolean(input.hasReferenceVideo) !== rule.when.hasReferenceVideo
|
||||||
|
) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
function isEnterpriseVideoBillingUser(user) {
|
function isEnterpriseVideoBillingUser(user) {
|
||||||
return Boolean(user?.enterpriseId);
|
return Boolean(user?.enterpriseId);
|
||||||
}
|
}
|
||||||
@@ -69,33 +131,10 @@ function getEnterpriseVideoCreditRate(input) {
|
|||||||
const resolution = normalizeEnterpriseVideoResolution(input.resolution || input.quality);
|
const resolution = normalizeEnterpriseVideoResolution(input.resolution || input.quality);
|
||||||
const model = normalizeModel(input.model || input.requestedModel);
|
const model = normalizeModel(input.model || input.requestedModel);
|
||||||
|
|
||||||
if (model.includes("happyhorse")) {
|
const rule = ENTERPRISE_VIDEO_PRICING_RULES.find((candidate) =>
|
||||||
return resolution === "720P" ? 0.72 : 1.28;
|
enterpriseVideoPricingRuleMatches(candidate, input, model),
|
||||||
}
|
);
|
||||||
|
if (rule) return rule.rates[resolution] ?? rule.rates[ENTERPRISE_VIDEO_DEFAULT_RESOLUTION];
|
||||||
if (model.includes("wan2.7-i2v") || model.includes("wanxiang")) {
|
|
||||||
return resolution === "720P" ? 0.6 : 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (model.includes("animate-mix") || model.includes("s2v")) {
|
|
||||||
return resolution === "720P" ? 0.6 : 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (model.includes("kling")) {
|
|
||||||
if (input.muted) {
|
|
||||||
if (input.hasReferenceVideo) return resolution === "720P" ? 0.9 : 1.2;
|
|
||||||
return resolution === "720P" ? 0.6 : 0.8;
|
|
||||||
}
|
|
||||||
return resolution === "720P" ? 0.9 : 1.2;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (model.includes("vidu")) {
|
|
||||||
return resolution === "720P" ? 0.6 : 1.0;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (model.includes("pixverse")) {
|
|
||||||
return resolution === "720P" ? 0.6 : 1.0;
|
|
||||||
}
|
|
||||||
|
|
||||||
const error = new Error(`Unsupported enterprise video model: ${input.model || input.requestedModel}`);
|
const error = new Error(`Unsupported enterprise video model: ${input.model || input.requestedModel}`);
|
||||||
error.status = 403;
|
error.status = 403;
|
||||||
@@ -103,6 +142,22 @@ function getEnterpriseVideoCreditRate(input) {
|
|||||||
throw error;
|
throw error;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function getEnterpriseVideoPricingConfig() {
|
||||||
|
return {
|
||||||
|
currency: "CNY",
|
||||||
|
creditsPerCny: CREDITS_PER_CNY,
|
||||||
|
billingUnit: "per_second",
|
||||||
|
defaultResolution: ENTERPRISE_VIDEO_DEFAULT_RESOLUTION,
|
||||||
|
resolutions: [...ENTERPRISE_VIDEO_RESOLUTIONS],
|
||||||
|
rules: ENTERPRISE_VIDEO_PRICING_RULES.map((rule) => ({
|
||||||
|
id: rule.id,
|
||||||
|
modelIncludes: [...rule.modelIncludes],
|
||||||
|
when: rule.when ? { ...rule.when } : undefined,
|
||||||
|
rates: { ...rule.rates },
|
||||||
|
})),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
function calculateEnterpriseVideoCredits(input) {
|
function calculateEnterpriseVideoCredits(input) {
|
||||||
const duration = normalizeEnterpriseVideoDuration(input.durationSeconds || input.duration);
|
const duration = normalizeEnterpriseVideoDuration(input.durationSeconds || input.duration);
|
||||||
return Number((getEnterpriseVideoCreditRate(input) * duration * CREDITS_PER_CNY).toFixed(2));
|
return Number((getEnterpriseVideoCreditRate(input) * duration * CREDITS_PER_CNY).toFixed(2));
|
||||||
@@ -233,6 +288,7 @@ module.exports = {
|
|||||||
assertEnterpriseVideoModelAllowed,
|
assertEnterpriseVideoModelAllowed,
|
||||||
calculateEnterpriseVideoCost,
|
calculateEnterpriseVideoCost,
|
||||||
calculateEnterpriseVideoCredits,
|
calculateEnterpriseVideoCredits,
|
||||||
|
getEnterpriseVideoPricingConfig,
|
||||||
getEnterpriseVideoCreditRate,
|
getEnterpriseVideoCreditRate,
|
||||||
isEnterpriseVideoBillingUser,
|
isEnterpriseVideoBillingUser,
|
||||||
isEnterpriseVideoModelAllowed,
|
isEnterpriseVideoModelAllowed,
|
||||||
|
|||||||
+50
-25
@@ -3,8 +3,17 @@ const express = require('express')
|
|||||||
const rateLimit = require('express-rate-limit')
|
const rateLimit = require('express-rate-limit')
|
||||||
const cors = require('cors')
|
const cors = require('cors')
|
||||||
const helmet = require('helmet')
|
const helmet = require('helmet')
|
||||||
const { startSettlementWorker } = require('./settlementWorker')
|
const { startSettlementWorker, stopSettlementWorker } = require('./settlementWorker')
|
||||||
const { startProviderHealthMonitor } = require('./providerHealthMonitor')
|
const { startProviderHealthMonitor, stopProviderHealthMonitor } = require('./providerHealthMonitor')
|
||||||
|
const {
|
||||||
|
startStaleTaskCleanup,
|
||||||
|
startTaskEventListener,
|
||||||
|
startPollerRecovery,
|
||||||
|
stopStaleTaskCleanup,
|
||||||
|
stopTaskEventListener,
|
||||||
|
stopPollerRecovery,
|
||||||
|
stopAllPollers,
|
||||||
|
} = require('./aiTaskWorker')
|
||||||
const { ensureDatabase } = require('./dbSetup')
|
const { ensureDatabase } = require('./dbSetup')
|
||||||
const { assertRuntimeSecurityConfig } = require('./securityConfig')
|
const { assertRuntimeSecurityConfig } = require('./securityConfig')
|
||||||
const { loadPriceCache } = require('./pricing')
|
const { loadPriceCache } = require('./pricing')
|
||||||
@@ -17,6 +26,7 @@ const PORT = Number(process.env.PORT) || 3600
|
|||||||
const HOST = process.env.HOST || '0.0.0.0'
|
const HOST = process.env.HOST || '0.0.0.0'
|
||||||
const IS_PRODUCTION = process.env.NODE_ENV === 'production'
|
const IS_PRODUCTION = process.env.NODE_ENV === 'production'
|
||||||
let server = null
|
let server = null
|
||||||
|
let staleLeaseCleanupTimer = null
|
||||||
|
|
||||||
// CORS: in production, require explicit allowlist; in dev, allow all with credentials
|
// CORS: in production, require explicit allowlist; in dev, allow all with credentials
|
||||||
function buildCorsOptions() {
|
function buildCorsOptions() {
|
||||||
@@ -133,18 +143,18 @@ async function main() {
|
|||||||
|
|
||||||
// Periodic stale lease cleanup (every 5 min)
|
// Periodic stale lease cleanup (every 5 min)
|
||||||
const { cleanStaleLeases } = require('./keyManager')
|
const { cleanStaleLeases } = require('./keyManager')
|
||||||
setInterval(() => {
|
staleLeaseCleanupTimer = setInterval(() => {
|
||||||
cleanStaleLeases().then((cleaned) => {
|
cleanStaleLeases().then((cleaned) => {
|
||||||
if (cleaned > 0) console.log(`[cleanup] Released ${cleaned} stale lease(s)`)
|
if (cleaned > 0) console.log(`[cleanup] Released ${cleaned} stale lease(s)`)
|
||||||
}).catch((err) => {
|
}).catch((err) => {
|
||||||
console.error('[cleanup] error:', err)
|
console.error('[cleanup] error:', err)
|
||||||
})
|
})
|
||||||
}, 5 * 60 * 1000)
|
}, 5 * 60 * 1000)
|
||||||
|
if (staleLeaseCleanupTimer.unref) staleLeaseCleanupTimer.unref()
|
||||||
|
|
||||||
startSettlementWorker()
|
startSettlementWorker()
|
||||||
startProviderHealthMonitor()
|
startProviderHealthMonitor()
|
||||||
|
|
||||||
const { startStaleTaskCleanup, startTaskEventListener, startPollerRecovery } = require('./aiTaskWorker')
|
|
||||||
await startTaskEventListener()
|
await startTaskEventListener()
|
||||||
startPollerRecovery()
|
startPollerRecovery()
|
||||||
startStaleTaskCleanup()
|
startStaleTaskCleanup()
|
||||||
@@ -175,32 +185,47 @@ process.on('uncaughtException', (err) => {
|
|||||||
// ── Graceful shutdown ───────────────────────────────────────────────────
|
// ── Graceful shutdown ───────────────────────────────────────────────────
|
||||||
let shuttingDown = false
|
let shuttingDown = false
|
||||||
|
|
||||||
function gracefulShutdown(signal) {
|
async function shutdownRuntimeState() {
|
||||||
|
if (staleLeaseCleanupTimer) {
|
||||||
|
clearInterval(staleLeaseCleanupTimer)
|
||||||
|
staleLeaseCleanupTimer = null
|
||||||
|
}
|
||||||
|
stopSettlementWorker()
|
||||||
|
stopProviderHealthMonitor()
|
||||||
|
stopPollerRecovery()
|
||||||
|
stopStaleTaskCleanup()
|
||||||
|
await Promise.allSettled([stopTaskEventListener(), stopAllPollers()])
|
||||||
|
}
|
||||||
|
|
||||||
|
function closeServer() {
|
||||||
|
if (!server || !server.listening) return Promise.resolve()
|
||||||
|
return new Promise((resolve) => {
|
||||||
|
server.close(() => {
|
||||||
|
console.log('[shutdown] Server closed, cleaning up...')
|
||||||
|
resolve()
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
async function gracefulShutdown(signal) {
|
||||||
if (shuttingDown) return
|
if (shuttingDown) return
|
||||||
shuttingDown = true
|
shuttingDown = true
|
||||||
console.log('[shutdown] Received ' + signal + ', draining connections...')
|
console.log('[shutdown] Received ' + signal + ', draining connections...')
|
||||||
|
|
||||||
if (server && server.listening) {
|
setTimeout(() => {
|
||||||
server.close(() => {
|
console.error('[shutdown] Forced exit after timeout')
|
||||||
console.log('[shutdown] Server closed, cleaning up...')
|
process.exit(1)
|
||||||
const { stopProviderHealthMonitor } = require('./providerHealthMonitor')
|
}, 15000).unref()
|
||||||
stopProviderHealthMonitor()
|
|
||||||
const { stopTaskEventListener, stopPollerRecovery } = require('./aiTaskWorker')
|
|
||||||
stopPollerRecovery()
|
|
||||||
void stopTaskEventListener()
|
|
||||||
const { pool } = require('./db')
|
|
||||||
pool.end().then(() => {
|
|
||||||
console.log('[shutdown] Database pool closed')
|
|
||||||
process.exit(0)
|
|
||||||
}).catch(() => process.exit(0))
|
|
||||||
})
|
|
||||||
|
|
||||||
// Force exit after timeout
|
try {
|
||||||
setTimeout(() => {
|
await shutdownRuntimeState()
|
||||||
console.error('[shutdown] Forced exit after timeout')
|
await closeServer()
|
||||||
process.exit(1)
|
const { pool } = require('./db')
|
||||||
}, 15000).unref()
|
await pool.end()
|
||||||
} else {
|
console.log('[shutdown] Database pool closed')
|
||||||
|
process.exit(0)
|
||||||
|
} catch (err) {
|
||||||
|
console.error('[shutdown] error:', err)
|
||||||
process.exit(0)
|
process.exit(0)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
+9
-3
@@ -284,7 +284,7 @@ async function releaseLeaseInternal(leaseToken, user, options = {}) {
|
|||||||
const { rows } = await client.query(
|
const { rows } = await client.query(
|
||||||
`
|
`
|
||||||
WITH candidate AS (
|
WITH candidate AS (
|
||||||
SELECT l.id, l.key_id, k.provider
|
SELECT l.id, l.key_id, l.user_id, l.enterprise_id, k.provider
|
||||||
FROM key_leases l
|
FROM key_leases l
|
||||||
JOIN api_keys k ON k.id = l.key_id
|
JOIN api_keys k ON k.id = l.key_id
|
||||||
WHERE l.lease_token = $1 AND l.released_at IS NULL
|
WHERE l.lease_token = $1 AND l.released_at IS NULL
|
||||||
@@ -297,7 +297,7 @@ async function releaseLeaseInternal(leaseToken, user, options = {}) {
|
|||||||
WHERE id = (SELECT id FROM candidate)
|
WHERE id = (SELECT id FROM candidate)
|
||||||
RETURNING id, key_id
|
RETURNING id, key_id
|
||||||
)
|
)
|
||||||
SELECT r.id, r.key_id, c.provider
|
SELECT r.id, r.key_id, c.user_id AS lease_user_id, c.enterprise_id AS lease_enterprise_id, c.provider
|
||||||
FROM released r
|
FROM released r
|
||||||
JOIN candidate c ON c.key_id = r.key_id
|
JOIN candidate c ON c.key_id = r.key_id
|
||||||
`,
|
`,
|
||||||
@@ -339,7 +339,13 @@ async function releaseLeaseInternal(leaseToken, user, options = {}) {
|
|||||||
INSERT INTO usage_logs (user_id, enterprise_id, provider, key_id, action)
|
INSERT INTO usage_logs (user_id, enterprise_id, provider, key_id, action)
|
||||||
VALUES ($1, $2, (SELECT provider FROM api_keys WHERE id = $3), $4, $5)
|
VALUES ($1, $2, (SELECT provider FROM api_keys WHERE id = $3), $4, $5)
|
||||||
`,
|
`,
|
||||||
[userId, enterpriseId, lease.key_id, lease.key_id, "release"],
|
[
|
||||||
|
userId || lease.lease_user_id,
|
||||||
|
enterpriseId || lease.lease_enterprise_id,
|
||||||
|
lease.key_id,
|
||||||
|
lease.key_id,
|
||||||
|
"release",
|
||||||
|
],
|
||||||
);
|
);
|
||||||
|
|
||||||
return {
|
return {
|
||||||
|
|||||||
@@ -7,6 +7,7 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
const { pool } = require("./db");
|
const { pool } = require("./db");
|
||||||
|
const { recordProviderSuccess, recordProviderFailure, getAllBreakerStats } = require("./providerCircuitBreaker");
|
||||||
|
|
||||||
const CHECK_INTERVAL_MS = 5 * 60 * 1000;
|
const CHECK_INTERVAL_MS = 5 * 60 * 1000;
|
||||||
const DASHSCOPE_TEST_MODEL = "qwen-max";
|
const DASHSCOPE_TEST_MODEL = "qwen-max";
|
||||||
@@ -21,6 +22,15 @@ const providerHealthCache = {
|
|||||||
grsai: { status: "unknown", lastCheck: null, lastError: null, details: null },
|
grsai: { status: "unknown", lastCheck: null, lastError: null, details: null },
|
||||||
};
|
};
|
||||||
|
|
||||||
|
function recordProbeOutcome(provider, result, latencyMs) {
|
||||||
|
if (!provider) return;
|
||||||
|
if (result?.ok) {
|
||||||
|
recordProviderSuccess(provider, latencyMs);
|
||||||
|
} else {
|
||||||
|
recordProviderFailure(provider);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async function getDashScopeKey() {
|
async function getDashScopeKey() {
|
||||||
const { rows } = await pool.query(
|
const { rows } = await pool.query(
|
||||||
"SELECT id, api_key FROM api_keys WHERE provider LIKE '%dashscope%' AND enabled = 1 ORDER BY id LIMIT 1"
|
"SELECT id, api_key FROM api_keys WHERE provider LIKE '%dashscope%' AND enabled = 1 ORDER BY id LIMIT 1"
|
||||||
@@ -120,8 +130,10 @@ async function runHealthCheck() {
|
|||||||
// ── DashScope ──
|
// ── DashScope ──
|
||||||
const dashKey = await getDashScopeKey();
|
const dashKey = await getDashScopeKey();
|
||||||
if (dashKey) {
|
if (dashKey) {
|
||||||
|
const startedAt = Date.now();
|
||||||
try {
|
try {
|
||||||
const result = await probeDashScope(dashKey);
|
const result = await probeDashScope(dashKey);
|
||||||
|
recordProbeOutcome("dashscope", result, Date.now() - startedAt);
|
||||||
const prev = providerHealthCache.dashscope.status;
|
const prev = providerHealthCache.dashscope.status;
|
||||||
providerHealthCache.dashscope = {
|
providerHealthCache.dashscope = {
|
||||||
status: result.status,
|
status: result.status,
|
||||||
@@ -144,6 +156,7 @@ async function runHealthCheck() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
|
recordProviderFailure("dashscope");
|
||||||
providerHealthCache.dashscope = {
|
providerHealthCache.dashscope = {
|
||||||
status: "timeout",
|
status: "timeout",
|
||||||
lastCheck: new Date().toISOString(),
|
lastCheck: new Date().toISOString(),
|
||||||
@@ -164,8 +177,10 @@ async function runHealthCheck() {
|
|||||||
// ── GrsAI ──
|
// ── GrsAI ──
|
||||||
const grsaiKey = await getGrsaiKey();
|
const grsaiKey = await getGrsaiKey();
|
||||||
if (grsaiKey) {
|
if (grsaiKey) {
|
||||||
|
const startedAt = Date.now();
|
||||||
try {
|
try {
|
||||||
const result = await probeGrsai(grsaiKey);
|
const result = await probeGrsai(grsaiKey);
|
||||||
|
recordProbeOutcome("grsai", result, Date.now() - startedAt);
|
||||||
const prev = providerHealthCache.grsai.status;
|
const prev = providerHealthCache.grsai.status;
|
||||||
providerHealthCache.grsai = {
|
providerHealthCache.grsai = {
|
||||||
status: result.status,
|
status: result.status,
|
||||||
@@ -186,6 +201,7 @@ async function runHealthCheck() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
|
recordProviderFailure("grsai");
|
||||||
providerHealthCache.grsai = {
|
providerHealthCache.grsai = {
|
||||||
status: "timeout",
|
status: "timeout",
|
||||||
lastCheck: new Date().toISOString(),
|
lastCheck: new Date().toISOString(),
|
||||||
@@ -204,10 +220,7 @@ async function runHealthCheck() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ── Circuit breaker summary ──
|
// ── Circuit breaker summary ──
|
||||||
try {
|
providerHealthCache.circuitBreaker = getAllBreakerStats();
|
||||||
const cb = require("./providerCircuitBreaker");
|
|
||||||
providerHealthCache.circuitBreaker = cb.getProviderStatusMap ? cb.getProviderStatusMap() : null;
|
|
||||||
} catch {}
|
|
||||||
|
|
||||||
// ── Admin low-balance alert ──
|
// ── Admin low-balance alert ──
|
||||||
try {
|
try {
|
||||||
@@ -256,4 +269,4 @@ module.exports = {
|
|||||||
stopProviderHealthMonitor,
|
stopProviderHealthMonitor,
|
||||||
getProviderHealthCache,
|
getProviderHealthCache,
|
||||||
runHealthCheck,
|
runHealthCheck,
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -0,0 +1,120 @@
|
|||||||
|
"use strict";
|
||||||
|
|
||||||
|
const crypto = require("node:crypto");
|
||||||
|
const { pool } = require("./db");
|
||||||
|
|
||||||
|
const DEFAULT_MAX_CONCURRENCY = 8;
|
||||||
|
const DEFAULT_SLOT_TTL_MS = 30_000;
|
||||||
|
const POLL_SCOPE = "generation-provider-poll:global";
|
||||||
|
const OWNER_ID = `${process.pid}-${crypto.randomUUID()}`;
|
||||||
|
|
||||||
|
let storeReady = null;
|
||||||
|
|
||||||
|
function normalizePositiveInteger(value, fallback) {
|
||||||
|
const numeric = Number(value);
|
||||||
|
if (!Number.isFinite(numeric) || numeric <= 0) return fallback;
|
||||||
|
return Math.max(1, Math.trunc(numeric));
|
||||||
|
}
|
||||||
|
|
||||||
|
function getMaxConcurrency() {
|
||||||
|
return normalizePositiveInteger(process.env.TASK_PROVIDER_POLL_MAX_CONCURRENCY, DEFAULT_MAX_CONCURRENCY);
|
||||||
|
}
|
||||||
|
|
||||||
|
function getSlotTtlInterval() {
|
||||||
|
const ttlMs = normalizePositiveInteger(process.env.TASK_PROVIDER_POLL_SLOT_TTL_MS, DEFAULT_SLOT_TTL_MS);
|
||||||
|
return `${Math.max(1, Math.ceil(ttlMs / 1000))} seconds`;
|
||||||
|
}
|
||||||
|
|
||||||
|
async function ensureProviderPollLimiterStore() {
|
||||||
|
if (storeReady) return storeReady;
|
||||||
|
storeReady = pool.query(`
|
||||||
|
CREATE TABLE IF NOT EXISTS generation_provider_poll_slots (
|
||||||
|
scope TEXT NOT NULL,
|
||||||
|
slot_no INTEGER NOT NULL,
|
||||||
|
owner_id TEXT NOT NULL,
|
||||||
|
task_id INTEGER,
|
||||||
|
expires_at TIMESTAMPTZ NOT NULL,
|
||||||
|
acquired_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||||
|
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||||
|
PRIMARY KEY (scope, slot_no)
|
||||||
|
);
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_generation_provider_poll_slots_expires
|
||||||
|
ON generation_provider_poll_slots(expires_at);
|
||||||
|
`).catch((err) => {
|
||||||
|
storeReady = null;
|
||||||
|
throw err;
|
||||||
|
});
|
||||||
|
return storeReady;
|
||||||
|
}
|
||||||
|
|
||||||
|
async function acquireProviderPollSlot(taskId, options = {}) {
|
||||||
|
await ensureProviderPollLimiterStore();
|
||||||
|
|
||||||
|
const scope = options.scope || POLL_SCOPE;
|
||||||
|
const maxConcurrency = normalizePositiveInteger(options.maxConcurrency, getMaxConcurrency());
|
||||||
|
const ttlInterval = options.ttlInterval || getSlotTtlInterval();
|
||||||
|
const { rows } = await pool.query(
|
||||||
|
`
|
||||||
|
WITH candidate AS (
|
||||||
|
SELECT s.slot_no
|
||||||
|
FROM generate_series(1, $2::integer) AS s(slot_no)
|
||||||
|
LEFT JOIN generation_provider_poll_slots l
|
||||||
|
ON l.scope = $1 AND l.slot_no = s.slot_no
|
||||||
|
WHERE l.scope IS NULL OR l.expires_at < NOW()
|
||||||
|
ORDER BY s.slot_no ASC
|
||||||
|
LIMIT 1
|
||||||
|
),
|
||||||
|
claimed AS (
|
||||||
|
INSERT INTO generation_provider_poll_slots (
|
||||||
|
scope, slot_no, owner_id, task_id, expires_at, acquired_at, updated_at
|
||||||
|
)
|
||||||
|
SELECT $1, slot_no, $3, $4, NOW() + ($5::text)::interval, NOW(), NOW()
|
||||||
|
FROM candidate
|
||||||
|
ON CONFLICT (scope, slot_no) DO UPDATE SET
|
||||||
|
owner_id = EXCLUDED.owner_id,
|
||||||
|
task_id = EXCLUDED.task_id,
|
||||||
|
expires_at = EXCLUDED.expires_at,
|
||||||
|
acquired_at = NOW(),
|
||||||
|
updated_at = NOW()
|
||||||
|
WHERE generation_provider_poll_slots.expires_at < NOW()
|
||||||
|
RETURNING scope, slot_no
|
||||||
|
)
|
||||||
|
SELECT scope, slot_no FROM claimed
|
||||||
|
`,
|
||||||
|
[scope, maxConcurrency, OWNER_ID, taskId || null, ttlInterval],
|
||||||
|
);
|
||||||
|
|
||||||
|
const slot = rows[0];
|
||||||
|
return slot ? { scope: slot.scope, slotNo: slot.slot_no, ownerId: OWNER_ID } : null;
|
||||||
|
}
|
||||||
|
|
||||||
|
async function releaseProviderPollSlot(slot) {
|
||||||
|
if (!slot?.scope || !slot?.slotNo) return;
|
||||||
|
await ensureProviderPollLimiterStore();
|
||||||
|
await pool.query(
|
||||||
|
"DELETE FROM generation_provider_poll_slots WHERE scope = $1 AND slot_no = $2 AND owner_id = $3",
|
||||||
|
[slot.scope, slot.slotNo, slot.ownerId || OWNER_ID],
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
async function withProviderPollSlot(taskId, fn, options = {}) {
|
||||||
|
const slot = await acquireProviderPollSlot(taskId, options);
|
||||||
|
if (!slot) return { acquired: false, value: undefined };
|
||||||
|
|
||||||
|
try {
|
||||||
|
return { acquired: true, value: await fn() };
|
||||||
|
} finally {
|
||||||
|
await releaseProviderPollSlot(slot).catch((err) => {
|
||||||
|
console.error(`[providerPollLimiter] failed to release poll slot ${slot.scope}:${slot.slotNo}:`, err.message);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = {
|
||||||
|
acquireProviderPollSlot,
|
||||||
|
ensureProviderPollLimiterStore,
|
||||||
|
getMaxConcurrency,
|
||||||
|
normalizePositiveInteger,
|
||||||
|
releaseProviderPollSlot,
|
||||||
|
withProviderPollSlot,
|
||||||
|
};
|
||||||
+98
-26
@@ -4,7 +4,7 @@ const crypto = require("node:crypto");
|
|||||||
const { requireAuth, keyManager, preauthorizeCall, pool, withTransaction, deductImageGenerationCredits } = require("./context");
|
const { requireAuth, keyManager, preauthorizeCall, pool, withTransaction, deductImageGenerationCredits } = require("./context");
|
||||||
const { putObject, isOssConfigured } = require("../ossClient");
|
const { putObject, isOssConfigured } = require("../ossClient");
|
||||||
const { buildImageProviderDebug, resolveImageProviderCandidates, resolveVideoProvider, resolveTextProvider, getPostUrl } = require("../aiProviderRouter");
|
const { buildImageProviderDebug, resolveImageProviderCandidates, resolveVideoProvider, resolveTextProvider, getPostUrl } = require("../aiProviderRouter");
|
||||||
const { shouldSkipProvider, recordProviderSuccess, recordProviderFailure } = require("../providerCircuitBreaker");
|
const { shouldSkipProvider, recordProviderSuccess, recordProviderFailure, getAdaptiveTimeout } = require("../providerCircuitBreaker");
|
||||||
const {
|
const {
|
||||||
isEnterpriseVideoBillingUser,
|
isEnterpriseVideoBillingUser,
|
||||||
markEnterpriseVideoCreditsAccepted,
|
markEnterpriseVideoCreditsAccepted,
|
||||||
@@ -16,6 +16,7 @@ const {
|
|||||||
} = require("../enterpriseVideoBilling");
|
} = require("../enterpriseVideoBilling");
|
||||||
const {
|
const {
|
||||||
startPolling,
|
startPolling,
|
||||||
|
cancelTaskRuntimeState,
|
||||||
updateTaskInDb,
|
updateTaskInDb,
|
||||||
extractProviderTaskId,
|
extractProviderTaskId,
|
||||||
extractImageUrl,
|
extractImageUrl,
|
||||||
@@ -31,6 +32,10 @@ const {
|
|||||||
normalizeImageUpscaleFactor,
|
normalizeImageUpscaleFactor,
|
||||||
normalizeVideoStyleTransformOptions,
|
normalizeVideoStyleTransformOptions,
|
||||||
} = require("../aiUpscaleHelpers");
|
} = require("../aiUpscaleHelpers");
|
||||||
|
const {
|
||||||
|
formatTaskProgressPayload,
|
||||||
|
parseTaskParams,
|
||||||
|
} = require("../taskProgressContract");
|
||||||
|
|
||||||
const GRSAI_IMAGE_QUALITY_MODEL_OVERRIDES = new Map([
|
const GRSAI_IMAGE_QUALITY_MODEL_OVERRIDES = new Map([
|
||||||
["gpt-image-2", "1K"],
|
["gpt-image-2", "1K"],
|
||||||
@@ -59,6 +64,7 @@ function toViapiAccessibleUrl(url) {
|
|||||||
const SUPER_RESOLVE_POLL_INTERVAL_MS = 3000;
|
const SUPER_RESOLVE_POLL_INTERVAL_MS = 3000;
|
||||||
const SUPER_RESOLVE_MAX_POLL_ATTEMPTS = 200;
|
const SUPER_RESOLVE_MAX_POLL_ATTEMPTS = 200;
|
||||||
const IMAGE_PROVIDER_SUBMIT_TIMEOUT_MS = 90_000;
|
const IMAGE_PROVIDER_SUBMIT_TIMEOUT_MS = 90_000;
|
||||||
|
const GRSAI_IMAGE_SUBMIT_TIMEOUT_MS = Number(process.env.GRSAI_IMAGE_SUBMIT_TIMEOUT_MS || 30_000);
|
||||||
const GEMINI_IMAGE_SUBMIT_TIMEOUT_MS = 180_000;
|
const GEMINI_IMAGE_SUBMIT_TIMEOUT_MS = 180_000;
|
||||||
const DASHSCOPE_VIDEO_STYLE_ENDPOINT = "https://dashscope.aliyuncs.com/api/v1/services/aigc/video-generation/video-synthesis";
|
const DASHSCOPE_VIDEO_STYLE_ENDPOINT = "https://dashscope.aliyuncs.com/api/v1/services/aigc/video-generation/video-synthesis";
|
||||||
const DASHSCOPE_IMAGE_EDIT_ENDPOINT = "https://dashscope.aliyuncs.com/api/v1/services/aigc/image2image/image-synthesis";
|
const DASHSCOPE_IMAGE_EDIT_ENDPOINT = "https://dashscope.aliyuncs.com/api/v1/services/aigc/image2image/image-synthesis";
|
||||||
@@ -431,16 +437,8 @@ function sanitizeUpstreamError(value, fallback = "上游服务暂时不可用,
|
|||||||
return compact.slice(0, 320);
|
return compact.slice(0, 320);
|
||||||
}
|
}
|
||||||
|
|
||||||
function parseTaskParams(value) {
|
|
||||||
if (!value || typeof value !== "string") return {};
|
|
||||||
try {
|
|
||||||
return JSON.parse(value);
|
|
||||||
} catch {
|
|
||||||
return {};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function formatAiTaskRow(row) {
|
function formatAiTaskRow(row) {
|
||||||
|
const progressPayload = formatTaskProgressPayload(row);
|
||||||
return {
|
return {
|
||||||
taskId: String(row.id),
|
taskId: String(row.id),
|
||||||
projectId: row.project_id,
|
projectId: row.project_id,
|
||||||
@@ -448,9 +446,13 @@ function formatAiTaskRow(row) {
|
|||||||
clientQueueId: row.client_queue_id || null,
|
clientQueueId: row.client_queue_id || null,
|
||||||
type: row.type,
|
type: row.type,
|
||||||
status: row.status,
|
status: row.status,
|
||||||
progress: Number(row.progress || 0),
|
progress: progressPayload.progress,
|
||||||
resultUrl: row.result_url || null,
|
progressSource: progressPayload.progressSource,
|
||||||
error: row.error || null,
|
stage: progressPayload.stage,
|
||||||
|
startedAt: progressPayload.startedAt,
|
||||||
|
expectedDurationMs: progressPayload.expectedDurationMs,
|
||||||
|
resultUrl: progressPayload.resultUrl,
|
||||||
|
error: progressPayload.error,
|
||||||
params: parseTaskParams(row.params_json),
|
params: parseTaskParams(row.params_json),
|
||||||
createdAt: row.created_at,
|
createdAt: row.created_at,
|
||||||
updatedAt: row.updated_at,
|
updatedAt: row.updated_at,
|
||||||
@@ -1071,6 +1073,16 @@ function registerAiRoutes(router) {
|
|||||||
error.costCents = billingResult.costCents;
|
error.costCents = billingResult.costCents;
|
||||||
throw error;
|
throw error;
|
||||||
}
|
}
|
||||||
|
if (billingResult.costCents > 0) {
|
||||||
|
await client.query(
|
||||||
|
"UPDATE generation_tasks SET cost_cents = $1, billing_target = $2, billing_refunded = 0, updated_at = NOW() WHERE id = $3",
|
||||||
|
[
|
||||||
|
billingResult.costCents,
|
||||||
|
billingResult.deductionType === "enterprise_image_flat" ? "enterprise_image" : "user",
|
||||||
|
nextTaskRow.id,
|
||||||
|
],
|
||||||
|
);
|
||||||
|
}
|
||||||
return { taskRow: nextTaskRow, imageBilling: billingResult };
|
return { taskRow: nextTaskRow, imageBilling: billingResult };
|
||||||
});
|
});
|
||||||
const preauth = { authorized: true, estimatedCostCents: 0, billingMode: imageBilling.deductionType };
|
const preauth = { authorized: true, estimatedCostCents: 0, billingMode: imageBilling.deductionType };
|
||||||
@@ -1085,9 +1097,11 @@ function registerAiRoutes(router) {
|
|||||||
},
|
},
|
||||||
providerDebug: buildImageProviderDebug(model),
|
providerDebug: buildImageProviderDebug(model),
|
||||||
});
|
});
|
||||||
submitImageWithProviderFallback(taskRow.id, providerCandidates, req.user, preauth, params).catch((err) => {
|
submitImageWithProviderFallback(taskRow.id, providerCandidates, req.user, preauth, params).catch(async (err) => {
|
||||||
console.error("[ai/image] submit error:", err.message);
|
console.error("[ai/image] submit error:", err.message);
|
||||||
updateTaskInDb(taskRow.id, { status: "failed", error: err.message });
|
await updateTaskInDb(taskRow.id, { status: "failed", error: err.message }).catch((updateErr) => {
|
||||||
|
console.error(`[ai/image] failed to persist task ${taskRow.id} failure:`, updateErr.message);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
console.error("[ai/image] error:", err.message);
|
console.error("[ai/image] error:", err.message);
|
||||||
@@ -1199,6 +1213,10 @@ function registerAiRoutes(router) {
|
|||||||
...enterpriseBilling,
|
...enterpriseBilling,
|
||||||
taskId: nextTaskRow.id,
|
taskId: nextTaskRow.id,
|
||||||
});
|
});
|
||||||
|
await client.query(
|
||||||
|
"UPDATE generation_tasks SET cost_cents = $1, billing_target = 'enterprise_video', billing_refunded = 0, updated_at = NOW() WHERE id = $2",
|
||||||
|
[nextBilling.amountCents, nextTaskRow.id],
|
||||||
|
);
|
||||||
return { taskRow: nextTaskRow, reservedBilling: nextBilling, regularBilling: null };
|
return { taskRow: nextTaskRow, reservedBilling: nextBilling, regularBilling: null };
|
||||||
}
|
}
|
||||||
// Regular user: deduct from personal balance
|
// Regular user: deduct from personal balance
|
||||||
@@ -1221,6 +1239,10 @@ function registerAiRoutes(router) {
|
|||||||
"INSERT INTO transactions (user_id, type, amount_cents, balance_after_cents, description) VALUES ($1, 'deduct', $2, $3, $4)",
|
"INSERT INTO transactions (user_id, type, amount_cents, balance_after_cents, description) VALUES ($1, 'deduct', $2, $3, $4)",
|
||||||
[req.user.id, -costCents, deducted.balance_cents, `视频生成扣费 ${credits} 积分`],
|
[req.user.id, -costCents, deducted.balance_cents, `视频生成扣费 ${credits} 积分`],
|
||||||
);
|
);
|
||||||
|
await client.query(
|
||||||
|
"UPDATE generation_tasks SET cost_cents = $1, billing_target = 'user', billing_refunded = 0, updated_at = NOW() WHERE id = $2",
|
||||||
|
[costCents, nextTaskRow.id],
|
||||||
|
);
|
||||||
return { taskRow: nextTaskRow, reservedBilling: null, regularBilling: { costCents, balanceAfterCents: deducted.balance_cents, credits } };
|
return { taskRow: nextTaskRow, reservedBilling: null, regularBilling: { costCents, balanceAfterCents: deducted.balance_cents, credits } };
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -1742,6 +1764,29 @@ function registerAiRoutes(router) {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
const streamTaskStatusPoll = async (taskId, userId, emit) => {
|
||||||
|
const { rows } = await pool.query(
|
||||||
|
"SELECT * FROM generation_tasks WHERE id = $1 AND user_id = $2",
|
||||||
|
[taskId, userId],
|
||||||
|
);
|
||||||
|
const row = rows[0];
|
||||||
|
if (!row) return { found: false, terminal: true };
|
||||||
|
|
||||||
|
if (row.status === "pending" || row.status === "running") {
|
||||||
|
pool.query(
|
||||||
|
"UPDATE generation_tasks SET last_poll_at = NOW() WHERE id = $1",
|
||||||
|
[taskId],
|
||||||
|
).catch(() => {});
|
||||||
|
}
|
||||||
|
|
||||||
|
const event = formatTaskProgressPayload(row);
|
||||||
|
emit(event);
|
||||||
|
return {
|
||||||
|
found: true,
|
||||||
|
terminal: ["completed", "failed", "cancelled"].includes(row.status),
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
router.get("/ai/tasks/:taskId/stream", requireAuth, async (req, res) => {
|
router.get("/ai/tasks/:taskId/stream", requireAuth, async (req, res) => {
|
||||||
const { taskId } = req.params;
|
const { taskId } = req.params;
|
||||||
try {
|
try {
|
||||||
@@ -1759,13 +1804,7 @@ function registerAiRoutes(router) {
|
|||||||
});
|
});
|
||||||
|
|
||||||
const row = rows[0];
|
const row = rows[0];
|
||||||
const initial = {
|
const initial = formatTaskProgressPayload(row);
|
||||||
taskId: row.id,
|
|
||||||
status: row.status,
|
|
||||||
progress: row.progress,
|
|
||||||
resultUrl: row.result_url || null,
|
|
||||||
error: row.error || null,
|
|
||||||
};
|
|
||||||
res.write(`data: ${JSON.stringify(initial)}\n\n`);
|
res.write(`data: ${JSON.stringify(initial)}\n\n`);
|
||||||
|
|
||||||
if (["completed", "failed", "cancelled"].includes(row.status)) {
|
if (["completed", "failed", "cancelled"].includes(row.status)) {
|
||||||
@@ -1773,16 +1812,43 @@ function registerAiRoutes(router) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let closed = false;
|
||||||
|
let lastSnapshot = JSON.stringify(initial);
|
||||||
|
let dbPollTimer = null;
|
||||||
|
const endStream = () => {
|
||||||
|
if (closed) return;
|
||||||
|
closed = true;
|
||||||
|
if (dbPollTimer) clearInterval(dbPollTimer);
|
||||||
|
taskEvents.off(`task:${taskId}`, onUpdate);
|
||||||
|
res.end();
|
||||||
|
};
|
||||||
|
const emitIfChanged = (evt) => {
|
||||||
|
if (closed) return;
|
||||||
|
const snapshot = JSON.stringify(evt);
|
||||||
|
if (snapshot === lastSnapshot) return;
|
||||||
|
lastSnapshot = snapshot;
|
||||||
|
res.write(`data: ${snapshot}\n\n`);
|
||||||
|
};
|
||||||
const onUpdate = (evt) => {
|
const onUpdate = (evt) => {
|
||||||
res.write(`data: ${JSON.stringify(evt)}\n\n`);
|
emitIfChanged(evt);
|
||||||
if (["completed", "failed", "cancelled"].includes(evt.status)) {
|
if (["completed", "failed", "cancelled"].includes(evt.status)) {
|
||||||
res.end();
|
endStream();
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
taskEvents.on(`task:${taskId}`, onUpdate);
|
taskEvents.on(`task:${taskId}`, onUpdate);
|
||||||
|
|
||||||
|
dbPollTimer = setInterval(() => {
|
||||||
|
streamTaskStatusPoll(taskId, req.user.id, emitIfChanged)
|
||||||
|
.then((result) => {
|
||||||
|
if (!result.found || result.terminal) endStream();
|
||||||
|
})
|
||||||
|
.catch((pollErr) => {
|
||||||
|
console.error(`[ai/task-stream] db poll failed for task ${taskId}:`, pollErr.message);
|
||||||
|
});
|
||||||
|
}, 3000);
|
||||||
|
|
||||||
req.on("close", () => {
|
req.on("close", () => {
|
||||||
taskEvents.off(`task:${taskId}`, onUpdate);
|
endStream();
|
||||||
});
|
});
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
if (!res.headersSent) res.status(err.name === "AbortError" ? 504 : 500).json({ error: err.name === "AbortError" ? "AI 上游响应超时,请重试" : err.message });
|
if (!res.headersSent) res.status(err.name === "AbortError" ? 504 : 500).json({ error: err.name === "AbortError" ? "AI 上游响应超时,请重试" : err.message });
|
||||||
@@ -1799,6 +1865,7 @@ function registerAiRoutes(router) {
|
|||||||
[taskId, req.user.id],
|
[taskId, req.user.id],
|
||||||
);
|
);
|
||||||
if (rows.length === 0) return res.status(404).json({ error: "Task not found or not in active state" });
|
if (rows.length === 0) return res.status(404).json({ error: "Task not found or not in active state" });
|
||||||
|
await cancelTaskRuntimeState(taskId, keyManager);
|
||||||
res.json({ id: rows[0].id, status: rows[0].status });
|
res.json({ id: rows[0].id, status: rows[0].status });
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
console.error("[ai/task-cancel] error:", err.message);
|
console.error("[ai/task-cancel] error:", err.message);
|
||||||
@@ -1957,7 +2024,12 @@ async function submitImageToProvider(taskDbId, providerConfig, slotResult, param
|
|||||||
const { headers, body } = buildImageRequest(providerConfig, params, slotResult.apiKey);
|
const { headers, body } = buildImageRequest(providerConfig, params, slotResult.apiKey);
|
||||||
|
|
||||||
await updateTaskInDb(taskDbId, { status: "running", progress: 10 });
|
await updateTaskInDb(taskDbId, { status: "running", progress: 10 });
|
||||||
const submitTimeout = providerConfig.transport === "gemini-image" ? GEMINI_IMAGE_SUBMIT_TIMEOUT_MS : IMAGE_PROVIDER_SUBMIT_TIMEOUT_MS;
|
const defaultSubmitTimeout = providerConfig.transport === "gemini-image"
|
||||||
|
? GEMINI_IMAGE_SUBMIT_TIMEOUT_MS
|
||||||
|
: providerConfig.transport === "grsai-image"
|
||||||
|
? GRSAI_IMAGE_SUBMIT_TIMEOUT_MS
|
||||||
|
: IMAGE_PROVIDER_SUBMIT_TIMEOUT_MS;
|
||||||
|
const submitTimeout = getAdaptiveTimeout(providerConfig.provider, defaultSubmitTimeout);
|
||||||
const response = await fetchWithTimeout(url, { method: "POST", headers, body: JSON.stringify(body) }, submitTimeout);
|
const response = await fetchWithTimeout(url, { method: "POST", headers, body: JSON.stringify(body) }, submitTimeout);
|
||||||
if (!response.ok) {
|
if (!response.ok) {
|
||||||
const errText = await response.text().catch(() => "provider error");
|
const errText = await response.text().catch(() => "provider error");
|
||||||
|
|||||||
@@ -1,11 +1,16 @@
|
|||||||
const { keyManager, listModelPrices, pool } = require("./context");
|
const { keyManager, listModelPrices, pool } = require("./context");
|
||||||
|
const { getEnterpriseVideoPricingConfig } = require("../enterpriseVideoBilling");
|
||||||
|
|
||||||
function registerPriceRoutes(router) {
|
function registerPriceRoutes(router) {
|
||||||
// ── Public ───────────────────────────────────────────────────────────
|
// ── Public ───────────────────────────────────────────────────────────
|
||||||
|
|
||||||
router.get("/prices", async (_req, res) => {
|
router.get("/prices", async (_req, res) => {
|
||||||
const prices = await listModelPrices({ enabledOnly: true });
|
const prices = await listModelPrices({ enabledOnly: true });
|
||||||
res.json(prices);
|
res.json({
|
||||||
|
prices,
|
||||||
|
modelPrices: prices,
|
||||||
|
enterpriseVideoPricing: getEnterpriseVideoPricingConfig(),
|
||||||
|
});
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,134 @@
|
|||||||
|
"use strict";
|
||||||
|
|
||||||
|
const PROGRESS_SOURCE_REAL = "real";
|
||||||
|
const PROGRESS_SOURCE_ESTIMATED = "estimated";
|
||||||
|
|
||||||
|
const DEFAULT_IMAGE_EXPECTED_DURATION_MS = 120_000;
|
||||||
|
const DEFAULT_VIDEO_EXPECTED_DURATION_MS = 240_000;
|
||||||
|
const DEFAULT_SUPER_RESOLUTION_EXPECTED_DURATION_MS = 180_000;
|
||||||
|
|
||||||
|
function parseTaskParams(value) {
|
||||||
|
if (!value) return {};
|
||||||
|
if (typeof value === "object" && !Array.isArray(value)) return value;
|
||||||
|
if (typeof value !== "string") return {};
|
||||||
|
try {
|
||||||
|
const parsed = JSON.parse(value);
|
||||||
|
return parsed && typeof parsed === "object" && !Array.isArray(parsed) ? parsed : {};
|
||||||
|
} catch {
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function normalizeProgressSource(value) {
|
||||||
|
const source = String(value || "").trim().toLowerCase();
|
||||||
|
if (source === PROGRESS_SOURCE_REAL) return PROGRESS_SOURCE_REAL;
|
||||||
|
if (source === PROGRESS_SOURCE_ESTIMATED) return PROGRESS_SOURCE_ESTIMATED;
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
function inferProgressSource(row) {
|
||||||
|
const explicit = normalizeProgressSource(row?.progress_source || row?.progressSource);
|
||||||
|
if (explicit) return explicit;
|
||||||
|
if (row?.status === "completed") return PROGRESS_SOURCE_REAL;
|
||||||
|
return PROGRESS_SOURCE_ESTIMATED;
|
||||||
|
}
|
||||||
|
|
||||||
|
function normalizePositiveNumber(value) {
|
||||||
|
const numeric = Number(value);
|
||||||
|
return Number.isFinite(numeric) && numeric > 0 ? numeric : null;
|
||||||
|
}
|
||||||
|
|
||||||
|
function normalizeProgress(value, status) {
|
||||||
|
if (status === "completed") return 100;
|
||||||
|
const numeric = Number(value);
|
||||||
|
if (!Number.isFinite(numeric)) return 0;
|
||||||
|
return Math.max(0, Math.min(100, Math.round(numeric)));
|
||||||
|
}
|
||||||
|
|
||||||
|
function getExpectedImageDurationMs(model, params) {
|
||||||
|
const normalized = String(model || "").toLowerCase();
|
||||||
|
let durationMs = DEFAULT_IMAGE_EXPECTED_DURATION_MS;
|
||||||
|
|
||||||
|
if (normalized.includes("nano-banana-pro")) durationMs = 220_000;
|
||||||
|
else if (normalized.includes("nano-banana-2")) durationMs = 180_000;
|
||||||
|
else if (normalized.includes("nano-banana-fast")) durationMs = 90_000;
|
||||||
|
else if (normalized.includes("wan2.7-image-pro")) durationMs = 180_000;
|
||||||
|
else if (normalized.includes("wan2.7-image")) durationMs = 120_000;
|
||||||
|
else if (normalized.includes("gpt-image")) durationMs = 120_000;
|
||||||
|
|
||||||
|
const referenceCount = Array.isArray(params.referenceUrls) ? params.referenceUrls.filter(Boolean).length : 0;
|
||||||
|
if (referenceCount > 0) durationMs += Math.min(60_000, referenceCount * 15_000);
|
||||||
|
return durationMs;
|
||||||
|
}
|
||||||
|
|
||||||
|
function getExpectedVideoDurationMs(model, params) {
|
||||||
|
const normalized = String(model || "").toLowerCase();
|
||||||
|
const seconds = normalizePositiveNumber(params.duration || params.durationSeconds) || 5;
|
||||||
|
let durationMs = DEFAULT_VIDEO_EXPECTED_DURATION_MS;
|
||||||
|
|
||||||
|
if (normalized.includes("kling")) durationMs = 300_000;
|
||||||
|
else if (normalized.includes("happyhorse")) durationMs = 240_000;
|
||||||
|
else if (normalized.includes("wan2.7") || normalized.includes("wanxiang")) durationMs = 240_000;
|
||||||
|
else if (normalized.includes("vidu") || normalized.includes("pixverse")) durationMs = 240_000;
|
||||||
|
else if (normalized.includes("aliyun-video-super-resolve") || normalized.includes("video-style-transform")) {
|
||||||
|
durationMs = DEFAULT_SUPER_RESOLUTION_EXPECTED_DURATION_MS;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (seconds > 5) durationMs += Math.min(240_000, Math.ceil(seconds - 5) * 20_000);
|
||||||
|
return durationMs;
|
||||||
|
}
|
||||||
|
|
||||||
|
function getExpectedDurationMs(rowOrTask) {
|
||||||
|
const params = parseTaskParams(rowOrTask?.params_json || rowOrTask?.params);
|
||||||
|
const model = params.requestedModel || params.model || rowOrTask?.model || "";
|
||||||
|
|
||||||
|
if (params.operation === "image-edit" || params.function || String(model).includes("imageedit")) {
|
||||||
|
return DEFAULT_SUPER_RESOLUTION_EXPECTED_DURATION_MS;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (rowOrTask?.type === "video") return getExpectedVideoDurationMs(model, params);
|
||||||
|
return getExpectedImageDurationMs(model, params);
|
||||||
|
}
|
||||||
|
|
||||||
|
function deriveTaskStage(row) {
|
||||||
|
const status = String(row?.status || "");
|
||||||
|
if (status === "pending") return "\u6392\u961f\u4e2d";
|
||||||
|
if (status === "completed") return "\u5b8c\u6210";
|
||||||
|
if (status === "failed") return "\u5931\u8d25";
|
||||||
|
if (status === "cancelled") return "\u5df2\u53d6\u6d88";
|
||||||
|
if (status !== "running") return "\u5904\u7406\u4e2d";
|
||||||
|
|
||||||
|
const progress = Number(row?.progress || 0);
|
||||||
|
if (progress >= 90) return "\u7ed3\u679c\u5904\u7406\u4e2d";
|
||||||
|
if (progress >= 15) return "\u751f\u6210\u4e2d";
|
||||||
|
return "\u5df2\u63d0\u4ea4";
|
||||||
|
}
|
||||||
|
|
||||||
|
function formatTaskProgressPayload(row) {
|
||||||
|
const progress = normalizeProgress(row.progress, row.status);
|
||||||
|
return {
|
||||||
|
taskId: row.id,
|
||||||
|
status: row.status,
|
||||||
|
progress,
|
||||||
|
progressSource: inferProgressSource(row),
|
||||||
|
stage: deriveTaskStage(row),
|
||||||
|
startedAt: row.created_at,
|
||||||
|
expectedDurationMs: getExpectedDurationMs(row),
|
||||||
|
resultUrl: row.result_url || null,
|
||||||
|
error: row.error || null,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = {
|
||||||
|
DEFAULT_IMAGE_EXPECTED_DURATION_MS,
|
||||||
|
DEFAULT_SUPER_RESOLUTION_EXPECTED_DURATION_MS,
|
||||||
|
DEFAULT_VIDEO_EXPECTED_DURATION_MS,
|
||||||
|
PROGRESS_SOURCE_ESTIMATED,
|
||||||
|
PROGRESS_SOURCE_REAL,
|
||||||
|
deriveTaskStage,
|
||||||
|
formatTaskProgressPayload,
|
||||||
|
getExpectedDurationMs,
|
||||||
|
inferProgressSource,
|
||||||
|
normalizeProgressSource,
|
||||||
|
parseTaskParams,
|
||||||
|
};
|
||||||
Reference in New Issue
Block a user