bake: use shared session for local sources for multiple targets

Detect cases where multiple bake targets would use the same
local source. For such cases a separate session request is
made in addition to session per target and local source
is made available in that source as well.

The new sessionID is sent with the request so the frontend
can ask associate it with the local source it needs.

The sources are still available in the main request session
as well. This would be used if frontend ignores the local-sessionid
parameter and makes sure that old version continue working.

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
This commit is contained in:
Tonis Tiigi 2024-07-17 22:34:06 -07:00
parent f28dff7598
commit b62d64b2b5
No known key found for this signature in database
GPG Key ID: AFA9DE5F8AB7AF39
1 changed files with 158 additions and 0 deletions

View File

@ -9,6 +9,7 @@ import (
"fmt"
"io"
"os"
"slices"
"strconv"
"strings"
"sync"
@ -34,6 +35,7 @@ import (
gateway "github.com/moby/buildkit/frontend/gateway/client"
"github.com/moby/buildkit/identity"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/session/filesync"
"github.com/moby/buildkit/solver/errdefs"
"github.com/moby/buildkit/solver/pb"
spb "github.com/moby/buildkit/sourcepolicy/pb"
@ -44,6 +46,8 @@ import (
specs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/tonistiigi/fsutil"
fstypes "github.com/tonistiigi/fsutil/types"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
)
@ -291,6 +295,12 @@ func BuildWithResultHandler(ctx context.Context, nodes []builder.Node, opt map[s
}
}
sharedSessions, err := detectSharedMounts(ctx, reqForNodes)
if err != nil {
return nil, err
}
sharedSessionsWG := map[string]*sync.WaitGroup{}
resp = map[string]*client.SolveResponse{}
var respMu sync.Mutex
results := waitmap.New()
@ -355,7 +365,37 @@ func BuildWithResultHandler(ctx context.Context, nodes []builder.Node, opt map[s
if err != nil {
return err
}
var done func()
if sessions, ok := sharedSessions[node.Name]; ok {
wg, ok := sharedSessionsWG[node.Name]
if ok {
wg.Add(1)
} else {
wg = &sync.WaitGroup{}
wg.Add(1)
sharedSessionsWG[node.Name] = wg
for _, s := range sessions {
s := s
eg.Go(func() error {
return s.Run(baseCtx, c.Dialer())
})
}
go func() {
wg.Wait()
for _, s := range sessions {
s.Close()
}
}()
}
done = wg.Done
}
eg2.Go(func() error {
if done != nil {
defer done()
}
pw = progress.ResetTime(pw)
if err := waitContextDeps(ctx, dp.driverIndex, results, so); err != nil {
@ -786,6 +826,124 @@ func resultKey(index int, name string) string {
return fmt.Sprintf("%d-%s", index, name)
}
// detectSharedMounts looks for same local mounts used by multiple requests to the same node
// and creates a separate session that will be used by all detected requests.
func detectSharedMounts(ctx context.Context, reqs map[string][]*reqForNode) (_ map[string][]*session.Session, err error) {
type fsTracker struct {
fs fsutil.FS
so []*client.SolveOpt
}
type fsKey struct {
name string
dir string
}
m := map[string]map[fsKey]*fsTracker{}
for _, reqs := range reqs {
for _, req := range reqs {
nodeName := req.resolvedNode.Node().Name
if _, ok := m[nodeName]; !ok {
m[nodeName] = map[fsKey]*fsTracker{}
}
fsMap := m[nodeName]
for name, m := range req.so.LocalMounts {
fs, ok := m.(*fs)
if !ok {
continue
}
key := fsKey{name: name, dir: fs.dir}
if _, ok := fsMap[key]; !ok {
fsMap[key] = &fsTracker{fs: fs.FS}
}
fsMap[key].so = append(fsMap[key].so, req.so)
}
}
}
type sharedSession struct {
*session.Session
fsMap map[string]fsutil.FS
}
sessionMap := map[string][]*sharedSession{}
defer func() {
if err != nil {
for _, sessions := range sessionMap {
for _, s := range sessions {
s.Close()
}
}
}
}()
for node, fsMap := range m {
for key, fs := range fsMap {
if len(fs.so) <= 1 {
continue
}
sessions := sessionMap[node]
// find session that doesn't have the fs name reserved
idx := slices.IndexFunc(sessions, func(s *sharedSession) bool {
_, ok := s.fsMap[key.name]
return !ok
})
var ss *sharedSession
if idx == -1 {
s, err := session.NewSession(ctx, "", fs.so[0].SharedKey)
if err != nil {
return nil, err
}
ss = &sharedSession{Session: s, fsMap: map[string]fsutil.FS{}}
sessions = append(sessions, ss)
sessionMap[node] = sessions
} else {
ss = sessions[idx]
}
ss.fsMap[key.name] = fs.fs
for _, so := range fs.so {
if so.FrontendAttrs == nil {
so.FrontendAttrs = map[string]string{}
}
so.FrontendAttrs["local-sessionid:"+key.name] = ss.ID()
}
}
}
resetUIDAndGID := func(p string, st *fstypes.Stat) fsutil.MapResult {
st.Uid = 0
st.Gid = 0
return fsutil.MapResultKeep
}
// convert back to regular sessions
sessions := map[string][]*session.Session{}
for n, ss := range sessionMap {
arr := make([]*session.Session, 0, len(ss))
for _, s := range ss {
arr = append(arr, s.Session)
src := make(filesync.StaticDirSource, len(s.fsMap))
for name, fs := range s.fsMap {
fs, err := fsutil.NewFilterFS(fs, &fsutil.FilterOpt{
Map: resetUIDAndGID,
})
if err != nil {
return nil, err
}
src[name] = fs
}
s.Allow(filesync.NewFSSyncProvider(src))
}
sessions[n] = arr
}
return sessions, nil
}
// calculateChildTargets returns all the targets that depend on current target for reverse index
func calculateChildTargets(reqs map[string][]*reqForNode, opt map[string]Options) map[string][]string {
out := make(map[string][]string)