Skip to content
Commits on Source (5)
# [3.42.0](https://gitlab.com/gitlab-org/container-registry/compare/v3.41.1-gitlab...v3.42.0-gitlab) (2022-05-18)
### Bug Fixes
* restore manifest push (by tag) and tag delete webhook notifications ([e6a7984](https://gitlab.com/gitlab-org/container-registry/commit/e6a7984a6773fb138efe3a17d744c958249661ea))
### Features
* **storage:** improve clarity of offline garbage collection log output ([8b6129a](https://gitlab.com/gitlab-org/container-registry/commit/8b6129a3ca9fe81425a29540866ece795637ea05))
## [3.41.1](https://gitlab.com/gitlab-org/container-registry/compare/v3.41.0-gitlab...v3.41.1-gitlab) (2022-05-13)
......
......@@ -1111,11 +1111,15 @@ func (app *App) dispatcher(dispatch dispatchFunc) http.Handler {
ctx.Errors = append(ctx.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
}
// This is required as part of a partial/temporary mitigation for
// https://gitlab.com/gitlab-org/container-registry/-/issues/682.
ctx.eventBridge = app.eventBridge(ctx, r)
// assign and decorate the authorized repository with an event bridge.
ctx.Repository, ctx.RepositoryRemover = notifications.Listen(
repository,
ctx.App.repoRemover,
app.eventBridge(ctx, r),
ctx.eventBridge,
app.Config.Migration.DisableMirrorFS)
ctx.Repository, err = applyRepoMiddleware(app, ctx.Repository, app.Config.Middleware["repository"])
......
......@@ -7,6 +7,7 @@ import (
"github.com/docker/distribution"
dcontext "github.com/docker/distribution/context"
"github.com/docker/distribution/notifications"
"github.com/docker/distribution/registry/api/errcode"
"github.com/docker/distribution/registry/api/urls"
"github.com/docker/distribution/registry/auth"
......@@ -46,6 +47,10 @@ type Context struct {
// TODO(stevvooe): The goal is too completely factor this context and
// dispatching out of the web application. Ideally, we should lean on
// context.Context for injection of these resources.
// This is required as part of a partial/temporary mitigation for
// https://gitlab.com/gitlab-org/container-registry/-/issues/682.
eventBridge notifications.Listener
}
// Value overrides context.Context.Value to ensure that calls are routed to
......
......@@ -712,6 +712,15 @@ func (imh *manifestHandler) PutManifest(w http.ResponseWriter, r *http.Request)
imh.appendPutError(err)
return
}
// This is a partial temporary mitigation for https://gitlab.com/gitlab-org/container-registry/-/issues/682.
// It restores the manifest push (by tag) notifications for repositories on the new code path. Restoring this
// particular event is a top priority as it is needed to trigger usage calculations on the Rails side. We will
// need to rework the notification mechanism as a whole and restore all event notifications later on.
if _, ok := manifestWriter.(*dbManifestWriter); ok && !imh.writeFSMetadata {
if err := imh.eventBridge.ManifestPushed(imh.Repository.Named(), manifest, distribution.WithTagOption{Tag: imh.Tag}); err != nil {
l.WithError(err).Error("dispatching manifest push to listener")
}
}
}
// Construct a canonical url for the uploaded manifest.
......
......@@ -229,7 +229,8 @@ func dbDeleteTag(ctx context.Context, db datastore.Handler, repoPath string, tag
// DeleteTag deletes a tag for a specific image name.
func (th *tagHandler) DeleteTag(w http.ResponseWriter, r *http.Request) {
log.GetLogger(log.WithContext(th)).Debug("DeleteTag")
l := log.GetLogger(log.WithContext(th))
l.Debug("DeleteTag")
if th.App.isCache {
th.Errors = append(th.Errors, errcode.ErrorCodeUnsupported)
......@@ -249,6 +250,15 @@ func (th *tagHandler) DeleteTag(w http.ResponseWriter, r *http.Request) {
th.appendDeleteTagError(err)
return
}
// This is a partial temporary mitigation for https://gitlab.com/gitlab-org/container-registry/-/issues/682.
// It restores the tag delete notifications for repositories on the new code path. Restoring this particular
// event is a top priority as it is needed to trigger usage calculations on the Rails side. We will need to
// rework the notification mechanism as a whole and restore all event notifications later on.
if !th.writeFSMetadata {
if err := th.eventBridge.TagDeleted(th.Repository.Named(), th.Tag); err != nil {
l.WithError(err).Error("dispatching tag delete to listener")
}
}
}
w.WriteHeader(http.StatusAccepted)
......
......@@ -95,13 +95,19 @@ func MarkAndSweep(ctx context.Context, storageDriver driver.StorageDriver, regis
// mark
markStart := time.Now()
log.GetLogger(log.WithContext(ctx)).Info("starting mark stage")
l := log.GetLogger(log.WithContext(ctx)).WithFields(log.Fields{
"stage": "mark",
"driver": storageDriver.Name(),
"dry_run": opts.DryRun,
})
l.Info("starting mark stage")
markSet := newSyncDigestSet()
manifestArr := syncManifestDelContainer{sync.Mutex{}, make([]ManifestDel, 0)}
err := repositoryEnumerator.Enumerate(ctx, func(repoName string) error {
log.GetLogger(log.WithContext(ctx)).WithFields(log.Fields{"repo": repoName}).Info("marking repository")
rLog := l.WithFields(log.Fields{"repository": repoName})
rLog.Info("marking repository")
taggedManifests := newSyncDigestSet()
unTaggedManifests := newSyncDigestSet()
......@@ -140,7 +146,7 @@ func MarkAndSweep(ctx context.Context, storageDriver driver.StorageDriver, regis
// and allows us to report the number of primed tags.
if opts.RemoveUntagged {
primeStart := time.Now()
log.GetLogger(log.WithContext(ctx)).WithFields(log.Fields{"repo": repoName}).Info("priming tags cache")
rLog.Info("priming tags cache")
allTags, err := cachedTagStore.All(ctx)
if err != nil {
......@@ -152,8 +158,7 @@ func MarkAndSweep(ctx context.Context, storageDriver driver.StorageDriver, regis
}
}
log.GetLogger(log.WithContext(ctx)).WithFields(log.Fields{
"repo": repoName,
rLog.WithFields(log.Fields{
"tags_primed": len(allTags),
"duration_s": time.Since(primeStart).Seconds(),
}).Info("tags cache primed")
......@@ -170,6 +175,11 @@ func MarkAndSweep(ctx context.Context, storageDriver driver.StorageDriver, regis
unTaggedManifests.add(dgst)
return nil
}
rLog.WithFields(log.Fields{
"referenced_by": "tag",
"tag_count": len(tags),
"digest": dgst,
}).Info("marking manifest metadata for repository")
taggedManifests.add(dgst)
return nil
}
......@@ -213,12 +223,11 @@ func MarkAndSweep(ctx context.Context, storageDriver driver.StorageDriver, regis
<-semaphore
}()
log.GetLogger(log.WithContext(ctx)).WithFields(log.Fields{
rLog.WithFields(log.Fields{
"referenced_by": "tag",
"digest_type": "manifest",
"digest": manifestDigest,
"repository": repoName,
}).Info("marking manifest")
}).Info("marking blob")
markSet.add(manifestDigest)
manifest, err := manifestService.Get(ctx, manifestDigest)
......@@ -238,13 +247,13 @@ func MarkAndSweep(ctx context.Context, storageDriver driver.StorageDriver, regis
// Normal manifest list with only manifest references, add these
// to the set of referenced manifests.
for _, r := range splitRef.Manifests {
log.GetLogger(log.WithContext(ctx)).WithFields(log.Fields{
rLog.WithFields(log.Fields{
"digest_type": "manifest",
"referenced_by": "manifest_list",
"digest": r.Digest,
"mediatype": r.MediaType,
"parent_digest": manifestDigest,
"repository": repoName,
}).Info("marking manifest list reference")
}).Info("marking manifest")
referencedManifests.add(r.Digest)
}
......@@ -256,33 +265,33 @@ func MarkAndSweep(ctx context.Context, storageDriver driver.StorageDriver, regis
if err != nil {
return fmt.Errorf("retrieving tags for digest %v: %w", manifestDigest, err)
}
log.GetLogger(log.WithContext(ctx)).WithFields(log.Fields{
"mediatype": manifestList.Versioned.MediaType,
"digest": manifestDigest,
"tags": tags,
"repository": repoName,
rLog.WithFields(log.Fields{
"mediatype": manifestList.Versioned.MediaType,
"digest": manifestDigest,
"tags": tags,
}).Warn("nonconformant manifest list with layer references, please report this to GitLab")
}
// Mark the manifest list layer references as normal blobs.
for _, r := range splitRef.Blobs {
log.GetLogger(log.WithContext(ctx)).WithFields(log.Fields{
rLog.WithFields(log.Fields{
"digest_type": "layer",
"referenced_by": "manifest_list",
"digest": r.Digest,
"mediatype": r.MediaType,
"parent_digest": manifestDigest,
"repository": repoName,
}).Info("marking manifest list layer reference")
}).Info("marking blob")
markSet.add(r.Digest)
}
} else {
for _, descriptor := range manifest.References() {
log.GetLogger(log.WithContext(ctx)).WithFields(log.Fields{
"referenced_by": "tag",
rLog.WithFields(log.Fields{
"referenced_by": "manifest",
"digest_type": "layer",
"digest": descriptor.Digest,
"repository": repoName,
}).Info("marking manifest")
"mediatype": descriptor.MediaType,
"parent_digest": manifestDigest,
}).Info("marking blob")
markSet.add(descriptor.Digest)
}
}
......@@ -299,7 +308,7 @@ func MarkAndSweep(ctx context.Context, storageDriver driver.StorageDriver, regis
continue
}
log.GetLogger(log.WithContext(ctx)).WithFields(log.Fields{"digest": dgst}).Info("manifest eligible for deletion")
rLog.WithFields(log.Fields{"digest": dgst}).Info("manifest metadata will be deleted from repository")
// Fetch all tags from repository: all of these tags could contain the
// manifest in history which means that we need check (and delete) those
// references when deleting the manifest.
......@@ -323,10 +332,7 @@ func MarkAndSweep(ctx context.Context, storageDriver driver.StorageDriver, regis
refType = "manifest_list"
}
markLog := log.GetLogger(log.WithContext(ctx)).WithFields(log.Fields{
"repository": repoName,
"referenced_by": refType,
})
markLog := rLog.WithFields(log.Fields{"referenced_by": refType})
g, ctx := errgroup.WithContext(ctx)
......@@ -340,10 +346,7 @@ func MarkAndSweep(ctx context.Context, storageDriver driver.StorageDriver, regis
}()
// Mark the manifest's blob
markLog.WithFields(log.Fields{
"digest_type": "manifest",
"digest": d,
}).Info("marking manifest")
markLog.WithFields(log.Fields{"digest_type": "manifest", "digest": d}).Info("marking blob")
markSet.add(d)
manifest, err := manifestService.Get(ctx, d)
......@@ -358,10 +361,7 @@ func MarkAndSweep(ctx context.Context, storageDriver driver.StorageDriver, regis
}
for _, descriptor := range manifest.References() {
markLog.WithFields(log.Fields{
"digest_type": "layer",
"digest": descriptor.Digest,
}).Info("marking manifest")
markLog.WithFields(log.Fields{"digest_type": "layer", "digest": descriptor.Digest}).Info("marking blob")
markSet.add(descriptor.Digest)
}
......@@ -383,7 +383,7 @@ func MarkAndSweep(ctx context.Context, storageDriver driver.StorageDriver, regis
blobService := registry.Blobs()
deleteSet := newSyncDigestSet()
log.GetLogger(log.WithContext(ctx)).Info("finding blobs eligible for deletion. This may take some time...")
l.Info("finding blobs eligible for deletion. This may take some time...")
sizeChan := make(chan int64)
sizeDone := make(chan struct{})
......@@ -398,10 +398,7 @@ func MarkAndSweep(ctx context.Context, storageDriver driver.StorageDriver, regis
err = blobService.Enumerate(ctx, func(desc distribution.Descriptor) error {
// check if digest is in markSet. If not, delete it!
if !markSet.contains(desc.Digest) {
log.GetLogger(log.WithContext(ctx)).WithFields(log.Fields{
"digest": desc.Digest,
"size_bytes": desc.Size,
}).Info("blob eligible for deletion")
l.WithFields(log.Fields{"digest": desc.Digest, "size_bytes": desc.Size}).Info("blob eligible for deletion")
sizeChan <- desc.Size
deleteSet.add(desc.Digest)
......@@ -415,7 +412,7 @@ func MarkAndSweep(ctx context.Context, storageDriver driver.StorageDriver, regis
close(sizeChan)
<-sizeDone
log.GetLogger(log.WithContext(ctx)).WithFields(log.Fields{
l.WithFields(log.Fields{
"blobs_marked": markSet.len(),
"blobs_to_delete": deleteSet.len(),
"manifests_to_delete": len(manifestArr.manifestDels),
......@@ -428,7 +425,8 @@ func MarkAndSweep(ctx context.Context, storageDriver driver.StorageDriver, regis
return nil
}
sweepStart := time.Now()
log.GetLogger(log.WithContext(ctx)).Info("starting sweep stage")
l = l.WithFields(log.Fields{"stage": "sweep"})
l.Info("starting sweep stage")
vacuum := NewVacuum(storageDriver)
......@@ -451,7 +449,7 @@ func MarkAndSweep(ctx context.Context, storageDriver driver.StorageDriver, regis
return fmt.Errorf("deleting blobs: %w", err)
}
}
log.GetLogger(log.WithContext(ctx)).WithFields(log.Fields{"duration_s": time.Since(sweepStart).Seconds()}).Info("sweep stage complete")
l.WithFields(log.Fields{"duration_s": time.Since(sweepStart).Seconds()}).Info("sweep stage complete")
return err
}
......@@ -173,7 +173,7 @@ func (v Vacuum) RemoveManifests(ctx context.Context, mm []ManifestDel) error {
log.WithFields(logrus.Fields{
"batch_count": totalBatches,
"batch_max_size": maxBatchSize,
}).Info("deleting manifests in batches")
}).Info("deleting manifest metadata in batches")
batchNo := 0
for i := 0; i < totalToDelete; i += maxBatchSize {
......@@ -191,7 +191,7 @@ func (v Vacuum) RemoveManifests(ctx context.Context, mm []ManifestDel) error {
log.WithFields(logrus.Fields{
"duration_s": time.Since(start).Seconds(),
}).Info("manifests deleted")
}).Info("manifest metadata deleted")
return nil
}
......