From 1f4175ede876ab75de7130956c11d3663da14e14 Mon Sep 17 00:00:00 2001 From: Ian Calvert Date: Wed, 17 Dec 2014 22:17:14 +0000 Subject: [PATCH 1/3] Run the remote image presence checks in parallel Signed-off-by: Ian Calvert Upstream-commit: 90c5ff4f06b8672cd2b427d120a8988bfc9f1de7 Component: engine --- components/engine/graph/push.go | 29 +++++++++++++++++++++++------ 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/components/engine/graph/push.go b/components/engine/graph/push.go index 09e13a5cff..6d84be6edd 100644 --- a/components/engine/graph/push.go +++ b/components/engine/graph/push.go @@ -113,18 +113,35 @@ func (s *TagStore) pushRepository(r *registry.Session, out io.Writer, repoInfo * if tag == "" { nTag = len(localRepo) } + completed := make(chan bool) + needsPush := make([]bool, len(imgList)) for _, ep := range repoData.Endpoints { out.Write(sf.FormatStatus("", "Pushing repository %s (%d tags)", repoInfo.CanonicalName, nTag)) - for _, imgId := range imgList { - if err := r.LookupRemoteImage(imgId, ep, repoData.Tokens); err != nil { - log.Errorf("Error in LookupRemoteImage: %s", err) - if _, err := s.pushImage(r, out, imgId, ep, repoData.Tokens, sf); err != nil { + + for i, imgId := range imgList { + go func(i int, imgId string) { + if err := r.LookupRemoteImage(imgId, ep, repoData.Tokens); err == nil { + out.Write(sf.FormatStatus("", "Image %s already pushed, skipping", utils.TruncateID(imgId))) + needsPush[i] = false + } else { + log.Errorf("Error in LookupRemoteImage: %s", err) + out.Write(sf.FormatStatus("", "Image %s not pushed, adding to queue", utils.TruncateID(imgId))) + needsPush[i] = true + } + completed <- true + }(i, imgId) + } + for i := 0; i < len(imgList); i++ { + <-completed + } + for i, imgId := range imgList { + if needsPush[i] { + if _, err := s.pushImage(r, out, remoteName, imgId, ep, repoData.Tokens, sf); err != nil { // FIXME: Continue on error? return err } - } else { - out.Write(sf.FormatStatus("", "Image %s already pushed, skipping", utils.TruncateID(imgId))) } + for _, tag := range tagsByImage[imgId] { out.Write(sf.FormatStatus("", "Pushing tag for rev [%s] on {%s}", utils.TruncateID(imgId), ep+"repositories/"+repoInfo.RemoteName+"/tags/"+tag)) From 0f5c9b554c8da8a2d8890917bf7ffe7d3fd58917 Mon Sep 17 00:00:00 2001 From: Ian Calvert Date: Thu, 18 Dec 2014 09:38:31 +0000 Subject: [PATCH 2/3] Replace custom waiting code with a WaitGroup Signed-off-by: Ian Calvert Upstream-commit: 33286589291a56fe0a4728f8b45ad3feb5494226 Component: engine --- components/engine/graph/push.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/components/engine/graph/push.go b/components/engine/graph/push.go index 6d84be6edd..1dc08c6f9d 100644 --- a/components/engine/graph/push.go +++ b/components/engine/graph/push.go @@ -6,6 +6,7 @@ import ( "io/ioutil" "os" "path" + "sync" log "github.com/Sirupsen/logrus" "github.com/docker/docker/engine" @@ -113,13 +114,15 @@ func (s *TagStore) pushRepository(r *registry.Session, out io.Writer, repoInfo * if tag == "" { nTag = len(localRepo) } - completed := make(chan bool) + var wg sync.WaitGroup needsPush := make([]bool, len(imgList)) for _, ep := range repoData.Endpoints { out.Write(sf.FormatStatus("", "Pushing repository %s (%d tags)", repoInfo.CanonicalName, nTag)) for i, imgId := range imgList { + wg.Add(1) go func(i int, imgId string) { + defer wg.Done() if err := r.LookupRemoteImage(imgId, ep, repoData.Tokens); err == nil { out.Write(sf.FormatStatus("", "Image %s already pushed, skipping", utils.TruncateID(imgId))) needsPush[i] = false @@ -128,12 +131,11 @@ func (s *TagStore) pushRepository(r *registry.Session, out io.Writer, repoInfo * out.Write(sf.FormatStatus("", "Image %s not pushed, adding to queue", utils.TruncateID(imgId))) needsPush[i] = true } - completed <- true }(i, imgId) } - for i := 0; i < len(imgList); i++ { - <-completed - } + + wg.Wait() + for i, imgId := range imgList { if needsPush[i] { if _, err := s.pushImage(r, out, remoteName, imgId, ep, repoData.Tokens, sf); err != nil { From dc1c238ed61b59c3ea0ea1ae2285b76daeac3d13 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Tue, 13 Jan 2015 15:24:51 -0800 Subject: [PATCH 3/3] Use channels and set workers to make image lookup safe This refactors the starting work by the prior commits to make this safe for access. A maximum of 5 worker go routines are started to lookup images on the endpoint. Another go routine consumes the images that are required to be pushed into a map for quick lookups. The map is required because the pushing of the image json and layer have to be done in the correct order or the registry will explode in fire. Signed-off-by: Michael Crosby Upstream-commit: 476cf1b906ca14cc3ce1347f1aa479713fa6ce6e Component: engine --- components/engine/graph/push.go | 184 ++++++++++++++++++++------------ 1 file changed, 113 insertions(+), 71 deletions(-) diff --git a/components/engine/graph/push.go b/components/engine/graph/push.go index 1dc08c6f9d..0ec81a5152 100644 --- a/components/engine/graph/push.go +++ b/components/engine/graph/push.go @@ -62,103 +62,145 @@ func (s *TagStore) getImageList(localRepo map[string]string, requestedTag string return imageList, tagsByImage, nil } -func (s *TagStore) pushRepository(r *registry.Session, out io.Writer, repoInfo *registry.RepositoryInfo, localRepo map[string]string, tag string, sf *utils.StreamFormatter) error { - out = utils.NewWriteFlusher(out) - log.Debugf("Local repo: %s", localRepo) - imgList, tagsByImage, err := s.getImageList(localRepo, tag) - if err != nil { - return err - } - - out.Write(sf.FormatStatus("", "Sending image list")) - - var ( - repoData *registry.RepositoryData - imageIndex []*registry.ImgData - ) - - for _, imgId := range imgList { - if tags, exists := tagsByImage[imgId]; exists { +// createImageIndex returns an index of an image's layer IDs and tags. +func (s *TagStore) createImageIndex(images []string, tags map[string][]string) []*registry.ImgData { + var imageIndex []*registry.ImgData + for _, id := range images { + if tags, hasTags := tags[id]; hasTags { // If an image has tags you must add an entry in the image index // for each tag for _, tag := range tags { imageIndex = append(imageIndex, ®istry.ImgData{ - ID: imgId, + ID: id, Tag: tag, }) } - } else { - // If the image does not have a tag it still needs to be sent to the - // registry with an empty tag so that it is accociated with the repository - imageIndex = append(imageIndex, ®istry.ImgData{ - ID: imgId, - Tag: "", - }) + continue + } + // If the image does not have a tag it still needs to be sent to the + // registry with an empty tag so that it is accociated with the repository + imageIndex = append(imageIndex, ®istry.ImgData{ + ID: id, + Tag: "", + }) + } + return imageIndex +} +type imagePushData struct { + id string + endpoint string + tokens []string +} + +// lookupImageOnEndpoint checks the specified endpoint to see if an image exists +// and if it is absent then it sends the image id to the channel to be pushed. +func lookupImageOnEndpoint(wg *sync.WaitGroup, r *registry.Session, out io.Writer, sf *utils.StreamFormatter, + images chan imagePushData, imagesToPush chan string) { + defer wg.Done() + for image := range images { + if err := r.LookupRemoteImage(image.id, image.endpoint, image.tokens); err != nil { + log.Errorf("Error in LookupRemoteImage: %s", err) + imagesToPush <- image.id + continue + } + out.Write(sf.FormatStatus("", "Image %s already pushed, skipping", utils.TruncateID(image.id))) + } +} + +func (s *TagStore) pushImageToEndpoint(endpoint string, out io.Writer, remoteName string, imageIDs []string, + tags map[string][]string, repo *registry.RepositoryData, sf *utils.StreamFormatter, r *registry.Session) error { + workerCount := len(imageIDs) + // start a maximum of 5 workers to check if images exist on the specified endpoint. + if workerCount > 5 { + workerCount = 5 + } + var ( + wg = &sync.WaitGroup{} + imageData = make(chan imagePushData, workerCount*2) + imagesToPush = make(chan string, workerCount*2) + pushes = make(chan map[string]struct{}, 1) + ) + for i := 0; i < workerCount; i++ { + wg.Add(1) + go lookupImageOnEndpoint(wg, r, out, sf, imageData, imagesToPush) + } + // start a go routine that consumes the images to push + go func() { + shouldPush := make(map[string]struct{}) + for id := range imagesToPush { + shouldPush[id] = struct{}{} + } + pushes <- shouldPush + }() + for _, id := range imageIDs { + imageData <- imagePushData{ + id: id, + endpoint: endpoint, + tokens: repo.Tokens, } } + // close the channel to notify the workers that there will be no more images to check. + close(imageData) + wg.Wait() + close(imagesToPush) + // wait for all the images that require pushes to be collected into a consumable map. + shouldPush := <-pushes + // finish by pushing any images and tags to the endpoint. The order that the images are pushed + // is very important that is why we are still itterating over the ordered list of imageIDs. + for _, id := range imageIDs { + if _, push := shouldPush[id]; push { + if _, err := s.pushImage(r, out, id, endpoint, repo.Tokens, sf); err != nil { + // FIXME: Continue on error? + return err + } + } + for _, tag := range tags[id] { + out.Write(sf.FormatStatus("", "Pushing tag for rev [%s] on {%s}", utils.TruncateID(id), endpoint+"repositories/"+remoteName+"/tags/"+tag)) + if err := r.PushRegistryTag(remoteName, id, tag, endpoint, repo.Tokens); err != nil { + return err + } + } + } + return nil +} +// pushRepository pushes layers that do not already exist on the registry. +func (s *TagStore) pushRepository(r *registry.Session, out io.Writer, + repoInfo *registry.RepositoryInfo, localRepo map[string]string, + tag string, sf *utils.StreamFormatter) error { + log.Debugf("Local repo: %s", localRepo) + out = utils.NewWriteFlusher(out) + imgList, tags, err := s.getImageList(localRepo, tag) + if err != nil { + return err + } + out.Write(sf.FormatStatus("", "Sending image list")) + + imageIndex := s.createImageIndex(imgList, tags) log.Debugf("Preparing to push %s with the following images and tags", localRepo) for _, data := range imageIndex { log.Debugf("Pushing ID: %s with Tag: %s", data.ID, data.Tag) } - // Register all the images in a repository with the registry // If an image is not in this list it will not be associated with the repository - repoData, err = r.PushImageJSONIndex(repoInfo.RemoteName, imageIndex, false, nil) + repoData, err := r.PushImageJSONIndex(repoInfo.RemoteName, imageIndex, false, nil) if err != nil { return err } - nTag := 1 if tag == "" { nTag = len(localRepo) } - var wg sync.WaitGroup - needsPush := make([]bool, len(imgList)) - for _, ep := range repoData.Endpoints { - out.Write(sf.FormatStatus("", "Pushing repository %s (%d tags)", repoInfo.CanonicalName, nTag)) - - for i, imgId := range imgList { - wg.Add(1) - go func(i int, imgId string) { - defer wg.Done() - if err := r.LookupRemoteImage(imgId, ep, repoData.Tokens); err == nil { - out.Write(sf.FormatStatus("", "Image %s already pushed, skipping", utils.TruncateID(imgId))) - needsPush[i] = false - } else { - log.Errorf("Error in LookupRemoteImage: %s", err) - out.Write(sf.FormatStatus("", "Image %s not pushed, adding to queue", utils.TruncateID(imgId))) - needsPush[i] = true - } - }(i, imgId) - } - - wg.Wait() - - for i, imgId := range imgList { - if needsPush[i] { - if _, err := s.pushImage(r, out, remoteName, imgId, ep, repoData.Tokens, sf); err != nil { - // FIXME: Continue on error? - return err - } - } - - for _, tag := range tagsByImage[imgId] { - out.Write(sf.FormatStatus("", "Pushing tag for rev [%s] on {%s}", utils.TruncateID(imgId), ep+"repositories/"+repoInfo.RemoteName+"/tags/"+tag)) - - if err := r.PushRegistryTag(repoInfo.RemoteName, imgId, tag, ep, repoData.Tokens); err != nil { - return err - } - } + out.Write(sf.FormatStatus("", "Pushing repository %s (%d tags)", repoInfo.CanonicalName, nTag)) + // push the repository to each of the endpoints only if it does not exist. + for _, endpoint := range repoData.Endpoints { + if err := s.pushImageToEndpoint(endpoint, out, repoInfo.RemoteName, imgList, tags, repoData, sf, r); err != nil { + return err } } - - if _, err := r.PushImageJSONIndex(repoInfo.RemoteName, imageIndex, true, repoData.Endpoints); err != nil { - return err - } - - return nil + _, err = r.PushImageJSONIndex(repoInfo.RemoteName, imageIndex, true, repoData.Endpoints) + return err } func (s *TagStore) pushImage(r *registry.Session, out io.Writer, imgID, ep string, token []string, sf *utils.StreamFormatter) (checksum string, err error) {