import os

# Use torch's multiprocessing wrapper when TORCHVISION=yes is set in the
# environment, otherwise fall back to the standard library primitives.
if os.environ.get('TORCHVISION', 'no') == 'yes':
    from torch.multiprocessing import Event, Process
else:
    from multiprocessing import Event, Process
from pipert.core.routine import Routine
from threading import Thread
from typing import Union
import signal
import gevent
from pipert.core.metrics_collector import NullCollector
import sys

# Pick the shared memory backend by interpreter version. BaseComponent.run_comp
# performs the matching `minor < 8` check, so keep the two in sync.
# NOTE(review): presumably the 3.8+ backend builds on the stdlib
# multiprocessing.shared_memory module - confirm against pipert.core.
if sys.version_info.minor >= 8:
    from pipert.core.multiprocessing_shared_memory import MpSharedMemoryGenerator as smGen
else:
    from pipert.core.shared_memory_generator import SharedMemoryGenerator as smGen
from pipert.core.errors import RegisteredException, QueueDoesNotExist
from pipert.core.class_factory import ClassFactory
from queue import Queue
from pipert.utils.logger_utils import create_parent_logger
class BaseComponent:
    """A named unit that owns a set of queues and the routines using them.

    A component is built from a single-entry configuration dictionary of the
    form ``{component_name: component_parameters}``. The parameters may hold
    the keys ``shared_memory``, ``monitoring_system``, ``queues`` and
    ``routines``.

    ``self.logger`` and ``self.parent_logger`` stay ``None`` until
    ``setup_component`` has been given a non-empty configuration; the methods
    that log (``run_comp``, ``register_routine``, ``stop_run``) need them set.
    """

    def __init__(self, component_config, start_component=False):
        """Create the component and optionally start it.

        Args:
            component_config: ``{name: parameters}`` dictionary, or
                ``None`` / ``{}`` for an unconfigured component.
            start_component: when True, ``run_comp`` is called right away.
        """
        self.name = ""
        self.ROUTINES_FOLDER_PATH = "pipert/contrib/routines"
        self.MONITORING_SYSTEMS_FOLDER_PATH = "pipert/contrib/metrics_collectors"
        self.use_memory = False
        # Replaced by a real generator only when shared memory is enabled.
        self.generator = None
        self.stop_event = Event()
        # A set stop event means "not running"; run_comp clears it.
        self.stop_event.set()
        self.queues = {}
        self._routines = {}
        self.metrics_collector = NullCollector()
        self.parent_logger = None
        self.logger = None
        self.setup_component(component_config)
        self.metrics_collector.setup()
        if start_component:
            self.run_comp()

    def setup_component(self, component_config):
        """Configure name, loggers, shared memory, queues and routines.

        Does nothing when ``component_config`` is not a non-empty dict. Only
        the first ``name: parameters`` entry of the dictionary is used.
        Routines whose type cannot be resolved, or that reference a queue
        which does not exist, are logged and skipped.

        Args:
            component_config: ``{component_name: component_parameters}``.
        """
        if not isinstance(component_config, dict) or not component_config:
            return
        component_name, component_parameters = \
            next(iter(component_config.items()))
        self.name = component_name
        self.parent_logger = create_parent_logger(self.name)
        self.logger = self.parent_logger.getChild(self.name)

        if component_parameters.get("shared_memory"):
            self.use_memory = True
            self.generator = smGen(self.name)

        if "monitoring_system" in component_parameters:
            self.set_monitoring_system(
                component_parameters["monitoring_system"])

        # A configuration may omit (or leave empty) either section.
        for queue in component_parameters.get("queues") or []:
            self.create_queue(queue_name=queue, queue_size=1)

        routine_factory = ClassFactory(self.ROUTINES_FOLDER_PATH)
        routines_config = component_parameters.get("routines") or {}
        for routine_name, routine_parameters_real in routines_config.items():
            # Copy so the caller's configuration is left untouched.
            routine_parameters = routine_parameters_real.copy()
            routine_parameters["name"] = routine_name
            routine_parameters['metrics_collector'] = self.metrics_collector
            routine_parameters["logger"] = \
                self.parent_logger.getChild(routine_name)
            routine_type_name = routine_parameters.pop("routine_type_name", "")
            routine_class = routine_factory.get_class(routine_type_name)
            if routine_class is None:
                self.logger.error(
                    "Skipping routine %s: unknown routine type %r",
                    routine_name, routine_type_name)
                continue
            try:
                self._replace_queue_names_with_queue_objects(
                    routine_parameters)
            except QueueDoesNotExist:
                self.logger.error(
                    "Skipping routine %s: it references a queue that "
                    "does not exist", routine_name)
                continue
            routine_parameters["component_name"] = self.name
            self.register_routine(
                routine_class(**routine_parameters).as_thread())

    def _replace_queue_names_with_queue_objects(self, routine_parameters_kwargs):
        """Swap queue names for queue objects, in place.

        Every parameter whose key contains ``queue`` (case-insensitive) is
        treated as a queue name and replaced by the matching queue object.

        Raises:
            QueueDoesNotExist: if a referenced queue is not in the component.
        """
        # Only existing keys are reassigned, so the dict size never changes
        # while it is being iterated.
        for key, value in routine_parameters_kwargs.items():
            if 'queue' in key.lower():
                routine_parameters_kwargs[key] = \
                    self.get_queue(queue_name=value)

    def _start(self):
        """
        Goes over the component's routines registered in self._routines and
        starts running them.
        """
        self.logger.info("Running all routines")
        for routine in self._routines.values():
            routine.start()
            self.logger.info("%s Started", routine.name)

    def run_comp(self):
        """
        Starts running all the component's routines.
        """
        self.logger.info("Running component")
        self.stop_event.clear()
        # Same version test the module uses to choose the generator class;
        # only the pre-3.8 generator gets create_memories() called here.
        if self.use_memory and sys.version_info.minor < 8:
            self.generator.create_memories()
        self._start()
        gevent.signal_handler(signal.SIGTERM, self.stop_run)

    def register_routine(self, routine: Union[Routine, Process, Thread]):
        """
        Registers routine to the list of component's routines

        A ``Routine`` is stored under its name and receives the component's
        stop event (and shared memory generator, if enabled). Any other
        object is stored under its string representation.

        Args:
            routine: the routine to register

        Raises:
            RegisteredException: if a Routine with the same name exists, or
                the Routine already has a stop event assigned.
        """
        self.logger.info("Registering routine")
        self.logger.info(routine)

        if not isinstance(routine, Routine):
            self.logger.info("Routine registered")
            self._routines[str(routine)] = routine
            return

        if routine.name in self._routines:
            self.logger.error("Routine name already exist")
            raise RegisteredException("routine name already exist")
        # A stop event that is already set means another component owns it.
        if routine.stop_event is not None:
            self.logger.error("Routine is already registered")
            raise RegisteredException("routine is already registered")

        routine.stop_event = self.stop_event
        if self.use_memory:
            routine.use_memory = self.use_memory
            routine.generator = self.generator
        self.logger.info("Routine registered")
        self._routines[routine.name] = routine

    def _teardown_callback(self, *args, **kwargs):
        """
        Implemented by subclasses of BaseComponent. Used for stopping or
        tearing down things that are not stopped by setting the stop_event.
        Returns: None
        """
        pass

    def stop_run(self):
        """
        Signals all the component's routines to stop and waits for them.

        Returns:
            0 if the component was already stopped or stopped cleanly,
            1 if a RuntimeError was raised while tearing down or joining.
        """
        self.logger.info("Stopping component")
        if self.stop_event.is_set():
            return 0
        self.stop_event.set()
        try:
            self._teardown_callback()
            if self.use_memory:
                self.logger.info("Cleaning shared memory")
                self.generator.cleanup()
            for routine in self._routines.values():
                self.logger.info("Stopping routine %s", routine.name)
                if isinstance(routine, Routine):
                    routine.runner.join()
                elif isinstance(routine, (Process, Thread)):
                    routine.join()
                self.logger.info("Routine %s stopped", routine.name)
            return 0
        except RuntimeError:
            # Keep the error-code contract, but leave a trace of what failed.
            self.logger.exception("Failed to stop component")
            return 1

    def create_queue(self, queue_name, queue_size=1):
        """
        Create a new queue for the component.
        Returns True if created or False otherwise
        Args:
            queue_name: the name of the queue, must be unique
            queue_size: the size of the queue
        """
        if queue_name in self.queues:
            return False
        self.queues[queue_name] = Queue(maxsize=queue_size)
        return True

    def get_queue(self, queue_name):
        """
        Returns the queue object by its name
        Args:
            queue_name: the name of the queue
        Raises:
            QueueDoesNotExist - if no queue has the name
        """
        try:
            return self.queues[queue_name]
        except KeyError:
            raise QueueDoesNotExist(queue_name) from None

    def get_all_queue_names(self):
        """
        Returns the list of names of queues that
        the component expose.
        """
        return list(self.queues)

    def does_queue_exist(self, queue_name):
        """
        Returns True the component has a queue named
        queue_name or False otherwise
        Args:
            queue_name: the name of the queue to check
        """
        return queue_name in self.queues

    def delete_queue(self, queue_name):
        """
        Deletes a queue with the name queue_name.
        Returns True if succeeded, False if a routine still uses the queue.
        Args:
            queue_name: the name of the queue to delete
        Raises:
            QueueDoesNotExist - if no queue has the name queue_name
        """
        if queue_name not in self.queues:
            raise QueueDoesNotExist(queue_name)
        if self.does_routines_use_queue(queue_name=queue_name):
            return False
        try:
            del self.queues[queue_name]
        except KeyError:
            raise QueueDoesNotExist(queue_name) from None
        return True

    def does_routine_name_exist(self, routine_name):
        """Returns True if a routine is registered under routine_name."""
        return routine_name in self._routines

    def remove_routine(self, routine_name):
        """
        Removes the routine registered under routine_name.
        Returns True if it was removed or False if no such routine exists.
        The routine itself is not stopped by this call.
        """
        if not self.does_routine_name_exist(routine_name):
            return False
        del self._routines[routine_name]
        return True

    def does_routines_use_queue(self, queue_name):
        """
        Returns True if any registered routine uses the queue named
        queue_name. A queue that does not exist is used by no routine.
        """
        queue = self.queues.get(queue_name)
        if queue is None:
            return False
        for routine in self._routines.values():
            # Plain Process/Thread objects may be registered as well and
            # have no does_routine_use_queue; they cannot hold our queues.
            uses_queue = getattr(routine, "does_routine_use_queue", None)
            if uses_queue is not None and uses_queue(queue):
                return True
        return False

    def is_component_running(self):
        """Returns True while the stop event is not set."""
        return not self.stop_event.is_set()

    def get_routines(self):
        """Returns the dictionary of registered routines, keyed by name."""
        return self._routines

    def get_component_configuration(self):
        """
        Returns a ``{name: parameters}`` dictionary describing the component,
        in the same shape setup_component accepts. Registered objects that
        cannot describe themselves (no get_creation_dictionary, e.g. a plain
        Thread) are left out.
        """
        component_dict = {
            "shared_memory": self.use_memory,
            "queues": self.get_all_queue_names(),
            "routines": {},
        }
        # Only subclasses record their type; the base class is the default.
        if type(self).__name__ != BaseComponent.__name__:
            component_dict["component_type_name"] = type(self).__name__
        for current_routine_object in self._routines.values():
            if not hasattr(current_routine_object, "get_creation_dictionary"):
                continue
            routine_creation_dict = \
                self._get_routine_creation(current_routine_object)
            routine_name = routine_creation_dict.pop("name")
            component_dict["routines"][routine_name] = routine_creation_dict
        return {self.name: component_dict}

    def _get_routine_creation(self, routine):
        """
        Returns the routine's creation dictionary with its type name added
        and every queue object replaced by the name of that queue.
        """
        routine_dict = routine.get_creation_dictionary()
        routine_dict["routine_type_name"] = routine.__class__.__name__
        # Only existing keys are reassigned while iterating.
        for routine_param_name in routine_dict:
            if "queue" not in routine_param_name:
                continue
            routine_queue = getattr(routine, routine_param_name, None)
            for queue_name, queue in self.queues.items():
                # Identity, not equality: we want this exact queue object.
                if routine_queue is queue:
                    routine_dict[routine_param_name] = queue_name
        return routine_dict

    def set_monitoring_system(self, monitoring_system_parameters):
        """
        Replaces the metrics collector with the one described by
        monitoring_system_parameters.

        The ``name`` entry selects the class ``<name>Collector``; the other
        entries are passed to its constructor. The current collector is kept
        when the name is missing, the class is not found, or the constructor
        rejects the parameters. The given dictionary is not modified.
        """
        if "name" not in monitoring_system_parameters:
            print("No name parameter found inside the monitoring system")
            return
        # Pop from a copy: removing "name" from the caller's dictionary would
        # make the same configuration unusable a second time.
        collector_parameters = dict(monitoring_system_parameters)
        monitoring_system_name = collector_parameters.pop("name") + "Collector"
        monitoring_system_factory = \
            ClassFactory(self.MONITORING_SYSTEMS_FOLDER_PATH)
        monitoring_system_class = \
            monitoring_system_factory.get_class(monitoring_system_name)
        if monitoring_system_class is None:
            return
        try:
            self.metrics_collector = \
                monitoring_system_class(**collector_parameters)
        except TypeError:
            print("Bad parameters given for the monitoring system " + monitoring_system_name)

    def set_routine_attribute(self, routine_name, attribute_name, attribute_value):
        """
        Sets attribute_name to attribute_value on the routine registered
        under routine_name. Does nothing if there is no such routine.
        """
        routine = self._routines.get(routine_name, None)
        if routine is not None:
            setattr(routine, attribute_name, attribute_value)