Source code for meta_and_frame_from_redis

import os
from urllib.parse import urlparse

from pipert.core.routine import Routine, RoutineTypes
from queue import Empty
import cv2
from pipert.core.message import message_decode
from pipert.core.message_handlers import RedisHandler
import time


[docs]class MetaAndFrameFromRedis(Routine):
[docs] routine_type = RoutineTypes.INPUT
def __init__(self, redis_read_meta_key, redis_read_image_key, image_meta_queue, *args, **kwargs): super().__init__(*args, **kwargs) self.redis_read_meta_key = redis_read_meta_key self.redis_read_image_key = redis_read_image_key self.url = urlparse(os.environ.get('REDIS_URL', "redis://127.0.0.1:6379")) self.image_meta_queue = image_meta_queue self.msg_handler = None self.flip = False self.negative = False
[docs] def receive_msg(self, in_key): encoded_msg = self.msg_handler.read_most_recent_msg(in_key) if not encoded_msg: return None msg = message_decode(encoded_msg) msg.record_entry(self.component_name, self.logger) return msg
[docs] def main_logic(self, *args, **kwargs): pred_msg = self.receive_msg(self.redis_read_meta_key) frame_msg = self.receive_msg(self.redis_read_image_key) if frame_msg: arr = frame_msg.get_payload() if self.flip: arr = cv2.flip(arr, 1) if self.negative: arr = 255 - arr try: self.image_meta_queue.get(block=False) except Empty: pass frame_msg.update_payload(arr) self.image_meta_queue.put((frame_msg, pred_msg)) return True else: time.sleep(0) return False
[docs] def setup(self, *args, **kwargs): self.msg_handler = RedisHandler(self.url)
[docs] def cleanup(self, *args, **kwargs): self.msg_handler.close()
@staticmethod
[docs] def get_constructor_parameters(): dicts = Routine.get_constructor_parameters() dicts.update({ "redis_read_meta_key": "String", "redis_read_image_key": "String", "image_meta_queue": "QueueOut", }) return dicts
[docs] def does_routine_use_queue(self, queue): return self.image_meta_queue == queue