Dictionary+Queue Data Structure with Active Removal of Old Messages

I would like to create a data structure that represents a set of queues (ideally with a hash, map, or dict-like lookup) in which messages are actively removed once they reach a certain age. The TTL value would be global; messages would neither need nor have individual TTLs. The TTL resolution doesn't need to be terribly accurate, only within a second or so.

I'm not even sure what to search for here. I could create a separate global queue monitored by a background thread, which peeks at and pulls pointers to messages off the global queue that tell it which items to remove from the individual queues. But the behavior needs to go both ways: if an item gets removed from an individual queue, it also needs to be removed from the global queue.

I would like for this data structure to be implemented in Python, ideally, and as always, speed is of the utmost importance (more so than memory usage). Any suggestions for where to start?


I'd start by just modeling the behavior you're looking for in a single class, expressed as simply as possible. Performance can come later on through iterative optimization, but only if necessary (you may not need it).

The class below does roughly what you're describing. Queues are simply named lists stored in a dictionary. Each message is timestamped and inserted at the front of its list, so the oldest message is always at the end (FIFO). Messages are reaped by checking the timestamp of the message at the end of the list and popping until a message younger than the age threshold is reached.

If you plan to access this from several threads you'll need to add some fine-grained locking to squeeze the most performance out of it. For example, the reap() method should only lock 1 queue at a time, rather than locking all queues (method-level synchronization), so you'd also need to keep a lock for each named queue.
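As a rough illustration of the per-queue locking described above, the sketch below keeps one lock per named queue, plus a single registry lock that is held only while looking up or creating a queue. The names `LockedQueues`, `put`, and the optional `now` argument are hypothetical and exist only for this example; it also assumes messages are appended in time order, so expired messages are always at the front.

```python
import threading
import time
from collections import deque


class LockedQueues(object):
    """Hypothetical sketch: named queues, each guarded by its own lock."""

    def __init__(self):
        self._registry_lock = threading.Lock()  # guards the dict itself
        self._queues = {}  # name -> (lock, deque of (timestamp, message))

    def _entry(self, name):
        # Hold the registry lock only long enough to find or create the entry.
        with self._registry_lock:
            if name not in self._queues:
                self._queues[name] = (threading.Lock(), deque())
            return self._queues[name]

    def put(self, name, message, now=None):
        lock, queue = self._entry(name)
        with lock:
            # Timestamp inside the lock so entries stay in time order.
            stamp = time.time() if now is None else now
            queue.append((stamp, message))

    def pop(self, name):
        lock, queue = self._entry(name)
        with lock:
            return queue.popleft()[1] if queue else None

    def reap(self, age, now=None):
        now = time.time() if now is None else now
        with self._registry_lock:
            entries = list(self._queues.values())  # snapshot, then release
        removed = 0
        for lock, queue in entries:
            # Only one queue is locked at a time, so other queues stay usable.
            with lock:
                while queue and queue[0][0] <= now - age:
                    queue.popleft()
                    removed += 1
        return removed
```

With this layout a slow reap of one busy queue never blocks readers or writers of another queue; the cost is one extra lock object per queue name.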

Updated -- Now uses a global set of buckets (by timestamp, 1 second resolution) to keep track of which queues have messages from that time. This reduces the number of queues to be checked on each pass.

import time
from collections import defaultdict

class QueueMap(object):

    def __init__(self):
        self._expire = defaultdict(lambda *n: defaultdict(int))
        self._store = defaultdict(list)
        self._oldest_key = int(time.time())

    def get_queue(self, name):
        return self._store.get(name, [])

    def pop(self, name):
        queue = self.get_queue(name)
        if queue:
            key, msg = queue.pop()
            self._expire[key][name] -= 1
            return msg
        return None

    def set(self, name, message):
        key = int(time.time())
        # increment count of messages in this bucket/queue
        self._expire[key][name] += 1
        self._store[name].insert(0, (key, message))

    def reap(self, age):
        now = time.time()
        threshold = int(now - age)
        oldest = self._oldest_key

        # iterate over buckets we need to check
        for key in range(oldest, threshold + 1):
            # for each queue with items, expire the oldest ones
            for name, count in self._expire[key].items():
                if count <= 0:
                    continue

                queue = self.get_queue(name)
                while queue:
                    if queue[-1][0] > threshold:
                        break
                    queue.pop()
            del self._expire[key]

        # set oldest_key for next pass
        self._oldest_key = threshold

Usage:

qm = QueueMap()
qm.set('one', 'message 1')
qm.set('one', 'message 2')
qm.set('two', 'message 3')
print(qm.pop('one'))
print(qm.get_queue('one'))
print(qm.get_queue('two'))

# call this on a background thread which sleeps
time.sleep(2)
# reap messages older than 1 second
qm.reap(1)
# queues should be empty now
print(qm.get_queue('one'))
print(qm.get_queue('two'))
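The "background thread which sleeps" mentioned in the usage comments could look something like the sketch below. The helper name `start_reaper` and its `interval` parameter are hypothetical; it accepts any callable that takes an age, so it is not tied to a particular queue class.

```python
import threading


def start_reaper(reap, age, interval=1.0):
    """Call reap(age) every `interval` seconds on a daemon thread.

    Returns a threading.Event; set it to stop the thread.
    """
    stop = threading.Event()

    def loop():
        # Event.wait returns False on timeout and True once stop is set,
        # so this both sleeps between passes and reacts promptly to stop.
        while not stop.wait(interval):
            reap(age)

    worker = threading.Thread(target=loop)
    worker.daemon = True  # do not keep the process alive at exit
    worker.start()
    return stop
```

It would be started with something like `stop = start_reaper(qm.reap, age=1)` and shut down with `stop.set()`. Note that the class above has no locking of its own, so calling `reap` from a second thread is only safe once locking has been added.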


Consider checking the TTLs whenever you access the queues instead of using a thread to be constantly checking. I'm not sure what you mean about the hash/map/dict (what is the key?), but how about something like this:

import time
class EmptyException(Exception): pass
class TTLQueue(object):
    TTL = 60 # seconds
    def __init__(self):
        self._queue = []

    def push(self, msg):
        self._queue.append((time.time()+self.TTL, msg))

    def pop(self):
        now = time.time()  # read the clock once per call
        self._queue = [(t, msg) for (t, msg) in self._queue if t > now]
        if len(self._queue) == 0:
            raise EmptyException()
        return self._queue.pop(0)[1]

queues = [TTLQueue(), TTLQueue(), TTLQueue()]  # this could be a dict or set or
                                               #    whatever if I knew what keys
                                               #    you expected
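Because the TTL is global, expiry times fall in the same order as the pushes, so only the front of a queue can hold expired messages. The sketch below uses that to drop stale messages on access without rebuilding the whole list, and keys the queues by name with a `defaultdict`. The names `LazyTTLQueue` and `clock` are hypothetical, and it assumes the clock never jumps backwards.

```python
import time
from collections import defaultdict, deque


class LazyTTLQueue(object):
    """Hypothetical variant: expired messages are dropped on access."""

    def __init__(self, ttl=60, clock=time.time):
        self._ttl = ttl
        self._clock = clock  # injectable so the expiry logic can be tested
        self._items = deque()  # (expiry_time, message), oldest first

    def _expire(self):
        now = self._clock()
        # With one shared TTL, expiry times are in push order, so only
        # the front of the deque can hold expired messages.
        while self._items and self._items[0][0] <= now:
            self._items.popleft()

    def push(self, msg):
        self._items.append((self._clock() + self._ttl, msg))

    def pop(self):
        self._expire()
        if not self._items:
            raise IndexError("pop from an empty queue")
        return self._items.popleft()[1]

    def __len__(self):
        self._expire()
        return len(self._items)


# One queue per name, created on first use.
queues = defaultdict(LazyTTLQueue)
queues["one"].push("message 1")
print(queues["one"].pop())
```

Each expired message is removed exactly once, so the expiry cost is amortized constant time per message, and `deque.popleft()` avoids the linear cost of `list.pop(0)`.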