From adbcc2225e32f31502ac09deaf21ba99841bfbf5 Mon Sep 17 00:00:00 2001 From: Tonis Tiigi Date: Tue, 13 Aug 2024 11:03:04 +0300 Subject: [PATCH] build: allow builds from stdin for multi-node builders When building from the same stream all nodes need to read data from the same stream. In order to achieve that there is a new SyncMultiReader wrapper that sends the stream concurrently to all readers. Readers must read at similar speed or pauses will happen while they wait for each other. Dockerfiles were already written to disk before being sent. Now the file written by the first node is reused for others. Signed-off-by: Tonis Tiigi --- build/build.go | 2 +- build/opt.go | 35 ++++--- build/replicatedstream.go | 164 +++++++++++++++++++++++++++++++++ build/replicatedstream_test.go | 77 ++++++++++++++++ controller/build/build.go | 2 +- 5 files changed, 266 insertions(+), 14 deletions(-) create mode 100644 build/replicatedstream.go create mode 100644 build/replicatedstream_test.go diff --git a/build/build.go b/build/build.go index 82934f74..3bc1c1f1 100644 --- a/build/build.go +++ b/build/build.go @@ -97,7 +97,7 @@ type CallFunc struct { type Inputs struct { ContextPath string DockerfilePath string - InStream io.Reader + InStream *SyncMultiReader ContextState *llb.State DockerfileInline string NamedContexts map[string]NamedContext diff --git a/build/opt.go b/build/opt.go index 3842e8dc..c8d52c91 100644 --- a/build/opt.go +++ b/build/opt.go @@ -1,11 +1,12 @@ package build import ( - "bufio" + "bytes" "context" "io" "os" "path/filepath" + "slices" "strconv" "strings" "syscall" @@ -260,7 +261,7 @@ func toSolveOpt(ctx context.Context, node builder.Node, multiDriver bool, opt Op } so.Exports = opt.Exports - so.Session = opt.Session + so.Session = slices.Clone(opt.Session) releaseLoad, err := loadInputs(ctx, nodeDriver, opt.Inputs, pw, &so) if err != nil { @@ -364,7 +365,7 @@ func loadInputs(ctx context.Context, d *driver.DriverHandle, inp Inputs, pw prog var ( err error - dockerfileReader 
io.Reader + dockerfileReader io.ReadCloser dockerfileDir string dockerfileName = inp.DockerfilePath toRemove []string @@ -382,8 +383,8 @@ func loadInputs(ctx context.Context, d *driver.DriverHandle, inp Inputs, pw prog return nil, errors.Errorf("invalid argument: can't use stdin for both build context and dockerfile") } - buf := bufio.NewReader(inp.InStream) - magic, err := buf.Peek(archiveHeaderSize * 2) + rc := inp.InStream.NewReadCloser() + magic, err := inp.InStream.Peek(archiveHeaderSize * 2) if err != nil && err != io.EOF { return nil, errors.Wrap(err, "failed to peek context header from STDIN") } @@ -391,14 +392,14 @@ func loadInputs(ctx context.Context, d *driver.DriverHandle, inp Inputs, pw prog if isArchive(magic) { // stdin is context up := uploadprovider.New() - target.FrontendAttrs["context"] = up.Add(buf) + target.FrontendAttrs["context"] = up.Add(rc) target.Session = append(target.Session, up) } else { if inp.DockerfilePath != "" { return nil, errors.Errorf("ambiguous Dockerfile source: both stdin and flag correspond to Dockerfiles") } // stdin is dockerfile - dockerfileReader = buf + dockerfileReader = rc inp.ContextPath, _ = os.MkdirTemp("", "empty-dir") toRemove = append(toRemove, inp.ContextPath) if err := setLocalMount("context", inp.ContextPath, target); err != nil { @@ -417,7 +418,7 @@ func loadInputs(ctx context.Context, d *driver.DriverHandle, inp Inputs, pw prog target.SharedKey = sharedKey switch inp.DockerfilePath { case "-": - dockerfileReader = inp.InStream + dockerfileReader = inp.InStream.NewReadCloser() case "": dockerfileDir = inp.ContextPath default: @@ -426,7 +427,7 @@ func loadInputs(ctx context.Context, d *driver.DriverHandle, inp Inputs, pw prog } case IsRemoteURL(inp.ContextPath): if inp.DockerfilePath == "-" { - dockerfileReader = inp.InStream + dockerfileReader = inp.InStream.NewReadCloser() } else if filepath.IsAbs(inp.DockerfilePath) { dockerfileDir = filepath.Dir(inp.DockerfilePath) dockerfileName = 
filepath.Base(inp.DockerfilePath) @@ -438,11 +439,11 @@ func loadInputs(ctx context.Context, d *driver.DriverHandle, inp Inputs, pw prog } if inp.DockerfileInline != "" { - dockerfileReader = strings.NewReader(inp.DockerfileInline) + dockerfileReader = io.NopCloser(strings.NewReader(inp.DockerfileInline)) } if dockerfileReader != nil { - dockerfileDir, err = createTempDockerfile(dockerfileReader) + dockerfileDir, err = createTempDockerfile(dockerfileReader, inp.InStream) if err != nil { return nil, err } @@ -582,7 +583,7 @@ func setLocalMount(name, dir string, so *client.SolveOpt) error { return nil } -func createTempDockerfile(r io.Reader) (string, error) { +func createTempDockerfile(r io.Reader, multiReader *SyncMultiReader) (string, error) { dir, err := os.MkdirTemp("", "dockerfile") if err != nil { return "", err @@ -592,6 +593,16 @@ func createTempDockerfile(r io.Reader) (string, error) { return "", err } defer f.Close() + + if multiReader != nil { + dt, err := io.ReadAll(r) + if err != nil { + return "", err + } + multiReader.Reset(dt) + r = bytes.NewReader(dt) + } + if _, err := io.Copy(f, r); err != nil { return "", err } diff --git a/build/replicatedstream.go b/build/replicatedstream.go new file mode 100644 index 00000000..cdf2c724 --- /dev/null +++ b/build/replicatedstream.go @@ -0,0 +1,164 @@ +package build + +import ( + "bufio" + "bytes" + "io" + "sync" +) + +type SyncMultiReader struct { + source *bufio.Reader + buffer []byte + static []byte + mu sync.Mutex + cond *sync.Cond + readers []*syncReader + err error + offset int +} + +type syncReader struct { + mr *SyncMultiReader + offset int + closed bool +} + +func NewSyncMultiReader(source io.Reader) *SyncMultiReader { + mr := &SyncMultiReader{ + source: bufio.NewReader(source), + buffer: make([]byte, 0, 32*1024), + } + mr.cond = sync.NewCond(&mr.mu) + return mr +} + +func (mr *SyncMultiReader) Peek(n int) ([]byte, error) { + mr.mu.Lock() + defer mr.mu.Unlock() + + if mr.static != nil { + return 
mr.static[:min(n, len(mr.static))], nil + } + + return mr.source.Peek(n) +} + +func (mr *SyncMultiReader) Reset(dt []byte) { + mr.mu.Lock() + defer mr.mu.Unlock() + + mr.static = dt +} + +func (mr *SyncMultiReader) NewReadCloser() io.ReadCloser { + mr.mu.Lock() + defer mr.mu.Unlock() + + if mr.static != nil { + return io.NopCloser(bytes.NewReader(mr.static)) + } + + reader := &syncReader{ + mr: mr, + } + mr.readers = append(mr.readers, reader) + return reader +} + +func (sr *syncReader) Read(p []byte) (int, error) { + sr.mr.mu.Lock() + defer sr.mr.mu.Unlock() + + return sr.read(p) +} + +func (sr *syncReader) read(p []byte) (int, error) { + end := sr.mr.offset + len(sr.mr.buffer) + +loop0: + for { + if sr.closed { + return 0, io.EOF + } + + end := sr.mr.offset + len(sr.mr.buffer) + + if sr.mr.err != nil && sr.offset == end { + return 0, sr.mr.err + } + + start := sr.offset - sr.mr.offset + + dt := sr.mr.buffer[start:] + + if len(dt) > 0 { + n := copy(p, dt) + sr.offset += n + sr.mr.cond.Broadcast() + return n, nil + } + + // check for readers that have not caught up + hasOpen := false + for _, r := range sr.mr.readers { + if !r.closed { + hasOpen = true + } else { + continue + } + if r.offset < end { + sr.mr.cond.Wait() + continue loop0 + } + } + + if !hasOpen { + return 0, io.EOF + } + break + } + + last := sr.mr.offset + len(sr.mr.buffer) + // another reader has already updated the buffer + if last > end || sr.mr.err != nil { + return sr.read(p) + } + + sr.mr.offset += len(sr.mr.buffer) + + sr.mr.buffer = sr.mr.buffer[:cap(sr.mr.buffer)] + n, err := sr.mr.source.Read(sr.mr.buffer) + if n >= 0 { + sr.mr.buffer = sr.mr.buffer[:n] + } else { + sr.mr.buffer = sr.mr.buffer[:0] + } + + sr.mr.cond.Broadcast() + + if err != nil { + sr.mr.err = err + return 0, err + } + + nn := copy(p, sr.mr.buffer) + sr.offset += nn + + return nn, nil +} + +func (sr *syncReader) Close() error { + sr.mr.mu.Lock() + defer sr.mr.mu.Unlock() + + if sr.closed { + return nil + } + + sr.closed = 
true + + sr.mr.cond.Broadcast() + + return nil +} diff --git a/build/replicatedstream_test.go b/build/replicatedstream_test.go new file mode 100644 index 00000000..985303b4 --- /dev/null +++ b/build/replicatedstream_test.go @@ -0,0 +1,77 @@ +package build + +import ( + "bytes" + "crypto/rand" + "io" + mathrand "math/rand" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func generateRandomData(size int) []byte { + data := make([]byte, size) + rand.Read(data) + return data +} +func TestSyncMultiReaderParallel(t *testing.T) { + data := generateRandomData(1024 * 1024) + source := bytes.NewReader(data) + mr := NewSyncMultiReader(source) + + var wg sync.WaitGroup + numReaders := 10 + bufferSize := 4096 * 4 + + readers := make([]io.ReadCloser, numReaders) + + for i := 0; i < numReaders; i++ { + readers[i] = mr.NewReadCloser() + } + + for i := 0; i < numReaders; i++ { + wg.Add(1) + go func(readerId int) { + defer wg.Done() + reader := readers[readerId] + defer reader.Close() + + totalRead := 0 + buf := make([]byte, bufferSize) + for totalRead < len(data) { + // Simulate random read sizes + readSize := mathrand.Intn(bufferSize) //nolint:gosec + n, err := reader.Read(buf[:readSize]) + + if n > 0 { + assert.Equal(t, data[totalRead:totalRead+n], buf[:n], "Reader %d mismatch", readerId) + totalRead += n + } + + if err == io.EOF { + assert.Equal(t, len(data), totalRead, "Reader %d EOF mismatch", readerId) + return + } + + require.NoError(t, err, "Reader %d error", readerId) + + if mathrand.Intn(1000) == 0 { //nolint:gosec + t.Logf("Reader %d closing", readerId) + // Simulate random close + return + } + + // Simulate random timing between reads + time.Sleep(time.Millisecond * time.Duration(mathrand.Intn(5))) //nolint:gosec + } + + assert.Equal(t, len(data), totalRead, "Reader %d total read mismatch", readerId) + }(i) + } + + wg.Wait() +} diff --git a/controller/build/build.go b/controller/build/build.go index 
aa290506..1f53ee03 100644 --- a/controller/build/build.go +++ b/controller/build/build.go @@ -48,7 +48,7 @@ func RunBuild(ctx context.Context, dockerCli command.Cli, in controllerapi.Build Inputs: build.Inputs{ ContextPath: in.ContextPath, DockerfilePath: in.DockerfileName, - InStream: inStream, + InStream: build.NewSyncMultiReader(inStream), NamedContexts: contexts, }, Ref: in.Ref,