diff --git a/build/build.go b/build/build.go
index ce6b872a..dadcdcff 100644
--- a/build/build.go
+++ b/build/build.go
@@ -97,7 +97,7 @@ type CallFunc struct {
 type Inputs struct {
 	ContextPath      string
 	DockerfilePath   string
-	InStream         io.Reader
+	InStream         *SyncMultiReader
 	ContextState     *llb.State
 	DockerfileInline string
 	NamedContexts    map[string]NamedContext
diff --git a/build/opt.go b/build/opt.go
index 3842e8dc..c8d52c91 100644
--- a/build/opt.go
+++ b/build/opt.go
@@ -1,11 +1,12 @@
 package build
 
 import (
-	"bufio"
+	"bytes"
 	"context"
 	"io"
 	"os"
 	"path/filepath"
+	"slices"
 	"strconv"
 	"strings"
 	"syscall"
@@ -260,7 +261,7 @@ func toSolveOpt(ctx context.Context, node builder.Node, multiDriver bool, opt Op
 	}
 
 	so.Exports = opt.Exports
-	so.Session = opt.Session
+	so.Session = slices.Clone(opt.Session)
 
 	releaseLoad, err := loadInputs(ctx, nodeDriver, opt.Inputs, pw, &so)
 	if err != nil {
@@ -364,7 +365,7 @@ func loadInputs(ctx context.Context, d *driver.DriverHandle, inp Inputs, pw prog
 
 	var (
 		err              error
-		dockerfileReader io.Reader
+		dockerfileReader io.ReadCloser
 		dockerfileDir    string
 		dockerfileName   = inp.DockerfilePath
 		toRemove         []string
@@ -382,8 +383,8 @@ func loadInputs(ctx context.Context, d *driver.DriverHandle, inp Inputs, pw prog
 			return nil, errors.Errorf("invalid argument: can't use stdin for both build context and dockerfile")
 		}
 
-		buf := bufio.NewReader(inp.InStream)
-		magic, err := buf.Peek(archiveHeaderSize * 2)
+		rc := inp.InStream.NewReadCloser()
+		magic, err := inp.InStream.Peek(archiveHeaderSize * 2)
 		if err != nil && err != io.EOF {
 			return nil, errors.Wrap(err, "failed to peek context header from STDIN")
 		}
@@ -391,14 +392,14 @@ func loadInputs(ctx context.Context, d *driver.DriverHandle, inp Inputs, pw prog
 			if isArchive(magic) {
 				// stdin is context
 				up := uploadprovider.New()
-				target.FrontendAttrs["context"] = up.Add(buf)
+				target.FrontendAttrs["context"] = up.Add(rc)
 				target.Session = append(target.Session, up)
 			} else {
 				if inp.DockerfilePath != "" {
 					return nil, errors.Errorf("ambiguous Dockerfile source: both stdin and flag correspond to Dockerfiles")
 				}
 				// stdin is dockerfile
-				dockerfileReader = buf
+				dockerfileReader = rc
 				inp.ContextPath, _ = os.MkdirTemp("", "empty-dir")
 				toRemove = append(toRemove, inp.ContextPath)
 				if err := setLocalMount("context", inp.ContextPath, target); err != nil {
@@ -417,7 +418,7 @@ func loadInputs(ctx context.Context, d *driver.DriverHandle, inp Inputs, pw prog
 		target.SharedKey = sharedKey
 		switch inp.DockerfilePath {
 		case "-":
-			dockerfileReader = inp.InStream
+			dockerfileReader = inp.InStream.NewReadCloser()
 		case "":
 			dockerfileDir = inp.ContextPath
 		default:
@@ -426,7 +427,7 @@ func loadInputs(ctx context.Context, d *driver.DriverHandle, inp Inputs, pw prog
 		}
 	case IsRemoteURL(inp.ContextPath):
 		if inp.DockerfilePath == "-" {
-			dockerfileReader = inp.InStream
+			dockerfileReader = inp.InStream.NewReadCloser()
 		} else if filepath.IsAbs(inp.DockerfilePath) {
 			dockerfileDir = filepath.Dir(inp.DockerfilePath)
 			dockerfileName = filepath.Base(inp.DockerfilePath)
@@ -438,11 +439,11 @@ func loadInputs(ctx context.Context, d *driver.DriverHandle, inp Inputs, pw prog
 	}
 
 	if inp.DockerfileInline != "" {
-		dockerfileReader = strings.NewReader(inp.DockerfileInline)
+		dockerfileReader = io.NopCloser(strings.NewReader(inp.DockerfileInline))
 	}
 
 	if dockerfileReader != nil {
-		dockerfileDir, err = createTempDockerfile(dockerfileReader)
+		dockerfileDir, err = createTempDockerfile(dockerfileReader, inp.InStream)
 		if err != nil {
 			return nil, err
 		}
@@ -582,7 +583,7 @@ func setLocalMount(name, dir string, so *client.SolveOpt) error {
 	return nil
 }
 
-func createTempDockerfile(r io.Reader) (string, error) {
+func createTempDockerfile(r io.Reader, multiReader *SyncMultiReader) (string, error) {
 	dir, err := os.MkdirTemp("", "dockerfile")
 	if err != nil {
 		return "", err
@@ -592,6 +593,16 @@ func createTempDockerfile(r io.Reader) (string, error) {
 		return "", err
 	}
 	defer f.Close()
+
+	if multiReader != nil {
+		dt, err := io.ReadAll(r)
+		if err != nil {
+			return "", err
+		}
+		multiReader.Reset(dt)
+		r = bytes.NewReader(dt)
+	}
+
 	if _, err := io.Copy(f, r); err != nil {
 		return "", err
 	}
diff --git a/build/replicatedstream.go b/build/replicatedstream.go
new file mode 100644
index 00000000..cdf2c724
--- /dev/null
+++ b/build/replicatedstream.go
@@ -0,0 +1,184 @@
+package build
+
+import (
+	"bufio"
+	"bytes"
+	"io"
+	"sync"
+)
+
+// SyncMultiReader replicates a single source stream to every reader created
+// with NewReadCloser. It buffers one chunk of the source at a time and reads
+// the next chunk only after all open readers have consumed the current one.
+type SyncMultiReader struct {
+	source  *bufio.Reader
+	buffer  []byte
+	static  []byte
+	mu      sync.Mutex
+	cond    *sync.Cond
+	readers []*syncReader
+	err     error
+	offset  int
+}
+
+// syncReader is one consumer of a SyncMultiReader. offset is the absolute
+// position in the source stream up to which this reader has read.
+type syncReader struct {
+	mr     *SyncMultiReader
+	offset int
+	closed bool
+}
+
+// NewSyncMultiReader returns a SyncMultiReader that reads from source.
+func NewSyncMultiReader(source io.Reader) *SyncMultiReader {
+	mr := &SyncMultiReader{
+		source: bufio.NewReader(source),
+		buffer: make([]byte, 0, 32*1024),
+	}
+	mr.cond = sync.NewCond(&mr.mu)
+	return mr
+}
+
+// Peek returns up to n bytes without consuming them. After Reset it returns
+// the leading bytes of the static content, which may be fewer than n;
+// otherwise it delegates to Peek on the buffered source.
+func (mr *SyncMultiReader) Peek(n int) ([]byte, error) {
+	mr.mu.Lock()
+	defer mr.mu.Unlock()
+
+	if mr.static != nil {
+		return mr.static[:min(n, len(mr.static))], nil
+	}
+
+	return mr.source.Peek(n)
+}
+
+// Reset stores dt as static content. While it is non-nil, Peek and
+// NewReadCloser are served from it instead of the source.
+func (mr *SyncMultiReader) Reset(dt []byte) {
+	mr.mu.Lock()
+	defer mr.mu.Unlock()
+
+	mr.static = dt
+}
+
+// NewReadCloser returns a new reader over the stream. After Reset it is an
+// independent reader over the static content. Otherwise the reader is
+// registered with mr: the other readers wait for it before the stream
+// advances, so it must be read until an error (such as io.EOF) or closed.
+func (mr *SyncMultiReader) NewReadCloser() io.ReadCloser {
+	mr.mu.Lock()
+	defer mr.mu.Unlock()
+
+	if mr.static != nil {
+		return io.NopCloser(bytes.NewReader(mr.static))
+	}
+
+	reader := &syncReader{
+		mr: mr,
+	}
+	mr.readers = append(mr.readers, reader)
+	return reader
+}
+
+// Read implements io.Reader.
+func (sr *syncReader) Read(p []byte) (int, error) {
+	sr.mr.mu.Lock()
+	defer sr.mr.mu.Unlock()
+
+	return sr.read(p)
+}
+
+// read does the work of Read. The caller must hold sr.mr.mu.
+func (sr *syncReader) read(p []byte) (int, error) {
+	end := sr.mr.offset + len(sr.mr.buffer)
+
+loop0:
+	for {
+		if sr.closed {
+			return 0, io.EOF
+		}
+
+		end := sr.mr.offset + len(sr.mr.buffer)
+
+		if sr.mr.err != nil && sr.offset == end {
+			return 0, sr.mr.err
+		}
+
+		start := sr.offset - sr.mr.offset
+
+		dt := sr.mr.buffer[start:]
+
+		if len(dt) > 0 {
+			n := copy(p, dt)
+			sr.offset += n
+			sr.mr.cond.Broadcast()
+			return n, nil
+		}
+
+		// check for readers that have not caught up
+		hasOpen := false
+		for _, r := range sr.mr.readers {
+			if !r.closed {
+				hasOpen = true
+			} else {
+				continue
+			}
+			if r.offset < end {
+				sr.mr.cond.Wait()
+				continue loop0
+			}
+		}
+
+		if !hasOpen {
+			return 0, io.EOF
+		}
+		break
+	}
+
+	last := sr.mr.offset + len(sr.mr.buffer)
+	// another reader has already updated the buffer
+	if last > end || sr.mr.err != nil {
+		return sr.read(p)
+	}
+
+	sr.mr.offset += len(sr.mr.buffer)
+
+	sr.mr.buffer = sr.mr.buffer[:cap(sr.mr.buffer)]
+	n, err := sr.mr.source.Read(sr.mr.buffer)
+	sr.mr.buffer = sr.mr.buffer[:max(n, 0)]
+	if err != nil {
+		sr.mr.err = err
+	}
+
+	sr.mr.cond.Broadcast()
+
+	// Bytes returned together with an error are still valid data (io.Reader
+	// contract): hand them out now; the stored error is reported once they
+	// have been consumed.
+	if err != nil && n <= 0 {
+		return 0, err
+	}
+
+	nn := copy(p, sr.mr.buffer)
+	sr.offset += nn
+
+	return nn, nil
+}
+
+// Close marks the reader as closed so the remaining readers no longer wait
+// for it. Closing more than once is a no-op.
+func (sr *syncReader) Close() error {
+	sr.mr.mu.Lock()
+	defer sr.mr.mu.Unlock()
+
+	if sr.closed {
+		return nil
+	}
+
+	sr.closed = true
+
+	sr.mr.cond.Broadcast()
+
+	return nil
+}
diff --git a/build/replicatedstream_test.go b/build/replicatedstream_test.go
new file mode 100644
index 00000000..985303b4
--- /dev/null
+++ b/build/replicatedstream_test.go
@@ -0,0 +1,77 @@
+package build
+
+import (
+	"bytes"
+	"crypto/rand"
+	"io"
+	mathrand "math/rand"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func generateRandomData(size int) []byte {
+	data := make([]byte, size)
+	rand.Read(data)
+	return data
+}
+func TestSyncMultiReaderParallel(t *testing.T) {
+	data := generateRandomData(1024 * 1024)
+	source := bytes.NewReader(data)
+	mr := NewSyncMultiReader(source)
+
+	var wg sync.WaitGroup
+	numReaders := 10
+	bufferSize := 4096 * 4
+
+	readers := make([]io.ReadCloser, numReaders)
+
+	for i := 0; i < numReaders; i++ {
+		readers[i] = mr.NewReadCloser()
+	}
+
+	for i := 0; i < numReaders; i++ {
+		wg.Add(1)
+		go func(readerId int) {
+			defer wg.Done()
+			reader := readers[readerId]
+			defer reader.Close()
+
+			totalRead := 0
+			buf := make([]byte, bufferSize)
+			for totalRead < len(data) {
+				// Simulate random read sizes
+				readSize := mathrand.Intn(bufferSize) //nolint:gosec
+				n, err := reader.Read(buf[:readSize])
+
+				if n > 0 {
+					assert.Equal(t, data[totalRead:totalRead+n], buf[:n], "Reader %d mismatch", readerId)
+					totalRead += n
+				}
+
+				if err == io.EOF {
+					assert.Equal(t, len(data), totalRead, "Reader %d EOF mismatch", readerId)
+					return
+				}
+
+				require.NoError(t, err, "Reader %d error", readerId)
+
+				if mathrand.Intn(1000) == 0 { //nolint:gosec
+					t.Logf("Reader %d closing", readerId)
+					// Simulate random close
+					return
+				}
+
+				// Simulate random timing between reads
+				time.Sleep(time.Millisecond * time.Duration(mathrand.Intn(5))) //nolint:gosec
+			}
+
+			assert.Equal(t, len(data), totalRead, "Reader %d total read mismatch", readerId)
+		}(i)
+	}
+
+	wg.Wait()
+}
diff --git a/controller/build/build.go b/controller/build/build.go
index aa290506..1f53ee03 100644
--- a/controller/build/build.go
+++ b/controller/build/build.go
@@ -48,7 +48,7 @@ func RunBuild(ctx context.Context, dockerCli command.Cli, in controllerapi.Build
 		Inputs: build.Inputs{
 			ContextPath:    in.ContextPath,
 			DockerfilePath: in.DockerfileName,
-			InStream:       inStream,
+			InStream:       build.NewSyncMultiReader(inStream),
 			NamedContexts:  contexts,
 		},
 		Ref: in.Ref,