Utiliser Thrift avec Twisted
En travaillant autour du sessionManager de FreeRds, j'ai eu l'occasion de travailler
avec le binding python de Thrift.
Thrift
Thrift est un nième framework de serialization de données. L'avantage pour notre projet est qu'il fournit directement une couche de transport avec un style RPC.
Comme avec protobuf, on a un fichier IDL qui décrit les messages et les méthodes. Ensuite le compilateur génère ce qu'il faut pour gérer ces messages. Pour Python, dans mon cas, ça ressemble à ça:
# thrift --gen py:twisted moninterface.thrift
Le résultat est dans le répertoire gen-py.twisted.
Twisted
FreeRDS et le sessionManager communiquent via une interface protobuf. Elle est bidirectionnelle, et c'est un trivial protocol maison qui gère ces messages protobuf (taille, type, question/réponse, id). FreeRds n'accepte qu'une seule connexion par ce canal. Nous avons appelé ICP le protocole entre FreeRDS et le sessionManager.
L'interface Thrift est de type RPC, c'est à dire une requête et la réponse qui revient. Dans certaines méthodes un appel Thrift va se traduire par un appel à FreeRds (en ICP donc) et la réponse sera traduite en Thrift. Ce qui veut dire que le traitement de la requête Thrift serait bloqué en attente de la réponse ICP. J'aurais pu faire un truc pas très élégant / compliqué, avec des primitives de synchronization, des transactions thrift / ICP, etc. Mais j'ai décidé de casser mon session manager python qui gérait très bien l'ICP (protobuf) pour le passer à Twisted et tout faire en évènementiel.
Pourquoi twisted ? Juste parce que thrift le propose, et que j'en ai toujours entendu parlé dans l'éco-système python, sans jamais avoir eu l'occasion de m'en servir. Des fois, on est bien obligé de trouver des excuses pour essayer des librairies ;). Autant dire que pour moi twisted avait plutôt une réputation sulfureuse, rendant le code illisible (spaghetti style) et j'avais des souvenirs désagréables avec apt-proxy basé dessus (au moins 1 blocage par jour).
Migration du code de l'ICP (protobuf)
La migration s'est vraiment faite sans douleur pour l'ICP: il a suffit d'écrire un gestionnaire de protocol. Mettre en place une factory. On s'enregistre dans le reactor et le tour est joué.
Des extraits pertinent du code après transformation:
from twisted.internet import reactor
from twisted.internet.protocol import Protocol, ServerFactory
class IcpProtocol(Protocol):
...
def connectionMade(self):
self.data = ''
self.state = self.ICP_WAITING_LEN
self.bodyLen = 0
def connectionLost(self, reason):
self.factory.freeRdsInstance = None
def dataReceived(self, data):
self.data += data
...
class IcpFactory(ServerFactory):
def __init__(self, server):
...
self.freeRdsInstance = None
def buildProtocol(self, addr):
print "FreeRDS connected"
self.freeRdsInstance = IcpProtocol(self)
return self.freeRdsInstance
...
icpServer = reactor.listenUNIX(pipePath, core.icpFactory, 0666)
Implémentation du server Thrift
Dans l'implémentation du serveur Thrift ce que je voulais pouvoir gérer, c'était qu'une méthode Thrift appelée allait différer l'envoie de la réponse jusqu'à ce que la réponse ICP soit arrivée. Soit la séquence suivante:
- appel Thrift externe ;
- le sessionManager envoie une requête à FreeRds en ICP ;
- reception de la réponse ICP ;
- renvoie de la réponse Thrift ;
Évidement le sessionManager ne doit pas bloquer activement entre 2 et 3, et doit pouvoir continuer d'honorer des requêtes.
Comment faire ça ? Et bien j'ai découvert dans la docummentation qu'avec le binding twisted, la méthode serveur peut renvoyer un Deferred qui sera le résultat de la requête Thrift. Mais sera traitée plus tard évidement.
Ce sera le serveur ICP, à la réception de la réponse, qui fera l'appel callback() sur le deferred, ce qui déclenchera la réponse thrift.
Extrait de code:
...
from zope.interface import implements
from twisted.internet.defer import Deferred
class VcOpenHandler(PbRpcResponseHandler):
def __init__(self, fdsApi, deferred):
PbRpcResponseHandler.__init__(self, fdsApi, FdsApiVirtualChannelOpenResponse)
self.deferred = deferred
def onResponse(self, status, response):
if status == RPCBase.SUCCESS:
self.deferred.callback(response)
else:
self.deferred.error("request not successful")
class FdsApiHandler(object):
implements(fdsapi.Iface)
def __init__(self, server):
self.server = server
def virtualChannelOpen(self, authToken, sessionId, virtualName, isDynChannel, flags):
icpFactory = self.server.icpFactory
req = FdsApiVirtualChannelOpenRequest()
req.ConnectionId = sessionId
req.VirtualName = virtualName
req.dynamicChannel = False
req.flags = 0
d = Deferred()
def onError(err):
return 'Internal error in server'
def onSuccess(response):
return ttypes.TReturnVirtualChannelOpen(response.ConnectionString, response.Instance)
d.addErrback(onError)
d.addCallback(onSuccess)
icpFactory.doQuery(FdsApiVirtualChannelOpen, req, VcOpenHandler(self, d))
return d
La blague du transport
J'ai eu un petit soucis avec mon client Thrift en C++: les requêtes partaient mais le sessionManager python semblait ne pas comprendre ce qui lui arrivait. Après enquête et surtout l'aide précieuse des développeurs thrift sur IRC, j'ai trouvé ce qui clochait. Le client parlait en utilisant la représentation BufferedTransport alors que le serveur ne gère que le FramedTransport. Pour info, le BufferedTransport est totalement inefficace et ne devrait pas être utilisé.
Par contre cette petite anecdote montre quand même que Thrift aurait pu faire quelque chose pour ce cas: une sacro sainte compatibilité ascendante.
Conclusion
Au final le code est beau. Par contre, le debugging avec twisted est plutôt besogneux. Quand j'ai eu le problème de transport avec thrift, et que je me suis retouvé avec pydev dans les entrailles du reactor, je ne faisais pas le malin. J'imagine que c'est souvent comme ça avec la programmation évènementielle.
Commentaires