communicate with plugin using json events

Signed-off-by: Nicolas De Loof <nicolas.deloof@gmail.com>
This commit is contained in:
Nicolas De Loof 2025-04-01 14:43:23 +02:00 committed by Guillaume Lours
parent e67348222f
commit 88f4f265db
3 changed files with 69 additions and 24 deletions

View File

@ -111,7 +111,7 @@ func (c *convergence) apply(ctx context.Context, project *types.Project, options
func (c *convergence) ensureService(ctx context.Context, project *types.Project, service types.ServiceConfig, recreate string, inherit bool, timeout *time.Duration) error { //nolint:gocyclo func (c *convergence) ensureService(ctx context.Context, project *types.Project, service types.ServiceConfig, recreate string, inherit bool, timeout *time.Duration) error { //nolint:gocyclo
if service.External != nil { if service.External != nil {
return c.service.runPlugin(ctx, project, service, "create") return c.service.runPlugin(ctx, project, service, "up")
} }
expected, err := getScale(service) expected, err := getScale(service)
if err != nil { if err != nil {

View File

@ -83,8 +83,11 @@ func (s *composeService) down(ctx context.Context, projectName string, options a
} }
err = InReverseDependencyOrder(ctx, project, func(c context.Context, service string) error { err = InReverseDependencyOrder(ctx, project, func(c context.Context, service string) error {
serviceContainers := containers.filter(isService(service))
serv := project.Services[service] serv := project.Services[service]
if serv.External != nil {
return s.runPlugin(ctx, project, serv, "down")
}
serviceContainers := containers.filter(isService(service))
err := s.removeContainers(ctx, serviceContainers, &serv, options.Timeout, options.Volumes) err := s.removeContainers(ctx, serviceContainers, &serv, options.Timeout, options.Volumes)
return err return err
}, WithRootNodesAndDown(options.Services)) }, WithRootNodesAndDown(options.Services))

View File

@ -17,10 +17,10 @@
package compose package compose
import ( import (
"bufio"
"context" "context"
"errors" "encoding/json"
"fmt" "fmt"
"io"
"os" "os"
"os/exec" "os/exec"
"strings" "strings"
@ -28,27 +28,43 @@ import (
"github.com/compose-spec/compose-go/v2/types" "github.com/compose-spec/compose-go/v2/types"
"github.com/docker/cli/cli-plugins/manager" "github.com/docker/cli/cli-plugins/manager"
"github.com/docker/cli/cli-plugins/socket" "github.com/docker/cli/cli-plugins/socket"
"github.com/docker/compose/v2/pkg/progress"
"github.com/docker/docker/errdefs"
"github.com/pkg/errors"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"go.opentelemetry.io/otel" "go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/propagation"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
) )
type JsonMessage struct {
Type string `json:"type"`
Message string `json:"message"`
}
const (
ErrorType = "error"
InfoType = "info"
SetEnvType = "setenv"
)
func (s *composeService) runPlugin(ctx context.Context, project *types.Project, service types.ServiceConfig, command string) error { func (s *composeService) runPlugin(ctx context.Context, project *types.Project, service types.ServiceConfig, command string) error {
x := *service.External x := *service.External
if x.Type != "model" {
return fmt.Errorf("unsupported external service type %s", x.Type) // Only support Docker CLI plugins for first iteration. Could support any binary from PATH
}
plugin, err := manager.GetPlugin(x.Type, s.dockerCli, &cobra.Command{}) plugin, err := manager.GetPlugin(x.Type, s.dockerCli, &cobra.Command{})
if err != nil { if err != nil {
if errdefs.IsNotFound(err) {
return fmt.Errorf("unsupported external service type %s", x.Type)
}
return err return err
} }
model, ok := x.Options["model"] args := []string{"compose", "--project-name", project.Name, command}
if !ok { for k, v := range x.Options {
return errors.New("model option is required") args = append(args, fmt.Sprintf("--%s=%s", k, v))
} }
args := []string{"pull", model}
cmd := exec.CommandContext(ctx, plugin.Path, args...) cmd := exec.CommandContext(ctx, plugin.Path, args...)
// Remove DOCKER_CLI_PLUGIN... variable so plugin can detect it run standalone // Remove DOCKER_CLI_PLUGIN... variable so plugin can detect it run standalone
cmd.Env = filter(os.Environ(), manager.ReexecEnvvar) cmd.Env = filter(os.Environ(), manager.ReexecEnvvar)
@ -68,13 +84,11 @@ func (s *composeService) runPlugin(ctx context.Context, project *types.Project,
otel.GetTextMapPropagator().Inject(ctx, &carrier) otel.GetTextMapPropagator().Inject(ctx, &carrier)
cmd.Env = append(cmd.Env, types.Mapping(carrier).Values()...) cmd.Env = append(cmd.Env, types.Mapping(carrier).Values()...)
var variables []string
eg := errgroup.Group{} eg := errgroup.Group{}
out, err := cmd.StdoutPipe() stdout, err := cmd.StdoutPipe()
if err != nil { if err != nil {
return err return err
} }
cmd.Stderr = os.Stderr
err = cmd.Start() err = cmd.Start()
if err != nil { if err != nil {
@ -82,24 +96,52 @@ func (s *composeService) runPlugin(ctx context.Context, project *types.Project,
} }
eg.Go(cmd.Wait) eg.Go(cmd.Wait)
scanner := bufio.NewScanner(out) decoder := json.NewDecoder(stdout)
scanner.Split(bufio.ScanLines) defer stdout.Close()
for scanner.Scan() {
line := scanner.Text() variables := types.Mapping{}
variables = append(variables, line)
pw := progress.ContextWriter(ctx)
pw.Event(progress.CreatingEvent(service.Name))
for {
var msg JsonMessage
err = decoder.Decode(&msg)
if err == io.EOF {
break
}
if err != nil {
return err
}
switch msg.Type {
case ErrorType:
pw.Event(progress.ErrorMessageEvent(service.Name, "error"))
return errors.New(msg.Message)
case InfoType:
pw.Event(progress.ErrorMessageEvent(service.Name, msg.Message))
case SetEnvType:
key, val, found := strings.Cut(msg.Message, "=")
if !found {
return fmt.Errorf("invalid response from plugin: %s", msg.Message)
}
variables[key] = val
default:
return fmt.Errorf("invalid response from plugin: %s", msg.Type)
}
} }
err = eg.Wait() err = eg.Wait()
if err != nil { if err != nil {
return err pw.Event(progress.ErrorMessageEvent(service.Name, err.Error()))
return errors.Wrapf(err, "failed to create external service")
} }
pw.Event(progress.CreatedEvent(service.Name))
variable := fmt.Sprintf("%s_URL", strings.ToUpper(service.Name)) prefix := strings.ToUpper(service.Name) + "_"
// FIXME can we obtain this URL from Docker Destktop API ?
url := "http://host.docker.internal:12434/engines/llama.cpp/v1/"
for name, s := range project.Services { for name, s := range project.Services {
if _, ok := s.DependsOn[service.Name]; ok { if _, ok := s.DependsOn[service.Name]; ok {
s.Environment[variable] = &url for key, val := range variables {
s.Environment[prefix+key] = &val
}
project.Services[name] = s project.Services[name] = s
} }
} }