diff --git a/components/engine/builder/builder-next/containerimage/pull.go b/components/engine/builder/builder-next/containerimage/pull.go new file mode 100644 index 0000000000..c5d7acbc34 --- /dev/null +++ b/components/engine/builder/builder-next/containerimage/pull.go @@ -0,0 +1,517 @@ +package containerimage + +import ( + "context" + "encoding/json" + "fmt" + "io" + "runtime" + "sync" + "time" + + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/images" + "github.com/containerd/containerd/platforms" + ctdreference "github.com/containerd/containerd/reference" + "github.com/containerd/containerd/remotes" + "github.com/containerd/containerd/remotes/docker" + "github.com/containerd/containerd/remotes/docker/schema1" + "github.com/docker/docker/distribution" + "github.com/docker/docker/distribution/metadata" + "github.com/docker/docker/distribution/xfer" + "github.com/docker/docker/image" + "github.com/docker/docker/layer" + "github.com/docker/docker/pkg/ioutils" + pkgprogress "github.com/docker/docker/pkg/progress" + "github.com/docker/docker/reference" + "github.com/moby/buildkit/cache" + "github.com/moby/buildkit/session" + "github.com/moby/buildkit/session/auth" + "github.com/moby/buildkit/source" + "github.com/moby/buildkit/util/flightcontrol" + "github.com/moby/buildkit/util/imageutil" + "github.com/moby/buildkit/util/progress" + "github.com/moby/buildkit/util/tracing" + digest "github.com/opencontainers/go-digest" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + netcontext "golang.org/x/net/context" +) + +type SourceOpt struct { + SessionManager *session.Manager + ContentStore content.Store + CacheAccessor cache.Accessor + ReferenceStore reference.Store + DownloadManager distribution.RootFSDownloadManager + MetadataStore metadata.V2MetadataService +} + +type imageSource struct { + SourceOpt + g flightcontrol.Group +} + +func NewSource(opt SourceOpt) (source.Source, error) { + is := &imageSource{ + SourceOpt: opt, + } + + return is, nil +} + +func (is *imageSource) ID() string { + return source.DockerImageScheme +} + +func (is *imageSource) getResolver(ctx context.Context) remotes.Resolver { + return docker.NewResolver(docker.ResolverOptions{ + Client: tracing.DefaultClient, + Credentials: is.getCredentialsFromSession(ctx), + }) +} + +func (is *imageSource) getCredentialsFromSession(ctx context.Context) func(string) (string, string, error) { + id := session.FromContext(ctx) + if id == "" { + return nil + } + return func(host string) (string, string, error) { + timeoutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + caller, err := is.SessionManager.Get(timeoutCtx, id) + if err != nil { + return "", "", err + } + + return auth.CredentialsFunc(tracing.ContextWithSpanFromContext(context.TODO(), ctx), caller)(host) + } +} + +func (is *imageSource) ResolveImageConfig(ctx context.Context, ref string) (digest.Digest, []byte, error) { + // type t struct { + // dgst digest.Digest + // dt []byte + // } + // res, err := is.g.Do(ctx, ref, func(ctx context.Context) (interface{}, error) { + // dgst, dt, err := imageutil.Config(ctx, ref, is.getResolver(ctx), is.ContentStore) + // if err != nil { + // return nil, err + // } + // return &t{dgst: dgst, dt: dt}, nil + // }) + // if err != nil { + // return "", nil, err + // } + // typed := res.(*t) + // return typed.dgst, typed.dt, nil + return "", nil, errors.Errorf("not-implemented") +} + +func (is *imageSource) Resolve(ctx context.Context, id source.Identifier) (source.SourceInstance, error) { + imageIdentifier, ok := id.(*source.ImageIdentifier) + if !ok { + return nil, errors.Errorf("invalid image identifier %v", id) + } + + p := &puller{ + src: imageIdentifier, + is: is, + resolver: is.getResolver(ctx), + } + return p, nil +} + +type puller struct { + is *imageSource + resolveOnce sync.Once + src *source.ImageIdentifier + desc ocispec.Descriptor + ref string + resolveErr error + resolver remotes.Resolver +} + +func (p *puller) resolve(ctx context.Context) error { + p.resolveOnce.Do(func() { + resolveProgressDone := oneOffProgress(ctx, "resolve "+p.src.Reference.String()) + + dgst := p.src.Reference.Digest() + if dgst != "" { + info, err := p.is.ContentStore.Info(ctx, dgst) + if err == nil { + p.ref = p.src.Reference.String() + ra, err := p.is.ContentStore.ReaderAt(ctx, dgst) + if err == nil { + mt, err := imageutil.DetectManifestMediaType(ra) + if err == nil { + p.desc = ocispec.Descriptor{ + Size: info.Size, + Digest: dgst, + MediaType: mt, + } + resolveProgressDone(nil) + return + } + } + } + } + + ref, desc, err := p.resolver.Resolve(ctx, p.src.Reference.String()) + if err != nil { + p.resolveErr = err + resolveProgressDone(err) + return + } + p.desc = desc + p.ref = ref + resolveProgressDone(nil) + }) + return p.resolveErr +} + +func (p *puller) CacheKey(ctx context.Context) (string, error) { + if err := p.resolve(ctx); err != nil { + return "", err + } + return p.desc.Digest.String(), nil +} + +func (p *puller) Snapshot(ctx context.Context) (cache.ImmutableRef, error) { + if err := p.resolve(ctx); err != nil { + return nil, err + } + + ongoing := newJobs(p.ref) + + pctx, stopProgress := context.WithCancel(ctx) + + go showProgress(pctx, ongoing, p.is.ContentStore) + + fetcher, err := p.resolver.Fetcher(ctx, p.ref) + if err != nil { + stopProgress() + return nil, err + } + + // TODO: need a wrapper snapshot interface that combines content + // and snapshots as 1) buildkit shouldn't have a dependency on contentstore + // or 2) cachemanager should manage the contentstore + handlers := []images.Handler{ + images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { + switch desc.MediaType { + case images.MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest, + images.MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex, + images.MediaTypeDockerSchema2Config, ocispec.MediaTypeImageConfig: + default: + return nil, images.ErrSkipDesc + } + ongoing.add(desc) + return nil, nil + }), + } + var schema1Converter *schema1.Converter + if p.desc.MediaType == images.MediaTypeDockerSchema1Manifest { + schema1Converter = schema1.NewConverter(p.is.ContentStore, fetcher) + handlers = append(handlers, schema1Converter) + } else { + handlers = append(handlers, + remotes.FetchHandler(p.is.ContentStore, fetcher), + images.ChildrenHandler(p.is.ContentStore, platforms.Default()), + ) + } + + if err := images.Dispatch(ctx, images.Handlers(handlers...), p.desc); err != nil { + stopProgress() + return nil, err + } + stopProgress() + + mfst, err := images.Manifest(ctx, p.is.ContentStore, p.desc, platforms.Default()) + if err != nil { + return nil, err + } + + config, err := images.Config(ctx, p.is.ContentStore, p.desc, platforms.Default()) + if err != nil { + return nil, err + } + + dt, err := content.ReadBlob(ctx, p.is.ContentStore, config.Digest) + if err != nil { + return nil, err + } + + var img ocispec.Image + if err := json.Unmarshal(dt, &img); err != nil { + return nil, err + } + + if len(mfst.Layers) != len(img.RootFS.DiffIDs) { + return nil, errors.Errorf("invalid config for manifest") + } + + pchan := make(chan pkgprogress.Progress, 10) + + go func() { + for p := range pchan { + logrus.Debugf("progress %+v", p) + } + }() + + layers := make([]xfer.DownloadDescriptor, 0, len(mfst.Layers)) + + for i, desc := range mfst.Layers { + layers = append(layers, &layerDescriptor{ + desc: desc, + diffID: layer.DiffID(img.RootFS.DiffIDs[i]), + fetcher: fetcher, + ref: p.src.Reference, + is: p.is, + }) + } + + r := image.NewRootFS() + rootFS, release, err := p.is.DownloadManager.Download(ctx, *r, runtime.GOOS, layers, pkgprogress.ChanOutput(pchan)) + if err != nil { + return nil, err + } + + ref, err := p.is.CacheAccessor.Get(ctx, string(rootFS.ChainID()), cache.WithDescription(fmt.Sprintf("pulled from %s", p.ref))) + release() + if err != nil { + return nil, err + } + + return ref, nil +} + +// Fetch(ctx context.Context, desc ocispec.Descriptor) (io.ReadCloser, error) +type layerDescriptor struct { + is *imageSource + fetcher remotes.Fetcher + desc ocispec.Descriptor + diffID layer.DiffID + ref ctdreference.Spec +} + +func (ld *layerDescriptor) Key() string { + return "v2:" + ld.desc.Digest.String() +} + +func (ld *layerDescriptor) ID() string { + return ld.desc.Digest.String() +} + +func (ld *layerDescriptor) DiffID() (layer.DiffID, error) { + return ld.diffID, nil +} + +func (ld *layerDescriptor) Download(ctx netcontext.Context, progressOutput pkgprogress.Output) (io.ReadCloser, int64, error) { + rc, err := ld.fetcher.Fetch(ctx, ld.desc) + if err != nil { + return nil, 0, err + } + defer rc.Close() + + // TODO: progress + if err := content.WriteBlob(ctx, ld.is.ContentStore, ld.desc.Digest.String(), rc, ld.desc.Size, ld.desc.Digest); err != nil { + return nil, 0, err + } + + ra, err := ld.is.ContentStore.ReaderAt(ctx, ld.desc.Digest) + if err != nil { + return nil, 0, err + } + + return ioutils.NewReadCloserWrapper(content.NewReader(ra), func() error { + return ld.is.ContentStore.Delete(context.TODO(), ld.desc.Digest) + }), ld.desc.Size, nil +} + +func (ld *layerDescriptor) Close() { + ld.is.ContentStore.Delete(context.TODO(), ld.desc.Digest) +} + +func (ld *layerDescriptor) Registered(diffID layer.DiffID) { + // Cache mapping from this layer's DiffID to the blobsum + ld.is.MetadataStore.Add(diffID, metadata.V2Metadata{Digest: ld.desc.Digest, SourceRepository: ld.ref.Locator}) +} + +func showProgress(ctx context.Context, ongoing *jobs, cs content.Store) { + var ( + ticker = time.NewTicker(100 * time.Millisecond) + statuses = map[string]statusInfo{} + done bool + ) + defer ticker.Stop() + + pw, _, ctx := progress.FromContext(ctx) + defer pw.Close() + + for { + select { + case <-ticker.C: + case <-ctx.Done(): + done = true + } + + resolved := "resolved" + if !ongoing.isResolved() { + resolved = "resolving" + } + statuses[ongoing.name] = statusInfo{ + Ref: ongoing.name, + Status: resolved, + } + + actives := make(map[string]statusInfo) + + if !done { + active, err := cs.ListStatuses(ctx, "") + if err != nil { + // log.G(ctx).WithError(err).Error("active check failed") + continue + } + // update status of active entries! + for _, active := range active { + actives[active.Ref] = statusInfo{ + Ref: active.Ref, + Status: "downloading", + Offset: active.Offset, + Total: active.Total, + StartedAt: active.StartedAt, + UpdatedAt: active.UpdatedAt, + } + } + } + + // now, update the items in jobs that are not in active + for _, j := range ongoing.jobs() { + refKey := remotes.MakeRefKey(ctx, j.Descriptor) + if a, ok := actives[refKey]; ok { + started := j.started + pw.Write(j.Digest.String(), progress.Status{ + Action: a.Status, + Total: int(a.Total), + Current: int(a.Offset), + Started: &started, + }) + continue + } + + if !j.done { + info, err := cs.Info(context.TODO(), j.Digest) + if err != nil { + if errdefs.IsNotFound(err) { + pw.Write(j.Digest.String(), progress.Status{ + Action: "waiting", + }) + continue + } + } else { + j.done = true + } + + if done || j.done { + started := j.started + createdAt := info.CreatedAt + pw.Write(j.Digest.String(), progress.Status{ + Action: "done", + Current: int(info.Size), + Total: int(info.Size), + Completed: &createdAt, + Started: &started, + }) + } + } + } + if done { + return + } + } +} + +// jobs provides a way of identifying the download keys for a particular task +// encountering during the pull walk. +// +// This is very minimal and will probably be replaced with something more +// featured. +type jobs struct { + name string + added map[digest.Digest]job + mu sync.Mutex + resolved bool +} + +type job struct { + ocispec.Descriptor + done bool + started time.Time +} + +func newJobs(name string) *jobs { + return &jobs{ + name: name, + added: make(map[digest.Digest]job), + } +} + +func (j *jobs) add(desc ocispec.Descriptor) { + j.mu.Lock() + defer j.mu.Unlock() + + if _, ok := j.added[desc.Digest]; ok { + return + } + j.added[desc.Digest] = job{ + Descriptor: desc, + started: time.Now(), + } +} + +func (j *jobs) jobs() []job { + j.mu.Lock() + defer j.mu.Unlock() + + descs := make([]job, 0, len(j.added)) + for _, j := range j.added { + descs = append(descs, j) + } + return descs +} + +func (j *jobs) isResolved() bool { + j.mu.Lock() + defer j.mu.Unlock() + return j.resolved +} + +type statusInfo struct { + Ref string + Status string + Offset int64 + Total int64 + StartedAt time.Time + UpdatedAt time.Time +} + +func oneOffProgress(ctx context.Context, id string) func(err error) error { + pw, _, _ := progress.FromContext(ctx) + now := time.Now() + st := progress.Status{ + Started: &now, + } + pw.Write(id, st) + return func(err error) error { + // TODO: set error on status + now := time.Now() + st.Completed = &now + pw.Write(id, st) + pw.Close() + return err + } +} diff --git a/components/engine/builder/builder-next/exporter/export.go b/components/engine/builder/builder-next/exporter/export.go new file mode 100644 index 0000000000..efea1ebf8c --- /dev/null +++ b/components/engine/builder/builder-next/exporter/export.go @@ -0,0 +1,135 @@ +package containerimage + +import ( + "context" + "fmt" + + distref "github.com/docker/distribution/reference" + "github.com/docker/docker/image" + "github.com/docker/docker/layer" + "github.com/docker/docker/reference" + "github.com/moby/buildkit/cache" + "github.com/moby/buildkit/exporter" + digest "github.com/opencontainers/go-digest" + "github.com/sirupsen/logrus" +) + +const ( + keyImageName = "name" + exporterImageConfig = "containerimage.config" +) + +type Differ interface { + EnsureLayer(ctx context.Context, key string) ([]layer.DiffID, error) +} + +type Opt struct { + ImageStore image.Store + ReferenceStore reference.Store + Differ Differ +} + +type imageExporter struct { + opt Opt +} + +func New(opt Opt) (exporter.Exporter, error) { + im := &imageExporter{opt: opt} + return im, nil +} + +func (e *imageExporter) Resolve(ctx context.Context, opt map[string]string) (exporter.ExporterInstance, error) { + i := &imageExporterInstance{imageExporter: e} + for k, v := range opt { + switch k { + case keyImageName: + ref, err := distref.ParseNormalizedNamed(v) + if err != nil { + return nil, err + } + i.targetName = ref + case exporterImageConfig: + i.config = []byte(v) + default: + logrus.Warnf("image exporter: unknown option %s", k) + } + } + return i, nil +} + +type imageExporterInstance struct { + *imageExporter + targetName distref.Named + config []byte +} + +func (e *imageExporterInstance) Name() string { + return "exporting to image" +} + +func (e *imageExporterInstance) Export(ctx context.Context, ref cache.ImmutableRef, opt map[string][]byte) error { + if config, ok := opt[exporterImageConfig]; ok { + e.config = config + } + config := e.config + + layersDone := oneOffProgress(ctx, "exporting layers") + + if err := ref.Finalize(ctx); err != nil { + return err + } + + diffIDs, err := e.opt.Differ.EnsureLayer(ctx, ref.ID()) + if err != nil { + return err + } + + diffs := make([]digest.Digest, len(diffIDs)) + for i := range diffIDs { + diffs[i] = digest.Digest(diffIDs[i]) + } + + layersDone(nil) + + if len(config) == 0 { + var err error + config, err = emptyImageConfig() + if err != nil { + return err + } + } + + history, err := parseHistoryFromConfig(config) + if err != nil { + return err + } + + diffs, history = normalizeLayersAndHistory(diffs, history, ref) + + config, err = patchImageConfig(config, diffs, history) + if err != nil { + return err + } + + configDigest := digest.FromBytes(config) + + configDone := oneOffProgress(ctx, fmt.Sprintf("writing image %s", configDigest)) + id, err := e.opt.ImageStore.Create(config) + if err != nil { + return configDone(err) + } + configDone(nil) + + if e.targetName != nil { + if e.opt.ReferenceStore != nil { + tagDone := oneOffProgress(ctx, "naming to "+e.targetName.String()) + + if err := e.opt.ReferenceStore.AddTag(e.targetName, digest.Digest(id), true); err != nil { + return tagDone(err) + } + tagDone(nil) + } + } + + return nil +} diff --git a/components/engine/builder/builder-next/exporter/writer.go b/components/engine/builder/builder-next/exporter/writer.go new file mode 100644 index 0000000000..dc6304573d --- /dev/null +++ b/components/engine/builder/builder-next/exporter/writer.go @@ -0,0 +1,166 @@ +package containerimage + +import ( + "context" + "encoding/json" + "runtime" + "strings" + "time" + + "github.com/moby/buildkit/cache" + "github.com/moby/buildkit/util/progress" + "github.com/moby/buildkit/util/system" + digest "github.com/opencontainers/go-digest" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +const ( + emptyGZLayer = digest.Digest("sha256:4f4fb700ef54461cfa02571ae0db9a0dc1e0cdb5577484a6d75e68dc38e8acc1") +) + +func emptyImageConfig() ([]byte, error) { + img := ocispec.Image{ + Architecture: runtime.GOARCH, + OS: runtime.GOOS, + } + img.RootFS.Type = "layers" + img.Config.WorkingDir = "/" + img.Config.Env = []string{"PATH=" + system.DefaultPathEnv} + dt, err := json.Marshal(img) + return dt, errors.Wrap(err, "failed to create empty image config") +} + +func parseHistoryFromConfig(dt []byte) ([]ocispec.History, error) { + var config struct { + History []ocispec.History + } + if err := json.Unmarshal(dt, &config); err != nil { + return nil, errors.Wrap(err, "failed to unmarshal history from config") + } + return config.History, nil +} + +func patchImageConfig(dt []byte, dps []digest.Digest, history []ocispec.History) ([]byte, error) { + m := map[string]json.RawMessage{} + if err := json.Unmarshal(dt, &m); err != nil { + return nil, errors.Wrap(err, "failed to parse image config for patch") + } + + var rootFS ocispec.RootFS + rootFS.Type = "layers" + for _, dp := range dps { + rootFS.DiffIDs = append(rootFS.DiffIDs, dp) + } + dt, err := json.Marshal(rootFS) + if err != nil { + return nil, errors.Wrap(err, "failed to marshal rootfs") + } + m["rootfs"] = dt + + dt, err = json.Marshal(history) + if err != nil { + return nil, errors.Wrap(err, "failed to marshal history") + } + m["history"] = dt + + now := time.Now() + dt, err = json.Marshal(&now) + if err != nil { + return nil, errors.Wrap(err, "failed to marshal creation time") + } + m["created"] = dt + + dt, err = json.Marshal(m) + return dt, errors.Wrap(err, "failed to marshal config after patch") +} + +func normalizeLayersAndHistory(diffs []digest.Digest, history []ocispec.History, ref cache.ImmutableRef) ([]digest.Digest, []ocispec.History) { + var historyLayers int + for _, h := range history { + if !h.EmptyLayer { + historyLayers += 1 + } + } + if historyLayers > len(diffs) { + // this case shouldn't happen but if it does force set history layers empty + // from the bottom + logrus.Warn("invalid image config with unaccounted layers") + historyCopy := make([]ocispec.History, 0, len(history)) + var l int + for _, h := range history { + if l >= len(diffs) { + h.EmptyLayer = true + } + if !h.EmptyLayer { + l++ + } + historyCopy = append(historyCopy, h) + } + history = historyCopy + } + + if len(diffs) > historyLayers { + // some history items are missing. add them based on the ref metadata + for _, msg := range getRefDesciptions(ref, len(diffs)-historyLayers) { + tm := time.Now().UTC() + history = append(history, ocispec.History{ + Created: &tm, + CreatedBy: msg, + Comment: "buildkit.exporter.image.v0", + }) + } + } + + // var layerIndex int + // for i, h := range history { + // if !h.EmptyLayer { + // if diffs[layerIndex] == emptyGZLayer { // TODO: fixme + // h.EmptyLayer = true + // diffs = append(diffs[:layerIndex], diffs[layerIndex+1:]...) + // } else { + // layerIndex++ + // } + // } + // history[i] = h + // } + + return diffs, history +} + +func getRefDesciptions(ref cache.ImmutableRef, limit int) []string { + if limit <= 0 { + return nil + } + defaultMsg := "created by buildkit" // shouldn't happen but don't fail build + if ref == nil { + strings.Repeat(defaultMsg, limit) + } + descr := cache.GetDescription(ref.Metadata()) + if descr == "" { + descr = defaultMsg + } + p := ref.Parent() + if p != nil { + defer p.Release(context.TODO()) + } + return append(getRefDesciptions(p, limit-1), descr) +} + +func oneOffProgress(ctx context.Context, id string) func(err error) error { + pw, _, _ := progress.FromContext(ctx) + now := time.Now() + st := progress.Status{ + Started: &now, + } + pw.Write(id, st) + return func(err error) error { + // TODO: set error on status + now := time.Now() + st.Completed = &now + pw.Write(id, st) + pw.Close() + return err + } +} diff --git a/components/engine/builder/builder-next/snapshot/layer.go b/components/engine/builder/builder-next/snapshot/layer.go new file mode 100644 index 0000000000..5e1a2d1138 --- /dev/null +++ b/components/engine/builder/builder-next/snapshot/layer.go @@ -0,0 +1,116 @@ +package snapshot + +import ( + "context" + "os" + "path/filepath" + + "github.com/boltdb/bolt" + "github.com/docker/docker/layer" + "github.com/docker/docker/pkg/ioutils" + "github.com/pkg/errors" + "golang.org/x/sync/errgroup" +) + +func (s *snapshotter) EnsureLayer(ctx context.Context, key string) ([]layer.DiffID, error) { + if l, err := s.getLayer(key); err != nil { + return nil, err + } else if l != nil { + return getDiffChain(l), nil + } + + id, committed := s.getGraphDriverID(key) + if !committed { + return nil, errors.Errorf("can not convert active %s to layer", key) + } + + info, err := s.Stat(ctx, key) + if err != nil { + return nil, err + } + + eg, gctx := errgroup.WithContext(ctx) + + // TODO: add flightcontrol + + var parentChainID layer.ChainID + if info.Parent != "" { + eg.Go(func() error { + diffIDs, err := s.EnsureLayer(gctx, info.Parent) + if err != nil { + return err + } + parentChainID = layer.CreateChainID(diffIDs) + return nil + }) + } + + tmpDir, err := ioutils.TempDir("", "docker-tarsplit") + if err != nil { + return nil, err + } + defer os.RemoveAll(tmpDir) + tarSplitPath := filepath.Join(tmpDir, "tar-split") + + var diffID layer.DiffID + var size int64 + eg.Go(func() error { + parent := "" + if p := info.Parent; p != "" { + if l, err := s.getLayer(p); err != nil { + return err + } else if l != nil { + parent, err = getGraphID(l) + if err != nil { + return err + } + } else { + parent, _ = s.getGraphDriverID(info.Parent) + } + } + diffID, size, err = s.reg.ChecksumForGraphID(id, parent, "", tarSplitPath) + if err != nil { + return err + } + return nil + }) + + if err := eg.Wait(); err != nil { + return nil, err + } + + l, err := s.reg.RegisterByGraphID(id, parentChainID, diffID, tarSplitPath, size) + if err != nil { + return nil, err + } + + if err := s.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(key)) + b.Put(keyChainID, []byte(l.ChainID())) + return nil + }); err != nil { + return nil, err + } + + s.mu.Lock() + s.refs[key] = l + s.mu.Unlock() + + return getDiffChain(l), nil +} + +func getDiffChain(l layer.Layer) []layer.DiffID { + if p := l.Parent(); p != nil { + return append(getDiffChain(p), l.DiffID()) + } + return []layer.DiffID{l.DiffID()} +} + +func getGraphID(l layer.Layer) (string, error) { + if l, ok := l.(interface { + CacheID() string + }); ok { + return l.CacheID(), nil + } + return "", errors.Errorf("couldn't access cacheID for %s", l.ChainID()) +} diff --git a/components/engine/builder/builder-next/snapshot/snapshot.go b/components/engine/builder/builder-next/snapshot/snapshot.go new file mode 100644 index 0000000000..0f201920a1 --- /dev/null +++ b/components/engine/builder/builder-next/snapshot/snapshot.go @@ -0,0 +1,391 @@ +package snapshot + +import ( + "context" + "path/filepath" + "strings" + "sync" + + "github.com/boltdb/bolt" + "github.com/containerd/containerd/mount" + "github.com/containerd/containerd/snapshots" + "github.com/docker/docker/daemon/graphdriver" + "github.com/docker/docker/layer" + "github.com/moby/buildkit/identity" + "github.com/moby/buildkit/snapshot" + digest "github.com/opencontainers/go-digest" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +var keyParent = []byte("parent") +var keyCommitted = []byte("committed") +var keyChainID = []byte("chainid") +var keySize = []byte("size") + +type Opt struct { + GraphDriver graphdriver.Driver + LayerStore layer.Store + Root string +} + +type graphIDRegistrar interface { + RegisterByGraphID(string, layer.ChainID, layer.DiffID, string, int64) (layer.Layer, error) + Release(layer.Layer) ([]layer.Metadata, error) + checksumCalculator +} + +type checksumCalculator interface { + ChecksumForGraphID(id, parent, oldTarDataPath, newTarDataPath string) (diffID layer.DiffID, size int64, err error) +} + +type snapshotter struct { + opt Opt + + refs map[string]layer.Layer + db *bolt.DB + mu sync.Mutex + reg graphIDRegistrar +} + +var _ snapshot.SnapshotterBase = &snapshotter{} + +func NewSnapshotter(opt Opt) (snapshot.SnapshotterBase, error) { + dbPath := filepath.Join(opt.Root, "snapshots.db") + db, err := bolt.Open(dbPath, 0600, nil) + if err != nil { + return nil, errors.Wrapf(err, "failed to open database file %s", dbPath) + } + + reg, ok := opt.LayerStore.(graphIDRegistrar) + if !ok { + return nil, errors.Errorf("layerstore doesn't support graphID registration") + } + + s := &snapshotter{ + opt: opt, + db: db, + refs: map[string]layer.Layer{}, + reg: reg, + } + return s, nil +} + +func (s *snapshotter) Prepare(ctx context.Context, key, parent string, opts ...snapshots.Opt) error { + origParent := parent + if parent != "" { + if l, err := s.getLayer(parent); err != nil { + return err + } else if l != nil { + parent, err = getGraphID(l) + if err != nil { + return err + } + } else { + parent, _ = s.getGraphDriverID(parent) + } + } + if err := s.opt.GraphDriver.Create(key, parent, nil); err != nil { + return err + } + if err := s.db.Update(func(tx *bolt.Tx) error { + b, err := tx.CreateBucketIfNotExists([]byte(key)) + if err != nil { + return err + } + + if err := b.Put(keyParent, []byte(origParent)); err != nil { + return err + } + return nil + }); err != nil { + return err + } + return nil +} + +func (s *snapshotter) chainID(key string) (layer.ChainID, bool) { + if strings.HasPrefix(key, "sha256:") { + dgst, err := digest.Parse(key) + if err != nil { + return "", false + } + return layer.ChainID(dgst), true + } + return "", false +} + +func (s *snapshotter) getLayer(key string) (layer.Layer, error) { + s.mu.Lock() + l, ok := s.refs[key] + if !ok { + id, ok := s.chainID(key) + if !ok { + s.mu.Unlock() + return nil, nil + } + var err error + l, err = s.opt.LayerStore.Get(id) + if err != nil { + s.mu.Unlock() + return nil, err + } + s.refs[string(id)] = l + if err := s.db.Update(func(tx *bolt.Tx) error { + _, err := tx.CreateBucketIfNotExists([]byte(id)) + return err + }); err != nil { + s.mu.Unlock() + return nil, err + } + } + s.mu.Unlock() + + return l, nil +} + +func (s *snapshotter) getGraphDriverID(key string) (string, bool) { + var gdID string + if err := s.db.View(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(key)) + if b == nil { + return errors.Errorf("not found") // TODO: typed + } + v := b.Get(keyCommitted) + if v != nil { + gdID = string(v) + } + return nil + }); err != nil || gdID == "" { + return key, false + } + return gdID, true +} + +func (s *snapshotter) Stat(ctx context.Context, key string) (snapshots.Info, error) { + if l, err := s.getLayer(key); err != nil { + return snapshots.Info{}, err + } else if l != nil { + var parentID string + if p := l.Parent(); p != nil { + parentID = p.ChainID().String() + } + info := snapshots.Info{ + Kind: snapshots.KindCommitted, + Name: key, + Parent: parentID, + } + return info, nil + } + + inf := snapshots.Info{ + Kind: snapshots.KindActive, + } + + id, committed := s.getGraphDriverID(key) + if committed { + inf.Kind = snapshots.KindCommitted + } + + if err := s.db.View(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(id)) + if b == nil { + return errors.Errorf("not found") // TODO: typed + } + inf.Name = string(key) + v := b.Get(keyParent) + if v != nil { + inf.Parent = string(v) + } + return nil + }); err != nil { + return snapshots.Info{}, err + } + return inf, nil +} + +func (s *snapshotter) Mounts(ctx context.Context, key string) (snapshot.MountFactory, error) { + l, err := s.getLayer(key) + if err != nil { + return nil, err + } + if l != nil { + id := identity.NewID() + rwlayer, err := s.opt.LayerStore.CreateRWLayer(id, l.ChainID(), nil) + if err != nil { + return nil, err + } + rootfs, err := rwlayer.Mount("") + if err != nil { + return nil, err + } + mnt := []mount.Mount{{ + Source: rootfs.Path(), + Type: "bind", + Options: []string{"rbind"}, + }} + return &constMountFactory{ + mounts: mnt, + release: func() error { + _, err := s.opt.LayerStore.ReleaseRWLayer(rwlayer) + return err + }, + }, nil + } + + id, _ := s.getGraphDriverID(key) + + rootfs, err := s.opt.GraphDriver.Get(id, "") + if err != nil { + return nil, err + } + mnt := []mount.Mount{{ + Source: rootfs.Path(), + Type: "bind", + Options: []string{"rbind"}, + }} + return &constMountFactory{ + mounts: mnt, + release: func() error { + return s.opt.GraphDriver.Put(id) + }, + }, nil +} + +func (s *snapshotter) Remove(ctx context.Context, key string) error { + l, err := s.getLayer(key) + if err != nil { + return err + } + + var found bool + if err := s.db.Update(func(tx *bolt.Tx) error { + found = tx.Bucket([]byte(key)) != nil + if found { + id, _ := s.getGraphDriverID(key) + tx.DeleteBucket([]byte(key)) + if id != key { + tx.DeleteBucket([]byte(id)) + } + } + return nil + }); err != nil { + return err + } + + if l != nil { + _, err := s.opt.LayerStore.Release(l) + return err + } + + if !found { // this happens when removing views + return nil + } + + return s.opt.GraphDriver.Remove(key) +} + +func (s *snapshotter) Commit(ctx context.Context, name, key string, opts ...snapshots.Opt) error { + if err := s.db.Update(func(tx *bolt.Tx) error { + b, err := tx.CreateBucketIfNotExists([]byte(name)) + if err != nil { + return err + } + if err := b.Put(keyCommitted, []byte(key)); err != nil { + return err + } + return nil + }); err != nil { + return err + } + logrus.Debugf("committed %s as %s", name, key) + return nil +} + +func (s *snapshotter) View(ctx context.Context, key, parent string, opts ...snapshots.Opt) (snapshot.MountFactory, error) { + return s.Mounts(ctx, parent) +} + +func (s *snapshotter) Walk(ctx context.Context, fn func(context.Context, snapshots.Info) error) error { + allKeys := map[string]struct{}{} + commitedIDs := map[string]string{} + chainIDs := map[string]layer.ChainID{} + + if err := s.db.View(func(tx *bolt.Tx) error { + tx.ForEach(func(name []byte, b *bolt.Bucket) error { + allKeys[string(name)] = struct{}{} + v := b.Get(keyCommitted) + if v != nil { + commitedIDs[string(v)] = string(name) + } + + v = b.Get(keyChainID) + if v != nil { + chainIDs[string(name)] = layer.ChainID(v) + } + return nil + }) + return nil + }); err != nil { + return err + } + + for k := range allKeys { + if _, ok := commitedIDs[k]; ok { + continue + } + if chainID, ok := chainIDs[k]; ok { + s.mu.Lock() + if _, ok := s.refs[k]; !ok { + l, err := s.opt.LayerStore.Get(chainID) + if err != nil { + s.mu.Unlock() + return err + } + s.refs[k] = l + } + s.mu.Unlock() + } + + if _, err := s.getLayer(k); err != nil { + s.Remove(ctx, k) + continue + } + info, err := s.Stat(ctx, k) + if err != nil { + s.Remove(ctx, k) + continue + } + if err := fn(ctx, info); err != nil { + return err + } + } + + return nil +} + +func (s *snapshotter) Update(ctx context.Context, info snapshots.Info, fieldpaths ...string) (snapshots.Info, error) { + // not implemented + return s.Stat(ctx, info.Name) +} + +func (s *snapshotter) Usage(ctx context.Context, key string) (snapshots.Usage, error) { + return snapshots.Usage{}, nil +} + +func (s *snapshotter) Close() error { + return s.db.Close() +} + +type constMountFactory struct { + mounts []mount.Mount + release func() error +} + +func (mf *constMountFactory) Mount() ([]mount.Mount, func() error, error) { + release := mf.release + if release == nil { + release = func() error { return nil } + } + return mf.mounts, release, nil +}