Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion src/java/org/apache/cassandra/concurrent/CassandraThread.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,33 @@ public class CassandraThread extends FastThreadLocalThread
private ThreadLocalMetrics threadLocalMetrics;
private ExecutorLocals executorLocals;

public CassandraThread(ThreadGroup group, Runnable target, String name)
private final ImmediateTaskHolder immediateTaskHolder;

public CassandraThread(ThreadGroup group, Runnable target, String name, ImmediateTaskHolder immediateTaskHolder)
{
super(group, target, name);
this.immediateTaskHolder = immediateTaskHolder;
}
public CassandraThread(ThreadGroup group, Runnable target, String name)
{
this(group, target, name, null);
}

public CassandraThread()
{
super();
this.immediateTaskHolder = null;
}

public CassandraThread(Runnable target)
{
super(target);
this.immediateTaskHolder = null;
}

public ImmediateTaskHolder getImmediateTaskHolder()
{
return immediateTaskHolder == null ? ImmediateTaskHolder.NO_OP : immediateTaskHolder;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Can we just set NO_OP directly in the constructors above (and make passing null illegal) so we don't have to do this check?

}

public ThreadLocalMetrics getThreadLocalMetrics()
Expand Down
36 changes: 36 additions & 0 deletions src/java/org/apache/cassandra/concurrent/ImmediateTaskHolder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.concurrent;

public interface ImmediateTaskHolder
{
void setImmediateTask(Runnable currentTask);

class NoOp implements ImmediateTaskHolder
{

@Override
public void setImmediateTask(Runnable currentTask)
{
// nothing to do
}
}

NoOp NO_OP = new NoOp();
}
14 changes: 14 additions & 0 deletions src/java/org/apache/cassandra/concurrent/SEPExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,8 @@ public void maybeExecuteImmediately(Runnable task)
}
else
{
ImmediateTaskHolder taskHolder = getNestedCurrentTaskHolder();
taskHolder.setImmediateTask(task);
try
{
task.run();
Expand All @@ -222,10 +224,22 @@ public void maybeExecuteImmediately(Runnable task)
// in this case in particular we are not processing the rest of the queue anyway, and so
// the work permit may go wasted if we don't immediately attempt to spawn another worker
maybeSchedule();
// nesting with depth > 1 is not supported

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a cheap way to assert this is the case?

taskHolder.setImmediateTask(null);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we move this to the start of the finally block to make sure that we can't leak stale state into the virtual table if some kind of exception is thrown from maybeSchedule()?

}
}
}

private static ImmediateTaskHolder getNestedCurrentTaskHolder()
{
Thread currentThread = Thread.currentThread();
if (currentThread instanceof CassandraThread)
{
return ((CassandraThread) currentThread).getImmediateTaskHolder();
}
return ImmediateTaskHolder.NO_OP;
}

@Override
public void execute(Runnable run)
{
Expand Down
41 changes: 38 additions & 3 deletions src/java/org/apache/cassandra/concurrent/SEPWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,25 +51,60 @@ final class SEPWorker extends AtomicReference<SEPWorker.Work> implements Runnabl

private final AtomicReference<Runnable> currentTask = new AtomicReference<>();

private class ImmediateDebuggableTaskRunner implements DebuggableTask.DebuggableTaskRunner, ImmediateTaskHolder
{
private final AtomicReference<Runnable> immediateCurrentTask = new AtomicReference<>();

@Override
public DebuggableTask running()
{
return getDebuggableTask(immediateCurrentTask.get());
}

@Override
public String id()
{
// derive from the current thread name so the nested row tracks renames and correlates with the worker's main row
return thread.getName() + "(immediate)";
}

@Override
public void setImmediateTask(Runnable currentTask)
{
immediateCurrentTask.lazySet(currentTask);
}
}

private final ImmediateDebuggableTaskRunner immediateDebuggableTaskRunner;

private String lastUsedExecutorName;

SEPWorker(ThreadGroup threadGroup, Long workerId, Work initialState, SharedExecutorPool pool)
{
this.pool = pool;
this.workerId = workerId;
this.workerIdThreadSuffix = '-' + workerId.toString();
thread = new CassandraThread(threadGroup, this, threadGroup.getName() + "-Worker-" + workerId);
String threadName = threadGroup.getName() + "-Worker-" + workerId;
this.immediateDebuggableTaskRunner = new ImmediateDebuggableTaskRunner();
thread = new CassandraThread(threadGroup, this, threadName, immediateDebuggableTaskRunner);
thread.setDaemon(true);
set(initialState);
thread.start();
}

public DebuggableTask.DebuggableTaskRunner immediateRunner()
{
return immediateDebuggableTaskRunner;
}

@Override
public DebuggableTask running()
{
// can change after null check so go off local reference
Runnable task = currentTask.get();
return getDebuggableTask(currentTask.get());
}

private static DebuggableTask getDebuggableTask(Runnable task)
{
// Local read and mutation Runnables are themselves debuggable
if (task instanceof DebuggableTask)
return (DebuggableTask) task;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ void workerEnded(SEPWorker worker)

public Stream<? extends DebuggableTaskRunner> workers()
{
return allWorkers.stream();
return Stream.concat(allWorkers.stream(),
allWorkers.stream().map(SEPWorker::immediateRunner));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From CC shallow review:

Finding 7: Stream.concat(allWorkers.stream(), allWorkers.stream().map(...)) iterates the concurrent set twice — snapshot inconsistency on worker churn

  • Location: src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java:125-127
  • Confidence: Medium
  • Flagged by: Concurrency, Resources
  • What's wrong: allWorkers is Collections.newSetFromMap(new ConcurrentHashMap<>()). The two allWorkers.stream() calls produce two independent iterations; between them, workers can be added/removed by schedule() / workerEnded(). The two streams can yield different worker sets — main row
    without immediate row for a freshly-removed worker, immediate-only row for one added between the two traversals. Also doubles the iteration cost on each SELECT * FROM system_views.queries even at idle.
  • Suggested fix:
    return allWorkers.stream().flatMap(w -> Stream.of((DebuggableTaskRunner) w, w.immediateRunner()));

}

void maybeStartSpinningWorker()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,13 @@ private static void assertReadsAndWritesVisible()
String task = row.get("task").toString();

boolean localReaderThread = threadId.contains("Read") || threadId.contains("SharedPool-Worker");
readVisible |= localReaderThread && task.contains("SELECT");
readVisible |= localReaderThread && task.contains("SELECT") && !task.contains("QUERY");

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From CC shallow review:

The tests disambiguate the new immediate-row from the enclosing-QUERY-row by String.contains("QUERY") on the free-form task description, not by the structural marker the patch actually introduces (the (immediate) suffix on thread_id). If the local-read description ever
changes to mention "QUERY", or a user creates a keyspace/table literally named QUERY, or someone re-uses the same test against a query reading from system_views.queries, the assertion silently flips. The unambiguous signal is thread_id ending with (immediate).

boolean coordReaderThread = threadId.contains("Native-Transport-Requests") || threadId.contains("SharedPool-Worker");
coordinatorReadVisible |= coordReaderThread && task.contains("SELECT");
coordinatorReadVisible |= coordReaderThread && task.contains("SELECT") && task.contains("QUERY");
boolean localWriterThread = threadId.contains("Mutation") || threadId.contains("SharedPool-Worker");
writeVisible |= localWriterThread && task.contains("Mutation");
writeVisible |= localWriterThread && task.contains("Mutation") && !task.contains("QUERY");
boolean coordWriterThread = threadId.contains("Native-Transport-Requests") || threadId.contains("SharedPool-Worker");
coordinatorWriteVisible |= coordWriterThread && task.contains("INSERT");
coordinatorWriteVisible |= coordWriterThread && task.contains("INSERT") && task.contains("QUERY");
}

assertTrue(readVisible);
Expand Down Expand Up @@ -169,9 +169,9 @@ private static void assertCasVisible()
String task = row.get("task").toString();

boolean localReaderThread = threadId.contains("Read") || threadId.contains("SharedPool-Worker");
readVisible |= localReaderThread && task.contains("SELECT");
readVisible |= localReaderThread && task.contains("SELECT") && !task.contains("QUERY");
boolean coordUpdateThread = threadId.contains("Native-Transport-Requests") || threadId.contains("SharedPool-Worker");
coordinatorUpdateVisible |= coordUpdateThread && task.contains("UPDATE");
coordinatorUpdateVisible |= coordUpdateThread && task.contains("UPDATE") && task.contains("QUERY");
}

assertTrue(readVisible);
Expand Down