getDeploymentDefaults() {
+ return Map.of(
+ "DEPLOYMENT_ID",
+ UUID.randomUUID().toString(),
+ "DEPLOYMENT_TIMESTAMP",
+ String.valueOf(System.currentTimeMillis()));
}
}
diff --git a/env-utils/src/main/java/com/datasqrl/flinkrunner/utils/EnvVarResolver.java b/env-utils/src/main/java/com/datasqrl/flinkrunner/utils/EnvVarResolver.java
new file mode 100644
index 00000000..c62db983
--- /dev/null
+++ b/env-utils/src/main/java/com/datasqrl/flinkrunner/utils/EnvVarResolver.java
@@ -0,0 +1,246 @@
+/*
+ * Copyright © 2026 DataSQRL (contact@datasqrl.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datasqrl.flinkrunner.utils;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Environment variable resolving functionality.
+ *
+ * Resolver instances replace {@code ${VAR}} placeholders with values from a configured
+ * environment map. Missing variables either fail resolution in strict mode or remain unresolved in
+ * non-strict mode.
+ */
+@Slf4j
+public class EnvVarResolver {
+
+ private static final Pattern ENVIRONMENT_VARIABLE_PATTERN = Pattern.compile("\\$\\{(.*?)\\}");
+
+ private final Map envVars;
+ private final ObjectMapper objectMapper;
+ private final boolean strict;
+
+ private EnvVarResolver(Map envVars, boolean strict) {
+ this.envVars = envVars;
+ this.strict = strict;
+ objectMapper = initObjectMapper();
+ }
+
+ /**
+ * Creates a strict resolver backed by the current process environment.
+ *
+ * @return a resolver backed by {@link System#getenv()}
+ */
+ public static EnvVarResolver of() {
+ return of(true);
+ }
+
+ /**
+ * Creates a resolver backed by the current process environment.
+ *
+ * @param strict whether missing variables should fail resolution
+ * @return a resolver backed by {@link System#getenv()}
+ */
+ public static EnvVarResolver of(boolean strict) {
+ return of(System.getenv(), strict);
+ }
+
+ /**
+ * Creates a strict resolver backed by the supplied environment variables.
+ *
+ * @param envVars environment variables used for placeholder resolution
+ * @return a resolver backed by the supplied environment variables
+ */
+ public static EnvVarResolver of(Map envVars) {
+ return of(envVars, true);
+ }
+
+ /**
+ * Creates a resolver backed by the supplied environment variables.
+ *
+ * @param envVars environment variables used for placeholder resolution
+ * @param strict whether missing variables should fail resolution
+ * @return a resolver backed by the supplied environment variables
+ */
+ public static EnvVarResolver of(Map envVars, boolean strict) {
+ return new EnvVarResolver(envVars, strict);
+ }
+
+ /**
+ * Creates a strict resolver backed by the current process environment plus deployment defaults.
+ *
+ * @return a resolver with deployment defaults applied
+ */
+ public static EnvVarResolver withDeploymentDefaults() {
+ return withDeploymentDefaults(true);
+ }
+
+ /**
+ * Creates a resolver backed by the current process environment plus deployment defaults.
+ *
+ * @param strict whether missing variables should fail resolution
+ * @return a resolver with deployment defaults applied
+ */
+ public static EnvVarResolver withDeploymentDefaults(boolean strict) {
+ return new EnvVarResolver(EnvUtils.getEnvWithDeploymentDefaults(), strict);
+ }
+
+ /**
+ * Creates a strict resolver backed by the supplied environment variables plus deployment
+ * defaults.
+ *
+ * @param envVars environment variables used for placeholder resolution
+ * @return a resolver with deployment defaults applied
+ */
+ public static EnvVarResolver withDeploymentDefaults(Map envVars) {
+ return withDeploymentDefaults(envVars, true);
+ }
+
+ /**
+ * Creates a resolver backed by the supplied environment variables plus deployment defaults.
+ *
+ * Deployment defaults are only added when the supplied map does not already contain those
+ * keys.
+ *
+ * @param envVars environment variables used for placeholder resolution
+ * @param strict whether missing variables should fail resolution
+ * @return a resolver with deployment defaults applied
+ */
+ public static EnvVarResolver withDeploymentDefaults(Map envVars, boolean strict) {
+ var modifiedEnvVars = new HashMap<>(envVars);
+ EnvUtils.getDeploymentDefaults().forEach(modifiedEnvVars::putIfAbsent);
+
+ return new EnvVarResolver(Map.copyOf(modifiedEnvVars), strict);
+ }
+
+ /**
+ * Resolves environment variables referenced in a given source string. Searches for environment
+ * variable references based on {@link EnvVarResolver#ENVIRONMENT_VARIABLE_PATTERN}. If a blank
+ * source string is passed, it will be returned as is.
+ *
+ * @param src given source string that may contain environment variable references
+ * @return a new string with the resolved environment variables
+ * @throws IllegalStateException if strict mode is enabled and any referenced environment variable
+ * is not available
+ */
+ public String resolve(String src) {
+ if (src == null || src.isBlank()) {
+ return src;
+ }
+
+ var res = new StringBuilder();
+ // First pass to replace environment variables
+ var matcher = ENVIRONMENT_VARIABLE_PATTERN.matcher(src);
+ var missingEnvVars = new HashSet();
+ while (matcher.find()) {
+ var rawKey = matcher.group(1);
+ String key;
+ String defaultValue = null;
+
+ // Support bash-style default values: ${VAR:-default} or ${VAR:=default}
+ int splitIdx = rawKey.indexOf(":-");
+ if (splitIdx == -1) {
+ splitIdx = rawKey.indexOf(":=");
+ }
+
+ if (splitIdx >= 0) {
+ key = rawKey.substring(0, splitIdx);
+ defaultValue = rawKey.substring(splitIdx + 2);
+ } else {
+ key = rawKey;
+ }
+
+ if (envVars.containsKey(key)) {
+ var envValue = envVars.get(key);
+ matcher.appendReplacement(res, Matcher.quoteReplacement(envValue));
+ } else if (defaultValue != null) {
+ matcher.appendReplacement(res, Matcher.quoteReplacement(defaultValue));
+ } else {
+ missingEnvVars.add(key);
+ }
+ }
+ matcher.appendTail(res);
+
+ if (strict && !missingEnvVars.isEmpty()) {
+ throw new IllegalStateException(
+ String.format(
+ "The following environment variables were referenced, but not found: %s",
+ missingEnvVars));
+ }
+
+ return res.toString();
+ }
+
+ /**
+ * Resolves environment variables referenced in a given JSON source string. Searches for
+ * environment variable references in any string leaf nodes based on {@link
+ * EnvVarResolver#ENVIRONMENT_VARIABLE_PATTERN}.
+ *
+ * @param jsonSrc given JSON source string that may contain environment variable references
+ * @return JSON string with the resolved environment variables
+ * @throws IOException if the JSON processing fails in any way
+ */
+ public String resolveInJson(String jsonSrc) throws IOException {
+ var res = objectMapper.readValue(jsonSrc, Map.class);
+
+ return objectMapper.writeValueAsString(res);
+ }
+
+ /**
+ * Creates an {@link ObjectMapper} configured to resolve environment variables in string values.
+ *
+ * @return an object mapper with environment-variable resolution enabled for string
+ * deserialization
+ */
+ public ObjectMapper initObjectMapper() {
+ return initObjectMapper(new ObjectMapper());
+ }
+
+ /**
+ * Configures an {@link ObjectMapper} to resolve environment variables in string values.
+ *
+ * @param mapper mapper to configure
+ * @return the supplied mapper with environment-variable resolution enabled for string
+ * deserialization
+ */
+ public ObjectMapper initObjectMapper(ObjectMapper mapper) {
+ var module = new SimpleModule();
+ module.addDeserializer(String.class, new JsonEnvVarDeserializer());
+ mapper.registerModule(module);
+
+ return mapper;
+ }
+
+ private class JsonEnvVarDeserializer extends JsonDeserializer {
+
+ @Override
+ public String deserialize(JsonParser parser, DeserializationContext ctx) throws IOException {
+ var value = parser.getText();
+ return resolve(value);
+ }
+ }
+}
diff --git a/flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/EnvUtilsTest.java b/env-utils/src/test/java/com/datasqrl/flinkrunner/utils/EnvUtilsTest.java
similarity index 76%
rename from flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/EnvUtilsTest.java
rename to env-utils/src/test/java/com/datasqrl/flinkrunner/utils/EnvUtilsTest.java
index f55ad563..0712eeeb 100644
--- a/flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/EnvUtilsTest.java
+++ b/env-utils/src/test/java/com/datasqrl/flinkrunner/utils/EnvUtilsTest.java
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package com.datasqrl.flinkrunner;
+package com.datasqrl.flinkrunner.utils;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -25,7 +25,7 @@ class EnvUtilsTest {
@Test
void givenSystemEnv_whenGetEnvWithDefaults_thenReturnsEnvWithDefaults() {
- var env = EnvUtils.getEnvWithDefaults();
+ var env = EnvUtils.getEnvWithDeploymentDefaults();
assertThat(env).isNotNull();
assertThat(env).containsKey("DEPLOYMENT_ID");
@@ -34,7 +34,7 @@ void givenSystemEnv_whenGetEnvWithDefaults_thenReturnsEnvWithDefaults() {
@Test
void givenSystemEnv_whenGetEnvWithDefaults_thenDeploymentIdIsValidUuid() {
- var env = EnvUtils.getEnvWithDefaults();
+ var env = EnvUtils.getEnvWithDeploymentDefaults();
String deploymentId = env.get("DEPLOYMENT_ID");
assertThat(deploymentId)
@@ -44,7 +44,7 @@ void givenSystemEnv_whenGetEnvWithDefaults_thenDeploymentIdIsValidUuid() {
@Test
void givenSystemEnv_whenGetEnvWithDefaults_thenDeploymentTimestampIsValidLong() {
- var env = EnvUtils.getEnvWithDefaults();
+ var env = EnvUtils.getEnvWithDeploymentDefaults();
String timestamp = env.get("DEPLOYMENT_TIMESTAMP");
assertThat(timestamp).isNotNull();
@@ -53,7 +53,7 @@ void givenSystemEnv_whenGetEnvWithDefaults_thenDeploymentTimestampIsValidLong()
@Test
void givenSystemEnv_whenGetEnvWithDefaults_thenReturnsImmutableMap() {
- var env = EnvUtils.getEnvWithDefaults();
+ var env = EnvUtils.getEnvWithDeploymentDefaults();
assertThatThrownBy(() -> env.put("NEW_KEY", "value"))
.isInstanceOf(UnsupportedOperationException.class);
@@ -61,7 +61,7 @@ void givenSystemEnv_whenGetEnvWithDefaults_thenReturnsImmutableMap() {
@Test
void givenSystemEnv_whenGetEnvWithDefaults_thenIncludesSystemEnvironmentVariables() {
- var env = EnvUtils.getEnvWithDefaults();
+ var env = EnvUtils.getEnvWithDeploymentDefaults();
var systemEnv = System.getenv();
// Verify that all system environment variables are present
@@ -72,8 +72,8 @@ void givenSystemEnv_whenGetEnvWithDefaults_thenIncludesSystemEnvironmentVariable
@Test
void givenMultipleCalls_whenGetEnvWithDefaults_thenGeneratesDifferentDeploymentIds() {
- var env1 = EnvUtils.getEnvWithDefaults();
- var env2 = EnvUtils.getEnvWithDefaults();
+ var env1 = EnvUtils.getEnvWithDeploymentDefaults();
+ var env2 = EnvUtils.getEnvWithDeploymentDefaults();
assertThat(env1.get("DEPLOYMENT_ID")).isNotEqualTo(env2.get("DEPLOYMENT_ID"));
}
@@ -81,23 +81,12 @@ void givenMultipleCalls_whenGetEnvWithDefaults_thenGeneratesDifferentDeploymentI
@Test
void givenMultipleCalls_whenGetEnvWithDefaults_thenGeneratesDifferentTimestamps()
throws InterruptedException {
- var env1 = EnvUtils.getEnvWithDefaults();
+ var env1 = EnvUtils.getEnvWithDeploymentDefaults();
Thread.sleep(2); // Small delay to ensure different timestamps
- var env2 = EnvUtils.getEnvWithDefaults();
+ var env2 = EnvUtils.getEnvWithDeploymentDefaults();
long timestamp1 = Long.parseLong(env1.get("DEPLOYMENT_TIMESTAMP"));
long timestamp2 = Long.parseLong(env2.get("DEPLOYMENT_TIMESTAMP"));
assertThat(timestamp2).isGreaterThanOrEqualTo(timestamp1);
}
-
- @Test
- void givenConstructor_whenInvoked_thenThrowsUnsupportedOperationException() {
- assertThatThrownBy(
- () -> {
- var constructor = EnvUtils.class.getDeclaredConstructor();
- constructor.setAccessible(true);
- constructor.newInstance();
- })
- .hasCauseInstanceOf(UnsupportedOperationException.class);
- }
}
diff --git a/flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/EnvVarResolverTest.java b/env-utils/src/test/java/com/datasqrl/flinkrunner/utils/EnvVarResolverTest.java
similarity index 67%
rename from flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/EnvVarResolverTest.java
rename to env-utils/src/test/java/com/datasqrl/flinkrunner/utils/EnvVarResolverTest.java
index c9df9b07..e155d724 100644
--- a/flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/EnvVarResolverTest.java
+++ b/env-utils/src/test/java/com/datasqrl/flinkrunner/utils/EnvVarResolverTest.java
@@ -13,12 +13,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package com.datasqrl.flinkrunner;
+package com.datasqrl.flinkrunner.utils;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
import java.util.Map;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
@@ -40,7 +43,7 @@ void givenEnvVariables_whenReplaceWithEnv_thenReplaceCorrectlyVars(
Map.of(
"USER", "John",
"PATH", "/usr/bin");
- resolver = new EnvVarResolver(envVariables);
+ resolver = EnvVarResolver.of(envVariables);
var result = resolver.resolve(command);
assertThat(result).isEqualTo(expected);
}
@@ -54,7 +57,7 @@ void givenEnvVariables_whenReplaceWithEnv_thenReplaceCorrectlyVars(
})
void givenMissingEnvVariables_whenResolveEnv_Vars_thenThrowException(String command) {
Map envVariables = Map.of();
- resolver = new EnvVarResolver(envVariables); // Empty map to simulate missing variables
+ resolver = EnvVarResolver.of(envVariables); // Empty map to simulate missing variables
assertThatThrownBy(() -> resolver.resolve(command))
.isInstanceOf(IllegalStateException.class)
.hasMessageStartingWith(
@@ -73,7 +76,7 @@ void givenSimilarEnvVariableNames_whenResolveEnv_Vars_thenPartialMatchDoesNotOcc
Map.of(
"USER", "John",
"NAME", "exists");
- resolver = new EnvVarResolver(envVariables);
+ resolver = EnvVarResolver.of(envVariables);
assertThatThrownBy(() -> resolver.resolve(command))
.isInstanceOf(IllegalStateException.class)
.hasMessage(
@@ -102,7 +105,7 @@ void givenDefaultEnvValues_whenResolve_thenFallbackOrUseEnvValue(
Map.of(
"USER", "John",
"PATH", "/usr/bin");
- resolver = new EnvVarResolver(envVariables);
+ resolver = EnvVarResolver.of(envVariables);
var result = resolver.resolve(command);
assertThat(result).isEqualTo(expected);
}
@@ -117,7 +120,7 @@ void givenDefaultEnvValues_whenResolve_thenFallbackOrUseEnvValue(
})
void givenMissingEnvWithoutDefault_whenResolve_thenThrowException(String command) {
Map envVariables = Map.of("A", "something");
- resolver = new EnvVarResolver(envVariables);
+ resolver = EnvVarResolver.of(envVariables);
assertThatThrownBy(() -> resolver.resolve(command))
.isInstanceOf(IllegalStateException.class)
.hasMessageContaining("referenced, but not found");
@@ -133,8 +136,66 @@ void givenMissingEnvWithoutDefault_whenResolve_thenThrowException(String command
})
void givenFallbackWithColons_whenResolve_thenParseCorrectly(String command, String expected) {
Map envVariables = Map.of(); // No TOKEN set
- resolver = new EnvVarResolver(envVariables);
+ resolver = EnvVarResolver.of(envVariables);
var result = resolver.resolve(command);
assertThat(result).isEqualTo(expected);
}
+
+ @Test
+ void givenNullOrBlankSource_whenResolve_thenReturnSource() {
+ resolver = EnvVarResolver.of(Map.of());
+
+ assertThat(resolver.resolve(null)).isNull();
+ assertThat(resolver.resolve(" ")).isEqualTo(" ");
+ }
+
+ @Test
+ void givenNonStrictResolver_whenMissingEnvVariables_thenLeavesPlaceholdersUnresolved() {
+ resolver = EnvVarResolver.of(Map.of("USER", "John"), false);
+
+ var result = resolver.resolve("Hello ${USER}, ${MISSING}!");
+
+ assertThat(result).isEqualTo("Hello John, ${MISSING}!");
+ }
+
+ @Test
+ void givenDeploymentDefaults_whenResolve_thenUseDefaultsAndSuppliedValues() {
+ resolver =
+ EnvVarResolver.withDeploymentDefaults(
+ Map.of(
+ "DEPLOYMENT_ID", "deployment-1",
+ "USER", "John"));
+
+ var result = resolver.resolve("${DEPLOYMENT_ID}|${DEPLOYMENT_TIMESTAMP}|${USER}");
+ var parts = result.split("\\|");
+
+ assertThat(parts).hasSize(3);
+ assertThat(parts[0]).isEqualTo("deployment-1");
+ assertThat(Long.parseLong(parts[1])).isGreaterThan(0);
+ assertThat(parts[2]).isEqualTo("John");
+ }
+
+ @Test
+ void givenNonStrictDeploymentDefaults_whenMissingNonDefaultEnvVariable_thenLeavesPlaceholder() {
+ resolver =
+ EnvVarResolver.withDeploymentDefaults(Map.of("DEPLOYMENT_ID", "deployment-1"), false);
+
+ var result = resolver.resolve("${DEPLOYMENT_ID}|${MISSING}");
+
+ assertThat(result).isEqualTo("deployment-1|${MISSING}");
+ }
+
+ @Test
+ void givenJsonSource_whenResolveInJson_thenResolveStringLeafNodes() throws IOException {
+ resolver = EnvVarResolver.of(Map.of("USER", "John"));
+
+ var result =
+ resolver.resolveInJson(
+ "{\"user\":\"${USER}\",\"nested\":{\"path\":\"${PATH:-/tmp}\"},\"count\":1}");
+ var json = new ObjectMapper().readTree(result);
+
+ assertThat(json.get("user").asText()).isEqualTo("John");
+ assertThat(json.get("count").asInt()).isEqualTo(1);
+ assertThat(json.get("nested").get("path").asText()).isEqualTo("/tmp");
+ }
}
diff --git a/flink-sql-runner/pom.xml b/flink-sql-runner/pom.xml
index 94f73954..299ce186 100644
--- a/flink-sql-runner/pom.xml
+++ b/flink-sql-runner/pom.xml
@@ -118,6 +118,12 @@
+
+ com.datasqrl.flinkrunner
+ env-utils
+ ${project.version}
+
+
com.datasqrl.flinkrunner
diff --git a/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/BaseRunner.java b/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/BaseRunner.java
index 20196e8d..eeb9ccbd 100644
--- a/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/BaseRunner.java
+++ b/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/BaseRunner.java
@@ -15,6 +15,7 @@
*/
package com.datasqrl.flinkrunner;
+import com.datasqrl.flinkrunner.utils.EnvVarResolver;
import java.io.File;
import java.io.IOException;
import java.util.function.Supplier;
diff --git a/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/CliRunner.java b/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/CliRunner.java
index 09b225ec..3ff76ac1 100644
--- a/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/CliRunner.java
+++ b/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/CliRunner.java
@@ -15,6 +15,7 @@
*/
package com.datasqrl.flinkrunner;
+import com.datasqrl.flinkrunner.utils.EnvVarResolver;
import java.util.Arrays;
import java.util.concurrent.Callable;
import javax.annotation.Nullable;
@@ -75,7 +76,7 @@ public CliRunner(
@Nullable String planFile,
@Nullable String configDir,
@Nullable String udfPath) {
- this(mode, new EnvVarResolver(), sqlFile, planFile, configDir, udfPath);
+ this(mode, EnvVarResolver.withDeploymentDefaults(), sqlFile, planFile, configDir, udfPath);
}
@VisibleForTesting
diff --git a/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/EnvVarResolver.java b/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/EnvVarResolver.java
deleted file mode 100644
index bf6a3d6f..00000000
--- a/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/EnvVarResolver.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Copyright © 2026 DataSQRL (contact@datasqrl.com)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.datasqrl.flinkrunner;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonDeserializer;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
-
-/** Environment variable resolving functionality. */
-@Slf4j
-public class EnvVarResolver {
-
- private static final Pattern ENVIRONMENT_VARIABLE_PATTERN = Pattern.compile("\\$\\{(.*?)\\}");
-
- private final Map envVars;
- private final ObjectMapper objectMapper;
-
- public EnvVarResolver() {
- this(EnvUtils.getEnvWithDefaults());
- }
-
- public EnvVarResolver(Map envVars) {
- this.envVars = envVars;
- objectMapper = initObjectMapper();
- }
-
- /**
- * Resolves environment variables referenced in a given source string. Searches for environment
- * variable references based on {@link EnvVarResolver#ENVIRONMENT_VARIABLE_PATTERN}. If a blank
- * source string is passed, it will be returned as is.
- *
- * @param src given source string that may contain environment variable references
- * @return a new string with the resolved environment variables
- * @throws IllegalStateException if any referenced environment variable are not available
- */
- public String resolve(String src) {
- if (StringUtils.isBlank(src)) {
- return src;
- }
-
- var res = new StringBuffer();
- // First pass to replace environment variables
- var matcher = ENVIRONMENT_VARIABLE_PATTERN.matcher(src);
- var missingEnvVars = new HashSet();
- while (matcher.find()) {
- var rawKey = matcher.group(1);
- String key;
- String defaultValue = null;
-
- // Support bash-style default values: ${VAR:-default} or ${VAR:=default}
- int splitIdx = rawKey.indexOf(":-");
- if (splitIdx == -1) {
- splitIdx = rawKey.indexOf(":=");
- }
-
- if (splitIdx >= 0) {
- key = rawKey.substring(0, splitIdx);
- defaultValue = rawKey.substring(splitIdx + 2);
- } else {
- key = rawKey;
- }
-
- if (envVars.containsKey(key)) {
- var envValue = envVars.get(key);
- matcher.appendReplacement(res, Matcher.quoteReplacement(envValue));
- } else if (defaultValue != null) {
- matcher.appendReplacement(res, Matcher.quoteReplacement(defaultValue));
- } else {
- missingEnvVars.add(key);
- }
- }
- matcher.appendTail(res);
-
- if (!missingEnvVars.isEmpty()) {
- throw new IllegalStateException(
- String.format(
- "The following environment variables were referenced, but not found: %s",
- missingEnvVars));
- }
-
- return res.toString();
- }
-
- /**
- * Resolves environment variables referenced in a given JSON source string.Searches for
- * environment variable references in any string leaf nodes based on {@link
- * EnvVarResolver#ENVIRONMENT_VARIABLE_PATTERN}.
- *
- * @param jsonSrc given JSON source string that may contain environment variable references
- * @return JSON string with the resolved environment variables
- * @throws IOException if the JSON processing fails in any way
- */
- public String resolveInJson(String jsonSrc) throws IOException {
- var res = objectMapper.readValue(jsonSrc, Map.class);
-
- return objectMapper.writeValueAsString(res);
- }
-
- private ObjectMapper initObjectMapper() {
- var objectMapper = new ObjectMapper();
-
- var module = new SimpleModule();
- module.addDeserializer(String.class, new JsonEnvVarDeserializer());
- objectMapper.registerModule(module);
-
- return objectMapper;
- }
-
- private class JsonEnvVarDeserializer extends JsonDeserializer {
-
- @Override
- public String deserialize(JsonParser parser, DeserializationContext ctx) throws IOException {
- var value = parser.getText();
- return resolve(value);
- }
- }
-}
diff --git a/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/SqrlRunner.java b/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/SqrlRunner.java
index 15b202e1..9b604a41 100644
--- a/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/SqrlRunner.java
+++ b/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/SqrlRunner.java
@@ -15,6 +15,7 @@
*/
package com.datasqrl.flinkrunner;
+import com.datasqrl.flinkrunner.utils.EnvVarResolver;
import javax.annotation.Nullable;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
diff --git a/flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/CliRunnerTest.java b/flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/CliRunnerTest.java
index 82c335ad..d059acbc 100644
--- a/flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/CliRunnerTest.java
+++ b/flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/CliRunnerTest.java
@@ -22,6 +22,7 @@
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
+import com.datasqrl.flinkrunner.utils.EnvVarResolver;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
diff --git a/pom.xml b/pom.xml
index f2dc2930..cb648333 100644
--- a/pom.xml
+++ b/pom.xml
@@ -59,6 +59,7 @@
+ env-utils
formats
stdlib
connectors
@@ -106,6 +107,7 @@
13.5
3.3.4
1.10.2
+ 2.21.3
0.8.14
3.53.0
3.0.0
@@ -139,6 +141,14 @@
+
+ com.fasterxml.jackson
+ jackson-bom
+ ${jackson.version}
+ pom
+ import
+
+
org.junit
junit-bom