This article describes how to use Twisted to build a client/server car monitoring system. We are going to focus on the client/server communication.

The client and server source code can be retrieved using Git:

git clone https://github.com/laurentluce/twisted-examples.git

Overview

Some researchers invented a system capable of monitoring cars and detecting the brand and color of a car. We are in charge of building a client/server software solution to retrieve the list of cars from different monitoring locations.

The first element is a server that monitors the cars and listens for client connections, replying to each one with the list of cars. The second element is a client that retrieves the list of cars from the servers. We will use Twisted's Deferred feature to handle completion and failure callbacks.

Twisted is an asynchronous networking framework. It uses an event loop called the reactor. When the loop detects an event, it reports it through a callback. Events include connection made, data received and connection lost, among others.
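The callback pattern described above can be illustrated without Twisted. The sketch below is not how the reactor is implemented. It only uses the standard library's selectors module to wait for a readable socket and then call the callback registered for it. The names run_once and data_received are made up for this illustration.

```python
import selectors
import socket

received = []

def data_received(sock):
    """Callback invoked by the loop when the socket has data to read."""
    received.append(sock.recv(1024))

def run_once(selector, timeout=1.0):
    """Wait for events once and dispatch each one to its callback."""
    for key, _mask in selector.select(timeout):
        callback = key.data
        callback(key.fileobj)

# A connected pair of sockets stands in for a client and a server.
left, right = socket.socketpair()
selector = selectors.DefaultSelector()
selector.register(right, selectors.EVENT_READ, data_received)

left.sendall(b'97264836:peugeot:red')
run_once(selector)

selector.unregister(right)
selector.close()
left.close()
right.close()
```

The application code never reads the socket directly. It only supplies the function to call when something happens, which is the same division of work as between the reactor and a Protocol class.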

Server

The server listens for connections and writes the cars data when a connection is made.

First, we have our server application class. The constructor takes care of the following:

  • Logging facility initialization.
  • Cars list initialization.
  • Create a server protocol factory. This factory produces a protocol instance for each connection.
  • Start a thread monitoring cars.

The class has a method named “listen” to start listening for new TCP connections on a specific host and port.

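# imports used by the server module (server.py)
import logging
from threading import Thread

from twisted.internet import reactor
from twisted.internet.protocol import Protocol, ServerFactory

class TrafficServer(object):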
    """
    Server main class
    """
    def __init__(self):
        """
        Constructor
        """
        # init logging facility: log to server.log
        logging.basicConfig(filename='server.log', level=logging.DEBUG)
        # cars list
        self.cars = []
        # server listening interface
        self.interface = 'server1.monitoring.com'
        # server port number
        self.port = 8000
        # Factory class for connections
        self.factory = TrafficFactory(self.cars)
        # Thread monitoring for new cars
        self.watchcars = WatchCars(self.cars)
        self.watchcars.start()
 
    def listen(self):
        """
        Call reactor's listen to listen for client's connections
        """
        port = reactor.listenTCP(self.port or 0, self.factory,
            interface=self.interface)

This is our server factory class. It creates a protocol instance each time a connection is made. We pass it the list of cars so that each protocol instance, which we are going to see next, can reach the list through its factory attribute.

class TrafficFactory(ServerFactory):
    """
    Factory to create protocol instances.
    """
        
    protocol = TrafficProtocol

    def __init__(self, cars):
        """
        Constructor.

        @param cars cars list
        """
        logging.debug('Traffic factory init')
        self.cars = cars

Next is the protocol class itself. The Protocol class implements connectionMade() which is called when a new connection is made. We are going to use this callback method to write the cars data to the client.

class TrafficProtocol(Protocol):
    """
    Protocol class to handle data between the client and the server.
    """

    def connectionMade(self):
        """
        Callback when a connection is made. Write cars data to the client then
        close the connection.
        """
        logging.debug('Connection made')
        # join the records with '.' and encode them: the transport expects bytes
        data = '.'.join(self.factory.cars).encode('utf-8')
        self.transport.write(data)
        self.transport.loseConnection()
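The write-then-close exchange can also be tried out without Twisted. The sketch below swaps in the standard library's socketserver and socket modules purely as an illustration. CarsHandler, fetch_cars and the sample CARS list are hypothetical names, and the server binds to 127.0.0.1 on a free port rather than to the hosts used in this article.

```python
import socket
import socketserver
import threading

# Sample data, in the same 'timestamp:brand:color' form used by the server.
CARS = ['97264836:peugeot:red', '97264846:renault:green']

class CarsHandler(socketserver.BaseRequestHandler):
    def handle(self):
        # Same behaviour as connectionMade(): write the list, then the
        # connection is closed when handle() returns.
        self.request.sendall('.'.join(CARS).encode('utf-8'))

def fetch_cars(host, port):
    """Read until the server closes the connection, then split the records."""
    chunks = []
    with socket.create_connection((host, port), timeout=5) as sock:
        while True:
            chunk = sock.recv(4096)
            if not chunk:  # empty read: the server closed the connection
                break
            chunks.append(chunk)
    text = b''.join(chunks).decode('utf-8')
    return text.split('.') if text else []

# Port 0 asks the operating system for any free port.
server = socketserver.TCPServer(('127.0.0.1', 0), CarsHandler)
thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start()

host, port = server.server_address
cars = fetch_cars(host, port)

server.shutdown()
server.server_close()
```

The client knows the list is complete only because the server closes the connection, which is why the Twisted client shown later waits for connectionLost() before using the data.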

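Here is the flow on the server side: the reactor accepts a TCP connection, the factory creates a TrafficProtocol instance for it, and connectionMade() writes the cars list to the client and closes the connection.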

The WatchCars thread watches for new cars and appends them to the cars list. Assume that get_next_car() is a blocking call returning the cars one by one.

class WatchCars(Thread):
    """
    Thread monitoring the cars.
    """

    def __init__(self, cars):
        """
        Constructor.

        @param cars cars list
        """
        Thread.__init__(self)
        self.cars = cars
    
    def run(self):
        """
        Thread run. Get new cars and add them to the cars list.
        """
        while True:
            t, brand, color = get_next_car()
            self.cars.append('%s:%s:%s' % (t, brand, color))
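Since get_next_car() is not shown in this article, here is a minimal sketch of the same thread with a stand-in for it. It assumes the detector can be simulated with a queue.Queue. The class name WatchCarsSketch, the source queue and the None sentinel used to end the demo are all hypothetical.

```python
import queue
import threading

class WatchCarsSketch(threading.Thread):
    """Same idea as WatchCars, fed by a queue instead of a real detector."""

    def __init__(self, cars, source):
        super().__init__()
        self.cars = cars
        self.source = source

    def get_next_car(self):
        """Hypothetical stand-in: blocks until the detector yields a car."""
        return self.source.get()

    def run(self):
        while True:
            car = self.get_next_car()
            if car is None:  # sentinel used only to end this demo
                break
            t, brand, color = car
            self.cars.append('%s:%s:%s' % (t, brand, color))

cars = []
source = queue.Queue()
watcher = WatchCarsSketch(cars, source)
watcher.start()

# Simulate two detections, then ask the thread to stop.
source.put((97264836, 'peugeot', 'red'))
source.put((97264846, 'renault', 'green'))
source.put(None)
watcher.join()
```

The list object is shared with the rest of the program, which is how the protocol instances see the cars added by the thread.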

Finally, we have a simple main() function creating an instance of the server and starting the reactor loop.

def main():
    server = TrafficServer()
    server.listen()
    reactor.run()

if __name__ == '__main__':
    main()

Let’s start the server alone (server.py) and look at the logging output:

python server.py &

cat server.log
DEBUG:root:Traffic server init
DEBUG:root:Traffic factory init
DEBUG:root:Watch cars thread init
DEBUG:root:Watch cars thread run
DEBUG:root:Traffic server listen

We can see the server initializing the factory to instantiate protocol objects each time a connection is made. The server is listening for connections from the client.

Client

The client retrieves the list of cars from the different servers.

First is our client class doing the following:

  • Logging facility initialization.
  • Initialize the Deferred object to handle callbacks and failures.
  • Create a client protocol factory. This factory produces instances of our protocol each time a connection is made.
  • Cars list initialization.
  • Server addresses list initialization.

class TrafficClient(object):
    """
    Client
    """
    def __init__(self):
        """
        Constructor
        """
        # init logging facility: log to client.log
        logging.basicConfig(filename='client.log', level=logging.DEBUG)
        # init deferred object to handle callbacks and failures
        self.deferred = defer.Deferred()
        # init factory to create protocol instances
        self.factory = TrafficClientFactory(self.deferred)
        # list of cars
        self.cars = []
        # keep track of servers replying so we know when the overall work
        # is finished
        self.addr_count = 0
        # list of servers to get cars list from
        self.addresses = [('server1.monitoring.com', 8000),
            ('server2.monitoring.com', 8000)]
        logging.debug('Init traffic client')
    ... 

Let’s look at the methods of the client class.

First is get_cars(), which retrieves the list of cars from a server. It uses the reactor's connectTCP() method to initiate a connection to the server. We will see later how we detect that we received data from the server. The Deferred object allows us to register success and failure callbacks instead of handling the exceptions ourselves. We are going to register those callbacks in the main loop.

    def get_cars(self, host, port):
        """
        Connect to server to retrieve list of cars

        @param host server's hostname
        @param port server's port
        """
        reactor.connectTCP(host, port, self.factory)

Next is the main loop used to retrieve the list of cars from all the servers. We register one callback for when the list is returned and another one to handle errors. The addCallbacks() method allows us to specify both. We also register a done method with addBoth() so it is called no matter what happens. We are going to see those callback methods next.

    def update_cars(self):
        """
        Retrieve list of cars from all servers. Set callbacks to handle
        success and failure.
        """
        for address in self.addresses:
            host, port = address
            self.get_cars(host, port)
            self.deferred.addCallbacks(self.got_cars, self.get_cars_failed)
            self.deferred.addBoth(self.cars_done)
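To make the order of the calls easier to follow, here is a toy stand-in for a Deferred. It is not Twisted's implementation and leaves out most of it (for example, callbacks added after firing). MiniDeferred and the events list are invented for this illustration. It only shows how the pairs registered with addCallbacks() and addBoth() are walked in order, and that an errback which returns normally hands control back to the success side.

```python
class MiniDeferred:
    """Toy callback chain, loosely modelled on a Deferred (illustration only)."""

    def __init__(self):
        self._pairs = []
        self._fired = False

    def addCallbacks(self, callback, errback):
        self._pairs.append((callback, errback))
        return self

    def addBoth(self, func):
        return self.addCallbacks(func, func)

    def callback(self, result):
        self._run(result)

    def errback(self, error):
        self._run(error)

    def _run(self, result):
        if self._fired:
            raise RuntimeError('a deferred can only be fired once')
        self._fired = True
        for callback, errback in self._pairs:
            # An exception travels down the errback side of the chain.
            handler = errback if isinstance(result, Exception) else callback
            try:
                result = handler(result)
            except Exception as exc:
                result = exc

events = []

def got_cars(cars):
    events.append(('got', cars))
    return cars

def get_cars_failed(err):
    events.append(('failed', str(err)))

def cars_done(_result):
    events.append(('done',))

# Failure path: errback, then the addBoth() function.
failing = MiniDeferred()
failing.addCallbacks(got_cars, get_cars_failed)
failing.addBoth(cars_done)
failing.errback(ConnectionRefusedError('refused'))

# Success path: callback, then the addBoth() function.
working = MiniDeferred()
working.addCallbacks(got_cars, get_cars_failed)
working.addBoth(cars_done)
working.callback(['97264836:peugeot:red'])
```

A real Deferred can also be fired only once. With several servers, one Deferred per connection is a common arrangement, which is why the sketch uses two objects. That is an assumption about the design, not something the listing above shows.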

The method got_cars() is called when we are done receiving the data from the server. It is called by the protocol instance handling the data between the client and the server. We are going to see the factory and the protocol classes later.

    def got_cars(self, cars):
        """
        Callback when cars retrieval is successful

        @param cars data returned by server
        """
        logging.debug('Got cars: %s' % cars)
        self.cars.extend(cars)

The method get_cars_failed() is called when the retrieval fails, for example when the connection to a server cannot be established.

    def get_cars_failed(self, err):
        """
        Callback when retrieval from server failed. Log error.

        @param err server error
        """
        logging.debug('Get cars failed: %s' % err)

The method cars_done() is called each time the retrieval from a server finishes, whether it succeeded or failed. Once every server has been handled, it logs the cars list and tells the reactor to stop.

    def cars_done(self, cars):
        """
        Callback when retrieval operation is finished for all servers.
        Log cars list and stop Twisted reactor loop which is listening to events
        """
        self.addr_count += 1
        if self.addr_count == len(self.addresses):
            logging.debug('Cars done: %s' % self.cars)
            reactor.stop()
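The counting logic in cars_done() can be isolated in a few lines. This is only a sketch: ReplyCounter and its on_all_done hook are hypothetical names, and the hook stands in for the call to reactor.stop().

```python
class ReplyCounter:
    """Call on_all_done once the expected number of replies has arrived."""

    def __init__(self, expected, on_all_done):
        self.expected = expected
        self.on_all_done = on_all_done
        self.count = 0

    def reply_done(self, _result=None):
        # Called once per server, on success or on failure.
        self.count += 1
        if self.count == self.expected:
            self.on_all_done()

addresses = [('server1.monitoring.com', 8000),
             ('server2.monitoring.com', 8000)]
stopped = []
counter = ReplyCounter(len(addresses), lambda: stopped.append(True))

counter.reply_done(['97264836:peugeot:red'])
after_first = list(stopped)   # still empty: one server is pending
counter.reply_done(None)      # second server done, the hook fires
```

Twisted also ships helpers such as DeferredList for waiting on several Deferreds. The manual counter is kept here because it is what the article's code does.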

Next is our protocol class to handle the data between the client and the server. We need to specify a method to be called when some data is received. We also specify a method to be called when the connection is lost. This happens normally when the server closes the connection after sending the list of cars.

class TrafficProtocol(Protocol):
    """
    Protocol class to handle data between the client and the server.
    """

    data = b''  # bytes received so far (the transport delivers bytes)

    def dataReceived(self, data):
        """
        Callback when some data is received from server.

        @param data data received from server
        """
        logging.debug('Data received: %s' % data)
        self.data += data

    def connectionLost(self, reason):
        """
        Callback when connection is lost with server. At that point, the
        cars have been received.

        @param reason failure object
        """
        logging.debug('Connection lost: %s' % reason)
        # decode the accumulated bytes and split them into car records,
        # skipping the empty string produced when no data was received
        self.cars = [c for c in self.data.decode('utf-8').split('.') if c]
        self.carsReceived(self.cars)

    def carsReceived(self, cars):
        """
        Called when the cars data are received.

        @param cars data received from the server
        """
        self.factory.get_cars_finished(cars)
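TCP is a byte stream, so dataReceived() may be called several times with pieces of the payload cut at arbitrary positions. The sketch below shows why the protocol accumulates the data and only splits it once the connection is lost. CarsBuffer is a hypothetical plain class with no Twisted dependency, and the 7-byte chunk size is arbitrary.

```python
class CarsBuffer:
    """Accumulate chunks, then split them into records at the end."""

    def __init__(self):
        self.data = b''

    def data_received(self, chunk):
        self.data += chunk

    def connection_lost(self):
        text = self.data.decode('utf-8')
        # Filter out the empty string that ''.split('.') would produce.
        return [car for car in text.split('.') if car]

payload = b'97264836:peugeot:red.97264846:renault:green'

# Deliver the payload in small pieces that cut records in the middle.
buf = CarsBuffer()
for start in range(0, len(payload), 7):
    buf.data_received(payload[start:start + 7])
cars = buf.connection_lost()

# A connection that closes without sending anything yields no cars.
empty = CarsBuffer().connection_lost()
```

Splitting each chunk as it arrives would break records that straddle two chunks, which is the case this buffer avoids.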

We need to create a factory class to produce protocol instances. The method get_cars_finished() is called by the protocol instance when the connection with the server is lost. We also define clientConnectionFailed() to handle connection errors. Note how the factory fires the Deferred's callback and errback methods, which run the callbacks registered by the client class.

class TrafficClientFactory(ClientFactory):
    """
    Factory to create protocol instances
    """

    protocol = TrafficProtocol

    def __init__(self, deferred):
        """
        Constructor.

        @param deferred callbacks to handle completion and failures
        """
        self.deferred = deferred

    def get_cars_finished(self, cars):
        """
        Callback when the cars data is retrieved from the server successfully

        @param cars data received from the server
        """
        if self.deferred:
            d, self.deferred = self.deferred, None
            d.callback(cars)

    def clientConnectionFailed(self, connector, reason):
        """
        Callback when connection fails

        @param connector connection object.
        @param reason failure object
        """
        if self.deferred:
            d, self.deferred = self.deferred, None
            d.errback(reason)
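Both factory methods use the same idiom: take the stored object, replace it with None, then use it. This guarantees that it is used at most once, even if both methods end up being called. A minimal sketch of the idiom on its own, with a hypothetical OneShot class and a plain function instead of a Deferred:

```python
class OneShot:
    """Run a handler at most once, using the swap-with-None idiom."""

    def __init__(self, handler):
        self.handler = handler

    def fire(self, value):
        if self.handler:
            # Take the handler and clear the attribute in one statement,
            # so a second call finds None and does nothing.
            handler, self.handler = self.handler, None
            handler(value)

calls = []
shot = OneShot(calls.append)
shot.fire('first')
shot.fire('second')   # ignored: the handler was already consumed
```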

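Here is the flow on the client side: connectTCP() opens the connection, the factory creates a TrafficProtocol instance, dataReceived() accumulates the data, and connectionLost() hands the cars to the factory through carsReceived(). The factory then fires the Deferred, which runs got_cars() (or get_cars_failed() when the connection fails) followed by cars_done().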

Our simple main() function instantiates a client object and starts the reactor loop.

def main():
    client = TrafficClient()
    client.update_cars()

    reactor.run()

if __name__ == '__main__':
    main()

Let’s start the client alone (client.py) and look at the logging output:

python client.py &

cat client.log
DEBUG:root:Traffic client init
DEBUG:root:Traffic client factory init: <Deferred at 0x21945a8>
DEBUG:root:Update cars
DEBUG:root:Get cars: server1.monitoring.com - 8000
DEBUG:root:Get cars: server2.monitoring.com - 8000
DEBUG:root:Client connection failed: <twisted.internet.tcp.Connector instance at 0x2a28638> - [Failure instance: Traceback (failure with no frames): <class 'twisted.internet.error.ConnectionRefusedError'>: Connection was refused by other side: 111: Connection refused.
]
DEBUG:root:Get cars failed: [Failure instance: Traceback (failure with no frames): <class 'twisted.internet.error.ConnectionRefusedError'>: Connection was refused by other side: 111: Connection refused.
]
DEBUG:root:Client connection failed: <twisted.internet.tcp.Connector instance at 0x2a28668> - [Failure instance: Traceback (failure with no frames): <class 'twisted.internet.error.ConnectionRefusedError'>: Connection was refused by other side: 111: Connection refused.
]
DEBUG:root:Get cars failed: [Failure instance: Traceback (failure with no frames): <class 'twisted.internet.error.ConnectionRefusedError'>: Connection was refused by other side: 111: Connection refused.
]
DEBUG:root:Cars done: []

The client tries to connect to server1.monitoring.com and server2.monitoring.com. Because no server is listening, clientConnectionFailed() is called, which is the expected behavior. It fires the errback, so get_cars_failed() runs, followed by cars_done(), in the order of the callback chain we defined for the Deferred object.

Testing

Let’s start 2 servers and 1 client and see what happens:

python server.py &

cat server.log
DEBUG:root:Traffic server init
DEBUG:root:Traffic factory init
DEBUG:root:Watch cars thread init
DEBUG:root:Watch cars thread run
DEBUG:root:Traffic server listen
DEBUG:root:Connection made

On the server side, we can see that a connection is made from the client. The server writes data to the client and closes the connection.

python client.py &

cat client.log
DEBUG:root:Traffic client init
DEBUG:root:Traffic client factory init: <Deferred at 0x24975a8>
DEBUG:root:Update cars
DEBUG:root:Get cars: server1.monitoring.com - 8000
DEBUG:root:Get cars: server2.monitoring.com - 8000
DEBUG:root:Data received: 97264836:peugeot:red
DEBUG:root:Connection lost: [Failure instance: Traceback (failure with no frames): <class 'twisted.internet.error.ConnectionDone'>: Connection was closed cleanly.
]
DEBUG:root:Cars received: ['97264836:peugeot:red']
DEBUG:root:Get cars finished: ['97264836:peugeot:red']
DEBUG:root:Got cars: ['97264836:peugeot:red']
DEBUG:root:Data received: 97264846:renault:green
DEBUG:root:Connection lost: [Failure instance: Traceback (failure with no frames): <class 'twisted.internet.error.ConnectionDone'>: Connection was closed cleanly.
]
DEBUG:root:Cars received: ['97264846:renault:green']
DEBUG:root:Get cars finished: ['97264846:renault:green']
DEBUG:root:Got cars: ['97264846:renault:green']
DEBUG:root:Cars done: ['97264836:peugeot:red', '97264846:renault:green']

On the client side, two connections are made: one to server1.monitoring.com and one to server2.monitoring.com. ‘97264836:peugeot:red’ is received from server1 and ‘97264846:renault:green’ is received from server2.
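Each record is a colon-separated string, so the client can turn it into something easier to work with. The sketch below assumes the first field is a numeric timestamp, as the log output suggests. The Car type and parse_car() are hypothetical helpers that are not part of the example repository.

```python
from collections import namedtuple

Car = namedtuple('Car', ['timestamp', 'brand', 'color'])

def parse_car(record):
    """Split a 'timestamp:brand:color' record into a Car tuple."""
    timestamp, brand, color = record.split(':')
    return Car(int(timestamp), brand, color)

records = ['97264836:peugeot:red', '97264846:renault:green']
cars = [parse_car(record) for record in records]
brands = [car.brand for car in cars]
```

This only works as long as brands and colors contain neither ':' nor '.', since those characters are used as separators.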

That’s it for now. I hope you enjoyed this article. Please write a comment if you have any feedback.

Last modified: May 30, 2020
