diff --git a/internal/sync/shared.go b/internal/sync/shared.go index 6e2b117cb..4fd9df377 100644 --- a/internal/sync/shared.go +++ b/internal/sync/shared.go @@ -16,8 +16,6 @@ package sync import ( "context" - - "github.com/compose-spec/compose-go/v2/types" ) // PathMapping contains the Compose service and modified host system path. @@ -38,5 +36,5 @@ type PathMapping struct { } type Syncer interface { - Sync(ctx context.Context, service types.ServiceConfig, paths []PathMapping) error + Sync(ctx context.Context, service string, paths []*PathMapping) error } diff --git a/internal/sync/tar.go b/internal/sync/tar.go index 050f6ab95..d9abd2f88 100644 --- a/internal/sync/tar.go +++ b/internal/sync/tar.go @@ -32,7 +32,6 @@ import ( "github.com/hashicorp/go-multierror" - "github.com/compose-spec/compose-go/v2/types" moby "github.com/docker/docker/api/types" "github.com/docker/docker/pkg/archive" ) @@ -65,8 +64,8 @@ func NewTar(projectName string, client LowLevelClient) *Tar { } } -func (t *Tar) Sync(ctx context.Context, service types.ServiceConfig, paths []PathMapping) error { - containers, err := t.client.ContainersForService(ctx, t.projectName, service.Name) +func (t *Tar) Sync(ctx context.Context, service string, paths []*PathMapping) error { + containers, err := t.client.ContainersForService(ctx, t.projectName, service) if err != nil { return err } @@ -77,7 +76,7 @@ func (t *Tar) Sync(ctx context.Context, service types.ServiceConfig, paths []Pat if _, err := os.Stat(p.HostPath); err != nil && errors.Is(err, fs.ErrNotExist) { pathsToDelete = append(pathsToDelete, p.ContainerPath) } else { - pathsToCopy = append(pathsToCopy, p) + pathsToCopy = append(pathsToCopy, *p) } } diff --git a/pkg/compose/watch.go b/pkg/compose/watch.go index cc9fba219..b4edff9f4 100644 --- a/pkg/compose/watch.go +++ b/pkg/compose/watch.go @@ -23,11 +23,13 @@ import ( "os" "path" "path/filepath" + "slices" "strconv" "strings" "time" "github.com/compose-spec/compose-go/v2/types" + "github.com/compose-spec/compose-go/v2/utils" ccli "github.com/docker/cli/cli/command/container" pathutil "github.com/docker/compose/v2/internal/paths" "github.com/docker/compose/v2/internal/sync" @@ -37,20 +39,11 @@ import ( "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/filters" "github.com/docker/docker/api/types/image" - "github.com/jonboulle/clockwork" "github.com/mitchellh/mapstructure" "github.com/sirupsen/logrus" "golang.org/x/sync/errgroup" ) -const quietPeriod = 500 * time.Millisecond - -// fileEvent contains the Compose service and modified host system path. -type fileEvent struct { - sync.PathMapping - Trigger types.Trigger -} - // getSyncImplementation returns an appropriate sync implementation for the // project. // @@ -86,6 +79,44 @@ func (s *composeService) Watch(ctx context.Context, project *types.Project, serv return s.watch(ctx, nil, project, services, options) } +type watchRule struct { + types.Trigger + ignore watch.PathMatcher + service string +} + +func (r watchRule) Matches(event watch.FileEvent) *sync.PathMapping { + hostPath := string(event) + if !pathutil.IsChild(r.Path, hostPath) { + return nil + } + isIgnored, err := r.ignore.Matches(hostPath) + if err != nil { + logrus.Warnf("error ignore matching %q: %v", hostPath, err) + return nil + } + + if isIgnored { + logrus.Debugf("%s is matching ignore pattern", hostPath) + return nil + } + + var containerPath string + if r.Target != "" { + rel, err := filepath.Rel(r.Path, hostPath) + if err != nil { + logrus.Warnf("error making %s relative to %s: %v", hostPath, r.Path, err) + return nil + } + // always use Unix-style paths for inside the container + containerPath = path.Join(r.Target, filepath.ToSlash(rel)) + } + return &sync.PathMapping{ + HostPath: hostPath, + ContainerPath: containerPath, + } +} + func (s *composeService) watch(ctx context.Context, syncChannel chan bool, project *types.Project, services []string, options api.WatchOptions) error { //nolint: gocyclo var err error if project, err = project.WithSelectedServices(services); err != nil { @@ -96,10 +127,13 @@ func (s *composeService) watch(ctx context.Context, syncChannel chan bool, proje return err } eg, ctx := errgroup.WithContext(ctx) - watching := false options.LogTo.Register(api.WatchLogger) - for i := range project.Services { - service := project.Services[i] + + var ( + rules []watchRule + paths []string + ) + for serviceName, service := range project.Services { config, err := loadDevelopmentConfig(service, project) if err != nil { return err @@ -123,29 +157,10 @@ func (s *composeService) watch(ctx context.Context, syncChannel chan bool, proje } // set the service to always be built - watch triggers `Up()` when it receives a rebuild event service.PullPolicy = types.PullPolicyBuild - project.Services[i] = service + project.Services[serviceName] = service } } - dockerIgnores, err := watch.LoadDockerIgnore(service.Build) - if err != nil { - return err - } - - // add a hardcoded set of ignores on top of what came from .dockerignore - // some of this should likely be configurable (e.g. there could be cases - // where you want `.git` to be synced) but this is suitable for now - dotGitIgnore, err := watch.NewDockerPatternMatcher("/", []string{".git/"}) - if err != nil { - return err - } - ignore := watch.NewCompositeMatcher( - dockerIgnores, - watch.EphemeralPathMatcher(), - dotGitIgnore, - ) - - var paths, pathLogs []string for _, trigger := range config.Watch { if isSync(trigger) && checkIfPathAlreadyBindMounted(trigger.Path, service.Volumes) { logrus.Warnf("path '%s' also declared by a bind mount volume, this path won't be monitored!\n", trigger.Path) @@ -155,42 +170,45 @@ func (s *composeService) watch(ctx context.Context, syncChannel chan bool, proje success, err := trigger.Extensions.Get("x-initialSync", &initialSync) if err == nil && success && initialSync && isSync(trigger) { // Need to check initial files are in container that are meant to be synched from watch action - err := s.initialSync(ctx, project, service, trigger, ignore, syncer) + err := s.initialSync(ctx, project, service, trigger, syncer) if err != nil { return err } } } paths = append(paths, trigger.Path) - pathLogs = append(pathLogs, fmt.Sprintf("Action %s for path %q", trigger.Action, trigger.Path)) } - watcher, err := watch.NewWatcher(paths, ignore) + serviceWatchRules, err := getWatchRules(config, service) if err != nil { return err } - - logrus.Debugf("Watch configuration for service %q:%s\n", - service.Name, - strings.Join(append([]string{""}, pathLogs...), "\n - "), - ) - err = watcher.Start() - if err != nil { - return err - } - watching = true - eg.Go(func() error { - defer func() { - if err := watcher.Close(); err != nil { - logrus.Debugf("Error closing watcher for service %s: %v", service.Name, err) - } - }() - return s.watchEvents(ctx, project, service.Name, options, watcher, syncer, config.Watch) - }) + rules = append(rules, serviceWatchRules...) } - if !watching { + + if len(paths) == 0 { return fmt.Errorf("none of the selected services is configured for watch, consider setting an 'develop' section") } + + watcher, err := watch.NewWatcher(paths) + if err != nil { + return err + } + + err = watcher.Start() + if err != nil { + return err + } + + defer func() { + if err := watcher.Close(); err != nil { + logrus.Debugf("Error closing watcher: %v", err) + } + }() + + eg.Go(func() error { + return s.watchEvents(ctx, project, options, watcher, syncer, rules) + }) options.LogTo.Log(api.WatchLogger, "Watch enabled") for { @@ -204,103 +222,73 @@ func (s *composeService) watch(ctx context.Context, syncChannel chan bool, proje } } +func getWatchRules(config *types.DevelopConfig, service types.ServiceConfig) ([]watchRule, error) { + var rules []watchRule + + dockerIgnores, err := watch.LoadDockerIgnore(service.Build) + if err != nil { + return nil, err + } + + // add a hardcoded set of ignores on top of what came from .dockerignore + // some of this should likely be configurable (e.g. there could be cases + // where you want `.git` to be synced) but this is suitable for now + dotGitIgnore, err := watch.NewDockerPatternMatcher("/", []string{".git/"}) + if err != nil { + return nil, err + } + + for _, trigger := range config.Watch { + ignore, err := watch.NewDockerPatternMatcher(trigger.Path, trigger.Ignore) + if err != nil { + return nil, err + } + + rules = append(rules, watchRule{ + Trigger: trigger, + ignore: watch.NewCompositeMatcher( + dockerIgnores, + watch.EphemeralPathMatcher(), + dotGitIgnore, + ignore, + ), + service: service.Name, + }) + } + return rules, nil +} + func isSync(trigger types.Trigger) bool { return trigger.Action == types.WatchActionSync || trigger.Action == types.WatchActionSyncRestart } -func (s *composeService) watchEvents(ctx context.Context, project *types.Project, name string, options api.WatchOptions, watcher watch.Notify, syncer sync.Syncer, triggers []types.Trigger) error { +func (s *composeService) watchEvents(ctx context.Context, project *types.Project, options api.WatchOptions, watcher watch.Notify, syncer sync.Syncer, rules []watchRule) error { ctx, cancel := context.WithCancel(ctx) defer cancel() - ignores := make([]watch.PathMatcher, len(triggers)) - for i, trigger := range triggers { - ignore, err := watch.NewDockerPatternMatcher(trigger.Path, trigger.Ignore) - if err != nil { - return err - } - ignores[i] = ignore - } - - events := make(chan fileEvent) - batchEvents := batchDebounceEvents(ctx, s.clock, quietPeriod, events) - quit := make(chan bool) - go func() { - for { - select { - case <-ctx.Done(): - quit <- true - return - case batch := <-batchEvents: - start := time.Now() - logrus.Debugf("batch start: service[%s] count[%d]", name, len(batch)) - if err := s.handleWatchBatch(ctx, project, name, options, batch, syncer); err != nil { - logrus.Warnf("Error handling changed files for service %s: %v", name, err) - } - logrus.Debugf("batch complete: service[%s] duration[%s] count[%d]", - name, time.Since(start), len(batch)) - } - } - }() + // debounce and group filesystem events so that we capture IDE saving many files as one "batch" event + batchEvents := watch.BatchDebounceEvents(ctx, s.clock, watcher.Events()) for { select { - case <-quit: + case <-ctx.Done(): options.LogTo.Log(api.WatchLogger, "Watch disabled") return nil case err := <-watcher.Errors(): options.LogTo.Err(api.WatchLogger, "Watch disabled with errors") return err - case event := <-watcher.Events(): - hostPath := event.Path() - for i, trigger := range triggers { - logrus.Debugf("change for %s - comparing with %s", hostPath, trigger.Path) - if fileEvent := maybeFileEvent(trigger, hostPath, ignores[i]); fileEvent != nil { - events <- *fileEvent - } + case batch := <-batchEvents: + start := time.Now() + logrus.Debugf("batch start: count[%d]", len(batch)) + err := s.handleWatchBatch(ctx, project, options, batch, rules, syncer) + if err != nil { + logrus.Warnf("Error handling changed files: %v", err) } + logrus.Debugf("batch complete: duration[%s] count[%d]", time.Since(start), len(batch)) } } } -// maybeFileEvent returns a file event object if hostPath is valid for the provided trigger and ignore -// rules. -// -// Any errors are logged as warnings and nil (no file event) is returned. -func maybeFileEvent(trigger types.Trigger, hostPath string, ignore watch.PathMatcher) *fileEvent { - if !pathutil.IsChild(trigger.Path, hostPath) { - return nil - } - isIgnored, err := ignore.Matches(hostPath) - if err != nil { - logrus.Warnf("error ignore matching %q: %v", hostPath, err) - return nil - } - - if isIgnored { - logrus.Debugf("%s is matching ignore pattern", hostPath) - return nil - } - - var containerPath string - if trigger.Target != "" { - rel, err := filepath.Rel(trigger.Path, hostPath) - if err != nil { - logrus.Warnf("error making %s relative to %s: %v", hostPath, trigger.Path, err) - return nil - } - // always use Unix-style paths for inside the container - containerPath = path.Join(trigger.Target, filepath.ToSlash(rel)) - } - - return &fileEvent{ - Trigger: trigger, - PathMapping: sync.PathMapping{ - HostPath: hostPath, - ContainerPath: containerPath, - }, - } -} - func loadDevelopmentConfig(service types.ServiceConfig, project *types.Project) (*types.DevelopConfig, error) { var config types.DevelopConfig y, ok := service.Extensions["x-develop"] @@ -342,52 +330,6 @@ func loadDevelopmentConfig(service types.ServiceConfig, project *types.Project) return &config, nil } -// batchDebounceEvents groups identical file events within a sliding time window and writes the results to the returned -// channel. -// -// The returned channel is closed when the debouncer is stopped via context cancellation or by closing the input channel. -func batchDebounceEvents(ctx context.Context, clock clockwork.Clock, delay time.Duration, input <-chan fileEvent) <-chan []fileEvent { - out := make(chan []fileEvent) - go func() { - defer close(out) - seen := make(map[string]fileEvent) - flushEvents := func() { - if len(seen) == 0 { - return - } - events := make([]fileEvent, 0, len(seen)) - for _, e := range seen { - events = append(events, e) - } - out <- events - seen = make(map[string]fileEvent) - } - - t := clock.NewTicker(delay) - defer t.Stop() - for { - select { - case <-ctx.Done(): - return - case <-t.Chan(): - flushEvents() - case e, ok := <-input: - if !ok { - // input channel was closed - flushEvents() - return - } - if _, ok := seen[e.HostPath]; !ok { - // already know updated path, first rule in watch configuration wins - seen[e.HostPath] = e - } - t.Reset(delay) - } - } - }() - return out -} - func checkIfPathAlreadyBindMounted(watchPath string, volumes []types.ServiceVolumeConfig) bool { for _, volume := range volumes { if volume.Bind != nil && strings.HasPrefix(watchPath, volume.Source) { @@ -475,39 +417,60 @@ func (t tarDockerClient) Untar(ctx context.Context, id string, archive io.ReadCl }) } -func (s *composeService) handleWatchBatch(ctx context.Context, project *types.Project, serviceName string, options api.WatchOptions, batch []fileEvent, syncer sync.Syncer) error { - pathMappings := make([]sync.PathMapping, len(batch)) - restartService := false - syncService := false - for i := range batch { - switch batch[i].Trigger.Action { - case types.WatchActionRebuild: - return s.rebuild(ctx, project, serviceName, options) - case types.WatchActionSync, types.WatchActionSyncExec: - syncService = true - case types.WatchActionSyncRestart: - restartService = true - syncService = true - case types.WatchActionRestart: - restartService = true +//nolint:gocyclo +func (s *composeService) handleWatchBatch(ctx context.Context, project *types.Project, options api.WatchOptions, batch []watch.FileEvent, rules []watchRule, syncer sync.Syncer) error { + var ( + restart = map[string]bool{} + syncfiles = map[string][]*sync.PathMapping{} + exec = map[string][]int{} + rebuild = map[string]bool{} + ) + for _, event := range batch { + for i, rule := range rules { + mapping := rule.Matches(event) + if mapping == nil { + continue + } + + switch rule.Action { + case types.WatchActionRebuild: + rebuild[rule.service] = true + case types.WatchActionSync: + syncfiles[rule.service] = append(syncfiles[rule.service], mapping) + case types.WatchActionRestart: + restart[rule.service] = true + case types.WatchActionSyncRestart: + syncfiles[rule.service] = append(syncfiles[rule.service], mapping) + restart[rule.service] = true + case types.WatchActionSyncExec: + syncfiles[rule.service] = append(syncfiles[rule.service], mapping) + // We want to run exec hooks only once after syncfiles if multiple file events match + // as we can't compare ServiceHook to sort and compact a slice, collect rule indexes + exec[rule.service] = append(exec[rule.service], i) + } } - pathMappings[i] = batch[i].PathMapping } - writeWatchSyncMessage(options.LogTo, serviceName, pathMappings, restartService) + logrus.Debugf("watch actions: rebuild %d sync %d restart %d", len(rebuild), len(syncfiles), len(restart)) - service, err := project.GetService(serviceName) - if err != nil { - return err - } - if syncService { - if err := syncer.Sync(ctx, service, pathMappings); err != nil { + if len(rebuild) > 0 { + err := s.rebuild(ctx, project, utils.MapKeys(rebuild), options) + if err != nil { return err } } - if restartService { - err = s.restart(ctx, project.Name, api.RestartOptions{ - Services: []string{serviceName}, + + for serviceName, pathMappings := range syncfiles { + writeWatchSyncMessage(options.LogTo, serviceName, pathMappings) + err := syncer.Sync(ctx, serviceName, pathMappings) + if err != nil { + return err + } + } + if len(restart) > 0 { + services := utils.MapKeys(restart) + err := s.restart(ctx, project.Name, api.RestartOptions{ + Services: services, Project: project, NoDeps: false, }) @@ -516,12 +479,14 @@ func (s *composeService) handleWatchBatch(ctx context.Context, project *types.Pr } options.LogTo.Log( api.WatchLogger, - fmt.Sprintf("service %q restarted", serviceName)) + fmt.Sprintf("service(s) %q restarted", services)) } + eg, ctx := errgroup.WithContext(ctx) - for _, b := range batch { - if b.Trigger.Action == types.WatchActionSyncExec { - err := s.exec(ctx, project, serviceName, b.Trigger.Exec, eg) + for service, rulesToExec := range exec { + slices.Sort(rulesToExec) + for _, i := range slices.Compact(rulesToExec) { + err := s.exec(ctx, project, service, rules[i].Exec, eg) if err != nil { return err } @@ -554,10 +519,10 @@ func (s *composeService) exec(ctx context.Context, project *types.Project, servi return nil } -func (s *composeService) rebuild(ctx context.Context, project *types.Project, serviceName string, options api.WatchOptions) error { - options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Rebuilding service %q after changes were detected...", serviceName)) +func (s *composeService) rebuild(ctx context.Context, project *types.Project, services []string, options api.WatchOptions) error { + options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Rebuilding service(s) %q after changes were detected...", services)) // restrict the build to ONLY this service, not any of its dependencies - options.Build.Services = []string{serviceName} + options.Build.Services = services imageNameToIdMap, err := s.build(ctx, project, *options.Build, nil) if err != nil { options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Build failed. Error: %v", err)) @@ -568,19 +533,18 @@ func (s *composeService) rebuild(ctx context.Context, project *types.Project, se s.pruneDanglingImagesOnRebuild(ctx, project.Name, imageNameToIdMap) } - options.LogTo.Log(api.WatchLogger, fmt.Sprintf("service %q successfully built", serviceName)) + options.LogTo.Log(api.WatchLogger, fmt.Sprintf("service(s) %q successfully built", services)) err = s.create(ctx, project, api.CreateOptions{ - Services: []string{serviceName}, + Services: services, Inherit: true, Recreate: api.RecreateForce, }) if err != nil { - options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Failed to recreate service after update. Error: %v", err)) + options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Failed to recreate services after update. Error: %v", err)) return err } - services := []string{serviceName} p, err := project.WithSelectedServices(services) if err != nil { return err @@ -597,11 +561,7 @@ func (s *composeService) rebuild(ctx context.Context, project *types.Project, se } // writeWatchSyncMessage prints out a message about the sync for the changed paths. -func writeWatchSyncMessage(log api.LogConsumer, serviceName string, pathMappings []sync.PathMapping, restart bool) { - action := "Syncing" - if restart { - action = "Syncing and restarting" - } +func writeWatchSyncMessage(log api.LogConsumer, serviceName string, pathMappings []*sync.PathMapping) { if logrus.IsLevelEnabled(logrus.DebugLevel) { hostPathsToSync := make([]string, len(pathMappings)) for i := range pathMappings { @@ -610,8 +570,7 @@ func writeWatchSyncMessage(log api.LogConsumer, serviceName string, pathMappings log.Log( api.WatchLogger, fmt.Sprintf( - "%s service %q after changes were detected: %s", - action, + "Syncing service %q after changes were detected: %s", serviceName, strings.Join(hostPathsToSync, ", "), ), @@ -619,7 +578,7 @@ func writeWatchSyncMessage(log api.LogConsumer, serviceName string, pathMappings } else { log.Log( api.WatchLogger, - fmt.Sprintf("%s service %q after %d changes were detected", action, serviceName, len(pathMappings)), + fmt.Sprintf("Syncing service %q after %d changes were detected", serviceName, len(pathMappings)), ) } } @@ -648,29 +607,40 @@ func (s *composeService) pruneDanglingImagesOnRebuild(ctx context.Context, proje // Walks develop.watch.path and checks which files should be copied inside the container // ignores develop.watch.ignore, Dockerfile, compose files, bind mounted paths and .git -func (s *composeService) initialSync(ctx context.Context, project *types.Project, service types.ServiceConfig, trigger types.Trigger, ignore watch.PathMatcher, syncer sync.Syncer) error { - dockerFileIgnore, err := watch.NewDockerPatternMatcher("/", []string{"Dockerfile", "*compose*.y*ml"}) +func (s *composeService) initialSync(ctx context.Context, project *types.Project, service types.ServiceConfig, trigger types.Trigger, syncer sync.Syncer) error { + dockerIgnores, err := watch.LoadDockerIgnore(service.Build) if err != nil { return err } - triggerIgnore, err := watch.NewDockerPatternMatcher("/", trigger.Ignore) + + dotGitIgnore, err := watch.NewDockerPatternMatcher("/", []string{".git/"}) if err != nil { return err } - ignoreInitialSync := watch.NewCompositeMatcher(ignore, dockerFileIgnore, triggerIgnore) + + triggerIgnore, err := watch.NewDockerPatternMatcher(trigger.Path, trigger.Ignore) + if err != nil { + return err + } + // FIXME .dockerignore + ignoreInitialSync := watch.NewCompositeMatcher( + dockerIgnores, + watch.EphemeralPathMatcher(), + dotGitIgnore, + triggerIgnore) pathsToCopy, err := s.initialSyncFiles(ctx, project, service, trigger, ignoreInitialSync) if err != nil { return err } - return syncer.Sync(ctx, service, pathsToCopy) + return syncer.Sync(ctx, service.Name, pathsToCopy) } // Syncs files from develop.watch.path if thy have been modified after the image has been created // //nolint:gocyclo -func (s *composeService) initialSyncFiles(ctx context.Context, project *types.Project, service types.ServiceConfig, trigger types.Trigger, ignore watch.PathMatcher) ([]sync.PathMapping, error) { +func (s *composeService) initialSyncFiles(ctx context.Context, project *types.Project, service types.ServiceConfig, trigger types.Trigger, ignore watch.PathMatcher) ([]*sync.PathMapping, error) { fi, err := os.Stat(trigger.Path) if err != nil { return nil, err @@ -679,7 +649,7 @@ func (s *composeService) initialSyncFiles(ctx context.Context, project *types.Pr if err != nil { return nil, err } - var pathsToCopy []sync.PathMapping + var pathsToCopy []*sync.PathMapping switch mode := fi.Mode(); { case mode.IsDir(): // process directory @@ -714,7 +684,7 @@ func (s *composeService) initialSyncFiles(ctx context.Context, project *types.Pr return err } // only copy files (and not full directories) - pathsToCopy = append(pathsToCopy, sync.PathMapping{ + pathsToCopy = append(pathsToCopy, &sync.PathMapping{ HostPath: path, ContainerPath: filepath.Join(trigger.Target, rel), }) @@ -724,7 +694,7 @@ func (s *composeService) initialSyncFiles(ctx context.Context, project *types.Pr case mode.IsRegular(): // process file if fi.ModTime().After(timeImageCreated) && !shouldIgnore(filepath.Base(trigger.Path), ignore) && !checkIfPathAlreadyBindMounted(trigger.Path, service.Volumes) { - pathsToCopy = append(pathsToCopy, sync.PathMapping{ + pathsToCopy = append(pathsToCopy, &sync.PathMapping{ HostPath: trigger.Path, ContainerPath: trigger.Target, }) diff --git a/pkg/compose/watch_test.go b/pkg/compose/watch_test.go index 036e1677b..d359a19fb 100644 --- a/pkg/compose/watch_test.go +++ b/pkg/compose/watch_test.go @@ -18,8 +18,6 @@ import ( "context" "fmt" "os" - "slices" - "strings" "testing" "time" @@ -38,53 +36,6 @@ import ( "gotest.tools/v3/assert" ) -func TestDebounceBatching(t *testing.T) { - ch := make(chan fileEvent) - clock := clockwork.NewFakeClock() - ctx, stop := context.WithCancel(context.Background()) - t.Cleanup(stop) - - trigger := types.Trigger{ - Path: "/", - } - matcher := watch.EmptyMatcher{} - eventBatchCh := batchDebounceEvents(ctx, clock, quietPeriod, ch) - for i := 0; i < 100; i++ { - path := "/a" - if i%2 == 0 { - path = "/b" - } - - event := maybeFileEvent(trigger, path, matcher) - require.NotNil(t, event) - ch <- *event - } - // we sent 100 events + the debouncer - clock.BlockUntil(101) - clock.Advance(quietPeriod) - select { - case batch := <-eventBatchCh: - slices.SortFunc(batch, func(a, b fileEvent) int { - return strings.Compare(a.HostPath, b.HostPath) - }) - assert.Equal(t, len(batch), 2) - assert.Equal(t, batch[0].HostPath, "/a") - assert.Equal(t, batch[1].HostPath, "/b") - case <-time.After(50 * time.Millisecond): - t.Fatal("timed out waiting for events") - } - clock.BlockUntil(1) - clock.Advance(quietPeriod) - - // there should only be a single batch - select { - case batch := <-eventBatchCh: - t.Fatalf("unexpected events: %v", batch) - case <-time.After(50 * time.Millisecond): - // channel is empty - } -} - type testWatcher struct { events chan watch.FileEvent errors chan error @@ -170,32 +121,37 @@ func TestWatch_Sync(t *testing.T) { dockerCli: cli, clock: clock, } - err := service.watchEvents(ctx, &proj, "test", api.WatchOptions{ + rules, err := getWatchRules(&types.DevelopConfig{ + Watch: []types.Trigger{ + { + Path: "/sync", + Action: "sync", + Target: "/work", + Ignore: []string{"ignore"}, + }, + { + Path: "/rebuild", + Action: "rebuild", + }, + }, + }, types.ServiceConfig{Name: "test"}) + assert.NilError(t, err) + + err = service.watchEvents(ctx, &proj, api.WatchOptions{ Build: &api.BuildOptions{}, LogTo: stdLogger{}, Prune: true, - }, watcher, syncer, []types.Trigger{ - { - Path: "/sync", - Action: "sync", - Target: "/work", - Ignore: []string{"ignore"}, - }, - { - Path: "/rebuild", - Action: "rebuild", - }, - }) + }, watcher, syncer, rules) assert.NilError(t, err) }() watcher.Events() <- watch.NewFileEvent("/sync/changed") watcher.Events() <- watch.NewFileEvent("/sync/changed/sub") clock.BlockUntil(3) - clock.Advance(quietPeriod) + clock.Advance(watch.QuietPeriod) select { case actual := <-syncer.synced: - require.ElementsMatch(t, []sync.PathMapping{ + require.ElementsMatch(t, []*sync.PathMapping{ {HostPath: "/sync/changed", ContainerPath: "/work/changed"}, {HostPath: "/sync/changed/sub", ContainerPath: "/work/changed/sub"}, }, actual) @@ -203,24 +159,10 @@ func TestWatch_Sync(t *testing.T) { t.Error("timeout") } - watcher.Events() <- watch.NewFileEvent("/sync/ignore") - watcher.Events() <- watch.NewFileEvent("/sync/ignore/sub") - watcher.Events() <- watch.NewFileEvent("/sync/changed") - clock.BlockUntil(4) - clock.Advance(quietPeriod) - select { - case actual := <-syncer.synced: - require.ElementsMatch(t, []sync.PathMapping{ - {HostPath: "/sync/changed", ContainerPath: "/work/changed"}, - }, actual) - case <-time.After(100 * time.Millisecond): - t.Error("timed out waiting for events") - } - watcher.Events() <- watch.NewFileEvent("/rebuild") watcher.Events() <- watch.NewFileEvent("/sync/changed") clock.BlockUntil(4) - clock.Advance(quietPeriod) + clock.Advance(watch.QuietPeriod) select { case batch := <-syncer.synced: t.Fatalf("received unexpected events: %v", batch) @@ -231,16 +173,16 @@ func TestWatch_Sync(t *testing.T) { } type fakeSyncer struct { - synced chan []sync.PathMapping + synced chan []*sync.PathMapping } func newFakeSyncer() *fakeSyncer { return &fakeSyncer{ - synced: make(chan []sync.PathMapping), + synced: make(chan []*sync.PathMapping), } } -func (f *fakeSyncer) Sync(_ context.Context, _ types.ServiceConfig, paths []sync.PathMapping) error { +func (f *fakeSyncer) Sync(ctx context.Context, service string, paths []*sync.PathMapping) error { f.synced <- paths return nil } diff --git a/pkg/e2e/watch_test.go b/pkg/e2e/watch_test.go index 7c7011b24..a0b8173c9 100644 --- a/pkg/e2e/watch_test.go +++ b/pkg/e2e/watch_test.go @@ -93,10 +93,7 @@ func TestRebuildOnDotEnvWithExternalNetwork(t *testing.T) { t.Log("wait for watch to start watching") c.WaitForCondition(t, func() (bool, string) { out := r.String() - errors := r.String() - return strings.Contains(out, - "Watch configuration"), fmt.Sprintf("'Watch configuration' not found in : \n%s\nStderr: \n%s\n", out, - errors) + return strings.Contains(out, "Watch enabled"), "watch not started" }, 30*time.Second, 1*time.Second) pn := c.RunDockerCmd(t, "inspect", containerName, "-f", "{{ .HostConfig.NetworkMode }}") @@ -112,7 +109,7 @@ func TestRebuildOnDotEnvWithExternalNetwork(t *testing.T) { t.Log("check if the container has been rebuild") c.WaitForCondition(t, func() (bool, string) { out := r.String() - if strings.Count(out, "batch complete: service["+svcName+"]") != 1 { + if strings.Count(out, "batch complete") != 1 { return false, fmt.Sprintf("container %s was not rebuilt", containerName) } return true, fmt.Sprintf("container %s was rebuilt", containerName) @@ -283,7 +280,7 @@ func doTest(t *testing.T, svcName string) { return poll.Continue("%v", r.Combined()) } } - poll.WaitOn(t, checkRestart(fmt.Sprintf("service %q restarted", svcName))) + poll.WaitOn(t, checkRestart(fmt.Sprintf("service(s) [%q] restarted", svcName))) poll.WaitOn(t, checkFileContents("/app/config/file.config", "This is an updated config file")) testComplete.Store(true) diff --git a/pkg/watch/debounce.go b/pkg/watch/debounce.go new file mode 100644 index 000000000..76d0797b9 --- /dev/null +++ b/pkg/watch/debounce.go @@ -0,0 +1,73 @@ +/* + Copyright 2020 Docker Compose CLI authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package watch + +import ( + "context" + "time" + + "github.com/docker/compose/v2/pkg/utils" + "github.com/jonboulle/clockwork" + "github.com/sirupsen/logrus" +) + +const QuietPeriod = 500 * time.Millisecond + +// batchDebounceEvents groups identical file events within a sliding time window and writes the results to the returned +// channel. +// +// The returned channel is closed when the debouncer is stopped via context cancellation or by closing the input channel. +func BatchDebounceEvents(ctx context.Context, clock clockwork.Clock, input <-chan FileEvent) <-chan []FileEvent { + out := make(chan []FileEvent) + go func() { + defer close(out) + seen := utils.Set[FileEvent]{} + flushEvents := func() { + if len(seen) == 0 { + return + } + logrus.Debugf("flush: %d events %s", len(seen), seen) + + events := make([]FileEvent, 0, len(seen)) + for e := range seen { + events = append(events, e) + } + out <- events + seen = utils.Set[FileEvent]{} + } + + t := clock.NewTicker(QuietPeriod) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return + case <-t.Chan(): + flushEvents() + case e, ok := <-input: + if !ok { + // input channel was closed + flushEvents() + return + } + if _, ok := seen[e]; !ok { + seen.Add(e) + } + t.Reset(QuietPeriod) + } + } + }() + return out +} diff --git a/pkg/watch/debounce_test.go b/pkg/watch/debounce_test.go new file mode 100644 index 000000000..9571fd546 --- /dev/null +++ b/pkg/watch/debounce_test.go @@ -0,0 +1,64 @@ +/* + Copyright 2020 Docker Compose CLI authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package watch + +import ( + "context" + "slices" + "testing" + "time" + + "github.com/jonboulle/clockwork" + "gotest.tools/v3/assert" +) + +func Test_BatchDebounceEvents(t *testing.T) { + ch := make(chan FileEvent) + clock := clockwork.NewFakeClock() + ctx, stop := context.WithCancel(context.Background()) + t.Cleanup(stop) + + eventBatchCh := BatchDebounceEvents(ctx, clock, ch) + for i := 0; i < 100; i++ { + path := "/a" + if i%2 == 0 { + path = "/b" + } + + ch <- FileEvent(path) + } + // we sent 100 events + the debouncer + clock.BlockUntil(101) + clock.Advance(QuietPeriod) + select { + case batch := <-eventBatchCh: + slices.Sort(batch) + assert.Equal(t, len(batch), 2) + assert.Equal(t, batch[0], FileEvent("/a")) + assert.Equal(t, batch[1], FileEvent("/b")) + case <-time.After(50 * time.Millisecond): + t.Fatal("timed out waiting for events") + } + clock.BlockUntil(1) + clock.Advance(QuietPeriod) + + // there should only be a single batch + select { + case batch := <-eventBatchCh: + t.Fatalf("unexpected events: %v", batch) + case <-time.After(50 * time.Millisecond): + // channel is empty + } +} diff --git a/pkg/watch/notify.go b/pkg/watch/notify.go index 653d536dd..b76292106 100644 --- a/pkg/watch/notify.go +++ b/pkg/watch/notify.go @@ -30,19 +30,13 @@ import ( var numberOfWatches = expvar.NewInt("watch.naive.numberOfWatches") -type FileEvent struct { - path string -} +type FileEvent string func NewFileEvent(p string) FileEvent { if !filepath.IsAbs(p) { panic(fmt.Sprintf("NewFileEvent only accepts absolute paths. Actual: %s", p)) } - return FileEvent{path: p} -} - -func (e FileEvent) Path() string { - return e.path + return FileEvent(p) } type Notify interface { @@ -81,8 +75,8 @@ func (EmptyMatcher) MatchesEntireDir(f string) (bool, error) { return false, nil var _ PathMatcher = EmptyMatcher{} -func NewWatcher(paths []string, ignore PathMatcher) (Notify, error) { - return newWatcher(paths, ignore) +func NewWatcher(paths []string) (Notify, error) { + return newWatcher(paths) } const WindowsBufferSizeEnvVar = "COMPOSE_WATCH_WINDOWS_BUFFER_SIZE" diff --git a/pkg/watch/notify_test.go b/pkg/watch/notify_test.go index 116d9166e..9146c3c49 100644 --- a/pkg/watch/notify_test.go +++ b/pkg/watch/notify_test.go @@ -485,96 +485,6 @@ func TestWatchCountInnerFile(t *testing.T) { assert.Equal(t, expectedWatches, int(numberOfWatches.Value())) } -func TestWatchCountInnerFileWithIgnore(t *testing.T) { - f := newNotifyFixture(t) - - root := f.paths[0] - ignore, _ := NewDockerPatternMatcher(root, []string{ - "a", - "!a/b", - }) - f.setIgnore(ignore) - - a := f.JoinPath(root, "a") - b := f.JoinPath(a, "b") - file := f.JoinPath(b, "bigFile") - f.WriteFile(file, "hello") - f.assertEvents(b, file) - - expectedWatches := 3 - if isRecursiveWatcher() { - expectedWatches = 1 - } - assert.Equal(t, expectedWatches, int(numberOfWatches.Value())) -} - -func TestIgnoreCreatedDir(t *testing.T) { - f := newNotifyFixture(t) - - root := f.paths[0] - ignore, _ := NewDockerPatternMatcher(root, []string{"a/b"}) - f.setIgnore(ignore) - - a := f.JoinPath(root, "a") - b := f.JoinPath(a, "b") - file := f.JoinPath(b, "bigFile") - f.WriteFile(file, "hello") - f.assertEvents(a) - - expectedWatches := 2 - if isRecursiveWatcher() { - expectedWatches = 1 - } - assert.Equal(t, expectedWatches, int(numberOfWatches.Value())) -} - -func TestIgnoreCreatedDirWithExclusions(t *testing.T) { - f := newNotifyFixture(t) - - root := f.paths[0] - ignore, _ := NewDockerPatternMatcher(root, - []string{ - "a/b", - "c", - "!c/d", - }) - f.setIgnore(ignore) - - a := f.JoinPath(root, "a") - b := f.JoinPath(a, "b") - file := f.JoinPath(b, "bigFile") - f.WriteFile(file, "hello") - f.assertEvents(a) - - expectedWatches := 2 - if isRecursiveWatcher() { - expectedWatches = 1 - } - assert.Equal(t, expectedWatches, int(numberOfWatches.Value())) -} - -func TestIgnoreInitialDir(t *testing.T) { - f := newNotifyFixture(t) - - root := f.TempDir("root") - ignore, _ := NewDockerPatternMatcher(root, []string{"a/b"}) - f.setIgnore(ignore) - - a := f.JoinPath(root, "a") - b := f.JoinPath(a, "b") - file := f.JoinPath(b, "bigFile") - f.WriteFile(file, "hello") - f.watch(root) - - f.assertEvents() - - expectedWatches := 3 - if isRecursiveWatcher() { - expectedWatches = 2 - } - assert.Equal(t, expectedWatches, int(numberOfWatches.Value())) -} - func isRecursiveWatcher() bool { return runtime.GOOS == "darwin" || runtime.GOOS == "windows" } @@ -585,7 +495,6 @@ type notifyFixture struct { out *bytes.Buffer *TempDirFixture notify Notify - ignore PathMatcher paths []string events []FileEvent } @@ -598,7 +507,6 @@ func newNotifyFixture(t *testing.T) *notifyFixture { cancel: cancel, TempDirFixture: NewTempDirFixture(t), paths: []string{}, - ignore: EmptyMatcher{}, out: out, } nf.watch(nf.TempDir("watched")) @@ -606,11 +514,6 @@ func newNotifyFixture(t *testing.T) *notifyFixture { return nf } -func (f *notifyFixture) setIgnore(ignore PathMatcher) { - f.ignore = ignore - f.rebuildWatcher() -} - func (f *notifyFixture) watch(path string) { f.paths = append(f.paths, path) f.rebuildWatcher() @@ -624,7 +527,7 @@ func (f *notifyFixture) rebuildWatcher() { } // create a new watcher - notify, err := NewWatcher(f.paths, f.ignore) + notify, err := NewWatcher(f.paths) if err != nil { f.T().Fatal(err) } @@ -648,7 +551,7 @@ func (f *notifyFixture) assertEvents(expected ...string) { } for i, actual := range f.events { - e := FileEvent{expected[i]} + e := FileEvent(expected[i]) if actual != e { f.T().Fatalf("Got event %v (expected %v)", actual, e) } @@ -702,16 +605,16 @@ F: f.T().Fatal(err) case event := <-f.notify.Events(): - if strings.Contains(event.Path(), syncPath) { + if strings.Contains(string(event), syncPath) { break F } - if strings.Contains(event.Path(), anySyncPath) { + if strings.Contains(string(event), anySyncPath) { continue } // Don't bother tracking duplicate changes to the same path // for testing. - if len(f.events) > 0 && f.events[len(f.events)-1].Path() == event.Path() { + if len(f.events) > 0 && f.events[len(f.events)-1] == event { continue } diff --git a/pkg/watch/watcher_darwin.go b/pkg/watch/watcher_darwin.go index 0254f34e1..662746057 100644 --- a/pkg/watch/watcher_darwin.go +++ b/pkg/watch/watcher_darwin.go @@ -27,7 +27,6 @@ import ( pathutil "github.com/docker/compose/v2/internal/paths" "github.com/fsnotify/fsevents" - "github.com/sirupsen/logrus" ) // A file watcher optimized for Darwin. @@ -39,7 +38,6 @@ type fseventNotify struct { stop chan struct{} pathsWereWatching map[string]interface{} - ignore PathMatcher } func (d *fseventNotify) loop() { @@ -62,14 +60,6 @@ func (d *fseventNotify) loop() { continue } - ignore, err := d.ignore.Matches(e.Path) - if err != nil { - logrus.Infof("Error matching path %q: %v", e.Path, err) - } else if ignore { - logrus.Tracef("Ignoring event for path: %v", e.Path) - continue - } - d.events <- NewFileEvent(e.Path) } } @@ -118,9 +108,8 @@ func (d *fseventNotify) Errors() chan error { return d.errors } -func newWatcher(paths []string, ignore PathMatcher) (Notify, error) { +func newWatcher(paths []string) (Notify, error) { dw := &fseventNotify{ - ignore: ignore, stream: &fsevents.EventStream{ Latency: 50 * time.Millisecond, Flags: fsevents.FileEvents | fsevents.IgnoreSelf, diff --git a/pkg/watch/watcher_naive.go b/pkg/watch/watcher_naive.go index 5ee0b0536..5cacd2205 100644 --- a/pkg/watch/watcher_naive.go +++ b/pkg/watch/watcher_naive.go @@ -46,8 +46,6 @@ type naiveNotify struct { // structure, so we can filter the list quickly. notifyList map[string]bool - ignore PathMatcher - isWatcherRecursive bool watcher *fsnotify.Watcher events chan fsnotify.Event @@ -122,12 +120,7 @@ func (d *naiveNotify) watchRecursively(dir string) error { return nil } - shouldSkipDir, err := d.shouldSkipDir(path) - if err != nil { - return err - } - - if shouldSkipDir { + if d.shouldSkipDir(path) { logrus.Debugf("Ignoring directory and its contents (recursively): %s", path) return filepath.SkipDir } @@ -168,14 +161,14 @@ func (d *naiveNotify) loop() { //nolint:gocyclo if e.Op&fsnotify.Create != fsnotify.Create { if d.shouldNotify(e.Name) { - d.wrappedEvents <- FileEvent{e.Name} + d.wrappedEvents <- FileEvent(e.Name) } continue } if d.isWatcherRecursive { if d.shouldNotify(e.Name) { - d.wrappedEvents <- FileEvent{e.Name} + d.wrappedEvents <- FileEvent(e.Name) } continue } @@ -191,7 +184,7 @@ func (d *naiveNotify) loop() { //nolint:gocyclo } if d.shouldNotify(path) { - d.wrappedEvents <- FileEvent{path} + d.wrappedEvents <- FileEvent(path) } // TODO(dmiller): symlinks 😭 @@ -199,11 +192,7 @@ func (d *naiveNotify) loop() { //nolint:gocyclo shouldWatch := false if info.IsDir() { // watch directories unless we can skip them entirely - shouldSkipDir, err := d.shouldSkipDir(path) - if err != nil { - return err - } - if shouldSkipDir { + if d.shouldSkipDir(path) { return filepath.SkipDir } @@ -230,14 +219,6 @@ func (d *naiveNotify) loop() { //nolint:gocyclo } func (d *naiveNotify) shouldNotify(path string) bool { - ignore, err := d.ignore.Matches(path) - if err != nil { - logrus.Infof("Error matching path %q: %v", path, err) - } else if ignore { - logrus.Tracef("Ignoring event for path: %v", path) - return false - } - if _, ok := d.notifyList[path]; ok { // We generally don't care when directories change at the root of an ADD stat, err := os.Lstat(path) @@ -253,19 +234,10 @@ func (d *naiveNotify) shouldNotify(path string) bool { return false } -func (d *naiveNotify) shouldSkipDir(path string) (bool, error) { +func (d *naiveNotify) shouldSkipDir(path string) bool { // If path is directly in the notifyList, we should always watch it. if d.notifyList[path] { - return false, nil - } - - skip, err := d.ignore.MatchesEntireDir(path) - if err != nil { - return false, fmt.Errorf("shouldSkipDir: %w", err) - } - - if skip { - return true, nil + return false } // Suppose we're watching @@ -282,10 +254,10 @@ func (d *naiveNotify) shouldSkipDir(path string) (bool, error) { // (i.e., to cover the "path doesn't exist" case). for root := range d.notifyList { if pathutil.IsChild(root, path) || pathutil.IsChild(path, root) { - return false, nil + return false } } - return true, nil + return true } func (d *naiveNotify) add(path string) error { @@ -298,11 +270,7 @@ func (d *naiveNotify) add(path string) error { return nil } -func newWatcher(paths []string, ignore PathMatcher) (Notify, error) { - if ignore == nil { - return nil, fmt.Errorf("newWatcher: ignore is nil") - } - +func newWatcher(paths []string) (Notify, error) { fsw, err := fsnotify.NewWatcher() if err != nil { if strings.Contains(err.Error(), "too many open files") && runtime.GOOS == "linux" { @@ -332,7 +300,6 @@ func newWatcher(paths []string, ignore PathMatcher) (Notify, error) { wmw := &naiveNotify{ notifyList: notifyList, - ignore: ignore, watcher: fsw, events: fsw.Events, wrappedEvents: wrappedEvents,