From a8680d7c71ea0c81b79b0f8980f1d5ec91b01101 Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Thu, 21 May 2026 10:54:09 -0700 Subject: [PATCH] feat(k8s): add configVersion annotation to pipeline deployments Add an MD5 hash of the JSON-serized pipeline_config as a pod template annotation so deployments roll when the config changes. Co-authored-by: Cursor --- .../sentry_streams_k8s/pipeline_step.py | 15 +++++++++++++++ sentry_streams_k8s/tests/test_pipeline_step.py | 4 ++++ 2 files changed, 19 insertions(+) diff --git a/sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py b/sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py index be29e8a2..53390961 100644 --- a/sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py +++ b/sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py @@ -1,5 +1,6 @@ from __future__ import annotations +import hashlib import json import re from importlib.resources import files @@ -82,6 +83,11 @@ def get_multiprocess_config(pipeline_config: dict[str, Any]) -> tuple[int | None return process_count, segments_with_parallelism +def compute_config_version(pipeline_config: dict[str, Any]) -> str: + """MD5 hash of pipeline_config serialized as JSON (matches ConfigMap encoding).""" + return hashlib.md5(json.dumps(pipeline_config).encode()).hexdigest() + + def build_container( container_template: dict[str, Any], pipeline_name: str, @@ -246,6 +252,7 @@ def _build_merged_pipeline_deployment( step_labels: dict[str, Any], container: dict[str, Any], volumes: list[dict[str, Any]], + config_version: str, ) -> dict[str, Any]: """ Assembles a k8s deployment by layering these structures on top of the base deployment @@ -268,6 +275,9 @@ def _build_merged_pipeline_deployment( "template": { "metadata": { "labels": step_labels, + "annotations": { + "configVersion": config_version, + }, }, "spec": { "containers": [container], @@ -495,6 +505,8 @@ def run(self, context: dict[str, Any]) -> dict[str, Any]: } ) + config_version = compute_config_version(pipeline_config) + add_canary = ctx.get("with_canary", False) and replicas > 1 main_deployment_name = make_k8s_name( f"{service_name}-pipeline-{pipeline_name}-{segment_id}" @@ -513,6 +525,7 @@ def run(self, context: dict[str, Any]) -> dict[str, Any]: step_labels={**labels, "env": "primary"}, container=container, volumes=volumes, + config_version=config_version, ) canary_deployment = _build_merged_pipeline_deployment( base_deployment=base_deployment, @@ -523,6 +536,7 @@ def run(self, context: dict[str, Any]) -> dict[str, Any]: step_labels={**labels, "env": "canary"}, container=container, volumes=volumes, + config_version=config_version, ) else: deployment = _build_merged_pipeline_deployment( @@ -534,6 +548,7 @@ def run(self, context: dict[str, Any]) -> dict[str, Any]: step_labels={**labels, "env": "primary"}, container=container, volumes=volumes, + config_version=config_version, ) configmap = { diff --git a/sentry_streams_k8s/tests/test_pipeline_step.py b/sentry_streams_k8s/tests/test_pipeline_step.py index a884610d..2e4b519f 100644 --- a/sentry_streams_k8s/tests/test_pipeline_step.py +++ b/sentry_streams_k8s/tests/test_pipeline_step.py @@ -8,6 +8,7 @@ from sentry_streams_k8s.pipeline_step import ( PipelineStep, build_container, + compute_config_version, load_base_template, make_k8s_name, parse_context, @@ -447,6 +448,9 @@ def test_run_generates_complete_manifests() -> None: deployment["spec"]["template"]["metadata"]["annotations"]["sidecar.istio.io/inject"] == "false" ) + assert deployment["spec"]["template"]["metadata"]["annotations"]["configVersion"] == ( + compute_config_version(context["pipeline_config"]) + ) selector = deployment["spec"]["selector"] assert selector["matchLabels"]["pipeline-app"] == "sbc-profiles"