Source code for vis_logic_classification

import cv2
from pipert.core.routine import Routine, RoutineTypes
from queue import Empty, Full
import time


[docs]class VisLogicClassification(Routine):
[docs] routine_type = RoutineTypes.PROCESSING
def __init__(self, in_queue, out_queue, *args, **kwargs): super().__init__(*args, **kwargs) self.in_queue = in_queue self.out_queue = out_queue
[docs] def main_logic(self, *args, **kwargs): # TODO implement input that takes both frame and metadata try: frame_msg, pred_msg = self.in_queue.get(block=False) # print("frame", frame_msg) # print("pred msg = ", pred_msg) if pred_msg is not None and not pred_msg.is_empty(): frame = frame_msg.get_payload() pred = pred_msg.get_payload() image = self.draw_pred_on_frame(frame, pred) frame_msg.update_payload(image) frame_msg.history = pred_msg.history frame_msg.record_exit(self.component_name, self.logger) try: self.out_queue.put(frame_msg, block=False) return True except Full: try: self.out_queue.get(block=False) self.state.dropped += 1 except Empty: pass finally: try: self.out_queue.put(frame_msg, block=False) except Full: pass return True except Empty: time.sleep(0) return False
[docs] def draw_pred_on_frame(self, frame, pred): font = cv2.FONT_HERSHEY_SIMPLEX org = (50, 50) fontScale = 1 color = (255, 0, 0) thickness = 2 frame = cv2.putText(frame, pred, org, font, fontScale, color, thickness, cv2.LINE_AA) return frame
[docs] def setup(self, *args, **kwargs): self.state.dropped = 0
[docs] def cleanup(self, *args, **kwargs): pass
@staticmethod
[docs] def get_constructor_parameters(): dicts = Routine.get_constructor_parameters() dicts.update({ "in_queue": "QueueIn", "out_queue": "QueueOut", }) return dicts
[docs] def does_routine_use_queue(self, queue): return (self.in_queue == queue) or (self.out_queue == queue)