
feat(write): add write pipeline with DataFusion INSERT INTO/OVERWRITE support #234

Open
JingsongLi wants to merge 1 commit into apache:main from JingsongLi:writer

Conversation

@JingsongLi
Contributor

Purpose

Subtask of #232

Add TableWrite for writing Arrow RecordBatches to Paimon append-only tables. Each (partition, bucket) pair gets its own DataFileWriter that writes directly (matching the delta-rs DeltaWriter pattern). File rolling uses tokio::spawn to close sealed files in the background, and prepare_commit uses try_join_all to finalize all partition writers in parallel.
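The per-(partition, bucket) routing described above can be sketched as a map from that pair to a dedicated writer, created lazily on first use. This is a minimal illustration, not the real TableWrite/DataFileWriter API; `PartitionWriter` and the method names here are stand-ins.

```rust
use std::collections::HashMap;

// Stand-in for DataFileWriter: only counts rows for illustration.
#[derive(Default)]
struct PartitionWriter {
    rows_written: usize,
}

// Sketch of TableWrite's routing: one writer per (partition, bucket).
#[derive(Default)]
struct TableWriteSketch {
    writers: HashMap<(String, u32), PartitionWriter>,
}

impl TableWriteSketch {
    // Route rows to the writer for this (partition, bucket),
    // creating it on first sight.
    fn write(&mut self, partition: &str, bucket: u32, rows: usize) {
        let w = self
            .writers
            .entry((partition.to_string(), bucket))
            .or_default();
        w.rows_written += rows;
    }

    fn writer_count(&self) -> usize {
        self.writers.len()
    }
}

fn main() {
    let mut tw = TableWriteSketch::default();
    tw.write("dt=2026-04-11", 0, 100);
    tw.write("dt=2026-04-11", 1, 50);
    tw.write("dt=2026-04-12", 0, 25);
    tw.write("dt=2026-04-11", 0, 10); // reuses the existing writer
    assert_eq!(tw.writer_count(), 3);
}
```

Holding one writer per pair keeps each output file confined to a single partition/bucket, which is what makes the later per-writer finalization in prepare_commit independent and parallelizable.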

Key components:

  • TableWrite: routes batches by partition/bucket, holds DataFileWriters
  • DataFileWriter: manages parquet file lifecycle with rolling support
  • WriteBuilder: creates TableWrite and TableCommit instances
  • PaimonDataSink: DataFusion DataSink integration for INSERT/OVERWRITE
  • FormatFileWriter: extended with flush() and in_progress_size()

Configurable options via CoreOptions:

  • file.compression (default: zstd)
  • target-file-size (default: 256MB)
  • write.parquet-buffer-size (default: 256MB)
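The defaults above might be resolved roughly as follows. The option keys come from the PR description; the accessor functions are illustrative, not the actual CoreOptions API.

```rust
use std::collections::HashMap;

// Hypothetical option lookup with the documented defaults.
fn file_compression(opts: &HashMap<String, String>) -> String {
    opts.get("file.compression")
        .cloned()
        .unwrap_or_else(|| "zstd".to_string())
}

fn target_file_size(opts: &HashMap<String, String>) -> u64 {
    opts.get("target-file-size")
        .and_then(|v| v.parse().ok())
        .unwrap_or(256 * 1024 * 1024) // 256MB
}

fn main() {
    let mut opts: HashMap<String, String> = HashMap::new();
    // Unset options fall back to the defaults from the PR.
    assert_eq!(file_compression(&opts), "zstd");
    assert_eq!(target_file_size(&opts), 256 * 1024 * 1024);
    // Explicit settings win.
    opts.insert("file.compression".into(), "snappy".into());
    assert_eq!(file_compression(&opts), "snappy");
}
```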

Includes E2E integration tests for unpartitioned, partitioned, fixed-bucket, multi-commit, column projection, and bucket filtering.


```rust
let row = BinaryRow::from_serialized_bytes(&msg.partition)?;
let mut spec = HashMap::new();
for (i, key) in partition_keys.iter().enumerate() {
    if let Some(datum) = extract_datum(&row, i, &data_types[i])? {
```
@littlecoder04 (Contributor) Apr 11, 2026


This will drop NULL partition keys from the overwrite predicate. I reproduced a case where overwriting the NULL partition also deletes other partitions.
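The failure mode the reviewer describes can be shown in isolation: if the spec-building loop skips NULL partition values (the `if let Some(..)` in the excerpt), the resulting overwrite predicate loses that column entirely. This is a self-contained sketch of the bug shape, not the PR's actual code; `build_spec_dropping_nulls` is a hypothetical name.

```rust
use std::collections::HashMap;

// Builds a partition spec the way the excerpt does: NULL values
// are silently skipped instead of being recorded as IS NULL.
fn build_spec_dropping_nulls(
    keys: &[&str],
    values: &[Option<&str>],
) -> HashMap<String, String> {
    let mut spec = HashMap::new();
    for (key, value) in keys.iter().zip(values) {
        if let Some(v) = value {
            spec.insert(key.to_string(), v.to_string());
        }
    }
    spec
}

fn main() {
    // Overwriting the (dt = NULL) partition yields an EMPTY spec,
    // i.e. an unfiltered overwrite that matches every partition.
    let spec = build_spec_dropping_nulls(&["dt"], &[None]);
    assert!(spec.is_empty());

    // A non-NULL value is kept, so the predicate stays scoped.
    let spec = build_spec_dropping_nulls(&["dt"], &[Some("2026-04-11")]);
    assert_eq!(spec.len(), 1);
}
```

An empty spec is indistinguishable from "overwrite the whole table", which matches the reproduced behavior of the NULL-partition overwrite deleting other partitions.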

