
Redis pub/sub: adding additional channels mid-subscription

Is it possible to add additional subscriptions to a Redis connection? I have a listening thread but it appears not to be influenced by new SUBSCRIBE commands.

If this is the expected behavior, what pattern should be used when users add a stock ticker feed to their interests or join a chatroom?

I would like to implement a Python class similar to:

import threading
import redis

class RedisPubSub(object):
    def __init__(self):
        self._redis_pub = redis.Redis(host='localhost', port=6379, db=0)        
        self._redis_sub = redis.Redis(host='localhost', port=6379, db=0)        
        self._sub_thread = threading.Thread(target=self._listen)
        self._sub_thread.daemon = True
        self._sub_thread.start()

    def publish(self, channel, message):
        self._redis_pub.publish(channel, message)

    def subscribe(self, channel):
        self._redis_sub.subscribe(channel)

    def _listen(self):
        for message in self._redis_sub.listen():
            print(message)


The python-redis Redis and ConnectionPool classes inherit from threading.local, and this is producing the "magical" effects you're seeing.

Summary: your main thread and worker threads' self._redis_sub clients end up using two different connections to the server, but only the main thread's connection has issued the SUBSCRIBE command.

Details: Since the main thread creates self._redis_sub, that client ends up in the main thread's thread-local storage. Next, I presume, the main thread calls client.subscribe(channel), so the main thread's client is now subscribed on connection 1. Then you start the self._sub_thread worker thread, which ends up with its own self._redis_sub attribute set to a new instance of redis.Client. That instance constructs a new connection pool and establishes a new connection to the Redis server.

This new connection has not yet been subscribed to your channel, so listen() returns immediately. So with python-redis you cannot pass an established connection with outstanding subscriptions (or any other stateful commands) between threads.
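
The thread-local effect described above can be reproduced without Redis at all. The following is only a minimal sketch using the standard library: the Client class and its subscriptions list are hypothetical stand-ins for the real client and its connection state, not python-redis code.

```python
import threading


class Client(threading.local):
    """Hypothetical stand-in for a client that inherits from threading.local."""

    def __init__(self):
        # threading.local re-runs __init__ in every thread that first
        # touches the instance, so each thread gets its own state.
        self.subscriptions = []


client = Client()
client.subscriptions.append("chat")  # the main thread "subscribes"

seen_by_worker = []


def worker():
    # Same object, but the worker sees fresh, thread-private attributes.
    seen_by_worker.append(list(client.subscriptions))


t = threading.Thread(target=worker)
t.start()
t.join()

print(client.subscriptions)  # ['chat']
print(seen_by_worker[0])     # []
```

The worker never sees the "chat" subscription even though both threads use the same client object, which mirrors the two-connections situation in the question.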

Depending on how you plan to implement your app you may need to switch to using a different client, or come up with some other way to communicate subscription state to the worker threads, e.g. send subscription commands through a queue.

One other issue is that python-redis uses blocking sockets, which prevents your listening thread from doing other work while waiting for messages, and it cannot signal it wishes to unsubscribe unless it does so immediately after receiving a message.
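
The queue idea mentioned above could look roughly like the sketch below. It assumes a pub/sub object that offers subscribe(), unsubscribe(), close() and a non-blocking get_message(timeout=...), as the PubSub object in more recent redis-py releases does; that is an assumption about your installed version, not something the library version discussed here guarantees. QueuedSubscriber, make_redis_pubsub and poll_interval are hypothetical names made up for this example.

```python
import queue
import threading


def make_redis_pubsub():
    # Assumption: redis-py is installed and Redis runs on localhost:6379.
    # Imported here so the class below has no hard dependency on it.
    import redis
    return redis.Redis(host="localhost", port=6379, db=0).pubsub()


class QueuedSubscriber:
    """Worker thread owns the connection; other threads only enqueue commands."""

    def __init__(self, pubsub_factory, on_message, poll_interval=0.1):
        self._pubsub_factory = pubsub_factory
        self._on_message = on_message
        self._poll_interval = poll_interval
        self._commands = queue.Queue()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def subscribe(self, channel):
        self._commands.put(("subscribe", channel))

    def unsubscribe(self, channel):
        self._commands.put(("unsubscribe", channel))

    def stop(self):
        self._commands.put(("stop", None))
        self._thread.join()

    def _run(self):
        # Created inside the worker, so only this thread touches the connection.
        pubsub = self._pubsub_factory()
        subscribed = set()
        while True:
            try:
                # With no subscriptions there is nothing to read, so block
                # on the queue; otherwise just check it without waiting.
                action, channel = self._commands.get(block=not subscribed)
            except queue.Empty:
                pass
            else:
                if action == "stop":
                    break
                if action == "subscribe":
                    pubsub.subscribe(channel)
                    subscribed.add(channel)
                elif action == "unsubscribe":
                    pubsub.unsubscribe(channel)
                    subscribed.discard(channel)
                continue
            # Short poll so queued commands are picked up between messages.
            message = pubsub.get_message(timeout=self._poll_interval)
            if message is not None:
                self._on_message(message)
        pubsub.close()
```

Usage would be along the lines of sub = QueuedSubscriber(make_redis_pubsub, print), followed by sub.subscribe("chat") from any thread at any time. Polling with a timeout is a trade-off: it sidesteps the blocking listen() loop at the cost of a small delay before new subscriptions take effect.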


The async way:

Use the Twisted framework with the txredisapi client library.

Example code (subscribing):

import txredisapi as redis

from twisted.application import internet
from twisted.application import service
from twisted.internet import reactor


class myProtocol(redis.SubscriberProtocol):
    def connectionMade(self):
        print("waiting for messages...")
        print("use the redis client to send messages:")
        print("$ redis-cli publish chat test")
        print("$ redis-cli publish foo.bar hello world")
        self.subscribe("chat")
        self.psubscribe("foo.*")

        # Subscriptions can be changed later on the same live connection.
        reactor.callLater(10, self.unsubscribe, "chat")
        reactor.callLater(15, self.punsubscribe, "foo.*")

        # self.continueTrying = False
        # self.transport.loseConnection()

    def messageReceived(self, pattern, channel, message):
        print("pattern=%s, channel=%s message=%s" % (pattern, channel, message))

    def connectionLost(self, reason):
        print("lost connection:", reason)


class myFactory(redis.SubscriberFactory):
    # SubscriberFactory is a wrapper for the ReconnectingClientFactory
    maxDelay = 120
    continueTrying = True
    protocol = myProtocol


application = service.Application("subscriber")
srv = internet.TCPClient("127.0.0.1", 6379, myFactory())
srv.setServiceParent(application)

Only one thread, no headache :)

It depends on what kind of app you are writing, of course. For a networking app, go with Twisted.
