# Copyright © The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.
"""Unpack source package and run dpkg-genchanges on it."""
from pathlib import Path
from typing import Any
import debusine.utils
from debusine.artifacts import Upload
from debusine.artifacts.models import (
ArtifactCategory,
CollectionCategory,
DebianSourcePackage,
get_source_package_name,
)
from debusine.client.models import RelationType
from debusine.tasks import BaseTaskWithExecutor, RunCommandTask
from debusine.tasks.models import (
MakeSourcePackageUploadData,
MakeSourcePackageUploadDynamicData,
)
from debusine.tasks.server import TaskDatabaseInterface
class MakeSourcePackageUpload(
    RunCommandTask[
        MakeSourcePackageUploadData, MakeSourcePackageUploadDynamicData
    ],
    BaseTaskWithExecutor[
        MakeSourcePackageUploadData, MakeSourcePackageUploadDynamicData
    ],
):
    """
    Makes a debian:upload artifact from a debian:source-package artifact.

    The command line built by :py:meth:`_cmdline` runs a shell script with
    the ``.dsc`` and ``.changes`` paths held on the instance.
    :py:meth:`upload_artifacts` then uploads the ``.changes`` file as a new
    artifact and relates it to the input source artifact.
    """

    TASK_VERSION = 1

    def __init__(
        self,
        task_data: dict[str, Any],
        dynamic_task_data: dict[str, Any] | None = None,
    ) -> None:
        """
        Initialize (constructor).

        :param task_data: task configuration, forwarded unchanged to the
          parent constructor.
        :param dynamic_task_data: dynamic task data, if any, forwarded
          unchanged to the parent constructor.
        """
        super().__init__(task_data, dynamic_task_data)

        # Annotation-only declarations: no value is assigned here.
        # NOTE(review): nothing visible in this class assigns these
        # attributes; confirm they are set before _cmdline() or
        # upload_artifacts() is called, otherwise those methods raise
        # AttributeError.
        self._dsc_path: Path
        self._changes_path: Path
        self._shell_script: Path

    def compute_dynamic_data(
        self, task_database: TaskDatabaseInterface
    ) -> MakeSourcePackageUploadDynamicData:
        """
        Resolve artifact lookups for this task.

        Looks up ``input.source_artifact``, checks that its category is
        ``ArtifactCategory.SOURCE_PACKAGE`` and resolves the configured
        environment.

        :param task_database: interface used to look up the artifact and
          the environment.
        :return: dynamic data holding the environment id, the id of the
          input source artifact, and the source package name as subject.
        """
        input_source_artifact = task_database.lookup_single_artifact(
            self.data.input.source_artifact
        )

        # Presumably rejects any category other than SOURCE_PACKAGE; the
        # exact failure mode is defined by ensure_artifact_categories().
        self.ensure_artifact_categories(
            configuration_key="input.source_artifact",
            category=input_source_artifact.category,
            expected=(ArtifactCategory.SOURCE_PACKAGE,),
        )
        # Narrow the data type for the type checker before reading the name.
        assert isinstance(input_source_artifact.data, DebianSourcePackage)

        package_name = get_source_package_name(input_source_artifact.data)

        return MakeSourcePackageUploadDynamicData(
            environment_id=self.get_environment(
                task_database,
                self.data.environment,
                default_category=CollectionCategory.ENVIRONMENTS,
            ).id,
            input_source_artifact_id=input_source_artifact.id,
            subject=package_name,
        )

    def _cmdline(self) -> list[str]:
        """
        Build full makesourcepackageupload.sh command line.

        :return: argument vector: ``bash -x -e``, the script path, the
          ``.dsc`` path, the ``.changes`` path, then ``since_version`` and
          ``target_distribution``.
        """
        return [
            "bash",
            "-x",  # trace commands for debug log
            "-e",  # safely stop on error
            str(self._shell_script),
            str(self._dsc_path),
            str(self._changes_path),
            # Unset options are passed as empty strings so that the
            # remaining positional arguments keep their positions.
            self.data.since_version or "",
            self.data.target_distribution or "",
        ]

    def upload_artifacts(
        self, execute_directory: Path, *, execution_success: bool  # noqa: U100
    ) -> None:
        """
        Create DebianUpload artifact and relationships.

        Pulls the ``.changes`` file from the executor instance, uploads it
        as a new artifact, and relates that artifact to the input source
        artifact.

        :param execute_directory: not used by this implementation.
        :param execution_success: not used by this implementation.
        :raises AssertionError: if ``self.debusine``, ``self.dynamic_data``
          or ``self.executor_instance`` is not set.
        """
        # NOTE(review): execution_success is ignored, so the pull and upload
        # are attempted even after a failed run - confirm this is intended.
        if not self.debusine:
            raise AssertionError("self.debusine not set")
        assert self.dynamic_data
        assert self.executor_instance

        # The same path is used as both source and destination of the pull.
        self.executor_instance.file_pull(self._changes_path, self._changes_path)

        changes_artifact = Upload.create(
            changes_file=self._changes_path,
        )
        changes_uploaded = self.debusine.upload_artifact(
            changes_artifact,
            workspace=self.workspace_name,
            work_request=self.work_request_id,
        )

        # Two relations are recorded between the uploaded artifact and the
        # input source artifact: EXTENDS and RELATES_TO.
        self.debusine.relation_create(
            changes_uploaded.id,
            self.dynamic_data.input_source_artifact_id,
            RelationType.EXTENDS,
        )
        self.debusine.relation_create(
            changes_uploaded.id,
            self.dynamic_data.input_source_artifact_id,
            RelationType.RELATES_TO,
        )

    def get_label(self) -> str:
        """Return the task label."""
        return "prepare source package upload"