From 88f4f265db25b2524e5c579091c94d9cf1d17371 Mon Sep 17 00:00:00 2001 From: Nicolas De Loof Date: Tue, 1 Apr 2025 14:43:23 +0200 Subject: [PATCH] communicate with plugin using json events Signed-off-by: Nicolas De Loof --- pkg/compose/convergence.go | 2 +- pkg/compose/down.go | 5 ++- pkg/compose/plugins.go | 86 ++++++++++++++++++++++++++++---------- 3 files changed, 69 insertions(+), 24 deletions(-) diff --git a/pkg/compose/convergence.go b/pkg/compose/convergence.go index 063501fc9..ee14e3cfa 100644 --- a/pkg/compose/convergence.go +++ b/pkg/compose/convergence.go @@ -111,7 +111,7 @@ func (c *convergence) apply(ctx context.Context, project *types.Project, options func (c *convergence) ensureService(ctx context.Context, project *types.Project, service types.ServiceConfig, recreate string, inherit bool, timeout *time.Duration) error { //nolint:gocyclo if service.External != nil { - return c.service.runPlugin(ctx, project, service, "create") + return c.service.runPlugin(ctx, project, service, "up") } expected, err := getScale(service) if err != nil { diff --git a/pkg/compose/down.go b/pkg/compose/down.go index 3a088251b..76945dd51 100644 --- a/pkg/compose/down.go +++ b/pkg/compose/down.go @@ -83,8 +83,11 @@ func (s *composeService) down(ctx context.Context, projectName string, options a } err = InReverseDependencyOrder(ctx, project, func(c context.Context, service string) error { - serviceContainers := containers.filter(isService(service)) serv := project.Services[service] + if serv.External != nil { + return s.runPlugin(ctx, project, serv, "down") + } + serviceContainers := containers.filter(isService(service)) err := s.removeContainers(ctx, serviceContainers, &serv, options.Timeout, options.Volumes) return err }, WithRootNodesAndDown(options.Services)) diff --git a/pkg/compose/plugins.go b/pkg/compose/plugins.go index b0dfb50b5..92958af57 100644 --- a/pkg/compose/plugins.go +++ b/pkg/compose/plugins.go @@ -17,10 +17,10 @@ package compose import ( - "bufio" "context" - "errors" + "encoding/json" "fmt" + "io" "os" "os/exec" "strings" @@ -28,27 +28,43 @@ import ( "github.com/compose-spec/compose-go/v2/types" "github.com/docker/cli/cli-plugins/manager" "github.com/docker/cli/cli-plugins/socket" + "github.com/docker/compose/v2/pkg/progress" + "github.com/docker/docker/errdefs" + "github.com/pkg/errors" "github.com/spf13/cobra" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/propagation" "golang.org/x/sync/errgroup" ) +type JsonMessage struct { + Type string `json:"type"` + Message string `json:"message"` +} + +const ( + ErrorType = "error" + InfoType = "info" + SetEnvType = "setenv" +) + func (s *composeService) runPlugin(ctx context.Context, project *types.Project, service types.ServiceConfig, command string) error { x := *service.External - if x.Type != "model" { - return fmt.Errorf("unsupported external service type %s", x.Type) - } + + // Only support Docker CLI plugins for first iteration. Could support any binary from PATH plugin, err := manager.GetPlugin(x.Type, s.dockerCli, &cobra.Command{}) if err != nil { + if errdefs.IsNotFound(err) { + return fmt.Errorf("unsupported external service type %s", x.Type) + } return err } - model, ok := x.Options["model"] - if !ok { - return errors.New("model option is required") + args := []string{"compose", "--project-name", project.Name, command} + for k, v := range x.Options { + args = append(args, fmt.Sprintf("--%s=%s", k, v)) } - args := []string{"pull", model} + cmd := exec.CommandContext(ctx, plugin.Path, args...) // Remove DOCKER_CLI_PLUGIN... variable so plugin can detect it run standalone cmd.Env = filter(os.Environ(), manager.ReexecEnvvar) @@ -68,13 +84,11 @@ func (s *composeService) runPlugin(ctx context.Context, project *types.Project, otel.GetTextMapPropagator().Inject(ctx, &carrier) cmd.Env = append(cmd.Env, types.Mapping(carrier).Values()...) - var variables []string eg := errgroup.Group{} - out, err := cmd.StdoutPipe() + stdout, err := cmd.StdoutPipe() if err != nil { return err } - cmd.Stderr = os.Stderr err = cmd.Start() if err != nil { @@ -82,24 +96,52 @@ func (s *composeService) runPlugin(ctx context.Context, project *types.Project, } eg.Go(cmd.Wait) - scanner := bufio.NewScanner(out) - scanner.Split(bufio.ScanLines) - for scanner.Scan() { - line := scanner.Text() - variables = append(variables, line) + decoder := json.NewDecoder(stdout) + defer stdout.Close() + + variables := types.Mapping{} + + pw := progress.ContextWriter(ctx) + pw.Event(progress.CreatingEvent(service.Name)) + for { + var msg JsonMessage + err = decoder.Decode(&msg) + if err == io.EOF { + break + } + if err != nil { + return err + } + switch msg.Type { + case ErrorType: + pw.Event(progress.ErrorMessageEvent(service.Name, "error")) + return errors.New(msg.Message) + case InfoType: + pw.Event(progress.ErrorMessageEvent(service.Name, msg.Message)) + case SetEnvType: + key, val, found := strings.Cut(msg.Message, "=") + if !found { + return fmt.Errorf("invalid response from plugin: %s", msg.Message) + } + variables[key] = val + default: + return fmt.Errorf("invalid response from plugin: %s", msg.Type) + } } err = eg.Wait() if err != nil { - return err + pw.Event(progress.ErrorMessageEvent(service.Name, err.Error())) + return errors.Wrapf(err, "failed to create external service") } + pw.Event(progress.CreatedEvent(service.Name)) - variable := fmt.Sprintf("%s_URL", strings.ToUpper(service.Name)) - // FIXME can we obtain this URL from Docker Destktop API ? - url := "http://host.docker.internal:12434/engines/llama.cpp/v1/" + prefix := strings.ToUpper(service.Name) + "_" for name, s := range project.Services { if _, ok := s.DependsOn[service.Name]; ok { - s.Environment[variable] = &url + for key, val := range variables { + s.Environment[prefix+key] = &val + } project.Services[name] = s } }