Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions .github/workflows/integration-test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

name: Integration Tests

# Run on pushes/PRs targeting the default branches, plus manual dispatch.
on:
  push:
    branches: ["main", "master"]
  pull_request:
    branches: ["main", "master"]
  workflow_dispatch:

env:
  CARGO_TERM_COLOR: always

jobs:
  integration-test:
    name: Integration Tests
    runs-on: ubuntu-latest
    # Hard cap so a hung container/test cannot block the CI queue indefinitely.
    timeout-minutes: 30

    steps:
      - name: Checkout Source
        uses: actions/checkout@v4

      # Fail fast if the runner's Docker daemon is unavailable; the tests
      # below start Kafka in a container.
      - name: Verify Docker
        run: docker version

      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.11"
          cache: "pip"

      - name: Setup Rust
        uses: dtolnay/rust-toolchain@stable

      # Native build prerequisites for the release binary (TLS, curl, SASL,
      # protobuf code generation).
      - name: Install System Dependencies
        run: |
          sudo apt-get update
          sudo apt-get install -y --no-install-recommends \
            cmake \
            libssl-dev \
            libcurl4-openssl-dev \
            pkg-config \
            libsasl2-dev \
            protobuf-compiler

      # Cache cargo registry/build artifacts between runs.
      - name: Cache Cargo
        uses: Swatinem/rust-cache@v2

      - name: Build Release Binary
        run: make env && make dist

      # Pull the Kafka image up front so the test phase does not pay the
      # download cost (or flake on a slow pull) mid-test.
      - name: Pre-pull Kafka Image
        run: docker pull apache/kafka:3.7.0

      # Delegates to tests/integration via the root Makefile.
      - name: Run Integration Tests
        run: make integration-test

      # Keep logs only when something went wrong, to aid debugging.
      - name: Upload Test Logs
        if: failure()
        uses: actions/upload-artifact@v4
        with:
          name: integration-test-logs
          path: tests/integration/target/**/logs/
          retention-days: 30
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,8 @@ python/**/target/
**/**.egg-info
.cache/
/.cursor/worktrees.json

# Integration test output
tests/integration/target/
tests/integration/.venv/
tests/integration/install
8 changes: 7 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ C_0 := \033[0m
log = @printf "$(C_B)[-]$(C_0) %-15s %s\n" "$(1)" "$(2)"
success = @printf "$(C_G)[✔]$(C_0) %s\n" "$(1)"

.PHONY: all help build build-lite dist dist-lite clean test env env-clean go-sdk-env go-sdk-build go-sdk-clean docker docker-run docker-push .check-env .build-wasm
.PHONY: all help build build-lite dist dist-lite clean test env env-clean go-sdk-env go-sdk-build go-sdk-clean docker docker-run docker-push .check-env .build-wasm integration-test

all: build

Expand All @@ -63,6 +63,8 @@ help:
@echo " docker-run Run container (port 8080, mount logs)"
@echo " docker-push Push image to registry"
@echo ""
@echo " integration-test Run integration tests (delegates to tests/integration)"
@echo ""
@echo " Version: $(VERSION) | Arch: $(ARCH) | OS: $(OS)"

build: .check-env .build-wasm
Expand Down Expand Up @@ -145,6 +147,7 @@ clean:
@cargo clean
@rm -rf $(DIST_ROOT) data logs
@./scripts/clean.sh 2>/dev/null || true
@$(MAKE) -C tests/integration clean 2>/dev/null || true
$(call success,Done)

.check-env:
Expand All @@ -167,3 +170,6 @@ docker-push:
$(call log,DOCKER,Pushing $(IMAGE_NAME))
@docker push $(IMAGE_NAME)
$(call success,Push Complete)

# Delegate to the integration-test harness in tests/integration.
# PYTEST_ARGS is forwarded verbatim so callers can filter from the top
# level, e.g. `make integration-test PYTEST_ARGS="-k kafka"`.
integration-test:
	@$(MAKE) -C tests/integration test PYTEST_ARGS="$(PYTEST_ARGS)"
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ impl IncrementalAggregatingFunc {
state
});

for (idx, v) in agg.state_cols.iter().zip(state.into_iter()) {
for (idx, v) in agg.state_cols.iter().zip(state) {
states[*idx].push(v);
}
}
Expand Down Expand Up @@ -543,7 +543,7 @@ impl IncrementalAggregatingFunc {
}
}

let mut cols = self.key_converter.convert_rows(rows.into_iter())?;
let mut cols = self.key_converter.convert_rows(rows)?;
cols.push(Arc::new(accumulator_builder.finish()));
cols.push(Arc::new(args_row_builder.finish()));
cols.push(Arc::new(count_builder.finish()));
Expand Down Expand Up @@ -612,7 +612,7 @@ impl IncrementalAggregatingFunc {
mem::take(&mut self.updated_keys).into_iter().unzip();
let mut deleted_keys = vec![];

for (k, retract) in updated_keys.iter().zip(updated_values.into_iter()) {
for (k, retract) in updated_keys.iter().zip(updated_values) {
let append = self.evaluate(&k.0)?;

if let Some(v) = retract {
Expand Down
23 changes: 20 additions & 3 deletions src/runtime/wasm/processor/wasm/wasm_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,10 +392,27 @@ impl WasmTask {
) -> ControlAction {
match signal {
TaskControlSignal::Start { completion_flag } => {
for input in inputs.iter_mut() {
let _ = input.start();
for (idx, input) in inputs.iter_mut().enumerate() {
if let Err(e) = input.start() {
let msg = format!("Failed to start input {}: {}", idx, e);
log::error!("{}", msg);
*failure_cause.lock().unwrap() = Some(msg.clone());
*shared_state.lock().unwrap() =
ComponentState::Error { error: msg.clone() };
*execution_state.lock().unwrap() = ExecutionState::Failed;
completion_flag.mark_error(msg);
return ControlAction::Pause;
}
}
if let Err(e) = processor.start_outputs() {
let msg = format!("Failed to start outputs: {}", e);
log::error!("{}", msg);
*failure_cause.lock().unwrap() = Some(msg.clone());
*shared_state.lock().unwrap() = ComponentState::Error { error: msg.clone() };
*execution_state.lock().unwrap() = ExecutionState::Failed;
completion_flag.mark_error(msg);
return ControlAction::Pause;
}
let _ = processor.start_outputs();
*state = TaskState::Running;
*shared_state.lock().unwrap() = ComponentState::Running;
completion_flag.mark_completed();
Expand Down
28 changes: 25 additions & 3 deletions src/server/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,25 @@ impl FunctionStreamServiceImpl {
}
}

/// Heuristically map a backend error message onto an HTTP-style status
/// code by scanning (case-insensitively) for well-known keywords.
///
/// Precedence: not-found markers, then conflict markers, then bad-request
/// markers; anything unrecognized falls back to `InternalServerError`.
fn classify_error(message: &str) -> StatusCode {
    let normalized = message.to_lowercase();
    let has = |needle: &str| normalized.contains(needle);

    if has("not found") || has("not exist") {
        return StatusCode::NotFound;
    }
    if has("uniqueness violation") || has("already exists") || has("duplicate") {
        return StatusCode::Conflict;
    }
    if has("invalid") || has("unsupported") || has("missing") {
        return StatusCode::BadRequest;
    }
    StatusCode::InternalServerError
}

async fn execute_statement(
&self,
stmt: &dyn Statement,
Expand All @@ -101,7 +120,8 @@ impl FunctionStreamServiceImpl {
if result.success {
Self::build_success_response(success_status, result.message, result.data)
} else {
Self::build_error_response(StatusCode::InternalServerError, result.message)
let status = Self::classify_error(&result.message);
Self::build_error_response(status, result.message)
}
}
}
Expand Down Expand Up @@ -139,8 +159,9 @@ impl FunctionStreamService for FunctionStreamServiceImpl {

if !result.success {
error!("SQL execution aborted: {}", result.message);
let status = Self::classify_error(&result.message);
return Ok(TonicResponse::new(Self::build_error_response(
StatusCode::InternalServerError,
status,
result.message,
)));
}
Expand Down Expand Up @@ -235,8 +256,9 @@ impl FunctionStreamService for FunctionStreamServiceImpl {

if !result.success {
error!("show_functions execution failed: {}", result.message);
let status = Self::classify_error(&result.message);
return Ok(TonicResponse::new(ShowFunctionsResponse {
status_code: StatusCode::InternalServerError as i32,
status_code: status as i32,
message: result.message,
functions: vec![],
}));
Expand Down
2 changes: 1 addition & 1 deletion src/sql/logical_planner/streaming_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ impl PlanToGraphVisitor<'_> {
let node_index = self.graph.add_node(execution_unit);
self.add_index_to_traversal(node_index);

for (source, edge) in input_nodes.into_iter().zip(routing_edges.into_iter()) {
for (source, edge) in input_nodes.into_iter().zip(routing_edges) {
self.graph.add_edge(source, node_index, edge);
}

Expand Down
11 changes: 4 additions & 7 deletions src/sql/types/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,11 @@ fn convert_simple_data_type(
}
},
SQLDataType::Date => Ok(DataType::Date32),
SQLDataType::Time(None, tz_info) => {
SQLDataType::Time(None, tz_info)
if matches!(tz_info, TimezoneInfo::None)
|| matches!(tz_info, TimezoneInfo::WithoutTimeZone)
{
Ok(DataType::Time64(TimeUnit::Nanosecond))
} else {
return plan_err!("Unsupported SQL type {sql_type:?}");
}
|| matches!(tz_info, TimezoneInfo::WithoutTimeZone) =>
{
Ok(DataType::Time64(TimeUnit::Nanosecond))
}
SQLDataType::Numeric(exact_number_info) | SQLDataType::Decimal(exact_number_info) => {
let (precision, scale) = match *exact_number_info {
Expand Down
64 changes: 64 additions & 0 deletions tests/integration/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# -----------------------------------------------------------------------
# Integration Test Makefile
# -----------------------------------------------------------------------
# Usage:
#   make test   — Setup env + run pytest (PYTEST_ARGS="-k xxx")
#   make clean  — Remove .venv and test output
#
# Prerequisites:
#   The FunctionStream binary must already be built (make build / make build-lite
#   from the project root).
# -----------------------------------------------------------------------

# Resolve the repo root once at parse time; fail loudly if we are not
# inside a git checkout, so derived paths never silently misresolve.
PROJECT_ROOT := $(shell git -C $(CURDIR) rev-parse --show-toplevel)
ifeq ($(strip $(PROJECT_ROOT)),)
$(error Unable to determine project root (is this a git checkout with git installed?))
endif
PYTHON_ROOT := $(PROJECT_ROOT)/python
VENV := $(CURDIR)/.venv
PIP := $(VENV)/bin/pip
PY := $(VENV)/bin/python

C_G := \033[0;32m
C_B := \033[0;34m
C_0 := \033[0m

log = @printf "$(C_B)[-]$(C_0) %-12s %s\n" "$(1)" "$(2)"
success = @printf "$(C_G)[✔]$(C_0) %s\n" "$(1)"

# Make the default goal explicit, and remove a half-written target (e.g. the
# `install` stamp) when its recipe fails, so a failed setup is retried.
.DEFAULT_GOAL := help
.DELETE_ON_ERROR:

.PHONY: test clean help

help:
	@echo "Integration Test Targets:"
	@echo ""
	@echo "  install     Set up the Python virtual environment (stamp file)"
	@echo "  test        Setup Python env + run pytest (PYTEST_ARGS=...)"
	@echo "  clean       Remove .venv and target/tests output"

# `install` is a stamp file (touched on success), so the environment is
# rebuilt only when requirements or the local SDK pyproject files change.
install: requirements.txt $(PYTHON_ROOT)/functionstream-api/pyproject.toml $(PYTHON_ROOT)/functionstream-client/pyproject.toml
	$(call log,ENV,Setting up Python virtual environment)
	@test -d $(VENV) || python3 -m venv $(VENV)
	@$(PIP) install --quiet --upgrade pip
	@$(PIP) install --quiet -r requirements.txt
	@touch $@
	$(call success,Python environment ready)

test: install
	$(call log,TEST,Running integration tests)
	@$(PY) -m pytest -v $(PYTEST_ARGS)
	$(call success,All integration tests passed)

clean:
	$(call log,CLEAN,Removing test artifacts)
	@rm -rf $(VENV)
	@rm -rf $(CURDIR)/target
	@rm -rf $(CURDIR)/install
	$(call success,Clean complete)
Loading
Loading