1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
|
# Copyright (C) 2023 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
import datetime
import unittest
import coordinator
from tests import dummy
class TestConnection(unittest.IsolatedAsyncioTestCase):
def setUp(self) -> None:
self.info = coordinator.Info(url="http://localhost:3330", secret="pearl")
self.connection = coordinator.Connection(
coordinator_info=self.info, hostname="clam", logger=None
)
# Use short timeouts to speed up tests.
self.coordinator = dummy.Coordinator(info=self.info, ping_interval=1, ping_timeout=1)
self.work_item = coordinator.WorkItem(
integration_id=4567,
integration_url=None,
integration_timestamp=datetime.datetime.fromisoformat("2000-01-01"),
integration_data=[],
repository="repository",
branch="main",
revision="abcd",
)
async def test_connect(self) -> None:
"""Connect to the coordinator and send the idle status"""
await self.coordinator.start()
async with self.connection:
auth = await self.coordinator.wait_for_connect()
status = await self.coordinator.wait_for_status()
await self.coordinator.wait_for_disconnect()
await self.coordinator.stop()
self.assertEqual(auth["secret"], self.info.secret)
self.assertEqual(auth["hostname"], self.connection.hostname)
self.assertEqual(status["status"], "idle")
async def test_send_work_status(self) -> None:
"""Send a work status to the coordinator"""
await self.coordinator.start()
async with self.connection:
await self.coordinator.wait_for_connect()
await self.coordinator.wait_for_status()
await self.connection.send_status(
"new",
message="Received work item 7",
work_item=self.work_item,
logger=None,
)
status = await self.coordinator.wait_for_status()
await self.coordinator.wait_for_disconnect()
await self.coordinator.stop()
self.assertEqual(status["status"], "new")
self.assertEqual(status["detailMessage"], "Received work item 7")
async def test_fetch_work_item(self) -> None:
"""Fetch a work item from the coordinator"""
await self.coordinator.start()
async with self.connection:
await self.coordinator.wait_for_connect()
work_item = await self.connection.fetch_work(use_query_event=False, logger=None)
await self.coordinator.wait_for_disconnect()
await self.coordinator.stop()
self.assertEqual(work_item.integration_id, self.coordinator.work_item.integration_id)
self.assertEqual(work_item.revision, self.coordinator.work_item.revision)
async def test_fetch_work_item_again(self) -> None:
"""Fetch a second work item from the coordinator"""
await self.coordinator.start()
async with self.connection:
await self.coordinator.wait_for_connect()
work_item = await self.connection.fetch_work(use_query_event=False, logger=None)
self.coordinator.work_item = self.work_item
work_item = await self.connection.fetch_work(use_query_event=False, logger=None)
await self.coordinator.wait_for_disconnect()
await self.coordinator.stop()
self.assertEqual(work_item.integration_id, self.work_item.integration_id)
self.assertEqual(work_item.revision, self.work_item.revision)
async def test_resend_current_status(self) -> None:
"""Reconnect and send the current status after a connection drop"""
await self.coordinator.start()
async with self.connection:
await self.coordinator.wait_for_connect()
await self.coordinator.wait_for_status()
await self.connection.send_status(
"git",
message="Fetching Git refs",
work_item=self.work_item,
logger=None,
)
await self.coordinator.wait_for_status()
await self.coordinator.stop()
await self.coordinator.wait_for_disconnect()
await self.coordinator.start()
await self.coordinator.wait_for_connect()
status = await self.coordinator.wait_for_status()
await self.coordinator.wait_for_disconnect()
await self.coordinator.stop()
self.assertEqual(status["status"], "git")
self.assertEqual(status["detailMessage"], "Fetching Git refs")
async def test_resend_pending_status(self) -> None:
"""Save a status during a connection drop, then reconnect and send it"""
await self.coordinator.start()
async with self.connection:
await self.coordinator.wait_for_connect()
await self.coordinator.stop()
await self.coordinator.wait_for_disconnect()
await self.connection.send_status(
"git",
message="Resetting Git repository",
work_item=self.work_item,
logger=None,
)
await self.coordinator.start()
await self.coordinator.wait_for_connect()
status = await self.coordinator.wait_for_status()
await self.coordinator.wait_for_disconnect()
await self.coordinator.stop()
self.assertEqual(status["status"], "git")
self.assertEqual(status["detailMessage"], "Resetting Git repository")
|