diff --git a/.changeset/lazy-step-loaders.md b/.changeset/lazy-step-loaders.md new file mode 100644 index 0000000000..66f17760e7 --- /dev/null +++ b/.changeset/lazy-step-loaders.md @@ -0,0 +1,7 @@ +--- +'@workflow/core': patch +'@workflow/builders': patch +'workflow': patch +--- + +Defer step module loading until step execution so module load failures are recorded as step failures instead of route 500s. diff --git a/packages/builders/src/base-builder.ts b/packages/builders/src/base-builder.ts index c0e1278dab..b2fb8ec966 100644 --- a/packages/builders/src/base-builder.ts +++ b/packages/builders/src/base-builder.ts @@ -1586,6 +1586,23 @@ export const POST = workflowEntrypoint(workflowCode${workflowEntrypointOptionsCo } } + private createStepLoaderRegistrationsCode( + manifest: WorkflowManifest, + loaderSpecifier: string + ): string { + const registrations: string[] = []; + + for (const entries of Object.values(manifest.steps ?? {})) { + for (const { stepId } of Object.values(entries)) { + registrations.push( + `registerStepFunctionLoader(${JSON.stringify(stepId)}, () => import(${JSON.stringify(loaderSpecifier)}));` + ); + } + } + + return registrations.join('\n'); + } + /** * V2: Creates a combined bundle that includes both step registrations and * workflow orchestration in a single route. The combined entrypoint executes @@ -1687,6 +1704,10 @@ export const POST = workflowEntrypoint(workflowCode${workflowEntrypointOptionsCo // 3. 
Generate combined route file const stepsRelativePath = `./${basename(stepsOutfile).replace(/\\/g, '/')}`; + const stepLoaderRegistrationsCode = this.createStepLoaderRegistrationsCode( + stepsManifest, + stepsRelativePath + ); const escapedVMCode = workflowVMCode.replace(/[\\`$]/g, '\\$&'); const workflowEntrypointOptionsCode = createWorkflowEntrypointOptionsCode({ routeModuleBodyStartedAt: 'workflowRouteModuleBodyStartedAt', @@ -1694,13 +1715,11 @@ export const POST = workflowEntrypoint(workflowCode${workflowEntrypointOptionsCo const combinedFunctionCode = `// biome-ignore-all lint: generated file /* eslint-disable */ -import { __steps_registered } from '${stepsRelativePath}'; -import { workflowEntrypoint } from 'workflow/runtime'; +import { registerStepFunctionLoader, workflowEntrypoint } from 'workflow/runtime'; const workflowRouteModuleBodyStartedAt = Date.now(); -// Prevent rollup from tree-shaking the steps side-effect import -void __steps_registered; +${stepLoaderRegistrationsCode} const workflowCode = \`${escapedVMCode}\`; @@ -1769,12 +1788,11 @@ export const POST = workflowEntrypoint(workflowCode${workflowEntrypointOptionsCo ); const code = `// biome-ignore-all lint: generated file /* eslint-disable */ -import { __steps_registered } from '${stepsRelativePath}'; -import { workflowEntrypoint } from 'workflow/runtime'; +import { registerStepFunctionLoader, workflowEntrypoint } from 'workflow/runtime'; const workflowRouteModuleBodyStartedAt = Date.now(); -void __steps_registered; +${stepLoaderRegistrationsCode} const workflowCode = \`${escaped}\`; diff --git a/packages/builders/src/step-source-registration.test.ts b/packages/builders/src/step-source-registration.test.ts index 694f6357ea..366aecc323 100644 --- a/packages/builders/src/step-source-registration.test.ts +++ b/packages/builders/src/step-source-registration.test.ts @@ -8,6 +8,7 @@ import { } from 'node:fs'; import { tmpdir } from 'node:os'; import { dirname, join } from 'node:path'; +import { 
pathToFileURL } from 'node:url'; import { afterEach, beforeEach, describe, expect, it } from 'vitest'; import { BaseBuilder, type DiscoveredEntries } from './base-builder.js'; import type { StandaloneConfig } from './types.js'; @@ -31,6 +32,22 @@ class TestBuilder extends BaseBuilder { discoveredEntries, }); } + + public createCombinedRoute( + inputFiles: string[], + stepsOutfile: string, + flowOutfile: string + ) { + return this.createCombinedBundle({ + inputFiles, + stepsOutfile, + flowOutfile, + bundleFinalOutput: false, + externalizeNonSteps: true, + bundleTransitiveLocalStepDependencies: false, + sourceStepRegistrationImports: true, + }); + } } const realTmpdir = realpathSync(tmpdir()); @@ -59,7 +76,28 @@ describe('step source registration', () => { testRoot = mkdtempSync(join(realTmpdir, 'workflow-step-registration-')); writeFile( join(testRoot, 'node_modules', 'workflow', 'package.json'), - JSON.stringify({ name: 'workflow', version: '1.0.0' }) + JSON.stringify({ + name: 'workflow', + version: '1.0.0', + type: 'module', + exports: { + './runtime': './runtime.js', + './internal/builtins': './internal/builtins.js', + }, + }) + ); + writeFile( + join(testRoot, 'node_modules', 'workflow', 'runtime.js'), + `globalThis.__capturedStepLoaders = []; +export function registerStepFunctionLoader(stepId, loader) { + globalThis.__capturedStepLoaders.push({ stepId, loader }); +} +export function workflowEntrypoint() { + return function POST() { + return new Response('ok'); + }; +} +` ); writeFile( join(testRoot, 'node_modules', 'workflow', 'internal', 'builtins.js'), @@ -117,4 +155,121 @@ describe('step source registration', () => { expect(generated).toContain('import "../src/serde.ts";'); expect(Object.keys(manifest.classes ?? 
{})).toContain('src/serde.ts'); }); + + it('registers lazy step loaders in combined routes', async () => { + const workflowFile = join(testRoot, 'workflows', 'image.ts'); + const stepsOutfile = join(testRoot, '.workflow', '__step_registrations.js'); + const flowOutfile = join(testRoot, '.workflow', 'route.js'); + + mkdirSync(dirname(flowOutfile), { recursive: true }); + writeFile( + workflowFile, + `export async function imageWorkflow() { + 'use workflow'; + return resize(); +} + +export async function resize() { + 'use step'; + return 1; +} +` + ); + + const { stepsManifest } = await createBuilder(testRoot).createCombinedRoute( + [workflowFile], + stepsOutfile, + flowOutfile + ); + const routeCode = readFileSync(flowOutfile, 'utf-8'); + const stepIds = Object.values(stepsManifest.steps ?? {}).flatMap( + (entries) => Object.values(entries).map(({ stepId }) => stepId) + ); + + expect(stepIds).toHaveLength(1); + expect(routeCode).toContain( + "import { registerStepFunctionLoader, workflowEntrypoint } from 'workflow/runtime';" + ); + expect(routeCode).toContain( + `registerStepFunctionLoader(${JSON.stringify(stepIds[0])}, () => import("./__step_registrations.js"));` + ); + expect(routeCode).not.toContain('import { __steps_registered }'); + expect(routeCode).not.toContain('void __steps_registered'); + }); + + it('defers step registration module load failures until a step loader runs', async () => { + const workflowFile = join(testRoot, 'workflows', 'image.ts'); + const stepsOutfile = join(testRoot, '.workflow', '__step_registrations.js'); + const legacyStepsOutfile = join( + testRoot, + '.workflow', + '__legacy_step_registrations.js' + ); + const legacyRouteOutfile = join(testRoot, '.workflow', 'legacy-route.js'); + const flowOutfile = join(testRoot, '.workflow', 'route.js'); + const sharpLoadError = + 'Could not load the "sharp" module using the linux-x64 runtime'; + + mkdirSync(dirname(flowOutfile), { recursive: true }); + writeFile( + workflowFile, + `export async 
function imageWorkflow() { + 'use workflow'; + return resize(); +} + +export async function resize() { + 'use step'; + return 1; +} +` + ); + writeFile( + legacyStepsOutfile, + `throw new Error(${JSON.stringify(sharpLoadError)}); +export const __steps_registered = true; +` + ); + writeFile( + legacyRouteOutfile, + `import { __steps_registered } from './__legacy_step_registrations.js'; +void __steps_registered; +export function POST() {} +` + ); + + await expect( + import(`${pathToFileURL(legacyRouteOutfile).href}?legacy=${Date.now()}`) + ).rejects.toThrow(sharpLoadError); + + const { stepsManifest } = await createBuilder(testRoot).createCombinedRoute( + [workflowFile], + stepsOutfile, + flowOutfile + ); + const stepIds = Object.values(stepsManifest.steps ?? {}).flatMap( + (entries) => Object.values(entries).map(({ stepId }) => stepId) + ); + writeFile( + stepsOutfile, + `throw new Error(${JSON.stringify(sharpLoadError)}); +export const __steps_registered = true; +` + ); + + await expect( + import(`${pathToFileURL(flowOutfile).href}?fixed=${Date.now()}`) + ).resolves.toHaveProperty('POST'); + + const loaders = ( + globalThis as typeof globalThis & { + __capturedStepLoaders?: Array<{ + stepId: string; + loader: () => Promise<unknown>; + }>; + } + ).__capturedStepLoaders; + expect(loaders).toEqual([expect.objectContaining({ stepId: stepIds[0] })]); + await expect(loaders?.[0]?.loader()).rejects.toThrow(sharpLoadError); + }); }); diff --git a/packages/core/e2e/module-load-failures.test.ts b/packages/core/e2e/module-load-failures.test.ts new file mode 100644 index 0000000000..e0b0362387 --- /dev/null +++ b/packages/core/e2e/module-load-failures.test.ts @@ -0,0 +1,303 @@ +import { mkdir, mkdtemp, rm, writeFile } from 'node:fs/promises'; +import { tmpdir } from 'node:os'; +import { dirname, join } from 'node:path'; +import { pathToFileURL } from 'node:url'; +import { WorkflowRunFailedError } from '@workflow/errors'; +import { getQueueTopicPrefix } from '@workflow/world'; +import { 
afterEach, assert, describe, expect, test } from 'vitest'; +import type { WorkflowManifest } from '../../builders/src/apply-swc-transform'; +import { BaseBuilder } from '../../builders/src/base-builder'; +import type { StandaloneConfig } from '../../builders/src/types'; +import { createLocalWorld, type LocalWorld } from '../../world-local/src'; +import { + registerStepFunctionLoader, + setWorld, + start, + workflowEntrypoint, +} from '../src/runtime'; + +const sharpLoadError = + 'Could not load the "sharp" module using the linux-x64 runtime'; + +class E2EBuilder extends BaseBuilder { + async build(): Promise<void> { + // The tests call the protected bundle helper directly. + } + + createCombinedRoute({ + inputFiles, + stepsOutfile, + flowOutfile, + }: { + inputFiles: string[]; + stepsOutfile: string; + flowOutfile: string; + }) { + return this.createCombinedBundle({ + inputFiles, + stepsOutfile, + flowOutfile, + bundleFinalOutput: false, + externalizeNonSteps: true, + sourceStepRegistrationImports: true, + }); + } +} + +function createBuilder(workingDir: string): E2EBuilder { + const config: StandaloneConfig = { + buildTarget: 'standalone', + workingDir, + dirs: ['.'], + stepsBundlePath: join(workingDir, '.workflow', '__step_registrations.js'), + workflowsBundlePath: join(workingDir, '.workflow', 'flow.js'), + webhookBundlePath: join(workingDir, '.workflow', 'webhook.js'), + }; + return new E2EBuilder(config); +} + +async function writeFileWithParents(filePath: string, contents: string) { + await mkdir(dirname(filePath), { recursive: true }); + await writeFile(filePath, contents); +} + +function findManifestEntry( + entries: WorkflowManifest['workflows'] | WorkflowManifest['steps'], + fnName: string +): { workflowId?: string; stepId?: string } { + for (const functions of Object.values(entries ?? 
{})) { + const entry = functions[fnName]; + if (entry) return entry; + } + throw new Error(`Could not find ${fnName} in generated manifest`); +} + +async function installWorkflowRuntimePackage(testRoot: string) { + const packageRoot = join(testRoot, 'node_modules', 'workflow'); + await writeFileWithParents( + join(packageRoot, 'package.json'), + JSON.stringify({ + name: 'workflow', + type: 'module', + exports: { + './runtime': './runtime.js', + './internal/builtins': './internal/builtins.js', + }, + }) + ); + await writeFileWithParents( + join(packageRoot, 'runtime.js'), + `const runtime = globalThis.__workflowModuleLoadFailureE2ERuntime; +if (!runtime) { + throw new Error('Missing workflow module load failure e2e runtime'); +} +export const registerStepFunctionLoader = runtime.registerStepFunctionLoader; +export const workflowEntrypoint = runtime.workflowEntrypoint; +` + ); + await writeFileWithParents( + join(packageRoot, 'internal', 'builtins.js'), + `export const __workflow_e2e_builtins = true; +` + ); +} + +async function setupGeneratedRoute({ + files, + inputFileNames, +}: { + files: Record<string, string>; + inputFileNames: string[]; +}): Promise<{ + testRoot: string; + world: LocalWorld; + manifest: WorkflowManifest; +}> { + const testRoot = await mkdtemp(join(tmpdir(), 'workflow-module-load-e2e-')); + await installWorkflowRuntimePackage(testRoot); + + for (const [fileName, contents] of Object.entries(files)) { + await writeFileWithParents(join(testRoot, fileName), contents); + } + + const builder = createBuilder(testRoot); + const routeDir = join(testRoot, '.workflow'); + const stepsOutfile = join(routeDir, '__step_registrations.js'); + const flowOutfile = join(routeDir, 'flow.js'); + await mkdir(routeDir, { recursive: true }); + const { manifest } = await builder.createCombinedRoute({ + inputFiles: inputFileNames.map((fileName) => join(testRoot, fileName)), + stepsOutfile, + flowOutfile, + }); + + ( + globalThis as typeof globalThis & { + 
__workflowModuleLoadFailureE2ERuntime?: { + registerStepFunctionLoader: typeof registerStepFunctionLoader; + workflowEntrypoint: typeof workflowEntrypoint; + }; + } + ).__workflowModuleLoadFailureE2ERuntime = { + registerStepFunctionLoader, + workflowEntrypoint, + }; + const routeModule = (await import( + `${pathToFileURL(flowOutfile).href}?t=${Date.now()}` + )) as { POST: (request: Request) => Promise<Response> }; + + const world = createLocalWorld({ + dataDir: join(testRoot, '.workflow-data'), + recoverActiveRuns: false, + }); + await world.start(); + world.registerHandler(getQueueTopicPrefix('workflow'), routeModule.POST); + setWorld(world); + + return { testRoot, world, manifest }; +} + +describe('module load failures e2e', () => { + const cleanup: Array<{ testRoot: string; world: LocalWorld }> = []; + + afterEach(async () => { + setWorld(undefined); + delete ( + globalThis as typeof globalThis & { + __workflowModuleLoadFailureE2ERuntime?: unknown; + } + ).__workflowModuleLoadFailureE2ERuntime; + await Promise.all( + cleanup.splice(0).map(async ({ testRoot, world }) => { + await world.close(); + await rm(testRoot, { recursive: true, force: true }); + }) + ); + }); + + test('step registration module load failure is recorded as step_failed', async () => { + const stepId = 'step//./workflows/step-load-failure//brokenStep'; + const fixture = await setupGeneratedRoute({ + files: { + 'workflows/step-load-failure.js': `import './sharp-load-failure.js'; + +export async function brokenStep() { + 'use step'; + return 'unreachable'; +} +`, + 'workflows/sharp-load-failure.js': `throw new Error(${JSON.stringify( + sharpLoadError + )}); +`, + 'workflows/step-load-failure-workflow.js': `export async function stepModuleLoadFailureWorkflow() { + 'use workflow'; + const brokenStep = globalThis[Symbol.for('WORKFLOW_USE_STEP')](${JSON.stringify( + stepId + )}); + try { + await brokenStep(); + return { caught: false }; + } catch (error) { + return { + caught: true, + message: error?.message, 
+ name: error?.name, + causeMessage: error?.cause?.message, + }; + } +} +`, + }, + inputFileNames: [ + 'workflows/step-load-failure.js', + 'workflows/step-load-failure-workflow.js', + ], + }); + cleanup.push(fixture); + + expect(findManifestEntry(fixture.manifest.steps, 'brokenStep').stepId).toBe( + stepId + ); + const workflowId = findManifestEntry( + fixture.manifest.workflows, + 'stepModuleLoadFailureWorkflow' + ).workflowId; + assert(workflowId); + + const run = await start< + [], + { + caught: boolean; + message?: string; + name?: string; + causeMessage?: string; + } + >({ workflowId }, [], { world: fixture.world }); + const result = await run.returnValue; + + expect(result.caught).toBe(true); + expect(result.name).toBe('FatalError'); + expect(result.message).toContain(sharpLoadError); + expect(result.causeMessage).toContain(sharpLoadError); + + const events = await fixture.world.events.list({ + runId: run.runId, + resolveData: 'none', + pagination: { limit: 100, sortOrder: 'asc' }, + }); + expect(events.data.map((event) => event.eventType)).toEqual( + expect.arrayContaining(['step_started', 'step_failed', 'run_completed']) + ); + const stepStartedIndex = events.data.findIndex( + (event) => event.eventType === 'step_started' + ); + const stepFailedIndex = events.data.findIndex( + (event) => event.eventType === 'step_failed' + ); + expect(stepStartedIndex).toBeGreaterThanOrEqual(0); + expect(stepFailedIndex).toBeGreaterThan(stepStartedIndex); + }); + + test('workflow bundle evaluation failure is recorded as run_failed', async () => { + const fixture = await setupGeneratedRoute({ + files: { + 'workflows/workflow-load-failure.js': `throw new Error(${JSON.stringify( + sharpLoadError + )}); + +export async function workflowModuleLoadFailureWorkflow() { + 'use workflow'; + return 'unreachable'; +} +`, + }, + inputFileNames: ['workflows/workflow-load-failure.js'], + }); + cleanup.push(fixture); + + const workflowId = findManifestEntry( + fixture.manifest.workflows, + 
'workflowModuleLoadFailureWorkflow' + ).workflowId; + assert(workflowId); + + const run = await start({ workflowId }, [], { world: fixture.world }); + const error = await run.returnValue.catch((err: unknown) => err); + + expect(WorkflowRunFailedError.is(error)).toBe(true); + assert(WorkflowRunFailedError.is(error)); + expect(error.errorCode).toBe('USER_ERROR'); + assert(error.cause instanceof Error); + expect(error.cause.message).toContain(sharpLoadError); + + const events = await fixture.world.events.list({ + runId: run.runId, + resolveData: 'none', + pagination: { limit: 100, sortOrder: 'asc' }, + }); + expect(events.data.map((event) => event.eventType)).toEqual( + expect.arrayContaining(['run_started', 'run_failed']) + ); + }); +}); diff --git a/packages/core/src/private.test.ts b/packages/core/src/private.test.ts new file mode 100644 index 0000000000..bb16fa8794 --- /dev/null +++ b/packages/core/src/private.test.ts @@ -0,0 +1,43 @@ +import { randomUUID } from 'node:crypto'; +import { describe, expect, it } from 'vitest'; +import { + loadStepFunction, + registerStepFunction, + registerStepFunctionLoader, +} from './private.js'; + +describe('lazy step function loaders', () => { + it('loads and returns a step function registered by a loader', async () => { + const stepId = `step//./workflows/lazy-${randomUUID()}//resize`; + const stepFn = async () => 'loaded'; + let loadCount = 0; + + registerStepFunctionLoader(stepId, () => { + loadCount++; + registerStepFunction(stepId, stepFn); + }); + + await expect(loadStepFunction(stepId)).resolves.toBe(stepFn); + await expect(loadStepFunction(stepId)).resolves.toBe(stepFn); + expect(loadCount).toBe(2); + }); + + it('lets a loader refresh an already registered step function', async () => { + const stepId = `step//./workflows/lazy-${randomUUID()}//resize`; + const firstStepFn = async () => 'first'; + const secondStepFn = async () => 'second'; + let loadCount = 0; + + registerStepFunctionLoader(stepId, () => { + loadCount++; + 
registerStepFunction( + stepId, + loadCount === 1 ? firstStepFn : secondStepFn + ); + }); + + await expect(loadStepFunction(stepId)).resolves.toBe(firstStepFn); + await expect(loadStepFunction(stepId)).resolves.toBe(secondStepFn); + expect(loadCount).toBe(2); + }); +}); diff --git a/packages/core/src/private.ts b/packages/core/src/private.ts index 2379016d33..8499517cf8 100644 --- a/packages/core/src/private.ts +++ b/packages/core/src/private.ts @@ -17,10 +17,16 @@ export type StepFunction< stepId?: string; }; +export type StepFunctionLoader = () => Promise<unknown> | unknown; + const RegisteredStepsKey = Symbol.for('@workflow/core//registeredSteps'); +const RegisteredStepLoadersKey = Symbol.for( + '@workflow/core//registeredStepLoaders' +); const globalSymbols: typeof globalThis & { [RegisteredStepsKey]?: Map<string, StepFunction>; + [RegisteredStepLoadersKey]?: Map<string, StepFunctionLoader>; } = globalThis; // biome-ignore lint/suspicious/noAssignInExpressions: / @@ -28,6 +34,9 @@ const registeredSteps = (globalSymbols[RegisteredStepsKey] ??= new Map< string, StepFunction >()); +// biome-ignore lint/suspicious/noAssignInExpressions: / +const registeredStepLoaders = (globalSymbols[RegisteredStepLoadersKey] ??= + new Map<string, StepFunctionLoader>()); const BUILTIN_RESPONSE_STEP_NAMES = new Set([ '__builtin_response_array_buffer', @@ -85,6 +94,23 @@ function getBuiltinResponseStepAlias(stepId: string): StepFunction | undefined { return undefined; } +function getStepIdMatch<T>(registry: Map<string, T>, stepId: string) { + const directMatch = registry.get(stepId); + if (directMatch) { + return directMatch; + } + + // Support equivalent workflow path aliases in mixed symlink environments. + for (const aliasStepId of getStepIdAliasCandidates(stepId)) { + const aliasMatch = registry.get(aliasStepId); + if (aliasMatch) { + return aliasMatch; + } + } + + return undefined; +} + /** * Register a step function to be served in the server bundle. * Also sets the stepId property on the function for serialization support. 
@@ -99,22 +125,27 @@ export function registerStepFunction(stepId: string, stepFn: StepFunction) { stepFn.stepId = stepId; } +/** + * Register a lazy module loader for a step function. + * + * Framework integrations can use this to keep step modules out of route + * initialization. If importing the module fails, the runtime can record that + * failure against the specific step instead of failing the whole route before + * the queue message has step context. + */ +export function registerStepFunctionLoader( + stepId: string, + loader: StepFunctionLoader +) { + registeredStepLoaders.set(stepId, loader); +} + /** * Find a registered step function by name */ export function getStepFunction(stepId: string): StepFunction | undefined { - const directMatch = registeredSteps.get(stepId); - if (directMatch) { - return directMatch; - } - - // Support equivalent workflow path aliases in mixed symlink environments. - for (const aliasStepId of getStepIdAliasCandidates(stepId)) { - const aliasMatch = registeredSteps.get(aliasStepId); - if (aliasMatch) { - return aliasMatch; - } - } + const registeredMatch = getStepIdMatch(registeredSteps, stepId); + if (registeredMatch) return registeredMatch; const builtinAliasMatch = getBuiltinResponseStepAlias(stepId); if (builtinAliasMatch) { @@ -124,6 +155,21 @@ export function getStepFunction(stepId: string): StepFunction | undefined { return undefined; } +/** + * Load the module that registers a step function, then return the registered + * function. If no loader is known, this behaves like getStepFunction(). + */ +export async function loadStepFunction( + stepId: string +): Promise<StepFunction | undefined> { + const loader = getStepIdMatch(registeredStepLoaders, stepId); + if (loader) { + await loader(); + } + + return getStepFunction(stepId); +} + // Note: __private_getClosureVars is no longer re-exported here. 
// The SWC compiler plugin now inlines closure variable access as a // self-contained IIFE that reads directly from the global AsyncLocalStorage diff --git a/packages/core/src/runtime.test.ts b/packages/core/src/runtime.test.ts index 1cf2e3d0a0..9d8aea44e9 100644 --- a/packages/core/src/runtime.test.ts +++ b/packages/core/src/runtime.test.ts @@ -135,6 +135,44 @@ describe('workflowEntrypoint replay guards', () => { `;globalThis.__private_workflows = new Map(); globalThis.__private_workflows.set(${JSON.stringify(workflowName)}, ${workflowName});`; + it('records run_failed when workflow bundle evaluation throws', async () => { + const sharpLoadError = + 'Could not load the "sharp" module using the linux-x64 runtime'; + const workflowRun: WorkflowRun = { + runId: 'wrun_workflow_bundle_load_error', + workflowName: 'workflow', + status: 'running', + input: await dehydrateWorkflowArguments( + [], + 'wrun_workflow_bundle_load_error', + undefined, + [] + ), + createdAt: new Date('2024-01-01T00:00:00.000Z'), + updatedAt: new Date('2024-01-01T00:00:00.000Z'), + startedAt: new Date('2024-01-01T00:00:00.000Z'), + deploymentId: 'test-deployment', + }; + + const createdEvents = await runWorkflowHandlerWithEvents( + `throw new Error(${JSON.stringify(sharpLoadError)});`, + workflowRun, + [] + ); + + expect(createdEvents).toContainEqual( + expect.objectContaining({ eventType: 'run_started' }) + ); + expect(createdEvents).toContainEqual( + expect.objectContaining({ + eventType: 'run_failed', + eventData: expect.objectContaining({ + errorCode: RUN_ERROR_CODES.USER_ERROR, + }), + }) + ); + }); + it('records run_failed when run_started response schema validation fails', async () => { const createdEvents: unknown[] = []; const schemaError = new WorkflowWorldError( diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index 7c533c242c..cb9b6c43de 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -81,6 +81,11 @@ import { runWorkflow } from 
'./workflow.js'; export type { Event, WorkflowRun }; export { WorkflowSuspension } from './global.js'; +// V2: stepEntrypoint is no longer re-exported — the combined handler +// (workflowEntrypoint) executes steps inline. Removing the re-export +// prevents Turbopack from tracing step-handler.js → get-port.js +// filesystem operations into the flow route bundle. +export { registerStepFunctionLoader } from './private.js'; export { type HealthCheckEndpoint, type HealthCheckOptions, @@ -117,10 +122,6 @@ export { type StartOptionsWithoutDeploymentId, start, } from './runtime/start.js'; -// V2: stepEntrypoint is no longer re-exported — the combined handler -// (workflowEntrypoint) executes steps inline. Removing the re-export -// prevents Turbopack from tracing step-handler.js → get-port.js -// filesystem operations into the flow route bundle. export { createWorld, getWorld, diff --git a/packages/core/src/runtime/step-executor.test.ts b/packages/core/src/runtime/step-executor.test.ts new file mode 100644 index 0000000000..9057dd3f06 --- /dev/null +++ b/packages/core/src/runtime/step-executor.test.ts @@ -0,0 +1,118 @@ +import type { World } from '@workflow/world'; +import { beforeEach, describe, expect, it, vi } from 'vitest'; + +const { + mockDehydrateStepError, + mockEventsCreate, + mockLoadStepFunction, + mockRuntimeLogger, + mockStepLogger, +} = vi.hoisted(() => ({ + mockDehydrateStepError: vi.fn().mockResolvedValue(new Uint8Array([4, 5, 6])), + mockEventsCreate: vi.fn(), + mockLoadStepFunction: vi.fn(), + mockRuntimeLogger: { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + }, + mockStepLogger: { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + }, +})); + +vi.mock('@vercel/functions', () => ({ + waitUntil: vi.fn(), +})); + +vi.mock('../private.js', () => ({ + loadStepFunction: mockLoadStepFunction, +})); + +vi.mock('../logger.js', () => ({ + runtimeLogger: mockRuntimeLogger, + stepLogger: mockStepLogger, +})); + 
+vi.mock('../serialization.js', async () => { + const actual = await vi.importActual<typeof import('../serialization.js')>( + '../serialization.js' + ); + return { + ...actual, + dehydrateStepError: (...args: unknown[]) => mockDehydrateStepError(...args), + }; +}); + +vi.mock('../telemetry.js', () => ({ + trace: vi.fn((_name: string, _opts: unknown, fn?: unknown) => { + const callback = typeof _opts === 'function' ? _opts : fn; + return (callback as (span?: undefined) => unknown)(undefined); + }), +})); + +import { executeStep } from './step-executor.js'; + +describe('executeStep', () => { + beforeEach(() => { + vi.clearAllMocks(); + mockDehydrateStepError.mockResolvedValue(new Uint8Array([4, 5, 6])); + mockEventsCreate.mockImplementation( + (_runId: string, event: { eventType: string }) => { + if (event.eventType === 'step_started') { + return Promise.resolve({ + step: { + stepId: 'step_abc', + status: 'running', + attempt: 1, + startedAt: new Date(), + input: [], + }, + event: {}, + }); + } + return Promise.resolve({ event: {} }); + } + ); + }); + + it('records step_failed when a lazy step module loader throws', async () => { + mockLoadStepFunction.mockRejectedValue( + new Error('Could not load the "sharp" module using the linux-x64 runtime') + ); + + const world = { + events: { create: mockEventsCreate }, + getEncryptionKeyForRun: vi.fn().mockResolvedValue(undefined), + } as unknown as World; + + const result = await executeStep({ + world, + workflowRunId: 'wrun_test123', + workflowName: 'test-workflow', + workflowStartedAt: Date.now(), + stepId: 'step_abc', + stepName: 'step//./workflows/image//resize', + }); + + expect(result).toEqual({ type: 'failed' }); + expect( + mockEventsCreate.mock.calls.map(([, event]) => event.eventType) + ).toEqual(['step_started', 'step_failed']); + expect(mockEventsCreate).toHaveBeenLastCalledWith( + 'wrun_test123', + expect.objectContaining({ + eventType: 'step_failed', + correlationId: 'step_abc', + eventData: expect.objectContaining({ + stepName: 
'step//./workflows/image//resize', + error: expect.any(Uint8Array), + }), + }) + ); + }); +}); diff --git a/packages/core/src/runtime/step-executor.ts b/packages/core/src/runtime/step-executor.ts index 7f21bba0fc..3dd3bb4a3e 100644 --- a/packages/core/src/runtime/step-executor.ts +++ b/packages/core/src/runtime/step-executor.ts @@ -16,7 +16,7 @@ import { } from '@workflow/world'; import type { CryptoKey } from '../encryption.js'; import { runtimeLogger, stepLogger } from '../logger.js'; -import { getStepFunction } from '../private.js'; +import { loadStepFunction } from '../private.js'; import { dehydrateStepError, dehydrateStepReturnValue, @@ -197,89 +197,6 @@ export async function executeStep( // fetch / HKDF derivation; subsequent callers await the cached promise. const getEncryptionKey = memoizeEncryptionKey(world, workflowRunId); - const stepFn = getStepFunction(stepName); - if (!stepFn || typeof stepFn !== 'function') { - // Step function not registered — fail the step immediately (not the run). - // This matches the V1 step handler pattern: create step_failed event so - // the workflow can handle it gracefully via try/catch in user code. - const errorMessage = `Step "${stepName}" is not registered in the current deployment. This usually indicates a build or bundling issue that caused the step to not be included in the deployment.`; - runtimeLogger.error('Step function not registered, failing step', { - workflowRunId, - stepName, - stepId, - }); - // On the lazy inline path the suspension handler deferred this step's - // `step_created`, expecting executeStep to materialize the step via a - // lazy `step_started` carrying its input. We never get that far for an - // unregistered step, so the step entity does not exist yet — writing - // `step_failed` straight away would hit the world's "step must exist" - // ordering guard and wedge the run. 
Send the lazy `step_started` first - // (it creates the step + synthetic `step_created` atomically and keeps - // replay correct), then fail it below. This also preserves the - // exactly-one-owner guarantee: a concurrent handler that won the create - // makes our lazy `step_started` reject with EntityConflictError → we - // return `skipped` and never write the failure twice. - if (params.lazyStepInput !== undefined) { - try { - // Turbo: this lazy `step_started` must not precede the backgrounded - // `run_started`. Order it after the run-ready barrier (best-effort — - // a barrier rejection means the run doesn't exist, and the create - // below surfaces the real error). No-op outside turbo. - if (params.runReadyBarrier) { - await params.runReadyBarrier.catch(() => {}); - } - await world.events.create(workflowRunId, { - eventType: 'step_started', - specVersion: SPEC_VERSION_CURRENT, - correlationId: stepId, - eventData: { stepName, workflowName, input: params.lazyStepInput }, - }); - } catch (startErr) { - if (EntityConflictError.is(startErr)) { - return { type: 'skipped' }; - } - if (RunExpiredError.is(startErr)) { - return { type: 'gone' }; - } - throw startErr; - } - } - try { - await world.events.create(workflowRunId, { - eventType: 'step_failed', - specVersion: SPEC_VERSION_CURRENT, - correlationId: stepId, - eventData: { - stepName, - error: await dehydrateStepError( - new FatalError(errorMessage), - workflowRunId, - await getEncryptionKey(), - [], - globalThis, - compression - ), - }, - }); - } catch (stepFailErr) { - if (EntityConflictError.is(stepFailErr)) { - return { type: 'skipped' }; - } - throw stepFailErr; - } - span?.setAttributes({ - ...Attribute.StepStatus('failed'), - ...Attribute.StepFatalError(true), - }); - return { type: 'failed' }; - } - - const maxRetries = stepFn.maxRetries ?? 
DEFAULT_STEP_MAX_RETRIES; - - span?.setAttributes({ - ...Attribute.StepMaxRetries(maxRetries), - }); - // Maps a `step_started` rejection to a terminal StepExecutionResult, // shared by the await path (below) and the optimistic-start reconciliation. // Returns undefined when the error is not one we translate (caller rethrows). @@ -448,6 +365,110 @@ export async function executeStep( ...Attribute.StepStatus(step.status), }); + let stepFn: Awaited<ReturnType<typeof loadStepFunction>>; + try { + stepFn = await loadStepFunction(stepName); + } catch (err) { + if (optimisticStart) { + const reconcile = await reconcileOptimisticStart(); + if (reconcile) return reconcile; + } + + const normalizedError = await normalizeUnknownError(err); + const normalizedStack = normalizedError.stack || getErrorStack(err) || ''; + const wrappedError = new FatalError( + `Failed to load step "${stepName}": ${normalizedError.message}` + ); + (wrappedError as Error).cause = err; + if (normalizedStack) wrappedError.stack = normalizedStack; + + stepLogger.error('Failed to load step function, failing step', { + workflowRunId, + stepName, + errorStack: normalizedStack, + }); + + try { + await world.events.create(workflowRunId, { + eventType: 'step_failed', + specVersion: SPEC_VERSION_CURRENT, + correlationId: stepId, + eventData: { + stepName, + error: await dehydrateStepError( + wrappedError, + workflowRunId, + await getEncryptionKey(), + [], + globalThis, + compression + ), + }, + }); + } catch (stepFailErr) { + if (EntityConflictError.is(stepFailErr)) { + return { type: 'skipped' }; + } + throw stepFailErr; + } + + span?.setAttributes({ + ...Attribute.StepStatus('failed'), + ...Attribute.StepFatalError(true), + }); + return { type: 'failed' }; + } + + if (!stepFn || typeof stepFn !== 'function') { + // Step function not registered — fail the step (not the run). + const errorMessage = `Step "${stepName}" is not registered in the current deployment. 
This usually indicates a build or bundling issue that caused the step to not be included in the deployment.`; + runtimeLogger.error('Step function not registered, failing step', { + workflowRunId, + stepName, + stepId, + }); + + if (optimisticStart) { + const reconcile = await reconcileOptimisticStart(); + if (reconcile) return reconcile; + } + + try { + await world.events.create(workflowRunId, { + eventType: 'step_failed', + specVersion: SPEC_VERSION_CURRENT, + correlationId: stepId, + eventData: { + stepName, + error: await dehydrateStepError( + new FatalError(errorMessage), + workflowRunId, + await getEncryptionKey(), + [], + globalThis, + compression + ), + }, + }); + } catch (stepFailErr) { + if (EntityConflictError.is(stepFailErr)) { + return { type: 'skipped' }; + } + throw stepFailErr; + } + span?.setAttributes({ + ...Attribute.StepStatus('failed'), + ...Attribute.StepFatalError(true), + }); + return { type: 'failed' }; + } + + const maxRetries = stepFn.maxRetries ?? DEFAULT_STEP_MAX_RETRIES; + + span?.setAttributes({ + ...Attribute.StepMaxRetries(maxRetries), + }); + let result: unknown; // Check max retries AFTER step_started (attempt was just incremented). 
diff --git a/packages/core/src/runtime/step-handler.test.ts b/packages/core/src/runtime/step-handler.test.ts index 5f126f0280..581d0b998b 100644 --- a/packages/core/src/runtime/step-handler.test.ts +++ b/packages/core/src/runtime/step-handler.test.ts @@ -175,7 +175,7 @@ vi.mock('../types.js', () => ({ // Mock private module vi.mock('../private.js', () => ({ - getStepFunction: vi.fn().mockReturnValue(mockStepFn), + loadStepFunction: vi.fn().mockResolvedValue(mockStepFn), })); // Mock get-port @@ -183,7 +183,7 @@ vi.mock('@workflow/utils/get-port', () => ({ getPort: vi.fn().mockResolvedValue(3000), })); -import { getStepFunction } from '../private.js'; +import { loadStepFunction } from '../private.js'; import { dehydrateStepError } from '../serialization.js'; import { getErrorName, @@ -244,7 +244,7 @@ describe('step-handler 409 handling', () => { beforeEach(() => { vi.clearAllMocks(); // Re-set mocks after clearAllMocks - vi.mocked(getStepFunction).mockReturnValue(mockStepFn); + vi.mocked(loadStepFunction).mockResolvedValue(mockStepFn); vi.mocked(normalizeUnknownError).mockImplementation( async (err: unknown) => ({ message: err instanceof Error ? 
err.message : String(err), @@ -575,7 +575,7 @@ describe('step-handler 409 handling', () => { describe('step-handler max deliveries', () => { beforeEach(() => { vi.clearAllMocks(); - vi.mocked(getStepFunction).mockReturnValue(mockStepFn); + vi.mocked(loadStepFunction).mockResolvedValue(mockStepFn); mockStepFn.mockReset().mockResolvedValue('step-result'); mockStepFn.maxRetries = 3; mockQueueMessage.mockResolvedValue(undefined); @@ -684,7 +684,7 @@ describe('step-handler step not found', () => { }); it('should fail the step (not the run) when step function is not found', async () => { - vi.mocked(getStepFunction).mockReturnValue(undefined); + vi.mocked(loadStepFunction).mockResolvedValue(undefined); const result = await capturedHandler( createMessage(), @@ -729,8 +729,10 @@ describe('step-handler step not found', () => { }); it('should fail the step when step function is not a function', async () => { - vi.mocked(getStepFunction).mockReturnValue( - 'not-a-function' as unknown as ReturnType<typeof getStepFunction> + vi.mocked(loadStepFunction).mockResolvedValue( + 'not-a-function' as unknown as Awaited< + ReturnType<typeof loadStepFunction> + > ); const result = await capturedHandler( @@ -756,8 +758,46 @@ describe('step-handler step not found', () => { expect(mockQueueMessage).toHaveBeenCalled(); }); + it('should fail the step when loading the step module throws', async () => { + vi.mocked(loadStepFunction).mockRejectedValue( + new Error('Could not load the "sharp" module using the linux-x64 runtime') + ); + + const result = await capturedHandler( + createMessage(), + createMetadata('imageStep') + ); + + expect(result).toBeUndefined(); + expect(mockEventsCreate).toHaveBeenCalledWith( + 'wrun_test123', + expect.objectContaining({ + eventType: 'step_started', + correlationId: 'step_abc', + eventData: expect.objectContaining({ + stepName: 'imageStep', + }), + }), + expect.anything() + ); + expect(mockEventsCreate).toHaveBeenCalledWith( + 'wrun_test123', + expect.objectContaining({ + eventType: 'step_failed', + 
correlationId: 'step_abc', + eventData: expect.objectContaining({ + stepName: 'imageStep', + error: expect.any(Uint8Array), + }), + }), + expect.anything() + ); + expect(mockQueueMessage).toHaveBeenCalled(); + expect(mockStepFn).not.toHaveBeenCalled(); + }); + it('should handle EntityConflictError when failing step for missing function', async () => { - vi.mocked(getStepFunction).mockReturnValue(undefined); + vi.mocked(loadStepFunction).mockResolvedValue(undefined); let callCount = 0; mockEventsCreate.mockImplementation( (_runId: string, event: { eventType: string }) => { @@ -820,7 +860,7 @@ describe('step-handler step not found', () => { describe('step-handler fatal vs retryable behavior', () => { beforeEach(() => { vi.clearAllMocks(); - vi.mocked(getStepFunction).mockReturnValue(mockStepFn); + vi.mocked(loadStepFunction).mockResolvedValue(mockStepFn); vi.mocked(normalizeUnknownError).mockImplementation( async (err: unknown) => ({ message: err instanceof Error ? err.message : String(err), @@ -957,7 +997,7 @@ describe('step-handler fatal vs retryable behavior', () => { describe('executeStep inline-delta threading', () => { beforeEach(() => { vi.clearAllMocks(); - vi.mocked(getStepFunction).mockReturnValue(mockStepFn); + vi.mocked(loadStepFunction).mockResolvedValue(mockStepFn); vi.mocked(normalizeUnknownError).mockImplementation( async (err: unknown) => ({ message: err instanceof Error ? err.message : String(err), @@ -1208,7 +1248,7 @@ describe('executeStep optimistic inline start', () => { // Optimistic start is OFF by default — explicitly enable it so these tests // exercise the optimistic path. (The disabled-path test overrides to '0'.) process.env.WORKFLOW_OPTIMISTIC_INLINE_START = '1'; - vi.mocked(getStepFunction).mockReturnValue(mockStepFn); + vi.mocked(loadStepFunction).mockResolvedValue(mockStepFn); vi.mocked(normalizeUnknownError).mockImplementation( async (err: unknown) => ({ message: err instanceof Error ? 
err.message : String(err), diff --git a/packages/core/src/runtime/step-handler.ts b/packages/core/src/runtime/step-handler.ts index 2a8ba2631f..2ea344ee90 100644 --- a/packages/core/src/runtime/step-handler.ts +++ b/packages/core/src/runtime/step-handler.ts @@ -21,7 +21,7 @@ import { } from '@workflow/world'; import { describeError } from '../describe-error.js'; import { runtimeLogger, stepLogger } from '../logger.js'; -import { getStepFunction } from '../private.js'; +import { loadStepFunction } from '../private.js'; import { cancelAbortReaders, dehydrateStepError, @@ -237,11 +237,6 @@ function createStepHandler(namespace?: string) { ...getQueueOverhead({ requestedAt }), }); - // Note: Step function validation happens after step_started so we can - // properly fail the step (not the run) if the function is not registered. - // This allows the workflow to handle the step failure gracefully. - const stepFn = getStepFunction(stepName); - span?.setAttributes({ ...Attribute.WorkflowName(workflowName), ...Attribute.WorkflowRunId(workflowRunId), @@ -370,6 +365,80 @@ function createStepHandler(namespace?: string) { ...Attribute.StepStatus(step.status), }); + let stepFn: Awaited<ReturnType<typeof loadStepFunction>>; + try { + stepFn = await loadStepFunction(stepName); + } catch (err) { + const normalizedError = await normalizeUnknownError(err); + const normalizedStack = + normalizedError.stack || getErrorStack(err) || ''; + const wrappedError = new FatalError( + `Failed to load step "${stepName}": ${normalizedError.message}` + ); + (wrappedError as Error).cause = err; + if (normalizedStack) wrappedError.stack = normalizedStack; + + stepRuntimeLogger.error( + 'Failed to load step function, failing step (not run)', + { + errorName: wrappedError.name, + errorMessage: wrappedError.message, + errorStack: normalizedStack, + } + ); + + try { + await world.events.create( + workflowRunId, + { + eventType: 'step_failed', + specVersion: SPEC_VERSION_CURRENT, + correlationId: stepId, + eventData: { + stepName, + error: 
await dehydrateStepError( + wrappedError, + workflowRunId, + await getEncryptionKey(), + [], + globalThis, + compressionForStep() + ), + }, + }, + { requestId } + ); + } catch (stepFailErr) { + if (EntityConflictError.is(stepFailErr)) { + stepRuntimeLogger.info( + 'Tried failing step for module load failure, but step has already finished.', + { + errorName: stepFailErr.name, + errorMessage: stepFailErr.message, + } + ); + return; + } + throw stepFailErr; + } + + span?.setAttributes({ + ...Attribute.StepStatus('failed'), + ...Attribute.StepFatalError(true), + }); + + await queueMessage( + world, + getWorkflowQueueName(workflowName, resolvedNamespace), + { + runId: workflowRunId, + traceCarrier: await nextTraceCarrier(), + requestedAt: new Date(), + } + ); + return; + } + // Validate step function exists AFTER step_started so we can // properly fail the step (not the run) if the function is missing. // This allows the workflow to handle the step failure gracefully, diff --git a/packages/workflow/src/runtime.test.ts b/packages/workflow/src/runtime.test.ts new file mode 100644 index 0000000000..97ca08ca1e --- /dev/null +++ b/packages/workflow/src/runtime.test.ts @@ -0,0 +1,15 @@ +import { describe, expect, it, vi } from 'vitest'; + +vi.mock('@workflow/core/runtime', () => ({ + registerStepFunctionLoader: vi.fn(), + workflowEntrypoint: vi.fn(), +})); + +describe('workflow/runtime re-exports', () => { + it('exports the step loader registration API used by generated routes', async () => { + const runtime = await import('./runtime'); + + expect(typeof runtime.registerStepFunctionLoader).toBe('function'); + expect(typeof runtime.workflowEntrypoint).toBe('function'); + }); +}); diff --git a/packages/workflow/src/runtime.ts b/packages/workflow/src/runtime.ts index 9d8fcb20fa..fd555823bf 100644 --- a/packages/workflow/src/runtime.ts +++ b/packages/workflow/src/runtime.ts @@ -2,10 +2,11 @@ export { createWorld, getWorld, getWorldHandlers, - healthCheck, type 
HealthCheckEndpoint, type HealthCheckOptions, type HealthCheckResult, + healthCheck, + registerStepFunctionLoader, setWorld, workflowEntrypoint, } from '@workflow/core/runtime';