feat: replace custom shuffle block format with Arrow IPC streams #3911

Draft — andygrove wants to merge 19 commits into apache:main

Conversation
Addresses apache#3882 — shuffle format overhead with default batch size.
Replace the custom shuffle block format (per-batch IPC streams with custom length-prefix headers and external compression wrappers) with standard Arrow IPC streams using built-in body compression. Key changes:

- Replace ShuffleBlockWriter with CompressionCodec::ipc_write_options(), which creates IpcWriteOptions with LZ4_FRAME or ZSTD body compression
- Rewrite BufBatchWriter to use a persistent StreamWriter that writes the schema once, then appends N record batch messages
- Rewrite PartitionWriter (spill) to create a StreamWriter over spill files
- Rewrite PartitionOutputStream (immediate mode) to use a persistent StreamWriter<Vec<u8>> with lazy creation and a drain/finish lifecycle
- Simplify SinglePartitionShufflePartitioner by removing manual batch coalescing (handled by BufBatchWriter's BatchCoalescer)
- Update the sort-based shuffle in spark_unsafe/row.rs to use StreamWriter
- Remove snappy from the shuffle compression options (keep the Snappy variant in the CompressionCodec enum for Parquet writer compatibility)
- Update all tests to use Arrow's StreamReader for roundtrip verification
- Update the shuffle_bench binary and criterion benchmarks

The old ipc.rs read path is preserved for Task 6. The core crate will have expected compile errors in shuffle_scan.rs tests and jni_api.rs due to the removed ShuffleBlockWriter export.
Add JniInputStream (implements std::io::Read by pulling bytes from a JVM InputStream via JNI with 64KB read-ahead buffer) and ShuffleStreamReader (wraps Arrow StreamReader<JniInputStream> for lifecycle management). Replace decodeShuffleBlock JNI function with four new streaming functions: openShuffleStream, nextShuffleStreamBatch, shuffleStreamNumFields, and closeShuffleStream. The old read_ipc_compressed is retained for the legacy ShuffleScanExec code path.
Replace decodeShuffleBlock JNI declaration with four new streaming methods: openShuffleStream, nextShuffleStreamBatch, shuffleStreamNumFields, and closeShuffleStream. Rewrite NativeBatchDecoderIterator to use a native handle pattern instead of manual header parsing and ByteBuffer management.
… streams

Replace the old CometShuffleBlockIterator-based read path in ShuffleScanExec with ShuffleStreamReader, which reads standard Arrow IPC streams directly from JVM InputStreams via JniInputStream. This eliminates the custom per-batch block format (8-byte length + 8-byte field count + 4-byte codec + compressed IPC) and the per-batch JNI calls (hasNext/getBuffer) in favor of streaming reads.

Changes:

- CometShuffledRowRDD: return a raw InputStream instead of CometShuffleBlockIterator
- CometExecIterator: accept Map[Int, InputStream] instead of Map[Int, CometShuffleBlockIterator]
- ShuffleScanExec (Rust): lazily create a ShuffleStreamReader from the InputStream GlobalRef and read batches via reader.next_batch() instead of the JNI block-by-block dance
- Add Send + Sync impls for SharedJniStream/StreamReadAdapter to satisfy ExecutionPlan bounds
…ings

- Hold a single StreamWriter across all batches in process_sorted_row_partition instead of creating a fresh writer per batch
- Remove read_ipc_compressed and the snap/lz4_flex/zstd dependencies from the shuffle crate
- Remove the dead CometShuffleBlockIterator.java and its JNI bridge
- Rename shuffle_block_writer.rs to codec.rs to reflect its contents
- Remove the unused _write_time parameter from BufBatchWriter write/flush
- Make CompressionCodec::Snappy return an error in ipc_write_options
- Remove Snappy from the shuffle writer codec mappings in the planner and JNI
Update shuffle IPC code to work with jni 0.22 API changes:

- GlobalRef → Global<JObject<'static>> / Global<JPrimitiveArray<'static, i8>>
- JNIEnv → Env, EnvUnowned
- JavaVM::attach_current_thread now takes a closure
- JVMClasses::get_env() → JVMClasses::with_env()

Also update EmptySchemaShufflePartitioner to use the Arrow IPC StreamWriter instead of the removed ShuffleBlockWriter.
Empty shuffle partitions must have zero bytes in the data file so that Spark's MapOutputTracker reports zero-size blocks. Writing schema + EOS for empty partitions changed the block sizes, which affected coalesce partition grouping in DefaultPartitionCoalescer.

Also add a miri ignore attribute to the shuffle_partitioner_memory test, since the spill path now uses the IPC StreamWriter, which calls into zstd FFI.
andygrove (author): I ran TPC-H @ 1TB and did not see any significant change in performance.
andygrove (author): @Kontinuation fyi
Contributor: What about the amount of shuffle data written?
…r to 8KB

Arrow's StreamWriter issues multiple small writes per batch (continuation marker, flatbuf metadata, padding) before the body data. Wrapping the output File in a BufWriter coalesces these small writes. Since the body is flushed after each batch, the buffer only needs to hold metadata, so 1MB was wasteful; 8KB is sufficient.
andygrove (author): I am seeing a performance regression with the standalone benchmark binary, so I am moving this to draft until I understand why.
Member: Arrow's IPC StreamWriter also seems to compress data on a per-RecordBatch basis. If we want to improve the compression ratio, we may need to perform compression after the StreamWriter finishes.
Member: We can also wrap the output writer in a compressing stream writer.
Which issue does this PR close?
Partial fix for #3882
Rationale for this change
The native columnar shuffle currently uses a custom per-block format (length-prefixed compressed IPC messages) that requires manual framing, per-block schema headers, and a custom Java reader (CometShuffleBlockIterator). Replacing this with standard Arrow IPC streams eliminates custom serialization code, enables built-in IPC body compression (zstd/lz4), and allows the shuffle reader to use Arrow's StreamReader directly.

What changes are included in this PR?
Write path:
- Replace ShuffleBlockWriter with Arrow IPC StreamWriter in all partitioners (single, multi, empty-schema)
- Coalesce batches in BufBatchWriter before serialization
- Enable the ipc_compression feature in the Arrow dependency for built-in zstd/lz4 body compression

Read path (native):
- Add JniInputStream: a Rust Read impl that pulls bytes from a JVM InputStream via JNI with 64KB read-ahead buffering
- Add ShuffleStreamReader: manages reading potentially concatenated IPC streams (from spills) using Arrow's StreamReader
- Rewrite ShuffleScanExec to lazily create a ShuffleStreamReader instead of calling per-block decode methods

Read path (JVM):
- Replace CometShuffleBlockIterator (custom Java reader) with a simple InputStream passed to native via JNI
- Rewrite NativeBatchDecoderIterator to open a stream, read batches, and close, with no more per-block ByteBuffer management
- Add openShuffleStream, nextShuffleStreamBatch, shuffleStreamNumFields, and closeShuffleStream JNI methods

Cleanup:

- Remove ShuffleBlockWriter, CometShuffleBlockIterator, and the shuffle block iterator JNI bridge code
- Remove related shuffle configs (COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED, COMET_SHUFFLE_PREFER_DICTIONARY_RATIO)

How are these changes tested?
- Existing tests, updated to use Arrow's StreamReader for roundtrip verification
- Builds with -D warnings