diff --git a/examples/cameraServiceExample.py b/examples/cameraServiceExample.py index 65efb14..d4c84a2 100644 --- a/examples/cameraServiceExample.py +++ b/examples/cameraServiceExample.py @@ -31,7 +31,7 @@ def main() -> None: advertizer.advertise(info) # Schedule periodic advertisements every 10 seconds - advertizer.schedule(interval=10) + advertizer.schedule_periodic(interval=10) shutdown_event.wait() diff --git a/hello/api.py b/hello/api.py index 7364a7e..6f91cb9 100644 --- a/hello/api.py +++ b/hello/api.py @@ -19,6 +19,7 @@ class HelloConfig: receiver_poll_timeout: float = 0.1 advertizer_responder: bool = True advertizer_max_delay: float = 0.1 + discoverer_max_workers: int = 1 class Hello(object): @@ -41,7 +42,7 @@ def scheduled_advertizer(cls, config: HelloConfig) -> ScheduledAdvertizer: def default_discoverer(cls, config: HelloConfig) -> DefaultDiscoverer: sender = RadioSender(config.context) receiver = DishReceiver(config.context, config.receiver_max_workers, config.receiver_poll_timeout) - return DefaultDiscoverer(sender, receiver) + return DefaultDiscoverer(sender, receiver, config.discoverer_max_workers) @classmethod def scheduled_discoverer(cls, config: HelloConfig) -> ScheduledDiscoverer: diff --git a/hello/scheduler.py b/hello/scheduler.py index ac72a3a..9a8107d 100644 --- a/hello/scheduler.py +++ b/hello/scheduler.py @@ -14,7 +14,10 @@ class Scheduler(Generic[T]): - def schedule(self, data: T | None = None, interval: float = 60, one_shot: bool = False) -> None: + def schedule_one_shot(self, data: T | None = None, interval: float | None = None) -> None: + raise NotImplementedError() + + def schedule_periodic(self, data: T | None = None, interval: float | None = None) -> None: raise NotImplementedError() def stop(self) -> None: @@ -23,8 +26,9 @@ def stop(self) -> None: class AbstractScheduler(Scheduler[T]): - def __init__(self, timer: IReusableTimer) -> None: + def __init__(self, timer: IReusableTimer, interval: float = 60) -> None: self._timer = timer + self._interval = interval def __enter__(self) -> Scheduler[T]: return self @@ -32,13 +36,15 @@ def __enter__(self) -> Scheduler[T]: def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: self.stop() - def schedule(self, data: T | None = None, interval: float = 60, one_shot: bool = False) -> None: - if one_shot: - self._timer.start(interval, self._execute, [data]) - log.info('One-shot execution scheduled', data=data, interval=interval) - else: - self._timer.start(interval, self._execute_and_restart, [data]) - log.info('Periodic execution scheduled', data=data, interval=interval) + def schedule_one_shot(self, data: T | None = None, interval: float | None = None) -> None: + interval = interval or self._interval + self._timer.start(interval, self._safe_execute, [data]) + log.info('One-shot execution scheduled', data=data, interval=interval) + + def schedule_periodic(self, data: T | None = None, interval: float | None = None) -> None: + interval = interval or self._interval + self._timer.start(interval, self._execute_and_restart, [data]) + log.info('Periodic execution scheduled', data=data, interval=interval) def stop(self) -> None: self._timer.cancel() @@ -47,5 +53,11 @@ def _execute(self, data: T | None = None) -> None: raise NotImplementedError() def _execute_and_restart(self, data: T | None = None) -> None: - self._execute(data) + self._safe_execute(data) self._timer.restart() + + def _safe_execute(self, data: T | None = None) -> None: + try: + self._execute(data) + except Exception as e: + log.error('Error during scheduled execution', error=e, data=data) diff --git a/tests/abstractSchedulerTest.py b/tests/abstractSchedulerTest.py index c94d40e..d51e962 100644 --- a/tests/abstractSchedulerTest.py +++ b/tests/abstractSchedulerTest.py @@ -47,23 +47,47 @@ def test_schedules_execution_once(self): data = MagicMock() # When - scheduler.schedule(data, 60, True) + scheduler.schedule_one_shot(data, 60) # Then - timer.start.assert_called_once_with(60, scheduler._execute, [data]) + timer.start.assert_called_once_with(60, scheduler._safe_execute, [data]) - def test_schedules_periodic_discover(self): + def test_schedules_execution_once_with_default_interval(self): + # Given + timer = MagicMock(spec=IReusableTimer) + scheduler = TestScheduler(timer, 10) + data = MagicMock() + + # When + scheduler.schedule_one_shot(data) + + # Then + timer.start.assert_called_once_with(10, scheduler._safe_execute, [data]) + + def test_schedules_execution_periodically(self): # Given timer = MagicMock(spec=IReusableTimer) scheduler = TestScheduler(timer) data = MagicMock() # When - scheduler.schedule(data, 60, False) + scheduler.schedule_periodic(data, 60) # Then timer.start.assert_called_once_with(60, scheduler._execute_and_restart, [data]) + def test_schedules_execution_periodically_with_default_interval(self): + # Given + timer = MagicMock(spec=IReusableTimer) + scheduler = TestScheduler(timer, 10) + data = MagicMock() + + # When + scheduler.schedule_periodic(data) + + # Then + timer.start.assert_called_once_with(10, scheduler._execute_and_restart, [data]) + def test_execute_and_restart_restarts_timer(self): # Given timer = MagicMock(spec=IReusableTimer) @@ -76,11 +100,24 @@ def test_execute_and_restart_restarts_timer(self): # Then timer.restart.assert_called_once() + def test_handles_exception_in_execute(self): + # Given + timer = MagicMock(spec=IReusableTimer) + scheduler = TestScheduler(timer) + data = MagicMock() + scheduler._execute = MagicMock(side_effect=Exception('Test exception')) + + # When + scheduler._safe_execute(data) + + # Then + scheduler._execute.assert_called_once_with(data) + class TestScheduler(AbstractScheduler[Any]): - def __init__(self, timer: IReusableTimer) -> None: - super().__init__(timer) + def __init__(self, timer: IReusableTimer, interval: float = 0) -> None: + super().__init__(timer, interval) def _execute(self, data: Any | None = None) -> None: pass diff --git a/tests/advertizerIntegrationTest.py b/tests/advertizerIntegrationTest.py index 98ad352..4a6a3df 100644 --- a/tests/advertizerIntegrationTest.py +++ b/tests/advertizerIntegrationTest.py @@ -117,7 +117,7 @@ def test_sends_hello_when_schedules_advertisement_once(self): scheduled_advertizer.start(GROUP) # When - scheduled_advertizer.schedule(SERVICE_INFO, interval=0.01, one_shot=True) + scheduled_advertizer.schedule_one_shot(SERVICE_INFO, interval=0.01) wait_for_assertion(1, lambda: self.assertEqual(1, len(messages))) @@ -138,7 +138,7 @@ def test_sends_hello_when_schedules_advertisement_periodically(self): scheduled_advertizer.start(GROUP) # When - scheduled_advertizer.schedule(SERVICE_INFO, interval=0.01) + scheduled_advertizer.schedule_periodic(SERVICE_INFO, interval=0.01) # Then wait_for_assertion(1, lambda: self.assertTrue(len(messages) >= 10)) diff --git a/tests/apiIntegrationTest.py b/tests/apiIntegrationTest.py index 633aafa..ff884af 100644 --- a/tests/apiIntegrationTest.py +++ b/tests/apiIntegrationTest.py @@ -75,7 +75,7 @@ def test_discoverer_caches_advertised_service_when_advertisement_scheduled_once( discoverer.start(GROUP, SERVICE_QUERY) # When - advertizer.schedule(interval=0.01, one_shot=True) + advertizer.schedule_one_shot(interval=0.01) wait_for_assertion(1, lambda: self.assertEqual(1, len(discoverer.get_services()))) @@ -92,7 +92,7 @@ def test_discoverer_caches_advertised_service_when_advertisement_scheduled_perio discoverer.start(GROUP, SERVICE_QUERY) # When - advertizer.schedule(interval=0.01) + advertizer.schedule_periodic(interval=0.01) wait_for_assertion(1, lambda: self.assertEqual(1, len(discoverer.get_services()))) @@ -150,7 +150,7 @@ def test_discoverer_caches_discovery_response_service_when_discovery_scheduled_o discoverer.start(GROUP, SERVICE_QUERY) # When - discoverer.schedule(interval=0.01, one_shot=True) + discoverer.schedule_one_shot(interval=0.01) wait_for_assertion(1, lambda: self.assertEqual(1, len(discoverer.get_services()))) @@ -167,7 +167,7 @@ def test_discoverer_caches_discovery_response_service_when_discovery_scheduled_p discoverer.start(GROUP, SERVICE_QUERY) # When - discoverer.schedule(interval=0.01) + discoverer.schedule_periodic(interval=0.01) wait_for_assertion(1, lambda: self.assertEqual(1, len(discoverer.get_services()))) diff --git a/tests/discovererIntegrationTest.py b/tests/discovererIntegrationTest.py index b0ad21f..3e3d469 100644 --- a/tests/discovererIntegrationTest.py +++ b/tests/discovererIntegrationTest.py @@ -107,7 +107,7 @@ def test_sends_query_when_schedules_discovery_once(self): scheduled_discoverer.start(GROUP) # When - scheduled_discoverer.schedule(SERVICE_QUERY, interval=0.01, one_shot=True) + scheduled_discoverer.schedule_one_shot(SERVICE_QUERY, interval=0.01) wait_for_assertion(1, lambda: self.assertEqual(1, len(messages))) @@ -129,7 +129,7 @@ def test_sends_query_when_schedules_discovery_periodically(self): scheduled_discoverer.start(GROUP) # When - scheduled_discoverer.schedule(SERVICE_QUERY, interval=0.01) + scheduled_discoverer.schedule_periodic(SERVICE_QUERY, interval=0.01) # Then wait_for_assertion(1, lambda: self.assertTrue(len(messages) >= 10)) diff --git a/tests/scheduledAdvertizerTest.py b/tests/scheduledAdvertizerTest.py index f572ddd..6219aaf 100644 --- a/tests/scheduledAdvertizerTest.py +++ b/tests/scheduledAdvertizerTest.py @@ -81,10 +81,10 @@ def test_schedules_advertise_once(self): scheduled_advertizer.start(GROUP) # When - scheduled_advertizer.schedule(SERVICE_INFO, 60, True) + scheduled_advertizer.schedule_one_shot(SERVICE_INFO, 60) # Then - timer.start.assert_called_once_with(60, scheduled_advertizer._execute, [SERVICE_INFO]) + timer.start.assert_called_once_with(60, scheduled_advertizer._safe_execute, [SERVICE_INFO]) def test_schedules_periodic_advertise(self): # Given @@ -94,7 +94,7 @@ def test_schedules_periodic_advertise(self): scheduled_advertizer.start(GROUP) # When - scheduled_advertizer.schedule(SERVICE_INFO, 60, False) + scheduled_advertizer.schedule_periodic(SERVICE_INFO, 60) # Then timer.start.assert_called_once_with(60, scheduled_advertizer._execute_and_restart, [SERVICE_INFO]) diff --git a/tests/scheduledDiscovererTest.py b/tests/scheduledDiscovererTest.py index d247291..2116213 100644 --- a/tests/scheduledDiscovererTest.py +++ b/tests/scheduledDiscovererTest.py @@ -107,10 +107,10 @@ def test_schedules_discover_once(self): scheduled_discoverer.start(GROUP) # When - scheduled_discoverer.schedule(SERVICE_QUERY, 60, True) + scheduled_discoverer.schedule_one_shot(SERVICE_QUERY, 60) # Then - timer.start.assert_called_once_with(60, scheduled_discoverer._execute, [SERVICE_QUERY]) + timer.start.assert_called_once_with(60, scheduled_discoverer._safe_execute, [SERVICE_QUERY]) def test_schedules_periodic_discover(self): # Given @@ -120,7 +120,7 @@ def test_schedules_periodic_discover(self): scheduled_discoverer.start(GROUP) # When - scheduled_discoverer.schedule(SERVICE_QUERY, 60, False) + scheduled_discoverer.schedule_periodic(SERVICE_QUERY, 60) # Then timer.start.assert_called_once_with(60, scheduled_discoverer._execute_and_restart, [SERVICE_QUERY])