diff --git a/src/java/org/apache/cassandra/metrics/ThreadLocalMetrics.java b/src/java/org/apache/cassandra/metrics/ThreadLocalMetrics.java index feb96c02bdb..ad910283ea4 100644 --- a/src/java/org/apache/cassandra/metrics/ThreadLocalMetrics.java +++ b/src/java/org/apache/cassandra/metrics/ThreadLocalMetrics.java @@ -41,6 +41,7 @@ import org.apache.cassandra.concurrent.Shutdownable; import io.netty.util.concurrent.FastThreadLocal; +import io.netty.util.concurrent.FastThreadLocalThread; import static com.google.common.collect.ImmutableList.of; import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory; @@ -64,10 +65,8 @@ public class ThreadLocalMetrics static final AtomicInteger idGenerator = new AtomicInteger(); - private static final Object freeMetricIdSetGuard = new Object(); - @VisibleForTesting - static final BitSet freeMetricIdSet = new BitSet(); + static final FreeMetricIdSetTracker freeMetricIdSetTracker = new FreeMetricIdSetTracker(); private static final List allThreadLocalMetrics = new CopyOnWriteArrayList<>(); @@ -101,7 +100,13 @@ public static ThreadLocalMetrics create() { ThreadLocalMetrics result = new ThreadLocalMetrics(); allThreadLocalMetrics.add(result); - destroyWhenUnreachable(Thread.currentThread(), result::release); + + Thread thread = Thread.currentThread(); + // use phantom references ony if needed + // CassandraThread is FastThreadLocalThread too + if (!(thread instanceof FastThreadLocalThread)) + destroyWhenUnreachable(thread, result::release); + return result; } @@ -317,13 +322,7 @@ private static int calculateNewCapacity(int metricId) static int allocateMetricId() { - int metricId; - synchronized (freeMetricIdSetGuard) - { - metricId = freeMetricIdSet.nextSetBit(0); - if (metricId >= 0) - freeMetricIdSet.clear(metricId); - } + int metricId = freeMetricIdSetTracker.getFreeMetricId(); if (metricId < 0) metricId = idGenerator.getAndIncrement(); @@ -374,26 +373,108 @@ static void recycleMetricId(int metricId) lock.unlock(); } - // there's no an obvious happens-before relation between currentCounterValues[metricId] = 0 write we just did - // and an initial read of the entry by a thread which updates the reused metric - // as a workaround we introduce a delay in recyling to provide the write visibility in practice - // even if it is not formally guaranteed by the JMM - ScheduledExecutors.scheduledTasks.schedule(() -> { + freeMetricIdSetTracker.markAsFree(metricId); + } + + @VisibleForTesting + static class FreeMetricIdSetTracker + { + + private final Object freeMetricIdSetGuard = new Object(); + + private final BitSet freeMetricIdSet = new BitSet(); + + private final BitSet tickDelayedToFreeMetricIdSet = new BitSet(); + private final BitSet tockDelayedToFreeMetricIdSet = new BitSet(); + + private BitSet delayedToFreeMetricIdSet = tickDelayedToFreeMetricIdSet; + private volatile boolean anythingToFree; + + FreeMetricIdSetTracker() + { + this(true); + } + + @VisibleForTesting + FreeMetricIdSetTracker(boolean scheduleTask) + { + if (scheduleTask) + ScheduledExecutors.scheduledTasks.scheduleAtFixedRate(this::triggerRecycling, 5, 5, TimeUnit.SECONDS); + } + + @VisibleForTesting + void triggerRecycling() + { + if (anythingToFree) + { + synchronized (freeMetricIdSetGuard) + { + BitSet toProcess = otherSet(delayedToFreeMetricIdSet); + freeMetricIdSet.or(toProcess); + toProcess.clear(); + anythingToFree = !delayedToFreeMetricIdSet.isEmpty(); + delayedToFreeMetricIdSet = toProcess; + } + } + } + + private BitSet otherSet(BitSet set) + { + return set == tickDelayedToFreeMetricIdSet ? tockDelayedToFreeMetricIdSet : tickDelayedToFreeMetricIdSet; + } + + public int getFreeMetricId() + { + synchronized (freeMetricIdSetGuard) + { + int metricId = freeMetricIdSet.nextSetBit(0); + if (metricId >= 0) + freeMetricIdSet.clear(metricId); + return metricId; + } + } + + public void markAsFree(int metricId) + { + // there's no an obvious happens-before relation between currentCounterValues[metricId] = 0 write we just did + // and an initial read of the entry by a thread which updates the reused metric + // as a workaround we introduce a delay in recyling to provide the write visibility in practice + // even if it is not formally guaranteed by the JMM + synchronized (freeMetricIdSetGuard) + { + anythingToFree = true; + delayedToFreeMetricIdSet.set(metricId); + } + } + + public int getFreeMetricSetCardinality() + { synchronized (freeMetricIdSetGuard) { - freeMetricIdSet.set(metricId); + return freeMetricIdSet.cardinality(); } - }, 5, TimeUnit.SECONDS); + } + + @Override + public String toString() + { + synchronized (freeMetricIdSetGuard) + { + return "FreeMetricIdSetTracker{" + + "freeMetricIdSet=" + freeMetricIdSet + + ", tickDelayedToFreeMetricIdSet=" + tickDelayedToFreeMetricIdSet + + ", tockDelayedToFreeMetricIdSet=" + tockDelayedToFreeMetricIdSet + + ", delayedToFreeMetricIdSet=" + (delayedToFreeMetricIdSet == tickDelayedToFreeMetricIdSet ? "tick" : "tock") + + ", anythingToFree=" + anythingToFree + + '}'; + } + } } @VisibleForTesting static int getAllocatedMetricsCount() { - int freeCount; - synchronized (freeMetricIdSetGuard) - { - freeCount = freeMetricIdSet.cardinality(); - } + int freeCount = freeMetricIdSetTracker.getFreeMetricSetCardinality(); return idGenerator.get() - freeCount; } diff --git a/test/unit/org/apache/cassandra/metrics/FreeMetricIdSetTrackerTest.java b/test/unit/org/apache/cassandra/metrics/FreeMetricIdSetTrackerTest.java new file mode 100644 index 00000000000..c315a80f7f8 --- /dev/null +++ b/test/unit/org/apache/cassandra/metrics/FreeMetricIdSetTrackerTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.metrics; + +import java.util.HashSet; +import java.util.Set; + +import com.google.common.collect.ImmutableSet; + +import org.junit.Test; + +import org.apache.cassandra.metrics.ThreadLocalMetrics.FreeMetricIdSetTracker; + +import static org.junit.Assert.assertEquals; + +public class FreeMetricIdSetTrackerTest +{ + // free and release ids in interleaved portions, and verify that a later portion does not + // become reusable before its own two-cycle delay has elapsed - i.e. only the ids that are + // actually due are released, never the freshly freed ones. + @Test + public void testInterleavedFreeAndReleaseReleasesOnlyExpectedIds() + { + FreeMetricIdSetTracker tracker = new FreeMetricIdSetTracker(false); + + // portion A: free, then one release cycle (A is now in the delayed buffer, not yet reusable) + Set portionA = ImmutableSet.of(1, 2, 3); + portionA.forEach(tracker::markAsFree); + tracker.triggerRecycling(); + assertEquals(0, tracker.getFreeMetricSetCardinality()); + + // portion B: free a fresh batch, then another release cycle. + // this cycle is A's second cycle (A becomes reusable) but only B's first (B must stay held). + Set portionB = ImmutableSet.of(10, 11); + portionB.forEach(tracker::markAsFree); + tracker.triggerRecycling(); + + // only portion A is released here - portion B must not leak out yet + assertEquals(portionA.size(), tracker.getFreeMetricSetCardinality()); + assertEquals(portionA, drainFreeIds(tracker)); + + // a further release cycle finally makes portion B reusable, and nothing else + tracker.triggerRecycling(); + assertEquals(portionB.size(), tracker.getFreeMetricSetCardinality()); + assertEquals(portionB, drainFreeIds(tracker)); + + assertEquals(0, tracker.getFreeMetricSetCardinality()); + assertEquals(-1, tracker.getFreeMetricId()); + } + + @Test + public void testTriggerRecyclingIsNoOpWhenNothingFreed() + { + FreeMetricIdSetTracker tracker = new FreeMetricIdSetTracker(false); + + // triggering with an empty tracker must not produce phantom free ids + tracker.triggerRecycling(); + tracker.triggerRecycling(); + + assertEquals(0, tracker.getFreeMetricSetCardinality()); + assertEquals(-1, tracker.getFreeMetricId()); + } + + /** + * Drains every id currently available for reuse out of the tracker. + */ + private static Set drainFreeIds(FreeMetricIdSetTracker tracker) + { + Set ids = new HashSet<>(); + int id; + while ((id = tracker.getFreeMetricId()) >= 0) + ids.add(id); + return ids; + } +} diff --git a/test/unit/org/apache/cassandra/metrics/ThreadLocalCounterTest.java b/test/unit/org/apache/cassandra/metrics/ThreadLocalCounterTest.java index 2597504522c..03a0cb0640f 100644 --- a/test/unit/org/apache/cassandra/metrics/ThreadLocalCounterTest.java +++ b/test/unit/org/apache/cassandra/metrics/ThreadLocalCounterTest.java @@ -90,7 +90,7 @@ public void testLifecycleAndMultipleInstancesCreation() throws InterruptedExcept LOGGER.info("id generator state: {}, free IDs: {}", ThreadLocalMetrics.idGenerator.get(), - ThreadLocalMetrics.freeMetricIdSet); + ThreadLocalMetrics.freeMetricIdSetTracker); LOGGER.info("iteration completed: {} / {}", iteration + 1, ITERATIONS_COUNT); } }