From 7bc3ba2a87f32ee786aa4236db3bf9207447f9bb Mon Sep 17 00:00:00 2001 From: Davide Angelocola Date: Sat, 27 Jun 2026 17:12:18 +0200 Subject: [PATCH 1/2] feat(reader): extract per-zone SUM fold into reader.compute.ZoneReducer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Move the zone-map SUM fold out of the Calcite adapter into a reusable reader-side primitive (ADR 0013 §6, whole-zone tier). ZoneReducer.sum folds the per-zone SUM rows from ScanIterator.columnZoneStats with no data segment decoded, returning null when no zone carries a usable sum so the caller streams instead — identical semantics to the inlined fold it replaces. Lives in reader.compute as the seam a future vortex-compute module extracts: it depends only on the public reader scan API, so the move is mechanical once a second consumer appears. The predicate/residual tier waits on its consumer (Calcite aggregate + WHERE push-down). VortexAggregates now delegates SUM to ZoneReducer; behaviour unchanged. Co-Authored-By: Claude Opus 4.8 --- TODO.md | 6 +- .../dfa1/vortex/calcite/VortexAggregates.java | 46 ++------- .../vortex/reader/compute/ZoneReducer.java | 76 ++++++++++++++ .../dfa1/vortex/writer/ZoneReducerTest.java | 98 +++++++++++++++++++ 4 files changed, 185 insertions(+), 41 deletions(-) create mode 100644 reader/src/main/java/io/github/dfa1/vortex/reader/compute/ZoneReducer.java create mode 100644 writer/src/test/java/io/github/dfa1/vortex/writer/ZoneReducerTest.java diff --git a/TODO.md b/TODO.md index f50bd832..27a58741 100644 --- a/TODO.md +++ b/TODO.md @@ -100,7 +100,11 @@ Per-encoding gotchas: min/max/sum/null count, decoding sum from the `vortex.stats` zone-map table (matches files from Rust, whose flat writer omits per-flat sum). Calcite `VortexAggregates.SUM`/`AVG` now fold those per-zone sums (metadata-only), falling back to a full scan only when a column has no zone map. - Next: `Mask`/`Predicate`/kernel vocab and the two-tier whole-zone+residual reduce. + The fold is now a reusable `reader.compute.ZoneReducer.sum(col)` (the seam a future + `vortex-compute` extracts); Calcite consumes it. + Next: the residual tier needs a consumer first — wire Calcite aggregate+`WHERE` push-down, then + add `ZoneReducer` predicate support (whole-zone fold + boundary-zone streaming) and the + `Mask`/`Predicate`/kernel vocab on top. ## Encodings diff --git a/calcite/src/main/java/io/github/dfa1/vortex/calcite/VortexAggregates.java b/calcite/src/main/java/io/github/dfa1/vortex/calcite/VortexAggregates.java index e74a3af8..65930aa4 100644 --- a/calcite/src/main/java/io/github/dfa1/vortex/calcite/VortexAggregates.java +++ b/calcite/src/main/java/io/github/dfa1/vortex/calcite/VortexAggregates.java @@ -10,17 +10,16 @@ import io.github.dfa1.vortex.reader.array.FloatArray; import io.github.dfa1.vortex.reader.array.IntArray; import io.github.dfa1.vortex.reader.array.LongArray; - -import java.util.List; +import io.github.dfa1.vortex.reader.compute.ZoneReducer; /// Column aggregates answered with the cheapest available source. /// /// `MIN` / `MAX` / `COUNT` are read from the per-segment zone-map statistics embedded in the /// file footer — no data segment is decoded. `SUM` (and therefore `AVG`) folds the per-zone -/// `SUM` rows surfaced by [ScanIterator#columnZoneStats(String)] (ADR 0013 §6): when every zone -/// carries a sum the answer is metadata-only too, with no data segment touched. Only when a zone -/// lacks a sum — a column with no zone map, whose flat nodes do not retain it — does it fall back -/// to a streaming scan. +/// `SUM` rows via [ZoneReducer#sum(String)] (ADR 0013 §6): when every zone carries a sum the +/// answer is metadata-only too, with no data segment touched. Only when a zone lacks a sum — a +/// column with no zone map, whose flat nodes do not retain it — does it fall back to a streaming +/// scan. public final class VortexAggregates { /// Where an aggregate's value came from. @@ -71,7 +70,7 @@ public static Summary of(VortexReader reader, String column) { // SUM/AVG: fold the per-zone SUM rows when every zone carries one (metadata-only); fall // back to a single streaming scan otherwise. Integer columns sum into a long (exact); // floating columns into a double. - Number sum = zoneSum(reader, column); + Number sum = new ZoneReducer(reader).sum(column); Source sumSource = Source.ZONE_STATS_PUSHDOWN; if (sum == null) { sum = scanSum(reader, column); @@ -83,39 +82,6 @@ public static Summary of(VortexReader reader, String column) { Source.ZONE_STATS_PUSHDOWN, sumSource); } - /// Folds the per-zone `SUM` statistics for `column`, or returns `null` when any zone lacks a - /// sum (so the caller streams the column instead). Integer columns fold into a [Long] (exact), - /// floating columns into a [Double]; the zone-stat boxing already distinguishes the two. - private static Number zoneSum(VortexReader reader, String column) { - try (ScanIterator scan = reader.scan(ScanOptions.columns(column))) { - List zones = scan.columnZoneStats(column); - if (zones.isEmpty()) { - return null; - } - long longSum = 0L; - double doubleSum = 0.0; - boolean isFloating = false; - for (ArrayStats zone : zones) { - switch (zone.sum()) { - case Long l -> longSum += l; - case Double d -> { - isFloating = true; - doubleSum += d; - } - case null, default -> { - // A zone with no usable sum (no zone map, or an unhandled stat type) — - // can't push down, let the caller fall back to a full scan. - return null; - } - } - } - if (isFloating) { - return doubleSum; - } - return longSum; - } - } - private static long totalRows(VortexReader reader) { try (ScanIterator scan = reader.scan(ScanOptions.all())) { long total = 0L; diff --git a/reader/src/main/java/io/github/dfa1/vortex/reader/compute/ZoneReducer.java b/reader/src/main/java/io/github/dfa1/vortex/reader/compute/ZoneReducer.java new file mode 100644 index 00000000..5a8e27b4 --- /dev/null +++ b/reader/src/main/java/io/github/dfa1/vortex/reader/compute/ZoneReducer.java @@ -0,0 +1,76 @@ +package io.github.dfa1.vortex.reader.compute; + +import io.github.dfa1.vortex.reader.ArrayStats; +import io.github.dfa1.vortex.reader.ScanIterator; +import io.github.dfa1.vortex.reader.ScanOptions; +import io.github.dfa1.vortex.reader.VortexReader; + +import java.util.List; +import java.util.Objects; + +/// Answers column reductions from the per-zone statistics table, without decoding any data +/// segment. +/// +/// This is the whole-zone tier of the aggregate push-down sketched in ADR 0013 §6: a reduction +/// folds the per-zone rows surfaced by [ScanIterator#columnZoneStats(String)] instead of streaming +/// the column. The boundary (residual) tier — partially-selected zones under a predicate — is a +/// later increment; today the reducer takes no predicate and folds every zone. +/// +/// The reducer lives in `reader.compute` as the seam a future `vortex-compute` module extracts: it +/// depends only on the public reader scan API, so the move is mechanical once a second consumer +/// (the arrow bridge, a query façade) appears. +public final class ZoneReducer { + + private final VortexReader reader; + + /// Creates a reducer over an open reader. + /// + /// @param reader an open reader over the file; not closed by this reducer + public ZoneReducer(VortexReader reader) { + this.reader = Objects.requireNonNull(reader, "reader"); + } + + /// Folds the per-zone `SUM` statistics for `column` into the column total, or returns `null` + /// when the reduction cannot be answered from the zone-map table — a column with no zone map, + /// or a zone whose `SUM` was not retained (e.g. an overflowed zone) — so the caller streams the + /// column instead. + /// + /// Integer columns fold into a [Long] (exact, wrapping at 2^63 like SQL `SUM(BIGINT)`); floating + /// columns into a [Double]. The two are never mixed: a column is one dtype, and the zone-stat + /// boxing already distinguishes them. + /// + /// @param column the numeric column name + /// @return the column sum as a [Long] or [Double], or `null` if no zone carries a usable sum + public Number sum(String column) { + Objects.requireNonNull(column, "column"); + try (ScanIterator scan = reader.scan(ScanOptions.columns(column))) { + List zones = scan.columnZoneStats(column); + if (zones.isEmpty()) { + return null; + } + long longSum = 0L; + double doubleSum = 0.0; + boolean isFloating = false; + for (ArrayStats zone : zones) { + switch (zone.sum()) { + case Long l -> longSum += l; + case Double d -> { + isFloating = true; + doubleSum += d; + } + case null, default -> { + // A zone with no usable sum (no zone map, or an unhandled stat type) — the + // whole reduction can't be pushed down, so signal a fall back to a scan. + return null; + } + } + } + // Return via if/else, not a ?: ternary: a numeric conditional unboxes both branches and + // widens the Long to double, silently turning an integer-column sum into a Double. + if (isFloating) { + return doubleSum; + } + return longSum; + } + } +} diff --git a/writer/src/test/java/io/github/dfa1/vortex/writer/ZoneReducerTest.java b/writer/src/test/java/io/github/dfa1/vortex/writer/ZoneReducerTest.java new file mode 100644 index 00000000..ad908935 --- /dev/null +++ b/writer/src/test/java/io/github/dfa1/vortex/writer/ZoneReducerTest.java @@ -0,0 +1,98 @@ +package io.github.dfa1.vortex.writer; + +import io.github.dfa1.vortex.core.model.DType; +import io.github.dfa1.vortex.reader.ReadRegistry; +import io.github.dfa1.vortex.reader.VortexReader; +import io.github.dfa1.vortex.reader.compute.ZoneReducer; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.nio.channels.FileChannel; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/// Round-trip tests for [ZoneReducer] — the whole-zone tier of ADR 0013 §6 aggregate push-down. +/// SUM folds the per-zone `SUM` rows the writer embeds in each chunk's zone-map stats, with no data +/// segment decoded. +class ZoneReducerTest { + + private static final DType.Struct I64_SCHEMA = new DType.Struct( + List.of("id"), List.of(DType.I64), false); + + private static final DType.Struct F64_SCHEMA = new DType.Struct( + List.of("v"), List.of(DType.F64), false); + + private static ReadRegistry registry() { + return ReadRegistry.builder().registerServiceLoaded().build(); + } + + private static long[] range(long from, long to) { + long[] arr = new long[(int) (to - from + 1)]; + for (int i = 0; i < arr.length; i++) { + arr[i] = from + i; + } + return arr; + } + + @Test + void integerSumFoldsZonesToExactLong(@TempDir Path tmp) throws IOException { + // Given — three I64 chunks (one zone each); column total is 1+..+150 = 11325 + Path file = tmp.resolve("ints.vtx"); + try (var ch = FileChannel.open(file, StandardOpenOption.CREATE, StandardOpenOption.WRITE); + var w = VortexWriter.create(ch, I64_SCHEMA, WriteOptions.defaults())) { + w.writeChunk(Map.of("id", range(1L, 50L))); + w.writeChunk(Map.of("id", range(51L, 100L))); + w.writeChunk(Map.of("id", range(101L, 150L))); + } + + // When + try (VortexReader reader = VortexReader.open(file, registry())) { + Number result = new ZoneReducer(reader).sum("id"); + + // Then — exact Long (not a Double widening), no data segment decoded + assertThat(result).isInstanceOf(Long.class).isEqualTo(11325L); + } + } + + @Test + void floatSumFoldsZonesToDouble(@TempDir Path tmp) throws IOException { + // Given — a single F64 chunk so the sum stat boxes as Double, not Long + Path file = tmp.resolve("floats.vtx"); + try (var ch = FileChannel.open(file, StandardOpenOption.CREATE, StandardOpenOption.WRITE); + var w = VortexWriter.create(ch, F64_SCHEMA, WriteOptions.defaults())) { + w.writeChunk(Map.of("v", new double[]{1.5, 2.5, 3.0})); + } + + // When + try (VortexReader reader = VortexReader.open(file, registry())) { + Number result = new ZoneReducer(reader).sum("v"); + + // Then + assertThat(result).isInstanceOf(Double.class).isEqualTo(7.0); + } + } + + @Test + void noZoneMapYieldsNull(@TempDir Path tmp) throws IOException { + // Given — zone maps disabled, so no per-zone SUM exists to fold + Path file = tmp.resolve("nostats.vtx"); + WriteOptions noZoneMaps = new WriteOptions(65_536, false, 0.90, 0, true, false); + try (var ch = FileChannel.open(file, StandardOpenOption.CREATE, StandardOpenOption.WRITE); + var w = VortexWriter.create(ch, I64_SCHEMA, noZoneMaps)) { + w.writeChunk(Map.of("id", range(1L, 50L))); + } + + // When + try (VortexReader reader = VortexReader.open(file, registry())) { + Number result = new ZoneReducer(reader).sum("id"); + + // Then — null signals "not answerable from zones", caller must stream + assertThat(result).isNull(); + } + } +} From f820919f7244b5d2bf4d5d4880a7be43d99c1d7e Mon Sep 17 00:00:00 2001 From: Davide Angelocola Date: Sat, 27 Jun 2026 17:20:47 +0200 Subject: [PATCH 2/2] test(reader): cover overflowed-zone null path in ZoneReducer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add an overflowed-zone case to ZoneReducerTest: zone maps enabled but one I64 chunk's SUM overflows (Math.addExact drops it), so the zone-map table is present yet a zone carries no usable sum — exercising the fold's bail path distinctly from the absent-table case. Also add WriteOptions.withZoneMaps(boolean), mirroring withGlobalDict / withZstd, so the no-zone-map test reads as WriteOptions.defaults().withZoneMaps(false) instead of a brittle positional constructor call. Co-Authored-By: Claude Opus 4.8 --- .../dfa1/vortex/writer/WriteOptions.java | 8 +++++++ .../dfa1/vortex/writer/ZoneReducerTest.java | 23 ++++++++++++++++++- 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/writer/src/main/java/io/github/dfa1/vortex/writer/WriteOptions.java b/writer/src/main/java/io/github/dfa1/vortex/writer/WriteOptions.java index b927e8cd..c49ca804 100644 --- a/writer/src/main/java/io/github/dfa1/vortex/writer/WriteOptions.java +++ b/writer/src/main/java/io/github/dfa1/vortex/writer/WriteOptions.java @@ -37,6 +37,14 @@ public static WriteOptions cascading(int depth) { return new WriteOptions(65_536, true, 0.90, depth, true, false); } + /// Returns a copy of these options with zone-map statistics set to `enabled`. + /// + /// @param enabled `true` to write per-chunk min/max/sum statistics for zone-map pruning + /// @return a new `WriteOptions` with the zone-map flag updated + public WriteOptions withZoneMaps(boolean enabled) { + return new WriteOptions(chunkSize, enabled, compressionRatioThreshold, allowedCascading, globalDict, enableZstd); + } + /// Returns a copy of these options with global dictionary encoding set to `enabled`. /// /// @param enabled `true` to enable global dictionary encoding across chunks diff --git a/writer/src/test/java/io/github/dfa1/vortex/writer/ZoneReducerTest.java b/writer/src/test/java/io/github/dfa1/vortex/writer/ZoneReducerTest.java index ad908935..dd30171b 100644 --- a/writer/src/test/java/io/github/dfa1/vortex/writer/ZoneReducerTest.java +++ b/writer/src/test/java/io/github/dfa1/vortex/writer/ZoneReducerTest.java @@ -81,7 +81,7 @@ void floatSumFoldsZonesToDouble(@TempDir Path tmp) throws IOException { void noZoneMapYieldsNull(@TempDir Path tmp) throws IOException { // Given — zone maps disabled, so no per-zone SUM exists to fold Path file = tmp.resolve("nostats.vtx"); - WriteOptions noZoneMaps = new WriteOptions(65_536, false, 0.90, 0, true, false); + WriteOptions noZoneMaps = WriteOptions.defaults().withZoneMaps(false); try (var ch = FileChannel.open(file, StandardOpenOption.CREATE, StandardOpenOption.WRITE); var w = VortexWriter.create(ch, I64_SCHEMA, noZoneMaps)) { w.writeChunk(Map.of("id", range(1L, 50L))); @@ -95,4 +95,25 @@ void noZoneMapYieldsNull(@TempDir Path tmp) throws IOException { assertThat(result).isNull(); } } + + @Test + void overflowedZoneYieldsNull(@TempDir Path tmp) throws IOException { + // Given — zone maps ON, but one zone's I64 SUM overflows long (Math.addExact in the writer + // drops it), so a zone map exists yet a zone carries no usable sum. Distinct from + // noZoneMapYieldsNull: there the table is absent; here it is present but incomplete. + Path file = tmp.resolve("overflow.vtx"); + try (var ch = FileChannel.open(file, StandardOpenOption.CREATE, StandardOpenOption.WRITE); + var w = VortexWriter.create(ch, I64_SCHEMA, WriteOptions.defaults())) { + w.writeChunk(Map.of("id", range(1L, 50L))); // sums fine + w.writeChunk(Map.of("id", new long[]{Long.MAX_VALUE, Long.MAX_VALUE})); // overflows + } + + // When + try (VortexReader reader = VortexReader.open(file, registry())) { + Number result = new ZoneReducer(reader).sum("id"); + + // Then — one unusable zone forces the whole fold to bail; no partial sum returned + assertThat(result).isNull(); + } + } }