From 64d054cac10e218b22e73256809dcd84bdaff8e5 Mon Sep 17 00:00:00 2001 From: Dmitry Konstantinov Date: Thu, 25 Jun 2026 01:04:12 +0100 Subject: [PATCH] Expose immediately-executed tasks in the queries virtual table SEPExecutor.maybeExecuteImmediately() runs a task synchronously on the calling worker thread, nested within the task the worker is already running. Such immediate tasks were invisible in system_views.queries, which only exposed each worker's primary running task. This is common on the coordinator path, where a local read or mutation is executed immediately within the enclosing QUERY task. Each SEPWorker now also tracks an immediate current task, set around maybeExecuteImmediately(), and exposes it as an additional DebuggableTaskRunner, so the queries table reports both the enclosing task and the immediate one as separate rows. patch by Dmitry Konstantinov; reviewed by TBD for CASSANDRA-21471 --- .../cassandra/concurrent/CassandraThread.java | 16 +++++++- .../concurrent/ImmediateTaskHolder.java | 36 ++++++++++++++++ .../cassandra/concurrent/SEPExecutor.java | 14 +++++++ .../cassandra/concurrent/SEPWorker.java | 41 +++++++++++++++++-- .../concurrent/SharedExecutorPool.java | 3 +- .../distributed/test/QueriesTableTest.java | 12 +++--- 6 files changed, 111 insertions(+), 11 deletions(-) create mode 100644 src/java/org/apache/cassandra/concurrent/ImmediateTaskHolder.java diff --git a/src/java/org/apache/cassandra/concurrent/CassandraThread.java b/src/java/org/apache/cassandra/concurrent/CassandraThread.java index 5afec8815b5..c4047ad2d02 100644 --- a/src/java/org/apache/cassandra/concurrent/CassandraThread.java +++ b/src/java/org/apache/cassandra/concurrent/CassandraThread.java @@ -27,19 +27,33 @@ public class CassandraThread extends FastThreadLocalThread private ThreadLocalMetrics threadLocalMetrics; private ExecutorLocals executorLocals; - public CassandraThread(ThreadGroup group, Runnable target, String name) + private final ImmediateTaskHolder immediateTaskHolder; + + public CassandraThread(ThreadGroup group, Runnable target, String name, ImmediateTaskHolder immediateTaskHolder) { super(group, target, name); + this.immediateTaskHolder = immediateTaskHolder; + } + public CassandraThread(ThreadGroup group, Runnable target, String name) + { + this(group, target, name, null); } public CassandraThread() { super(); + this.immediateTaskHolder = null; } public CassandraThread(Runnable target) { super(target); + this.immediateTaskHolder = null; + } + + public ImmediateTaskHolder getImmediateTaskHolder() + { + return immediateTaskHolder == null ? ImmediateTaskHolder.NO_OP : immediateTaskHolder; } public ThreadLocalMetrics getThreadLocalMetrics() diff --git a/src/java/org/apache/cassandra/concurrent/ImmediateTaskHolder.java b/src/java/org/apache/cassandra/concurrent/ImmediateTaskHolder.java new file mode 100644 index 00000000000..9999d8d1c90 --- /dev/null +++ b/src/java/org/apache/cassandra/concurrent/ImmediateTaskHolder.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.concurrent; + +public interface ImmediateTaskHolder +{ + void setImmediateTask(Runnable currentTask); + + class NoOp implements ImmediateTaskHolder + { + + @Override + public void setImmediateTask(Runnable currentTask) + { + // nothing to do + } + } + + NoOp NO_OP = new NoOp(); +} diff --git a/src/java/org/apache/cassandra/concurrent/SEPExecutor.java b/src/java/org/apache/cassandra/concurrent/SEPExecutor.java index ccce797be74..6a072325f88 100644 --- a/src/java/org/apache/cassandra/concurrent/SEPExecutor.java +++ b/src/java/org/apache/cassandra/concurrent/SEPExecutor.java @@ -211,6 +211,8 @@ public void maybeExecuteImmediately(Runnable task) } else { + ImmediateTaskHolder taskHolder = getNestedCurrentTaskHolder(); + taskHolder.setImmediateTask(task); try { task.run(); @@ -222,10 +224,22 @@ public void maybeExecuteImmediately(Runnable task) // in this case in particular we are not processing the rest of the queue anyway, and so // the work permit may go wasted if we don't immediately attempt to spawn another worker maybeSchedule(); + // nesting with depth > 1 is not supported + taskHolder.setImmediateTask(null); } } } + private static ImmediateTaskHolder getNestedCurrentTaskHolder() + { + Thread currentThread = Thread.currentThread(); + if (currentThread instanceof CassandraThread) + { + return ((CassandraThread) currentThread).getImmediateTaskHolder(); + } + return ImmediateTaskHolder.NO_OP; + } + @Override public void execute(Runnable run) { diff --git a/src/java/org/apache/cassandra/concurrent/SEPWorker.java b/src/java/org/apache/cassandra/concurrent/SEPWorker.java index 8eb34303e27..b352c889caa 100644 --- a/src/java/org/apache/cassandra/concurrent/SEPWorker.java +++ b/src/java/org/apache/cassandra/concurrent/SEPWorker.java @@ -51,6 +51,32 @@ final class SEPWorker extends AtomicReference implements Runnabl private final AtomicReference currentTask = new AtomicReference<>(); + private class ImmediateDebuggableTaskRunner implements DebuggableTask.DebuggableTaskRunner, ImmediateTaskHolder + { + private final AtomicReference immediateCurrentTask = new AtomicReference<>(); + + @Override + public DebuggableTask running() + { + return getDebuggableTask(immediateCurrentTask.get()); + } + + @Override + public String id() + { + // derive from the current thread name so the nested row tracks renames and correlates with the worker's main row + return thread.getName() + "(immediate)"; + } + + @Override + public void setImmediateTask(Runnable currentTask) + { + immediateCurrentTask.lazySet(currentTask); + } + } + + private final ImmediateDebuggableTaskRunner immediateDebuggableTaskRunner; + private String lastUsedExecutorName; SEPWorker(ThreadGroup threadGroup, Long workerId, Work initialState, SharedExecutorPool pool) @@ -58,18 +84,27 @@ final class SEPWorker extends AtomicReference implements Runnabl this.pool = pool; this.workerId = workerId; this.workerIdThreadSuffix = '-' + workerId.toString(); - thread = new CassandraThread(threadGroup, this, threadGroup.getName() + "-Worker-" + workerId); + String threadName = threadGroup.getName() + "-Worker-" + workerId; + this.immediateDebuggableTaskRunner = new ImmediateDebuggableTaskRunner(); + thread = new CassandraThread(threadGroup, this, threadName, immediateDebuggableTaskRunner); thread.setDaemon(true); set(initialState); thread.start(); } + public DebuggableTask.DebuggableTaskRunner immediateRunner() + { + return immediateDebuggableTaskRunner; + } + @Override public DebuggableTask running() { - // can change after null check so go off local reference - Runnable task = currentTask.get(); + return getDebuggableTask(currentTask.get()); + } + private static DebuggableTask getDebuggableTask(Runnable task) + { // Local read and mutation Runnables are themselves debuggable if (task instanceof DebuggableTask) return (DebuggableTask) task; diff --git a/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java index 3ad848d325f..4b2075158d3 100644 --- a/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java +++ b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java @@ -123,7 +123,8 @@ void workerEnded(SEPWorker worker) public Stream workers() { - return allWorkers.stream(); + return Stream.concat(allWorkers.stream(), + allWorkers.stream().map(SEPWorker::immediateRunner)); } void maybeStartSpinningWorker() diff --git a/test/distributed/org/apache/cassandra/distributed/test/QueriesTableTest.java b/test/distributed/org/apache/cassandra/distributed/test/QueriesTableTest.java index 934987076a7..7b3a1eb2bf3 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/QueriesTableTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/QueriesTableTest.java @@ -121,13 +121,13 @@ private static void assertReadsAndWritesVisible() String task = row.get("task").toString(); boolean localReaderThread = threadId.contains("Read") || threadId.contains("SharedPool-Worker"); - readVisible |= localReaderThread && task.contains("SELECT"); + readVisible |= localReaderThread && task.contains("SELECT") && !task.contains("QUERY"); boolean coordReaderThread = threadId.contains("Native-Transport-Requests") || threadId.contains("SharedPool-Worker"); - coordinatorReadVisible |= coordReaderThread && task.contains("SELECT"); + coordinatorReadVisible |= coordReaderThread && task.contains("SELECT") && task.contains("QUERY"); boolean localWriterThread = threadId.contains("Mutation") || threadId.contains("SharedPool-Worker"); - writeVisible |= localWriterThread && task.contains("Mutation"); + writeVisible |= localWriterThread && task.contains("Mutation") && !task.contains("QUERY"); boolean coordWriterThread = threadId.contains("Native-Transport-Requests") || threadId.contains("SharedPool-Worker"); - coordinatorWriteVisible |= coordWriterThread && task.contains("INSERT"); + coordinatorWriteVisible |= coordWriterThread && task.contains("INSERT") && task.contains("QUERY"); } assertTrue(readVisible); @@ -169,9 +169,9 @@ private static void assertCasVisible() String task = row.get("task").toString(); boolean localReaderThread = threadId.contains("Read") || threadId.contains("SharedPool-Worker"); - readVisible |= localReaderThread && task.contains("SELECT"); + readVisible |= localReaderThread && task.contains("SELECT") && !task.contains("QUERY"); boolean coordUpdateThread = threadId.contains("Native-Transport-Requests") || threadId.contains("SharedPool-Worker"); - coordinatorUpdateVisible |= coordUpdateThread && task.contains("UPDATE"); + coordinatorUpdateVisible |= coordUpdateThread && task.contains("UPDATE") && task.contains("QUERY"); } assertTrue(readVisible);