Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/db/Columns.java
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ public long unsharedHeapSize()
if(this == NONE)
return 0;

return EMPTY_SIZE + BTree.sizeOfStructureOnHeap(columns);
return EMPTY_SIZE + BTree.sizeOnHeapOf(columns);
}

@Override
Expand Down
16 changes: 13 additions & 3 deletions src/java/org/apache/cassandra/db/rows/BTreeRow.java
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ public long unsharedHeapSize()
+ clustering.unsharedHeapSize()
+ primaryKeyLivenessInfo.unsharedHeapSize()
+ deletion.unsharedHeapSize()
+ BTree.sizeOfStructureOnHeap(btree);
+ BTree.sizeOnHeapOf(btree);

return accumulate((cd, v) -> v + cd.unsharedHeapSize(), heapSize);
}
Expand All @@ -581,7 +581,7 @@ public long unsharedHeapSizeExcludingData()
+ clustering.unsharedHeapSizeExcludingData()
+ primaryKeyLivenessInfo.unsharedHeapSize()
+ deletion.unsharedHeapSize()
+ BTree.sizeOfStructureOnHeap(btree);
+ BTree.sizeOnHeapOf(btree);

return accumulate((cd, v) -> v + cd.unsharedHeapSizeExcludingData(), heapSize);
}
Expand Down Expand Up @@ -660,10 +660,20 @@ else if (rowDeletion.isShadowedBy(livenessInfo))
{
// The update's deletion shadows part of the existing row. Those cells ARE owned by
// the memtable, so record their removal via retain().
existingBtree = BTree.transformAndFilter(existingBtree, reconciler::retain);
Object[] retained = BTree.transformAndFilter(existingBtree, reconciler::retain);
if (existingBtree != retained)
{
reconcileF.onAllocatedOnHeap(BTree.sizeOnHeapOf(retained) - BTree.sizeOnHeapOf(existingBtree));
existingBtree = retained;
}
}
}
Object[] tree = BTree.update(existingBtree, updateBtree, ColumnData.comparator, reconciler);
// BTree.update and the reconciler only account the column data (cells and column-tree nodes); the row's
// own LivenessInfo/Deletion are not. When they change (e.g. a row tombstone supersedes a live row) the
// new objects become memtable-owned and the old ones are released, so account that delta here.
reconcileF.onAllocatedOnHeap((livenessInfo.unsharedHeapSize() + rowDeletion.unsharedHeapSize())
- (existing.primaryKeyLivenessInfo().unsharedHeapSize() + existing.deletion().unsharedHeapSize()));
return new BTreeRow(existing.clustering, livenessInfo, rowDeletion, tree, minDeletionTime(tree, livenessInfo, deletion));
}
}
Expand Down
25 changes: 23 additions & 2 deletions src/java/org/apache/cassandra/db/rows/ColumnData.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ public ColumnData merge(ColumnData existing, ColumnData update)
}
cells = BTree.update(existingTree, updateTree, existingComplex.column.cellComparator(), (UpdateFunction) reconciler);
}
onAllocatedOnHeap(maxComplexDeletion.unsharedHeapSize() - existingDeletion.unsharedHeapSize());
return new ComplexColumnData(existingComplex.column, cells, maxComplexDeletion);
}
}
Expand Down Expand Up @@ -217,8 +218,28 @@ private ColumnData removeShadowed(ColumnData existing, PostReconciliationFunctio
ComplexColumnData existingComplex = (ComplexColumnData) existing;
if (activeDeletion.supersedes(existingComplex.complexDeletion()))
{
Object[] cells = BTree.transformAndFilter(existingComplex.tree(), (ColumnData cd) -> removeShadowed(cd, recordDeletion));
return BTree.isEmpty(cells) ? null : new ComplexColumnData(existingComplex.column, cells, DeletionTime.LIVE);
Object[] existingTree = existingComplex.tree();
Object[] cells = BTree.transformAndFilter(existingTree, (ColumnData cd) -> removeShadowed(cd, recordDeletion));
ComplexColumnData result = BTree.isEmpty(cells) ? null
: new ComplexColumnData(existingComplex.column, cells, DeletionTime.LIVE);
// The shadowed inner cells are released through recordDeletion.delete above, but that does not cover
// the complex column's own structure: its cell tree (which can span multiple BTree nodes), its
// complex deletion, and, when the column is dropped entirely, its wrapper. All were counted as
// owned when the column was first written (ComplexColumnData.unsharedHeapSizeExcludingData), so release that
// delta here. The rewritten column carries DeletionTime.LIVE, so the dropped complex
// deletion's heap is released too, matching the swap Reconciler.merge accounts on its path.
// On the update side (recordDeletion == noOp) this is a no-op, so skip it entirely.
if (recordDeletion != ColumnData.noOp)
{
long structureBefore = ComplexColumnData.EMPTY_SIZE
+ existingComplex.complexDeletion().unsharedHeapSize()
+ BTree.sizeOnHeapOf(existingTree);
long structureAfter = result == null ? 0 : ComplexColumnData.EMPTY_SIZE
+ DeletionTime.LIVE.unsharedHeapSize()
+ BTree.sizeOnHeapOf(cells);
recordDeletion.onAllocatedOnHeap(structureAfter - structureBefore);
}
return result;
}
}

Expand Down
12 changes: 6 additions & 6 deletions src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ public class ComplexColumnData extends ColumnData implements Iterable<Cell<?>>
{
static final Cell<?>[] NO_CELLS = new Cell<?>[0];

private static final long EMPTY_SIZE = ObjectSizes.measure(new ComplexColumnData(ColumnMetadata.regularColumn("",
"",
"",
SetType.getInstance(ByteType.instance, true),
ColumnMetadata.NO_UNIQUE_ID),
static final long EMPTY_SIZE = ObjectSizes.measure(new ComplexColumnData(ColumnMetadata.regularColumn("",
"",
"",
SetType.getInstance(ByteType.instance, true),
ColumnMetadata.NO_UNIQUE_ID),
NO_CELLS,
DeletionTime.build(0, 0)));

Expand Down Expand Up @@ -151,7 +151,7 @@ public long unsharedHeapSize()
@Override
public long unsharedHeapSizeExcludingData()
{
long heapSize = EMPTY_SIZE + BTree.sizeOnHeapOf(cells);
long heapSize = EMPTY_SIZE + BTree.sizeOnHeapOf(cells) + complexDeletion.unsharedHeapSize();
// TODO: this can be turned into a simple multiplication, at least while we have only one Cell implementation
for (Cell<?> cell : this)
heapSize += cell.unsharedHeapSizeExcludingData();
Expand Down
100 changes: 69 additions & 31 deletions src/java/org/apache/cassandra/utils/btree/BTree.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ public class BTree
// Key-count bounds for a node, both derived from BRANCH_FACTOR.
public static final int MIN_KEYS = BRANCH_FACTOR / 2 - 1;
public static final int MAX_KEYS = BRANCH_FACTOR - 1;
// NOTE(review): presumably a marker value that tells a long-accumulating traversal to stop — confirm at use sites.
public static final long STOP_SENTINEL_VALUE = Long.MAX_VALUE;
// ObjectSizes.sizeOfReferenceArray(MAX_KEYS), evaluated once at class initialisation so callers can reuse it
// instead of recomputing the same value.
private static final long MAX_KEYS_REF_ARRAY_SIZE = ObjectSizes.sizeOfReferenceArray(MAX_KEYS);

// An empty BTree Leaf - which is the same as an empty BTree
private static final Object[] EMPTY_LEAF = new Object[1];
Expand Down Expand Up @@ -979,19 +980,6 @@ public static int size(Object[] tree)
return sizeMap(tree)[(length / 2) - 1];
}

/**
 * Recursively sums {@code ObjectSizes.sizeOfArray} over the node arrays that make up {@code tree}.
 *
 * @param tree the root node of the (sub)tree to measure
 * @return 0 for the shared empty leaf; otherwise the size of this node's array plus, for a branch,
 *         the same measure of each child in {@code [getChildStart(tree), getChildEnd(tree))}
 */
public static long sizeOfStructureOnHeap(Object[] tree)
{
    // the shared empty leaf is a singleton and is never charged to a caller
    if (tree == EMPTY_LEAF)
        return 0;

    long total = ObjectSizes.sizeOfArray(tree);
    if (!isLeaf(tree))
    {
        // a branch also pays for every child subtree
        int child = getChildStart(tree);
        while (child < getChildEnd(tree))
        {
            total += sizeOfStructureOnHeap((Object[]) tree[child]);
            child++;
        }
    }
    return total;
}

/**
* Checks if the node is a leaf.
*
Expand Down Expand Up @@ -2331,6 +2319,23 @@ private static long sizeOnHeapOfLeaf(Object[] tree)
return ObjectSizes.sizeOfArray(tree);
}

/**
 * Heap footprint of one node on its own, with its children <em>excluded</em>: the node's backing array and,
 * when the node is a branch, the array returned by {@code sizeMap(node)} as well. An empty node costs nothing.
 * <p>
 * The builders feed this into their running {@code allocated} total as a net delta: a newly retained node
 * adds its shallow heap and the source node it replaces subtracts its own.
 *
 * @param node the node to measure
 * @return the shallow heap size of {@code node}, or 0 if {@code isEmpty(node)}
 */
private static long shallowHeapOf(Object[] node)
{
    if (isEmpty(node))
        return 0;

    long arrayBytes = ObjectSizes.sizeOfArray(node);
    // only branches carry a sizeMap; leaves are just the backing array
    return isLeaf(node)
           ? arrayBytes
           : arrayBytes + ObjectSizes.sizeOfArray(sizeMap(node));
}

// Arbitrary boundaries
private static Object POSITIVE_INFINITY = new Object();
private static Object NEGATIVE_INFINITY = new Object();
Expand Down Expand Up @@ -2646,6 +2651,7 @@ static boolean areIdentical(int[] a, int aOffset, int[] b, int bOffset, int coun
*/
private static abstract class LeafBuilder extends LeafOrBranchBuilder
{
static final long DISABLED = Long.MIN_VALUE;
long allocated;

LeafBuilder()
Expand All @@ -2654,6 +2660,13 @@ private static abstract class LeafBuilder extends LeafOrBranchBuilder
buffer = new Object[MAX_KEYS];
}

/**
 * Applies {@code bytes} (which may be negative) to the running {@code allocated} total.
 * Does nothing when {@code allocated} holds the {@code DISABLED} sentinel.
 *
 * @param bytes the signed change to apply to the running total
 */
final void addAllocated(long bytes)
{
    // accounting switched off: leave the sentinel untouched
    if (allocated == DISABLED)
        return;

    allocated += bytes;
}

/**
* Add {@code nextKey} to the buffer, overflowing if necessary
*/
Expand Down Expand Up @@ -2830,8 +2843,11 @@ Object[] redistributeAndDrain(Object[] pred, int predSize, Object predNextKey)
int newPredecessorCount = predSize - steal;
Object[] newPredecessor = new Object[newPredecessorCount | 1];
System.arraycopy(pred, 0, newPredecessor, 0, newPredecessorCount);
if (allocated >= 0)
allocated += ObjectSizes.sizeOfReferenceArray(newPredecessorCount | 1);
// we replace the single source leaf with two new leaves (newPredecessor + newLeaf); account both and
// release the source we consumed, so the running total stays a net delta
addAllocated(ObjectSizes.sizeOfArray(newPredecessor)
+ ObjectSizes.sizeOfArray(newLeaf)
- (sourceNode == null ? 0 : sizeOnHeapOfLeaf(sourceNode)));
ensureParent().addChildAndNextKey(newPredecessor, newPredecessorCount, pred[newPredecessorCount]);
return newLeaf;
}
Expand Down Expand Up @@ -2883,8 +2899,7 @@ void propagateOverflow()
{
// propagate the leaf we have saved in savedBuffer
// precondition: savedLeafCount == MAX_KEYS
if (allocated >= 0)
allocated += ObjectSizes.sizeOfReferenceArray(MAX_KEYS);
addAllocated(MAX_KEYS_REF_ARRAY_SIZE);
ensureParent().addChildAndNextKey(savedBuffer, MAX_KEYS, savedNextKey);
savedBuffer = null;
savedNextKey = null;
Expand Down Expand Up @@ -2921,9 +2936,7 @@ else if (!hasOverflow() && sourceNode != null && count == sizeOfLeaf(sourceNode)
propagateOverflow();

sizeOfLeaf = count;
leaf = drain();
if (allocated >= 0 && sizeOfLeaf > 0)
allocated += ObjectSizes.sizeOfReferenceArray(sizeOfLeaf | 1) - (sourceNode == null ? 0 : sizeOnHeapOfLeaf(sourceNode));
leaf = drain(); // drain() accounts the new leaf and releases the source it replaces
}

count = 0;
Expand All @@ -2941,19 +2954,27 @@ Object[] drain()
// the number of children here may be smaller than MIN_KEYS if this is the root node
assert !hasOverflow();
if (count == 0)
{
// defensive: unreachable from BTree.update
if (sourceNode != null)
addAllocated(-sizeOnHeapOfLeaf(sourceNode));
clearSourceNode();
return empty();
}

Object[] newLeaf;
if (sourceNode != null
&& count == sizeOfLeaf(sourceNode)
&& areIdentical(buffer, 0, sourceNode, 0, count))
{
newLeaf = sourceNode;
newLeaf = sourceNode; // reused unchanged: neither allocated nor released
}
else
{
newLeaf = new Object[count | 1];
System.arraycopy(buffer, 0, newLeaf, 0, count);
// account the new leaf and release the source it replaces, before clearSourceNode() drops it
addAllocated(ObjectSizes.sizeOfArray(newLeaf) - (sourceNode == null ? 0 : sizeOnHeapOfLeaf(sourceNode)));
}
count = 0;
clearSourceNode();
Expand Down Expand Up @@ -3054,10 +3075,10 @@ final void addChildAndNextKey(Object[] newChild, int newChildSize, Object nextKe
*/
final void propagateOverflow()
{
// propagate the leaf we have saved in leaf().savedBuffer
if (leaf.allocated >= 0)
leaf.allocated += ObjectSizes.sizeOfReferenceArray(2 * (1 + MAX_KEYS));
// propagate the branch we have saved in savedBuffer; it is a brand new node, so account it in full
// (array + sizeMap) with nothing to release
int size = setOverflowSizeMap(savedBuffer, MAX_KEYS);
leaf.addAllocated(shallowHeapOf(savedBuffer));
ensureParent().addChildAndNextKey(savedBuffer, size, savedNextKey);
savedBuffer = null;
savedNextKey = null;
Expand Down Expand Up @@ -3114,8 +3135,11 @@ Object[] redistributeOverflowAndDrain()
System.arraycopy(savedBuffer, 0, savedBranch, 0, savedBranchCount);
System.arraycopy(savedBuffer, MAX_KEYS, savedBranch, savedBranchCount, savedBranchCount + 1);
int savedBranchSize = setOverflowSizeMap(savedBranch, savedBranchCount);
if (leaf.allocated >= 0)
leaf.allocated += ObjectSizes.sizeOfReferenceArray(2 * (1 + savedBranchCount));
// we replace the single source branch with two new branches (savedBranch + newBranch); account both
// (array + sizeMap) and release the source we consumed
leaf.addAllocated(shallowHeapOf(savedBranch)
+ shallowHeapOf(newBranch)
- (sourceNode == null ? 0 : shallowHeapOf(sourceNode)));
ensureParent().addChildAndNextKey(savedBranch, savedBranchSize, savedBuffer[savedBranchCount]);
savedNextKey = null;

Expand Down Expand Up @@ -3224,6 +3248,8 @@ && areIdentical(buffer, MAX_KEYS, unode, usz, usz + 1))
System.arraycopy(buffer, 0, branch, 0, count);
System.arraycopy(buffer, MAX_KEYS, branch, count, count + 1);
sizeOfBranch = setDrainSizeMap(unode, usz, branch, count);
// account the new branch (array + sizeMap) and release the source branch it replaces
leaf.addAllocated(shallowHeapOf(branch) - (unode == null ? 0 : shallowHeapOf(unode)));
}
}

Expand All @@ -3245,6 +3271,9 @@ Object[] drain()
assert !hasOverflow();
if (count == 0)
{
// defensive: unreachable from BTree.update
if (sourceNode != null)
leaf.addAllocated(-shallowHeapOf(sourceNode));
clearSourceNode();
hasRightChild = false;
return (Object[]) buffer[MAX_KEYS];
Expand All @@ -3256,7 +3285,7 @@ Object[] drain()
&& areIdentical(buffer, 0, sourceNode, 0, count)
&& areIdentical(buffer, MAX_KEYS, sourceNode, count, count + 1))
{
branch = sourceNode;
branch = sourceNode; // reused unchanged: neither allocated nor released
}
else
{
Expand All @@ -3273,6 +3302,9 @@ && areIdentical(buffer, MAX_KEYS, sourceNode, count, count + 1))
System.arraycopy(buffer, MAX_KEYS, branch, count, count + 1);
}
setDrainSizeMap(null, -1, branch, count);
// account the new branch (array + sizeMap) and release the source branch it replaces,
// before clearSourceNode() drops it
leaf.addAllocated(shallowHeapOf(branch) - (sourceNode == null ? 0 : shallowHeapOf(sourceNode)));
}

count = 0;
Expand Down Expand Up @@ -3506,6 +3538,7 @@ void reset()
branch.inUse = false;
branch = branch.parent;
}
leaf().clearSourceNode();
Invariants.require(branch == null || (branch.count == 0 && !branch.hasRightChild));
}

Expand Down Expand Up @@ -3554,7 +3587,7 @@ public static class FastBuilder<V> extends AbstractFastBuilder implements AutoCl

FastBuilder()
{
allocated = -1;
allocated = DISABLED;
} // disable allocation tracking

public void add(V value)
Expand Down Expand Up @@ -3667,7 +3700,7 @@ Object[] update(Object[] update, Object[] insert, Comparator<? super Compare> co
this.insert.init(insert);
this.updateF = updateF;
this.comparator = comparator;
this.allocated = isSimple(updateF) ? -1 : 0;
this.allocated = isSimple(updateF) ? DISABLED : 0;
int leafDepth = BTree.depth(update) - 1;
LeafOrBranchBuilder builder = leaf();
Invariants.require(builder.isEmpty());
Expand All @@ -3678,12 +3711,17 @@ Object[] update(Object[] update, Object[] insert, Comparator<? super Compare> co
builder = branch;
}

// record the root as the top builder's source, so that when the root node is rebuilt its old version is
// released from the running heap total (and an unchanged root can be reused), mirroring the per-node
// accounting performed for every child level
builder.setSourceNode(update);

Insert ik = this.insert.next();
ik = updateRecursive(ik, update, null, builder);
assert ik == null;
Object[] result = builder.completeBuild();

if (allocated > 0)
if (allocated != DISABLED)
updateF.onAllocatedOnHeap(allocated);

return result;
Expand Down Expand Up @@ -3864,7 +3902,7 @@ static abstract class AbstractSeekingTransformer<I, O> extends AbstractFastBuild

AbstractSeekingTransformer()
{
allocated = -1;
allocated = DISABLED;
ensureParent();
parent.inUse = false;
}
Expand Down
Loading