From 08ef676cdf38edde11793b17671e550887ebe659 Mon Sep 17 00:00:00 2001 From: Tonis Tiigi Date: Tue, 17 Apr 2018 15:50:17 -0700 Subject: [PATCH] builder: experimental buildkit base Signed-off-by: Tonis Tiigi Upstream-commit: 22f7caee03b807f50f993eb387a4017acab1657e Component: engine --- .../api/server/backend/build/backend.go | 27 +- .../engine/builder/builder-next/builder.go | 416 ++++++++++++++++++ .../builder-next/containerimage/pull.go | 129 ++++-- .../builder/builder-next/exporter/export.go | 14 + .../builder/builder-next/exporter/writer.go | 16 +- .../builder/builder-next/snapshot/snapshot.go | 137 +++--- .../builder/builder-next/worker/worker.go | 301 +++++++++++++ components/engine/cmd/dockerd/daemon.go | 12 +- 8 files changed, 942 insertions(+), 110 deletions(-) create mode 100644 components/engine/builder/builder-next/builder.go create mode 100644 components/engine/builder/builder-next/worker/worker.go diff --git a/components/engine/api/server/backend/build/backend.go b/components/engine/api/server/backend/build/backend.go index 22ce9cef7c..bdde6fa2ff 100644 --- a/components/engine/api/server/backend/build/backend.go +++ b/components/engine/api/server/backend/build/backend.go @@ -3,11 +3,13 @@ package build // import "github.com/docker/docker/api/server/backend/build" import ( "context" "fmt" + "strings" "github.com/docker/distribution/reference" "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/backend" "github.com/docker/docker/builder" + buildkit "github.com/docker/docker/builder/builder-next" "github.com/docker/docker/builder/fscache" "github.com/docker/docker/image" "github.com/docker/docker/pkg/stringid" @@ -30,24 +32,39 @@ type Backend struct { builder Builder fsCache *fscache.FSCache imageComponent ImageComponent + buildkit *buildkit.Builder } // NewBackend creates a new build backend from components -func NewBackend(components ImageComponent, builder Builder, fsCache *fscache.FSCache) (*Backend, error) { - return &Backend{imageComponent: components, builder: builder, fsCache: fsCache}, nil +func NewBackend(components ImageComponent, builder Builder, fsCache *fscache.FSCache, buildkit *buildkit.Builder) (*Backend, error) { + return &Backend{imageComponent: components, builder: builder, fsCache: fsCache, buildkit: buildkit}, nil } // Build builds an image from a Source func (b *Backend) Build(ctx context.Context, config backend.BuildConfig) (string, error) { options := config.Options + useBuildKit := false + if strings.HasPrefix(options.SessionID, "buildkit:") { + useBuildKit = true + options.SessionID = strings.TrimPrefix(options.SessionID, "buildkit:") + } + tagger, err := NewTagger(b.imageComponent, config.ProgressWriter.StdoutFormatter, options.Tags) if err != nil { return "", err } - build, err := b.builder.Build(ctx, config) - if err != nil { - return "", err + var build *builder.Result + if useBuildKit { + build, err = b.buildkit.Build(ctx, config) + if err != nil { + return "", err + } + } else { + build, err = b.builder.Build(ctx, config) + if err != nil { + return "", err + } } var imageID = build.ImageID diff --git a/components/engine/builder/builder-next/builder.go b/components/engine/builder/builder-next/builder.go new file mode 100644 index 0000000000..30472c831d --- /dev/null +++ b/components/engine/builder/builder-next/builder.go @@ -0,0 +1,416 @@ +package buildkit + +import ( + "context" + "encoding/json" + "io" + "os" + "path/filepath" + "sync" + + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/content/local" + "github.com/docker/docker/api/types/backend" + "github.com/docker/docker/builder" + "github.com/docker/docker/builder/builder-next/containerimage" + containerimageexp "github.com/docker/docker/builder/builder-next/exporter" + "github.com/docker/docker/builder/builder-next/snapshot" + mobyworker "github.com/docker/docker/builder/builder-next/worker" + "github.com/docker/docker/daemon/graphdriver" + "github.com/docker/docker/daemon/images" + "github.com/docker/docker/pkg/jsonmessage" + controlapi "github.com/moby/buildkit/api/services/control" + "github.com/moby/buildkit/cache" + "github.com/moby/buildkit/cache/metadata" + "github.com/moby/buildkit/control" + "github.com/moby/buildkit/executor/runcexecutor" + "github.com/moby/buildkit/exporter" + "github.com/moby/buildkit/frontend" + "github.com/moby/buildkit/frontend/dockerfile" + "github.com/moby/buildkit/identity" + "github.com/moby/buildkit/session" + "github.com/moby/buildkit/snapshot/blobmapping" + "github.com/moby/buildkit/solver-next/boltdbcachestorage" + "github.com/moby/buildkit/worker" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + netcontext "golang.org/x/net/context" + "golang.org/x/sync/errgroup" + grpcmetadata "google.golang.org/grpc/metadata" +) + +// Builder defines interface for running a build +// type Builder interface { +// Build(context.Context, backend.BuildConfig) (*builder.Result, error) +// } + +// Result is the output produced by a Builder +// type Result struct { +// ImageID string +// // FromImage Image +// } + +type Opt struct { + SessionManager *session.Manager + Root string + Dist images.DistributionServices +} + +type Builder struct { + controller *control.Controller + results *results +} + +func New(opt Opt) (*Builder, error) { + results := newResultsGetter() + + c, err := newController(opt, results.ch) + if err != nil { + return nil, err + } + b := &Builder{ + controller: c, + results: results, + } + return b, nil +} + +func (b *Builder) Build(ctx context.Context, opt backend.BuildConfig) (*builder.Result, error) { + id := identity.NewID() + + attrs := map[string]string{ + "ref": id, + } + + frontendAttrs := map[string]string{} + + if opt.Options.Target != "" { + frontendAttrs["target"] = opt.Options.Target + } + + if opt.Options.Dockerfile != "" && opt.Options.Dockerfile != "." { + frontendAttrs["filename"] = opt.Options.Dockerfile + } + + if opt.Options.RemoteContext != "" { + frontendAttrs["context"] = opt.Options.RemoteContext + } + + logrus.Debugf("frontend: %+v", frontendAttrs) + + for k, v := range opt.Options.BuildArgs { + if v == nil { + continue + } + frontendAttrs["build-arg:"+k] = *v + } + + req := &controlapi.SolveRequest{ + Ref: id, + Exporter: "image", + ExporterAttrs: attrs, + Frontend: "dockerfile.v0", + FrontendAttrs: frontendAttrs, + Session: opt.Options.SessionID, + } + + eg, ctx := errgroup.WithContext(ctx) + + eg.Go(func() error { + _, err := b.controller.Solve(ctx, req) + return err + }) + + ch := make(chan *controlapi.StatusResponse) + + eg.Go(func() error { + defer close(ch) + return b.controller.Status(&controlapi.StatusRequest{ + Ref: id, + }, &statusProxy{ctx: ctx, ch: ch}) + }) + + eg.Go(func() error { + for sr := range ch { + dt, err := sr.Marshal() + if err != nil { + return err + } + + auxJSONBytes, err := json.Marshal(dt) + if err != nil { + return err + } + auxJSON := new(json.RawMessage) + *auxJSON = auxJSONBytes + msgJSON, err := json.Marshal(&jsonmessage.JSONMessage{ID: "buildkit-trace", Aux: auxJSON}) + if err != nil { + return err + } + msgJSON = append(msgJSON, []byte("\r\n")...) + n, err := opt.ProgressWriter.Output.Write(msgJSON) + if err != nil { + return err + } + if n != len(msgJSON) { + return io.ErrShortWrite + } + } + return nil + }) + + out := &builder.Result{} + eg.Go(func() error { + res, err := b.results.wait(ctx, id) + if err != nil { + return err + } + out.ImageID = string(res.ID) + return nil + }) + + if err := eg.Wait(); err != nil { + return nil, err + } + + return out, nil +} + +func newController(opt Opt, reporter chan containerimageexp.Result) (*control.Controller, error) { + if err := os.MkdirAll(opt.Root, 0700); err != nil { + return nil, err + } + + dist := opt.Dist + root := opt.Root + + var driver graphdriver.Driver + if ls, ok := dist.LayerStore.(interface { + Driver() graphdriver.Driver + }); ok { + driver = ls.Driver() + } else { + return nil, errors.Errorf("could not access graphdriver") + } + + sbase, err := snapshot.NewSnapshotter(snapshot.Opt{ + GraphDriver: driver, + LayerStore: dist.LayerStore, + Root: root, + }) + if err != nil { + return nil, err + } + + store, err := local.NewStore(filepath.Join(root, "content")) + if err != nil { + return nil, err + } + store = &contentStoreNoLabels{store} + + md, err := metadata.NewStore(filepath.Join(root, "metadata.db")) + if err != nil { + return nil, err + } + + snapshotter := blobmapping.NewSnapshotter(blobmapping.Opt{ + Content: store, + Snapshotter: sbase, + MetadataStore: md, + }) + + cm, err := cache.NewManager(cache.ManagerOpt{ + Snapshotter: snapshotter, + MetadataStore: md, + }) + if err != nil { + return nil, err + } + + src, err := containerimage.NewSource(containerimage.SourceOpt{ + SessionManager: opt.SessionManager, + CacheAccessor: cm, + ContentStore: store, + DownloadManager: dist.DownloadManager, + MetadataStore: dist.V2MetadataService, + }) + if err != nil { + return nil, err + } + + exec, err := runcexecutor.New(runcexecutor.Opt{ + Root: filepath.Join(root, "executor"), + CommandCandidates: []string{"docker-runc", "runc"}, + }) + if err != nil { + return nil, err + } + + differ, ok := sbase.(containerimageexp.Differ) + if !ok { + return nil, errors.Errorf("snapshotter doesn't support differ") + } + + exp, err := containerimageexp.New(containerimageexp.Opt{ + ImageStore: dist.ImageStore, + ReferenceStore: dist.ReferenceStore, + Differ: differ, + Reporter: reporter, + }) + if err != nil { + return nil, err + } + + cacheStorage, err := boltdbcachestorage.NewStore(filepath.Join(opt.Root, "cache.db")) + if err != nil { + return nil, err + } + + frontends := map[string]frontend.Frontend{} + frontends["dockerfile.v0"] = dockerfile.NewDockerfileFrontend() + // frontends["gateway.v0"] = gateway.NewGatewayFrontend() + + // mdb := ctdmetadata.NewDB(db, c, map[string]ctdsnapshot.Snapshotter{ + // "moby": s, + // }) + // if err := mdb.Init(context.TODO()); err != nil { + // return opt, err + // } + // + // throttledGC := throttle.Throttle(time.Second, func() { + // if _, err := mdb.GarbageCollect(context.TODO()); err != nil { + // logrus.Errorf("GC error: %+v", err) + // } + // }) + // + // gc := func(ctx context.Context) error { + // throttledGC() + // return nil + // } + + wopt := mobyworker.WorkerOpt{ + ID: "moby", + SessionManager: opt.SessionManager, + MetadataStore: md, + ContentStore: store, + CacheManager: cm, + Snapshotter: snapshotter, + Executor: exec, + ImageSource: src, + Exporters: map[string]exporter.Exporter{ + "image": exp, + }, + } + + wc := &worker.Controller{} + w, err := mobyworker.NewWorker(wopt) + if err != nil { + return nil, err + } + wc.Add(w) + + return control.NewController(control.Opt{ + SessionManager: opt.SessionManager, + WorkerController: wc, + Frontends: frontends, + CacheKeyStorage: cacheStorage, + // CacheExporter: ce, + // CacheImporter: ci, + }) +} + +type statusProxy struct { + ctx context.Context + ch chan *controlapi.StatusResponse +} + +func (sp *statusProxy) SetHeader(_ grpcmetadata.MD) error { + return nil +} + +func (sp *statusProxy) SendHeader(_ grpcmetadata.MD) error { + return nil +} + +func (sp *statusProxy) SetTrailer(_ grpcmetadata.MD) { +} + +func (sp *statusProxy) Send(resp *controlapi.StatusResponse) error { + return sp.SendMsg(resp) +} + +func (sp *statusProxy) Context() netcontext.Context { + return sp.ctx +} +func (sp *statusProxy) SendMsg(m interface{}) error { + if sr, ok := m.(*controlapi.StatusResponse); ok { + sp.ch <- sr + } + return nil +} +func (sp *statusProxy) RecvMsg(m interface{}) error { + return io.EOF +} + +type results struct { + ch chan containerimageexp.Result + res map[string]containerimageexp.Result + mu sync.Mutex + cond *sync.Cond +} + +func newResultsGetter() *results { + r := &results{ + ch: make(chan containerimageexp.Result), + res: map[string]containerimageexp.Result{}, + } + r.cond = sync.NewCond(&r.mu) + + go func() { + for res := range r.ch { + r.mu.Lock() + r.res[res.Ref] = res + r.cond.Broadcast() + r.mu.Unlock() + } + }() + return r +} + +func (r *results) wait(ctx context.Context, ref string) (*containerimageexp.Result, error) { + done := make(chan struct{}) + defer close(done) + go func() { + select { + case <-ctx.Done(): + r.mu.Lock() + r.cond.Broadcast() + r.mu.Unlock() + case <-done: + } + }() + + r.mu.Lock() + for { + select { + case <-ctx.Done(): + r.mu.Unlock() + return nil, ctx.Err() + default: + } + res, ok := r.res[ref] + if ok { + r.mu.Unlock() + return &res, nil + } + r.cond.Wait() + } +} + +type contentStoreNoLabels struct { + content.Store +} + +func (c *contentStoreNoLabels) Update(ctx context.Context, info content.Info, fieldpaths ...string) (content.Info, error) { + return content.Info{}, nil +} diff --git a/components/engine/builder/builder-next/containerimage/pull.go b/components/engine/builder/builder-next/containerimage/pull.go index c5d7acbc34..f785b4aa48 100644 --- a/components/engine/builder/builder-next/containerimage/pull.go +++ b/components/engine/builder/builder-next/containerimage/pull.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "io" + "io/ioutil" "runtime" "sync" "time" @@ -22,7 +23,6 @@ import ( "github.com/docker/docker/distribution/xfer" "github.com/docker/docker/image" "github.com/docker/docker/layer" - "github.com/docker/docker/pkg/ioutils" pkgprogress "github.com/docker/docker/pkg/progress" "github.com/docker/docker/reference" "github.com/moby/buildkit/cache" @@ -36,8 +36,8 @@ import ( digest "github.com/opencontainers/go-digest" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" - "github.com/sirupsen/logrus" netcontext "golang.org/x/net/context" + "golang.org/x/time/rate" ) type SourceOpt struct { @@ -92,23 +92,22 @@ func (is *imageSource) getCredentialsFromSession(ctx context.Context) func(strin } func (is *imageSource) ResolveImageConfig(ctx context.Context, ref string) (digest.Digest, []byte, error) { - // type t struct { - // dgst digest.Digest - // dt []byte - // } - // res, err := is.g.Do(ctx, ref, func(ctx context.Context) (interface{}, error) { - // dgst, dt, err := imageutil.Config(ctx, ref, is.getResolver(ctx), is.ContentStore) - // if err != nil { - // return nil, err - // } - // return &t{dgst: dgst, dt: dt}, nil - // }) - // if err != nil { - // return "", nil, err - // } - // typed := res.(*t) - // return typed.dgst, typed.dt, nil - return "", nil, errors.Errorf("not-implemented") + type t struct { + dgst digest.Digest + dt []byte + } + res, err := is.g.Do(ctx, ref, func(ctx context.Context) (interface{}, error) { + dgst, dt, err := imageutil.Config(ctx, ref, is.getResolver(ctx), is.ContentStore, "") + if err != nil { + return nil, err + } + return &t{dgst: dgst, dt: dt}, nil + }) + if err != nil { + return "", nil, err + } + typed := res.(*t) + return typed.dgst, typed.dt, nil } func (is *imageSource) Resolve(ctx context.Context, id source.Identifier) (source.SourceInstance, error) { @@ -189,7 +188,17 @@ func (p *puller) Snapshot(ctx context.Context) (cache.ImmutableRef, error) { pctx, stopProgress := context.WithCancel(ctx) - go showProgress(pctx, ongoing, p.is.ContentStore) + pw, _, ctx := progress.FromContext(ctx) + defer pw.Close() + + progressDone := make(chan struct{}) + go func() { + showProgress(pctx, ongoing, p.is.ContentStore, pw) + close(progressDone) + }() + defer func() { + <-progressDone + }() fetcher, err := p.resolver.Fetcher(ctx, p.ref) if err != nil { @@ -213,14 +222,33 @@ func (p *puller) Snapshot(ctx context.Context) (cache.ImmutableRef, error) { return nil, nil }), } + // var schema1Converter *schema1.Converter + // if p.desc.MediaType == images.MediaTypeDockerSchema1Manifest { + // schema1Converter = schema1.NewConverter(p.is.ContentStore, fetcher) + // handlers = append(handlers, schema1Converter) + // } else { + // handlers = append(handlers, + // remotes.FetchHandler(p.is.ContentStore, fetcher), + // + // images.ChildrenHandler(p.is.ContentStore), + // ) + // } + // var schema1Converter *schema1.Converter if p.desc.MediaType == images.MediaTypeDockerSchema1Manifest { schema1Converter = schema1.NewConverter(p.is.ContentStore, fetcher) handlers = append(handlers, schema1Converter) } else { + // Get all the children for a descriptor + childrenHandler := images.ChildrenHandler(p.is.ContentStore) + // Set any children labels for that content + childrenHandler = images.SetChildrenLabels(p.is.ContentStore, childrenHandler) + // Filter the childen by the platform + childrenHandler = images.FilterPlatforms(childrenHandler, platforms.Default()) + handlers = append(handlers, remotes.FetchHandler(p.is.ContentStore, fetcher), - images.ChildrenHandler(p.is.ContentStore, platforms.Default()), + childrenHandler, ) } @@ -228,7 +256,7 @@ func (p *puller) Snapshot(ctx context.Context) (cache.ImmutableRef, error) { stopProgress() return nil, err } - stopProgress() + defer stopProgress() mfst, err := images.Manifest(ctx, p.is.ContentStore, p.desc, platforms.Default()) if err != nil { @@ -255,16 +283,41 @@ func (p *puller) Snapshot(ctx context.Context) (cache.ImmutableRef, error) { } pchan := make(chan pkgprogress.Progress, 10) + defer close(pchan) go func() { + m := map[string]struct { + st time.Time + limiter *rate.Limiter + }{} for p := range pchan { - logrus.Debugf("progress %+v", p) + if p.Action == "Extracting" { + st, ok := m[p.ID] + if !ok { + st.st = time.Now() + st.limiter = rate.NewLimiter(rate.Every(100*time.Millisecond), 1) + m[p.ID] = st + } + var end *time.Time + if p.LastUpdate || st.limiter.Allow() { + if p.LastUpdate { + tm := time.Now() + end = &tm + } + pw.Write("extracting "+p.ID, progress.Status{ + Action: "extract", + Started: &st.st, + Completed: end, + }) + } + } } }() layers := make([]xfer.DownloadDescriptor, 0, len(mfst.Layers)) for i, desc := range mfst.Layers { + ongoing.add(desc) layers = append(layers, &layerDescriptor{ desc: desc, diffID: layer.DiffID(img.RootFS.DiffIDs[i]), @@ -274,11 +327,19 @@ func (p *puller) Snapshot(ctx context.Context) (cache.ImmutableRef, error) { }) } + defer func() { + <-progressDone + for _, desc := range mfst.Layers { + p.is.ContentStore.Delete(context.TODO(), desc.Digest) + } + }() + r := image.NewRootFS() rootFS, release, err := p.is.DownloadManager.Download(ctx, *r, runtime.GOOS, layers, pkgprogress.ChanOutput(pchan)) if err != nil { return nil, err } + stopProgress() ref, err := p.is.CacheAccessor.Get(ctx, string(rootFS.ChainID()), cache.WithDescription(fmt.Sprintf("pulled from %s", p.ref))) release() @@ -317,8 +378,9 @@ func (ld *layerDescriptor) Download(ctx netcontext.Context, progressOutput pkgpr } defer rc.Close() - // TODO: progress - if err := content.WriteBlob(ctx, ld.is.ContentStore, ld.desc.Digest.String(), rc, ld.desc.Size, ld.desc.Digest); err != nil { + refKey := remotes.MakeRefKey(ctx, ld.desc) + + if err := content.WriteBlob(ctx, ld.is.ContentStore, refKey, rc, ld.desc.Size, ld.desc.Digest); err != nil { return nil, 0, err } @@ -327,9 +389,7 @@ func (ld *layerDescriptor) Download(ctx netcontext.Context, progressOutput pkgpr return nil, 0, err } - return ioutils.NewReadCloserWrapper(content.NewReader(ra), func() error { - return ld.is.ContentStore.Delete(context.TODO(), ld.desc.Digest) - }), ld.desc.Size, nil + return ioutil.NopCloser(content.NewReader(ra)), ld.desc.Size, nil } func (ld *layerDescriptor) Close() { @@ -341,7 +401,7 @@ func (ld *layerDescriptor) Registered(diffID layer.DiffID) { ld.is.MetadataStore.Add(diffID, metadata.V2Metadata{Digest: ld.desc.Digest, SourceRepository: ld.ref.Locator}) } -func showProgress(ctx context.Context, ongoing *jobs, cs content.Store) { +func showProgress(ctx context.Context, ongoing *jobs, cs content.Store, pw progress.Writer) { var ( ticker = time.NewTicker(100 * time.Millisecond) statuses = map[string]statusInfo{} @@ -349,9 +409,6 @@ func showProgress(ctx context.Context, ongoing *jobs, cs content.Store) { ) defer ticker.Stop() - pw, _, ctx := progress.FromContext(ctx) - defer pw.Close() - for { select { case <-ticker.C: @@ -371,7 +428,7 @@ func showProgress(ctx context.Context, ongoing *jobs, cs content.Store) { actives := make(map[string]statusInfo) if !done { - active, err := cs.ListStatuses(ctx, "") + active, err := cs.ListStatuses(ctx) if err != nil { // log.G(ctx).WithError(err).Error("active check failed") continue @@ -407,9 +464,9 @@ func showProgress(ctx context.Context, ongoing *jobs, cs content.Store) { info, err := cs.Info(context.TODO(), j.Digest) if err != nil { if errdefs.IsNotFound(err) { - pw.Write(j.Digest.String(), progress.Status{ - Action: "waiting", - }) + // pw.Write(j.Digest.String(), progress.Status{ + // Action: "waiting", + // }) continue } } else { diff --git a/components/engine/builder/builder-next/exporter/export.go b/components/engine/builder/builder-next/exporter/export.go index efea1ebf8c..ebcbb5d94e 100644 --- a/components/engine/builder/builder-next/exporter/export.go +++ b/components/engine/builder/builder-next/exporter/export.go @@ -23,10 +23,17 @@ type Differ interface { EnsureLayer(ctx context.Context, key string) ([]layer.DiffID, error) } +// TODO: this needs to be handled differently (return from solve) +type Result struct { + Ref string + ID image.ID +} + type Opt struct { ImageStore image.Store ReferenceStore reference.Store Differ Differ + Reporter chan Result } type imageExporter struct { @@ -50,6 +57,8 @@ func (e *imageExporter) Resolve(ctx context.Context, opt map[string]string) (exp i.targetName = ref case exporterImageConfig: i.config = []byte(v) + case "ref": + i.ref = v default: logrus.Warnf("image exporter: unknown option %s", k) } @@ -61,6 +70,7 @@ type imageExporterInstance struct { *imageExporter targetName distref.Named config []byte + ref string } func (e *imageExporterInstance) Name() string { @@ -131,5 +141,9 @@ func (e *imageExporterInstance) Export(ctx context.Context, ref cache.ImmutableR } } + if e.opt.Reporter != nil { + e.opt.Reporter <- Result{ID: id, Ref: e.ref} + } + return nil } diff --git a/components/engine/builder/builder-next/exporter/writer.go b/components/engine/builder/builder-next/exporter/writer.go index dc6304573d..458febda07 100644 --- a/components/engine/builder/builder-next/exporter/writer.go +++ b/components/engine/builder/builder-next/exporter/writer.go @@ -65,12 +65,12 @@ func patchImageConfig(dt []byte, dps []digest.Digest, history []ocispec.History) } m["history"] = dt - now := time.Now() - dt, err = json.Marshal(&now) - if err != nil { - return nil, errors.Wrap(err, "failed to marshal creation time") - } - m["created"] = dt + // now := time.Now() + // dt, err = json.Marshal(&now) + // if err != nil { + // return nil, errors.Wrap(err, "failed to marshal creation time") + // } + // m["created"] = dt dt, err = json.Marshal(m) return dt, errors.Wrap(err, "failed to marshal config after patch") @@ -104,9 +104,9 @@ func normalizeLayersAndHistory(diffs []digest.Digest, history []ocispec.History, if len(diffs) > historyLayers { // some history items are missing. add them based on the ref metadata for _, msg := range getRefDesciptions(ref, len(diffs)-historyLayers) { - tm := time.Now().UTC() + // tm := time.Now().UTC() history = append(history, ocispec.History{ - Created: &tm, + // Created: &tm, CreatedBy: msg, Comment: "buildkit.exporter.image.v0", }) diff --git a/components/engine/builder/builder-next/snapshot/snapshot.go b/components/engine/builder/builder-next/snapshot/snapshot.go index 0f201920a1..e72682b97e 100644 --- a/components/engine/builder/builder-next/snapshot/snapshot.go +++ b/components/engine/builder/builder-next/snapshot/snapshot.go @@ -15,7 +15,6 @@ import ( "github.com/moby/buildkit/snapshot" digest "github.com/opencontainers/go-digest" "github.com/pkg/errors" - "github.com/sirupsen/logrus" ) var keyParent = []byte("parent") @@ -121,8 +120,24 @@ func (s *snapshotter) getLayer(key string) (layer.Layer, error) { if !ok { id, ok := s.chainID(key) if !ok { - s.mu.Unlock() - return nil, nil + if err := s.db.View(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(key)) + if b == nil { + return nil + } + v := b.Get(keyChainID) + if v != nil { + id = layer.ChainID(v) + } + return nil + }); err != nil { + s.mu.Unlock() + return nil, err + } + if id == "" { + s.mu.Unlock() + return nil, nil + } } var err error l, err = s.opt.LayerStore.Get(id) @@ -132,7 +147,7 @@ func (s *snapshotter) getLayer(key string) (layer.Layer, error) { } s.refs[string(id)] = l if err := s.db.Update(func(tx *bolt.Tx) error { - _, err := tx.CreateBucketIfNotExists([]byte(id)) + _, err := tx.CreateBucketIfNotExists([]byte(key)) return err }); err != nil { s.mu.Unlock() @@ -282,7 +297,8 @@ func (s *snapshotter) Remove(ctx context.Context, key string) error { return nil } - return s.opt.GraphDriver.Remove(key) + id, _ := s.getGraphDriverID(key) + return s.opt.GraphDriver.Remove(id) } func (s *snapshotter) Commit(ctx context.Context, name, key string, opts ...snapshots.Opt) error { @@ -298,7 +314,7 @@ func (s *snapshotter) Commit(ctx context.Context, name, key string, opts ...snap }); err != nil { return err } - logrus.Debugf("committed %s as %s", name, key) + // logrus.Debugf("committed %s as %s", name, key)); return nil } @@ -307,61 +323,62 @@ func (s *snapshotter) View(ctx context.Context, key, parent string, opts ...snap } func (s *snapshotter) Walk(ctx context.Context, fn func(context.Context, snapshots.Info) error) error { - allKeys := map[string]struct{}{} - commitedIDs := map[string]string{} - chainIDs := map[string]layer.ChainID{} + // allKeys := map[string]struct{}{} + // commitedIDs := map[string]string{} + // chainIDs := map[string]layer.ChainID{} + // + // if err := s.db.View(func(tx *bolt.Tx) error { + // tx.ForEach(func(name []byte, b *bolt.Bucket) error { + // allKeys[string(name)] = struct{}{} + // v := b.Get(keyCommitted) + // if v != nil { + // commitedIDs[string(v)] = string(name) + // } + // + // v = b.Get(keyChainID) + // if v != nil { + // logrus.Debugf("loaded layer %s %s", name, v) + // chainIDs[string(name)] = layer.ChainID(v) + // } + // return nil + // }) + // return nil + // }); err != nil { + // return err + // } + // + // for k := range allKeys { + // if chainID, ok := chainIDs[k]; ok { + // s.mu.Lock() + // if _, ok := s.refs[k]; !ok { + // l, err := s.opt.LayerStore.Get(chainID) + // if err != nil { + // s.mu.Unlock() + // return err + // } + // s.refs[k] = l + // } + // s.mu.Unlock() + // } + // if _, ok := commitedIDs[k]; ok { + // continue + // } + // + // if _, err := s.getLayer(k); err != nil { + // s.Remove(ctx, k) + // continue + // } + // info, err := s.Stat(ctx, k) + // if err != nil { + // s.Remove(ctx, k) + // continue + // } + // if err := fn(ctx, info); err != nil { + // return err + // } + // } - if err := s.db.View(func(tx *bolt.Tx) error { - tx.ForEach(func(name []byte, b *bolt.Bucket) error { - allKeys[string(name)] = struct{}{} - v := b.Get(keyCommitted) - if v != nil { - commitedIDs[string(v)] = string(name) - } - - v = b.Get(keyChainID) - if v != nil { - chainIDs[string(name)] = layer.ChainID(v) - } - return nil - }) - return nil - }); err != nil { - return err - } - - for k := range allKeys { - if _, ok := commitedIDs[k]; ok { - continue - } - if chainID, ok := chainIDs[k]; ok { - s.mu.Lock() - if _, ok := s.refs[k]; !ok { - l, err := s.opt.LayerStore.Get(chainID) - if err != nil { - s.mu.Unlock() - return err - } - s.refs[k] = l - } - s.mu.Unlock() - } - - if _, err := s.getLayer(k); err != nil { - s.Remove(ctx, k) - continue - } - info, err := s.Stat(ctx, k) - if err != nil { - s.Remove(ctx, k) - continue - } - if err := fn(ctx, info); err != nil { - return err - } - } - - return nil + return errors.Errorf("not-implemented") } func (s *snapshotter) Update(ctx context.Context, info snapshots.Info, fieldpaths ...string) (snapshots.Info, error) { diff --git a/components/engine/builder/builder-next/worker/worker.go b/components/engine/builder/builder-next/worker/worker.go new file mode 100644 index 0000000000..761f99d821 --- /dev/null +++ b/components/engine/builder/builder-next/worker/worker.go @@ -0,0 +1,301 @@ +package worker + +import ( + "context" + "io" + "time" + + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/rootfs" + "github.com/moby/buildkit/cache" + "github.com/moby/buildkit/cache/metadata" + "github.com/moby/buildkit/client" + "github.com/moby/buildkit/executor" + "github.com/moby/buildkit/exporter" + "github.com/moby/buildkit/frontend" + "github.com/moby/buildkit/session" + "github.com/moby/buildkit/snapshot" + "github.com/moby/buildkit/solver-next" + "github.com/moby/buildkit/solver-next/llbsolver/ops" + "github.com/moby/buildkit/solver/pb" + "github.com/moby/buildkit/source" + "github.com/moby/buildkit/source/git" + "github.com/moby/buildkit/source/http" + "github.com/moby/buildkit/source/local" + "github.com/moby/buildkit/util/progress" + digest "github.com/opencontainers/go-digest" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/pkg/errors" +) + +// TODO: this file should be removed. containerd defines ContainerdWorker, oci defines OCIWorker. There is no base worker. + +// WorkerOpt is specific to a worker. +// See also CommonOpt. +type WorkerOpt struct { + ID string + Labels map[string]string + SessionManager *session.Manager + MetadataStore *metadata.Store + Executor executor.Executor + Snapshotter snapshot.Snapshotter + ContentStore content.Store + CacheManager cache.Manager + ImageSource source.Source + Exporters map[string]exporter.Exporter + // ImageStore images.Store // optional +} + +// Worker is a local worker instance with dedicated snapshotter, cache, and so on. +// TODO: s/Worker/OpWorker/g ? +type Worker struct { + WorkerOpt + SourceManager *source.Manager + // Exporters map[string]exporter.Exporter + // ImageSource source.Source +} + +// NewWorker instantiates a local worker +func NewWorker(opt WorkerOpt) (*Worker, error) { + sm, err := source.NewManager() + if err != nil { + return nil, err + } + + cm := opt.CacheManager + sm.Register(opt.ImageSource) + + gs, err := git.NewSource(git.Opt{ + CacheAccessor: cm, + MetadataStore: opt.MetadataStore, + }) + if err != nil { + return nil, err + } + + sm.Register(gs) + + hs, err := http.NewSource(http.Opt{ + CacheAccessor: cm, + MetadataStore: opt.MetadataStore, + }) + if err != nil { + return nil, err + } + + sm.Register(hs) + + ss, err := local.NewSource(local.Opt{ + SessionManager: opt.SessionManager, + CacheAccessor: cm, + MetadataStore: opt.MetadataStore, + }) + if err != nil { + return nil, err + } + sm.Register(ss) + + return &Worker{ + WorkerOpt: opt, + SourceManager: sm, + }, nil +} + +func (w *Worker) ID() string { + return w.WorkerOpt.ID +} + +func (w *Worker) Labels() map[string]string { + return w.WorkerOpt.Labels +} + +func (w *Worker) LoadRef(id string) (cache.ImmutableRef, error) { + return w.CacheManager.Get(context.TODO(), id) +} + +func (w *Worker) ResolveOp(v solver.Vertex, s frontend.FrontendLLBBridge) (solver.Op, error) { + switch op := v.Sys().(type) { + case *pb.Op_Source: + return ops.NewSourceOp(v, op, w.SourceManager, w) + case *pb.Op_Exec: + return ops.NewExecOp(v, op, w.CacheManager, w.Executor, w) + case *pb.Op_Build: + return ops.NewBuildOp(v, op, s, w) + default: + return nil, errors.Errorf("could not resolve %v", v) + } +} + +func (w *Worker) ResolveImageConfig(ctx context.Context, ref string) (digest.Digest, []byte, error) { + // ImageSource is typically source/containerimage + resolveImageConfig, ok := w.ImageSource.(resolveImageConfig) + if !ok { + return "", nil, errors.Errorf("worker %q does not implement ResolveImageConfig", w.ID()) + } + return resolveImageConfig.ResolveImageConfig(ctx, ref) +} + +type resolveImageConfig interface { + ResolveImageConfig(ctx context.Context, ref string) (digest.Digest, []byte, error) +} + +func (w *Worker) Exec(ctx context.Context, meta executor.Meta, rootFS cache.ImmutableRef, stdin io.ReadCloser, stdout, stderr io.WriteCloser) error { + active, err := w.CacheManager.New(ctx, rootFS) + if err != nil { + return err + } + defer active.Release(context.TODO()) + return w.Executor.Exec(ctx, meta, active, nil, stdin, stdout, stderr) +} + +func (w *Worker) DiskUsage(ctx context.Context, opt client.DiskUsageInfo) ([]*client.UsageInfo, error) { + return w.CacheManager.DiskUsage(ctx, opt) +} + +func (w *Worker) Prune(ctx context.Context, ch chan client.UsageInfo) error { + return w.CacheManager.Prune(ctx, ch) +} + +func (w *Worker) Exporter(name string) (exporter.Exporter, error) { + exp, ok := w.Exporters[name] + if !ok { + return nil, errors.Errorf("exporter %q could not be found", name) + } + return exp, nil +} + +func (w *Worker) GetRemote(ctx context.Context, ref cache.ImmutableRef) (*solver.Remote, error) { + // diffPairs, err := blobs.GetDiffPairs(ctx, w.ContentStore, w.Snapshotter, w.Differ, ref) + // if err != nil { + // return nil, errors.Wrap(err, "failed calculaing diff pairs for exported snapshot") + // } + // if len(diffPairs) == 0 { + // return nil, nil + // } + // + // descs := make([]ocispec.Descriptor, len(diffPairs)) + // + // for i, dp := range diffPairs { + // info, err := w.ContentStore.Info(ctx, dp.Blobsum) + // if err != nil { + // return nil, err + // } + // descs[i] = ocispec.Descriptor{ + // Digest: dp.Blobsum, + // Size: info.Size, + // MediaType: schema2.MediaTypeLayer, + // Annotations: map[string]string{ + // "containerd.io/uncompressed": dp.DiffID.String(), + // }, + // } + // } + // + // return &solver.Remote{ + // Descriptors: descs, + // Provider: w.ContentStore, + // }, nil + return nil, errors.Errorf("getremote not implemented") +} + +func (w *Worker) FromRemote(ctx context.Context, remote *solver.Remote) (cache.ImmutableRef, error) { + // eg, gctx := errgroup.WithContext(ctx) + // for _, desc := range remote.Descriptors { + // func(desc ocispec.Descriptor) { + // eg.Go(func() error { + // done := oneOffProgress(ctx, fmt.Sprintf("pulling %s", desc.Digest)) + // return done(contentutil.Copy(gctx, w.ContentStore, remote.Provider, desc)) + // }) + // }(desc) + // } + // + // if err := eg.Wait(); err != nil { + // return nil, err + // } + // + // csh, release := snapshot.NewCompatibilitySnapshotter(w.Snapshotter) + // defer release() + // + // unpackProgressDone := oneOffProgress(ctx, "unpacking") + // chainID, err := w.unpack(ctx, remote.Descriptors, csh) + // if err != nil { + // return nil, unpackProgressDone(err) + // } + // unpackProgressDone(nil) + // + // return w.CacheManager.Get(ctx, chainID, cache.WithDescription(fmt.Sprintf("imported %s", remote.Descriptors[len(remote.Descriptors)-1].Digest))) + return nil, errors.Errorf("fromremote not implemented") +} + +// utility function. could be moved to the constructor logic? +// func Labels(executor, snapshotter string) map[string]string { +// hostname, err := os.Hostname() +// if err != nil { +// hostname = "unknown" +// } +// labels := map[string]string{ +// worker.LabelOS: runtime.GOOS, +// worker.LabelArch: runtime.GOOSARCH, +// worker.LabelExecutor: executor, +// worker.LabelSnapshotter: snapshotter, +// worker.LabelHostname: hostname, +// } +// return labels +// } +// +// // ID reads the worker id from the `workerid` file. +// // If not exist, it creates a random one, +// func ID(root string) (string, error) { +// f := filepath.Join(root, "workerid") +// b, err := ioutil.ReadFile(f) +// if err != nil { +// if os.IsNotExist(err) { +// id := identity.NewID() +// err := ioutil.WriteFile(f, []byte(id), 0400) +// return id, err +// } else { +// return "", err +// } +// } +// return string(b), nil +// } + +func getLayers(ctx context.Context, descs []ocispec.Descriptor) ([]rootfs.Layer, error) { + layers := make([]rootfs.Layer, len(descs)) + for i, desc := range descs { + diffIDStr := desc.Annotations["containerd.io/uncompressed"] + if diffIDStr == "" { + return nil, errors.Errorf("%s missing uncompressed digest", desc.Digest) + } + diffID, err := digest.Parse(diffIDStr) + if err != nil { + return nil, err + } + layers[i].Diff = ocispec.Descriptor{ + MediaType: ocispec.MediaTypeImageLayer, + Digest: diffID, + } + layers[i].Blob = ocispec.Descriptor{ + MediaType: desc.MediaType, + Digest: desc.Digest, + Size: desc.Size, + } + } + return layers, nil +} + +func oneOffProgress(ctx context.Context, id string) func(err error) error { + pw, _, _ := progress.FromContext(ctx) + now := time.Now() + st := progress.Status{ + Started: &now, + } + pw.Write(id, st) + return func(err error) error { + // TODO: set error on status + now := time.Now() + st.Completed = &now + pw.Write(id, st) + pw.Close() + return err + } +} diff --git a/components/engine/cmd/dockerd/daemon.go b/components/engine/cmd/dockerd/daemon.go index 6b0be5f7f7..084cfb9d0f 100644 --- a/components/engine/cmd/dockerd/daemon.go +++ b/components/engine/cmd/dockerd/daemon.go @@ -27,6 +27,7 @@ import ( swarmrouter "github.com/docker/docker/api/server/router/swarm" systemrouter "github.com/docker/docker/api/server/router/system" "github.com/docker/docker/api/server/router/volume" + buildkit "github.com/docker/docker/builder/builder-next" "github.com/docker/docker/builder/dockerfile" "github.com/docker/docker/builder/fscache" "github.com/docker/docker/cli/debug" @@ -270,7 +271,16 @@ func newRouterOptions(config *config.Config, daemon *daemon.Daemon) (routerOptio return opts, err } - bb, err := buildbackend.NewBackend(daemon.ImageService(), manager, buildCache) + buildkit, err := buildkit.New(buildkit.Opt{ + SessionManager: sm, + Root: filepath.Join(config.Root, "buildkit"), + Dist: daemon.DistributionServices(), + }) + if err != nil { + return opts, err + } + + bb, err := buildbackend.NewBackend(daemon.ImageService(), manager, buildCache, buildkit) if err != nil { return opts, errors.Wrap(err, "failed to create buildmanager") }