diff --git a/.changeset/next-base-path.md b/.changeset/next-base-path.md new file mode 100644 index 0000000000..a3db430b64 --- /dev/null +++ b/.changeset/next-base-path.md @@ -0,0 +1,10 @@ +--- +"@workflow/core": patch +"@workflow/builders": patch +"@workflow/next": patch +"@workflow/utils": patch +"@workflow/world-local": patch +"@workflow/world-postgres": patch +--- + +Respect Next.js basePath when constructing workflow runtime URLs. diff --git a/packages/builders/src/base-builder.ts b/packages/builders/src/base-builder.ts index c0e1278dab..066914f760 100644 --- a/packages/builders/src/base-builder.ts +++ b/packages/builders/src/base-builder.ts @@ -1484,6 +1484,7 @@ export const __steps_registered = true; const workflowEntrypointOptionsCode = createWorkflowEntrypointOptionsCode( { + basePath: this.config.basePath, routeModuleBodyStartedAt: 'workflowRouteModuleBodyStartedAt', } ); @@ -1689,6 +1690,7 @@ export const POST = workflowEntrypoint(workflowCode${workflowEntrypointOptionsCo const stepsRelativePath = `./${basename(stepsOutfile).replace(/\\/g, '/')}`; const escapedVMCode = workflowVMCode.replace(/[\\`$]/g, '\\$&'); const workflowEntrypointOptionsCode = createWorkflowEntrypointOptionsCode({ + basePath: this.config.basePath, routeModuleBodyStartedAt: 'workflowRouteModuleBodyStartedAt', }); @@ -1764,6 +1766,7 @@ export const POST = workflowEntrypoint(workflowCode${workflowEntrypointOptionsCo const escaped = interimBundleText.replace(/[\\`$]/g, '\\$&'); const workflowEntrypointOptionsCode = createWorkflowEntrypointOptionsCode( { + basePath: this.config.basePath, routeModuleBodyStartedAt: 'workflowRouteModuleBodyStartedAt', } ); diff --git a/packages/builders/src/constants.test.ts b/packages/builders/src/constants.test.ts index 3e086100f9..d257434428 100644 --- a/packages/builders/src/constants.test.ts +++ b/packages/builders/src/constants.test.ts @@ -53,10 +53,11 @@ describe('createWorkflowEntrypointOptionsCode', () => { expect( createWorkflowEntrypointOptionsCode({ namespace: 'custom', + basePath: '/v2', routeModuleBodyStartedAt: 'workflowRouteModuleBodyStartedAt', }) ).toBe( - ', { namespace: "custom", routeModuleBodyStartedAt: workflowRouteModuleBodyStartedAt }' + ', { namespace: "custom", basePath: "/v2", routeModuleBodyStartedAt: workflowRouteModuleBodyStartedAt }' ); }); }); diff --git a/packages/builders/src/constants.ts b/packages/builders/src/constants.ts index 7d063890e8..24e52a1d55 100644 --- a/packages/builders/src/constants.ts +++ b/packages/builders/src/constants.ts @@ -54,6 +54,7 @@ export function createWorkflowQueueTrigger(options?: { namespace?: string }) { */ export function createWorkflowEntrypointOptionsCode(options?: { namespace?: string; + basePath?: string; /** Raw code identifier/expression emitted into generated route files, not data. */ routeModuleBodyStartedAt?: string; }) { @@ -66,6 +67,10 @@ export function createWorkflowEntrypointOptionsCode(options?: { fields.push(`namespace: ${JSON.stringify(namespace)}`); } + if (options?.basePath !== undefined) { + fields.push(`basePath: ${JSON.stringify(options.basePath)}`); + } + if (options?.routeModuleBodyStartedAt) { fields.push( `routeModuleBodyStartedAt: ${options.routeModuleBodyStartedAt}` diff --git a/packages/builders/src/types.ts b/packages/builders/src/types.ts index 74fcd3b885..f7ddc05d9e 100644 --- a/packages/builders/src/types.ts +++ b/packages/builders/src/types.ts @@ -60,6 +60,8 @@ interface BaseWorkflowConfig { // artifact locations. distDir?: string; + basePath?: string; + // Suppress informational logs emitted by createWorkflowsBundle() // (e.g. intermediate/final workflow bundle timing logs). suppressCreateWorkflowsBundleLogs?: boolean; diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index 7c533c242c..27a08efdcc 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -9,6 +9,7 @@ import { RunExpiredError, WorkflowRuntimeError, } from '@workflow/errors'; +import { setWorkflowBasePath } from '@workflow/utils'; import { parseWorkflowName, workflowDisplayName, @@ -291,8 +292,16 @@ function hasOpenHookOrWait(events: Event[]): boolean { */ export function workflowEntrypoint( workflowCode: string, - options?: { namespace?: string; routeModuleBodyStartedAt?: number } + options?: { + namespace?: string; + routeModuleBodyStartedAt?: number; + basePath?: string; + } ): (req: Request) => Promise { + if (options?.basePath !== undefined) { + setWorkflowBasePath(options.basePath); + } + const NO_INLINE_REPLAY_AFTER_MS = Number(process.env.WORKFLOW_V2_TIMEOUT_MS) || 120_000; diff --git a/packages/core/src/runtime/step-executor.ts b/packages/core/src/runtime/step-executor.ts index 7f21bba0fc..c4d10fb618 100644 --- a/packages/core/src/runtime/step-executor.ts +++ b/packages/core/src/runtime/step-executor.ts @@ -8,7 +8,11 @@ import { TooEarlyError, WorkflowRuntimeError, } from '@workflow/errors'; -import { pluralize, stepDisplayName } from '@workflow/utils'; +import { + getWorkflowBasePath, + pluralize, + stepDisplayName, +} from '@workflow/utils'; import type { Event, SerializedData, Step, World } from '@workflow/world'; import { SPEC_VERSION_CURRENT, @@ -32,7 +36,6 @@ import { normalizeUnknownError, promoteAbortErrorToFatal, } from '../types.js'; - import { isOptimisticInlineStartEnabled, isOptimisticInlineStartExplicitlyDisabled, @@ -563,7 +566,10 @@ export async function executeStep( const args = hydratedInput.args; const thisVal = hydratedInput.thisVal ?? null; - const port = isVercel ? undefined : await getPortLazy(); + const basePath = getWorkflowBasePath(); + const workflowBaseUrl = isVercel + ? `https://${process.env.VERCEL_URL}${basePath}` + : `http://localhost:${(await getPortLazy()) ?? 3000}${basePath}`; const executionStartTime = Date.now(); result = await trace('step.execute', {}, async () => { @@ -579,9 +585,7 @@ export async function executeStep( workflowName, workflowRunId, workflowStartedAt: new Date(+workflowStartedAt), - url: isVercel - ? `https://${process.env.VERCEL_URL}` - : `http://localhost:${port ?? 3000}`, + url: workflowBaseUrl, features: { encryption: !!encryptionKey }, }, workflowDeploymentId: params.workflowDeploymentId, diff --git a/packages/core/src/runtime/step-handler.ts b/packages/core/src/runtime/step-handler.ts index 2a8ba2631f..3dd78cc534 100644 --- a/packages/core/src/runtime/step-handler.ts +++ b/packages/core/src/runtime/step-handler.ts @@ -9,7 +9,12 @@ import { WorkflowRuntimeError, WorkflowWorldError, } from '@workflow/errors'; -import { formatStepName, pluralize, stepDisplayName } from '@workflow/utils'; +import { + formatStepName, + getWorkflowBasePath, + pluralize, + stepDisplayName, +} from '@workflow/utils'; import { getPort } from '@workflow/utils/get-port'; import { getQueueTopicPrefix, @@ -221,6 +226,10 @@ function createStepHandler(namespace?: string) { isVercel ? undefined : getPort(), getSpanKind('CONSUMER'), ]); + const basePath = getWorkflowBasePath(); + const workflowBaseUrl = isVercel + ? `https://${process.env.VERCEL_URL}${basePath}` + : `http://localhost:${port ?? 3000}${basePath}`; return trace( `step.execute ${stepDisplayName(stepName)}`, @@ -666,11 +675,7 @@ function createStepHandler(namespace?: string) { workflowName, workflowRunId, workflowStartedAt: new Date(+workflowStartedAt), - // TODO: there should be a getUrl method on the world interface itself. This - // solution only works for vercel + local worlds. - url: isVercel - ? `https://${process.env.VERCEL_URL}` - : `http://localhost:${port ?? 3000}`, + url: workflowBaseUrl, features: { encryption: !!encryptionKey }, }, workflowDeploymentId: process.env.VERCEL_DEPLOYMENT_ID, diff --git a/packages/core/src/workflow.ts b/packages/core/src/workflow.ts index af78708ceb..f6778a9ea6 100644 --- a/packages/core/src/workflow.ts +++ b/packages/core/src/workflow.ts @@ -4,7 +4,7 @@ import { WorkflowNotRegisteredError, WorkflowRuntimeError, } from '@workflow/errors'; -import { withResolvers } from '@workflow/utils'; +import { getWorkflowBasePath, withResolvers } from '@workflow/utils'; import { parseWorkflowName } from '@workflow/utils/parse-name'; import type { Event, WorkflowRun } from '@workflow/world'; import { SPEC_VERSION_SUPPORTS_COMPRESSION } from '@workflow/world'; @@ -194,7 +194,10 @@ export async function runWorkflow( // fs ops (readdir, readFile) into the flow route bundle. The resolved // port is cached per process (see get-port-lazy.ts), so this is cheap // on replays after the first. - const port = isVercel ? undefined : await getPortLazy(); + const basePath = getWorkflowBasePath(); + const workflowBaseUrl = isVercel + ? `https://${process.env.VERCEL_URL}${basePath}` + : `http://localhost:${(await getPortLazy()) ?? 3000}${basePath}`; const { context, @@ -311,18 +314,12 @@ export async function runWorkflow( vmGlobalThis[WORKFLOW_GET_STREAM_ID] = (namespace?: string) => getWorkflowRunStreamId(workflowRun.runId, namespace); - // TODO: there should be a getUrl method on the world interface itself. This - // solution only works for vercel + local worlds. - const url = isVercel - ? `https://${process.env.VERCEL_URL}` - : `http://localhost:${port ?? 3000}`; - // For the workflow VM, we store the context in a symbol on the `globalThis` object const ctx: WorkflowMetadata = { workflowName: workflowRun.workflowName, workflowRunId: workflowRun.runId, workflowStartedAt: new vmGlobalThis.Date(+startedAt), - url, + url: workflowBaseUrl, features: { encryption: !!encryptionKey }, }; diff --git a/packages/next/src/index.test.ts b/packages/next/src/index.test.ts index 6996e331b3..5872361bee 100644 --- a/packages/next/src/index.test.ts +++ b/packages/next/src/index.test.ts @@ -2,6 +2,7 @@ import { existsSync, mkdirSync, mkdtempSync, + readFileSync, realpathSync, rmSync, writeFileSync, @@ -50,6 +51,9 @@ import { withWorkflow } from './index.js'; const loaderStubPath = join(__dirname, 'loader.js'); const hadLoaderStub = existsSync(loaderStubPath); const realTmpDir = realpathSync(tmpdir()); +const BASE_PATH_SYMBOL = Symbol.for('@workflow/base-path'); +const workflowGlobal = globalThis as typeof globalThis & + Record; function writeFile(path: string, contents: string): void { mkdirSync(dirname(path), { recursive: true }); @@ -81,9 +85,11 @@ describe('withWorkflow builder config', () => { delete process.env.WORKFLOW_LOCAL_DATA_DIR; delete process.env.WORKFLOW_NEXT_PRIVATE_BUILT; delete process.env.WORKFLOW_TARGET_WORLD; + workflowGlobal[BASE_PATH_SYMBOL] = undefined; }); afterEach(() => { + workflowGlobal[BASE_PATH_SYMBOL] = undefined; if (!hadLoaderStub && existsSync(loaderStubPath)) { rmSync(loaderStubPath); } @@ -173,6 +179,78 @@ describe('withWorkflow builder config', () => { }); }); + it('configures workflow URLs from Next.js basePath', async () => { + const config = withWorkflow({ + basePath: '/v2', + env: { + EXISTING_ENV: '1', + }, + }); + + const nextConfig = await config('phase-production-build', { + defaultConfig: {}, + }); + + expect(workflowGlobal[BASE_PATH_SYMBOL]).toBe('/v2'); + expect(nextConfig.env).toEqual({ + EXISTING_ENV: '1', + }); + expect(builderConfigs[0]).toMatchObject({ + basePath: '/v2', + }); + }); + + it('writes a Vercel launcher entrypoint for basePath workflow routes', async () => { + const projectDir = mkdtempSync(join(realTmpDir, 'workflow-next-basepath-')); + const routeFile = join( + projectDir, + '.next/server/app/.well-known/workflow/v1/flow/route.js' + ); + const traceFile = `${routeFile}.nft.json`; + + try { + writeFile(routeFile, 'module.exports = {};\n'); + writeFile(traceFile, JSON.stringify({ version: 1, files: [] })); + + const config = withWorkflow({ + basePath: '/v2', + }); + const nextConfig = await config('phase-production-build', { + defaultConfig: {}, + }); + const runAfterProductionCompile = ( + nextConfig.compiler as { + runAfterProductionCompile(metadata: { + projectDir: string; + distDir: string; + }): Promise; + } + ).runAfterProductionCompile; + + await runAfterProductionCompile({ + projectDir, + distDir: '.next', + }); + + expect( + readFileSync( + join( + projectDir, + '.next/server/pages/v2/.well-known/workflow/v1/flow.js' + ), + 'utf-8' + ) + ).toBe( + 'module.exports = require("../../../../../app/.well-known/workflow/v1/flow/route.js");\n' + ); + expect(JSON.parse(readFileSync(traceFile, 'utf-8')).files).toContain( + '../../../../../pages/v2/.well-known/workflow/v1/flow.js' + ); + } finally { + rmSync(projectDir, { recursive: true, force: true }); + } + }); + it('externalizes the built-in Vercel world while preserving user externals', async () => { const config = withWorkflow({ serverExternalPackages: ['@node-rs/xxhash'], diff --git a/packages/next/src/index.ts b/packages/next/src/index.ts index 18262b2c51..bc6054f933 100644 --- a/packages/next/src/index.ts +++ b/packages/next/src/index.ts @@ -1,6 +1,6 @@ import { copyFileSync, mkdirSync, statSync } from 'node:fs'; -import { copyFile, mkdir, readFile } from 'node:fs/promises'; -import { dirname, isAbsolute, join } from 'node:path'; +import { copyFile, mkdir, readFile, writeFile } from 'node:fs/promises'; +import { dirname, isAbsolute, join, relative } from 'node:path'; import type { NextConfig } from 'next'; import semver from 'semver'; import { getNextBuilder } from './builder.js'; @@ -27,6 +27,13 @@ const workflowSerdeComputedPropertyPattern = const PSEUDO_EXTERNAL_PACKAGES = new Set(['server-only', 'client-only']); const warnedAutoRemovedServerExternalPackages = new Set(); +const BASE_PATH_SYMBOL = Symbol.for('@workflow/base-path'); + +function setWorkflowBasePath(basePath: string | undefined): void { + (globalThis as typeof globalThis & Record)[ + BASE_PATH_SYMBOL + ] = basePath ?? ''; +} interface WorkflowPatternMatch { hasUseWorkflow: boolean; @@ -275,6 +282,69 @@ async function copyWorkflowDiagnosticsManifest(metadata: { await copyFile(manifestPath, diagnosticsManifestPath); } +function resolveDistDir({ + projectDir, + distDir, +}: { + projectDir: string; + distDir: string; +}): string { + return isAbsolute(distDir) ? distDir : join(projectDir, distDir); +} + +function toRequirePath(path: string): string { + const normalizedPath = path.replaceAll('\\', '/'); + return normalizedPath.startsWith('.') + ? normalizedPath + : `./${normalizedPath}`; +} + +async function writeWorkflowBasePathEntrypoint( + metadata: { projectDir: string; distDir: string }, + basePath: string | undefined +): Promise { + if (!basePath) { + return; + } + + const distDir = resolveDistDir(metadata); + const appRouteFile = join( + distDir, + 'server/app/.well-known/workflow/v1/flow/route.js' + ); + const pagesRouteFile = join( + distDir, + 'server/pages', + basePath.slice(1), + '.well-known/workflow/v1/flow.js' + ); + const routeTraceFile = `${appRouteFile}.nft.json`; + + if (!fileExists(appRouteFile) || !fileExists(routeTraceFile)) { + return; + } + + await mkdir(dirname(pagesRouteFile), { recursive: true }); + await writeFile( + pagesRouteFile, + `module.exports = require(${JSON.stringify( + toRequirePath(relative(dirname(pagesRouteFile), appRouteFile)) + )});\n` + ); + + const routeTrace = JSON.parse(await readFile(routeTraceFile, 'utf-8')) as { + files: string[]; + }; + const tracedEntrypoint = relative( + dirname(appRouteFile), + pagesRouteFile + ).replaceAll('\\', '/'); + if (!routeTrace.files.includes(tracedEntrypoint)) { + routeTrace.files.push(tracedEntrypoint); + await writeFile(routeTraceFile, JSON.stringify(routeTrace)); + } +} + function copyWorkflowDiagnosticsManifestSync(metadata: { projectDir: string; distDir: string; @@ -375,6 +445,8 @@ export function withWorkflow( } // shallow clone to avoid read-only on top-level nextConfig = Object.assign({}, nextConfig); + const workflowBasePath = nextConfig.basePath; + setWorkflowBasePath(workflowBasePath); nextConfig.serverExternalPackages = [ ...new Set([ ...(nextConfig.serverExternalPackages || []), @@ -456,6 +528,7 @@ export function withWorkflow( await existingRunAfterProductionCompile(metadata); } await copyWorkflowDiagnosticsManifest(metadata); + await writeWorkflowBasePathEntrypoint(metadata, workflowBasePath); registerWorkflowDiagnosticsManifestCopy(metadata); }, }; @@ -478,6 +551,7 @@ export function withWorkflow( moduleSpecifierRoot: process.cwd(), workingDir: process.cwd(), distDir, + basePath: workflowBasePath, diagnosticsDir: `${distDir}/diagnostics`, buildTarget: 'next', workflowsBundlePath: '', // not used in base diff --git a/packages/utils/src/index.ts b/packages/utils/src/index.ts index 5d485cf047..d22076dc55 100644 --- a/packages/utils/src/index.ts +++ b/packages/utils/src/index.ts @@ -1,4 +1,3 @@ -export { pluralize } from './pluralize.js'; export { formatStepName, formatWorkflowName, @@ -8,8 +7,15 @@ export { stepDisplayName, workflowDisplayName, } from './parse-name.js'; +export { pluralize } from './pluralize.js'; export { once, type PromiseWithResolvers, withResolvers } from './promise.js'; export { parseDurationToDate } from './time.js'; +export { + createWorkflowHealthEndpoint, + createWorkflowRouteUrl, + getWorkflowBasePath, + setWorkflowBasePath, +} from './workflow-config.js'; export { isVercelWorldTarget, resolveWorkflowTargetWorld, diff --git a/packages/utils/src/workflow-config.test.ts b/packages/utils/src/workflow-config.test.ts new file mode 100644 index 0000000000..1fa339f831 --- /dev/null +++ b/packages/utils/src/workflow-config.test.ts @@ -0,0 +1,28 @@ +import { afterEach, describe, expect, it } from 'vitest'; +import { + createWorkflowHealthEndpoint, + createWorkflowRouteUrl, + getWorkflowBasePath, + setWorkflowBasePath, +} from './workflow-config.js'; + +describe('workflow basePath config', () => { + afterEach(() => { + setWorkflowBasePath(undefined); + }); + + it('uses the configured base path for health probes', () => { + setWorkflowBasePath('/v2'); + + expect(getWorkflowBasePath()).toBe('/v2'); + expect(createWorkflowHealthEndpoint()).toBe( + '/v2/.well-known/workflow/v1/flow?__health' + ); + }); + + it('preserves a base URL path when building workflow route URLs', () => { + expect( + createWorkflowRouteUrl('http://localhost:3000/v2///?debug=1', 'step') + ).toBe('http://localhost:3000/v2/.well-known/workflow/v1/step'); + }); +}); diff --git a/packages/utils/src/workflow-config.ts b/packages/utils/src/workflow-config.ts new file mode 100644 index 0000000000..f1237e811c --- /dev/null +++ b/packages/utils/src/workflow-config.ts @@ -0,0 +1,27 @@ +const WORKFLOW_ROUTE_BASE = '/.well-known/workflow/v1'; +const BASE_PATH_SYMBOL = Symbol.for('@workflow/base-path'); +const workflowConfig = globalThis as typeof globalThis & + Record; + +export function setWorkflowBasePath(basePath: string | undefined): void { + workflowConfig[BASE_PATH_SYMBOL] = basePath ?? ''; +} + +export function getWorkflowBasePath(): string { + return workflowConfig[BASE_PATH_SYMBOL] ?? ''; +} + +export function createWorkflowRouteUrl( + baseUrl: string, + pathname: 'flow' | 'step' +): string { + const url = new URL(baseUrl); + url.pathname = `${url.pathname.replace(/\/+$/, '')}${WORKFLOW_ROUTE_BASE}/${pathname}`; + url.search = ''; + url.hash = ''; + return url.toString(); +} + +export function createWorkflowHealthEndpoint() { + return `${getWorkflowBasePath()}${WORKFLOW_ROUTE_BASE}/flow?__health`; +} diff --git a/packages/world-local/src/config.test.ts b/packages/world-local/src/config.test.ts index 3bc9a5b779..ba35fccedd 100644 --- a/packages/world-local/src/config.test.ts +++ b/packages/world-local/src/config.test.ts @@ -1,3 +1,4 @@ +import { setWorkflowBasePath } from '@workflow/utils'; import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import { resolveBaseUrl } from './config'; @@ -15,6 +16,7 @@ describe('resolveBaseUrl', () => { afterEach(() => { process.env = originalEnv; + setWorkflowBasePath(undefined); vi.clearAllMocks(); }); @@ -61,11 +63,14 @@ describe('resolveBaseUrl', () => { const { getWorkflowPort } = await import('@workflow/utils/get-port'); vi.mocked(getWorkflowPort).mockResolvedValue(5173); delete process.env.PORT; + setWorkflowBasePath('/v2'); const result = await resolveBaseUrl({}); - expect(result).toBe('http://localhost:5173'); - expect(getWorkflowPort).toHaveBeenCalled(); + expect(result).toBe('http://localhost:5173/v2'); + expect(getWorkflowPort).toHaveBeenCalledWith({ + endpoint: '/v2/.well-known/workflow/v1/flow?__health', + }); }); it('should throw error when all detection methods fail', async () => { @@ -227,6 +232,15 @@ describe('resolveBaseUrl', () => { expect(result).toBe('https://example.com'); expect(getWorkflowPort).not.toHaveBeenCalled(); }); + + it('should not append the configured basePath to explicit WORKFLOW_LOCAL_BASE_URL', async () => { + process.env.WORKFLOW_LOCAL_BASE_URL = 'http://localhost:3000/custom'; + setWorkflowBasePath('/v2'); + + const result = await resolveBaseUrl({}); + + expect(result).toBe('http://localhost:3000/custom'); + }); }); describe('edge cases', () => { diff --git a/packages/world-local/src/config.ts b/packages/world-local/src/config.ts index 923bda8e01..363a1765c0 100644 --- a/packages/world-local/src/config.ts +++ b/packages/world-local/src/config.ts @@ -1,3 +1,7 @@ +import { + createWorkflowHealthEndpoint, + getWorkflowBasePath, +} from '@workflow/utils'; import { getWorkflowPort } from '@workflow/utils/get-port'; import { once } from './util.js'; @@ -42,6 +46,14 @@ export const config = once(() => { return { dataDir, baseUrl }; }); +export function resolveDirectBaseUrl(config: Partial): string { + return ( + config.baseUrl ?? + process.env.WORKFLOW_LOCAL_BASE_URL ?? + `http://localhost${getWorkflowBasePath()}` + ); +} + /** * Resolves the base URL for queue requests following the priority order: * 1. config.baseUrl (highest priority - full override from args) @@ -61,17 +73,21 @@ export async function resolveBaseUrl(config: Partial): Promise { return process.env.WORKFLOW_LOCAL_BASE_URL; } + const basePath = getWorkflowBasePath(); + if (typeof config.port === 'number') { - return `http://localhost:${config.port}`; + return `http://localhost:${config.port}${basePath}`; } if (process.env.PORT) { - return `http://localhost:${process.env.PORT}`; + return `http://localhost:${process.env.PORT}${basePath}`; } - const detectedPort = await getWorkflowPort(); + const detectedPort = await getWorkflowPort({ + endpoint: createWorkflowHealthEndpoint(), + }); if (detectedPort) { - return `http://localhost:${detectedPort}`; + return `http://localhost:${detectedPort}${basePath}`; } throw new Error('Unable to resolve base URL for workflow queue.'); diff --git a/packages/world-local/src/queue.test.ts b/packages/world-local/src/queue.test.ts index 9e190e1ac7..b78fec8c4c 100644 --- a/packages/world-local/src/queue.test.ts +++ b/packages/world-local/src/queue.test.ts @@ -1,3 +1,4 @@ +import { setWorkflowBasePath } from '@workflow/utils'; import type { StepInvokePayload } from '@workflow/world'; import { MessageId, ValidQueueName } from '@workflow/world'; import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; @@ -262,7 +263,7 @@ describe('queue delaySeconds', () => { // Real-ish sleep: never resolves, rejects with AbortError on signal // abort — mirrors node:timers/promises semantics for long delays. vi.mocked(mockSetTimeout).mockImplementationOnce( - (_delay?: number, value?: unknown, opts?: { signal?: AbortSignal }) => + (_delay?: number, _value?: unknown, opts?: { signal?: AbortSignal }) => new Promise((_resolve, reject) => { opts?.signal?.addEventListener('abort', () => { const err = new Error('The operation was aborted'); @@ -345,6 +346,31 @@ describe('queue delaySeconds', () => { }); }); +describe('queue basePath routing', () => { + let localQueue: ReturnType; + + afterEach(async () => { + await localQueue?.close(); + setWorkflowBasePath(undefined); + }); + + it('uses basePath when delivering through a direct handler', async () => { + setWorkflowBasePath('/v2'); + localQueue = createQueue({}); + const handler = vi.fn(async (req: Request) => { + expect(req.url).toBe('http://localhost/v2/.well-known/workflow/v1/flow'); + return Response.json({ ok: true }, { status: 200 }); + }); + localQueue.registerHandler('__wkf_workflow_', handler); + + await localQueue.queue('__wkf_workflow_test' as any, { + runId: 'run_01ABC', + }); + + await vi.waitFor(() => expect(handler).toHaveBeenCalledOnce()); + }); +}); + /** undici's shape for a saturated-local-server connect timeout. */ function fetchFailedTimeout(): TypeError { const err = new TypeError('fetch failed'); diff --git a/packages/world-local/src/queue.ts b/packages/world-local/src/queue.ts index b803fd43a3..0846513115 100644 --- a/packages/world-local/src/queue.ts +++ b/packages/world-local/src/queue.ts @@ -1,5 +1,6 @@ import { setTimeout } from 'node:timers/promises'; import type { Transport } from '@vercel/queue'; +import { createWorkflowRouteUrl } from '@workflow/utils'; import { MessageId, parseQueueName, @@ -12,7 +13,7 @@ import { monotonicFactory } from 'ulid'; import { Agent } from 'undici'; import { z } from 'zod/v4'; import type { Config } from './config.js'; -import { resolveBaseUrl } from './config.js'; +import { resolveBaseUrl, resolveDirectBaseUrl } from './config.js'; import { jsonReplacer, jsonReviver } from './fs.js'; import { getPackageInfo } from './init.js'; @@ -212,28 +213,27 @@ export function createQueue(config: Partial): LocalQueue { try { if (directHandler) { - const req = new Request( - `http://localhost/.well-known/workflow/v1/${pathname}`, - { - method: 'POST', - headers, - body, - } + const url = createWorkflowRouteUrl( + resolveDirectBaseUrl(config), + pathname ); + const req = new Request(url, { + method: 'POST', + headers, + body, + }); response = await directHandler(req); } else { const baseUrl = await resolveBaseUrl(config); + const url = createWorkflowRouteUrl(baseUrl, pathname); // eslint-disable-next-line @typescript-eslint/no-explicit-any -- undici v7 dispatcher types don't match @types/node's RequestInit - response = await fetch( - `${baseUrl}/.well-known/workflow/v1/${pathname}`, - { - method: 'POST', - duplex: 'half', - dispatcher: httpAgent, - headers, - body, - } as any - ); + response = await fetch(url, { + method: 'POST', + duplex: 'half', + dispatcher: httpAgent, + headers, + body, + } as any); } } catch (err) { // The delivery never reached the handler: undici threw before a diff --git a/packages/world-postgres/src/queue.test.ts b/packages/world-postgres/src/queue.test.ts index f7d975d799..600c1d8b93 100644 --- a/packages/world-postgres/src/queue.test.ts +++ b/packages/world-postgres/src/queue.test.ts @@ -1,5 +1,6 @@ import { createServer, type Server } from 'node:http'; import { JsonTransport } from '@vercel/queue'; +import { setWorkflowBasePath } from '@workflow/utils'; import { getWorkflowPort } from '@workflow/utils/get-port'; import { MessageId, parseQueueName, type QueuePayload } from '@workflow/world'; import { createLocalWorld } from '@workflow/world-local'; @@ -78,6 +79,7 @@ describe('postgres queue http execution', () => { ) ); vi.useRealTimers(); + setWorkflowBasePath(undefined); delete process.env.WORKFLOW_LOCAL_BASE_URL; delete process.env.PORT; }); @@ -145,13 +147,14 @@ describe('postgres queue http execution', () => { }> = []; const port = await getUnusedLoopbackPort(); vi.mocked(getWorkflowPort).mockResolvedValue(port); + setWorkflowBasePath('/v2'); const queue = buildQueue({ connectionString: 'postgres://test' }, pool); await queue.start(); expect(run).not.toHaveBeenCalled(); - await startWorkflowHttpServer(requests, port); + await startWorkflowHttpServer(requests, port, '/v2'); await vi.waitFor(() => { expect(run).toHaveBeenCalledTimes(1); }); @@ -170,11 +173,13 @@ describe('postgres queue http execution', () => { await expect(task(payload, {} as any)).resolves.toBeUndefined(); - expect(getWorkflowPort).toHaveBeenCalled(); + expect(getWorkflowPort).toHaveBeenCalledWith({ + endpoint: '/v2/.well-known/workflow/v1/flow?__health', + }); expect(requests).toEqual([ expect.objectContaining({ method: 'POST', - url: '/.well-known/workflow/v1/step', + url: '/v2/.well-known/workflow/v1/step', }), ]); }); @@ -487,7 +492,8 @@ async function startWorkflowHttpServer( headers: Record; body: string; }>, - port = 0 + port = 0, + basePath = '' ) { const server = createServer(async (req, res) => { const body = await new Promise((resolve, reject) => { @@ -508,7 +514,10 @@ async function startWorkflowHttpServer( }; requests.push(request); - if (req.method === 'POST' && req.url === '/.well-known/workflow/v1/step') { + if ( + req.method === 'POST' && + req.url === `${basePath}/.well-known/workflow/v1/step` + ) { res.writeHead(200, { 'content-type': 'application/json' }); res.end(JSON.stringify({ ok: true })); return; diff --git a/packages/world-postgres/src/queue.ts b/packages/world-postgres/src/queue.ts index 962f74f6d4..dadd34a675 100644 --- a/packages/world-postgres/src/queue.ts +++ b/packages/world-postgres/src/queue.ts @@ -2,6 +2,11 @@ import { connect } from 'node:net'; import * as Stream from 'node:stream'; import { setTimeout as sleep } from 'node:timers/promises'; import type { Transport } from '@vercel/queue'; +import { + createWorkflowHealthEndpoint, + createWorkflowRouteUrl, + getWorkflowBasePath, +} from '@workflow/utils'; import { getWorkflowPort } from '@workflow/utils/get-port'; import { getQueuePrefixKind, @@ -216,17 +221,21 @@ export function createQueue( return process.env.WORKFLOW_LOCAL_BASE_URL; } + const basePath = getWorkflowBasePath(); + if (typeof port === 'number') { - return `http://localhost:${port}`; + return `http://localhost:${port}${basePath}`; } if (process.env.PORT) { - return `http://localhost:${process.env.PORT}`; + return `http://localhost:${process.env.PORT}${basePath}`; } - const detectedPort = await getWorkflowPort(); + const detectedPort = await getWorkflowPort({ + endpoint: createWorkflowHealthEndpoint(), + }); if (typeof detectedPort === 'number') { - return `http://localhost:${detectedPort}`; + return `http://localhost:${detectedPort}${basePath}`; } return undefined; @@ -359,15 +368,12 @@ export function createQueue( } const pathname = getQueueRoute(queueName); - const response = await fetch( - `${baseUrl}/.well-known/workflow/v1/${pathname}`, - { - method: 'POST', - duplex: 'half', - headers, - body, - } as any - ); + const response = await fetch(createWorkflowRouteUrl(baseUrl, pathname), { + method: 'POST', + duplex: 'half', + headers, + body, + } as any); const text = await response.text(); if (!response.ok) {