feat: Sessions - bidirectional durable agent streams #3417
Durable, typed, bidirectional I/O primitive that outlives a single run.
Ship target is agent/chat use cases; run-scoped streams.pipe/streams.input
are untouched and do not create Session rows.
Postgres
- New Session table: id, friendlyId, externalId, type (plain string),
denormalised project/environment/organization scalar columns (no FKs),
taskIdentifier, tags String[], metadata Json, closedAt, closedReason,
expiresAt, timestamps
- Point-lookup indexes only (friendlyId unique, (env, externalId) unique,
expiresAt). List queries are served from ClickHouse so Postgres stays
minimal and insert-heavy.
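As a rough illustration of the row shape listed above, here is a TypeScript mirror of the columns. The scalar column names (`projectId`, `environmentId`, `organizationId`), the nullability of each field, and the `JsonValue` helper are assumptions for the sketch, not taken from the migration.

```typescript
// Hypothetical mirror of the Session row; names and nullability are assumed.
type JsonValue =
  | string
  | number
  | boolean
  | null
  | JsonValue[]
  | { [key: string]: JsonValue };

interface SessionRow {
  id: string;
  friendlyId: string; // unique
  externalId: string | null; // unique per environment when set
  type: string; // plain string, not an enum
  projectId: string; // denormalised scalar, no FK
  environmentId: string; // denormalised scalar, no FK
  organizationId: string; // denormalised scalar, no FK
  taskIdentifier: string | null;
  tags: string[];
  metadata: JsonValue | null;
  closedAt: Date | null;
  closedReason: string | null;
  expiresAt: Date | null;
  createdAt: Date;
  updatedAt: Date;
}

// Example row with made-up values.
const exampleSession: SessionRow = {
  id: "internal-id-1",
  friendlyId: "session_abc",
  externalId: "customer-chat-42",
  type: "chat",
  projectId: "proj_1",
  environmentId: "env_1",
  organizationId: "org_1",
  taskIdentifier: null,
  tags: ["support"],
  metadata: { locale: "en" },
  closedAt: null,
  closedReason: null,
  expiresAt: null,
  createdAt: new Date(0),
  updatedAt: new Date(0),
};
```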
Control-plane API
- POST /api/v1/sessions create (idempotent via externalId)
- GET /api/v1/sessions list with filters (type, tag,
taskIdentifier, externalId, status
ACTIVE|CLOSED|EXPIRED, period/from/to)
and cursor pagination, ClickHouse-backed
- GET /api/v1/sessions/:session retrieve — polymorphic: `session_` prefix
hits friendlyId, otherwise externalId
- PATCH /api/v1/sessions/:session update tags/metadata/externalId
- POST /api/v1/sessions/:session/close terminal close (idempotent)
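The polymorphic `:session` parameter can be pictured as a small dispatch on the `session_` prefix. The sketch below is only an illustration: `resolveSessionLookup` and the `SessionLookup` type are hypothetical names, and the real route handler may resolve the row differently.

```typescript
const FRIENDLY_ID_PREFIX = "session_";

// Which unique index the lookup should use.
type SessionLookup =
  | { by: "friendlyId"; friendlyId: string }
  | { by: "externalId"; environmentId: string; externalId: string };

// Hypothetical helper: a `session_` prefix means friendlyId,
// anything else is treated as an externalId scoped to the environment.
function resolveSessionLookup(param: string, environmentId: string): SessionLookup {
  if (param.startsWith(FRIENDLY_ID_PREFIX)) {
    return { by: "friendlyId", friendlyId: param };
  }
  return { by: "externalId", environmentId, externalId: param };
}
```

Because externalId is unique only per environment, the externalId branch carries the environment along, while the friendlyId branch does not need it.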
Realtime (S2-backed)
- PUT /realtime/v1/sessions/:session/:io returns S2 creds
- GET /realtime/v1/sessions/:session/:io SSE subscribe
- POST /realtime/v1/sessions/:session/:io/append server-side append
- S2 key format: sessions/{friendlyId}/{out|in}
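A minimal sketch of building the S2 key from the format above. The function names are hypothetical, and rejecting any `:io` value other than `out` or `in` is an assumption about how the routes validate the parameter.

```typescript
type SessionIo = "out" | "in";

// Type guard for the `:io` route parameter.
function isSessionIo(value: string): value is SessionIo {
  return value === "out" || value === "in";
}

// Builds `sessions/{friendlyId}/{out|in}`; throws on any other channel name.
function sessionStreamKey(friendlyId: string, io: string): string {
  if (!isSessionIo(io)) {
    throw new Error(`Unknown session channel: ${io}`);
  }
  return `sessions/${friendlyId}/${io}`;
}
```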
Auth
- sessions added to ResourceTypes. read:sessions:{id},
write:sessions:{id}, admin:sessions:{id} scopes work via existing JWT
validation.
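For illustration, the scope strings above can be built and checked as follows. This sketch only does an exact match on the per-session scope; the existing JWT validation presumably also honours broader scopes, which is not modelled here. `sessionScope` and `hasSessionScope` are hypothetical names.

```typescript
type SessionAction = "read" | "write" | "admin";

// Builds a scope such as `read:sessions:session_abc`.
function sessionScope(action: SessionAction, sessionId: string): string {
  return `${action}:sessions:${sessionId}`;
}

// Exact-match check only; broader scopes are deliberately out of scope for this sketch.
function hasSessionScope(
  granted: readonly string[],
  action: SessionAction,
  sessionId: string
): boolean {
  return granted.includes(sessionScope(action, sessionId));
}
```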
ClickHouse
- sessions_v1 ReplacingMergeTree table
- SessionsReplicationService mirrors RunsReplicationService exactly:
logical replication with leader-locked consumer, ConcurrentFlushScheduler,
retry with exponential backoff + jitter, identical metric shape.
Dedicated slot + publication (sessions_to_clickhouse_v1[_publication]).
- SessionsRepository + ClickHouseSessionsRepository expose list, count,
tags with cursor pagination keyed by (created_at DESC, session_id DESC).
- Derived status (ACTIVE/CLOSED/EXPIRED) computed from closed_at + expires_at;
in-memory fallback on list results to catch pre-replication writes.
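The derived status and the (created_at DESC, session_id DESC) cursor can be sketched in memory as below. This is an illustration rather than the repository code: the row shape, the function names, and the choice that a closed session reports CLOSED even after its expiry has passed are all assumptions.

```typescript
type SessionStatus = "ACTIVE" | "CLOSED" | "EXPIRED";

interface SessionListRow {
  sessionId: string;
  createdAt: number; // epoch milliseconds
  closedAt: number | null;
  expiresAt: number | null;
}

// Assumption: CLOSED takes precedence over EXPIRED.
function deriveStatus(
  row: Pick<SessionListRow, "closedAt" | "expiresAt">,
  now: number
): SessionStatus {
  if (row.closedAt !== null) return "CLOSED";
  if (row.expiresAt !== null && row.expiresAt <= now) return "EXPIRED";
  return "ACTIVE";
}

interface Cursor {
  createdAt: number;
  sessionId: string;
}

// Newest first; ties broken by sessionId descending.
function compareDesc(a: SessionListRow, b: SessionListRow): number {
  if (a.createdAt !== b.createdAt) return b.createdAt - a.createdAt;
  if (a.sessionId === b.sessionId) return 0;
  return a.sessionId < b.sessionId ? 1 : -1;
}

// A row is on a later page if it sorts strictly after the cursor.
function isAfterCursor(row: SessionListRow, cursor: Cursor): boolean {
  return (
    row.createdAt < cursor.createdAt ||
    (row.createdAt === cursor.createdAt && row.sessionId < cursor.sessionId)
  );
}

function listPage(rows: SessionListRow[], limit: number, cursor?: Cursor) {
  const sorted = [...rows].sort(compareDesc);
  const eligible = cursor ? sorted.filter((r) => isAfterCursor(r, cursor)) : sorted;
  const page = eligible.slice(0, limit);
  const last = page[page.length - 1];
  // Only hand out a cursor when more rows remain after this page.
  const nextCursor: Cursor | undefined =
    eligible.length > limit && last
      ? { createdAt: last.createdAt, sessionId: last.sessionId }
      : undefined;
  return { page, nextCursor };
}
```

Keying the cursor on both columns keeps pagination stable when several sessions share the same creation timestamp.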
Verification
- Webapp typecheck 10/10
- Core + SDK build 3/3
- sessionsReplicationService.test.ts integration tests 2/2 (insert + update
round-trip via testcontainers)
- Live round-trip against local dev: create -> retrieve (friendlyId and
externalId) -> out.initialize -> out.append x2 -> in.send -> out.subscribe
(receives records) -> close -> ClickHouse sessions_v1 shows the replicated
row with closed_reason
- Live list smoke: tag, type, status CLOSED, externalId, and cursor pagination
🦋 Changeset detected
Latest commit: 2210fe2
The changes in this PR will be included in the next version bump. This PR includes changesets to release 29 packages.
Walkthrough
Adds a durable Session primitive across the stack: a Prisma Session model and migration, ClickHouse sessions_v1 table and ClickHouse client helpers, new ClickHouse-backed SessionsRepository, a SessionsReplicationService that streams Postgres logical replication into ClickHouse, session-friendly ID and API schemas in core, multiple REST and realtime routes for session CRUD and streaming/append, environment config and startup wiring for replication, session helper utilities, and end-to-end replication tests.
Estimated code review effort: 5 (Critical), ~120 minutes
Pre-merge checks: 2 passed, 1 failed (1 warning)
…te/update
The `session_` prefix identifies internal friendlyIds. Allowing it in a user-supplied `externalId` would misroute subsequent GET/PATCH/close requests through `resolveSessionByIdOrExternalId` to a friendlyId lookup, returning null or the wrong session. Reject at the schema boundary so both routes surface a clean 422.
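A sketch of the boundary check this commit describes, written as a plain function rather than the actual request schema. `checkExternalId`, the result type, and the error message are hypothetical; only the `session_` prefix rule and the 422 status come from the commit message.

```typescript
type ExternalIdCheck =
  | { ok: true; externalId: string }
  | { ok: false; status: 422; error: string };

// Rejects externalIds that would be mistaken for a friendlyId on later lookups.
function checkExternalId(externalId: string): ExternalIdCheck {
  if (externalId.startsWith("session_")) {
    return {
      ok: false,
      status: 422,
      error: 'externalId must not start with "session_"',
    };
  }
  return { ok: true, externalId };
}
```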
Without allowJWT/corsStrategy, frontend clients holding public access tokens hit 401 on GET /api/v1/sessions and browser preflights fail. Matches the single-session GET/PATCH/close routes and the runs list endpoint.
- Derive isCached from the upsert result (id mismatch = pre-existing row) instead of doing a separate findFirst first. The pre-check was racy: two concurrent first-time POSTs could both return 201 with isCached: false. Using the returned row's id is atomic and saves a round-trip.
- Scope the list endpoint's authorization to the standard action/resource pattern (matches api.v1.runs.ts): task-scoped JWTs can list sessions filtered by their task, and broader super-scopes (read:sessions, read:all, admin) authorize unfiltered listing.
- Log and swallow unexpected errors on POST rather than returning the raw error.message. Prisma/internal messages can leak column names and query fragments.
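The isCached change can be illustrated with an in-memory stand-in for the upsert. Everything here is hypothetical scaffolding (`SessionStore`, `nextId`, `createSession`); the only part taken from the commit is the idea that a returned id different from the freshly generated one means the row already existed.

```typescript
interface StoredSession {
  id: string;
  environmentId: string;
  externalId: string;
}

// Stand-in for a table with a unique (environmentId, externalId) index.
class SessionStore {
  private rows = new Map<string, StoredSession>();

  // Inserts when the key is free, otherwise returns the existing row unchanged.
  upsert(candidate: StoredSession): StoredSession {
    const key = JSON.stringify([candidate.environmentId, candidate.externalId]);
    const existing = this.rows.get(key);
    if (existing) return existing;
    this.rows.set(key, candidate);
    return candidate;
  }
}

let idCounter = 0;
function nextId(): string {
  idCounter += 1;
  return `id_${idCounter}`;
}

function createSession(store: SessionStore, environmentId: string, externalId: string) {
  const candidateId = nextId();
  const row = store.upsert({ id: candidateId, environmentId, externalId });
  // A different id than the one we generated means the row pre-dates this call.
  const isCached = row.id !== candidateId;
  return { row, isCached };
}
```

There is no separate existence check before the write, so there is no window in which two first-time callers can both conclude they created the row.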
What this enables
A new first-class primitive, Session, for durable bidirectional I/O that outlives a single run. Sessions give you a server-managed channel pair (`.out` from the task, `.in` from the client) that you can write to, read from, and subscribe to across many runs, filter, list, and close, all through a single identifier.
Use cases unblocked
- […] `.in`, the client writes to `.in`, and the server enforces no-writes-after-close.
- […] `.out` after the task finishes to replay the history.
Public API surface
Control plane
- `POST /api/v1/sessions` to create. Idempotent when you supply `externalId`.
- `GET /api/v1/sessions/:session` to retrieve by friendlyId (`session_abc`) or by your own `externalId`. The server disambiguates via the `session_` prefix.
- `GET /api/v1/sessions` to list with filters (`type`, `tag`, `taskIdentifier`, `externalId`, derived `status` = ACTIVE/CLOSED/EXPIRED, created-at period/from/to) and cursor pagination. Backed by ClickHouse.
- `PATCH /api/v1/sessions/:session` to update tags/metadata/externalId.
- `POST /api/v1/sessions/:session/close` to terminate. Idempotent, hard-blocks new server-brokered writes.
Realtime
- `PUT /realtime/v1/sessions/:session/:io` to initialize a channel. Returns S2 credentials in headers so clients can write directly to S2 for high-throughput cases.
- `GET /realtime/v1/sessions/:session/:io` for SSE subscribe.
- `POST /realtime/v1/sessions/:session/:io/append` for server-side appends.
Scopes
- `sessions` is now a ResourceType. `read:sessions:{id}`, `write:sessions:{id}`, `admin:sessions:{id}` all flow through the existing JWT validator.
Implementation summary
Postgres (`Session` table)
- Point-lookup indexes only: `friendlyId` unique, `(env, externalId)` unique, `expiresAt`. List queries are served from ClickHouse, so Postgres stays insert-heavy.
- […] (`closedAt`, `closedReason`, `expiresAt`) are write-once. No status enum, no counters, no currentRunId pointer. All run-related state is derived.
ClickHouse (`sessions_v1`)
- […] `(org_id, project_id, environment_id, created_at, session_id)`.
- `tags` indexed with a tokenbf_v1 skip index.
- `SessionsReplicationService` mirrors `RunsReplicationService` exactly: logical replication with leader-locked consumer, `ConcurrentFlushScheduler`, retry with exponential backoff + jitter, identical metric shape. Dedicated slot + publication so the two consume independently.
- `SessionsRepository` + `ClickHouseSessionsRepository` expose list / count / tags with the same cursor pagination convention as runs and waitpoints.
S2
- Key format: `sessions/{friendlyId}/{out|in}`. The existing `runs/{runId}/{streamId}` format for implicit run streams is completely untouched.
What did not change
- `streams.pipe` / `streams.input` still behave exactly as before. They do not create Session rows and the existing routes are unchanged. Sessions are a net-new primitive for the next phase of agent features, not a reshaping of the current streams API.
Verification
- `apps/webapp/test/sessionsReplicationService.test.ts` exercises insert and update round-trips through Postgres logical replication into ClickHouse via testcontainers.
- […] `.out.initialize`, `.out.append` x2, `.in.send`, `.out.subscribe` over SSE, list (type, tag, status, externalId, pagination), close, idempotent re-close. Replicated row lands in ClickHouse within ~1s with `closed_reason` intact.
Not in this PR
- […] `chat.agent`).
- […] `chat.agent` integration.
Test plan
- `pnpm run typecheck --filter webapp`
- `pnpm run test --filter webapp ./test/sessionsReplicationService.test.ts --run`
- […] `SESSION_REPLICATION_CLICKHOUSE_URL` and `SESSION_REPLICATION_ENABLED=1` set. Confirm the slot and publication auto-create on boot.
- `POST /api/v1/sessions` and verify the row replicates to `trigger_dev.sessions_v1` within a couple of seconds.
- `POST /api/v1/sessions/:id/close` and confirm subsequent `POST /realtime/v1/sessions/:id/out/append` returns 400.
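The last check in the test plan (append after close returns 400) corresponds to a guard along these lines. This is a minimal in-memory sketch, not the route implementation: the types, function names, the 200 success status, and the error text are assumptions; the 400 on a closed session and the idempotent close come from the description above.

```typescript
interface SessionState {
  friendlyId: string;
  closedAt: Date | null;
}

type AppendResult =
  | { status: 200; appended: number }
  | { status: 400; error: string };

// Server-brokered append: refuses writes once the session is closed.
function appendToSession(
  session: SessionState,
  log: string[],
  records: string[]
): AppendResult {
  if (session.closedAt !== null) {
    return { status: 400, error: `Session ${session.friendlyId} is closed` };
  }
  log.push(...records);
  return { status: 200, appended: records.length };
}

// Idempotent close: closedAt is write-once, so re-closing keeps the first timestamp.
function closeSession(session: SessionState, now: Date): SessionState {
  return session.closedAt !== null ? session : { ...session, closedAt: now };
}
```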