diff --git a/server/internal/database/orchestrator.go b/server/internal/database/orchestrator.go index 180fd135..bbae52f9 100644 --- a/server/internal/database/orchestrator.go +++ b/server/internal/database/orchestrator.go @@ -187,4 +187,8 @@ type Orchestrator interface { // spec before it is persisted. old is nil when the instance is being created // for the first time. ReconcileInstanceSpec(old, new *InstanceSpec) error + // ReconcileServiceInstanceSpec is called during service instance spec + // reconciliation to resolve and pin the container image. old is nil when + // the service instance is being created for the first time. + ReconcileServiceInstanceSpec(old, new *ServiceInstanceSpec) error } diff --git a/server/internal/database/reconcile_versions.go b/server/internal/database/reconcile_versions.go index a07cf425..487a0b00 100644 --- a/server/internal/database/reconcile_versions.go +++ b/server/internal/database/reconcile_versions.go @@ -64,6 +64,12 @@ func (s *Service) ReconcileAllDatabaseVersions(ctx context.Context) error { return fmt.Errorf("failed to get instance spec for instance '%s': %w", instance.InstanceID, err) } else if err == nil { instanceSpec.Spec.PgEdgeVersion = instance.PgEdgeVersion + // Clear the pinned image so the next reconcile re-derives the + // correct image from the manifest for the new version. + if instanceSpec.Spec.OrchestratorOpts != nil && + instanceSpec.Spec.OrchestratorOpts.Swarm != nil { + instanceSpec.Spec.OrchestratorOpts.Swarm.ResolvedImage = "" + } ops = append(ops, s.store.InstanceSpec.Update(instanceSpec)) } diff --git a/server/internal/database/service.go b/server/internal/database/service.go index b2287d4c..82a87c7a 100644 --- a/server/internal/database/service.go +++ b/server/internal/database/service.go @@ -766,6 +766,10 @@ func (s *Service) ReconcileServiceInstanceSpec(ctx context.Context, spec *Servic return nil, fmt.Errorf("failed to get current spec for service instance '%s': %w", spec.ServiceInstanceID, err) } + if err := s.orchestrator.ReconcileServiceInstanceSpec(previous, spec); err != nil { + return nil, fmt.Errorf("failed to reconcile service instance spec: %w", err) + } + var allocated []int rollback := func(cause error) error { rollbackCtx, cancel := context.WithTimeout(context.Background(), 15*time.Second) diff --git a/server/internal/orchestrator/swarm/orchestrator.go b/server/internal/orchestrator/swarm/orchestrator.go index 5a4adb95..a3fe9aa3 100644 --- a/server/internal/orchestrator/swarm/orchestrator.go +++ b/server/internal/orchestrator/swarm/orchestrator.go @@ -204,15 +204,29 @@ func (o *Orchestrator) resolveInstanceImages(spec *database.InstanceSpec) (*Imag } } -// ReconcileInstanceSpec resolves the container image for the new spec and -// clears a stale ResolvedImage if PgEdgeVersion changed since the last -// reconciliation. +// ReconcileInstanceSpec resolves the container image for the new spec. +// When the version is unchanged it carries the stored ResolvedImage forward so +// resolveInstanceImages takes the fast path (case 2) and never consults the +// manifest. When the version changed it clears the stale pin so +// resolveInstanceImages re-derives the correct image. func (o *Orchestrator) ReconcileInstanceSpec(old, new *database.InstanceSpec) error { - // If the Postgres version changed, the previously resolved image no longer - // matches — clear it so resolveInstanceImages fetches the correct one from - // the manifest. if old != nil && old.PgEdgeVersion != nil && new.PgEdgeVersion != nil { - if !old.PgEdgeVersion.Equals(new.PgEdgeVersion) { + if old.PgEdgeVersion.Equals(new.PgEdgeVersion) { + // Version unchanged — carry forward the pinned image so we don't + // re-derive it from the manifest on every reconcile. + if old.OrchestratorOpts != nil && old.OrchestratorOpts.Swarm != nil && + old.OrchestratorOpts.Swarm.ResolvedImage != "" { + if new.OrchestratorOpts == nil { + new.OrchestratorOpts = &database.OrchestratorOpts{} + } + if new.OrchestratorOpts.Swarm == nil { + new.OrchestratorOpts.Swarm = &database.SwarmOpts{} + } + new.OrchestratorOpts.Swarm.ResolvedImage = old.OrchestratorOpts.Swarm.ResolvedImage + } + } else { + // Version changed — clear the stale pin so resolveInstanceImages + // fetches the correct image for the new version from the manifest. if new.OrchestratorOpts != nil && new.OrchestratorOpts.Swarm != nil { new.OrchestratorOpts.Swarm.ResolvedImage = "" } @@ -222,6 +236,68 @@ func (o *Orchestrator) ReconcileInstanceSpec(old, new *database.InstanceSpec) er return err } +// resolveServiceImage returns a ServiceImage for the service instance using +// the same precedence as resolveInstanceImages: +// 1. ServiceSpec.OrchestratorOpts.Swarm.Image (user override) — manifest skipped +// 2. ServiceSpec.OrchestratorOpts.Swarm.ResolvedImage (CP-managed, already stored) +// 3. Manifest lookup — result written to ResolvedImage (lazy backfill / first creation) +func (o *Orchestrator) resolveServiceImage(spec *database.ServiceInstanceSpec) (*ServiceImage, error) { + var swarmOpts *database.SwarmOpts + if spec.ServiceSpec != nil && spec.ServiceSpec.OrchestratorOpts != nil { + swarmOpts = spec.ServiceSpec.OrchestratorOpts.Swarm + } + + switch { + case swarmOpts != nil && swarmOpts.Image != "": + return &ServiceImage{Tag: swarmOpts.Image}, nil + case swarmOpts != nil && swarmOpts.ResolvedImage != "": + return &ServiceImage{Tag: swarmOpts.ResolvedImage}, nil + default: + manifested, err := o.serviceVersions.GetServiceImage(spec.ServiceSpec.ServiceType, spec.ServiceSpec.Version) + if err != nil { + return nil, fmt.Errorf("failed to get service image: %w", err) + } + if spec.ServiceSpec.OrchestratorOpts == nil { + spec.ServiceSpec.OrchestratorOpts = &database.OrchestratorOpts{} + } + if spec.ServiceSpec.OrchestratorOpts.Swarm == nil { + spec.ServiceSpec.OrchestratorOpts.Swarm = &database.SwarmOpts{} + } + spec.ServiceSpec.OrchestratorOpts.Swarm.ResolvedImage = manifested.Tag + return manifested, nil + } +} + +// ReconcileServiceInstanceSpec resolves and pins the container image for the +// service instance. When the service version is unchanged it carries the stored +// ResolvedImage forward so resolveServiceImage takes the fast path and never +// consults the manifest. When the version changed it clears the stale pin. +// old is nil when the service instance is being created for the first time. +func (o *Orchestrator) ReconcileServiceInstanceSpec(old, new *database.ServiceInstanceSpec) error { + if old != nil && old.ServiceSpec != nil && new.ServiceSpec != nil && + old.ServiceSpec.Version == new.ServiceSpec.Version { + // Version unchanged — carry forward the pinned image. + if old.ServiceSpec.OrchestratorOpts != nil && + old.ServiceSpec.OrchestratorOpts.Swarm != nil && + old.ServiceSpec.OrchestratorOpts.Swarm.ResolvedImage != "" { + if new.ServiceSpec.OrchestratorOpts == nil { + new.ServiceSpec.OrchestratorOpts = &database.OrchestratorOpts{} + } + if new.ServiceSpec.OrchestratorOpts.Swarm == nil { + new.ServiceSpec.OrchestratorOpts.Swarm = &database.SwarmOpts{} + } + new.ServiceSpec.OrchestratorOpts.Swarm.ResolvedImage = old.ServiceSpec.OrchestratorOpts.Swarm.ResolvedImage + } + } else if new.ServiceSpec != nil && new.ServiceSpec.OrchestratorOpts != nil && + new.ServiceSpec.OrchestratorOpts.Swarm != nil { + // Version changed — clear the stale pin so resolveServiceImage + // fetches the correct image for the new version from the manifest. + new.ServiceSpec.OrchestratorOpts.Swarm.ResolvedImage = "" + } + _, err := o.resolveServiceImage(new) + return err +} + func (o *Orchestrator) instanceResources(spec *database.InstanceSpec, scripts database.Scripts) (*database.InstanceResource, []resource.Resource, []resource.Resource, error) { images, err := o.resolveInstanceImages(spec) if err != nil { @@ -476,8 +552,7 @@ func (o *Orchestrator) GenerateServiceInstanceResources(spec *database.ServiceIn } func (o *Orchestrator) generateMCPInstanceResources(spec *database.ServiceInstanceSpec) (*database.ServiceInstanceResources, error) { - // Get service image based on service type and version - serviceImage, err := o.serviceVersions.GetServiceImage(spec.ServiceSpec.ServiceType, spec.ServiceSpec.Version) + serviceImage, err := o.resolveServiceImage(spec) if err != nil { return nil, fmt.Errorf("failed to get service image: %w", err) } @@ -685,8 +760,7 @@ func (o *Orchestrator) buildServiceInstanceResources(spec *database.ServiceInsta // generateRAGInstanceResources returns the resources needed for one RAG service // instance. func (o *Orchestrator) generateRAGInstanceResources(spec *database.ServiceInstanceSpec) (*database.ServiceInstanceResources, error) { - // Get service image. - serviceImage, err := o.serviceVersions.GetServiceImage(spec.ServiceSpec.ServiceType, spec.ServiceSpec.Version) + serviceImage, err := o.resolveServiceImage(spec) if err != nil { return nil, fmt.Errorf("failed to get service image: %w", err) } diff --git a/server/internal/orchestrator/swarm/reconcile_instance_spec_test.go b/server/internal/orchestrator/swarm/reconcile_instance_spec_test.go new file mode 100644 index 00000000..845f0183 --- /dev/null +++ b/server/internal/orchestrator/swarm/reconcile_instance_spec_test.go @@ -0,0 +1,106 @@ +package swarm + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/pgEdge/control-plane/server/internal/config" + "github.com/pgEdge/control-plane/server/internal/database" + "github.com/pgEdge/control-plane/server/internal/ds" +) + +func TestReconcileInstanceSpec(t *testing.T) { + o := &Orchestrator{ + versions: NewVersions(config.Config{ + DockerSwarm: config.DockerSwarm{ + ImageRepositoryHost: "registry.example.com/pgedge", + }, + }), + } + + knownVersion := ds.MustParsePgEdgeVersion("17.9", "5") + newKnownVersion := ds.MustParsePgEdgeVersion("17.10", "5") + + manifestImage, err := o.versions.GetImages(knownVersion) + require.NoError(t, err) + pinnedImage := manifestImage.PgEdgeImage + + t.Run("first creation: old nil, ResolvedImage written from manifest", func(t *testing.T) { + spec := &database.InstanceSpec{PgEdgeVersion: knownVersion} + require.NoError(t, o.ReconcileInstanceSpec(nil, spec)) + require.NotNil(t, spec.OrchestratorOpts) + require.NotNil(t, spec.OrchestratorOpts.Swarm) + assert.Equal(t, pinnedImage, spec.OrchestratorOpts.Swarm.ResolvedImage) + }) + + t.Run("same version: old.ResolvedImage carried forward, manifest not re-consulted", func(t *testing.T) { + old := &database.InstanceSpec{ + PgEdgeVersion: knownVersion, + OrchestratorOpts: &database.OrchestratorOpts{ + Swarm: &database.SwarmOpts{ResolvedImage: "registry.example.com/pgedge:pinned-tag"}, + }, + } + newSpec := &database.InstanceSpec{PgEdgeVersion: knownVersion} + + require.NoError(t, o.ReconcileInstanceSpec(old, newSpec)) + require.NotNil(t, newSpec.OrchestratorOpts) + require.NotNil(t, newSpec.OrchestratorOpts.Swarm) + // Must use the stored pin, not re-derive from manifest. + assert.Equal(t, "registry.example.com/pgedge:pinned-tag", newSpec.OrchestratorOpts.Swarm.ResolvedImage) + }) + + t.Run("same version, old has no ResolvedImage: manifest lookup runs", func(t *testing.T) { + old := &database.InstanceSpec{PgEdgeVersion: knownVersion} + newSpec := &database.InstanceSpec{PgEdgeVersion: knownVersion} + + require.NoError(t, o.ReconcileInstanceSpec(old, newSpec)) + require.NotNil(t, newSpec.OrchestratorOpts) + assert.Equal(t, pinnedImage, newSpec.OrchestratorOpts.Swarm.ResolvedImage) + }) + + t.Run("version changed: old ResolvedImage cleared, manifest re-consulted", func(t *testing.T) { + newVersionImage, err := o.versions.GetImages(newKnownVersion) + require.NoError(t, err) + + old := &database.InstanceSpec{ + PgEdgeVersion: knownVersion, + OrchestratorOpts: &database.OrchestratorOpts{ + Swarm: &database.SwarmOpts{ResolvedImage: "registry.example.com/pgedge:old-tag"}, + }, + } + newSpec := &database.InstanceSpec{ + PgEdgeVersion: newKnownVersion, + OrchestratorOpts: &database.OrchestratorOpts{ + Swarm: &database.SwarmOpts{ResolvedImage: "registry.example.com/pgedge:old-tag"}, + }, + } + + require.NoError(t, o.ReconcileInstanceSpec(old, newSpec)) + // Must use the new version's manifest image, not the stale pin. + assert.Equal(t, newVersionImage.PgEdgeImage, newSpec.OrchestratorOpts.Swarm.ResolvedImage) + assert.NotEqual(t, "registry.example.com/pgedge:old-tag", newSpec.OrchestratorOpts.Swarm.ResolvedImage) + }) + + t.Run("user Image override: preserved, ResolvedImage from old still copied but case 1 wins", func(t *testing.T) { + old := &database.InstanceSpec{ + PgEdgeVersion: knownVersion, + OrchestratorOpts: &database.OrchestratorOpts{ + Swarm: &database.SwarmOpts{ResolvedImage: "registry.example.com/pgedge:pinned-tag"}, + }, + } + newSpec := &database.InstanceSpec{ + PgEdgeVersion: knownVersion, + OrchestratorOpts: &database.OrchestratorOpts{ + Swarm: &database.SwarmOpts{Image: "my-custom/image:dev"}, + }, + } + + require.NoError(t, o.ReconcileInstanceSpec(old, newSpec)) + // Image override must win; resolveInstanceImages case 1 returns it directly. + assert.Equal(t, "my-custom/image:dev", newSpec.OrchestratorOpts.Swarm.Image) + // ResolvedImage is copied but resolveInstanceImages does not overwrite it in case 1. + assert.Equal(t, "registry.example.com/pgedge:pinned-tag", newSpec.OrchestratorOpts.Swarm.ResolvedImage) + }) +} diff --git a/server/internal/orchestrator/swarm/resolve_service_image_test.go b/server/internal/orchestrator/swarm/resolve_service_image_test.go new file mode 100644 index 00000000..c3008c40 --- /dev/null +++ b/server/internal/orchestrator/swarm/resolve_service_image_test.go @@ -0,0 +1,153 @@ +package swarm + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/pgEdge/control-plane/server/internal/config" + "github.com/pgEdge/control-plane/server/internal/database" +) + +func newTestServiceOrchestrator() *Orchestrator { + return &Orchestrator{ + serviceVersions: NewServiceVersions(config.Config{ + DockerSwarm: config.DockerSwarm{ + ImageRepositoryHost: "registry.example.com/pgedge", + }, + }), + } +} + +func serviceSpecWith(serviceType, version string, swarm *database.SwarmOpts) *database.ServiceInstanceSpec { + svc := &database.ServiceSpec{ + ServiceType: serviceType, + Version: version, + } + if swarm != nil { + svc.OrchestratorOpts = &database.OrchestratorOpts{Swarm: swarm} + } + return &database.ServiceInstanceSpec{ServiceSpec: svc} +} + +func TestResolveServiceImage(t *testing.T) { + o := newTestServiceOrchestrator() + + manifestImage, err := o.serviceVersions.GetServiceImage("mcp", "latest") + require.NoError(t, err) + pinnedTag := manifestImage.Tag + + t.Run("Image override used directly, manifest not consulted", func(t *testing.T) { + spec := serviceSpecWith("mcp", "latest", &database.SwarmOpts{Image: "my-registry/mcp:dev"}) + + img, err := o.resolveServiceImage(spec) + require.NoError(t, err) + assert.Equal(t, "my-registry/mcp:dev", img.Tag) + // ResolvedImage must not be written when Image is set + assert.Empty(t, spec.ServiceSpec.OrchestratorOpts.Swarm.ResolvedImage) + }) + + t.Run("Image override works for unknown version (bypasses manifest)", func(t *testing.T) { + spec := serviceSpecWith("mcp", "unknown-version", &database.SwarmOpts{Image: "my-registry/mcp:dev"}) + + img, err := o.resolveServiceImage(spec) + require.NoError(t, err) + assert.Equal(t, "my-registry/mcp:dev", img.Tag) + }) + + t.Run("Image takes precedence over ResolvedImage", func(t *testing.T) { + spec := serviceSpecWith("mcp", "latest", &database.SwarmOpts{ + Image: "custom-override:latest", + ResolvedImage: "previously-resolved:tag", + }) + + img, err := o.resolveServiceImage(spec) + require.NoError(t, err) + assert.Equal(t, "custom-override:latest", img.Tag) + // ResolvedImage must not be touched when Image wins + assert.Equal(t, "previously-resolved:tag", spec.ServiceSpec.OrchestratorOpts.Swarm.ResolvedImage) + }) + + t.Run("ResolvedImage used when Image is empty", func(t *testing.T) { + spec := serviceSpecWith("mcp", "latest", &database.SwarmOpts{ResolvedImage: "registry.example.com/pgedge:pinned"}) + + img, err := o.resolveServiceImage(spec) + require.NoError(t, err) + assert.Equal(t, "registry.example.com/pgedge:pinned", img.Tag) + // ResolvedImage must not be modified + assert.Equal(t, "registry.example.com/pgedge:pinned", spec.ServiceSpec.OrchestratorOpts.Swarm.ResolvedImage) + }) + + t.Run("lazy backfill: resolves from manifest and writes ResolvedImage", func(t *testing.T) { + spec := serviceSpecWith("mcp", "latest", nil) + + img, err := o.resolveServiceImage(spec) + require.NoError(t, err) + assert.Equal(t, pinnedTag, img.Tag) + require.NotNil(t, spec.ServiceSpec.OrchestratorOpts) + require.NotNil(t, spec.ServiceSpec.OrchestratorOpts.Swarm) + assert.Equal(t, pinnedTag, spec.ServiceSpec.OrchestratorOpts.Swarm.ResolvedImage) + }) + + t.Run("lazy backfill: unknown service type returns error", func(t *testing.T) { + spec := serviceSpecWith("unknown-service", "latest", nil) + + _, err := o.resolveServiceImage(spec) + assert.Error(t, err) + }) + + t.Run("lazy backfill: unknown version returns error", func(t *testing.T) { + spec := serviceSpecWith("mcp", "99.99.99", nil) + + _, err := o.resolveServiceImage(spec) + assert.Error(t, err) + }) +} + +func TestReconcileServiceInstanceSpec(t *testing.T) { + o := newTestServiceOrchestrator() + + manifestImage, err := o.serviceVersions.GetServiceImage("mcp", "latest") + require.NoError(t, err) + pinnedTag := manifestImage.Tag + + t.Run("first creation: old nil, ResolvedImage written from manifest", func(t *testing.T) { + spec := serviceSpecWith("mcp", "latest", nil) + require.NoError(t, o.ReconcileServiceInstanceSpec(nil, spec)) + require.NotNil(t, spec.ServiceSpec.OrchestratorOpts) + require.NotNil(t, spec.ServiceSpec.OrchestratorOpts.Swarm) + assert.Equal(t, pinnedTag, spec.ServiceSpec.OrchestratorOpts.Swarm.ResolvedImage) + }) + + t.Run("same version: old.ResolvedImage carried forward, manifest not re-consulted", func(t *testing.T) { + old := serviceSpecWith("mcp", "latest", &database.SwarmOpts{ResolvedImage: "registry.example.com/pgedge:pinned-mcp"}) + newSpec := serviceSpecWith("mcp", "latest", nil) + + require.NoError(t, o.ReconcileServiceInstanceSpec(old, newSpec)) + require.NotNil(t, newSpec.ServiceSpec.OrchestratorOpts) + require.NotNil(t, newSpec.ServiceSpec.OrchestratorOpts.Swarm) + assert.Equal(t, "registry.example.com/pgedge:pinned-mcp", newSpec.ServiceSpec.OrchestratorOpts.Swarm.ResolvedImage) + }) + + t.Run("same version, old has no ResolvedImage: manifest lookup runs", func(t *testing.T) { + old := serviceSpecWith("mcp", "latest", nil) + newSpec := serviceSpecWith("mcp", "latest", nil) + + require.NoError(t, o.ReconcileServiceInstanceSpec(old, newSpec)) + require.NotNil(t, newSpec.ServiceSpec.OrchestratorOpts) + assert.Equal(t, pinnedTag, newSpec.ServiceSpec.OrchestratorOpts.Swarm.ResolvedImage) + }) + + t.Run("version changed: stale ResolvedImage cleared, manifest re-consulted", func(t *testing.T) { + postgrestImage, err := o.serviceVersions.GetServiceImage("postgrest", "14.5") + require.NoError(t, err) + + old := serviceSpecWith("postgrest", "latest", &database.SwarmOpts{ResolvedImage: "registry.example.com/pgedge:postgrest-old"}) + newSpec := serviceSpecWith("postgrest", "14.5", &database.SwarmOpts{ResolvedImage: "registry.example.com/pgedge:postgrest-old"}) + + require.NoError(t, o.ReconcileServiceInstanceSpec(old, newSpec)) + assert.Equal(t, postgrestImage.Tag, newSpec.ServiceSpec.OrchestratorOpts.Swarm.ResolvedImage) + assert.NotEqual(t, "registry.example.com/pgedge:postgrest-old", newSpec.ServiceSpec.OrchestratorOpts.Swarm.ResolvedImage) + }) +} diff --git a/server/internal/orchestrator/systemd/orchestrator.go b/server/internal/orchestrator/systemd/orchestrator.go index 3141af6e..836efa41 100644 --- a/server/internal/orchestrator/systemd/orchestrator.go +++ b/server/internal/orchestrator/systemd/orchestrator.go @@ -151,6 +151,10 @@ func (o *Orchestrator) ReconcileInstanceSpec(_, _ *database.InstanceSpec) error return nil } +func (o *Orchestrator) ReconcileServiceInstanceSpec(_, _ *database.ServiceInstanceSpec) error { + return nil +} + func (o *Orchestrator) GenerateInstanceResources(spec *database.InstanceSpec, scripts database.Scripts) (*database.InstanceResources, error) { paths, err := o.InstancePaths(spec.PgEdgeVersion.PostgresVersion, spec.InstanceID) if err != nil {