Merge pull request #2656 from tonistiigi/repl-stdin

build: allow builds from stdin for multi-node builders
This commit is contained in:
thompson-shaun 2024-08-22 11:28:55 -04:00 committed by GitHub
commit e403ab2d63
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 266 additions and 14 deletions

View File

@ -97,7 +97,7 @@ type CallFunc struct {
type Inputs struct { type Inputs struct {
ContextPath string ContextPath string
DockerfilePath string DockerfilePath string
InStream io.Reader InStream *SyncMultiReader
ContextState *llb.State ContextState *llb.State
DockerfileInline string DockerfileInline string
NamedContexts map[string]NamedContext NamedContexts map[string]NamedContext

View File

@ -1,11 +1,12 @@
package build package build
import ( import (
"bufio" "bytes"
"context" "context"
"io" "io"
"os" "os"
"path/filepath" "path/filepath"
"slices"
"strconv" "strconv"
"strings" "strings"
"syscall" "syscall"
@ -260,7 +261,7 @@ func toSolveOpt(ctx context.Context, node builder.Node, multiDriver bool, opt Op
} }
so.Exports = opt.Exports so.Exports = opt.Exports
so.Session = opt.Session so.Session = slices.Clone(opt.Session)
releaseLoad, err := loadInputs(ctx, nodeDriver, opt.Inputs, pw, &so) releaseLoad, err := loadInputs(ctx, nodeDriver, opt.Inputs, pw, &so)
if err != nil { if err != nil {
@ -364,7 +365,7 @@ func loadInputs(ctx context.Context, d *driver.DriverHandle, inp Inputs, pw prog
var ( var (
err error err error
dockerfileReader io.Reader dockerfileReader io.ReadCloser
dockerfileDir string dockerfileDir string
dockerfileName = inp.DockerfilePath dockerfileName = inp.DockerfilePath
toRemove []string toRemove []string
@ -382,8 +383,8 @@ func loadInputs(ctx context.Context, d *driver.DriverHandle, inp Inputs, pw prog
return nil, errors.Errorf("invalid argument: can't use stdin for both build context and dockerfile") return nil, errors.Errorf("invalid argument: can't use stdin for both build context and dockerfile")
} }
buf := bufio.NewReader(inp.InStream) rc := inp.InStream.NewReadCloser()
magic, err := buf.Peek(archiveHeaderSize * 2) magic, err := inp.InStream.Peek(archiveHeaderSize * 2)
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
return nil, errors.Wrap(err, "failed to peek context header from STDIN") return nil, errors.Wrap(err, "failed to peek context header from STDIN")
} }
@ -391,14 +392,14 @@ func loadInputs(ctx context.Context, d *driver.DriverHandle, inp Inputs, pw prog
if isArchive(magic) { if isArchive(magic) {
// stdin is context // stdin is context
up := uploadprovider.New() up := uploadprovider.New()
target.FrontendAttrs["context"] = up.Add(buf) target.FrontendAttrs["context"] = up.Add(rc)
target.Session = append(target.Session, up) target.Session = append(target.Session, up)
} else { } else {
if inp.DockerfilePath != "" { if inp.DockerfilePath != "" {
return nil, errors.Errorf("ambiguous Dockerfile source: both stdin and flag correspond to Dockerfiles") return nil, errors.Errorf("ambiguous Dockerfile source: both stdin and flag correspond to Dockerfiles")
} }
// stdin is dockerfile // stdin is dockerfile
dockerfileReader = buf dockerfileReader = rc
inp.ContextPath, _ = os.MkdirTemp("", "empty-dir") inp.ContextPath, _ = os.MkdirTemp("", "empty-dir")
toRemove = append(toRemove, inp.ContextPath) toRemove = append(toRemove, inp.ContextPath)
if err := setLocalMount("context", inp.ContextPath, target); err != nil { if err := setLocalMount("context", inp.ContextPath, target); err != nil {
@ -417,7 +418,7 @@ func loadInputs(ctx context.Context, d *driver.DriverHandle, inp Inputs, pw prog
target.SharedKey = sharedKey target.SharedKey = sharedKey
switch inp.DockerfilePath { switch inp.DockerfilePath {
case "-": case "-":
dockerfileReader = inp.InStream dockerfileReader = inp.InStream.NewReadCloser()
case "": case "":
dockerfileDir = inp.ContextPath dockerfileDir = inp.ContextPath
default: default:
@ -426,7 +427,7 @@ func loadInputs(ctx context.Context, d *driver.DriverHandle, inp Inputs, pw prog
} }
case IsRemoteURL(inp.ContextPath): case IsRemoteURL(inp.ContextPath):
if inp.DockerfilePath == "-" { if inp.DockerfilePath == "-" {
dockerfileReader = inp.InStream dockerfileReader = inp.InStream.NewReadCloser()
} else if filepath.IsAbs(inp.DockerfilePath) { } else if filepath.IsAbs(inp.DockerfilePath) {
dockerfileDir = filepath.Dir(inp.DockerfilePath) dockerfileDir = filepath.Dir(inp.DockerfilePath)
dockerfileName = filepath.Base(inp.DockerfilePath) dockerfileName = filepath.Base(inp.DockerfilePath)
@ -438,11 +439,11 @@ func loadInputs(ctx context.Context, d *driver.DriverHandle, inp Inputs, pw prog
} }
if inp.DockerfileInline != "" { if inp.DockerfileInline != "" {
dockerfileReader = strings.NewReader(inp.DockerfileInline) dockerfileReader = io.NopCloser(strings.NewReader(inp.DockerfileInline))
} }
if dockerfileReader != nil { if dockerfileReader != nil {
dockerfileDir, err = createTempDockerfile(dockerfileReader) dockerfileDir, err = createTempDockerfile(dockerfileReader, inp.InStream)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -582,7 +583,7 @@ func setLocalMount(name, dir string, so *client.SolveOpt) error {
return nil return nil
} }
func createTempDockerfile(r io.Reader) (string, error) { func createTempDockerfile(r io.Reader, multiReader *SyncMultiReader) (string, error) {
dir, err := os.MkdirTemp("", "dockerfile") dir, err := os.MkdirTemp("", "dockerfile")
if err != nil { if err != nil {
return "", err return "", err
@ -592,6 +593,16 @@ func createTempDockerfile(r io.Reader) (string, error) {
return "", err return "", err
} }
defer f.Close() defer f.Close()
if multiReader != nil {
dt, err := io.ReadAll(r)
if err != nil {
return "", err
}
multiReader.Reset(dt)
r = bytes.NewReader(dt)
}
if _, err := io.Copy(f, r); err != nil { if _, err := io.Copy(f, r); err != nil {
return "", err return "", err
} }

164
build/replicatedstream.go Normal file
View File

@ -0,0 +1,164 @@
package build
import (
"bufio"
"bytes"
"io"
"sync"
)
// SyncMultiReader replicates a single input stream (typically stdin) to
// multiple readers so that every node of a multi-node builder can consume
// the same bytes. Open readers advance in lock-step: a shared window of
// data is buffered until the slowest open reader has consumed it.
type SyncMultiReader struct {
	source *bufio.Reader // shared underlying stream
	buffer []byte        // current window of bytes starting at absolute position `offset`
	static []byte        // if non-nil (set via Reset), fully-buffered content served instead of source
	mu     sync.Mutex
	cond   *sync.Cond // guarded by mu; signals buffer/offset/closed changes to waiting readers
	readers []*syncReader // every reader handed out by NewReadCloser
	err    error // terminal error from source (including io.EOF), sticky once set
	offset int   // absolute stream offset of buffer[0]
}
// syncReader is one consumer of a SyncMultiReader's replicated stream.
// All fields are guarded by mr.mu.
type syncReader struct {
	mr     *SyncMultiReader
	offset int  // absolute stream position this reader has consumed up to
	closed bool // set by Close; a closed reader no longer blocks the others
}
// NewSyncMultiReader wraps source so that multiple synchronized readers,
// obtained via NewReadCloser, can each consume an identical copy of its
// byte stream.
func NewSyncMultiReader(source io.Reader) *SyncMultiReader {
	mr := new(SyncMultiReader)
	mr.source = bufio.NewReader(source)
	mr.buffer = make([]byte, 0, 32*1024)
	mr.cond = sync.NewCond(&mr.mu)
	return mr
}
// Peek returns the next n bytes of the stream without consuming them.
// When static content has been installed via Reset it serves the prefix
// of that content (shorter if fewer bytes remain); otherwise it delegates
// to the buffered source reader.
func (mr *SyncMultiReader) Peek(n int) ([]byte, error) {
	mr.mu.Lock()
	defer mr.mu.Unlock()

	if mr.static != nil {
		// Fix: return the FIRST min(n, len) bytes. The original sliced
		// static[min(n, len):], which yields the tail after the prefix —
		// the exact opposite of Peek semantics — so archive detection on
		// the static buffer would inspect the wrong bytes.
		return mr.static[:min(n, len(mr.static))], nil
	}

	return mr.source.Peek(n)
}
// Reset installs dt as the fully-buffered stream content. Subsequent
// NewReadCloser calls serve dt directly rather than reading the live
// source, and Peek answers from dt.
func (mr *SyncMultiReader) Reset(dt []byte) {
	mr.mu.Lock()
	mr.static = dt
	mr.mu.Unlock()
}
// NewReadCloser hands out an independent reader over the replicated
// stream. If static content was installed via Reset, the returned reader
// serves it directly; otherwise the reader is registered and participates
// in lock-step consumption of the live source.
func (mr *SyncMultiReader) NewReadCloser() io.ReadCloser {
	mr.mu.Lock()
	defer mr.mu.Unlock()

	if dt := mr.static; dt != nil {
		return io.NopCloser(bytes.NewReader(dt))
	}

	sr := &syncReader{mr: mr}
	mr.readers = append(mr.readers, sr)
	return sr
}
// Read implements io.Reader. It only takes the shared lock; all of the
// lock-step bookkeeping lives in read, which expects mu to be held.
func (sr *syncReader) Read(p []byte) (int, error) {
	mr := sr.mr
	mr.mu.Lock()
	defer mr.mu.Unlock()
	return sr.read(p)
}
// read copies the next available bytes of the shared stream into p.
// The caller must hold sr.mr.mu. It blocks on the condition variable
// until buffered data at this reader's offset is available, the source
// has errored, or this reader is closed. The reader that first drains
// the shared window refills it from the source on behalf of everyone.
func (sr *syncReader) read(p []byte) (int, error) {
	// Snapshot of the read frontier taken before any waiting; compared
	// against `last` below to detect a refill done by another reader.
	end := sr.mr.offset + len(sr.mr.buffer)

loop0:
	for {
		if sr.closed {
			return 0, io.EOF
		}

		// Current frontier (shadows the snapshot above on purpose).
		end := sr.mr.offset + len(sr.mr.buffer)

		// Source finished (or failed) and this reader consumed everything.
		if sr.mr.err != nil && sr.offset == end {
			return 0, sr.mr.err
		}

		// Bytes in the window this reader has not yet consumed: hand them out.
		start := sr.offset - sr.mr.offset

		dt := sr.mr.buffer[start:]
		if len(dt) > 0 {
			n := copy(p, dt)
			sr.offset += n
			sr.mr.cond.Broadcast() // we advanced; a waiting refiller may proceed
			return n, nil
		}

		// check for readers that have not caught up
		hasOpen := false
		for _, r := range sr.mr.readers {
			if !r.closed {
				hasOpen = true
			} else {
				continue
			}

			// An open reader is behind the frontier: wait for it to catch
			// up before the window may be slid forward, then re-evaluate.
			if r.offset < end {
				sr.mr.cond.Wait()
				continue loop0
			}
		}

		// Every reader (including this one) is closed: nothing left to serve.
		if !hasOpen {
			return 0, io.EOF
		}
		break
	}

	last := sr.mr.offset + len(sr.mr.buffer)
	// another reader has already updated the buffer
	if last > end || sr.mr.err != nil {
		return sr.read(p)
	}

	// This reader is the designated refiller: slide the window past the
	// consumed bytes and read the next chunk from the shared source.
	sr.mr.offset += len(sr.mr.buffer)
	sr.mr.buffer = sr.mr.buffer[:cap(sr.mr.buffer)]
	n, err := sr.mr.source.Read(sr.mr.buffer)
	if n >= 0 {
		sr.mr.buffer = sr.mr.buffer[:n]
	} else {
		sr.mr.buffer = sr.mr.buffer[:0]
	}

	sr.mr.cond.Broadcast() // wake readers waiting for fresh data

	if err != nil {
		// NOTE(review): if the source returns n > 0 together with err, this
		// path returns (0, err) without handing those n bytes to the caller,
		// even though they were appended to the shared buffer for other
		// readers — confirm callers tolerate the sticky-error short read.
		sr.mr.err = err
		return 0, err
	}

	nn := copy(p, sr.mr.buffer)
	sr.offset += nn

	return nn, nil
}
// Close marks this reader closed and wakes any peers blocked waiting for
// it to catch up, so the shared window can advance without it. Close is
// idempotent and always returns nil.
func (sr *syncReader) Close() error {
	mr := sr.mr
	mr.mu.Lock()
	defer mr.mu.Unlock()

	if !sr.closed {
		sr.closed = true
		mr.cond.Broadcast()
	}
	return nil
}

View File

@ -0,0 +1,77 @@
package build
import (
"bytes"
"crypto/rand"
"io"
mathrand "math/rand"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// generateRandomData returns size cryptographically random bytes for use
// as test fixture data.
func generateRandomData(size int) []byte {
	data := make([]byte, size)
	// Fix: the original discarded rand.Read's error, which would silently
	// yield zeroed (non-random) fixture data if the entropy source failed.
	if _, err := rand.Read(data); err != nil {
		panic(err) // unusable test environment; fail loudly
	}
	return data
}
// TestSyncMultiReaderParallel exercises SyncMultiReader with many
// concurrent readers performing randomly-sized reads with random pacing,
// verifying every reader observes exactly the source bytes in order.
// Some readers close early at random, checking that the remaining open
// readers still drain the full stream.
func TestSyncMultiReaderParallel(t *testing.T) {
	data := generateRandomData(1024 * 1024)
	source := bytes.NewReader(data)
	mr := NewSyncMultiReader(source)

	var wg sync.WaitGroup
	numReaders := 10
	bufferSize := 4096 * 4

	// Create all readers up front so none joins after the stream advances.
	readers := make([]io.ReadCloser, numReaders)

	for i := 0; i < numReaders; i++ {
		readers[i] = mr.NewReadCloser()
	}

	for i := 0; i < numReaders; i++ {
		wg.Add(1)
		go func(readerId int) {
			defer wg.Done()

			reader := readers[readerId]
			defer reader.Close()

			totalRead := 0
			buf := make([]byte, bufferSize)
			for totalRead < len(data) {
				// Simulate random read sizes
				readSize := mathrand.Intn(bufferSize) //nolint:gosec

				n, err := reader.Read(buf[:readSize])

				if n > 0 {
					// Every chunk must match the source at this reader's position.
					assert.Equal(t, data[totalRead:totalRead+n], buf[:n], "Reader %d mismatch", readerId)
					totalRead += n
				}

				if err == io.EOF {
					// EOF is only valid once the full stream was consumed.
					assert.Equal(t, len(data), totalRead, "Reader %d EOF mismatch", readerId)
					return
				}

				require.NoError(t, err, "Reader %d error", readerId)

				if mathrand.Intn(1000) == 0 { //nolint:gosec
					t.Logf("Reader %d closing", readerId)
					// Simulate random close
					return
				}

				// Simulate random timing between reads
				time.Sleep(time.Millisecond * time.Duration(mathrand.Intn(5))) //nolint:gosec
			}
			// Reader exited the loop normally: it must have read everything.
			assert.Equal(t, len(data), totalRead, "Reader %d total read mismatch", readerId)
		}(i)
	}

	wg.Wait()
}

View File

@ -48,7 +48,7 @@ func RunBuild(ctx context.Context, dockerCli command.Cli, in controllerapi.Build
Inputs: build.Inputs{ Inputs: build.Inputs{
ContextPath: in.ContextPath, ContextPath: in.ContextPath,
DockerfilePath: in.DockerfileName, DockerfilePath: in.DockerfileName,
InStream: inStream, InStream: build.NewSyncMultiReader(inStream),
NamedContexts: contexts, NamedContexts: contexts,
}, },
Ref: in.Ref, Ref: in.Ref,