From 3df5b7d5cd7d9cdc897ecb6b3fa38d2120b4822d Mon Sep 17 00:00:00 2001 From: Davide Angelocola Date: Sat, 27 Jun 2026 11:44:39 +0200 Subject: [PATCH 1/2] =?UTF-8?q?feat(reader):=20surface=20per-zone=20stats?= =?UTF-8?q?=20from=20the=20zone-map=20table=20(ADR=200013=20=C2=A76)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add ScanIterator.columnZoneStats(col): one ArrayStats per zone with min/max/sum/null-count, the read-side feed for aggregate push-down. Sum is decoded from the column's vortex.stats zone-map table rather than per-flat node stats — matching Rust, whose flat writer retains only pre-computed stats (flat/writer.rs) and emits SUM only in the zoned table (zoned/writer.rs). Falls back to per-chunk node stats (sum null) when a column has no zone map. - ArrayStats gains a sum component; fromFbs decodes it (forward-compat). - ZonedStatsSchema moves inspector -> reader so the read path can reconstruct the stats-table dtype; cli/inspector imports updated. - VortexWriter is unchanged functionally (comment only); sum continues to live in the zone-map table. Calcite VortexAggregates.SUM/AVG now fold the per-zone sums instead of a full scan: metadata-only when every zone carries a sum, falling back to a streaming scan only when a column has no zone map. Verified both interop directions, incl. a new test folding per-zone sums from a Rust-written file to the exact column total. Co-Authored-By: Claude Opus 4.8 --- TODO.md | 5 + .../dfa1/vortex/calcite/VortexAggregates.java | 58 ++++++- .../calcite/VortexAdapterCoverageTest.java | 35 ++++- .../vortex/cli/tui/VortexInspectorTui.java | 2 +- .../dfa1/vortex/inspect/VortexInspector.java | 2 +- .../vortex/inspect/VortexInspectorTest.java | 4 +- .../RustWritesJavaReadsIntegrationTest.java | 30 ++++ .../github/dfa1/vortex/reader/ArrayStats.java | 9 +- .../dfa1/vortex/reader/ScanIterator.java | 144 ++++++++++++++++++ .../dfa1/vortex/reader/VortexReader.java | 8 +- .../dfa1/vortex/reader}/ZonedStatsSchema.java | 13 +- .../vortex/reader}/ZonedStatsSchemaTest.java | 2 +- .../dfa1/vortex/writer/VortexWriter.java | 4 +- .../vortex/writer/ColumnZoneStatsTest.java | 132 ++++++++++++++++ 14 files changed, 420 insertions(+), 28 deletions(-) rename {inspector/src/main/java/io/github/dfa1/vortex/inspect => reader/src/main/java/io/github/dfa1/vortex/reader}/ZonedStatsSchema.java (94%) rename {inspector/src/test/java/io/github/dfa1/vortex/inspect => reader/src/test/java/io/github/dfa1/vortex/reader}/ZonedStatsSchemaTest.java (99%) create mode 100644 writer/src/test/java/io/github/dfa1/vortex/writer/ColumnZoneStatsTest.java diff --git a/TODO.md b/TODO.md index 8895b0dc..f50bd832 100644 --- a/TODO.md +++ b/TODO.md @@ -96,6 +96,11 @@ Per-encoding gotchas: - [ ] **Compute primitives — masks, kernels, no-materialise** — pushdown filter/compare/aggregate kernels operating on Lazy arrays without materialising. See [ADR-0013](docs/adr/0013-compute-primitives.md) (Proposed). Gate: a concrete downstream consumer (e.g. the vortex-arrow bridge or filter pushdown). + Done: §6 read-side surface — `ScanIterator.columnZoneStats(col)` exposes per-zone + min/max/sum/null count, decoding sum from the `vortex.stats` zone-map table (matches files from + Rust, whose flat writer omits per-flat sum). Calcite `VortexAggregates.SUM`/`AVG` now fold those + per-zone sums (metadata-only), falling back to a full scan only when a column has no zone map. + Next: `Mask`/`Predicate`/kernel vocab and the two-tier whole-zone+residual reduce. ## Encodings diff --git a/calcite/src/main/java/io/github/dfa1/vortex/calcite/VortexAggregates.java b/calcite/src/main/java/io/github/dfa1/vortex/calcite/VortexAggregates.java index a127fccd..e74a3af8 100644 --- a/calcite/src/main/java/io/github/dfa1/vortex/calcite/VortexAggregates.java +++ b/calcite/src/main/java/io/github/dfa1/vortex/calcite/VortexAggregates.java @@ -11,13 +11,16 @@ import io.github.dfa1.vortex.reader.array.IntArray; import io.github.dfa1.vortex.reader.array.LongArray; +import java.util.List; + /// Column aggregates answered with the cheapest available source. /// /// `MIN` / `MAX` / `COUNT` are read from the per-segment zone-map statistics embedded in the -/// file footer — no data segment is decoded. `SUM` (and therefore `AVG`) has no zone statistic -/// in the current writer, so it falls back to a streaming scan. This split is the whole point -/// of the demo: the stats-backed aggregates are effectively free, and `SUM`/`AVG` show what the -/// next writer increment (emit a per-zone `SUM`) would make free too (ADR 0013 §6). +/// file footer — no data segment is decoded. `SUM` (and therefore `AVG`) folds the per-zone +/// `SUM` rows surfaced by [ScanIterator#columnZoneStats(String)] (ADR 0013 §6): when every zone +/// carries a sum the answer is metadata-only too, with no data segment touched. Only when a zone +/// lacks a sum — a column with no zone map, whose flat nodes do not retain it — does it fall back +/// to a streaming scan. public final class VortexAggregates { /// Where an aggregate's value came from. @@ -65,13 +68,52 @@ public static Summary of(VortexReader reader, String column) { Object min = stats.min(); Object max = stats.max(); - // SUM/AVG: no SUM zone-stat exists today, so stream the column once. Integer columns sum - // into a long (exact); floating columns into a double. - Number sum = scanSum(reader, column); + // SUM/AVG: fold the per-zone SUM rows when every zone carries one (metadata-only); fall + // back to a single streaming scan otherwise. Integer columns sum into a long (exact); + // floating columns into a double. + Number sum = zoneSum(reader, column); + Source sumSource = Source.ZONE_STATS_PUSHDOWN; + if (sum == null) { + sum = scanSum(reader, column); + sumSource = Source.FULL_SCAN; + } Double avg = count == 0 ? null : sum.doubleValue() / count; return new Summary(column, min, max, count, sum, avg, - Source.ZONE_STATS_PUSHDOWN, Source.FULL_SCAN); + Source.ZONE_STATS_PUSHDOWN, sumSource); + } + + /// Folds the per-zone `SUM` statistics for `column`, or returns `null` when any zone lacks a + /// sum (so the caller streams the column instead). Integer columns fold into a [Long] (exact), + /// floating columns into a [Double]; the zone-stat boxing already distinguishes the two. + private static Number zoneSum(VortexReader reader, String column) { + try (ScanIterator scan = reader.scan(ScanOptions.columns(column))) { + List zones = scan.columnZoneStats(column); + if (zones.isEmpty()) { + return null; + } + long longSum = 0L; + double doubleSum = 0.0; + boolean isFloating = false; + for (ArrayStats zone : zones) { + switch (zone.sum()) { + case Long l -> longSum += l; + case Double d -> { + isFloating = true; + doubleSum += d; + } + case null, default -> { + // A zone with no usable sum (no zone map, or an unhandled stat type) — + // can't push down, let the caller fall back to a full scan. + return null; + } + } + } + if (isFloating) { + return doubleSum; + } + return longSum; + } } private static long totalRows(VortexReader reader) { diff --git a/calcite/src/test/java/io/github/dfa1/vortex/calcite/VortexAdapterCoverageTest.java b/calcite/src/test/java/io/github/dfa1/vortex/calcite/VortexAdapterCoverageTest.java index b4772210..d39463c4 100644 --- a/calcite/src/test/java/io/github/dfa1/vortex/calcite/VortexAdapterCoverageTest.java +++ b/calcite/src/test/java/io/github/dfa1/vortex/calcite/VortexAdapterCoverageTest.java @@ -217,7 +217,8 @@ void integerColumn_sumIsExactLong() throws Exception { assertThat(((Number) s.min()).longValue()).isEqualTo(1000L); assertThat(((Number) s.max()).longValue()).isEqualTo(3000L); assertThat(s.minMaxSource()).isEqualTo(VortexAggregates.Source.ZONE_STATS_PUSHDOWN); - assertThat(s.sumSource()).isEqualTo(VortexAggregates.Source.FULL_SCAN); + // SUM now folds the per-zone zone-map sum the Java writer emits — no data decoded. + assertThat(s.sumSource()).isEqualTo(VortexAggregates.Source.ZONE_STATS_PUSHDOWN); } } @@ -266,5 +267,37 @@ void nonNumericColumn_throws() throws Exception { .hasMessageContaining("not a numeric column"); } } + + @Test + void noZoneMap_sumFallsBackToFullScan(@TempDir Path noStats) throws Exception { + // Given — a file written with zone maps off, so no per-zone SUM exists to fold + Path bare = noStats.resolve("nostats.vortex"); + WriteOptions noZoneMaps = new WriteOptions(65_536, false, 0.90, 0, true, false); + try (var ch = FileChannel.open(bare, StandardOpenOption.CREATE, StandardOpenOption.WRITE); + var w = VortexWriter.create(ch, SCHEMA, noZoneMaps)) { + w.writeChunk(Map.ofEntries( + Map.entry("i8", new byte[]{1, 2, 3}), + Map.entry("i16", new short[]{10, 20, 30}), + Map.entry("i32", new int[]{100, 200, 300}), + Map.entry("i64", new long[]{1000L, 2000L, 3000L}), + Map.entry("u8", new byte[]{4, 5, 6}), + Map.entry("u16", new short[]{40, 50, 60}), + Map.entry("u32", new int[]{400, 500, 600}), + Map.entry("u64", new long[]{4000L, 5000L, 6000L}), + Map.entry("f32", new float[]{1.5f, 2.5f, 3.5f}), + Map.entry("f64", new double[]{1.25, 2.25, 3.25}), + Map.entry("s", new String[]{"a", "b", "c"}), + Map.entry("b", new boolean[]{true, false, true}))); + } + + // When + try (VortexReader reader = VortexReader.open(bare, registry())) { + VortexAggregates.Summary s = VortexAggregates.of(reader, "i64"); + + // Then — sum still exact, but sourced from a streaming scan, not the (absent) zone map + assertThat(s.sum()).isInstanceOf(Long.class).isEqualTo(6000L); + assertThat(s.sumSource()).isEqualTo(VortexAggregates.Source.FULL_SCAN); + } + } } } diff --git a/cli/src/main/java/io/github/dfa1/vortex/cli/tui/VortexInspectorTui.java b/cli/src/main/java/io/github/dfa1/vortex/cli/tui/VortexInspectorTui.java index 51c4b3cc..cc832134 100644 --- a/cli/src/main/java/io/github/dfa1/vortex/cli/tui/VortexInspectorTui.java +++ b/cli/src/main/java/io/github/dfa1/vortex/cli/tui/VortexInspectorTui.java @@ -9,7 +9,7 @@ import io.github.dfa1.vortex.cli.tui.term.Terminal; import io.github.dfa1.vortex.inspect.ByteSize; import io.github.dfa1.vortex.inspect.InspectorTree; -import io.github.dfa1.vortex.inspect.ZonedStatsSchema; +import io.github.dfa1.vortex.reader.ZonedStatsSchema; import io.github.dfa1.vortex.reader.VortexHandle; import io.github.dfa1.vortex.reader.Chunk; import io.github.dfa1.vortex.reader.ScanIterator; diff --git a/inspector/src/main/java/io/github/dfa1/vortex/inspect/VortexInspector.java b/inspector/src/main/java/io/github/dfa1/vortex/inspect/VortexInspector.java index c61cc707..92764c25 100644 --- a/inspector/src/main/java/io/github/dfa1/vortex/inspect/VortexInspector.java +++ b/inspector/src/main/java/io/github/dfa1/vortex/inspect/VortexInspector.java @@ -102,7 +102,7 @@ private static ArrayStats aggregateStats(InspectorTree.Node node) { if (min == null && max == null) { return ArrayStats.empty(); } - return new ArrayStats(min, max, null, null, null, null); + return new ArrayStats(min, max, null, null, null, null, null); } @SuppressWarnings({"unchecked", "rawtypes"}) diff --git a/inspector/src/test/java/io/github/dfa1/vortex/inspect/VortexInspectorTest.java b/inspector/src/test/java/io/github/dfa1/vortex/inspect/VortexInspectorTest.java index 1d00463b..a96b78d0 100644 --- a/inspector/src/test/java/io/github/dfa1/vortex/inspect/VortexInspectorTest.java +++ b/inspector/src/test/java/io/github/dfa1/vortex/inspect/VortexInspectorTest.java @@ -133,9 +133,9 @@ void render_aggregatesMinMaxAcrossChunks() { Layout structLayout = new Layout("vortex.struct", 1000, null, List.of(chunked), List.of()); InspectorTree.Node c1 = new InspectorTree.Node(chunk1, Optional.empty(), Set.of(), - new ArrayStats(10L, 50L, null, null, null, null), List.of()); + new ArrayStats(10L, 50L, null, null, null, null, null), List.of()); InspectorTree.Node c2 = new InspectorTree.Node(chunk2, Optional.empty(), Set.of(), - new ArrayStats(5L, 100L, null, null, null, null), List.of()); + new ArrayStats(5L, 100L, null, null, null, null, null), List.of()); InspectorTree.Node chunkedN = new InspectorTree.Node(chunked, Optional.of("id"), Set.of("vortex.flat"), ArrayStats.empty(), List.of(c1, c2)); InspectorTree.Node rootN = new InspectorTree.Node(structLayout, Optional.empty(), diff --git a/integration/src/test/java/io/github/dfa1/vortex/integration/RustWritesJavaReadsIntegrationTest.java b/integration/src/test/java/io/github/dfa1/vortex/integration/RustWritesJavaReadsIntegrationTest.java index a7a86d3b..40fc34e4 100644 --- a/integration/src/test/java/io/github/dfa1/vortex/integration/RustWritesJavaReadsIntegrationTest.java +++ b/integration/src/test/java/io/github/dfa1/vortex/integration/RustWritesJavaReadsIntegrationTest.java @@ -14,6 +14,7 @@ import io.github.dfa1.vortex.reader.array.Array; import io.github.dfa1.vortex.reader.array.DoubleArray; import io.github.dfa1.vortex.reader.array.LongArray; +import io.github.dfa1.vortex.reader.ArrayStats; import io.github.dfa1.vortex.reader.ReadRegistry; import io.github.dfa1.vortex.reader.VortexReader; import org.apache.arrow.c.ArrowArray; @@ -269,6 +270,35 @@ void jniWriter_javaReader_singleChunk(@TempDir Path tmp) throws IOException { } } + @Test + void jniWriter_perZoneSum_readFromZoneMapTable(@TempDir Path tmp) throws IOException { + // Given — a Rust-written file large enough that the JNI writer emits a multi-zone column. + // Sum lives only in Rust's vortex.stats zone-map table (its flat writer doesn't retain it), + // so this proves the Java reader decodes that table for per-zone SUM (ADR 0013 §6 parity). + int n = 200_000; + long[] ids = new long[n]; + double[] vals = new double[n]; + for (int i = 0; i < n; i++) { + ids[i] = i; + vals[i] = i; + } + Path file = tmp.resolve("jni_zones.vtx"); + writeJni(file, ids, vals); + long expected = (long) n * (n - 1) / 2; // Σ 0..n-1 + + // When — fold the per-zone SUM rows the reader surfaces from the zone-map table + try (var vf = VortexReader.open(file, ReadRegistry.loadAll()); + var iter = vf.scan(io.github.dfa1.vortex.reader.ScanOptions.all())) { + List zones = iter.columnZoneStats("id"); + + // Then — every zone carries a SUM (came from Rust's table, not a Java-side recompute) + // and the whole-zone fold equals the column total. + assertThat(zones).isNotEmpty().allSatisfy(z -> assertThat(z.sum()).isNotNull()); + long total = zones.stream().mapToLong(z -> (Long) z.sum()).sum(); + assertThat(total).isEqualTo(expected); + } + } + @Test void jniWriter_javaReader_multipleChunks(@TempDir Path tmp) throws IOException { // Given diff --git a/reader/src/main/java/io/github/dfa1/vortex/reader/ArrayStats.java b/reader/src/main/java/io/github/dfa1/vortex/reader/ArrayStats.java index f553c802..7ae682f6 100644 --- a/reader/src/main/java/io/github/dfa1/vortex/reader/ArrayStats.java +++ b/reader/src/main/java/io/github/dfa1/vortex/reader/ArrayStats.java @@ -11,6 +11,7 @@ /// /// @param min minimum value in the array, or `null` if unknown /// @param max maximum value in the array, or `null` if unknown +/// @param sum sum of values (`Long` for integer columns, `Double` for floats), or `null` if unknown /// @param trueCount number of `true` values for bool columns, or `null` if unknown /// @param nullCount number of null values, or `null` if unknown /// @param isSorted `true` if the array is sorted in ascending order, or `null` if unknown @@ -18,12 +19,13 @@ public record ArrayStats( Object min, Object max, + Object sum, Long trueCount, Long nullCount, Boolean isSorted, Boolean isStrictSorted ) { - private static final ArrayStats EMPTY = new ArrayStats(null, null, null, null, null, null); + private static final ArrayStats EMPTY = new ArrayStats(null, null, null, null, null, null, null); /// Returns an empty stats instance with all fields set to `null`. /// @@ -43,11 +45,12 @@ public static ArrayStats fromFbs(io.github.dfa1.vortex.core.fbs.FbsArrayStats fb } Object min = decodeScalar(fbs.minAsSegment()); Object max = decodeScalar(fbs.maxAsSegment()); + Object sum = decodeScalar(fbs.sumAsSegment()); Long nullCount = fbs.hasNullCount() ? fbs.nullCount() : null; - if (min == null && max == null && nullCount == null) { + if (min == null && max == null && sum == null && nullCount == null) { return EMPTY; } - return new ArrayStats(min, max, null, nullCount, null, null); + return new ArrayStats(min, max, sum, null, nullCount, null, null); } private static Object decodeScalar(MemorySegment seg) { diff --git a/reader/src/main/java/io/github/dfa1/vortex/reader/ScanIterator.java b/reader/src/main/java/io/github/dfa1/vortex/reader/ScanIterator.java index 7baeaf09..cfc38bc9 100644 --- a/reader/src/main/java/io/github/dfa1/vortex/reader/ScanIterator.java +++ b/reader/src/main/java/io/github/dfa1/vortex/reader/ScanIterator.java @@ -395,6 +395,150 @@ public long[] chunkRowCounts() { return out; } + /// Returns the per-zone statistics for one column, one entry per zone in scan order. + /// + /// When the column carries a `vortex.stats` (zone-map) layout, the rows come from that + /// table — min/max/sum/null count per zone — decoded once from the small stats segment + /// without touching any data segment. This is the source Rust populates for `SUM`, so the + /// values match files written by either implementation. When no zone map is present the + /// list falls back to each chunk's embedded `ArrayStats` (min/max/null count; `sum` is + /// `null`, since the flat writer does not retain it). Either way the result has one entry + /// per chunk/zone, positionally aligned with [#chunkRowCounts()]; a column that is absent + /// or carries no stats yields [ArrayStats#empty()] per zone. + /// + /// This is the read-side surface for aggregate push-down (ADR 0013 §6): a reduction can + /// fold whole zones from these rows and fall back to a streaming decode only for the + /// boundary zones a predicate partially selects. + /// + /// Like [#chunkRowCounts()], filter pruning and `ScanOptions.limit()` are not applied — + /// the list reflects the raw layout shape. + /// + /// @param column the column name + /// @return per-zone stats in zone order; empty list if the file has no chunks + public List columnZoneStats(String column) { + if (chunks == null) { + initialize(); + } + List fromTable = decodeZoneTable(column); + if (fromTable != null) { + return fromTable; + } + // No zone-map table — surface each chunk's embedded ArrayStats (sum absent). + List out = new ArrayList<>(chunks.size()); + for (ChunkSpec spec : chunks) { + Layout flat = spec.layoutFor(column); + out.add(flat == null ? ArrayStats.empty() : readFlatStats(flat)); + } + return out; + } + + /// Decodes the column's `vortex.stats` zone-map table into one [ArrayStats] per zone, or + /// returns `null` when the column has no zone map (so the caller falls back to per-chunk + /// node stats). The table is a single flat segment encoding a struct with a subset of the + /// `min`/`max`/`sum`/`null_count` fields (see [ZonedStatsSchema]); it is decoded into a + /// short-lived confined arena and the scalar values are boxed out before the arena closes. + private List decodeZoneTable(String column) { + Layout zoned = findZonedLayout(file.layout(), column); + if (zoned == null || zoned.children().size() < 2) { + return null; + } + Layout statsFlat = zoned.children().get(1); + if (!statsFlat.isFlat() || statsFlat.segments().isEmpty()) { + return null; + } + DType columnDtype = columnDType(column); + if (columnDtype == null) { + return null; + } + int segIdx = statsFlat.segments().getFirst(); + if (segIdx < 0 || segIdx >= file.footer().segmentSpecs().size()) { + return null; + } + DType.Struct statsDtype = ZonedStatsSchema.statsTableDtype(columnDtype, zoned.metadata()); + long nZones = statsFlat.rowCount(); + SegmentSpec spec = file.footer().segmentSpecs().get(segIdx); + try (Arena tableArena = Arena.ofConfined()) { + Array decoded = file.decodeFlatSegment(spec, statsDtype, nZones, tableArena); + if (!(decoded instanceof StructArray table)) { + return null; + } + Array minA = fieldOrNull(table, "min"); + Array maxA = fieldOrNull(table, "max"); + Array sumA = fieldOrNull(table, "sum"); + Array nullCountA = fieldOrNull(table, "null_count"); + List out = new ArrayList<>((int) nZones); + for (long i = 0; i < nZones; i++) { + Object nullCount = boxedScalar(nullCountA, i); + out.add(new ArrayStats( + boxedScalar(minA, i), + boxedScalar(maxA, i), + boxedScalar(sumA, i), + null, + nullCount == null ? null : ((Number) nullCount).longValue(), + null, null)); + } + return out; + } + } + + /// Finds the first `vortex.stats` layout in the subtree of `column`'s top-level layout, or + /// `null` when the column is not zone-mapped. + private Layout findZonedLayout(Layout root, String column) { + if (!(file.dtype() instanceof DType.Struct struct) || !root.isStruct()) { + return null; + } + int idx = struct.fieldNames().indexOf(column); + if (idx < 0 || idx >= root.children().size()) { + return null; + } + return firstZoned(root.children().get(idx)); + } + + private static Layout firstZoned(Layout layout) { + if (layout.isZoned()) { + return layout; + } + for (Layout child : layout.children()) { + Layout found = firstZoned(child); + if (found != null) { + return found; + } + } + return null; + } + + private static Array fieldOrNull(StructArray table, String field) { + if (((DType.Struct) table.dtype()).fieldNames().contains(field)) { + return table.field(field); + } + return null; + } + + /// Reads the boxed scalar at index `i` from a (possibly nullable) stats column, or `null` + /// when the array is absent or the position is invalid. + private static Object boxedScalar(Array array, long i) { + if (array == null) { + return null; + } + if (array instanceof MaskedArray masked) { + if (!masked.isValid(i)) { + return null; + } + return boxedScalar(masked.inner(), i); + } + return switch (array) { + case LongArray a -> a.getLong(i); + case IntArray a -> a.getInt(i); + case DoubleArray a -> a.getDouble(i); + case FloatArray a -> a.getFloat(i); + case ShortArray a -> a.getShort(i); + case ByteArray a -> a.getByte(i); + case BoolArray a -> a.getBoolean(i); + case VarBinArray a -> a.getString(i); + default -> null; + }; + } + /// Runs `action` on each remaining chunk inside a try-with-resources /// block so every chunk's [Arena] is released before the next iteration. /// Prefer this over a manual `while (hasNext()) { next(); `} loop diff --git a/reader/src/main/java/io/github/dfa1/vortex/reader/VortexReader.java b/reader/src/main/java/io/github/dfa1/vortex/reader/VortexReader.java index 022e60bb..fb35dab6 100644 --- a/reader/src/main/java/io/github/dfa1/vortex/reader/VortexReader.java +++ b/reader/src/main/java/io/github/dfa1/vortex/reader/VortexReader.java @@ -180,8 +180,8 @@ public Map columnStats() { private ArrayStats aggregateStats(List flats) { Object globalMin = null; Object globalMax = null; - // Sum is meaningful only when every chunk carries a null count; one missing makes the - // column total unknown (null), so don't report a partial sum. + // Null count is meaningful only when every chunk carries it; one missing makes the column + // total unknown (null), so don't report a partial count. long totalNullCount = 0L; boolean allHaveNullCount = !flats.isEmpty(); for (Layout flat : flats) { @@ -202,7 +202,9 @@ private ArrayStats aggregateStats(List flats) { if (globalMin == null && globalMax == null && nullCount == null) { return ArrayStats.empty(); } - return new ArrayStats(globalMin, globalMax, null, nullCount, null, null); + // Sum is left null at the file level: it lives in the per-zone stats table, surfaced by + // ScanIterator.columnZoneStats rather than folded here. + return new ArrayStats(globalMin, globalMax, null, null, nullCount, null, null); } private ArrayStats readFlatStats(Layout flat) { diff --git a/inspector/src/main/java/io/github/dfa1/vortex/inspect/ZonedStatsSchema.java b/reader/src/main/java/io/github/dfa1/vortex/reader/ZonedStatsSchema.java similarity index 94% rename from inspector/src/main/java/io/github/dfa1/vortex/inspect/ZonedStatsSchema.java rename to reader/src/main/java/io/github/dfa1/vortex/reader/ZonedStatsSchema.java index 0983525a..2f9ad720 100644 --- a/inspector/src/main/java/io/github/dfa1/vortex/inspect/ZonedStatsSchema.java +++ b/reader/src/main/java/io/github/dfa1/vortex/reader/ZonedStatsSchema.java @@ -1,4 +1,4 @@ -package io.github.dfa1.vortex.inspect; +package io.github.dfa1.vortex.reader; import io.github.dfa1.vortex.core.model.DType; @@ -14,15 +14,15 @@ /// /// The shape is sourced from the Rust reference implementation: /// - Metadata format -/// ( -/// vortex-layout/src/layouts/zoned/mod.rs — `ZonedMetadata`): +/// ([vortex-layout/src/layouts/zoned/mod.rs](https://github.com/spiraldb/vortex/blob/develop/vortex-layout/src/layouts/zoned/mod.rs) +/// — `ZonedMetadata`): /// bytes [0..4) are the zone length as a little-endian `u32`; /// remaining bytes form a `Stat` bitset (LSB-first per byte). Each /// set bit at index `i` indicates that the [Stat] with that /// ordinal is present in the auxiliary stats table. /// - Schema construction -/// ( -/// vortex-layout/src/layouts/zoned/schema.rs — `stats_table_dtype`): +/// ([vortex-layout/src/layouts/zoned/schema.rs](https://github.com/spiraldb/vortex/blob/develop/vortex-layout/src/layouts/zoned/schema.rs) +/// — `stats_table_dtype`): /// for each present stat in ordinal order, append a struct field with the /// stat's name and the stat's nullable dtype. `Max` and `Min` /// each get an extra trailing field `max_is_truncated` / @@ -167,8 +167,7 @@ public static DType.Struct statsTableDtype(DType columnDtype, List present /// /// Mirrors Rust's `Stat::dtype(&DType)` plus the aggregate-function /// return-type rules (see - /// - /// vortex-array/src/aggregate_fn/fns): + /// [vortex-array/src/aggregate_fn/fns](https://github.com/spiraldb/vortex/blob/develop/vortex-array/src/aggregate_fn/fns)): /// - min/max → same dtype as column (except DType.Null → null); /// - is_constant/is_sorted/is_strict_sorted → non-nullable Bool; /// - null_count → non-nullable U64; diff --git a/inspector/src/test/java/io/github/dfa1/vortex/inspect/ZonedStatsSchemaTest.java b/reader/src/test/java/io/github/dfa1/vortex/reader/ZonedStatsSchemaTest.java similarity index 99% rename from inspector/src/test/java/io/github/dfa1/vortex/inspect/ZonedStatsSchemaTest.java rename to reader/src/test/java/io/github/dfa1/vortex/reader/ZonedStatsSchemaTest.java index 026e57e8..4ac75667 100644 --- a/inspector/src/test/java/io/github/dfa1/vortex/inspect/ZonedStatsSchemaTest.java +++ b/reader/src/test/java/io/github/dfa1/vortex/reader/ZonedStatsSchemaTest.java @@ -1,4 +1,4 @@ -package io.github.dfa1.vortex.inspect; +package io.github.dfa1.vortex.reader; import java.lang.foreign.MemorySegment; diff --git a/writer/src/main/java/io/github/dfa1/vortex/writer/VortexWriter.java b/writer/src/main/java/io/github/dfa1/vortex/writer/VortexWriter.java index ab86aab9..72875826 100644 --- a/writer/src/main/java/io/github/dfa1/vortex/writer/VortexWriter.java +++ b/writer/src/main/java/io/github/dfa1/vortex/writer/VortexWriter.java @@ -642,7 +642,9 @@ private ByteBuffer buildArrayFlatBuffer(EncodeResult result, long nullCount) { var fbb = new FbsBuilder(256); // Stats for the root node only (build vectors before the ArrayStats table). null_count is - // always recorded; min/max only when the encoder produced them. + // always recorded; min/max only when the encoder produced them. Sum is not embedded per-flat + // (Rust's flat writer doesn't either — flat/writer.rs retains only pre-computed stats); the + // per-zone sum lives in the vortex.stats zone-map table emitted by flushZoneMaps(). int minVec = result.hasStats() ? io.github.dfa1.vortex.core.fbs.FbsArrayStats.createMinVector(fbb, result.statsMin()) : 0; int maxVec = result.hasStats() diff --git a/writer/src/test/java/io/github/dfa1/vortex/writer/ColumnZoneStatsTest.java b/writer/src/test/java/io/github/dfa1/vortex/writer/ColumnZoneStatsTest.java new file mode 100644 index 00000000..120df7a5 --- /dev/null +++ b/writer/src/test/java/io/github/dfa1/vortex/writer/ColumnZoneStatsTest.java @@ -0,0 +1,132 @@ +package io.github.dfa1.vortex.writer; + +import io.github.dfa1.vortex.core.model.DType; +import io.github.dfa1.vortex.reader.ArrayStats; +import io.github.dfa1.vortex.reader.ReadRegistry; +import io.github.dfa1.vortex.reader.ScanOptions; +import io.github.dfa1.vortex.reader.VortexReader; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.nio.channels.FileChannel; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/// Round-trip tests for [io.github.dfa1.vortex.reader.ScanIterator#columnZoneStats] — the read-side +/// surface for ADR 0013 §6 aggregate push-down. One zone per written chunk; each carries the +/// min/max/sum/null-count the writer embedded in the chunk's `ArrayStats`, read without decoding any +/// data segment. +class ColumnZoneStatsTest { + + private static final DType.Struct SCHEMA = new DType.Struct( + List.of("id"), + List.of(DType.I64), + false); + + private static final DType.Struct F64_SCHEMA = new DType.Struct( + List.of("v"), + List.of(DType.F64), + false); + + // Three chunks of id: [1..50], [51..100], [101..150]. Per-zone sums are the closed-form + // triangular sums, chosen so a wrong fold (e.g. summing the wrong zone) is visible. + private static Path writeThreeChunks(Path tmp) throws IOException { + Path file = tmp.resolve("zones.vtx"); + try (var ch = FileChannel.open(file, StandardOpenOption.CREATE, StandardOpenOption.WRITE); + var sut = VortexWriter.create(ch, SCHEMA, WriteOptions.defaults())) { + sut.writeChunk(Map.of("id", range(1L, 50L))); + sut.writeChunk(Map.of("id", range(51L, 100L))); + sut.writeChunk(Map.of("id", range(101L, 150L))); + } + return file; + } + + private static long[] range(long from, long to) { + long[] arr = new long[(int) (to - from + 1)]; + for (int i = 0; i < arr.length; i++) { + arr[i] = from + i; + } + return arr; + } + + private static ReadRegistry registry() { + return ReadRegistry.builder().registerServiceLoaded().build(); + } + + private static List zoneStats(Path file, String column) throws IOException { + try (VortexReader vf = VortexReader.open(file, registry()); + var iter = vf.scan(new ScanOptions(List.of(), null, ScanOptions.NO_LIMIT))) { + return iter.columnZoneStats(column); + } + } + + @Test + void perZoneMinMaxSumNullCount(@TempDir Path tmp) throws IOException { + // Given — three I64 chunks with known min/max/sum per zone + Path file = writeThreeChunks(tmp); + + // When + List result = zoneStats(file, "id"); + + // Then — one zone per chunk, each carrying that chunk's stats (no data decoded) + assertThat(result).hasSize(3); + assertThat(result.get(0).min()).isEqualTo(1L); + assertThat(result.get(0).max()).isEqualTo(50L); + assertThat(result.get(0).sum()).isEqualTo(1275L); // 1+..+50 + assertThat(result.get(0).nullCount()).isEqualTo(0L); + assertThat(result.get(1).min()).isEqualTo(51L); + assertThat(result.get(1).max()).isEqualTo(100L); + assertThat(result.get(1).sum()).isEqualTo(3775L); // 51+..+100 + assertThat(result.get(2).min()).isEqualTo(101L); + assertThat(result.get(2).max()).isEqualTo(150L); + assertThat(result.get(2).sum()).isEqualTo(6275L); // 101+..+150 + } + + @Test + void summingPerZoneSumsEqualsFileTotal(@TempDir Path tmp) throws IOException { + // Given — the whole-zone tier of an aggregate: SUM(id) folds per-zone sums, no data decode + Path file = writeThreeChunks(tmp); + + // When + long total = zoneStats(file, "id").stream() + .mapToLong(s -> (Long) s.sum()) + .sum(); + + // Then — equals 1+..+150 + assertThat(total).isEqualTo(11325L); + } + + @Test + void floatColumnZoneSumsAreDoubles(@TempDir Path tmp) throws IOException { + // Given — a float column so the sum stat decodes as Double, not Long + Path file = tmp.resolve("f64.vtx"); + try (var ch = FileChannel.open(file, StandardOpenOption.CREATE, StandardOpenOption.WRITE); + var sut = VortexWriter.create(ch, F64_SCHEMA, WriteOptions.defaults())) { + sut.writeChunk(Map.of("v", new double[]{1.5, 2.5, 3.0})); + } + + // When + List result = zoneStats(file, "v"); + + // Then + assertThat(result).hasSize(1); + assertThat(result.getFirst().sum()).isEqualTo(7.0); + } + + @Test + void missingColumnYieldsEmptyStatsPerZone(@TempDir Path tmp) throws IOException { + // Given + Path file = writeThreeChunks(tmp); + + // When — a column that does not exist still returns one entry per zone, all empty + List result = zoneStats(file, "does_not_exist"); + + // Then — aligned with chunk count, no stats + assertThat(result).hasSize(3).allSatisfy(s -> assertThat(s).isEqualTo(ArrayStats.empty())); + } +} From 6d15ca17107c30cf0b5a3e60df52dfaf1117126b Mon Sep 17 00:00:00 2001 From: Davide Angelocola Date: Sat, 27 Jun 2026 16:54:20 +0200 Subject: [PATCH 2/2] docs(reader): correct zone-alignment claim and stale ZonedStatsSchema ref MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit columnZoneStats javadoc overstated alignment: zone-map rows align with chunkRowCounts() only on the fallback path and for files this writer produces (one zone per chunk). A foreign writer may use a fixed zone length independent of chunk boundaries, so the zone count need not match. Reword to scope the guarantee. Also fix VortexWriter's stale [io.github.dfa1.vortex.inspect] reference to ZonedStatsSchema — the class moved to the reader package in this branch. Co-Authored-By: Claude Opus 4.8 --- .../java/io/github/dfa1/vortex/reader/ScanIterator.java | 9 +++++++-- .../java/io/github/dfa1/vortex/writer/VortexWriter.java | 2 +- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/reader/src/main/java/io/github/dfa1/vortex/reader/ScanIterator.java b/reader/src/main/java/io/github/dfa1/vortex/reader/ScanIterator.java index cfc38bc9..74029ce4 100644 --- a/reader/src/main/java/io/github/dfa1/vortex/reader/ScanIterator.java +++ b/reader/src/main/java/io/github/dfa1/vortex/reader/ScanIterator.java @@ -402,10 +402,15 @@ public long[] chunkRowCounts() { /// without touching any data segment. This is the source Rust populates for `SUM`, so the /// values match files written by either implementation. When no zone map is present the /// list falls back to each chunk's embedded `ArrayStats` (min/max/null count; `sum` is - /// `null`, since the flat writer does not retain it). Either way the result has one entry - /// per chunk/zone, positionally aligned with [#chunkRowCounts()]; a column that is absent + /// `null`, since the flat writer does not retain it). Either way a column that is absent /// or carries no stats yields [ArrayStats#empty()] per zone. /// + /// Zone granularity is the layout's, not the scan's. The fallback path is one entry per + /// chunk, positionally aligned with [#chunkRowCounts()]. The zone-map path is one entry per + /// zone of the stats table: this writer emits one zone per chunk (so the same alignment + /// holds), but a file from another writer may use a fixed zone length independent of chunk + /// boundaries, in which case the zone count need not match [#chunkRowCounts()]. + /// /// This is the read-side surface for aggregate push-down (ADR 0013 §6): a reduction can /// fold whole zones from these rows and fall back to a streaming decode only for the /// boundary zones a predicate partially selects. diff --git a/writer/src/main/java/io/github/dfa1/vortex/writer/VortexWriter.java b/writer/src/main/java/io/github/dfa1/vortex/writer/VortexWriter.java index 72875826..0348cbc2 100644 --- a/writer/src/main/java/io/github/dfa1/vortex/writer/VortexWriter.java +++ b/writer/src/main/java/io/github/dfa1/vortex/writer/VortexWriter.java @@ -821,7 +821,7 @@ private int wrapZoneMap(FbsBuilder fbb, String colName, int dataLayout, long col /// `vortex.stats` metadata: `u32` zone length (LE) + a 1-byte stat bitset (LSB-first) with the /// NULL_COUNT bit always set and the MAX/MIN and SUM bits set when present, matching - /// [io.github.dfa1.vortex.inspect] `ZonedStatsSchema`. + /// [io.github.dfa1.vortex.reader] `ZonedStatsSchema`. private static byte[] zonedMetadataBytes(long zoneLen, boolean hasMinMax, boolean hasSum) { byte[] meta = new byte[5]; ByteBuffer.wrap(meta).order(ByteOrder.LITTLE_ENDIAN).putInt((int) zoneLen);