feat(schema): protobuf event registry + buf CI gate (#54)
- Add proto schemas in packages/shared-types/events/ (oo.events.v1): envelope.proto, signals.proto, integration.proto - buf.yaml with STANDARD lint + FILE breaking-change rules - .gitea/workflows/buf-check.yaml: lint + breaking check on every PR touching events/ (needs a Gitea Actions runner to execute) - scripts/buf-check.sh: local equivalent of the CI check - NormalizedEvent TS envelope gains eventId, schemaVersion, producer to align with the proto Envelope message - ml/serving/schemas.py: pydantic models mirroring the v1 proto types - nats_consumer.py: validate payloads via pydantic instead of raw .get() A field-rename PR will now fail buf breaking with exit code 100 and show the offending messages. To make a breaking change: keep the old field reserved, add the new one, bump schema_version to v2. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -23,6 +23,8 @@ import time
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
from schemas import TaskSyncedPayload, TipFeedbackPayload
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
NATS_URL = os.getenv("NATS_URL", "")
|
||||
@@ -48,19 +50,18 @@ def _sync_meta_path(state_dir: Path, user_id: str) -> Path:
|
||||
|
||||
async def _handle(subject: str, payload: dict, state_dir: Path) -> None:
|
||||
if subject == "signals.task.synced":
|
||||
user_id = payload.get("userId", "")
|
||||
if user_id:
|
||||
p = _sync_meta_path(state_dir, user_id)
|
||||
p.write_text(json.dumps({
|
||||
"last_sync_ts": payload.get("syncedAt") or time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
|
||||
"task_count": payload.get("count", 0),
|
||||
}))
|
||||
logger.info("[nats] task_synced user=%s count=%s", user_id, payload.get("count"))
|
||||
msg = TaskSyncedPayload.model_validate(payload)
|
||||
p = _sync_meta_path(state_dir, msg.userId)
|
||||
p.write_text(json.dumps({
|
||||
"last_sync_ts": msg.syncedAt,
|
||||
"task_count": msg.count,
|
||||
}))
|
||||
logger.info("[nats] task_synced user=%s count=%s", msg.userId, msg.count)
|
||||
elif subject == "signals.tip.feedback":
|
||||
msg = TipFeedbackPayload.model_validate(payload)
|
||||
logger.info(
|
||||
"[nats] tip_feedback user=%s tip=%s action=%s reward=%s",
|
||||
payload.get("userId"), payload.get("tipId"),
|
||||
payload.get("action"), payload.get("reward"),
|
||||
msg.userId, msg.tipId, msg.action, msg.reward,
|
||||
)
|
||||
else:
|
||||
logger.debug("[nats] unhandled subject=%s", subject)
|
||||
|
||||
50
ml/serving/schemas.py
Normal file
50
ml/serving/schemas.py
Normal file
@@ -0,0 +1,50 @@
|
||||
"""
|
||||
Pydantic models mirroring oo.events.v1 proto schemas.
|
||||
|
||||
Field names use camelCase to match the proto3 JSON mapping convention
|
||||
and the TypeScript payload shapes published by services/api.
|
||||
|
||||
Keep in sync with packages/shared-types/events/oo/events/v1/.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Literal, Optional
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class TaskSyncedPayload(BaseModel):
|
||||
userId: str
|
||||
source: str
|
||||
count: int
|
||||
syncedAt: str
|
||||
|
||||
|
||||
class TipServedPayload(BaseModel):
|
||||
userId: str
|
||||
tipId: str
|
||||
policy: str
|
||||
servedAt: str
|
||||
|
||||
|
||||
class TipFeedbackPayload(BaseModel):
|
||||
userId: str
|
||||
tipId: str
|
||||
action: Literal['done', 'dismiss', 'snooze', 'helpful', 'not_helpful']
|
||||
reward: float
|
||||
dwellMs: Optional[int] = None
|
||||
createdAt: str
|
||||
|
||||
|
||||
class TipRewardFailedPayload(BaseModel):
|
||||
userId: str
|
||||
tipId: str
|
||||
reward: float
|
||||
attempts: int
|
||||
error: str
|
||||
failedAt: str
|
||||
|
||||
|
||||
class IntegrationTokenExpiredPayload(BaseModel):
|
||||
userId: str
|
||||
provider: str
|
||||
detectedAt: str
|
||||
Reference in New Issue
Block a user