diff --git a/README.md b/README.md index 9e4fbd9..6d46a28 100644 --- a/README.md +++ b/README.md @@ -1299,10 +1299,14 @@ abs datasources test --json-config '{"type": "postgres", ...}' abs datasources introspect --json-config '{"type": "postgres", ...}' abs datasources validate-query --json-config '{"query": "SELECT ..."}' abs datasources preview-query --json-config '{"query": "SELECT ..."}' +abs datasources query 6 "SELECT experiment_id, COUNT(*) FROM exposures GROUP BY 1" --limit 20 +abs datasources query 6 --sql - < my_query.sql abs datasources set-default 1 abs datasources schema 1 ``` +By default, `query` and `preview-query` reshape the columnar API response (`columnNames`, `columnTypes`, `rows`) into row-objects so `-o table`, `-o vertical`, `-o json`, etc. render naturally. Pass `--raw` to get the original columnar payload. + ### Export configurations Manage scheduled data export configurations. diff --git a/src/commands/datasources/datasources.test.ts b/src/commands/datasources/datasources.test.ts index e22e986..0466e3a 100644 --- a/src/commands/datasources/datasources.test.ts +++ b/src/commands/datasources/datasources.test.ts @@ -6,6 +6,8 @@ import { printFormatted, } from '../../lib/utils/api-helper.js'; import { resetCommand } from '../../test/helpers/command-reset.js'; +import { PassThrough } from 'node:stream'; +import { setTTYOverride } from '../../lib/utils/stdin.js'; vi.mock('../../lib/utils/api-helper.js', async (importOriginal) => { const actual = await importOriginal(); @@ -31,7 +33,14 @@ describe('datasources command', () => { testDatasource: vi.fn().mockResolvedValue(undefined), introspectDatasource: vi.fn().mockResolvedValue({ schema: [] }), validateDatasourceQuery: vi.fn().mockResolvedValue(undefined), - previewDatasourceQuery: vi.fn().mockResolvedValue({ result: [] }), + previewDatasourceQuery: vi.fn().mockResolvedValue({ + columnNames: ['experiment_id', 'cnt'], + columnTypes: ['INT64', 'INT64'], + rows: [ + [1, 538217], + [5, 250000], + ], + }), setDefaultDatasource: vi.fn().mockResolvedValue(undefined), getDatasourceSchema: vi.fn().mockResolvedValue({ tables: [] }), deleteDatasource: vi.fn().mockResolvedValue(undefined), @@ -153,7 +162,7 @@ describe('datasources command', () => { expect(mockClient.validateDatasourceQuery).toHaveBeenCalledWith({ query: 'SELECT 1' }); }); - it('should preview a datasource query', async () => { + it('should preview a datasource query and reshape columnar response to rows', async () => { await datasourcesCommand.parseAsync([ 'node', 'test', @@ -163,7 +172,37 @@ describe('datasources command', () => { ]); expect(mockClient.previewDatasourceQuery).toHaveBeenCalledWith({ query: 'SELECT 1' }); - expect(printFormatted).toHaveBeenCalled(); + expect(printFormatted).toHaveBeenCalledWith( + [ + { experiment_id: 1, cnt: 538217 }, + { experiment_id: 5, cnt: 250000 }, + ], + expect.anything() + ); + }); + + it('should preview a datasource query with --raw and keep the columnar shape', async () => { + vi.mocked(getGlobalOptions).mockReturnValueOnce({ output: 'table', raw: true } as any); + + await datasourcesCommand.parseAsync([ + 'node', + 'test', + 'preview-query', + '--json-config', + '{"query":"SELECT 1"}', + ]); + + expect(printFormatted).toHaveBeenCalledWith( + { + columnNames: ['experiment_id', 'cnt'], + columnTypes: ['INT64', 'INT64'], + rows: [ + [1, 538217], + [5, 250000], + ], + }, + expect.anything() + ); }); it('should set default datasource', async () => { @@ -212,4 +251,235 @@ describe('datasources command', () => { expect(mockClient.recreateDatasourceJsonLayouts).not.toHaveBeenCalled(); expect(consoleErrorSpy).toHaveBeenCalledWith(expect.stringContaining('--yes')); }); + + it('should run query with positional SQL and reshape the response', async () => { + await datasourcesCommand.parseAsync(['node', 'test', 'query', '6', 'SELECT 1 AS one']); + + expect(mockClient.previewDatasourceQuery).toHaveBeenCalledWith({ + datasource_id: 6, + query: 'SELECT 1 AS one', + }); + expect(printFormatted).toHaveBeenCalled(); + }); + + it('should run query with --sql flag', async () => { + await datasourcesCommand.parseAsync(['node', 'test', 'query', '6', '--sql', 'SELECT 2 AS two']); + + expect(mockClient.previewDatasourceQuery).toHaveBeenCalledWith({ + datasource_id: 6, + query: 'SELECT 2 AS two', + }); + }); + + it('should reject when both positional sql and --sql are provided', async () => { + await expect( + datasourcesCommand.parseAsync(['node', 'test', 'query', '6', 'SELECT 1', '--sql', 'SELECT 2']) + ).rejects.toThrow(/process\.exit: 1/); + + expect(mockClient.previewDatasourceQuery).not.toHaveBeenCalled(); + expect(consoleErrorSpy).toHaveBeenCalledWith(expect.stringContaining('--sql')); + }); + + it('should pass --limit through to the request body', async () => { + await datasourcesCommand.parseAsync([ + 'node', + 'test', + 'query', + '6', + 'SELECT 1', + '--limit', + '20', + ]); + + expect(mockClient.previewDatasourceQuery).toHaveBeenCalledWith({ + datasource_id: 6, + query: 'SELECT 1', + limit: 20, + }); + }); + + it('should omit limit from the body when --limit is not set', async () => { + await datasourcesCommand.parseAsync(['node', 'test', 'query', '6', 'SELECT 1']); + + const call = mockClient.previewDatasourceQuery.mock.calls[0]?.[0]; + expect(call).not.toHaveProperty('limit'); + }); + + it('re-exports columnarToRows from core/datasources', async () => { + const mod = await import('../../core/datasources/datasources.js'); + expect(typeof (mod as { columnarToRows?: unknown }).columnarToRows).toBe('function'); + }); + + it('query reshapes columnar response to rows by default', async () => { + await datasourcesCommand.parseAsync(['node', 'test', 'query', '6', 'SELECT 1']); + + expect(printFormatted).toHaveBeenCalledWith( + [ + { experiment_id: 1, cnt: 538217 }, + { experiment_id: 5, cnt: 250000 }, + ], + expect.anything() + ); + }); + + it('query --raw preserves the columnar response', async () => { + vi.mocked(getGlobalOptions).mockReturnValueOnce({ output: 'table', raw: true } as any); + + await datasourcesCommand.parseAsync(['node', 'test', 'query', '6', 'SELECT 1']); + + expect(printFormatted).toHaveBeenCalledWith( + { + columnNames: ['experiment_id', 'cnt'], + columnTypes: ['INT64', 'INT64'], + rows: [ + [1, 538217], + [5, 250000], + ], + }, + expect.anything() + ); + }); + + it('should accept --json-config as the full body when no other inputs are present', async () => { + await datasourcesCommand.parseAsync([ + 'node', + 'test', + 'query', + '6', + '--json-config', + '{"datasource_id":6,"query":"SELECT 1","limit":3}', + ]); + + expect(mockClient.previewDatasourceQuery).toHaveBeenCalledWith({ + datasource_id: 6, + query: 'SELECT 1', + limit: 3, + }); + }); + + it('should reject --json-config combined with positional sql', async () => { + await expect( + datasourcesCommand.parseAsync([ + 'node', + 'test', + 'query', + '6', + 'SELECT 1', + '--json-config', + '{"query":"SELECT 1"}', + ]) + ).rejects.toThrow(/process\.exit: 1/); + + expect(mockClient.previewDatasourceQuery).not.toHaveBeenCalled(); + }); + + it('should reject --json-config combined with --sql', async () => { + await expect( + datasourcesCommand.parseAsync([ + 'node', + 'test', + 'query', + '6', + '--sql', + 'SELECT 1', + '--json-config', + '{"query":"SELECT 1"}', + ]) + ).rejects.toThrow(/process\.exit: 1/); + + expect(mockClient.previewDatasourceQuery).not.toHaveBeenCalled(); + }); + + it('should reject --json-config combined with --limit', async () => { + await expect( + datasourcesCommand.parseAsync([ + 'node', + 'test', + 'query', + '6', + '--limit', + '5', + '--json-config', + '{"query":"SELECT 1"}', + ]) + ).rejects.toThrow(/process\.exit: 1/); + + expect(mockClient.previewDatasourceQuery).not.toHaveBeenCalled(); + }); + + it('should reject when no SQL source is provided', async () => { + await expect(datasourcesCommand.parseAsync(['node', 'test', 'query', '6'])).rejects.toThrow( + /process\.exit: 1/ + ); + + expect(mockClient.previewDatasourceQuery).not.toHaveBeenCalled(); + }); +}); + +describe('datasources query --sql -', () => { + let consoleErrorSpy: ReturnType; + let processExitSpy: ReturnType; + let originalStdin: typeof process.stdin; + + const mockClient = { + previewDatasourceQuery: vi.fn().mockResolvedValue({ + columnNames: ['answer'], + columnTypes: ['INT64'], + rows: [[42]], + }), + }; + + beforeEach(() => { + vi.clearAllMocks(); + resetCommand(datasourcesCommand); + vi.mocked(getAPIClientFromOptions).mockResolvedValue(mockClient as any); + vi.mocked(getGlobalOptions).mockReturnValue({ output: 'table' } as any); + vi.mocked(printFormatted).mockImplementation(() => {}); + consoleErrorSpy = vi.spyOn(console, 'error').mockImplementation(() => {}); + processExitSpy = vi.spyOn(process, 'exit').mockImplementation((code?) => { + throw new Error(`process.exit: ${code}`); + }); + }); + + afterEach(() => { + if (originalStdin) { + Object.defineProperty(process, 'stdin', { + value: originalStdin, + configurable: true, + }); + } + setTTYOverride({ stdin: true, stdout: true }); + consoleErrorSpy.mockRestore(); + processExitSpy.mockRestore(); + }); + + function replaceStdin(content: string): void { + originalStdin = process.stdin; + const stream = new PassThrough(); + stream.end(content); + Object.defineProperty(process, 'stdin', { + value: stream, + configurable: true, + }); + setTTYOverride({ stdin: false }); + } + + it('reads SQL from stdin when --sql is "-"', async () => { + replaceStdin('SELECT 42 AS answer\n'); + await datasourcesCommand.parseAsync(['node', 'test', 'query', '6', '--sql', '-']); + + expect(mockClient.previewDatasourceQuery).toHaveBeenCalledWith({ + datasource_id: 6, + query: 'SELECT 42 AS answer\n', + }); + }); + + it('errors when --sql is "-" but stdin is empty', async () => { + replaceStdin(''); + await expect( + datasourcesCommand.parseAsync(['node', 'test', 'query', '6', '--sql', '-']) + ).rejects.toThrow(/process\.exit: 1/); + + expect(mockClient.previewDatasourceQuery).not.toHaveBeenCalled(); + }); }); diff --git a/src/commands/datasources/index.ts b/src/commands/datasources/index.ts index ce46cd4..14fdbfe 100644 --- a/src/commands/datasources/index.ts +++ b/src/commands/datasources/index.ts @@ -8,6 +8,7 @@ import { withErrorHandling, } from '../../lib/utils/api-helper.js'; import { parseDatasourceId, validateJSON } from '../../lib/utils/validators.js'; +import { readStdinText } from '../../lib/utils/stdin.js'; import type { DatasourceId } from '../../lib/api/branded-types.js'; import { listDatasources as coreListDatasources, @@ -25,6 +26,7 @@ import { createDatasourceJsonLayouts as coreCreateDatasourceJsonLayouts, recreateDatasourceJsonLayouts as coreRecreateDatasourceJsonLayouts, previewDatasourceJsonLayouts as corePreviewDatasourceJsonLayouts, + columnarToRows, } from '../../core/datasources/datasources.js'; export const datasourcesCommand = new Command('datasources') @@ -150,10 +152,89 @@ const previewQueryCommand = new Command('preview-query') const client = await getAPIClientFromOptions(globalOptions); const config = validateJSON(options.jsonConfig, '--json-config') as Record; const result = await corePreviewDatasourceQuery(client, { config }); - printFormatted(result.data, globalOptions); + printFormatted(globalOptions.raw ? result.data : columnarToRows(result.data), globalOptions); }) ); +const queryCommand = new Command('query') + .description('Run a SQL query against a datasource') + .argument('', 'datasource ID', parseDatasourceId) + .argument('[sql]', 'SQL query (positional). Mutually exclusive with --sql and --json-config.') + .option( + '--sql ', + 'SQL query. Use "-" to read from stdin. Mutually exclusive with the positional argument and --json-config.' + ) + .option('--limit ', 'maximum number of rows to return', (value) => Number(value)) + .option( + '--json-config ', + 'full request body as JSON. Overrides all other inputs and rejects them if set.' + ) + .action( + withErrorHandling( + async ( + id: DatasourceId, + positionalSql: string | undefined, + options: { sql?: string; limit?: number; jsonConfig?: string } + ) => { + const globalOptions = getGlobalOptions(queryCommand); + + let config: Record; + + if (options.jsonConfig !== undefined) { + if ( + positionalSql !== undefined || + options.sql !== undefined || + options.limit !== undefined + ) { + console.error( + chalk.red( + '✗ --json-config cannot be combined with positional SQL, --sql, or --limit.' + ) + ); + process.exit(1); + } + config = validateJSON(options.jsonConfig, '--json-config') as Record; + } else { + if (positionalSql !== undefined && options.sql !== undefined) { + console.error( + chalk.red('✗ Provide SQL via the positional argument OR --sql, not both.') + ); + process.exit(1); + } + + let sql: string | undefined = positionalSql ?? options.sql; + if (sql === '-') { + sql = await readStdinText(); + if (sql.length === 0) { + console.error(chalk.red('✗ --sql - was specified but stdin was empty.')); + process.exit(1); + } + } + + if (sql === undefined) { + console.error(chalk.red('✗ SQL is required. Provide it positionally or via --sql.')); + process.exit(1); + } + + config = { + datasource_id: id, + query: sql, + }; + if (options.limit !== undefined) { + config.limit = options.limit; + } + } + + const client = await getAPIClientFromOptions(globalOptions); + const result = await corePreviewDatasourceQuery(client, { config }); + printFormatted( + globalOptions.raw ? result.data : columnarToRows(result.data), + globalOptions + ); + } + ) + ); + const setDefaultCommand = new Command('set-default') .description('Set a datasource as the default') .argument('', 'datasource ID', parseDatasourceId) @@ -260,6 +341,7 @@ datasourcesCommand.addCommand(testCommand); datasourcesCommand.addCommand(introspectCommand); datasourcesCommand.addCommand(validateQueryCommand); datasourcesCommand.addCommand(previewQueryCommand); +datasourcesCommand.addCommand(queryCommand); datasourcesCommand.addCommand(setDefaultCommand); datasourcesCommand.addCommand(schemaCommand); datasourcesCommand.addCommand(deleteCommand); diff --git a/src/core/datasources/datasources.ts b/src/core/datasources/datasources.ts index 2227ef2..4829231 100644 --- a/src/core/datasources/datasources.ts +++ b/src/core/datasources/datasources.ts @@ -176,3 +176,5 @@ export async function previewDatasourceJsonLayouts( const data = await client.previewDatasourceJsonLayouts(params.id); return { data }; } + +export { columnarToRows } from '../events/events.js'; diff --git a/src/core/datasources/index.ts b/src/core/datasources/index.ts index 112484b..ec62bb9 100644 --- a/src/core/datasources/index.ts +++ b/src/core/datasources/index.ts @@ -10,6 +10,7 @@ export { previewDatasourceQuery, setDefaultDatasource, getDatasourceSchema, + columnarToRows, } from './datasources.js'; export type { GetDatasourceParams, diff --git a/src/lib/utils/stdin.test.ts b/src/lib/utils/stdin.test.ts index bffcaa6..d72e60b 100644 --- a/src/lib/utils/stdin.test.ts +++ b/src/lib/utils/stdin.test.ts @@ -1,5 +1,6 @@ import { describe, it, expect, afterEach } from 'vitest'; -import { isStdinPiped, isStdoutPiped, setTTYOverride } from './stdin.js'; +import { PassThrough } from 'node:stream'; +import { isStdinPiped, isStdoutPiped, setTTYOverride, readStdinText } from './stdin.js'; describe('stdin utilities', () => { afterEach(() => { @@ -26,3 +27,46 @@ describe('stdin utilities', () => { expect(isStdoutPiped()).toBe(true); }); }); + +describe('readStdinText', () => { + let originalStdin: typeof process.stdin; + + afterEach(() => { + if (originalStdin) { + Object.defineProperty(process, 'stdin', { + value: originalStdin, + configurable: true, + }); + } + setTTYOverride({ stdin: true, stdout: true }); + }); + + function replaceStdin(content: string): void { + originalStdin = process.stdin; + const stream = new PassThrough(); + stream.end(content); + Object.defineProperty(process, 'stdin', { + value: stream, + configurable: true, + }); + setTTYOverride({ stdin: false }); + } + + it('returns the full stdin contents verbatim', async () => { + replaceStdin('SELECT 1\n'); + const result = await readStdinText(); + expect(result).toBe('SELECT 1\n'); + }); + + it('preserves embedded blank lines and trailing whitespace', async () => { + replaceStdin('SELECT a,\n\n b\nFROM t\n'); + const result = await readStdinText(); + expect(result).toBe('SELECT a,\n\n b\nFROM t\n'); + }); + + it('returns an empty string when stdin is empty', async () => { + replaceStdin(''); + const result = await readStdinText(); + expect(result).toBe(''); + }); +}); diff --git a/src/lib/utils/stdin.ts b/src/lib/utils/stdin.ts index 9f0aadf..59c6040 100644 --- a/src/lib/utils/stdin.ts +++ b/src/lib/utils/stdin.ts @@ -30,3 +30,14 @@ export async function readLinesFromStdin(): Promise { .map((line) => line.trim()) .filter((line) => line.length > 0); } + +export async function readStdinText(): Promise { + if (!isStdinPiped()) { + console.error('Waiting for input on stdin... (pipe data or press Ctrl+D to end)'); + } + const chunks: Buffer[] = []; + for await (const chunk of process.stdin) { + chunks.push(chunk as Buffer); + } + return Buffer.concat(chunks).toString('utf-8'); +}