diff --git a/hello/advertizer.py b/hello/advertizer.py index 2ed632e..9700c25 100644 --- a/hello/advertizer.py +++ b/hello/advertizer.py @@ -12,11 +12,15 @@ from hello import Service, Group, Sender, Receiver, ServiceMatcher, ServiceQuery, AbstractScheduler -log = get_logger('Advertizer') - class Advertizer: + def __enter__(self) -> 'Advertizer': + return self + + def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: + self.stop() + def start(self, group: Group, service: Service | None = None) -> None: raise NotImplementedError() @@ -33,24 +37,19 @@ def __init__(self, sender: Sender) -> None: self._sender = sender self._group: Group | None = None self._service: Service | None = None - - def __enter__(self) -> Advertizer: - return self - - def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: - self.stop() + self.log = get_logger(type(self).__name__) def start(self, group: Group, service: Service | None = None) -> None: self._sender.start(group.hello()) self._group = group self._service = service - log.info('Advertizer started', group=self._group, service=self._service) + self.log.info('Advertizer started', group=self._group, service=self._service) def stop(self) -> None: self._group = None self._service = None self._sender.stop() - log.info('Advertizer stopped') + self.log.info('Advertizer stopped') def advertise(self, service: Service | None = None, log_level: int = INFO) -> None: if self._group: @@ -58,11 +57,11 @@ def advertise(self, service: Service | None = None, log_level: int = INFO) -> No self._service = service if self._service: self._sender.send(self._service) - log.log(log_level, 'Service advertised', service=self._service, group=self._group) + self.log.log(log_level, 'Service advertised', service=self._service, group=self._group) else: - log.warning('Cannot advertise service, no service provided', group=self._group) + self.log.warning('Cannot advertise service, no service provided', group=self._group) else: - log.warning('Cannot advertise service, advertizer not started', service=service) + self.log.warning('Cannot advertise service, advertizer not started', service=service) class RespondingAdvertizer(DefaultAdvertizer): @@ -87,17 +86,17 @@ def _handle_message(self, message: dict[str, Any]) -> None: try: query = ServiceQuery(**message) matcher = ServiceMatcher(query) - log.debug('Service query received', group=self._group, query=query) + self.log.debug('Service query received', group=self._group, query=query) self._handle_query(matcher, self._service) except Exception as error: - log.warning('Invalid service query received', group=self._group, received=message, error=error) + self.log.warning('Invalid service query received', group=self._group, received=message, error=error) def _handle_query(self, matcher: ServiceMatcher, service: Service) -> None: if matcher.matches(service): delay = round(self._max_delay * random.random(), 3) - log.info('Responding to query', group=self._group, query=matcher.query, service=service, delay=delay) + self.log.info('Responding to query', group=self._group, query=matcher.query, delay=delay) time.sleep(delay) - self.advertise(service) + self.advertise(service, DEBUG) class ScheduledAdvertizer(AbstractScheduler[Service], Advertizer): diff --git a/hello/api.py b/hello/api.py index 6f91cb9..359c275 100644 --- a/hello/api.py +++ b/hello/api.py @@ -9,7 +9,7 @@ from zmq import Context from hello import RadioSender, DishReceiver, DefaultAdvertizer, DefaultDiscoverer, \ - RespondingAdvertizer, ScheduledAdvertizer, ScheduledDiscoverer + RespondingAdvertizer, ScheduledAdvertizer, ScheduledDiscoverer, Advertizer, Discoverer @dataclass @@ -25,7 +25,7 @@ class HelloConfig: class Hello(object): @classmethod - def default_advertizer(cls, config: HelloConfig) -> DefaultAdvertizer: + def default_advertizer(cls, config: HelloConfig) -> Advertizer: sender = RadioSender(config.context) if config.advertizer_responder: receiver = DishReceiver(config.context, config.receiver_max_workers, config.receiver_poll_timeout) @@ -39,7 +39,7 @@ def scheduled_advertizer(cls, config: HelloConfig) -> ScheduledAdvertizer: return ScheduledAdvertizer(advertizer, ReusableTimer()) @classmethod - def default_discoverer(cls, config: HelloConfig) -> DefaultDiscoverer: + def default_discoverer(cls, config: HelloConfig) -> Discoverer: sender = RadioSender(config.context) receiver = DishReceiver(config.context, config.receiver_max_workers, config.receiver_poll_timeout) return DefaultDiscoverer(sender, receiver, config.discoverer_max_workers) @@ -59,7 +59,7 @@ class AdvertizerBuilder(object): def __init__(self, config: HelloConfig) -> None: self._config = config - def default(self) -> DefaultAdvertizer: + def default(self) -> Advertizer: return Hello.default_advertizer(self._config) def scheduled(self) -> ScheduledAdvertizer: @@ -71,7 +71,7 @@ class DiscovererBuilder(object): def __init__(self, config: HelloConfig) -> None: self._config = config - def default(self) -> DefaultDiscoverer: + def default(self) -> Discoverer: return Hello.default_discoverer(self._config) def scheduled(self) -> ScheduledDiscoverer: diff --git a/hello/discoverer.py b/hello/discoverer.py index f2f0d22..222ca92 100644 --- a/hello/discoverer.py +++ b/hello/discoverer.py @@ -14,8 +14,6 @@ from hello import Group, ServiceQuery, Sender, Receiver, Service, ServiceMatcher, AbstractScheduler -log = get_logger('Discoverer') - class DiscoveryEventType(Enum): DISCOVERED = 'discovered' @@ -37,6 +35,12 @@ def __call__(self, event: DiscoveryEvent) -> None: ... class Discoverer: + def __enter__(self) -> 'Discoverer': + return self + + def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: + self.stop() + def start(self, group: Group, query: ServiceQuery | None = None) -> None: raise NotImplementedError() @@ -68,12 +72,7 @@ def __init__(self, sender: Sender, receiver: Receiver, max_workers: int = 8) -> event_type: [] for event_type in DiscoveryEventType } self._handler_executor = ThreadPoolExecutor(max_workers=max_workers) - - def __enter__(self) -> Discoverer: - return self - - def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: - self.stop() + self.log = get_logger(type(self).__name__) def start(self, group: Group, query: ServiceQuery | None = None) -> None: self._group = group @@ -82,7 +81,7 @@ def start(self, group: Group, query: ServiceQuery | None = None) -> None: self._sender.start(group.query()) self._receiver.register(self._handle_message) self._receiver.start(group.hello()) - log.info('Discoverer started', group=self._group, query=query) + self.log.info('Discoverer started', group=self._group, query=query) def stop(self) -> None: self._group = None @@ -90,7 +89,7 @@ def stop(self) -> None: self._sender.stop() self._receiver.deregister(self._handle_message) self._receiver.stop() - log.info('Discoverer stopped') + self.log.info('Discoverer stopped') def discover(self, query: ServiceQuery | None = None, log_level: int = INFO) -> None: if self._group: @@ -98,11 +97,11 @@ def discover(self, query: ServiceQuery | None = None, log_level: int = INFO) -> self._matcher = ServiceMatcher(query) if self._matcher: self._sender.send(self._matcher.query) - log.log(log_level, 'Service discovery initiated', group=self._group, query=self._matcher.query) + self.log.log(log_level, 'Service discovery initiated', group=self._group, query=self._matcher.query) else: - log.warning('Cannot discover services, no query provided', group=self._group) + self.log.warning('Cannot discover services, no query provided', group=self._group) else: - log.warning('Cannot discover services, discoverer not started', query=query) + self.log.warning('Cannot discover services, discoverer not started', query=query) def register(self, handler: OnDiscoveryEvent, types: set[DiscoveryEventType] | None = None) -> None: for event_type in types if types else self._get_event_types(): @@ -123,10 +122,10 @@ def _handle_message(self, message: dict[str, Any]) -> None: try: service = Service(UUID(message['uuid']), message['name'], message['role'], message.get('urls', {}), message.get('info', {}), message['address']) - log.debug('Service received', service=service, group=self._group) + self.log.debug('Service received', service=service, group=self._group) self._handle_service(service, self._group, self._matcher) except Exception as error: - log.warn('Invalid service received', group=self._group, data=message, error=error) + self.log.warn('Invalid service received', group=self._group, data=message, error=error) def _handle_service(self, service: Service, group: Group, matcher: ServiceMatcher) -> None: if matcher.matches(service): @@ -138,13 +137,13 @@ def _create_event(self, group: Group, matcher: ServiceMatcher, stored: Service | service: Service) -> DiscoveryEvent: if stored: if stored != service: - log.info('Service updated', group=group, old_service=stored, new_service=service) + self.log.info('Service updated', group=group, old_service=stored, new_service=service) return DiscoveryEvent(group, matcher.query, service, DiscoveryEventType.UPDATED) else: - log.debug('Service unchanged', group=group, service=service) + self.log.debug('Service unchanged', group=group, service=service) return DiscoveryEvent(group, matcher.query, service, DiscoveryEventType.UNCHANGED) else: - log.info('Service discovered', group=group, service=service) + self.log.info('Service discovered', group=group, service=service) return DiscoveryEvent(group, matcher.query, service, DiscoveryEventType.DISCOVERED) def _handle_event(self, event: DiscoveryEvent) -> None: @@ -157,7 +156,7 @@ def _execute_handler(self, handler: OnDiscoveryEvent, event: DiscoveryEvent) -> try: handler(event) except Exception as error: - log.warn('Error in event handler execution', event=event, error=error) + self.log.warn('Error in event handler execution', event=event, error=error) class ScheduledDiscoverer(AbstractScheduler[ServiceQuery], Discoverer): diff --git a/hello/receiver.py b/hello/receiver.py index 8c045ab..f4c9dfb 100644 --- a/hello/receiver.py +++ b/hello/receiver.py @@ -10,8 +10,6 @@ from hello import PrefixedGroup -log = get_logger('Receiver') - class OnMessage(Protocol): def __call__(self, message: dict[str, Any]) -> None: ... @@ -19,6 +17,12 @@ def __call__(self, message: dict[str, Any]) -> None: ... class Receiver: + def __enter__(self) -> 'Receiver': + return self + + def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: + self.stop() + def start(self, group: PrefixedGroup) -> None: raise NotImplementedError() @@ -43,12 +47,7 @@ def __init__(self, context: Context[Any], max_workers: int = 8, poll_timeout: fl self._poll_timeout = int(poll_timeout * 1000) self._group: str | None = None self._handlers: list[OnMessage] = [] - - def __enter__(self) -> Receiver: - return self - - def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: - self.stop() + self.log = get_logger(type(self).__name__) def start(self, group: PrefixedGroup) -> None: try: @@ -59,9 +58,9 @@ def start(self, group: PrefixedGroup) -> None: self._dish.join(group.name) self._group = group.name self._loop_executor.submit(self._receive_loop) - log.debug('Receiver started', url=group.url, group=group.name) + self.log.debug('Receiver started', url=group.url, group=group.name) except Exception as error: - log.error('Failed to start receiver', url=group.url, group=group.name, error=error) + self.log.error('Failed to start receiver', url=group.url, group=group.name, error=error) raise error def stop(self) -> None: @@ -69,9 +68,9 @@ def stop(self) -> None: self._group = None self._loop_executor.shutdown() self._dish.close() - log.debug('Receiver stopped') + self.log.debug('Receiver stopped') except Exception as error: - log.error('Failed to stop receiver', error=error) + self.log.error('Failed to stop receiver', error=error) raise error def register(self, handler: OnMessage) -> None: @@ -88,10 +87,10 @@ def _receive_loop(self) -> None: message = self._dish.recv_json() self._handle_message(message) except Exception as error: - log.error('Failed to receive message', group=self._group, error=error) + self.log.error('Failed to receive message', group=self._group, error=error) def _handle_message(self, message: dict[str, Any]) -> None: - log.debug('Message received', data=message, group=self._group) + self.log.debug('Message received', data=message, group=self._group) for handler in self._handlers: self._handler_executor.submit(self._execute_handler, handler, message) @@ -99,4 +98,4 @@ def _execute_handler(self, handler: OnMessage, message: dict[str, Any]) -> None: try: handler(message) except Exception as error: - log.warn('Handler failed to process message', data=message, group=self._group, error=error) + self.log.warn('Handler failed to process message', data=message, group=self._group, error=error) diff --git a/hello/scheduler.py b/hello/scheduler.py index 9a8107d..14b8465 100644 --- a/hello/scheduler.py +++ b/hello/scheduler.py @@ -7,8 +7,6 @@ from common_utility import IReusableTimer from context_logger import get_logger -log = get_logger('Scheduler') - T = TypeVar('T') @@ -29,6 +27,7 @@ class AbstractScheduler(Scheduler[T]): def __init__(self, timer: IReusableTimer, interval: float = 60) -> None: self._timer = timer self._interval = interval + self.log = get_logger(type(self).__name__) def __enter__(self) -> Scheduler[T]: return self @@ -37,14 +36,14 @@ def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: self.stop() def schedule_one_shot(self, data: T | None = None, interval: float | None = None) -> None: - interval = interval or self._interval - self._timer.start(interval, self._safe_execute, [data]) - log.info('One-shot execution scheduled', data=data, interval=interval) + resolved_interval: float = self._interval if interval is None else interval + self._timer.start(resolved_interval, self._safe_execute, [data]) + self.log.info('One-shot execution scheduled', data=data, interval=resolved_interval) def schedule_periodic(self, data: T | None = None, interval: float | None = None) -> None: - interval = interval or self._interval - self._timer.start(interval, self._execute_and_restart, [data]) - log.info('Periodic execution scheduled', data=data, interval=interval) + resolved_interval: float = self._interval if interval is None else interval + self._timer.start(resolved_interval, self._execute_and_restart, [data]) + self.log.info('Periodic execution scheduled', data=data, interval=resolved_interval) def stop(self) -> None: self._timer.cancel() @@ -60,4 +59,4 @@ def _safe_execute(self, data: T | None = None) -> None: try: self._execute(data) except Exception as e: - log.error('Error during scheduled execution', error=e, data=data) + self.log.error('Error during scheduled execution', error=e, data=data) diff --git a/hello/sender.py b/hello/sender.py index 5a42be9..71efdaa 100644 --- a/hello/sender.py +++ b/hello/sender.py @@ -9,11 +9,15 @@ from hello import PrefixedGroup -log = get_logger('Sender') - class Sender: + def __enter__(self) -> 'Sender': + return self + + def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: + self.stop() + def start(self, group: PrefixedGroup) -> None: raise NotImplementedError() @@ -30,12 +34,7 @@ def __init__(self, context: Context[Any]) -> None: self._context = context self._radio: Socket[bytes] = self._context.socket(RADIO) self._group: str | None = None - - def __enter__(self) -> Sender: - return self - - def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: - self.stop() + self.log = get_logger(type(self).__name__) def start(self, group: PrefixedGroup) -> None: try: @@ -43,18 +42,18 @@ def start(self, group: PrefixedGroup) -> None: raise RuntimeError('Sender already started') self._radio.connect(group.url) self._group = group.name - log.debug('Sender started', url=group.url, group=group.name) + self.log.debug('Sender started', url=group.url, group=group.name) except Exception as error: - log.error('Failed to start sender', url=group.url, group=group.name, error=error) + self.log.error('Failed to start sender', url=group.url, group=group.name, error=error) raise error def stop(self) -> None: try: self._radio.close() self._group = None - log.debug('Sender stopped') + self.log.debug('Sender stopped') except Exception as error: - log.error('Failed to stop sender', error=error) + self.log.error('Failed to stop sender', error=error) raise error def send(self, data: Any) -> None: @@ -62,9 +61,9 @@ def send(self, data: Any) -> None: if message := self._convert_to_dict(data): self._send_json(message) else: - log.warning('Unsupported message type', data=data, group=self._group) + self.log.warning('Unsupported message type', data=data, group=self._group) else: - log.warning('Cannot send message, sender not started', data=data) + self.log.warning('Cannot send message, sender not started', data=data) def _convert_to_dict(self, data: Any) -> dict[str, Any] | None: if isinstance(data, dict): @@ -80,6 +79,6 @@ def _convert_to_dict(self, data: Any) -> dict[str, Any] | None: def _send_json(self, data: dict[str, Any]) -> None: try: self._radio.send_json(data, group=self._group) - log.debug('Message sent', data=data, group=self._group) + self.log.debug('Message sent', data=data, group=self._group) except Exception as error: - log.error('Failed to send message', data=data, group=self._group, error=error) + self.log.error('Failed to send message', data=data, group=self._group, error=error) diff --git a/hello/service.py b/hello/service.py index 0b2c523..00420ad 100644 --- a/hello/service.py +++ b/hello/service.py @@ -42,8 +42,8 @@ class ServiceMatcher(object): def __init__(self, query: ServiceQuery) -> None: self.query = query - self._name_matcher = re.compile(self.query.name) - self._role_matcher = re.compile(self.query.role) + self._name_matcher = re.compile(query.name) + self._role_matcher = re.compile(query.role) def matches(self, service: Service) -> bool: name_match = self._name_matcher.match(service.name)