Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
import org.apache.iotdb.isession.SessionConfig;
import org.apache.iotdb.it.env.cluster.env.AbstractEnv;
import org.apache.iotdb.it.env.cluster.node.ConfigNodeWrapper;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2DualTreeAutoBasic;
Expand All @@ -39,14 +41,21 @@
import org.junit.runner.RunWith;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

import static org.junit.Assert.fail;

Expand Down Expand Up @@ -884,4 +893,96 @@ public void testPipePluginValidation() {
fail(e.getMessage());
}
}

@Test
public void testShowPipePluginAfterJarDeletedAndClusterRestart() throws Exception {
final String pluginName = "TEST_MISSING_JAR_PROCESSOR";
final String pluginClassName = "org.apache.iotdb.CountPointProcessor";
final Path pluginJarPath =
Paths.get(
System.getProperty("user.dir"),
"src",
"test",
"resources",
"pipe-count-point-processor-example.jar")
.toAbsolutePath();
System.out.println(pluginJarPath.toUri());

try (final Connection connection = senderEnv.getConnection();
final Statement statement = connection.createStatement()) {
statement.execute(
String.format(
"create pipePlugin %s as '%s' USING URI '%s'",
pluginName, pluginClassName, pluginJarPath.toUri()));
}

senderEnv.shutdownAllDataNodes();
senderEnv.shutdownAllConfigNodes();

deletePluginJarUnderConfigNodes(pluginName);

senderEnv.startAllConfigNodes();
senderEnv.startAllDataNodes();
((AbstractEnv) senderEnv).checkClusterStatusWithoutUnknown();

boolean pluginFound = false;
boolean exceptionMessageFound = false;
SQLException lastException = null;
for (int retry = 0; retry < 10; retry++) {
try (final Connection connection = senderEnv.getConnection();
final Statement statement = connection.createStatement();
final ResultSet resultSet = statement.executeQuery("show pipeplugins")) {
while (resultSet.next()) {
if (pluginName.equalsIgnoreCase(resultSet.getString("PluginName"))) {
pluginFound = true;
final String exceptionMessage = resultSet.getString("ExceptionMessage");
exceptionMessageFound = exceptionMessage != null && !exceptionMessage.trim().isEmpty();
break;
}
}
lastException = null;
break;
} catch (final SQLException e) {
lastException = e;
Thread.sleep(1000);
}
}
if (lastException != null) {
throw lastException;
}

Assert.assertTrue("Expected plugin in show pipe plugins result.", pluginFound);
Assert.assertTrue(
"Expected non-empty ExceptionMessage after deleting plugin jar and restarting cluster.",
exceptionMessageFound);

try (final Connection connection = senderEnv.getConnection();
final Statement statement = connection.createStatement()) {
statement.execute(String.format("drop pipePlugin %s", pluginName));
}
}

private void deletePluginJarUnderConfigNodes(final String pluginName) throws IOException {
for (final ConfigNodeWrapper configNodeWrapper : senderEnv.getConfigNodeWrapperList()) {
final Path pluginJarDirPath =
Paths.get(
configNodeWrapper.getNodePath(), "ext", "pipe", "install", pluginName.toUpperCase());
if (!Files.exists(pluginJarDirPath)) {
continue;
}
try (final Stream<Path> children = Files.walk(pluginJarDirPath)) {
children
.filter(path -> !path.equals(pluginJarDirPath))
.sorted(Comparator.reverseOrder())
.forEach(
path -> {
try {
Files.deleteIfExists(path);
} catch (final IOException e) {
throw new RuntimeException(e);
}
});
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,8 @@ public void testInformationSchema() throws SQLException {
"plugin_name,STRING,TAG,",
"plugin_type,STRING,ATTRIBUTE,",
"class_name,STRING,ATTRIBUTE,",
"plugin_jar,STRING,ATTRIBUTE,")));
"plugin_jar,STRING,ATTRIBUTE,",
"exception_message,STRING,ATTRIBUTE,")));
TestUtils.assertResultSetEqual(
statement.executeQuery("desc topics"),
"ColumnName,DataType,Category,",
Expand Down Expand Up @@ -708,9 +709,9 @@ public void testInformationSchema() throws SQLException {
TestUtils.assertResultSetEqual(
statement.executeQuery(
"select * from pipe_plugins where plugin_name = 'IOTDB-THRIFT-SINK'"),
"plugin_name,plugin_type,class_name,plugin_jar,",
"plugin_name,plugin_type,class_name,plugin_jar,exception_message,",
Collections.singleton(
"IOTDB-THRIFT-SINK,Builtin,org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.thrift.IoTDBThriftSink,null,"));
"IOTDB-THRIFT-SINK,Builtin,org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.thrift.IoTDBThriftSink,null,null,"));

TestUtils.assertResultSetEqual(
statement.executeQuery("select * from views"),
Expand Down
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public PipePluginTableResp(
public TGetPipePluginTableResp convertToThriftResponse() throws IOException {
final List<ByteBuffer> pipePluginInformationByteBuffers = new ArrayList<>();
for (PipePluginMeta pipePluginMeta : allPipePluginMeta) {
pipePluginInformationByteBuffers.add(pipePluginMeta.serialize());
pipePluginInformationByteBuffers.add(pipePluginMeta.serializeForShowPipePlugin());
}
return new TGetPipePluginTableResp(status, pipePluginInformationByteBuffers);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant;
import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
import org.apache.iotdb.commons.pipe.datastructure.visibility.Visibility;
import org.apache.iotdb.commons.pipe.datastructure.visibility.VisibilityUtils;
import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
Expand Down Expand Up @@ -111,6 +112,15 @@ public boolean validateBeforeCreatingPipePlugin(
final String pluginName, final boolean isSetIfNotExistsCondition) {
// both build-in and user defined pipe plugin should be unique
if (pipePluginMetaKeeper.containsPipePlugin(pluginName)) {
final PipePluginMeta existedPipePluginMeta =
pipePluginMetaKeeper.getPipePluginMeta(pluginName);
final String loadingFailureMessage = existedPipePluginMeta.getPluginLoadingExceptionMessage();
if (loadingFailureMessage != null) {
throw new PipeException(
String.format(
"Failed to create PipePlugin [%s], this PipePlugin exists but failed to load: %s",
pluginName, loadingFailureMessage));
}
if (isSetIfNotExistsCondition) {
return true;
}
Expand Down Expand Up @@ -177,6 +187,7 @@ public void checkPipePluginExistence(
LOGGER.info(exceptionMessage);
throw new PipeException(exceptionMessage);
}
checkPipePluginAvailabilityForPipeCreation(sourcePluginName, "source");

final PipeParameters processorParameters = new PipeParameters(processorAttributes);
final String processorPluginName =
Expand All @@ -190,6 +201,7 @@ public void checkPipePluginExistence(
LOGGER.warn(exceptionMessage);
throw new PipeException(exceptionMessage);
}
checkPipePluginAvailabilityForPipeCreation(processorPluginName, "processor");

final PipeParameters sinkParameters = new PipeParameters(sinkAttributes);
final String sinkPluginName =
Expand All @@ -204,13 +216,14 @@ public void checkPipePluginExistence(
LOGGER.warn(exceptionMessage);
throw new PipeException(exceptionMessage);
}
checkPipePluginAvailabilityForPipeCreation(sinkPluginName, "sink");
}

/////////////////////////////// Pipe Plugin Management ///////////////////////////////

public TSStatus createPipePlugin(final CreatePipePluginPlan createPipePluginPlan) {
final PipePluginMeta pipePluginMeta = createPipePluginPlan.getPipePluginMeta();
try {
final PipePluginMeta pipePluginMeta = createPipePluginPlan.getPipePluginMeta();
final String pluginName = pipePluginMeta.getPluginName();
final String className = pipePluginMeta.getClassName();
final String jarName = pipePluginMeta.getJarName();
Expand All @@ -220,6 +233,22 @@ public TSStatus createPipePlugin(final CreatePipePluginPlan createPipePluginPlan
} else {
final String existed = pipePluginMetaKeeper.getPluginNameByJarName(jarName);
if (Objects.nonNull(existed)) {
final PipePluginMeta existedPipePluginMeta =
pipePluginMetaKeeper.getPipePluginMeta(existed);
final String existedLoadingFailureMessage =
existedPipePluginMeta.getPluginLoadingExceptionMessage();
if (existedLoadingFailureMessage != null) {
throw new PipeException(
String.format(
"Failed to create PipePlugin [%s], source PipePlugin [%s] failed to load: %s",
pluginName, existed, existedLoadingFailureMessage));
}
if (!pipePluginExecutableManager.hasPluginFileUnderInstallDir(existed, jarName)) {
throw new PipeException(
String.format(
"Failed to create PipePlugin [%s], source PipePlugin [%s] jar [%s] does not exist in install dir.",
pluginName, existed, jarName));
}
pipePluginExecutableManager.linkExistedPlugin(existed, pluginName, jarName);
computeFromPluginClass(pluginName, className);
} else {
Expand All @@ -237,7 +266,7 @@ public TSStatus createPipePlugin(final CreatePipePluginPlan createPipePluginPlan
pipePluginMetaKeeper.addJarNameAndMd5(jarName, pipePluginMeta.getJarMD5());

return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} catch (final Exception e) {
} catch (final Throwable e) {
final String errorMessage =
String.format(
"Failed to execute createPipePlugin(%s) on config nodes, because of %s",
Expand All @@ -249,7 +278,7 @@ public TSStatus createPipePlugin(final CreatePipePluginPlan createPipePluginPlan
}

private void savePipePluginWithRollback(final CreatePipePluginPlan createPipePluginPlan)
throws Exception {
throws Throwable {
final PipePluginMeta pipePluginMeta = createPipePluginPlan.getPipePluginMeta();
final String pluginName = pipePluginMeta.getPluginName();
final String className = pipePluginMeta.getClassName();
Expand All @@ -258,15 +287,15 @@ private void savePipePluginWithRollback(final CreatePipePluginPlan createPipePlu
pipePluginExecutableManager.savePluginToInstallDir(
ByteBuffer.wrap(createPipePluginPlan.getJarFile().getValues()), pluginName, jarName);
computeFromPluginClass(pluginName, className);
} catch (final Exception e) {
} catch (final Throwable e) {
// We need to rollback if the creation has failed
pipePluginExecutableManager.removePluginFileUnderLibRoot(pluginName, jarName);
throw e;
}
}

private void computeFromPluginClass(final String pluginName, final String className)
throws Exception {
throws Throwable {
final String pluginDirPath = pipePluginExecutableManager.getPluginsDirPath(pluginName);
final PipePluginClassLoader pipePluginClassLoader =
classLoaderManager.createPipePluginClassLoader(pluginDirPath);
Expand All @@ -275,7 +304,7 @@ private void computeFromPluginClass(final String pluginName, final String classN
pipePluginMetaKeeper.addPipePluginVisibility(
pluginName, VisibilityUtils.calculateFromPluginClass(pluginClass));
classLoaderManager.addPluginAndClassLoader(pluginName, pipePluginClassLoader);
} catch (final Exception e) {
} catch (final Throwable e) {
try {
pipePluginClassLoader.close();
} catch (final Exception ignored) {
Expand Down Expand Up @@ -402,37 +431,84 @@ public void processLoadSnapshot(final File snapshotDir) throws IOException {
if (pipePluginMeta.isBuiltin()) {
continue;
}
final String pluginName = pipePluginMeta.getPluginName();
try {
final String pluginDirPath = pipePluginExecutableManager.getPluginsDirPath(pluginName);
final PipePluginClassLoader pipePluginClassLoader =
classLoaderManager.createPipePluginClassLoader(pluginDirPath);
try {
final Class<?> pluginClass =
Class.forName(pipePluginMeta.getClassName(), true, pipePluginClassLoader);
pipePluginMetaKeeper.addPipePluginVisibility(
pluginName, VisibilityUtils.calculateFromPluginClass(pluginClass));
classLoaderManager.addPluginAndClassLoader(pluginName, pipePluginClassLoader);
} catch (final Throwable e) {
try {
pipePluginClassLoader.close();
} catch (final Exception ignored) {
}
throw e;
}
} catch (final Throwable e) {
LOGGER.warn(
"Failed to load plugin class for plugin [{}] when loading snapshot [{}] ",
pluginName,
snapshotFile.getAbsolutePath(),
e);
}
createPipePluginOnStartup(pipePluginMeta, snapshotFile);
}
} finally {
releasePipePluginInfoLock();
}
}

private String getRootCauseMessage(final Throwable throwable) {
Throwable current = throwable;
while (current.getCause() != null && current.getCause() != current) {
current = current.getCause();
}
final String message = current.getMessage();
return current.getClass().getSimpleName() + (message == null ? "" : (": " + message));
}

private void checkPipePluginAvailabilityForPipeCreation(
final String pluginName, final String pluginType) {
final PipePluginMeta pipePluginMeta = pipePluginMetaKeeper.getPipePluginMeta(pluginName);
final String loadingFailureMessage = pipePluginMeta.getPluginLoadingExceptionMessage();
if (loadingFailureMessage != null) {
final String exceptionMessage =
String.format(
"Failed to create or alter pipe, the pipe %s plugin %s failed to load: %s",
pluginType, pluginName, loadingFailureMessage);
LOGGER.warn(exceptionMessage);
throw new PipeException(exceptionMessage);
}
}

private void createPipePluginOnStartup(
final PipePluginMeta pipePluginMeta, final File snapshotFile) {
final String pluginName = pipePluginMeta.getPluginName();
try {
final String pluginDirPath = pipePluginExecutableManager.getPluginsDirPath(pluginName);
final PipePluginClassLoader pipePluginClassLoader =
classLoaderManager.createPipePluginClassLoader(pluginDirPath);
try {
final Class<?> pluginClass =
Class.forName(pipePluginMeta.getClassName(), true, pipePluginClassLoader);
pipePluginMetaKeeper.addPipePluginVisibility(
pluginName, VisibilityUtils.calculateFromPluginClass(pluginClass));
pipePluginMetaKeeper.addPipePluginMeta(
pluginName,
new PipePluginMeta(
pipePluginMeta.getPluginName(),
pipePluginMeta.getClassName(),
pipePluginMeta.isBuiltin(),
pipePluginMeta.getJarName(),
pipePluginMeta.getJarMD5(),
null));
classLoaderManager.addPluginAndClassLoader(pluginName, pipePluginClassLoader);
} catch (final Throwable e) {
try {
pipePluginClassLoader.close();
} catch (final Exception ignored) {
}
throw e;
}
} catch (final Throwable e) {
pipePluginMetaKeeper.addPipePluginMeta(
pluginName,
new PipePluginMeta(
pipePluginMeta.getPluginName(),
pipePluginMeta.getClassName(),
pipePluginMeta.isBuiltin(),
pipePluginMeta.getJarName(),
pipePluginMeta.getJarMD5(),
getRootCauseMessage(e)));
pipePluginMetaKeeper.addPipePluginVisibility(pluginName, Visibility.BOTH);
LOGGER.warn(
"Failed to load plugin class for plugin [{}] when loading snapshot [{}] ",
pluginName,
snapshotFile.getAbsolutePath(),
e);
}
}

/////////////////////////////// hashCode & equals ///////////////////////////////

@Override
Expand Down
Loading