Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,12 @@ public enum ExplainType {

private boolean userQuery = false;

/**
* When true (e.g. SHOW QUERIES), operator and exchange memory may use fallback when pool is
* insufficient. Set from analysis via {@link #setNeedSetHighestPriority(boolean)}.
*/
private boolean needSetHighestPriority = false;

private boolean debug = false;

private Map<NodeRef<Table>, Query> cteQueries = new HashMap<>();
Expand Down Expand Up @@ -507,6 +513,14 @@ public void setUserQuery(boolean userQuery) {
this.userQuery = userQuery;
}

public boolean needSetHighestPriority() {
return needSetHighestPriority;
}

public void setNeedSetHighestPriority(boolean needSetHighestPriority) {
this.needSetHighestPriority = needSetHighestPriority;
}

public boolean isDebug() {
return debug;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,11 @@ private synchronized ISinkChannel createLocalSinkChannel(
}
queue =
new SharedTsBlockQueue(
localFragmentInstanceId, localPlanNodeId, localMemoryManager, executorService);
localFragmentInstanceId,
localPlanNodeId,
localMemoryManager,
executorService,
instanceContext.isHighestPriority());
}

return new LocalSinkChannel(
Expand All @@ -680,7 +684,8 @@ public ISinkChannel createLocalSinkChannelForPipeline(
driverContext.getDriverTaskID().getFragmentInstanceId().toThrift(),
planNodeId,
localMemoryManager,
executorService);
executorService,
driverContext.getFragmentInstanceContext().isHighestPriority());
queue.allowAddingTsBlock();
return new LocalSinkChannel(
queue,
Expand Down Expand Up @@ -718,6 +723,7 @@ private ISinkChannel createSinkChannel(
tsBlockSerdeFactory.get(),
new ISinkChannelListenerImpl(
localFragmentInstanceId, instanceContext, instanceContext::failed, cnt),
instanceContext.isHighestPriority(),
mppDataExchangeServiceClientManager);
}

Expand Down Expand Up @@ -809,6 +815,24 @@ public synchronized ISourceHandle createLocalSourceHandleForFragment(
TFragmentInstanceId remoteFragmentInstanceId,
int index,
IMPPDataExchangeManagerCallback<Throwable> onFailureCallback) {
return createLocalSourceHandleForFragment(
localFragmentInstanceId,
localPlanNodeId,
remotePlanNodeId,
remoteFragmentInstanceId,
index,
onFailureCallback,
false);
}

public synchronized ISourceHandle createLocalSourceHandleForFragment(
TFragmentInstanceId localFragmentInstanceId,
String localPlanNodeId,
String remotePlanNodeId,
TFragmentInstanceId remoteFragmentInstanceId,
int index,
IMPPDataExchangeManagerCallback<Throwable> onFailureCallback,
boolean isHighestPriority) {
if (sourceHandles.containsKey(localFragmentInstanceId)
&& sourceHandles.get(localFragmentInstanceId).containsKey(localPlanNodeId)) {
throw new IllegalStateException(
Expand Down Expand Up @@ -840,7 +864,11 @@ public synchronized ISourceHandle createLocalSourceHandleForFragment(
}
queue =
new SharedTsBlockQueue(
remoteFragmentInstanceId, remotePlanNodeId, localMemoryManager, executorService);
remoteFragmentInstanceId,
remotePlanNodeId,
localMemoryManager,
executorService,
isHighestPriority);
}
LocalSourceHandle localSourceHandle =
new LocalSourceHandle(
Expand All @@ -862,6 +890,24 @@ public ISourceHandle createSourceHandle(
TEndPoint remoteEndpoint,
TFragmentInstanceId remoteFragmentInstanceId,
IMPPDataExchangeManagerCallback<Throwable> onFailureCallback) {
return createSourceHandle(
localFragmentInstanceId,
localPlanNodeId,
indexOfUpstreamSinkHandle,
remoteEndpoint,
remoteFragmentInstanceId,
onFailureCallback,
false);
}

public ISourceHandle createSourceHandle(
TFragmentInstanceId localFragmentInstanceId,
String localPlanNodeId,
int indexOfUpstreamSinkHandle,
TEndPoint remoteEndpoint,
TFragmentInstanceId remoteFragmentInstanceId,
IMPPDataExchangeManagerCallback<Throwable> onFailureCallback,
boolean isHighestPriority) {
Map<String, ISourceHandle> sourceHandleMap = sourceHandles.get(localFragmentInstanceId);
if (sourceHandleMap != null && sourceHandleMap.containsKey(localPlanNodeId)) {
throw new IllegalStateException(
Expand Down Expand Up @@ -891,6 +937,7 @@ public ISourceHandle createSourceHandle(
executorService,
tsBlockSerdeFactory.get(),
new SourceHandleListenerImpl(onFailureCallback),
isHighestPriority,
mppDataExchangeServiceClientManager);
sourceHandles
.computeIfAbsent(localFragmentInstanceId, key -> new ConcurrentHashMap<>())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iotdb.db.queryengine.execution.exchange.sink.LocalSinkChannel;
import org.apache.iotdb.db.queryengine.execution.exchange.source.LocalSourceHandle;
import org.apache.iotdb.db.queryengine.execution.memory.LocalMemoryManager;
import org.apache.iotdb.db.queryengine.execution.memory.MemoryPool.MemoryReservationResult;
import org.apache.iotdb.db.utils.CommonUtils;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;

Expand Down Expand Up @@ -63,7 +64,7 @@ public class SharedTsBlockQueue {

private long bufferRetainedSizeInBytes = 0L;

private final Queue<TsBlock> queue = new LinkedList<>();
private final Queue<Pair<TsBlock, Long>> queue = new LinkedList<>();

private SettableFuture<Void> blocked = SettableFuture.create();

Expand All @@ -83,6 +84,7 @@ public class SharedTsBlockQueue {

private long maxBytesCanReserve =
IoTDBDescriptor.getInstance().getMemoryConfig().getMaxBytesPerFragmentInstance();
private final boolean isHighestPriority;

private volatile Throwable abortedCause = null;

Expand All @@ -94,6 +96,15 @@ public SharedTsBlockQueue(
String planNodeId,
LocalMemoryManager localMemoryManager,
ExecutorService executorService) {
this(fragmentInstanceId, planNodeId, localMemoryManager, executorService, false);
}

public SharedTsBlockQueue(
TFragmentInstanceId fragmentInstanceId,
String planNodeId,
LocalMemoryManager localMemoryManager,
ExecutorService executorService,
boolean isHighestPriority) {
this.localFragmentInstanceId =
Validate.notNull(fragmentInstanceId, "fragment instance ID cannot be null");
this.fullFragmentInstanceId =
Expand All @@ -102,6 +113,7 @@ public SharedTsBlockQueue(
this.localMemoryManager =
Validate.notNull(localMemoryManager, "local memory manager cannot be null");
this.executorService = Validate.notNull(executorService, "ExecutorService can not be null.");
this.isHighestPriority = isHighestPriority;
}

public boolean hasNoMoreTsBlocks() {
Expand Down Expand Up @@ -196,15 +208,18 @@ public TsBlock remove() {
}
throw new IllegalStateException("queue has been destroyed");
}
TsBlock tsBlock = queue.remove();
localMemoryManager
.getQueryPool()
.free(
localFragmentInstanceId.getQueryId(),
fullFragmentInstanceId,
localPlanNodeId,
tsBlock.getSizeInBytes());
bufferRetainedSizeInBytes -= tsBlock.getSizeInBytes();
Pair<TsBlock, Long> tsBlockWithReservedBytes = queue.remove();
long reservedBytes = tsBlockWithReservedBytes.right;
if (reservedBytes > 0) {
localMemoryManager
.getQueryPool()
.free(
localFragmentInstanceId.getQueryId(),
fullFragmentInstanceId,
localPlanNodeId,
reservedBytes);
bufferRetainedSizeInBytes -= reservedBytes;
}
// Every time LocalSourceHandle consumes a TsBlock, it needs to send the event
// to
// corresponding LocalSinkChannel.
Expand All @@ -214,7 +229,7 @@ public TsBlock remove() {
if (blocked.isDone() && queue.isEmpty() && !noMoreTsBlocks) {
blocked = SettableFuture.create();
}
return tsBlock;
return tsBlockWithReservedBytes.left;
}

/**
Expand All @@ -240,20 +255,22 @@ public ListenableFuture<Void> add(TsBlock tsBlock) {
localFragmentInstanceId.queryId, fullFragmentInstanceId, localPlanNodeId);
alreadyRegistered = true;
}
Pair<ListenableFuture<Void>, Boolean> pair =
MemoryReservationResult reserveResult =
localMemoryManager
.getQueryPool()
.reserve(
.reserveWithPriority(
localFragmentInstanceId.getQueryId(),
fullFragmentInstanceId,
localPlanNodeId,
tsBlock.getSizeInBytes(),
maxBytesCanReserve);
blockedOnMemory = pair.left;
bufferRetainedSizeInBytes += tsBlock.getSizeInBytes();
maxBytesCanReserve,
isHighestPriority);
blockedOnMemory = reserveResult.getFuture();
long reservedBytes = reserveResult.getReservedBytes();
bufferRetainedSizeInBytes += reservedBytes;

// reserve memory failed, we should wait until there is enough memory
if (!Boolean.TRUE.equals(pair.right)) {
if (!reserveResult.isReserveSuccess()) {
SettableFuture<Void> channelBlocked = SettableFuture.create();
blockedOnMemory.addListener(
() -> {
Expand All @@ -268,7 +285,7 @@ public ListenableFuture<Void> add(TsBlock tsBlock) {
channelBlocked.set(null);
return;
}
queue.add(tsBlock);
queue.add(new Pair<>(tsBlock, reservedBytes));
if (!blocked.isDone()) {
blocked.set(null);
}
Expand All @@ -285,7 +302,7 @@ public ListenableFuture<Void> add(TsBlock tsBlock) {
executorService);
return channelBlocked;
} else { // reserve memory succeeded, add the TsBlock directly
queue.add(tsBlock);
queue.add(new Pair<>(tsBlock, reservedBytes));
if (!blocked.isDone()) {
blocked.set(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.iotdb.db.queryengine.exception.exchange.GetTsBlockFromClosedOrAbortedChannelException;
import org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager.SinkListener;
import org.apache.iotdb.db.queryengine.execution.memory.LocalMemoryManager;
import org.apache.iotdb.db.queryengine.execution.memory.MemoryPool.MemoryReservationResult;
import org.apache.iotdb.db.queryengine.metric.DataExchangeCostMetricSet;
import org.apache.iotdb.db.queryengine.metric.DataExchangeCountMetricSet;
import org.apache.iotdb.db.utils.SetThreadName;
Expand Down Expand Up @@ -119,6 +120,8 @@ public class SinkChannel implements ISinkChannel {
private long maxBytesCanReserve =
IoTDBDescriptor.getInstance().getMemoryConfig().getMaxBytesPerFragmentInstance();

private final boolean isHighestPriority;

private static final DataExchangeCostMetricSet DATA_EXCHANGE_COST_METRIC_SET =
DataExchangeCostMetricSet.getInstance();
private static final DataExchangeCountMetricSet DATA_EXCHANGE_COUNT_METRIC_SET =
Expand All @@ -141,6 +144,34 @@ public SinkChannel(
SinkListener sinkListener,
IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
mppDataExchangeServiceClientManager) {
this(
remoteEndpoint,
remoteFragmentInstanceId,
remotePlanNodeId,
localPlanNodeId,
localFragmentInstanceId,
localMemoryManager,
executorService,
serde,
sinkListener,
false,
mppDataExchangeServiceClientManager);
}

@SuppressWarnings("squid:S107")
public SinkChannel(
TEndPoint remoteEndpoint,
TFragmentInstanceId remoteFragmentInstanceId,
String remotePlanNodeId,
String localPlanNodeId,
TFragmentInstanceId localFragmentInstanceId,
LocalMemoryManager localMemoryManager,
ExecutorService executorService,
TsBlockSerde serde,
SinkListener sinkListener,
boolean isHighestPriority,
IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
mppDataExchangeServiceClientManager) {
this.remoteEndpoint = Validate.notNull(remoteEndpoint, "remoteEndPoint can not be null.");
this.remoteFragmentInstanceId =
Validate.notNull(remoteFragmentInstanceId, "remoteFragmentInstanceId can not be null.");
Expand All @@ -155,6 +186,7 @@ public SinkChannel(
this.executorService = Validate.notNull(executorService, "executorService can not be null.");
this.serde = Validate.notNull(serde, "serde can not be null.");
this.sinkListener = Validate.notNull(sinkListener, "sinkListener can not be null.");
this.isHighestPriority = isHighestPriority;
this.mppDataExchangeServiceClientManager = mppDataExchangeServiceClientManager;
this.retryIntervalInMs = DEFAULT_RETRY_INTERVAL_IN_MS;
this.threadName =
Expand Down Expand Up @@ -204,21 +236,22 @@ public synchronized void send(TsBlock tsBlock) {
long sizeInBytes = tsBlock.getSizeInBytes();
int startSequenceId;
startSequenceId = nextSequenceId;
blocked =
MemoryReservationResult reserveResult =
localMemoryManager
.getQueryPool()
.reserve(
.reserveWithPriority(
localFragmentInstanceId.getQueryId(),
fullFragmentInstanceId,
localPlanNodeId,
sizeInBytes,
maxBytesCanReserve)
.left;
bufferRetainedSizeInBytes += sizeInBytes;
maxBytesCanReserve,
isHighestPriority);
blocked = reserveResult.getFuture();
bufferRetainedSizeInBytes += reserveResult.getReservedBytes();

sequenceIdToTsBlock.put(nextSequenceId, new Pair<>(tsBlock, currentTsBlockSize));
nextSequenceId += 1;
currentTsBlockSize = sizeInBytes;
currentTsBlockSize = reserveResult.getReservedBytes();

submitSendNewDataBlockEventTask(startSequenceId, ImmutableList.of(sizeInBytes));
} finally {
Expand Down Expand Up @@ -434,19 +467,21 @@ public synchronized void open() {
return;
}
// SinkChannel is opened when ShuffleSinkHandle choose it as the next channel
this.blocked =
MemoryReservationResult reserveResult =
localMemoryManager
.getQueryPool()
.reserve(
.reserveWithPriority(
localFragmentInstanceId.getQueryId(),
fullFragmentInstanceId,
localPlanNodeId,
DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
maxBytesCanReserve) // actually we only know maxBytesCanReserve after
// the handle is created, so we use DEFAULT here. It is ok to use DEFAULT here because
// at first this SinkChannel has not reserved memory.
.left;
this.bufferRetainedSizeInBytes = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
maxBytesCanReserve,
isHighestPriority); // actually we only know maxBytesCanReserve after
// the handle is created, so we use DEFAULT here. It is ok to use DEFAULT here because
// at first this SinkChannel has not reserved memory.
this.blocked = reserveResult.getFuture();
this.bufferRetainedSizeInBytes = reserveResult.getReservedBytes();
this.currentTsBlockSize = reserveResult.getReservedBytes();
}

@Override
Expand Down
Loading
Loading