# -*- encoding: utf-8 -*-
"""Task representation of evergreen."""
from __future__ import absolute_import
from datetime import timedelta
from enum import IntEnum
from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, List, Optional
from evergreen.api_requests import IssueLinkRequest, MetadataLinkRequest
from evergreen.base import _BaseEvergreenObject, evg_attrib, evg_datetime_attrib
from evergreen.manifest import Manifest
from evergreen.task_annotations import TaskAnnotation
if TYPE_CHECKING:
from evergreen.api import EvergreenApi
from evergreen.tst import Tst # noqa: F401
# Values compared against a task's top-level ``status`` field.
EVG_SUCCESS_STATUS = "success"
EVG_FAILED_STATUS = "failed"
EVG_UNDISPATCHED_STATUS = "undispatched"

# Values compared against the ``type`` field of a task's status details.
EVG_TEST_STATUS_TYPE = "test"
EVG_SETUP_FAILURE_STATUS_TYPE = "setup"
EVG_SYSTEM_FAILURE_STATUS = "system"

# Names of the task fields that hold timestamps.
_EVG_DATE_FIELDS_IN_TASK = frozenset(
    (
        "create_time",
        "dispatch_time",
        "finish_time",
        "ingest_time",
        "scheduled_time",
        "start_time",
    )
)

# Main MIME types (the part before the "/") whose artifacts are treated as binary.
_BINARY_TYPES = ["application"]
class Artifact(_BaseEvergreenObject):
    """Representation of a task artifact from evergreen."""

    # Each attribute is declared through ``evg_attrib`` with the JSON field name;
    # presumably it reads that field from ``self.json`` -- confirm in evergreen.base.
    name = evg_attrib("name")
    url = evg_attrib("url")
    visibility = evg_attrib("visibility")
    ignore_for_fetch = evg_attrib("ignore_for_fetch")
    content_type = evg_attrib("content_type")

    def __init__(self, json: Dict[str, Any], api: "EvergreenApi") -> None:
        """
        Create an instance of an evergreen task artifact.

        :param json: JSON description of the artifact.
        :param api: Evergreen API object used to stream the artifact.
        """
        super().__init__(json, api)

    def stream(
        self,
        decode_unicode: bool = True,
        chunk_size: Optional[int] = None,
        is_binary: Optional[bool] = None,
    ) -> Iterable[str]:
        """
        Retrieve an iterator of the streamed contents of this artifact.

        :param decode_unicode: determines if we decode as unicode
        :param chunk_size: the size of the chunks to be read
        :param is_binary: explicit variable, overrides information from content type
        :return: Iterable to stream contents of artifact.
        """
        # Only fall back to content-type sniffing when the caller did not decide.
        if is_binary is None:
            is_binary = self._is_binary()

        return self._api._stream_api(
            self.url,
            decode_unicode=decode_unicode,
            chunk_size=chunk_size,
            is_binary=is_binary,
        )

    def _is_binary(self) -> bool:
        """
        Determine if an artifact is binary based on content_type.

        Only the main type (the text before the first "/") is considered. A
        missing or empty content type is reported as not binary instead of
        raising, and content types that do not contain exactly one "/" no
        longer fail to unpack.

        :return: True if the main type is listed in ``_BINARY_TYPES``.
        """
        content_type = self.content_type
        if not content_type:
            return False

        # partition() never raises on 0 or 2+ separators, unlike split() + unpacking.
        main_type, _, _ = content_type.partition("/")
        return main_type in _BINARY_TYPES
class StatusScore(IntEnum):
    """Integer score of the task status."""

    SUCCESS = 1
    FAILURE = 2
    FAILURE_SYSTEM = 3
    FAILURE_TIMEOUT = 4
    UNDISPATCHED = 5

    @classmethod
    def get_task_status_score(cls, task: "Task") -> "StatusScore":
        """
        Retrieve the status score based on the task status.

        The checks are applied in a fixed order and the first match wins:
        success, undispatched, timeout, system failure. Anything else is
        scored as a plain failure.

        :param task: Task to score; must provide ``is_success``,
            ``is_undispatched``, ``is_timeout`` and ``is_system_failure``.
        :return: Status score.
        """
        if task.is_success():
            return cls.SUCCESS
        if task.is_undispatched():
            return cls.UNDISPATCHED
        # Timeout is tested before system failure, so a task reporting both
        # is scored as a timeout.
        if task.is_timeout():
            return cls.FAILURE_TIMEOUT
        if task.is_system_failure():
            return cls.FAILURE_SYSTEM
        return cls.FAILURE
class OomTrackerInfo(_BaseEvergreenObject):
    """Representation of a task's OOM Tracker Info from evergreen."""

    # Declared through ``evg_attrib`` with the JSON field name; presumably read
    # from ``self.json`` -- confirm in evergreen.base.
    detected = evg_attrib("detected")
    pids = evg_attrib("pids")

    def __init__(self, json: Dict[str, Any], api: "EvergreenApi") -> None:
        """
        Create an instance of an evergreen task OOM Tracker Info.

        :param json: JSON description of the OOM tracker info.
        :param api: Evergreen API object.
        """
        super().__init__(json, api)
class StatusDetails(_BaseEvergreenObject):
    """Representation of a task status details from evergreen."""

    # Declared through ``evg_attrib`` with the JSON field name; presumably read
    # from ``self.json`` -- confirm in evergreen.base.
    status = evg_attrib("status")
    type = evg_attrib("type")
    desc = evg_attrib("desc")
    timed_out = evg_attrib("timed_out")

    def __init__(self, json: Dict[str, Any], api: "EvergreenApi") -> None:
        """
        Create an instance of an evergreen task status details.

        :param json: JSON description of the status details.
        :param api: Evergreen API object.
        """
        super().__init__(json, api)

    @property
    def oom_tracker_info(self) -> OomTrackerInfo:
        """
        Retrieve the OOM tracker info from the status details for the given task.

        A new ``OomTrackerInfo`` is built on every access. The
        ``"oom_tracker_info"`` entry is read with a plain subscript, so the
        lookup fails (``KeyError`` for a dict) when the entry is absent.

        :return: OOM Tracker Info.
        """
        return OomTrackerInfo(self.json["oom_tracker_info"], self._api)
class Task(_BaseEvergreenObject):
    """Representation of an Evergreen task."""

    # Each attribute is declared through ``evg_attrib`` / ``evg_datetime_attrib``
    # with the JSON field name; presumably they read that field from
    # ``self.json`` (the datetime variant parsing it) -- confirm in evergreen.base.
    activated = evg_attrib("activated")
    activated_by = evg_attrib("activated_by")
    build_id = evg_attrib("build_id")
    build_variant = evg_attrib("build_variant")
    build_variant_display_name = evg_attrib("build_variant_display_name")
    create_time = evg_datetime_attrib("create_time")
    depends_on = evg_attrib("depends_on")
    dispatch_time = evg_datetime_attrib("dispatch_time")
    display_name = evg_attrib("display_name")
    display_only = evg_attrib("display_only")
    display_status = evg_attrib("display_status")
    distro_id = evg_attrib("distro_id")
    est_wait_to_start_ms = evg_attrib("est_wait_to_start_ms")
    execution = evg_attrib("execution")
    execution_tasks = evg_attrib("execution_tasks")
    expected_duration_ms = evg_attrib("expected_duration_ms")
    finish_time = evg_datetime_attrib("finish_time")
    generate_task = evg_attrib("generate_task")
    generated_by = evg_attrib("generated_by")
    host_id = evg_attrib("host_id")
    ingest_time = evg_datetime_attrib("ingest_time")
    mainline = evg_attrib("mainline")
    order = evg_attrib("order")
    parent_task_id = evg_attrib("parent_task_id")
    project_id = evg_attrib("project_id")
    project_identifier = evg_attrib("project_identifier")
    priority = evg_attrib("priority")
    restarts = evg_attrib("restarts")
    revision = evg_attrib("revision")
    scheduled_time = evg_datetime_attrib("scheduled_time")
    start_time = evg_datetime_attrib("start_time")
    status = evg_attrib("status")
    task_group = evg_attrib("task_group")
    task_group_max_hosts = evg_attrib("task_group_max_hosts")
    task_id = evg_attrib("task_id")
    time_taken_ms = evg_attrib("time_taken_ms")
    version_id = evg_attrib("version_id")

    def __init__(self, json: Dict[str, Any], api: "EvergreenApi") -> None:
        """
        Create an instance of an evergreen task.

        :param json: JSON description of the task.
        :param api: Evergreen API object used for follow-up requests.
        """
        super().__init__(json, api)
        # Lazily-built copy of the task's "logs" mapping; see ``log_map``.
        self._logs_map: Optional[Dict[Any, Any]] = None

    @property
    def artifacts(self) -> List[Artifact]:
        """
        Retrieve the artifacts for the given task.

        :return: List of artifacts; empty when the task has none.
        """
        raw_artifacts = self.json.get("artifacts")
        if not raw_artifacts:
            return []
        return [Artifact(artifact, self._api) for artifact in raw_artifacts]

    @property
    def log_map(self) -> Dict:
        """
        Retrieve a dict of all the logs.

        The dictionary is copied from the task's ``"logs"`` entry on first
        access and reused afterwards.

        :return: Dictionary of the logs.
        """
        # Compare against None so an empty logs mapping is cached as well.
        if self._logs_map is None:
            self._logs_map = dict(self.json["logs"])
        return self._logs_map

    def get_project_identifier(self) -> str:
        """
        Return the human-readable project id. Can also be accessed as an attribute.

        :return: Human-readable project id.
        """
        return self.project_identifier

    def retrieve_log(self, log_name: str, raw: bool = False) -> str:
        """
        Retrieve the contents of the specified log.

        :param log_name: Name of log to retrieve; must be a key of ``log_map``.
        :param raw: Retrieve raw version of log.
        :return: Contents of the specified log.
        """
        return self._api.retrieve_task_log(self.log_map[log_name], raw)

    def stream_log(self, log_name: str) -> Iterable[str]:
        """
        Retrieve an iterator of a streamed log contents for the given log.

        :param log_name: Log to stream; must be a key of ``log_map``.
        :return: Iterable log contents.
        """
        return self._api.stream_log(self.log_map[log_name])

    @property
    def status_details(self) -> StatusDetails:
        """
        Retrieve the status details for the given task.

        A new ``StatusDetails`` is built on every access from the task's
        ``"status_details"`` entry, which is read with a plain subscript.

        :return: Status details.
        """
        return StatusDetails(self.json["status_details"], self._api)

    def get_status_score(self) -> StatusScore:
        """
        Retrieve the status score enum for the given task.

        :return: Status score.
        """
        return StatusScore.get_task_status_score(self)

    def get_execution(self, execution: int) -> Optional["Task"]:
        """
        Get the task info for the specified execution.

        :param execution: Index of execution.
        :return: Task info for specified execution, or None if it is neither
            this task's execution nor one of its previous executions.
        """
        if self.execution == execution:
            return self

        # ``or []`` also covers a "previous_executions" entry that is present but null.
        for previous in self.json.get("previous_executions") or []:
            if previous["execution"] == execution:
                return Task(previous, self._api)
        return None

    def get_execution_or_self(self, execution: int) -> "Task":
        """
        Get the specified execution if it exists.

        If the specified execution does not exist, return self.

        :param execution: Index of execution.
        :return: Task info for specified execution or self.
        """
        return self.get_execution(execution) or self

    def wait_time(self) -> Optional[timedelta]:
        """
        Get the time taken until the task started running.

        :return: ``start_time - ingest_time``, or None if either is unset.
        """
        if self.start_time and self.ingest_time:
            return self.start_time - self.ingest_time
        return None

    def wait_time_once_unblocked(self) -> Optional[timedelta]:
        """
        Get the time taken until the task started running.

        Once it is unblocked by task dependencies.

        :return: ``start_time - scheduled_time``, or None if either is unset.
        """
        if self.start_time and self.scheduled_time:
            return self.start_time - self.scheduled_time
        return None

    def is_success(self) -> bool:
        """
        Whether task was successful.

        :return: True if task was successful.
        """
        return self.status == EVG_SUCCESS_STATUS

    def _is_failure_of_type(self, expected_type: str) -> bool:
        """
        Check whether an unsuccessful task reports the given status-details type.

        :param expected_type: Value to compare the status details ``type`` against.
        :return: True if the task is not successful and its status details
            type equals ``expected_type``.
        """
        if self.is_success():
            return False
        # Build the status details once instead of once per attribute access.
        details = self.status_details
        if details and details.type:
            return details.type == expected_type
        return False

    def is_test_failure(self) -> bool:
        """
        Whether task was a test failure.

        :return: True if task was a test failure.
        """
        return self._is_failure_of_type(EVG_TEST_STATUS_TYPE)

    def is_undispatched(self) -> bool:
        """
        Whether the task was undispatched.

        :return: True if task was undispatched.
        """
        return self.status == EVG_UNDISPATCHED_STATUS

    def is_system_failure(self) -> bool:
        """
        Whether task resulted in a system failure.

        :return: True if task was a system failure.
        """
        return self._is_failure_of_type(EVG_SYSTEM_FAILURE_STATUS)

    def is_timeout(self) -> bool:
        """
        Whether task results in a timeout.

        :return: True if task was a timeout.
        """
        if self.is_success():
            return False
        details = self.status_details
        # bool() keeps the declared return type even if timed_out is None.
        return bool(details and details.timed_out)

    def is_setup_failure(self) -> bool:
        """
        Whether task is a setup failure.

        :return: True if task is a setup failure.
        """
        return self._is_failure_of_type(EVG_SETUP_FAILURE_STATUS_TYPE)

    def is_completed(self) -> bool:
        """
        Whether task is completed.

        :return: True if the task status is success or failed.
        """
        return self.status in (EVG_SUCCESS_STATUS, EVG_FAILED_STATUS)

    def has_oom(self) -> bool:
        """
        Determine if the given task has an OOM failure.

        :return: The ``detected`` value of the task's OOM tracker info.
        """
        return self.status_details.oom_tracker_info.detected

    def is_active(self) -> bool:
        """
        Determine if the given task is active.

        :return: True if the task has a scheduled time and no finish time.
        """
        # Without bool() the ``and`` expression would leak None (no scheduled
        # time) to the caller instead of False.
        return bool(self.scheduled_time and not self.finish_time)

    def get_tests(
        self, status: Optional[str] = None, execution: Optional[int] = None
    ) -> List["Tst"]:
        """
        Get the test results for this task.

        :param status: Only return tests with the given status.
        :param execution: Return results for specified execution, if specified;
            defaults to this task's own execution.
        :return: List of test results for the task.
        """
        # ``is None`` rather than truthiness: execution 0 is a valid request.
        selected_execution = self.execution if execution is None else execution
        return self._api.tests_by_task(
            self.task_id,
            status=status,
            execution=selected_execution,
        )

    def get_num_of_tests(self) -> int:
        """
        Get the number of tests that ran as part of this task.

        :return: Number of tests for the task.
        """
        return self._api.num_of_tests_by_task(self.task_id)

    def get_execution_tasks(
        self, filter_fn: Optional[Callable[["Task"], bool]] = None
    ) -> Optional[List["Task"]]:
        """
        Get a list of execution tasks associated with this task.

        If this is a display task, return the execution tasks associated with it.
        If this is not a display task, returns None.

        :param filter_fn: Function to filter returned results.
        :return: List of execution tasks.
        """
        if not self.display_only:
            return None

        # Fetch every execution of each child, then pick the one matching this
        # display task's execution (falling back to the fetched task itself).
        execution_tasks = [
            self._api.task_by_id(task_id, fetch_all_executions=True).get_execution_or_self(
                self.execution
            )
            for task_id in self.execution_tasks
        ]

        if filter_fn:
            return [task for task in execution_tasks if filter_fn(task)]
        return execution_tasks

    def get_manifest(self) -> Optional[Manifest]:
        """
        Get the Manifest for this task.

        :return: Result of the API's manifest lookup for this task id.
        """
        return self._api.manifest_for_task(self.task_id)

    def get_task_annotation(self) -> List[TaskAnnotation]:
        """
        Get the task annotation for this task.

        :return: Annotations for this task id and execution.
        """
        return self._api.get_task_annotation(self.task_id, self.execution)

    def get_oom_pids(self) -> List[int]:
        """
        Get the OOM PIDs for this task.

        :return: PIDs from the OOM tracker info, or an empty list if there are none.
        """
        return self.status_details.oom_tracker_info.pids or []

    def annotate(
        self,
        message: Optional[str] = None,
        issues: Optional[List[IssueLinkRequest]] = None,
        suspected_issues: Optional[List[IssueLinkRequest]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        metadata_links: Optional[List[MetadataLinkRequest]] = None,
    ) -> None:
        """
        Annotate the specified task.

        The annotation is attached to this task's id and current execution.

        :param message: Message to add to the annotations.
        :param issues: Issues to attach to the annotation.
        :param suspected_issues: Suspected issues to add to the annotation.
        :param metadata: Extra metadata to add to the issue.
        :param metadata_links: Metadata links to add to the annotation.
        """
        self._api.annotate_task(
            self.task_id,
            self.execution,
            message,
            issues,
            suspected_issues,
            metadata,
            metadata_links,
        )

    def __repr__(self) -> str:
        """
        Get a string representation of Task for debugging purposes.

        :return: String representation of Task.
        """
        return f"Task({self.task_id})"