diff --git a/package.json b/package.json index 756c750..0365d00 100644 --- a/package.json +++ b/package.json @@ -9,7 +9,7 @@ "check": "pnpm typecheck && pnpm test && pnpm test:go && pnpm build && pnpm pack:packages && pnpm smoke:consumer && pnpm smoke:cloudflare", "clean": "pnpm -r clean", "pack:packages": "mkdir -p .packs && pnpm -r --filter './packages/*' pack --pack-destination ./.packs", - "publish:packages": "pnpm --filter @better-matrix-js/cloudflare publish --access public --no-git-checks && pnpm --filter better-matrix-js publish --access public --no-git-checks && pnpm --filter @better-matrix-js/chat-adapter publish --access public --no-git-checks", + "publish:packages": "pnpm --filter @better-matrix-js/cloudflare publish --access public --no-git-checks && pnpm --filter better-matrix-js publish --access public --no-git-checks && pnpm --filter @better-matrix-js/chat-adapter publish --access public --no-git-checks && pnpm --filter @better-matrix-js/ai-sdk publish --access public --no-git-checks", "smoke:cloudflare": "node scripts/smoke-cloudflare-worker.mjs", "smoke:consumer": "node scripts/package-consumer-smoke.mjs", "smoke:package-consumer": "node scripts/package-consumer-smoke.mjs", diff --git a/packages/ai-sdk/LICENSE b/packages/ai-sdk/LICENSE new file mode 100644 index 0000000..14fac91 --- /dev/null +++ b/packages/ai-sdk/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2026 + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/packages/ai-sdk/README.md b/packages/ai-sdk/README.md new file mode 100644 index 0000000..2224491 --- /dev/null +++ b/packages/ai-sdk/README.md @@ -0,0 +1,5 @@ +# @better-matrix-js/ai-sdk + +AI SDK stream adapters for `better-matrix-js`. + +This package is separate from `@better-matrix-js/chat-adapter` so Matrix and Chat SDK users do not need the AI SDK integration unless they want it. diff --git a/packages/ai-sdk/package.json b/packages/ai-sdk/package.json new file mode 100644 index 0000000..8df9ab8 --- /dev/null +++ b/packages/ai-sdk/package.json @@ -0,0 +1,59 @@ +{ + "name": "@better-matrix-js/ai-sdk", + "version": "0.1.0", + "description": "AI SDK stream adapters for better-matrix-js", + "type": "module", + "homepage": "https://github.com/batuhan/better-matrix-js#readme", + "repository": { + "type": "git", + "url": "git+https://github.com/batuhan/better-matrix-js.git", + "directory": "packages/ai-sdk" + }, + "bugs": { + "url": "https://github.com/batuhan/better-matrix-js/issues" + }, + "main": "./dist/index.js", + "module": "./dist/index.js", + "types": "./dist/index.d.ts", + "exports": { + ".": { + "types": "./dist/index.d.ts", + "import": "./dist/index.js" + } + }, + "files": [ + "dist", + "README.md", + "LICENSE" + ], + "publishConfig": { + "access": "public" + }, + "scripts": { + "build": "tsup", + "clean": "rm -rf dist", + "prepublishOnly": "node ../../scripts/guard-pnpm-publish.mjs", + "test": "vitest run --coverage", + "typecheck": "tsc --noEmit" + }, + "peerDependencies": { + "@better-matrix-js/chat-adapter": "^0.1.0" + }, + "devDependencies": { + "@better-matrix-js/chat-adapter": "workspace:*", + "@types/node": "^25.3.2", + "tsup": "^8.3.5", + "typescript": "^5.7.2", + "vitest": "^4.0.18" + }, + "keywords": [ + "matrix", + "ai-sdk", + "streaming", + "beeper" + ], + "engines": { + "node": ">=20" + }, + "license": "MIT" +} diff --git a/packages/ai-sdk/src/index.test.ts b/packages/ai-sdk/src/index.test.ts new file mode 100644 index 0000000..aee900a --- /dev/null +++ b/packages/ai-sdk/src/index.test.ts @@ -0,0 +1,55 @@ +import { describe, expect, it } from "vitest"; +import { fromAIStreamResult, fromAIUIMessageStream, isAIUIMessageStreamResult } from "./index"; + +async function collect(iterable: AsyncIterable): Promise { + const values: T[] = []; + for await (const value of iterable) { + values.push(value); + } + return values; +} + +describe("AI SDK stream adapters", () => { + it("passes async iterable UI message chunks through structurally", async () => { + async function* chunks() { + yield { delta: "hello", id: "text-1", type: "text-delta" }; + } + + await expect(collect(fromAIUIMessageStream(chunks()))).resolves.toEqual([ + { delta: "hello", id: "text-1", type: "text-delta" }, + ]); + }); + + it("converts ReadableStream UI message chunks to async iterable Matrix streams", async () => { + const stream = new ReadableStream({ + start(controller) { + controller.enqueue({ id: "reasoning-1", type: "reasoning-start" }); + controller.enqueue({ delta: "thinking", id: "reasoning-1", type: "reasoning-delta" }); + controller.close(); + }, + }); + + await expect(collect(fromAIUIMessageStream(stream))).resolves.toEqual([ + { id: "reasoning-1", type: "reasoning-start" }, + { delta: "thinking", id: "reasoning-1", type: "reasoning-delta" }, + ]); + }); + + it("accepts streamText-like results with toUIMessageStream", async () => { + const result = { + toUIMessageStream() { + return new ReadableStream({ + start(controller) { + controller.enqueue({ finishReason: "stop", type: "finish" }); + controller.close(); + }, + }); + }, + }; + + expect(isAIUIMessageStreamResult(result)).toBe(true); + await expect(collect(fromAIStreamResult(result))).resolves.toEqual([ + { finishReason: "stop", type: "finish" }, + ]); + }); +}); diff --git a/packages/ai-sdk/src/index.ts b/packages/ai-sdk/src/index.ts new file mode 100644 index 0000000..d1da844 --- /dev/null +++ b/packages/ai-sdk/src/index.ts @@ -0,0 +1,52 @@ +import type { MatrixStream } from "@better-matrix-js/chat-adapter"; + +export type AIUIMessageChunk = { + type: string; + [key: string]: unknown; +}; + +export type AIUIMessageChunkStream = + | AsyncIterable + | ReadableStream; + +export interface AIUIMessageStreamResult { + toUIMessageStream(): AIUIMessageChunkStream; +} + +export function fromAIUIMessageStream(stream: AIUIMessageChunkStream): MatrixStream { + if (isAsyncIterable(stream)) { + return stream; + } + return readableStreamToAsyncIterable(stream); +} + +export function fromAIStreamResult(result: AIUIMessageStreamResult): MatrixStream { + return fromAIUIMessageStream(result.toUIMessageStream()); +} + +export function isAIUIMessageStreamResult(value: unknown): value is AIUIMessageStreamResult { + return ( + typeof value === "object" && + value !== null && + typeof (value as { toUIMessageStream?: unknown }).toUIMessageStream === "function" + ); +} + +async function* readableStreamToAsyncIterable(stream: ReadableStream): AsyncIterable { + const reader = stream.getReader(); + try { + while (true) { + const result = await reader.read(); + if (result.done) { + return; + } + yield result.value; + } + } finally { + reader.releaseLock(); + } +} + +function isAsyncIterable(value: AIUIMessageChunkStream): value is AsyncIterable { + return Symbol.asyncIterator in value; +} diff --git a/packages/ai-sdk/tsconfig.json b/packages/ai-sdk/tsconfig.json new file mode 100644 index 0000000..39b47ed --- /dev/null +++ b/packages/ai-sdk/tsconfig.json @@ -0,0 +1,8 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { + "outDir": "./dist" + }, + "include": ["src/**/*"], + "exclude": ["dist", "node_modules", "**/*.test.ts"] +} diff --git a/packages/ai-sdk/tsup.config.ts b/packages/ai-sdk/tsup.config.ts new file mode 100644 index 0000000..997ba9c --- /dev/null +++ b/packages/ai-sdk/tsup.config.ts @@ -0,0 +1,10 @@ +import { defineConfig } from "tsup"; + +export default defineConfig({ + entry: ["src/index.ts"], + format: ["esm"], + dts: true, + clean: true, + sourcemap: false, + external: ["@better-matrix-js/chat-adapter", "ai"], +}); diff --git a/packages/ai-sdk/vitest.config.ts b/packages/ai-sdk/vitest.config.ts new file mode 100644 index 0000000..1483ee2 --- /dev/null +++ b/packages/ai-sdk/vitest.config.ts @@ -0,0 +1,11 @@ +import { defineConfig } from "vitest/config"; + +export default defineConfig({ + test: { + environment: "node", + coverage: { + provider: "v8", + reporter: ["text", "json-summary"], + }, + }, +}); diff --git a/packages/chat-adapter/src/adapter.test.ts b/packages/chat-adapter/src/adapter.test.ts index f1aaf8e..b95a87b 100644 --- a/packages/chat-adapter/src/adapter.test.ts +++ b/packages/chat-adapter/src/adapter.test.ts @@ -36,7 +36,7 @@ function makeCore(overrides: Partial = {}) { createBeeperStream: vi.fn(async () => ({ descriptor: { device_id: "DEVICE", - type: "com.beeper.ai.stream_event", + type: "com.beeper.llm", user_id: "@bot:example.com", }, })), @@ -398,7 +398,7 @@ describe("MatrixAdapter", () => { }); }); - it("streams Beeper homeserver chunks as encrypted Matrix ephemeral events", async () => { + it("streams Beeper homeserver chunks as Beeper Desktop stream deltas", async () => { const { core } = makeCore(); const adapter = new MatrixAdapter({ accessToken: "token", @@ -421,7 +421,7 @@ describe("MatrixAdapter", () => { expect(result.id).toBe("$message"); expect(core.createBeeperStream).toHaveBeenCalledWith({ roomId: "!room:example.com", - streamType: "com.beeper.ai.stream_event", + streamType: "com.beeper.llm", }); expect(core.postMessage).toHaveBeenCalledWith( expect.objectContaining({ @@ -429,7 +429,7 @@ describe("MatrixAdapter", () => { content: { "com.beeper.stream": { device_id: "DEVICE", - type: "com.beeper.ai.stream_event", + type: "com.beeper.llm", user_id: "@bot:example.com", }, }, @@ -437,10 +437,34 @@ describe("MatrixAdapter", () => { ); expect(core.publishBeeperStream).toHaveBeenCalledWith( expect.objectContaining({ - content: expect.objectContaining({ - part: { id: expect.any(String), type: "text-start" }, - target_event: "$message", - }), + content: { + "com.beeper.llm.deltas": [ + expect.objectContaining({ + "m.relates_to": { + event_id: "$message", + rel_type: "m.reference", + }, + part: { messageId: expect.any(String), messageMetadata: { turn_id: expect.any(String) }, type: "start" }, + seq: 1, + turn_id: expect.any(String), + }), + ], + }, + eventId: "$message", + roomId: "!room:example.com", + }) + ); + expect(core.publishBeeperStream).toHaveBeenCalledWith( + expect.objectContaining({ + content: { + "com.beeper.llm.deltas": [ + expect.objectContaining({ + part: { id: expect.any(String), type: "text-start" }, + seq: 2, + turn_id: expect.any(String), + }), + ], + }, eventId: "$message", roomId: "!room:example.com", }) @@ -448,7 +472,7 @@ describe("MatrixAdapter", () => { expect(core.editMessage).toHaveBeenCalledWith( expect.objectContaining({ body: "hello", - content: { "com.beeper.dont_render_edited": true }, + content: { "com.beeper.dont_render_edited": true, "com.beeper.stream": null }, messageId: "$message", }) ); @@ -479,21 +503,29 @@ describe("MatrixAdapter", () => { expect(core.publishBeeperStream).toHaveBeenCalledWith( expect.objectContaining({ - content: expect.objectContaining({ - part: { id: "reasoning-1", type: "reasoning-start" }, - }), + content: { + "com.beeper.llm.deltas": [ + expect.objectContaining({ + part: { id: "reasoning-1", type: "reasoning-start" }, + }), + ], + }, }) ); expect(core.publishBeeperStream).toHaveBeenCalledWith( expect.objectContaining({ - content: expect.objectContaining({ - part: { - input: { path: "/tmp/a" }, - toolCallId: "call-1", - toolName: "read_file", - type: "tool-input-available", - }, - }), + content: { + "com.beeper.llm.deltas": [ + expect.objectContaining({ + part: { + input: { path: "/tmp/a" }, + toolCallId: "call-1", + toolName: "read_file", + type: "tool-input-available", + }, + }), + ], + }, }) ); }); @@ -517,23 +549,32 @@ describe("MatrixAdapter", () => { expect(core.publishBeeperStream).toHaveBeenCalledWith( expect.objectContaining({ - content: expect.objectContaining({ - part: expect.objectContaining({ - data: expect.objectContaining({ call_id: "task-1", tool_name: "Search" }), - type: "data-tool-progress", - }), - }), + content: { + "com.beeper.llm.deltas": [ + expect.objectContaining({ + part: expect.objectContaining({ + data: expect.objectContaining({ call_id: "task-1", tool_name: "Search" }), + id: "task-1", + type: "data-tool-progress", + }), + }), + ], + }, }) ); expect(core.publishBeeperStream).toHaveBeenCalledWith( expect.objectContaining({ - content: expect.objectContaining({ - part: { - data: { title: "Reading results" }, - transient: true, - type: "data-plan-update", - }, - }), + content: { + "com.beeper.llm.deltas": [ + expect.objectContaining({ + part: { + data: { title: "Reading results" }, + transient: true, + type: "data-plan-update", + }, + }), + ], + }, }) ); }); diff --git a/packages/chat-adapter/src/adapter.ts b/packages/chat-adapter/src/adapter.ts index b962be7..6087bcf 100644 --- a/packages/chat-adapter/src/adapter.ts +++ b/packages/chat-adapter/src/adapter.ts @@ -33,8 +33,8 @@ import type { ThreadInfo, WebhookOptions, } from "chat"; -import type { MatrixStream } from "./beeper-streaming"; -import { createMatrixStreamDriver, isBeeperHomeserver } from "./beeper-streaming"; +import type { MatrixStream } from "./streaming"; +import { createMatrixStreamDriver, isBeeperHomeserver } from "./streaming"; import { ConsoleLogger, Message, @@ -346,15 +346,18 @@ export class MatrixAdapter { options?: StreamOptions ): Promise> { const parsed = this.decodeThreadId(threadId); - return createMatrixStreamDriver({ + const driver = await createMatrixStreamDriver({ core: this.#requireCore(), - editMessage: (targetThreadId, messageId, markdown) => - this.editMessage(targetThreadId, messageId, { markdown }), + editMessage: (targetThreadId, messageId, markdown, content) => + content + ? this.#editMessageWithContent(targetThreadId, messageId, markdown, content) + : this.editMessage(targetThreadId, messageId, { markdown }), homeserverUrl: this.#config.homeserverUrl, postMessage: (targetThreadId, markdown, content) => this.#postMessageWithContent(targetThreadId, markdown, content), roomId: parsed.roomId, - }).stream(threadId, textStream, options); + }); + return driver.stream(threadId, textStream, options); } async postObject( @@ -784,6 +787,35 @@ export class MatrixAdapter { return this.#rawMessage(raw.eventId, parsed.roomId, threadId, raw.raw); } + async #editMessageWithContent( + threadId: string, + messageId: string, + markdown: string, + content: Record + ): Promise> { + const core = this.#requireCore(); + const parsed = this.decodeThreadId(threadId); + const rendered = this.#formatConverter.renderPostableMessage({ markdown }); + const editOptions: Parameters[0] = { + body: rendered.body, + content: { + ...(this.#isBeeperHomeserver ? { "com.beeper.dont_render_edited": true } : {}), + ...content, + }, + messageId, + roomId: parsed.roomId, + }; + if (rendered.formattedBody !== undefined) { + editOptions.formattedBody = rendered.formattedBody; + } + const raw = await core.editMessage(editOptions); + return this.#rawMessage(messageId, parsed.roomId, threadId, { + logicalEventId: messageId, + replacementEventId: raw.eventId, + raw: raw.raw, + }); + } + async #collectUploads(message: AdapterPostableMessage): Promise { const uploads: OutboundUpload[] = []; for (const file of extractFilesFromMessage(message)) { diff --git a/packages/chat-adapter/src/beeper-streaming.ts b/packages/chat-adapter/src/beeper-streaming.ts index 033ed67..630c6a6 100644 --- a/packages/chat-adapter/src/beeper-streaming.ts +++ b/packages/chat-adapter/src/beeper-streaming.ts @@ -1,234 +1,7 @@ -import type { MatrixCore, MatrixRawMessage } from "better-matrix-js"; -import type { RawMessage, StreamChunk, StreamOptions } from "chat"; -import type { MatrixRawMessage as MatrixAdapterRawMessage } from "./types"; - -const BEEPER_STREAM_EVENT_TYPE = "com.beeper.ai.stream_event"; -const BEEPER_DOMAINS = new Set([ - "beeper.com", - "beeper-staging.com", - "beeper-dev.com", - "beeper.localtest.me", -]); - -export interface MatrixStreamDriver { - stream( - threadId: string, - textStream: MatrixStream, - options?: StreamOptions - ): Promise>; -} - -export type MatrixStream = AsyncIterable>; - -export interface MatrixStreamDriverOptions { - core: MatrixCore; - editMessage( - threadId: string, - messageId: string, - message: string - ): Promise>; - homeserverUrl: string; - postMessage( - threadId: string, - message: string, - content?: Record - ): Promise>; - roomId: string; -} - -export function isBeeperHomeserver(homeserverUrl: string): boolean { - try { - const hostname = new URL(homeserverUrl).hostname; - return BEEPER_DOMAINS.has(hostname) || [...BEEPER_DOMAINS].some((domain) => hostname.endsWith(`.${domain}`)); - } catch { - return false; - } -} - -export function createMatrixStreamDriver(options: MatrixStreamDriverOptions): MatrixStreamDriver { - return isBeeperHomeserver(options.homeserverUrl) - ? new BeeperStreamDriver(options) - : new DebouncedEditStreamDriver(options); -} - -class BeeperStreamDriver implements MatrixStreamDriver { - #options: MatrixStreamDriverOptions; - - constructor(options: MatrixStreamDriverOptions) { - this.#options = options; - } - - async stream( - threadId: string, - textStream: MatrixStream, - options?: StreamOptions - ): Promise> { - const stream = await this.#options.core.createBeeperStream({ - roomId: this.#options.roomId, - streamType: BEEPER_STREAM_EVENT_TYPE, - }); - const target = await this.#options.postMessage(threadId, "...", { - "com.beeper.stream": stream.descriptor, - }); - const turnId = `turn_${Date.now().toString(36)}_${Math.random().toString(36).slice(2, 10)}`; - const textId = `text_${turnId}`; - let seq = 1; - let textStarted = false; - let accumulated = ""; - - for await (const chunk of textStream) { - if (typeof chunk === "string") { - if (!textStarted) { - await this.#sendPart(turnId, seq++, { id: textId, type: "text-start" }, target.id, options); - textStarted = true; - } - accumulated += chunk; - await this.#sendPart(turnId, seq++, { delta: chunk, id: textId, type: "text-delta" }, target.id, options); - continue; - } - if (isStreamChunk(chunk) && chunk.type === "markdown_text") { - if (!textStarted) { - await this.#sendPart(turnId, seq++, { id: textId, type: "text-start" }, target.id, options); - textStarted = true; - } - accumulated += chunk.text; - await this.#sendPart(turnId, seq++, { delta: chunk.text, id: textId, type: "text-delta" }, target.id, options); - continue; - } - await this.#sendPart(turnId, seq++, streamPart(chunk), target.id, options); - } - - if (textStarted) { - await this.#sendPart(turnId, seq++, { id: textId, type: "text-end" }, target.id, options); - } - return accumulated - ? this.#options.editMessage(threadId, target.id, accumulated) - : target; - } - - async #sendPart( - turnId: string, - seq: number, - part: Record, - targetEventId: string, - options?: StreamOptions - ): Promise { - await this.#options.core.publishBeeperStream({ - content: { - "m.relates_to": { - event_id: targetEventId, - rel_type: "m.reference", - }, - ...(options?.recipientUserId ? { agent_id: options.recipientUserId } : {}), - part, - seq, - target_event: targetEventId, - turn_id: turnId, - }, - eventId: targetEventId, - roomId: this.#options.roomId, - }); - return { eventId: targetEventId, raw: {}, roomId: this.#options.roomId }; - } -} - -class DebouncedEditStreamDriver implements MatrixStreamDriver { - #options: MatrixStreamDriverOptions; - - constructor(options: MatrixStreamDriverOptions) { - this.#options = options; - } - - async stream( - threadId: string, - textStream: MatrixStream, - options?: StreamOptions - ): Promise> { - const intervalMs = options?.updateIntervalMs ?? 500; - let message: RawMessage | null = null; - let accumulated = ""; - let lastFlushed = ""; - let lastFlushAt = 0; - - for await (const chunk of textStream) { - const text = streamChunkText(chunk); - if (!text) { - continue; - } - accumulated += text; - if (!message) { - message = await this.#options.postMessage(threadId, accumulated); - lastFlushed = accumulated; - lastFlushAt = Date.now(); - continue; - } - if (Date.now() - lastFlushAt >= intervalMs && accumulated !== lastFlushed) { - await this.#options.editMessage(threadId, message.id, accumulated); - lastFlushed = accumulated; - lastFlushAt = Date.now(); - } - } - - if (!message) { - return this.#options.postMessage(threadId, "..."); - } - if (accumulated !== lastFlushed) { - return this.#options.editMessage(threadId, message.id, accumulated); - } - return message; - } -} - -function streamChunkText(chunk: string | StreamChunk | Record): string { - if (typeof chunk === "string") { - return chunk; - } - if (isStreamChunk(chunk) && chunk.type === "markdown_text") { - return chunk.text; - } - if (readString(chunk, "type") === "text-delta") { - return readString(chunk, "text") ?? readString(chunk, "delta") ?? readString(chunk, "textDelta") ?? ""; - } - return ""; -} - -function streamPart(chunk: StreamChunk | Record): Record { - if (!isStreamChunk(chunk)) { - return chunk; - } - switch (chunk.type) { - case "markdown_text": - return { delta: chunk.text, type: "text-delta" }; - case "task_update": - return { - data: { - call_id: chunk.id, - output: chunk.output, - progress: chunk.output, - status: chunk.status, - tool_name: chunk.title, - }, - transient: chunk.status === "pending" || chunk.status === "in_progress", - type: "data-tool-progress", - }; - case "plan_update": - return { - data: { title: chunk.title }, - transient: true, - type: "data-plan-update", - }; - } -} - -function isStreamChunk(value: unknown): value is StreamChunk { - const type = readString(value, "type"); - return type === "markdown_text" || type === "task_update" || type === "plan_update"; -} - -function readString(record: unknown, key: string): string | undefined { - if (!record || typeof record !== "object") { - return undefined; - } - const value = (record as Record)[key]; - return typeof value === "string" ? value : undefined; -} +export { + createMatrixStreamDriver, + isBeeperHomeserver, + type MatrixStream, + type MatrixStreamDriver, + type MatrixStreamDriverOptions, +} from "./streaming"; diff --git a/packages/chat-adapter/src/index.ts b/packages/chat-adapter/src/index.ts index 62bd566..7c46443 100644 --- a/packages/chat-adapter/src/index.ts +++ b/packages/chat-adapter/src/index.ts @@ -2,6 +2,7 @@ export { createMatrixAdapter, MatrixAdapter } from "./adapter"; export { MatrixFormatConverter } from "./format"; export type { RenderedMatrixMessage } from "./format"; export { loginMatrix, loginMatrixWithToken } from "./login"; +export type { MatrixStream } from "./streaming"; export { decodeMatrixChatThreadRef, encodeMatrixChatThreadRef, diff --git a/packages/chat-adapter/src/streaming/beeper/driver.ts b/packages/chat-adapter/src/streaming/beeper/driver.ts new file mode 100644 index 0000000..e017b7d --- /dev/null +++ b/packages/chat-adapter/src/streaming/beeper/driver.ts @@ -0,0 +1,136 @@ +import type { MatrixRawMessage } from "better-matrix-js"; +import type { RawMessage, StreamOptions } from "chat"; +import type { MatrixRawMessage as MatrixAdapterRawMessage } from "../../types"; +import { isStreamChunk, normalizeStreamPart, readString, streamChunkText, streamPart } from "../chunks"; +import type { MatrixStream, MatrixStreamDriver, MatrixStreamDriverOptions } from "../types"; +import { BEEPER_STREAM_EVENT_TYPE, buildStreamDelta, clearStreamContent, streamDescriptorType } from "./envelope"; + +export class BeeperStreamDriver implements MatrixStreamDriver { + #options: MatrixStreamDriverOptions; + + constructor(options: MatrixStreamDriverOptions) { + this.#options = options; + } + + async stream( + threadId: string, + textStream: MatrixStream, + options?: StreamOptions + ): Promise> { + const stream = await this.#options.core.createBeeperStream({ + roomId: this.#options.roomId, + streamType: BEEPER_STREAM_EVENT_TYPE, + }); + const target = await this.#options.postMessage(threadId, "...", { + "com.beeper.stream": stream.descriptor, + }); + const turnId = `turn_${Date.now().toString(36)}_${Math.random().toString(36).slice(2, 10)}`; + const textId = `text_${turnId}`; + let seq = 1; + let textStarted = false; + let streamStarted = false; + let streamFinished = false; + let accumulated = ""; + + for await (const chunk of textStream) { + const explicitType = typeof chunk === "string" ? undefined : readString(chunk, "type"); + if (!streamStarted && explicitType !== "start") { + await this.#sendPart( + turnId, + seq++, + { messageId: turnId, messageMetadata: { turn_id: turnId }, type: "start" }, + target.id, + stream.descriptor, + options + ); + streamStarted = true; + } + if (typeof chunk === "string") { + if (!textStarted) { + await this.#sendPart(turnId, seq++, { id: textId, type: "text-start" }, target.id, stream.descriptor, options); + textStarted = true; + } + accumulated += chunk; + await this.#sendPart( + turnId, + seq++, + { delta: chunk, id: textId, type: "text-delta" }, + target.id, + stream.descriptor, + options + ); + continue; + } + if (isStreamChunk(chunk) && chunk.type === "markdown_text") { + if (!textStarted) { + await this.#sendPart(turnId, seq++, { id: textId, type: "text-start" }, target.id, stream.descriptor, options); + textStarted = true; + } + accumulated += chunk.text; + await this.#sendPart( + turnId, + seq++, + { delta: chunk.text, id: textId, type: "text-delta" }, + target.id, + stream.descriptor, + options + ); + continue; + } + const part = normalizeStreamPart(streamPart(chunk), textId); + if (readString(part, "type") === "start") { + streamStarted = true; + } else if (readString(part, "type") === "finish") { + streamFinished = true; + } + accumulated += streamChunkText(part); + await this.#sendPart(turnId, seq++, part, target.id, stream.descriptor, options); + } + + if (textStarted) { + await this.#sendPart(turnId, seq++, { id: textId, type: "text-end" }, target.id, stream.descriptor, options); + } + if (!streamFinished) { + if (!streamStarted) { + await this.#sendPart( + turnId, + seq++, + { messageId: turnId, messageMetadata: { turn_id: turnId }, type: "start" }, + target.id, + stream.descriptor, + options + ); + } + await this.#sendPart( + turnId, + seq++, + { finishReason: "stop", messageMetadata: { finish_reason: "stop", turn_id: turnId }, type: "finish" }, + target.id, + stream.descriptor, + options + ); + } + return accumulated + ? this.#options.editMessage(threadId, target.id, accumulated, clearStreamContent()) + : target; + } + + async #sendPart( + turnId: string, + seq: number, + part: Record, + targetEventId: string, + descriptor: Record, + options?: StreamOptions + ): Promise { + const delta = buildStreamDelta(turnId, seq, part, targetEventId, options); + await this.#options.core.publishBeeperStream({ + content: { + [`${streamDescriptorType(descriptor)}.deltas`]: [delta], + }, + eventId: targetEventId, + roomId: this.#options.roomId, + }); + return { eventId: targetEventId, raw: {}, roomId: this.#options.roomId }; + } +} diff --git a/packages/chat-adapter/src/streaming/beeper/envelope.ts b/packages/chat-adapter/src/streaming/beeper/envelope.ts new file mode 100644 index 0000000..9a0e4df --- /dev/null +++ b/packages/chat-adapter/src/streaming/beeper/envelope.ts @@ -0,0 +1,33 @@ +import type { StreamOptions } from "chat"; + +export const BEEPER_STREAM_EVENT_TYPE = "com.beeper.llm"; + +export function buildStreamDelta( + turnId: string, + seq: number, + part: Record, + targetEventId: string, + options?: StreamOptions +): Record { + return { + "m.relates_to": { + event_id: targetEventId, + rel_type: "m.reference", + }, + ...(options?.recipientUserId ? { agent_id: options.recipientUserId } : {}), + part, + seq, + turn_id: turnId, + }; +} + +export function streamDescriptorType(descriptor: Record): string { + const type = descriptor.type; + return typeof type === "string" && type.trim() ? type : BEEPER_STREAM_EVENT_TYPE; +} + +export function clearStreamContent(): Record { + return { + "com.beeper.stream": null, + }; +} diff --git a/packages/chat-adapter/src/streaming/chunks.ts b/packages/chat-adapter/src/streaming/chunks.ts new file mode 100644 index 0000000..9807e31 --- /dev/null +++ b/packages/chat-adapter/src/streaming/chunks.ts @@ -0,0 +1,67 @@ +import type { StreamChunk } from "chat"; + +export function streamChunkText(chunk: string | StreamChunk | Record): string { + if (typeof chunk === "string") { + return chunk; + } + if (isStreamChunk(chunk) && chunk.type === "markdown_text") { + return chunk.text; + } + if (readString(chunk, "type") === "text-delta") { + return readString(chunk, "text") ?? readString(chunk, "delta") ?? readString(chunk, "textDelta") ?? ""; + } + return ""; +} + +export function streamPart(chunk: StreamChunk | Record): Record { + if (!isStreamChunk(chunk)) { + return chunk; + } + switch (chunk.type) { + case "markdown_text": + return { delta: chunk.text, type: "text-delta" }; + case "task_update": + return { + data: { + call_id: chunk.id, + output: chunk.output, + progress: chunk.output, + status: chunk.status, + tool_name: chunk.title, + }, + id: chunk.id, + transient: chunk.status === "pending" || chunk.status === "in_progress", + type: "data-tool-progress", + }; + case "plan_update": + return { + data: { title: chunk.title }, + transient: true, + type: "data-plan-update", + }; + } +} + +export function normalizeStreamPart(part: Record, defaultTextId: string): Record { + const type = readString(part, "type"); + if ( + (type === "text-start" || type === "text-delta" || type === "text-end") && + !readString(part, "id") + ) { + return { ...part, id: defaultTextId }; + } + return part; +} + +export function isStreamChunk(value: unknown): value is StreamChunk { + const type = readString(value, "type"); + return type === "markdown_text" || type === "task_update" || type === "plan_update"; +} + +export function readString(record: unknown, key: string): string | undefined { + if (!record || typeof record !== "object") { + return undefined; + } + const value = (record as Record)[key]; + return typeof value === "string" ? value : undefined; +} diff --git a/packages/chat-adapter/src/streaming/debounced-edit-driver.ts b/packages/chat-adapter/src/streaming/debounced-edit-driver.ts new file mode 100644 index 0000000..3959669 --- /dev/null +++ b/packages/chat-adapter/src/streaming/debounced-edit-driver.ts @@ -0,0 +1,51 @@ +import type { RawMessage, StreamOptions } from "chat"; +import type { MatrixRawMessage } from "../types"; +import { streamChunkText } from "./chunks"; +import type { MatrixStream, MatrixStreamDriver, MatrixStreamDriverOptions } from "./types"; + +export class DebouncedEditStreamDriver implements MatrixStreamDriver { + #options: MatrixStreamDriverOptions; + + constructor(options: MatrixStreamDriverOptions) { + this.#options = options; + } + + async stream( + threadId: string, + textStream: MatrixStream, + options?: StreamOptions + ): Promise> { + const intervalMs = options?.updateIntervalMs ?? 500; + let message: RawMessage | null = null; + let accumulated = ""; + let lastFlushed = ""; + let lastFlushAt = 0; + + for await (const chunk of textStream) { + const text = streamChunkText(chunk); + if (!text) { + continue; + } + accumulated += text; + if (!message) { + message = await this.#options.postMessage(threadId, accumulated); + lastFlushed = accumulated; + lastFlushAt = Date.now(); + continue; + } + if (Date.now() - lastFlushAt >= intervalMs && accumulated !== lastFlushed) { + await this.#options.editMessage(threadId, message.id, accumulated); + lastFlushed = accumulated; + lastFlushAt = Date.now(); + } + } + + if (!message) { + return this.#options.postMessage(threadId, "..."); + } + if (accumulated !== lastFlushed) { + return this.#options.editMessage(threadId, message.id, accumulated); + } + return message; + } +} diff --git a/packages/chat-adapter/src/streaming/homeserver.ts b/packages/chat-adapter/src/streaming/homeserver.ts new file mode 100644 index 0000000..6e3ff11 --- /dev/null +++ b/packages/chat-adapter/src/streaming/homeserver.ts @@ -0,0 +1,15 @@ +const BEEPER_DOMAINS = new Set([ + "beeper.com", + "beeper-staging.com", + "beeper-dev.com", + "beeper.localtest.me", +]); + +export function isBeeperHomeserver(homeserverUrl: string): boolean { + try { + const hostname = new URL(homeserverUrl).hostname; + return BEEPER_DOMAINS.has(hostname) || [...BEEPER_DOMAINS].some((domain) => hostname.endsWith(`.${domain}`)); + } catch { + return false; + } +} diff --git a/packages/chat-adapter/src/streaming/index.ts b/packages/chat-adapter/src/streaming/index.ts new file mode 100644 index 0000000..d5a6d5f --- /dev/null +++ b/packages/chat-adapter/src/streaming/index.ts @@ -0,0 +1,14 @@ +import { DebouncedEditStreamDriver } from "./debounced-edit-driver"; +import { isBeeperHomeserver } from "./homeserver"; +import type { MatrixStreamDriver, MatrixStreamDriverOptions } from "./types"; + +export type { MatrixStream, MatrixStreamDriver, MatrixStreamDriverOptions } from "./types"; +export { isBeeperHomeserver } from "./homeserver"; + +export async function createMatrixStreamDriver(options: MatrixStreamDriverOptions): Promise { + if (!isBeeperHomeserver(options.homeserverUrl)) { + return new DebouncedEditStreamDriver(options); + } + const { BeeperStreamDriver } = await import("./beeper/driver"); + return new BeeperStreamDriver(options); +} diff --git a/packages/chat-adapter/src/streaming/types.ts b/packages/chat-adapter/src/streaming/types.ts new file mode 100644 index 0000000..05ef865 --- /dev/null +++ b/packages/chat-adapter/src/streaming/types.ts @@ -0,0 +1,30 @@ +import type { MatrixCore } from "better-matrix-js"; +import type { RawMessage, StreamChunk, StreamOptions } from "chat"; +import type { MatrixRawMessage } from "../types"; + +export type MatrixStream = AsyncIterable>; + +export interface MatrixStreamDriver { + stream( + threadId: string, + textStream: MatrixStream, + options?: StreamOptions + ): Promise>; +} + +export interface MatrixStreamDriverOptions { + core: MatrixCore; + editMessage( + threadId: string, + messageId: string, + message: string, + content?: Record + ): Promise>; + homeserverUrl: string; + postMessage( + threadId: string, + message: string, + content?: Record + ): Promise>; + roomId: string; +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 7b5116c..01c7089 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -25,7 +25,25 @@ importers: version: 5.9.3 vitest: specifier: ^4.0.18 - version: 4.1.5(@types/node@25.6.0)(@vitest/coverage-v8@4.1.5)(vite@8.0.10(@types/node@25.6.0)(esbuild@0.27.7)) + version: 4.1.5(@opentelemetry/api@1.9.0)(@types/node@25.6.0)(@vitest/coverage-v8@4.1.5)(vite@8.0.10(@types/node@25.6.0)(esbuild@0.27.7)) + + packages/ai-sdk: + devDependencies: + '@better-matrix-js/chat-adapter': + specifier: workspace:* + version: link:../chat-adapter + '@types/node': + specifier: ^25.3.2 + version: 25.6.0 + tsup: + specifier: ^8.3.5 + version: 8.5.1(postcss@8.5.10)(typescript@5.9.3) + typescript: + specifier: ^5.7.2 + version: 5.9.3 + vitest: + specifier: ^4.0.18 + version: 4.1.5(@opentelemetry/api@1.9.0)(@types/node@25.6.0)(@vitest/coverage-v8@4.1.5)(vite@8.0.10(@types/node@25.6.0)(esbuild@0.27.7)) packages/chat-adapter: dependencies: @@ -53,7 +71,7 @@ importers: version: 5.9.3 vitest: specifier: ^4.0.18 - version: 4.1.5(@types/node@25.6.0)(@vitest/coverage-v8@4.1.5)(vite@8.0.10(@types/node@25.6.0)(esbuild@0.27.7)) + version: 4.1.5(@opentelemetry/api@1.9.0)(@types/node@25.6.0)(@vitest/coverage-v8@4.1.5)(vite@8.0.10(@types/node@25.6.0)(esbuild@0.27.7)) packages/cloudflare: devDependencies: @@ -68,7 +86,7 @@ importers: version: 5.9.3 vitest: specifier: ^4.0.18 - version: 4.1.5(@types/node@25.6.0)(@vitest/coverage-v8@4.1.5)(vite@8.0.10(@types/node@25.6.0)(esbuild@0.27.7)) + version: 4.1.5(@opentelemetry/api@1.9.0)(@types/node@25.6.0)(@vitest/coverage-v8@4.1.5)(vite@8.0.10(@types/node@25.6.0)(esbuild@0.27.7)) packages/core: dependencies: @@ -87,7 +105,7 @@ importers: version: 5.9.3 vitest: specifier: ^4.0.18 - version: 4.1.5(@types/node@25.6.0)(@vitest/coverage-v8@4.1.5)(vite@8.0.10(@types/node@25.6.0)(esbuild@0.27.7)) + version: 4.1.5(@opentelemetry/api@1.9.0)(@types/node@25.6.0)(@vitest/coverage-v8@4.1.5)(vite@8.0.10(@types/node@25.6.0)(esbuild@0.27.7)) packages: @@ -296,6 +314,10 @@ packages: '@emnapi/core': ^1.7.1 '@emnapi/runtime': ^1.7.1 + '@opentelemetry/api@1.9.0': + resolution: {integrity: sha512-3giAOQvZiH5F9bMlMiv8+GSPMeqg0dbaeo58/0SlA9sxSqZhnUtxzX9/2FzyhS9sWQf5S0GJE0AKBrFqjpeYcg==} + engines: {node: '>=8.0.0'} + '@oxc-project/types@0.127.0': resolution: {integrity: sha512-aIYXQBo4lCbO4z0R3FHeucQHpF46l2LbMdxRvqvuRuW2OxdnSkcng5B8+K12spgLDj93rtN3+J2Vac/TIO+ciQ==} @@ -1428,6 +1450,9 @@ snapshots: '@tybys/wasm-util': 0.10.1 optional: true + '@opentelemetry/api@1.9.0': + optional: true + '@oxc-project/types@0.127.0': {} '@rolldown/binding-android-arm64@1.0.0-rc.17': @@ -1600,7 +1625,7 @@ snapshots: obug: 2.1.1 std-env: 4.1.0 tinyrainbow: 3.1.0 - vitest: 4.1.5(@types/node@25.6.0)(@vitest/coverage-v8@4.1.5)(vite@8.0.10(@types/node@25.6.0)(esbuild@0.27.7)) + vitest: 4.1.5(@opentelemetry/api@1.9.0)(@types/node@25.6.0)(@vitest/coverage-v8@4.1.5)(vite@8.0.10(@types/node@25.6.0)(esbuild@0.27.7)) '@vitest/expect@4.1.5': dependencies: @@ -2472,7 +2497,7 @@ snapshots: esbuild: 0.27.7 fsevents: 2.3.3 - vitest@4.1.5(@types/node@25.6.0)(@vitest/coverage-v8@4.1.5)(vite@8.0.10(@types/node@25.6.0)(esbuild@0.27.7)): + vitest@4.1.5(@opentelemetry/api@1.9.0)(@types/node@25.6.0)(@vitest/coverage-v8@4.1.5)(vite@8.0.10(@types/node@25.6.0)(esbuild@0.27.7)): dependencies: '@vitest/expect': 4.1.5 '@vitest/mocker': 4.1.5(vite@8.0.10(@types/node@25.6.0)(esbuild@0.27.7)) @@ -2495,6 +2520,7 @@ snapshots: vite: 8.0.10(@types/node@25.6.0)(esbuild@0.27.7) why-is-node-running: 2.3.0 optionalDependencies: + '@opentelemetry/api': 1.9.0 '@types/node': 25.6.0 '@vitest/coverage-v8': 4.1.5(vitest@4.1.5) transitivePeerDependencies: diff --git a/tsconfig.base.json b/tsconfig.base.json index 2e038c9..f0d4845 100644 --- a/tsconfig.base.json +++ b/tsconfig.base.json @@ -13,6 +13,8 @@ "baseUrl": ".", "paths": { "better-matrix-js": ["packages/core/src/index.ts"], + "@better-matrix-js/ai-sdk": ["packages/ai-sdk/src/index.ts"], + "@better-matrix-js/chat-adapter": ["packages/chat-adapter/src/index.ts"], "@better-matrix-js/cloudflare": ["packages/cloudflare/src/index.ts"], "better-matrix-js/cloudflare": ["packages/core/src/cloudflare.ts"], "better-matrix-js/node": ["packages/core/src/node.ts"]