From d539fde0c17e9cf6d1a951fcfd83052ea8091f34 Mon Sep 17 00:00:00 2001 From: alvis Date: Sat, 25 Apr 2026 16:48:24 +0000 Subject: [PATCH] feat(schema): protobuf event registry + buf CI gate (#54) - Add proto schemas in packages/shared-types/events/ (oo.events.v1): envelope.proto, signals.proto, integration.proto - buf.yaml with STANDARD lint + FILE breaking-change rules - .gitea/workflows/buf-check.yaml: lint + breaking check on every PR touching events/ (needs a Gitea Actions runner to execute) - scripts/buf-check.sh: local equivalent of the CI check - NormalizedEvent TS envelope gains eventId, schemaVersion, producer and renames ts to occurredAt to align with the proto Envelope message - ml/serving/schemas.py: pydantic models mirroring the v1 proto types - nats_consumer.py: validate payloads via pydantic instead of raw .get() A field-rename PR will now fail buf breaking with exit code 100 and show the offending messages. To make a breaking change: keep the old field reserved, add the new one, bump schema_version to v2. 
Co-Authored-By: Claude Sonnet 4.6 --- .gitea/workflows/buf-check.yaml | 37 ++++++++++++++ ml/serving/nats_consumer.py | 21 ++++---- ml/serving/schemas.py | 50 +++++++++++++++++++ packages/shared-types/events/buf.yaml | 7 +++ .../events/oo/events/v1/envelope.proto | 25 ++++++++++ .../events/oo/events/v1/integration.proto | 9 ++++ .../events/oo/events/v1/signals.proto | 39 +++++++++++++++ packages/shared-types/package.json | 4 +- packages/shared-types/src/events/index.ts | 10 +++- scripts/buf-check.sh | 24 +++++++++ 10 files changed, 213 insertions(+), 13 deletions(-) create mode 100644 .gitea/workflows/buf-check.yaml create mode 100644 ml/serving/schemas.py create mode 100644 packages/shared-types/events/buf.yaml create mode 100644 packages/shared-types/events/oo/events/v1/envelope.proto create mode 100644 packages/shared-types/events/oo/events/v1/integration.proto create mode 100644 packages/shared-types/events/oo/events/v1/signals.proto create mode 100755 scripts/buf-check.sh diff --git a/.gitea/workflows/buf-check.yaml b/.gitea/workflows/buf-check.yaml new file mode 100644 index 0000000..103d83a --- /dev/null +++ b/.gitea/workflows/buf-check.yaml @@ -0,0 +1,37 @@ +name: buf-check + +on: + push: + branches: [main] + paths: + - 'packages/shared-types/events/**' + pull_request: + paths: + - 'packages/shared-types/events/**' + +jobs: + buf: + name: Lint & breaking-change check + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + with: + fetch-depth: 0 + + - name: Install buf + run: | + BUF_VERSION=1.50.0 + curl -sSfL \ + "https://github.com/bufbuild/buf/releases/download/v${BUF_VERSION}/buf-Linux-x86_64" \ + -o /usr/local/bin/buf + chmod +x /usr/local/bin/buf + buf --version + + - name: buf lint + run: buf lint packages/shared-types/events + + - name: buf breaking + if: github.event_name == 'pull_request' + run: | + buf breaking packages/shared-types/events \ + --against ".git#branch=${{ github.base_ref }},subdir=packages/shared-types/events" diff --git 
a/ml/serving/nats_consumer.py b/ml/serving/nats_consumer.py index 30b1fc1..8aa4d0f 100644 --- a/ml/serving/nats_consumer.py +++ b/ml/serving/nats_consumer.py @@ -23,6 +23,8 @@ import time from pathlib import Path from typing import Optional +from schemas import TaskSyncedPayload, TipFeedbackPayload + logger = logging.getLogger(__name__) NATS_URL = os.getenv("NATS_URL", "") @@ -48,19 +50,18 @@ def _sync_meta_path(state_dir: Path, user_id: str) -> Path: async def _handle(subject: str, payload: dict, state_dir: Path) -> None: if subject == "signals.task.synced": - user_id = payload.get("userId", "") - if user_id: - p = _sync_meta_path(state_dir, user_id) - p.write_text(json.dumps({ - "last_sync_ts": payload.get("syncedAt") or time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), - "task_count": payload.get("count", 0), - })) - logger.info("[nats] task_synced user=%s count=%s", user_id, payload.get("count")) + msg = TaskSyncedPayload.model_validate(payload) + p = _sync_meta_path(state_dir, msg.userId) + p.write_text(json.dumps({ + "last_sync_ts": msg.syncedAt, + "task_count": msg.count, + })) + logger.info("[nats] task_synced user=%s count=%s", msg.userId, msg.count) elif subject == "signals.tip.feedback": + msg = TipFeedbackPayload.model_validate(payload) logger.info( "[nats] tip_feedback user=%s tip=%s action=%s reward=%s", - payload.get("userId"), payload.get("tipId"), - payload.get("action"), payload.get("reward"), + msg.userId, msg.tipId, msg.action, msg.reward, ) else: logger.debug("[nats] unhandled subject=%s", subject) diff --git a/ml/serving/schemas.py b/ml/serving/schemas.py new file mode 100644 index 0000000..4120592 --- /dev/null +++ b/ml/serving/schemas.py @@ -0,0 +1,50 @@ +""" +Pydantic models mirroring oo.events.v1 proto schemas. + +Field names use camelCase to match the proto3 JSON mapping convention +and the TypeScript payload shapes published by services/api. + +Keep in sync with packages/shared-types/events/oo/events/v1/. 
+""" +from __future__ import annotations + +from typing import Literal, Optional +from pydantic import BaseModel + + +class TaskSyncedPayload(BaseModel): + userId: str + source: str + count: int + syncedAt: str + + +class TipServedPayload(BaseModel): + userId: str + tipId: str + policy: str + servedAt: str + + +class TipFeedbackPayload(BaseModel): + userId: str + tipId: str + action: Literal['done', 'dismiss', 'snooze', 'helpful', 'not_helpful'] + reward: float + dwellMs: Optional[int] = None + createdAt: str + + +class TipRewardFailedPayload(BaseModel): + userId: str + tipId: str + reward: float + attempts: int + error: str + failedAt: str + + +class IntegrationTokenExpiredPayload(BaseModel): + userId: str + provider: str + detectedAt: str diff --git a/packages/shared-types/events/buf.yaml b/packages/shared-types/events/buf.yaml new file mode 100644 index 0000000..0ed83f4 --- /dev/null +++ b/packages/shared-types/events/buf.yaml @@ -0,0 +1,7 @@ +version: v1 +lint: + use: + - STANDARD +breaking: + use: + - FILE diff --git a/packages/shared-types/events/oo/events/v1/envelope.proto b/packages/shared-types/events/oo/events/v1/envelope.proto new file mode 100644 index 0000000..ddf7216 --- /dev/null +++ b/packages/shared-types/events/oo/events/v1/envelope.proto @@ -0,0 +1,25 @@ +syntax = "proto3"; +package oo.events.v1; + +import "oo/events/v1/signals.proto"; +import "oo/events/v1/integration.proto"; + +// Envelope wraps every event on the bus and on NATS JetStream. +// Wire format: proto3 JSON (camelCase field names). +// schema_version = "v1" — bump to "v2" only for breaking payload changes. +message Envelope { + string event_id = 1; // UUID assigned by bus on publish + string occurred_at = 2; // ISO 8601 + string schema_version = 3; // "v1" + string producer = 4; // e.g. 
"services/api" + string subject = 5; // NATS-style subject: domain.entity.verb + uint64 seq = 6; // monotonic sequence from the bus ring + + oneof payload { + TaskSyncedPayload task_synced = 10; + TipServedPayload tip_served = 11; + TipFeedbackPayload tip_feedback = 12; + TipRewardFailedPayload tip_reward_failed = 13; + IntegrationTokenExpiredPayload integration_token_expired = 14; + } +} diff --git a/packages/shared-types/events/oo/events/v1/integration.proto b/packages/shared-types/events/oo/events/v1/integration.proto new file mode 100644 index 0000000..72f5de2 --- /dev/null +++ b/packages/shared-types/events/oo/events/v1/integration.proto @@ -0,0 +1,9 @@ +syntax = "proto3"; +package oo.events.v1; + +// subject: signals.integration.token_expired +message IntegrationTokenExpiredPayload { + string user_id = 1; + string provider = 2; + string detected_at = 3; // ISO 8601 +} diff --git a/packages/shared-types/events/oo/events/v1/signals.proto b/packages/shared-types/events/oo/events/v1/signals.proto new file mode 100644 index 0000000..4ab9b4a --- /dev/null +++ b/packages/shared-types/events/oo/events/v1/signals.proto @@ -0,0 +1,39 @@ +syntax = "proto3"; +package oo.events.v1; + +// subject: signals.task.synced +message TaskSyncedPayload { + string user_id = 1; + string source = 2; // e.g. 
"todoist" + int32 count = 3; + string synced_at = 4; // ISO 8601 +} + +// subject: signals.tip.served +message TipServedPayload { + string user_id = 1; + string tip_id = 2; + string policy = 3; + string served_at = 4; // ISO 8601 +} + +// subject: signals.tip.feedback +// action: done | dismiss | snooze | helpful | not_helpful +message TipFeedbackPayload { + string user_id = 1; + string tip_id = 2; + string action = 3; + double reward = 4; + optional int64 dwell_ms = 5; // null when no dwell was recorded + string created_at = 6; // ISO 8601 +} + +// subject: signals.tip.reward_failed +message TipRewardFailedPayload { + string user_id = 1; + string tip_id = 2; + double reward = 3; + int32 attempts = 4; + string error = 5; + string failed_at = 6; // ISO 8601 +} diff --git a/packages/shared-types/package.json b/packages/shared-types/package.json index 332cf2f..d6ea223 100644 --- a/packages/shared-types/package.json +++ b/packages/shared-types/package.json @@ -15,7 +15,9 @@ "test": "vitest run", "test:watch": "vitest", "type-check": "tsc --noEmit", - "clean": "rm -rf dist" + "clean": "rm -rf dist", + "buf:lint": "buf lint events", + "buf:breaking": "buf breaking events --against '.git#branch=main,subdir=packages/shared-types/events'" }, "devDependencies": { "@vitest/coverage-v8": "^4.1.4", diff --git a/packages/shared-types/src/events/index.ts b/packages/shared-types/src/events/index.ts index 5802e99..1a92c80 100644 --- a/packages/shared-types/src/events/index.ts +++ b/packages/shared-types/src/events/index.ts @@ -1,6 +1,6 @@ /** * NormalizedEvent — the durable envelope for all events flowing through - * the system. Today: in-process EventEmitter. Tomorrow: NATS JetStream. + * the system. Mirrors oo.events.v1.Envelope in packages/shared-types/events/. 
* * Subject taxonomy: * signals.task.synced — Todoist (or other source) task list refreshed @@ -10,10 +10,16 @@ * signals.integration.token_expired — OAuth token needs reconnect */ export interface NormalizedEvent { + /** UUID assigned by bus on publish */ + eventId: string; /** NATS-style subject: domain.entity.verb */ subject: string; /** ISO 8601 timestamp */ - ts: string; + occurredAt: string; + /** "v1" — bump for breaking payload changes; see packages/shared-types/events/ */ + schemaVersion: 'v1'; + /** e.g. "services/api" */ + producer: string; /** Monotonically increasing sequence number (in-process ring; JetStream seq in prod) */ seq: number; payload: T; diff --git a/scripts/buf-check.sh b/scripts/buf-check.sh new file mode 100755 index 0000000..36befe4 --- /dev/null +++ b/scripts/buf-check.sh @@ -0,0 +1,24 @@ +#!/usr/bin/env bash +# Run buf lint and breaking-change detection locally. +# Usage: ./scripts/buf-check.sh [against-branch] +# Default against-branch: main +set -euo pipefail + +AGAINST="${1:-main}" +ROOT="$(cd "$(dirname "$0")/.." && pwd)" +EVENTS="$ROOT/packages/shared-types/events" + +if ! command -v buf &>/dev/null; then + echo "buf not found. Install: https://buf.build/docs/installation" + echo " curl -sSfL https://github.com/bufbuild/buf/releases/latest/download/buf-Linux-x86_64 -o /usr/local/bin/buf && chmod +x /usr/local/bin/buf" + exit 1 +fi + +echo "==> buf lint" +buf lint "$EVENTS" + +echo "==> buf breaking against $AGAINST" +buf breaking "$EVENTS" \ + --against ".git#branch=${AGAINST},subdir=packages/shared-types/events" + +echo "All checks passed."