Source code for pipert.contrib.canny_demo.canny_gear

# A Redis gear for orchestrating realtime video analytics
import redisAI
import numpy as np
# from time import time
from PIL import Image

# Globals for downsampling
[docs]_mspf = 1000 / 10.0 # Msecs per frame (initialized with 10.0 FPS)
[docs]_next_ts = 0 # Next timestamp to sample a frame
# # class SimpleMovingAverage(object): # ''' Simple moving average ''' # def __init__(self, value=0.0, count=7): # ''' # @value - the initialization value # @count - the count of samples to keep # ''' # self.count = int(count) # self.current = float(value) # self.samples = [self.current] * self.count # # def __str__(self): # return str(round(self.current, 3)) # # def add(self, value): # ''' Adds the next value to the average ''' # v = float(value) # self.samples.insert(0, v) # o = self.samples.pop() # self.current = self.current + (v-o)/self.count # class Profiler(object): # ''' Mini profiler ''' # names = [] # Steps names in order # data = {} # ... and data # last = None # def __init__(self): # pass # # def __str__(self): # s = '' # for name in self.names: # s = '{}{}:{}, '.format(s, name, self.data[name]) # return(s[:-2]) # # def __delta(self): # ''' Returns the time delta between invocations ''' # now = time()*1000 # Transform to milliseconds # if self.last is None: # self.last = now # value = now - self.last # self.last = now # return value # # def start(self): # ''' Starts the profiler ''' # self.last = time()*1000 # # def add(self, name): # ''' Adds/updates a step's duration ''' # value = self.__delta() # if name not in self.data: # self.names.append(name) # self.data[name] = SimpleMovingAverage(value=value) # else: # self.data[name].add(value) # # def assign(self, name, value): # ''' Assigns a step with a value ''' # if name not in self.data: # self.names.append(name) # self.data[name] = SimpleMovingAverage(value=value) # else: # self.data[name].add(value) # # def get(self, name): # ''' Gets a step's value ''' # return self.data[name].current ''' The profiler is used first and foremost for keeping track of the total (average) time it takes to process a frame - the information is required for setting the FPS dynamically. As a side benefit, it also provides per step metrics. ''' # prf = Profiler() # def downsampleStream(x): # ''' Drops input frames to match FPS ''' # global _mspf, _next_ts # execute('TS.INCRBY', 'camera:0:in_fps', 1, 'RESET', 1) # Store the input fps count # ts, _ = map(int, str(x['streamId']).split('-')) # Extract the timestamp part from the message ID # sample_it = _next_ts <= ts # if sample_it: # Drop frames until the next timestamp is in the present/past # _next_ts = ts + _mspf # return sample_it
[docs]def runCanny(x): ''' Runs the model on an input image from the stream ''' global prf IMG_SIZE = 416 # Model's input image size # log('read') # Read the image from the stream's message buf = io.BytesIO(x['image']) pil_image = Image.open(buf) image = np.array(pil_image).transpose((2, 0, 1)) / 255. # log('resize') # Resize, normalize and tensorize the image for the model (number of images, width, height, channels) # log('tensor') img_ba = bytearray(image.tobytes()) image_tensor = redisAI.createTensorFromBlob('FLOAT', [1, 480, 640, 3], img_ba) # log('model') # Create the RedisAI model runner and run it modelRunner = redisAI.createModelRunner('canny:model') redisAI.modelRunnerAddInput(modelRunner, 'input', image_tensor) redisAI.modelRunnerAddOutput(modelRunner, 'output') model_replies = redisAI.modelRunnerRun(modelRunner) # model_output = model_replies[0] shape = redisAI.tensorGetDims(model_replies) buf = redisAI.tensorGetDataAsBlob(model_replies) edges = np.frombuffer(buf, dtype=np.float32).reshape(shape) # log('script') # The model's output is processed with a PyTorch script for non maxima suppression # scriptRunner = redisAI.createScriptRunner('yolo:script', 'boxes_from_tf') # redisAI.scriptRunnerAddInput(scriptRunner, model_output) # redisAI.scriptRunnerAddOutput(scriptRunner) # script_reply = redisAI.scriptRunnerRun(scriptRunner) # prf.add('script') # log('boxes') # The script outputs bounding boxes # shape = redisAI.tensorGetDims(script_reply) # buf = redisAI.tensorGetDataAsBlob(script_reply) # boxes = np.frombuffer(buf, dtype=np.float32).reshape(shape) # Iterate boxes to extract the people # ratio = float(IMG_SIZE) / max(pil_image.width, pil_image.height) # ratio = old / new # pad_x = (IMG_SIZE - pil_image.width * ratio) / 2 # Width padding # pad_y = (IMG_SIZE - pil_image.height * ratio) / 2 # Height padding # boxes_out = [] # people_count = 0 # for box in boxes[0]: # if box[4] == 0.0: # Remove zero-confidence detections # continue # if box[-1] != 14: # Ignore detections that aren't people # continue # people_count += 1 # # # Descale bounding box coordinates back to original image size # x1 = (IMG_SIZE * (box[0] - 0.5 * box[2]) - pad_x) / ratio # y1 = (IMG_SIZE * (box[1] - 0.5 * box[3]) - pad_y) / ratio # x2 = (IMG_SIZE * (box[0] + 0.5 * box[2]) - pad_x) / ratio # y2 = (IMG_SIZE * (box[1] + 0.5 * box[3]) - pad_y) / ratio # # # Store boxes as a flat list # boxes_out += [x1,y1,x2,y2] # prf.add('boxes') return x['streamId'], edges
[docs]def storeResults(x): ''' Stores the results in Redis Stream and TimeSeries data structures ''' global _mspf, prf ref_id, edges = x[0], int(x[1]) ref_msec = int(str(ref_id).split('-')[0]) # Store the output in its own stream res_id = execute('XADD', 'camera:0:yolo', 'MAXLEN', '~', 1000, '*', 'ref', ref_id, 'edges', edges)
# Create and register a gear that for each message in the stream
[docs]gb = GearsBuilder('StreamReader')
# gb.filter(downsampleStream) # Filter out high frame rate gb.map(runCanny) # Run the model gb.map(storeResults) # Store the results gb.register('camera:0')