From dfde8f05355e68ab56f4c923a84a290fab3031ec Mon Sep 17 00:00:00 2001 From: Nitin Misra Date: Mon, 8 Jun 2026 15:06:21 +0530 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20feat(controller):=20add=20stateful?= =?UTF-8?q?=20replica=20health=20classifier?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Ports the stateful replica health classifier from the parallel feat/replica-health-detection effort onto main's data model. Main was already equal or ahead on parsing, metric cleanup, negative-behind handling and multi-database aggregation, so only the genuinely net-new classifier is brought over; main's richer per-replica metrics are kept and the new gauges are additive. What's new: - Per-replica state machine (replica_classifier.go) distinguishing a freshly-registered replica (Transient, 30s grace) from a broken data channel (DataChannelDown), plus Behind / BehindTooLong / Invalid / UnknownStatus. Clears the behind timer when the channel is down so the behind_seconds gauge does not grow without bound. - Configurable ReplicationSpec.BehindAlertThreshold (default 5m) for duration-based "behind too long" detection. Declared as a pointer so an unset value is omitted and picks up the CRD default rather than serializing as "0s" (a value-typed metav1.Duration is never omitted by omitempty, which would trip the >0 CEL validation on typed-client creates). - New gauges memgraph_replica_data_channel_up and memgraph_replica_behind_seconds, recorded inside CheckReplicationHealth and swept on replica unregister / cluster deletion. - Specific, actionable events (ReplicaDataChannelDown / ReplicaInvalid / ReplicaBehindTooLong) replacing the generic ReplicaUnhealthy. ReplicationManager now holds per-replica timers (in-memory, reset on restart) and an injectable clock for deterministic tests. CheckReplicationHealth keeps its existing signature so RecordReplicaSet continues to populate main's snapshot metrics. Note: a replica within the 30s grace window now counts toward HealthyReplicas (Transient) instead of being immediately Invalid — the intended anti-flapping behavior. Co-Authored-By: Claude Opus 4.8 (1M context) --- api/v1alpha1/memgraphcluster_types.go | 10 + api/v1alpha1/zz_generated.deepcopy.go | 7 +- .../memgraph.base14.io_memgraphclusters.yaml | 13 + internal/controller/events.go | 4 +- internal/controller/events_test.go | 4 +- .../controller/memgraphcluster_controller.go | 6 +- internal/controller/metrics.go | 46 +++ internal/controller/metrics_test.go | 46 +++ internal/controller/replica_classifier.go | 123 ++++++++ .../controller/replica_classifier_test.go | 270 ++++++++++++++++++ internal/controller/replication.go | 144 +++++++++- 11 files changed, 661 insertions(+), 12 deletions(-) create mode 100644 internal/controller/replica_classifier.go create mode 100644 internal/controller/replica_classifier_test.go diff --git a/api/v1alpha1/memgraphcluster_types.go b/api/v1alpha1/memgraphcluster_types.go index 6219479..7367cd3 100644 --- a/api/v1alpha1/memgraphcluster_types.go +++ b/api/v1alpha1/memgraphcluster_types.go @@ -140,6 +140,16 @@ type ReplicationSpec struct { // +kubebuilder:default="ASYNC" // +optional Mode ReplicationMode `json:"mode,omitempty"` + + // BehindAlertThreshold is the duration a replica may stay behind the main + // before being reported as unhealthy. Defaults to 5m. Must be greater than 0. + // Pointer so that an unset field is omitted and picks up the default, rather + // than serializing as "0s" (metav1.Duration is a struct, so omitempty alone + // would not omit its zero value). + // +kubebuilder:default="5m" + // +kubebuilder:validation:XValidation:rule=`!self.startsWith("-") && self != "0s"`,message="behindAlertThreshold must be a positive Go duration (e.g. '5m', '30s'); zero and negative values are not allowed" + // +optional + BehindAlertThreshold *metav1.Duration `json:"behindAlertThreshold,omitempty"` } // HighAvailabilitySpec defines automatic promotion/failover settings diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index cfac49f..b7889fd 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -111,7 +111,7 @@ func (in *MemgraphClusterSpec) DeepCopyInto(out *MemgraphClusterSpec) { in.Resources.DeepCopyInto(&out.Resources) in.Storage.DeepCopyInto(&out.Storage) out.Config = in.Config - out.Replication = in.Replication + in.Replication.DeepCopyInto(&out.Replication) if in.HighAvailability != nil { in, out := &in.HighAvailability, &out.HighAvailability *out = new(HighAvailabilitySpec) @@ -227,6 +227,11 @@ func (in *ReplicationHealth) DeepCopy() *ReplicationHealth { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ReplicationSpec) DeepCopyInto(out *ReplicationSpec) { *out = *in + if in.BehindAlertThreshold != nil { + in, out := &in.BehindAlertThreshold, &out.BehindAlertThreshold + *out = new(metav1.Duration) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ReplicationSpec. diff --git a/config/crd/bases/memgraph.base14.io_memgraphclusters.yaml b/config/crd/bases/memgraph.base14.io_memgraphclusters.yaml index fa94111..4cd74d1 100644 --- a/config/crd/bases/memgraph.base14.io_memgraphclusters.yaml +++ b/config/crd/bases/memgraph.base14.io_memgraphclusters.yaml @@ -1034,6 +1034,19 @@ spec: replication: description: Replication defines the replication settings properties: + behindAlertThreshold: + default: 5m + description: |- + BehindAlertThreshold is the duration a replica may stay behind the main + before being reported as unhealthy. Defaults to 5m. Must be greater than 0. + Pointer so that an unset field is omitted and picks up the default, rather + than serializing as "0s" (metav1.Duration is a struct, so omitempty alone + would not omit its zero value). + type: string + x-kubernetes-validations: + - message: behindAlertThreshold must be a positive Go duration + (e.g. '5m', '30s'); zero and negative values are not allowed + rule: '!self.startsWith("-") && self != "0s"' mode: default: ASYNC description: Mode is the replication mode (ASYNC, SYNC, STRICT_SYNC) diff --git a/internal/controller/events.go b/internal/controller/events.go index 5ce0951..1f9864b 100644 --- a/internal/controller/events.go +++ b/internal/controller/events.go @@ -20,7 +20,9 @@ const ( EventReasonReplicaUnregistered = "ReplicaUnregistered" EventReasonReplicationHealthy = "ReplicationHealthy" EventReasonReplicationError = "ReplicationError" - EventReasonReplicaUnhealthy = "ReplicaUnhealthy" + EventReasonReplicaDataChannelDown = "ReplicaDataChannelDown" + EventReasonReplicaInvalid = "ReplicaInvalid" + EventReasonReplicaBehindTooLong = "ReplicaBehindTooLong" EventReasonReplicationLagHigh = "ReplicationLagHigh" // Failover events diff --git a/internal/controller/events_test.go b/internal/controller/events_test.go index f2aee53..d075aca 100644 --- a/internal/controller/events_test.go +++ b/internal/controller/events_test.go @@ -20,7 +20,9 @@ func TestEventReasonConstants(t *testing.T) { {"EventReasonReplicaUnregistered", EventReasonReplicaUnregistered}, {"EventReasonReplicationHealthy", EventReasonReplicationHealthy}, {"EventReasonReplicationError", EventReasonReplicationError}, - {"EventReasonReplicaUnhealthy", EventReasonReplicaUnhealthy}, + {"EventReasonReplicaDataChannelDown", EventReasonReplicaDataChannelDown}, + {"EventReasonReplicaInvalid", EventReasonReplicaInvalid}, + {"EventReasonReplicaBehindTooLong", EventReasonReplicaBehindTooLong}, {"EventReasonReplicationLagHigh", EventReasonReplicationLagHigh}, {"EventReasonMainInstanceFailed", EventReasonMainInstanceFailed}, {"EventReasonFailoverStarted", EventReasonFailoverStarted}, diff --git a/internal/controller/memgraphcluster_controller.go b/internal/controller/memgraphcluster_controller.go index a6397d1..5858da3 100644 --- a/internal/controller/memgraphcluster_controller.go +++ b/internal/controller/memgraphcluster_controller.go @@ -77,7 +77,10 @@ func (r *MemgraphClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ cluster := &memgraphv1alpha1.MemgraphCluster{} if err := r.Get(ctx, req.NamespacedName, cluster); err != nil { if apierrors.IsNotFound(err) { - // Clean up metrics for deleted cluster + // Clean up metrics and per-replica state for deleted cluster + if r.replicationManager != nil { + r.replicationManager.DeleteClusterState(req.Name, req.Namespace) + } r.metrics.DeleteClusterMetrics(req.Name, req.Namespace) return ctrl.Result{}, nil } @@ -301,6 +304,7 @@ func (r *MemgraphClusterReconciler) ensureReplicationManager() error { } r.replicationManager = NewReplicationManager(mgClient, r.Recorder) + r.replicationManager.SetMetricsRecorder(r.metrics) return nil } diff --git a/internal/controller/metrics.go b/internal/controller/metrics.go index f026446..8542039 100644 --- a/internal/controller/metrics.go +++ b/internal/controller/metrics.go @@ -59,6 +59,26 @@ var ( []string{"cluster", "namespace"}, ) + // Stateful per-replica health metrics (owned by the replica classifier in + // ReplicationManager.CheckReplicationHealth). These complement the snapshot + // gauges below with time-aware signals: whether the data channel is up and + // how long the replica has continuously been behind. + replicaDataChannelUpGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "memgraph_replica_data_channel_up", + Help: "1 if the replica's data channel is established (data_info populated and status not invalid/unknown), else 0", + }, + []string{"cluster", "namespace", "replica"}, + ) + + replicaBehindSecondsGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "memgraph_replica_behind_seconds", + Help: "Number of seconds the replica has continuously been behind the main; 0 when caught up", + }, + []string{"cluster", "namespace", "replica"}, + ) + // Per-replica replication metrics (from SHOW REPLICAS data_info). // These distinguish "registered" from "actually streaming data": a replica // can be registered and heartbeating while replicating nothing (empty @@ -290,6 +310,8 @@ func init() { clusterRegisteredReplicasGauge, replicationLagGauge, replicationHealthyGauge, + replicaDataChannelUpGauge, + replicaBehindSecondsGauge, replicaHealthyGauge, replicaStatusGauge, replicaBehindGauge, @@ -520,6 +542,8 @@ func (m *MetricsRecorder) DeleteClusterMetrics(cluster, namespace string) { // Sweep all per-replica and per-instance series for the cluster partial := prometheus.Labels{"cluster": cluster, "namespace": namespace} + replicaDataChannelUpGauge.DeletePartialMatch(partial) + replicaBehindSecondsGauge.DeletePartialMatch(partial) replicaHealthyGauge.DeletePartialMatch(partial) replicaStatusGauge.DeletePartialMatch(partial) replicaBehindGauge.DeletePartialMatch(partial) @@ -529,3 +553,25 @@ func (m *MetricsRecorder) DeleteClusterMetrics(cluster, namespace string) { replicationEdgeDriftGauge.DeletePartialMatch(partial) instanceHealthGauge.DeletePartialMatch(partial) } + +// RecordReplicaDataChannel sets the per-replica data-channel-up gauge. +func (m *MetricsRecorder) RecordReplicaDataChannel(cluster, namespace, replica string, up bool) { + v := 0.0 + if up { + v = 1.0 + } + replicaDataChannelUpGauge.WithLabelValues(cluster, namespace, replica).Set(v) +} + +// RecordReplicaBehindSeconds sets how long the replica has been behind, in seconds. +// Pass 0 when the replica is caught up. +func (m *MetricsRecorder) RecordReplicaBehindSeconds(cluster, namespace, replica string, seconds float64) { + replicaBehindSecondsGauge.WithLabelValues(cluster, namespace, replica).Set(seconds) +} + +// DeleteReplicaMetrics removes the stateful per-replica gauges for a single +// replica (called when a replica is unregistered or its cluster is deleted). +func (m *MetricsRecorder) DeleteReplicaMetrics(cluster, namespace, replica string) { + replicaDataChannelUpGauge.DeleteLabelValues(cluster, namespace, replica) + replicaBehindSecondsGauge.DeleteLabelValues(cluster, namespace, replica) +} diff --git a/internal/controller/metrics_test.go b/internal/controller/metrics_test.go index eb5bb4a..df7ca43 100644 --- a/internal/controller/metrics_test.go +++ b/internal/controller/metrics_test.go @@ -161,6 +161,52 @@ func TestMetricsRecorder_DeleteClusterMetricsSweepsReplicaSeries(t *testing.T) { } } +func TestMetricsRecorder_RecordReplicaDataChannelAndBehind(t *testing.T) { + m := NewMetricsRecorder() + cluster, namespace, replica := "chan-cluster", testMetricsNamespace, "replica_0" + + m.RecordReplicaDataChannel(cluster, namespace, replica, true) + if got := testutil.ToFloat64(replicaDataChannelUpGauge.WithLabelValues(cluster, namespace, replica)); got != 1 { + t.Errorf("data_channel_up = %v, want 1", got) + } + + m.RecordReplicaDataChannel(cluster, namespace, replica, false) + if got := testutil.ToFloat64(replicaDataChannelUpGauge.WithLabelValues(cluster, namespace, replica)); got != 0 { + t.Errorf("data_channel_up = %v, want 0", got) + } + + m.RecordReplicaBehindSeconds(cluster, namespace, replica, 42.5) + if got := testutil.ToFloat64(replicaBehindSecondsGauge.WithLabelValues(cluster, namespace, replica)); got != 42.5 { + t.Errorf("behind_seconds = %v, want 42.5", got) + } + + // DeleteReplicaMetrics drops both series for the replica. + m.DeleteReplicaMetrics(cluster, namespace, replica) + if got := testutil.CollectAndCount(replicaDataChannelUpGauge); got != 0 { + t.Errorf("data_channel_up has %d series after delete, want 0", got) + } + if got := testutil.CollectAndCount(replicaBehindSecondsGauge); got != 0 { + t.Errorf("behind_seconds has %d series after delete, want 0", got) + } +} + +func TestMetricsRecorder_DeleteClusterMetricsSweepsStatefulReplicaSeries(t *testing.T) { + m := NewMetricsRecorder() + cluster, namespace := "stateful-sweep-cluster", testMetricsNamespace + + m.RecordReplicaDataChannel(cluster, namespace, "replica_0", true) + m.RecordReplicaBehindSeconds(cluster, namespace, "replica_0", 10) + + m.DeleteClusterMetrics(cluster, namespace) + + if got := testutil.CollectAndCount(replicaDataChannelUpGauge); got != 0 { + t.Errorf("data_channel_up has %d series after DeleteClusterMetrics, want 0", got) + } + if got := testutil.CollectAndCount(replicaBehindSecondsGauge); got != 0 { + t.Errorf("behind_seconds has %d series after DeleteClusterMetrics, want 0", got) + } +} + func TestMetricsRecorder_RecordInstanceHealth(t *testing.T) { m := NewMetricsRecorder() diff --git a/internal/controller/replica_classifier.go b/internal/controller/replica_classifier.go new file mode 100644 index 0000000..efd4f06 --- /dev/null +++ b/internal/controller/replica_classifier.go @@ -0,0 +1,123 @@ +// Copyright 2025 Base14. See LICENSE file for details. + +package controller + +import ( + "strings" + "time" + + "github.com/base14/memgraph-operator/internal/memgraph" +) + +// replicaClassification is the outcome of evaluating a single replica's state +// against the operator's health policy. +type replicaClassification int + +const ( + classificationHealthy replicaClassification = iota // ready/recovery/replicating, caught up + classificationBehind // behind > 0 but within behindAlertThreshold + classificationTransient // data_info empty for < channelDownGracePeriod + classificationDataChannelDown // data_info empty for >= channelDownGracePeriod + classificationInvalid // any DB status == "invalid" + classificationUnknownStatus // any DB status not in known set + classificationBehindTooLong // behind > 0 longer than behindAlertThreshold +) + +// channelDownGracePeriod is how long an empty data_info must persist before +// being flagged as DataChannelDown. Memgraph normally populates data_info +// within a few seconds of registration. +const channelDownGracePeriod = 30 * time.Second + +// isHealthy reports whether a classification counts toward HealthyReplicas. +// Transient and Behind are "in-progress" warnings, not failures. +func (c replicaClassification) isHealthy() bool { + switch c { + case classificationHealthy, classificationBehind, classificationTransient: + return true + default: + return false + } +} + +// replicaState holds per-replica timers that span health checks. +// The zero value means "no timer started". +type replicaState struct { + behindSince time.Time // when did the replica first have behind > 0 without recovery + channelDownSince time.Time // when did the replica first have empty data_info +} + +// classifyReplica evaluates one replica against per-replica state. +// It mutates state in-place (start/clear timers) and returns the classification. +// +// Policy (from spec 2026-05-25-replica-health-detection-design): +// - empty DataInfo +// - first seen / within grace -> Transient +// - persisted past grace -> DataChannelDown +// - any DB status == "invalid" -> Invalid (terminal, returned immediately) +// - any DB status unknown -> UnknownStatus (defensive) +// - any DB Behind > 0 +// - first seen / within threshold -> Behind +// - persisted past threshold -> BehindTooLong +// - else -> Healthy +// +// Negative Behind values are ignored entirely (they indicate replica-side +// divergence, which is a separate concern parked for a follow-up). +func classifyReplica(replica memgraph.ReplicaInfo, state *replicaState, now time.Time, behindThreshold time.Duration) replicaClassification { + if len(replica.DataInfo) == 0 { + if state.channelDownSince.IsZero() { + state.channelDownSince = now + } + // Channel is down → lag is undefined; clear the behind timer so the + // memgraph_replica_behind_seconds gauge does not keep growing. + state.behindSince = time.Time{} + if now.Sub(state.channelDownSince) > channelDownGracePeriod { + return classificationDataChannelDown + } + return classificationTransient + } + state.channelDownSince = time.Time{} + + // First pass: check for terminal "invalid" status without mutating state. + for _, db := range replica.DataInfo { + if dbStatus(db) == memgraph.ReplicaStatusInvalid { + return classificationInvalid + } + } + + worst := classificationHealthy + anyBehind := false + + for _, db := range replica.DataInfo { + switch dbStatus(db) { + case memgraph.ReplicaStatusReady, memgraph.ReplicaStatusRecovery, memgraph.ReplicaStatusReplicating: + // known good + default: + worst = classificationUnknownStatus + } + + if db.Behind > 0 { + anyBehind = true + if state.behindSince.IsZero() { + state.behindSince = now + } + if now.Sub(state.behindSince) > behindThreshold { + return classificationBehindTooLong + } + if worst == classificationHealthy { + worst = classificationBehind + } + } + } + + if !anyBehind { + state.behindSince = time.Time{} + } + + return worst +} + +// dbStatus normalizes a per-database status the same way summarizeDataInfo does +// in the memgraph client, so classification is robust to casing/quoting. +func dbStatus(db memgraph.ReplicaDBInfo) string { + return strings.ToLower(strings.Trim(strings.TrimSpace(db.Status), "\"")) +} diff --git a/internal/controller/replica_classifier_test.go b/internal/controller/replica_classifier_test.go new file mode 100644 index 0000000..e134bdd --- /dev/null +++ b/internal/controller/replica_classifier_test.go @@ -0,0 +1,270 @@ +// Copyright 2025 Base14. See LICENSE file for details. + +package controller + +import ( + "testing" + "time" + + "github.com/base14/memgraph-operator/internal/memgraph" +) + +func TestClassifyReplica(t *testing.T) { + baseTime := time.Date(2026, 5, 26, 12, 0, 0, 0, time.UTC) + behindThreshold := 5 * time.Minute + + tests := []struct { + name string + replica memgraph.ReplicaInfo + state *replicaState + now time.Time + want replicaClassification + wantState replicaState // expected state AFTER classification + }{ + { + name: "empty data_info first seen -> transient (channelDownSince set to now)", + replica: memgraph.ReplicaInfo{ + Name: "r0", + DataInfo: map[string]memgraph.ReplicaDBInfo{}, + }, + state: &replicaState{}, + now: baseTime, + want: classificationTransient, + wantState: replicaState{channelDownSince: baseTime}, + }, + { + name: "empty data_info still within 30s -> transient", + replica: memgraph.ReplicaInfo{ + Name: "r0", + DataInfo: map[string]memgraph.ReplicaDBInfo{}, + }, + state: &replicaState{channelDownSince: baseTime}, + now: baseTime.Add(20 * time.Second), + want: classificationTransient, + wantState: replicaState{channelDownSince: baseTime}, + }, + { + name: "empty data_info past 30s -> DataChannelDown", + replica: memgraph.ReplicaInfo{ + Name: "r0", + DataInfo: map[string]memgraph.ReplicaDBInfo{}, + }, + state: &replicaState{channelDownSince: baseTime}, + now: baseTime.Add(31 * time.Second), + want: classificationDataChannelDown, + wantState: replicaState{channelDownSince: baseTime}, + }, + { + name: "data_info populated clears channelDownSince", + replica: memgraph.ReplicaInfo{ + Name: "r0", + DataInfo: map[string]memgraph.ReplicaDBInfo{ + "memgraph": {Status: "ready", Behind: 0, Timestamp: 1}, + }, + }, + state: &replicaState{channelDownSince: baseTime}, + now: baseTime.Add(10 * time.Second), + want: classificationHealthy, + wantState: replicaState{}, + }, + { + name: "ready healthy", + replica: memgraph.ReplicaInfo{ + Name: "r0", + DataInfo: map[string]memgraph.ReplicaDBInfo{ + "memgraph": {Status: "ready", Behind: 0, Timestamp: 1}, + }, + }, + state: &replicaState{}, + now: baseTime, + want: classificationHealthy, + wantState: replicaState{}, + }, + { + name: "replicating healthy", + replica: memgraph.ReplicaInfo{ + Name: "r0", + DataInfo: map[string]memgraph.ReplicaDBInfo{ + "memgraph": {Status: "replicating", Behind: 0, Timestamp: 1}, + }, + }, + state: &replicaState{}, + now: baseTime, + want: classificationHealthy, + wantState: replicaState{}, + }, + { + name: "recovery healthy", + replica: memgraph.ReplicaInfo{ + Name: "r0", + DataInfo: map[string]memgraph.ReplicaDBInfo{ + "memgraph": {Status: "recovery", Behind: 0, Timestamp: 1}, + }, + }, + state: &replicaState{}, + now: baseTime, + want: classificationHealthy, + wantState: replicaState{}, + }, + { + name: "invalid -> Invalid (terminal)", + replica: memgraph.ReplicaInfo{ + Name: "r0", + DataInfo: map[string]memgraph.ReplicaDBInfo{ + "memgraph": {Status: "invalid", Behind: 0, Timestamp: 1}, + }, + }, + state: &replicaState{}, + now: baseTime, + want: classificationInvalid, + wantState: replicaState{}, + }, + { + name: "unknown status -> UnknownStatus", + replica: memgraph.ReplicaInfo{ + Name: "r0", + DataInfo: map[string]memgraph.ReplicaDBInfo{ + "memgraph": {Status: "weird", Behind: 0, Timestamp: 1}, + }, + }, + state: &replicaState{}, + now: baseTime, + want: classificationUnknownStatus, + wantState: replicaState{}, + }, + { + name: "uppercase/quoted status normalized (READY -> healthy)", + replica: memgraph.ReplicaInfo{ + Name: "r0", + DataInfo: map[string]memgraph.ReplicaDBInfo{ + "memgraph": {Status: `"READY"`, Behind: 0, Timestamp: 1}, + }, + }, + state: &replicaState{}, + now: baseTime, + want: classificationHealthy, + wantState: replicaState{}, + }, + { + name: "behind > 0 first seen -> Behind (sets behindSince)", + replica: memgraph.ReplicaInfo{ + Name: "r0", + DataInfo: map[string]memgraph.ReplicaDBInfo{ + "memgraph": {Status: "replicating", Behind: 10, Timestamp: 1}, + }, + }, + state: &replicaState{}, + now: baseTime, + want: classificationBehind, + wantState: replicaState{behindSince: baseTime}, + }, + { + name: "behind > 0 within threshold -> Behind", + replica: memgraph.ReplicaInfo{ + Name: "r0", + DataInfo: map[string]memgraph.ReplicaDBInfo{ + "memgraph": {Status: "replicating", Behind: 10, Timestamp: 1}, + }, + }, + state: &replicaState{behindSince: baseTime}, + now: baseTime.Add(4 * time.Minute), + want: classificationBehind, + wantState: replicaState{behindSince: baseTime}, + }, + { + name: "behind > 0 past threshold -> BehindTooLong", + replica: memgraph.ReplicaInfo{ + Name: "r0", + DataInfo: map[string]memgraph.ReplicaDBInfo{ + "memgraph": {Status: "replicating", Behind: 10, Timestamp: 1}, + }, + }, + state: &replicaState{behindSince: baseTime}, + now: baseTime.Add(5*time.Minute + time.Second), + want: classificationBehindTooLong, + wantState: replicaState{behindSince: baseTime}, + }, + { + name: "caught up clears behindSince", + replica: memgraph.ReplicaInfo{ + Name: "r0", + DataInfo: map[string]memgraph.ReplicaDBInfo{ + "memgraph": {Status: "ready", Behind: 0, Timestamp: 1}, + }, + }, + state: &replicaState{behindSince: baseTime}, + now: baseTime.Add(time.Minute), + want: classificationHealthy, + wantState: replicaState{}, + }, + { + name: "channel-down clears behindSince", + replica: memgraph.ReplicaInfo{ + Name: "r0", + DataInfo: map[string]memgraph.ReplicaDBInfo{}, + }, + state: &replicaState{behindSince: baseTime}, + now: baseTime.Add(10 * time.Second), + want: classificationTransient, + wantState: replicaState{channelDownSince: baseTime.Add(10 * time.Second)}, + }, + { + name: "negative behind ignored (not Behind)", + replica: memgraph.ReplicaInfo{ + Name: "r0", + DataInfo: map[string]memgraph.ReplicaDBInfo{ + "memgraph": {Status: "recovery", Behind: -8, Timestamp: 1}, + }, + }, + state: &replicaState{}, + now: baseTime, + want: classificationHealthy, + wantState: replicaState{}, + }, + { + name: "invalid wins over behind in multi-db", + replica: memgraph.ReplicaInfo{ + Name: "r0", + DataInfo: map[string]memgraph.ReplicaDBInfo{ + "db_a": {Status: "replicating", Behind: 999, Timestamp: 1}, + "db_b": {Status: "invalid", Behind: 0, Timestamp: 1}, + }, + }, + state: &replicaState{}, + now: baseTime, + want: classificationInvalid, + wantState: replicaState{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := classifyReplica(tt.replica, tt.state, tt.now, behindThreshold) + if got != tt.want { + t.Errorf("classifyReplica() = %v, want %v", got, tt.want) + } + if !tt.state.behindSince.Equal(tt.wantState.behindSince) { + t.Errorf("state.behindSince = %v, want %v", tt.state.behindSince, tt.wantState.behindSince) + } + if !tt.state.channelDownSince.Equal(tt.wantState.channelDownSince) { + t.Errorf("state.channelDownSince = %v, want %v", tt.state.channelDownSince, tt.wantState.channelDownSince) + } + }) + } +} + +func TestClassificationIsHealthy(t *testing.T) { + healthy := []replicaClassification{classificationHealthy, classificationBehind, classificationTransient} + unhealthy := []replicaClassification{classificationDataChannelDown, classificationInvalid, classificationUnknownStatus, classificationBehindTooLong} + + for _, c := range healthy { + if !c.isHealthy() { + t.Errorf("%v should be healthy", c) + } + } + for _, c := range unhealthy { + if c.isHealthy() { + t.Errorf("%v should be unhealthy", c) + } + } +} diff --git a/internal/controller/replication.go b/internal/controller/replication.go index 7bbf071..b4433ae 100644 --- a/internal/controller/replication.go +++ b/internal/controller/replication.go @@ -7,6 +7,8 @@ import ( "fmt" "sort" "strings" + "sync" + "time" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" @@ -20,6 +22,16 @@ import ( type ReplicationManager struct { client *memgraph.Client recorder record.EventRecorder + metrics *MetricsRecorder + + // states tracks per-replica timers for classification, keyed by + // "namespace/name" -> replica name -> state. In-memory only; timers + // reset on operator restart. + statesMu sync.Mutex + states map[string]map[string]*replicaState + + // now is the clock used for classification. Overridable in tests. + now func() time.Time } // NewReplicationManager creates a new ReplicationManager @@ -27,14 +39,71 @@ func NewReplicationManager(client *memgraph.Client, recorder record.EventRecorde return &ReplicationManager{ client: client, recorder: recorder, + states: make(map[string]map[string]*replicaState), + now: time.Now, } } +// SetMetricsRecorder wires the metrics recorder. Safe to call once at startup. +func (rm *ReplicationManager) SetMetricsRecorder(m *MetricsRecorder) { + rm.metrics = m +} + // Client returns the underlying memgraph client func (rm *ReplicationManager) Client() *memgraph.Client { return rm.client } +// stateFor returns (creating if necessary) the per-replica state entry. +// Caller must hold statesMu. +func (rm *ReplicationManager) stateFor(clusterKey, replicaName string) *replicaState { + byReplica, ok := rm.states[clusterKey] + if !ok { + byReplica = make(map[string]*replicaState) + rm.states[clusterKey] = byReplica + } + st, ok := byReplica[replicaName] + if !ok { + st = &replicaState{} + byReplica[replicaName] = st + } + return st +} + +// pruneStates drops state entries for replicas no longer in the observed set. +// Caller must hold statesMu. +func (rm *ReplicationManager) pruneStates(clusterKey string, observed map[string]struct{}) { + byReplica, ok := rm.states[clusterKey] + if !ok { + return + } + for name := range byReplica { + if _, kept := observed[name]; !kept { + delete(byReplica, name) + } + } +} + +// DeleteClusterState removes all per-replica state for a cluster and drops the +// matching per-replica metric series. Called during cluster deletion. +func (rm *ReplicationManager) DeleteClusterState(clusterName, namespace string) { + clusterKey := namespace + "/" + clusterName + + rm.statesMu.Lock() + defer rm.statesMu.Unlock() + + byReplica, ok := rm.states[clusterKey] + if !ok { + return + } + if rm.metrics != nil { + for replicaName := range byReplica { + rm.metrics.DeleteReplicaMetrics(clusterName, namespace, replicaName) + } + } + delete(rm.states, clusterKey) +} + // ConfigureReplication sets up replication for the cluster // It configures the write instance as MAIN and all other instances as REPLICAs func (rm *ReplicationManager) ConfigureReplication(ctx context.Context, cluster *memgraphv1alpha1.MemgraphCluster, pods []corev1.Pod, writeInstance string, log *zap.Logger) error { @@ -197,6 +266,9 @@ func (rm *ReplicationManager) cleanupStaleReplicas(ctx context.Context, cluster } else { rm.recorder.Event(cluster, corev1.EventTypeNormal, EventReasonReplicaUnregistered, fmt.Sprintf("Stale replica %s unregistered from main", replica.Name)) + if rm.metrics != nil { + rm.metrics.DeleteReplicaMetrics(cluster.Name, cluster.Namespace, replica.Name) + } } } } @@ -218,25 +290,81 @@ func (rm *ReplicationManager) CheckReplicationHealth(ctx context.Context, cluste return nil, nil, fmt.Errorf("failed to show replicas: %w", err) } + behindThreshold := 5 * time.Minute // safety net if CRD defaulting did not apply + if t := cluster.Spec.Replication.BehindAlertThreshold; t != nil && t.Duration > 0 { + behindThreshold = t.Duration + } + + now := rm.now() + clusterKey := cluster.Namespace + "/" + cluster.Name + health := &memgraphv1alpha1.ReplicationHealth{ TotalReplicas: int32(len(replicas)), HealthyReplicas: 0, } + observed := make(map[string]struct{}, len(replicas)) + + rm.statesMu.Lock() + defer rm.statesMu.Unlock() + for _, replica := range replicas { - if replica.IsHealthy() { + observed[replica.Name] = struct{}{} + st := rm.stateFor(clusterKey, replica.Name) + + class := classifyReplica(replica, st, now, behindThreshold) + if class.isHealthy() { health.HealthyReplicas++ - } else { - log.Warn("unhealthy replica detected", + } + + // Stateful per-replica metrics (additive to the snapshot gauges + // recorded by RecordReplicaSet from the returned replicas). + if rm.metrics != nil { + channelUp := class != classificationDataChannelDown && + class != classificationTransient && + class != classificationInvalid && + class != classificationUnknownStatus + rm.metrics.RecordReplicaDataChannel(cluster.Name, cluster.Namespace, replica.Name, channelUp) + + behindSeconds := 0.0 + if !st.behindSince.IsZero() { + behindSeconds = now.Sub(st.behindSince).Seconds() + } + rm.metrics.RecordReplicaBehindSeconds(cluster.Name, cluster.Namespace, replica.Name, behindSeconds) + } + + // Emit specific, actionable events for unhealthy classifications only. + switch class { + case classificationDataChannelDown: + log.Warn("replica data channel down", zap.String("replica", replica.Name), - zap.String("status", replica.Status), - zap.Bool("dataInfoPresent", replica.DataInfoPresent()), - zap.Int64("behind", replica.Behind)) - rm.recorder.Event(cluster, corev1.EventTypeWarning, EventReasonReplicaUnhealthy, - fmt.Sprintf("Replica %s is unhealthy: %s", replica.Name, replica.Status)) + zap.Duration("emptyFor", now.Sub(st.channelDownSince))) + rm.recorder.Event(cluster, corev1.EventTypeWarning, EventReasonReplicaDataChannelDown, + fmt.Sprintf("Replica %s registered but data_info is empty for %s — main has not opened a data channel. Check connectivity on :10000, replica role, and version match. Re-registering with a fresh PVC usually resolves this.", + replica.Name, now.Sub(st.channelDownSince).Round(time.Second))) + case classificationInvalid: + log.Warn("replica in invalid state", + zap.String("replica", replica.Name), + zap.Any("dataInfo", replica.DataInfo)) + rm.recorder.Event(cluster, corev1.EventTypeWarning, EventReasonReplicaInvalid, + fmt.Sprintf("Replica %s reports invalid status. Manual intervention required (drop + re-register).", replica.Name)) + case classificationBehindTooLong: + log.Warn("replica behind for too long", + zap.String("replica", replica.Name), + zap.Duration("behindFor", now.Sub(st.behindSince)), + zap.Duration("threshold", behindThreshold)) + rm.recorder.Event(cluster, corev1.EventTypeWarning, EventReasonReplicaBehindTooLong, + fmt.Sprintf("Replica %s has been behind for %s (threshold %s).", + replica.Name, now.Sub(st.behindSince).Round(time.Second), behindThreshold)) + case classificationUnknownStatus: + log.Warn("replica reports unknown status", + zap.String("replica", replica.Name), + zap.Any("dataInfo", replica.DataInfo)) } } + rm.pruneStates(clusterKey, observed) + // Emit event if all replicas are healthy if health.HealthyReplicas == health.TotalReplicas && health.TotalReplicas > 0 { rm.recorder.Event(cluster, corev1.EventTypeNormal, EventReasonReplicationHealthy,