feat: Add RecordBatchLogReader for bounded log reading#446

Open
charlesdong1991 wants to merge 9 commits into apache:main from charlesdong1991:arrow-batch-reader
Conversation

@charlesdong1991
Contributor

Purpose

Move query_latest_offsets and poll-until-offsets logic from Python binding into Rust core as RecordBatchLogReader.

This enables both Python and C++ bindings to share the same bounded-read implementation.

Linked issue: close #406

Tests

Tests pass locally.

API and Format

Documentation

arrow_schema: SchemaRef,
/// Serializes overlapping `poll` / `poll_batches` across clones sharing this `Arc`.
///
/// TODO: Consider an API that consumes
Contributor Author

@charlesdong1991 Mar 19, 2026

It is cheap to clone this record batch log scanner, but all clones share one Arc, so two overlapping polls are not supported under the current usage model. I added a client-side guard with poll_session so overlapping calls can fail fast.

Not sure what you think. I am happy to create a new issue and do a follow-up on that, or if you prefer, I can have a stricter API in this PR?
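A minimal sketch of the fail-fast idea described above (the names `PollGuard` and `PollSession` are illustrative, not the PR's actual types): an atomic flag shared by all clones is flipped with `compare_exchange`, so a second overlapping poll returns an error instead of interleaving, and an RAII token releases the flag when the session ends.

```rust
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};

/// Illustrative shared state: every scanner clone holds the same Arc.
#[derive(Clone)]
pub struct PollGuard {
    in_flight: Arc<AtomicBool>,
}

impl PollGuard {
    pub fn new() -> Self {
        Self {
            in_flight: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Begin a poll session; fails fast if another poll is already running.
    pub fn begin(&self) -> Result<PollSession, &'static str> {
        self.in_flight
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .map_err(|_| "another poll is already in progress")?;
        Ok(PollSession {
            in_flight: Arc::clone(&self.in_flight),
        })
    }
}

/// RAII token: dropping it ends the session and clears the flag.
pub struct PollSession {
    in_flight: Arc<AtomicBool>,
}

impl Drop for PollSession {
    fn drop(&mut self) {
        self.in_flight.store(false, Ordering::Release);
    }
}
```

The RAII token means the guard is released even if the poll errors or panics mid-way.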

Member

Let's do it properly in this PR. The reader should take ownership of the scanner (move, not clone). That way the compiler prevents concurrent polls - no mutex needed.
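The move-based shape suggested here can be sketched like this (the types are illustrative stand-ins, not the PR's real `LogScanner`/`RecordBatchLogReader`): the reader takes the scanner by value, so the caller's binding is invalidated and the borrow checker statically rules out a second concurrent poller.

```rust
/// Illustrative stand-in for the real scanner type.
pub struct LogScanner {
    records: Vec<i64>,
}

impl LogScanner {
    pub fn new(records: Vec<i64>) -> Self {
        Self { records }
    }

    pub fn poll(&mut self) -> Option<i64> {
        self.records.pop()
    }
}

/// The reader consumes the scanner by value (move, not clone),
/// so no other handle can poll it concurrently.
pub struct RecordBatchLogReader {
    scanner: LogScanner,
}

impl RecordBatchLogReader {
    pub fn new(scanner: LogScanner) -> Self {
        // `scanner` is moved in here; the caller's binding is now invalid.
        Self { scanner }
    }

    pub fn poll(&mut self) -> Option<i64> {
        self.scanner.poll()
    }
}

// let scanner = LogScanner::new(vec![1, 2]);
// let mut reader = RecordBatchLogReader::new(scanner);
// scanner.poll(); // compile error: value moved into the reader
```

No mutex or runtime check is needed; misuse simply does not compile.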

Contributor Author

done

Member

@fresh-borzoni May 11, 2026

I don't think we solved this: new_shared_handle surfaces a similar pattern for the bindings again.

Scenario: if a user calls scanner.subscribe(new_bucket) while the reader is iterating, filter_batches silently drops new_bucket's batches because it's not in stopping_offsets.

Contributor Author

You are totally right! 👍

With another look, this new_shared_handle will reopen the hole. I added a guard in LogScannerInner and made all subscribe/unsubscribe* methods check it.

WDYT of this approach? I think it's a lightweight runtime safety check for the binding layer.
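The runtime guard could look roughly like this (illustrative types, not the real LogScannerInner): while a bounded read is active, subscription changes fail fast instead of silently producing batches that filter_batches would drop.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

/// Illustrative inner scanner state.
pub struct ScannerInner {
    bounded_read_active: AtomicBool,
    subscribed: Mutex<Vec<i32>>,
}

impl ScannerInner {
    pub fn new() -> Self {
        Self {
            bounded_read_active: AtomicBool::new(false),
            subscribed: Mutex::new(Vec::new()),
        }
    }

    /// Set by the bounded reader when it starts/finishes iterating.
    pub fn set_bounded_read(&self, active: bool) {
        self.bounded_read_active.store(active, Ordering::Release);
    }

    /// Subscription changes fail fast while a bounded read is running,
    /// so a new bucket can never bypass stopping_offsets unnoticed.
    pub fn subscribe(&self, bucket: i32) -> Result<(), &'static str> {
        if self.bounded_read_active.load(Ordering::Acquire) {
            return Err("cannot change subscriptions during a bounded read");
        }
        self.subscribed.lock().unwrap().push(bucket);
        Ok(())
    }
}
```

The same check would apply to the unsubscribe* methods.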

Member

@fresh-borzoni left a comment

@charlesdong1991 Ty for the PR. Left comments, PTAL

Comment thread crates/fluss/src/client/table/reader.rs Outdated
Comment thread bindings/python/src/table.rs Outdated
/// The projected row type to use for record-based scanning
projected_row_type: fcore::metadata::RowType,
/// Cache for partition_id -> partition_name mapping (avoids repeated list_partition_infos calls)
partition_name_cache: std::sync::RwLock<Option<HashMap<i64, String>>>,
Member

Why have we removed this?

Contributor Author

Since it had no remaining caller after the offset/poll loop moved to the Rust core. WDYT?


Comment thread bindings/python/src/lib.rs Outdated
Comment thread bindings/python/src/table.rs Outdated
Comment thread crates/fluss/src/client/table/reader.rs
Comment thread crates/fluss/src/client/table/reader.rs Outdated

@charlesdong1991
Contributor Author

Hi @fresh-borzoni, sorry for the late response, and thanks for the reviews. As I have been travelling without my laptop, I will come back to this in 2 weeks.
In the meantime, I will convert this to draft to avoid confusion. 🙏

@charlesdong1991 marked this pull request as draft March 30, 2026 18:10
@charlesdong1991 marked this pull request as ready for review April 18, 2026 15:17
@charlesdong1991
Contributor Author

Thanks for your reviews, did some refactoring, PTAL @fresh-borzoni 🙏

@fresh-borzoni
Member

fresh-borzoni commented Apr 19, 2026

@charlesdong1991 Ty for the PR. I looked briefly and it looks good now, but let's wait until we decide whether we want to move to a fully async API for Python polls. If that's the case, let's merge that first, rebase/resolve conflicts here, and I'll review one more time.

WDYT?

@charlesdong1991
Contributor Author

let's wait until we decide if we want to move to fully async api for python polls, and then if it's the case - merge it first

oh, that's good to hear, let me take a look too to get some understanding!

@fresh-borzoni
Member

Ty @charlesdong1991 for rebasing, I'll take a look today or tomorrow to unblock you on this.

Member

@fresh-borzoni left a comment

@charlesdong1991 Ty for the changes, left some comments, PTAL

///
/// Returns:
/// ``pyarrow.RecordBatchReader`` yielding ``RecordBatch`` objects
fn to_arrow_batch_reader(&self, py: Python) -> PyResult<Py<PyAny>> {
Member

to_arrow_batch_reader() is a sync Python method but does RPC work via block_on, and iteration also blocks via RecordBatchReader.__next__.

I think this is acceptable only if documented as a blocking/sync Arrow interop API, not an asyncio-native streaming API.

We may wish to provide a proper asyncio-native streaming API as well, as a follow-up. Do you mind filing it?

Contributor Author

Good one, I created #545 as a follow-up, and I can work on that.

.collect();

let table_id = scanner.table_id();
Ok(offsets
Member

nit: should we defensively intersect the returned offsets with the subscribed buckets? Otherwise an unexpected bucket from list_offsets() can enter stopping_offsets and make the reader wait forever.

Just a cheap defensive check; the server doesn't return funny things now, but it's a bit brittle.

Contributor Author

Good catch. Changed both query_latest_offsets and query_partitioned_offsets to skip bucket ids that are not present.
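The defensive intersection amounts to a small filter (function name and types here are illustrative, not the PR's actual code): any bucket id reported by list_offsets() that we never subscribed to is dropped before it can enter stopping_offsets.

```rust
use std::collections::{HashMap, HashSet};

/// Keep only offsets for buckets we actually subscribed to. An unexpected
/// bucket from list_offsets() would otherwise sit in stopping_offsets and
/// make the bounded reader wait forever for data that never arrives.
pub fn intersect_offsets(
    offsets: HashMap<i32, i64>,
    subscribed: &HashSet<i32>,
) -> HashMap<i32, i64> {
    offsets
        .into_iter()
        .filter(|(bucket, _)| subscribed.contains(bucket))
        .collect()
}
```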

await admin.drop_table(table_path, ignore_if_not_exists=False)


async def test_to_arrow_batch_reader(connection, admin):
Member

Shall we add a test for the Drop behaviour, as it is rather sophisticated tbh?

I'm thinking about one integration test that subscribes, starts a reader, drops it mid-iteration, then asserts the original scanner sees no leftover subscriptions for buckets the reader hadn't completed.
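The Drop behaviour under test could look roughly like this (illustrative types; a shared set stands in for the scanner's subscription table): on drop, the reader unsubscribes every bucket it hadn't finished, so no leftover subscriptions survive a mid-iteration abort.

```rust
use std::cell::RefCell;
use std::collections::HashSet;
use std::rc::Rc;

/// Stand-in for the scanner's subscription table.
type Subscriptions = Rc<RefCell<HashSet<i32>>>;

/// A bounded reader that cleans up its incomplete buckets when dropped.
pub struct BoundedReader {
    subscriptions: Subscriptions,
    remaining: HashSet<i32>,
}

impl BoundedReader {
    pub fn new(subscriptions: Subscriptions, buckets: HashSet<i32>) -> Self {
        Self {
            subscriptions,
            remaining: buckets,
        }
    }

    /// A bucket that reached its stopping offset is unsubscribed normally.
    pub fn complete(&mut self, bucket: i32) {
        self.remaining.remove(&bucket);
        self.subscriptions.borrow_mut().remove(&bucket);
    }
}

impl Drop for BoundedReader {
    fn drop(&mut self) {
        // Unsubscribe everything the reader hadn't completed yet.
        let mut subs = self.subscriptions.borrow_mut();
        for bucket in self.remaining.drain() {
            subs.remove(&bucket);
        }
    }
}
```

The integration test would then assert the subscription table is empty (or back to its pre-reader state) after the drop.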


@charlesdong1991
Contributor Author

Thanks a lot for the reviews @fresh-borzoni, I made some changes, let me know what you think 🙏

Member

@fresh-borzoni left a comment

@charlesdong1991 Ty for the changes, LGTM overall, only minor comments

admin: &FlussAdmin,
) -> Result<Self> {
let subscribed = scanner.get_subscribed_buckets();
if subscribed.is_empty() {
Member

What if unsubscribe is called between get_subscribed_buckets and guard acquisition?
I think we would pass through stale subscription state, try to read something, and hang, so it's a possible race.

Contributor Author

Nice catch, that could indeed cause a race condition 👍
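One way to close this kind of check-then-act race (illustrative sketch, not the PR's actual fix): take the lock first and snapshot the subscriptions while holding it, so no unsubscribe can slip in between the check and the guard acquisition.

```rust
use std::sync::Mutex;

/// Illustrative scanner: the subscription set lives behind one lock.
pub struct Scanner {
    subscribed: Mutex<Vec<i32>>,
}

impl Scanner {
    pub fn new(buckets: Vec<i32>) -> Self {
        Self {
            subscribed: Mutex::new(buckets),
        }
    }
}

/// Race-free construction: acquire the guard FIRST, then read the
/// subscription state from inside it. Checking emptiness before locking
/// would allow an unsubscribe to invalidate the snapshot in between.
pub fn snapshot_under_guard(scanner: &Scanner) -> Result<Vec<i32>, &'static str> {
    let guard = scanner.subscribed.lock().unwrap();
    if guard.is_empty() {
        return Err("no subscribed buckets");
    }
    Ok((*guard).clone())
}
```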

///
/// **Not intended for general use** — prefer the async [`unsubscribe`].
#[doc(hidden)]
pub fn unsubscribe_sync(&self, bucket: i32) {
Member

Shall we use pub(crate) visibility?

/// [`unsubscribe_partition`](Self::unsubscribe_partition). See
/// [`unsubscribe_sync`](Self::unsubscribe_sync) for rationale.
#[doc(hidden)]
pub fn unsubscribe_partition_sync(&self, partition_id: PartitionId, bucket: i32) {
Member

ditto

@charlesdong1991
Contributor Author

Thank you for the nice catch @fresh-borzoni, addressed!

@fresh-borzoni
Member

@charlesdong1991 Ty, can you rebase, pls?
Also, do you mind filing an issue to add the same logic for C++?



Development

Successfully merging this pull request may close these issues.

NO to_arrow_batch_reader support in python binding
