feat: add task lifecycle management and improve generation reliability

Centralize timeout policies, stall detection, and error classification
for image/video/text generation tasks. Improve ecommerce OSS upload flow
and add script evaluation enhancements.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-06-05 01:00:33 +08:00
parent d36a093159
commit 178a2c47da
16 changed files with 1607 additions and 95 deletions
+41 -10
View File
@@ -1,5 +1,11 @@
import { useGenerationStore, type GenerationQueueItem } from "../stores/useGenerationStore";
import { aiGenerationClient } from "../api/aiGenerationClient";
import {
buildLocalTimeoutMessage,
buildTaskFailureInfo,
getTaskTimeoutPolicy,
isTaskLocallyTimedOut,
} from "../utils/taskLifecycle";
type PollCallback = (item: GenerationQueueItem) => void;
@@ -7,7 +13,7 @@ const activePollers = new Map<string, ReturnType<typeof setInterval>>();
const pollCallbacks = new Set<PollCallback>();
const POLL_INTERVAL = 3000;
const MAX_POLL_ATTEMPTS = 200; // 10 minutes max per task
const MAX_POLL_ATTEMPTS = 200; // Keep the previous 10-minute guard as a fallback.
export function subscribeToTaskUpdates(callback: PollCallback): () => void {
pollCallbacks.add(callback);
@@ -18,10 +24,25 @@ function notifyCallbacks(item: GenerationQueueItem): void {
pollCallbacks.forEach((cb) => cb(item));
}
function getQueueItemKind(item: GenerationQueueItem): "image" | "video" | "text" {
if (item.type === "image") return "image";
if (item.type === "video" || item.type === "ecommerce-video") return "video";
return "text";
}
function getQueueItemModel(item: GenerationQueueItem): string | undefined {
return typeof item.params?.model === "string" ? item.params.model : undefined;
}
function pollTask(item: GenerationQueueItem, attemptsRef: { current: number }): void {
const key = `poll-${item.id}`;
if (activePollers.has(key)) return;
const kind = getQueueItemKind(item);
const timeoutPolicy = getTaskTimeoutPolicy({ kind, model: getQueueItemModel(item) });
let lastProgress = Math.max(0, Number(item.progress || 0));
let lastProgressAt = Date.now();
const interval = setInterval(async () => {
const current = useGenerationStore.getState().queue.find((i) => i.id === item.id);
if (!current || current.status === "completed" || current.status === "failed" || current.status === "cancelled") {
@@ -30,18 +51,31 @@ function pollTask(item: GenerationQueueItem, attemptsRef: { current: number }):
}
attemptsRef.current++;
if (attemptsRef.current > MAX_POLL_ATTEMPTS) {
const timeoutReason = isTaskLocallyTimedOut({
startedAt: current.createdAt || item.createdAt || Date.now(),
lastProgressAt,
progress: lastProgress,
policy: timeoutPolicy,
});
if (timeoutReason || attemptsRef.current > MAX_POLL_ATTEMPTS) {
const error = buildLocalTimeoutMessage(kind);
useGenerationStore.getState().updateTask(item.id, {
status: "failed",
error: "任务超时,请重新提交",
error,
});
notifyCallbacks({ ...item, status: "failed", error: "任务超时,请重新提交" });
notifyCallbacks({ ...item, status: "failed", error });
cleanupPoll(key);
return;
}
try {
const status = await aiGenerationClient.getTaskStatus(current.taskId || item.taskId || "");
const nextProgress = Number(status.progress || 0);
if (nextProgress > lastProgress || status.status === "completed") {
lastProgress = Math.max(lastProgress, nextProgress);
lastProgressAt = Date.now();
}
const patch: Partial<GenerationQueueItem> = {
progress: status.progress,
resultUrl: status.resultUrl || current.resultUrl,
@@ -55,6 +89,7 @@ function pollTask(item: GenerationQueueItem, attemptsRef: { current: number }):
cleanupPoll(key);
} else if (status.status === "failed" || status.status === "cancelled") {
patch.status = "failed";
patch.error = buildTaskFailureInfo(status.error).message;
useGenerationStore.getState().updateTask(item.id, patch);
notifyCallbacks({ ...item, ...patch, status: "failed" });
cleanupPoll(key);
@@ -64,7 +99,7 @@ function pollTask(item: GenerationQueueItem, attemptsRef: { current: number }):
notifyCallbacks({ ...item, ...patch, status: "running" });
}
} catch {
// Network error during poll — keep trying
// Network errors during polling are retried until the lifecycle guard trips.
}
}, POLL_INTERVAL);
@@ -105,24 +140,20 @@ export function stopAllPolling(): void {
activePollers.clear();
}
// ── Recovery on page load ──────────────────────────
export function recoverAndResumeTasks(): void {
const pendingTasks = useGenerationStore.getState().getRunningTasks();
if (!pendingTasks.length) return;
pendingTasks.forEach((task) => {
if (task.taskId) {
// Mark as pending so the workbench/ecommerce can re-submit to polling
useGenerationStore.getState().updateTask(task.id, { status: "pending" });
} else {
// No taskId means it was queued but never submitted — mark failed
useGenerationStore.getState().updateTask(task.id, {
status: "failed",
error: "页面刷新后任务丢失,请重新提交",
error: "页面刷新后任务没有服务端 ID,已释放本地占用,请重新提交",
});
}
});
// Start polling recovered tasks
setTimeout(() => startBackgroundPolling(), 500);
}