Merge pull request #1504 from jedevc/fix-1497

Refactor BuildWithResultHandler to simplify concurrency usage
This commit is contained in:
Tõnis Tiigi 2023-01-25 10:52:17 -08:00 committed by GitHub
commit b1b4e64c97
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 171 additions and 177 deletions

View File

@ -952,133 +952,16 @@ func BuildWithResultHandler(ctx context.Context, nodes []builder.Node, opt map[s
if multiTarget {
span, ctx = tracing.StartSpan(ctx, k)
}
baseCtx := ctx
res := make([]*client.SolveResponse, len(dps))
wg := &sync.WaitGroup{}
wg.Add(len(dps))
eg2, ctx := errgroup.WithContext(ctx)
var pushNames string
var insecurePush bool
eg.Go(func() (err error) {
defer func() {
if span != nil {
tracing.FinishWithError(span, err)
}
}()
pw := progress.WithPrefix(w, "default", false)
wg.Wait()
select {
case <-ctx.Done():
return ctx.Err()
default:
}
respMu.Lock()
resp[k] = res[0]
respMu.Unlock()
if len(res) == 1 {
dgst := res[0].ExporterResponse[exptypes.ExporterImageDigestKey]
if v, ok := res[0].ExporterResponse[exptypes.ExporterImageConfigDigestKey]; ok {
dgst = v
}
if opt.ImageIDFile != "" {
return os.WriteFile(opt.ImageIDFile, []byte(dgst), 0644)
}
return nil
}
if pushNames != "" {
progress.Write(pw, fmt.Sprintf("merging manifest list %s", pushNames), func() error {
descs := make([]specs.Descriptor, 0, len(res))
for _, r := range res {
s, ok := r.ExporterResponse[exptypes.ExporterImageDigestKey]
if ok {
descs = append(descs, specs.Descriptor{
Digest: digest.Digest(s),
MediaType: images.MediaTypeDockerSchema2ManifestList,
Size: -1,
})
}
}
if len(descs) > 0 {
var imageopt imagetools.Opt
for _, dp := range dps {
imageopt = nodes[dp.driverIndex].ImageOpt
break
}
names := strings.Split(pushNames, ",")
if insecurePush {
insecureTrue := true
httpTrue := true
nn, err := reference.ParseNormalizedNamed(names[0])
if err != nil {
return err
}
imageopt.RegistryConfig = map[string]resolver.RegistryConfig{
reference.Domain(nn): {
Insecure: &insecureTrue,
PlainHTTP: &httpTrue,
},
}
}
itpull := imagetools.New(imageopt)
ref, err := reference.ParseNormalizedNamed(names[0])
if err != nil {
return err
}
ref = reference.TagNameOnly(ref)
srcs := make([]*imagetools.Source, len(descs))
for i, desc := range descs {
srcs[i] = &imagetools.Source{
Desc: desc,
Ref: ref,
}
}
dt, desc, err := itpull.Combine(ctx, srcs)
if err != nil {
return err
}
if opt.ImageIDFile != "" {
if err := os.WriteFile(opt.ImageIDFile, []byte(desc.Digest), 0644); err != nil {
return err
}
}
itpush := imagetools.New(imageopt)
for _, n := range names {
nn, err := reference.ParseNormalizedNamed(n)
if err != nil {
return err
}
if err := itpush.Push(ctx, nn, desc, dt); err != nil {
return err
}
}
respMu.Lock()
resp[k] = &client.SolveResponse{
ExporterResponse: map[string]string{
"containerimage.digest": desc.Digest.String(),
},
}
respMu.Unlock()
}
return nil
})
}
return nil
})
for i, dp := range dps {
so := *dp.so
i, dp, so := i, dp, *dp.so
if multiDriver {
for i, e := range so.Exports {
switch e.Type {
@ -1107,13 +990,11 @@ func BuildWithResultHandler(ctx context.Context, nodes []builder.Node, opt map[s
}
}
func(i int, dp driverPair, so client.SolveOpt) {
pw := progress.WithPrefix(w, k, multiTarget)
c := clients[dp.driverIndex]
eg.Go(func() error {
eg2.Go(func() error {
pw = progress.ResetTime(pw)
defer wg.Done()
if err := waitContextDeps(ctx, dp.driverIndex, results, &so); err != nil {
return err
@ -1243,10 +1124,123 @@ func BuildWithResultHandler(ctx context.Context, nodes []builder.Node, opt map[s
}
return nil
})
}(i, dp, so)
}
eg.Go(func() (err error) {
ctx := baseCtx
defer func() {
if span != nil {
tracing.FinishWithError(span, err)
}
}()
pw := progress.WithPrefix(w, "default", false)
if err := eg2.Wait(); err != nil {
return err
}
respMu.Lock()
resp[k] = res[0]
respMu.Unlock()
if len(res) == 1 {
dgst := res[0].ExporterResponse[exptypes.ExporterImageDigestKey]
if v, ok := res[0].ExporterResponse[exptypes.ExporterImageConfigDigestKey]; ok {
dgst = v
}
if opt.ImageIDFile != "" {
return os.WriteFile(opt.ImageIDFile, []byte(dgst), 0644)
}
return nil
}
if pushNames != "" {
progress.Write(pw, fmt.Sprintf("merging manifest list %s", pushNames), func() error {
descs := make([]specs.Descriptor, 0, len(res))
for _, r := range res {
s, ok := r.ExporterResponse[exptypes.ExporterImageDigestKey]
if ok {
descs = append(descs, specs.Descriptor{
Digest: digest.Digest(s),
MediaType: images.MediaTypeDockerSchema2ManifestList,
Size: -1,
})
}
}
if len(descs) > 0 {
var imageopt imagetools.Opt
for _, dp := range dps {
imageopt = nodes[dp.driverIndex].ImageOpt
break
}
names := strings.Split(pushNames, ",")
if insecurePush {
insecureTrue := true
httpTrue := true
nn, err := reference.ParseNormalizedNamed(names[0])
if err != nil {
return err
}
imageopt.RegistryConfig = map[string]resolver.RegistryConfig{
reference.Domain(nn): {
Insecure: &insecureTrue,
PlainHTTP: &httpTrue,
},
}
}
itpull := imagetools.New(imageopt)
ref, err := reference.ParseNormalizedNamed(names[0])
if err != nil {
return err
}
ref = reference.TagNameOnly(ref)
srcs := make([]*imagetools.Source, len(descs))
for i, desc := range descs {
srcs[i] = &imagetools.Source{
Desc: desc,
Ref: ref,
}
}
dt, desc, err := itpull.Combine(ctx, srcs)
if err != nil {
return err
}
if opt.ImageIDFile != "" {
if err := os.WriteFile(opt.ImageIDFile, []byte(desc.Digest), 0644); err != nil {
return err
}
}
itpush := imagetools.New(imageopt)
for _, n := range names {
nn, err := reference.ParseNormalizedNamed(n)
if err != nil {
return err
}
if err := itpush.Push(ctx, nn, desc, dt); err != nil {
return err
}
}
respMu.Lock()
resp[k] = &client.SolveResponse{
ExporterResponse: map[string]string{
"containerimage.digest": desc.Digest.String(),
},
}
respMu.Unlock()
}
return nil
})
}
return nil
})
return nil
}(k)
if err != nil {