support for PREPARE and EXECUTE statement#298
support for PREPARE and EXECUTE statement#298TwistingTwists wants to merge 11 commits intodatafusion-contrib:masterfrom
Conversation
Allows coordinated development of prepare/execute feature across both crates. Amp-Thread-ID: https://ampcode.com/threads/T-019c8b05-dedd-70ce-a89a-673841929957 Co-authored-by: Amp <amp@ampcode.com>
- Detect Statement::Prepare in do_query - Validate inner statement by converting to LogicalPlan - Return PREPARE response - Add unit test for basic PREPARE functionality Amp-Thread-ID: https://ampcode.com/threads/T-019c8b05-dedd-70ce-a89a-673841929957 Co-authored-by: Amp <amp@ampcode.com>
Implemented as PrepareExecuteHook with internal statement storage: - PREPARE: parses inner statement to LogicalPlan, stores in local HashMap - EXECUTE: retrieves prepared statement and executes it - DEALLOCATE: removes statement from storage Benefits: - No type system constraints (vs portal store approach) - Follows existing hook architecture pattern - Per-hook statement isolation - Clean separation of concerns Added comprehensive tests: - test_simple_prepare: PREPARE statement handling - test_simple_execute_prepared: PREPARE + EXECUTE workflow - test_simple_deallocate: DEALLOCATE statement handling - test_execute_nonexistent_statement: Error case - test_prepare_execute_deallocate_workflow: Full lifecycle + reuse All 17 tests pass.
Added full parameter evaluation and substitution: - Convert SQL Expr parameters to DataFusion Expr using session context - Evaluate expressions to scalar values using ExprSimplifier - Build ParamValues map from evaluated expressions - Substitute , , ... into LogicalPlan before execution - Reject non-constant expressions with SQLSTATE 22023 Added tests: - test_execute_with_integer_parameter: Query with BIGINT parameter - test_execute_with_text_parameter: Query with VARCHAR parameter All 19 tests pass.
Current state: - 22/22 tests passing - Parameter substitution working (task 6 ✓) - DEALLOCATE ALL support added with tests - Parameter type validation in progress (task 7) Test coverage includes: - test_execute_with_integer_parameter - test_execute_with_text_parameter - test_deallocate_all - test_deallocate_all_prepare_keyword
…y tests - Implement parameter type validation in EXECUTE handler * Store SqlDataType vector with PreparedStatementInfo * Add sql_type_to_arrow() to convert SQL types to Arrow types * Add types_compatible() to validate type compatibility * Support type coercion via scalar.cast_to() - Add comprehensive test coverage for parameter validation * test_execute_wrong_parameter_count - SQLSTATE 07001 * test_execute_parameter_type_mismatch - SQLSTATE 22023 * test_execute_correct_parameter_count - positive case - Add wire protocol interoperability tests documenting namespace separation * test_sql_prepare_extended_execute - SQL PREPARE vs wire Parse * test_extended_parse_sql_execute - wire Parse vs SQL EXECUTE * test_namespace_isolation - independent storage with same name - Create ExtendedMockClient test helper for wire protocol testing * Implements ClientInfo, ClientPortalStore with MemPortalStore * Allows testing extended query protocol handlers Result: 28/28 tests passing
Remove local path dependency on ../pgwire and use published version 0.38 from crates.io. The pgwire branch documentation is optional and the feature implementation doesn't require any pgwire code changes.
Remove the public clone_statements() method from PrepareExecuteHook as it was never used. This fixes the clippy private-interfaces warning and ensures CI builds pass with -D warnings.
49a7daa to
eb65d45
Compare
4aab57d to
82f055c
Compare
…for EXECUTE - Add SessionExtensions API to pgwire ClientInfo for typed per-connection state storage (keyed by TypeId, interior mutability via RwLock) - Migrate PrepareExecuteHook from shared global HashMap to per-session storage via SessionExtensions, preventing cross-client statement leaks - Use client.metadata() instead of empty HashMap for EXECUTE result encoding, ensuring session-specific format options are respected
| datafusion = { version = "52" } | ||
| futures = "0.3" | ||
| pgwire = { version = "0.38", default-features = false } | ||
| pgwire = { path = "../pgwire", version = "0.38", default-features = false } |
There was a problem hiding this comment.
I think first release of pgwire needs to be made and then update that here ?
This comes from testing locally.
relevant branch - sunng87/pgwire#409
sequenceDiagram
participant U as User SQL Statement
participant H as PrepareExecuteHook
participant E as SessionExtensions
participant O as Outer Lock (extensions map)
participant P as Arc<PreparedStatements>
participant I as Inner Lock (PreparedStatements)
U->>H: PREPARE / EXECUTE / DEALLOCATE
H->>E: statements()
E->>O: acquire read lock
alt Type entry exists
O-->>E: found
E-->>O: release read lock
else Type entry missing
O-->>E: not found
E-->>O: release read lock
E->>O: acquire write lock
O-->>E: insert/get via or_insert_with
E-->>O: release write lock
end
E-->>H: Arc<PreparedStatements>
alt PREPARE / DEALLOCATE (mutating statements)
H->>I: acquire write lock
I-->>H: insert/remove/update
H-->>I: release write lock
else EXECUTE (reading statement)
H->>I: acquire read lock
I-->>H: lookup by statement name
H-->>I: release read lock
end
Putting this here -> may be this helps to visualise flow easier. It certainly helped me. |
|
Hey @sunng87 , |
|
@TwistingTwists Sorry for delay. I'm still trying to figure out a better way to support this in upstream pgwire API. |
|
Hey @sunng87 , now that upstream PR is merged, do you think this PR is roughly in the right direction? |
|
Hi @TwistingTwists , with latest pgwire, we will be able to use downcast to get portalstore work for simplequeryhandler. So I think we are now able to implement it just like we did with extended query. |
|
Ah. Awesome 😎. Will update this PR with those changes then. :) |
Sister PR: sunng87/pgwire#409
Closes: #292
Prepared Statement Test Results
All tests passed end-to-end with real
psql.PREPARE my_stmt AS SELECT 42PREPAREEXECUTE my_stmt(twice)42both timesPREPARE param_stmt (INT) AS SELECT $1 * 2PREPAREEXECUTE param_stmt(21)42EXECUTE param_stmt(100)200DEALLOCATE my_stmtDEALLOCATEEXECUTE my_stmtafter deallocate"does not exist"DEALLOCATE ALLDEALLOCATEEXECUTE stmt_aafterDEALLOCATE ALL"does not exist"