Merge pull request #2615 from tonistiigi/bake-shared-transfer-sessions

bake: use shared session for local sources for multiple targets
This commit is contained in:
CrazyMax 2024-08-07 12:53:03 +02:00 committed by GitHub
commit 5a50d13641
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 158 additions and 0 deletions

View File

@ -9,6 +9,7 @@ import (
"fmt"
"io"
"os"
"slices"
"strconv"
"strings"
"sync"
@ -34,6 +35,7 @@ import (
gateway "github.com/moby/buildkit/frontend/gateway/client"
"github.com/moby/buildkit/identity"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/session/filesync"
"github.com/moby/buildkit/solver/errdefs"
"github.com/moby/buildkit/solver/pb"
spb "github.com/moby/buildkit/sourcepolicy/pb"
@ -44,6 +46,8 @@ import (
specs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/tonistiigi/fsutil"
fstypes "github.com/tonistiigi/fsutil/types"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
)
@ -291,6 +295,12 @@ func BuildWithResultHandler(ctx context.Context, nodes []builder.Node, opt map[s
}
}
sharedSessions, err := detectSharedMounts(ctx, reqForNodes)
if err != nil {
return nil, err
}
sharedSessionsWG := map[string]*sync.WaitGroup{}
resp = map[string]*client.SolveResponse{}
var respMu sync.Mutex
results := waitmap.New()
@ -355,7 +365,37 @@ func BuildWithResultHandler(ctx context.Context, nodes []builder.Node, opt map[s
if err != nil {
return err
}
var done func()
if sessions, ok := sharedSessions[node.Name]; ok {
wg, ok := sharedSessionsWG[node.Name]
if ok {
wg.Add(1)
} else {
wg = &sync.WaitGroup{}
wg.Add(1)
sharedSessionsWG[node.Name] = wg
for _, s := range sessions {
s := s
eg.Go(func() error {
return s.Run(baseCtx, c.Dialer())
})
}
go func() {
wg.Wait()
for _, s := range sessions {
s.Close()
}
}()
}
done = wg.Done
}
eg2.Go(func() error {
if done != nil {
defer done()
}
pw = progress.ResetTime(pw)
if err := waitContextDeps(ctx, dp.driverIndex, results, so); err != nil {
@ -786,6 +826,124 @@ func resultKey(index int, name string) string {
return fmt.Sprintf("%d-%s", index, name)
}
// detectSharedMounts looks for same local mounts used by multiple requests to the same node
// and creates a separate session that will be used by all detected requests.
func detectSharedMounts(ctx context.Context, reqs map[string][]*reqForNode) (_ map[string][]*session.Session, err error) {
type fsTracker struct {
fs fsutil.FS
so []*client.SolveOpt
}
type fsKey struct {
name string
dir string
}
m := map[string]map[fsKey]*fsTracker{}
for _, reqs := range reqs {
for _, req := range reqs {
nodeName := req.resolvedNode.Node().Name
if _, ok := m[nodeName]; !ok {
m[nodeName] = map[fsKey]*fsTracker{}
}
fsMap := m[nodeName]
for name, m := range req.so.LocalMounts {
fs, ok := m.(*fs)
if !ok {
continue
}
key := fsKey{name: name, dir: fs.dir}
if _, ok := fsMap[key]; !ok {
fsMap[key] = &fsTracker{fs: fs.FS}
}
fsMap[key].so = append(fsMap[key].so, req.so)
}
}
}
type sharedSession struct {
*session.Session
fsMap map[string]fsutil.FS
}
sessionMap := map[string][]*sharedSession{}
defer func() {
if err != nil {
for _, sessions := range sessionMap {
for _, s := range sessions {
s.Close()
}
}
}
}()
for node, fsMap := range m {
for key, fs := range fsMap {
if len(fs.so) <= 1 {
continue
}
sessions := sessionMap[node]
// find session that doesn't have the fs name reserved
idx := slices.IndexFunc(sessions, func(s *sharedSession) bool {
_, ok := s.fsMap[key.name]
return !ok
})
var ss *sharedSession
if idx == -1 {
s, err := session.NewSession(ctx, "", fs.so[0].SharedKey)
if err != nil {
return nil, err
}
ss = &sharedSession{Session: s, fsMap: map[string]fsutil.FS{}}
sessions = append(sessions, ss)
sessionMap[node] = sessions
} else {
ss = sessions[idx]
}
ss.fsMap[key.name] = fs.fs
for _, so := range fs.so {
if so.FrontendAttrs == nil {
so.FrontendAttrs = map[string]string{}
}
so.FrontendAttrs["local-sessionid:"+key.name] = ss.ID()
}
}
}
resetUIDAndGID := func(p string, st *fstypes.Stat) fsutil.MapResult {
st.Uid = 0
st.Gid = 0
return fsutil.MapResultKeep
}
// convert back to regular sessions
sessions := map[string][]*session.Session{}
for n, ss := range sessionMap {
arr := make([]*session.Session, 0, len(ss))
for _, s := range ss {
arr = append(arr, s.Session)
src := make(filesync.StaticDirSource, len(s.fsMap))
for name, fs := range s.fsMap {
fs, err := fsutil.NewFilterFS(fs, &fsutil.FilterOpt{
Map: resetUIDAndGID,
})
if err != nil {
return nil, err
}
src[name] = fs
}
s.Allow(filesync.NewFSSyncProvider(src))
}
sessions[n] = arr
}
return sessions, nil
}
// calculateChildTargets returns all the targets that depend on current target for reverse index
func calculateChildTargets(reqs map[string][]*reqForNode, opt map[string]Options) map[string][]string {
out := make(map[string][]string)