Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ Per-encoding gotchas:
- [ ] **Compute primitives — masks, kernels, no-materialise** — pushdown filter/compare/aggregate
kernels operating on Lazy arrays without materialising. See [ADR-0013](docs/adr/0013-compute-primitives.md)
(Proposed). Gate: a concrete downstream consumer (e.g. the vortex-arrow bridge or filter pushdown).
Done: §6 read-side surface — `ScanIterator.columnZoneStats(col)` exposes per-zone
min/max/sum/null count, decoding sum from the `vortex.stats` zone-map table (matches files from
Rust, whose flat writer omits per-flat sum). Calcite `VortexAggregates.SUM`/`AVG` now fold those
per-zone sums (metadata-only), falling back to a full scan only when a column has no zone map.
Next: `Mask`/`Predicate`/kernel vocab and the two-tier whole-zone+residual reduce.

## Encodings

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@
import io.github.dfa1.vortex.reader.array.IntArray;
import io.github.dfa1.vortex.reader.array.LongArray;

import java.util.List;

/// Column aggregates answered with the cheapest available source.
///
/// `MIN` / `MAX` / `COUNT` are read from the per-segment zone-map statistics embedded in the
/// file footer — no data segment is decoded. `SUM` (and therefore `AVG`) has no zone statistic
/// in the current writer, so it falls back to a streaming scan. This split is the whole point
/// of the demo: the stats-backed aggregates are effectively free, and `SUM`/`AVG` show what the
/// next writer increment (emit a per-zone `SUM`) would make free too (ADR 0013 §6).
/// file footer — no data segment is decoded. `SUM` (and therefore `AVG`) folds the per-zone
/// `SUM` rows surfaced by [ScanIterator#columnZoneStats(String)] (ADR 0013 §6): when every zone
/// carries a sum the answer is metadata-only too, with no data segment touched. Only when a zone
/// lacks a sum — a column with no zone map, whose flat nodes do not retain it — does it fall back
/// to a streaming scan.
public final class VortexAggregates {

/// Where an aggregate's value came from.
Expand Down Expand Up @@ -65,13 +68,52 @@ public static Summary of(VortexReader reader, String column) {
Object min = stats.min();
Object max = stats.max();

// SUM/AVG: no SUM zone-stat exists today, so stream the column once. Integer columns sum
// into a long (exact); floating columns into a double.
Number sum = scanSum(reader, column);
// SUM/AVG: fold the per-zone SUM rows when every zone carries one (metadata-only); fall
// back to a single streaming scan otherwise. Integer columns sum into a long (exact);
// floating columns into a double.
Number sum = zoneSum(reader, column);
Source sumSource = Source.ZONE_STATS_PUSHDOWN;
if (sum == null) {
sum = scanSum(reader, column);
sumSource = Source.FULL_SCAN;
}
Double avg = count == 0 ? null : sum.doubleValue() / count;

return new Summary(column, min, max, count, sum, avg,
Source.ZONE_STATS_PUSHDOWN, Source.FULL_SCAN);
Source.ZONE_STATS_PUSHDOWN, sumSource);
}

/// Folds the per-zone `SUM` statistics for `column`, or returns `null` when any zone lacks a
/// sum (so the caller streams the column instead). Integer columns fold into a [Long] (exact),
/// floating columns into a [Double]; the zone-stat boxing already distinguishes the two.
private static Number zoneSum(VortexReader reader, String column) {
try (ScanIterator scan = reader.scan(ScanOptions.columns(column))) {
List<ArrayStats> zones = scan.columnZoneStats(column);
if (zones.isEmpty()) {
return null;
}
long longSum = 0L;
double doubleSum = 0.0;
boolean isFloating = false;
for (ArrayStats zone : zones) {
switch (zone.sum()) {
case Long l -> longSum += l;
case Double d -> {
isFloating = true;
doubleSum += d;
}
case null, default -> {
// A zone with no usable sum (no zone map, or an unhandled stat type) —
// can't push down, let the caller fall back to a full scan.
return null;
}
}
}
if (isFloating) {
return doubleSum;
}
return longSum;
}
}

private static long totalRows(VortexReader reader) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,8 @@ void integerColumn_sumIsExactLong() throws Exception {
assertThat(((Number) s.min()).longValue()).isEqualTo(1000L);
assertThat(((Number) s.max()).longValue()).isEqualTo(3000L);
assertThat(s.minMaxSource()).isEqualTo(VortexAggregates.Source.ZONE_STATS_PUSHDOWN);
assertThat(s.sumSource()).isEqualTo(VortexAggregates.Source.FULL_SCAN);
// SUM now folds the per-zone zone-map sum the Java writer emits — no data decoded.
assertThat(s.sumSource()).isEqualTo(VortexAggregates.Source.ZONE_STATS_PUSHDOWN);
}
}

Expand Down Expand Up @@ -266,5 +267,37 @@ void nonNumericColumn_throws() throws Exception {
.hasMessageContaining("not a numeric column");
}
}

@Test
void noZoneMap_sumFallsBackToFullScan(@TempDir Path noStats) throws Exception {
// Given — a file written with zone maps off, so no per-zone SUM exists to fold
Path bare = noStats.resolve("nostats.vortex");
WriteOptions noZoneMaps = new WriteOptions(65_536, false, 0.90, 0, true, false);
try (var ch = FileChannel.open(bare, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
var w = VortexWriter.create(ch, SCHEMA, noZoneMaps)) {
w.writeChunk(Map.ofEntries(
Map.entry("i8", new byte[]{1, 2, 3}),
Map.entry("i16", new short[]{10, 20, 30}),
Map.entry("i32", new int[]{100, 200, 300}),
Map.entry("i64", new long[]{1000L, 2000L, 3000L}),
Map.entry("u8", new byte[]{4, 5, 6}),
Map.entry("u16", new short[]{40, 50, 60}),
Map.entry("u32", new int[]{400, 500, 600}),
Map.entry("u64", new long[]{4000L, 5000L, 6000L}),
Map.entry("f32", new float[]{1.5f, 2.5f, 3.5f}),
Map.entry("f64", new double[]{1.25, 2.25, 3.25}),
Map.entry("s", new String[]{"a", "b", "c"}),
Map.entry("b", new boolean[]{true, false, true})));
}

// When
try (VortexReader reader = VortexReader.open(bare, registry())) {
VortexAggregates.Summary s = VortexAggregates.of(reader, "i64");

// Then — sum still exact, but sourced from a streaming scan, not the (absent) zone map
assertThat(s.sum()).isInstanceOf(Long.class).isEqualTo(6000L);
assertThat(s.sumSource()).isEqualTo(VortexAggregates.Source.FULL_SCAN);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import io.github.dfa1.vortex.cli.tui.term.Terminal;
import io.github.dfa1.vortex.inspect.ByteSize;
import io.github.dfa1.vortex.inspect.InspectorTree;
import io.github.dfa1.vortex.inspect.ZonedStatsSchema;
import io.github.dfa1.vortex.reader.ZonedStatsSchema;
import io.github.dfa1.vortex.reader.VortexHandle;
import io.github.dfa1.vortex.reader.Chunk;
import io.github.dfa1.vortex.reader.ScanIterator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ private static ArrayStats aggregateStats(InspectorTree.Node node) {
if (min == null && max == null) {
return ArrayStats.empty();
}
return new ArrayStats(min, max, null, null, null, null);
return new ArrayStats(min, max, null, null, null, null, null);
}

@SuppressWarnings({"unchecked", "rawtypes"})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,9 @@ void render_aggregatesMinMaxAcrossChunks() {
Layout structLayout = new Layout("vortex.struct", 1000, null, List.of(chunked), List.of());

InspectorTree.Node c1 = new InspectorTree.Node(chunk1, Optional.empty(), Set.of(),
new ArrayStats(10L, 50L, null, null, null, null), List.of());
new ArrayStats(10L, 50L, null, null, null, null, null), List.of());
InspectorTree.Node c2 = new InspectorTree.Node(chunk2, Optional.empty(), Set.of(),
new ArrayStats(5L, 100L, null, null, null, null), List.of());
new ArrayStats(5L, 100L, null, null, null, null, null), List.of());
InspectorTree.Node chunkedN = new InspectorTree.Node(chunked, Optional.of("id"),
Set.of("vortex.flat"), ArrayStats.empty(), List.of(c1, c2));
InspectorTree.Node rootN = new InspectorTree.Node(structLayout, Optional.empty(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.github.dfa1.vortex.reader.array.Array;
import io.github.dfa1.vortex.reader.array.DoubleArray;
import io.github.dfa1.vortex.reader.array.LongArray;
import io.github.dfa1.vortex.reader.ArrayStats;
import io.github.dfa1.vortex.reader.ReadRegistry;
import io.github.dfa1.vortex.reader.VortexReader;
import org.apache.arrow.c.ArrowArray;
Expand Down Expand Up @@ -269,6 +270,35 @@ void jniWriter_javaReader_singleChunk(@TempDir Path tmp) throws IOException {
}
}

@Test
void jniWriter_perZoneSum_readFromZoneMapTable(@TempDir Path tmp) throws IOException {
// Given — a Rust-written file large enough that the JNI writer emits a multi-zone column.
// Sum lives only in Rust's vortex.stats zone-map table (its flat writer doesn't retain it),
// so this proves the Java reader decodes that table for per-zone SUM (ADR 0013 §6 parity).
int n = 200_000;
long[] ids = new long[n];
double[] vals = new double[n];
for (int i = 0; i < n; i++) {
ids[i] = i;
vals[i] = i;
}
Path file = tmp.resolve("jni_zones.vtx");
writeJni(file, ids, vals);
long expected = (long) n * (n - 1) / 2; // Σ 0..n-1

// When — fold the per-zone SUM rows the reader surfaces from the zone-map table
try (var vf = VortexReader.open(file, ReadRegistry.loadAll());
var iter = vf.scan(io.github.dfa1.vortex.reader.ScanOptions.all())) {
List<ArrayStats> zones = iter.columnZoneStats("id");

// Then — every zone carries a SUM (came from Rust's table, not a Java-side recompute)
// and the whole-zone fold equals the column total.
assertThat(zones).isNotEmpty().allSatisfy(z -> assertThat(z.sum()).isNotNull());
long total = zones.stream().mapToLong(z -> (Long) z.sum()).sum();
assertThat(total).isEqualTo(expected);
}
}

@Test
void jniWriter_javaReader_multipleChunks(@TempDir Path tmp) throws IOException {
// Given
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,21 @@
///
/// @param min minimum value in the array, or `null` if unknown
/// @param max maximum value in the array, or `null` if unknown
/// @param sum sum of values (`Long` for integer columns, `Double` for floats), or `null` if unknown
/// @param trueCount number of `true` values for bool columns, or `null` if unknown
/// @param nullCount number of null values, or `null` if unknown
/// @param isSorted `true` if the array is sorted in ascending order, or `null` if unknown
/// @param isStrictSorted `true` if the array is strictly sorted (no duplicates), or `null` if unknown
public record ArrayStats(
Object min,
Object max,
Object sum,
Long trueCount,
Long nullCount,
Boolean isSorted,
Boolean isStrictSorted
) {
private static final ArrayStats EMPTY = new ArrayStats(null, null, null, null, null, null);
private static final ArrayStats EMPTY = new ArrayStats(null, null, null, null, null, null, null);

/// Returns an empty stats instance with all fields set to `null`.
///
Expand All @@ -43,11 +45,12 @@ public static ArrayStats fromFbs(io.github.dfa1.vortex.core.fbs.FbsArrayStats fb
}
Object min = decodeScalar(fbs.minAsSegment());
Object max = decodeScalar(fbs.maxAsSegment());
Object sum = decodeScalar(fbs.sumAsSegment());
Long nullCount = fbs.hasNullCount() ? fbs.nullCount() : null;
if (min == null && max == null && nullCount == null) {
if (min == null && max == null && sum == null && nullCount == null) {
return EMPTY;
}
return new ArrayStats(min, max, null, nullCount, null, null);
return new ArrayStats(min, max, sum, null, nullCount, null, null);
}

private static Object decodeScalar(MemorySegment seg) {
Expand Down
Loading
Loading