pipert.core

Package Contents

Classes

Routine

Helper class that provides a standard way to create an ABC using inheritance.

Events

Events that are fired by the RoutineInterface during execution.

BaseComponent

Message

Payload

Helper class that provides a standard way to create an ABC using inheritance.

MessageHandler

Helper class that provides a standard way to create an ABC using inheritance.

RedisHandler

Helper class that provides a standard way to create an ABC using inheritance.

QueueHandler

class pipert.core.Routine(logger, name='', component_name='', extensions=None, metrics_collector=NullCollector(), *args, **kwargs)[source]

Bases: abc.ABC

Helper class that provides a standard way to create an ABC using inheritance.

routine_type
_setup_extensions(self, extensions)
register_events(self, *event_names)

Add events that can be fired.

Registering an event will let the user fire these events at any point. This opens the door to make the run() loop even more configurable.

By default, the events from Events are registered.

Args:

*event_names: An object (ideally a string or int) that defines the name of the event being supported.

Example usage:

from enum import Enum

class Custom_Events(Enum):
    FOO_EVENT = "foo_event"
    BAR_EVENT = "bar_event"

# `routine` is an instance of a concrete Routine subclass; Routine itself is abstract.
routine.register_events(*Custom_Events)
add_event_handler(self, event_name, handler, first=False, last=False, *args, **kwargs)

Add an event handler to be executed when the specified event is fired.

Args:

event_name: An event to attach the handler to. Valid events are from Events or any event_name added by register_events().

handler (callable): the callable event handler that should be invoked.

first: specify True if the event handler should be called first.

last: specify True if the event handler should be called last.

*args: additional args to be passed to handler.

**kwargs: additional keyword args to be passed to handler.

Notes:

The handler function’s first argument will be self, the Routine object it was bound to.

Note that other arguments can be passed to the handler in addition to the *args and **kwargs passed here, for example during EXCEPTION_RAISED.

Example usage:

# `routine` is an instance of a concrete Routine subclass.

def print_done(routine):
    print("Finished an iteration of {}".format(routine))

routine.add_event_handler(Events.AFTER_LOGIC, print_done)
has_event_handler(self, handler, event_name=None)

Check if the specified event has the specified handler.

Args:

handler (callable): the callable event handler.

event_name: The event the handler is attached to. Set this to None to search all events.

remove_event_handler(self, handler, event_name)

Remove the given event handler from the routine's registered handlers.

Args:

handler (callable): the callable event handler that should be removed.

event_name: The event the handler is attached to.

_extension_log_fps(self, fps_time_interval)

Log the routine's FPS every fps_time_interval seconds.

Args:

fps_time_interval: The time interval, in seconds, between logs.

_extension_pace(self, fps)

Pace the routine so that it runs at the desired FPS.

Args:

fps: The desired FPS for the routine.
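One plausible way to pace a loop to a target rate is to sleep for whatever is left of each iteration's time budget. The sketch below illustrates that idea only; it is not pipert's implementation, and pace_loop and its parameters are hypothetical names.

```python
import time


def pace_loop(step, fps, iterations):
    """Call step() `iterations` times, aiming for at most `fps` calls per second.

    Hypothetical helper for illustration; not part of pipert.
    """
    frame_budget = 1.0 / fps
    for _ in range(iterations):
        started = time.monotonic()
        step()
        elapsed = time.monotonic() - started
        # Sleep only for the part of the budget that the step did not use.
        remaining = frame_budget - elapsed
        if remaining > 0:
            time.sleep(remaining)


calls = []
begin = time.monotonic()
pace_loop(lambda: calls.append(1), fps=50, iterations=5)
total = time.monotonic() - begin
```

A step that takes longer than its budget is not compensated for here, so the loop can only slow a routine down, never speed it up.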

on(self, event_name, *args, **kwargs)

Decorator shortcut for add_event_handler.

Args:

event_name: An event to attach the handler to. Valid events are from Events or any event_name added by register_events().

*args: additional args to be passed to handler.

**kwargs: additional keyword args to be passed to handler.
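A decorator shortcut of this kind is usually a thin wrapper that registers the decorated function and returns it unchanged. The sketch below assumes that pattern; MiniRoutine is a hypothetical stand-in that models only the handler registry, not pipert's Routine.

```python
from collections import defaultdict


class MiniRoutine:
    """Hypothetical stand-in for Routine; only the event registry is modelled."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def add_event_handler(self, event_name, handler, *args, **kwargs):
        self._handlers[event_name].append((handler, args, kwargs))

    def on(self, event_name, *args, **kwargs):
        # Build a decorator that registers the function and returns it unchanged.
        def decorator(handler):
            self.add_event_handler(event_name, handler, *args, **kwargs)
            return handler
        return decorator


routine = MiniRoutine()


@routine.on("after_logic", "extra")
def report(routine, tag):
    return "handled:" + tag


registered_handler, registered_args, _ = routine._handlers["after_logic"][0]
```

Because the decorator returns the original function, report stays callable under its own name after registration.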

_fire_event(self, event_name, *event_args, **event_kwargs)

Execute all the handlers associated with given event.

This method executes all handlers associated with the event event_name. Optional positional and keyword arguments can be used to pass arguments to all handlers added with this event. These arguments update the arguments passed using add_event_handler().

Args:

event_name: event for which the handlers should be executed. Valid events are from Events or any event_name added by register_events().

*event_args: additional args to be passed to all handlers.

**event_kwargs: additional keyword args to be passed to all handlers.
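The sketch below shows one way fire-time arguments could update the arguments stored at registration. The merge order is an assumption: fire-time keyword arguments override stored ones, and fire-time positional arguments come before stored ones. The fire function is hypothetical and does not reproduce pipert's dispatcher.

```python
def fire(routine, handlers, *event_args, **event_kwargs):
    """Hypothetical dispatcher: calls every (handler, args, kwargs) entry."""
    results = []
    for handler, args, kwargs in handlers:
        merged_kwargs = dict(kwargs)
        merged_kwargs.update(event_kwargs)  # fire-time values win
        # Assumption: fire-time positional args precede the stored ones.
        results.append(handler(routine, *(event_args + args), **merged_kwargs))
    return results


def describe(routine, *args, **kwargs):
    return args, kwargs


handlers = [(describe, ("stored",), {"level": "info", "tag": "a"})]
results = fire("routine-object", handlers, "fired", level="error")
```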

abstract main_logic(self, *args, **kwargs)
abstract setup(self, *args, **kwargs)
abstract cleanup(self, *args, **kwargs)
_extended_run(self)

Returns:

as_thread(self)
as_process(self)
start(self)
abstract static get_constructor_parameters()

Returns a dictionary that maps each constructor parameter's name to the name of its type.

abstract does_routine_use_queue(self, queue_name)

Returns True if the routine uses the given queue_name.

Args:

queue_name: the name of the queue
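As a rough illustration of these two abstract methods, a concrete routine might implement them as sketched below. FrameReader, its parameters, and the type-name strings are all hypothetical. The class deliberately does not inherit from Routine so that the sketch runs without pipert installed, and it stores the queue by name for simplicity, whereas a real routine may hold a queue object.

```python
class FrameReader:
    """Hypothetical routine-like class used only for illustration."""

    def __init__(self, source, out_queue, name=""):
        self.source = source
        self.out_queue = out_queue  # queue name here; possibly an object in practice
        self.name = name

    @staticmethod
    def get_constructor_parameters():
        # Parameter name -> type name. The type names are made up for this sketch.
        return {"source": "str", "out_queue": "Queue", "name": "str"}

    def does_routine_use_queue(self, queue_name):
        return self.out_queue == queue_name


reader = FrameReader("camera0", "frames")
parameters = FrameReader.get_constructor_parameters()
```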

get_creation_dictionary(self)

Returns a dictionary containing the routine's parameter names as keys and their values as values. The method returns queue objects instead of queue names when it encounters them.

class pipert.core.Events[source]

Bases: enum.Enum

Events that are fired by the RoutineInterface during execution.

BEFORE_LOGIC = before_logic
AFTER_LOGIC = after_logic
EXCEPTION_RAISED = exception_raised
class pipert.core.BaseComponent(component_config, start_component=False)[source]
setup_component(self, component_config)
_replace_queue_names_with_queue_objects(self, routine_parameters_kwargs)
_start(self)

Goes over the component’s routines registered in self.routines and starts running them.

run_comp(self)

Starts running all the component’s routines.

register_routine(self, routine: Union[Routine, Process, Thread])

Registers routine in the component's list of routines.

Args:

routine: the routine to register

_teardown_callback(self, *args, **kwargs)

Implemented by subclasses of BaseComponent. Used for stopping or tearing down things that are not stopped by setting the stop_event.

Returns:

None

stop_run(self)

Signals all the component’s routines to stop.

create_queue(self, queue_name, queue_size=1)

Create a new queue for the component. Returns True if the queue was created, False otherwise.

Args:

queue_name: the name of the queue; must be unique.

queue_size: the size of the queue.

get_queue(self, queue_name)

Returns the queue object with the given name.

Args:

queue_name: the name of the queue

Raises:

KeyError: if no queue has the name queue_name

get_all_queue_names(self)

Returns the list of names of the queues that the component exposes.

does_queue_exist(self, queue_name)

Returns True if the component has a queue named queue_name, False otherwise.

Args:

queue_name: the name of the queue to check

delete_queue(self, queue_name)

Deletes the queue named queue_name. Returns True if it succeeded.

Args:

queue_name: the name of the queue to delete

Raises:

KeyError: if no queue has the name queue_name
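The queue-management methods above behave much like a dictionary keyed by queue name. The sketch below models that contract with a hypothetical QueueRegistry class. It is not BaseComponent, and the real methods may apply extra checks, for example refusing to delete a queue that a routine still uses.

```python
from queue import Queue


class QueueRegistry:
    """Hypothetical stand-in for the queue bookkeeping described above."""

    def __init__(self):
        self.queues = {}

    def create_queue(self, queue_name, queue_size=1):
        if queue_name in self.queues:
            return False  # names must be unique
        self.queues[queue_name] = Queue(maxsize=queue_size)
        return True

    def get_queue(self, queue_name):
        return self.queues[queue_name]  # raises KeyError for an unknown name

    def does_queue_exist(self, queue_name):
        return queue_name in self.queues

    def get_all_queue_names(self):
        return list(self.queues)

    def delete_queue(self, queue_name):
        del self.queues[queue_name]  # raises KeyError for an unknown name
        return True


registry = QueueRegistry()
created = registry.create_queue("frames", queue_size=2)
duplicate = registry.create_queue("frames")
names = registry.get_all_queue_names()
deleted = registry.delete_queue("frames")
```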

does_routine_name_exist(self, routine_name)
remove_routine(self, routine_name)
does_routines_use_queue(self, queue_name)
is_component_running(self)
get_routines(self)
get_component_configuration(self)
_get_routine_creation(self, routine)
set_monitoring_system(self, monitoring_system_parameters)
set_routine_attribute(self, routine_name, attribute_name, attribute_value)
class pipert.core.Message(data, source_address)[source]
counter = 0
update_payload(self, data)
get_payload(self)
is_empty(self)
record_entry(self, component_name, logger)

Records the timestamp of the message’s entry into a component.

Args:

component_name: the name of the component that the message entered.

logger: the logger object of the component's input routine.

record_custom(self, component_name, section)

Records the timestamp of the message’s entry into some section of a component.

Args:

component_name: the name of the component that the message is in.

section: the name of the section within the component that the message entered.

record_exit(self, component_name, logger)

Records the timestamp of the message's exit from a component. Additionally, it sets a flag called 'reached_exit' if the message is exiting the pipeline's "output component".

Args:

component_name: the name of the component that the message exited.

logger: the logger object of the component's output routine.

get_latency(self, component_name)

Returns the time it took for a message to pass through a whole component.

Using the message's history, this method calculates and returns the amount of time that passed from the moment the message entered a component to the moment it left it.

Args:

component_name: the name of the relevant component.

get_end_to_end_latency(self, output_component)

Returns the time it took for a message to pass through the pipeline.

Args:

output_component: the name of the pipeline’s output component.
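The latency methods can be pictured as subtractions over recorded timestamps. The sketch below uses a hypothetical TimedMessage class with explicit timestamps in place of the logger argument. Measuring end-to-end latency from the earliest recorded entry is an assumption, not something the description above confirms.

```python
class TimedMessage:
    """Hypothetical stand-in for Message; only the timing history is modelled."""

    def __init__(self):
        self.history = {}  # component name -> {"entry": t, "exit": t}

    def record_entry(self, component_name, timestamp):
        self.history.setdefault(component_name, {})["entry"] = timestamp

    def record_exit(self, component_name, timestamp):
        self.history.setdefault(component_name, {})["exit"] = timestamp

    def get_latency(self, component_name):
        stamps = self.history[component_name]
        return stamps["exit"] - stamps["entry"]

    def get_end_to_end_latency(self, output_component):
        # Assumption: the pipeline starts at the earliest recorded entry.
        first_entry = min(s["entry"] for s in self.history.values())
        return self.history[output_component]["exit"] - first_entry


msg = TimedMessage()
msg.record_entry("reader", 10.0)
msg.record_exit("reader", 10.5)
msg.record_entry("detector", 11.0)
msg.record_exit("detector", 12.0)
reader_latency = msg.get_latency("reader")
pipeline_latency = msg.get_end_to_end_latency("detector")
```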

__str__(self)

Return str(self).

full_description(self)
class pipert.core.Payload(data)[source]

Bases: abc.ABC

Helper class that provides a standard way to create an ABC using inheritance.

abstract decode(self)
abstract encode(self, generator)
abstract is_empty(self)
class pipert.core.MessageHandler[source]

Bases: abc.ABC

Helper class that provides a standard way to create an ABC using inheritance.

abstract receive(self, in_key)

Receives the latest message from the message broker.

Args:

in_key: the name of the queue/stream at which the relevant message is located.

abstract read_next_msg(self, in_key)

Reads the message that follows the last one read. If no message has been read before, reads the latest message in the message broker. The same message cannot be read twice.

Args:

in_key: the name of the queue/stream at which the relevant message is located.

abstract read_most_recent_msg(self, in_key)

Reads the latest message in the message broker. The same message cannot be read twice.

Args:

in_key: the name of the queue/stream at which the relevant message is located.
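The difference between the two read methods is easiest to see on a small in-memory stream. The sketch below is a hypothetical model of the semantics described above. It is not a MessageHandler subclass, it omits in_key by handling a single stream, and returning None when nothing new is available is an assumption.

```python
class InMemoryStream:
    """Hypothetical in-memory stand-in for a message broker stream."""

    def __init__(self):
        self.messages = []
        self.last_read = None  # index of the last message handed out

    def send(self, msg):
        self.messages.append(msg)

    def read_next_msg(self):
        if not self.messages:
            return None
        if self.last_read is None:
            index = len(self.messages) - 1  # nothing read yet: start at the latest
        else:
            index = self.last_read + 1
        if index >= len(self.messages):
            return None  # nothing new since the last read
        self.last_read = index
        return self.messages[index]

    def read_most_recent_msg(self):
        index = len(self.messages) - 1
        if index < 0 or index == self.last_read:
            return None  # empty stream, or the latest message was already read
        self.last_read = index
        return self.messages[index]


stream = InMemoryStream()
for text in ["a", "b", "c"]:
    stream.send(text)
first = stream.read_next_msg()
repeat = stream.read_most_recent_msg()
stream.send("d")
stream.send("e")
following = stream.read_next_msg()
latest = stream.read_most_recent_msg()
exhausted = stream.read_next_msg()
```

In this model read_next_msg walks forward one message at a time, while read_most_recent_msg skips straight to the newest message and drops everything in between.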

abstract send(self, out_key, msg)

Sends a message to the message broker.

Args:

out_key: the name of the queue/stream at which the relevant message will be placed.

msg: the message object that is being sent.

abstract connect(self)

Establishes a connection to the message broker.

abstract close(self)

Closes the connection to the message broker.

class pipert.core.RedisHandler(url, maxlen=100)[source]

Bases: pipert.core.message_handlers.MessageHandler

Helper class that provides a standard way to create an ABC using inheritance.

read_next_msg(self, in_key)

Reads the message that follows the last one read. If no message has been read before, reads the latest message in the message broker. The same message cannot be read twice.

Args:

in_key: the name of the queue/stream at which the relevant message is located.

read_most_recent_msg(self, in_key)

Reads the latest message in the message broker. The same message cannot be read twice.

Args:

in_key: the name of the queue/stream at which the relevant message is located.

receive(self, in_key)

Receives the latest message from the message broker.

Args:

in_key: the name of the queue/stream at which the relevant message is located.

_read_from_redis_using_method(self, in_key, reading_method, **method_args)
send(self, out_key, msg)

Sends a message to the message broker.

Args:

out_key: the name of the queue/stream at which the relevant message will be placed.

msg: the message object that is being sent.

connect(self)

Establishes a connection to the message broker.

close(self)

Closes the connection to the message broker.

static _add_offset_to_stream_id(stream_id, offset)
class pipert.core.QueueHandler(q)
get(self, block=True, timeout=None)

Works just like the get method of queue.Queue

Returns:

item from the queue

timeout_get(self, timeout)

Gets an item from the queue. If the timeout is reached, forces a context switch using time.sleep(0) and then returns None.

Args:

timeout: number of seconds until timeout

Returns:

item from the queue, or None if the timeout was reached

non_blocking_get(self)

If the queue is empty, forces a context switch using time.sleep(0) and then returns None
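The two non-raising getters can be approximated on top of the standard library's queue.Queue. The sketch below follows the behaviour described above using hypothetical free functions rather than QueueHandler methods.

```python
import queue
import time


def non_blocking_get(q):
    """Hypothetical helper: return an item, or None if the queue is empty."""
    try:
        return q.get(block=False)
    except queue.Empty:
        time.sleep(0)  # yield so that other threads get a chance to run
        return None


def timeout_get(q, timeout):
    """Hypothetical helper: wait up to `timeout` seconds, then give up with None."""
    try:
        return q.get(block=True, timeout=timeout)
    except queue.Empty:
        time.sleep(0)
        return None


q = queue.Queue(maxsize=1)
q.put("frame")
got = non_blocking_get(q)
empty = non_blocking_get(q)
timed_out = timeout_get(q, timeout=0.01)
```

Returning None instead of raising queue.Empty lets a polling loop continue without a try/except at every call site, though it also means None cannot be distinguished from a queued None item.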

put(self, item, block=True, timeout=None)

Works just like the put method of queue.Queue

timeout_put(self, item, timeout)

If the timeout is reached, returns False; otherwise puts item in the queue and returns True.

Args:

item: item to put in queue

timeout: number of seconds until timeout

Returns:

True if successful, False if not

non_blocking_put(self, item)

If the queue is full, returns False; otherwise puts item in the queue and returns True.

Args:

item: item to put in queue

Returns:

True if successful, False if not

deque_timeout_put(self, item, timeout)

If the timeout is reached, tries to take an item from the queue and then puts item in the queue.

Args:

item: item to put in queue

timeout: number of seconds until timeout

Returns:

True if successful, False if an item had to be dequeued first

deque_non_blocking_put(self, item)

If the queue is full, tries to take an item from the queue and then puts item in the queue.

Args:

item: item to put in queue

Returns:

True if successful, False if an item had to be dequeued first
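The "dequeue, then put" behaviour keeps the newest item at the cost of the oldest one. The sketch below shows the non-blocking variant on top of queue.Queue. deque_non_blocking_put is written here as a hypothetical free function, and the sketch assumes a single producer, since with several producers the second put could still find the queue full.

```python
import queue


def deque_non_blocking_put(q, item):
    """Hypothetical helper: drop the oldest item when the queue is full."""
    try:
        q.put(item, block=False)
        return True
    except queue.Full:
        try:
            q.get(block=False)  # make room by discarding the oldest item
        except queue.Empty:
            pass  # a consumer emptied the queue in the meantime
        q.put(item, block=False)
        return False


q = queue.Queue(maxsize=1)
first = deque_non_blocking_put(q, "old")
second = deque_non_blocking_put(q, "new")
remaining = q.get(block=False)
```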