pipert.core.component

Module Contents

Classes

BaseComponent

class pipert.core.component.BaseComponent(component_config, start_component=False)[source]
setup_component(self, component_config)[source]
_replace_queue_names_with_queue_objects(self, routine_parameters_kwargs)[source]
_start(self)[source]

Goes over the component’s routines registered in self.routines and starts running them.

run_comp(self)[source]

Starts running all the component’s routines.

register_routine(self, routine: Union[Routine, Process, Thread])[source]

Registers routine to the list of component’s routines Args:

routine: the routine to register

_teardown_callback(self, *args, **kwargs)[source]

Implemented by subclasses of BaseComponent. Used for stopping or tearing down things that are not stopped by setting the stop_event. Returns: None

stop_run(self)[source]

Signals all the component’s routines to stop.

create_queue(self, queue_name, queue_size=1)[source]

Create a new queue for the component. Returns True if created or False otherwise Args:

queue_name: the name of the queue, must be unique queue_size: the size of the queue

get_queue(self, queue_name)[source]

Returns the queue object by its name Args:

queue_name: the name of the queue

Raises:

KeyError - if no queue has the name

get_all_queue_names(self)[source]

Returns the list of names of queues that the component expose.

does_queue_exist(self, queue_name)[source]

Returns True the component has a queue named queue_name or False otherwise Args:

queue_name: the name of the queue to check

delete_queue(self, queue_name)[source]

Deletes a queue with the name queue_name. Returns True if succeeded. Args:

queue_name: the name of the queue to delete

Raises:

KeyError - if no queue has the name queue_name

does_routine_name_exist(self, routine_name)[source]
remove_routine(self, routine_name)[source]
does_routines_use_queue(self, queue_name)[source]
is_component_running(self)[source]
get_routines(self)[source]
get_component_configuration(self)[source]
_get_routine_creation(self, routine)[source]
set_monitoring_system(self, monitoring_system_parameters)[source]
set_routine_attribute(self, routine_name, attribute_name, attribute_value)[source]