How to disconnect redis client in websocket eventmachine

I'm trying to build a websocket server where each client establishes its own Redis connections, used for publish and subscribe.

When the Redis server is running, I can see the two new connections being established when a client connects to the websocket server, and I can also publish data to the client. But when the client drops its connection to the websocket server, I also want to disconnect from Redis. How can I do this?

Maybe I'm doing it wrong, but this is my code.

#require 'redis'
require 'em-websocket'
require 'em-hiredis'
require 'json'

CLIENTS = Hash.new

class PubSub
  def initialize(client)
    @socket = client.ws
    # These clients can only be used for pub sub commands
    @publisher = EM::Hiredis.connect   # I would like to disconnect this later
    @subscriber = EM::Hiredis.connect  # I would like to disconnect this later
    client.connections << @publisher << @subscriber
  end
  def subscribe(channel)
    @channel = channel
    @subscriber.subscribe(channel)
    @subscriber.on(:message) { |chan, message|
      @socket.send message
    }
  end
  def publish(channel,msg)
    @publisher.publish(channel, msg).errback { |e|
      puts [:publisherror, e]
    }
  end
  def unsubscribe()
    @subscriber.unsubscribe(@channel)
  end
end

class Client
  attr_accessor :connections, :ws
  def initialize(ws)
    @connections = []
    @ws = ws
  end
end

EventMachine.run do
  # Creates a websocket listener
  EventMachine::WebSocket.start(:host => '0.0.0.0', :port => 8081) do |ws|

    ws.onopen do
      # I instantiated above
      puts 'Client connected. Creating socket'
      @client = Client.new(ws)
      CLIENTS[ws] = @client
    end

    ws.onclose do
      # Upon the close of the connection I remove it from my list of running sockets
      puts 'Client disconnected. Closing socket'

      @client.connections.each do |con|
        #do something to disconnect from redis
      end

      CLIENTS.delete ws
    end

    ws.onmessage { |msg|
      puts "Received message: #{msg}"
      result = JSON.parse(msg)
      if result.has_key? 'channel'
        ps = PubSub.new(@client)
        ps.subscribe(result['channel'])
      elsif result.has_key? 'publish'
        # PubSub#initialize expects the Client wrapper (it calls client.ws), not the raw socket
        ps = PubSub.new(@client)
        ps.publish(result['publish']['channel'], result['publish']['msg'])
      end
    }
  end
end


This version of em-hiredis supports closing the connection: https://github.com/whatupdave/em-hiredis


Here is how I would do this (and have done many times): instead of always opening and closing connections for each client, you can keep one connection open per Thread/Fiber, depending on what you are basing your concurrency on. That way, if you are using a pool of Threads/Fibers, once each of them has its connection it keeps it and reuses it.

I have not worked much with websockets so far (I was waiting for a standard implementation), but I am sure you can apply the same thinking to them too.
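A minimal sketch of the one-connection-per-Thread/Fiber idea, using only the standard library. The names ThreadConnection and redis_for_current_thread are hypothetical, and the Struct stands in for whatever real Redis client you would create. Note that Thread.current[:key] is fiber-local in Ruby, so each Fiber gets its own slot as well.

```ruby
# Stand-in for a real Redis connection (hypothetical, for illustration only).
ThreadConnection = Struct.new(:owner)

# Returns the connection owned by the current thread/fiber,
# creating it the first time it is requested and reusing it afterwards.
def redis_for_current_thread
  Thread.current[:redis] ||= ThreadConnection.new(Thread.current.object_id)
end

first  = redis_for_current_thread
second = redis_for_current_thread                        # same object, no new connection
other  = Thread.new { redis_for_current_thread }.value   # a different thread gets its own
```

With this shape nothing has to be closed when a websocket client goes away, because the connections belong to the workers rather than to the clients.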

You can also do what Rails/ActiveRecord does: keep a pool of Redis connections. Each time you need a connection you request one, use it and release it. It could look like this:

def handle_request(request)
  @redis_pool.get_connection do |c|
    # [...]
  end
end

Before the block is yielded to, a connection is taken from the available ones; after the block finishes, the connection is marked as free again.
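The check-out/check-in behaviour described above could be sketched roughly like this. It is an illustration under assumptions, not ActiveRecord's implementation: ConnectionPool, available_count and PooledConnection are hypothetical names, and the Struct stands in for a real Redis connection.

```ruby
require 'thread'

# Minimal blocking connection pool built on the standard library Queue.
class ConnectionPool
  # size: number of connections to create up front
  # factory: block that builds one connection
  def initialize(size, &factory)
    @available = Queue.new
    size.times { @available << factory.call }
  end

  # Takes a free connection, yields it, and always marks it free again,
  # even if the block raises.
  def get_connection
    connection = @available.pop # blocks while every connection is in use
    begin
      yield connection
    ensure
      @available << connection
    end
  end

  # Number of connections currently free.
  def available_count
    @available.size
  end
end

# Stand-in for a real Redis connection (hypothetical, for illustration only).
PooledConnection = Struct.new(:id)

next_id = 0
pool = ConnectionPool.new(2) { PooledConnection.new(next_id += 1) }

used = pool.get_connection { |c| c.id }
```

Queue#pop blocks the calling thread, so this shape suits a thread-based server. Inside an EventMachine reactor a blocking wait would stall the event loop, so a non-blocking variant would be needed there.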


This was added to em-hiredis: https://github.com/mloughran/em-hiredis/pull/6
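Assuming the em-hiredis client exposes a close_connection method, as the change linked above suggests, the cleanup in ws.onclose could be sketched as follows. The disconnect_all method is a hypothetical addition to the Client class from the question, and FakeRedisConnection only stands in for an EM::Hiredis connection so that the sketch runs without a reactor or a Redis server.

```ruby
# Client from the question, with a hypothetical helper that closes its Redis connections.
class Client
  attr_reader :connections, :ws

  def initialize(ws)
    @connections = []
    @ws = ws
  end

  # Closes every Redis connection this client opened, then forgets them.
  # Assumes each connection responds to close_connection.
  def disconnect_all
    @connections.each(&:close_connection)
    @connections.clear
  end
end

# Stand-in for an EM::Hiredis connection (hypothetical, for illustration only).
class FakeRedisConnection
  attr_reader :closed

  def initialize
    @closed = false
  end

  def close_connection
    @closed = true
  end
end

publisher  = FakeRedisConnection.new
subscriber = FakeRedisConnection.new
client = Client.new(:fake_socket)
client.connections << publisher << subscriber

# This is the call the ws.onclose handler would make before CLIENTS.delete(ws).
client.disconnect_all
```

In the original server, looking the client up with CLIENTS[ws] inside onclose, rather than relying on the shared @client instance variable, would make sure the connections being closed belong to the socket that actually disconnected.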
