pipert.core¶
Subpackages¶
Submodules¶
pipert.core.class_factory
pipert.core.component
pipert.core.errors
pipert.core.handlers
pipert.core.message
pipert.core.message_handlers
pipert.core.metrics_collector
pipert.core.multiprocessing_shared_memory
pipert.core.pipeline_manager
pipert.core.routine
pipert.core.shared_memory
pipert.core.shared_memory_generator
Package Contents¶
Classes¶
Routine: Helper class that provides a standard way to create an ABC using inheritance.
Events: Events that are fired by the RoutineInterface during execution.
Payload: Helper class that provides a standard way to create an ABC using inheritance.
MessageHandler: Helper class that provides a standard way to create an ABC using inheritance.
RedisHandler: Helper class that provides a standard way to create an ABC using inheritance.
-
class
pipert.core.Routine(logger, name='', component_name='', extensions=None, metrics_collector=NullCollector(), *args, **kwargs)¶ Bases: abc.ABC
Helper class that provides a standard way to create an ABC using inheritance.
-
routine_type¶
-
_setup_extensions(self, extensions)¶
-
register_events(self, *event_names)¶ Add events that can be fired.
Registering an event lets the user fire it at any point, which makes the run() loop more configurable.
By default, the events from Events are registered.
- Args:
*event_names: An object (ideally a string or int) that defines the name of each event being supported.
Example usage:
    from enum import Enum

    class Custom_Events(Enum):
        FOO_EVENT = "foo_event"
        BAR_EVENT = "bar_event"

    # routine: an instance of a concrete Routine subclass
    routine.register_events(*Custom_Events)
-
add_event_handler(self, event_name, handler, first=False, last=False, *args, **kwargs)¶ Add an event handler to be executed when the specified event is fired.
- Args:
event_name: An event to attach the handler to. Valid events are those from Events or any event_name added by register_events().
handler (callable): the callable event handler that should be invoked.
first: specify True if the event handler should be called first.
last: specify True if the event handler should be called last.
*args: additional args to be passed to handler.
**kwargs: additional keyword args to be passed to handler.
- Notes:
The handler function’s first argument will be self, the Routine object it was bound to.
Note that other arguments can be passed to the handler in addition to the *args and **kwargs passed here, for example during EXCEPTION_RAISED.
Example usage:
    # routine: an instance of a concrete Routine subclass
    def print_after_logic(routine):
        print("Logic finished for routine: {}".format(routine))

    routine.add_event_handler(Events.AFTER_LOGIC, print_after_logic)
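The ordering implied by first and last can be illustrated with a small stand-alone sketch. ToyEmitter and its three buckets are hypothetical and are not pipert code; the sketch only assumes that handlers flagged first run before unflagged ones, and handlers flagged last run after them.

```python
from collections import defaultdict


class ToyEmitter:
    """Hypothetical stand-in for the event side of a routine (not pipert code)."""

    def __init__(self):
        # event name -> three ordered buckets of (handler, args, kwargs)
        self._handlers = defaultdict(
            lambda: {"first": [], "middle": [], "last": []}
        )

    def add_event_handler(self, event_name, handler, first=False, last=False,
                          *args, **kwargs):
        # Assumption: `first` wins if both flags are set.
        bucket = "first" if first else "last" if last else "middle"
        self._handlers[event_name][bucket].append((handler, args, kwargs))

    def has_event_handler(self, handler, event_name=None):
        # None means "search every registered event".
        events = [event_name] if event_name is not None else list(self._handlers)
        return any(
            registered is handler
            for name in events
            for bucket in self._handlers[name].values()
            for registered, _, _ in bucket
        )

    def fire(self, event_name):
        # The emitter itself is passed as the handler's first argument.
        buckets = self._handlers[event_name]
        for bucket in ("first", "middle", "last"):
            for handler, args, kwargs in buckets[bucket]:
                handler(self, *args, **kwargs)


calls = []


def run_middle(emitter):
    calls.append("middle")


def run_last(emitter):
    calls.append("last")


def run_first(emitter):
    calls.append("first")


emitter = ToyEmitter()
emitter.add_event_handler("after_logic", run_middle)
emitter.add_event_handler("after_logic", run_last, last=True)
emitter.add_event_handler("after_logic", run_first, first=True)
emitter.fire("after_logic")
```

Registration order here is middle, last, first, yet the handlers run in first, middle, last order because each flag selects a bucket.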
-
has_event_handler(self, handler, event_name=None)¶ Check if the specified event has the specified handler.
- Args:
handler (callable): the callable event handler.
event_name: The event the handler is attached to. Set this to None to search all events.
-
remove_event_handler(self, handler, event_name)¶ Remove the event handler handler from the registered handlers of the routine.
- Args:
handler (callable): the callable event handler that should be removed.
event_name: The event the handler is attached to.
-
_extension_log_fps(self, fps_time_interval)¶ Log the routine’s fps every fps_time_interval seconds.
- Args:
fps_time_interval: the time interval, in seconds, between logs.
-
_extension_pace(self, fps)¶ Pace the routine to run at the desired fps.
- Args:
fps: the desired fps for the routine.
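One common way to pace a loop to a target fps is to sleep for whatever is left of each frame period. The sketch below shows that idea only; the pace helper is hypothetical and this is not necessarily how _extension_pace is implemented.

```python
import time


def pace(loop_body, fps, iterations):
    """Run loop_body `iterations` times, at most `fps` times per second."""
    period = 1.0 / fps
    for _ in range(iterations):
        started = time.monotonic()
        loop_body()
        # Sleep only for the part of the period the body did not use.
        remaining = period - (time.monotonic() - started)
        if remaining > 0:
            time.sleep(remaining)
```

If the body takes longer than one period, the loop simply runs as fast as it can; it does not try to catch up on missed frames.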
-
on(self, event_name, *args, **kwargs)¶ Decorator shortcut for add_event_handler.
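A decorator shortcut of this kind is usually a thin wrapper that registers the function and returns it unchanged. The following is a minimal sketch under that assumption; ToyRoutine is a hypothetical stand-in, not the pipert class.

```python
class ToyRoutine:
    """Hypothetical stand-in that only stores handlers (not pipert code)."""

    def __init__(self):
        self.handlers = {}

    def add_event_handler(self, event_name, handler, *args, **kwargs):
        self.handlers.setdefault(event_name, []).append((handler, args, kwargs))

    def on(self, event_name, *args, **kwargs):
        # Returns a decorator that registers the function and hands it back.
        def decorator(handler):
            self.add_event_handler(event_name, handler, *args, **kwargs)
            return handler
        return decorator


routine = ToyRoutine()


@routine.on("before_logic")
def announce(r):
    return "about to run"
```

Because the decorator returns the original function, announce stays callable under its own name after registration.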
-
_fire_event(self, event_name, *event_args, **event_kwargs)¶ Execute all the handlers associated with the given event.
This method executes all handlers associated with the event event_name. Optional positional and keyword arguments can be used to pass arguments to all handlers added with this event. These arguments update the arguments passed using add_event_handler().
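The way fire-time arguments update registration-time arguments can be sketched as follows. The fire function is hypothetical, and two details are assumptions rather than documented behaviour: fire-time keyword arguments override stored ones with the same name, and fire-time positional arguments are placed before the stored ones.

```python
def fire(routine, handlers, *event_args, **event_kwargs):
    """Call every (handler, args, kwargs) entry with merged arguments."""
    results = []
    for handler, args, kwargs in handlers:
        merged = dict(kwargs)        # start from the registration-time kwargs
        merged.update(event_kwargs)  # fire-time kwargs take precedence
        results.append(handler(routine, *(event_args + args), **merged))
    return results


def report(routine, *args, **kwargs):
    # Echo back what the handler received so the merge is visible.
    return args, kwargs


handlers = [(report, ("stored",), {"level": "info", "tag": "x"})]
out = fire("routine", handlers, "fired", level="error")
```

Here report receives the positional arguments ("fired", "stored") and the keyword arguments level="error" and tag="x".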
-
abstract
main_logic(self, *args, **kwargs)¶
-
abstract
setup(self, *args, **kwargs)¶
-
abstract
cleanup(self, *args, **kwargs)¶
-
_extended_run(self)¶
-
as_thread(self)¶
-
as_process(self)¶
-
start(self)¶
-
abstract static
get_constructor_parameters()¶ Returns a dictionary of the constructor’s parameters, mapping each parameter name to the name of its type.
-
abstract
does_routine_use_queue(self, queue_name)¶ Returns True if the routine uses the given queue_name.
- Args:
queue_name: the name of the queue
-
get_creation_dictionary(self)¶ Returns a dictionary containing the routine’s parameter names as keys and their values as values. The method returns queue objects instead of queue names when it encounters them.
-
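The abstract setup, main_logic and cleanup methods listed above suggest a lifecycle loop. The sketch below shows one plausible shape for it using only abc and threading. ToyRoutine, its run method and the Counter subclass are hypothetical, and the use of a stop event to end the loop is an assumption.

```python
import threading
from abc import ABC, abstractmethod


class ToyRoutine(ABC):
    """Hypothetical lifecycle skeleton (not the pipert Routine)."""

    def __init__(self):
        self.stop_event = threading.Event()

    @abstractmethod
    def setup(self):
        ...

    @abstractmethod
    def main_logic(self):
        ...

    @abstractmethod
    def cleanup(self):
        ...

    def run(self):
        # setup once, main_logic until stopped, cleanup even on errors
        self.setup()
        try:
            while not self.stop_event.is_set():
                self.main_logic()
        finally:
            self.cleanup()

    def as_thread(self):
        return threading.Thread(target=self.run)


class Counter(ToyRoutine):
    def __init__(self, limit):
        super().__init__()
        self.limit = limit
        self.log = []

    def setup(self):
        self.log.append("setup")

    def main_logic(self):
        self.log.append("tick")
        if self.log.count("tick") >= self.limit:
            self.stop_event.set()

    def cleanup(self):
        self.log.append("cleanup")


counter = Counter(limit=3)
worker = counter.as_thread()
worker.start()
worker.join()
```

Instantiating ToyRoutine directly raises TypeError because its abstract methods are not implemented, which is the behaviour abc.ABC provides for any class with unimplemented abstract methods.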
-
class
pipert.core.Events¶ Bases: enum.Enum
Events that are fired by the RoutineInterface during execution.
-
BEFORE_LOGIC = before_logic¶
-
AFTER_LOGIC = after_logic¶
-
EXCEPTION_RAISED = exception_raised¶
-
-
class
pipert.core.BaseComponent(component_config, start_component=False)¶
-
setup_component(self, component_config)¶
-
_replace_queue_names_with_queue_objects(self, routine_parameters_kwargs)¶
-
_start(self)¶ Goes over the component’s routines registered in self.routines and starts running them.
-
run_comp(self)¶ Starts running all the component’s routines.
-
register_routine(self, routine: Union[Routine, Process, Thread])¶ Registers routine in the list of the component’s routines.
- Args:
routine: the routine to register
-
_teardown_callback(self, *args, **kwargs)¶ Implemented by subclasses of BaseComponent. Used for stopping or tearing down things that are not stopped by setting the stop_event. Returns: None
-
stop_run(self)¶ Signals all the component’s routines to stop.
-
create_queue(self, queue_name, queue_size=1)¶ Create a new queue for the component. Returns True if created or False otherwise.
- Args:
queue_name: the name of the queue, must be unique
queue_size: the size of the queue
-
get_queue(self, queue_name)¶ Returns the queue object by its name.
- Args:
queue_name: the name of the queue
- Raises:
KeyError - if no queue has the name queue_name
-
get_all_queue_names(self)¶ Returns the list of names of the queues that the component exposes.
-
does_queue_exist(self, queue_name)¶ Returns True if the component has a queue named queue_name or False otherwise.
- Args:
queue_name: the name of the queue to check
-
delete_queue(self, queue_name)¶ Deletes a queue with the name queue_name. Returns True if succeeded.
- Args:
queue_name: the name of the queue to delete
- Raises:
KeyError - if no queue has the name queue_name
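The queue-management contract described above (True or False from create_queue, KeyError from get_queue and delete_queue on unknown names) can be sketched with a dictionary of queue.Queue objects. ToyQueueRegistry is a hypothetical stand-in, not BaseComponent, and it does not model any checks the real component may perform before deleting a queue.

```python
from queue import Queue


class ToyQueueRegistry:
    """Hypothetical name -> queue registry (not pipert code)."""

    def __init__(self):
        self.queues = {}

    def create_queue(self, queue_name, queue_size=1):
        # Names must be unique: refuse to overwrite an existing queue.
        if queue_name in self.queues:
            return False
        self.queues[queue_name] = Queue(maxsize=queue_size)
        return True

    def get_queue(self, queue_name):
        # A plain dict lookup raises KeyError for unknown names.
        return self.queues[queue_name]

    def get_all_queue_names(self):
        return list(self.queues)

    def does_queue_exist(self, queue_name):
        return queue_name in self.queues

    def delete_queue(self, queue_name):
        # `del` also raises KeyError for unknown names.
        del self.queues[queue_name]
        return True


registry = ToyQueueRegistry()
created = registry.create_queue("frames", queue_size=2)
duplicate = registry.create_queue("frames")
```

In this sketch created is True and duplicate is False, since the second call reuses an existing name.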
-
does_routine_name_exist(self, routine_name)¶
-
remove_routine(self, routine_name)¶
-
does_routines_use_queue(self, queue_name)¶
-
is_component_running(self)¶
-
get_routines(self)¶
-
get_component_configuration(self)¶
-
_get_routine_creation(self, routine)¶
-
set_monitoring_system(self, monitoring_system_parameters)¶
-
set_routine_attribute(self, routine_name, attribute_name, attribute_value)¶
-
-
class
pipert.core.Message(data, source_address)¶
-
counter = 0¶
-
update_payload(self, data)¶
-
get_payload(self)¶
-
is_empty(self)¶
-
record_entry(self, component_name, logger)¶ Records the timestamp of the message’s entry into a component.
- Args:
component_name: the name of the component that the message entered.
logger: the logger object of the component’s input routine.
-
record_custom(self, component_name, section)¶ Records the timestamp of the message’s entry into some section of a component.
- Args:
component_name: the name of the component that the message is in.
section: the name of the section within the component that the message entered.
-
record_exit(self, component_name, logger)¶ Records the timestamp of the message’s exit out of a component. Additionally, it enables a flag called ‘reached_exit’ if the message is exiting the pipeline’s “output component”.
- Args:
component_name: the name of the component that the message exited.
logger: the logger object of the component’s output routine.
-
get_latency(self, component_name)¶ Returns the time it took for a message to pass through a whole component.
Using the message’s history, this method calculates and returns the amount of time that passed from the moment the message entered a component to the moment it left it.
- Args:
component_name: the name of the relevant component.
-
get_end_to_end_latency(self, output_component)¶ Returns the time it took for a message to pass through the pipeline.
- Args:
output_component: the name of the pipeline’s output component.
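The latency methods above boil down to subtracting recorded timestamps. The sketch below illustrates that with a hypothetical ToyMessage. The history layout, the optional now parameter (added so the example is deterministic) and the choice to measure end-to-end latency from the earliest recorded entry are all assumptions, not the pipert implementation.

```python
import time


class ToyMessage:
    """Hypothetical message that keeps per-component timestamps."""

    def __init__(self, data):
        self.data = data
        self.history = {}  # component_name -> {"entry": t, "exit": t}

    def record_entry(self, component_name, now=None):
        stamp = time.time() if now is None else now
        self.history.setdefault(component_name, {})["entry"] = stamp

    def record_exit(self, component_name, now=None):
        stamp = time.time() if now is None else now
        self.history.setdefault(component_name, {})["exit"] = stamp

    def get_latency(self, component_name):
        # Time spent inside one component: exit minus entry.
        stamps = self.history[component_name]
        return stamps["exit"] - stamps["entry"]

    def get_end_to_end_latency(self, output_component):
        # Assumption: the pipeline starts at the earliest recorded entry.
        first_entry = min(s["entry"] for s in self.history.values())
        return self.history[output_component]["exit"] - first_entry


msg = ToyMessage(data=b"frame")
msg.record_entry("camera", now=10.0)
msg.record_exit("camera", now=10.5)
msg.record_entry("display", now=11.0)
msg.record_exit("display", now=12.0)
```

With these timestamps the camera latency is 0.5 seconds and the end-to-end latency through display is 2.0 seconds.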
-
__str__(self)¶ Return str(self).
-
full_description(self)¶
-
-
class
pipert.core.Payload(data)¶ Bases: abc.ABC
Helper class that provides a standard way to create an ABC using inheritance.
-
abstract
decode(self)¶
-
abstract
encode(self, generator)¶
-
abstract
is_empty(self)¶
-
abstract
-
class
pipert.core.MessageHandler¶ Bases: abc.ABC
Helper class that provides a standard way to create an ABC using inheritance.
-
abstract
receive(self, in_key)¶ Receives the latest message from the message broker.
- Args:
in_key: the name of the queue/stream at which the relevant message is located.
-
abstract
read_next_msg(self, in_key)¶ Reads the message that follows the last one read. If no message has been read before, reads the last message in the message broker. Cannot read the same message twice.
- Args:
in_key: the name of the queue/stream at which the relevant message is located.
-
abstract
read_most_recent_msg(self, in_key)¶ Reads the latest message in the message broker. Cannot read the same message twice.
- Args:
in_key: the name of the queue/stream at which the relevant message is located.
-
abstract
send(self, out_key, msg)¶ Sends a message to the message broker.
- Args:
out_key: the name of the queue/stream at which the relevant message will be placed.
msg: the message object that is being sent.
-
abstract
connect(self)¶ Establishes a connection to the message broker.
-
abstract
close(self)¶ Closes the connection to the message broker.
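The difference between read_next_msg and read_most_recent_msg is easier to see with an in-memory stand-in for the broker. The sketch below is hypothetical: ToyHandler declares only three of the abstract methods, InMemoryHandler keeps each stream in a list, and returning None when there is nothing new to read is an assumption.

```python
from abc import ABC, abstractmethod


class ToyHandler(ABC):
    """Hypothetical, trimmed-down handler interface (not pipert code)."""

    @abstractmethod
    def send(self, out_key, msg):
        ...

    @abstractmethod
    def read_next_msg(self, in_key):
        ...

    @abstractmethod
    def read_most_recent_msg(self, in_key):
        ...


class InMemoryHandler(ToyHandler):
    def __init__(self):
        self.streams = {}    # key -> list of messages, oldest first
        self.last_read = {}  # key -> index of the last message read

    def send(self, out_key, msg):
        self.streams.setdefault(out_key, []).append(msg)

    def read_most_recent_msg(self, in_key):
        stream = self.streams.get(in_key, [])
        latest = len(stream) - 1
        # Nothing stored, or the latest message was already read.
        if latest < 0 or self.last_read.get(in_key) == latest:
            return None
        self.last_read[in_key] = latest
        return stream[latest]

    def read_next_msg(self, in_key):
        # First read on this key: fall back to the latest message.
        if in_key not in self.last_read:
            return self.read_most_recent_msg(in_key)
        stream = self.streams.get(in_key, [])
        following = self.last_read[in_key] + 1
        if following >= len(stream):
            return None
        self.last_read[in_key] = following
        return stream[following]


handler = InMemoryHandler()
handler.send("frames", "a")
handler.send("frames", "b")
first = handler.read_next_msg("frames")            # nothing read yet: latest
repeat = handler.read_next_msg("frames")           # nothing new to read
handler.send("frames", "c")
handler.send("frames", "d")
following = handler.read_next_msg("frames")        # the one after "b"
latest = handler.read_most_recent_msg("frames")    # skips ahead to the end
again = handler.read_most_recent_msg("frames")     # already read
```

read_next_msg walks the stream one message at a time, while read_most_recent_msg skips ahead to the end; both refuse to return a message that was already read.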
-
abstract
-
class
pipert.core.RedisHandler(url, maxlen=100)¶ Bases: pipert.core.message_handlers.MessageHandler
Helper class that provides a standard way to create an ABC using inheritance.
-
read_next_msg(self, in_key)¶ Reads the message that follows the last one read. If no message has been read before, reads the last message in the message broker. Cannot read the same message twice.
- Args:
in_key: the name of the queue/stream at which the relevant message is located.
-
read_most_recent_msg(self, in_key)¶ Reads the latest message in the message broker. Cannot read the same message twice.
- Args:
in_key: the name of the queue/stream at which the relevant message is located.
-
receive(self, in_key)¶ Receives the latest message from the message broker.
- Args:
in_key: the name of the queue/stream at which the relevant message is located.
-
_read_from_redis_using_method(self, in_key, reading_method, **method_args)¶
-
send(self, out_key, msg)¶ Sends a message to the message broker.
- Args:
out_key: the name of the queue/stream at which the relevant message will be placed.
msg: the message object that is being sent.
-
connect(self)¶ Establishes a connection to the message broker.
-
close(self)¶ Closes the connection to the message broker.
-
static
_add_offset_to_stream_id(stream_id, offset)¶
-
-
class
pipert.core.QueueHandler(q)¶
-
get(self, block=True, timeout=None)¶ Works just like the get method of queue.Queue
- Returns:
item from the queue
-
timeout_get(self, timeout)¶ If the timeout is reached, forces a context switch using time.sleep(0) and then returns None.
- Args:
timeout: number of seconds until timeout
- Returns:
item from the queue, or None if the timeout is reached
-
non_blocking_get(self)¶ If the queue is empty, forces a context switch using time.sleep(0) and then returns None
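The two get variants described above can be sketched with the standard queue module. These are free functions written for illustration, taking the queue as a parameter; they are not the QueueHandler methods themselves.

```python
import queue
import time


def timeout_get(q, timeout):
    """Return an item, or None if nothing arrives within `timeout` seconds."""
    try:
        return q.get(block=True, timeout=timeout)
    except queue.Empty:
        time.sleep(0)  # yield so other threads get a chance to run
        return None


def non_blocking_get(q):
    """Return an item, or None immediately if the queue is empty."""
    try:
        return q.get(block=False)
    except queue.Empty:
        time.sleep(0)
        return None
```

Returning None instead of raising queue.Empty lets a polling loop treat "no item" as a normal outcome.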
-
put(self, item, block=True, timeout=None)¶ Works just like the put method of queue.Queue
-
timeout_put(self, item, timeout)¶ If the timeout is reached, returns False; otherwise puts item in the queue and returns True.
- Args:
item: item to put in queue
timeout: number of seconds until timeout
- Returns:
True if successful, False if not
-
non_blocking_put(self, item)¶ If the queue is full, returns False; otherwise puts item in the queue and returns True.
- Args:
item: item to put in queue
- Returns:
True if successful, False if not
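The put variants follow the same pattern, turning queue.Full into a False return value. As a sketch, again with illustrative free functions rather than the QueueHandler methods:

```python
import queue


def timeout_put(q, item, timeout):
    """Put item, waiting up to `timeout` seconds; report success as a bool."""
    try:
        q.put(item, block=True, timeout=timeout)
        return True
    except queue.Full:
        return False


def non_blocking_put(q, item):
    """Put item only if there is room right now; report success as a bool."""
    try:
        q.put(item, block=False)
        return True
    except queue.Full:
        return False
```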
-
deque_timeout_put(self, item, timeout)¶ If the timeout is reached, it tries to remove an item from the queue and then puts the new item in the queue.
- Args:
item: item to put in queue
timeout: number of seconds until timeout
- Returns:
True if successful, False if it had to dequeue an item
-
deque_non_blocking_put(self, item)¶ If the queue is full, it tries to remove an item from the queue and then puts the new item in the queue.
- Args:
item: item to put in queue
- Returns:
True if successful, False if it had to dequeue an item
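The "dequeue, then put" behaviour amounts to dropping the oldest item to make room for the newest. The sketch below uses an illustrative free function, not the QueueHandler method. How the real method behaves when another producer refills the queue between the two steps is not documented here, so the sketch simply gives up in that case.

```python
import queue


def deque_non_blocking_put(q, item):
    """Put item; if the queue is full, drop the oldest item to make room."""
    try:
        q.put(item, block=False)
        return True
    except queue.Full:
        try:
            q.get(block=False)  # discard the oldest item
        except queue.Empty:
            pass  # a consumer emptied the queue in the meantime
        try:
            q.put(item, block=False)
        except queue.Full:
            pass  # assumption: give up if the queue was refilled
        return False


frames = queue.Queue(maxsize=1)
kept = deque_non_blocking_put(frames, "old")
replaced = deque_non_blocking_put(frames, "new")
```

After the second call the queue holds only "new", which suits pipelines where a stale item is worth less than a fresh one.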
-