Files
omniai-web/src/api/taskSubscription.ts
T
stringadmin 178a2c47da feat: add task lifecycle management and improve generation reliability
Centralize timeout policies, stall detection, and error classification
for image/video/text generation tasks. Improve ecommerce OSS upload flow
and add script evaluation enhancements.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-05 01:06:48 +08:00

131 lines
3.9 KiB
TypeScript

import { aiGenerationClient } from "./aiGenerationClient";
import {
buildLocalTimeoutMessage,
getTaskTimeoutPolicy,
isTaskLocallyTimedOut,
} from "../utils/taskLifecycle";
export interface TaskProgressEvent {
taskId: string;
status: string;
progress: number;
resultUrl?: string | null;
error?: string | null;
}
export interface WaitForTaskOptions {
onProgress?: (event: TaskProgressEvent) => void;
abortRef?: { current: boolean };
timeoutMs?: number;
noProgressTimeoutMs?: number;
startedAt?: number;
kind?: "image" | "video" | "text";
model?: string | null;
operation?: string | null;
}
const POLL_INTERVAL = 3000;
export function waitForTask(
taskId: string,
options: WaitForTaskOptions = {},
): Promise<string | null> {
const { onProgress, abortRef } = options;
const timeoutPolicy = getTaskTimeoutPolicy({
kind: options.kind,
model: options.model,
operation: options.operation,
});
const timeoutMs = options.timeoutMs ?? timeoutPolicy.maxRuntimeMs;
const noProgressTimeoutMs = options.noProgressTimeoutMs ?? timeoutPolicy.noProgressTimeoutMs;
const startedAt = options.startedAt ?? Date.now();
return new Promise((resolve, reject) => {
let settled = false;
let cleanup: (() => void) | null = null;
let timeoutId: ReturnType<typeof setTimeout> | null = null;
let sseConnected = false;
let fallbackTimerId: ReturnType<typeof setTimeout> | null = null;
let lastProgress = 0;
let lastProgressAt = startedAt;
const settle = (fn: () => void) => {
if (settled) return;
settled = true;
if (timeoutId) clearTimeout(timeoutId);
if (fallbackTimerId) clearTimeout(fallbackTimerId);
if (cleanup) cleanup();
fn();
};
timeoutId = setTimeout(
() => settle(() => reject(new Error(buildLocalTimeoutMessage(options.kind || "video")))),
timeoutMs,
);
const handleUpdate = (event: TaskProgressEvent) => {
if (settled) return;
if (abortRef?.current) {
settle(() => resolve(null));
return;
}
const progress = Number(event.progress || 0);
if (progress > lastProgress || event.status === "completed") {
lastProgress = Math.max(lastProgress, progress);
lastProgressAt = Date.now();
}
onProgress?.(event);
if (event.status === "completed") {
settle(() => resolve(event.resultUrl || null));
} else if (event.status === "failed" || event.status === "cancelled") {
settle(() => reject(new Error(event.error || "任务失败,请稍后重试")));
}
};
cleanup = aiGenerationClient.subscribeTaskStatus(taskId, handleUpdate);
sseConnected = true;
fallbackTimerId = setTimeout(() => {
if (settled || !sseConnected) return;
if (cleanup) cleanup();
startPolling();
}, 5000);
function startPolling() {
const poll = async () => {
while (!settled) {
if (abortRef?.current) {
settle(() => resolve(null));
return;
}
await new Promise((r) => setTimeout(r, POLL_INTERVAL));
if (settled || abortRef?.current) return;
const timeoutReason = isTaskLocallyTimedOut({
startedAt,
lastProgressAt,
progress: lastProgress,
policy: { ...timeoutPolicy, noProgressTimeoutMs },
});
if (timeoutReason) {
settle(() => reject(new Error(buildLocalTimeoutMessage(options.kind || "video"))));
return;
}
try {
const task = await aiGenerationClient.getTaskStatus(taskId);
handleUpdate({
taskId,
status: task.status,
progress: task.progress || 0,
resultUrl: task.resultUrl,
error: task.error,
});
} catch (e) {
if (!settled) settle(() => reject(e));
}
}
};
void poll();
}
});
}