Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/cameraServiceExample.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def main() -> None:
advertizer.advertise(info)

# Schedule periodic advertisements every 10 seconds
advertizer.schedule(interval=10)
advertizer.schedule_periodic(interval=10)

shutdown_event.wait()

Expand Down
3 changes: 2 additions & 1 deletion hello/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class HelloConfig:
receiver_poll_timeout: float = 0.1
advertizer_responder: bool = True
advertizer_max_delay: float = 0.1
discoverer_max_workers: int = 1


class Hello(object):
Expand All @@ -41,7 +42,7 @@ def scheduled_advertizer(cls, config: HelloConfig) -> ScheduledAdvertizer:
def default_discoverer(cls, config: HelloConfig) -> DefaultDiscoverer:
sender = RadioSender(config.context)
receiver = DishReceiver(config.context, config.receiver_max_workers, config.receiver_poll_timeout)
return DefaultDiscoverer(sender, receiver)
return DefaultDiscoverer(sender, receiver, config.discoverer_max_workers)

@classmethod
def scheduled_discoverer(cls, config: HelloConfig) -> ScheduledDiscoverer:
Expand Down
32 changes: 22 additions & 10 deletions hello/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@

class Scheduler(Generic[T]):

def schedule(self, data: T | None = None, interval: float = 60, one_shot: bool = False) -> None:
def schedule_one_shot(self, data: T | None = None, interval: float | None = None) -> None:
raise NotImplementedError()

def schedule_periodic(self, data: T | None = None, interval: float | None = None) -> None:
raise NotImplementedError()

def stop(self) -> None:
Expand All @@ -23,22 +26,25 @@ def stop(self) -> None:

class AbstractScheduler(Scheduler[T]):

def __init__(self, timer: IReusableTimer) -> None:
def __init__(self, timer: IReusableTimer, interval: float = 60) -> None:
self._timer = timer
self._interval = interval

def __enter__(self) -> Scheduler[T]:
return self

def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
self.stop()

def schedule(self, data: T | None = None, interval: float = 60, one_shot: bool = False) -> None:
if one_shot:
self._timer.start(interval, self._execute, [data])
log.info('One-shot execution scheduled', data=data, interval=interval)
else:
self._timer.start(interval, self._execute_and_restart, [data])
log.info('Periodic execution scheduled', data=data, interval=interval)
def schedule_one_shot(self, data: T | None = None, interval: float | None = None) -> None:
interval = interval or self._interval
self._timer.start(interval, self._safe_execute, [data])
log.info('One-shot execution scheduled', data=data, interval=interval)

def schedule_periodic(self, data: T | None = None, interval: float | None = None) -> None:
interval = interval or self._interval
self._timer.start(interval, self._execute_and_restart, [data])
log.info('Periodic execution scheduled', data=data, interval=interval)

def stop(self) -> None:
self._timer.cancel()
Expand All @@ -47,5 +53,11 @@ def _execute(self, data: T | None = None) -> None:
raise NotImplementedError()

def _execute_and_restart(self, data: T | None = None) -> None:
self._execute(data)
self._safe_execute(data)
self._timer.restart()

def _safe_execute(self, data: T | None = None) -> None:
try:
self._execute(data)
except Exception as e:
log.error('Error during scheduled execution', error=e, data=data)
49 changes: 43 additions & 6 deletions tests/abstractSchedulerTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,23 +47,47 @@ def test_schedules_execution_once(self):
data = MagicMock()

# When
scheduler.schedule(data, 60, True)
scheduler.schedule_one_shot(data, 60)

# Then
timer.start.assert_called_once_with(60, scheduler._execute, [data])
timer.start.assert_called_once_with(60, scheduler._safe_execute, [data])

def test_schedules_periodic_discover(self):
def test_schedules_execution_once_with_default_interval(self):
# Given
timer = MagicMock(spec=IReusableTimer)
scheduler = TestScheduler(timer, 10)
data = MagicMock()

# When
scheduler.schedule_one_shot(data)

# Then
timer.start.assert_called_once_with(10, scheduler._safe_execute, [data])

def test_schedules_execution_periodically(self):
# Given
timer = MagicMock(spec=IReusableTimer)
scheduler = TestScheduler(timer)
data = MagicMock()

# When
scheduler.schedule(data, 60, False)
scheduler.schedule_periodic(data, 60)

# Then
timer.start.assert_called_once_with(60, scheduler._execute_and_restart, [data])

def test_schedules_execution_periodically_with_default_interval(self):
# Given
timer = MagicMock(spec=IReusableTimer)
scheduler = TestScheduler(timer, 10)
data = MagicMock()

# When
scheduler.schedule_periodic(data)

# Then
timer.start.assert_called_once_with(10, scheduler._execute_and_restart, [data])

def test_execute_and_restart_restarts_timer(self):
# Given
timer = MagicMock(spec=IReusableTimer)
Expand All @@ -76,11 +100,24 @@ def test_execute_and_restart_restarts_timer(self):
# Then
timer.restart.assert_called_once()

def test_handles_exception_in_execute(self):
# Given
timer = MagicMock(spec=IReusableTimer)
scheduler = TestScheduler(timer)
data = MagicMock()
scheduler._execute = MagicMock(side_effect=Exception('Test exception'))

# When
scheduler._safe_execute(data)

# Then
scheduler._execute.assert_called_once_with(data)


class TestScheduler(AbstractScheduler[Any]):

def __init__(self, timer: IReusableTimer) -> None:
super().__init__(timer)
def __init__(self, timer: IReusableTimer, interval: float = 0) -> None:
super().__init__(timer, interval)

def _execute(self, data: Any | None = None) -> None:
pass
Expand Down
4 changes: 2 additions & 2 deletions tests/advertizerIntegrationTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def test_sends_hello_when_schedules_advertisement_once(self):
scheduled_advertizer.start(GROUP)

# When
scheduled_advertizer.schedule(SERVICE_INFO, interval=0.01, one_shot=True)
scheduled_advertizer.schedule_one_shot(SERVICE_INFO, interval=0.01)

wait_for_assertion(1, lambda: self.assertEqual(1, len(messages)))

Expand All @@ -138,7 +138,7 @@ def test_sends_hello_when_schedules_advertisement_periodically(self):
scheduled_advertizer.start(GROUP)

# When
scheduled_advertizer.schedule(SERVICE_INFO, interval=0.01)
scheduled_advertizer.schedule_periodic(SERVICE_INFO, interval=0.01)

# Then
wait_for_assertion(1, lambda: self.assertTrue(len(messages) >= 10))
Expand Down
8 changes: 4 additions & 4 deletions tests/apiIntegrationTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def test_discoverer_caches_advertised_service_when_advertisement_scheduled_once(
discoverer.start(GROUP, SERVICE_QUERY)

# When
advertizer.schedule(interval=0.01, one_shot=True)
advertizer.schedule_one_shot(interval=0.01)

wait_for_assertion(1, lambda: self.assertEqual(1, len(discoverer.get_services())))

Expand All @@ -92,7 +92,7 @@ def test_discoverer_caches_advertised_service_when_advertisement_scheduled_perio
discoverer.start(GROUP, SERVICE_QUERY)

# When
advertizer.schedule(interval=0.01)
advertizer.schedule_periodic(interval=0.01)

wait_for_assertion(1, lambda: self.assertEqual(1, len(discoverer.get_services())))

Expand Down Expand Up @@ -150,7 +150,7 @@ def test_discoverer_caches_discovery_response_service_when_discovery_scheduled_o
discoverer.start(GROUP, SERVICE_QUERY)

# When
discoverer.schedule(interval=0.01, one_shot=True)
discoverer.schedule_one_shot(interval=0.01)

wait_for_assertion(1, lambda: self.assertEqual(1, len(discoverer.get_services())))

Expand All @@ -167,7 +167,7 @@ def test_discoverer_caches_discovery_response_service_when_discovery_scheduled_p
discoverer.start(GROUP, SERVICE_QUERY)

# When
discoverer.schedule(interval=0.01)
discoverer.schedule_periodic(interval=0.01)

wait_for_assertion(1, lambda: self.assertEqual(1, len(discoverer.get_services())))

Expand Down
4 changes: 2 additions & 2 deletions tests/discovererIntegrationTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def test_sends_query_when_schedules_discovery_once(self):
scheduled_discoverer.start(GROUP)

# When
scheduled_discoverer.schedule(SERVICE_QUERY, interval=0.01, one_shot=True)
scheduled_discoverer.schedule_one_shot(SERVICE_QUERY, interval=0.01)

wait_for_assertion(1, lambda: self.assertEqual(1, len(messages)))

Expand All @@ -129,7 +129,7 @@ def test_sends_query_when_schedules_discovery_periodically(self):
scheduled_discoverer.start(GROUP)

# When
scheduled_discoverer.schedule(SERVICE_QUERY, interval=0.01)
scheduled_discoverer.schedule_periodic(SERVICE_QUERY, interval=0.01)

# Then
wait_for_assertion(1, lambda: self.assertTrue(len(messages) >= 10))
Expand Down
6 changes: 3 additions & 3 deletions tests/scheduledAdvertizerTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ def test_schedules_advertise_once(self):
scheduled_advertizer.start(GROUP)

# When
scheduled_advertizer.schedule(SERVICE_INFO, 60, True)
scheduled_advertizer.schedule_one_shot(SERVICE_INFO, 60)

# Then
timer.start.assert_called_once_with(60, scheduled_advertizer._execute, [SERVICE_INFO])
timer.start.assert_called_once_with(60, scheduled_advertizer._safe_execute, [SERVICE_INFO])

def test_schedules_periodic_advertise(self):
# Given
Expand All @@ -94,7 +94,7 @@ def test_schedules_periodic_advertise(self):
scheduled_advertizer.start(GROUP)

# When
scheduled_advertizer.schedule(SERVICE_INFO, 60, False)
scheduled_advertizer.schedule_periodic(SERVICE_INFO, 60)

# Then
timer.start.assert_called_once_with(60, scheduled_advertizer._execute_and_restart, [SERVICE_INFO])
Expand Down
6 changes: 3 additions & 3 deletions tests/scheduledDiscovererTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,10 @@ def test_schedules_discover_once(self):
scheduled_discoverer.start(GROUP)

# When
scheduled_discoverer.schedule(SERVICE_QUERY, 60, True)
scheduled_discoverer.schedule_one_shot(SERVICE_QUERY, 60)

# Then
timer.start.assert_called_once_with(60, scheduled_discoverer._execute, [SERVICE_QUERY])
timer.start.assert_called_once_with(60, scheduled_discoverer._safe_execute, [SERVICE_QUERY])

def test_schedules_periodic_discover(self):
# Given
Expand All @@ -120,7 +120,7 @@ def test_schedules_periodic_discover(self):
scheduled_discoverer.start(GROUP)

# When
scheduled_discoverer.schedule(SERVICE_QUERY, 60, False)
scheduled_discoverer.schedule_periodic(SERVICE_QUERY, 60)

# Then
timer.start.assert_called_once_with(60, scheduled_discoverer._execute_and_restart, [SERVICE_QUERY])
Expand Down
Loading