Source code for face_detection

import torch
from pipert.core.message import Message
from pipert.core.routine import RoutineTypes, Routine
from pipert.utils.structures import Instances, Boxes
from queue import Empty
import time
import cv2
import pkg_resources


[docs]class FaceDetection(Routine):
[docs] routine_type = RoutineTypes.PROCESSING
def __init__(self, in_queue, out_queue, *args, **kwargs): super().__init__(*args, **kwargs) self.in_queue = in_queue self.out_queue = out_queue self.face_cas = None
[docs] def main_logic(self, *args, **kwargs): try: frame_msg = self.in_queue.get(block=False) frame = frame_msg.get_payload() gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) faces = self.face_cas.detectMultiScale( gray, scaleFactor=1.1, minNeighbors=5, minSize=(20, 20) ) if len(faces): faces = torch.from_numpy(faces) faces[:, 2:] += faces[:, :2] # print(faces.size(), faces) new_instances = Instances(frame.shape[:2]) new_instances.set("pred_boxes", Boxes(faces)) new_instances.set("pred_classes", torch.zeros(faces.size(0)).int()) else: new_instances = Instances(frame.shape[:2]) new_instances.set("pred_classes", []) try: self.out_queue.get(block=False) self.state.dropped += 1 except Empty: pass pred_msg = Message(new_instances, frame_msg.source_address) self.out_queue.put(pred_msg, block=False) return True except Empty: time.sleep(0) return False
[docs] def setup(self, *args, **kwargs): haar_xml = pkg_resources.resource_filename('cv2', 'data/haarcascade_frontalface_default.xml') self.face_cas = cv2.CascadeClassifier(haar_xml) self.state.dropped = 0
[docs] def cleanup(self, *args, **kwargs): pass
@staticmethod
[docs] def get_constructor_parameters(): dicts = Routine.get_constructor_parameters() dicts.update({ "in_queue": "QueueIn", "out_queue": "QueueOut", }) return dicts
[docs] def does_routine_use_queue(self, queue): return (self.in_queue == queue) or (self.out_queue == queue)