Files
omniai-web/src/services/backgroundTaskRunner.ts
T

129 lines
4.3 KiB
TypeScript
Raw Normal View History

import { useGenerationStore, type GenerationQueueItem } from "../stores/useGenerationStore";
import { aiGenerationClient } from "../api/aiGenerationClient";
type PollCallback = (item: GenerationQueueItem) => void;
const activePollers = new Map<string, ReturnType<typeof setInterval>>();
const pollCallbacks = new Set<PollCallback>();
const POLL_INTERVAL = 3000;
const MAX_POLL_ATTEMPTS = 200; // 10 minutes max per task
export function subscribeToTaskUpdates(callback: PollCallback): () => void {
pollCallbacks.add(callback);
return () => { pollCallbacks.delete(callback); };
}
function notifyCallbacks(item: GenerationQueueItem): void {
pollCallbacks.forEach((cb) => cb(item));
}
function pollTask(item: GenerationQueueItem, attemptsRef: { current: number }): void {
const key = `poll-${item.id}`;
if (activePollers.has(key)) return;
const interval = setInterval(async () => {
const current = useGenerationStore.getState().queue.find((i) => i.id === item.id);
if (!current || current.status === "completed" || current.status === "failed" || current.status === "cancelled") {
cleanupPoll(key);
return;
}
attemptsRef.current++;
if (attemptsRef.current > MAX_POLL_ATTEMPTS) {
useGenerationStore.getState().updateTask(item.id, {
status: "failed",
error: "任务超时,请重新提交",
});
notifyCallbacks({ ...item, status: "failed", error: "任务超时,请重新提交" });
cleanupPoll(key);
return;
}
try {
const status = await aiGenerationClient.getTaskStatus(current.taskId || item.taskId || "");
const patch: Partial<GenerationQueueItem> = {
progress: status.progress,
resultUrl: status.resultUrl || current.resultUrl,
error: status.error || current.error,
};
if (status.status === "completed") {
patch.status = "completed";
useGenerationStore.getState().updateTask(item.id, patch);
notifyCallbacks({ ...item, ...patch, status: "completed" });
cleanupPoll(key);
} else if (status.status === "failed" || status.status === "cancelled") {
patch.status = "failed";
useGenerationStore.getState().updateTask(item.id, patch);
notifyCallbacks({ ...item, ...patch, status: "failed" });
cleanupPoll(key);
} else {
patch.status = "running";
useGenerationStore.getState().updateTask(item.id, patch);
notifyCallbacks({ ...item, ...patch, status: "running" });
}
} catch {
// Network error during poll — keep trying
}
}, POLL_INTERVAL);
activePollers.set(key, interval);
}
function cleanupPoll(key: string): void {
const interval = activePollers.get(key);
if (interval) {
clearInterval(interval);
activePollers.delete(key);
}
}
export function startBackgroundPolling(): void {
const tasks = useGenerationStore.getState().getRunningTasks();
const attemptsMap = new Map<string, { current: number }>();
tasks.forEach((task) => {
if (task.taskId) {
if (!attemptsMap.has(task.id)) {
attemptsMap.set(task.id, { current: 0 });
}
pollTask(task, attemptsMap.get(task.id)!);
}
});
}
export function resumeTaskPolling(taskId: string, storeId: string): void {
const task = useGenerationStore.getState().queue.find((i) => i.id === storeId);
if (task && task.status !== "completed" && task.status !== "failed") {
pollTask(task, { current: 0 });
}
}
export function stopAllPolling(): void {
activePollers.forEach((interval) => clearInterval(interval));
activePollers.clear();
}
// ── Recovery on page load ──────────────────────────
export function recoverAndResumeTasks(): void {
const pendingTasks = useGenerationStore.getState().getRunningTasks();
if (!pendingTasks.length) return;
pendingTasks.forEach((task) => {
if (task.taskId) {
// Mark as pending so the workbench/ecommerce can re-submit to polling
useGenerationStore.getState().updateTask(task.id, { status: "pending" });
} else {
// No taskId means it was queued but never submitted — mark failed
useGenerationStore.getState().updateTask(task.id, {
status: "failed",
error: "页面刷新后任务丢失,请重新提交",
});
}
});
// Start polling recovered tasks
setTimeout(() => startBackgroundPolling(), 500);
}