How to implement a two-way JSON-RPC + Twisted server/client
Hello, I am developing an RPC server based on Twisted to serve several microcontrollers, which make RPC calls to the Twisted JSON-RPC server. The application also requires the server to be able to send information to each micro at any time. So my question is: what is a good practice to prevent the response to a micro's remote JSON-RPC call from being confused with a JSON-RPC request that the server makes on behalf of a user?
The consequence right now is that the micros receive bad information, because they don't know whether the netstring/JSON string coming from the socket is the response to one of their previous requests or a new request from the server.
Here is my code:
from twisted.internet import reactor
from txjsonrpc.netstring import jsonrpc
import weakref
creds = {'user1':'pass1','user2':'pass2','user3':'pass3'}
class arduinoRPC(jsonrpc.JSONRPC):
    def connectionMade(self):
        pass
    def jsonrpc_identify(self,username,password,mac):
        """ Each client must be authenticated just after to be connected calling this rpc """
        if creds.has_key(username):
            if creds[username] == password:
                authenticated = True
            else:
                authenticated = False
        else:
            authenticated = False
        if authenticated:
            self.factory.clients.append(self)
            self.factory.references[mac] = weakref.ref(self)
            return {'results':'Authenticated as %s'%username,'error':None}
        else:
            self.transport.loseConnection()
    def jsonrpc_sync_acq(self,data,f):
        """Save into django table data acquired from sensors and send ack to gateway"""
        if not (self in self.factory.clients):
            self.transport.loseConnection()
        print f
        return {'results':'synced %s records'%len(data),'error':'null'}
    def connectionLost(self, reason):
        """ mac address is searched and all reference to self.factory.clientes are erased """
        for mac in self.factory.references.keys():
            if self.factory.references[mac]() == self:
                print 'Connection closed - Mac address: %s'%mac
                del self.factory.references[mac]
                self.factory.clients.remove(self)
class rpcfactory(jsonrpc.RPCFactory):
    protocol = arduinoRPC
    def __init__(self, maxLength=1024):
        self.maxLength = maxLength
        self.subHandlers = {}
        self.clients = []
        self.references = {}
""" Asynchronous remote calling to micros, simulating random calling from server """
import threading,time,random,netstring,json
class asyncGatewayCalls(threading.Thread):
    def __init__(self,rpcfactory):
        threading.Thread.__init__(self)
        self.rpcfactory = rpcfactory
        """identifiers of each micro/client connected"""
        self.remoteMacList = ['12:23:23:23:23:23:23','167:67:67:67:67:67:67','90:90:90:90:90:90:90']
    def run(self):
        while True:
            time.sleep(10)
            while True:
                """ call to any of three potential micros connected """
                mac = self.remoteMacList[random.randrange(0,len(self.remoteMacList))]
                if self.rpcfactory.references.has_key(mac):
                    print 'Calling %s'%mac
                    proto = self.rpcfactory.references[mac]()
                    """ requesting echo from selected micro"""
                    dataToSend = netstring.encode(json.dumps({'method':'echo_from_micro','params':['plop']}))
                    proto.transport.write(dataToSend)
                    break
factory = rpcfactory(arduinoRPC)
"""start thread caller"""
r=asyncGatewayCalls(factory)
r.start()
reactor.listenTCP(7080, factory)
print "Micros remote RPC server started"
reactor.run()
You need to add enough information to each message so that the recipient can determine how to interpret it. Your requirements sound very similar to those of AMP, so you could either use AMP instead or use the same structure as AMP to identify your messages. Specifically:
- In requests, put a particular key - for example, AMP uses "_ask" to identify requests. It also gives these a unique value, which further identifies that request for the lifetime of the connection.
- In responses, put a different key - for example, AMP uses "_answer" for this. The value matches up with the value from the "_ask" key in the request the response is for.
Using an approach like this, you just have to look to see whether there is an "_ask" key or an "_answer" key to determine if you've received a new request or a response to a previous request.
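To make that concrete, here is a minimal sketch of the tagging idea applied to JSON messages. Borrowing AMP's "_ask" and "_answer" key names inside JSON objects is an assumption for illustration only; it is not part of txjsonrpc, and the MessageRouter class and its method names are hypothetical. Netstring framing is left out so that only the request/response bookkeeping is shown.

```python
import itertools
import json


class MessageRouter(object):
    """Hypothetical helper: tags outgoing messages and sorts incoming ones."""

    def __init__(self):
        self._ids = itertools.count(1)
        # Maps the "_ask" value of each outstanding request to its method name.
        self.pending = {}

    def make_request(self, method, params):
        # Every request gets a value that is unique for this connection.
        ask = str(next(self._ids))
        self.pending[ask] = method
        return json.dumps({'_ask': ask, 'method': method, 'params': params})

    def make_response(self, ask, result):
        # A response echoes the "_ask" value of the request it answers.
        return json.dumps({'_answer': ask, 'result': result})

    def classify(self, raw):
        """Return ('request' | 'response' | 'unknown', decoded message)."""
        msg = json.loads(raw)
        if '_ask' in msg:
            return ('request', msg)
        if '_answer' in msg and msg['_answer'] in self.pending:
            # Forget the request now that its answer has arrived.
            del self.pending[msg['_answer']]
            return ('response', msg)
        return ('unknown', msg)


# One router per side of the connection.
server = MessageRouter()
micro = MessageRouter()

request = server.make_request('echo_from_micro', ['plop'])
kind, msg = micro.classify(request)           # the micro sees a request
reply = micro.make_response(msg['_ask'], msg['params'][0])
kind2, msg2 = server.classify(reply)          # the server sees a response
```

Because each side only ever looks at which of the two keys is present, a micro that is waiting for its own response can still recognise a request from the server that arrives first. The JSON-RPC "id" member serves a similar correlation purpose if you would rather stay closer to that specification.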
On a separate topic, your asyncGatewayCalls class shouldn't be thread-based. There's no apparent reason for it to use threads, and by doing so it is also misusing Twisted APIs in a way which will lead to undefined behavior. Most Twisted APIs can only be used in the thread in which you called reactor.run. The only exception is reactor.callFromThread, which you can use to send a message to the reactor thread from any other thread. asyncGatewayCalls tries to write to a transport, though, which will lead to buffer corruption or arbitrary delays in the data being sent, or perhaps worse things. Instead, you can write asyncGatewayCalls like this:
from twisted.internet import reactor
from twisted.internet.task import LoopingCall

class asyncGatewayCalls(object):
    def __init__(self, rpcfactory):
        self.rpcfactory = rpcfactory
        self.remoteMacList = [...]

    def run(self):
        # Schedule _pokeMicro on the reactor instead of sleeping in a thread.
        self._call = LoopingCall(self._pokeMicro)
        return self._call.start(10)

    def _pokeMicro(self):
        # Same selection loop as the original, now running in the reactor thread.
        while True:
            mac = self.remoteMacList[...]
            if mac in self.rpcfactory.references:
                proto = ...
                dataToSend = ...
                proto.transport.write(dataToSend)
                break

factory = ...
r = asyncGatewayCalls(factory)
r.run()
reactor.listenTCP(7080, factory)
reactor.run()
This gives you a single-threaded solution which should have the same behavior as you intended for the original asyncGatewayCalls class. Instead of sleeping in a loop in a thread in order to schedule the calls, though, it uses the reactor's scheduling APIs (via the higher-level LoopingCall class, which schedules things to be called repeatedly) to make sure _pokeMicro gets called every ten seconds.