-
Notifications
You must be signed in to change notification settings - Fork 50
Add ability to sync by digests #2365
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| Renamed `include_tags` and `exclude_tags` to `includes` and `excludes` on the remote. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| Added support for syncing manifests by digest through the new `includes` field on the remote. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,64 @@ | ||
| import django.contrib.postgres.fields | ||
| from django.db import migrations, models | ||
|
|
||
|
|
||
# Maximum number of remotes written per bulk_update call.
_BATCH_SIZE = 1000


def _copy_array_fields(apps, field_map):
    """Copy values between fields of every ContainerRemote, in batches.

    Args:
        apps: The historical app registry handed to ``RunPython`` callables.
        field_map: Mapping of source field name -> target field name. Only the
            source fields are loaded; only the target fields are written.

    Falsy source values (``None`` or an empty list) are stored as ``None`` in
    the target field.
    """
    ContainerRemote = apps.get_model("container", "ContainerRemote")
    source_fields = list(field_map)
    target_fields = list(field_map.values())

    pending = []
    for remote in ContainerRemote.objects.only(*source_fields).iterator():
        for source, target in field_map.items():
            setattr(remote, target, getattr(remote, source) or None)
        pending.append(remote)
        # Flush once the batch is full so memory use stays bounded.
        if len(pending) >= _BATCH_SIZE:
            ContainerRemote.objects.bulk_update(pending, fields=target_fields)
            pending = []
    # Write whatever is left over from the last, partially filled batch.
    if pending:
        ContainerRemote.objects.bulk_update(pending, fields=target_fields)


def migrate_to_includes_excludes(apps, schema_editor):
    """Copy include_tags -> includes, exclude_tags -> excludes."""
    _copy_array_fields(apps, {"include_tags": "includes", "exclude_tags": "excludes"})


def down_migrate_to_include_exclude_tags(apps, schema_editor):
    """Copy includes + excludes -> include_tags + exclude_tags."""
    _copy_array_fields(apps, {"includes": "include_tags", "excludes": "exclude_tags"})
|
|
||
|
|
||
class Migration(migrations.Migration):
    """Add ``includes``/``excludes`` to ContainerRemote and backfill them.

    Both new fields are nullable arrays of text. After they are added, the
    values of ``include_tags``/``exclude_tags`` are copied into them. The
    data copy is reversible: the reverse callable copies the values back.
    This migration does not remove ``include_tags``/``exclude_tags``.
    """

    dependencies = [
        ("container", "0047_containernamespace_pulp_labels"),
    ]

    operations = [
        # 1. Add the new nullable array columns.
        migrations.AddField(
            model_name="containerremote",
            name="includes",
            field=django.contrib.postgres.fields.ArrayField(
                base_field=models.TextField(null=True),
                null=True,
                size=None,
            ),
        ),
        migrations.AddField(
            model_name="containerremote",
            name="excludes",
            field=django.contrib.postgres.fields.ArrayField(
                base_field=models.TextField(null=True),
                null=True,
                size=None,
            ),
        ),
        # 2. Copy existing data.
        migrations.RunPython(
            migrate_to_includes_excludes,
            down_migrate_to_include_exclude_tags,
        ),
    ]
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,6 +3,7 @@ | |
| import hashlib | ||
| import json | ||
| import logging | ||
| from collections import defaultdict | ||
| from urllib.parse import urljoin, urlparse, urlunparse | ||
|
|
||
| import aiohttp | ||
|
|
@@ -62,7 +63,7 @@ def __init__(self, remote, signed_only): | |
| self.manifest_list_dcs = [] | ||
| self.manifest_dcs = [] | ||
| self.signature_dcs = [] | ||
| self._synced_digests = set() | ||
| self._synced_digests = defaultdict(list) | ||
| self._full_tag_list = [] | ||
| self._cosign_tags = [] | ||
|
|
||
|
|
@@ -78,20 +79,26 @@ async def _download_manifest_data(self, manifest_url): | |
|
|
||
| return content_data, raw_text_data, response | ||
|
|
||
| async def _check_for_existing_manifest(self, download_tag): | ||
| response = await download_tag | ||
| async def _check_for_existing_manifest(self, head_manifest_task): | ||
| response = await head_manifest_task | ||
|
|
||
| digest = response.headers.get("docker-content-digest") | ||
| url = response.url | ||
| original_reference = url.split("/")[-1] | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. What's the structure of this URL? |
||
|
|
||
| if manifest := await Manifest.objects.filter( | ||
| digest=digest, pulp_domain=get_domain() | ||
| ).afirst(): | ||
| raw_text_data = manifest.data | ||
| content_data = json.loads(raw_text_data) | ||
| else: | ||
| content_data, raw_text_data, response = await self._download_manifest_data(response.url) | ||
| if not original_reference.startswith("sha256:") and digest: | ||
| # Fetch the tag with its digest | ||
| url = url.rsplit(original_reference, 1)[0] + digest | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The URL structure would also be useful for understanding this. |
||
|
|
||
| return content_data, raw_text_data, response | ||
| content_data, raw_text_data, response = await self._download_manifest_data(url) | ||
|
|
||
| return content_data, raw_text_data, response, original_reference | ||
|
|
||
| async def run(self): | ||
| """ | ||
|
|
@@ -105,31 +112,33 @@ async def run(self): | |
| repo_name = self.remote.namespaced_upstream_name | ||
| tag_list_url = "/v2/{name}/tags/list".format(name=repo_name) | ||
| self._full_tag_list = await self.get_paginated_tag_list(tag_list_url, repo_name) | ||
| self._cosign_tags = filter_resources( | ||
| self._full_tag_list, ["sha256-*"], self.remote.exclude_tags | ||
| ) | ||
| if self.remote.include_tags or self.remote.exclude_tags: | ||
| includes = self.remote.includes or [] | ||
| excludes = self.remote.excludes or [] | ||
|
|
||
| digest_includes = [i for i in includes if i.startswith("sha256:")] | ||
| self._cosign_tags = filter_resources(self._full_tag_list, ["sha256-*"], excludes) | ||
|
|
||
| if includes or excludes: | ||
| # Split sync into two parts, first all non-cosign tags, then cosign tags | ||
| exclude_tags_and_cosign = (self.remote.exclude_tags or []) + ["sha256-*"] | ||
| tag_list = filter_resources( | ||
| self._full_tag_list, self.remote.include_tags, exclude_tags_and_cosign | ||
| ) | ||
| exclude_and_cosign = excludes + ["sha256-*"] | ||
| filtered_tags = filter_resources(self._full_tag_list, includes, exclude_and_cosign) | ||
| manifest_list = filtered_tags + digest_includes | ||
| else: | ||
| tag_list = self._full_tag_list | ||
| manifest_list = self._full_tag_list | ||
| await pb.aincrement() | ||
|
|
||
| await self._process_tags(tag_list, signature_source) | ||
| await self._process_manifests(manifest_list, signature_source, "Processing Manifests") | ||
|
|
||
| if self.remote.include_tags or self.remote.exclude_tags: | ||
| # Process cosign companion tags after all non-cosign tags are synced | ||
| if includes or excludes: | ||
| # Process cosign companion tags after all primary content is synced | ||
| companion_tags = self._find_cosign_companion_tags() | ||
| if companion_tags: | ||
| log.info( | ||
| "Syncing %d cosign companion tag(s) for filtered images", | ||
| len(companion_tags), | ||
| ) | ||
| await self._process_tags( | ||
| companion_tags, signature_source, msg="Processing Cosign Companion Tags" | ||
| await self._process_manifests( | ||
| companion_tags, signature_source, "Processing Cosign Companion Tags" | ||
| ) | ||
|
|
||
| def _find_cosign_companion_tags(self): | ||
|
|
@@ -143,54 +152,53 @@ def _find_cosign_companion_tags(self): | |
| companion_tags.append(tag) | ||
| return companion_tags | ||
|
|
||
| async def _process_tags(self, tag_list, signature_source, msg="Processing Tags"): | ||
| """Download and process a batch of tags, creating declarative content objects.""" | ||
| async def _process_manifests(self, manifests, signature_source, msg): | ||
| """Download and process a batch of manifests, creating declarative content objects.""" | ||
| BATCH_SIZE = 500 | ||
| to_download = [] | ||
|
|
||
| for tag_name in tag_list: | ||
| relative_url = "/v2/{name}/manifests/{tag}".format( | ||
| name=self.remote.namespaced_upstream_name, tag=tag_name | ||
| ) | ||
| tag_url = urljoin(self.remote.url, relative_url) | ||
| downloader = self.remote.get_downloader(url=tag_url) | ||
| for reference in manifests: | ||
| relative_url = f"/v2/{self.remote.namespaced_upstream_name}/manifests/{reference}" | ||
| manifest_url = urljoin(self.remote.url, relative_url) | ||
| downloader = self.remote.get_downloader(url=manifest_url) | ||
| to_download.append( | ||
| downloader.run(extra_data={"headers": V2_ACCEPT_HEADERS, "http_method": "head"}) | ||
| ) | ||
|
|
||
| async with ProgressReport( | ||
| message=msg, | ||
| code="sync.processing.tag", | ||
| total=len(tag_list), | ||
| ) as pb_parsed_tags: | ||
| code="sync.processing.manifest", | ||
| total=len(manifests), | ||
| ) as pb_parsed_manifests: | ||
| to_download_artifact = [ | ||
| self._check_for_existing_manifest(download_tag) | ||
| for download_tag in asyncio.as_completed(to_download) | ||
| self._check_for_existing_manifest(download_manifest) | ||
| for download_manifest in asyncio.as_completed(to_download) | ||
| ] | ||
|
|
||
| for artifact in asyncio.as_completed(to_download_artifact): | ||
| content_data, raw_text_data, response = await artifact | ||
| content_data, raw_text_data, response, manifest_ref = await artifact | ||
|
|
||
| digest = calculate_digest(raw_text_data) | ||
| tag_name = response.url.split("/")[-1] | ||
| is_tag = not manifest_ref.startswith("sha256:") | ||
| media_type = determine_media_type(content_data, response) | ||
|
|
||
| if self.signed_only and not signature_source: | ||
| if not ( | ||
| self._is_cosign_companion_tag(tag_name, media_type, content_data) | ||
| self._is_cosign_companion_tag(manifest_ref, media_type, content_data) | ||
| or await self._has_cosign_signature(digest) | ||
| ): | ||
| log.info( | ||
| "The unsigned image {digest} can't be synced " | ||
| "due to a requirement to sync signed content " | ||
| "only.".format(digest=digest) | ||
| ) | ||
| await pb_parsed_tags.aincrement() | ||
| await pb_parsed_manifests.aincrement() | ||
| continue | ||
|
|
||
| validate_manifest(content_data, media_type, digest) | ||
|
|
||
| tag_dc = DeclarativeContent(Tag(name=tag_name)) | ||
| if is_tag: | ||
| tag_dc = DeclarativeContent(Tag(name=manifest_ref)) | ||
|
|
||
| if media_type in (MEDIA_TYPE.MANIFEST_LIST, MEDIA_TYPE.INDEX_OCI): | ||
| list_dc = self.create_manifest_list( | ||
|
|
@@ -214,7 +222,7 @@ async def _process_tags(self, tag_list, signature_source, msg="Processing Tags") | |
| "The whole manifest list is skipped.".format( | ||
| img_digest=man_dc.content.digest, | ||
| ml_digest=list_dc.content.digest, | ||
| tag=tag_name, | ||
| tag=manifest_ref, | ||
| ) | ||
| ) | ||
| break | ||
|
|
@@ -224,20 +232,23 @@ async def _process_tags(self, tag_list, signature_source, msg="Processing Tags") | |
| else: | ||
| # Manifest indices can be signed too. It is not mandatory. | ||
| # If signature is available mirror it. | ||
| self._synced_digests.add(digest) | ||
| self._synced_digests[digest].append(manifest_ref) | ||
| if signature_source is not None: | ||
| list_sig_dcs = await self.create_signatures(list_dc, signature_source) | ||
| if list_sig_dcs: | ||
| self.signature_dcs.extend(list_sig_dcs) | ||
| tag_dc.extra_data["tagged_manifest_dc"] = list_dc | ||
| for listed_manifest in list_dc.extra_data["listed_manifests"]: | ||
| self._synced_digests.add(listed_manifest["manifest_dc"].content.digest) | ||
| self._synced_digests[ | ||
| listed_manifest["manifest_dc"].content.digest | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Nitpick: this would be cleaner if the digest were assigned to a variable. |
||
| ].append(manifest_ref) | ||
| await self.handle_blobs( | ||
| listed_manifest["manifest_dc"], listed_manifest["content_data"] | ||
| ) | ||
| self.manifest_dcs.append(listed_manifest["manifest_dc"]) | ||
| self.manifest_list_dcs.append(list_dc) | ||
| self.tag_dcs.append(tag_dc) | ||
| if is_tag: | ||
| tag_dc.extra_data["tagged_manifest_dc"] = list_dc | ||
| self.tag_dcs.append(tag_dc) | ||
|
|
||
| else: | ||
| # Simple tagged manifest | ||
|
|
@@ -249,14 +260,15 @@ async def _process_tags(self, tag_list, signature_source, msg="Processing Tags") | |
| if self.signed_only and not man_sig_dcs: | ||
| continue | ||
| self.signature_dcs.extend(man_sig_dcs) | ||
| self._synced_digests.add(digest) | ||
| tag_dc.extra_data["tagged_manifest_dc"] = man_dc | ||
| self._synced_digests[digest].append(manifest_ref) | ||
| await self.handle_blobs(man_dc, content_data) | ||
| self.tag_dcs.append(tag_dc) | ||
| if is_tag: | ||
| tag_dc.extra_data["tagged_manifest_dc"] = man_dc | ||
| self.tag_dcs.append(tag_dc) | ||
| self.manifest_dcs.append(man_dc) | ||
|
|
||
| # Count the skipped tasks as parsed too. | ||
| await pb_parsed_tags.aincrement() | ||
| await pb_parsed_manifests.aincrement() | ||
|
|
||
| # Flush the queues to prevent overly excessive memory usage. | ||
| # This will cap the number of in flight high level objects to about BATCH_SIZE. | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Docstring