diff --git a/CHANGES/1188.bugfix b/CHANGES/1188.bugfix new file mode 100644 index 00000000..4d4e0a88 --- /dev/null +++ b/CHANGES/1188.bugfix @@ -0,0 +1 @@ +Fixed "Worker has gone missing" errors during repair_metadata on large repositories (1000+ packages) by reducing peak memory consumption. diff --git a/pulp_python/app/tasks/repair.py b/pulp_python/app/tasks/repair.py index 514e770a..fb4d7acf 100644 --- a/pulp_python/app/tasks/repair.py +++ b/pulp_python/app/tasks/repair.py @@ -1,4 +1,5 @@ import logging +import os from collections import defaultdict from gettext import gettext as _ from itertools import groupby @@ -8,18 +9,20 @@ from django.db.models.query import QuerySet from pulp_python.app.models import PythonPackageContent, PythonRepository from pulp_python.app.utils import ( - artifact_to_metadata_artifact, artifact_to_python_content_data, + copy_artifact_to_temp_file, + extract_wheel_metadata, fetch_json_release_metadata, + metadata_content_to_artifact, parse_metadata, ) -from pulpcore.plugin.models import Artifact, ContentArtifact, ProgressReport +from pulpcore.plugin.models import ContentArtifact, ProgressReport from pulpcore.plugin.util import get_domain log = logging.getLogger(__name__) -BULK_SIZE = 1000 +BULK_SIZE = 250 def repair(repository_pk: UUID) -> None: @@ -118,11 +121,21 @@ def repair_metadata(content: QuerySet[PythonPackageContent]) -> tuple[int, set[s .first() .artifact ) - new_data = artifact_to_python_content_data(package.filename, main_artifact, domain) + # Copy artifact to temp file once, extract both content data and metadata + temp_path = copy_artifact_to_temp_file(main_artifact, package.filename) + try: + new_data = artifact_to_python_content_data( + package.filename, main_artifact, domain, temp_path=temp_path + ) + metadata_content = ( + extract_wheel_metadata(temp_path) if package.filename.endswith(".whl") else None + ) + finally: + os.unlink(temp_path) total_metadata_repaired += update_metadata_artifact_if_needed( package, new_data.get("metadata_sha256"), - main_artifact, + metadata_content, metadata_batch, pkgs_metadata_not_repaired, ) @@ -236,7 +249,7 @@ def update_package_if_needed( def update_metadata_artifact_if_needed( package: PythonPackageContent, new_metadata_sha256: str | None, - main_artifact: Artifact, + metadata_content: bytes | None, metadata_batch: list[tuple], pkgs_metadata_not_repaired: set[str], ) -> int: @@ -248,7 +261,7 @@ def update_metadata_artifact_if_needed( Args: package: Package to check for metadata changes. new_metadata_sha256: The correct metadata_sha256 extracted from the main artifact, or None. - main_artifact: The main package artifact used to generate metadata. + metadata_content: Raw metadata bytes extracted from the wheel, or None. metadata_batch: List of tuples for batch processing (updated in-place). pkgs_metadata_not_repaired: Set of package PKs that failed repair (updated in-place). @@ -265,13 +278,13 @@ def update_metadata_artifact_if_needed( # Create missing if not cas: - metadata_batch.append((package, main_artifact)) + metadata_batch.append((package, metadata_content)) # Fix existing elif new_metadata_sha256 != original_metadata_sha256: ca = cas.first() metadata_artifact = ca.artifact if metadata_artifact is None or (metadata_artifact.sha256 != new_metadata_sha256): - metadata_batch.append((package, main_artifact)) + metadata_batch.append((package, metadata_content)) if len(metadata_batch) == BULK_SIZE: not_repaired = _process_metadata_batch(metadata_batch) @@ -288,7 +301,7 @@ def _process_metadata_batch(metadata_batch: list[tuple]) -> set[str]: and their corresponding ContentArtifacts. Args: - metadata_batch: List of (package, main_artifact) tuples. + metadata_batch: List of (package, metadata_content) tuples. Returns: Set of package PKs for which metadata artifacts could not be created. @@ -296,8 +309,8 @@ def _process_metadata_batch(metadata_batch: list[tuple]) -> set[str]: not_repaired = set() content_artifacts = [] - for package, main_artifact in metadata_batch: - metadata_artifact = artifact_to_metadata_artifact(package.filename, main_artifact) + for package, metadata_content in metadata_batch: + metadata_artifact = metadata_content_to_artifact(metadata_content) if metadata_artifact: ca = ContentArtifact( artifact=metadata_artifact, diff --git a/pulp_python/app/utils.py b/pulp_python/app/utils.py index 133a6a92..9c4eb15c 100644 --- a/pulp_python/app/utils.py +++ b/pulp_python/app/utils.py @@ -240,18 +240,37 @@ def compute_metadata_sha256(filename: str) -> str | None: return hashlib.sha256(metadata_content).hexdigest() if metadata_content else None -def artifact_to_python_content_data(filename, artifact, domain=None): +def copy_artifact_to_temp_file(artifact, filename, tmp_dir="."): + """ + Copy an artifact's file to a temporary file on disk. + + Returns the path to the temp file. The caller is responsible for cleanup. + """ + temp_file = tempfile.NamedTemporaryFile("wb", dir=tmp_dir, suffix=filename, delete=False) + artifact.file.seek(0) + shutil.copyfileobj(artifact.file, temp_file) + temp_file.flush() + temp_file.close() + return temp_file.name + + +def artifact_to_python_content_data(filename, artifact, domain=None, temp_path=None): """ Takes the artifact/filename and returns the metadata needed to create a PythonPackageContent. + + If temp_path is provided, uses it instead of copying the artifact to a new temp file. """ # Copy file to a temp directory under the user provided filename, we do this # because pkginfo validates that the filename has a valid extension before # reading it - with tempfile.NamedTemporaryFile("wb", dir=".", suffix=filename) as temp_file: - artifact.file.seek(0) - shutil.copyfileobj(artifact.file, temp_file) - temp_file.flush() - metadata = get_project_metadata_from_file(temp_file.name) + if temp_path: + metadata = get_project_metadata_from_file(temp_path) + else: + with tempfile.NamedTemporaryFile("wb", dir=".", suffix=filename) as temp_file: + artifact.file.seek(0) + shutil.copyfileobj(artifact.file, temp_file) + temp_file.flush() + metadata = get_project_metadata_from_file(temp_file.name) data = parse_project_metadata(vars(metadata)) data["sha256"] = artifact.sha256 data["size"] = artifact.size @@ -280,6 +299,16 @@ def artifact_to_metadata_artifact( if not metadata_content: return None + return metadata_content_to_artifact(metadata_content, tmp_dir) + + +def metadata_content_to_artifact(metadata_content: bytes, tmp_dir: str = ".") -> Artifact | None: + """ + Creates an Artifact from raw metadata content bytes. + """ + if not metadata_content: + return None + with tempfile.NamedTemporaryFile( "wb", dir=tmp_dir, suffix=".metadata", delete=False ) as temp_md: