diff --git a/api/v1alpha1/temporalworker_webhook.go b/api/v1alpha1/temporalworker_webhook.go index 6772fc8b..e026460b 100644 --- a/api/v1alpha1/temporalworker_webhook.go +++ b/api/v1alpha1/temporalworker_webhook.go @@ -7,7 +7,6 @@ package v1alpha1 import ( "context" "fmt" - "time" "github.com/temporalio/temporal-worker-controller/internal/defaults" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -19,10 +18,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/webhook/admission" ) -const ( - maxTemporalWorkerDeploymentNameLen = 63 -) - func (r *TemporalWorkerDeployment) SetupWebhookWithManager(mgr ctrl.Manager) error { return ctrl.NewWebhookManagedBy(mgr). For(r). @@ -85,71 +80,37 @@ func (r *TemporalWorkerDeployment) validateForUpdateOrCreate(ctx context.Context } func validateForUpdateOrCreate(old, new *TemporalWorkerDeployment) (admission.Warnings, error) { - var allErrs field.ErrorList - - if len(new.GetName()) > maxTemporalWorkerDeploymentNameLen { - allErrs = append(allErrs, - field.Invalid(field.NewPath("metadata.name"), new.GetName(), fmt.Sprintf("cannot be more than %d characters", maxTemporalWorkerDeploymentNameLen)), - ) - } - - allErrs = append(allErrs, validateRolloutStrategy(new.Spec.RolloutStrategy)...) - + allErrs := validateRolloutStrategy(new.Spec.RolloutStrategy) if len(allErrs) > 0 { return nil, newInvalidErr(new, allErrs) } - return nil, nil } +// validateRolloutStrategy checks constraints that the CRD schema cannot enforce: +// rampPercentage must be strictly increasing across steps, and gate.input and +// gate.inputFrom are mutually exclusive (gate.input is an unstructured JSON field +// invisible to CEL). All other rollout constraints are enforced by the CRD CEL rules. func validateRolloutStrategy(s RolloutStrategy) []*field.Error { var allErrs []*field.Error if s.Strategy == UpdateProgressive { - rolloutSteps := s.Steps - if len(rolloutSteps) == 0 { - allErrs = append(allErrs, - field.Invalid(field.NewPath("spec.rollout.steps"), rolloutSteps, "steps are required for Progressive rollout"), - ) - } var lastRamp int - for i, s := range rolloutSteps { - // Check duration >= 30s - if s.PauseDuration.Duration < 30*time.Second { + for i, step := range s.Steps { + if step.RampPercentage <= lastRamp { allErrs = append(allErrs, - field.Invalid(field.NewPath(fmt.Sprintf("spec.rollout.steps[%d].pauseDuration", i)), s.PauseDuration.Duration.String(), "pause duration must be at least 30s"), + field.Invalid(field.NewPath(fmt.Sprintf("spec.rollout.steps[%d].rampPercentage", i)), step.RampPercentage, "rampPercentage must increase between each step"), ) } - - // Check ramp value greater than last - if s.RampPercentage <= lastRamp { - allErrs = append(allErrs, - field.Invalid(field.NewPath(fmt.Sprintf("spec.rollout.steps[%d].rampPercentage", i)), s.RampPercentage, "rampPercentage must increase between each step"), - ) - } - lastRamp = s.RampPercentage + lastRamp = step.RampPercentage } } - // Validate gate input fields - if s.Gate != nil { - gate := s.Gate - if gate.Input != nil && gate.InputFrom != nil { - allErrs = append(allErrs, - field.Invalid(field.NewPath("spec.rollout.gate"), "input & inputFrom", - "only one of input or inputFrom may be set"), - ) - } - if gate.InputFrom != nil { - cm := gate.InputFrom.ConfigMapKeyRef - sec := gate.InputFrom.SecretKeyRef - if (cm == nil && sec == nil) || (cm != nil && sec != nil) { - allErrs = append(allErrs, - field.Invalid(field.NewPath("spec.rollout.gate.inputFrom"), gate.InputFrom, - "exactly one of configMapKeyRef or secretKeyRef must be set"), - ) - } - } + if s.Gate != nil && s.Gate.Input != nil && s.Gate.InputFrom != nil { + allErrs = append(allErrs, + field.Invalid(field.NewPath("spec.rollout.gate"), "input & inputFrom", + "only one of input or inputFrom may be set"), + ) } return allErrs diff --git a/api/v1alpha1/temporalworker_webhook_test.go b/api/v1alpha1/temporalworker_webhook_test.go index 15b48915..64c8df03 100644 --- a/api/v1alpha1/temporalworker_webhook_test.go +++ b/api/v1alpha1/temporalworker_webhook_test.go @@ -27,10 +27,6 @@ func TestTemporalWorkerDeployment_ValidateCreate(t *testing.T) { "valid temporal worker deployment": { obj: testhelpers.MakeTWDWithName("valid-worker", ""), }, - "temporal worker deployment with name too long": { - obj: testhelpers.MakeTWDWithName("this-is-a-very-long-temporal-worker-deployment-name-that-exceeds-the-maximum-allowed-length-of-sixty-three-characters", ""), - errorMsg: "cannot be more than 63 characters", - }, "invalid object type": { obj: &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -39,14 +35,6 @@ func TestTemporalWorkerDeployment_ValidateCreate(t *testing.T) { }, errorMsg: "expected a TemporalWorkerDeployment", }, - "missing rollout steps": { - obj: testhelpers.ModifyObj(testhelpers.MakeTWDWithName("prog-rollout-missing-steps", ""), func(obj *temporaliov1alpha1.TemporalWorkerDeployment) *temporaliov1alpha1.TemporalWorkerDeployment { - obj.Spec.RolloutStrategy.Strategy = temporaliov1alpha1.UpdateProgressive - obj.Spec.RolloutStrategy.Steps = nil - return obj - }), - errorMsg: "spec.rollout.steps: Invalid value: null: steps are required for Progressive rollout", - }, "ramp value for step <= previous step": { obj: testhelpers.ModifyObj(testhelpers.MakeTWDWithName("prog-rollout-decreasing-ramps", ""), func(obj *temporaliov1alpha1.TemporalWorkerDeployment) *temporaliov1alpha1.TemporalWorkerDeployment { obj.Spec.RolloutStrategy.Strategy = temporaliov1alpha1.UpdateProgressive @@ -62,18 +50,6 @@ func TestTemporalWorkerDeployment_ValidateCreate(t *testing.T) { }), errorMsg: "[spec.rollout.steps[2].rampPercentage: Invalid value: 9: rampPercentage must increase between each step, spec.rollout.steps[4].rampPercentage: Invalid value: 50: rampPercentage must increase between each step]", }, - "pause duration < 30s": { - obj: testhelpers.ModifyObj(testhelpers.MakeTWDWithName("prog-rollout-decreasing-ramps", ""), func(obj *temporaliov1alpha1.TemporalWorkerDeployment) *temporaliov1alpha1.TemporalWorkerDeployment { - obj.Spec.RolloutStrategy.Strategy = temporaliov1alpha1.UpdateProgressive - obj.Spec.RolloutStrategy.Steps = []temporaliov1alpha1.RolloutStep{ - {10, metav1.Duration{Duration: time.Minute}}, - {25, metav1.Duration{Duration: 10 * time.Second}}, - {50, metav1.Duration{Duration: time.Minute}}, - } - return obj - }), - errorMsg: `spec.rollout.steps[1].pauseDuration: Invalid value: "10s": pause duration must be at least 30s`, - }, } for name, tc := range tests { @@ -110,11 +86,6 @@ func TestTemporalWorkerDeployment_ValidateUpdate(t *testing.T) { oldObj: nil, newObj: testhelpers.MakeTWDWithName("valid-worker", ""), }, - "update with name too long": { - oldObj: nil, - newObj: testhelpers.MakeTWDWithName("this-is-a-very-long-temporal-worker-deployment-name-that-exceeds-the-maximum-allowed-length-of-sixty-three-characters", ""), - errorMsg: "cannot be more than 63 characters", - }, } for name, tc := range tests { diff --git a/api/v1alpha1/temporalworkerdeployment_cel_validation_test.go b/api/v1alpha1/temporalworkerdeployment_cel_validation_test.go new file mode 100644 index 00000000..8ae6a794 --- /dev/null +++ b/api/v1alpha1/temporalworkerdeployment_cel_validation_test.go @@ -0,0 +1,122 @@ +// Unless explicitly stated otherwise all files in this repository are licensed under the MIT License. +// +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2024 Datadog, Inc. + +package v1alpha1 + +// Integration tests for CRD-level CEL validation rules on TemporalWorkerDeployment. +// +// These tests hit a real kube-apiserver (via envtest) so they verify that the +// x-kubernetes-validations blocks in the generated CRD manifest are syntactically +// valid and semantically correct. The webhook Go code is NOT involved here — we are +// testing what the API server enforces regardless of whether the webhook is enabled. + +import ( + "strings" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +var _ = Describe("TemporalWorkerDeployment CRD CEL validation", func() { + var ns string + + BeforeEach(func() { + ns = makeTestNamespace("twd-cel") + }) + + // baseTWD returns a minimal valid TWD in the given namespace. + baseTWD := func(name string) *TemporalWorkerDeployment { + return &TemporalWorkerDeployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: ns, + }, + Spec: TemporalWorkerDeploymentSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{Name: "worker", Image: "worker:latest"}}, + }, + }, + RolloutStrategy: RolloutStrategy{Strategy: UpdateAllAtOnce}, + WorkerOptions: WorkerOptions{ + TemporalConnectionRef: TemporalConnectionReference{Name: "my-connection"}, + TemporalNamespace: "default", + }, + }, + } + } + + It("accepts a valid TWD", func() { + Expect(k8sClient.Create(ctx, baseTWD("valid-worker"))).To(Succeed()) + }) + + It("rejects name longer than 63 characters", func() { + twd := baseTWD(strings.Repeat("a", 64)) + err := k8sClient.Create(ctx, twd) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("name cannot be more than 63 characters")) + }) + + It("rejects Progressive strategy with no steps", func() { + twd := baseTWD("prog-no-steps") + twd.Spec.RolloutStrategy = RolloutStrategy{Strategy: UpdateProgressive} + err := k8sClient.Create(ctx, twd) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("steps are required for Progressive rollout")) + }) + + It("rejects more than 20 Progressive steps", func() { + steps := make([]RolloutStep, 21) + for i := range steps { + steps[i] = RolloutStep{ + RampPercentage: i + 1, + PauseDuration: metav1.Duration{Duration: time.Minute}, + } + } + twd := baseTWD("prog-too-many-steps") + twd.Spec.RolloutStrategy = RolloutStrategy{Strategy: UpdateProgressive, Steps: steps} + err := k8sClient.Create(ctx, twd) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("Too many")) + }) + + It("rejects a Progressive step with pauseDuration less than 30s", func() { + twd := baseTWD("short-pause") + twd.Spec.RolloutStrategy = RolloutStrategy{ + Strategy: UpdateProgressive, + Steps: []RolloutStep{ + {RampPercentage: 50, PauseDuration: metav1.Duration{Duration: 10 * time.Second}}, + }, + } + err := k8sClient.Create(ctx, twd) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("pause duration must be at least 30s")) + }) + + It("rejects gate.inputFrom with both configMapKeyRef and secretKeyRef set", func() { + twd := baseTWD("bad-gate-inputfrom") + twd.Spec.RolloutStrategy = RolloutStrategy{ + Strategy: UpdateAllAtOnce, + Gate: &GateWorkflowConfig{ + WorkflowType: "my-gate", + InputFrom: &GateInputSource{ + ConfigMapKeyRef: &corev1.ConfigMapKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{Name: "my-cm"}, + Key: "key", + }, + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{Name: "my-secret"}, + Key: "key", + }, + }, + }, + } + err := k8sClient.Create(ctx, twd) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("exactly one of configMapKeyRef or secretKeyRef must be set")) + }) +}) diff --git a/api/v1alpha1/worker_types.go b/api/v1alpha1/worker_types.go index 17805a42..d11dfa35 100644 --- a/api/v1alpha1/worker_types.go +++ b/api/v1alpha1/worker_types.go @@ -135,6 +135,12 @@ const ( // controller cannot query the current worker deployment state from Temporal. ReasonTemporalStateFetchFailed = "TemporalStateFetchFailed" + // ReasonInvalidSpec is set on ConditionReady=False and ConditionProgressing=False + // when the spec fails validation that the CRD schema cannot enforce (e.g. rampPercentage + // ordering, gate input/inputFrom exclusivity). Reconciliation resumes automatically + // when the user corrects the spec and applies it. + ReasonInvalidSpec = "InvalidSpec" + // Deprecated: Use ReasonRolloutComplete on ConditionReady instead. ReasonTemporalConnectionHealthy = "TemporalConnectionHealthy" ) @@ -379,6 +385,8 @@ type GateInputSource struct { } // RolloutStrategy defines strategy to apply during next rollout +// +kubebuilder:validation:XValidation:rule="self.strategy != 'Progressive' || (has(self.steps) && size(self.steps) > 0)",message="steps are required for Progressive rollout" +// +kubebuilder:validation:XValidation:rule="!has(self.gate) || !has(self.gate.inputFrom) || (has(self.gate.inputFrom.configMapKeyRef) != has(self.gate.inputFrom.secretKeyRef))",message="exactly one of configMapKeyRef or secretKeyRef must be set" type RolloutStrategy struct { // Specifies how to treat concurrent executions of a Job. // Valid values are: @@ -393,6 +401,7 @@ type RolloutStrategy struct { // Steps to execute progressive rollouts. Only required when strategy is "Progressive". // +optional + // +kubebuilder:validation:MaxItems=20 Steps []RolloutStep `json:"steps,omitempty" protobuf:"bytes,3,rep,name=steps"` } @@ -413,6 +422,7 @@ type SunsetStrategy struct { type AllAtOnceRolloutStrategy struct{} +// +kubebuilder:validation:XValidation:rule="duration(self.pauseDuration) >= duration('30s')",message="pause duration must be at least 30s" type RolloutStep struct { // RampPercentage indicates what percentage of new workflow executions should be // routed to the new worker deployment version while this step is active. @@ -436,6 +446,7 @@ type ManualRolloutStrategy struct{} //+kubebuilder:printcolumn:name="Target",type="string",JSONPath=".status.targetVersion.buildID",description="Target build ID" //+kubebuilder:printcolumn:name="Ramp %",type="number",JSONPath=".status.targetVersion.rampPercentage",description="Ramp percentage" //+kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp",description="Age" +// +kubebuilder:validation:XValidation:rule="size(self.metadata.name) <= 63",message="name cannot be more than 63 characters" // TemporalWorkerDeployment is the Schema for the temporalworkerdeployments API type TemporalWorkerDeployment struct { diff --git a/helm/temporal-worker-controller-crds/templates/temporal.io_temporalworkerdeployments.yaml b/helm/temporal-worker-controller-crds/templates/temporal.io_temporalworkerdeployments.yaml index d4c5ce21..dc38f305 100644 --- a/helm/temporal-worker-controller-crds/templates/temporal.io_temporalworkerdeployments.yaml +++ b/helm/temporal-worker-controller-crds/templates/temporal.io_temporalworkerdeployments.yaml @@ -111,6 +111,10 @@ spec: - pauseDuration - rampPercentage type: object + x-kubernetes-validations: + - message: pause duration must be at least 30s + rule: duration(self.pauseDuration) >= duration('30s') + maxItems: 20 type: array strategy: enum: @@ -121,6 +125,14 @@ spec: required: - strategy type: object + x-kubernetes-validations: + - message: steps are required for Progressive rollout + rule: self.strategy != 'Progressive' || (has(self.steps) && size(self.steps) + > 0) + - message: exactly one of configMapKeyRef or secretKeyRef must be + set + rule: '!has(self.gate) || !has(self.gate.inputFrom) || (has(self.gate.inputFrom.configMapKeyRef) + != has(self.gate.inputFrom.secretKeyRef))' sunset: properties: deleteDelay: @@ -4180,6 +4192,9 @@ spec: - targetVersion type: object type: object + x-kubernetes-validations: + - message: name cannot be more than 63 characters + rule: size(self.metadata.name) <= 63 served: true storage: true subresources: diff --git a/internal/controller/reconciler_events_test.go b/internal/controller/reconciler_events_test.go index e342b7bc..ff84cede 100644 --- a/internal/controller/reconciler_events_test.go +++ b/internal/controller/reconciler_events_test.go @@ -10,6 +10,7 @@ import ( "fmt" "strings" "testing" + "time" "github.com/go-logr/logr" "github.com/stretchr/testify/assert" @@ -399,12 +400,17 @@ func TestReconcile_TWDNotFound_NoEvent(t *testing.T) { assert.Empty(t, drainEvents(recorder), "no events should be emitted when TWD is not found") } -func TestReconcile_ValidationFailure_NoEventEmitted(t *testing.T) { - // Progressive strategy with no steps is invalid; the reconciler requeues without emitting events. +// TestReconcile_InvalidSpec_EmitsEventAndSetsCondition verifies that spec validation +// errors not enforceable by the CRD schema (e.g. rampPercentage ordering) surface as +// a Warning event and a blocked condition rather than being silently requeued. +func TestReconcile_InvalidSpec_EmitsEventAndSetsCondition(t *testing.T) { twd := makeTWD("test-worker", "default", "my-connection") twd.Spec.RolloutStrategy = temporaliov1alpha1.RolloutStrategy{ Strategy: temporaliov1alpha1.UpdateProgressive, - Steps: nil, + Steps: []temporaliov1alpha1.RolloutStep{ + {RampPercentage: 50, PauseDuration: metav1.Duration{Duration: time.Minute}}, + {RampPercentage: 10, PauseDuration: metav1.Duration{Duration: time.Minute}}, // decreasing — invalid + }, } tc := makeNoCredsTemporalConnection("my-connection", "default", "localhost:7233") r, recorder := newTestReconciler([]client.Object{twd, tc}) @@ -414,10 +420,17 @@ func TestReconcile_ValidationFailure_NoEventEmitted(t *testing.T) { }) require.NoError(t, err) - assert.NotZero(t, result.RequeueAfter, "should requeue on validation failure") + assert.Zero(t, result.RequeueAfter, "should not requeue — spec update will re-trigger reconciliation") + events := drainEvents(recorder) - assertNoEventEmitted(t, events, temporaliov1alpha1.ReasonTemporalConnectionNotFound) - assertNoEventEmitted(t, events, temporaliov1alpha1.ReasonTemporalClientCreationFailed) + assertEventEmitted(t, events, temporaliov1alpha1.ReasonInvalidSpec) + + var updated temporaliov1alpha1.TemporalWorkerDeployment + require.NoError(t, r.Get(context.Background(), types.NamespacedName{Name: twd.Name, Namespace: twd.Namespace}, &updated)) + cond := meta.FindStatusCondition(updated.Status.Conditions, temporaliov1alpha1.ConditionProgressing) + require.NotNil(t, cond, "Progressing condition should be set") + assert.Equal(t, metav1.ConditionFalse, cond.Status) + assert.Equal(t, temporaliov1alpha1.ReasonInvalidSpec, cond.Reason) } // TestReconcile_TemporalConnectionNotFound covers all three related assertions: event emission, diff --git a/internal/controller/worker_controller.go b/internal/controller/worker_controller.go index 25f2f533..540963db 100644 --- a/internal/controller/worker_controller.go +++ b/internal/controller/worker_controller.go @@ -139,13 +139,15 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req return ctrl.Result{}, err } - // TODO(carlydf): Handle warnings once we have some, handle ValidateUpdate once it is different from ValidateCreate + // Fallback validation for spec constraints the CRD schema cannot enforce (rampPercentage + // ordering, gate input/inputFrom exclusivity). When the optional TWD webhook is disabled + // these checks would otherwise go unreported; this surfaces them as a condition and event. if _, err := workerDeploy.ValidateCreate(ctx, &workerDeploy); err != nil { - l.Error(err, "invalid TemporalWorkerDeployment") - return ctrl.Result{ - Requeue: true, - RequeueAfter: 5 * time.Minute, // user needs time to fix this, if it changes, it will be re-queued immediately - }, nil + r.recordWarningAndSetBlocked(ctx, &workerDeploy, + temporaliov1alpha1.ReasonInvalidSpec, + fmt.Sprintf("Invalid TemporalWorkerDeployment spec: %v", err), + err.Error()) + return ctrl.Result{}, nil } // Note: TemporalConnectionRef.Name is validated by webhook due to +kubebuilder:validation:Required