# Copyright © The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.
"""Run mergechanges on multiple uploads."""
import re
from pathlib import Path
from typing import Any
import debusine.utils
from debusine.artifacts import Upload
from debusine.artifacts.models import (
ArtifactCategory,
CollectionCategory,
DebianUpload,
get_source_package_name,
)
from debusine.client.models import RelationType
from debusine.tasks import BaseTaskWithExecutor, RunCommandTask
from debusine.tasks.models import MergeUploadsData, MergeUploadsDynamicData
from debusine.tasks.server import TaskDatabaseInterface


class MergeUploads(
RunCommandTask[MergeUploadsData, MergeUploadsDynamicData],
BaseTaskWithExecutor[MergeUploadsData, MergeUploadsDynamicData],
):
"""
Combines multiple debian:upload artifacts into a single one.
This is in preparation for uploading them together.
"""
TASK_VERSION = 1
    # Target .changes file: use the CAPTURE_OUTPUT_FILENAME mechanism.
    # We could use `mergechanges -f`, which automatically names the
    # output .changes file, but it does that inside the container's
    # download_directory, which we can't easily scan to grab the file
    # after the process finishes. So we use a static, predictable
    # output filename instead.
CAPTURE_OUTPUT_FILENAME = "multi.changes"

    def __init__(
self,
task_data: dict[str, Any],
dynamic_task_data: dict[str, Any] | None = None,
) -> None:
"""Initialize (constructor)."""
super().__init__(task_data, dynamic_task_data)
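        # Paths of the input .changes files to merge; consumed by
        # _cmdline() and upload_artifacts().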
self._changes_files: list[Path] = []

    def compute_dynamic_data(
self, task_database: TaskDatabaseInterface
) -> MergeUploadsDynamicData:
"""Resolve artifact lookups for this task."""
upload_artifacts = task_database.lookup_multiple_artifacts(
self.data.input.uploads
)
source_package_names = set()
for upload_artifact in upload_artifacts:
self.ensure_artifact_categories(
configuration_key="input.uploads",
category=upload_artifact.category,
expected=(ArtifactCategory.UPLOAD,),
)
assert isinstance(upload_artifact.data, DebianUpload)
source_package_names.add(
get_source_package_name(upload_artifact.data)
)
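
        # Use the common source package name as the subject, or None if
        # the uploads span more than one source package.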
subject = (
source_package_names.pop()
if len(source_package_names) == 1
else None
)
return MergeUploadsDynamicData(
environment_id=self.get_environment(
task_database,
self.data.environment,
default_category=CollectionCategory.ENVIRONMENTS,
).id,
input_uploads_ids=upload_artifacts.get_ids(),
subject=subject,
)

    def _cmdline(self) -> list[str]:
        """Build the full mergechanges command line."""
cmdline = ['mergechanges']
for changes_file in self._changes_files:
cmdline.append(str(changes_file))
return cmdline

    def upload_artifacts(
self, execute_directory: Path, *, execution_success: bool # noqa: U100
) -> None:
"""Create DebianUpload artifact and relationships."""
if not self.debusine:
raise AssertionError("self.debusine not set")
assert self.dynamic_data
assert self.executor_instance
target_changes_file = execute_directory / self.CAPTURE_OUTPUT_FILENAME
# Move to source directory so Upload can validate referenced files.
# Rename following mergechanges' "multi" suffix convention.
ref_path = self._changes_files[0]
        # NB: allow digits so architecture names such as amd64 match,
        # and escape the dot so it only matches a literal ".".
        target_filename = re.sub(
            r'_[a-z0-9]+\.changes$', '_multi.changes', ref_path.name
        )
target_changes_file = target_changes_file.rename(
ref_path.with_name(target_filename)
)
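        # e.g. (hypothetical name) hello_1.0-1_amd64.changes becomes
        # hello_1.0-1_multi.changes, placed next to the input files.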
changes_artifact = Upload.create(changes_file=target_changes_file)
changes_uploaded = self.debusine.upload_artifact(
changes_artifact,
workspace=self.workspace_name,
work_request=self.work_request_id,
)
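        # Record that the merged .changes artifact extends each of the
        # input uploads it was built from.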
for input_upload_id in self.dynamic_data.input_uploads_ids:
self.debusine.relation_create(
changes_uploaded.id,
input_upload_id,
RelationType.EXTENDS,
)

    def get_label(self) -> str:
"""Return the task label."""
return "merge package uploads"