Source code for pipert.core.shared_memory_generator

import posix_ipc
import mmap
from pipert.core.shared_memory import SharedMemory


[docs]class MemoryIdIterator: """ Iterates over a set amount of id's. The id's are in the following format: "{component_name}_{serial_memory_number}". """ def __init__(self, component_name, max_count): self.component_name = component_name self.name_count = 0 self.max_count = max_count
[docs] def get_next(self): """ Get the next shared memory name. Returns: Next available shared memory name. """ current_memory_id = (self.name_count % self.max_count) next_name = f"{self.component_name}_{current_memory_id}" self.name_count += 1 return next_name
[docs]def get_shared_memory_object(name): """ Get a SharedMemory object that correlates to the name given. Args: name: The name of a shared memory. Returns: A SharedMemory object. """ try: memory = posix_ipc.SharedMemory(name) semaphore = posix_ipc.Semaphore(name) except posix_ipc.ExistentialError: return None except Exception: return None mapfile = mmap.mmap(memory.fd, memory.size) memory.close_fd() semaphore.release() return SharedMemory(memory, semaphore, mapfile)
[docs]class SharedMemoryGenerator: """ Generates a set 'max_count' amount of shared memories to be used. """ def __init__(self, component_name, max_count=50, size=5000000): self.memory_id_gen = MemoryIdIterator(component_name, max_count) self.max_count = max_count self.shared_memories = {} self.size = size
[docs] def create_memories(self): for _ in range(self.max_count): next_name = self.memory_id_gen.get_next() memory = posix_ipc.SharedMemory(next_name, posix_ipc.O_CREAT, size=self.size) semaphore = posix_ipc.Semaphore(next_name, posix_ipc.O_CREAT) mapfile = mmap.mmap(memory.fd, memory.size) memory.close_fd() semaphore.release() self.shared_memories[next_name] = SharedMemory(memory, semaphore, mapfile)
[docs] def get_next_shared_memory_name(self): return self.memory_id_gen.get_next()
[docs] def cleanup(self): """ Cleans all of the allocated shared memories to free up the ram. """ for _ in range(self.max_count): name_to_unlink = self.memory_id_gen.get_next() if name_to_unlink in self.shared_memories: self._destroy_memory(name_to_unlink)
[docs] def _destroy_memory(self, memory_to_destroy): """ Destroys a specified shared memory. Args: memory_to_destroy: The name of the existing shared memory. """ self.shared_memories[memory_to_destroy].free_memory() self.shared_memories.pop(memory_to_destroy)