Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 16 additions & 17 deletions hello/advertizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,15 @@

from hello import Service, Group, Sender, Receiver, ServiceMatcher, ServiceQuery, AbstractScheduler

log = get_logger('Advertizer')


class Advertizer:

def __enter__(self) -> 'Advertizer':
return self

def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
self.stop()

def start(self, group: Group, service: Service | None = None) -> None:
raise NotImplementedError()

Expand All @@ -33,36 +37,31 @@ def __init__(self, sender: Sender) -> None:
self._sender = sender
self._group: Group | None = None
self._service: Service | None = None

def __enter__(self) -> Advertizer:
return self

def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
self.stop()
self.log = get_logger(type(self).__name__)

def start(self, group: Group, service: Service | None = None) -> None:
self._sender.start(group.hello())
self._group = group
self._service = service
log.info('Advertizer started', group=self._group, service=self._service)
self.log.info('Advertizer started', group=self._group, service=self._service)

def stop(self) -> None:
self._group = None
self._service = None
self._sender.stop()
log.info('Advertizer stopped')
self.log.info('Advertizer stopped')

def advertise(self, service: Service | None = None, log_level: int = INFO) -> None:
if self._group:
if service:
self._service = service
if self._service:
self._sender.send(self._service)
log.log(log_level, 'Service advertised', service=self._service, group=self._group)
self.log.log(log_level, 'Service advertised', service=self._service, group=self._group)
else:
log.warning('Cannot advertise service, no service provided', group=self._group)
self.log.warning('Cannot advertise service, no service provided', group=self._group)
else:
log.warning('Cannot advertise service, advertizer not started', service=service)
self.log.warning('Cannot advertise service, advertizer not started', service=service)


class RespondingAdvertizer(DefaultAdvertizer):
Expand All @@ -87,17 +86,17 @@ def _handle_message(self, message: dict[str, Any]) -> None:
try:
query = ServiceQuery(**message)
matcher = ServiceMatcher(query)
log.debug('Service query received', group=self._group, query=query)
self.log.debug('Service query received', group=self._group, query=query)
self._handle_query(matcher, self._service)
except Exception as error:
log.warning('Invalid service query received', group=self._group, received=message, error=error)
self.log.warning('Invalid service query received', group=self._group, received=message, error=error)

def _handle_query(self, matcher: ServiceMatcher, service: Service) -> None:
if matcher.matches(service):
delay = round(self._max_delay * random.random(), 3)
log.info('Responding to query', group=self._group, query=matcher.query, service=service, delay=delay)
self.log.info('Responding to query', group=self._group, query=matcher.query, delay=delay)
time.sleep(delay)
self.advertise(service)
self.advertise(service, DEBUG)


class ScheduledAdvertizer(AbstractScheduler[Service], Advertizer):
Expand Down
10 changes: 5 additions & 5 deletions hello/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from zmq import Context

from hello import RadioSender, DishReceiver, DefaultAdvertizer, DefaultDiscoverer, \
RespondingAdvertizer, ScheduledAdvertizer, ScheduledDiscoverer
RespondingAdvertizer, ScheduledAdvertizer, ScheduledDiscoverer, Advertizer, Discoverer


@dataclass
Expand All @@ -25,7 +25,7 @@ class HelloConfig:
class Hello(object):

@classmethod
def default_advertizer(cls, config: HelloConfig) -> DefaultAdvertizer:
def default_advertizer(cls, config: HelloConfig) -> Advertizer:
sender = RadioSender(config.context)
if config.advertizer_responder:
receiver = DishReceiver(config.context, config.receiver_max_workers, config.receiver_poll_timeout)
Expand All @@ -39,7 +39,7 @@ def scheduled_advertizer(cls, config: HelloConfig) -> ScheduledAdvertizer:
return ScheduledAdvertizer(advertizer, ReusableTimer())

@classmethod
def default_discoverer(cls, config: HelloConfig) -> DefaultDiscoverer:
def default_discoverer(cls, config: HelloConfig) -> Discoverer:
sender = RadioSender(config.context)
receiver = DishReceiver(config.context, config.receiver_max_workers, config.receiver_poll_timeout)
return DefaultDiscoverer(sender, receiver, config.discoverer_max_workers)
Expand All @@ -59,7 +59,7 @@ class AdvertizerBuilder(object):
def __init__(self, config: HelloConfig) -> None:
self._config = config

def default(self) -> DefaultAdvertizer:
def default(self) -> Advertizer:
return Hello.default_advertizer(self._config)

def scheduled(self) -> ScheduledAdvertizer:
Expand All @@ -71,7 +71,7 @@ class DiscovererBuilder(object):
def __init__(self, config: HelloConfig) -> None:
self._config = config

def default(self) -> DefaultDiscoverer:
def default(self) -> Discoverer:
return Hello.default_discoverer(self._config)

def scheduled(self) -> ScheduledDiscoverer:
Expand Down
37 changes: 18 additions & 19 deletions hello/discoverer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@

from hello import Group, ServiceQuery, Sender, Receiver, Service, ServiceMatcher, AbstractScheduler

log = get_logger('Discoverer')


class DiscoveryEventType(Enum):
DISCOVERED = 'discovered'
Expand All @@ -37,6 +35,12 @@ def __call__(self, event: DiscoveryEvent) -> None: ...

class Discoverer:

def __enter__(self) -> 'Discoverer':
return self

def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
self.stop()

def start(self, group: Group, query: ServiceQuery | None = None) -> None:
raise NotImplementedError()

Expand Down Expand Up @@ -68,12 +72,7 @@ def __init__(self, sender: Sender, receiver: Receiver, max_workers: int = 8) ->
event_type: [] for event_type in DiscoveryEventType
}
self._handler_executor = ThreadPoolExecutor(max_workers=max_workers)

def __enter__(self) -> Discoverer:
return self

def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
self.stop()
self.log = get_logger(type(self).__name__)

def start(self, group: Group, query: ServiceQuery | None = None) -> None:
self._group = group
Expand All @@ -82,27 +81,27 @@ def start(self, group: Group, query: ServiceQuery | None = None) -> None:
self._sender.start(group.query())
self._receiver.register(self._handle_message)
self._receiver.start(group.hello())
log.info('Discoverer started', group=self._group, query=query)
self.log.info('Discoverer started', group=self._group, query=query)

def stop(self) -> None:
self._group = None
self._matcher = None
self._sender.stop()
self._receiver.deregister(self._handle_message)
self._receiver.stop()
log.info('Discoverer stopped')
self.log.info('Discoverer stopped')

def discover(self, query: ServiceQuery | None = None, log_level: int = INFO) -> None:
if self._group:
if query:
self._matcher = ServiceMatcher(query)
if self._matcher:
self._sender.send(self._matcher.query)
log.log(log_level, 'Service discovery initiated', group=self._group, query=self._matcher.query)
self.log.log(log_level, 'Service discovery initiated', group=self._group, query=self._matcher.query)
else:
log.warning('Cannot discover services, no query provided', group=self._group)
self.log.warning('Cannot discover services, no query provided', group=self._group)
else:
log.warning('Cannot discover services, discoverer not started', query=query)
self.log.warning('Cannot discover services, discoverer not started', query=query)

def register(self, handler: OnDiscoveryEvent, types: set[DiscoveryEventType] | None = None) -> None:
for event_type in types if types else self._get_event_types():
Expand All @@ -123,10 +122,10 @@ def _handle_message(self, message: dict[str, Any]) -> None:
try:
service = Service(UUID(message['uuid']), message['name'], message['role'],
message.get('urls', {}), message.get('info', {}), message['address'])
log.debug('Service received', service=service, group=self._group)
self.log.debug('Service received', service=service, group=self._group)
self._handle_service(service, self._group, self._matcher)
except Exception as error:
log.warn('Invalid service received', group=self._group, data=message, error=error)
self.log.warn('Invalid service received', group=self._group, data=message, error=error)

def _handle_service(self, service: Service, group: Group, matcher: ServiceMatcher) -> None:
if matcher.matches(service):
Expand All @@ -138,13 +137,13 @@ def _create_event(self, group: Group, matcher: ServiceMatcher, stored: Service |
service: Service) -> DiscoveryEvent:
if stored:
if stored != service:
log.info('Service updated', group=group, old_service=stored, new_service=service)
self.log.info('Service updated', group=group, old_service=stored, new_service=service)
return DiscoveryEvent(group, matcher.query, service, DiscoveryEventType.UPDATED)
else:
log.debug('Service unchanged', group=group, service=service)
self.log.debug('Service unchanged', group=group, service=service)
return DiscoveryEvent(group, matcher.query, service, DiscoveryEventType.UNCHANGED)
else:
log.info('Service discovered', group=group, service=service)
self.log.info('Service discovered', group=group, service=service)
return DiscoveryEvent(group, matcher.query, service, DiscoveryEventType.DISCOVERED)

def _handle_event(self, event: DiscoveryEvent) -> None:
Expand All @@ -157,7 +156,7 @@ def _execute_handler(self, handler: OnDiscoveryEvent, event: DiscoveryEvent) ->
try:
handler(event)
except Exception as error:
log.warn('Error in event handler execution', event=event, error=error)
self.log.warn('Error in event handler execution', event=event, error=error)


class ScheduledDiscoverer(AbstractScheduler[ServiceQuery], Discoverer):
Expand Down
29 changes: 14 additions & 15 deletions hello/receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,19 @@

from hello import PrefixedGroup

log = get_logger('Receiver')


class OnMessage(Protocol):
def __call__(self, message: dict[str, Any]) -> None: ...


class Receiver:

def __enter__(self) -> 'Receiver':
return self

def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
self.stop()

def start(self, group: PrefixedGroup) -> None:
raise NotImplementedError()

Expand All @@ -43,12 +47,7 @@ def __init__(self, context: Context[Any], max_workers: int = 8, poll_timeout: fl
self._poll_timeout = int(poll_timeout * 1000)
self._group: str | None = None
self._handlers: list[OnMessage] = []

def __enter__(self) -> Receiver:
return self

def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
self.stop()
self.log = get_logger(type(self).__name__)

def start(self, group: PrefixedGroup) -> None:
try:
Expand All @@ -59,19 +58,19 @@ def start(self, group: PrefixedGroup) -> None:
self._dish.join(group.name)
self._group = group.name
self._loop_executor.submit(self._receive_loop)
log.debug('Receiver started', url=group.url, group=group.name)
self.log.debug('Receiver started', url=group.url, group=group.name)
except Exception as error:
log.error('Failed to start receiver', url=group.url, group=group.name, error=error)
self.log.error('Failed to start receiver', url=group.url, group=group.name, error=error)
raise error

def stop(self) -> None:
try:
self._group = None
self._loop_executor.shutdown()
self._dish.close()
log.debug('Receiver stopped')
self.log.debug('Receiver stopped')
except Exception as error:
log.error('Failed to stop receiver', error=error)
self.log.error('Failed to stop receiver', error=error)
raise error

def register(self, handler: OnMessage) -> None:
Expand All @@ -88,15 +87,15 @@ def _receive_loop(self) -> None:
message = self._dish.recv_json()
self._handle_message(message)
except Exception as error:
log.error('Failed to receive message', group=self._group, error=error)
self.log.error('Failed to receive message', group=self._group, error=error)

def _handle_message(self, message: dict[str, Any]) -> None:
log.debug('Message received', data=message, group=self._group)
self.log.debug('Message received', data=message, group=self._group)
for handler in self._handlers:
self._handler_executor.submit(self._execute_handler, handler, message)

def _execute_handler(self, handler: OnMessage, message: dict[str, Any]) -> None:
try:
handler(message)
except Exception as error:
log.warn('Handler failed to process message', data=message, group=self._group, error=error)
self.log.warn('Handler failed to process message', data=message, group=self._group, error=error)
17 changes: 8 additions & 9 deletions hello/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
from common_utility import IReusableTimer
from context_logger import get_logger

log = get_logger('Scheduler')

T = TypeVar('T')


Expand All @@ -29,6 +27,7 @@ class AbstractScheduler(Scheduler[T]):
def __init__(self, timer: IReusableTimer, interval: float = 60) -> None:
self._timer = timer
self._interval = interval
self.log = get_logger(type(self).__name__)

def __enter__(self) -> Scheduler[T]:
return self
Expand All @@ -37,14 +36,14 @@ def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
self.stop()

def schedule_one_shot(self, data: T | None = None, interval: float | None = None) -> None:
interval = interval or self._interval
self._timer.start(interval, self._safe_execute, [data])
log.info('One-shot execution scheduled', data=data, interval=interval)
resolved_interval: float = self._interval if interval is None else interval
self._timer.start(resolved_interval, self._safe_execute, [data])
self.log.info('One-shot execution scheduled', data=data, interval=resolved_interval)

def schedule_periodic(self, data: T | None = None, interval: float | None = None) -> None:
interval = interval or self._interval
self._timer.start(interval, self._execute_and_restart, [data])
log.info('Periodic execution scheduled', data=data, interval=interval)
resolved_interval: float = self._interval if interval is None else interval
self._timer.start(resolved_interval, self._execute_and_restart, [data])
self.log.info('Periodic execution scheduled', data=data, interval=resolved_interval)

def stop(self) -> None:
self._timer.cancel()
Expand All @@ -60,4 +59,4 @@ def _safe_execute(self, data: T | None = None) -> None:
try:
self._execute(data)
except Exception as e:
log.error('Error during scheduled execution', error=e, data=data)
self.log.error('Error during scheduled execution', error=e, data=data)
Loading
Loading