Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,11 @@ Per-encoding gotchas:
min/max/sum/null count, decoding sum from the `vortex.stats` zone-map table (matches files from
Rust, whose flat writer omits per-flat sum). Calcite `VortexAggregates.SUM`/`AVG` now fold those
per-zone sums (metadata-only), falling back to a full scan only when a column has no zone map.
Next: `Mask`/`Predicate`/kernel vocab and the two-tier whole-zone+residual reduce.
The fold is now a reusable `reader.compute.ZoneReducer.sum(col)` (the seam a future
`vortex-compute` extracts); Calcite consumes it.
Next: the residual tier needs a consumer first — wire Calcite aggregate+`WHERE` push-down, then
add `ZoneReducer` predicate support (whole-zone fold + boundary-zone streaming) and the
`Mask`/`Predicate`/kernel vocab on top.

## Encodings

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,16 @@
import io.github.dfa1.vortex.reader.array.FloatArray;
import io.github.dfa1.vortex.reader.array.IntArray;
import io.github.dfa1.vortex.reader.array.LongArray;

import java.util.List;
import io.github.dfa1.vortex.reader.compute.ZoneReducer;

/// Column aggregates answered with the cheapest available source.
///
/// `MIN` / `MAX` / `COUNT` are read from the per-segment zone-map statistics embedded in the
/// file footer — no data segment is decoded. `SUM` (and therefore `AVG`) folds the per-zone
/// `SUM` rows surfaced by [ScanIterator#columnZoneStats(String)] (ADR 0013 §6): when every zone
/// carries a sum the answer is metadata-only too, with no data segment touched. Only when a zone
/// lacks a sum — a column with no zone map, whose flat nodes do not retain it — does it fall back
/// to a streaming scan.
/// `SUM` rows via [ZoneReducer#sum(String)] (ADR 0013 §6): when every zone carries a sum the
/// answer is metadata-only too, with no data segment touched. Only when a zone lacks a sum — a
/// column with no zone map, whose flat nodes do not retain it — does it fall back to a streaming
/// scan.
public final class VortexAggregates {

/// Where an aggregate's value came from.
Expand Down Expand Up @@ -71,7 +70,7 @@ public static Summary of(VortexReader reader, String column) {
// SUM/AVG: fold the per-zone SUM rows when every zone carries one (metadata-only); fall
// back to a single streaming scan otherwise. Integer columns sum into a long (exact);
// floating columns into a double.
Number sum = zoneSum(reader, column);
Number sum = new ZoneReducer(reader).sum(column);
Source sumSource = Source.ZONE_STATS_PUSHDOWN;
if (sum == null) {
sum = scanSum(reader, column);
Expand All @@ -83,39 +82,6 @@ public static Summary of(VortexReader reader, String column) {
Source.ZONE_STATS_PUSHDOWN, sumSource);
}

/// Folds the per-zone `SUM` statistics for `column`, or returns `null` when any zone lacks a
/// sum (so the caller streams the column instead). Integer columns fold into a [Long] (exact),
/// floating columns into a [Double]; the zone-stat boxing already distinguishes the two.
private static Number zoneSum(VortexReader reader, String column) {
try (ScanIterator scan = reader.scan(ScanOptions.columns(column))) {
List<ArrayStats> zones = scan.columnZoneStats(column);
if (zones.isEmpty()) {
return null;
}
long longSum = 0L;
double doubleSum = 0.0;
boolean isFloating = false;
for (ArrayStats zone : zones) {
switch (zone.sum()) {
case Long l -> longSum += l;
case Double d -> {
isFloating = true;
doubleSum += d;
}
case null, default -> {
// A zone with no usable sum (no zone map, or an unhandled stat type) —
// can't push down, let the caller fall back to a full scan.
return null;
}
}
}
if (isFloating) {
return doubleSum;
}
return longSum;
}
}

private static long totalRows(VortexReader reader) {
try (ScanIterator scan = reader.scan(ScanOptions.all())) {
long total = 0L;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package io.github.dfa1.vortex.reader.compute;

import io.github.dfa1.vortex.reader.ArrayStats;
import io.github.dfa1.vortex.reader.ScanIterator;
import io.github.dfa1.vortex.reader.ScanOptions;
import io.github.dfa1.vortex.reader.VortexReader;

import java.util.List;
import java.util.Objects;

/// Answers column reductions from the per-zone statistics table, without decoding any data
/// segment.
///
/// This is the whole-zone tier of the aggregate push-down sketched in ADR 0013 §6: a reduction
/// folds the per-zone rows surfaced by [ScanIterator#columnZoneStats(String)] instead of streaming
/// the column. The boundary (residual) tier — partially-selected zones under a predicate — is a
/// later increment; today the reducer takes no predicate and folds every zone.
///
/// The reducer lives in `reader.compute` as the seam a future `vortex-compute` module extracts: it
/// depends only on the public reader scan API, so the move is mechanical once a second consumer
/// (the arrow bridge, a query façade) appears.
public final class ZoneReducer {

private final VortexReader reader;

/// Creates a reducer over an open reader.
///
/// @param reader an open reader over the file; not closed by this reducer
public ZoneReducer(VortexReader reader) {
this.reader = Objects.requireNonNull(reader, "reader");
}

/// Folds the per-zone `SUM` statistics for `column` into the column total, or returns `null`
/// when the reduction cannot be answered from the zone-map table — a column with no zone map,
/// or a zone whose `SUM` was not retained (e.g. an overflowed zone) — so the caller streams the
/// column instead.
///
/// Integer columns fold into a [Long] (exact, wrapping at 2^63 like SQL `SUM(BIGINT)`); floating
/// columns into a [Double]. The two are never mixed: a column is one dtype, and the zone-stat
/// boxing already distinguishes them.
///
/// @param column the numeric column name
/// @return the column sum as a [Long] or [Double], or `null` if no zone carries a usable sum
public Number sum(String column) {
Objects.requireNonNull(column, "column");
try (ScanIterator scan = reader.scan(ScanOptions.columns(column))) {
List<ArrayStats> zones = scan.columnZoneStats(column);
if (zones.isEmpty()) {
return null;
}
long longSum = 0L;
double doubleSum = 0.0;
boolean isFloating = false;
for (ArrayStats zone : zones) {
switch (zone.sum()) {
case Long l -> longSum += l;
case Double d -> {
isFloating = true;
doubleSum += d;
}
case null, default -> {
// A zone with no usable sum (no zone map, or an unhandled stat type) — the
// whole reduction can't be pushed down, so signal a fall back to a scan.
return null;
}
}
}
// Return via if/else, not a ?: ternary: a numeric conditional unboxes both branches and
// widens the Long to double, silently turning an integer-column sum into a Double.
if (isFloating) {
return doubleSum;
}
return longSum;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ public static WriteOptions cascading(int depth) {
return new WriteOptions(65_536, true, 0.90, depth, true, false);
}

/// Returns a copy of these options with zone-map statistics set to `enabled`.
///
/// @param enabled `true` to write per-chunk min/max/sum statistics for zone-map pruning
/// @return a new `WriteOptions` with the zone-map flag updated
public WriteOptions withZoneMaps(boolean enabled) {
return new WriteOptions(chunkSize, enabled, compressionRatioThreshold, allowedCascading, globalDict, enableZstd);
}

/// Returns a copy of these options with global dictionary encoding set to `enabled`.
///
/// @param enabled `true` to enable global dictionary encoding across chunks
Expand Down
119 changes: 119 additions & 0 deletions writer/src/test/java/io/github/dfa1/vortex/writer/ZoneReducerTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package io.github.dfa1.vortex.writer;

import io.github.dfa1.vortex.core.model.DType;
import io.github.dfa1.vortex.reader.ReadRegistry;
import io.github.dfa1.vortex.reader.VortexReader;
import io.github.dfa1.vortex.reader.compute.ZoneReducer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;

/// Round-trip tests for [ZoneReducer] — the whole-zone tier of ADR 0013 §6 aggregate push-down.
/// SUM folds the per-zone `SUM` rows the writer embeds in each chunk's zone-map stats, with no data
/// segment decoded.
class ZoneReducerTest {

private static final DType.Struct I64_SCHEMA = new DType.Struct(
List.of("id"), List.of(DType.I64), false);

private static final DType.Struct F64_SCHEMA = new DType.Struct(
List.of("v"), List.of(DType.F64), false);

private static ReadRegistry registry() {
return ReadRegistry.builder().registerServiceLoaded().build();
}

private static long[] range(long from, long to) {
long[] arr = new long[(int) (to - from + 1)];
for (int i = 0; i < arr.length; i++) {
arr[i] = from + i;
}
return arr;
}

@Test
void integerSumFoldsZonesToExactLong(@TempDir Path tmp) throws IOException {
// Given — three I64 chunks (one zone each); column total is 1+..+150 = 11325
Path file = tmp.resolve("ints.vtx");
try (var ch = FileChannel.open(file, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
var w = VortexWriter.create(ch, I64_SCHEMA, WriteOptions.defaults())) {
w.writeChunk(Map.of("id", range(1L, 50L)));
w.writeChunk(Map.of("id", range(51L, 100L)));
w.writeChunk(Map.of("id", range(101L, 150L)));
}

// When
try (VortexReader reader = VortexReader.open(file, registry())) {
Number result = new ZoneReducer(reader).sum("id");

// Then — exact Long (not a Double widening), no data segment decoded
assertThat(result).isInstanceOf(Long.class).isEqualTo(11325L);
}
}

@Test
void floatSumFoldsZonesToDouble(@TempDir Path tmp) throws IOException {
// Given — a single F64 chunk so the sum stat boxes as Double, not Long
Path file = tmp.resolve("floats.vtx");
try (var ch = FileChannel.open(file, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
var w = VortexWriter.create(ch, F64_SCHEMA, WriteOptions.defaults())) {
w.writeChunk(Map.of("v", new double[]{1.5, 2.5, 3.0}));
}

// When
try (VortexReader reader = VortexReader.open(file, registry())) {
Number result = new ZoneReducer(reader).sum("v");

// Then
assertThat(result).isInstanceOf(Double.class).isEqualTo(7.0);
}
}

@Test
void noZoneMapYieldsNull(@TempDir Path tmp) throws IOException {
// Given — zone maps disabled, so no per-zone SUM exists to fold
Path file = tmp.resolve("nostats.vtx");
WriteOptions noZoneMaps = WriteOptions.defaults().withZoneMaps(false);
try (var ch = FileChannel.open(file, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
var w = VortexWriter.create(ch, I64_SCHEMA, noZoneMaps)) {
w.writeChunk(Map.of("id", range(1L, 50L)));
}

// When
try (VortexReader reader = VortexReader.open(file, registry())) {
Number result = new ZoneReducer(reader).sum("id");

// Then — null signals "not answerable from zones", caller must stream
assertThat(result).isNull();
}
}

@Test
void overflowedZoneYieldsNull(@TempDir Path tmp) throws IOException {
// Given — zone maps ON, but one zone's I64 SUM overflows long (Math.addExact in the writer
// drops it), so a zone map exists yet a zone carries no usable sum. Distinct from
// noZoneMapYieldsNull: there the table is absent; here it is present but incomplete.
Path file = tmp.resolve("overflow.vtx");
try (var ch = FileChannel.open(file, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
var w = VortexWriter.create(ch, I64_SCHEMA, WriteOptions.defaults())) {
w.writeChunk(Map.of("id", range(1L, 50L))); // sums fine
w.writeChunk(Map.of("id", new long[]{Long.MAX_VALUE, Long.MAX_VALUE})); // overflows
}

// When
try (VortexReader reader = VortexReader.open(file, registry())) {
Number result = new ZoneReducer(reader).sum("id");

// Then — one unusable zone forces the whole fold to bail; no partial sum returned
assertThat(result).isNull();
}
}
}
Loading