From afda3b0bb93d0f68f40642a5cec58b4580cf7d05 Mon Sep 17 00:00:00 2001 From: Dillon Mulroy Date: Mon, 8 Jun 2026 23:41:49 -0700 Subject: [PATCH] Batch OpenAPI plugin storage writes --- .changeset/bulk-openapi-plugin-storage.md | 6 + packages/core/sdk/src/executor.ts | 109 +++++++++++++++++++ packages/core/sdk/src/plugin-storage.test.ts | 63 +++++++++++ packages/core/sdk/src/plugin-storage.ts | 16 +++ packages/plugins/openapi/src/sdk/store.ts | 21 ++-- 5 files changed, 203 insertions(+), 12 deletions(-) create mode 100644 .changeset/bulk-openapi-plugin-storage.md diff --git a/.changeset/bulk-openapi-plugin-storage.md b/.changeset/bulk-openapi-plugin-storage.md new file mode 100644 index 000000000..d05cf721c --- /dev/null +++ b/.changeset/bulk-openapi-plugin-storage.md @@ -0,0 +1,6 @@ +--- +"@executor-js/sdk": patch +"@executor-js/plugin-openapi": patch +--- + +Batch OpenAPI operation metadata writes through plugin storage so adding large built-in OpenAPI sources no longer performs thousands of sequential D1 operations. diff --git a/packages/core/sdk/src/executor.ts b/packages/core/sdk/src/executor.ts index 800649841..9bd222c80 100644 --- a/packages/core/sdk/src/executor.ts +++ b/packages/core/sdk/src/executor.ts @@ -136,6 +136,7 @@ import { } from "./oauth-helpers"; import { connectionIdentifier } from "./connection-name-identifier"; +const PLUGIN_STORAGE_DELETE_KEY_BATCH_SIZE = 90; const MAX_APPROVAL_ARGUMENT_PREVIEW_CHARS = 4_000; // --------------------------------------------------------------------------- @@ -860,6 +861,112 @@ const makePluginStorageFacade = (input: { }); }); + const keysByCollection = ( + entries: readonly { readonly collection: string; readonly key: string }[], + ) => { + const grouped = new Map>(); + for (const entry of entries) { + const keys = grouped.get(entry.collection); + if (keys) { + keys.add(entry.key); + } else { + grouped.set(entry.collection, new Set([entry.key])); + } + } + return grouped; + }; + + const deleteManyImpl = ( + owner: Owner, + subject: string, + entries: readonly { readonly collection: string; readonly key: string }[], + ) => + Effect.gen(function* () { + for (const [collection, keys] of keysByCollection(entries)) { + const uniqueKeys = [...keys]; + for ( + let offset = 0; + offset < uniqueKeys.length; + offset += PLUGIN_STORAGE_DELETE_KEY_BATCH_SIZE + ) { + const batchKeys = uniqueKeys.slice(offset, offset + PLUGIN_STORAGE_DELETE_KEY_BATCH_SIZE); + yield* input.core.deleteMany("plugin_storage", { + where: (b) => + b.and( + b("plugin_id", "=", input.pluginId), + b("collection", "=", collection), + b("key", "in", batchKeys), + b("owner", "=", owner), + b("subject", "=", subject), + ), + }); + } + } + }); + + const putManyImpl = ( + owner: Owner, + entries: readonly { + readonly collection: string; + readonly key: string; + readonly data: unknown; + }[], + ) => + Effect.gen(function* () { + const os = ownerSubject(owner); + if (!os) { + return yield* new StorageError({ + message: `Cannot write plugin storage for owner "user": executor has no subject.`, + cause: undefined, + }); + } + const entriesById = new Map( + entries.map((entry) => [ + pluginStorageId({ + pluginId: input.pluginId, + collection: entry.collection, + key: entry.key, + }), + entry, + ]), + ); + const uniqueEntries = [...entriesById.values()]; + if (uniqueEntries.length === 0) return; + + yield* deleteManyImpl(owner, os.subject, uniqueEntries); + + const now = new Date(); + yield* input.core.createMany( + "plugin_storage", + uniqueEntries.map((entry) => ({ + tenant, + owner: os.owner, + subject: os.subject, + plugin_id: input.pluginId, + collection: entry.collection, + key: entry.key, + data: entry.data, + created_at: now, + updated_at: now, + })), + ); + }); + + const removeManyImpl = ( + owner: Owner, + entries: readonly { readonly collection: string; readonly key: string }[], + ) => + Effect.gen(function* () { + const os = ownerSubject(owner); + if (!os) { + return yield* new StorageError({ + message: `Cannot delete plugin storage for owner "user": executor has no subject.`, + cause: undefined, + }); + } + yield* deleteManyImpl(owner, os.subject, entries); + }); + const queryCollection = ( definition: TDefinition, queryInput?: PluginStorageCollectionQueryInput, @@ -962,8 +1069,10 @@ const makePluginStorageFacade = (input: { }), put: (storageInput) => putImpl(storageInput.owner, storageInput.collection, storageInput.key, storageInput.data), + putMany: (storageInput) => putManyImpl(storageInput.owner, storageInput.entries), remove: (storageInput) => removeImpl(storageInput.owner, storageInput.collection, storageInput.key), + removeMany: (storageInput) => removeManyImpl(storageInput.owner, storageInput.entries), }; }; diff --git a/packages/core/sdk/src/plugin-storage.test.ts b/packages/core/sdk/src/plugin-storage.test.ts index a9fbfd4e6..c8d8cb972 100644 --- a/packages/core/sdk/src/plugin-storage.test.ts +++ b/packages/core/sdk/src/plugin-storage.test.ts @@ -61,6 +61,23 @@ const executionHistoryPlugin = definePlugin(() => ({ extension: (ctx) => ({ record: (owner: Owner, key: string, data: ToolCall) => ctx.storage.toolCalls.put({ owner, key, data }), + recordMany: ( + owner: Owner, + rows: readonly { readonly key: string; readonly data: ToolCall }[], + ) => + ctx.pluginStorage.putMany({ + owner, + entries: rows.map((row) => ({ + collection: toolCalls.name, + key: row.key, + data: row.data, + })), + }), + removeMany: (owner: Owner, keys: readonly string[]) => + ctx.pluginStorage.removeMany({ + owner, + entries: keys.map((key) => ({ collection: toolCalls.name, key })), + }), get: (key: string) => ctx.storage.toolCalls.get({ key }), getForOwner: (owner: Owner, key: string) => ctx.storage.toolCalls.getForOwner({ owner, key }), query: (input?: PluginStorageCollectionQueryInput) => @@ -159,6 +176,52 @@ describe("plugin storage collections", () => { }), ); + it.effect("bulk puts and removes plugin storage rows", () => + Effect.gen(function* () { + const executor = yield* makeTestExecutor({ + backend: "sqlite", + plugins: [executionHistoryPlugin] as const, + }); + const rows = Array.from({ length: 95 }, (_, index) => ({ + key: `call-${String(index).padStart(3, "0")}`, + data: call({ + runId: "run-bulk", + toolId: index % 2 === 0 ? "browser" : "shell", + status: "ok", + startedAt: new Date(Date.UTC(2026, 4, 29, 12, index)).toISOString(), + }), + })); + + yield* executor.executionHistory.recordMany("org", rows); + yield* executor.executionHistory.recordMany("org", [ + { + key: "call-000", + data: call({ + runId: "run-bulk", + toolId: "browser", + status: "failed", + startedAt: "2026-05-29T12:00:00.000Z", + }), + }, + ]); + + const stored = yield* executor.executionHistory.query({ + where: { runId: "run-bulk" }, + orderBy: [{ field: "startedAt" }], + }); + expect(stored).toHaveLength(95); + expect(stored[0]?.key).toBe("call-000"); + expect(stored[0]?.data.status).toBe("failed"); + + yield* executor.executionHistory.removeMany( + "org", + rows.map((row) => row.key), + ); + const remaining = yield* executor.executionHistory.query({ where: { runId: "run-bulk" } }); + expect(remaining).toEqual([]); + }), + ); + it.effect("user rows shadow org rows on read; both share one plugin_storage table", () => Effect.gen(function* () { // One executor bound to a subject sees both org and user owner rows; a diff --git a/packages/core/sdk/src/plugin-storage.ts b/packages/core/sdk/src/plugin-storage.ts index 879074106..e854feb51 100644 --- a/packages/core/sdk/src/plugin-storage.ts +++ b/packages/core/sdk/src/plugin-storage.ts @@ -113,6 +113,20 @@ export interface PluginStoragePutInput extends PluginStorageScopedKeyInput { readonly data: unknown; } +export interface PluginStoragePutManyEntry extends PluginStorageKeyInput { + readonly data: unknown; +} + +export interface PluginStoragePutManyInput { + readonly owner: Owner; + readonly entries: readonly PluginStoragePutManyEntry[]; +} + +export interface PluginStorageRemoveManyInput { + readonly owner: Owner; + readonly entries: readonly PluginStorageKeyInput[]; +} + export interface PluginStorageCollectionKeyInput { readonly key: string; } @@ -204,7 +218,9 @@ export interface PluginStorageFacade { readonly put: ( input: PluginStoragePutInput, ) => Effect.Effect, StorageFailure>; + readonly putMany: (input: PluginStoragePutManyInput) => Effect.Effect; readonly remove: (input: PluginStorageScopedKeyInput) => Effect.Effect; + readonly removeMany: (input: PluginStorageRemoveManyInput) => Effect.Effect; } export const pluginStorageId = (input: { diff --git a/packages/plugins/openapi/src/sdk/store.ts b/packages/plugins/openapi/src/sdk/store.ts index 224acc003..e2efe89bc 100644 --- a/packages/plugins/openapi/src/sdk/store.ts +++ b/packages/plugins/openapi/src/sdk/store.ts @@ -102,27 +102,24 @@ export const makeDefaultOpenapiStore = ({ pluginStorage }: StorageDeps): Openapi const removeOperations = (integration: string) => Effect.gen(function* () { const rows = yield* listRows(integration); - for (const row of rows) { - yield* pluginStorage.remove({ - owner: STORE_OWNER, - collection: OPERATION_COLLECTION, - key: row.key, - }); - } + yield* pluginStorage.removeMany({ + owner: STORE_OWNER, + entries: rows.map((row) => ({ collection: OPERATION_COLLECTION, key: row.key })), + }); }); return { putOperations: (integration, operations) => Effect.gen(function* () { yield* removeOperations(integration); - for (const operation of operations) { - yield* pluginStorage.put({ - owner: STORE_OWNER, + yield* pluginStorage.putMany({ + owner: STORE_OWNER, + entries: operations.map((operation) => ({ collection: OPERATION_COLLECTION, key: operationKey(integration, operation.toolName), data: operationData(operation), - }); - } + })), + }); }), getOperation: (integration, toolName) =>