From 760244ee3ed0e0d6650627c36e2985f6ac4b9d66 Mon Sep 17 00:00:00 2001 From: Brian Goff Date: Mon, 13 Nov 2023 23:57:12 +0000 Subject: [PATCH] Add dial-stdio command This allows the buildx CLI to act a proxy to the configured instance. It allows external code to use buildx itself as a driver for connecting to buildkitd instances. Instance and node selection should follow the same semantics as as `buildx build`, including taking into account the `BUILDX_BUILDER` env var and the `--builder` global flag. Signed-off-by: Brian Goff --- build/dial.go | 62 +++++++++++++ commands/dial_stdio.go | 132 ++++++++++++++++++++++++++++ commands/root.go | 1 + docs/reference/buildx.md | 31 +++---- docs/reference/buildx_dial-stdio.md | 47 ++++++++++ driver/docker-container/driver.go | 11 ++- driver/docker/driver.go | 6 +- driver/driver.go | 1 + driver/kubernetes/driver.go | 7 +- driver/remote/driver.go | 65 ++++++++++++-- tests/dialstdio.go | 124 ++++++++++++++++++++++++++ tests/integration_test.go | 1 + 12 files changed, 461 insertions(+), 27 deletions(-) create mode 100644 build/dial.go create mode 100644 commands/dial_stdio.go create mode 100644 docs/reference/buildx_dial-stdio.md create mode 100644 tests/dialstdio.go diff --git a/build/dial.go b/build/dial.go new file mode 100644 index 00000000..78391ebb --- /dev/null +++ b/build/dial.go @@ -0,0 +1,62 @@ +package build + +import ( + "context" + stderrors "errors" + "net" + + "github.com/containerd/containerd/platforms" + "github.com/docker/buildx/builder" + "github.com/docker/buildx/util/progress" + v1 "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/pkg/errors" +) + +func Dial(ctx context.Context, nodes []builder.Node, pw progress.Writer, platform *v1.Platform) (net.Conn, error) { + nodes, err := filterAvailableNodes(nodes) + if err != nil { + return nil, err + } + + if len(nodes) == 0 { + return nil, errors.New("no nodes available") + } + + var pls []v1.Platform + if platform != nil { + pls = []v1.Platform{*platform} + } + + opts := map[string]Options{"default": {Platforms: pls}} + resolved, err := resolveDrivers(ctx, nodes, opts, pw) + if err != nil { + return nil, err + } + + var dialError error + for _, ls := range resolved { + for _, rn := range ls { + if platform != nil { + p := *platform + var found bool + for _, pp := range rn.platforms { + if platforms.Only(p).Match(pp) { + found = true + break + } + } + if !found { + continue + } + } + + conn, err := nodes[rn.driverIndex].Driver.Dial(ctx) + if err == nil { + return conn, nil + } + dialError = stderrors.Join(err) + } + } + + return nil, errors.Wrap(dialError, "no nodes available") +} diff --git a/commands/dial_stdio.go b/commands/dial_stdio.go new file mode 100644 index 00000000..3ab1c465 --- /dev/null +++ b/commands/dial_stdio.go @@ -0,0 +1,132 @@ +package commands + +import ( + "io" + "net" + "os" + + "github.com/containerd/containerd/platforms" + "github.com/docker/buildx/build" + "github.com/docker/buildx/builder" + "github.com/docker/buildx/util/progress" + "github.com/docker/cli/cli/command" + "github.com/moby/buildkit/util/appcontext" + "github.com/moby/buildkit/util/progress/progressui" + v1 "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/pkg/errors" + "github.com/spf13/cobra" + "golang.org/x/sync/errgroup" +) + +type stdioOptions struct { + builder string + platform string + progress string +} + +func runDialStdio(dockerCli command.Cli, opts stdioOptions) error { + ctx := appcontext.Context() + + contextPathHash, _ := os.Getwd() + b, err := builder.New(dockerCli, + builder.WithName(opts.builder), + builder.WithContextPathHash(contextPathHash), + ) + if err != nil { + return err + } + + if err = updateLastActivity(dockerCli, b.NodeGroup); err != nil { + return errors.Wrapf(err, "failed to update builder last activity time") + } + nodes, err := b.LoadNodes(ctx) + if err != nil { + return err + } + + printer, err := progress.NewPrinter(ctx, os.Stderr, progressui.DisplayMode(opts.progress), progress.WithPhase("dial-stdio"), progress.WithDesc("builder: "+b.Name, "builder:"+b.Name)) + if err != nil { + return err + } + + var p *v1.Platform + if opts.platform != "" { + pp, err := platforms.Parse(opts.platform) + if err != nil { + return errors.Wrapf(err, "invalid platform %q", opts.platform) + } + p = &pp + } + + defer printer.Wait() + + return progress.Wrap("Proxying to builder", printer.Write, func(sub progress.SubLogger) error { + var conn net.Conn + + err := sub.Wrap("Dialing builder", func() error { + conn, err = build.Dial(ctx, nodes, printer, p) + if err != nil { + return err + } + return nil + }) + if err != nil { + return err + } + + defer conn.Close() + + go func() { + <-ctx.Done() + closeWrite(conn) + }() + + var eg errgroup.Group + + eg.Go(func() error { + _, err := io.Copy(conn, os.Stdin) + closeWrite(conn) + return err + }) + eg.Go(func() error { + _, err := io.Copy(os.Stdout, conn) + closeRead(conn) + return err + }) + return eg.Wait() + }) +} + +func closeRead(conn net.Conn) error { + if c, ok := conn.(interface{ CloseRead() error }); ok { + return c.CloseRead() + } + return conn.Close() +} + +func closeWrite(conn net.Conn) error { + if c, ok := conn.(interface{ CloseWrite() error }); ok { + return c.CloseWrite() + } + return conn.Close() +} + +func dialStdioCmd(dockerCli command.Cli, rootOpts *rootOptions) *cobra.Command { + opts := stdioOptions{} + + cmd := &cobra.Command{ + Use: "dial-stdio", + Short: "Proxy current stdio streams to builder instance", + Args: cobra.NoArgs, + RunE: func(cmd *cobra.Command, args []string) error { + opts.builder = rootOpts.builder + return runDialStdio(dockerCli, opts) + }, + } + + flags := cmd.Flags() + cmd.Flags() + flags.StringVar(&opts.platform, "platform", os.Getenv("DOCKER_DEFAULT_PLATFORM"), "Target platform: this is used for node selection") + flags.StringVar(&opts.progress, "progress", "quiet", "Set type of progress output (auto, plain, tty).") + return cmd +} diff --git a/commands/root.go b/commands/root.go index 1c862489..cf351200 100644 --- a/commands/root.go +++ b/commands/root.go @@ -83,6 +83,7 @@ func addCommands(cmd *cobra.Command, dockerCli command.Cli) { buildCmd(dockerCli, opts, nil), bakeCmd(dockerCli, opts), createCmd(dockerCli), + dialStdioCmd(dockerCli, opts), rmCmd(dockerCli, opts), lsCmd(dockerCli), useCmd(dockerCli, opts), diff --git a/docs/reference/buildx.md b/docs/reference/buildx.md index 1ca845ba..2dd0b741 100644 --- a/docs/reference/buildx.md +++ b/docs/reference/buildx.md @@ -9,21 +9,22 @@ Extended build capabilities with BuildKit ### Subcommands -| Name | Description | -|:-------------------------------------|:---------------------------------------| -| [`bake`](buildx_bake.md) | Build from a file | -| [`build`](buildx_build.md) | Start a build | -| [`create`](buildx_create.md) | Create a new builder instance | -| [`debug`](buildx_debug.md) | Start debugger (EXPERIMENTAL) | -| [`du`](buildx_du.md) | Disk usage | -| [`imagetools`](buildx_imagetools.md) | Commands to work on images in registry | -| [`inspect`](buildx_inspect.md) | Inspect current builder instance | -| [`ls`](buildx_ls.md) | List builder instances | -| [`prune`](buildx_prune.md) | Remove build cache | -| [`rm`](buildx_rm.md) | Remove one or more builder instances | -| [`stop`](buildx_stop.md) | Stop builder instance | -| [`use`](buildx_use.md) | Set the current builder instance | -| [`version`](buildx_version.md) | Show buildx version information | +| Name | Description | +|:-------------------------------------|:------------------------------------------------| +| [`bake`](buildx_bake.md) | Build from a file | +| [`build`](buildx_build.md) | Start a build | +| [`create`](buildx_create.md) | Create a new builder instance | +| [`debug`](buildx_debug.md) | Start debugger (EXPERIMENTAL) | +| [`dial-stdio`](buildx_dial-stdio.md) | Proxy current stdio streams to builder instance | +| [`du`](buildx_du.md) | Disk usage | +| [`imagetools`](buildx_imagetools.md) | Commands to work on images in registry | +| [`inspect`](buildx_inspect.md) | Inspect current builder instance | +| [`ls`](buildx_ls.md) | List builder instances | +| [`prune`](buildx_prune.md) | Remove build cache | +| [`rm`](buildx_rm.md) | Remove one or more builder instances | +| [`stop`](buildx_stop.md) | Stop builder instance | +| [`use`](buildx_use.md) | Set the current builder instance | +| [`version`](buildx_version.md) | Show buildx version information | ### Options diff --git a/docs/reference/buildx_dial-stdio.md b/docs/reference/buildx_dial-stdio.md new file mode 100644 index 00000000..0703f373 --- /dev/null +++ b/docs/reference/buildx_dial-stdio.md @@ -0,0 +1,47 @@ +# docker buildx dial-stdio + + +Proxy current stdio streams to builder instance + +### Options + +| Name | Type | Default | Description | +|:-------------|:---------|:--------|:-------------------------------------------------| +| `--builder` | `string` | | Override the configured builder instance | +| `--platform` | `string` | | Target platform: this is used for node selection | +| `--progress` | `string` | `quiet` | Set type of progress output (auto, plain, tty). | + + + + +## Description + +dial-stdio uses the stdin and stdout streams of the command to proxy to the configured builder instance. +It is not intended to be used by humans, but rather by other tools that want to interact with the builder instance via BuildKit API. + +## Examples + +Example go program that uses the dial-stdio command wire up a buildkit client. +This is for example use only and may not be suitable for production use. + +```go +client.New(ctx, "", client.WithContextDialer(func(context.Context, string) (net.Conn, error) { + c1, c2 := net.Pipe() + cmd := exec.Command("docker", "buildx", "dial-stdio") + cmd.Stdin = c1 + cmd.Stdout = c1 + + if err := cmd.Start(); err != nil { + c1.Close() + c2.Close() + return nil, err + } + + go func() { + cmd.Wait() + c2.Close() + }() + + return c2 +})) +``` \ No newline at end of file diff --git a/driver/docker-container/driver.go b/driver/docker-container/driver.go index 4998b0cc..53350cea 100644 --- a/driver/docker-container/driver.go +++ b/driver/docker-container/driver.go @@ -384,13 +384,20 @@ func (d *Driver) Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error { return nil } -func (d *Driver) Client(ctx context.Context) (*client.Client, error) { +func (d *Driver) Dial(ctx context.Context) (net.Conn, error) { _, conn, err := d.exec(ctx, []string{"buildctl", "dial-stdio"}) if err != nil { return nil, err } - conn = demuxConn(conn) + return conn, nil +} + +func (d *Driver) Client(ctx context.Context) (*client.Client, error) { + conn, err := d.Dial(ctx) + if err != nil { + return nil, err + } exp, _, err := detect.Exporter() if err != nil { diff --git a/driver/docker/driver.go b/driver/docker/driver.go index 2c8b1637..85f4248c 100644 --- a/driver/docker/driver.go +++ b/driver/docker/driver.go @@ -57,10 +57,14 @@ func (d *Driver) Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error { return nil } +func (d *Driver) Dial(ctx context.Context) (net.Conn, error) { + return d.DockerAPI.DialHijack(ctx, "/grpc", "h2c", d.DialMeta) +} + func (d *Driver) Client(ctx context.Context) (*client.Client, error) { opts := []client.ClientOpt{ client.WithContextDialer(func(context.Context, string) (net.Conn, error) { - return d.DockerAPI.DialHijack(ctx, "/grpc", "h2c", d.DialMeta) + return d.Dial(ctx) }), client.WithSessionDialer(func(ctx context.Context, proto string, meta map[string][]string) (net.Conn, error) { return d.DockerAPI.DialHijack(ctx, "/session", proto, meta) }), diff --git a/driver/driver.go b/driver/driver.go index 16d43d7a..6d3c5467 100644 --- a/driver/driver.go +++ b/driver/driver.go @@ -59,6 +59,7 @@ type Driver interface { Version(context.Context) (string, error) Stop(ctx context.Context, force bool) error Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error + Dial(ctx context.Context) (net.Conn, error) Client(ctx context.Context) (*client.Client, error) Features(ctx context.Context) map[Feature]bool HostGatewayIP(ctx context.Context) (net.IP, error) diff --git a/driver/kubernetes/driver.go b/driver/kubernetes/driver.go index 9ce1dd3e..29ffb21e 100644 --- a/driver/kubernetes/driver.go +++ b/driver/kubernetes/driver.go @@ -189,7 +189,7 @@ func (d *Driver) Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error { return nil } -func (d *Driver) Client(ctx context.Context) (*client.Client, error) { +func (d *Driver) Dial(ctx context.Context) (net.Conn, error) { restClient := d.clientset.CoreV1().RESTClient() restClientConfig, err := d.KubeClientConfig.ClientConfig() if err != nil { @@ -208,7 +208,10 @@ func (d *Driver) Client(ctx context.Context) (*client.Client, error) { if err != nil { return nil, err } + return conn, nil +} +func (d *Driver) Client(ctx context.Context) (*client.Client, error) { exp, _, err := detect.Exporter() if err != nil { return nil, err @@ -216,7 +219,7 @@ func (d *Driver) Client(ctx context.Context) (*client.Client, error) { var opts []client.ClientOpt opts = append(opts, client.WithContextDialer(func(context.Context, string) (net.Conn, error) { - return conn, nil + return d.Dial(ctx) })) if td, ok := exp.(client.TracerDelegate); ok { opts = append(opts, client.WithTracerDelegate(td)) diff --git a/driver/remote/driver.go b/driver/remote/driver.go index e4e2b057..2f2ab589 100644 --- a/driver/remote/driver.go +++ b/driver/remote/driver.go @@ -2,14 +2,18 @@ package remote import ( "context" - "errors" + "crypto/tls" + "crypto/x509" "net" + "os" + "strings" "time" "github.com/docker/buildx/driver" "github.com/docker/buildx/util/progress" "github.com/moby/buildkit/client" "github.com/moby/buildkit/util/tracing/detect" + "github.com/pkg/errors" ) type Driver struct { @@ -82,14 +86,61 @@ func (d *Driver) Client(ctx context.Context) (*client.Client, error) { opts = append(opts, client.WithTracerDelegate(td)) } - if d.tlsOpts != nil { - opts = append(opts, []client.ClientOpt{ - client.WithServerConfig(d.tlsOpts.serverName, d.tlsOpts.caCert), - client.WithCredentials(d.tlsOpts.cert, d.tlsOpts.key), - }...) + opts = append(opts, client.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) { + return d.Dial(ctx) + })) + + return client.New(ctx, "", opts...) +} + +func (d *Driver) Dial(ctx context.Context) (net.Conn, error) { + network, addr, ok := strings.Cut(d.InitConfig.EndpointAddr, "://") + if !ok { + return nil, errors.Errorf("invalid endpoint address: %s", d.InitConfig.EndpointAddr) } - return client.New(ctx, d.InitConfig.EndpointAddr, opts...) + dialer := &net.Dialer{} + + conn, err := dialer.DialContext(ctx, network, addr) + if err != nil { + return nil, errors.WithStack(err) + } + + if d.tlsOpts != nil { + cfg, err := loadTLS(d.tlsOpts) + if err != nil { + return nil, errors.Wrap(err, "error loading tls config") + } + conn = tls.Client(conn, cfg) + } + return conn, nil +} + +func loadTLS(opts *tlsOpts) (*tls.Config, error) { + cfg := &tls.Config{ + ServerName: opts.serverName, + RootCAs: x509.NewCertPool(), + } + + if opts.caCert != "" { + ca, err := os.ReadFile(opts.caCert) + if err != nil { + return nil, errors.Wrap(err, "could not read ca certificate") + } + if ok := cfg.RootCAs.AppendCertsFromPEM(ca); !ok { + return nil, errors.New("failed to append ca certs") + } + } + + if opts.cert != "" || opts.key != "" { + cert, err := tls.LoadX509KeyPair(opts.cert, opts.key) + if err != nil { + return nil, errors.Wrap(err, "could not read certificate/key") + } + cfg.Certificates = append(cfg.Certificates, cert) + } + + return cfg, nil } func (d *Driver) Features(ctx context.Context) map[driver.Feature]bool { diff --git a/tests/dialstdio.go b/tests/dialstdio.go new file mode 100644 index 00000000..afb441cb --- /dev/null +++ b/tests/dialstdio.go @@ -0,0 +1,124 @@ +package tests + +import ( + "bytes" + "context" + "net" + "os" + "os/exec" + "path/filepath" + "testing" + "time" + + "github.com/docker/buildx/util/progress" + "github.com/moby/buildkit/client" + "github.com/moby/buildkit/client/llb" + gwclient "github.com/moby/buildkit/frontend/gateway/client" + "github.com/moby/buildkit/util/progress/progressui" + "github.com/moby/buildkit/util/testutil/integration" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var dialstdioTests = []func(t *testing.T, sb integration.Sandbox){ + testDialStdio, +} + +func testDialStdio(t *testing.T, sb integration.Sandbox) { + do := func(t *testing.T, pipe func(t *testing.T, cmd *exec.Cmd) net.Conn) { + errBuf := bytes.NewBuffer(nil) + defer func() { + if t.Failed() { + t.Log(errBuf.String()) + } + }() + var cmd *exec.Cmd + c, err := client.New(sb.Context(), "", client.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) { + cmd = buildxCmd(sb, withArgs("dial-stdio", "--progress", "auto")) + conn := pipe(t, cmd) + cmd.Stderr = errBuf + if err := cmd.Start(); err != nil { + return nil, errors.Wrap(err, errBuf.String()) + } + + return conn, nil + })) + require.NoError(t, err) + + defer func() { + c.Close() + // Since the client is closed (and as such the connection shutdown), the buildx command should exit cleanly. + chErr := make(chan error, 1) + go func() { + chErr <- cmd.Wait() + }() + select { + case <-time.After(10 * time.Second): + t.Error("timeout waiting for buildx command to exit") + case <-chErr: + assert.NoError(t, err) + } + }() + + _, err = c.Info(sb.Context()) + require.NoError(t, err) + + require.Contains(t, errBuf.String(), "builder: "+sb.Address()) + + dir := t.TempDir() + + f, err := os.CreateTemp(dir, "log") + require.NoError(t, err) + defer f.Close() + + defer func() { + if t.Failed() { + dt, _ := os.ReadFile(f.Name()) + t.Log(string(dt)) + } + }() + + p, err := progress.NewPrinter(sb.Context(), f, progressui.AutoMode) + require.NoError(t, err) + + ch, chDone := progress.NewChannel(p) + done := func() { + select { + case <-sb.Context().Done(): + case <-chDone: + } + } + + _, err = c.Build(sb.Context(), client.SolveOpt{ + Exports: []client.ExportEntry{ + {Type: "local", OutputDir: dir}, + }, + }, "", func(ctx context.Context, gwc gwclient.Client) (*gwclient.Result, error) { + def, err := llb.Scratch().File(llb.Mkfile("hello", 0o600, []byte("world"))).Marshal(ctx) + if err != nil { + return nil, err + } + + return gwc.Solve(ctx, gwclient.SolveRequest{ + Definition: def.ToPB(), + }) + }, ch) + done() + require.NoError(t, err) + + dt, err := os.ReadFile(filepath.Join(dir, "hello")) + require.NoError(t, err) + require.Equal(t, "world", string(dt)) + } + + t.Run("conn=netpipe", func(t *testing.T) { + t.Parallel() + do(t, func(t *testing.T, cmd *exec.Cmd) net.Conn { + c1, c2 := net.Pipe() + cmd.Stdin = c1 + cmd.Stdout = c1 + return c2 + }) + }) +} diff --git a/tests/integration_test.go b/tests/integration_test.go index e38e46ca..49c5d06d 100644 --- a/tests/integration_test.go +++ b/tests/integration_test.go @@ -29,6 +29,7 @@ func TestIntegration(t *testing.T) { tests = append(tests, versionTests...) tests = append(tests, createTests...) tests = append(tests, rmTests...) + tests = append(tests, dialstdioTests...) testIntegration(t, tests...) }