import subprocess
from typing import Optional
import yaml
import zerorpc
import re
from pipert.core.class_factory import ClassFactory
from pipert.core.errors import QueueDoesNotExist
from pipert.core.routine import Routine
from os import listdir
from os.path import isfile, join
from jsonschema import validate, ValidationError
import functools
from pipert.utils.logger_utils import create_parent_logger
[docs]def component_name_existence_error(need_to_be_exist):
def decorator(func):
@functools.wraps(func)
def function_wrapper(self, *args, **kwargs):
if not (self._does_component_exist(
kwargs['component_name']) == need_to_be_exist):
error_word = "doesn't" if need_to_be_exist else 'already'
return self._create_response(
False,
f"Component named {kwargs['component_name']} {error_word} exist"
)
return func(self, *args, **kwargs)
return function_wrapper
return decorator
[docs]class PipelineManager:
def __init__(self):
"""
Args:
"""
super().__init__()
self.components = {}
self.component_ports = {}
self.ROUTINES_FOLDER_PATH = "pipert/contrib/routines"
self.COMPONENTS_FOLDER_PATH = "pipert/contrib/components"
self.ports_counter = 20000
self.logger = create_parent_logger("Pipe-PipelineManager")
@component_name_existence_error(need_to_be_exist=True)
[docs] def add_routine_to_component(self, component_name,
routine_type_name, **routine_parameters_kwargs):
if self._is_component_running(component_name=component_name):
return self._create_response(
False,
"You can't add a routine while your component is running"
)
routine_class_object = self._get_routine_class_object_by_type_name(routine_type_name)
if routine_class_object is None:
return self._create_response(
False,
f"The routine type '{routine_type_name}' doesn't exist"
)
if "name" not in routine_parameters_kwargs:
return self._create_response(
False,
"Routine must have a name"
)
if self.components[component_name] \
.does_routine_name_exist(routine_parameters_kwargs["name"]):
return self._create_response(
False,
f"Routine with the name {routine_parameters_kwargs['name']}"
" already exist in this component"
)
try:
# replace all queue names with the queue objects of the component before creating routine
for key, value in routine_parameters_kwargs.items():
if 'queue' in key.lower():
routine_parameters_kwargs[key] = self.components[component_name] \
.get_queue(queue_name=value)
routine_parameters_kwargs["component_name"] = component_name
self.components[component_name] \
.register_routine(routine_class_object(**routine_parameters_kwargs)
.as_thread())
return self._create_response(
True,
f"The routine {routine_parameters_kwargs['name']} has been added"
)
except QueueDoesNotExist as e:
return self._create_response(
False,
e.message()
)
except TypeError as error:
return self._create_response(
False,
str(error)
)
@component_name_existence_error(need_to_be_exist=True)
[docs] def remove_routine_from_component(self, component_name, routine_name):
if self._is_component_running(component_name=component_name):
return self._create_response(
False,
"You can't remove a routine while your component is running"
)
if self.components[component_name].remove_routine(routine_name):
return self._create_response(
True,
f"Removed routine with the name {routine_name} from the component"
)
else:
return self._create_response(
False,
f"There is no routine with the name {routine_name}"
f" inside the component {component_name}"
)
@component_name_existence_error(need_to_be_exist=True)
[docs] def create_queue_to_component(self, component_name,
queue_name, queue_size=1):
if self.components[component_name].\
create_queue(queue_name=queue_name,
queue_size=queue_size):
return self._create_response(
True,
f"The Queue {queue_name} has been created"
)
else:
return self._create_response(
False,
f"Queue named {queue_name} already exist"
)
@component_name_existence_error(need_to_be_exist=True)
[docs] def remove_queue_from_component(self, component_name, queue_name):
if not self.components[component_name].does_queue_exist(queue_name):
return self._create_response(
False,
f"Queue named {queue_name} doesn't exist"
)
if self.components[component_name]. \
does_routines_use_queue(queue_name):
return self._create_response(
False,
"Can't remove a queue that is being used by routines"
)
self.components[component_name].delete_queue(queue_name=queue_name)
return self._create_response(
True,
f"The Queue {queue_name} has been removed"
)
@component_name_existence_error(need_to_be_exist=True)
[docs] def run_component(self, component_name):
if self._is_component_running(component_name=component_name):
return self._create_response(
False,
f"The component {component_name} already running"
)
else:
self.components[component_name].run_comp()
return self._create_response(
True,
f"The component {component_name} is now running"
)
@component_name_existence_error(need_to_be_exist=True)
[docs] def stop_component(self, component_name):
if not self._is_component_running(component_name=component_name):
return self._create_response(
False,
f"The component {component_name} is not running running"
)
else:
if self.components[component_name].stop_run() == 0:
return self._create_response(
True,
f"The component {component_name} has been stopped"
)
else:
return self._create_response(
False,
f"An error has occurred, can't stop the component {component_name}"
)
[docs] def run_all_components(self):
self.logger.info("*****************Running all components*****************")
self.logger.info(self.components.keys())
for component_name in self.components.keys():
is_component_running = self._is_component_running(component_name=component_name)
self.logger.info("Checking if component {0} is running {1}".format(component_name, is_component_running))
if not is_component_running:
self.logger.info("Running component: " + component_name)
self.run_component(component_name=component_name)
return self._create_response(
True,
"All of the components are running"
)
[docs] def stop_all_components(self):
self.logger.info("*****************Stopping all components*****************")
self.logger.info(self.components.keys())
for component_name in self.components.keys():
is_component_running = self._is_component_running(component_name=component_name)
self.logger.info("Checking if component {0} is running {1}".format(component_name, is_component_running))
if is_component_running:
self.logger.info("Stopping component: " + component_name)
# component.stop_run()
self.stop_component(component_name=component_name)
return self._create_response(
True,
"All of the components have been stopped"
)
[docs] def get_all_routine_types(self):
routine_file_names = [f for f in
listdir(self.ROUTINES_FOLDER_PATH)
if isfile(join(self.ROUTINES_FOLDER_PATH, f))]
routine_file_names = [file_name[:-3] for
file_name in routine_file_names]
routine_file_names = \
[file_name[0].upper() + re.sub(r'_\w',
self._remove_string_with_underscore,
file_name)[1:]
for file_name in routine_file_names]
routines = []
for routine_name in routine_file_names:
current_routine_type = \
self._get_routine_class_object_by_type_name(routine_name) \
.routine_type.value
routines.append({"name": routine_name,
"type": current_routine_type})
return routines
@component_name_existence_error(need_to_be_exist=True)
[docs] def change_component_execution_mode(self, component_name, execution_mode):
try:
getattr(self.components[component_name], "as_" + execution_mode.lower())()
return self._create_response(
True,
f"The component {component_name} changed execution mode to {execution_mode}"
)
except AttributeError:
return self._create_response(
False,
f"Cannot find execution mode '{execution_mode}'"
)
# helping method for changing the file name to class name
@staticmethod
[docs] def _remove_string_with_underscore(match):
return match.group(0).upper()[1]
# helping method for changing the class name to file name
@staticmethod
[docs] def _add_underscore_before_uppercase(match):
return '_' + match.group(0).lower()
[docs] def get_routine_parameters(self, routine_type_name):
routine_class_object = self._get_routine_class_object_by_type_name(routine_type_name)
if routine_class_object is not None:
return routine_class_object.get_constructor_parameters()
else:
return self._create_response(
False,
f"Routine named {routine_type_name} doesn't exist"
)
[docs] def setup_components(self, components):
"""
vvv Expecting to get vvv
"components": {
"component_name": {
"queues": [str],
"routines": {
"routine_name": {
"routine_type_name": str,
...(routine params)
},
...(more routines)
}
}
...(more components)
}
"""
component_validator = {
"type": "object",
"properties": {
"queues": {"type": "array", "items": {"type": "string"}},
"routines": {"type": "object"}
},
"required": ["queues", "routines"]
}
COMPONENT_FACTORY_PATH = "pipert/utils/scripts/component_factory.py"
# Delete all of the current components
self.components = {}
responses = []
# gc.collect()
if (type(components) is not dict) and ("components" not in components):
return self._create_response(
False,
"All of the components must be inside a dictionary with the key 'components'"
)
for component_name, component_parameters in components["components"].items():
try:
validate(instance=component_parameters, schema=component_validator)
current_component_dict = {component_name: component_parameters}
component_file_path = "pipert/utils/config_files/" + component_name + ".yaml"
with open(component_file_path, 'w') as file:
yaml.dump(current_component_dict, file)
component_port = str(self.get_random_available_port())
cmd = "python3 " + COMPONENT_FACTORY_PATH + " -cp " + component_file_path + " -p " + component_port
subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
self.components[component_name] = zerorpc.Client()
self.components[component_name].connect("tcp://localhost:" + component_port)
self.component_ports[component_name] = component_port
except ValidationError as error:
responses.append(self._create_response(
False,
error.message
))
if all(response["Succeeded"] for response in responses):
return self._create_response(
True,
"All of the components have been created"
)
else:
return list(filter(lambda response: not response["Succeeded"], responses))
[docs] def _get_routine_class_object_by_type_name(self, routine_name: str) -> Optional[Routine]:
routine_factory = ClassFactory(self.ROUTINES_FOLDER_PATH)
return routine_factory.get_class(routine_name)
[docs] def _does_component_exist(self, component_name):
return component_name in self.components
@component_name_existence_error(need_to_be_exist=True)
[docs] def _is_component_running(self, component_name):
return self.components[component_name].is_component_running()
@staticmethod
[docs] def _create_response(succeeded, message):
return {
"Succeeded": succeeded,
"Message": message
}
[docs] def get_pipeline_creation(self):
components = {}
for component_name in self.components.keys():
components.update(self.components[component_name].get_component_configuration())
return {"components": components}
[docs] def get_random_available_port(self):
self.ports_counter += 1
return self.ports_counter
@component_name_existence_error(need_to_be_exist=True)
[docs] def set_routine_parameter_in_component(self, component_name, routine_name, attribute_name, attribute_value):
self.logger.info(component_name + ", " + routine_name + ", " + attribute_name + ", " + attribute_value)
self.components[component_name].set_routine_attribute(routine_name, attribute_name, attribute_value)