diff --git a/src/java/org/apache/cassandra/concurrent/CassandraThread.java b/src/java/org/apache/cassandra/concurrent/CassandraThread.java index 5afec8815b5..c4047ad2d02 100644 --- a/src/java/org/apache/cassandra/concurrent/CassandraThread.java +++ b/src/java/org/apache/cassandra/concurrent/CassandraThread.java @@ -27,19 +27,33 @@ public class CassandraThread extends FastThreadLocalThread private ThreadLocalMetrics threadLocalMetrics; private ExecutorLocals executorLocals; - public CassandraThread(ThreadGroup group, Runnable target, String name) + private final ImmediateTaskHolder immediateTaskHolder; + + public CassandraThread(ThreadGroup group, Runnable target, String name, ImmediateTaskHolder immediateTaskHolder) { super(group, target, name); + this.immediateTaskHolder = immediateTaskHolder; + } + public CassandraThread(ThreadGroup group, Runnable target, String name) + { + this(group, target, name, null); } public CassandraThread() { super(); + this.immediateTaskHolder = null; } public CassandraThread(Runnable target) { super(target); + this.immediateTaskHolder = null; + } + + public ImmediateTaskHolder getImmediateTaskHolder() + { + return immediateTaskHolder == null ? ImmediateTaskHolder.NO_OP : immediateTaskHolder; } public ThreadLocalMetrics getThreadLocalMetrics() diff --git a/src/java/org/apache/cassandra/concurrent/ImmediateTaskHolder.java b/src/java/org/apache/cassandra/concurrent/ImmediateTaskHolder.java new file mode 100644 index 00000000000..9999d8d1c90 --- /dev/null +++ b/src/java/org/apache/cassandra/concurrent/ImmediateTaskHolder.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.concurrent; + +public interface ImmediateTaskHolder +{ + void setImmediateTask(Runnable currentTask); + + class NoOp implements ImmediateTaskHolder + { + + @Override + public void setImmediateTask(Runnable currentTask) + { + // nothing to do + } + } + + NoOp NO_OP = new NoOp(); +} diff --git a/src/java/org/apache/cassandra/concurrent/SEPExecutor.java b/src/java/org/apache/cassandra/concurrent/SEPExecutor.java index ccce797be74..6a072325f88 100644 --- a/src/java/org/apache/cassandra/concurrent/SEPExecutor.java +++ b/src/java/org/apache/cassandra/concurrent/SEPExecutor.java @@ -211,6 +211,8 @@ public void maybeExecuteImmediately(Runnable task) } else { + ImmediateTaskHolder taskHolder = getNestedCurrentTaskHolder(); + taskHolder.setImmediateTask(task); try { task.run(); @@ -222,10 +224,22 @@ public void maybeExecuteImmediately(Runnable task) // in this case in particular we are not processing the rest of the queue anyway, and so // the work permit may go wasted if we don't immediately attempt to spawn another worker maybeSchedule(); + // nesting with depth > 1 is not supported + taskHolder.setImmediateTask(null); } } } + private static ImmediateTaskHolder getNestedCurrentTaskHolder() + { + Thread currentThread = Thread.currentThread(); + if (currentThread instanceof CassandraThread) + { + return ((CassandraThread) currentThread).getImmediateTaskHolder(); + } + return ImmediateTaskHolder.NO_OP; + } + @Override public void execute(Runnable run) { diff --git a/src/java/org/apache/cassandra/concurrent/SEPWorker.java b/src/java/org/apache/cassandra/concurrent/SEPWorker.java index 8eb34303e27..b352c889caa 100644 --- a/src/java/org/apache/cassandra/concurrent/SEPWorker.java +++ b/src/java/org/apache/cassandra/concurrent/SEPWorker.java @@ -51,6 +51,32 @@ final class SEPWorker extends AtomicReference implements Runnabl private final AtomicReference currentTask = new AtomicReference<>(); + private class ImmediateDebuggableTaskRunner implements DebuggableTask.DebuggableTaskRunner, ImmediateTaskHolder + { + private final AtomicReference immediateCurrentTask = new AtomicReference<>(); + + @Override + public DebuggableTask running() + { + return getDebuggableTask(immediateCurrentTask.get()); + } + + @Override + public String id() + { + // derive from the current thread name so the nested row tracks renames and correlates with the worker's main row + return thread.getName() + "(immediate)"; + } + + @Override + public void setImmediateTask(Runnable currentTask) + { + immediateCurrentTask.lazySet(currentTask); + } + } + + private final ImmediateDebuggableTaskRunner immediateDebuggableTaskRunner; + private String lastUsedExecutorName; SEPWorker(ThreadGroup threadGroup, Long workerId, Work initialState, SharedExecutorPool pool) @@ -58,18 +84,27 @@ final class SEPWorker extends AtomicReference implements Runnabl this.pool = pool; this.workerId = workerId; this.workerIdThreadSuffix = '-' + workerId.toString(); - thread = new CassandraThread(threadGroup, this, threadGroup.getName() + "-Worker-" + workerId); + String threadName = threadGroup.getName() + "-Worker-" + workerId; + this.immediateDebuggableTaskRunner = new ImmediateDebuggableTaskRunner(); + thread = new CassandraThread(threadGroup, this, threadName, immediateDebuggableTaskRunner); thread.setDaemon(true); set(initialState); thread.start(); } + public DebuggableTask.DebuggableTaskRunner immediateRunner() + { + return immediateDebuggableTaskRunner; + } + @Override public DebuggableTask running() { - // can change after null check so go off local reference - Runnable task = currentTask.get(); + return getDebuggableTask(currentTask.get()); + } + private static DebuggableTask getDebuggableTask(Runnable task) + { // Local read and mutation Runnables are themselves debuggable if (task instanceof DebuggableTask) return (DebuggableTask) task; diff --git a/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java index 3ad848d325f..4b2075158d3 100644 --- a/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java +++ b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java @@ -123,7 +123,8 @@ void workerEnded(SEPWorker worker) public Stream workers() { - return allWorkers.stream(); + return Stream.concat(allWorkers.stream(), + allWorkers.stream().map(SEPWorker::immediateRunner)); } void maybeStartSpinningWorker() diff --git a/test/distributed/org/apache/cassandra/distributed/test/QueriesTableTest.java b/test/distributed/org/apache/cassandra/distributed/test/QueriesTableTest.java index 934987076a7..7b3a1eb2bf3 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/QueriesTableTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/QueriesTableTest.java @@ -121,13 +121,13 @@ private static void assertReadsAndWritesVisible() String task = row.get("task").toString(); boolean localReaderThread = threadId.contains("Read") || threadId.contains("SharedPool-Worker"); - readVisible |= localReaderThread && task.contains("SELECT"); + readVisible |= localReaderThread && task.contains("SELECT") && !task.contains("QUERY"); boolean coordReaderThread = threadId.contains("Native-Transport-Requests") || threadId.contains("SharedPool-Worker"); - coordinatorReadVisible |= coordReaderThread && task.contains("SELECT"); + coordinatorReadVisible |= coordReaderThread && task.contains("SELECT") && task.contains("QUERY"); boolean localWriterThread = threadId.contains("Mutation") || threadId.contains("SharedPool-Worker"); - writeVisible |= localWriterThread && task.contains("Mutation"); + writeVisible |= localWriterThread && task.contains("Mutation") && !task.contains("QUERY"); boolean coordWriterThread = threadId.contains("Native-Transport-Requests") || threadId.contains("SharedPool-Worker"); - coordinatorWriteVisible |= coordWriterThread && task.contains("INSERT"); + coordinatorWriteVisible |= coordWriterThread && task.contains("INSERT") && task.contains("QUERY"); } assertTrue(readVisible); @@ -169,9 +169,9 @@ private static void assertCasVisible() String task = row.get("task").toString(); boolean localReaderThread = threadId.contains("Read") || threadId.contains("SharedPool-Worker"); - readVisible |= localReaderThread && task.contains("SELECT"); + readVisible |= localReaderThread && task.contains("SELECT") && !task.contains("QUERY"); boolean coordUpdateThread = threadId.contains("Native-Transport-Requests") || threadId.contains("SharedPool-Worker"); - coordinatorUpdateVisible |= coordUpdateThread && task.contains("UPDATE"); + coordinatorUpdateVisible |= coordUpdateThread && task.contains("UPDATE") && task.contains("QUERY"); } assertTrue(readVisible);