Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.ServerCommandLine;
import org.apache.iotdb.commons.client.ClientManagerMetrics;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadModule;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.ThreadPoolMetrics;
Expand Down Expand Up @@ -55,7 +53,6 @@
import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
import org.apache.iotdb.confignode.manager.pipe.metric.PipeConfigNodeMetrics;
import org.apache.iotdb.confignode.procedure.impl.partition.DataPartitionTableIntegrityCheckProcedure;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo;
Expand All @@ -82,9 +79,6 @@
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ConfigNode extends ServerCommandLine implements ConfigNodeMBean {
Expand Down Expand Up @@ -116,11 +110,6 @@ public class ConfigNode extends ServerCommandLine implements ConfigNodeMBean {

private int exitStatusCode = 0;

private Future<Void> dataPartitionTableCheckFuture;

private ExecutorService dataPartitionTableCheckExecutor =
IoTDBThreadPoolFactory.newSingleThreadExecutor("DATA_PARTITION_TABLE_CHECK");

public ConfigNode() {
super("ConfigNode");
// We do not init anything here, so that we can re-initialize the instance in IT.
Expand Down Expand Up @@ -158,15 +147,6 @@ protected void start() throws IoTDBException {
}
active();
LOGGER.info("IoTDB started");
if (dataPartitionTableCheckFuture != null) {
try {
dataPartitionTableCheckFuture.get();
} catch (ExecutionException | InterruptedException e) {
LOGGER.error("Data partition table check task execute failed", e);
} finally {
dataPartitionTableCheckExecutor.shutdownNow();
}
}
}

@Override
Expand Down Expand Up @@ -195,10 +175,7 @@ public void active() {
int configNodeId = CONF.getConfigNodeId();
configManager.initConsensusManager();
upgrade();
TConfigNodeLocation leaderNodeLocation = waitForLeaderElected();
if (leaderNodeLocation == null) {
leaderNodeLocation = configManager.getConsensusManager().getNotNullLeaderLocation();
}
waitForLeaderElected();
setUpMetricService();
// Notice: We always set up Seed-ConfigNode's RPC service lastly to ensure
// that the external service is not provided until ConfigNode is fully available
Expand Down Expand Up @@ -226,46 +203,6 @@ public void active() {
}
loadSecretKey();
loadHardwareCode();

/* After the ConfigNode leader election, a leader switch may occur, which could cause the procedure not to be created. This can happen if the original leader has not yet executed the procedure creation, while the other followers have already finished starting up. Therefore, having the original leader (before the leader switch) initiate the process ensures that only one procedure will be created. */
if (leaderNodeLocation.getConfigNodeId() == configNodeId) {
if (!configManager
.getProcedureManager()
.isExistUnfinishedProcedure(DataPartitionTableIntegrityCheckProcedure.class)) {
dataPartitionTableCheckFuture =
dataPartitionTableCheckExecutor.submit(
() -> {
LOGGER.info(
"[DataPartitionIntegrity] Prepare to start dataPartitionTableIntegrityCheck after all datanodes started up");
Thread.sleep(CONF.getPartitionTableRecoverWaitAllDnUpTimeoutInMs());

while (true) {
List<Integer> dnList =
configManager
.getLoadManager()
.filterDataNodeThroughStatus(NodeStatus.Running);
if (dnList != null && !dnList.isEmpty()) {
LOGGER.info("Starting dataPartitionTableIntegrityCheck...");
TSStatus status =
configManager
.getProcedureManager()
.dataPartitionTableIntegrityCheck();
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.error(
"Data partition table integrity check failed! Current status code is {}, status message is {}",
status.getCode(),
status.getMessage());
}
break;
} else {
LOGGER.info("No running datanodes found, waiting...");
Thread.sleep(5000);
}
}
return null;
});
}
}
return;
} else {
saveSecretKey();
Expand Down
Loading