// Files
// omniai-server/src/dbSetup.js
// T
// 2026-06-04 18:58:45 +08:00
//
// 1128 lines
// 42 KiB
// JavaScript

const bcrypt = require("bcryptjs");
const crypto = require("node:crypto");
const { pool, withTransaction } = require("./db");
const { DEFAULT_MODEL_PRICES } = require("./pricing");
const { getDefaultAdminPassword } = require("./securityConfig");
const {
ENTERPRISE_BETA_ACCOUNTS,
ENTERPRISE_BETA_INITIAL_BALANCE_CENTS,
createEnterpriseBetaPasswordMap,
normalizeEnterpriseInviteCode,
} = require("./enterpriseBetaAccounts");
/**
 * Creates the `schema_migrations` bookkeeping table if it does not exist yet.
 *
 * Each row holds a migration id and the timestamp at which it was applied.
 * The statement is issued on the shared pool.
 *
 * @returns {Promise<void>}
 */
async function ensureMigrationTable() {
  const createTableSql = `
CREATE TABLE IF NOT EXISTS schema_migrations (
id TEXT PRIMARY KEY,
applied_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
)
`;
  await pool.query(createTableSql);
}
/**
 * Reports whether a migration id is already present in `schema_migrations`.
 *
 * @param {string} id - Migration identifier to look up.
 * @returns {Promise<boolean>} `true` when at least one matching row exists.
 */
async function hasMigration(id) {
  const lookupSql = "SELECT 1 FROM schema_migrations WHERE id = $1";
  const result = await pool.query(lookupSql, [id]);
  return result.rows.length > 0;
}
/**
 * Inserts a migration id into `schema_migrations` using the given client.
 *
 * @param {{ query: Function }} client - Connection the insert is issued on.
 * @param {string} id - Migration identifier to record.
 * @returns {Promise<void>}
 */
async function recordMigration(client, id) {
  const insertSql = "INSERT INTO schema_migrations (id) VALUES ($1)";
  const values = [id];
  await client.query(insertSql, values);
}
/**
 * Lists the column names of a table as reported by `information_schema.columns`.
 *
 * The lookup is restricted to schemas on the connection's search path.
 * Without that filter, a table with the same name in an unrelated schema
 * would contribute its columns to the result and make existence checks wrong.
 *
 * @param {string} tableName - Unqualified table name.
 * @param {{ query: Function }} [executor=pool] - Pool or client to run the
 *   lookup on; pass a transaction client to see DDL not yet committed.
 * @returns {Promise<string[]>} Column names; empty when the table is not found.
 */
async function getColumnNames(tableName, executor = pool) {
  const { rows } = await executor.query(
    "SELECT column_name FROM information_schema.columns WHERE table_name = $1 AND table_schema = ANY (current_schemas(false))",
    [tableName],
  );
  return rows.map((row) => row.column_name);
}
/**
 * Checks whether a table exposes a column with the given name.
 *
 * @param {string} tableName - Table to inspect.
 * @param {string} columnName - Column to look for.
 * @returns {Promise<boolean>} `true` when `getColumnNames` lists the column.
 */
async function hasColumn(tableName, columnName) {
  const knownColumns = new Set(await getColumnNames(tableName));
  return knownColumns.has(columnName);
}
/**
 * Adds a column to a table unless the table already has it.
 *
 * The existence check is delegated to PostgreSQL through
 * `ADD COLUMN IF NOT EXISTS`, so the check and the ALTER are one atomic
 * statement. A separate "look up, then ALTER" sequence can fail with a
 * duplicate-column error when two processes start at the same time.
 *
 * `tableName` and `columnDefinition` are interpolated into the SQL text and
 * must only ever be trusted, hard-coded values.
 *
 * @param {string} tableName - Table to alter.
 * @param {string} columnDefinition - Column name followed by its type and
 *   constraints, e.g. `"avatar_url TEXT"`.
 * @param {{ query: Function }} [executor=pool] - Pool or client to run the
 *   statement on; pass the migration's transaction client to keep the change
 *   inside that transaction.
 * @returns {Promise<void>}
 */
async function addColumnIfMissing(tableName, columnDefinition, executor = pool) {
  const definition = columnDefinition.trim();
  await executor.query(`ALTER TABLE ${tableName} ADD COLUMN IF NOT EXISTS ${definition}`);
}
/**
 * Applies a migration once: skips it when its id is already recorded,
 * otherwise runs it and records the id inside a single `withTransaction` call.
 *
 * @param {string} id - Unique migration identifier.
 * @param {(client: object) => Promise<void>} migrate - Migration body; it
 *   receives the client supplied by `withTransaction`.
 * @returns {Promise<void>}
 */
async function runMigration(id, migrate) {
  const alreadyApplied = await hasMigration(id);
  if (alreadyApplied) {
    return;
  }
  const applyAndRecord = async (client) => {
    await migrate(client);
    await recordMigration(client, id);
  };
  await withTransaction(applyAndRecord);
}
/**
 * Creates the base tables and indexes in a single multi-statement query.
 *
 * Tables: `users`, `api_keys`, `key_leases`, `usage_logs`, `config_profiles`,
 * `sms_verification_codes`, `wechat_login_sessions` and `api_call_logs`.
 * Every statement uses `IF NOT EXISTS`.
 *
 * @param {{ query: Function }} client - Connection the DDL is issued on.
 * @returns {Promise<void>}
 */
async function createBaseSchema(client) {
  const baseSchemaSql = `
CREATE TABLE IF NOT EXISTS users (
id SERIAL PRIMARY KEY,
username TEXT UNIQUE NOT NULL,
password_hash TEXT NOT NULL,
avatar_url TEXT,
bio TEXT,
profile_background_url TEXT,
email TEXT,
email_verified INTEGER NOT NULL DEFAULT 0,
phone TEXT,
wechat_openid TEXT,
wechat_unionid TEXT,
auth_provider TEXT NOT NULL DEFAULT 'password',
current_session_id TEXT,
current_session_started_at TIMESTAMPTZ,
role TEXT NOT NULL DEFAULT 'user',
max_concurrency INTEGER NOT NULL DEFAULT 30,
enabled INTEGER NOT NULL DEFAULT 1,
enterprise_id INTEGER,
is_enterprise_admin INTEGER NOT NULL DEFAULT 0,
balance_cents INTEGER NOT NULL DEFAULT 0,
billing_mode TEXT NOT NULL DEFAULT 'credits',
beta_expires_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS api_keys (
id SERIAL PRIMARY KEY,
provider TEXT NOT NULL,
api_key TEXT NOT NULL,
label TEXT DEFAULT '',
max_concurrency INTEGER NOT NULL DEFAULT 10,
active_count INTEGER NOT NULL DEFAULT 0,
total_used INTEGER NOT NULL DEFAULT 0,
enabled INTEGER NOT NULL DEFAULT 1,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS key_leases (
id SERIAL PRIMARY KEY,
key_id INTEGER NOT NULL REFERENCES api_keys(id),
user_id INTEGER NOT NULL REFERENCES users(id),
leased_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
released_at TIMESTAMPTZ,
lease_token TEXT UNIQUE NOT NULL,
estimated_cost_cents INTEGER,
enterprise_id INTEGER,
settled INTEGER NOT NULL DEFAULT 0
);
CREATE TABLE IF NOT EXISTS usage_logs (
id SERIAL PRIMARY KEY,
user_id INTEGER NOT NULL,
provider TEXT NOT NULL,
key_id INTEGER,
action TEXT NOT NULL,
model TEXT,
duration_ms INTEGER,
status TEXT,
enterprise_id INTEGER,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS config_profiles (
id SERIAL PRIMARY KEY,
name TEXT UNIQUE NOT NULL DEFAULT 'default',
config_json TEXT NOT NULL DEFAULT '{}',
description TEXT DEFAULT '',
updated_by INTEGER,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS sms_verification_codes (
id SERIAL PRIMARY KEY,
phone TEXT NOT NULL,
purpose TEXT NOT NULL,
code_hash TEXT NOT NULL,
attempts INTEGER NOT NULL DEFAULT 0,
consumed_at TIMESTAMPTZ,
expires_at TIMESTAMPTZ NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS wechat_login_sessions (
state TEXT PRIMARY KEY,
status TEXT NOT NULL DEFAULT 'pending',
user_id INTEGER REFERENCES users(id),
error TEXT,
consumed_at TIMESTAMPTZ,
expires_at TIMESTAMPTZ NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS api_call_logs (
id SERIAL PRIMARY KEY,
user_id INTEGER NOT NULL,
provider TEXT NOT NULL,
model TEXT,
display_model TEXT,
prompt_tokens INTEGER,
completion_tokens INTEGER,
duration_ms INTEGER,
status TEXT NOT NULL DEFAULT 'success',
cost_estimate REAL,
api_client TEXT,
enterprise_id INTEGER,
enterprise_name TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_api_keys_provider ON api_keys(provider, enabled);
CREATE INDEX IF NOT EXISTS idx_key_leases_active ON key_leases(key_id, released_at);
CREATE INDEX IF NOT EXISTS idx_usage_logs_user ON usage_logs(user_id, created_at);
CREATE INDEX IF NOT EXISTS idx_api_call_logs_user ON api_call_logs(user_id, created_at);
CREATE INDEX IF NOT EXISTS idx_api_call_logs_model ON api_call_logs(model, created_at);
CREATE INDEX IF NOT EXISTS idx_api_call_logs_created ON api_call_logs(created_at);
CREATE INDEX IF NOT EXISTS idx_users_enterprise ON users(enterprise_id, enabled);
CREATE UNIQUE INDEX IF NOT EXISTS idx_users_email_unique ON users(LOWER(email)) WHERE email IS NOT NULL;
CREATE UNIQUE INDEX IF NOT EXISTS idx_users_phone_unique ON users(phone) WHERE phone IS NOT NULL;
CREATE UNIQUE INDEX IF NOT EXISTS idx_users_wechat_openid_unique ON users(wechat_openid) WHERE wechat_openid IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_sms_verification_codes_lookup ON sms_verification_codes(phone, purpose, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_wechat_login_sessions_status ON wechat_login_sessions(status, expires_at);
CREATE INDEX IF NOT EXISTS idx_usage_logs_enterprise ON usage_logs(enterprise_id, created_at);
CREATE INDEX IF NOT EXISTS idx_api_call_logs_enterprise ON api_call_logs(enterprise_id, created_at);
CREATE INDEX IF NOT EXISTS idx_key_leases_settled ON key_leases(settled, released_at);
`;
  await client.query(baseSchemaSql);
}
/**
 * Creates the `enterprises` and `model_prices` tables and their indexes in
 * one multi-statement query. Every statement uses `IF NOT EXISTS`.
 *
 * @param {{ query: Function }} client - Connection the DDL is issued on.
 * @returns {Promise<void>}
 */
async function migrateEnterpriseAndPricingSchema(client) {
  const enterprisePricingSql = `
CREATE TABLE IF NOT EXISTS enterprises (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
contact_name TEXT,
contact_phone TEXT,
balance_cents INTEGER NOT NULL DEFAULT 0,
tax_id TEXT,
legal_person_name TEXT,
legal_person_phone TEXT,
enterprise_code TEXT UNIQUE,
enabled INTEGER NOT NULL DEFAULT 1,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS model_prices (
id SERIAL PRIMARY KEY,
model_key TEXT NOT NULL UNIQUE,
display_name TEXT NOT NULL,
category TEXT NOT NULL DEFAULT 'text',
pricing_type TEXT NOT NULL DEFAULT 'token',
input_price_mills INTEGER,
output_price_mills INTEGER,
flat_price_mills INTEGER,
currency TEXT NOT NULL DEFAULT 'CNY',
enabled INTEGER NOT NULL DEFAULT 1,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_enterprises_code ON enterprises(enterprise_code);
CREATE INDEX IF NOT EXISTS idx_model_prices_enabled ON model_prices(enabled, category);
`;
  await client.query(enterprisePricingSql);
}
/**
 * Creates the billing tables (`transactions`, `packages`,
 * `enterprise_packages`, `payment_orders`, `invoices`) and related indexes in
 * one multi-statement query. Every statement uses `IF NOT EXISTS`.
 *
 * Several tables reference `enterprises(id)`, so that table must exist first.
 *
 * @param {{ query: Function }} client - Connection the DDL is issued on.
 * @returns {Promise<void>}
 */
async function migrateBillingSchema(client) {
  const billingSql = `
CREATE TABLE IF NOT EXISTS transactions (
id SERIAL PRIMARY KEY,
enterprise_id INTEGER REFERENCES enterprises(id),
enterprise_name TEXT,
user_id INTEGER,
target_user_id INTEGER,
type TEXT NOT NULL,
amount_cents INTEGER NOT NULL,
balance_after_cents INTEGER NOT NULL,
description TEXT,
payment_order_id TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS packages (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
description TEXT DEFAULT '',
price_cents INTEGER NOT NULL,
credits_cents INTEGER NOT NULL DEFAULT 0,
image_quota INTEGER NOT NULL DEFAULT 0,
video_quota INTEGER NOT NULL DEFAULT 0,
text_quota INTEGER NOT NULL DEFAULT 0,
duration_days INTEGER NOT NULL DEFAULT 365,
enabled INTEGER NOT NULL DEFAULT 1,
sort_order INTEGER NOT NULL DEFAULT 0,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS enterprise_packages (
id SERIAL PRIMARY KEY,
enterprise_id INTEGER NOT NULL REFERENCES enterprises(id),
package_id INTEGER NOT NULL REFERENCES packages(id),
remaining_image INTEGER NOT NULL DEFAULT 0,
remaining_video INTEGER NOT NULL DEFAULT 0,
remaining_text INTEGER NOT NULL DEFAULT 0,
expires_at TIMESTAMPTZ NOT NULL,
activated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS payment_orders (
id SERIAL PRIMARY KEY,
order_no TEXT UNIQUE NOT NULL,
enterprise_id INTEGER REFERENCES enterprises(id),
enterprise_name TEXT,
user_id INTEGER,
type TEXT NOT NULL,
amount_cents INTEGER NOT NULL,
package_id INTEGER,
status TEXT NOT NULL DEFAULT 'pending',
payment_method TEXT,
payment_trade_no TEXT,
paid_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS invoices (
id SERIAL PRIMARY KEY,
enterprise_id INTEGER NOT NULL REFERENCES enterprises(id),
enterprise_name TEXT,
payment_order_id INTEGER REFERENCES payment_orders(id),
type TEXT NOT NULL DEFAULT 'general',
title TEXT NOT NULL,
tax_no TEXT,
amount_cents INTEGER NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
invoice_no TEXT,
invoice_url TEXT,
issued_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_enterprises_enabled ON enterprises(enabled);
CREATE INDEX IF NOT EXISTS idx_transactions_enterprise ON transactions(enterprise_id, created_at);
CREATE INDEX IF NOT EXISTS idx_enterprise_packages_enterprise ON enterprise_packages(enterprise_id, expires_at);
CREATE INDEX IF NOT EXISTS idx_payment_orders_enterprise ON payment_orders(enterprise_id, created_at);
CREATE INDEX IF NOT EXISTS idx_payment_orders_status ON payment_orders(status, created_at);
CREATE INDEX IF NOT EXISTS idx_invoices_enterprise ON invoices(enterprise_id, created_at);
CREATE INDEX IF NOT EXISTS idx_users_balance ON users(balance_cents);
`;
  await client.query(billingSql);
}
/**
 * Drops the NOT NULL constraint from `transactions.enterprise_id`.
 *
 * @param {{ query: Function }} client - Connection the DDL is issued on.
 * @returns {Promise<void>}
 */
async function migrateTransactionsNullableEnterpriseId(client) {
  const dropNotNullSql = "ALTER TABLE transactions ALTER COLUMN enterprise_id DROP NOT NULL";
  await client.query(dropNotNullSql);
}
/**
 * Creates the `user_sessions` table and its `(user_id, created_at DESC)`
 * index, as two separate queries. Rows cascade-delete with their user.
 *
 * @param {{ query: Function }} client - Connection the DDL is issued on.
 * @returns {Promise<void>}
 */
async function migrateMultiDeviceSessionsSchema(client) {
  const statements = [
    `
CREATE TABLE IF NOT EXISTS user_sessions (
id TEXT PRIMARY KEY,
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
user_agent TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
)
`,
    "CREATE INDEX IF NOT EXISTS idx_user_sessions_user_created ON user_sessions(user_id, created_at DESC)",
  ];
  // Table first, then the index that depends on it.
  for (const statement of statements) {
    await client.query(statement);
  }
}
/**
 * Adds `generation_tasks.last_poll_at`, backfills it from `updated_at`, and
 * creates a partial index over pending/running tasks.
 *
 * All three statements run on `client`, so the column addition belongs to
 * the same transaction as the backfill and the index. Issuing the ALTER on a
 * different pooled connection would commit it independently and would need a
 * second connection while the migration's connection is checked out.
 *
 * @param {{ query: Function }} client - Migration connection.
 * @returns {Promise<void>}
 */
async function migrateTaskPollHeartbeat(client) {
  await client.query(
    "ALTER TABLE generation_tasks ADD COLUMN IF NOT EXISTS last_poll_at TIMESTAMPTZ",
  );
  // Give existing rows a starting heartbeat.
  await client.query(
    "UPDATE generation_tasks SET last_poll_at = updated_at WHERE last_poll_at IS NULL",
  );
  await client.query(
    "CREATE INDEX IF NOT EXISTS idx_generation_tasks_poll_heartbeat ON generation_tasks(status, last_poll_at) WHERE status IN ('pending', 'running')",
  );
}
/**
 * Creates the `(user_id, status, updated_at DESC)` index on
 * `generation_tasks` if it does not exist.
 *
 * @param {{ query: Function }} client - Connection the DDL is issued on.
 * @returns {Promise<void>}
 */
async function migrateGenerationTasksUserStatusIndex(client) {
  const createIndexSql =
    "CREATE INDEX IF NOT EXISTS idx_generation_tasks_user_status_updated ON generation_tasks(user_id, status, updated_at DESC)";
  await client.query(createIndexSql);
}
/**
 * Adds the billing columns (`cost_cents`, `billing_target`,
 * `billing_refunded`) to `generation_tasks` and creates the partial index
 * over rows that carry a cost and have not been refunded.
 *
 * The columns are added with one `ALTER TABLE ... ADD COLUMN IF NOT EXISTS`
 * on `client`, so they share a transaction with the index creation instead
 * of being committed separately on another pooled connection.
 *
 * @param {{ query: Function }} client - Migration connection.
 * @returns {Promise<void>}
 */
async function migrateGenerationTasksBillingColumns(client) {
  await client.query(`
    ALTER TABLE generation_tasks
      ADD COLUMN IF NOT EXISTS cost_cents INTEGER NOT NULL DEFAULT 0,
      ADD COLUMN IF NOT EXISTS billing_target TEXT,
      ADD COLUMN IF NOT EXISTS billing_refunded INTEGER NOT NULL DEFAULT 0
  `);
  await client.query(
    "CREATE INDEX IF NOT EXISTS idx_generation_tasks_billing_refund ON generation_tasks(status, billing_refunded) WHERE billing_refunded = 0 AND cost_cents > 0",
  );
}
/**
 * Seeds `model_prices` with the entries of `DEFAULT_MODEL_PRICES`.
 *
 * Does nothing when the table lacks the `input_price_mills` column. Rows are
 * inserted one at a time on the pool; an existing `model_key` is left
 * untouched (`ON CONFLICT ... DO NOTHING`).
 *
 * @returns {Promise<void>}
 */
async function ensureModelPriceSeed() {
  const existingColumns = await getColumnNames("model_prices");
  if (!existingColumns.includes("input_price_mills")) {
    return;
  }
  const insertSql = `
INSERT INTO model_prices (
model_key, display_name, category, pricing_type,
input_price_mills, output_price_mills, flat_price_mills,
currency, enabled
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (model_key) DO NOTHING
`;
  for (const price of DEFAULT_MODEL_PRICES) {
    const values = [
      price.modelKey,
      price.displayName,
      price.category,
      price.pricingType,
      price.inputPriceMills,
      price.outputPriceMills,
      price.flatPriceMills,
      price.currency,
      // The column is an INTEGER flag, not a boolean.
      price.enabled ? 1 : 0,
    ];
    await pool.query(insertSql, values);
  }
}
/**
 * Adds the registration columns (`tax_id`, `legal_person_name`,
 * `legal_person_phone`, `enterprise_code`) to `enterprises` and makes sure
 * the `idx_enterprises_code` index exists.
 *
 * Everything runs on `client`, so the column additions are part of the same
 * transaction as the index creation rather than being committed on a
 * separate pooled connection. `CREATE INDEX IF NOT EXISTS` already skips an
 * existing index, so no prior lookup in `pg_indexes` is needed.
 *
 * @param {{ query: Function }} client - Migration connection.
 * @returns {Promise<void>}
 */
async function migrateEnterpriseFields(client) {
  await client.query(`
    ALTER TABLE enterprises
      ADD COLUMN IF NOT EXISTS tax_id TEXT,
      ADD COLUMN IF NOT EXISTS legal_person_name TEXT,
      ADD COLUMN IF NOT EXISTS legal_person_phone TEXT,
      ADD COLUMN IF NOT EXISTS enterprise_code TEXT UNIQUE
  `);
  await client.query(
    "CREATE INDEX IF NOT EXISTS idx_enterprises_code ON enterprises(enterprise_code)",
  );
}
/**
 * Adds per-user balance support: `users.balance_cents`,
 * `payment_orders.user_id`, `transactions.target_user_id`, and the
 * `idx_users_balance` index.
 *
 * Every statement runs on `client`, so all changes belong to the migration's
 * transaction instead of being committed on a separate pooled connection.
 * `CREATE INDEX IF NOT EXISTS` already skips an existing index, so no prior
 * lookup in `pg_indexes` is needed.
 *
 * @param {{ query: Function }} client - Migration connection.
 * @returns {Promise<void>}
 */
async function migrateUserBalance(client) {
  const statements = [
    "ALTER TABLE users ADD COLUMN IF NOT EXISTS balance_cents INTEGER NOT NULL DEFAULT 0",
    "ALTER TABLE payment_orders ADD COLUMN IF NOT EXISTS user_id INTEGER",
    "ALTER TABLE transactions ADD COLUMN IF NOT EXISTS target_user_id INTEGER",
    "CREATE INDEX IF NOT EXISTS idx_users_balance ON users(balance_cents)",
  ];
  for (const statement of statements) {
    await client.query(statement);
  }
}
/**
 * Creates the `projects` and `project_revisions` tables plus their indexes
 * in one multi-statement query. Every statement uses `IF NOT EXISTS`.
 *
 * @param {{ query: Function }} client - Connection the DDL is issued on.
 * @returns {Promise<void>}
 */
async function migrateProjectsTable(client) {
  const projectsSql = `
CREATE TABLE IF NOT EXISTS projects (
id VARCHAR(64) PRIMARY KEY,
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
name VARCHAR(200) NOT NULL,
description TEXT,
oss_key VARCHAR(500) NOT NULL,
thumbnail_url VARCHAR(1000),
storyboard_count INTEGER NOT NULL DEFAULT 0,
image_count INTEGER NOT NULL DEFAULT 0,
video_count INTEGER NOT NULL DEFAULT 0,
file_size BIGINT NOT NULL DEFAULT 0,
current_revision INTEGER NOT NULL DEFAULT 1,
current_fingerprint VARCHAR(128),
updated_by_device_id VARCHAR(128),
source_case_id INTEGER,
origin_type VARCHAR(32) NOT NULL DEFAULT 'manual',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS project_revisions (
project_id VARCHAR(64) NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
revision_number INTEGER NOT NULL,
oss_key VARCHAR(500) NOT NULL,
content_fingerprint VARCHAR(128),
source_device_id VARCHAR(128),
save_reason VARCHAR(32),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (project_id, revision_number)
);
CREATE INDEX IF NOT EXISTS idx_projects_user_id ON projects(user_id);
CREATE INDEX IF NOT EXISTS idx_projects_updated_at ON projects(updated_at DESC);
CREATE INDEX IF NOT EXISTS idx_project_revisions_project_created_at ON project_revisions(project_id, created_at DESC);
`;
  await client.query(projectsSql);
}
/**
 * Brings an existing `projects` table up to the revision-aware schema.
 *
 * Runs four queries in order: add the revision columns to `projects`, create
 * `project_revisions`, create its index, and insert one `'migration'`
 * revision row for every project that has no row for its current revision.
 *
 * @param {{ query: Function }} client - Connection the statements run on.
 * @returns {Promise<void>}
 */
async function migrateProjectRevisionSchema(client) {
  const addRevisionColumnsSql = `
ALTER TABLE projects
ADD COLUMN IF NOT EXISTS current_revision INTEGER NOT NULL DEFAULT 1,
ADD COLUMN IF NOT EXISTS current_fingerprint VARCHAR(128),
ADD COLUMN IF NOT EXISTS updated_by_device_id VARCHAR(128)
`;
  const createRevisionsTableSql = `
CREATE TABLE IF NOT EXISTS project_revisions (
project_id VARCHAR(64) NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
revision_number INTEGER NOT NULL,
oss_key VARCHAR(500) NOT NULL,
content_fingerprint VARCHAR(128),
source_device_id VARCHAR(128),
save_reason VARCHAR(32),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (project_id, revision_number)
)
`;
  const createRevisionsIndexSql = `
CREATE INDEX IF NOT EXISTS idx_project_revisions_project_created_at
ON project_revisions(project_id, created_at DESC)
`;
  // A current_revision of 0 is treated as revision 1 when backfilling.
  const backfillRevisionsSql = `
INSERT INTO project_revisions (
project_id,
revision_number,
oss_key,
content_fingerprint,
source_device_id,
save_reason,
created_at
)
SELECT
p.id,
COALESCE(NULLIF(p.current_revision, 0), 1),
p.oss_key,
p.current_fingerprint,
p.updated_by_device_id,
'migration',
COALESCE(p.updated_at, NOW())
FROM projects p
WHERE NOT EXISTS (
SELECT 1
FROM project_revisions pr
WHERE pr.project_id = p.id
AND pr.revision_number = COALESCE(NULLIF(p.current_revision, 0), 1)
)
`;
  const orderedStatements = [
    addRevisionColumnsSql,
    createRevisionsTableSql,
    createRevisionsIndexSql,
    backfillRevisionsSql,
  ];
  for (const statement of orderedStatements) {
    await client.query(statement);
  }
}
/**
 * Creates the `generation_tasks` table and three supporting indexes in one
 * multi-statement query. The table references `users` and `projects` and is
 * unique on `(project_id, client_queue_id)`.
 *
 * @param {{ query: Function }} client - Connection the DDL is issued on.
 * @returns {Promise<void>}
 */
async function migrateGenerationTasksSchema(client) {
  const generationTasksSql = `
CREATE TABLE IF NOT EXISTS generation_tasks (
id SERIAL PRIMARY KEY,
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
project_id VARCHAR(64) NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
client_queue_id VARCHAR(128) NOT NULL,
type VARCHAR(16) NOT NULL,
status VARCHAR(24) NOT NULL,
provider_task_id VARCHAR(256),
params_json TEXT NOT NULL DEFAULT '{}',
result_url VARCHAR(2000),
progress INTEGER NOT NULL DEFAULT 0,
error TEXT,
dedupe_key VARCHAR(256),
source_device_id VARCHAR(128),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
completed_at TIMESTAMPTZ,
UNIQUE (project_id, client_queue_id)
);
CREATE INDEX IF NOT EXISTS idx_generation_tasks_user_project_updated
ON generation_tasks(user_id, project_id, updated_at DESC);
CREATE INDEX IF NOT EXISTS idx_generation_tasks_provider_task
ON generation_tasks(provider_task_id);
CREATE INDEX IF NOT EXISTS idx_generation_tasks_status
ON generation_tasks(status, updated_at DESC);
`;
  await client.query(generationTasksSql);
}
/**
 * Adapts `generation_tasks` so a task may belong to a conversation instead
 * of a project.
 *
 * Steps, all on `client` so they commit or roll back together:
 *   1. add a nullable `conversation_id` foreign key to `conversations`;
 *   2. drop NOT NULL from `project_id` when the column is still mandatory;
 *   3. replace the table-level unique constraint and the user/project index
 *      with a user/conversation index and a partial unique index that only
 *      covers rows with a project.
 *
 * The column used to be added on a separate pooled connection, outside the
 * migration's transaction.
 *
 * @param {{ query: Function }} client - Migration connection.
 * @returns {Promise<void>}
 */
async function migrateWorkbenchTaskSchema(client) {
  await client.query(
    "ALTER TABLE generation_tasks ADD COLUMN IF NOT EXISTS conversation_id INTEGER REFERENCES conversations(id) ON DELETE SET NULL",
  );
  const { rows: notNullRows } = await client.query(`
SELECT 1
FROM information_schema.columns
WHERE table_name = 'generation_tasks'
AND column_name = 'project_id'
AND is_nullable = 'NO'
`);
  if (notNullRows.length > 0) {
    await client.query("ALTER TABLE generation_tasks ALTER COLUMN project_id DROP NOT NULL");
  }
  await client.query(`
ALTER TABLE generation_tasks DROP CONSTRAINT IF EXISTS generation_tasks_project_id_client_queue_id_key;
DROP INDEX IF EXISTS idx_generation_tasks_user_project_updated;
CREATE INDEX IF NOT EXISTS idx_generation_tasks_user_conversation_updated
ON generation_tasks(user_id, conversation_id, updated_at DESC);
CREATE UNIQUE INDEX IF NOT EXISTS idx_generation_tasks_project_queue_unique
ON generation_tasks(project_id, client_queue_id)
WHERE project_id IS NOT NULL;
`);
}
/**
 * Adds the `display_model` column to `api_call_logs` when it is missing.
 *
 * The statement runs on the supplied client, so it is part of the caller's
 * transaction. Previously the argument was ignored and the ALTER was issued
 * on the pool.
 *
 * @param {{ query: Function }} [client=pool] - Connection to run on; falls
 *   back to the pool for callers that pass nothing.
 * @returns {Promise<void>}
 */
async function migrateApiCallDisplayModel(client = pool) {
  await client.query("ALTER TABLE api_call_logs ADD COLUMN IF NOT EXISTS display_model TEXT");
}
/**
 * Adds the phone/WeChat login columns to `users`, creates
 * `sms_verification_codes`, and creates the related indexes.
 *
 * Five queries are issued in order on `client`; every statement uses
 * `IF NOT EXISTS`.
 *
 * @param {{ query: Function }} client - Connection the DDL is issued on.
 * @returns {Promise<void>}
 */
async function migrateExternalAuthSchema(client) {
  const addAuthColumnsSql = `
ALTER TABLE users
ADD COLUMN IF NOT EXISTS avatar_url TEXT,
ADD COLUMN IF NOT EXISTS phone TEXT,
ADD COLUMN IF NOT EXISTS wechat_openid TEXT,
ADD COLUMN IF NOT EXISTS wechat_unionid TEXT,
ADD COLUMN IF NOT EXISTS auth_provider TEXT NOT NULL DEFAULT 'password'
`;
  const createSmsCodesSql = `
CREATE TABLE IF NOT EXISTS sms_verification_codes (
id SERIAL PRIMARY KEY,
phone TEXT NOT NULL,
purpose TEXT NOT NULL,
code_hash TEXT NOT NULL,
attempts INTEGER NOT NULL DEFAULT 0,
consumed_at TIMESTAMPTZ,
expires_at TIMESTAMPTZ NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
)
`;
  const orderedStatements = [
    addAuthColumnsSql,
    createSmsCodesSql,
    "CREATE UNIQUE INDEX IF NOT EXISTS idx_users_phone_unique ON users(phone) WHERE phone IS NOT NULL",
    "CREATE UNIQUE INDEX IF NOT EXISTS idx_users_wechat_openid_unique ON users(wechat_openid) WHERE wechat_openid IS NOT NULL",
    "CREATE INDEX IF NOT EXISTS idx_sms_verification_codes_lookup ON sms_verification_codes(phone, purpose, created_at DESC)",
  ];
  for (const statement of orderedStatements) {
    await client.query(statement);
  }
}
/**
 * Adds the `avatar_url` column to `users` when it is missing.
 *
 * The statement runs on the supplied client, so it is part of the caller's
 * transaction. Previously the argument was ignored and the ALTER was issued
 * on the pool.
 *
 * @param {{ query: Function }} [client=pool] - Connection to run on; falls
 *   back to the pool for callers that pass nothing.
 * @returns {Promise<void>}
 */
async function migrateUserAvatarSchema(client = pool) {
  await client.query("ALTER TABLE users ADD COLUMN IF NOT EXISTS avatar_url TEXT");
}
/**
 * Adds the profile and e-mail columns to `users` and creates the
 * case-insensitive unique index on `email` (non-null values only).
 *
 * @param {{ query: Function }} client - Connection the DDL is issued on.
 * @returns {Promise<void>}
 */
async function migrateUserProfileAndEmailSchema(client) {
  const addProfileColumnsSql = `
ALTER TABLE users
ADD COLUMN IF NOT EXISTS bio TEXT,
ADD COLUMN IF NOT EXISTS profile_background_url TEXT,
ADD COLUMN IF NOT EXISTS email TEXT,
ADD COLUMN IF NOT EXISTS email_verified INTEGER NOT NULL DEFAULT 0
`;
  const createEmailIndexSql =
    "CREATE UNIQUE INDEX IF NOT EXISTS idx_users_email_unique ON users(LOWER(email)) WHERE email IS NOT NULL";
  await client.query(addProfileColumnsSql);
  await client.query(createEmailIndexSql);
}
/**
 * Adds `current_session_id` and `current_session_started_at` to `users`
 * when they are missing.
 *
 * @param {{ query: Function }} client - Connection the DDL is issued on.
 * @returns {Promise<void>}
 */
async function migrateSingleDeviceSessionSchema(client) {
  const addSessionColumnsSql = `
ALTER TABLE users
ADD COLUMN IF NOT EXISTS current_session_id TEXT,
ADD COLUMN IF NOT EXISTS current_session_started_at TIMESTAMPTZ
`;
  await client.query(addSessionColumnsSql);
}
/**
 * Creates the community tables (`community_cases`, `community_case_assets`,
 * `community_case_copies`) and their indexes in one multi-statement query.
 * Every statement uses `IF NOT EXISTS`.
 *
 * @param {{ query: Function }} client - Connection the DDL is issued on.
 * @returns {Promise<void>}
 */
async function migrateCommunitySchema(client) {
  const communitySql = `
CREATE TABLE IF NOT EXISTS community_cases (
id SERIAL PRIMARY KEY,
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
project_id VARCHAR(64) REFERENCES projects(id) ON DELETE SET NULL,
title VARCHAR(200) NOT NULL,
description TEXT,
cover_url VARCHAR(1000),
tags_json TEXT NOT NULL DEFAULT '[]',
metadata_json TEXT NOT NULL DEFAULT '{}',
status VARCHAR(24) NOT NULL DEFAULT 'pending',
review_note TEXT,
reviewed_by INTEGER REFERENCES users(id) ON DELETE SET NULL,
reviewed_at TIMESTAMPTZ,
published_at TIMESTAMPTZ,
copy_count INTEGER NOT NULL DEFAULT 0,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS community_case_assets (
id SERIAL PRIMARY KEY,
case_id INTEGER NOT NULL REFERENCES community_cases(id) ON DELETE CASCADE,
asset_type VARCHAR(32) NOT NULL,
title VARCHAR(200),
url VARCHAR(1000),
oss_key VARCHAR(500),
metadata_json TEXT NOT NULL DEFAULT '{}',
sort_order INTEGER NOT NULL DEFAULT 0,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS community_case_copies (
id SERIAL PRIMARY KEY,
case_id INTEGER NOT NULL REFERENCES community_cases(id) ON DELETE CASCADE,
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
project_id VARCHAR(64) REFERENCES projects(id) ON DELETE SET NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_community_cases_status_updated
ON community_cases(status, updated_at DESC);
CREATE INDEX IF NOT EXISTS idx_community_cases_user_updated
ON community_cases(user_id, updated_at DESC);
CREATE INDEX IF NOT EXISTS idx_community_case_assets_case_order
ON community_case_assets(case_id, sort_order, id);
CREATE INDEX IF NOT EXISTS idx_community_case_copies_user_created
ON community_case_copies(user_id, created_at DESC);
`;
  await client.query(communitySql);
}
/**
 * Adds the review/publication columns to `community_cases` and creates the
 * `community_case_reactions` table with its indexes.
 *
 * Both queries run on the supplied client, so they are part of the caller's
 * transaction. Previously the argument was ignored and all DDL was issued on
 * the pool, outside that transaction.
 *
 * @param {{ query: Function }} [client=pool] - Connection to run on; falls
 *   back to the pool for callers that pass nothing.
 * @returns {Promise<void>}
 */
async function migrateCommunityReviewColumns(client = pool) {
  await client.query(`
    ALTER TABLE community_cases
      ADD COLUMN IF NOT EXISTS review_note TEXT,
      ADD COLUMN IF NOT EXISTS reviewed_by INTEGER REFERENCES users(id) ON DELETE SET NULL,
      ADD COLUMN IF NOT EXISTS reviewed_at TIMESTAMPTZ,
      ADD COLUMN IF NOT EXISTS published_at TIMESTAMPTZ,
      ADD COLUMN IF NOT EXISTS copy_count INTEGER NOT NULL DEFAULT 0
  `);
  await client.query(`
CREATE TABLE IF NOT EXISTS community_case_reactions (
id SERIAL PRIMARY KEY,
case_id INTEGER NOT NULL REFERENCES community_cases(id) ON DELETE CASCADE,
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
reaction_type VARCHAR(24) NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(case_id, user_id, reaction_type)
);
CREATE INDEX IF NOT EXISTS idx_community_case_reactions_case_type
ON community_case_reactions(case_id, reaction_type);
CREATE INDEX IF NOT EXISTS idx_community_case_reactions_user
ON community_case_reactions(user_id, created_at DESC);
`);
}
/**
 * Removes duplicate `generation_tasks` rows that share a non-null
 * `project_id` and `client_queue_id` (keeping the one with the highest id),
 * then creates the partial unique index over that pair.
 *
 * Both statements are sent as one multi-statement query.
 *
 * @param {{ query: Function }} client - Connection the statements run on.
 * @returns {Promise<void>}
 */
async function migrateGenerationTasksProjectQueueIndex(client) {
  const dedupeAndIndexSql = `
DELETE FROM generation_tasks older
USING generation_tasks newer
WHERE older.project_id IS NOT NULL
AND older.project_id = newer.project_id
AND older.client_queue_id = newer.client_queue_id
AND older.id < newer.id;
CREATE UNIQUE INDEX IF NOT EXISTS idx_generation_tasks_project_queue_unique
ON generation_tasks(project_id, client_queue_id)
WHERE project_id IS NOT NULL;
`;
  await client.query(dedupeAndIndexSql);
}
/**
 * Adds `billing_mode` (default `'credits'`) and `beta_expires_at` to `users`
 * when they are missing.
 *
 * The statement runs on the supplied client, so it is part of the caller's
 * transaction. Previously the argument was ignored and the ALTERs were
 * issued on the pool.
 *
 * @param {{ query: Function }} [client=pool] - Connection to run on; falls
 *   back to the pool for callers that pass nothing.
 * @returns {Promise<void>}
 */
async function migrateUserBillingMode(client = pool) {
  await client.query(`
    ALTER TABLE users
      ADD COLUMN IF NOT EXISTS billing_mode TEXT NOT NULL DEFAULT 'credits',
      ADD COLUMN IF NOT EXISTS beta_expires_at TIMESTAMPTZ
  `);
}
/**
 * Adds `source_case_id` and `origin_type` (default `'manual'`) to `projects`
 * when they are missing.
 *
 * The statement runs on the supplied client, so it is part of the caller's
 * transaction. Previously the argument was ignored and the ALTERs were
 * issued on the pool.
 *
 * @param {{ query: Function }} [client=pool] - Connection to run on; falls
 *   back to the pool for callers that pass nothing.
 * @returns {Promise<void>}
 */
async function migrateProjectOriginSchema(client = pool) {
  await client.query(`
    ALTER TABLE projects
      ADD COLUMN IF NOT EXISTS source_case_id INTEGER,
      ADD COLUMN IF NOT EXISTS origin_type VARCHAR(32) NOT NULL DEFAULT 'manual'
  `);
}
/**
 * Creates the `wechat_login_sessions` table and its `(status, expires_at)`
 * index, as two separate queries.
 *
 * @param {{ query: Function }} client - Connection the DDL is issued on.
 * @returns {Promise<void>}
 */
async function migrateWechatLoginSessions(client) {
  const createTableSql = `
CREATE TABLE IF NOT EXISTS wechat_login_sessions (
state TEXT PRIMARY KEY,
status TEXT NOT NULL DEFAULT 'pending',
user_id INTEGER REFERENCES users(id),
error TEXT,
consumed_at TIMESTAMPTZ,
expires_at TIMESTAMPTZ NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
)
`;
  const createIndexSql =
    "CREATE INDEX IF NOT EXISTS idx_wechat_login_sessions_status ON wechat_login_sessions(status, expires_at)";
  await client.query(createTableSql);
  await client.query(createIndexSql);
}
/**
 * Creates the `conversations` table and its two indexes (`user_id` and
 * `updated_at DESC`), as three separate queries.
 *
 * @param {{ query: Function }} client - Connection the DDL is issued on.
 * @returns {Promise<void>}
 */
async function migrateConversationsSchema(client) {
  const orderedStatements = [
    `
CREATE TABLE IF NOT EXISTS conversations (
id SERIAL PRIMARY KEY,
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
title VARCHAR(200) NOT NULL DEFAULT '新对话',
mode VARCHAR(20) NOT NULL DEFAULT 'chat',
messages_json TEXT NOT NULL DEFAULT '[]',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
)
`,
    "CREATE INDEX IF NOT EXISTS idx_conversations_user_id ON conversations(user_id)",
    "CREATE INDEX IF NOT EXISTS idx_conversations_updated ON conversations(updated_at DESC)",
  ];
  for (const statement of orderedStatements) {
    await client.query(statement);
  }
}
/**
 * Creates the `user_reports` table and its two indexes, as three separate
 * queries.
 *
 * @param {{ query: Function }} client - Connection the DDL is issued on.
 * @returns {Promise<void>}
 */
async function migrateReportsSchema(client) {
  const createTableSql = `
CREATE TABLE IF NOT EXISTS user_reports (
id SERIAL PRIMARY KEY,
user_id INTEGER REFERENCES users(id) ON DELETE SET NULL,
report_type VARCHAR(64) NOT NULL DEFAULT 'other',
target_type VARCHAR(64),
target_id VARCHAR(128),
contact_name VARCHAR(120),
contact_email VARCHAR(200),
contact_phone VARCHAR(60),
title VARCHAR(200) NOT NULL,
description TEXT NOT NULL,
page_url TEXT,
status VARCHAR(32) NOT NULL DEFAULT 'pending',
ip_address VARCHAR(64),
user_agent TEXT,
handled_by INTEGER REFERENCES users(id) ON DELETE SET NULL,
handled_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
)
`;
  const indexStatements = [
    "CREATE INDEX IF NOT EXISTS idx_user_reports_status_created ON user_reports(status, created_at DESC)",
    "CREATE INDEX IF NOT EXISTS idx_user_reports_user_created ON user_reports(user_id, created_at DESC)",
  ];
  await client.query(createTableSql);
  for (const statement of indexStatements) {
    await client.query(statement);
  }
}
/**
 * Creates the web-facing tables (`web_assets`, `web_notifications`,
 * `web_drafts`, `community_case_reactions`) and their indexes in one
 * multi-statement query. Every statement uses `IF NOT EXISTS`.
 *
 * The tables reference `users`, `generation_tasks`, `projects` and
 * `community_cases`, so those must already exist.
 *
 * @param {{ query: Function }} client - Connection the DDL is issued on.
 * @returns {Promise<void>}
 */
async function migrateWebPdrSchema(client) {
  const webSchemaSql = `
CREATE TABLE IF NOT EXISTS web_assets (
id SERIAL PRIMARY KEY,
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
type VARCHAR(32) NOT NULL DEFAULT 'asset',
name VARCHAR(200) NOT NULL,
description TEXT,
url VARCHAR(1000),
oss_key VARCHAR(500),
tags_json TEXT NOT NULL DEFAULT '[]',
status VARCHAR(32) NOT NULL DEFAULT 'ready',
source_task_id INTEGER REFERENCES generation_tasks(id) ON DELETE SET NULL,
source_project_id VARCHAR(64) REFERENCES projects(id) ON DELETE SET NULL,
metadata_json TEXT NOT NULL DEFAULT '{}',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS web_notifications (
id SERIAL PRIMARY KEY,
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
type VARCHAR(64) NOT NULL,
title VARCHAR(200) NOT NULL,
description TEXT,
target_type VARCHAR(64),
target_id VARCHAR(128),
metadata_json TEXT NOT NULL DEFAULT '{}',
read_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS web_drafts (
id SERIAL PRIMARY KEY,
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
scope VARCHAR(64) NOT NULL,
target_id VARCHAR(128) NOT NULL DEFAULT 'default',
payload_json TEXT NOT NULL DEFAULT '{}',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(user_id, scope, target_id)
);
CREATE TABLE IF NOT EXISTS community_case_reactions (
id SERIAL PRIMARY KEY,
case_id INTEGER NOT NULL REFERENCES community_cases(id) ON DELETE CASCADE,
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
reaction_type VARCHAR(24) NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(case_id, user_id, reaction_type)
);
CREATE INDEX IF NOT EXISTS idx_web_assets_user_updated
ON web_assets(user_id, updated_at DESC);
CREATE INDEX IF NOT EXISTS idx_web_assets_user_type
ON web_assets(user_id, type, updated_at DESC);
CREATE INDEX IF NOT EXISTS idx_web_assets_source_project
ON web_assets(source_project_id, updated_at DESC);
CREATE INDEX IF NOT EXISTS idx_web_notifications_user_created
ON web_notifications(user_id, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_web_notifications_user_unread
ON web_notifications(user_id, created_at DESC)
WHERE read_at IS NULL;
CREATE INDEX IF NOT EXISTS idx_web_drafts_user_scope
ON web_drafts(user_id, scope, updated_at DESC);
CREATE INDEX IF NOT EXISTS idx_community_case_reactions_case_type
ON community_case_reactions(case_id, reaction_type);
CREATE INDEX IF NOT EXISTS idx_community_case_reactions_user
ON community_case_reactions(user_id, created_at DESC);
`;
  await client.query(webSchemaSql);
}
/**
 * Creates the `beta_invite_code_uses` table (keyed by `code`) and its
 * `(user_id, used_at DESC)` index in one multi-statement query.
 *
 * @param {{ query: Function }} client - Connection the DDL is issued on.
 * @returns {Promise<void>}
 */
async function migrateBetaInviteCodeUsageSchema(client) {
  const inviteUsageSql = `
CREATE TABLE IF NOT EXISTS beta_invite_code_uses (
code TEXT PRIMARY KEY,
user_id INTEGER REFERENCES users(id) ON DELETE SET NULL,
used_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_beta_invite_code_uses_user
ON beta_invite_code_uses(user_id, used_at DESC);
`;
  await client.query(inviteUsageSql);
}
/**
 * Adds `enterprises.admin_user_id` and creates the `enterprise_members`,
 * `enterprise_invites` and `credit_ledger` tables with their indexes, all in
 * one multi-statement query. Every statement uses `IF NOT EXISTS`.
 *
 * @param {{ query: Function }} client - Connection the DDL is issued on.
 * @returns {Promise<void>}
 */
async function migrateEnterpriseBetaSupportSchema(client) {
  const enterpriseBetaSql = `
ALTER TABLE enterprises
ADD COLUMN IF NOT EXISTS admin_user_id INTEGER;
CREATE TABLE IF NOT EXISTS enterprise_members (
id SERIAL PRIMARY KEY,
enterprise_id INTEGER NOT NULL REFERENCES enterprises(id) ON DELETE CASCADE,
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
role TEXT NOT NULL DEFAULT 'member',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(enterprise_id, user_id)
);
CREATE TABLE IF NOT EXISTS enterprise_invites (
id SERIAL PRIMARY KEY,
enterprise_id INTEGER NOT NULL REFERENCES enterprises(id) ON DELETE CASCADE,
code_hash TEXT NOT NULL UNIQUE,
code_label TEXT,
status TEXT NOT NULL DEFAULT 'active',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
used_at TIMESTAMPTZ
);
CREATE TABLE IF NOT EXISTS credit_ledger (
id SERIAL PRIMARY KEY,
enterprise_id INTEGER REFERENCES enterprises(id) ON DELETE SET NULL,
user_id INTEGER REFERENCES users(id) ON DELETE SET NULL,
task_id INTEGER,
model TEXT,
task_type TEXT NOT NULL,
resolution TEXT,
duration_seconds INTEGER,
rate_cents_per_second INTEGER,
amount_cents INTEGER NOT NULL,
status TEXT NOT NULL DEFAULT 'reserved',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_enterprise_members_enterprise
ON enterprise_members(enterprise_id, role);
CREATE INDEX IF NOT EXISTS idx_enterprise_members_user
ON enterprise_members(user_id);
CREATE INDEX IF NOT EXISTS idx_enterprise_invites_enterprise
ON enterprise_invites(enterprise_id, status);
CREATE INDEX IF NOT EXISTS idx_credit_ledger_enterprise
ON credit_ledger(enterprise_id, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_credit_ledger_user
ON credit_ledger(user_id, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_credit_ledger_task
ON credit_ledger(task_id);
`;
  await client.query(enterpriseBetaSql);
}
/**
 * Brings the database schema up to date.
 *
 * Makes sure the migration bookkeeping table exists, applies every registered
 * migration in order (runMigration skips ids that are already recorded), and
 * finally seeds the model price table.
 *
 * @returns {Promise<void>}
 */
async function ensureSchema() {
  await ensureMigrationTable();

  // Ordered [migration id, migration function] pairs. Ids are what gets stored in
  // schema_migrations, so an existing id must never be renamed; append new entries
  // at the end.
  // NOTE(review): there is no "004_*" entry in this list - presumably retired; confirm
  // before reusing that number.
  const orderedMigrations = [
    ["001_base_schema", createBaseSchema],
    ["002_enterprise_and_pricing_schema", migrateEnterpriseAndPricingSchema],
    ["003_billing_schema", migrateBillingSchema],
    ["005_enterprise_fields", migrateEnterpriseFields],
    ["006_user_balance", migrateUserBalance],
    ["007_projects_table", migrateProjectsTable],
    ["008_project_revision_schema", migrateProjectRevisionSchema],
    ["009_api_call_display_model", migrateApiCallDisplayModel],
    ["010_external_auth_schema", migrateExternalAuthSchema],
    ["011_wechat_login_sessions", migrateWechatLoginSessions],
    ["012_generation_tasks_schema", migrateGenerationTasksSchema],
    ["013_user_avatar_schema", migrateUserAvatarSchema],
    ["014_community_schema", migrateCommunitySchema],
    ["015_user_billing_mode", migrateUserBillingMode],
    ["016_project_origin_schema", migrateProjectOriginSchema],
    ["017_conversations_schema", migrateConversationsSchema],
    ["018_user_reports_schema", migrateReportsSchema],
    ["019_workbench_task_schema", migrateWorkbenchTaskSchema],
    ["020_user_profile_and_email_schema", migrateUserProfileAndEmailSchema],
    ["021_single_device_session_schema", migrateSingleDeviceSessionSchema],
    ["022_web_pdr_schema", migrateWebPdrSchema],
    ["023_community_review_columns", migrateCommunityReviewColumns],
    ["024_generation_tasks_project_queue_index", migrateGenerationTasksProjectQueueIndex],
    ["025_beta_invite_code_usage", migrateBetaInviteCodeUsageSchema],
    ["026_enterprise_beta_support", migrateEnterpriseBetaSupportSchema],
    ["027_transactions_nullable_enterprise_id", migrateTransactionsNullableEnterpriseId],
    ["028_multi_device_sessions", migrateMultiDeviceSessionsSchema],
    ["029_task_poll_heartbeat", migrateTaskPollHeartbeat],
    ["030_generation_tasks_user_status_index", migrateGenerationTasksUserStatusIndex],
    ["031_generation_tasks_billing_columns", migrateGenerationTasksBillingColumns],
    ["032_ecommerce_video_history", migrateEcommerceVideoHistorySchema],
  ];

  // Strictly sequential: later migrations may depend on objects created by earlier ones.
  for (const [migrationId, applyMigration] of orderedMigrations) {
    await runMigration(migrationId, applyMigration);
  }

  await ensureModelPriceSeed();
}
/**
 * Creates the built-in "admin" account when it does not exist yet.
 *
 * The insert uses ON CONFLICT DO NOTHING (users.username is UNIQUE), so when two
 * processes bootstrap at the same time the loser gets `false` instead of a
 * unique-violation error.
 *
 * @param {string} [password] - Candidate password; getDefaultAdminPassword resolves
 *   the value that is actually hashed and stored.
 * @returns {Promise<boolean>} true when this call inserted the admin row, false when
 *   the account already existed (including when a concurrent caller won the race).
 */
async function ensureDefaultAdmin(password) {
  // Cheap pre-check so the password is neither resolved nor hashed when there is
  // nothing to do.
  const existing = await pool.query("SELECT id FROM users WHERE username = $1", ["admin"]);
  if (existing.rows.length > 0) return false;

  const resolvedPassword = getDefaultAdminPassword(password);
  // Async variant: keeps the event loop free while the hash is computed.
  const hash = await bcrypt.hash(resolvedPassword, 10);

  const inserted = await pool.query(
    `
    INSERT INTO users (username, password_hash, role, max_concurrency, enterprise_id, is_enterprise_admin, balance_cents)
    VALUES ($1, $2, $3, $4, $5, $6, $7)
    ON CONFLICT (username) DO NOTHING
    RETURNING id
    `,
    ["admin", hash, "admin", 100, null, 0, 0],
  );

  // No returned row means another caller created the admin between the check and the insert.
  return inserted.rows.length > 0;
}
/**
 * Computes the lookup hash stored for an enterprise invite code.
 *
 * @param {string} code - Invite code; it is passed through
 *   normalizeEnterpriseInviteCode before hashing.
 * @returns {string} Hex-encoded SHA-256 digest of the normalized code.
 */
function hashEnterpriseInviteCode(code) {
  const normalizedCode = normalizeEnterpriseInviteCode(code);
  const sha256 = crypto.createHash("sha256");
  sha256.update(normalizedCode);
  return sha256.digest("hex");
}
/**
 * Upserts one beta enterprise together with its admin user, admin membership row
 * and invite code. Intended to run inside the caller's transaction.
 *
 * @param {object} client - Transaction-bound database client exposing `query`.
 * @param {object} account - One entry of ENTERPRISE_BETA_ACCOUNTS (reads
 *   enterpriseName, enterpriseId, adminUsername and inviteCode).
 * @param {string} passwordHash - bcrypt hash to store for the admin user.
 * @returns {Promise<{ adminExisted: boolean, balanceCents: number }>} Whether the
 *   admin user was already present, and the enterprise balance after the upsert.
 */
async function seedEnterpriseBetaAccount(client, account, passwordHash) {
  // NOTE(review): an existing enterprise whose balance is <= 0 gets the initial
  // balance again on every run - confirm that this refill is intended.
  const enterpriseResult = await client.query(
    `
    INSERT INTO enterprises (name, enterprise_code, balance_cents, enabled)
    VALUES ($1, $2, $3, 1)
    ON CONFLICT (enterprise_code) DO UPDATE
    SET name = EXCLUDED.name,
        balance_cents = CASE
          WHEN enterprises.balance_cents <= 0 THEN EXCLUDED.balance_cents
          ELSE enterprises.balance_cents
        END,
        enabled = 1,
        updated_at = NOW()
    RETURNING id, balance_cents
    `,
    [account.enterpriseName, account.enterpriseId, ENTERPRISE_BETA_INITIAL_BALANCE_CENTS],
  );
  const enterprise = enterpriseResult.rows[0];

  const existingAdmin = await client.query("SELECT id FROM users WHERE username = $1", [
    account.adminUsername,
  ]);
  const adminExisted = existingAdmin.rows.length > 0;

  // An existing user is fully reset (password, role, limits, balance, enabled flag);
  // otherwise a new user row is created with the same settings.
  const adminSql = adminExisted
    ? `
      UPDATE users
      SET password_hash = $2,
          role = 'user',
          max_concurrency = 30,
          enterprise_id = $3,
          is_enterprise_admin = 1,
          balance_cents = 0,
          enabled = 1,
          updated_at = NOW()
      WHERE username = $1
      RETURNING id
      `
    : `
      INSERT INTO users (username, password_hash, role, max_concurrency, enterprise_id, is_enterprise_admin, balance_cents)
      VALUES ($1, $2, 'user', 30, $3, 1, 0)
      RETURNING id
      `;
  const adminResult = await client.query(adminSql, [
    account.adminUsername,
    passwordHash,
    enterprise.id,
  ]);
  const adminUserId = adminResult.rows[0].id;

  await client.query("UPDATE enterprises SET admin_user_id = $1, updated_at = NOW() WHERE id = $2", [
    adminUserId,
    enterprise.id,
  ]);

  await client.query(
    `
    INSERT INTO enterprise_members (enterprise_id, user_id, role)
    VALUES ($1, $2, 'admin')
    ON CONFLICT (enterprise_id, user_id) DO UPDATE SET role = 'admin'
    `,
    [enterprise.id, adminUserId],
  );

  // NOTE(review): code_label receives the plaintext invite code next to its hash,
  // which weakens the point of storing only a hash - confirm this is acceptable.
  await client.query(
    `
    INSERT INTO enterprise_invites (enterprise_id, code_hash, code_label, status)
    VALUES ($1, $2, $3, 'active')
    ON CONFLICT (code_hash) DO UPDATE
    SET enterprise_id = EXCLUDED.enterprise_id,
        code_label = EXCLUDED.code_label,
        status = 'active'
    `,
    [enterprise.id, hashEnterpriseInviteCode(account.inviteCode), account.inviteCode],
  );

  return { adminExisted, balanceCents: Number(enterprise.balance_cents) };
}

/**
 * Seeds (or refreshes) every account listed in ENTERPRISE_BETA_ACCOUNTS.
 *
 * Accounts without a configured password are reported as skipped. All remaining
 * accounts are written in a single transaction, so either all of them are applied
 * or none.
 *
 * Passwords are validated and hashed before the transaction is opened: a bad
 * password fails without touching the database, and the CPU-heavy bcrypt work does
 * not keep a transaction and its connection open.
 *
 * @param {object} [env=process.env] - Source handed to createEnterpriseBetaPasswordMap
 *   to look up each admin's password.
 * @returns {Promise<{
 *   createdAdmins: string[],
 *   updatedAdmins: string[],
 *   skippedAdmins: Array<{ username: string, reason: string }>,
 *   enterprises: Array<{ enterpriseCode: string, adminUsername: string, balanceCents: number }>
 * }>} Summary of what was created, updated and skipped.
 * @throws {Error} When a configured password is shorter than 6 characters.
 */
async function ensureEnterpriseBetaAccounts(env = process.env) {
  const passwordMap = createEnterpriseBetaPasswordMap(env);
  const result = {
    createdAdmins: [],
    updatedAdmins: [],
    skippedAdmins: [],
    enterprises: [],
  };

  // Phase 1: decide which accounts to seed and validate their passwords up front.
  const pending = [];
  for (const account of ENTERPRISE_BETA_ACCOUNTS) {
    const password = passwordMap.get(account.adminUsername);
    if (!password) {
      result.skippedAdmins.push({
        username: account.adminUsername,
        reason: "missing_password",
      });
      continue;
    }
    if (password.length < 6) {
      throw new Error(`${account.adminUsername} password must be at least 6 characters`);
    }
    pending.push({ account, password });
  }

  // Nothing to write: do not open a transaction at all.
  if (pending.length === 0) return result;

  // Phase 2: hash outside the transaction, without blocking the event loop.
  const prepared = [];
  for (const { account, password } of pending) {
    prepared.push({ account, passwordHash: await bcrypt.hash(password, 10) });
  }

  // Phase 3: apply every account atomically.
  await withTransaction(async (client) => {
    for (const { account, passwordHash } of prepared) {
      const { adminExisted, balanceCents } = await seedEnterpriseBetaAccount(
        client,
        account,
        passwordHash,
      );

      if (adminExisted) {
        result.updatedAdmins.push(account.adminUsername);
      } else {
        result.createdAdmins.push(account.adminUsername);
      }
      result.enterprises.push({
        enterpriseCode: account.enterpriseId,
        adminUsername: account.adminUsername,
        balanceCents,
      });
    }
  });

  return result;
}
/**
 * Full database bootstrap: schema migrations first, then the default admin
 * account, then the enterprise beta accounts.
 *
 * @param {string} [password] - Forwarded to ensureDefaultAdmin.
 * @returns {Promise<{ createdDefaultAdmin: boolean, enterpriseBetaAccounts: object }>}
 *   The results of ensureDefaultAdmin and ensureEnterpriseBetaAccounts.
 */
async function ensureDatabase(password) {
  // The schema must exist before any account rows can be written.
  await ensureSchema();

  const adminWasCreated = await ensureDefaultAdmin(password);
  const betaSeedReport = await ensureEnterpriseBetaAccounts();

  return {
    createdDefaultAdmin: adminWasCreated,
    enterpriseBetaAccounts: betaSeedReport,
  };
}
// Public API of this module.
// migrateEcommerceVideoHistorySchema is declared below this statement; because it is
// a function declaration it is hoisted, so ensureSchema can still reference it.
module.exports = {
ensureSchema,
ensureDefaultAdmin,
ensureEnterpriseBetaAccounts,
ensureDatabase,
hasColumn,
addColumnIfMissing,
};
/**
 * Migration body for "032_ecommerce_video_history" (see ensureSchema).
 *
 * Creates the `ecommerce_video_history` table, whose rows are removed together with
 * their user (ON DELETE CASCADE), plus an index on (user_id, created_at DESC).
 * The config/plan/scenes/source-image columns are TEXT with '{}' or '[]' defaults -
 * presumably serialized JSON; verify against the code that writes them.
 *
 * Both statements use IF NOT EXISTS, so running them against a database that
 * already has these objects does not fail.
 *
 * @param {object} client - Database client exposing `query`; runMigration passes the
 *   client of the transaction in which the migration id is also recorded.
 * @returns {Promise<void>}
 */
async function migrateEcommerceVideoHistorySchema(client) {
await client.query(`
CREATE TABLE IF NOT EXISTS ecommerce_video_history (
id SERIAL PRIMARY KEY,
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
title VARCHAR(200) NOT NULL DEFAULT '',
config_json TEXT NOT NULL DEFAULT '{}',
plan_json TEXT NOT NULL DEFAULT '{}',
scenes_json TEXT NOT NULL DEFAULT '[]',
source_image_urls TEXT NOT NULL DEFAULT '[]',
status VARCHAR(32) NOT NULL DEFAULT 'completed',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_ecommerce_video_history_user
ON ecommerce_video_history(user_id, created_at DESC);
`);
}