1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
|
# Copyright (C) 2023 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
import asyncio
import decimal
import logging
import shutil
import subprocess
import xml.etree.ElementTree as ET
from typing import List, Optional, Union
class Error:
    """
    Base class for errors.
    It is used by several modules to define error types.

    Instances carry a human-readable message in the ``message`` attribute.
    ``__match_args__`` exposes that attribute positionally, so the message can
    be destructured with ``case Error(message):`` in a ``match`` statement.
    """

    __match_args__ = ("message",)

    def __init__(self, message: str) -> None:
        self.message = message

    def __repr__(self) -> str:
        # Use the runtime class name so that subclasses identify themselves.
        return f"{type(self).__name__}({self.message!r})"

    def __str__(self) -> str:
        # Make print() and f-string interpolation show the message instead of
        # the default "<... object at 0x...>" text.
        return str(self.message)
class CommandTimedOut(Error):
    """
    Error returned by Command.run when the command does not finish within the timeout.
    """
class Command:
    """
    Helper for running external commands asynchronously.
    """

    @staticmethod
    async def run(
        arguments: List[str],
        output_file: Optional[str] = None,
        timeout: Optional[int] = None,
        cwd: Optional[str] = None,
    ) -> Optional[Error]:
        """
        Run a command and wait for it to finish.

        arguments: The program to execute followed by its arguments.
        output_file: If given, stdout and stderr of the command are appended to this file.
        timeout: Maximum number of seconds to wait; None waits indefinitely.
        cwd: Working directory for the command; None uses the current one.

        Returns None if the command exits with code zero, CommandTimedOut if
        the timeout expires, and Error if the command exits with a non-zero
        code. A command that times out is killed before returning.
        """

        def describe(outcome: str) -> str:
            # Build the message lazily so that nothing is formatted on the success path.
            command = " ".join(arguments)
            message = f'Command "{command}" {outcome}'
            if output_file:
                message += f"; output was redirected to {output_file}"
            return message

        if output_file is None:
            process = await asyncio.create_subprocess_exec(
                *arguments,
                cwd=cwd,
            )
        else:
            # The file only needs to stay open on our side while the child is being spawned.
            with open(output_file, "a") as f:
                process = await asyncio.create_subprocess_exec(
                    *arguments,
                    stdout=f,
                    stderr=subprocess.STDOUT,
                    cwd=cwd,
                )

        try:
            await asyncio.wait_for(process.wait(), timeout=timeout)
        except asyncio.TimeoutError:
            # wait_for() only cancels the wait; the child itself keeps running.
            # Kill and reap it so that it does not outlive the reported timeout.
            try:
                process.kill()
            except ProcessLookupError:
                # The process exited between the timeout and the kill.
                pass
            await process.wait()
            return CommandTimedOut(describe(f"timed out after {timeout} seconds"))

        if process.returncode != 0:
            return Error(describe(f"returned non-zero code {process.returncode}"))
        return None
class OperationTimedOut(Error):
    """
    Error signalling that an operation timed out.
    Operation.run_and_retry retries only when the operation returns this error type.
    """
class Operation:
    """
    An operation that may time out and can be retried when it does.
    """

    async def run(self) -> Optional[Error]:
        """
        Execute the operation once.
        Subclasses override this function; the base implementation always raises
        NotImplementedError.
        """
        raise NotImplementedError()

    async def run_and_retry(
        self, retries: int, delay: int, logger: Optional[logging.Logger]
    ) -> Optional[Error]:
        """
        Execute the operation, repeating it for as long as it times out.

        retries: Maximum number of additional attempts after the first one.
        delay: Seconds to sleep before each additional attempt.
        logger: If not None, receives a warning before each additional attempt.

        Returns the result of the last attempt.
        """
        outcome = await self.run()
        attempts_left = retries
        while True:
            # Stop as soon as the result is not a timeout or the budget is used up.
            if not isinstance(outcome, OperationTimedOut) or attempts_left <= 0:
                return outcome
            if logger is not None:
                logger.warning(f"Operation timed out, retrying after {delay} seconds")
            await asyncio.sleep(delay)
            outcome = await self.run()
            attempts_left -= 1
class XmlParser:
    """
    Thin wrapper around an ElementTree element.
    Lookups return an Error object instead of raising when something is missing or malformed.
    """

    def __init__(self, element: ET.Element) -> None:
        self.element = element

    @staticmethod
    def load(file: str, tag: str) -> Union["XmlParser", Error]:
        """
        Parse an XML file and wrap its root element.

        Returns an Error if the file cannot be read, if it is not well-formed
        XML, or if the root element's tag differs from the expected tag.
        """
        try:
            tree = ET.parse(file)
        except (ET.ParseError, OSError) as error:
            # OSError covers missing or unreadable files, so that they are
            # reported the same way as malformed XML.
            return Error(str(error))
        root = tree.getroot()
        if root.tag != tag:
            return Error(f"Root element should be {tag} but was {root.tag}")
        return XmlParser(root)

    def children(self, tag: str) -> List["XmlParser"]:
        """
        Return the elements found by findall(tag) on this element, each wrapped in an XmlParser.
        """
        return [XmlParser(element) for element in self.element.findall(tag)]

    def child(self, tag: str) -> Union["XmlParser", Error]:
        """
        Return the only child with the given tag.
        Returns an Error if there is no such child or more than one.
        """
        matches = self.children(tag)
        if not matches:
            return Error(f"{self.element.tag} has no {tag} children")
        if len(matches) > 1:
            return Error(f"{self.element.tag} has multiple {tag} children")
        return matches[0]

    def string_attribute(self, name: str) -> Union[str, Error]:
        """
        Return the value of an attribute, or an Error if the element does not have it.
        """
        try:
            return self.element.attrib[name]
        except KeyError:
            return Error(f"{self.element.tag} has no attribute {name}")

    def integer_attribute(self, name: str) -> Union[int, Error]:
        """
        Return the value of an attribute as an integer.
        Returns an Error if the attribute is missing or cannot be parsed by int().
        """
        string = self.string_attribute(name)
        if isinstance(string, Error):
            return string
        try:
            return int(string)
        except ValueError:
            return Error(f"Could not parse {self.element.tag}.{name} as integer: {string}")

    def decimal_attribute(self, name: str) -> Union[decimal.Decimal, Error]:
        """
        Return the value of an attribute as a decimal number.
        Both "," and "." are accepted as the decimal separator.
        Returns an Error if the attribute is missing or cannot be parsed.
        """
        string = self.string_attribute(name)
        if isinstance(string, Error):
            return string
        string = string.replace(",", ".")  # QtTest uses both, depending on the version.
        try:
            return decimal.Decimal(string)
        except decimal.InvalidOperation:
            return Error(f"Could not parse {self.element.tag}.{name} as decimal: {string}")
def check_for_executable(name: str) -> Optional[Error]:
    """
    Verify that an executable with the given name can be located by shutil.which.
    Returns None when it is found and an Error otherwise.
    """
    if shutil.which(name) is not None:
        return None
    return Error(f"Executable is not installed: {name}")
|