diff --git a/build/build.go b/build/build.go index 9a56d3dc..383a4c82 100644 --- a/build/build.go +++ b/build/build.go @@ -9,6 +9,7 @@ import ( "fmt" "io" "os" + "slices" "strconv" "strings" "sync" @@ -34,6 +35,7 @@ import ( gateway "github.com/moby/buildkit/frontend/gateway/client" "github.com/moby/buildkit/identity" "github.com/moby/buildkit/session" + "github.com/moby/buildkit/session/filesync" "github.com/moby/buildkit/solver/errdefs" "github.com/moby/buildkit/solver/pb" spb "github.com/moby/buildkit/sourcepolicy/pb" @@ -44,6 +46,8 @@ import ( specs "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" "github.com/sirupsen/logrus" + "github.com/tonistiigi/fsutil" + fstypes "github.com/tonistiigi/fsutil/types" "go.opentelemetry.io/otel/trace" "golang.org/x/sync/errgroup" ) @@ -291,6 +295,12 @@ func BuildWithResultHandler(ctx context.Context, nodes []builder.Node, opt map[s } } + sharedSessions, err := detectSharedMounts(ctx, reqForNodes) + if err != nil { + return nil, err + } + sharedSessionsWG := map[string]*sync.WaitGroup{} + resp = map[string]*client.SolveResponse{} var respMu sync.Mutex results := waitmap.New() @@ -355,7 +365,37 @@ func BuildWithResultHandler(ctx context.Context, nodes []builder.Node, opt map[s if err != nil { return err } + + var done func() + if sessions, ok := sharedSessions[node.Name]; ok { + wg, ok := sharedSessionsWG[node.Name] + if ok { + wg.Add(1) + } else { + wg = &sync.WaitGroup{} + wg.Add(1) + sharedSessionsWG[node.Name] = wg + for _, s := range sessions { + s := s + eg.Go(func() error { + return s.Run(baseCtx, c.Dialer()) + }) + } + go func() { + wg.Wait() + for _, s := range sessions { + s.Close() + } + }() + } + done = wg.Done + } + eg2.Go(func() error { + if done != nil { + defer done() + } + pw = progress.ResetTime(pw) if err := waitContextDeps(ctx, dp.driverIndex, results, so); err != nil { @@ -786,6 +826,124 @@ func resultKey(index int, name string) string { return fmt.Sprintf("%d-%s", index, name) } +// detectSharedMounts looks for same local mounts used by multiple requests to the same node +// and creates a separate session that will be used by all detected requests. +func detectSharedMounts(ctx context.Context, reqs map[string][]*reqForNode) (_ map[string][]*session.Session, err error) { + type fsTracker struct { + fs fsutil.FS + so []*client.SolveOpt + } + type fsKey struct { + name string + dir string + } + + m := map[string]map[fsKey]*fsTracker{} + for _, reqs := range reqs { + for _, req := range reqs { + nodeName := req.resolvedNode.Node().Name + if _, ok := m[nodeName]; !ok { + m[nodeName] = map[fsKey]*fsTracker{} + } + fsMap := m[nodeName] + for name, m := range req.so.LocalMounts { + fs, ok := m.(*fs) + if !ok { + continue + } + key := fsKey{name: name, dir: fs.dir} + if _, ok := fsMap[key]; !ok { + fsMap[key] = &fsTracker{fs: fs.FS} + } + fsMap[key].so = append(fsMap[key].so, req.so) + } + } + } + + type sharedSession struct { + *session.Session + fsMap map[string]fsutil.FS + } + + sessionMap := map[string][]*sharedSession{} + + defer func() { + if err != nil { + for _, sessions := range sessionMap { + for _, s := range sessions { + s.Close() + } + } + } + }() + + for node, fsMap := range m { + for key, fs := range fsMap { + if len(fs.so) <= 1 { + continue + } + + sessions := sessionMap[node] + + // find session that doesn't have the fs name reserved + idx := slices.IndexFunc(sessions, func(s *sharedSession) bool { + _, ok := s.fsMap[key.name] + return !ok + }) + + var ss *sharedSession + if idx == -1 { + s, err := session.NewSession(ctx, "", fs.so[0].SharedKey) + if err != nil { + return nil, err + } + ss = &sharedSession{Session: s, fsMap: map[string]fsutil.FS{}} + sessions = append(sessions, ss) + sessionMap[node] = sessions + } else { + ss = sessions[idx] + } + + ss.fsMap[key.name] = fs.fs + for _, so := range fs.so { + if so.FrontendAttrs == nil { + so.FrontendAttrs = map[string]string{} + } + so.FrontendAttrs["local-sessionid:"+key.name] = ss.ID() + } + } + } + + resetUIDAndGID := func(p string, st *fstypes.Stat) fsutil.MapResult { + st.Uid = 0 + st.Gid = 0 + return fsutil.MapResultKeep + } + + // convert back to regular sessions + sessions := map[string][]*session.Session{} + for n, ss := range sessionMap { + arr := make([]*session.Session, 0, len(ss)) + for _, s := range ss { + arr = append(arr, s.Session) + + src := make(filesync.StaticDirSource, len(s.fsMap)) + for name, fs := range s.fsMap { + fs, err := fsutil.NewFilterFS(fs, &fsutil.FilterOpt{ + Map: resetUIDAndGID, + }) + if err != nil { + return nil, err + } + src[name] = fs + } + s.Allow(filesync.NewFSSyncProvider(src)) + } + sessions[n] = arr + } + return sessions, nil +} + // calculateChildTargets returns all the targets that depend on current target for reverse index func calculateChildTargets(reqs map[string][]*reqForNode, opt map[string]Options) map[string][]string { out := make(map[string][]string)