diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java index a507b98008bbd..db2e6b7d44b10 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java @@ -129,6 +129,12 @@ public enum ExplainType { private boolean userQuery = false; + /** + * When true (e.g. SHOW QUERIES), operator and exchange memory may use fallback when pool is + * insufficient. Set from analysis via {@link #setNeedSetHighestPriority(boolean)}. + */ + private boolean needSetHighestPriority = false; + private boolean debug = false; private Map, Query> cteQueries = new HashMap<>(); @@ -507,6 +513,14 @@ public void setUserQuery(boolean userQuery) { this.userQuery = userQuery; } + public boolean needSetHighestPriority() { + return needSetHighestPriority; + } + + public void setNeedSetHighestPriority(boolean needSetHighestPriority) { + this.needSetHighestPriority = needSetHighestPriority; + } + public boolean isDebug() { return debug; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java index d2caf330d6608..f72bb12f7f0b1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java @@ -656,7 +656,11 @@ private synchronized ISinkChannel createLocalSinkChannel( } queue = new SharedTsBlockQueue( - localFragmentInstanceId, localPlanNodeId, localMemoryManager, executorService); + localFragmentInstanceId, + localPlanNodeId, + localMemoryManager, + executorService, + instanceContext.isHighestPriority()); } return new LocalSinkChannel( @@ -680,7 +684,8 @@ public ISinkChannel createLocalSinkChannelForPipeline( driverContext.getDriverTaskID().getFragmentInstanceId().toThrift(), planNodeId, localMemoryManager, - executorService); + executorService, + driverContext.getFragmentInstanceContext().isHighestPriority()); queue.allowAddingTsBlock(); return new LocalSinkChannel( queue, @@ -718,6 +723,7 @@ private ISinkChannel createSinkChannel( tsBlockSerdeFactory.get(), new ISinkChannelListenerImpl( localFragmentInstanceId, instanceContext, instanceContext::failed, cnt), + instanceContext.isHighestPriority(), mppDataExchangeServiceClientManager); } @@ -809,6 +815,24 @@ public synchronized ISourceHandle createLocalSourceHandleForFragment( TFragmentInstanceId remoteFragmentInstanceId, int index, IMPPDataExchangeManagerCallback onFailureCallback) { + return createLocalSourceHandleForFragment( + localFragmentInstanceId, + localPlanNodeId, + remotePlanNodeId, + remoteFragmentInstanceId, + index, + onFailureCallback, + false); + } + + public synchronized ISourceHandle createLocalSourceHandleForFragment( + TFragmentInstanceId localFragmentInstanceId, + String localPlanNodeId, + String remotePlanNodeId, + TFragmentInstanceId remoteFragmentInstanceId, + int index, + IMPPDataExchangeManagerCallback onFailureCallback, + boolean isHighestPriority) { if (sourceHandles.containsKey(localFragmentInstanceId) && sourceHandles.get(localFragmentInstanceId).containsKey(localPlanNodeId)) { throw new IllegalStateException( @@ -840,7 +864,11 @@ public synchronized ISourceHandle createLocalSourceHandleForFragment( } queue = new SharedTsBlockQueue( - remoteFragmentInstanceId, remotePlanNodeId, localMemoryManager, executorService); + remoteFragmentInstanceId, + remotePlanNodeId, + localMemoryManager, + executorService, + isHighestPriority); } LocalSourceHandle localSourceHandle = new LocalSourceHandle( @@ -862,6 +890,24 @@ public ISourceHandle createSourceHandle( TEndPoint remoteEndpoint, TFragmentInstanceId remoteFragmentInstanceId, IMPPDataExchangeManagerCallback onFailureCallback) { + return createSourceHandle( + localFragmentInstanceId, + localPlanNodeId, + indexOfUpstreamSinkHandle, + remoteEndpoint, + remoteFragmentInstanceId, + onFailureCallback, + false); + } + + public ISourceHandle createSourceHandle( + TFragmentInstanceId localFragmentInstanceId, + String localPlanNodeId, + int indexOfUpstreamSinkHandle, + TEndPoint remoteEndpoint, + TFragmentInstanceId remoteFragmentInstanceId, + IMPPDataExchangeManagerCallback onFailureCallback, + boolean isHighestPriority) { Map sourceHandleMap = sourceHandles.get(localFragmentInstanceId); if (sourceHandleMap != null && sourceHandleMap.containsKey(localPlanNodeId)) { throw new IllegalStateException( @@ -891,6 +937,7 @@ public ISourceHandle createSourceHandle( executorService, tsBlockSerdeFactory.get(), new SourceHandleListenerImpl(onFailureCallback), + isHighestPriority, mppDataExchangeServiceClientManager); sourceHandles .computeIfAbsent(localFragmentInstanceId, key -> new ConcurrentHashMap<>()) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java index e49efabb9869c..ad68956a98e11 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java @@ -24,6 +24,7 @@ import org.apache.iotdb.db.queryengine.execution.exchange.sink.LocalSinkChannel; import org.apache.iotdb.db.queryengine.execution.exchange.source.LocalSourceHandle; import org.apache.iotdb.db.queryengine.execution.memory.LocalMemoryManager; +import org.apache.iotdb.db.queryengine.execution.memory.MemoryPool.MemoryReservationResult; import org.apache.iotdb.db.utils.CommonUtils; import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId; @@ -63,7 +64,7 @@ public class SharedTsBlockQueue { private long bufferRetainedSizeInBytes = 0L; - private final Queue queue = new LinkedList<>(); + private final Queue> queue = new LinkedList<>(); private SettableFuture blocked = SettableFuture.create(); @@ -83,6 +84,7 @@ public class SharedTsBlockQueue { private long maxBytesCanReserve = IoTDBDescriptor.getInstance().getMemoryConfig().getMaxBytesPerFragmentInstance(); + private final boolean isHighestPriority; private volatile Throwable abortedCause = null; @@ -94,6 +96,15 @@ public SharedTsBlockQueue( String planNodeId, LocalMemoryManager localMemoryManager, ExecutorService executorService) { + this(fragmentInstanceId, planNodeId, localMemoryManager, executorService, false); + } + + public SharedTsBlockQueue( + TFragmentInstanceId fragmentInstanceId, + String planNodeId, + LocalMemoryManager localMemoryManager, + ExecutorService executorService, + boolean isHighestPriority) { this.localFragmentInstanceId = Validate.notNull(fragmentInstanceId, "fragment instance ID cannot be null"); this.fullFragmentInstanceId = @@ -102,6 +113,7 @@ public SharedTsBlockQueue( this.localMemoryManager = Validate.notNull(localMemoryManager, "local memory manager cannot be null"); this.executorService = Validate.notNull(executorService, "ExecutorService can not be null."); + this.isHighestPriority = isHighestPriority; } public boolean hasNoMoreTsBlocks() { @@ -196,15 +208,18 @@ public TsBlock remove() { } throw new IllegalStateException("queue has been destroyed"); } - TsBlock tsBlock = queue.remove(); - localMemoryManager - .getQueryPool() - .free( - localFragmentInstanceId.getQueryId(), - fullFragmentInstanceId, - localPlanNodeId, - tsBlock.getSizeInBytes()); - bufferRetainedSizeInBytes -= tsBlock.getSizeInBytes(); + Pair tsBlockWithReservedBytes = queue.remove(); + long reservedBytes = tsBlockWithReservedBytes.right; + if (reservedBytes > 0) { + localMemoryManager + .getQueryPool() + .free( + localFragmentInstanceId.getQueryId(), + fullFragmentInstanceId, + localPlanNodeId, + reservedBytes); + bufferRetainedSizeInBytes -= reservedBytes; + } // Every time LocalSourceHandle consumes a TsBlock, it needs to send the event // to // corresponding LocalSinkChannel. @@ -214,7 +229,7 @@ public TsBlock remove() { if (blocked.isDone() && queue.isEmpty() && !noMoreTsBlocks) { blocked = SettableFuture.create(); } - return tsBlock; + return tsBlockWithReservedBytes.left; } /** @@ -240,20 +255,22 @@ public ListenableFuture add(TsBlock tsBlock) { localFragmentInstanceId.queryId, fullFragmentInstanceId, localPlanNodeId); alreadyRegistered = true; } - Pair, Boolean> pair = + MemoryReservationResult reserveResult = localMemoryManager .getQueryPool() - .reserve( + .reserveWithPriority( localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId, tsBlock.getSizeInBytes(), - maxBytesCanReserve); - blockedOnMemory = pair.left; - bufferRetainedSizeInBytes += tsBlock.getSizeInBytes(); + maxBytesCanReserve, + isHighestPriority); + blockedOnMemory = reserveResult.getFuture(); + long reservedBytes = reserveResult.getReservedBytes(); + bufferRetainedSizeInBytes += reservedBytes; // reserve memory failed, we should wait until there is enough memory - if (!Boolean.TRUE.equals(pair.right)) { + if (!reserveResult.isReserveSuccess()) { SettableFuture channelBlocked = SettableFuture.create(); blockedOnMemory.addListener( () -> { @@ -268,7 +285,7 @@ public ListenableFuture add(TsBlock tsBlock) { channelBlocked.set(null); return; } - queue.add(tsBlock); + queue.add(new Pair<>(tsBlock, reservedBytes)); if (!blocked.isDone()) { blocked.set(null); } @@ -285,7 +302,7 @@ public ListenableFuture add(TsBlock tsBlock) { executorService); return channelBlocked; } else { // reserve memory succeeded, add the TsBlock directly - queue.add(tsBlock); + queue.add(new Pair<>(tsBlock, reservedBytes)); if (!blocked.isDone()) { blocked.set(null); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java index 9cb624e4d961f..aa323bd4bddd6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java @@ -28,6 +28,7 @@ import org.apache.iotdb.db.queryengine.exception.exchange.GetTsBlockFromClosedOrAbortedChannelException; import org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager.SinkListener; import org.apache.iotdb.db.queryengine.execution.memory.LocalMemoryManager; +import org.apache.iotdb.db.queryengine.execution.memory.MemoryPool.MemoryReservationResult; import org.apache.iotdb.db.queryengine.metric.DataExchangeCostMetricSet; import org.apache.iotdb.db.queryengine.metric.DataExchangeCountMetricSet; import org.apache.iotdb.db.utils.SetThreadName; @@ -119,6 +120,8 @@ public class SinkChannel implements ISinkChannel { private long maxBytesCanReserve = IoTDBDescriptor.getInstance().getMemoryConfig().getMaxBytesPerFragmentInstance(); + private final boolean isHighestPriority; + private static final DataExchangeCostMetricSet DATA_EXCHANGE_COST_METRIC_SET = DataExchangeCostMetricSet.getInstance(); private static final DataExchangeCountMetricSet DATA_EXCHANGE_COUNT_METRIC_SET = @@ -141,6 +144,34 @@ public SinkChannel( SinkListener sinkListener, IClientManager mppDataExchangeServiceClientManager) { + this( + remoteEndpoint, + remoteFragmentInstanceId, + remotePlanNodeId, + localPlanNodeId, + localFragmentInstanceId, + localMemoryManager, + executorService, + serde, + sinkListener, + false, + mppDataExchangeServiceClientManager); + } + + @SuppressWarnings("squid:S107") + public SinkChannel( + TEndPoint remoteEndpoint, + TFragmentInstanceId remoteFragmentInstanceId, + String remotePlanNodeId, + String localPlanNodeId, + TFragmentInstanceId localFragmentInstanceId, + LocalMemoryManager localMemoryManager, + ExecutorService executorService, + TsBlockSerde serde, + SinkListener sinkListener, + boolean isHighestPriority, + IClientManager + mppDataExchangeServiceClientManager) { this.remoteEndpoint = Validate.notNull(remoteEndpoint, "remoteEndPoint can not be null."); this.remoteFragmentInstanceId = Validate.notNull(remoteFragmentInstanceId, "remoteFragmentInstanceId can not be null."); @@ -155,6 +186,7 @@ public SinkChannel( this.executorService = Validate.notNull(executorService, "executorService can not be null."); this.serde = Validate.notNull(serde, "serde can not be null."); this.sinkListener = Validate.notNull(sinkListener, "sinkListener can not be null."); + this.isHighestPriority = isHighestPriority; this.mppDataExchangeServiceClientManager = mppDataExchangeServiceClientManager; this.retryIntervalInMs = DEFAULT_RETRY_INTERVAL_IN_MS; this.threadName = @@ -204,21 +236,22 @@ public synchronized void send(TsBlock tsBlock) { long sizeInBytes = tsBlock.getSizeInBytes(); int startSequenceId; startSequenceId = nextSequenceId; - blocked = + MemoryReservationResult reserveResult = localMemoryManager .getQueryPool() - .reserve( + .reserveWithPriority( localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId, sizeInBytes, - maxBytesCanReserve) - .left; - bufferRetainedSizeInBytes += sizeInBytes; + maxBytesCanReserve, + isHighestPriority); + blocked = reserveResult.getFuture(); + bufferRetainedSizeInBytes += reserveResult.getReservedBytes(); sequenceIdToTsBlock.put(nextSequenceId, new Pair<>(tsBlock, currentTsBlockSize)); nextSequenceId += 1; - currentTsBlockSize = sizeInBytes; + currentTsBlockSize = reserveResult.getReservedBytes(); submitSendNewDataBlockEventTask(startSequenceId, ImmutableList.of(sizeInBytes)); } finally { @@ -434,19 +467,21 @@ public synchronized void open() { return; } // SinkChannel is opened when ShuffleSinkHandle choose it as the next channel - this.blocked = + MemoryReservationResult reserveResult = localMemoryManager .getQueryPool() - .reserve( + .reserveWithPriority( localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId, DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, - maxBytesCanReserve) // actually we only know maxBytesCanReserve after - // the handle is created, so we use DEFAULT here. It is ok to use DEFAULT here because - // at first this SinkChannel has not reserved memory. - .left; - this.bufferRetainedSizeInBytes = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; + maxBytesCanReserve, + isHighestPriority); // actually we only know maxBytesCanReserve after + // the handle is created, so we use DEFAULT here. It is ok to use DEFAULT here because + // at first this SinkChannel has not reserved memory. + this.blocked = reserveResult.getFuture(); + this.bufferRetainedSizeInBytes = reserveResult.getReservedBytes(); + this.currentTsBlockSize = reserveResult.getReservedBytes(); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/SourceHandle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/SourceHandle.java index 1c6406a2ed9ee..ec29131af2227 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/SourceHandle.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/SourceHandle.java @@ -27,6 +27,7 @@ import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; import org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager.SourceHandleListener; import org.apache.iotdb.db.queryengine.execution.memory.LocalMemoryManager; +import org.apache.iotdb.db.queryengine.execution.memory.MemoryPool.MemoryReservationResult; import org.apache.iotdb.db.queryengine.metric.DataExchangeCostMetricSet; import org.apache.iotdb.db.queryengine.metric.DataExchangeCountMetricSet; import org.apache.iotdb.db.utils.SetThreadName; @@ -116,6 +117,8 @@ public class SourceHandle implements ISourceHandle { */ private boolean canGetTsBlockFromRemote = false; + private final boolean isHighestPriority; + private static final DataExchangeCostMetricSet DATA_EXCHANGE_COST_METRIC_SET = DataExchangeCostMetricSet.getInstance(); private static final DataExchangeCountMetricSet DATA_EXCHANGE_COUNT_METRIC_SET = @@ -138,6 +141,34 @@ public SourceHandle( SourceHandleListener sourceHandleListener, IClientManager mppDataExchangeServiceClientManager) { + this( + remoteEndpoint, + remoteFragmentInstanceId, + localFragmentInstanceId, + localPlanNodeId, + indexOfUpstreamSinkHandle, + localMemoryManager, + executorService, + serde, + sourceHandleListener, + false, + mppDataExchangeServiceClientManager); + } + + @SuppressWarnings("squid:S107") + public SourceHandle( + TEndPoint remoteEndpoint, + TFragmentInstanceId remoteFragmentInstanceId, + TFragmentInstanceId localFragmentInstanceId, + String localPlanNodeId, + int indexOfUpstreamSinkHandle, + LocalMemoryManager localMemoryManager, + ExecutorService executorService, + TsBlockSerde serde, + SourceHandleListener sourceHandleListener, + boolean isHighestPriority, + IClientManager + mppDataExchangeServiceClientManager) { this.remoteEndpoint = Validate.notNull(remoteEndpoint, "remoteEndpoint can not be null."); this.remoteFragmentInstanceId = Validate.notNull(remoteFragmentInstanceId, "remoteFragmentInstanceId can not be null."); @@ -153,6 +184,7 @@ public SourceHandle( this.serde = Validate.notNull(serde, "serde can not be null."); this.sourceHandleListener = Validate.notNull(sourceHandleListener, "sourceHandleListener can not be null."); + this.isHighestPriority = isHighestPriority; this.bufferRetainedSizeInBytes = 0L; this.mppDataExchangeServiceClientManager = mppDataExchangeServiceClientManager; this.retryIntervalInMs = DEFAULT_RETRY_INTERVAL_IN_MS; @@ -193,19 +225,24 @@ public synchronized ByteBuffer getSerializedTsBlock() { if (tsBlock == null) { return null; } - long retainedSize = sequenceIdToDataBlockSize.remove(currSequenceId); + Long retainedSize = sequenceIdToDataBlockSize.remove(currSequenceId); + if (retainedSize == null) { + throw new IllegalStateException("Reserved data block size is null."); + } if (LOGGER.isDebugEnabled()) { LOGGER.debug("[GetTsBlockFromBuffer] sequenceId:{}, size:{}", currSequenceId, retainedSize); } currSequenceId += 1; - bufferRetainedSizeInBytes -= retainedSize; - localMemoryManager - .getQueryPool() - .free( - localFragmentInstanceId.getQueryId(), - fullFragmentInstanceId, - localPlanNodeId, - retainedSize); + if (retainedSize > 0) { + bufferRetainedSizeInBytes -= retainedSize; + localMemoryManager + .getQueryPool() + .free( + localFragmentInstanceId.getQueryId(), + fullFragmentInstanceId, + localPlanNodeId, + retainedSize); + } if (sequenceIdToTsBlock.isEmpty() && !isFinished()) { if (LOGGER.isDebugEnabled()) { @@ -242,18 +279,24 @@ private synchronized void trySubmitGetDataBlocksTask() { if (bytesToReserve == null) { throw new IllegalStateException("Data block size is null."); } - pair = + MemoryReservationResult reserveResult = localMemoryManager .getQueryPool() - .reserve( + .reserveWithPriority( localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId, bytesToReserve, - maxBytesCanReserve); - bufferRetainedSizeInBytes += bytesToReserve; + maxBytesCanReserve, + isHighestPriority); + pair = new Pair<>(reserveResult.getFuture(), reserveResult.isReserveSuccess()); + // actually reserve size is not equals raw size, update the actually reserve size to the map + if (reserveResult.getReservedBytes() != bytesToReserve) { + sequenceIdToDataBlockSize.put(endSequenceId, reserveResult.getReservedBytes()); + } + bufferRetainedSizeInBytes += reserveResult.getReservedBytes(); endSequenceId += 1; - reservedBytes += bytesToReserve; + reservedBytes += reserveResult.getReservedBytes(); if (!Boolean.TRUE.equals(pair.right)) { blockedSize = bytesToReserve; break; @@ -631,14 +674,16 @@ private void fail(Throwable t) { if (aborted || closed) { return; } - bufferRetainedSizeInBytes -= reservedBytes; - localMemoryManager - .getQueryPool() - .free( - localFragmentInstanceId.getQueryId(), - fullFragmentInstanceId, - localPlanNodeId, - reservedBytes); + if (reservedBytes > 0) { + bufferRetainedSizeInBytes -= reservedBytes; + localMemoryManager + .getQueryPool() + .free( + localFragmentInstanceId.getQueryId(), + fullFragmentInstanceId, + localPlanNodeId, + reservedBytes); + } sourceHandleListener.onFailure(SourceHandle.this, t); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java index df9ac2d2f936f..4cfd8dc63ecd7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java @@ -161,6 +161,7 @@ public class FragmentInstanceContext extends QueryContext { private long unclosedUnseqFileNum = 0; private long closedSeqFileNum = 0; private long closedUnseqFileNum = 0; + private boolean highestPriority = false; public static FragmentInstanceContext createFragmentInstanceContext( FragmentInstanceId id, @@ -1190,6 +1191,18 @@ public boolean ignoreNotExistsDevice() { return ignoreNotExistsDevice; } + /** + * Same flag as {@link + * org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis#needSetHighestPriority()}. + */ + public boolean isHighestPriority() { + return highestPriority; + } + + public void setHighestPriority(boolean highestPriority) { + this.highestPriority = highestPriority; + } + public boolean isSingleSourcePath() { return singleSourcePath; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java index 1898cbfe53ccb..9dfb2ffa68472 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java @@ -163,6 +163,7 @@ public FragmentInstanceInfo execDataQueryFragmentInstance( dataNodeQueryContextMap, instance.isDebug(), instance.isVerbose())); + context.setHighestPriority(instance.isHighestPriority()); try { List driverFactories = @@ -277,6 +278,7 @@ public FragmentInstanceInfo execSchemaQueryFragmentInstance( instance.getSessionInfo(), instance.isDebug(), instance.isVerbose())); + context.setHighestPriority(instance.isHighestPriority()); try { List driverFactories = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java index 3e00c845dab28..64bc5022588a5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java @@ -26,10 +26,8 @@ import org.apache.iotdb.db.exception.runtime.MemoryLeakException; import com.google.common.util.concurrent.AbstractFuture; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import org.apache.tsfile.external.commons.lang3.Validate; -import org.apache.tsfile.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,6 +41,8 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.stream.Collectors; +import static com.google.common.util.concurrent.Futures.immediateVoidFuture; + /** A thread-safe memory pool. */ public class MemoryPool { @@ -113,6 +113,31 @@ public boolean set(@Nullable V value) { } } + public static class MemoryReservationResult { + private final ListenableFuture future; + private final boolean reserveSuccess; + private final long reservedBytes; + + public MemoryReservationResult( + ListenableFuture future, boolean reserveSuccess, long reservedBytes) { + this.future = future; + this.reserveSuccess = reserveSuccess; + this.reservedBytes = reservedBytes; + } + + public ListenableFuture getFuture() { + return future; + } + + public boolean isReserveSuccess() { + return reserveSuccess; + } + + public long getReservedBytes() { + return reservedBytes; + } + } + private final String id; private final IMemoryBlock memoryBlock; private final long maxBytesPerFragmentInstance; @@ -224,18 +249,20 @@ public void deRegisterFragmentInstanceFromQueryMemoryMap( } /** - * Reserve memory with bytesToReserve. + * Reserve memory with bytesToReserve respect priority. * - * @return if reserve succeed, pair.right will be true, otherwise false + * @return if reserve succeed, reservedBytes may be zero or equals with bytesToReserve; if reserve + * failed, reservedBytes must be equals with bytesToReserve * @throws IllegalArgumentException throw exception if current query requests more memory than can * be allocated. */ - public Pair, Boolean> reserve( + public MemoryReservationResult reserveWithPriority( String queryId, String fragmentInstanceId, String planNodeId, long bytesToReserve, - long maxBytesCanReserve) { + long maxBytesCanReserve, + boolean isHighestPriority) { Validate.notNull(queryId, "queryId can not be null."); Validate.notNull(fragmentInstanceId, "fragmentInstanceId can not be null."); Validate.notNull(planNodeId, "planNodeId can not be null."); @@ -256,19 +283,21 @@ public Pair, Boolean> reserve( bytesToReserve, maxBytesCanReserve)); } - ListenableFuture result; if (tryReserve(queryId, fragmentInstanceId, planNodeId, bytesToReserve, maxBytesCanReserve)) { - result = Futures.immediateFuture(null); - return new Pair<>(result, Boolean.TRUE); + return new MemoryReservationResult(immediateVoidFuture(), true, bytesToReserve); } else { + rollbackReserve(queryId, fragmentInstanceId, planNodeId, bytesToReserve); + if (isHighestPriority) { + // SHOW QUERIES: treat as success with zero bytes reserved from pool when insufficient. + return new MemoryReservationResult(immediateVoidFuture(), true, 0L); + } LOGGER.debug( "Blocked reserve request: {} bytes memory for planNodeId{}", bytesToReserve, planNodeId); - rollbackReserve(queryId, fragmentInstanceId, planNodeId, bytesToReserve); - result = + ListenableFuture result = MemoryReservationFuture.create( queryId, fragmentInstanceId, planNodeId, bytesToReserve, maxBytesCanReserve); memoryReservationFutures.add((MemoryReservationFuture) result); - return new Pair<>(result, Boolean.FALSE); + return new MemoryReservationResult(result, false, bytesToReserve); } } @@ -299,7 +328,8 @@ public boolean tryReserveForTest( /** * Cancel the specified memory reservation. If the reservation has finished, do nothing. * - * @param future The future returned from {@link #reserve(String, String, String, long, long)} + * @param future The future returned from {@link #reserveWithPriority(String, String, String, + * long, long, boolean)} * @return If the future has not complete, return the number of bytes being reserved. Otherwise, * return 0. */ diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java index ec1bdeed0c629..0d17b50a119ab 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java @@ -511,7 +511,7 @@ public boolean isQuery() { public boolean needSetHighestPriority() { // if is this Statement is ShowQueryStatement, set its instances to the highest priority, so // that the sub-tasks of the ShowQueries instances could be executed first. - return StatementType.SHOW_QUERIES.equals(statement.getType()); + return statement != null && StatementType.SHOW_QUERIES.equals(statement.getType()); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java index 97070e37357b8..785a273238a7b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java @@ -129,6 +129,7 @@ public QueryExecution(IPlanner planner, MPPQueryContext context, ExecutorService this.context = context; this.planner = planner; this.analysis = analyze(context); + context.setNeedSetHighestPriority(analysis.needSetHighestPriority()); this.stateMachine = new QueryStateMachine(context.getQueryId(), executor); // We add the abort logic inside the QueryExecution. @@ -610,7 +611,8 @@ private void initResultHandle() { context.getResultNodeContext().getUpStreamPlanNodeId().getId(), context.getResultNodeContext().getUpStreamFragmentInstanceId().toThrift(), 0, // Upstream of result ExchangeNode will only have one child. - stateMachine::transitionToFailed) + stateMachine::transitionToFailed, + context.needSetHighestPriority()) : MPPDataExchangeService.getInstance() .getMPPDataExchangeManager() .createSourceHandle( @@ -619,7 +621,8 @@ private void initResultHandle() { 0, upstreamEndPoint, context.getResultNodeContext().getUpStreamFragmentInstanceId().toThrift(), - stateMachine::transitionToFailed); + stateMachine::transitionToFailed, + context.needSetHighestPriority()); } @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java index 422454c11b679..0e32dfd097e46 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java @@ -2694,14 +2694,16 @@ public Operator visitExchange(ExchangeNode node, LocalExecutionPlanContext conte node.getUpstreamPlanNodeId().getId(), remoteInstanceId.toThrift(), node.getIndexOfUpstreamSinkHandle(), - context.getInstanceContext()::failed) + context.getInstanceContext()::failed, + context.getInstanceContext().isHighestPriority()) : MPP_DATA_EXCHANGE_MANAGER.createSourceHandle( localInstanceId.toThrift(), node.getPlanNodeId().getId(), node.getIndexOfUpstreamSinkHandle(), upstreamEndPoint, remoteInstanceId.toThrift(), - context.getInstanceContext()::failed); + context.getInstanceContext()::failed, + context.getInstanceContext().isHighestPriority()); if (!isSameNode) { context.addExchangeSumNum(1); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java index c31a2061f49ec..f1bb43abed440 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java @@ -454,14 +454,16 @@ public Operator visitTableExchange(ExchangeNode node, LocalExecutionPlanContext node.getUpstreamPlanNodeId().getId(), remoteInstanceId.toThrift(), node.getIndexOfUpstreamSinkHandle(), - context.getInstanceContext()::failed) + context.getInstanceContext()::failed, + context.getInstanceContext().isHighestPriority()) : MPP_DATA_EXCHANGE_MANAGER.createSourceHandle( localInstanceId.toThrift(), node.getPlanNodeId().getId(), node.getIndexOfUpstreamSinkHandle(), upstreamEndPoint, remoteInstanceId.toThrift(), - context.getInstanceContext()::failed); + context.getInstanceContext()::failed, + context.getInstanceContext().isHighestPriority()); if (!isSameNode) { context.addExchangeSumNum(1); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java index d53e29e16924f..ae99aac6fbaf2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java @@ -267,6 +267,7 @@ public static FragmentInstance deserializeFrom(ByteBuffer buffer) { fragmentInstance.hostDataNode = hasHostDataNode ? ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(buffer) : null; fragmentInstance.isExplainAnalyze = ReadWriteIOUtils.readBool(buffer); + fragmentInstance.setHighestPriority(ReadWriteIOUtils.readBool(buffer)); return fragmentInstance; } @@ -293,6 +294,7 @@ public ByteBuffer serializeToByteBuffer() { ThriftCommonsSerDeUtils.serializeTDataNodeLocation(hostDataNode, outputStream); } ReadWriteIOUtils.write(isExplainAnalyze, outputStream); + ReadWriteIOUtils.write(isHighestPriority, outputStream); return ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size()); } catch (IOException e) { LOGGER.error("Unexpected error occurs when serializing this FragmentInstance.", e); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/LocalSinkChannelTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/LocalSinkChannelTest.java index 3ec95a0205aa3..944094e0fbf23 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/LocalSinkChannelTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/LocalSinkChannelTest.java @@ -94,13 +94,14 @@ public void testSend() { Assert.assertFalse(localSinkChannel.isFinished()); Assert.assertEquals(11 * mockTsBlockSize, localSinkChannel.getBufferRetainedSizeInBytes()); Mockito.verify(spyMemoryPool, Mockito.times(11)) - .reserve( + .reserveWithPriority( queryId, FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId( remoteFragmentInstanceId), remotePlanNodeId, mockTsBlockSize, - Long.MAX_VALUE); + Long.MAX_VALUE, + false); // Receive TsBlocks. int numOfReceivedTsblocks = 0; @@ -187,13 +188,14 @@ public void testAbort() { Assert.assertFalse(localSinkChannel.isFinished()); Assert.assertEquals(11 * mockTsBlockSize, localSinkChannel.getBufferRetainedSizeInBytes()); Mockito.verify(spyMemoryPool, Mockito.times(11)) - .reserve( + .reserveWithPriority( queryId, FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId( remoteFragmentInstanceId), remotePlanNodeId, mockTsBlockSize, - Long.MAX_VALUE); + Long.MAX_VALUE, + false); // Abort. localSinkChannel.abort(); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueueTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueueTest.java index 00c653499b0c4..a92eaa5d0e283 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueueTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueueTest.java @@ -22,13 +22,13 @@ import org.apache.iotdb.commons.memory.MemoryManager; import org.apache.iotdb.db.queryengine.execution.memory.LocalMemoryManager; import org.apache.iotdb.db.queryengine.execution.memory.MemoryPool; +import org.apache.iotdb.db.queryengine.execution.memory.MemoryPool.MemoryReservationResult; import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import org.apache.tsfile.external.commons.lang3.Validate; import org.apache.tsfile.read.common.block.TsBlock; -import org.apache.tsfile.utils.Pair; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; @@ -63,15 +63,16 @@ public void testAsyncListenerAfterAbortDoesNotAddTsBlock() { MemoryPool mockMemoryPool = Mockito.mock(MemoryPool.class); Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(mockMemoryPool); - // reserve() returns (manualFuture, false) — simulating memory blocked + // reserveWithPriority() returns blocked future and reserve failure. Mockito.when( - mockMemoryPool.reserve( + mockMemoryPool.reserveWithPriority( Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), - Mockito.anyLong())) - .thenReturn(new Pair<>(manualFuture, Boolean.FALSE)); + Mockito.anyLong(), + Mockito.anyBoolean())) + .thenReturn(new MemoryReservationResult(manualFuture, false, 1024L)); // tryCancel returns 0 — simulating future already completed (can't cancel) Mockito.when(mockMemoryPool.tryCancel(Mockito.any())).thenReturn(0L); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/ShuffleSinkHandleTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/ShuffleSinkHandleTest.java index 2b5d7f176be76..75d204acaded4 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/ShuffleSinkHandleTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/ShuffleSinkHandleTest.java @@ -106,13 +106,14 @@ public void testAbort() { Assert.assertFalse(localSinkChannel.isFinished()); Assert.assertEquals(11 * mockTsBlockSize, localSinkChannel.getBufferRetainedSizeInBytes()); Mockito.verify(spyMemoryPool, Mockito.times(11)) - .reserve( + .reserveWithPriority( queryId, FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId( remoteFragmentInstanceId), remotePlanNodeId, mockTsBlockSize, - Long.MAX_VALUE); + Long.MAX_VALUE, + false); // Abort. shuffleSinkHandle.abort(); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SourceHandleTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SourceHandleTest.java index 85d1abefb0ccc..66d50675ddff1 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SourceHandleTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SourceHandleTest.java @@ -259,13 +259,14 @@ public void testBlockedOneTimeReceive() { .collect(Collectors.toList())); try { Mockito.verify(spyMemoryPool, Mockito.timeout(10_000).times(6)) - .reserve( + .reserveWithPriority( queryId, FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId( localFragmentInstanceId), localPlanNodeId, MOCK_TSBLOCK_SIZE, - maxBytesCanReserve); + maxBytesCanReserve, + false); Mockito.verify(mockClient, Mockito.timeout(10_0000).times(1)) .getDataBlock( Mockito.argThat( diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/Utils.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/Utils.java index 327d4a34c39e7..b09498ad949dd 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/Utils.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/Utils.java @@ -20,11 +20,11 @@ package org.apache.iotdb.db.queryengine.execution.exchange; import org.apache.iotdb.db.queryengine.execution.memory.MemoryPool; +import org.apache.iotdb.db.queryengine.execution.memory.MemoryPool.MemoryReservationResult; import com.google.common.util.concurrent.SettableFuture; import org.apache.tsfile.read.common.block.TsBlock; import org.apache.tsfile.read.common.block.column.TsBlockSerde; -import org.apache.tsfile.utils.Pair; import org.mockito.Mockito; import org.mockito.stubbing.Answer; @@ -68,21 +68,25 @@ public static MemoryPool createMockBlockedMemoryPool( settableFuture.get().set(null); AtomicReference reservedBytes = new AtomicReference<>(0L); Mockito.when( - mockMemoryPool.reserve( + mockMemoryPool.reserveWithPriority( Mockito.eq(queryId), Mockito.eq(fragmentInstanceId), Mockito.eq(planNodeId), Mockito.anyLong(), - Mockito.anyLong())) + Mockito.anyLong(), + Mockito.anyBoolean())) .thenAnswer( invocation -> { long bytesToReserve = invocation.getArgument(3); if (reservedBytes.get() + bytesToReserve <= capacityInBytes) { - reservedBytes.updateAndGet(v -> v + (long) invocation.getArgument(3)); - return new Pair<>(settableFuture.get(), true); + reservedBytes.updateAndGet(v -> v + bytesToReserve); + return new MemoryReservationResult(settableFuture.get(), true, bytesToReserve); } else { + if (invocation.getArgument(5)) { + return new MemoryReservationResult(settableFuture.get(), true, 0L); + } settableFuture.set(SettableFuture.create()); - return new Pair<>(settableFuture.get(), false); + return new MemoryReservationResult(settableFuture.get(), false, bytesToReserve); } }); Mockito.doAnswer( @@ -124,13 +128,17 @@ public static MemoryPool createMockBlockedMemoryPool( public static MemoryPool createMockNonBlockedMemoryPool() { MemoryPool mockMemoryPool = Mockito.mock(MemoryPool.class); Mockito.when( - mockMemoryPool.reserve( + mockMemoryPool.reserveWithPriority( Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), - Mockito.anyLong())) - .thenReturn(new Pair<>(immediateFuture(null), true)); + Mockito.anyLong(), + Mockito.anyBoolean())) + .thenAnswer( + invocation -> + new MemoryReservationResult( + immediateFuture(null), true, invocation.getArgument(3))); Mockito.when( mockMemoryPool.tryReserve( Mockito.anyString(), diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPoolTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPoolTest.java index ebcf2abf4e3b0..8614d39f20a54 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPoolTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPoolTest.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.queryengine.execution.memory; import org.apache.iotdb.commons.memory.MemoryManager; +import org.apache.iotdb.db.queryengine.execution.memory.MemoryPool.MemoryReservationResult; import com.google.common.util.concurrent.ListenableFuture; import org.junit.Assert; @@ -98,7 +99,9 @@ public void testOverTryReserve() { public void testReserve() { ListenableFuture future = - pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L, Long.MAX_VALUE).left; + pool.reserveWithPriority( + QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L, Long.MAX_VALUE, false) + .getFuture(); Assert.assertTrue(future.isDone()); Assert.assertEquals(256L, pool.getQueryMemoryReservedBytes(QUERY_ID)); Assert.assertEquals(256L, pool.getReservedBytes()); @@ -108,7 +111,8 @@ public void testReserve() { public void tesReserveZero() { try { - pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 0L, Long.MAX_VALUE); + pool.reserveWithPriority( + QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 0L, Long.MAX_VALUE, false); Assert.fail("Expect IllegalArgumentException"); } catch (IllegalArgumentException ignore) { } @@ -118,7 +122,8 @@ public void tesReserveZero() { public void testReserveNegative() { try { - pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, -1L, Long.MAX_VALUE); + pool.reserveWithPriority( + QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, -1L, Long.MAX_VALUE, false); Assert.fail("Expect IllegalArgumentException"); } catch (IllegalArgumentException ignore) { } @@ -128,7 +133,9 @@ public void testReserveNegative() { public void testReserveAll() { ListenableFuture future = - pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, Long.MAX_VALUE).left; + pool.reserveWithPriority( + QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, Long.MAX_VALUE, false) + .getFuture(); Assert.assertTrue(future.isDone()); Assert.assertEquals(512L, pool.getQueryMemoryReservedBytes(QUERY_ID)); Assert.assertEquals(512L, pool.getReservedBytes()); @@ -138,11 +145,15 @@ public void testReserveAll() { public void testOverReserve() { ListenableFuture future = - pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L, Long.MAX_VALUE).left; + pool.reserveWithPriority( + QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L, Long.MAX_VALUE, false) + .getFuture(); Assert.assertTrue(future.isDone()); Assert.assertEquals(256L, pool.getQueryMemoryReservedBytes(QUERY_ID)); Assert.assertEquals(256L, pool.getReservedBytes()); - future = pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, 513L).left; + future = + pool.reserveWithPriority(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, 513L, false) + .getFuture(); Assert.assertFalse(future.isDone()); Assert.assertEquals(256L, pool.getQueryMemoryReservedBytes(QUERY_ID)); Assert.assertEquals(256L, pool.getReservedBytes()); @@ -152,11 +163,13 @@ public void testOverReserve() { public void testReserveAndFree() { Assert.assertTrue( - pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, Long.MAX_VALUE) - .left + pool.reserveWithPriority( + QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, Long.MAX_VALUE, false) + .getFuture() .isDone()); ListenableFuture future = - pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, 513L).left; + pool.reserveWithPriority(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, 513L, false) + .getFuture(); Assert.assertFalse(future.isDone()); Assert.assertEquals(512L, pool.getQueryMemoryReservedBytes(QUERY_ID)); Assert.assertEquals(512L, pool.getReservedBytes()); @@ -170,18 +183,22 @@ public void testReserveAndFree() { public void testMultiReserveAndFree() { Assert.assertTrue( - pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L, Long.MAX_VALUE) - .left + pool.reserveWithPriority( + QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L, Long.MAX_VALUE, false) + .getFuture() .isDone()); Assert.assertEquals(256L, pool.getQueryMemoryReservedBytes(QUERY_ID)); Assert.assertEquals(256L, pool.getReservedBytes()); ListenableFuture future1 = - pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, 513L).left; + pool.reserveWithPriority(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, 513L, false) + .getFuture(); ListenableFuture future2 = - pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, 513L).left; + pool.reserveWithPriority(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, 513L, false) + .getFuture(); ListenableFuture future3 = - pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, 513L).left; + pool.reserveWithPriority(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, 513L, false) + .getFuture(); Assert.assertFalse(future1.isDone()); Assert.assertFalse(future2.isDone()); Assert.assertFalse(future3.isDone()); @@ -288,7 +305,8 @@ public void testTryCancelBlockedReservation() { pool.tryReserveForTest(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, Long.MAX_VALUE)); ListenableFuture f = - pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L, 512L).left; + pool.reserveWithPriority(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L, 512L, false) + .getFuture(); Assert.assertFalse(f.isDone()); // Cancel the reservation. Assert.assertEquals(256L, pool.tryCancel(f)); @@ -300,11 +318,76 @@ public void testTryCancelBlockedReservation() { public void testTryCancelCompletedReservation() { ListenableFuture f = - pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L, Long.MAX_VALUE).left; + pool.reserveWithPriority( + QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L, Long.MAX_VALUE, false) + .getFuture(); Assert.assertTrue(f.isDone()); // Cancel the reservation. Assert.assertEquals(0L, pool.tryCancel(f)); Assert.assertTrue(f.isDone()); Assert.assertFalse(f.isCancelled()); } + + /** + * Normal query: requested bytes exceed what the pool can still provide — reserve fails (blocked + * future, not immediate success). + */ + @Test + public void testReserveWithPriorityNormalQueryExceedsAvailable() { + MemoryReservationResult r1 = + pool.reserveWithPriority( + QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, Long.MAX_VALUE, false); + Assert.assertTrue(r1.isReserveSuccess()); + Assert.assertEquals(512L, r1.getReservedBytes()); + Assert.assertTrue(r1.getFuture().isDone()); + + MemoryReservationResult r2 = + pool.reserveWithPriority( + QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, Long.MAX_VALUE, false); + Assert.assertTrue(r2.isReserveSuccess()); + Assert.assertEquals(512L, r2.getReservedBytes()); + Assert.assertEquals(1024L, pool.getReservedBytes()); + + MemoryReservationResult r3 = + pool.reserveWithPriority( + QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L, Long.MAX_VALUE, false); + Assert.assertFalse(r3.isReserveSuccess()); + Assert.assertEquals(256L, r3.getReservedBytes()); + Assert.assertFalse(r3.getFuture().isDone()); + Assert.assertEquals(1024L, pool.getReservedBytes()); + } + + /** SHOW QUERIES path: exceeds pool capacity — treated as success with zero bytes from pool. */ + @Test + public void testReserveWithPriorityShowQueriesExceedsAvailable() { + Assert.assertTrue( + pool.reserveWithPriority( + QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, Long.MAX_VALUE, false) + .isReserveSuccess()); + Assert.assertTrue( + pool.reserveWithPriority( + QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, Long.MAX_VALUE, false) + .isReserveSuccess()); + Assert.assertEquals(1024L, pool.getReservedBytes()); + + MemoryReservationResult r = + pool.reserveWithPriority( + QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L, Long.MAX_VALUE, true); + Assert.assertTrue(r.isReserveSuccess()); + Assert.assertEquals(0L, r.getReservedBytes()); + Assert.assertTrue(r.getFuture().isDone()); + Assert.assertEquals(1024L, pool.getReservedBytes()); + } + + /** SHOW QUERIES path: pool has room — same as normal successful reserve. */ + @Test + public void testReserveWithPriorityShowQueriesWithinAvailable() { + MemoryReservationResult r = + pool.reserveWithPriority( + QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L, Long.MAX_VALUE, true); + Assert.assertTrue(r.isReserveSuccess()); + Assert.assertEquals(256L, r.getReservedBytes()); + Assert.assertTrue(r.getFuture().isDone()); + Assert.assertEquals(256L, pool.getReservedBytes()); + } }