resolve service reference into container based on observed state
Signed-off-by: Nicolas De Loof <nicolas.deloof@gmail.com>
This commit is contained in:
parent
f6e31dbc6a
commit
8af49ff369
@ -102,71 +102,13 @@ func (c *convergence) apply(ctx context.Context, project *types.Project, options
|
|||||||
if utils.StringContains(options.Services, name) {
|
if utils.StringContains(options.Services, name) {
|
||||||
strategy = options.Recreate
|
strategy = options.Recreate
|
||||||
}
|
}
|
||||||
err = c.ensureService(ctx, project, service, strategy, options.Inherit, options.Timeout)
|
return c.ensureService(ctx, project, service, strategy, options.Inherit, options.Timeout)
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
c.updateProject(project, name)
|
|
||||||
return nil
|
|
||||||
})(ctx)
|
})(ctx)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
var mu sync.Mutex
|
var mu sync.Mutex
|
||||||
|
|
||||||
// updateProject updates project after service converged, so dependent services relying on `service:xx` can refer to actual containers.
|
|
||||||
func (c *convergence) updateProject(project *types.Project, serviceName string) {
|
|
||||||
// operation is protected by a Mutex so that we can safely update project.Services while running concurrent convergence on services
|
|
||||||
mu.Lock()
|
|
||||||
defer mu.Unlock()
|
|
||||||
|
|
||||||
cnts := c.getObservedState(serviceName)
|
|
||||||
for i, s := range project.Services {
|
|
||||||
updateServices(&s, cnts)
|
|
||||||
project.Services[i] = s
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func updateServices(service *types.ServiceConfig, cnts Containers) {
|
|
||||||
if len(cnts) == 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, str := range []*string{&service.NetworkMode, &service.Ipc, &service.Pid} {
|
|
||||||
if d := getDependentServiceFromMode(*str); d != "" {
|
|
||||||
if serviceContainers := cnts.filter(isService(d)); len(serviceContainers) > 0 {
|
|
||||||
*str = types.NetworkModeContainerPrefix + serviceContainers[0].ID
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
var links []string
|
|
||||||
for _, serviceLink := range service.Links {
|
|
||||||
parts := strings.Split(serviceLink, ":")
|
|
||||||
serviceName := serviceLink
|
|
||||||
serviceAlias := ""
|
|
||||||
if len(parts) == 2 {
|
|
||||||
serviceName = parts[0]
|
|
||||||
serviceAlias = parts[1]
|
|
||||||
}
|
|
||||||
if serviceName != service.Name {
|
|
||||||
links = append(links, serviceLink)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
for _, container := range cnts {
|
|
||||||
name := getCanonicalContainerName(container)
|
|
||||||
if serviceAlias != "" {
|
|
||||||
links = append(links,
|
|
||||||
fmt.Sprintf("%s:%s", name, serviceAlias))
|
|
||||||
}
|
|
||||||
links = append(links,
|
|
||||||
fmt.Sprintf("%s:%s", name, name),
|
|
||||||
fmt.Sprintf("%s:%s", name, getContainerNameWithoutProject(container)))
|
|
||||||
}
|
|
||||||
service.Links = links
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *convergence) ensureService(ctx context.Context, project *types.Project, service types.ServiceConfig, recreate string, inherit bool, timeout *time.Duration) error {
|
func (c *convergence) ensureService(ctx context.Context, project *types.Project, service types.ServiceConfig, recreate string, inherit bool, timeout *time.Duration) error {
|
||||||
expected, err := getScale(service)
|
expected, err := getScale(service)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -178,7 +120,7 @@ func (c *convergence) ensureService(ctx context.Context, project *types.Project,
|
|||||||
|
|
||||||
eg, _ := errgroup.WithContext(ctx)
|
eg, _ := errgroup.WithContext(ctx)
|
||||||
|
|
||||||
err = c.resolveVolumeFrom(&service)
|
err = c.resolveServiceReferences(&service)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -263,6 +205,20 @@ func (c *convergence) ensureService(ctx context.Context, project *types.Project,
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// resolveServiceReferences replaces reference to another service with reference to an actual container
|
||||||
|
func (c *convergence) resolveServiceReferences(service *types.ServiceConfig) error {
|
||||||
|
err := c.resolveVolumeFrom(service)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = c.resolveSharedNamespaces(service)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (c *convergence) resolveVolumeFrom(service *types.ServiceConfig) error {
|
func (c *convergence) resolveVolumeFrom(service *types.ServiceConfig) error {
|
||||||
for i, vol := range service.VolumesFrom {
|
for i, vol := range service.VolumesFrom {
|
||||||
spec := strings.Split(vol, ":")
|
spec := strings.Split(vol, ":")
|
||||||
@ -283,6 +239,37 @@ func (c *convergence) resolveVolumeFrom(service *types.ServiceConfig) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *convergence) resolveSharedNamespaces(service *types.ServiceConfig) error {
|
||||||
|
str := service.NetworkMode
|
||||||
|
if name := getDependentServiceFromMode(str); name != "" {
|
||||||
|
dependencies := c.getObservedState(name)
|
||||||
|
if len(dependencies) == 0 {
|
||||||
|
return fmt.Errorf("cannot share network namespace with service %s: container missing", name)
|
||||||
|
}
|
||||||
|
service.NetworkMode = types.ContainerPrefix + dependencies.sorted()[0].ID
|
||||||
|
}
|
||||||
|
|
||||||
|
str = service.Ipc
|
||||||
|
if name := getDependentServiceFromMode(str); name != "" {
|
||||||
|
dependencies := c.getObservedState(name)
|
||||||
|
if len(dependencies) == 0 {
|
||||||
|
return fmt.Errorf("cannot share IPC namespace with service %s: container missing", name)
|
||||||
|
}
|
||||||
|
service.Ipc = types.ContainerPrefix + dependencies.sorted()[0].ID
|
||||||
|
}
|
||||||
|
|
||||||
|
str = service.Pid
|
||||||
|
if name := getDependentServiceFromMode(str); name != "" {
|
||||||
|
dependencies := c.getObservedState(name)
|
||||||
|
if len(dependencies) == 0 {
|
||||||
|
return fmt.Errorf("cannot share PID namespace with service %s: container missing", name)
|
||||||
|
}
|
||||||
|
service.Pid = types.ContainerPrefix + dependencies.sorted()[0].ID
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func mustRecreate(expected types.ServiceConfig, actual moby.Container, policy string) (bool, error) {
|
func mustRecreate(expected types.ServiceConfig, actual moby.Container, policy string) (bool, error) {
|
||||||
if policy == api.RecreateNever {
|
if policy == api.RecreateNever {
|
||||||
return false, nil
|
return false, nil
|
||||||
|
@ -90,7 +90,6 @@ func (s *composeService) prepareRun(ctx context.Context, project *types.Project,
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
updateServices(&service, observedState)
|
|
||||||
|
|
||||||
if !opts.NoDeps {
|
if !opts.NoDeps {
|
||||||
if err := s.waitDependencies(ctx, project, service.DependsOn, observedState); err != nil {
|
if err := s.waitDependencies(ctx, project, service.DependsOn, observedState); err != nil {
|
||||||
@ -104,6 +103,11 @@ func (s *composeService) prepareRun(ctx context.Context, project *types.Project,
|
|||||||
Labels: mergeLabels(service.Labels, service.CustomLabels),
|
Labels: mergeLabels(service.Labels, service.CustomLabels),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err = newConvergence(project.ServiceNames(), observedState, s).resolveServiceReferences(&service)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
created, err := s.createContainer(ctx, project, service, service.ContainerName, 1, createOpts)
|
created, err := s.createContainer(ctx, project, service, service.ContainerName, 1, createOpts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
|
7
pkg/e2e/fixtures/no-deps/network-mode.yaml
Normal file
7
pkg/e2e/fixtures/no-deps/network-mode.yaml
Normal file
@ -0,0 +1,7 @@
|
|||||||
|
services:
|
||||||
|
app:
|
||||||
|
image: nginx:alpine
|
||||||
|
network_mode: service:db
|
||||||
|
|
||||||
|
db:
|
||||||
|
image: nginx:alpine
|
@ -42,3 +42,20 @@ func TestNoDepsVolumeFrom(t *testing.T) {
|
|||||||
res := c.RunDockerComposeCmdNoCheck(t, "-f", "fixtures/no-deps/volume-from.yaml", "--project-name", projectName, "up", "--no-deps", "-d", "app")
|
res := c.RunDockerComposeCmdNoCheck(t, "-f", "fixtures/no-deps/volume-from.yaml", "--project-name", projectName, "up", "--no-deps", "-d", "app")
|
||||||
res.Assert(t, icmd.Expected{ExitCode: 1, Err: "cannot share volume with service db: container missing"})
|
res.Assert(t, icmd.Expected{ExitCode: 1, Err: "cannot share volume with service db: container missing"})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestNoDepsNetworkMode(t *testing.T) {
|
||||||
|
c := NewParallelCLI(t)
|
||||||
|
const projectName = "e2e-no-deps-network-mode"
|
||||||
|
t.Cleanup(func() {
|
||||||
|
c.RunDockerComposeCmd(t, "--project-name", projectName, "down")
|
||||||
|
})
|
||||||
|
|
||||||
|
c.RunDockerComposeCmd(t, "-f", "fixtures/no-deps/network-mode.yaml", "--project-name", projectName, "up", "-d")
|
||||||
|
|
||||||
|
c.RunDockerComposeCmd(t, "-f", "fixtures/no-deps/network-mode.yaml", "--project-name", projectName, "up", "--no-deps", "-d", "app")
|
||||||
|
|
||||||
|
c.RunDockerCmd(t, "rm", "-f", fmt.Sprintf("%s-db-1", projectName))
|
||||||
|
|
||||||
|
res := c.RunDockerComposeCmdNoCheck(t, "-f", "fixtures/no-deps/network-mode.yaml", "--project-name", projectName, "up", "--no-deps", "-d", "app")
|
||||||
|
res.Assert(t, icmd.Expected{ExitCode: 1, Err: "cannot share network namespace with service db: container missing"})
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user