diff --git a/src/java/org/apache/cassandra/db/Columns.java b/src/java/org/apache/cassandra/db/Columns.java index 1fafa59c2a40..cf51777ff2f6 100644 --- a/src/java/org/apache/cassandra/db/Columns.java +++ b/src/java/org/apache/cassandra/db/Columns.java @@ -460,7 +460,7 @@ public long unsharedHeapSize() if(this == NONE) return 0; - return EMPTY_SIZE + BTree.sizeOfStructureOnHeap(columns); + return EMPTY_SIZE + BTree.sizeOnHeapOf(columns); } @Override diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java b/src/java/org/apache/cassandra/db/rows/BTreeRow.java index b8df8450c136..531b1cdba081 100644 --- a/src/java/org/apache/cassandra/db/rows/BTreeRow.java +++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java @@ -569,7 +569,7 @@ public long unsharedHeapSize() + clustering.unsharedHeapSize() + primaryKeyLivenessInfo.unsharedHeapSize() + deletion.unsharedHeapSize() - + BTree.sizeOfStructureOnHeap(btree); + + BTree.sizeOnHeapOf(btree); return accumulate((cd, v) -> v + cd.unsharedHeapSize(), heapSize); } @@ -581,7 +581,7 @@ public long unsharedHeapSizeExcludingData() + clustering.unsharedHeapSizeExcludingData() + primaryKeyLivenessInfo.unsharedHeapSize() + deletion.unsharedHeapSize() - + BTree.sizeOfStructureOnHeap(btree); + + BTree.sizeOnHeapOf(btree); return accumulate((cd, v) -> v + cd.unsharedHeapSizeExcludingData(), heapSize); } @@ -660,10 +660,20 @@ else if (rowDeletion.isShadowedBy(livenessInfo)) { // The update's deletion shadows part of the existing row. Those cells ARE owned by // the memtable, so record their removal via retain(). 
- existingBtree = BTree.transformAndFilter(existingBtree, reconciler::retain); + Object[] retained = BTree.transformAndFilter(existingBtree, reconciler::retain); + if (existingBtree != retained) + { + reconcileF.onAllocatedOnHeap(BTree.sizeOnHeapOf(retained) - BTree.sizeOnHeapOf(existingBtree)); + existingBtree = retained; + } } } Object[] tree = BTree.update(existingBtree, updateBtree, ColumnData.comparator, reconciler); + // BTree.update and the reconciler only account the column data (cells and column-tree nodes); the row's + // own LivenessInfo/Deletion are not. When they change (e.g. a row tombstone supersedes a live row) the + // new objects become memtable-owned and the old ones are released, so account that delta here. + reconcileF.onAllocatedOnHeap((livenessInfo.unsharedHeapSize() + rowDeletion.unsharedHeapSize()) + - (existing.primaryKeyLivenessInfo().unsharedHeapSize() + existing.deletion().unsharedHeapSize())); return new BTreeRow(existing.clustering, livenessInfo, rowDeletion, tree, minDeletionTime(tree, livenessInfo, deletion)); } } diff --git a/src/java/org/apache/cassandra/db/rows/ColumnData.java b/src/java/org/apache/cassandra/db/rows/ColumnData.java index 8f44a9c1cf6a..6f64ca62c55f 100644 --- a/src/java/org/apache/cassandra/db/rows/ColumnData.java +++ b/src/java/org/apache/cassandra/db/rows/ColumnData.java @@ -156,6 +156,7 @@ public ColumnData merge(ColumnData existing, ColumnData update) } cells = BTree.update(existingTree, updateTree, existingComplex.column.cellComparator(), (UpdateFunction) reconciler); } + onAllocatedOnHeap(maxComplexDeletion.unsharedHeapSize() - existingDeletion.unsharedHeapSize()); return new ComplexColumnData(existingComplex.column, cells, maxComplexDeletion); } } @@ -217,8 +218,28 @@ private ColumnData removeShadowed(ColumnData existing, PostReconciliationFunctio ComplexColumnData existingComplex = (ComplexColumnData) existing; if (activeDeletion.supersedes(existingComplex.complexDeletion())) { - Object[] cells = 
BTree.transformAndFilter(existingComplex.tree(), (ColumnData cd) -> removeShadowed(cd, recordDeletion)); - return BTree.isEmpty(cells) ? null : new ComplexColumnData(existingComplex.column, cells, DeletionTime.LIVE); + Object[] existingTree = existingComplex.tree(); + Object[] cells = BTree.transformAndFilter(existingTree, (ColumnData cd) -> removeShadowed(cd, recordDeletion)); + ComplexColumnData result = BTree.isEmpty(cells) ? null + : new ComplexColumnData(existingComplex.column, cells, DeletionTime.LIVE); + // The shadowed inner cells are released through recordDeletion.delete above, but that does not cover + // the complex column's own structure: its cell tree (which can span multiple BTree nodes), its + // complex deletion, and, when the column is dropped entirely, its wrapper. All were counted as + // owned when the column was first written (ComplexColumnData.unsharedHeapSizeExcludingData), so release that + // delta here. The rewritten column carries DeletionTime.LIVE, so the dropped complex + // deletion's heap is released too, matching the swap Reconciler.merge accounts on its path. + // On the update side (recordDeletion == noOp) this is a no-op, so skip it entirely. + if (recordDeletion != ColumnData.noOp) + { + long structureBefore = ComplexColumnData.EMPTY_SIZE + + existingComplex.complexDeletion().unsharedHeapSize() + + BTree.sizeOnHeapOf(existingTree); + long structureAfter = result == null ? 
0 : ComplexColumnData.EMPTY_SIZE + + DeletionTime.LIVE.unsharedHeapSize() + + BTree.sizeOnHeapOf(cells); + recordDeletion.onAllocatedOnHeap(structureAfter - structureBefore); + } + return result; } } diff --git a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java index 062b389ad847..20b6793abc14 100644 --- a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java +++ b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java @@ -51,11 +51,11 @@ public class ComplexColumnData extends ColumnData implements Iterable> { static final Cell[] NO_CELLS = new Cell[0]; - private static final long EMPTY_SIZE = ObjectSizes.measure(new ComplexColumnData(ColumnMetadata.regularColumn("", - "", - "", - SetType.getInstance(ByteType.instance, true), - ColumnMetadata.NO_UNIQUE_ID), + static final long EMPTY_SIZE = ObjectSizes.measure(new ComplexColumnData(ColumnMetadata.regularColumn("", + "", + "", + SetType.getInstance(ByteType.instance, true), + ColumnMetadata.NO_UNIQUE_ID), NO_CELLS, DeletionTime.build(0, 0))); @@ -151,7 +151,7 @@ public long unsharedHeapSize() @Override public long unsharedHeapSizeExcludingData() { - long heapSize = EMPTY_SIZE + BTree.sizeOnHeapOf(cells); + long heapSize = EMPTY_SIZE + BTree.sizeOnHeapOf(cells) + complexDeletion.unsharedHeapSize(); // TODO: this can be turned into a simple multiplication, at least while we have only one Cell implementation for (Cell cell : this) heapSize += cell.unsharedHeapSizeExcludingData(); diff --git a/src/java/org/apache/cassandra/utils/btree/BTree.java b/src/java/org/apache/cassandra/utils/btree/BTree.java index e198e06834b7..0b277d5b2bfd 100644 --- a/src/java/org/apache/cassandra/utils/btree/BTree.java +++ b/src/java/org/apache/cassandra/utils/btree/BTree.java @@ -86,6 +86,7 @@ public class BTree public static final int MIN_KEYS = BRANCH_FACTOR / 2 - 1; public static final int MAX_KEYS = BRANCH_FACTOR - 1; public static final long 
STOP_SENTINEL_VALUE = Long.MAX_VALUE; + private static final long MAX_KEYS_REF_ARRAY_SIZE = ObjectSizes.sizeOfReferenceArray(MAX_KEYS); // An empty BTree Leaf - which is the same as an empty BTree private static final Object[] EMPTY_LEAF = new Object[1]; @@ -979,19 +980,6 @@ public static int size(Object[] tree) return sizeMap(tree)[(length / 2) - 1]; } - public static long sizeOfStructureOnHeap(Object[] tree) - { - if (tree == EMPTY_LEAF) - return 0; - - long size = ObjectSizes.sizeOfArray(tree); - if (isLeaf(tree)) - return size; - for (int i = getChildStart(tree); i < getChildEnd(tree); i++) - size += sizeOfStructureOnHeap((Object[]) tree[i]); - return size; - } - /** * Checks is the node is a leaf. * @@ -2331,6 +2319,23 @@ private static long sizeOnHeapOfLeaf(Object[] tree) return ObjectSizes.sizeOfArray(tree); } + /** + * The heap occupied by a single node (its backing array, plus the sizeMap for a branch), not including + * its children. This is the per-node contribution to {@link #sizeOnHeapOf(Object[])}; the builders use it to + * keep their running {@code allocated} total net: every newly retained node adds its shallow heap and every + * source node it replaces subtracts its shallow heap, so reused subtrees cancel. 
+ */ + private static long shallowHeapOf(Object[] node) + { + if (isEmpty(node)) + return 0; + + long size = ObjectSizes.sizeOfArray(node); + if (!isLeaf(node)) + size += ObjectSizes.sizeOfArray(sizeMap(node)); + return size; + } + // Arbitrary boundaries private static Object POSITIVE_INFINITY = new Object(); private static Object NEGATIVE_INFINITY = new Object(); @@ -2646,6 +2651,7 @@ static boolean areIdentical(int[] a, int aOffset, int[] b, int bOffset, int coun */ private static abstract class LeafBuilder extends LeafOrBranchBuilder { + static final long DISABLED = Long.MIN_VALUE; long allocated; LeafBuilder() @@ -2654,6 +2660,13 @@ private static abstract class LeafBuilder extends LeafOrBranchBuilder buffer = new Object[MAX_KEYS]; } + /** Adjust the running heap total, when accounting is enabled */ + final void addAllocated(long bytes) + { + if (allocated != DISABLED) + allocated += bytes; + } + /** * Add {@code nextKey} to the buffer, overflowing if necessary */ @@ -2830,8 +2843,11 @@ Object[] redistributeAndDrain(Object[] pred, int predSize, Object predNextKey) int newPredecessorCount = predSize - steal; Object[] newPredecessor = new Object[newPredecessorCount | 1]; System.arraycopy(pred, 0, newPredecessor, 0, newPredecessorCount); - if (allocated >= 0) - allocated += ObjectSizes.sizeOfReferenceArray(newPredecessorCount | 1); + // we replace the single source leaf with two new leaves (newPredecessor + newLeaf); account both and + // release the source we consumed, so the running total stays a net delta + addAllocated(ObjectSizes.sizeOfArray(newPredecessor) + + ObjectSizes.sizeOfArray(newLeaf) + - (sourceNode == null ? 
0 : sizeOnHeapOfLeaf(sourceNode))); ensureParent().addChildAndNextKey(newPredecessor, newPredecessorCount, pred[newPredecessorCount]); return newLeaf; } @@ -2883,8 +2899,7 @@ void propagateOverflow() { // propagate the leaf we have saved in savedBuffer // precondition: savedLeafCount == MAX_KEYS - if (allocated >= 0) - allocated += ObjectSizes.sizeOfReferenceArray(MAX_KEYS); + addAllocated(MAX_KEYS_REF_ARRAY_SIZE); ensureParent().addChildAndNextKey(savedBuffer, MAX_KEYS, savedNextKey); savedBuffer = null; savedNextKey = null; @@ -2921,9 +2936,7 @@ else if (!hasOverflow() && sourceNode != null && count == sizeOfLeaf(sourceNode) propagateOverflow(); sizeOfLeaf = count; - leaf = drain(); - if (allocated >= 0 && sizeOfLeaf > 0) - allocated += ObjectSizes.sizeOfReferenceArray(sizeOfLeaf | 1) - (sourceNode == null ? 0 : sizeOnHeapOfLeaf(sourceNode)); + leaf = drain(); // drain() accounts the new leaf and releases the source it replaces } count = 0; @@ -2941,19 +2954,27 @@ Object[] drain() // the number of children here may be smaller than MIN_KEYS if this is the root node assert !hasOverflow(); if (count == 0) + { + // defensive: unreachable from BTree.update + if (sourceNode != null) + addAllocated(-sizeOnHeapOfLeaf(sourceNode)); + clearSourceNode(); return empty(); + } Object[] newLeaf; if (sourceNode != null && count == sizeOfLeaf(sourceNode) && areIdentical(buffer, 0, sourceNode, 0, count)) { - newLeaf = sourceNode; + newLeaf = sourceNode; // reused unchanged: neither allocated nor released } else { newLeaf = new Object[count | 1]; System.arraycopy(buffer, 0, newLeaf, 0, count); + // account the new leaf and release the source it replaces, before clearSourceNode() drops it + addAllocated(ObjectSizes.sizeOfArray(newLeaf) - (sourceNode == null ? 
0 : sizeOnHeapOfLeaf(sourceNode))); } count = 0; clearSourceNode(); @@ -3054,10 +3075,10 @@ final void addChildAndNextKey(Object[] newChild, int newChildSize, Object nextKe */ final void propagateOverflow() { - // propagate the leaf we have saved in leaf().savedBuffer - if (leaf.allocated >= 0) - leaf.allocated += ObjectSizes.sizeOfReferenceArray(2 * (1 + MAX_KEYS)); + // propagate the branch we have saved in savedBuffer; it is a brand new node, so account it in full + // (array + sizeMap) with nothing to release int size = setOverflowSizeMap(savedBuffer, MAX_KEYS); + leaf.addAllocated(shallowHeapOf(savedBuffer)); ensureParent().addChildAndNextKey(savedBuffer, size, savedNextKey); savedBuffer = null; savedNextKey = null; @@ -3114,8 +3135,11 @@ Object[] redistributeOverflowAndDrain() System.arraycopy(savedBuffer, 0, savedBranch, 0, savedBranchCount); System.arraycopy(savedBuffer, MAX_KEYS, savedBranch, savedBranchCount, savedBranchCount + 1); int savedBranchSize = setOverflowSizeMap(savedBranch, savedBranchCount); - if (leaf.allocated >= 0) - leaf.allocated += ObjectSizes.sizeOfReferenceArray(2 * (1 + savedBranchCount)); + // we replace the single source branch with two new branches (savedBranch + newBranch); account both + // (array + sizeMap) and release the source we consumed + leaf.addAllocated(shallowHeapOf(savedBranch) + + shallowHeapOf(newBranch) + - (sourceNode == null ? 0 : shallowHeapOf(sourceNode))); ensureParent().addChildAndNextKey(savedBranch, savedBranchSize, savedBuffer[savedBranchCount]); savedNextKey = null; @@ -3224,6 +3248,8 @@ && areIdentical(buffer, MAX_KEYS, unode, usz, usz + 1)) System.arraycopy(buffer, 0, branch, 0, count); System.arraycopy(buffer, MAX_KEYS, branch, count, count + 1); sizeOfBranch = setDrainSizeMap(unode, usz, branch, count); + // account the new branch (array + sizeMap) and release the source branch it replaces + leaf.addAllocated(shallowHeapOf(branch) - (unode == null ? 
0 : shallowHeapOf(unode))); } } @@ -3245,6 +3271,9 @@ Object[] drain() assert !hasOverflow(); if (count == 0) { + // defensive: unreachable from BTree.update + if (sourceNode != null) + leaf.addAllocated(-shallowHeapOf(sourceNode)); clearSourceNode(); hasRightChild = false; return (Object[]) buffer[MAX_KEYS]; @@ -3256,7 +3285,7 @@ Object[] drain() && areIdentical(buffer, 0, sourceNode, 0, count) && areIdentical(buffer, MAX_KEYS, sourceNode, count, count + 1)) { - branch = sourceNode; + branch = sourceNode; // reused unchanged: neither allocated nor released } else { @@ -3273,6 +3302,9 @@ && areIdentical(buffer, MAX_KEYS, sourceNode, count, count + 1)) System.arraycopy(buffer, MAX_KEYS, branch, count, count + 1); } setDrainSizeMap(null, -1, branch, count); + // account the new branch (array + sizeMap) and release the source branch it replaces, + // before clearSourceNode() drops it + leaf.addAllocated(shallowHeapOf(branch) - (sourceNode == null ? 0 : shallowHeapOf(sourceNode))); } count = 0; @@ -3506,6 +3538,7 @@ void reset() branch.inUse = false; branch = branch.parent; } + leaf().clearSourceNode(); Invariants.require(branch == null || (branch.count == 0 && !branch.hasRightChild)); } @@ -3554,7 +3587,7 @@ public static class FastBuilder extends AbstractFastBuilder implements AutoCl FastBuilder() { - allocated = -1; + allocated = DISABLED; } // disable allocation tracking public void add(V value) @@ -3667,7 +3700,7 @@ Object[] update(Object[] update, Object[] insert, Comparator co this.insert.init(insert); this.updateF = updateF; this.comparator = comparator; - this.allocated = isSimple(updateF) ? -1 : 0; + this.allocated = isSimple(updateF) ? 
DISABLED : 0; int leafDepth = BTree.depth(update) - 1; LeafOrBranchBuilder builder = leaf(); Invariants.require(builder.isEmpty()); @@ -3678,12 +3711,17 @@ Object[] update(Object[] update, Object[] insert, Comparator co builder = branch; } + // record the root as the top builder's source, so that when the root node is rebuilt its old version is + // released from the running heap total (and an unchanged root can be reused), mirroring the per-node + // accounting performed for every child level + builder.setSourceNode(update); + Insert ik = this.insert.next(); ik = updateRecursive(ik, update, null, builder); assert ik == null; Object[] result = builder.completeBuild(); - if (allocated > 0) + if (allocated != DISABLED) updateF.onAllocatedOnHeap(allocated); return result; @@ -3864,7 +3902,7 @@ static abstract class AbstractSeekingTransformer extends AbstractFastBuild AbstractSeekingTransformer() { - allocated = -1; + allocated = DISABLED; ensureParent(); parent.inUse = false; } diff --git a/src/java/org/apache/cassandra/utils/btree/UpdateFunction.java b/src/java/org/apache/cassandra/utils/btree/UpdateFunction.java index 0c23e69ec12e..5626cc3a768f 100644 --- a/src/java/org/apache/cassandra/utils/btree/UpdateFunction.java +++ b/src/java/org/apache/cassandra/utils/btree/UpdateFunction.java @@ -44,7 +44,7 @@ public interface UpdateFunction V merge(V replacing, K update); /** - * @param heapSize extra heap space allocated (over previous tree) + * @param heapSize heap space signed delta allocated (over previous tree), can be negative */ void onAllocatedOnHeap(long heapSize); diff --git a/test/unit/org/apache/cassandra/db/partitions/PartitionRowAccountingTest.java b/test/unit/org/apache/cassandra/db/partitions/PartitionRowAccountingTest.java new file mode 100644 index 000000000000..1af30de30fbc --- /dev/null +++ b/test/unit/org/apache/cassandra/db/partitions/PartitionRowAccountingTest.java @@ -0,0 +1,423 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + 
* or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.partitions; + +import java.lang.reflect.Field; +import java.util.concurrent.atomic.AtomicLong; + +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.ServerTestUtils; +import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.cql3.UntypedResultSet; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.memtable.Memtable; +import org.apache.cassandra.utils.btree.BTree; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Row-level analogue of {@link SetCellAccountingTest}. Where that test grows/resets a single {@code set} + * column on one row, this test grows/resets the rows of a single partition: + *
<ul>
+ *   <li>adding a row is the analogue of adding a set element (both merge a new entry into a BTree -- here the + * partition's row tree, via {@link BTreePartitionUpdater#makeMergedPartition} → + * {@code BTree.update(current.tree, update.holder().tree, ...)});</li>
+ *   <li>a whole-partition tombstone is the analogue of a {@code set = {...}} override: it shadows every previously + * written row, just as a collection assignment shadows every previously written element.</li>
+ * </ul>
+ * It runs a grow/reset op mix on one partition with strictly increasing timestamps and asserts the memtable's + * on/off-heap ownership never goes negative -- i.e. the update accounting never releases more than it allocated. + */ +public class PartitionRowAccountingTest extends CQLTester +{ + private static final Logger logger = LoggerFactory.getLogger(PartitionRowAccountingTest.class); + + @BeforeClass + public static void setUpClass() + { + ServerTestUtils.daemonInitialization(); + try + { + Field confField = DatabaseDescriptor.class.getDeclaredField("conf"); + confField.setAccessible(true); + Config conf = (Config) confField.get(null); + conf.memtable_allocation_type = Config.MemtableAllocationType.offheap_objects; + } + catch (ReflectiveOperationException e) + { + throw new RuntimeException(e); + } + CQLTester.prepareServer(); + } + + private ColumnFamilyStore createTestTable() + { + createTable("CREATE TABLE %s (" + + " pk text," + + " ck int," + + " last_contact timestamp," + + " namespace text," + + " properties text," + + " state text," + + " v text," + + " PRIMARY KEY (pk, ck))"); + return getCurrentColumnFamilyStore(); + } + + @Test + public void largePartitionGrowShrinkKeepOwnsNonNegative() + { + ColumnFamilyStore cfs = createTestTable(); + String tbl = KEYSPACE + '.' + currentTable(); + + final int rowCount = BTree.MAX_KEYS + 1; + final int ops = 2_000; + final AtomicLong ts = new AtomicLong(1_000_000L); + + for (int op = 0; op < ops; op++) + { + String cql; + int expectedLiveRows; + if (op % 2 == 0) + { + // grow: add the full set of rows (each row ~ a set element) + cql = growBatch(tbl, 0, rowCount, ts.incrementAndGet()); + expectedLiveRows = rowCount; + } + else + { + // reset: a whole-partition tombstone (~ "set = {...}") followed by a smaller set of rows. + // the tombstone is at an earlier timestamp than the re-inserted rows so the latter survive, + // while every previously written row (at an earlier timestamp) is shadowed by the tombstone. 
+ long deleteTs = ts.incrementAndGet(); + long insertTs = ts.incrementAndGet(); + cql = resetBatch(tbl, 0, rowCount / 2, deleteTs, insertTs); + expectedLiveRows = rowCount / 2; + } + + QueryProcessor.executeInternal(cql); + + UntypedResultSet rs = QueryProcessor.executeInternal("SELECT ck FROM " + tbl + " WHERE pk = 'test'"); + int liveRows = 0; + if (rs != null) + for (UntypedResultSet.Row ignored : rs) + liveRows++; + + if (op % 100 == 0) + logger.info("== op=" + op + + ", liveRows= " + liveRows + + ", heapSize= " + ownsOnHeapNow(cfs) + + ", offheapSize= " + ownsOffHeapNow(cfs)); + + // the grow/reset analogue must actually oscillate the visible row count, proving the partition + // tombstone shadows the prior rows just as a set override shadows prior elements + assertThat(liveRows).as("visible rows after op=" + op).isEqualTo(expectedLiveRows); + assertOwnsNonNegative(cfs, "after op=" + op); + } + + cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS); + assertOwnsNonNegative(cfs, "after flush"); + } + + /** + * Builds two logically identical partitions -- a deleted wide row -- via two merge paths and requires their + * on-heap ownership to be exactly equal: + *
<ul>
+ *   <li>retain path: insert a row's cells, then delete the row (the tombstone shadows the existing, owned cells);</li>
+ *   <li>removeShadowed path: delete the row first, then insert cells the tombstone already shadows.</li>
+ * </ul>
+ * Both end with the same columns, row tombstone and empty cell tree, so correct accounting leaves them owning the + * same on-heap. Only the retain path can leak: if {@code BTreeRow.merge} fails to release the shrunk column tree or + * the row's liveness/deletion delta, the retain partition owns more -- an inflation a non-negative-ownership check + * cannot catch. Off-heap is not compared: under {@code offheap_objects} the retain path has written the cells into + * the off-heap slab, only reclaimed at flush, so it legitimately owns more off-heap. + */ + @Test + public void rowTombstoneOverExistingRowDoesNotInflateOwnership() + { + ColumnFamilyStore cfsRetain = createWideTable(); + String tblRetain = KEYSPACE + '.' + currentTable(); + ColumnFamilyStore cfsShadow = createWideTable(); + String tblShadow = KEYSPACE + '.' + currentTable(); + + final int rows = 50; + final long insertTs = 1000L; + final long deleteTs = 2000L; // newer than insertTs, so the tombstone shadows the cells + + for (int ck = 0; ck < rows; ck++) + { + // retain path: write the row, then delete it -> the tombstone shadows existing (owned) cells + QueryProcessor.executeInternal(wideInsert(tblRetain, ck, insertTs)); + QueryProcessor.executeInternal(rowDelete(tblRetain, ck, deleteTs)); + + // removeShadowed path: delete first, then write cells the tombstone already shadows + QueryProcessor.executeInternal(rowDelete(tblShadow, ck, deleteTs)); + QueryProcessor.executeInternal(wideInsert(tblShadow, ck, insertTs)); + } + + long onHeapRetain = ownsOnHeapNow(cfsRetain), onHeapShadow = ownsOnHeapNow(cfsShadow); + long offHeapRetain = ownsOffHeapNow(cfsRetain), offHeapShadow = ownsOffHeapNow(cfsShadow); + logger.info("retain-path onHeap=" + onHeapRetain + " offHeap=" + offHeapRetain + + "; removeShadowed-path onHeap=" + onHeapShadow + " offHeap=" + offHeapShadow); + + assertOwnsNonNegative(cfsRetain, "retain path"); + assertOwnsNonNegative(cfsShadow, "removeShadowed path"); + + assertThat(onHeapRetain) + 
.as("writing then deleting a row must leave the memtable owning exactly the same on-heap (%d bytes) as never " + + "effectively writing it (%d bytes) -- the shrunk column tree and the row's liveness/deletion delta must be " + + "accounted", onHeapRetain, onHeapShadow) + .isEqualTo(onHeapShadow); + } + + /** + * Same as {@link #rowTombstoneOverExistingRowDoesNotInflateOwnership} but the shadowed data lives in a + * complex (collection) column whose internal cell tree spans multiple BTree nodes (branch + leaves). + *
<p>
+ * The retain path ({@code BTreeRow.merge} → {@code reconciler::retain} → {@code ColumnData.removeShadowed} for the + * {@code set} column) drops every collection cell when the row tombstone supersedes them, but must also release the + * collection's internal tree node structure -- which was counted as memtable-owned when the row was first inserted + * ({@code ComplexColumnData.unsharedHeapSizeExcludingData} includes {@code sizeOnHeapOf(tree)}). If that internal + * structure is not released, the retain partition owns more on-heap than the logically identical removeShadowed + * partition. + */ + @Test + public void rowTombstoneOverExistingCollectionDoesNotInflateOwnership() + { + ColumnFamilyStore cfsRetain = createCollectionTable(); + String tblRetain = KEYSPACE + '.' + currentTable(); + ColumnFamilyStore cfsShadow = createCollectionTable(); + String tblShadow = KEYSPACE + '.' + currentTable(); + + final int rows = 50; + final long insertTs = 1000L; + final long deleteTs = 2000L; // newer than insertTs, so the tombstone shadows the collection cells + + for (int ck = 0; ck < rows; ck++) + { + // retain path: write the row (large collection), then delete it -> the tombstone shadows existing (owned) + // cells and the collection's multi-node internal tree + QueryProcessor.executeInternal(setInsert(tblRetain, ck, insertTs)); + QueryProcessor.executeInternal(rowDelete(tblRetain, ck, deleteTs)); + + // removeShadowed path: delete first, then write cells the tombstone already shadows + QueryProcessor.executeInternal(rowDelete(tblShadow, ck, deleteTs)); + QueryProcessor.executeInternal(setInsert(tblShadow, ck, insertTs)); + } + + long onHeapRetain = ownsOnHeapNow(cfsRetain), onHeapShadow = ownsOnHeapNow(cfsShadow); + long offHeapRetain = ownsOffHeapNow(cfsRetain), offHeapShadow = ownsOffHeapNow(cfsShadow); + logger.info("collection retain-path onHeap=" + onHeapRetain + " offHeap=" + offHeapRetain + + "; removeShadowed-path onHeap=" + onHeapShadow + " offHeap=" + 
offHeapShadow); + + assertOwnsNonNegative(cfsRetain, "retain path"); + assertOwnsNonNegative(cfsShadow, "removeShadowed path"); + + assertThat(onHeapRetain) + .as("writing then deleting a row with a multi-node collection must leave the memtable owning exactly the same " + + "on-heap (%d bytes) as never effectively writing it (%d bytes) -- the collection's freed internal tree " + + "structure must be accounted", onHeapRetain, onHeapShadow) + .isEqualTo(onHeapShadow); + } + + /** + * Isolates the complex column's own tombstone heap, which the broader + * {@link #rowTombstoneOverExistingCollectionDoesNotInflateOwnership} masks behind a large surviving cell tree. + *
<p>
+ * A whole-collection delete ({@code DELETE s ...}) writes a {@link org.apache.cassandra.db.rows.ComplexColumnData} + * carrying only a non-LIVE complex deletion and an empty cell tree, so its memtable-owned heap + * ({@code unsharedHeapSizeExcludingData}) reduces to {@code EMPTY_SIZE + complexDeletion.unsharedHeapSize()} -- the + * cell tree contributes nothing. When a newer row tombstone supersedes it, {@code ColumnData.removeShadowed} must + * release that complex deletion's heap ({@code DeletionTime.EMPTY_SIZE}) and not just the wrapper. If that term is + * dropped, the retain path strands exactly one {@code DeletionTime.EMPTY_SIZE} per row and owns more on-heap than + * the logically identical removeShadowed path -- a gap the large-tree test cannot attribute and a non-negative + * check cannot see. This is the drop-side half of the complex-deletion heap accounting; the swap-side (merge) half + * is exercised exhaustively by {@code AtomicBTreePartitionMemtableAccountingTest}. + */ + @Test + public void rowTombstoneOverCollectionDeletionReleasesComplexDeletionHeap() + { + ColumnFamilyStore cfsRetain = createCollectionTable(); + String tblRetain = KEYSPACE + '.' + currentTable(); + ColumnFamilyStore cfsShadow = createCollectionTable(); + String tblShadow = KEYSPACE + '.' 
+ currentTable(); + + final int rows = 200; + final long collectionDeleteTs = 1000L; + final long rowDeleteTs = 2000L; // newer, so the row tombstone supersedes the collection tombstone + + for (int ck = 0; ck < rows; ck++) + { + // retain path: write a collection tombstone (an owned complex deletion over an empty cell tree), + // then delete the row -> the row tombstone supersedes and releases it via reconciler::retain + QueryProcessor.executeInternal(collectionDelete(tblRetain, ck, collectionDeleteTs)); + QueryProcessor.executeInternal(rowDelete(tblRetain, ck, rowDeleteTs)); + + // removeShadowed path: delete the row first, then write a collection tombstone it already shadows + QueryProcessor.executeInternal(rowDelete(tblShadow, ck, rowDeleteTs)); + QueryProcessor.executeInternal(collectionDelete(tblShadow, ck, collectionDeleteTs)); + } + + long onHeapRetain = ownsOnHeapNow(cfsRetain), onHeapShadow = ownsOnHeapNow(cfsShadow); + logger.info("collection-tombstone retain-path onHeap=" + onHeapRetain + + "; removeShadowed-path onHeap=" + onHeapShadow); + + assertOwnsNonNegative(cfsRetain, "retain path"); + assertOwnsNonNegative(cfsShadow, "removeShadowed path"); + + assertThat(onHeapRetain) + .as("a row tombstone superseding a collection tombstone must release the complex deletion's own heap " + + "(DeletionTime.EMPTY_SIZE per row): retain owns %d, removeShadowed owns %d", onHeapRetain, onHeapShadow) + .isEqualTo(onHeapShadow); + } + + private static String collectionDelete(String tbl, int ck, long timestamp) + { + return "DELETE s FROM " + tbl + " USING TIMESTAMP " + timestamp + " WHERE pk = 'test' AND ck = " + ck; + } + + private static long ownsOnHeapNow(ColumnFamilyStore cfs) + { + return Memtable.getMemoryUsage(cfs.getTracker().getView().getCurrentMemtable()).ownsOnHeap; + } + + private static long ownsOffHeapNow(ColumnFamilyStore cfs) + { + return Memtable.getMemoryUsage(cfs.getTracker().getView().getCurrentMemtable()).ownsOffHeap; + } + + /** + * A batch that 
inserts rows with clustering keys in [from,to), all at {@code timestamp}.
+     */
+    private static String growBatch(String tbl, int from, int to, long timestamp)
+    {
+        StringBuilder sb = new StringBuilder("BEGIN UNLOGGED BATCH\n");
+        for (int ck = from; ck < to; ck++)
+            appendInsert(sb, tbl, ck, timestamp);
+        return sb.append("APPLY BATCH;").toString();
+    }
+
+    /**
+     * A batch that first deletes the whole partition at {@code deleteTs} (a partition tombstone, the analogue of a
+     * {@code set = {...}} override), then inserts rows with clustering keys in [from,to) at {@code insertTs > deleteTs}.
+     */
+    private static String resetBatch(String tbl, int from, int to, long deleteTs, long insertTs)
+    {
+        StringBuilder sb = new StringBuilder("BEGIN UNLOGGED BATCH\n");
+        sb.append("DELETE FROM ").append(tbl).append(" USING TIMESTAMP ").append(deleteTs)
+          .append(" WHERE pk = 'test';\n");
+        for (int ck = from; ck < to; ck++)
+            appendInsert(sb, tbl, ck, insertTs);
+        return sb.append("APPLY BATCH;").toString();
+    }
+
+    private static void appendInsert(StringBuilder sb, String tbl, int ck, long timestamp)
+    {
+        sb.append("INSERT INTO ").append(tbl)
+          .append(" (pk, ck, namespace, properties, state, v) VALUES ('test', ").append(ck)
+          .append(", '").append(elemName(ck)).append('\'')
+          .append(", '").append(elemName(ck)).append('\'')
+          .append(", '").append(elemName(ck)).append('\'')
+          .append(", '").append(elemName(ck)).append('\'')
+          .append(") USING TIMESTAMP ").append(timestamp).append(";\n");
+    }
+
+    private static String elemName(int i)
+    {
+        return 'e' + String.format("%05d", i);
+    }
+
+    private static final int WIDE_COLS = BTree.MAX_KEYS + 1; // one more than fits in a leaf, so each row's column tree spans multiple nodes
+
+    private ColumnFamilyStore createWideTable()
+    {
+        StringBuilder sb = new StringBuilder("CREATE TABLE %s (pk text, ck int");
+        for (int i = 0; i < WIDE_COLS; i++)
+            sb.append(", ").append(colName(i)).append(" int");
+        sb.append(", PRIMARY KEY (pk, ck))");
+        createTable(sb.toString());
+        return getCurrentColumnFamilyStore();
+    }
+
+    private static String wideInsert(String tbl, int ck, long timestamp)
+    {
+        StringBuilder names = new StringBuilder("pk, ck");
+        StringBuilder vals = new StringBuilder("'test', ").append(ck);
+        for (int i = 0; i < WIDE_COLS; i++)
+        {
+            names.append(", ").append(colName(i));
+            vals.append(", ").append(i);
+        }
+        return "INSERT INTO " + tbl + " (" + names + ") VALUES (" + vals + ") USING TIMESTAMP " + timestamp;
+    }
+
+    private static String rowDelete(String tbl, int ck, long timestamp)
+    {
+        return "DELETE FROM " + tbl + " USING TIMESTAMP " + timestamp + " WHERE pk = 'test' AND ck = " + ck;
+    }
+
+    // enough elements that each row's collection cell tree spans multiple nodes (branches + leaves), so the freed
+    // internal structure is non-trivial
+    private static final int SET_ELEMENTS = 4 * (BTree.MAX_KEYS + 1);
+
+    private ColumnFamilyStore createCollectionTable()
+    {
+        createTable("CREATE TABLE %s (pk text, ck int, s set<text>, PRIMARY KEY (pk, ck))");
+        return getCurrentColumnFamilyStore();
+    }
+
+    private static String setInsert(String tbl, int ck, long timestamp)
+    {
+        StringBuilder set = new StringBuilder("{");
+        for (int i = 0; i < SET_ELEMENTS; i++)
+        {
+            if (i > 0)
+                set.append(", ");
+            set.append('\'').append(elemName(i)).append('\'');
+        }
+        set.append('}');
+        return "INSERT INTO " + tbl + " (pk, ck, s) VALUES ('test', " + ck + ", " + set + ") USING TIMESTAMP " + timestamp;
+    }
+
+    private static String colName(int i)
+    {
+        return "c" + String.format("%03d", i);
+    }
+
+    private static void assertOwnsNonNegative(ColumnFamilyStore cfs, String step)
+    {
+        for (Memtable mt : cfs.getTracker().getView().getAllMemtables())
+        {
+            Memtable.MemoryUsage usage = Memtable.getMemoryUsage(mt);
+            assertThat(usage.ownsOnHeap)
+                .as("ON-heap owns went NEGATIVE [" + step + "]")
+                .isGreaterThanOrEqualTo(0L);
+            assertThat(usage.ownsOffHeap)
+                .as("OFF-heap owns went NEGATIVE [" + step + "]")
+                .isGreaterThanOrEqualTo(0L);
+        }
+    }
+}
diff --git a/test/unit/org/apache/cassandra/db/partitions/SetCellAccountingTest.java b/test/unit/org/apache/cassandra/db/partitions/SetCellAccountingTest.java
new file mode 100644
index 000000000000..6c188d316d5c
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/partitions/SetCellAccountingTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.partitions;
+
+import java.lang.reflect.Field;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.ServerTestUtils;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.memtable.Memtable;
+import org.apache.cassandra.utils.btree.BTree;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Runs a grow/reset op mix on one partition's {@code set<text>} column with strictly
+ * increasing timestamps, then asserts the memtable's on/off-heap ownership never goes negative.
+ */
+public class SetCellAccountingTest extends CQLTester
+{
+    private static final Logger logger = LoggerFactory.getLogger(SetCellAccountingTest.class);
+
+    @BeforeClass
+    public static void setUpClass()
+    {
+        ServerTestUtils.daemonInitialization();
+        try
+        {
+            Field confField = DatabaseDescriptor.class.getDeclaredField("conf");
+            confField.setAccessible(true);
+            Config conf = (Config) confField.get(null);
+            conf.memtable_allocation_type = Config.MemtableAllocationType.offheap_objects;
+        }
+        catch (ReflectiveOperationException e)
+        {
+            throw new RuntimeException(e);
+        }
+        CQLTester.prepareServer();
+    }
+
+    private ColumnFamilyStore createTestTable()
+    {
+        createTable("CREATE TABLE %s (" +
+                    " name text PRIMARY KEY," +
+                    " last_contact timestamp," +
+                    " namespace text," +
+                    " partitioner text," +
+                    " properties text," +
+                    " state text," +
+                    " seed_hosts set<text>)");
+        return getCurrentColumnFamilyStore();
+    }
+
+    @Test
+    public void largeSetGrowShrinkKeepOwnsNonNegative()
+    {
+        ColumnFamilyStore cfs = createTestTable();
+
+        final int setSize = BTree.MAX_KEYS + 1;
+        final int ops = 10_000;
+        final AtomicLong ts = new AtomicLong(1_000_000L);
+
+        for (int op = 0; op < ops; op++)
+        {
+            long t = ts.incrementAndGet();
+            String cql;
+            if (op % 2 == 0)
+            {
+                cql = "UPDATE %s USING TIMESTAMP " + t + " SET seed_hosts = seed_hosts + " +
+                      rangeSet(0, setSize) + " WHERE name = 'test'";
+            }
+            else
+            {
+                cql = "UPDATE %s USING TIMESTAMP " + t + " SET seed_hosts = " +
+                      rangeSet(0, setSize / 2) + " WHERE name = 'test'";
+            }
+
+
+            QueryProcessor.executeInternal(formatQuery(cql));
+            UntypedResultSet rs = QueryProcessor.executeInternal(
+                formatQuery("SELECT seed_hosts FROM %s WHERE name = 'test'"));
+            Set<String> seeds = (rs == null || rs.isEmpty() || !rs.one().has("seed_hosts"))
+                                ? null
+                                : rs.one().getSet("seed_hosts", UTF8Type.instance);
+
+            if (op % 100 == 0)
+                logger.info("== op=" + op +
+                            ", seedsSize= " + (seeds != null ? seeds.size() : "0") +
+                            ", heapSize= " + ownsOnHeapNow(cfs) +
+                            ", offheapSize= " + ownsOffHeapNow(cfs) +
+                            ", seed_hosts=" + seeds);
+            assertOwnsNonNegative(cfs, "after op=" + op);
+
+        }
+        cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS);
+        assertOwnsNonNegative(cfs, "after flush");
+    }
+
+    private static long ownsOnHeapNow(ColumnFamilyStore cfs)
+    {
+        return Memtable.getMemoryUsage(cfs.getTracker().getView().getCurrentMemtable()).ownsOnHeap;
+    }
+
+    private static long ownsOffHeapNow(ColumnFamilyStore cfs)
+    {
+        return Memtable.getMemoryUsage(cfs.getTracker().getView().getCurrentMemtable()).ownsOffHeap;
+    }
+
+
+    /**
+     * {@code {'e00000','e00001',...}} for indices [from,to).
+     */
+    private static String rangeSet(int from, int to)
+    {
+        if (to <= from) return "{}";
+        StringBuilder sb = new StringBuilder("{");
+        for (int i = from; i < to; i++)
+        {
+            if (i > from) sb.append(',');
+            sb.append('\'').append(elemName(i)).append('\'');
+        }
+        return sb.append('}').toString();
+    }
+
+    private static String elemName(int i)
+    {
+        return 'e' + String.format("%05d", i);
+    }
+
+    private static void assertOwnsNonNegative(ColumnFamilyStore cfs, String step)
+    {
+        for (Memtable mt : cfs.getTracker().getView().getAllMemtables())
+        {
+            Memtable.MemoryUsage usage = Memtable.getMemoryUsage(mt);
+            assertThat(usage.ownsOnHeap)
+                .as("ON-heap owns went NEGATIVE [" + step + "]")
+                .isGreaterThanOrEqualTo(0L);
+            assertThat(usage.ownsOffHeap)
+                .as("OFF-heap owns went NEGATIVE [" + step + "]")
+                .isGreaterThanOrEqualTo(0L);
+        }
+    }
+}
diff --git a/test/unit/org/apache/cassandra/utils/btree/BTreeUpdateHeapAccountingTest.java b/test/unit/org/apache/cassandra/utils/btree/BTreeUpdateHeapAccountingTest.java
new file mode 100644
index 000000000000..fd8679a94bde
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/btree/BTreeUpdateHeapAccountingTest.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license
agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils.btree;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Random;
+import java.util.TreeSet;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.utils.BulkIterator;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Asserts that {@link BTree#update}'s heap accounting -- the total reported through
+ * {@link UpdateFunction#onAllocatedOnHeap(long)} -- matches the actual on-heap structure of the resulting tree, as
+ * measured by {@link BTree#sizeOnHeapOf(Object[])}. {@code onAllocatedOnHeap} reports the extra heap allocated over
+ * the previous tree, so summed over a sequence of updates from empty it must equal {@code sizeOnHeapOf(result)}.
+ * <p>
+ * The scenarios cover the {@code Updater} paths that matter for accounting (coverage verified with JaCoCo): many tiny
+ * disjoint inserts; a large contiguous run inserted in one update (forcing a node to overflow more than once before
+ * draining); re-inserting existing keys (exercising the merge path); and a height-4 tree.
+ */
+public class BTreeUpdateHeapAccountingTest
+{
+    private static final Logger logger = LoggerFactory.getLogger(BTreeUpdateHeapAccountingTest.class);
+
+    private static final Comparator<Integer> CMP = Integer::compare;
+
+    private enum Scenario { SMALL_INSERTS, BLOCK_INSERTS, OVERLAPPING, DEEP }
+
+    /**
+     * A non-simple (i.e. not {@link UpdateFunction.Simple}) update function, so that {@code BTree.update} actually
+     * performs heap accounting. It is an identity merge over the keys, so the only heap it reports is that of the
+     * BTree node arrays themselves -- exactly what {@code sizeOnHeapOf} measures. It also counts merge invocations
+     * so the test can assert the overlapping scenario really exercised the merge path.
+     */
+    private static final class AccountingUpdateFunction implements UpdateFunction<Integer, Integer>
+    {
+        long reported = 0;
+        long merges = 0;
+
+        @Override
+        public Integer insert(Integer insert)
+        {
+            return insert;
+        }
+
+        @Override
+        public Integer merge(Integer replacing, Integer update)
+        {
+            merges++;
+            return update;
+        }
+
+        @Override
+        public void onAllocatedOnHeap(long heapSize)
+        {
+            reported += heapSize;
+        }
+    }
+
+    @Test
+    public void reportedAllocationShouldMatchHeapSizeOfResult()
+    {
+        Trial[] trials = {
+            runTrial(Scenario.SMALL_INSERTS, 1L, 3000, 3),
+            runTrial(Scenario.SMALL_INSERTS, 42L, 2500, 100),
+            runTrial(Scenario.BLOCK_INSERTS, 7L, 12000, 250),
+            runTrial(Scenario.OVERLAPPING, 99L, 4000, 40),
+            runTrial(Scenario.DEEP, 12345L, 40000, 200),
+        };
+
+        List<String> failures = new ArrayList<>();
+        for (Trial r : trials)
+        {
+            logger.info(String.format("%-13s seed=%-6d keys=%-6d reported=%-10d sizeOnHeapOf=%-9d gap=%-10d [over=+%d/%du, under=-%d/%du] merges=%d height=%d",
+                                      r.scenario, r.seed, r.distinctKeys, r.reported, r.actualHeap, r.reported - r.actualHeap,
+                                      r.sumOver, r.updatesOver, r.sumUnder, r.updatesUnder, r.merges, r.height));
+            if (r.reported != r.actualHeap)
+                failures.add(String.format("%s (seed=%d): reported=%d but sizeOnHeapOf(result)=%d (net gap=%d; over=+%d, under=-%d; merges=%d)",
+                                           r.scenario, r.seed, r.reported, r.actualHeap, r.reported - r.actualHeap,
+                                           r.sumOver, r.sumUnder, r.merges));
+        }
+
+        assertTrue("BTree.update's onAllocatedOnHeap total does not match BTree.sizeOnHeapOf(result):\n " +
+                   String.join("\n ", failures),
+                   failures.isEmpty());
+    }
+
+    private static Trial runTrial(Scenario scenario, long seed, int numKeys, int maxBatch)
+    {
+        Random random = new Random(seed);
+
+        // distinct keys inserted in random order, in random-sized batches; depending on the scenario some batches
+        // are large contiguous runs (to force multi-overflow) and some re-insert existing keys (to force merge).
+        List<Integer> order = new ArrayList<>(numKeys);
+        for (int i = 0; i < numKeys; i++)
+            order.add(i);
+        Collections.shuffle(order, random);
+
+        AccountingUpdateFunction fn = new AccountingUpdateFunction();
+        Object[] tree = BTree.empty();
+        // reference set of the keys the tree should contain, used for the end-of-trial sanity check
+        TreeSet<Integer> model = new TreeSet<>();
+
+        Trial trial = new Trial();
+        int pos = 0; // number of distinct keys consumed from order[] so far, i.e. order[0..pos) are in the tree
+        boolean blockDone = false;
+        // run until every distinct key has been inserted; OVERLAPPING additionally keeps going until the merge path
+        // has been exercised at least 50 times (re-inserts only start producing merges once keys are in the tree)
+        while (pos < numKeys || (scenario == Scenario.OVERLAPPING && fn.merges < 50))
+        {
+            TreeSet<Integer> batchSet = new TreeSet<>(CMP);
+
+            int block = 4000; // large enough to overflow not just a leaf but a whole branch >1x in a single update
+            if (scenario == Scenario.BLOCK_INSERTS && !blockDone && pos > maxBatch && pos < numKeys - block)
+            {
+                // a single update inserting a long contiguous run between two existing keys -- this overflows the
+                // same leaf, and its parent branch, more than once within one update.
+                for (int i = 0; i < block && pos < numKeys; i++)
+                    batchSet.add(order.get(pos++));
+                blockDone = true;
+            }
+            else
+            {
+                int batch = 1 + random.nextInt(maxBatch);
+                for (int i = 0; i < batch; i++)
+                {
+                    // OVERLAPPING re-inserts an already-inserted key (one from order[0..pos)) to drive the merge
+                    // path: roughly one entry in three while fresh keys remain, and every entry once they run out
+                    boolean reInsert = scenario == Scenario.OVERLAPPING && pos > 0
+                                       && (pos >= numKeys || random.nextInt(3) == 0);
+                    if (reInsert)
+                        batchSet.add(order.get(random.nextInt(pos)));
+                    else if (pos < numKeys)
+                        batchSet.add(order.get(pos++));
+                }
+                if (batchSet.isEmpty() && pos < numKeys)
+                    batchSet.add(order.get(pos++));
+            }
+            if (batchSet.isEmpty())
+                break;
+
+            Integer[] keys = batchSet.toArray(new Integer[0]); // already sorted (TreeSet w/ CMP)
+            Object[] insert = BTree.build(BulkIterator.of(keys), keys.length, UpdateFunction.noOp());
+
+            // per-update contract: reported delta == sizeOnHeapOf(after) - sizeOnHeapOf(before)
+            long heapBefore = BTree.sizeOnHeapOf(tree);
+            long reportedBefore = fn.reported;
+            tree = BTree.update(tree, insert, CMP, fn);
+            long err = (fn.reported - reportedBefore) - (BTree.sizeOnHeapOf(tree) - heapBefore);
+            if (err > 0) { trial.updatesOver++; trial.sumOver += err; }
+            else if (err < 0) { trial.updatesUnder++; trial.sumUnder += -err; }
+
+            model.addAll(batchSet);
+        }
+
+        // sanity: the tree must contain exactly the model, in order
+        assertEquals(model.size(), BTree.size(tree));
+        Integer prev = null;
+        int n = 0;
+        for (Integer key : BTree.<Integer>iterable(tree))
+        {
+            if (prev != null)
+                assertTrue("tree not sorted", key > prev);
+            assertTrue("unexpected key " + key, model.contains(key));
+            prev = key;
+            n++;
+        }
+        assertEquals(model.size(), n);
+
+        trial.scenario = scenario;
+        trial.seed = seed;
+        trial.distinctKeys = model.size();
+        trial.reported = fn.reported;
+        trial.merges = fn.merges;
+        trial.actualHeap = BTree.sizeOnHeapOf(tree);
+        trial.height = BTree.depth(tree);
+        return trial;
+    }
+
+    private static final class Trial
+    {
+        Scenario scenario;
+        long seed;
+        int distinctKeys;
+        long reported;
+        long merges;
+        long actualHeap;
+        int height;
+        int updatesOver;
+        long sumOver;
+        int updatesUnder;
+        long sumUnder;
+    }
+
+    /**
+     * Verifies that re-inserting every key that already exists reports a net heap delta of exactly zero,
+     * and that {@code onAllocatedOnHeap} is actually invoked (not silently skipped by an {@code allocated > 0}
+     * guard). The call must happen so that callers accumulating signed deltas see the confirmation, and so
+     * that a negative allocated value cannot permanently disable accounting by crossing the -1 sentinel.
+     */
+    @Test
+    public void netZeroDeltaOnAllocatedOnHeapIsInvoked()
+    {
+        int numKeys = BTree.MAX_KEYS * BTree.MAX_KEYS;
+        Integer[] keys = new Integer[numKeys];
+        for (int i = 0; i < numKeys; i++)
+            keys[i] = i;
+
+        Object[] tree = BTree.build(BulkIterator.of(keys), numKeys, UpdateFunction.noOp());
+        Object[] insert = BTree.build(BulkIterator.of(keys), numKeys, UpdateFunction.noOp());
+
+        // Use a counting variant that records invocation count separately from the accumulated total
+        final long[] invocations = { 0 };
+        final long[] total = { 0 };
+
+        UpdateFunction<Integer, Integer> fn = new UpdateFunction<>()
+        {
+            @Override
+            public Integer insert(Integer i)
+            {
+                return i;
+            }
+
+            @Override
+            public Integer merge(Integer existing, Integer update)
+            {
+                return update;
+            }
+
+            @Override
+            public void onAllocatedOnHeap(long delta)
+            {
+                invocations[0]++;
+                total[0] += delta;
+            }
+        };
+
+        Object[] result = BTree.update(tree, insert, CMP, fn);
+
+        assertSame("full re-insert of identical keys must reuse the existing tree root", tree, result);
+
+        assertEquals("reported net delta must be zero for a full re-insert of identical keys",
+                     0L, total[0]);
+
+        assertTrue("onAllocatedOnHeap must be invoked even for a net-zero update; " +
+                   "skipping it conflates 'tracking disabled' (-1 sentinel) with 'net zero result' (0)",
+                   invocations[0] >= 1);
+    }
+}