mirror of https://github.com/docker/buildx.git
Merge pull request #2112 from cpuguy83/dialstdio
Add dial-stdio command
This commit is contained in:
commit
481384b185
|
@ -0,0 +1,62 @@
|
|||
package build
|
||||
|
||||
import (
|
||||
"context"
|
||||
stderrors "errors"
|
||||
"net"
|
||||
|
||||
"github.com/containerd/containerd/platforms"
|
||||
"github.com/docker/buildx/builder"
|
||||
"github.com/docker/buildx/util/progress"
|
||||
v1 "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func Dial(ctx context.Context, nodes []builder.Node, pw progress.Writer, platform *v1.Platform) (net.Conn, error) {
|
||||
nodes, err := filterAvailableNodes(nodes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(nodes) == 0 {
|
||||
return nil, errors.New("no nodes available")
|
||||
}
|
||||
|
||||
var pls []v1.Platform
|
||||
if platform != nil {
|
||||
pls = []v1.Platform{*platform}
|
||||
}
|
||||
|
||||
opts := map[string]Options{"default": {Platforms: pls}}
|
||||
resolved, err := resolveDrivers(ctx, nodes, opts, pw)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var dialError error
|
||||
for _, ls := range resolved {
|
||||
for _, rn := range ls {
|
||||
if platform != nil {
|
||||
p := *platform
|
||||
var found bool
|
||||
for _, pp := range rn.platforms {
|
||||
if platforms.Only(p).Match(pp) {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
conn, err := nodes[rn.driverIndex].Driver.Dial(ctx)
|
||||
if err == nil {
|
||||
return conn, nil
|
||||
}
|
||||
dialError = stderrors.Join(err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil, errors.Wrap(dialError, "no nodes available")
|
||||
}
|
|
@ -0,0 +1,132 @@
|
|||
package commands
|
||||
|
||||
import (
|
||||
"io"
|
||||
"net"
|
||||
"os"
|
||||
|
||||
"github.com/containerd/containerd/platforms"
|
||||
"github.com/docker/buildx/build"
|
||||
"github.com/docker/buildx/builder"
|
||||
"github.com/docker/buildx/util/progress"
|
||||
"github.com/docker/cli/cli/command"
|
||||
"github.com/moby/buildkit/util/appcontext"
|
||||
"github.com/moby/buildkit/util/progress/progressui"
|
||||
v1 "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/spf13/cobra"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
type stdioOptions struct {
|
||||
builder string
|
||||
platform string
|
||||
progress string
|
||||
}
|
||||
|
||||
func runDialStdio(dockerCli command.Cli, opts stdioOptions) error {
|
||||
ctx := appcontext.Context()
|
||||
|
||||
contextPathHash, _ := os.Getwd()
|
||||
b, err := builder.New(dockerCli,
|
||||
builder.WithName(opts.builder),
|
||||
builder.WithContextPathHash(contextPathHash),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = updateLastActivity(dockerCli, b.NodeGroup); err != nil {
|
||||
return errors.Wrapf(err, "failed to update builder last activity time")
|
||||
}
|
||||
nodes, err := b.LoadNodes(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
printer, err := progress.NewPrinter(ctx, os.Stderr, progressui.DisplayMode(opts.progress), progress.WithPhase("dial-stdio"), progress.WithDesc("builder: "+b.Name, "builder:"+b.Name))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var p *v1.Platform
|
||||
if opts.platform != "" {
|
||||
pp, err := platforms.Parse(opts.platform)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "invalid platform %q", opts.platform)
|
||||
}
|
||||
p = &pp
|
||||
}
|
||||
|
||||
defer printer.Wait()
|
||||
|
||||
return progress.Wrap("Proxying to builder", printer.Write, func(sub progress.SubLogger) error {
|
||||
var conn net.Conn
|
||||
|
||||
err := sub.Wrap("Dialing builder", func() error {
|
||||
conn, err = build.Dial(ctx, nodes, printer, p)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer conn.Close()
|
||||
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
closeWrite(conn)
|
||||
}()
|
||||
|
||||
var eg errgroup.Group
|
||||
|
||||
eg.Go(func() error {
|
||||
_, err := io.Copy(conn, os.Stdin)
|
||||
closeWrite(conn)
|
||||
return err
|
||||
})
|
||||
eg.Go(func() error {
|
||||
_, err := io.Copy(os.Stdout, conn)
|
||||
closeRead(conn)
|
||||
return err
|
||||
})
|
||||
return eg.Wait()
|
||||
})
|
||||
}
|
||||
|
||||
func closeRead(conn net.Conn) error {
|
||||
if c, ok := conn.(interface{ CloseRead() error }); ok {
|
||||
return c.CloseRead()
|
||||
}
|
||||
return conn.Close()
|
||||
}
|
||||
|
||||
func closeWrite(conn net.Conn) error {
|
||||
if c, ok := conn.(interface{ CloseWrite() error }); ok {
|
||||
return c.CloseWrite()
|
||||
}
|
||||
return conn.Close()
|
||||
}
|
||||
|
||||
func dialStdioCmd(dockerCli command.Cli, rootOpts *rootOptions) *cobra.Command {
|
||||
opts := stdioOptions{}
|
||||
|
||||
cmd := &cobra.Command{
|
||||
Use: "dial-stdio",
|
||||
Short: "Proxy current stdio streams to builder instance",
|
||||
Args: cobra.NoArgs,
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
opts.builder = rootOpts.builder
|
||||
return runDialStdio(dockerCli, opts)
|
||||
},
|
||||
}
|
||||
|
||||
flags := cmd.Flags()
|
||||
cmd.Flags()
|
||||
flags.StringVar(&opts.platform, "platform", os.Getenv("DOCKER_DEFAULT_PLATFORM"), "Target platform: this is used for node selection")
|
||||
flags.StringVar(&opts.progress, "progress", "quiet", "Set type of progress output (auto, plain, tty).")
|
||||
return cmd
|
||||
}
|
|
@ -83,6 +83,7 @@ func addCommands(cmd *cobra.Command, dockerCli command.Cli) {
|
|||
buildCmd(dockerCli, opts, nil),
|
||||
bakeCmd(dockerCli, opts),
|
||||
createCmd(dockerCli),
|
||||
dialStdioCmd(dockerCli, opts),
|
||||
rmCmd(dockerCli, opts),
|
||||
lsCmd(dockerCli),
|
||||
useCmd(dockerCli, opts),
|
||||
|
|
|
@ -10,11 +10,12 @@ Extended build capabilities with BuildKit
|
|||
### Subcommands
|
||||
|
||||
| Name | Description |
|
||||
|:-------------------------------------|:---------------------------------------|
|
||||
|:-------------------------------------|:------------------------------------------------|
|
||||
| [`bake`](buildx_bake.md) | Build from a file |
|
||||
| [`build`](buildx_build.md) | Start a build |
|
||||
| [`create`](buildx_create.md) | Create a new builder instance |
|
||||
| [`debug`](buildx_debug.md) | Start debugger (EXPERIMENTAL) |
|
||||
| [`dial-stdio`](buildx_dial-stdio.md) | Proxy current stdio streams to builder instance |
|
||||
| [`du`](buildx_du.md) | Disk usage |
|
||||
| [`imagetools`](buildx_imagetools.md) | Commands to work on images in registry |
|
||||
| [`inspect`](buildx_inspect.md) | Inspect current builder instance |
|
||||
|
|
|
@ -0,0 +1,47 @@
|
|||
# docker buildx dial-stdio
|
||||
|
||||
<!---MARKER_GEN_START-->
|
||||
Proxy current stdio streams to builder instance
|
||||
|
||||
### Options
|
||||
|
||||
| Name | Type | Default | Description |
|
||||
|:-------------|:---------|:--------|:-------------------------------------------------|
|
||||
| `--builder` | `string` | | Override the configured builder instance |
|
||||
| `--platform` | `string` | | Target platform: this is used for node selection |
|
||||
| `--progress` | `string` | `quiet` | Set type of progress output (auto, plain, tty). |
|
||||
|
||||
|
||||
<!---MARKER_GEN_END-->
|
||||
|
||||
## Description
|
||||
|
||||
dial-stdio uses the stdin and stdout streams of the command to proxy to the configured builder instance.
|
||||
It is not intended to be used by humans, but rather by other tools that want to interact with the builder instance via BuildKit API.
|
||||
|
||||
## Examples
|
||||
|
||||
Example go program that uses the dial-stdio command wire up a buildkit client.
|
||||
This is for example use only and may not be suitable for production use.
|
||||
|
||||
```go
|
||||
client.New(ctx, "", client.WithContextDialer(func(context.Context, string) (net.Conn, error) {
|
||||
c1, c2 := net.Pipe()
|
||||
cmd := exec.Command("docker", "buildx", "dial-stdio")
|
||||
cmd.Stdin = c1
|
||||
cmd.Stdout = c1
|
||||
|
||||
if err := cmd.Start(); err != nil {
|
||||
c1.Close()
|
||||
c2.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
go func() {
|
||||
cmd.Wait()
|
||||
c2.Close()
|
||||
}()
|
||||
|
||||
return c2
|
||||
}))
|
||||
```
|
|
@ -386,13 +386,20 @@ func (d *Driver) Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (d *Driver) Client(ctx context.Context) (*client.Client, error) {
|
||||
func (d *Driver) Dial(ctx context.Context) (net.Conn, error) {
|
||||
_, conn, err := d.exec(ctx, []string{"buildctl", "dial-stdio"})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
conn = demuxConn(conn)
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
func (d *Driver) Client(ctx context.Context) (*client.Client, error) {
|
||||
conn, err := d.Dial(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
exp, _, err := detect.Exporter()
|
||||
if err != nil {
|
||||
|
|
|
@ -57,10 +57,14 @@ func (d *Driver) Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (d *Driver) Dial(ctx context.Context) (net.Conn, error) {
|
||||
return d.DockerAPI.DialHijack(ctx, "/grpc", "h2c", d.DialMeta)
|
||||
}
|
||||
|
||||
func (d *Driver) Client(ctx context.Context) (*client.Client, error) {
|
||||
opts := []client.ClientOpt{
|
||||
client.WithContextDialer(func(context.Context, string) (net.Conn, error) {
|
||||
return d.DockerAPI.DialHijack(ctx, "/grpc", "h2c", d.DialMeta)
|
||||
return d.Dial(ctx)
|
||||
}), client.WithSessionDialer(func(ctx context.Context, proto string, meta map[string][]string) (net.Conn, error) {
|
||||
return d.DockerAPI.DialHijack(ctx, "/session", proto, meta)
|
||||
}),
|
||||
|
|
|
@ -59,6 +59,7 @@ type Driver interface {
|
|||
Version(context.Context) (string, error)
|
||||
Stop(ctx context.Context, force bool) error
|
||||
Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error
|
||||
Dial(ctx context.Context) (net.Conn, error)
|
||||
Client(ctx context.Context) (*client.Client, error)
|
||||
Features(ctx context.Context) map[Feature]bool
|
||||
HostGatewayIP(ctx context.Context) (net.IP, error)
|
||||
|
|
|
@ -189,7 +189,7 @@ func (d *Driver) Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (d *Driver) Client(ctx context.Context) (*client.Client, error) {
|
||||
func (d *Driver) Dial(ctx context.Context) (net.Conn, error) {
|
||||
restClient := d.clientset.CoreV1().RESTClient()
|
||||
restClientConfig, err := d.KubeClientConfig.ClientConfig()
|
||||
if err != nil {
|
||||
|
@ -208,7 +208,10 @@ func (d *Driver) Client(ctx context.Context) (*client.Client, error) {
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
func (d *Driver) Client(ctx context.Context) (*client.Client, error) {
|
||||
exp, _, err := detect.Exporter()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -216,7 +219,7 @@ func (d *Driver) Client(ctx context.Context) (*client.Client, error) {
|
|||
|
||||
var opts []client.ClientOpt
|
||||
opts = append(opts, client.WithContextDialer(func(context.Context, string) (net.Conn, error) {
|
||||
return conn, nil
|
||||
return d.Dial(ctx)
|
||||
}))
|
||||
if td, ok := exp.(client.TracerDelegate); ok {
|
||||
opts = append(opts, client.WithTracerDelegate(td))
|
||||
|
|
|
@ -2,14 +2,18 @@ package remote
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"net"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/docker/buildx/driver"
|
||||
"github.com/docker/buildx/util/progress"
|
||||
"github.com/moby/buildkit/client"
|
||||
"github.com/moby/buildkit/util/tracing/detect"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type Driver struct {
|
||||
|
@ -82,14 +86,61 @@ func (d *Driver) Client(ctx context.Context) (*client.Client, error) {
|
|||
opts = append(opts, client.WithTracerDelegate(td))
|
||||
}
|
||||
|
||||
if d.tlsOpts != nil {
|
||||
opts = append(opts, []client.ClientOpt{
|
||||
client.WithServerConfig(d.tlsOpts.serverName, d.tlsOpts.caCert),
|
||||
client.WithCredentials(d.tlsOpts.cert, d.tlsOpts.key),
|
||||
}...)
|
||||
opts = append(opts, client.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) {
|
||||
return d.Dial(ctx)
|
||||
}))
|
||||
|
||||
return client.New(ctx, "", opts...)
|
||||
}
|
||||
|
||||
func (d *Driver) Dial(ctx context.Context) (net.Conn, error) {
|
||||
network, addr, ok := strings.Cut(d.InitConfig.EndpointAddr, "://")
|
||||
if !ok {
|
||||
return nil, errors.Errorf("invalid endpoint address: %s", d.InitConfig.EndpointAddr)
|
||||
}
|
||||
|
||||
return client.New(ctx, d.InitConfig.EndpointAddr, opts...)
|
||||
dialer := &net.Dialer{}
|
||||
|
||||
conn, err := dialer.DialContext(ctx, network, addr)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
if d.tlsOpts != nil {
|
||||
cfg, err := loadTLS(d.tlsOpts)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error loading tls config")
|
||||
}
|
||||
conn = tls.Client(conn, cfg)
|
||||
}
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
func loadTLS(opts *tlsOpts) (*tls.Config, error) {
|
||||
cfg := &tls.Config{
|
||||
ServerName: opts.serverName,
|
||||
RootCAs: x509.NewCertPool(),
|
||||
}
|
||||
|
||||
if opts.caCert != "" {
|
||||
ca, err := os.ReadFile(opts.caCert)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not read ca certificate")
|
||||
}
|
||||
if ok := cfg.RootCAs.AppendCertsFromPEM(ca); !ok {
|
||||
return nil, errors.New("failed to append ca certs")
|
||||
}
|
||||
}
|
||||
|
||||
if opts.cert != "" || opts.key != "" {
|
||||
cert, err := tls.LoadX509KeyPair(opts.cert, opts.key)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not read certificate/key")
|
||||
}
|
||||
cfg.Certificates = append(cfg.Certificates, cert)
|
||||
}
|
||||
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
func (d *Driver) Features(ctx context.Context) map[driver.Feature]bool {
|
||||
|
|
|
@ -0,0 +1,124 @@
|
|||
package tests
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"net"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/docker/buildx/util/progress"
|
||||
"github.com/moby/buildkit/client"
|
||||
"github.com/moby/buildkit/client/llb"
|
||||
gwclient "github.com/moby/buildkit/frontend/gateway/client"
|
||||
"github.com/moby/buildkit/util/progress/progressui"
|
||||
"github.com/moby/buildkit/util/testutil/integration"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
var dialstdioTests = []func(t *testing.T, sb integration.Sandbox){
|
||||
testDialStdio,
|
||||
}
|
||||
|
||||
func testDialStdio(t *testing.T, sb integration.Sandbox) {
|
||||
do := func(t *testing.T, pipe func(t *testing.T, cmd *exec.Cmd) net.Conn) {
|
||||
errBuf := bytes.NewBuffer(nil)
|
||||
defer func() {
|
||||
if t.Failed() {
|
||||
t.Log(errBuf.String())
|
||||
}
|
||||
}()
|
||||
var cmd *exec.Cmd
|
||||
c, err := client.New(sb.Context(), "", client.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) {
|
||||
cmd = buildxCmd(sb, withArgs("dial-stdio", "--progress", "auto"))
|
||||
conn := pipe(t, cmd)
|
||||
cmd.Stderr = errBuf
|
||||
if err := cmd.Start(); err != nil {
|
||||
return nil, errors.Wrap(err, errBuf.String())
|
||||
}
|
||||
|
||||
return conn, nil
|
||||
}))
|
||||
require.NoError(t, err)
|
||||
|
||||
defer func() {
|
||||
c.Close()
|
||||
// Since the client is closed (and as such the connection shutdown), the buildx command should exit cleanly.
|
||||
chErr := make(chan error, 1)
|
||||
go func() {
|
||||
chErr <- cmd.Wait()
|
||||
}()
|
||||
select {
|
||||
case <-time.After(10 * time.Second):
|
||||
t.Error("timeout waiting for buildx command to exit")
|
||||
case <-chErr:
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
}()
|
||||
|
||||
_, err = c.Info(sb.Context())
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Contains(t, errBuf.String(), "builder: "+sb.Address())
|
||||
|
||||
dir := t.TempDir()
|
||||
|
||||
f, err := os.CreateTemp(dir, "log")
|
||||
require.NoError(t, err)
|
||||
defer f.Close()
|
||||
|
||||
defer func() {
|
||||
if t.Failed() {
|
||||
dt, _ := os.ReadFile(f.Name())
|
||||
t.Log(string(dt))
|
||||
}
|
||||
}()
|
||||
|
||||
p, err := progress.NewPrinter(sb.Context(), f, progressui.AutoMode)
|
||||
require.NoError(t, err)
|
||||
|
||||
ch, chDone := progress.NewChannel(p)
|
||||
done := func() {
|
||||
select {
|
||||
case <-sb.Context().Done():
|
||||
case <-chDone:
|
||||
}
|
||||
}
|
||||
|
||||
_, err = c.Build(sb.Context(), client.SolveOpt{
|
||||
Exports: []client.ExportEntry{
|
||||
{Type: "local", OutputDir: dir},
|
||||
},
|
||||
}, "", func(ctx context.Context, gwc gwclient.Client) (*gwclient.Result, error) {
|
||||
def, err := llb.Scratch().File(llb.Mkfile("hello", 0o600, []byte("world"))).Marshal(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return gwc.Solve(ctx, gwclient.SolveRequest{
|
||||
Definition: def.ToPB(),
|
||||
})
|
||||
}, ch)
|
||||
done()
|
||||
require.NoError(t, err)
|
||||
|
||||
dt, err := os.ReadFile(filepath.Join(dir, "hello"))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "world", string(dt))
|
||||
}
|
||||
|
||||
t.Run("conn=netpipe", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
do(t, func(t *testing.T, cmd *exec.Cmd) net.Conn {
|
||||
c1, c2 := net.Pipe()
|
||||
cmd.Stdin = c1
|
||||
cmd.Stdout = c1
|
||||
return c2
|
||||
})
|
||||
})
|
||||
}
|
|
@ -29,6 +29,7 @@ func TestIntegration(t *testing.T) {
|
|||
tests = append(tests, versionTests...)
|
||||
tests = append(tests, createTests...)
|
||||
tests = append(tests, rmTests...)
|
||||
tests = append(tests, dialstdioTests...)
|
||||
testIntegration(t, tests...)
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue