from abc import ABC, abstractmethod
import redis
[docs]class MessageHandler(ABC):
@abstractmethod
[docs] def receive(self, in_key):
"""
Receives the latest message from the message broker.
Args:
in_key: the name of the queue/stream at which the relevant message
is located.
"""
pass
@abstractmethod
[docs] def read_next_msg(self, in_key):
"""
Reads the following message from the one that was last read,
if no message has been read before,
then read the last message in the message broker
cannot read the same message twice.
Args:
in_key: the name of the queue/stream at which the relevant message
is located.
"""
pass
@abstractmethod
[docs] def read_most_recent_msg(self, in_key):
"""
Reads the latest message in the message broker,
cannot read the same message twice.
Args:
in_key: the name of the queue/stream at which the relevant message
is located.
"""
pass
@abstractmethod
[docs] def send(self, out_key, msg):
"""
Sends a message to the message broker.
Args:
out_key: the name of the queue/stream at which the relevant message
will be placed.
msg: the message object that is being sent.
"""
pass
@abstractmethod
[docs] def connect(self):
"""
Establishes a connection to the message broker.
"""
pass
@abstractmethod
[docs] def close(self):
"""
Closes the connection to the message broker.
"""
pass
[docs]class RedisHandler(MessageHandler):
def __init__(self, url, maxlen=100):
self.conn = None
self.url = url
self.maxlen = maxlen
self.last_msg_id = None
self.connect()
[docs] def read_next_msg(self, in_key):
return self._read_from_redis_using_method(
in_key=in_key,
reading_method=self.conn.xrange,
name=in_key,
count=1,
min=self._add_offset_to_stream_id(self.last_msg_id, 1)
)
[docs] def read_most_recent_msg(self, in_key):
return self._read_from_redis_using_method(
in_key=in_key,
reading_method=self.conn.xrevrange,
name=in_key,
count=1,
min=self._add_offset_to_stream_id(self.last_msg_id, 1)
)
[docs] def receive(self, in_key):
# Need to set value in last_msg_id so
# _read_from_redis_using_method will not cause an infinite loop
self.last_msg_id = ""
msg = self._read_from_redis_using_method(
in_key,
self.conn.xrevrange,
name=in_key,
count=1
)
if msg is None:
self.last_msg_id = None
return msg
[docs] def _read_from_redis_using_method(self,
in_key,
reading_method,
**method_args):
if self.last_msg_id is None:
return self.receive(in_key)
redis_msg = reading_method(**method_args)
if not redis_msg:
return None
self.last_msg_id = redis_msg[0][0].decode()
msg = redis_msg[0][1]["msg".encode("utf-8")]
return msg
[docs] def send(self, out_key, msg):
fields = {
"msg": msg
}
_ = self.conn.xadd(out_key, fields, maxlen=self.maxlen)
[docs] def connect(self):
self.conn = redis.Redis(host=self.url.hostname, port=self.url.port)
if not self.conn.ping():
raise Exception('Redis unavailable')
[docs] def close(self):
self.conn.close()
@staticmethod
[docs] def _add_offset_to_stream_id(stream_id, offset):
if stream_id is None:
return None
fixed_id = stream_id.split("-")
last_msg_id_to_read = '-'.join([fixed_id[0],
str(int(fixed_id[1]) + offset)])
return last_msg_id_to_read