fix: improve generation task reliability
This commit is contained in:
@@ -1,20 +1,12 @@
|
||||
import { useGenerationStore, type GenerationQueueItem } from "../stores/useGenerationStore";
|
||||
import { aiGenerationClient } from "../api/aiGenerationClient";
|
||||
import {
|
||||
buildLocalTimeoutMessage,
|
||||
buildTaskFailureInfo,
|
||||
getTaskTimeoutPolicy,
|
||||
isTaskLocallyTimedOut,
|
||||
} from "../utils/taskLifecycle";
|
||||
import { waitForTask, type TaskProgressEvent } from "../api/taskSubscription";
|
||||
import { buildTaskFailureInfo } from "../utils/taskLifecycle";
|
||||
|
||||
type PollCallback = (item: GenerationQueueItem) => void;
|
||||
|
||||
const activePollers = new Map<string, ReturnType<typeof setInterval>>();
|
||||
const activePollers = new Map<string, { current: boolean }>();
|
||||
const pollCallbacks = new Set<PollCallback>();
|
||||
|
||||
const POLL_INTERVAL = 3000;
|
||||
const MAX_POLL_ATTEMPTS = 200; // Keep the previous 10-minute guard as a fallback.
|
||||
|
||||
export function subscribeToTaskUpdates(callback: PollCallback): () => void {
|
||||
pollCallbacks.add(callback);
|
||||
return () => { pollCallbacks.delete(callback); };
|
||||
@@ -34,109 +26,109 @@ function getQueueItemModel(item: GenerationQueueItem): string | undefined {
|
||||
return typeof item.params?.model === "string" ? item.params.model : undefined;
|
||||
}
|
||||
|
||||
function pollTask(item: GenerationQueueItem, attemptsRef: { current: number }): void {
|
||||
const key = `poll-${item.id}`;
|
||||
if (activePollers.has(key)) return;
|
||||
|
||||
const kind = getQueueItemKind(item);
|
||||
const timeoutPolicy = getTaskTimeoutPolicy({ kind, model: getQueueItemModel(item) });
|
||||
let lastProgress = Math.max(0, Number(item.progress || 0));
|
||||
let lastProgressAt = Date.now();
|
||||
|
||||
const interval = setInterval(async () => {
|
||||
const current = useGenerationStore.getState().queue.find((i) => i.id === item.id);
|
||||
if (!current || current.status === "completed" || current.status === "failed" || current.status === "cancelled") {
|
||||
cleanupPoll(key);
|
||||
return;
|
||||
}
|
||||
|
||||
attemptsRef.current++;
|
||||
const timeoutReason = isTaskLocallyTimedOut({
|
||||
startedAt: current.createdAt || item.createdAt || Date.now(),
|
||||
lastProgressAt,
|
||||
progress: lastProgress,
|
||||
policy: timeoutPolicy,
|
||||
});
|
||||
if (timeoutReason || attemptsRef.current > MAX_POLL_ATTEMPTS) {
|
||||
const error = buildLocalTimeoutMessage(kind);
|
||||
useGenerationStore.getState().updateTask(item.id, {
|
||||
status: "failed",
|
||||
error,
|
||||
});
|
||||
notifyCallbacks({ ...item, status: "failed", error });
|
||||
cleanupPoll(key);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const status = await aiGenerationClient.getTaskStatus(current.taskId || item.taskId || "");
|
||||
const nextProgress = Number(status.progress || 0);
|
||||
if (nextProgress > lastProgress || status.status === "completed") {
|
||||
lastProgress = Math.max(lastProgress, nextProgress);
|
||||
lastProgressAt = Date.now();
|
||||
}
|
||||
|
||||
const patch: Partial<GenerationQueueItem> = {
|
||||
progress: status.progress,
|
||||
resultUrl: status.resultUrl || current.resultUrl,
|
||||
error: status.error || current.error,
|
||||
};
|
||||
|
||||
if (status.status === "completed") {
|
||||
patch.status = "completed";
|
||||
useGenerationStore.getState().updateTask(item.id, patch);
|
||||
notifyCallbacks({ ...item, ...patch, status: "completed" });
|
||||
cleanupPoll(key);
|
||||
} else if (status.status === "failed" || status.status === "cancelled") {
|
||||
patch.status = "failed";
|
||||
patch.error = buildTaskFailureInfo(status.error).message;
|
||||
useGenerationStore.getState().updateTask(item.id, patch);
|
||||
notifyCallbacks({ ...item, ...patch, status: "failed" });
|
||||
cleanupPoll(key);
|
||||
} else {
|
||||
patch.status = "running";
|
||||
useGenerationStore.getState().updateTask(item.id, patch);
|
||||
notifyCallbacks({ ...item, ...patch, status: "running" });
|
||||
}
|
||||
} catch {
|
||||
// Network errors during polling are retried until the lifecycle guard trips.
|
||||
}
|
||||
}, POLL_INTERVAL);
|
||||
|
||||
activePollers.set(key, interval);
|
||||
function updateTaskAndNotify(id: string, patch: Partial<GenerationQueueItem>): GenerationQueueItem | null {
|
||||
const current = useGenerationStore.getState().queue.find((i) => i.id === id);
|
||||
if (!current) return null;
|
||||
const next = { ...current, ...patch };
|
||||
useGenerationStore.getState().updateTask(id, patch);
|
||||
notifyCallbacks(next);
|
||||
return next;
|
||||
}
|
||||
|
||||
function cleanupPoll(key: string): void {
|
||||
const interval = activePollers.get(key);
|
||||
if (interval) {
|
||||
clearInterval(interval);
|
||||
activePollers.delete(key);
|
||||
}
|
||||
function isTerminalStatus(status: GenerationQueueItem["status"]): boolean {
|
||||
return status === "completed" || status === "failed" || status === "cancelled";
|
||||
}
|
||||
|
||||
function pollTask(item: GenerationQueueItem): void {
|
||||
const key = `poll-${item.id}`;
|
||||
if (activePollers.has(key) || !item.taskId) return;
|
||||
|
||||
const kind = getQueueItemKind(item);
|
||||
const abortRef = { current: false };
|
||||
activePollers.set(key, abortRef);
|
||||
|
||||
const applyProgress = (event: TaskProgressEvent) => {
|
||||
const current = useGenerationStore.getState().queue.find((i) => i.id === item.id);
|
||||
if (!current || isTerminalStatus(current.status)) {
|
||||
abortRef.current = true;
|
||||
return;
|
||||
}
|
||||
|
||||
const patch: Partial<GenerationQueueItem> = {
|
||||
progress: Number(event.progress || 0),
|
||||
resultUrl: event.resultUrl || current.resultUrl,
|
||||
error: event.error || current.error,
|
||||
};
|
||||
|
||||
if (event.status === "completed") {
|
||||
patch.status = "completed";
|
||||
patch.progress = 100;
|
||||
} else if (event.status === "failed" || event.status === "cancelled") {
|
||||
patch.status = "failed";
|
||||
patch.error = buildTaskFailureInfo(event.error).message;
|
||||
} else {
|
||||
patch.status = "running";
|
||||
}
|
||||
|
||||
updateTaskAndNotify(item.id, patch);
|
||||
};
|
||||
|
||||
void waitForTask(item.taskId, {
|
||||
kind,
|
||||
model: getQueueItemModel(item),
|
||||
startedAt: item.createdAt || Date.now(),
|
||||
abortRef,
|
||||
onProgress: applyProgress,
|
||||
})
|
||||
.then((resultUrl) => {
|
||||
if (abortRef.current) return;
|
||||
const current = useGenerationStore.getState().queue.find((i) => i.id === item.id);
|
||||
if (!current || isTerminalStatus(current.status)) return;
|
||||
updateTaskAndNotify(item.id, {
|
||||
status: "completed",
|
||||
progress: 100,
|
||||
resultUrl: resultUrl || current.resultUrl,
|
||||
});
|
||||
})
|
||||
.catch((error) => {
|
||||
if (abortRef.current) return;
|
||||
const failure = buildTaskFailureInfo(error instanceof Error ? error.message : String(error));
|
||||
updateTaskAndNotify(item.id, {
|
||||
status: "failed",
|
||||
error: failure.message,
|
||||
});
|
||||
})
|
||||
.finally(() => {
|
||||
cleanupPoll(key, abortRef);
|
||||
});
|
||||
}
|
||||
|
||||
function cleanupPoll(key: string, abortRef: { current: boolean }): void {
|
||||
if (activePollers.get(key) !== abortRef) return;
|
||||
activePollers.delete(key);
|
||||
}
|
||||
|
||||
export function startBackgroundPolling(): void {
|
||||
const tasks = useGenerationStore.getState().getRunningTasks();
|
||||
const attemptsMap = new Map<string, { current: number }>();
|
||||
|
||||
tasks.forEach((task) => {
|
||||
if (task.taskId) {
|
||||
if (!attemptsMap.has(task.id)) {
|
||||
attemptsMap.set(task.id, { current: 0 });
|
||||
}
|
||||
pollTask(task, attemptsMap.get(task.id)!);
|
||||
pollTask(task);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
export function resumeTaskPolling(taskId: string, storeId: string): void {
|
||||
const task = useGenerationStore.getState().queue.find((i) => i.id === storeId);
|
||||
if (task && task.status !== "completed" && task.status !== "failed") {
|
||||
pollTask(task, { current: 0 });
|
||||
if (task && !isTerminalStatus(task.status)) {
|
||||
pollTask({ ...task, taskId });
|
||||
}
|
||||
}
|
||||
|
||||
export function stopAllPolling(): void {
|
||||
activePollers.forEach((interval) => clearInterval(interval));
|
||||
activePollers.forEach((abortRef) => {
|
||||
abortRef.current = true;
|
||||
});
|
||||
activePollers.clear();
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user