diff --git a/env-utils/pom.xml b/env-utils/pom.xml new file mode 100644 index 00000000..af58f414 --- /dev/null +++ b/env-utils/pom.xml @@ -0,0 +1,45 @@ + + + + 4.0.0 + + + com.datasqrl.flinkrunner + flink-sql-runner-parent + 1.0-SNAPSHOT + + + env-utils + Environment Utilities + + + + + + org.slf4j + slf4j-api + provided + + + + com.fasterxml.jackson.core + jackson-databind + + + diff --git a/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/EnvUtils.java b/env-utils/src/main/java/com/datasqrl/flinkrunner/utils/EnvUtils.java similarity index 72% rename from flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/EnvUtils.java rename to env-utils/src/main/java/com/datasqrl/flinkrunner/utils/EnvUtils.java index b2086c21..e0c3f089 100644 --- a/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/EnvUtils.java +++ b/env-utils/src/main/java/com/datasqrl/flinkrunner/utils/EnvUtils.java @@ -13,15 +13,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.datasqrl.flinkrunner; +package com.datasqrl.flinkrunner.utils; import java.util.HashMap; import java.util.Map; import java.util.UUID; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; /** Utility class for environment variable operations. */ -final class EnvUtils { - +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public final class EnvUtils { /** * Returns a map of environment variables with deployment-specific defaults. * @@ -35,15 +37,18 @@ final class EnvUtils { * * @return an immutable map containing all environment variables with defaults applied */ - static Map getEnvWithDefaults() { + public static Map getEnvWithDeploymentDefaults() { var env = new HashMap<>(System.getenv()); - env.putIfAbsent("DEPLOYMENT_ID", UUID.randomUUID().toString()); - env.putIfAbsent("DEPLOYMENT_TIMESTAMP", String.valueOf(System.currentTimeMillis())); + getDeploymentDefaults().forEach(env::putIfAbsent); return Map.copyOf(env); } - private EnvUtils() { - throw new UnsupportedOperationException(); + public static Map getDeploymentDefaults() { + return Map.of( + "DEPLOYMENT_ID", + UUID.randomUUID().toString(), + "DEPLOYMENT_TIMESTAMP", + String.valueOf(System.currentTimeMillis())); } } diff --git a/env-utils/src/main/java/com/datasqrl/flinkrunner/utils/EnvVarResolver.java b/env-utils/src/main/java/com/datasqrl/flinkrunner/utils/EnvVarResolver.java new file mode 100644 index 00000000..c62db983 --- /dev/null +++ b/env-utils/src/main/java/com/datasqrl/flinkrunner/utils/EnvVarResolver.java @@ -0,0 +1,246 @@ +/* + * Copyright © 2026 DataSQRL (contact@datasqrl.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datasqrl.flinkrunner.utils; + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.module.SimpleModule; +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import lombok.extern.slf4j.Slf4j; + +/** + * Environment variable resolving functionality. + * + *

Resolver instances replace {@code ${VAR}} placeholders with values from a configured + * environment map. Missing variables either fail resolution in strict mode or remain unresolved in + * non-strict mode. + */ +@Slf4j +public class EnvVarResolver { + + private static final Pattern ENVIRONMENT_VARIABLE_PATTERN = Pattern.compile("\\$\\{(.*?)\\}"); + + private final Map envVars; + private final ObjectMapper objectMapper; + private final boolean strict; + + private EnvVarResolver(Map envVars, boolean strict) { + this.envVars = envVars; + this.strict = strict; + objectMapper = initObjectMapper(); + } + + /** + * Creates a strict resolver backed by the current process environment. + * + * @return a resolver backed by {@link System#getenv()} + */ + public static EnvVarResolver of() { + return of(true); + } + + /** + * Creates a resolver backed by the current process environment. + * + * @param strict whether missing variables should fail resolution + * @return a resolver backed by {@link System#getenv()} + */ + public static EnvVarResolver of(boolean strict) { + return of(System.getenv(), strict); + } + + /** + * Creates a strict resolver backed by the supplied environment variables. + * + * @param envVars environment variables used for placeholder resolution + * @return a resolver backed by the supplied environment variables + */ + public static EnvVarResolver of(Map envVars) { + return of(envVars, true); + } + + /** + * Creates a resolver backed by the supplied environment variables. + * + * @param envVars environment variables used for placeholder resolution + * @param strict whether missing variables should fail resolution + * @return a resolver backed by the supplied environment variables + */ + public static EnvVarResolver of(Map envVars, boolean strict) { + return new EnvVarResolver(envVars, strict); + } + + /** + * Creates a strict resolver backed by the current process environment plus deployment defaults. + * + * @return a resolver with deployment defaults applied + */ + public static EnvVarResolver withDeploymentDefaults() { + return withDeploymentDefaults(true); + } + + /** + * Creates a resolver backed by the current process environment plus deployment defaults. + * + * @param strict whether missing variables should fail resolution + * @return a resolver with deployment defaults applied + */ + public static EnvVarResolver withDeploymentDefaults(boolean strict) { + return new EnvVarResolver(EnvUtils.getEnvWithDeploymentDefaults(), strict); + } + + /** + * Creates a strict resolver backed by the supplied environment variables plus deployment + * defaults. + * + * @param envVars environment variables used for placeholder resolution + * @return a resolver with deployment defaults applied + */ + public static EnvVarResolver withDeploymentDefaults(Map envVars) { + return withDeploymentDefaults(envVars, true); + } + + /** + * Creates a resolver backed by the supplied environment variables plus deployment defaults. + * + *

Deployment defaults are only added when the supplied map does not already contain those + * keys. + * + * @param envVars environment variables used for placeholder resolution + * @param strict whether missing variables should fail resolution + * @return a resolver with deployment defaults applied + */ + public static EnvVarResolver withDeploymentDefaults(Map envVars, boolean strict) { + var modifiedEnvVars = new HashMap<>(envVars); + EnvUtils.getDeploymentDefaults().forEach(modifiedEnvVars::putIfAbsent); + + return new EnvVarResolver(Map.copyOf(modifiedEnvVars), strict); + } + + /** + * Resolves environment variables referenced in a given source string. Searches for environment + * variable references based on {@link EnvVarResolver#ENVIRONMENT_VARIABLE_PATTERN}. If a blank + * source string is passed, it will be returned as is. + * + * @param src given source string that may contain environment variable references + * @return a new string with the resolved environment variables + * @throws IllegalStateException if strict mode is enabled and any referenced environment variable + * is not available + */ + public String resolve(String src) { + if (src == null || src.isBlank()) { + return src; + } + + var res = new StringBuilder(); + // First pass to replace environment variables + var matcher = ENVIRONMENT_VARIABLE_PATTERN.matcher(src); + var missingEnvVars = new HashSet(); + while (matcher.find()) { + var rawKey = matcher.group(1); + String key; + String defaultValue = null; + + // Support bash-style default values: ${VAR:-default} or ${VAR:=default} + int splitIdx = rawKey.indexOf(":-"); + if (splitIdx == -1) { + splitIdx = rawKey.indexOf(":="); + } + + if (splitIdx >= 0) { + key = rawKey.substring(0, splitIdx); + defaultValue = rawKey.substring(splitIdx + 2); + } else { + key = rawKey; + } + + if (envVars.containsKey(key)) { + var envValue = envVars.get(key); + matcher.appendReplacement(res, Matcher.quoteReplacement(envValue)); + } else if (defaultValue != null) { + matcher.appendReplacement(res, Matcher.quoteReplacement(defaultValue)); + } else { + missingEnvVars.add(key); + } + } + matcher.appendTail(res); + + if (strict && !missingEnvVars.isEmpty()) { + throw new IllegalStateException( + String.format( + "The following environment variables were referenced, but not found: %s", + missingEnvVars)); + } + + return res.toString(); + } + + /** + * Resolves environment variables referenced in a given JSON source string. Searches for + * environment variable references in any string leaf nodes based on {@link + * EnvVarResolver#ENVIRONMENT_VARIABLE_PATTERN}. + * + * @param jsonSrc given JSON source string that may contain environment variable references + * @return JSON string with the resolved environment variables + * @throws IOException if the JSON processing fails in any way + */ + public String resolveInJson(String jsonSrc) throws IOException { + var res = objectMapper.readValue(jsonSrc, Map.class); + + return objectMapper.writeValueAsString(res); + } + + /** + * Creates an {@link ObjectMapper} configured to resolve environment variables in string values. + * + * @return an object mapper with environment-variable resolution enabled for string + * deserialization + */ + public ObjectMapper initObjectMapper() { + return initObjectMapper(new ObjectMapper()); + } + + /** + * Configures an {@link ObjectMapper} to resolve environment variables in string values. + * + * @param mapper mapper to configure + * @return the supplied mapper with environment-variable resolution enabled for string + * deserialization + */ + public ObjectMapper initObjectMapper(ObjectMapper mapper) { + var module = new SimpleModule(); + module.addDeserializer(String.class, new JsonEnvVarDeserializer()); + mapper.registerModule(module); + + return mapper; + } + + private class JsonEnvVarDeserializer extends JsonDeserializer { + + @Override + public String deserialize(JsonParser parser, DeserializationContext ctx) throws IOException { + var value = parser.getText(); + return resolve(value); + } + } +} diff --git a/flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/EnvUtilsTest.java b/env-utils/src/test/java/com/datasqrl/flinkrunner/utils/EnvUtilsTest.java similarity index 76% rename from flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/EnvUtilsTest.java rename to env-utils/src/test/java/com/datasqrl/flinkrunner/utils/EnvUtilsTest.java index f55ad563..0712eeeb 100644 --- a/flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/EnvUtilsTest.java +++ b/env-utils/src/test/java/com/datasqrl/flinkrunner/utils/EnvUtilsTest.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.datasqrl.flinkrunner; +package com.datasqrl.flinkrunner.utils; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -25,7 +25,7 @@ class EnvUtilsTest { @Test void givenSystemEnv_whenGetEnvWithDefaults_thenReturnsEnvWithDefaults() { - var env = EnvUtils.getEnvWithDefaults(); + var env = EnvUtils.getEnvWithDeploymentDefaults(); assertThat(env).isNotNull(); assertThat(env).containsKey("DEPLOYMENT_ID"); @@ -34,7 +34,7 @@ void givenSystemEnv_whenGetEnvWithDefaults_thenReturnsEnvWithDefaults() { @Test void givenSystemEnv_whenGetEnvWithDefaults_thenDeploymentIdIsValidUuid() { - var env = EnvUtils.getEnvWithDefaults(); + var env = EnvUtils.getEnvWithDeploymentDefaults(); String deploymentId = env.get("DEPLOYMENT_ID"); assertThat(deploymentId) @@ -44,7 +44,7 @@ void givenSystemEnv_whenGetEnvWithDefaults_thenDeploymentIdIsValidUuid() { @Test void givenSystemEnv_whenGetEnvWithDefaults_thenDeploymentTimestampIsValidLong() { - var env = EnvUtils.getEnvWithDefaults(); + var env = EnvUtils.getEnvWithDeploymentDefaults(); String timestamp = env.get("DEPLOYMENT_TIMESTAMP"); assertThat(timestamp).isNotNull(); @@ -53,7 +53,7 @@ void givenSystemEnv_whenGetEnvWithDefaults_thenDeploymentTimestampIsValidLong() @Test void givenSystemEnv_whenGetEnvWithDefaults_thenReturnsImmutableMap() { - var env = EnvUtils.getEnvWithDefaults(); + var env = EnvUtils.getEnvWithDeploymentDefaults(); assertThatThrownBy(() -> env.put("NEW_KEY", "value")) .isInstanceOf(UnsupportedOperationException.class); @@ -61,7 +61,7 @@ void givenSystemEnv_whenGetEnvWithDefaults_thenReturnsImmutableMap() { @Test void givenSystemEnv_whenGetEnvWithDefaults_thenIncludesSystemEnvironmentVariables() { - var env = EnvUtils.getEnvWithDefaults(); + var env = EnvUtils.getEnvWithDeploymentDefaults(); var systemEnv = System.getenv(); // Verify that all system environment variables are present @@ -72,8 +72,8 @@ void givenSystemEnv_whenGetEnvWithDefaults_thenIncludesSystemEnvironmentVariable @Test void givenMultipleCalls_whenGetEnvWithDefaults_thenGeneratesDifferentDeploymentIds() { - var env1 = EnvUtils.getEnvWithDefaults(); - var env2 = EnvUtils.getEnvWithDefaults(); + var env1 = EnvUtils.getEnvWithDeploymentDefaults(); + var env2 = EnvUtils.getEnvWithDeploymentDefaults(); assertThat(env1.get("DEPLOYMENT_ID")).isNotEqualTo(env2.get("DEPLOYMENT_ID")); } @@ -81,23 +81,12 @@ void givenMultipleCalls_whenGetEnvWithDefaults_thenGeneratesDifferentDeploymentI @Test void givenMultipleCalls_whenGetEnvWithDefaults_thenGeneratesDifferentTimestamps() throws InterruptedException { - var env1 = EnvUtils.getEnvWithDefaults(); + var env1 = EnvUtils.getEnvWithDeploymentDefaults(); Thread.sleep(2); // Small delay to ensure different timestamps - var env2 = EnvUtils.getEnvWithDefaults(); + var env2 = EnvUtils.getEnvWithDeploymentDefaults(); long timestamp1 = Long.parseLong(env1.get("DEPLOYMENT_TIMESTAMP")); long timestamp2 = Long.parseLong(env2.get("DEPLOYMENT_TIMESTAMP")); assertThat(timestamp2).isGreaterThanOrEqualTo(timestamp1); } - - @Test - void givenConstructor_whenInvoked_thenThrowsUnsupportedOperationException() { - assertThatThrownBy( - () -> { - var constructor = EnvUtils.class.getDeclaredConstructor(); - constructor.setAccessible(true); - constructor.newInstance(); - }) - .hasCauseInstanceOf(UnsupportedOperationException.class); - } } diff --git a/flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/EnvVarResolverTest.java b/env-utils/src/test/java/com/datasqrl/flinkrunner/utils/EnvVarResolverTest.java similarity index 67% rename from flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/EnvVarResolverTest.java rename to env-utils/src/test/java/com/datasqrl/flinkrunner/utils/EnvVarResolverTest.java index c9df9b07..e155d724 100644 --- a/flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/EnvVarResolverTest.java +++ b/env-utils/src/test/java/com/datasqrl/flinkrunner/utils/EnvVarResolverTest.java @@ -13,12 +13,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.datasqrl.flinkrunner; +package com.datasqrl.flinkrunner.utils; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; import java.util.Map; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.ValueSource; @@ -40,7 +43,7 @@ void givenEnvVariables_whenReplaceWithEnv_thenReplaceCorrectlyVars( Map.of( "USER", "John", "PATH", "/usr/bin"); - resolver = new EnvVarResolver(envVariables); + resolver = EnvVarResolver.of(envVariables); var result = resolver.resolve(command); assertThat(result).isEqualTo(expected); } @@ -54,7 +57,7 @@ void givenEnvVariables_whenReplaceWithEnv_thenReplaceCorrectlyVars( }) void givenMissingEnvVariables_whenResolveEnv_Vars_thenThrowException(String command) { Map envVariables = Map.of(); - resolver = new EnvVarResolver(envVariables); // Empty map to simulate missing variables + resolver = EnvVarResolver.of(envVariables); // Empty map to simulate missing variables assertThatThrownBy(() -> resolver.resolve(command)) .isInstanceOf(IllegalStateException.class) .hasMessageStartingWith( @@ -73,7 +76,7 @@ void givenSimilarEnvVariableNames_whenResolveEnv_Vars_thenPartialMatchDoesNotOcc Map.of( "USER", "John", "NAME", "exists"); - resolver = new EnvVarResolver(envVariables); + resolver = EnvVarResolver.of(envVariables); assertThatThrownBy(() -> resolver.resolve(command)) .isInstanceOf(IllegalStateException.class) .hasMessage( @@ -102,7 +105,7 @@ void givenDefaultEnvValues_whenResolve_thenFallbackOrUseEnvValue( Map.of( "USER", "John", "PATH", "/usr/bin"); - resolver = new EnvVarResolver(envVariables); + resolver = EnvVarResolver.of(envVariables); var result = resolver.resolve(command); assertThat(result).isEqualTo(expected); } @@ -117,7 +120,7 @@ void givenDefaultEnvValues_whenResolve_thenFallbackOrUseEnvValue( }) void givenMissingEnvWithoutDefault_whenResolve_thenThrowException(String command) { Map envVariables = Map.of("A", "something"); - resolver = new EnvVarResolver(envVariables); + resolver = EnvVarResolver.of(envVariables); assertThatThrownBy(() -> resolver.resolve(command)) .isInstanceOf(IllegalStateException.class) .hasMessageContaining("referenced, but not found"); @@ -133,8 +136,66 @@ void givenMissingEnvWithoutDefault_whenResolve_thenThrowException(String command }) void givenFallbackWithColons_whenResolve_thenParseCorrectly(String command, String expected) { Map envVariables = Map.of(); // No TOKEN set - resolver = new EnvVarResolver(envVariables); + resolver = EnvVarResolver.of(envVariables); var result = resolver.resolve(command); assertThat(result).isEqualTo(expected); } + + @Test + void givenNullOrBlankSource_whenResolve_thenReturnSource() { + resolver = EnvVarResolver.of(Map.of()); + + assertThat(resolver.resolve(null)).isNull(); + assertThat(resolver.resolve(" ")).isEqualTo(" "); + } + + @Test + void givenNonStrictResolver_whenMissingEnvVariables_thenLeavesPlaceholdersUnresolved() { + resolver = EnvVarResolver.of(Map.of("USER", "John"), false); + + var result = resolver.resolve("Hello ${USER}, ${MISSING}!"); + + assertThat(result).isEqualTo("Hello John, ${MISSING}!"); + } + + @Test + void givenDeploymentDefaults_whenResolve_thenUseDefaultsAndSuppliedValues() { + resolver = + EnvVarResolver.withDeploymentDefaults( + Map.of( + "DEPLOYMENT_ID", "deployment-1", + "USER", "John")); + + var result = resolver.resolve("${DEPLOYMENT_ID}|${DEPLOYMENT_TIMESTAMP}|${USER}"); + var parts = result.split("\\|"); + + assertThat(parts).hasSize(3); + assertThat(parts[0]).isEqualTo("deployment-1"); + assertThat(Long.parseLong(parts[1])).isGreaterThan(0); + assertThat(parts[2]).isEqualTo("John"); + } + + @Test + void givenNonStrictDeploymentDefaults_whenMissingNonDefaultEnvVariable_thenLeavesPlaceholder() { + resolver = + EnvVarResolver.withDeploymentDefaults(Map.of("DEPLOYMENT_ID", "deployment-1"), false); + + var result = resolver.resolve("${DEPLOYMENT_ID}|${MISSING}"); + + assertThat(result).isEqualTo("deployment-1|${MISSING}"); + } + + @Test + void givenJsonSource_whenResolveInJson_thenResolveStringLeafNodes() throws IOException { + resolver = EnvVarResolver.of(Map.of("USER", "John")); + + var result = + resolver.resolveInJson( + "{\"user\":\"${USER}\",\"nested\":{\"path\":\"${PATH:-/tmp}\"},\"count\":1}"); + var json = new ObjectMapper().readTree(result); + + assertThat(json.get("user").asText()).isEqualTo("John"); + assertThat(json.get("count").asInt()).isEqualTo(1); + assertThat(json.get("nested").get("path").asText()).isEqualTo("/tmp"); + } } diff --git a/flink-sql-runner/pom.xml b/flink-sql-runner/pom.xml index 94f73954..299ce186 100644 --- a/flink-sql-runner/pom.xml +++ b/flink-sql-runner/pom.xml @@ -118,6 +118,12 @@ + + com.datasqrl.flinkrunner + env-utils + ${project.version} + + com.datasqrl.flinkrunner diff --git a/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/BaseRunner.java b/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/BaseRunner.java index 20196e8d..eeb9ccbd 100644 --- a/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/BaseRunner.java +++ b/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/BaseRunner.java @@ -15,6 +15,7 @@ */ package com.datasqrl.flinkrunner; +import com.datasqrl.flinkrunner.utils.EnvVarResolver; import java.io.File; import java.io.IOException; import java.util.function.Supplier; diff --git a/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/CliRunner.java b/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/CliRunner.java index 09b225ec..3ff76ac1 100644 --- a/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/CliRunner.java +++ b/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/CliRunner.java @@ -15,6 +15,7 @@ */ package com.datasqrl.flinkrunner; +import com.datasqrl.flinkrunner.utils.EnvVarResolver; import java.util.Arrays; import java.util.concurrent.Callable; import javax.annotation.Nullable; @@ -75,7 +76,7 @@ public CliRunner( @Nullable String planFile, @Nullable String configDir, @Nullable String udfPath) { - this(mode, new EnvVarResolver(), sqlFile, planFile, configDir, udfPath); + this(mode, EnvVarResolver.withDeploymentDefaults(), sqlFile, planFile, configDir, udfPath); } @VisibleForTesting diff --git a/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/EnvVarResolver.java b/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/EnvVarResolver.java deleted file mode 100644 index bf6a3d6f..00000000 --- a/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/EnvVarResolver.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Copyright © 2026 DataSQRL (contact@datasqrl.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.datasqrl.flinkrunner; - -import java.io.IOException; -import java.util.HashSet; -import java.util.Map; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import lombok.extern.slf4j.Slf4j; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonDeserializer; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule; - -/** Environment variable resolving functionality. */ -@Slf4j -public class EnvVarResolver { - - private static final Pattern ENVIRONMENT_VARIABLE_PATTERN = Pattern.compile("\\$\\{(.*?)\\}"); - - private final Map envVars; - private final ObjectMapper objectMapper; - - public EnvVarResolver() { - this(EnvUtils.getEnvWithDefaults()); - } - - public EnvVarResolver(Map envVars) { - this.envVars = envVars; - objectMapper = initObjectMapper(); - } - - /** - * Resolves environment variables referenced in a given source string. Searches for environment - * variable references based on {@link EnvVarResolver#ENVIRONMENT_VARIABLE_PATTERN}. If a blank - * source string is passed, it will be returned as is. - * - * @param src given source string that may contain environment variable references - * @return a new string with the resolved environment variables - * @throws IllegalStateException if any referenced environment variable are not available - */ - public String resolve(String src) { - if (StringUtils.isBlank(src)) { - return src; - } - - var res = new StringBuffer(); - // First pass to replace environment variables - var matcher = ENVIRONMENT_VARIABLE_PATTERN.matcher(src); - var missingEnvVars = new HashSet(); - while (matcher.find()) { - var rawKey = matcher.group(1); - String key; - String defaultValue = null; - - // Support bash-style default values: ${VAR:-default} or ${VAR:=default} - int splitIdx = rawKey.indexOf(":-"); - if (splitIdx == -1) { - splitIdx = rawKey.indexOf(":="); - } - - if (splitIdx >= 0) { - key = rawKey.substring(0, splitIdx); - defaultValue = rawKey.substring(splitIdx + 2); - } else { - key = rawKey; - } - - if (envVars.containsKey(key)) { - var envValue = envVars.get(key); - matcher.appendReplacement(res, Matcher.quoteReplacement(envValue)); - } else if (defaultValue != null) { - matcher.appendReplacement(res, Matcher.quoteReplacement(defaultValue)); - } else { - missingEnvVars.add(key); - } - } - matcher.appendTail(res); - - if (!missingEnvVars.isEmpty()) { - throw new IllegalStateException( - String.format( - "The following environment variables were referenced, but not found: %s", - missingEnvVars)); - } - - return res.toString(); - } - - /** - * Resolves environment variables referenced in a given JSON source string.Searches for - * environment variable references in any string leaf nodes based on {@link - * EnvVarResolver#ENVIRONMENT_VARIABLE_PATTERN}. - * - * @param jsonSrc given JSON source string that may contain environment variable references - * @return JSON string with the resolved environment variables - * @throws IOException if the JSON processing fails in any way - */ - public String resolveInJson(String jsonSrc) throws IOException { - var res = objectMapper.readValue(jsonSrc, Map.class); - - return objectMapper.writeValueAsString(res); - } - - private ObjectMapper initObjectMapper() { - var objectMapper = new ObjectMapper(); - - var module = new SimpleModule(); - module.addDeserializer(String.class, new JsonEnvVarDeserializer()); - objectMapper.registerModule(module); - - return objectMapper; - } - - private class JsonEnvVarDeserializer extends JsonDeserializer { - - @Override - public String deserialize(JsonParser parser, DeserializationContext ctx) throws IOException { - var value = parser.getText(); - return resolve(value); - } - } -} diff --git a/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/SqrlRunner.java b/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/SqrlRunner.java index 15b202e1..9b604a41 100644 --- a/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/SqrlRunner.java +++ b/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/SqrlRunner.java @@ -15,6 +15,7 @@ */ package com.datasqrl.flinkrunner; +import com.datasqrl.flinkrunner.utils.EnvVarResolver; import javax.annotation.Nullable; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.configuration.Configuration; diff --git a/flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/CliRunnerTest.java b/flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/CliRunnerTest.java index 82c335ad..d059acbc 100644 --- a/flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/CliRunnerTest.java +++ b/flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/CliRunnerTest.java @@ -22,6 +22,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import com.datasqrl.flinkrunner.utils.EnvVarResolver; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; diff --git a/pom.xml b/pom.xml index f2dc2930..cb648333 100644 --- a/pom.xml +++ b/pom.xml @@ -59,6 +59,7 @@ + env-utils formats stdlib connectors @@ -106,6 +107,7 @@ 13.5 3.3.4 1.10.2 + 2.21.3 0.8.14 3.53.0 3.0.0 @@ -139,6 +141,14 @@ + + com.fasterxml.jackson + jackson-bom + ${jackson.version} + pom + import + + org.junit junit-bom