Skip to content
10 changes: 8 additions & 2 deletions bindings/python/example/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,8 +294,14 @@ async def main():
except Exception as e:
print(f"Could not convert to Pandas: {e}")

# TODO: support to_arrow_batch_reader()
# which is reserved for streaming use cases
# to_arrow_batch_reader() — returns a lazy PyArrow RecordBatchReader
batch_scanner_reader = await table.new_scan().create_record_batch_log_scanner()
batch_scanner_reader.subscribe_buckets(
{i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}
)
arrow_reader = batch_scanner_reader.to_arrow_batch_reader()
reader_table = pa.Table.from_batches(list(arrow_reader), schema=arrow_reader.schema)
print(f"\nVia to_arrow_batch_reader(): {reader_table.num_rows} rows")

# TODO: support to_duckdb()

Expand Down
24 changes: 24 additions & 0 deletions bindings/python/fluss/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -898,6 +898,26 @@ class LogScanner:
or timeout expires.
"""
...
def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
    """Return a streaming ``pyarrow.RecordBatchReader`` over this scanner.

    The reader polls batches lazily, one at a time, and stops once the
    latest offsets of the subscribed buckets have been reached. Prefer it
    when results should be consumed incrementally rather than
    materialized in memory all at once.

    The reader shares internal state with this scanner: while the reader
    is being consumed, ``poll_arrow`` / ``poll_record_batch`` must not be
    called on the same scanner. Only one active polling/consumption path
    is supported at a time; overlapping use is not supported.

    Requires a batch-based scanner created via
    ``new_scan().create_record_batch_log_scanner()``, and one of
    ``subscribe()``, ``subscribe_buckets()``, ``subscribe_partition()``,
    or ``subscribe_partition_buckets()`` must have been called first.

    Returns:
        ``pyarrow.RecordBatchReader`` yielding ``RecordBatch`` objects.
    """
    ...
async def to_pandas(self) -> pd.DataFrame:
"""Convert all data to Pandas DataFrame.

Expand All @@ -910,6 +930,10 @@ class LogScanner:
async def to_arrow(self) -> pa.Table:
"""Convert all data to Arrow Table.

Batches are collected in Rust then combined into one table (no per-batch
Python iteration). Do not interleave with ``poll_arrow`` / ``poll_record_batch``
for the same subscription session; overlapping use is not supported.

Requires a batch-based scanner (created with new_scan().create_record_batch_log_scanner()).
Reads from currently subscribed buckets until reaching their latest offsets.

Expand Down
Loading
Loading