Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions server/internal/database/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,4 +187,8 @@ type Orchestrator interface {
// spec before it is persisted. old is nil when the instance is being created
// for the first time.
ReconcileInstanceSpec(old, new *InstanceSpec) error
// ReconcileServiceInstanceSpec is called during service instance spec
// reconciliation to resolve and pin the container image. old is nil when
// the service instance is being created for the first time.
ReconcileServiceInstanceSpec(old, new *ServiceInstanceSpec) error
}
6 changes: 6 additions & 0 deletions server/internal/database/reconcile_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ func (s *Service) ReconcileAllDatabaseVersions(ctx context.Context) error {
return fmt.Errorf("failed to get instance spec for instance '%s': %w", instance.InstanceID, err)
} else if err == nil {
instanceSpec.Spec.PgEdgeVersion = instance.PgEdgeVersion
// Clear the pinned image so the next reconcile re-derives the
// correct image from the manifest for the new version.
if instanceSpec.Spec.OrchestratorOpts != nil &&
instanceSpec.Spec.OrchestratorOpts.Swarm != nil {
instanceSpec.Spec.OrchestratorOpts.Swarm.ResolvedImage = ""
}
ops = append(ops, s.store.InstanceSpec.Update(instanceSpec))
}

Expand Down
4 changes: 4 additions & 0 deletions server/internal/database/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,10 @@ func (s *Service) ReconcileServiceInstanceSpec(ctx context.Context, spec *Servic
return nil, fmt.Errorf("failed to get current spec for service instance '%s': %w", spec.ServiceInstanceID, err)
}

if err := s.orchestrator.ReconcileServiceInstanceSpec(previous, spec); err != nil {
return nil, fmt.Errorf("failed to reconcile service instance spec: %w", err)
}

var allocated []int
rollback := func(cause error) error {
rollbackCtx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
Expand Down
96 changes: 85 additions & 11 deletions server/internal/orchestrator/swarm/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,15 +204,29 @@ func (o *Orchestrator) resolveInstanceImages(spec *database.InstanceSpec) (*Imag
}
}

// ReconcileInstanceSpec resolves the container image for the new spec and
// clears a stale ResolvedImage if PgEdgeVersion changed since the last
// reconciliation.
// ReconcileInstanceSpec resolves the container image for the new spec.
// When the version is unchanged it carries the stored ResolvedImage forward so
// resolveInstanceImages takes the fast path (case 2) and never consults the
// manifest. When the version changed it clears the stale pin so
// resolveInstanceImages re-derives the correct image.
func (o *Orchestrator) ReconcileInstanceSpec(old, new *database.InstanceSpec) error {
// If the Postgres version changed, the previously resolved image no longer
// matches — clear it so resolveInstanceImages fetches the correct one from
// the manifest.
if old != nil && old.PgEdgeVersion != nil && new.PgEdgeVersion != nil {
if !old.PgEdgeVersion.Equals(new.PgEdgeVersion) {
if old.PgEdgeVersion.Equals(new.PgEdgeVersion) {
// Version unchanged — carry forward the pinned image so we don't
// re-derive it from the manifest on every reconcile.
if old.OrchestratorOpts != nil && old.OrchestratorOpts.Swarm != nil &&
old.OrchestratorOpts.Swarm.ResolvedImage != "" {
if new.OrchestratorOpts == nil {
new.OrchestratorOpts = &database.OrchestratorOpts{}
}
if new.OrchestratorOpts.Swarm == nil {
new.OrchestratorOpts.Swarm = &database.SwarmOpts{}
}
new.OrchestratorOpts.Swarm.ResolvedImage = old.OrchestratorOpts.Swarm.ResolvedImage
}
} else {
// Version changed — clear the stale pin so resolveInstanceImages
// fetches the correct image for the new version from the manifest.
if new.OrchestratorOpts != nil && new.OrchestratorOpts.Swarm != nil {
new.OrchestratorOpts.Swarm.ResolvedImage = ""
}
Expand All @@ -222,6 +236,68 @@ func (o *Orchestrator) ReconcileInstanceSpec(old, new *database.InstanceSpec) er
return err
}

// resolveServiceImage returns a ServiceImage for the service instance using
// the same precedence as resolveInstanceImages:
// 1. ServiceSpec.OrchestratorOpts.Swarm.Image (user override) — manifest skipped
// 2. ServiceSpec.OrchestratorOpts.Swarm.ResolvedImage (CP-managed, already stored)
// 3. Manifest lookup — result written to ResolvedImage (lazy backfill / first creation)
func (o *Orchestrator) resolveServiceImage(spec *database.ServiceInstanceSpec) (*ServiceImage, error) {
var swarmOpts *database.SwarmOpts
if spec.ServiceSpec != nil && spec.ServiceSpec.OrchestratorOpts != nil {
swarmOpts = spec.ServiceSpec.OrchestratorOpts.Swarm
}

switch {
case swarmOpts != nil && swarmOpts.Image != "":
return &ServiceImage{Tag: swarmOpts.Image}, nil
case swarmOpts != nil && swarmOpts.ResolvedImage != "":
return &ServiceImage{Tag: swarmOpts.ResolvedImage}, nil
default:
manifested, err := o.serviceVersions.GetServiceImage(spec.ServiceSpec.ServiceType, spec.ServiceSpec.Version)
if err != nil {
return nil, fmt.Errorf("failed to get service image: %w", err)
}
if spec.ServiceSpec.OrchestratorOpts == nil {
spec.ServiceSpec.OrchestratorOpts = &database.OrchestratorOpts{}
}
if spec.ServiceSpec.OrchestratorOpts.Swarm == nil {
spec.ServiceSpec.OrchestratorOpts.Swarm = &database.SwarmOpts{}
}
spec.ServiceSpec.OrchestratorOpts.Swarm.ResolvedImage = manifested.Tag
return manifested, nil
}
}

// ReconcileServiceInstanceSpec resolves and pins the container image for the
// service instance. When the service version is unchanged it carries the stored
// ResolvedImage forward so resolveServiceImage takes the fast path and never
// consults the manifest. When the version changed it clears the stale pin.
// old is nil when the service instance is being created for the first time.
func (o *Orchestrator) ReconcileServiceInstanceSpec(old, new *database.ServiceInstanceSpec) error {
if old != nil && old.ServiceSpec != nil && new.ServiceSpec != nil &&
old.ServiceSpec.Version == new.ServiceSpec.Version {
// Version unchanged — carry forward the pinned image.
if old.ServiceSpec.OrchestratorOpts != nil &&
old.ServiceSpec.OrchestratorOpts.Swarm != nil &&
old.ServiceSpec.OrchestratorOpts.Swarm.ResolvedImage != "" {
if new.ServiceSpec.OrchestratorOpts == nil {
new.ServiceSpec.OrchestratorOpts = &database.OrchestratorOpts{}
}
if new.ServiceSpec.OrchestratorOpts.Swarm == nil {
new.ServiceSpec.OrchestratorOpts.Swarm = &database.SwarmOpts{}
}
new.ServiceSpec.OrchestratorOpts.Swarm.ResolvedImage = old.ServiceSpec.OrchestratorOpts.Swarm.ResolvedImage
}
} else if new.ServiceSpec != nil && new.ServiceSpec.OrchestratorOpts != nil &&
new.ServiceSpec.OrchestratorOpts.Swarm != nil {
// Version changed — clear the stale pin so resolveServiceImage
// fetches the correct image for the new version from the manifest.
new.ServiceSpec.OrchestratorOpts.Swarm.ResolvedImage = ""
}
_, err := o.resolveServiceImage(new)
return err
}

func (o *Orchestrator) instanceResources(spec *database.InstanceSpec, scripts database.Scripts) (*database.InstanceResource, []resource.Resource, []resource.Resource, error) {
images, err := o.resolveInstanceImages(spec)
if err != nil {
Expand Down Expand Up @@ -476,8 +552,7 @@ func (o *Orchestrator) GenerateServiceInstanceResources(spec *database.ServiceIn
}

func (o *Orchestrator) generateMCPInstanceResources(spec *database.ServiceInstanceSpec) (*database.ServiceInstanceResources, error) {
// Get service image based on service type and version
serviceImage, err := o.serviceVersions.GetServiceImage(spec.ServiceSpec.ServiceType, spec.ServiceSpec.Version)
serviceImage, err := o.resolveServiceImage(spec)
if err != nil {
return nil, fmt.Errorf("failed to get service image: %w", err)
}
Expand Down Expand Up @@ -685,8 +760,7 @@ func (o *Orchestrator) buildServiceInstanceResources(spec *database.ServiceInsta
// generateRAGInstanceResources returns the resources needed for one RAG service
// instance.
func (o *Orchestrator) generateRAGInstanceResources(spec *database.ServiceInstanceSpec) (*database.ServiceInstanceResources, error) {
// Get service image.
serviceImage, err := o.serviceVersions.GetServiceImage(spec.ServiceSpec.ServiceType, spec.ServiceSpec.Version)
serviceImage, err := o.resolveServiceImage(spec)
if err != nil {
return nil, fmt.Errorf("failed to get service image: %w", err)
}
Expand Down
106 changes: 106 additions & 0 deletions server/internal/orchestrator/swarm/reconcile_instance_spec_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package swarm

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/pgEdge/control-plane/server/internal/config"
"github.com/pgEdge/control-plane/server/internal/database"
"github.com/pgEdge/control-plane/server/internal/ds"
)

func TestReconcileInstanceSpec(t *testing.T) {
o := &Orchestrator{
versions: NewVersions(config.Config{
DockerSwarm: config.DockerSwarm{
ImageRepositoryHost: "registry.example.com/pgedge",
},
}),
}

knownVersion := ds.MustParsePgEdgeVersion("17.9", "5")
newKnownVersion := ds.MustParsePgEdgeVersion("17.10", "5")

manifestImage, err := o.versions.GetImages(knownVersion)
require.NoError(t, err)
pinnedImage := manifestImage.PgEdgeImage

t.Run("first creation: old nil, ResolvedImage written from manifest", func(t *testing.T) {
spec := &database.InstanceSpec{PgEdgeVersion: knownVersion}
require.NoError(t, o.ReconcileInstanceSpec(nil, spec))
require.NotNil(t, spec.OrchestratorOpts)
require.NotNil(t, spec.OrchestratorOpts.Swarm)
assert.Equal(t, pinnedImage, spec.OrchestratorOpts.Swarm.ResolvedImage)
})

t.Run("same version: old.ResolvedImage carried forward, manifest not re-consulted", func(t *testing.T) {
old := &database.InstanceSpec{
PgEdgeVersion: knownVersion,
OrchestratorOpts: &database.OrchestratorOpts{
Swarm: &database.SwarmOpts{ResolvedImage: "registry.example.com/pgedge:pinned-tag"},
},
}
newSpec := &database.InstanceSpec{PgEdgeVersion: knownVersion}

require.NoError(t, o.ReconcileInstanceSpec(old, newSpec))
require.NotNil(t, newSpec.OrchestratorOpts)
require.NotNil(t, newSpec.OrchestratorOpts.Swarm)
// Must use the stored pin, not re-derive from manifest.
assert.Equal(t, "registry.example.com/pgedge:pinned-tag", newSpec.OrchestratorOpts.Swarm.ResolvedImage)
})

t.Run("same version, old has no ResolvedImage: manifest lookup runs", func(t *testing.T) {
old := &database.InstanceSpec{PgEdgeVersion: knownVersion}
newSpec := &database.InstanceSpec{PgEdgeVersion: knownVersion}

require.NoError(t, o.ReconcileInstanceSpec(old, newSpec))
require.NotNil(t, newSpec.OrchestratorOpts)
assert.Equal(t, pinnedImage, newSpec.OrchestratorOpts.Swarm.ResolvedImage)
})

t.Run("version changed: old ResolvedImage cleared, manifest re-consulted", func(t *testing.T) {
newVersionImage, err := o.versions.GetImages(newKnownVersion)
require.NoError(t, err)

old := &database.InstanceSpec{
PgEdgeVersion: knownVersion,
OrchestratorOpts: &database.OrchestratorOpts{
Swarm: &database.SwarmOpts{ResolvedImage: "registry.example.com/pgedge:old-tag"},
},
}
newSpec := &database.InstanceSpec{
PgEdgeVersion: newKnownVersion,
OrchestratorOpts: &database.OrchestratorOpts{
Swarm: &database.SwarmOpts{ResolvedImage: "registry.example.com/pgedge:old-tag"},
},
}

require.NoError(t, o.ReconcileInstanceSpec(old, newSpec))
// Must use the new version's manifest image, not the stale pin.
assert.Equal(t, newVersionImage.PgEdgeImage, newSpec.OrchestratorOpts.Swarm.ResolvedImage)
assert.NotEqual(t, "registry.example.com/pgedge:old-tag", newSpec.OrchestratorOpts.Swarm.ResolvedImage)
})

t.Run("user Image override: preserved, ResolvedImage from old still copied but case 1 wins", func(t *testing.T) {
old := &database.InstanceSpec{
PgEdgeVersion: knownVersion,
OrchestratorOpts: &database.OrchestratorOpts{
Swarm: &database.SwarmOpts{ResolvedImage: "registry.example.com/pgedge:pinned-tag"},
},
}
newSpec := &database.InstanceSpec{
PgEdgeVersion: knownVersion,
OrchestratorOpts: &database.OrchestratorOpts{
Swarm: &database.SwarmOpts{Image: "my-custom/image:dev"},
},
}

require.NoError(t, o.ReconcileInstanceSpec(old, newSpec))
// Image override must win; resolveInstanceImages case 1 returns it directly.
assert.Equal(t, "my-custom/image:dev", newSpec.OrchestratorOpts.Swarm.Image)
// ResolvedImage is copied but resolveInstanceImages does not overwrite it in case 1.
assert.Equal(t, "registry.example.com/pgedge:pinned-tag", newSpec.OrchestratorOpts.Swarm.ResolvedImage)
})
}
Loading