Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/bulk-openapi-plugin-storage.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@executor-js/sdk": patch
"@executor-js/plugin-openapi": patch
---

Batch OpenAPI operation metadata writes through plugin storage so adding large built-in OpenAPI sources no longer performs thousands of sequential D1 operations.
109 changes: 109 additions & 0 deletions packages/core/sdk/src/executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ import {
} from "./oauth-helpers";
import { connectionIdentifier } from "./connection-name-identifier";

const PLUGIN_STORAGE_DELETE_KEY_BATCH_SIZE = 90;
const MAX_APPROVAL_ARGUMENT_PREVIEW_CHARS = 4_000;

// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -860,6 +861,112 @@ const makePluginStorageFacade = (input: {
});
});

const keysByCollection = (
entries: readonly { readonly collection: string; readonly key: string }[],
) => {
const grouped = new Map<string, Set<string>>();
for (const entry of entries) {
const keys = grouped.get(entry.collection);
if (keys) {
keys.add(entry.key);
} else {
grouped.set(entry.collection, new Set([entry.key]));
}
}
return grouped;
};

const deleteManyImpl = (
owner: Owner,
subject: string,
entries: readonly { readonly collection: string; readonly key: string }[],
) =>
Effect.gen(function* () {
for (const [collection, keys] of keysByCollection(entries)) {
const uniqueKeys = [...keys];
for (
let offset = 0;
offset < uniqueKeys.length;
offset += PLUGIN_STORAGE_DELETE_KEY_BATCH_SIZE
) {
const batchKeys = uniqueKeys.slice(offset, offset + PLUGIN_STORAGE_DELETE_KEY_BATCH_SIZE);
yield* input.core.deleteMany("plugin_storage", {
where: (b) =>
b.and(
b("plugin_id", "=", input.pluginId),
b("collection", "=", collection),
b("key", "in", batchKeys),
b("owner", "=", owner),
b("subject", "=", subject),
),
});
}
}
});

const putManyImpl = (
owner: Owner,
entries: readonly {
readonly collection: string;
readonly key: string;
readonly data: unknown;
}[],
) =>
Effect.gen(function* () {
const os = ownerSubject(owner);
if (!os) {
return yield* new StorageError({
message: `Cannot write plugin storage for owner "user": executor has no subject.`,
cause: undefined,
});
}
const entriesById = new Map(
entries.map((entry) => [
pluginStorageId({
pluginId: input.pluginId,
collection: entry.collection,
key: entry.key,
}),
entry,
]),
);
const uniqueEntries = [...entriesById.values()];
if (uniqueEntries.length === 0) return;

yield* deleteManyImpl(owner, os.subject, uniqueEntries);

const now = new Date();
yield* input.core.createMany(
"plugin_storage",
uniqueEntries.map((entry) => ({
tenant,
owner: os.owner,
subject: os.subject,
plugin_id: input.pluginId,
collection: entry.collection,
key: entry.key,
data: entry.data,
created_at: now,
updated_at: now,
})),
);
});

const removeManyImpl = (
owner: Owner,
entries: readonly { readonly collection: string; readonly key: string }[],
) =>
Effect.gen(function* () {
const os = ownerSubject(owner);
if (!os) {
return yield* new StorageError({
message: `Cannot delete plugin storage for owner "user": executor has no subject.`,
cause: undefined,
});
}
yield* deleteManyImpl(owner, os.subject, entries);
});

const queryCollection = <TDefinition extends PluginStorageCollectionDefinition>(
definition: TDefinition,
queryInput?: PluginStorageCollectionQueryInput<TDefinition>,
Expand Down Expand Up @@ -962,8 +1069,10 @@ const makePluginStorageFacade = (input: {
}),
put: (storageInput) =>
putImpl(storageInput.owner, storageInput.collection, storageInput.key, storageInput.data),
putMany: (storageInput) => putManyImpl(storageInput.owner, storageInput.entries),
remove: (storageInput) =>
removeImpl(storageInput.owner, storageInput.collection, storageInput.key),
removeMany: (storageInput) => removeManyImpl(storageInput.owner, storageInput.entries),
};
};

Expand Down
63 changes: 63 additions & 0 deletions packages/core/sdk/src/plugin-storage.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,23 @@ const executionHistoryPlugin = definePlugin(() => ({
extension: (ctx) => ({
record: (owner: Owner, key: string, data: ToolCall) =>
ctx.storage.toolCalls.put({ owner, key, data }),
recordMany: (
owner: Owner,
rows: readonly { readonly key: string; readonly data: ToolCall }[],
) =>
ctx.pluginStorage.putMany({
owner,
entries: rows.map((row) => ({
collection: toolCalls.name,
key: row.key,
data: row.data,
})),
}),
removeMany: (owner: Owner, keys: readonly string[]) =>
ctx.pluginStorage.removeMany({
owner,
entries: keys.map((key) => ({ collection: toolCalls.name, key })),
}),
get: (key: string) => ctx.storage.toolCalls.get({ key }),
getForOwner: (owner: Owner, key: string) => ctx.storage.toolCalls.getForOwner({ owner, key }),
query: (input?: PluginStorageCollectionQueryInput<typeof toolCalls>) =>
Expand Down Expand Up @@ -159,6 +176,52 @@ describe("plugin storage collections", () => {
}),
);

it.effect("bulk puts and removes plugin storage rows", () =>
Effect.gen(function* () {
const executor = yield* makeTestExecutor({
backend: "sqlite",
plugins: [executionHistoryPlugin] as const,
});
const rows = Array.from({ length: 95 }, (_, index) => ({
key: `call-${String(index).padStart(3, "0")}`,
data: call({
runId: "run-bulk",
toolId: index % 2 === 0 ? "browser" : "shell",
status: "ok",
startedAt: new Date(Date.UTC(2026, 4, 29, 12, index)).toISOString(),
}),
}));

yield* executor.executionHistory.recordMany("org", rows);
yield* executor.executionHistory.recordMany("org", [
{
key: "call-000",
data: call({
runId: "run-bulk",
toolId: "browser",
status: "failed",
startedAt: "2026-05-29T12:00:00.000Z",
}),
},
]);

const stored = yield* executor.executionHistory.query({
where: { runId: "run-bulk" },
orderBy: [{ field: "startedAt" }],
});
expect(stored).toHaveLength(95);
expect(stored[0]?.key).toBe("call-000");
expect(stored[0]?.data.status).toBe("failed");

yield* executor.executionHistory.removeMany(
"org",
rows.map((row) => row.key),
);
const remaining = yield* executor.executionHistory.query({ where: { runId: "run-bulk" } });
expect(remaining).toEqual([]);
}),
);

it.effect("user rows shadow org rows on read; both share one plugin_storage table", () =>
Effect.gen(function* () {
// One executor bound to a subject sees both org and user owner rows; a
Expand Down
16 changes: 16 additions & 0 deletions packages/core/sdk/src/plugin-storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,20 @@ export interface PluginStoragePutInput extends PluginStorageScopedKeyInput {
readonly data: unknown;
}

export interface PluginStoragePutManyEntry extends PluginStorageKeyInput {
readonly data: unknown;
}

export interface PluginStoragePutManyInput {
readonly owner: Owner;
readonly entries: readonly PluginStoragePutManyEntry[];
}

export interface PluginStorageRemoveManyInput {
readonly owner: Owner;
readonly entries: readonly PluginStorageKeyInput[];
}

export interface PluginStorageCollectionKeyInput {
readonly key: string;
}
Expand Down Expand Up @@ -204,7 +218,9 @@ export interface PluginStorageFacade {
readonly put: <T = unknown>(
input: PluginStoragePutInput,
) => Effect.Effect<PluginStorageEntry<T>, StorageFailure>;
readonly putMany: (input: PluginStoragePutManyInput) => Effect.Effect<void, StorageFailure>;
readonly remove: (input: PluginStorageScopedKeyInput) => Effect.Effect<void, StorageFailure>;
readonly removeMany: (input: PluginStorageRemoveManyInput) => Effect.Effect<void, StorageFailure>;
}

export const pluginStorageId = (input: {
Expand Down
21 changes: 9 additions & 12 deletions packages/plugins/openapi/src/sdk/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,27 +102,24 @@ export const makeDefaultOpenapiStore = ({ pluginStorage }: StorageDeps): Openapi
const removeOperations = (integration: string) =>
Effect.gen(function* () {
const rows = yield* listRows(integration);
for (const row of rows) {
yield* pluginStorage.remove({
owner: STORE_OWNER,
collection: OPERATION_COLLECTION,
key: row.key,
});
}
yield* pluginStorage.removeMany({
owner: STORE_OWNER,
entries: rows.map((row) => ({ collection: OPERATION_COLLECTION, key: row.key })),
});
});

return {
putOperations: (integration, operations) =>
Effect.gen(function* () {
yield* removeOperations(integration);
for (const operation of operations) {
yield* pluginStorage.put({
owner: STORE_OWNER,
yield* pluginStorage.putMany({
owner: STORE_OWNER,
entries: operations.map((operation) => ({
collection: OPERATION_COLLECTION,
key: operationKey(integration, operation.toolName),
data: operationData(operation),
});
}
})),
});
}),

getOperation: (integration, toolName) =>
Expand Down