diff --git a/.github/workflows/integration-test.yml b/.github/workflows/integration-test.yml new file mode 100644 index 00000000..2b8dc574 --- /dev/null +++ b/.github/workflows/integration-test.yml @@ -0,0 +1,76 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: Integration Tests + +on: + push: + branches: ["main", "master"] + pull_request: + branches: ["main", "master"] + workflow_dispatch: + +env: + CARGO_TERM_COLOR: always + +jobs: + integration-test: + name: Integration Tests + runs-on: ubuntu-latest + timeout-minutes: 30 + + steps: + - name: Checkout Source + uses: actions/checkout@v4 + + - name: Verify Docker + run: docker version + + - name: Setup Python + uses: actions/setup-python@v5 + with: + python-version: "3.11" + cache: "pip" + + - name: Setup Rust + uses: dtolnay/rust-toolchain@stable + + - name: Install System Dependencies + run: | + sudo apt-get update + sudo apt-get install -y --no-install-recommends \ + cmake \ + libssl-dev \ + libcurl4-openssl-dev \ + pkg-config \ + libsasl2-dev \ + protobuf-compiler + + - name: Cache Cargo + uses: Swatinem/rust-cache@v2 + + - name: Build Release Binary + run: make env && make dist + + - name: Pre-pull Kafka Image + run: docker pull apache/kafka:3.7.0 + + - name: Run Integration Tests + run: make integration-test + + - name: Upload Test Logs + if: failure() + uses: actions/upload-artifact@v4 + with: + name: integration-test-logs + path: tests/integration/target/**/logs/ + retention-days: 30 diff --git 
a/.gitignore b/.gitignore index 99a53648..5dffeb0f 100644 --- a/.gitignore +++ b/.gitignore @@ -33,3 +33,8 @@ python/**/target/ **/**.egg-info .cache/ /.cursor/worktrees.json + +# Integration test output +tests/integration/target/ +tests/integration/.venv/ +tests/integration/install diff --git a/Makefile b/Makefile index 4daf185b..c8e1da4d 100644 --- a/Makefile +++ b/Makefile @@ -42,7 +42,7 @@ C_0 := \033[0m log = @printf "$(C_B)[-]$(C_0) %-15s %s\n" "$(1)" "$(2)" success = @printf "$(C_G)[✔]$(C_0) %s\n" "$(1)" -.PHONY: all help build build-lite dist dist-lite clean test env env-clean go-sdk-env go-sdk-build go-sdk-clean docker docker-run docker-push .check-env .build-wasm +.PHONY: all help build build-lite dist dist-lite clean test env env-clean go-sdk-env go-sdk-build go-sdk-clean docker docker-run docker-push .check-env .build-wasm integration-test all: build @@ -63,6 +63,8 @@ help: @echo " docker-run Run container (port 8080, mount logs)" @echo " docker-push Push image to registry" @echo "" + @echo " integration-test Run integration tests (delegates to tests/integration)" + @echo "" @echo " Version: $(VERSION) | Arch: $(ARCH) | OS: $(OS)" build: .check-env .build-wasm @@ -145,6 +147,7 @@ clean: @cargo clean @rm -rf $(DIST_ROOT) data logs @./scripts/clean.sh 2>/dev/null || true + @$(MAKE) -C tests/integration clean 2>/dev/null || true $(call success,Done) .check-env: @@ -167,3 +170,6 @@ docker-push: $(call log,DOCKER,Pushing $(IMAGE_NAME)) @docker push $(IMAGE_NAME) $(call success,Push Complete) + +integration-test: + @$(MAKE) -C tests/integration test PYTEST_ARGS="$(PYTEST_ARGS)" diff --git a/src/runtime/streaming/operators/grouping/incremental_aggregate.rs b/src/runtime/streaming/operators/grouping/incremental_aggregate.rs index 625cdee5..43e0e657 100644 --- a/src/runtime/streaming/operators/grouping/incremental_aggregate.rs +++ b/src/runtime/streaming/operators/grouping/incremental_aggregate.rs @@ -465,7 +465,7 @@ impl IncrementalAggregatingFunc { state }); - 
for (idx, v) in agg.state_cols.iter().zip(state.into_iter()) { + for (idx, v) in agg.state_cols.iter().zip(state) { states[*idx].push(v); } } @@ -543,7 +543,7 @@ impl IncrementalAggregatingFunc { } } - let mut cols = self.key_converter.convert_rows(rows.into_iter())?; + let mut cols = self.key_converter.convert_rows(rows)?; cols.push(Arc::new(accumulator_builder.finish())); cols.push(Arc::new(args_row_builder.finish())); cols.push(Arc::new(count_builder.finish())); @@ -612,7 +612,7 @@ impl IncrementalAggregatingFunc { mem::take(&mut self.updated_keys).into_iter().unzip(); let mut deleted_keys = vec![]; - for (k, retract) in updated_keys.iter().zip(updated_values.into_iter()) { + for (k, retract) in updated_keys.iter().zip(updated_values) { let append = self.evaluate(&k.0)?; if let Some(v) = retract { diff --git a/src/runtime/wasm/processor/wasm/wasm_task.rs b/src/runtime/wasm/processor/wasm/wasm_task.rs index 2837c9a0..c61f385f 100644 --- a/src/runtime/wasm/processor/wasm/wasm_task.rs +++ b/src/runtime/wasm/processor/wasm/wasm_task.rs @@ -392,10 +392,27 @@ impl WasmTask { ) -> ControlAction { match signal { TaskControlSignal::Start { completion_flag } => { - for input in inputs.iter_mut() { - let _ = input.start(); + for (idx, input) in inputs.iter_mut().enumerate() { + if let Err(e) = input.start() { + let msg = format!("Failed to start input {}: {}", idx, e); + log::error!("{}", msg); + *failure_cause.lock().unwrap() = Some(msg.clone()); + *shared_state.lock().unwrap() = + ComponentState::Error { error: msg.clone() }; + *execution_state.lock().unwrap() = ExecutionState::Failed; + completion_flag.mark_error(msg); + return ControlAction::Pause; + } + } + if let Err(e) = processor.start_outputs() { + let msg = format!("Failed to start outputs: {}", e); + log::error!("{}", msg); + *failure_cause.lock().unwrap() = Some(msg.clone()); + *shared_state.lock().unwrap() = ComponentState::Error { error: msg.clone() }; + *execution_state.lock().unwrap() = 
ExecutionState::Failed; + completion_flag.mark_error(msg); + return ControlAction::Pause; } - let _ = processor.start_outputs(); *state = TaskState::Running; *shared_state.lock().unwrap() = ComponentState::Running; completion_flag.mark_completed(); diff --git a/src/server/handler.rs b/src/server/handler.rs index 82ccb803..0319e352 100644 --- a/src/server/handler.rs +++ b/src/server/handler.rs @@ -91,6 +91,25 @@ impl FunctionStreamServiceImpl { } } + fn classify_error(message: &str) -> StatusCode { + let lower = message.to_lowercase(); + if lower.contains("not found") || lower.contains("not exist") { + StatusCode::NotFound + } else if lower.contains("uniqueness violation") + || lower.contains("already exists") + || lower.contains("duplicate") + { + StatusCode::Conflict + } else if lower.contains("invalid") + || lower.contains("unsupported") + || lower.contains("missing") + { + StatusCode::BadRequest + } else { + StatusCode::InternalServerError + } + } + async fn execute_statement( &self, stmt: &dyn Statement, @@ -101,7 +120,8 @@ impl FunctionStreamServiceImpl { if result.success { Self::build_success_response(success_status, result.message, result.data) } else { - Self::build_error_response(StatusCode::InternalServerError, result.message) + let status = Self::classify_error(&result.message); + Self::build_error_response(status, result.message) } } } @@ -139,8 +159,9 @@ impl FunctionStreamService for FunctionStreamServiceImpl { if !result.success { error!("SQL execution aborted: {}", result.message); + let status = Self::classify_error(&result.message); return Ok(TonicResponse::new(Self::build_error_response( - StatusCode::InternalServerError, + status, result.message, ))); } @@ -235,8 +256,9 @@ impl FunctionStreamService for FunctionStreamServiceImpl { if !result.success { error!("show_functions execution failed: {}", result.message); + let status = Self::classify_error(&result.message); return Ok(TonicResponse::new(ShowFunctionsResponse { - status_code: 
StatusCode::InternalServerError as i32, + status_code: status as i32, message: result.message, functions: vec![], })); diff --git a/src/sql/logical_planner/streaming_planner.rs b/src/sql/logical_planner/streaming_planner.rs index e501695d..4619fb3f 100644 --- a/src/sql/logical_planner/streaming_planner.rs +++ b/src/sql/logical_planner/streaming_planner.rs @@ -341,7 +341,7 @@ impl PlanToGraphVisitor<'_> { let node_index = self.graph.add_node(execution_unit); self.add_index_to_traversal(node_index); - for (source, edge) in input_nodes.into_iter().zip(routing_edges.into_iter()) { + for (source, edge) in input_nodes.into_iter().zip(routing_edges) { self.graph.add_edge(source, node_index, edge); } diff --git a/src/sql/types/data_type.rs b/src/sql/types/data_type.rs index 070324d5..387a4190 100644 --- a/src/sql/types/data_type.rs +++ b/src/sql/types/data_type.rs @@ -98,14 +98,11 @@ fn convert_simple_data_type( } }, SQLDataType::Date => Ok(DataType::Date32), - SQLDataType::Time(None, tz_info) => { + SQLDataType::Time(None, tz_info) if matches!(tz_info, TimezoneInfo::None) - || matches!(tz_info, TimezoneInfo::WithoutTimeZone) - { - Ok(DataType::Time64(TimeUnit::Nanosecond)) - } else { - return plan_err!("Unsupported SQL type {sql_type:?}"); - } + || matches!(tz_info, TimezoneInfo::WithoutTimeZone) => + { + Ok(DataType::Time64(TimeUnit::Nanosecond)) } SQLDataType::Numeric(exact_number_info) | SQLDataType::Decimal(exact_number_info) => { let (precision, scale) = match *exact_number_info { diff --git a/tests/integration/Makefile b/tests/integration/Makefile new file mode 100644 index 00000000..f16d640b --- /dev/null +++ b/tests/integration/Makefile @@ -0,0 +1,64 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# ----------------------------------------------------------------------- +# Integration Test Makefile +# ----------------------------------------------------------------------- +# Usage: +# make test — Setup env + run pytest (PYTEST_ARGS="-k xxx") +# make clean — Remove .venv and test output +# +# Prerequisites: +# The FunctionStream binary must already be built (make build / make build-lite +# from the project root). +# ----------------------------------------------------------------------- + +PROJECT_ROOT := $(shell git -C $(CURDIR) rev-parse --show-toplevel) +PYTHON_ROOT := $(PROJECT_ROOT)/python +VENV := $(CURDIR)/.venv +PIP := $(VENV)/bin/pip +PY := $(VENV)/bin/python + +C_G := \033[0;32m +C_B := \033[0;34m +C_0 := \033[0m + +log = @printf "$(C_B)[-]$(C_0) %-12s %s\n" "$(1)" "$(2)" +success = @printf "$(C_G)[✔]$(C_0) %s\n" "$(1)" + +.PHONY: test clean help + +help: + @echo "Integration Test Targets:" + @echo "" + @echo " test Setup Python env + run pytest (PYTEST_ARGS=...)" + @echo " clean Remove .venv and target/tests output" + +install: requirements.txt $(PYTHON_ROOT)/functionstream-api/pyproject.toml $(PYTHON_ROOT)/functionstream-client/pyproject.toml + $(call log,ENV,Setting up Python virtual environment) + @test -d $(VENV) || python3 -m venv $(VENV) + @$(PIP) install --quiet --upgrade pip + @$(PIP) install --quiet -r requirements.txt + @touch $@ + $(call success,Python environment ready) + +test: install + $(call log,TEST,Running integration tests) + @$(PY) -m pytest -v $(PYTEST_ARGS) + $(call success,All integration tests passed) + +clean: + 
$(call log,CLEAN,Removing test artifacts) + @rm -rf $(VENV) + @rm -rf $(CURDIR)/target + @rm -rf $(CURDIR)/install + $(call success,Clean complete) diff --git a/tests/integration/README.md b/tests/integration/README.md new file mode 100644 index 00000000..23096720 --- /dev/null +++ b/tests/integration/README.md @@ -0,0 +1,88 @@ +# Integration Tests + +## Prerequisites + +| Dependency | Version | Purpose | +|------------|----------|-------------------------------------------------| +| Python | >= 3.9 | Test framework runtime | +| Rust | stable | Build the FunctionStream binary | +| Docker | >= 20.10 | Run a Kafka broker for streaming integration tests | + +> **Docker is required.** The test framework automatically pulls and manages +> an `apache/kafka:3.7.0` container in KRaft mode to provide a real Kafka +> broker for tests that involve Kafka input/output. Tests will fail if the +> Docker daemon is not running. + +## Quick Start + +```bash +# From the project root +make build # Build the release binary (with --features python) +make integration-test + +# Or run directly from this directory +cd tests/integration +make test +``` + +## Directory Layout + +``` +tests/integration/ +├── Makefile # test / clean targets +├── requirements.txt # Python dependencies (pytest, grpcio, docker, ...) 
+├── pytest.ini # Pytest configuration +├── framework/ # Reusable test infrastructure +│ ├── instance.py # FunctionStreamInstance facade +│ ├── workspace.py # Per-test directory management +│ ├── config.py # Server config generation +│ ├── process.py # OS process lifecycle (start/stop/kill) +│ ├── utils.py # Port allocation, readiness probes +│ └── kafka_manager.py # Docker-managed Kafka broker (KRaft mode) +├── test/ # Test suites +│ ├── wasm/ # WASM function tests +│ │ └── python_sdk/ # Python SDK integration tests +│ └── streaming/ # Streaming engine tests (future) +└── target/ # Test output (git-ignored) + ├── .shared_cache/ # Shared WASM compilation cache across tests + └── ////logs/ +``` + +## Test Output + +Each test gets an isolated server instance with its own log directory: + +``` +target/wasm/python_sdk/TestFunctionLifecycle/test_full_lifecycle_transitions/20260416_221655/ + logs/ + app.log # FunctionStream application log + stdout.log # Server stdout + stderr.log # Server stderr +``` + +Only `logs/` is retained after tests complete; `conf/` and `data/` are +automatically cleaned up. + +## Python Dependencies + +All Python packages are listed in `requirements.txt` and installed +automatically by `make test`. 
Key dependencies: + +- `pytest` — test runner +- `grpcio` / `protobuf` — gRPC client communication +- `docker` — Docker SDK for managing the Kafka container +- `confluent-kafka` — Kafka admin client for topic management +- `functionstream-api` / `functionstream-client` — local editable installs + +## Running Specific Tests + +```bash +# Single test +make test PYTEST_ARGS="-k test_full_lifecycle_transitions" + +# Single file +make test PYTEST_ARGS="test/wasm/python_sdk/test_lifecycle.py" + +# Verbose with live log +make test PYTEST_ARGS="-v --log-cli-level=DEBUG" +``` diff --git a/tests/integration/framework/__init__.py b/tests/integration/framework/__init__.py new file mode 100644 index 00000000..b735753a --- /dev/null +++ b/tests/integration/framework/__init__.py @@ -0,0 +1,22 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from .instance import FunctionStreamInstance + +__all__ = ["FunctionStreamInstance", "KafkaDockerManager"] + + +def __getattr__(name: str): + if name == "KafkaDockerManager": + from .kafka_manager import KafkaDockerManager + return KafkaDockerManager + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") diff --git a/tests/integration/framework/config.py b/tests/integration/framework/config.py new file mode 100644 index 00000000..e086a4b0 --- /dev/null +++ b/tests/integration/framework/config.py @@ -0,0 +1,217 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +InstanceConfig: builds and writes the config.yaml consumed by +the FunctionStream binary via FUNCTION_STREAM_CONF. +""" + +import logging +import os +import tempfile +from pathlib import Path +from typing import Any, Dict, List + +import yaml + +from .workspace import InstanceWorkspace + +logger = logging.getLogger(__name__) + + +class ConfigurationError(Exception): + """Base exception for all configuration-related errors.""" + pass + + +class WasmRuntimeNotFoundError(ConfigurationError): + """Raised when the required Python WASM runtime cannot be located.""" + pass + + +class PathManager: + """ + Utility class for resolving and managing cross-platform paths. + All outputs are standardized to POSIX format for safe YAML serialization. 
+ """ + + _INTEGRATION_DIR = Path(__file__).resolve().parents[1] + _PROJECT_ROOT = _INTEGRATION_DIR.parents[1] + + @classmethod + def get_target_dir(cls) -> Path: + """Return the integration-test workspace root (tests/integration/target).""" + return cls._INTEGRATION_DIR / "target" + + @classmethod + def get_binary_path(cls) -> Path: + """Locate the compiled function-stream binary under the project target dir.""" + import platform + import struct + + system = platform.system() + machine = platform.machine().lower() + arch_map = {"arm64": "aarch64", "amd64": "x86_64"} + arch = arch_map.get(machine, machine) + + if system == "Linux": + triple = f"{arch}-unknown-linux-gnu" + elif system == "Darwin": + triple = f"{arch}-apple-darwin" + elif system == "Windows": + triple = f"{arch}-pc-windows-msvc" + else: + triple = f"{arch}-unknown-linux-gnu" + + binary_name = "function-stream.exe" if system == "Windows" else "function-stream" + candidate = cls._PROJECT_ROOT / "target" / triple / "release" / binary_name + + if candidate.is_file(): + return candidate + + fallback = cls._PROJECT_ROOT / "target" / "release" / binary_name + if fallback.is_file(): + return fallback + + raise FileNotFoundError( + f"Cannot find function-stream binary. Checked:\n" + f" - {candidate}\n" + f" - {fallback}\n" + f"Build first with: make build (or make build-lite)" + ) + + @classmethod + def get_shared_cache_dir(cls) -> Path: + return cls._INTEGRATION_DIR / "target" / ".shared_cache" + + @classmethod + def build_posix_path(cls, base: Path, *segments: str) -> str: + """ + Safely joins a base Path with string segments and returns a POSIX-compliant string. + Eliminates manual string concatenation and Windows backslash issues. 
+ """ + target_path = base + for segment in segments: + target_path = target_path / segment + return target_path.as_posix() + + @classmethod + def find_python_wasm_posix(cls) -> str: + """Locates the Python WASM runtime and returns its POSIX path.""" + candidates: List[Path] = [ + cls._PROJECT_ROOT / "python" / "functionstream-runtime" / "target" / "functionstream-python-runtime.wasm", + cls._PROJECT_ROOT / "dist" / "function-stream" / "data" / "cache" / "python-runner" / "functionstream-python-runtime.wasm", + ] + + for candidate in candidates: + if candidate.is_file(): + logger.debug("Found Python WASM runtime at: %s", candidate) + return candidate.resolve().as_posix() + + raise WasmRuntimeNotFoundError( + "Could not find python-runtime.wasm. Checked locations:\n" + + "\n".join(f" - {c}" for c in candidates) + ) + + +class InstanceConfig: + """ + Generates and persists config.yaml for one FunctionStream instance. + """ + + def __init__(self, host: str, port: int, workspace: InstanceWorkspace): + self._workspace = workspace + + wasm_path = PathManager.find_python_wasm_posix() + cache_dir = PathManager.build_posix_path(PathManager.get_shared_cache_dir(), "python-runner") + log_file = PathManager.build_posix_path(workspace.log_dir, "app.log") + task_db_path = PathManager.build_posix_path(workspace.data_dir, "task") + catalog_db_path = PathManager.build_posix_path(workspace.data_dir, "stream_catalog") + + self._config: Dict[str, Any] = { + "service": { + "service_id": f"it-{port}", + "service_name": "function-stream", + "version": "1.0.0", + "host": host, + "port": port, + "debug": False, + }, + "logging": { + "level": "info", + "format": "json", + "file_path": log_file, + "max_file_size": 50, + "max_files": 3, + }, + "python": { + "wasm_path": wasm_path, + "cache_dir": cache_dir, + "enable_cache": True, + }, + "state_storage": { + "storage_type": "memory", + }, + "task_storage": { + "storage_type": "rocksdb", + "db_path": task_db_path, + }, + "stream_catalog": { + 
"persist": True, + "db_path": catalog_db_path, + }, + } + + @property + def raw(self) -> Dict[str, Any]: + return self._config.copy() + + def override(self, overrides: Dict[str, Any]) -> None: + """Apply overrides using dot-separated keys.""" + for dotted_key, value in overrides.items(): + keys = dotted_key.split(".") + target = self._config + + for k in keys[:-1]: + target = target.setdefault(k, {}) + if not isinstance(target, dict): + raise ConfigurationError(f"Cannot override key '{dotted_key}': '{k}' is not a dictionary.") + + target[keys[-1]] = value + + def write_to_workspace(self) -> Path: + """Serialize config to the workspace config.yaml safely.""" + target_file = Path(self._workspace.config_file) + target_file.parent.mkdir(parents=True, exist_ok=True) + + fd, temp_path = tempfile.mkstemp( + dir=target_file.parent, + prefix=".config-", + suffix=".yaml", + text=True + ) + + try: + with os.fdopen(fd, "w", encoding="utf-8") as f: + yaml.safe_dump( + self._config, + f, + default_flow_style=False, + sort_keys=False + ) + os.replace(temp_path, target_file) + except Exception as e: + Path(temp_path).unlink(missing_ok=True) + logger.error("Failed to write configuration file: %s", e) + raise ConfigurationError(f"Configuration write failed: {e}") from e + + return target_file \ No newline at end of file diff --git a/tests/integration/framework/instance.py b/tests/integration/framework/instance.py new file mode 100644 index 00000000..e30415db --- /dev/null +++ b/tests/integration/framework/instance.py @@ -0,0 +1,222 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +""" +FunctionStreamInstance: the Facade that composes workspace, config, +and process into a single easy-to-use API for test cases. +""" + +import logging +from pathlib import Path +from types import TracebackType +from typing import TYPE_CHECKING, Any, Optional, Type + +from .config import InstanceConfig, PathManager +from .process import FunctionStreamProcess +from .utils import find_free_port, wait_for_port +from .workspace import InstanceWorkspace + +if TYPE_CHECKING: + from fs_client import FsClient + from fs_client._proto.function_stream_pb2 import SqlResponse + + +class FunctionStreamInstanceError(Exception): + """Base exception for FunctionStream instance errors.""" + pass + + +class ServerStartupError(FunctionStreamInstanceError): + """Raised when the server fails to start or bind to the port within the timeout.""" + pass + + +class MissingDependencyError(FunctionStreamInstanceError): + """Raised when an optional client dependency (e.g., grpc) is missing.""" + pass + + +class FunctionStreamInstance: + """ + Facade for a single FunctionStream server used in integration tests. + + Designed to be used as a context manager to guarantee resource cleanup. 
+ + Usage: + with FunctionStreamInstance("my_test").configure(**{"service.debug": True}) as inst: + client = inst.get_client() + inst.execute_sql("SELECT 1;") + """ + + def __init__( + self, + test_name: str = "unnamed", + host: str = "127.0.0.1", + binary_path: Optional[Path] = None, + ): + self.test_name = test_name + self.host = host + self.port = find_free_port() + + self._logger = logging.getLogger(f"{__name__}.{self.test_name}-{self.port}") + + actual_binary = binary_path or PathManager.get_binary_path() + target_dir = PathManager.get_target_dir() + + self.workspace = InstanceWorkspace(target_dir, test_name, self.port) + self.config = InstanceConfig(self.host, self.port, self.workspace) + self.process = FunctionStreamProcess(actual_binary, self.workspace) + + # ------------------------------------------------------------------ + # Context Manager Protocol (Replaces __del__) + # ------------------------------------------------------------------ + + def __enter__(self) -> "FunctionStreamInstance": + """Start the instance when entering the 'with' block.""" + self.start() + return self + + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional[TracebackType], + ) -> None: + """Ensure absolute cleanup when exiting the 'with' block.""" + self._logger.debug("Tearing down instance due to context exit.") + self.kill() + + # ------------------------------------------------------------------ + # Properties + # ------------------------------------------------------------------ + + @property + def is_running(self) -> bool: + return self.process.is_running + + @property + def pid(self) -> Optional[int]: + return self.process.pid + + # ------------------------------------------------------------------ + # Configuration (before start) + # ------------------------------------------------------------------ + + def configure(self, **overrides: Any) -> "FunctionStreamInstance": + """ + Apply config overrides. 
Chainable. + Example: inst.configure(service__debug=True).start() + """ + self.config.override(overrides) + return self + + # ------------------------------------------------------------------ + # Lifecycle + # ------------------------------------------------------------------ + + def start(self, timeout: float = 30.0) -> "FunctionStreamInstance": + """Prepare workspace, write config, launch binary, wait for ready.""" + if self.is_running: + self._logger.debug("Instance is already running. Skipping start.") + return self + + self.workspace.setup() + self.config.write_to_workspace() + + self._logger.info( + "Starting FunctionStream [port=%d, dir=%s]", + self.port, + self.workspace.root_dir, + ) + self.process.start() + + if not wait_for_port(self.host, self.port, timeout): + stderr_tail = self._read_tail(self.workspace.stderr_file) + self.process.kill() + raise ServerStartupError( + f"Server did not become ready within {timeout}s on port {self.port}.\n" + f"Process stderr tail:\n{stderr_tail}" + ) + + self._logger.info("FunctionStream ready [pid=%s]", self.pid) + return self + + def stop(self, timeout: float = 10.0) -> None: + """Graceful SIGTERM shutdown, then remove everything except logs.""" + if self.is_running: + self._logger.debug("Stopping instance gracefully...") + self.process.stop(timeout=timeout) + self.workspace.cleanup() + + def kill(self) -> None: + """Immediate SIGKILL, then remove everything except logs.""" + if self.is_running: + self._logger.debug("Killing instance immediately...") + self.process.kill() + self.workspace.cleanup() + + def restart(self, timeout: float = 10.0) -> "FunctionStreamInstance": + """Stop then start (same port, same workspace).""" + self._logger.info("Restarting instance...") + self.stop(timeout=timeout) + return self.start() + + # ------------------------------------------------------------------ + # Client access + # ------------------------------------------------------------------ + + def get_client(self) -> "FsClient": 
+ """Return a connected FsClient. Caller should close it when done.""" + try: + from fs_client import FsClient + except ImportError as e: + raise MissingDependencyError( + "functionstream-client is not installed. " + "Run: pip install -e python/functionstream-client" + ) from e + return FsClient(host=self.host, port=self.port) + + def execute_sql(self, sql: str, timeout: float = 30.0) -> "SqlResponse": + """Convenience helper: send a SQL statement via gRPC ExecuteSql.""" + try: + import grpc + from fs_client._proto import function_stream_pb2, function_stream_pb2_grpc + except ImportError as e: + raise MissingDependencyError( + "gRPC dependencies or proto definitions are not available." + ) from e + + # Use context manager for channel to prevent socket leaks + with grpc.insecure_channel(f"{self.host}:{self.port}") as channel: + stub = function_stream_pb2_grpc.FunctionStreamServiceStub(channel) + request = function_stream_pb2.SqlRequest(sql=sql) + self._logger.debug("Executing SQL: %s", sql) + return stub.ExecuteSql(request, timeout=timeout) + + # ------------------------------------------------------------------ + # Internals + # ------------------------------------------------------------------ + + @staticmethod + def _read_tail(path: Path, chars: int = 2000) -> str: + """Reads the end of a file safely, useful for pulling crash logs.""" + if not path.is_file(): + return "" + try: + text = path.read_text(errors="replace") + return text[-chars:] if len(text) > chars else text + except Exception as e: + return f"" + + def __repr__(self) -> str: + status = "running" if self.is_running else "stopped" + return f"" \ No newline at end of file diff --git a/tests/integration/framework/kafka_manager.py b/tests/integration/framework/kafka_manager.py new file mode 100644 index 00000000..e495f638 --- /dev/null +++ b/tests/integration/framework/kafka_manager.py @@ -0,0 +1,304 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file 
"""
Docker-managed Kafka broker for integration tests.

Provides automated image pull, idempotent container start, health check,
topic lifecycle management, and data cleanup via KRaft-mode single-node Kafka.
"""

import logging
import time
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional, TypeVar

import docker
from docker.errors import APIError, DockerException, NotFound
from confluent_kafka import Consumer, KafkaError, KafkaException
from confluent_kafka import TopicPartition as _new_topic_partition
from confluent_kafka.admin import AdminClient, NewTopic

logger = logging.getLogger(__name__)

T = TypeVar("T")


class KafkaDockerManagerError(Exception):
    """Base exception for KafkaDockerManager errors."""
    pass


class KafkaReadinessTimeoutError(KafkaDockerManagerError):
    """Raised when Kafka fails to become ready within the timeout."""
    pass


@dataclass(frozen=True)
class KafkaConfig:
    """Configuration for the Kafka Docker container (single-node KRaft mode)."""
    image: str = "apache/kafka:3.7.0"                    # broker image to run
    container_name: str = "fs-integration-kafka-broker"  # stable name allows idempotent reuse
    bootstrap_servers: str = "127.0.0.1:9092"            # address advertised to host-side clients
    internal_port: int = 9092                            # PLAINTEXT listener port
    controller_port: int = 9093                          # KRaft controller listener port
    cluster_id: str = "fs-integration-test-cluster-01"
    readiness_timeout_sec: int = 60                      # max seconds for readiness probes

    @property
    def environment_vars(self) -> Dict[str, str]:
        """Environment for a combined broker+controller KRaft node (no ZooKeeper)."""
        return {
            "KAFKA_NODE_ID": "1",
            "KAFKA_PROCESS_ROLES": "broker,controller",
            "KAFKA_CONTROLLER_LISTENER_NAMES": "CONTROLLER",
            "KAFKA_LISTENERS": (
                f"PLAINTEXT://0.0.0.0:{self.internal_port},"
                f"CONTROLLER://0.0.0.0:{self.controller_port}"
            ),
            "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": (
                "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
            ),
            "KAFKA_ADVERTISED_LISTENERS": f"PLAINTEXT://{self.bootstrap_servers}",
            "KAFKA_CONTROLLER_QUORUM_VOTERS": f"1@localhost:{self.controller_port}",
            # Single node => replication factor / min ISR must be 1.
            "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR": "1",
            "KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR": "1",
            "KAFKA_TRANSACTION_STATE_LOG_MIN_ISR": "1",
            "CLUSTER_ID": self.cluster_id,
        }


class KafkaDockerManager:
    """
    Manages a single-node Kafka broker inside a Docker container (KRaft mode).

    Designed to be stateless with respect to topics. Highly recommended to use
    as a context manager to ensure proper cleanup.

    Usage::

        with KafkaDockerManager() as mgr:
            mgr.create_topics_if_not_exist(["input-topic", "output-topic"])
            # Run tests...
            mgr.clear_all_topics()
    """

    def __init__(
        self,
        config: Optional[KafkaConfig] = None,
        docker_client: Optional[docker.DockerClient] = None,
    ) -> None:
        self.config = config or KafkaConfig()
        # Dependency Injection: allow passing an existing client, or create a lazy one.
        self._docker_client = docker_client

    @property
    def docker_client(self) -> docker.DockerClient:
        """Lazy initialization of the Docker client.

        Raises:
            KafkaDockerManagerError: if the Docker daemon is unreachable.
        """
        if self._docker_client is None:
            try:
                self._docker_client = docker.from_env()
            except DockerException as e:
                raise KafkaDockerManagerError(f"Failed to connect to Docker daemon: {e}") from e
        return self._docker_client

    def _admin(self) -> AdminClient:
        """Build an AdminClient bound to this broker (single construction point)."""
        return AdminClient({"bootstrap.servers": self.config.bootstrap_servers})

    # ------------------------------------------------------------------
    # Context Manager Protocol
    # ------------------------------------------------------------------

    def __enter__(self) -> "KafkaDockerManager":
        self.setup_kafka()
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        self.teardown_kafka()

    # ------------------------------------------------------------------
    # Full setup / teardown
    # ------------------------------------------------------------------

    def setup_kafka(self) -> None:
        """Pull image -> start container -> wait for readiness."""
        logger.info("Setting up Kafka broker (KRaft)...")
        self._ensure_image()
        self._ensure_container()
        self._wait_for_readiness()
        logger.info("Kafka setup complete. Broker is ready.")

    def teardown_kafka(self) -> None:
        """Stop and gracefully remove the Kafka container.

        Best-effort: any error is logged, never raised, so teardown cannot
        mask a test failure.
        """
        try:
            container = self.docker_client.containers.get(self.config.container_name)
            logger.info("Stopping Kafka container '%s'...", self.config.container_name)
            container.stop(timeout=5)  # Give Kafka a few seconds for graceful shutdown
        except NotFound:
            logger.debug("Container '%s' not found during teardown.", self.config.container_name)
        except APIError as exc:
            logger.warning("Docker API error while stopping Kafka: %s", exc)
        except Exception as exc:
            logger.error("Unexpected error during teardown: %s", exc)

    # ------------------------------------------------------------------
    # Docker Operations
    # ------------------------------------------------------------------

    def _ensure_image(self) -> None:
        """Pull the broker image only if it is not already present locally."""
        try:
            self.docker_client.images.get(self.config.image)
            logger.debug("Image '%s' already present locally.", self.config.image)
        except NotFound:
            logger.info("Pulling Kafka image '%s' (this may take a while)...", self.config.image)
            self.docker_client.images.pull(self.config.image)
            logger.info("Image pulled successfully.")

    def _ensure_container(self) -> None:
        """Idempotently start the named container, creating it if absent."""
        try:
            container = self.docker_client.containers.get(self.config.container_name)
            if container.status != "running":
                logger.info("Starting existing container '%s'...", self.config.container_name)
                container.start()
            else:
                logger.debug("Container '%s' is already running.", self.config.container_name)
        except NotFound:
            logger.info("Creating and starting new Kafka container '%s'...", self.config.container_name)
            self.docker_client.containers.run(
                image=self.config.image,
                name=self.config.container_name,
                ports={f"{self.config.internal_port}/tcp": self.config.internal_port},
                environment=self.config.environment_vars,
                detach=True,
                remove=True,  # Auto-remove on stop
            )

    # ------------------------------------------------------------------
    # Readiness Probes
    # ------------------------------------------------------------------

    def _retry_until_ready(
        self,
        action: Callable[[], bool],
        timeout_msg: str,
        interval: float = 1.0
    ) -> None:
        """Generic polling mechanism with timeout.

        Raises:
            KafkaReadinessTimeoutError: if `action` never returns True in time.
        """
        deadline = time.time() + self.config.readiness_timeout_sec
        while time.time() < deadline:
            if action():
                return
            time.sleep(interval)
        raise KafkaReadinessTimeoutError(f"{timeout_msg} after {self.config.readiness_timeout_sec}s.")

    def _wait_for_readiness(self) -> None:
        """Wait for AdminClient to list topics and Coordinator to be ready."""
        logger.info("Waiting for Kafka API to become responsive at %s...", self.config.bootstrap_servers)

        # Construct the client once; reconnection attempts are handled
        # internally by librdkafka, so rebuilding it per poll is wasted work.
        admin = self._admin()

        def _is_api_ready() -> bool:
            try:
                admin.list_topics(timeout=2)
                return True
            except KafkaException:
                return False

        self._retry_until_ready(_is_api_ready, "Kafka API did not become responsive")

        logger.info("Broker API is up. Verifying group coordinator...")
        self._wait_for_group_coordinator()

    def _wait_for_group_coordinator(self) -> None:
        """Ensures __consumer_offsets is initialized and coordinator is ready."""
        def _is_coordinator_ready() -> bool:
            consumer = None
            try:
                consumer = Consumer({
                    "bootstrap.servers": self.config.bootstrap_servers,
                    "group.id": "__readiness_probe__",
                    "session.timeout.ms": "6000",
                })
                # Attempt to read committed offsets to trigger coordinator interaction
                consumer.committed([_new_topic_partition("__consumer_offsets", 0)], timeout=2)
                return True
            except KafkaException:
                return False
            except Exception as e:
                logger.debug("Unexpected error during coordinator probe: %s", e)
                return False
            finally:
                if consumer is not None:
                    consumer.close()

        self._retry_until_ready(_is_coordinator_ready, "Group coordinator did not become ready")

    # ------------------------------------------------------------------
    # Topic management
    # ------------------------------------------------------------------

    def create_topic(
        self,
        topic_name: str,
        num_partitions: int = 1,
        replication_factor: int = 1,
    ) -> None:
        """Create a Kafka topic idempotently (TOPIC_ALREADY_EXISTS is not an error)."""
        new_topic = NewTopic(
            topic_name,
            num_partitions=num_partitions,
            replication_factor=replication_factor,
        )

        futures = self._admin().create_topics([new_topic], operation_timeout=5)
        for topic, future in futures.items():
            try:
                future.result()
                logger.info("Created topic '%s'.", topic)
            except KafkaException as exc:
                # KafkaException wraps a KafkaError in args[0]; guard the access
                # so a malformed exception cannot raise a secondary IndexError.
                kafka_error = exc.args[0] if exc.args else None
                if kafka_error is not None and kafka_error.code() == KafkaError.TOPIC_ALREADY_EXISTS:
                    logger.debug("Topic '%s' already exists; skipping.", topic)
                else:
                    logger.error("Failed to create topic '%s': %s", topic, kafka_error)
                    raise
            except Exception as exc:
                logger.error("Unexpected error creating topic '%s': %s", topic, exc)
                raise

    def create_topics_if_not_exist(
        self, topic_names: List[str], num_partitions: int = 1
    ) -> None:
        """Batch-create topics idempotently."""
        for topic in topic_names:
            self.create_topic(topic, num_partitions=num_partitions)

    def clear_all_topics(self) -> None:
        """Delete every non-internal topic (fast data reset between tests).

        Best-effort: per-topic and global failures are logged, not raised.
        """
        admin = self._admin()
        try:
            metadata = admin.list_topics(timeout=5)
            # Internal topics (e.g. __consumer_offsets) start with "__"; keep them.
            to_delete = [
                t for t in metadata.topics if not t.startswith("__")
            ]
            if not to_delete:
                logger.debug("No user topics to clean up.")
                return

            logger.info("Deleting topics: %s", to_delete)
            futures = admin.delete_topics(to_delete, operation_timeout=5)

            for topic, fut in futures.items():
                try:
                    fut.result()
                    logger.debug("Deleted topic '%s'.", topic)
                except KafkaException as exc:
                    logger.warning("Failed to delete topic '%s': %s", topic, exc.args[0] if exc.args else exc)
        except Exception as exc:
            logger.warning("Topic cleanup process encountered an error: %s", exc)
+""" + +import logging +import os +import signal +import subprocess +import sys +from pathlib import Path +from typing import IO, Any, Dict, Optional + +from .workspace import InstanceWorkspace + +logger = logging.getLogger(__name__) + + +class ProcessManagerError(Exception): + """Base exception for process management errors.""" + pass + + +class BinaryNotFoundError(ProcessManagerError): + """Raised when the target binary executable does not exist.""" + pass + + +class ProcessAlreadyRunningError(ProcessManagerError): + """Raised when attempting to start a process that is already running.""" + pass + + +class FunctionStreamProcess: + """ + Manages the lifecycle of a single FunctionStream OS process safely. + Fully cross-platform compatible (Windows, macOS, Linux). + """ + + def __init__(self, binary_path: Path, workspace: InstanceWorkspace): + self._binary = binary_path.resolve() + self._workspace = workspace + self._process: Optional[subprocess.Popen[Any]] = None + self._stdout_fh: Optional[IO[Any]] = None + self._stderr_fh: Optional[IO[Any]] = None + + @property + def pid(self) -> Optional[int]: + return self._process.pid if self._process else None + + @property + def is_running(self) -> bool: + """Check if the process is currently running without blocking.""" + if self._process is None: + return False + # poll() returns None if process is still running, else returns exit code + return self._process.poll() is None + + def start(self) -> None: + """Launch the binary, redirecting stdout/stderr to log files safely.""" + if self.is_running: + raise ProcessAlreadyRunningError( + f"Process is already running with PID {self.pid}." + ) + + if not self._binary.is_file(): + raise BinaryNotFoundError( + f"Binary not found: {self._binary}. " + "Run 'make integration-build' first." 
+ ) + + env: Dict[str, str] = os.environ.copy() + env["FUNCTION_STREAM_CONF"] = str(self._workspace.config_file) + env["FUNCTION_STREAM_HOME"] = str(self._workspace.root_dir) + + popen_kwargs: Dict[str, Any] = {} + if sys.platform == "win32": + popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP + else: + popen_kwargs["preexec_fn"] = os.setsid + try: + self._stdout_fh = open(self._workspace.stdout_file, "w", encoding="utf-8") + self._stderr_fh = open(self._workspace.stderr_file, "w", encoding="utf-8") + + self._process = subprocess.Popen( + [str(self._binary)], + env=env, + cwd=str(self._workspace.root_dir), + stdout=self._stdout_fh, + stderr=self._stderr_fh, + **popen_kwargs, + ) + logger.info("Process started successfully [pid=%d]", self._process.pid) + + except Exception as e: + self._close_handles() + logger.error("Failed to start process: %s", e) + raise ProcessManagerError(f"Process initialization failed: {e}") from e + + def stop(self, timeout: float = 10.0) -> None: + """ + Graceful shutdown via SIGTERM (POSIX) or CTRL_BREAK (Windows). + Falls back to immediate Kill on timeout. + """ + if not self.is_running or self._process is None: + self._close_handles() + return + + logger.debug("Attempting graceful shutdown [pid=%d]...", self._process.pid) + try: + self._send_termination_signal() + self._process.wait(timeout=timeout) + logger.info("Process stopped gracefully [pid=%d].", self._process.pid) + except (ProcessLookupError, PermissionError): + # Process already died or OS denied access + pass + except subprocess.TimeoutExpired: + logger.warning("Graceful shutdown timed out (%.1fs). 
Escalating to Kill [pid=%d].", + timeout, self._process.pid) + self.kill() + return + finally: + self._close_handles() + + def kill(self) -> None: + """Immediately terminate the process group across all platforms.""" + if not self.is_running or self._process is None: + self._close_handles() + return + + logger.debug("Executing force kill [pid=%d]...", self._process.pid) + try: + self._send_kill_signal() + self._process.wait(timeout=5.0) + logger.info("Process forcefully killed [pid=%d].", self._process.pid) + except (ProcessLookupError, PermissionError): + pass + except subprocess.TimeoutExpired: + logger.error("Process group kill timed out. Falling back to base kill.") + self._process.kill() + self._process.wait(timeout=2.0) + finally: + self._close_handles() + + # ------------------------------------------------------------------ + # OS-Specific Signal Implementations + # ------------------------------------------------------------------ + + def _send_termination_signal(self) -> None: + """Cross-platform implementation for graceful termination.""" + if sys.platform == "win32": + self._process.send_signal(signal.CTRL_BREAK_EVENT) + else: + os.killpg(os.getpgid(self._process.pid), signal.SIGTERM) + + def _send_kill_signal(self) -> None: + if sys.platform == "win32": + self._process.kill() + else: + os.killpg(os.getpgid(self._process.pid), signal.SIGKILL) + + def _close_handles(self) -> None: + """Safely close open file handles and clear state.""" + for fh in (self._stdout_fh, self._stderr_fh): + if fh is not None and not fh.closed: + try: + fh.close() + except Exception as e: + logger.debug("Failed to close file handle: %s", e) + + self._stdout_fh = None + self._stderr_fh = None + self._process = None \ No newline at end of file diff --git a/tests/integration/framework/utils.py b/tests/integration/framework/utils.py new file mode 100644 index 00000000..2ec125ed --- /dev/null +++ b/tests/integration/framework/utils.py @@ -0,0 +1,112 @@ +# Licensed under the Apache 
"""
Stateless utility functions: resilient port allocation and health checks.
"""

import logging
import random
import socket
import time
from typing import Optional

logger = logging.getLogger(__name__)


class NetworkUtilityError(Exception):
    """Base exception for network utility functions."""
    pass


class PortAllocationError(NetworkUtilityError):
    """Raised when a free port cannot be allocated."""
    pass


def find_free_port(host: str = "127.0.0.1", port_range: Optional[tuple[int, int]] = None) -> int:
    """
    Finds a reliable, available TCP port.

    By default, relies on the OS kernel to allocate an ephemeral port (binding to port 0).
    This guarantees no immediate collision and is extremely fast.

    Args:
        host: The interface IP to bind against.
        port_range: Optional tuple of (start, end). If provided, falls back to scanning
            this specific range instead of OS assignment.

    Returns:
        An available TCP port number.

    Raises:
        PortAllocationError: If binding fails or no ports are available in the range.
    """
    if port_range is not None:
        start, end = port_range
        # Shuffle so parallel callers scanning the same range rarely collide.
        candidates = list(range(start, end + 1))
        random.shuffle(candidates)

        for port in candidates:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                try:
                    s.bind((host, port))
                    return port
                except OSError:
                    continue

        raise PortAllocationError(f"Exhausted all ports in requested range [{start}, {end}].")

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            # Port 0 asks the kernel for any free ephemeral port.
            s.bind((host, 0))
            allocated_port = s.getsockname()[1]
            return allocated_port
        except OSError as e:
            raise PortAllocationError(f"OS failed to allocate an ephemeral port on {host}: {e}") from e


def wait_for_port(
    host: str,
    port: int,
    timeout: float = 30.0,
    poll_interval: float = 0.5
) -> bool:
    """
    Blocks until the given TCP port is accepting connections.

    Uses `socket.create_connection` for robust DNS resolution and
    transparent IPv4/IPv6 dual-stack support.

    Args:
        host: Hostname or IP address.
        port: Target TCP port.
        timeout: Max seconds to wait.
        poll_interval: Seconds to wait between retries.

    Returns:
        True if connection succeeded within timeout, False otherwise.
    """
    logger.debug("Waiting up to %.1fs for %s:%d to become responsive...", timeout, host, port)
    deadline = time.monotonic() + timeout

    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=poll_interval):
                logger.debug("Successfully connected to %s:%d", host, port)
                return True
        except OSError:
            # Covers ConnectionRefusedError, timeouts, and DNS failures alike
            # (ConnectionRefusedError is a subclass of OSError).
            pass

        time.sleep(poll_interval)

    logger.error("Timeout reached: %s:%d did not accept connections after %.1fs.", host, port, timeout)
    return False
+ Designed for safe parallel execution and cross-platform robustness. + """ + + def __init__(self, target_dir: Path, test_name: str, port: int): + self.target_dir = target_dir.resolve() + self.test_name = test_name + self.port = port + + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + unique_id = uuid.uuid4().hex[:6] + self.instance_id = f"{timestamp}-{port}-{unique_id}" + + self.root_dir = self.target_dir / self.test_name / self.instance_id + + self.conf_dir = self.root_dir / "conf" + self.data_dir = self.root_dir / "data" + self.log_dir = self.root_dir / "logs" + + self.config_file = self.conf_dir / "config.yaml" + self.stdout_file = self.log_dir / "stdout.log" + self.stderr_file = self.log_dir / "stderr.log" + + def setup(self) -> None: + """Create the full directory tree safely.""" + logger.debug("Setting up workspace at %s", self.root_dir) + try: + for d in (self.conf_dir, self.data_dir, self.log_dir): + d.mkdir(parents=True, exist_ok=True) + except OSError as e: + logger.error("Failed to create workspace directories: %s", e) + raise WorkspaceError(f"Workspace setup failed: {e}") from e + + def cleanup(self) -> None: + """ + Remove everything except logs/ so only diagnostic output remains. + Uses a robust deletion strategy to handle temporary OS file locks. + """ + logger.debug("Cleaning up workspace (retaining logs) at %s", self.root_dir) + for d in (self.conf_dir, self.data_dir): + if d.exists(): + self._safe_rmtree(d) + + def _safe_rmtree(self, path: Path, retries: int = 3, delay: float = 0.5) -> None: + """ + Robust directory removal with backoff retries. + Crucial for Windows where RocksDB/LevelDB files might hold residual locks + for a few milliseconds after the parent process dies. + """ + for attempt in range(1, retries + 1): + try: + shutil.rmtree(path, ignore_errors=False) + return + except OSError as e: + if attempt < retries: + logger.debug( + "File lock detected during cleanup of %s (attempt %d/%d). 
Retrying in %ss...", + path, attempt, retries, delay + ) + time.sleep(delay) + else: + logger.warning( + "Failed to completely remove %s after %d attempts. " + "Leaving residual files. Error: %s", + path, retries, e + ) + shutil.rmtree(path, ignore_errors=True) \ No newline at end of file diff --git a/tests/integration/pytest.ini b/tests/integration/pytest.ini new file mode 100644 index 00000000..6e745e7f --- /dev/null +++ b/tests/integration/pytest.ini @@ -0,0 +1,20 @@ +; Licensed under the Apache License, Version 2.0 (the "License"); +; you may not use this file except in compliance with the License. +; You may obtain a copy of the License at +; +; http://www.apache.org/licenses/LICENSE-2.0 +; +; Unless required by applicable law or agreed to in writing, software +; distributed under the License is distributed on an "AS IS" BASIS, +; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +; See the License for the specific language governing permissions and +; limitations under the License. + +[pytest] +testpaths = test +python_files = test_*.py +python_classes = Test* +python_functions = test_* +log_cli = true +log_cli_level = INFO +log_cli_format = %(asctime)s [%(levelname)s] %(name)s: %(message)s diff --git a/tests/integration/requirements.txt b/tests/integration/requirements.txt new file mode 100644 index 00000000..d1e597c3 --- /dev/null +++ b/tests/integration/requirements.txt @@ -0,0 +1,25 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +# Third-party dependencies +pytest>=7.0 +pyyaml>=6.0 +grpcio>=1.60.0 +protobuf>=4.25.0 + +# Docker + Kafka management for integration tests +docker>=7.0 +confluent-kafka>=2.3.0 + +# FunctionStream Python packages (local editable installs) +-e ../../python/functionstream-api +-e ../../python/functionstream-client diff --git a/tests/integration/test/__init__.py b/tests/integration/test/__init__.py new file mode 100644 index 00000000..4d9a9249 --- /dev/null +++ b/tests/integration/test/__init__.py @@ -0,0 +1,11 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/tests/integration/test/streaming/__init__.py b/tests/integration/test/streaming/__init__.py new file mode 100644 index 00000000..4d9a9249 --- /dev/null +++ b/tests/integration/test/streaming/__init__.py @@ -0,0 +1,11 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
diff --git a/tests/integration/test/wasm/__init__.py b/tests/integration/test/wasm/__init__.py new file mode 100644 index 00000000..4d9a9249 --- /dev/null +++ b/tests/integration/test/wasm/__init__.py @@ -0,0 +1,11 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/tests/integration/test/wasm/python_sdk/__init__.py b/tests/integration/test/wasm/python_sdk/__init__.py new file mode 100644 index 00000000..4d9a9249 --- /dev/null +++ b/tests/integration/test/wasm/python_sdk/__init__.py @@ -0,0 +1,11 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/tests/integration/test/wasm/python_sdk/conftest.py b/tests/integration/test/wasm/python_sdk/conftest.py new file mode 100644 index 00000000..aa5d60c6 --- /dev/null +++ b/tests/integration/test/wasm/python_sdk/conftest.py @@ -0,0 +1,102 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
import logging
import re
import sys
from pathlib import Path
from typing import Generator, List

_CURRENT_DIR = Path(__file__).resolve().parent
_INTEGRATION_ROOT = Path(__file__).resolve().parents[3]

# Make the framework package and local processors importable. Inserting the
# integration root first, then the current dir, leaves the current dir at
# the front of sys.path.
for _extra in (_INTEGRATION_ROOT, _CURRENT_DIR):
    if str(_extra) not in sys.path:
        sys.path.insert(0, str(_extra))

import pytest
from framework import FunctionStreamInstance, KafkaDockerManager
from fs_client.client import FsClient

logger = logging.getLogger(__name__)


@pytest.fixture(scope="session")
def kafka() -> Generator[KafkaDockerManager, None, None]:
    """
    Session-scoped Kafka broker manager.
    Leverages Context Manager for guaranteed teardown.
    """
    with KafkaDockerManager() as broker:
        yield broker
        try:
            broker.clear_all_topics()
        except Exception as e:
            logger.warning("Failed to clear topics during Kafka teardown: %s", e)


@pytest.fixture(scope="session")
def kafka_topics(kafka: KafkaDockerManager) -> str:
    """
    Pre-creates standard topics and returns the bootstrap server address.
    """
    kafka.create_topics_if_not_exist(["in", "out", "events", "counts"])
    return kafka.config.bootstrap_servers


def _sanitize_node_id(nodeid: str) -> str:
    """Converts a pytest nodeid into a safe directory name."""
    return re.sub(r"[^\w\-]+", "-", nodeid).strip("-")


@pytest.fixture
def fs_server(request: pytest.FixtureRequest) -> Generator[FunctionStreamInstance, None, None]:
    """
    Function-scoped FunctionStream instance.
    Uses Context Manager to ensure SIGKILL and workspace cleanup.
    """
    safe_name = _sanitize_node_id(request.node.nodeid)
    with FunctionStreamInstance(test_name=safe_name) as server:
        yield server


@pytest.fixture
def fs_client(fs_server: FunctionStreamInstance) -> Generator[FsClient, None, None]:
    """
    Function-scoped FsClient connected to the isolated fs_server.
    """
    with fs_server.get_client() as connected:
        yield connected


@pytest.fixture
def function_registry(fs_client: FsClient) -> Generator[List[str], None, None]:
    """
    RAII-style registry for FunctionStream tasks.
    Ensures absolute teardown of functions to prevent state leakage.
    """
    names: List[str] = []

    yield names

    # Teardown: stop, then drop, every function the test registered.
    for fn_name in names:
        try:
            fs_client.stop_function(fn_name)
        except Exception as e:
            logger.debug("Failed to stop function '%s' during cleanup: %s", fn_name, e)

        try:
            fs_client.drop_function(fn_name)
        except Exception as e:
            logger.error("Failed to drop function '%s' during cleanup: %s", fn_name, e)
"""
A stateful processor that uses KVStore to count occurrences of incoming strings.
Validates state persistence and checkpointing mechanisms.
"""

import json
from typing import Dict

from fs_api import FSProcessorDriver, Context


class CounterProcessor(FSProcessorDriver):
    """
    Stateful word-counter backed by KVStore.
    Each incoming UTF-8 string increments its counter in the store and emits
    a JSON payload ``{"word": ..., "count": ..., "total": ...}`` downstream.
    """

    def __init__(self) -> None:
        self._counter_map: Dict[str, int] = {}
        self._total_processed: int = 0
        self._store_name: str = "integration-counter-store"

    def init(self, ctx: Context, config: dict) -> None:
        """Reset in-memory counters on (re)initialization."""
        self._counter_map = {}
        self._total_processed = 0

    def process(self, ctx: Context, source_id: int, data: bytes) -> None:
        """Count one occurrence of the incoming word and emit the new tally."""
        word = data.decode("utf-8", errors="replace").strip()
        if not word:
            # Blank or whitespace-only payloads are ignored entirely.
            return

        self._total_processed += 1
        store = ctx.getOrCreateKVStore(self._store_name)

        key = word.encode("utf-8")
        previous = store.get_state(key)
        if previous:
            count = int(previous.decode("utf-8")) + 1
        else:
            count = 1

        self._counter_map[word] = count
        store.put_state(key, str(count).encode("utf-8"))

        record = {"word": word, "count": count, "total": self._total_processed}
        ctx.emit(json.dumps(record).encode("utf-8"), 0)

    def process_watermark(self, ctx: Context, source_id: int, watermark: int) -> None:
        """Pass watermarks straight through to output 0."""
        ctx.emit_watermark(watermark, 0)

    def take_checkpoint(self, ctx: Context, checkpoint_id: int) -> bytes:
        """Snapshot the in-memory counter map as JSON bytes."""
        return json.dumps(self._counter_map).encode("utf-8")

    def check_heartbeat(self, ctx: Context) -> bool:
        """Always healthy; this processor has no failure modes to report."""
        return True

    def close(self, ctx: Context) -> None:
        """Drop all in-memory state."""
        self._counter_map.clear()
        self._total_processed = 0

    def custom(self, payload: bytes) -> bytes:
        """Minimal custom-call handler used by protocol tests."""
        return b'{"status": "ok"}'
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import logging +import time +import uuid +from dataclasses import dataclass +from typing import Any, Dict, List + +from confluent_kafka import Consumer, KafkaError, Producer +from fs_client.client import FsClient +from fs_client.config import KafkaInput, KafkaOutput, WasmTaskBuilder +from processors.counter_processor import CounterProcessor + +logger = logging.getLogger(__name__) + +CONSUME_TIMEOUT_S = 60.0 +POLL_INTERVAL_S = 0.5 +CONSUMER_WARMUP_S = 3.0 + + +@dataclass(frozen=True) +class FlowContext: + fn_name: str + in_topic: str + out_topic: str + + +def _unique_id(prefix: str) -> str: + return f"{prefix}-{uuid.uuid4().hex[:8]}" + + +def _build_earliest_input(bootstrap: str, topic: str, group: str) -> KafkaInput: + ki = KafkaInput(bootstrap, topic, group, 0) + ki.data["auto.offset.reset"] = "earliest" + return ki + + +def produce_messages(bootstrap: str, topic: str, messages: List[str], timeout: float = 10.0) -> None: + producer = Producer({"bootstrap.servers": bootstrap}) + try: + for msg in messages: + producer.produce(topic, value=msg.encode("utf-8")) + finally: + remaining = producer.flush(timeout=timeout) + if remaining > 0: + raise RuntimeError(f"Producer failed to flush {remaining} messages within {timeout}s") + + +def consume_messages( + bootstrap: str, + topic: str, + expected_count: int, + timeout: float = CONSUME_TIMEOUT_S, +) -> List[Dict[str, Any]]: + consumer = Consumer({ + "bootstrap.servers": bootstrap, + "group.id": _unique_id("test-consumer"), + "auto.offset.reset": "earliest", + "enable.auto.commit": "false", + }) + 
consumer.subscribe([topic]) + collected: List[Dict[str, Any]] = [] + deadline = time.time() + timeout + + try: + while len(collected) < expected_count and time.time() < deadline: + msg = consumer.poll(timeout=POLL_INTERVAL_S) + if msg is None: + continue + if msg.error(): + if msg.error().code() != KafkaError._PARTITION_EOF: + logger.error("Kafka consumer error: %s", msg.error()) + continue + + payload = msg.value().decode("utf-8") + collected.append(json.loads(payload)) + finally: + consumer.close() + + if len(collected) < expected_count: + raise TimeoutError(f"Expected {expected_count} messages, received {len(collected)}") + + return collected + + +def deploy_function( + fs_client: FsClient, + fn_name: str, + bootstrap: str, + in_topic: str, + out_topic: str, +) -> None: + config = ( + WasmTaskBuilder() + .set_name(fn_name) + .add_init_config("class_name", "CounterProcessor") + .add_input_group([_build_earliest_input(bootstrap, in_topic, fn_name)]) + .add_output(KafkaOutput(bootstrap, out_topic, 0)) + .build() + ) + + success = fs_client.create_python_function_from_config(config, CounterProcessor) + if not success: + raise RuntimeError(f"Failed to deploy function: {fn_name}") + + time.sleep(CONSUMER_WARMUP_S) + + +class TestDataFlow: + + def _setup_flow(self, function_registry: List[str], kafka: Any, prefix: str) -> FlowContext: + fn_name = _unique_id(prefix) + ctx = FlowContext( + fn_name=fn_name, + in_topic=f"{fn_name}-in", + out_topic=f"{fn_name}-out" + ) + function_registry.append(ctx.fn_name) + kafka.create_topics_if_not_exist([ctx.in_topic, ctx.out_topic]) + return ctx + + def test_single_word_counting( + self, + fs_client: FsClient, + function_registry: List[str], + kafka: Any, + kafka_topics: str, + ): + ctx = self._setup_flow(function_registry, kafka, "flow-single") + word = "hello" + n = 10 + + produce_messages(kafka_topics, ctx.in_topic, [word] * n) + deploy_function(fs_client, ctx.fn_name, kafka_topics, ctx.in_topic, ctx.out_topic) + records = 
consume_messages(kafka_topics, ctx.out_topic, n) + + for i, rec in enumerate(records, start=1): + assert rec["word"] == word + assert rec["count"] == i + assert rec["total"] == i + + def test_multiple_distinct_words( + self, + fs_client: FsClient, + function_registry: List[str], + kafka: Any, + kafka_topics: str, + ): + ctx = self._setup_flow(function_registry, kafka, "flow-multi") + messages = ["apple", "banana", "apple", "cherry", "banana", "apple"] + + produce_messages(kafka_topics, ctx.in_topic, messages) + deploy_function(fs_client, ctx.fn_name, kafka_topics, ctx.in_topic, ctx.out_topic) + records = consume_messages(kafka_topics, ctx.out_topic, len(messages)) + + per_word_counts: Dict[str, int] = {} + for rec in records: + word = rec["word"] + per_word_counts[word] = per_word_counts.get(word, 0) + 1 + assert rec["count"] == per_word_counts[word] + + assert per_word_counts == {"apple": 3, "banana": 2, "cherry": 1} + + def test_large_batch_throughput( + self, + fs_client: FsClient, + function_registry: List[str], + kafka: Any, + kafka_topics: str, + ): + ctx = self._setup_flow(function_registry, kafka, "flow-batch") + batch_size = 500 + messages = [f"item-{i % 50}" for i in range(batch_size)] + + produce_messages(kafka_topics, ctx.in_topic, messages) + deploy_function(fs_client, ctx.fn_name, kafka_topics, ctx.in_topic, ctx.out_topic) + records = consume_messages(kafka_topics, ctx.out_topic, batch_size) + + assert len(records) == batch_size + totals = [r["total"] for r in records] + assert totals == list(range(1, batch_size + 1)) + + per_word: Dict[str, int] = {} + for rec in records: + word = rec["word"] + per_word[word] = per_word.get(word, 0) + 1 + assert rec["count"] == per_word[word] + + def test_empty_messages_are_skipped( + self, + fs_client: FsClient, + function_registry: List[str], + kafka: Any, + kafka_topics: str, + ): + ctx = self._setup_flow(function_registry, kafka, "flow-empty") + messages = ["foo", "", "bar", " ", "foo", ""] + + 
produce_messages(kafka_topics, ctx.in_topic, messages) + deploy_function(fs_client, ctx.fn_name, kafka_topics, ctx.in_topic, ctx.out_topic) + + expected_outputs = 3 + records = consume_messages(kafka_topics, ctx.out_topic, expected_outputs) + + words = [r["word"] for r in records] + assert words == ["foo", "bar", "foo"] + assert records[0]["count"] == 1 + assert records[1]["count"] == 1 + assert records[2]["count"] == 2 + + def test_output_json_schema( + self, + fs_client: FsClient, + function_registry: List[str], + kafka: Any, + kafka_topics: str, + ): + ctx = self._setup_flow(function_registry, kafka, "flow-schema") + test_words = ["alpha", "beta", "gamma"] + + produce_messages(kafka_topics, ctx.in_topic, test_words) + deploy_function(fs_client, ctx.fn_name, kafka_topics, ctx.in_topic, ctx.out_topic) + records = consume_messages(kafka_topics, ctx.out_topic, len(test_words)) + + for rec in records: + assert set(rec.keys()) == {"word", "count", "total"} + assert isinstance(rec["word"], str) + assert isinstance(rec["count"], int) + assert isinstance(rec["total"], int) + assert rec["count"] >= 1 + assert rec["total"] >= 1 \ No newline at end of file diff --git a/tests/integration/test/wasm/python_sdk/test_lifecycle.py b/tests/integration/test/wasm/python_sdk/test_lifecycle.py new file mode 100644 index 00000000..34c8dfbe --- /dev/null +++ b/tests/integration/test/wasm/python_sdk/test_lifecycle.py @@ -0,0 +1,169 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import uuid +from typing import Any, List, Optional + +from fs_client.client import FsClient +from fs_client.config import KafkaInput, KafkaOutput, WasmTaskBuilder +from processors.counter_processor import CounterProcessor + +EXPECTED_STOPPED_STATES = frozenset(["STOPPED", "PAUSED", "INITIALIZED"]) +EXPECTED_RUNNING_STATES = frozenset(["RUNNING"]) + + +def _generate_unique_name(prefix: str) -> str: + return f"{prefix}-{uuid.uuid4().hex[:8]}" + + +def _create_counter_task_builder(fn_name: str, bootstrap: str) -> WasmTaskBuilder: + return ( + WasmTaskBuilder() + .set_name(fn_name) + .add_init_config("class_name", "CounterProcessor") + .add_input_group([KafkaInput(bootstrap, "in", "grp", 0)]) + .add_output(KafkaOutput(bootstrap, "out", 0)) + ) + + +def _get_function_info(fs_client: FsClient, fn_name: str) -> Optional[Any]: + listing = fs_client.show_functions() + for fn in listing.functions: + if fn.name == fn_name: + return fn + return None + + +class TestFunctionLifecycle: + + def test_full_lifecycle_transitions( + self, fs_client: FsClient, function_registry: List[str], kafka_topics: str + ) -> None: + fn_name = _generate_unique_name("lifecycle") + function_registry.append(fn_name) + + config = _create_counter_task_builder(fn_name, kafka_topics).add_init_config("test_mode", "true").build() + + assert fs_client.create_python_function_from_config(config, CounterProcessor) is True + + fn_info = _get_function_info(fs_client, fn_name) + assert fn_info is not None + assert bool(fn_info.status) + + assert fs_client.stop_function(fn_name) is True + + assert fs_client.start_function(fn_name) is True + fn_info = _get_function_info(fs_client, fn_name) + assert fn_info is not None + assert fn_info.status.upper() in EXPECTED_RUNNING_STATES + + assert fs_client.stop_function(fn_name) is True + fn_info = _get_function_info(fs_client, fn_name) + assert fn_info is not None + assert fn_info.status.upper() in EXPECTED_STOPPED_STATES + + assert fs_client.drop_function(fn_name) is 
True + assert _get_function_info(fs_client, fn_name) is None + + if fn_name in function_registry: + function_registry.remove(fn_name) + + def test_show_functions_returns_created_function( + self, fs_client: FsClient, function_registry: List[str], kafka_topics: str + ) -> None: + fn_name = _generate_unique_name("show") + function_registry.append(fn_name) + + config = _create_counter_task_builder(fn_name, kafka_topics).build() + fs_client.create_python_function_from_config(config, CounterProcessor) + + assert _get_function_info(fs_client, fn_name) is not None + + def test_show_functions_result_fields( + self, fs_client: FsClient, function_registry: List[str], kafka_topics: str + ) -> None: + fn_name = _generate_unique_name("fields") + function_registry.append(fn_name) + + config = _create_counter_task_builder(fn_name, kafka_topics).build() + fs_client.create_python_function_from_config(config, CounterProcessor) + + fn_info = _get_function_info(fs_client, fn_name) + assert fn_info is not None + assert fn_info.name == fn_name + assert bool(fn_info.task_type) + assert bool(fn_info.status) + + def test_multiple_functions_coexist( + self, fs_client: FsClient, function_registry: List[str], kafka_topics: str + ) -> None: + names = [_generate_unique_name("multi") for _ in range(3)] + function_registry.extend(names) + + for name in names: + config = _create_counter_task_builder(name, kafka_topics).build() + fs_client.create_python_function_from_config(config, CounterProcessor) + + listing = fs_client.show_functions() + listed_names = {f.name for f in listing.functions} + + for name in names: + assert name in listed_names + + def test_rapid_create_drop_cycle( + self, fs_client: FsClient, function_registry: List[str], kafka_topics: str + ) -> None: + for i in range(5): + fn_name = _generate_unique_name(f"rapid-{i}") + config = _create_counter_task_builder(fn_name, kafka_topics).build() + + fs_client.create_python_function_from_config(config, CounterProcessor) + 
fs_client.stop_function(fn_name) + assert fs_client.drop_function(fn_name) is True + + listing = fs_client.show_functions() + remaining = [f.name for f in listing.functions if f.name.startswith("rapid-")] + assert not remaining + + def test_restart_preserves_identity( + self, fs_client: FsClient, function_registry: List[str], kafka_topics: str + ) -> None: + fn_name = _generate_unique_name("restart") + function_registry.append(fn_name) + + config = _create_counter_task_builder(fn_name, kafka_topics).build() + fs_client.create_python_function_from_config(config, CounterProcessor) + + fs_client.stop_function(fn_name) + fs_client.start_function(fn_name) + + fn_info = _get_function_info(fs_client, fn_name) + assert fn_info is not None + assert fn_info.name == fn_name + assert fn_info.status.upper() in EXPECTED_RUNNING_STATES + + def test_stop_then_drop( + self, fs_client: FsClient, function_registry: List[str], kafka_topics: str + ) -> None: + fn_name = _generate_unique_name("stop-drop") + function_registry.append(fn_name) + + config = _create_counter_task_builder(fn_name, kafka_topics).build() + fs_client.create_python_function_from_config(config, CounterProcessor) + + assert fs_client.stop_function(fn_name) is True + assert fs_client.drop_function(fn_name) is True + + assert _get_function_info(fs_client, fn_name) is None + + if fn_name in function_registry: + function_registry.remove(fn_name) \ No newline at end of file