Skip to content
67 changes: 14 additions & 53 deletions api/v1alpha1/temporalworker_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package v1alpha1
import (
"context"
"fmt"
"time"

"github.com/temporalio/temporal-worker-controller/internal/defaults"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -19,10 +18,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
)

const (
Comment thread
carlydf marked this conversation as resolved.
maxTemporalWorkerDeploymentNameLen = 63
)

func (r *TemporalWorkerDeployment) SetupWebhookWithManager(mgr ctrl.Manager) error {
return ctrl.NewWebhookManagedBy(mgr).
For(r).
Expand Down Expand Up @@ -85,71 +80,37 @@ func (r *TemporalWorkerDeployment) validateForUpdateOrCreate(ctx context.Context
}

func validateForUpdateOrCreate(old, new *TemporalWorkerDeployment) (admission.Warnings, error) {
var allErrs field.ErrorList

if len(new.GetName()) > maxTemporalWorkerDeploymentNameLen {
allErrs = append(allErrs,
field.Invalid(field.NewPath("metadata.name"), new.GetName(), fmt.Sprintf("cannot be more than %d characters", maxTemporalWorkerDeploymentNameLen)),
)
}

allErrs = append(allErrs, validateRolloutStrategy(new.Spec.RolloutStrategy)...)

allErrs := validateRolloutStrategy(new.Spec.RolloutStrategy)
if len(allErrs) > 0 {
return nil, newInvalidErr(new, allErrs)
}

return nil, nil
}

// validateRolloutStrategy checks constraints that the CRD schema cannot enforce:
// rampPercentage must be strictly increasing across steps, and gate.input and
// gate.inputFrom are mutually exclusive (gate.input is an unstructured JSON field
// invisible to CEL). All other rollout constraints are enforced by the CRD CEL rules.
func validateRolloutStrategy(s RolloutStrategy) []*field.Error {
var allErrs []*field.Error

if s.Strategy == UpdateProgressive {
rolloutSteps := s.Steps
if len(rolloutSteps) == 0 {
allErrs = append(allErrs,
field.Invalid(field.NewPath("spec.rollout.steps"), rolloutSteps, "steps are required for Progressive rollout"),
)
}
var lastRamp int
for i, s := range rolloutSteps {
// Check duration >= 30s
if s.PauseDuration.Duration < 30*time.Second {
for i, step := range s.Steps {
if step.RampPercentage <= lastRamp {
allErrs = append(allErrs,
field.Invalid(field.NewPath(fmt.Sprintf("spec.rollout.steps[%d].pauseDuration", i)), s.PauseDuration.Duration.String(), "pause duration must be at least 30s"),
field.Invalid(field.NewPath(fmt.Sprintf("spec.rollout.steps[%d].rampPercentage", i)), step.RampPercentage, "rampPercentage must increase between each step"),
)
}

// Check ramp value greater than last
if s.RampPercentage <= lastRamp {
allErrs = append(allErrs,
field.Invalid(field.NewPath(fmt.Sprintf("spec.rollout.steps[%d].rampPercentage", i)), s.RampPercentage, "rampPercentage must increase between each step"),
)
}
lastRamp = s.RampPercentage
lastRamp = step.RampPercentage
}
}

// Validate gate input fields
if s.Gate != nil {
gate := s.Gate
if gate.Input != nil && gate.InputFrom != nil {
allErrs = append(allErrs,
field.Invalid(field.NewPath("spec.rollout.gate"), "input & inputFrom",
"only one of input or inputFrom may be set"),
)
}
if gate.InputFrom != nil {
cm := gate.InputFrom.ConfigMapKeyRef
sec := gate.InputFrom.SecretKeyRef
if (cm == nil && sec == nil) || (cm != nil && sec != nil) {
allErrs = append(allErrs,
field.Invalid(field.NewPath("spec.rollout.gate.inputFrom"), gate.InputFrom,
"exactly one of configMapKeyRef or secretKeyRef must be set"),
)
}
}
if s.Gate != nil && s.Gate.Input != nil && s.Gate.InputFrom != nil {
allErrs = append(allErrs,
field.Invalid(field.NewPath("spec.rollout.gate"), "input & inputFrom",
"only one of input or inputFrom may be set"),
)
}

return allErrs
Expand Down
29 changes: 0 additions & 29 deletions api/v1alpha1/temporalworker_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@ func TestTemporalWorkerDeployment_ValidateCreate(t *testing.T) {
"valid temporal worker deployment": {
obj: testhelpers.MakeTWDWithName("valid-worker", ""),
},
"temporal worker deployment with name too long": {
Comment thread
carlydf marked this conversation as resolved.
obj: testhelpers.MakeTWDWithName("this-is-a-very-long-temporal-worker-deployment-name-that-exceeds-the-maximum-allowed-length-of-sixty-three-characters", ""),
errorMsg: "cannot be more than 63 characters",
},
"invalid object type": {
obj: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -39,14 +35,6 @@ func TestTemporalWorkerDeployment_ValidateCreate(t *testing.T) {
},
errorMsg: "expected a TemporalWorkerDeployment",
},
"missing rollout steps": {
obj: testhelpers.ModifyObj(testhelpers.MakeTWDWithName("prog-rollout-missing-steps", ""), func(obj *temporaliov1alpha1.TemporalWorkerDeployment) *temporaliov1alpha1.TemporalWorkerDeployment {
obj.Spec.RolloutStrategy.Strategy = temporaliov1alpha1.UpdateProgressive
obj.Spec.RolloutStrategy.Steps = nil
return obj
}),
errorMsg: "spec.rollout.steps: Invalid value: null: steps are required for Progressive rollout",
},
"ramp value for step <= previous step": {
obj: testhelpers.ModifyObj(testhelpers.MakeTWDWithName("prog-rollout-decreasing-ramps", ""), func(obj *temporaliov1alpha1.TemporalWorkerDeployment) *temporaliov1alpha1.TemporalWorkerDeployment {
obj.Spec.RolloutStrategy.Strategy = temporaliov1alpha1.UpdateProgressive
Expand All @@ -62,18 +50,6 @@ func TestTemporalWorkerDeployment_ValidateCreate(t *testing.T) {
}),
errorMsg: "[spec.rollout.steps[2].rampPercentage: Invalid value: 9: rampPercentage must increase between each step, spec.rollout.steps[4].rampPercentage: Invalid value: 50: rampPercentage must increase between each step]",
},
"pause duration < 30s": {
obj: testhelpers.ModifyObj(testhelpers.MakeTWDWithName("prog-rollout-decreasing-ramps", ""), func(obj *temporaliov1alpha1.TemporalWorkerDeployment) *temporaliov1alpha1.TemporalWorkerDeployment {
obj.Spec.RolloutStrategy.Strategy = temporaliov1alpha1.UpdateProgressive
obj.Spec.RolloutStrategy.Steps = []temporaliov1alpha1.RolloutStep{
{10, metav1.Duration{Duration: time.Minute}},
{25, metav1.Duration{Duration: 10 * time.Second}},
{50, metav1.Duration{Duration: time.Minute}},
}
return obj
}),
errorMsg: `spec.rollout.steps[1].pauseDuration: Invalid value: "10s": pause duration must be at least 30s`,
},
}

for name, tc := range tests {
Expand Down Expand Up @@ -110,11 +86,6 @@ func TestTemporalWorkerDeployment_ValidateUpdate(t *testing.T) {
oldObj: nil,
newObj: testhelpers.MakeTWDWithName("valid-worker", ""),
},
"update with name too long": {
oldObj: nil,
newObj: testhelpers.MakeTWDWithName("this-is-a-very-long-temporal-worker-deployment-name-that-exceeds-the-maximum-allowed-length-of-sixty-three-characters", ""),
errorMsg: "cannot be more than 63 characters",
},
}

for name, tc := range tests {
Expand Down
122 changes: 122 additions & 0 deletions api/v1alpha1/temporalworkerdeployment_cel_validation_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Unless explicitly stated otherwise all files in this repository are licensed under the MIT License.
//
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2024 Datadog, Inc.

package v1alpha1

// Integration tests for CRD-level CEL validation rules on TemporalWorkerDeployment.
//
// These tests hit a real kube-apiserver (via envtest) so they verify that the
// x-kubernetes-validations blocks in the generated CRD manifest are syntactically
// valid and semantically correct. The webhook Go code is NOT involved here — we are
// testing what the API server enforces regardless of whether the webhook is enabled.

import (
"strings"
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// CRD-level CEL validation specs for TemporalWorkerDeployment. Every spec
// submits an object through k8sClient and asserts on the create result, so the
// outcome reflects whatever the API server behind k8sClient enforces.
var _ = Describe("TemporalWorkerDeployment CRD CEL validation", func() {
	// testNS is reassigned before every spec from makeTestNamespace.
	var testNS string

	BeforeEach(func() {
		testNS = makeTestNamespace("twd-cel")
	})

	// newTWD builds a minimal TWD in testNS: one worker container, the
	// AllAtOnce strategy, and a connection reference plus Temporal namespace.
	// Specs mutate the returned object to provoke one specific rejection.
	newTWD := func(name string) *TemporalWorkerDeployment {
		twd := &TemporalWorkerDeployment{}
		twd.ObjectMeta = metav1.ObjectMeta{Name: name, Namespace: testNS}
		twd.Spec.Template.Spec.Containers = []corev1.Container{
			{Name: "worker", Image: "worker:latest"},
		}
		twd.Spec.RolloutStrategy.Strategy = UpdateAllAtOnce
		twd.Spec.WorkerOptions.TemporalConnectionRef.Name = "my-connection"
		twd.Spec.WorkerOptions.TemporalNamespace = "default"
		return twd
	}

	// expectRejected creates twd and asserts that the create call fails with
	// an error whose text contains wantMsg.
	expectRejected := func(twd *TemporalWorkerDeployment, wantMsg string) {
		createErr := k8sClient.Create(ctx, twd)
		Expect(createErr).To(HaveOccurred())
		Expect(createErr.Error()).To(ContainSubstring(wantMsg))
	}

	It("accepts a valid TWD", func() {
		Expect(k8sClient.Create(ctx, newTWD("valid-worker"))).To(Succeed())
	})

	It("rejects name longer than 63 characters", func() {
		// 64 characters: one past the limit named in the expected message.
		tooLong := strings.Repeat("a", 64)
		expectRejected(newTWD(tooLong), "name cannot be more than 63 characters")
	})

	It("rejects Progressive strategy with no steps", func() {
		obj := newTWD("prog-no-steps")
		obj.Spec.RolloutStrategy = RolloutStrategy{Strategy: UpdateProgressive}
		expectRejected(obj, "steps are required for Progressive rollout")
	})

	It("rejects more than 20 Progressive steps", func() {
		// 21 steps with ramp percentages 1..21 and a one-minute pause each,
		// so the step count is the only thing out of bounds.
		const stepCount = 21
		manySteps := make([]RolloutStep, 0, stepCount)
		for ramp := 1; ramp <= stepCount; ramp++ {
			manySteps = append(manySteps, RolloutStep{
				RampPercentage: ramp,
				PauseDuration:  metav1.Duration{Duration: time.Minute},
			})
		}

		obj := newTWD("prog-too-many-steps")
		obj.Spec.RolloutStrategy = RolloutStrategy{Strategy: UpdateProgressive, Steps: manySteps}
		expectRejected(obj, "Too many")
	})

	It("rejects a Progressive step with pauseDuration less than 30s", func() {
		shortPause := RolloutStep{
			RampPercentage: 50,
			PauseDuration:  metav1.Duration{Duration: 10 * time.Second},
		}

		obj := newTWD("short-pause")
		obj.Spec.RolloutStrategy = RolloutStrategy{
			Strategy: UpdateProgressive,
			Steps:    []RolloutStep{shortPause},
		}
		expectRejected(obj, "pause duration must be at least 30s")
	})

	It("rejects gate.inputFrom with both configMapKeyRef and secretKeyRef set", func() {
		// Both sources are populated at once, which is the invalid combination.
		cmRef := &corev1.ConfigMapKeySelector{
			LocalObjectReference: corev1.LocalObjectReference{Name: "my-cm"},
			Key:                  "key",
		}
		secretRef := &corev1.SecretKeySelector{
			LocalObjectReference: corev1.LocalObjectReference{Name: "my-secret"},
			Key:                  "key",
		}

		obj := newTWD("bad-gate-inputfrom")
		obj.Spec.RolloutStrategy = RolloutStrategy{
			Strategy: UpdateAllAtOnce,
			Gate: &GateWorkflowConfig{
				WorkflowType: "my-gate",
				InputFrom: &GateInputSource{
					ConfigMapKeyRef: cmRef,
					SecretKeyRef:    secretRef,
				},
			},
		}
		expectRejected(obj, "exactly one of configMapKeyRef or secretKeyRef must be set")
	})
})
11 changes: 11 additions & 0 deletions api/v1alpha1/worker_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,12 @@ const (
// controller cannot query the current worker deployment state from Temporal.
ReasonTemporalStateFetchFailed = "TemporalStateFetchFailed"

// ReasonInvalidSpec is set on ConditionReady=False and ConditionProgressing=False
// when the spec fails validation that the CRD schema cannot enforce (e.g. rampPercentage
// ordering, gate input/inputFrom exclusivity). Reconciliation resumes automatically
// when the user corrects the spec and applies it.
ReasonInvalidSpec = "InvalidSpec"

// Deprecated: Use ReasonRolloutComplete on ConditionReady instead.
ReasonTemporalConnectionHealthy = "TemporalConnectionHealthy"
)
Expand Down Expand Up @@ -379,6 +385,8 @@ type GateInputSource struct {
}

// RolloutStrategy defines strategy to apply during next rollout
// +kubebuilder:validation:XValidation:rule="self.strategy != 'Progressive' || (has(self.steps) && size(self.steps) > 0)",message="steps are required for Progressive rollout"
// +kubebuilder:validation:XValidation:rule="!has(self.gate) || !has(self.gate.inputFrom) || (has(self.gate.inputFrom.configMapKeyRef) != has(self.gate.inputFrom.secretKeyRef))",message="exactly one of configMapKeyRef or secretKeyRef must be set"
type RolloutStrategy struct {
// Specifies the strategy to apply during the next rollout.
// Valid values are:
Expand All @@ -393,6 +401,7 @@ type RolloutStrategy struct {

// Steps to execute progressive rollouts. Only required when strategy is "Progressive".
// +optional
// +kubebuilder:validation:MaxItems=20
Steps []RolloutStep `json:"steps,omitempty" protobuf:"bytes,3,rep,name=steps"`
}

Expand All @@ -413,6 +422,7 @@ type SunsetStrategy struct {

type AllAtOnceRolloutStrategy struct{}

// +kubebuilder:validation:XValidation:rule="duration(self.pauseDuration) >= duration('30s')",message="pause duration must be at least 30s"
type RolloutStep struct {
// RampPercentage indicates what percentage of new workflow executions should be
// routed to the new worker deployment version while this step is active.
Expand All @@ -436,6 +446,7 @@ type ManualRolloutStrategy struct{}
//+kubebuilder:printcolumn:name="Target",type="string",JSONPath=".status.targetVersion.buildID",description="Target build ID"
//+kubebuilder:printcolumn:name="Ramp %",type="number",JSONPath=".status.targetVersion.rampPercentage",description="Ramp percentage"
//+kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp",description="Age"
// +kubebuilder:validation:XValidation:rule="size(self.metadata.name) <= 63",message="name cannot be more than 63 characters"

// TemporalWorkerDeployment is the Schema for the temporalworkerdeployments API
type TemporalWorkerDeployment struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ spec:
- pauseDuration
- rampPercentage
type: object
x-kubernetes-validations:
- message: pause duration must be at least 30s
rule: duration(self.pauseDuration) >= duration('30s')
maxItems: 20
type: array
strategy:
enum:
Expand All @@ -121,6 +125,14 @@ spec:
required:
- strategy
type: object
x-kubernetes-validations:
- message: steps are required for Progressive rollout
rule: self.strategy != 'Progressive' || (has(self.steps) && size(self.steps)
> 0)
- message: exactly one of configMapKeyRef or secretKeyRef must be
set
rule: '!has(self.gate) || !has(self.gate.inputFrom) || (has(self.gate.inputFrom.configMapKeyRef)
!= has(self.gate.inputFrom.secretKeyRef))'
sunset:
properties:
deleteDelay:
Expand Down Expand Up @@ -4180,6 +4192,9 @@ spec:
- targetVersion
type: object
type: object
x-kubernetes-validations:
- message: name cannot be more than 63 characters
rule: size(self.metadata.name) <= 63
served: true
storage: true
subresources:
Expand Down
Loading
Loading