feat: MLOps external services, AI stack planning, admin MLOps hub

Infrastructure:
- Add `mlops` compose profile: MLflow (basic-auth, /mlflow path) + Airflow (LocalExecutor, /airflow path) + airflow-db
- infra/mlflow/basic_auth.ini for MLflow auth config
- Caddy routes /mlflow* and /airflow* inside existing o.alogins.net block (see agap_git)
- Dockerfile.admin: NEXT_PUBLIC_MLFLOW_URL / NEXT_PUBLIC_AIRFLOW_URL build args (default /mlflow, /airflow)

Admin panel:
- /admin/models: replace MLflow iframe with external link cards
- /admin/experiments: replace LinUCB stats with MLOps hub (links to MLflow experiments/models + Airflow DAGs/datasets)
- AdminShell: external nav links for MLflow ↗ and Airflow ↗ under MLOps section

Docs & planning:
- README: new AI stack section (Ollama/LiteLLM/OpenWebUI three-tier, tip generation pipeline, model aliases)
- README: Phase 2 expanded with AI infra issues (#86-#93) and granular pipeline breakdown
- README: Phase 4 expanded with LLM MLOps items (#94-#97)
- CLAUDE.md: AI stack section, updated current phase (M1 shipped / M2 in progress), compose profiles, updated What NOT to do
- docs/architecture/overview.md: AI stack section, updated decision flow diagram for Phase 2 LLM pipeline
- ADR-0006: updated to reflect external services (path-based, not embedded)
- Gitea issues #86-#97 created (M2: AI infra + pipeline; M4: LLM MLOps)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-17 08:20:44 +00:00
parent faf44c18fc
commit 85367aeaa0
25 changed files with 695 additions and 222 deletions

View File

@@ -14,7 +14,7 @@ function optional(name: string, fallback: string): string {
}
export const config = {
PORT: parseInt(optional('PORT', '3078'), 10),
PORT: parseInt(optional('PORT', '3001'), 10),
NODE_ENV: optional('NODE_ENV', 'development'),
DATABASE_PATH: optional('DATABASE_PATH', './data/oo.db'),

View File

@@ -22,12 +22,27 @@ export type TipServedEvent = {
export type TipFeedbackEvent = {
userId: string;
tipId: string;
action: 'done' | 'dismiss' | 'snooze';
action: 'done' | 'dismiss' | 'snooze' | 'helpful' | 'not_helpful';
reward: number; // inferred from action + dwellMs (see inferReward in recommender.ts)
dwellMs: number | null;
createdAt: string;
};
export type IntegrationTokenExpiredEvent = {
userId: string;
provider: string;
detectedAt: string;
};
export type RewardDeliveryFailedEvent = {
userId: string;
tipId: string;
reward: number;
attempts: number;
error: string;
failedAt: string;
};
export type TaskSyncedEvent = {
userId: string;
count: number;
@@ -37,7 +52,9 @@ export type TaskSyncedEvent = {
type EventMap = {
'signals.tip.served': TipServedEvent;
'signals.tip.feedback': TipFeedbackEvent;
'signals.tip.reward_failed': RewardDeliveryFailedEvent;
'signals.task.synced': TaskSyncedEvent;
'signals.integration.token_expired': IntegrationTokenExpiredEvent;
};
export type StoredEvent = {

View File

@@ -3,7 +3,9 @@ import express from 'express';
import cookieParser from 'cookie-parser';
import cors from 'cors';
import { config } from './config.js';
import { runMigrations } from './db/index.js';
import { db, runMigrations } from './db/index.js';
import { tipScores, tipFeedback } from './db/schema.js';
import { lt } from 'drizzle-orm';
import { sessionMiddleware } from './middleware/session.js';
import { authRouter } from './routes/auth.js';
import { integrationsRouter } from './routes/integrations.js';
@@ -20,6 +22,15 @@ import type { Request, Response } from 'express';
await mkdir(dirname(config.DATABASE_PATH), { recursive: true });
runMigrations();
// Keep the API alive on stray async faults (e.g. a single bad admin route)
// rather than dropping the whole process.
process.on('unhandledRejection', (reason) => {
console.error('[api] unhandledRejection', reason);
});
process.on('uncaughtException', (err) => {
console.error('[api] uncaughtException', err);
});
const app = express();
app.use(
@@ -61,6 +72,19 @@ app.use('/api/ml', requireAuth as any, requireAdmin as any, async (req: Request,
}
});
async function purgeExpiredData() {
const cutoff = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000).toISOString();
try {
await db.delete(tipScores).where(lt(tipScores.servedAt, cutoff));
await db.delete(tipFeedback).where(lt(tipFeedback.createdAt, cutoff));
} catch (err: any) {
console.error(`[purge] retention cleanup failed: ${err.message}`);
}
}
purgeExpiredData();
setInterval(purgeExpiredData, 24 * 60 * 60 * 1000);
app.listen(config.PORT, () => {
console.log(`oO API listening on http://localhost:${config.PORT}`);
});

View File

@@ -368,7 +368,7 @@ router.get('/reward-analytics', async (req: AuthenticatedRequest, res: Response)
.select({
action: tipFeedback.action,
count: sql<number>`count(*)`,
avgHour: sql<number>`avg(json_extract(ts.features_json, '$.hour_of_day'))`,
avgHour: sql<number>`avg(json_extract(${tipScores.featuresJson}, '$.hour_of_day'))`,
})
.from(tipFeedback)
.leftJoin(tipScores, eq(tipFeedback.tipId, tipScores.tipId))
@@ -683,6 +683,18 @@ router.post('/simulate/start', async (req: AuthenticatedRequest, res: Response)
_simProcesses.set(id, { pid: child.pid, startedAt: now });
}
// Without this listener, a spawn failure (ENOENT when python3 is absent
// — e.g. in the alpine api container) would emit an unhandled 'error' event
// and crash the whole API process.
child.on('error', async (err) => {
console.error('[sim] spawn error', err);
_simProcesses.delete(id);
await db
.update(simRuns)
.set({ status: 'failed', finishedAt: new Date().toISOString() })
.where(eq(simRuns.id, id));
});
// Capture stderr for debugging
const stderrLines: string[] = [];
child.stderr?.on('data', (d: Buffer) => stderrLines.push(d.toString()));

View File

@@ -65,7 +65,17 @@ async function fetchTodoistTasks(userId: string, accessToken: string): Promise<C
headers: { Authorization: `Bearer ${accessToken}` },
});
if (!res.ok) return cached?.tasks ?? [];
if (!res.ok) {
if (res.status === 401) {
console.error(`[todoist] token expired for user ${userId}`);
bus.publish('signals.integration.token_expired', {
userId,
provider: 'todoist',
detectedAt: new Date().toISOString(),
});
}
return cached?.tasks ?? [];
}
const body = (await res.json()) as {
results: Array<{
@@ -230,18 +240,20 @@ router.post('/recommend', requireAuth, async (req: AuthenticatedRequest, res: Re
// ---------------------------------------------------------------------------
// Reward inference from action + dwell time
//
// Feedback is now 3 signals only: done / snooze / dismiss.
// "Helpfulness" is inferred from how long the user took to act on a tip:
// dismiss → -1.0 (clear rejection)
// snooze → +0.1 (tip noticed, timing off — mild positive)
// helpful → +0.5 (explicit positive signal)
// not_helpful → -0.5 (explicit negative signal)
// done < 15 s → -0.3 (almost certainly a stale task, not magic)
// done 15 s 2 min → +1.0 (magic zone: user saw tip and acted)
// done 2 10 min → +0.6 (good: user engaged, acted in same session)
// done > 10 min → +0.3 (eventually done; tip may have helped, unclear)
// ---------------------------------------------------------------------------
function inferReward(action: string, dwellMs: number | null): number {
if (action === 'dismiss') return -1.0;
if (action === 'snooze') return 0.1;
if (action === 'dismiss') return -1.0;
if (action === 'snooze') return 0.1;
if (action === 'helpful') return 0.5;
if (action === 'not_helpful') return -0.5;
// done — use dwell time
if (dwellMs === null || dwellMs < 0) return 0.5; // unknown dwell: neutral positive
if (dwellMs < 15_000) return -0.3; // stale / reflex
@@ -250,6 +262,51 @@ function inferReward(action: string, dwellMs: number | null): number {
return 0.3; // eventually
}
// ---------------------------------------------------------------------------
// Reward delivery with retry (bug #75 — was fire-and-forget)
// ---------------------------------------------------------------------------
async function sendRewardWithRetry(
userId: string,
tipId: string,
reward: number,
features: TaskFeatures,
): Promise<void> {
const body = JSON.stringify({
user_id: userId,
tip_id: tipId,
reward,
features,
day_of_week: new Date().getDay(),
});
for (let attempt = 1; attempt <= 3; attempt++) {
try {
const res = await fetch(`${config.ML_SERVING_URL}/reward/egreedy`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body,
signal: AbortSignal.timeout(3000),
});
if (res.ok) return;
throw new Error(`HTTP ${res.status}`);
} catch (err: any) {
if (attempt === 3) {
console.error(`[reward] failed after 3 attempts for tip ${tipId}: ${err.message}`);
bus.publish('signals.tip.reward_failed', {
userId,
tipId,
reward,
attempts: 3,
error: err.message,
failedAt: new Date().toISOString(),
});
return;
}
await new Promise((r) => setTimeout(r, 250 * Math.pow(2, attempt)));
}
}
}
// ---------------------------------------------------------------------------
// POST /api/tip/:id/feedback
// ---------------------------------------------------------------------------
@@ -258,7 +315,7 @@ router.post('/tip/:id/feedback', requireAuth, async (req: AuthenticatedRequest,
const tipId = String(req.params.id);
const now = new Date();
const validActions = ['done', 'dismiss', 'snooze'];
const validActions = ['done', 'dismiss', 'snooze', 'helpful', 'not_helpful'];
if (!validActions.includes(action)) {
res.status(400).json({ error: 'Invalid action' });
return;
@@ -297,25 +354,14 @@ router.post('/tip/:id/feedback', requireAuth, async (req: AuthenticatedRequest,
bus.publish('signals.tip.feedback', {
userId: req.userId!,
tipId,
action: action as 'done' | 'dismiss' | 'snooze',
action: action as 'done' | 'dismiss' | 'snooze' | 'helpful' | 'not_helpful',
reward,
dwellMs,
createdAt: now.toISOString(),
});
if (task) {
// Send reward to egreedy-v1 (active policy — ADR-0007)
fetch(`${config.ML_SERVING_URL}/reward/egreedy`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
user_id: req.userId!,
tip_id: tipId,
reward,
features: task.features,
day_of_week: new Date().getDay(),
}),
}).catch(() => {});
sendRewardWithRetry(req.userId!, tipId, reward, task.features);
}
// Mark complete in Todoist if done

View File

@@ -41,6 +41,8 @@ export function makeTestDb() {
tip_id TEXT NOT NULL,
action TEXT NOT NULL,
source_id TEXT,
dwell_ms INTEGER,
reward_milli INTEGER,
created_at TEXT NOT NULL
);
@@ -76,6 +78,60 @@ export function makeTestDb() {
detail TEXT,
created_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS tip_scores (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL REFERENCES users(id),
tip_id TEXT NOT NULL,
policy TEXT NOT NULL,
ml_score INTEGER,
features_json TEXT,
candidate_count INTEGER,
latency_ms INTEGER,
served_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS saved_queries (
id TEXT PRIMARY KEY,
admin_id TEXT NOT NULL REFERENCES users(id),
name TEXT NOT NULL,
sql TEXT NOT NULL,
created_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS sim_runs (
id TEXT PRIMARY KEY,
policy_a TEXT NOT NULL,
policy_b TEXT NOT NULL,
n_users INTEGER NOT NULL,
n_rounds INTEGER NOT NULL,
tasks_per_round INTEGER NOT NULL DEFAULT 8,
use_llm INTEGER NOT NULL DEFAULT 0,
status TEXT NOT NULL DEFAULT 'pending',
summary_json TEXT,
winner TEXT,
persona_breakdown_json TEXT,
created_at TEXT NOT NULL,
finished_at TEXT
);
CREATE TABLE IF NOT EXISTS sim_events (
id TEXT PRIMARY KEY,
run_id TEXT NOT NULL REFERENCES sim_runs(id),
round INTEGER NOT NULL,
user_id TEXT NOT NULL,
persona TEXT NOT NULL,
policy TEXT NOT NULL,
tip_content TEXT NOT NULL,
priority INTEGER NOT NULL,
is_overdue INTEGER NOT NULL,
action TEXT NOT NULL,
dwell_ms INTEGER,
reward_milli INTEGER NOT NULL,
hour INTEGER NOT NULL,
day_of_week INTEGER NOT NULL,
created_at TEXT NOT NULL
);
`);
return drizzle(sqlite, { schema });