feat: Add RecordBatchLogReader for bounded log reading #446
charlesdong1991 wants to merge 9 commits
Conversation
| arrow_schema: SchemaRef,
| /// Serializes overlapping `poll` / `poll_batches` across clones sharing this `Arc`.
| ///
| /// TODO: Consider an API that consumes
It is cheap to clone this record batch log scanner, but all clones share one Arc, so two overlapping polls are not supported under the current usage model. I added a client-side guard with poll_session so overlapping calls fail fast.
Not sure what you think; I am happy to create a new issue and do a follow-up on that, or if you prefer, I can have a stricter API in this PR?
Let's do it properly in this PR. The reader should take ownership of the scanner (move, not clone). That way the compiler prevents concurrent polls - no mutex needed.
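The ownership-based design suggested here can be sketched as follows. This is a minimal illustration with hypothetical simplified types, not the PR's actual API: the reader takes the scanner by value, so after construction no other handle exists that could poll concurrently, and the borrow checker enforces this at compile time with no mutex.

```rust
// Hypothetical simplified types; names are illustrative, not the PR's real code.
struct LogScanner {
    records: Vec<i32>,
}

impl LogScanner {
    fn poll(&mut self) -> Option<i32> {
        self.records.pop()
    }
}

/// Takes ownership of the scanner (move, not clone): once the reader is
/// constructed, `scanner` cannot be used elsewhere, so overlapping polls
/// are a compile error rather than a runtime hazard.
struct RecordBatchLogReader {
    scanner: LogScanner,
}

impl RecordBatchLogReader {
    fn new(scanner: LogScanner) -> Self {
        Self { scanner }
    }

    fn next_batch(&mut self) -> Option<i32> {
        self.scanner.poll()
    }
}

fn main() {
    let scanner = LogScanner { records: vec![1, 2, 3] };
    let mut reader = RecordBatchLogReader::new(scanner);
    // `scanner` has been moved; any further use of it would not compile.
    assert_eq!(reader.next_batch(), Some(3));
    assert_eq!(reader.next_batch(), Some(2));
}
```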
I don't think we solved this: new_shared_handle surfaces a similar pattern for bindings again.
Scenario: new_shared_handle re-enables the original shared-state problem for bindings. Concretely: if a user calls scanner.subscribe(new_bucket) while the reader is iterating, filter_batches silently drops new_bucket's batches because it's not in stopping_offsets.
You are totally right! 👍
On another look, this new_shared_handle would reopen the hole. I added a guard to LogScannerInner and made all subscribe/unsubscribe* methods check it.
WDYT of this approach? I think it's a lightweight runtime safety check for the binding layer.
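The runtime-guard idea described here could look roughly like the sketch below. All names are hypothetical (LogScannerInner's real fields are not shown in this thread): while a bounded reader session is active, subscription changes fail fast instead of silently mutating state underneath the iterating reader.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Illustrative sketch of the guard; field and method names are assumptions.
struct LogScannerInner {
    reader_active: AtomicBool,
}

impl LogScannerInner {
    /// Claims the reader session; errors if one is already active.
    fn begin_reader_session(&self) -> Result<(), String> {
        if self.reader_active.swap(true, Ordering::AcqRel) {
            return Err("a bounded reader session is already active".into());
        }
        Ok(())
    }

    fn end_reader_session(&self) {
        self.reader_active.store(false, Ordering::Release);
    }

    /// All subscribe/unsubscribe* methods would perform this check first.
    fn subscribe(&self, bucket: i32) -> Result<(), String> {
        if self.reader_active.load(Ordering::Acquire) {
            return Err(format!(
                "cannot subscribe bucket {bucket}: a reader session is active"
            ));
        }
        Ok(())
    }
}

fn main() {
    let inner = Arc::new(LogScannerInner { reader_active: AtomicBool::new(false) });
    assert!(inner.subscribe(0).is_ok());
    inner.begin_reader_session().unwrap();
    assert!(inner.subscribe(1).is_err()); // fails fast while the reader iterates
    inner.end_reader_session();
    assert!(inner.subscribe(1).is_ok());
}
```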
fresh-borzoni left a comment
@charlesdong1991 Ty for the PR. Left comments, PTAL
| /// The projected row type to use for record-based scanning
| projected_row_type: fcore::metadata::RowType,
| /// Cache for partition_id -> partition_name mapping (avoids repeated list_partition_infos calls)
| partition_name_cache: std::sync::RwLock<Option<HashMap<i64, String>>>,
Why have we removed this?
Since it had no remaining caller after the offset/poll loop moved to the Rust core. WDYT?
Hi @fresh-borzoni, sorry for the late response, and thanks for the reviews. As I have been travelling without my laptop, I will come back to this in 2 weeks.
Force-pushed 1502702 to 23f666d
Thanks for your reviews, did some refactoring, PTAL @fresh-borzoni 🙏
@charlesdong1991 Ty for the PR. I looked briefly, looks good now, but let's wait until we decide if we want to move to a fully async API for Python polls; if that's the case, merge it first, rebase/resolve conflicts here, and I'll review one more time. WDYT?
oh, that's good to hear, let me take a look too to get some understanding! |
Ty @charlesdong1991 for rebasing, I'll take a look today-tomorrow to unblock you with this
fresh-borzoni left a comment
@charlesdong1991 Ty for the changes, left some comments, PTAL
| ///
| /// Returns:
| /// ``pyarrow.RecordBatchReader`` yielding ``RecordBatch`` objects
| fn to_arrow_batch_reader(&self, py: Python) -> PyResult<Py<PyAny>> {
to_arrow_batch_reader() is a sync Python method but does RPC work via block_on, and iteration also blocks via RecordBatchReader.__next__.
I think this is acceptable only if documented as a blocking/sync Arrow interop API, not an asyncio-native streaming API.
We may wish to provide a proper asyncio-native streaming API as a follow-up issue. Do you mind filing it?
Good one, I created #545 as a follow-up, and I can work on that.
| .collect();
|
| let table_id = scanner.table_id();
| Ok(offsets
nit: should we defensively intersect the returned offsets with the subscribed buckets?
Otherwise an unexpected bucket from list_offsets() can enter stopping_offsets and make the reader wait forever.
Just a cheap defensive check; the server doesn't return funny things now, but it's a bit brittle.
Good catch. Changed both query_latest_offsets and query_partitioned_offsets to skip bucket ids that are not present.
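The defensive intersection discussed here amounts to a small filter. A minimal sketch, with a hypothetical signature (the PR's actual offset types are not shown in this thread): any bucket id returned by list_offsets() that was never subscribed is dropped, so it cannot enter stopping_offsets and stall the reader.

```rust
use std::collections::{HashMap, HashSet};

// Hypothetical helper; bucket ids are i32 and offsets i64 for illustration.
fn intersect_with_subscribed(
    offsets: HashMap<i32, i64>,
    subscribed: &HashSet<i32>,
) -> HashMap<i32, i64> {
    offsets
        .into_iter()
        .filter(|(bucket, _)| subscribed.contains(bucket))
        .collect()
}

fn main() {
    // Bucket 9 is an unexpected entry from the server.
    let offsets = HashMap::from([(0, 100), (1, 200), (9, 42)]);
    let subscribed = HashSet::from([0, 1]);
    let stopping = intersect_with_subscribed(offsets, &subscribed);
    assert_eq!(stopping.len(), 2);
    assert!(!stopping.contains_key(&9)); // the stray bucket never enters stopping_offsets
}
```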
| await admin.drop_table(table_path, ignore_if_not_exists=False)
|
|
| async def test_to_arrow_batch_reader(connection, admin):
Shall we add a test for the Drop behaviour, as it is rather sophisticated tbh?
I'm thinking about one integration test that subscribes, starts a reader, drops it mid-iteration, then asserts the original scanner sees no leftover subscriptions for buckets the reader hadn't completed.
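The Drop behaviour under discussion can be illustrated in miniature. This is a hedged sketch with invented types (ScannerState, BoundedReader are not the PR's names): on drop, the reader removes its still-pending buckets from the shared subscription set, which is exactly what the proposed test would assert.

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

// Hypothetical shared state standing in for the scanner's subscription set.
#[derive(Default)]
struct ScannerState {
    subscribed: Mutex<HashSet<i32>>,
}

struct BoundedReader {
    state: Arc<ScannerState>,
    pending: HashSet<i32>, // buckets not yet read to their stopping offset
}

impl Drop for BoundedReader {
    fn drop(&mut self) {
        // On early drop, unsubscribe every bucket the reader hadn't completed.
        let mut subs = self.state.subscribed.lock().unwrap();
        for bucket in &self.pending {
            subs.remove(bucket);
        }
    }
}

fn main() {
    let state = Arc::new(ScannerState::default());
    state.subscribed.lock().unwrap().extend([0, 1, 2]);

    let reader = BoundedReader {
        state: Arc::clone(&state),
        pending: HashSet::from([1, 2]),
    };
    drop(reader); // simulate dropping the reader mid-iteration

    // Only the bucket the reader had completed (0) remains subscribed.
    assert_eq!(*state.subscribed.lock().unwrap(), HashSet::from([0]));
}
```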
Thanks a lot for the reviews @fresh-borzoni, I made some changes, let me know what you think 🙏
fresh-borzoni left a comment
@charlesdong1991 Ty for the changes, LGTM overall, only minor comments
| admin: &FlussAdmin,
| ) -> Result<Self> {
| let subscribed = scanner.get_subscribed_buckets();
| if subscribed.is_empty() {
What if unsubscribe is called between get_subscribed_buckets and guard acquisition?
I think we would pass through stale subscription state, try to read something, and hang, so it's a possible race.
Nice catch, this could cause a race condition indeed 👍
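The standard fix for this kind of race is to reverse the order: take the guard first, then snapshot subscriptions. A minimal sketch with hypothetical types (not the PR's actual Scanner), assuming subscribe/unsubscribe are rejected while a session is active:

```rust
use std::collections::HashSet;
use std::sync::Mutex;

// Illustrative scanner; field names are assumptions for this sketch.
struct Scanner {
    subscribed: Mutex<HashSet<i32>>,
    session: Mutex<bool>, // true while a bounded-read session holds the guard
}

impl Scanner {
    fn start_bounded_read(&self) -> Result<HashSet<i32>, String> {
        // Acquire the guard *before* reading subscriptions.
        let mut active = self.session.lock().unwrap();
        if *active {
            return Err("a bounded-read session is already active".into());
        }
        *active = true;
        // Snapshot only after the guard is held: since subscribe/unsubscribe
        // check the guard, no unsubscribe can slip in and make this stale.
        let snapshot = self.subscribed.lock().unwrap().clone();
        if snapshot.is_empty() {
            *active = false;
            return Err("no subscribed buckets".into());
        }
        Ok(snapshot)
    }
}

fn main() {
    let scanner = Scanner {
        subscribed: Mutex::new(HashSet::from([0, 1])),
        session: Mutex::new(false),
    };
    let snapshot = scanner.start_bounded_read().unwrap();
    assert_eq!(snapshot, HashSet::from([0, 1]));
    // A second session while the first is active fails fast.
    assert!(scanner.start_bounded_read().is_err());
}
```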
| ///
| /// **Not intended for general use** — prefer the async [`unsubscribe`].
| #[doc(hidden)]
| pub fn unsubscribe_sync(&self, bucket: i32) {
Shall we use pub(crate) visibility?
| /// [`unsubscribe_partition`](Self::unsubscribe_partition). See
| /// [`unsubscribe_sync`](Self::unsubscribe_sync) for rationale.
| #[doc(hidden)]
| pub fn unsubscribe_partition_sync(&self, partition_id: PartitionId, bucket: i32) {
Thank you for the nice catch, @fresh-borzoni! Addressed.
@charlesdong1991 Ty, can you rebase, pls?
Purpose
Move query_latest_offsets and the poll-until-offsets logic from the Python binding into the Rust core as RecordBatchLogReader.
This lets both the Python and C++ bindings share the same bounded-read implementation.
Linked issue: close #406
Tests
Tests pass locally
API and Format
Documentation