Source code for pipert.core.utlis.queue_handler

import queue
import multiprocessing as mp
from typing import Union
import time


[docs]class QueueHandler: def __init__(self, q): self.q: Union[queue.Queue, mp.Queue] = q
[docs] def get(self, block=True, timeout=None): """ Works just like the `get` method of `queue.Queue` Returns: item from the queue """ return self.q.get(block, timeout)
[docs] def timeout_get(self, timeout): """ If timeout is reached, forces a context switch using `time.sleep(0)` and then returns `None` Args: timeout: number of seconds until timeout Returns: item from the queue """ try: return self.q.get(timeout=timeout) except queue.Empty: time.sleep(0) return None
[docs] def non_blocking_get(self): """ If the queue is empty, forces a context switch using `time.sleep(0)` and then returns `None` """ try: return self.q.get(block=False) except queue.Empty: time.sleep(0) return None
[docs] def put(self, item, block=True, timeout=None): """ Works just like the `put` method of `queue.Queue` """ self.q.put(item, block, timeout)
[docs] def timeout_put(self, item, timeout): """ If timeout is reached returns `False`, else puts item in queue and returns `True` Args: item: item to put in queue timeout: number of seconds until timeout Returns: True if successful, False if not """ try: self.q.put(item, timeout=timeout) return True except queue.Full: return False
[docs] def non_blocking_put(self, item): """ If queue is full, returns `False`, else puts item in queue and returns `True` Args: item: item to put in queue Returns: True if successful, False if not """ try: self.q.put(item, block=False) return True except queue.Full: return False
[docs] def deque_timeout_put(self, item, timeout): """ If timeout is reached, it tries to take an item from the queue and then puts the item in the queue. Args: item: item to put in queue timeout: number of seconds until timeout Returns: True if successful, False if had to deque an item """ try: self.q.put(item, timeout=timeout) return True except queue.Full: try: _ = self.q.get(block=False) dropped = False except queue.Empty: dropped = True # TODO - could crash due to a race condition, could be solved with a lock self.q.put(item, block=False) return dropped
[docs] def deque_non_blocking_put(self, item): """ If queue is full, it tries to take an item from the queue and then puts the item in the queue. Args: item: item to put in queue Returns: True if successful, False if had to deque an item """ try: self.q.put(item, block=False) return True except queue.Full: try: _ = self.q.get(block=False) dropped = False except queue.Empty: dropped = True # TODO - could crash due to a race condition, could be solved with a lock self.q.put(item, block=False) return dropped