Python, Twitter statistics and the 2012 French presidential election

August 29, 2012

This post describes how Pytolab was designed to process Tweets related to the 2012 French presidential election, in real-time. This post also goes over some of the statistics computed over a period of 9 months.

Note: I presented this project at EuroSciPy 2012: abstract.

Architecture
Statistics

Architecture

The posts are received from the Twitter streaming API and sent to a messaging exchange. The posts are read from the messaging queue and processed by the computing unit. Most frequently accessed data is stored in an in-memory DB (Redis) and long term data is stored in MySQL. See diagram below.

[Figure: architecture diagram]

Tweets receiver

The Twitter streaming API filter feature is used here to receive the tweets we are interested in: Tweets referring to at least one of the candidates. The helper library Tweepy facilitates that task.

The first thing we do is set up a stream listener. We get a listener instance, set the callback to be called when a new post arrives, and finally get a stream instance by passing our listener instance to it. We will see next how those different objects are defined.

def setup_stream_listener(self):
    """
    Setup Twitter API streaming listener
    """
    listener = Listener()
    listener.set_callback(self.mq.producer.publish)
    self.stream = tweepy.Stream(
        self.config.get('twitter', 'userid'),
        self.config.get('twitter', 'password'),
        listener,
        timeout=3600
    )

Note: We use ConfigParser for the configuration file management.

The Listener class is derived from the tweepy.StreamListener class. We override some of the methods to indicate what to do when a new post arrives or when an error is detected.

class Listener(tweepy.StreamListener):
    def on_status(self, status):
        # Do things with the post received. Post is the status object.
        ...
    
    def on_error(self, status_code):
        # If error thrown during streaming.
        ...

    def on_timeout(self):
        # If no post received for too long
        ...
        
    def on_limit(self, track):
        # If too many posts match our filter criteria and only a subset is
        # sent to us
        ...

    def on_delete(self, status_id, user_id):
        # When a delete notice arrives for a post.
        ...

    def set_callback(self, callback):
        # Pass callback to call when a new post arrives
        self.callback = callback

We need to add a few lines of code to the on_status method. We parse what we are interested in and publish the data to our messaging queue. We filter out the posts written by an author whose language is not French. The callback is the publish method of our messaging queue producer.

def on_status(self, status):
    if status.author.lang == 'fr':
        message = {'author_name': status.author.screen_name,
                   'author_id': status.author.id,
                   'id': status.id,
                   'text': status.text,
                   'retweeted': status.retweeted,
                   'coordinates': status.coordinates,
                   'time': int(time.time())}
        self.callback(json.dumps(message), 'posts')

We will see later how the messaging queue producer and consumer are built.

There is one more thing we need to do: set up our streaming filter so we start receiving the posts we are interested in from Twitter. We have a list of presidential candidates in the list self.persons. We build a list of names and start listening for them. The call to stream.filter is blocking and the method on_status of the listener class is called each time a new post arrives.

Keep in mind that the streaming filter returns at most 1% of all posts processed by Twitter. This means that if the posts referring to our candidates represent more than 1% of all posts on Twitter at instant t, then the number of posts will be capped at 1%. We encountered this case only twice: during the first round results and during the second round results. We lost less than 10% of the posts when that situation happened. How do you make sure this does not happen? You will have to subscribe to the complete stream which is provided by some Twitter partners like DataSift and Gnip. Those solutions are not cheap.

Note that we are catching all exceptions. There is no guarantee that you will get continuous streaming with no errors so catching all exceptions is important here.

def stream_filter(self):
    track_list = [data.normalize(p['name']) for p in self.persons]
    while True:
        try:
            self.stream.filter(track=track_list)
        except Exception:
            logging.exception('stream filter')
            time.sleep(10)
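
The loop above waits a fixed 10 seconds before reconnecting. As a variation, not something the original module does, the wait can grow after each consecutive failure so that a long outage does not trigger a reconnection attempt every few seconds. This is a minimal sketch: run_forever, its parameters and the injected sleep function are hypothetical names, and connect stands in for the blocking call to stream.filter.

```python
import logging
import time


def run_forever(connect, sleep=time.sleep, base_delay=1.0, max_delay=60.0,
                max_attempts=None):
    """Call connect() in a loop, waiting longer after each failure.

    connect is any blocking callable (a stand-in for stream.filter).
    max_attempts exists only so the sketch can be exercised in a test.
    """
    delay = base_delay
    attempts = 0
    while max_attempts is None or attempts < max_attempts:
        attempts += 1
        try:
            connect()
            delay = base_delay  # a clean return resets the wait
        except Exception:
            logging.exception('stream filter')
            sleep(delay)
            delay = min(delay * 2, max_delay)  # double the wait, up to a cap
    return attempts
```

Passing the sleep function as a parameter keeps the sketch testable without actually waiting.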

Some examples of errors I saw in the past:

File "/usr/local/lib/python2.6/dist-packages/tweepy-1.7.1-py2.6.egg/tweepy/streaming.py", line 148, in _read_loop
    c = resp.read(1)
...
File "/usr/lib/python2.6/httplib.py", line 518, in read
    return self._read_chunked(amt)
File "/usr/lib/python2.6/httplib.py", line 561, in _read_chunked
    raise IncompleteRead(''.join(value))
IncompleteRead: IncompleteRead(0 bytes read)

File "/usr/local/lib/python2.6/dist-packages/tweepy-1.7.1-py2.6.egg/tweepy/streaming.py", line 148, in _read_loop
    c = resp.read(1)
...
File "/usr/lib/python2.6/ssl.py", line 96, in <lambda>
    self.recv = lambda buflen=1024, flags=0: SSLSocket.recv(self, buflen, flags)
File "/usr/lib/python2.6/ssl.py", line 222, in recv
    raise x
SSLError: The read operation timed out

File "/usr/local/lib/python2.6/dist-packages/tweepy-1.7.1-py2.6.egg/tweepy/streaming.py", line 148, in _read_loop
    c = resp.read(1)
...
File "/usr/lib/python2.6/ssl.py", line 136, in read
    return self._sslobj.read(len)
error: [Errno 104] Connection reset by peer

Let’s take a look at what happens when the method stream.filter is called. An HTTPS POST request is made using the following URL: https://stream.twitter.com/1/statuses/filter.json?delimited=length and the following body: track=candidate1, candidate2,… The stream of data is then read in a loop until there is an error.

The data arrives in the following format: “<length>\n<post data>\n…” The length comes before the post data because we ask for it with the URL parameter ‘delimited=length’.
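
To make the length-delimited framing concrete, here is a minimal sketch of a reader for such a stream. It is not Tweepy's code: read_delimited and frame are hypothetical names, the stream is simulated with an in-memory buffer, and the exact framing (a decimal length, a newline, then that many bytes of JSON) is assumed from the description above.

```python
import io
import json


def read_delimited(stream):
    """Yield one decoded post per length-prefixed frame read from stream."""
    while True:
        digits = b''
        while True:
            c = stream.read(1)          # the length is read byte by byte
            if not c:
                return                  # end of stream
            if c == b'\n':
                if digits:
                    break               # a complete length was read
            elif c != b'\r':
                digits += c             # blank keep-alive lines add nothing
        length = int(digits.decode('ascii'))
        # A real socket may return fewer bytes than requested; a production
        # reader would loop until `length` bytes have been received.
        payload = stream.read(length)
        yield json.loads(payload.decode('utf-8'))


def frame(post):
    """Build a frame in the format the sketch expects (illustration only)."""
    body = json.dumps(post).encode('utf-8') + b'\r\n'
    return str(len(body)).encode('ascii') + b'\n' + body


raw = frame({'text': 'bla bla bla'}) + b'\r\n' + frame({'text': 'encore'})
posts = list(read_delimited(io.BytesIO(raw)))
```

Reading the length one byte at a time is simple, but it costs one read call per digit for every post.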

Here is an example of a post content:

{
    "in_reply_to_user_id": null,
    "in_reply_to_status_id": null,
    "text": "bla bla bla",
    "favorited": false,
    ...
}

A more complete example: https://gist.github.com/900964.

The Tweepy library formats that data as a status object and passes it to the on_status method of the listener object.

See the full Tweets receiver module.

Messaging queue

We are using RabbitMQ for our messaging system plus the Python helper library py-amqplib. An exchange is created to receive the posts and a consumer reads the messages from a queue. Those messages are processed by the computing unit. The advantage of using a messaging queue is that we can absorb a surge of posts.

First is the producer. We create a connection to the messaging server and get a channel on that connection. This channel is used to publish messages to the exchange.

class Producer(object):
    def __init__(self, exchange_name, host, userid, password):
        self.exchange_name = exchange_name
        self.connection = amqp.Connection(
            host=host, userid=userid, password=password, virtual_host="/",
            insist=False)
        self.channel = self.connection.channel()

Our producer class has a publish method to send a message to the exchange. Messages marked as ‘persistent’ (delivery mode 2) that are delivered to ‘durable’ queues will be logged to disk. We use the routing key ‘posts’, which will also be used when we create the queue to route the messages properly.

def publish(self, message, routing_key):
    msg = amqp.Message(message)
    msg.properties["content_type"] = "text/plain"
    msg.properties["delivery_mode"] = 2
    self.channel.basic_publish(exchange=self.exchange_name,
                     routing_key=routing_key,
                     msg=msg)

Next is the consumer. We also get a connection to the messaging server and get a channel on that connection.

class Consumer(object):
    def __init__(self, host, userid, password):
        self.connection = amqp.Connection(host=host, userid=userid, password=password, virtual_host="/", insist=False)
        self.channel = self.connection.channel()

We also have a method creating the queue and one passing the method to be called each time there is a message to be consumed in the queue.
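
The benefit of putting a queue between the receiver and the computing unit can be illustrated without RabbitMQ. The sketch below uses Python's standard queue module as a stand-in for the broker: a burst of posts is accepted immediately and the consumer works through it afterwards. The names posts, processed, publish and consume are only illustrative.

```python
import json
import queue
import threading

posts = queue.Queue()   # stand-in for the RabbitMQ queue
processed = []


def publish(message, routing_key):
    # Same call shape as the publish method shown above.
    posts.put((routing_key, message))


def consume():
    while True:
        routing_key, message = posts.get()
        if message is None:      # sentinel used to stop the worker
            break
        processed.append(json.loads(message)['id'])


# A burst of posts is accepted right away; the consumer catches up later.
for i in range(1000):
    publish(json.dumps({'id': i}), 'posts')
publish(None, 'posts')

worker = threading.Thread(target=consume)
worker.start()
worker.join()
```

With a real broker the queue also survives a restart of either side, which an in-process queue cannot offer.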

See the full Messaging queue module.

DB interface

Before we go over the computing unit, let’s look at the DB interface we created to interface with the in-memory DB Redis and MySQL.

Regarding Redis, our interface is built on top of the helper library redis-py. It adds retries around DB commands.

We use the following Redis commands (complexity of the command is shown next to it):

  • GET key – O(1)
  • SET key value – O(1)
  • DEL key – O(1)
  • EXISTS key – O(1)
  • INCR key – O(1)
  • RPUSH key value – O(1)
  • LSET key index value – O(N)
  • LINDEX key index – O(N)
  • LRANGE key start stop – O(S+N)

The key used to store a post is ‘post:<post id>’. We dump the JSON post data as the key’s value. For ease of access, we also have a Redis list per person and per hour with the following key: ‘person:<person id>:posts:<hour>’. This list contains the IDs of the posts referring to this person during that hour.
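
The key scheme can be tried out without a Redis server. The sketch below uses a small dictionary-backed stand-in that implements only the commands it needs. FakeStore and store_post are hypothetical names, the ‘person:%d:posts:%d’ format is the one used in the compute module, and the ‘post:%d’ format and the hour value are assumptions.

```python
import json
from collections import defaultdict


class FakeStore(object):
    """Dictionary-backed stand-in for the few Redis commands used here."""

    def __init__(self):
        self.values = {}
        self.lists = defaultdict(list)

    def incr(self, key):
        self.values[key] = int(self.values.get(key, 0)) + 1
        return self.values[key]

    def set(self, key, value):
        self.values[key] = value

    def rpush(self, key, value):
        self.lists[key].append(value)
        return len(self.lists[key])

    def lrange(self, key, start, stop):
        # Simplified: Redis treats stop as inclusive and -1 as the last item.
        items = self.lists[key]
        stop = len(items) if stop == -1 else stop + 1
        return items[start:stop]


def store_post(db, post, person_id, hour):
    """Store the post and add its ID to the person's list for that hour."""
    post_id = db.incr('nextPostId')
    db.set('post:%d' % post_id, json.dumps(post))
    db.rpush('person:%d:posts:%d' % (person_id, hour), post_id)
    return post_id
```

Listing the posts of a person for a given hour is then a single LRANGE on that person's key.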

Regarding MySQL, our interface is built on top of the helper library MySQLdb.

Here is the method to execute a MySQL command. If the command throws an operational error or an internal error, we reconnect to the MySQL server and start over. For any other MySQL error, we retry up to cmd_retries times before raising a DbError.

def mysql_command(self, cmd, sql, writer, *args):
    retry = 0
    while retry < self.cmd_retries:
        try:
            r = getattr(self.db_cursor, cmd)(sql, args)
            if writer:
                self.db_disk_posts.commit()
                return r
            else:
                return self.db_cursor.fetchall() 
        except (MySQLdb.OperationalError, MySQLdb.InternalError):
            self.log.error('MySQL cmd %s DB error', cmd)
            # reconnect
            self.setup_mysql_loop()
            retry = 0
        except MySQLdb.Error:
            self.log.error('MySQL cmd %s sql %s failed', cmd, sql)
            retry += 1
            # no need to wait after the last attempt
            if retry < self.cmd_retries:
                time.sleep(self.cmd_retry_wait)
        except AttributeError:
            self.log.error('MySQL cmd %s does not exist', cmd)
            raise exceptions.DbError()
    raise exceptions.DbError()

We keep smaller and more recent data in Redis. MySQL is used for larger and long-term data.

We added a thin layer on top of the Redis and MySQL commands to make the dual DB setup transparent. When we request some data, it is read from Redis and/or MySQL based on its age or type.
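
As an illustration of such a layer, here is a minimal sketch that routes a read to one store or the other based on the age of the data. It is not the actual DB module: TieredStore, get_post_ids and the 48-hour cutoff are hypothetical, and plain dictionaries stand in for Redis and MySQL.

```python
class TieredStore(object):
    """Route reads to the recent store or the archive based on data age."""

    def __init__(self, recent, archive, max_age_hours):
        self.recent = recent            # stand-in for Redis
        self.archive = archive          # stand-in for MySQL
        self.max_age_hours = max_age_hours

    def get_post_ids(self, person_id, hour, current_hour):
        key = (person_id, hour)
        if current_hour - hour <= self.max_age_hours:
            return self.recent.get(key, [])
        return self.archive.get(key, [])


recent = {(7, 99): [11, 12]}
archive = {(7, 10): [1, 2, 3]}
store = TieredStore(recent, archive, max_age_hours=48)
```

The caller asks for an hour of data and does not need to know which backend answered.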

See the full DB module.

Computing unit

We defined a method called when there is a message to read from the queue. When a post is received, we process it the following way:

  • Filter out posts marked as French but containing common English words. In most cases, such a post is fully written in English and we need to skip it.
  • For each person, check if this post is really about that person and not something unrelated.
  • Add post ID to the person’s hourly posts list.
  • Store post data in DB.

def process_post(self, post):
    """
    Process post received from the message queue.
    """
    # is this a post matching one or more persons?
    post_add = False
    # remove accents and lowercase everything
    text = data.normalize(post['text']).lower()
    ...
    # check post language
    if data.get_text_language(text) == 'fr':
        for person in self.persons:
            # get person's first name, last name and nickname
            names = data.get_names(person)
            # check if the post is really about that person
            if data.check_names(names, text, person['words']) == 1:
                # one more post for this person
                if not post_add:
                    post_add = True
                    # get next post id
                    post_id = self.db.incr('nextPostId')
                # add post to person's posts list
                key = 'person:%d:posts:%d' % (person['id'],
                        self.stats_last_update)
                self.db.rpush(key, post_id)
                ...
        if post_add:
            # add post to db
            self.db.set_post(int(post_id),
                json.dumps(post))
            # add post id to current hour
            key = 'posts:%d' % (self.stats_last_update)
            self.db.rpush(key, post_id)
    else:
        logging.debug('found english word in %s', text)

Filtering out unrelated messages is key here. For example, “Je vais en Hollande demain” (I am going to Holland tomorrow) is not about the presidential candidate “Hollande” but about the country “Holland”. Both are spelled the same way in French. We defined a list of words and rules per person to help filter out the unrelated posts.
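
One simple way to express such a rule is a list of phrases that disqualify a match. The sketch below is only an illustration of the idea: is_about, its parameters and the phrase ‘en hollande’ are hypothetical, and the real rules in the compute module may work differently.

```python
import re
import unicodedata


def normalize(text):
    """Strip accents, as the post describes for data.normalize."""
    decomposed = unicodedata.normalize('NFKD', text)
    return ''.join(c for c in decomposed if not unicodedata.combining(c))


def is_about(text, names, excluded_phrases):
    """Return True if a name appears and no excluded phrase does."""
    text = normalize(text).lower()
    if not any(re.search(r'\b%s\b' % re.escape(n), text) for n in names):
        return False
    return not any(phrase in text for phrase in excluded_phrases)


names = ['hollande', 'francois hollande']
excluded = ['en hollande']
country = is_about(u'Je vais en Hollande demain', names, excluded)
candidate = is_about(u'François Hollande en meeting ce soir', names, excluded)
```

Here the first sentence is rejected because of the excluded phrase, while the second one is kept.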

See the full compute module.

High availability

Each element above can be made highly available with the use of an extra server. We can add one more server receiving the tweets in case the active one fails. We can detect this type of failure using a heartbeat between the active and the standby instance. RabbitMQ supports mirrored queues. Redis and MySQL support a master/slave architecture.
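
The heartbeat idea can be sketched in a few lines: the standby records when it last heard from the active instance and takes over once that is too long ago. HeartbeatMonitor and its methods are hypothetical names, and how the heartbeat travels between the two servers is left out.

```python
import time


class HeartbeatMonitor(object):
    """Track the last heartbeat received from the active instance."""

    def __init__(self, timeout, clock=time.time):
        self.timeout = timeout      # seconds without a beat before failover
        self.clock = clock          # injectable so the sketch can be tested
        self.last_beat = clock()

    def beat(self):
        # Called each time a heartbeat arrives from the active instance.
        self.last_beat = self.clock()

    def active_is_down(self):
        return self.clock() - self.last_beat > self.timeout
```

The standby would call active_is_down periodically and start its own stream listener when it returns True.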

Performance

During peak traffic (first round results for example), the bottleneck in our system was the Twitter streaming listener. The code reads the length of the post data byte by byte from the stream and then reads the post data using the length value. This is quite CPU intensive and we had to switch from a small instance (1 compute unit) on Amazon EC2 to a large one (4 compute units) to read the posts in real time during traffic peaks.

The messaging system we used (RabbitMQ) can handle way more than what we used it for so no issue on that side.

Here is a comparison between Redis and MySQL when it comes to storing data on a small EC2 instance.

Method:

- MySQL: insert into table for each value, final commit.
- Redis: SET command for each value. Redis persists changes to disk in the background.

Adding 10k posts:

- MySQL: 4.0 seconds.
- Redis: 2.6 seconds – 1.53x faster.

Adding 100k posts:

- MySQL: 42.0 seconds.
- Redis: 23.7 seconds – 1.77x faster.

Statistics

Over 8 million tweets (8442728) related to the different candidates were analyzed by Pytolab from September 1st 2011 to June 1st 2012. Only posts referring to at least one candidate were analyzed. These are different from the posts written by the candidates themselves.

Here are some key dates from the presidential campaign:

  • 1st round of the Socialist primaries: October 9th 2011
  • 2nd round of the Socialist primaries: October 16th 2011
  • 1st round of the presidential election: April 22nd 2012
  • 2nd round of the presidential election: May 6th 2012

The following chart represents the number of posts per day for each candidate. The key dates described above are shown in red.

[Figure: number of posts per day for each candidate, with the key dates in red]

Here is the list of candidates we tracked:

  • Nathalie Arthaud
  • Martine Aubry
  • Jean-Michel Baylet
  • François Bayrou
  • Christine Boutin
  • Nicolas Dupont-Aignan
  • François Hollande
  • Nicolas Hulot
  • Eva Joly
  • Marine Le Pen
  • Jean-Luc Mélenchon
  • Arnaud Montebourg
  • Philippe Poutou
  • Ségolène Royal
  • Nicolas Sarkozy
  • Manuel Valls
  • Dominique de Villepin

Here is the number of posts in which each candidate’s name appears.

[Figure: number of posts per candidate]

We noticed that Nicolas Sarkozy is referred to in 41% of all posts we analyzed, and François Hollande in 35%. There is no strong correlation between the number of posts per candidate and their polling result. The candidate with the most posts was the president at that time, so those numbers are expected.

Rank  Posts count          Polls
1     Nicolas Sarkozy      François Hollande
2     François Hollande    Nicolas Sarkozy
3     François Bayrou      Marine Le Pen
4     Marine Le Pen        Jean-Luc Mélenchon
5     Jean-Luc Mélenchon   François Bayrou
Interestingly, the ranking by number of posts matched the polls during the 11 hours preceding the first round results and during the 6 hours preceding the second round results.

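[Figure: posts count compared to the polls before the first round results]
[Figure: posts count compared to the polls before the second round results]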

Let’s look at the authors of the posts now. We counted 388628 different authors. 98.3% of authors posted fewer than 200 posts during those 9 months, which is less than 1 post per day. 0.7% of authors (2720) posted more than 500 posts each and account for 45% of all posts.
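
Figures like these come from counting posts per author. The sketch below shows one way to do it, assuming the author ID of every post is available as a list. author_stats and its thresholds are illustrative names, not the code used for the analysis.

```python
from collections import Counter


def author_stats(author_ids, low=200, high=500):
    """Summarize how posts are spread across authors."""
    counts = Counter(author_ids)            # author -> number of posts
    n_authors = len(counts)
    total_posts = sum(counts.values())
    light = sum(1 for c in counts.values() if c < low)
    heavy = [c for c in counts.values() if c > high]
    return {
        'authors': n_authors,
        'share_light': light / n_authors,           # authors below `low`
        'share_heavy': len(heavy) / n_authors,      # authors above `high`
        'posts_by_heavy': sum(heavy) / total_posts, # their share of posts
    }


# Tiny example: three authors with 3, 1 and 6 posts.
sample = ['a'] * 3 + ['b'] + ['c'] * 6
stats = author_stats(sample, low=2, high=5)
```

In the tiny example, one author out of three is above the high threshold and wrote 60% of the posts.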

The top 10 authors in number of posts are:

  • sarkoactu: 26356
  • bayrouactu: 26345
  • Elysee_2012: 21076
  • sarkozy_info: 18868
  • FlashPresse: 16349
  • Scrutin2012: 16229
  • Eric_vds: 14667
  • democrates: 14528
  • akemoi: 14403
  • blabalade: 14119

Here is the distribution of posts per hour for all posts over our period of 9 months:

[Figure: distribution of posts per hour for all posts]

If we look at the number of posts from “sarkoactu”, it is about 96 posts per day. Looking at the distribution of the posts per hour for that author, we notice that it is probably an automatic feed.

[Figure: distribution of posts per hour for the author sarkoactu]

Looking at the full list of authors and their distribution of posts per hour, we found that 26 authors are probably automatic feeds. They represent 215783 posts, which is 2.5% of all posts.
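
The post does not spell out the criterion used to flag an author as an automatic feed. One hypothetical heuristic is that a person has quiet hours (typically the night) while a feed keeps posting around the clock. The sketch below computes the hourly distribution from Unix timestamps and checks how many posts fall into the quietest hours. The function names, the use of UTC and both thresholds are assumptions.

```python
from collections import Counter
from datetime import datetime, timezone


def hourly_distribution(timestamps):
    """Return the share of posts falling in each hour of the day (UTC)."""
    hours = Counter(datetime.fromtimestamp(t, timezone.utc).hour
                    for t in timestamps)
    total = sum(hours.values())
    if total == 0:
        return [0.0] * 24
    return [hours[h] / total for h in range(24)]


def looks_automatic(timestamps, quiet_hours=6, threshold=0.15):
    """Flag an author whose quietest hours still carry many posts."""
    shares = sorted(hourly_distribution(timestamps))
    return sum(shares[:quiet_hours]) >= threshold


# A feed posting every hour of the day, and a person posting from 8 to 22.
feed = [h * 3600 for h in range(24)] * 4
person = [d * 86400 + h * 3600 for d in range(10) for h in range(8, 23)]
```

A heuristic like this only produces candidates; the flagged accounts would still need a manual check.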

Location metadata is attached to only 0.5% of all posts. In our case, this represents 40799 posts. There is not much difference between candidates with regard to post location. We do notice that the posts come mainly from French-speaking countries: France, Belgium, Canada, Algeria, Tunisia… It makes sense as we analyzed posts written in French.

[Figure: map of post locations]

This Europe map shows that this event is mainly followed in France and little in the rest of Europe. The fact that we tracked the posts written in French contributes to this result.

[Figure: map of post locations in Europe]

Next, we looked at which other candidates an author talks about when the candidate he talks about most is person A. Below, you can see that if an author’s most talked-about candidate is Nathalie Arthaud, then that author also talks about François Hollande, Nicolas Sarkozy and Marine Le Pen.

In 11 out of 17 cases, the other candidate talked about most is Nicolas Sarkozy. Reciprocity is not a rule. When an author mainly talks about Nicolas Hulot, he also talks about Eva Joly (2nd most). The opposite is not true.

  • Nathalie Arthaud
    • François Hollande – 19.2%
    • Nicolas Sarkozy – 18.8%
    • Marine Le Pen – 11.1%
    • Philippe Poutou – 11.1%
    • Eva Joly – 9.2%
  • Martine Aubry
    • François Hollande – 31.4%
    • Nicolas Sarkozy – 19.5%
    • Ségolène Royal – 9.2%
    • Arnaud Montebourg – 8.6%
    • Marine Le Pen – 7.8%
  • Jean-Michel Baylet
    • François Hollande – 21.9%
    • Nicolas Sarkozy – 19.7%
    • Marine Le Pen – 10.1%
    • Arnaud Montebourg – 7.9%
    • Eva Joly – 7.5%
  • François Bayrou
    • François Hollande – 24.2%
    • Nicolas Sarkozy – 23.9%
    • Marine Le Pen – 10.2%
    • Jean-Luc Mélenchon – 8.2%
    • Eva Joly – 6.6%
  • Christine Boutin
    • Nicolas Sarkozy – 31.5%
    • François Hollande – 24.8%
    • Marine Le Pen – 11.2%
    • Ségolène Royal – 6.7%
    • Eva Joly – 4.9%
  • Nicolas Dupont-Aignan
    • Nicolas Sarkozy – 24.1%
    • François Hollande – 23.1%
    • Marine Le Pen – 14.9%
    • Jean-Luc Mélenchon – 8.6%
    • Eva Joly – 8.3%
  • François Hollande
    • Nicolas Sarkozy – 32.8%
    • Marine Le Pen – 13.8%
    • Jean-Luc Mélenchon – 7.6%
    • François Bayrou – 7.5%
    • Eva Joly – 6.9%
  • Nicolas Hulot
    • Nicolas Sarkozy – 31.6%
    • Eva Joly – 18.3%
    • François Hollande – 10.7%
    • Marine Le Pen – 10.2%
    • Ségolène Royal – 8.9%
  • Eva Joly
    • Nicolas Sarkozy – 27.4%
    • Marine Le Pen – 15.2%
    • François Hollande – 14.5%
    • Jean-Luc Mélenchon – 7.6%
    • François Bayrou – 5.8%
  • Marine Le Pen
    • Nicolas Sarkozy – 33.6%
    • François Hollande – 19.8%
    • Jean-Luc Mélenchon – 14.0%
    • Eva Joly – 6.1%
    • Ségolène Royal – 5.5%
  • Jean-Luc Mélenchon
    • Nicolas Sarkozy – 23.3%
    • François Hollande – 15.9%
    • Marine Le Pen – 14.7%
    • François Bayrou – 8.8%
    • Eva Joly – 6.4%
  • Arnaud Montebourg
    • Nicolas Sarkozy – 25.2%
    • François Hollande – 15.8%
    • Ségolène Royal – 8.1%
    • Martine Aubry – 7.9%
    • Manuel Valls – 6.5%
  • Philippe Poutou
    • Nicolas Sarkozy – 32.0%
    • François Hollande – 20.6%
    • Marine Le Pen – 12.1%
    • Eva Joly – 6.8%
    • Ségolène Royal – 6.4%
  • Ségolène Royal
    • Nicolas Sarkozy – 32.4%
    • François Hollande – 19.2%
    • Marine Le Pen – 9.4%
    • Martine Aubry – 6.0%
    • Eva Joly – 5.2%
  • Nicolas Sarkozy
    • François Hollande – 21.4%
    • Marine Le Pen – 13.8%
    • François Bayrou – 9.2%
    • Jean-Luc Mélenchon – 8.8%
    • Eva Joly – 7.9%
  • Manuel Valls
    • François Hollande – 24.4%
    • Nicolas Sarkozy – 19.1%
    • Martine Aubry – 8.8%
    • Arnaud Montebourg – 7.9%
    • Marine Le Pen – 7.6%
  • Dominique de Villepin
    • Nicolas Sarkozy – 18.9%
    • François Hollande – 16.4%
    • François Bayrou – 11.3%
    • Marine Le Pen – 9.7%
    • Eva Joly – 6.6%
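
A breakdown like the one above can be computed from per-author post counts. The sketch below groups authors by the candidate they mention most, then adds up their mentions of the other candidates. co_mentions is a hypothetical name, and expressing each share as a percentage of the mentions of other candidates is an assumption, since the post does not say which denominator was used.

```python
from collections import Counter, defaultdict


def co_mentions(author_counts):
    """author_counts maps author -> Counter(candidate -> number of posts)."""
    others = defaultdict(Counter)
    for counts in author_counts.values():
        if not counts:
            continue
        primary, _ = counts.most_common(1)[0]   # most talked-about candidate
        for candidate, n in counts.items():
            if candidate != primary:
                others[primary][candidate] += n
    shares = {}
    for primary, counter in others.items():
        total = sum(counter.values())
        shares[primary] = [(c, 100.0 * n / total)
                           for c, n in counter.most_common()]
    return shares


sample = {
    'u1': Counter({'Hulot': 10, 'Sarkozy': 3, 'Joly': 1}),
    'u2': Counter({'Hulot': 5, 'Joly': 4}),
}
shares = co_mentions(sample)
```

In this small example, authors who mainly talk about Hulot mention Joly in 62.5% of their other mentions and Sarkozy in 37.5%.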

The following graph shows connections between candidates based on the number of identical words used in the posts referring to them. The wider the edge, the more words are in common. An obvious link is Hollande – Sarkozy. Bayrou, being in the center politically, has two strong links: with Sarkozy and with Hollande. This is also expected. We notice another strong link between Le Pen and Sarkozy. This link makes sense based on some subjects discussed by both candidates.
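
The weight of a link in such a graph can be obtained by intersecting the vocabularies of two candidates. This is a minimal sketch assuming the set of words used in posts about each candidate has already been built. common_word_counts is a hypothetical name and the sample words are made up.

```python
from itertools import combinations


def common_word_counts(words_by_candidate):
    """Return {(candidate_a, candidate_b): number of words in common}."""
    edges = {}
    for a, b in combinations(sorted(words_by_candidate), 2):
        edges[(a, b)] = len(words_by_candidate[a] & words_by_candidate[b])
    return edges


words = {
    'Hollande': {'vote', 'gauche', 'debat'},
    'Sarkozy': {'vote', 'droite', 'debat'},
    'Bayrou': {'centre', 'vote'},
}
edges = common_word_counts(words)
```

Each value can then be used as the width of the corresponding link when drawing the graph.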

[Figure: graph of candidates linked by the number of words in common]

Next is a similar chart, but based on posts referring to multiple candidates. What is interesting here are the links going across political boundaries. Mélenchon has two major links: one with a candidate on the left and one with a candidate on the right. Joly has two links with candidates on the left and one with a candidate on the right. It makes sense when you know that Joly is more on the left of the political spectrum.

[Figure: graph of candidates linked by posts referring to multiple candidates]

Let’s look at the major events for each candidate. As we are tracking the number of posts for each candidate, we can find out the events and what those were about by looking at the most used words in the posts.

The next chart shows that about 22500 posts talked about François Bayrou on March 9th. Looking at the most used words, we can see that the candidate was participating in a TV show called “Des paroles et des actes”, also abbreviated “dpda”. “soir” means evening, and the TV show takes place in the evening.
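
Finding the most used words for a given day is a word count over that day's posts. The sketch below assumes the post texts for the day are available as a list. top_words, the stop word set and the sample posts are illustrative.

```python
from collections import Counter


def top_words(posts, stop_words=(), n=5):
    """Return the n most frequent words, ignoring short and stop words."""
    counts = Counter()
    for text in posts:
        counts.update(w for w in text.lower().split()
                      if len(w) >= 3 and w not in stop_words)
    return counts.most_common(n)


day_posts = ['Bayrou ce soir dans dpda', 'dpda Bayrou soir', 'Bayrou dpda']
words = top_words(day_posts, stop_words={'bayrou', 'dans'}, n=2)
```

Excluding the candidate's own name keeps it from dominating the result, since it appears in every post by construction.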

[Figure: number of posts per day and most used words for François Bayrou]

See the section events in the annexes for the complete list of events for each candidate.

Next is a bar chart showing the number of authors mainly talking about one candidate (80% or more of their posts relate only to that candidate). We notice a strong online presence of authors mainly talking about François Hollande. We notice two other strong online presences: Marine Le Pen and Ségolène Royal.
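
The 80% rule can be applied to the per-author counts as follows. This is a sketch under simplifying assumptions: dedicated_authors is a hypothetical name, and each post is assumed to be counted for a single candidate, which sidesteps the question of posts mentioning several candidates.

```python
from collections import Counter


def dedicated_authors(author_counts, threshold=0.8):
    """Count, per candidate, the authors mostly posting about that one."""
    result = Counter()
    for counts in author_counts.values():
        total = sum(counts.values())
        if total == 0:
            continue
        candidate, n = max(counts.items(), key=lambda item: item[1])
        if n / total >= threshold:
            result[candidate] += 1
    return result


sample = {
    'a': {'Hollande': 9, 'Sarkozy': 1},   # 90% Hollande: counted
    'b': {'Hollande': 5, 'Sarkozy': 5},   # 50/50: not counted
    'c': {'Le Pen': 4},                   # 100% Le Pen: counted
}
dedicated = dedicated_authors(sample)
```

The resulting counter maps directly to the bars of a chart like the one described above.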

[Figure: number of authors mainly talking about each candidate]

Annexes

Events

[Figures: events charts, one per candidate (17 charts)]

Twitter sentiment analysis using Python and NLTK

January 2, 2012

This post describes the implementation of sentiment analysis of tweets using Python and the natural language toolkit NLTK. The post also describes the internals of NLTK related to this implementation.

Background

The purpose of the implementation is to automatically classify the sentiment of a tweet as positive or negative.

The classifier needs to be trained and to do that, we need a list of manually classified tweets. Let’s start with 5 positive tweets and 5 negative tweets.

Positive tweets:

  • I love this car.
  • This view is amazing.
  • I feel great this morning.
  • I am so excited about the concert.
  • He is my best friend.

Negative tweets:

  • I do not like this car.
  • This view is horrible.
  • I feel tired this morning.
  • I am not looking forward to the concert.
  • He is my enemy.

In the full implementation, I use about 600 positive tweets and 600 negative tweets to train the classifier. I store those tweets in a Redis DB. Even with those numbers, it is quite a small sample and you should use a much larger set if you want good results.

Next is a test set so we can assess the accuracy of the trained classifier.

Test tweets:

  • I feel happy this morning. positive.
  • Larry is my friend. positive.
  • I do not like that man. negative.
  • My house is not great. negative.
  • Your song is annoying. negative.

Implementation

The following list contains the positive tweets:

pos_tweets = [('I love this car', 'positive'),
              ('This view is amazing', 'positive'),
              ('I feel great this morning', 'positive'),
              ('I am so excited about the concert', 'positive'),
              ('He is my best friend', 'positive')]

The following list contains the negative tweets:

neg_tweets = [('I do not like this car', 'negative'),
              ('This view is horrible', 'negative'),
              ('I feel tired this morning', 'negative'),
              ('I am not looking forward to the concert', 'negative'),
              ('He is my enemy', 'negative')]

We take both of those lists and create a single list of tuples, each containing two elements. The first element is a list containing the words and the second element is the type of sentiment. We get rid of the words shorter than 3 characters and we use lowercase for everything.

tweets = []
for (words, sentiment) in pos_tweets + neg_tweets:
    words_filtered = [e.lower() for e in words.split() if len(e) >= 3] 
    tweets.append((words_filtered, sentiment))

The list of tweets now looks like this:

tweets = [
    (['love', 'this', 'car'], 'positive'),
    (['this', 'view', 'amazing'], 'positive'),
    (['feel', 'great', 'this', 'morning'], 'positive'),
    (['excited', 'about', 'the', 'concert'], 'positive'),
    (['best', 'friend'], 'positive'),
    (['not', 'like', 'this', 'car'], 'negative'),
    (['this', 'view', 'horrible'], 'negative'),
    (['feel', 'tired', 'this', 'morning'], 'negative'),
    (['not', 'looking', 'forward', 'the', 'concert'], 'negative'),
    (['enemy'], 'negative')]

Finally, the list with the test tweets:

test_tweets = [
    (['feel', 'happy', 'this', 'morning'], 'positive'),
    (['larry', 'friend'], 'positive'),
    (['not', 'like', 'that', 'man'], 'negative'),
    (['house', 'not', 'great'], 'negative'),
    (['your', 'song', 'annoying'], 'negative')]

Classifier

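The list of word features needs to be extracted from the tweets. It is a list of every distinct word, ordered by frequency of appearance. We use the function get_word_features, along with the helper function get_words_in_tweets, to build it.

import nltk

def get_words_in_tweets(tweets):
    # Flatten the (words, sentiment) tuples into a single list of words.
    all_words = []
    for (words, sentiment) in tweets:
        all_words.extend(words)
    return all_words

def get_word_features(wordlist):
    # FreqDist counts how many times each word appears.
    wordlist = nltk.FreqDist(wordlist)
    # With the NLTK version used here, keys() is ordered by decreasing
    # frequency. Newer versions may need wordlist.most_common() instead.
    word_features = wordlist.keys()
    return word_features

word_features = get_word_features(get_words_in_tweets(tweets))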

If we take a peek inside the function get_word_features, the variable ‘wordlist’ contains:

<FreqDist:
    'this': 6,
    'car': 2,
    'concert': 2,
    'feel': 2,
    'morning': 2,
    'not': 2,
    'the': 2,
    'view': 2,
    'about': 1,
    'amazing': 1,
    ...
>

We end up with the following list of word features:

word_features = [
    'this',
    'car',
    'concert',
    'feel',
    'morning',
    'not',
    'the',
    'view',
    'about',
    'amazing',
    ...
]

As you can see, ‘this’ is the most used word in our tweets, followed by ‘car’, followed by ‘concert’…

To create a classifier, we need to decide what features are relevant. To do that, we first need a feature extractor. The one we are going to use returns a dictionary indicating what words are contained in the input passed. Here, the input is the tweet. We use the word features list defined above along with the input to create the dictionary.

def extract_features(document):
    document_words = set(document)
    features = {}
    for word in word_features:
        features['contains(%s)' % word] = (word in document_words)
    return features

As an example, let’s call the feature extractor with the document ['love', 'this', 'car'] which is the first positive tweet. We obtain the following dictionary which indicates that the document contains the words: ‘love’, ‘this’ and ‘car’.

{'contains(not)': False,
 'contains(view)': False,
 'contains(best)': False,
 'contains(excited)': False,
 'contains(morning)': False,
 'contains(about)': False,
 'contains(horrible)': False,
 'contains(like)': False,
 'contains(this)': True,
 'contains(friend)': False,
 'contains(concert)': False,
 'contains(feel)': False,
 'contains(love)': True,
 'contains(looking)': False,
 'contains(tired)': False,
 'contains(forward)': False,
 'contains(car)': True,
 'contains(the)': False,
 'contains(amazing)': False,
 'contains(enemy)': False,
 'contains(great)': False}

With our feature extractor, we can apply the features to our classifier using the method apply_features. We pass the feature extractor along with the tweets list defined above.

training_set = nltk.classify.apply_features(extract_features, tweets)

The variable ‘training_set’ contains the labeled feature sets. It is a list of tuples, with each tuple containing the feature dictionary and the sentiment string for a tweet. The sentiment string is also called ‘label’.

[({'contains(not)': False,
   ...
   'contains(this)': True,
   ...
   'contains(love)': True,
   ...
   'contains(car)': True,
   ...
   'contains(great)': False},
  'positive'),
 ({'contains(not)': False,
   'contains(view)': True,
   ...
   'contains(this)': True,
   ...
   'contains(amazing)': True,
   ...
   'contains(enemy)': False,
   'contains(great)': False},
  'positive'),
  ...]

Now that we have our training set, we can train our classifier.

classifier = nltk.NaiveBayesClassifier.train(training_set)

Here is a summary of what we just saw:

[Figure: summary of the steps used to train the classifier]

The Naive Bayes classifier uses the prior probability of each label, which is the frequency of each label in the training set, and the contribution from each feature. In our case, the frequency of each label is the same for ‘positive’ and ‘negative’. The word ‘amazing’ appears in 1 of the 5 positive tweets and in none of the negative tweets. This means that the likelihood of the ‘positive’ label will be multiplied by roughly 0.2 (before smoothing) when this word is seen as part of the input.

Let’s take a look inside the classifier train method in the source code of the NLTK library. ‘label_probdist’ is the prior probability of each label and ‘feature_probdist’ is the feature/value probability dictionary. Those two probability objects are used to create the classifier.

def train(labeled_featuresets, estimator=ELEProbDist):
    ...
    # Create the P(label) distribution
    label_probdist = estimator(label_freqdist)
    ...
    # Create the P(fval|label, fname) distribution
    feature_probdist = {}
    ...
    return NaiveBayesClassifier(label_probdist, feature_probdist)

In our case, the probability of each label is 0.5 as we can see below. label_probdist is of type ELEProbDist.

print label_probdist.prob('positive')
0.5
print label_probdist.prob('negative')
0.5

The feature/value probability dictionary associates an expected likelihood estimate to each feature and label. We can see that the probability that a negative tweet contains the word ‘best’ is about 0.077.

print feature_probdist
{('negative', 'contains(view)'): <ELEProbDist based on 5 samples>,
 ('positive', 'contains(excited)'): <ELEProbDist based on 5 samples>,
 ('negative', 'contains(best)'): <ELEProbDist based on 5 samples>, ...}
print feature_probdist[('negative', 'contains(best)')].prob(True)
0.076923076923076927
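The 0.0769 value can be reproduced by hand. The sketch below (written for Python 3) assumes that the expected likelihood estimate adds 0.5 to every count and that each feature has 3 possible values (True, False and missing). The bin count of 3 is an assumption chosen because it matches the figure printed above, and ele_prob is a hypothetical helper, not an NLTK function.

```python
def ele_prob(count, total, bins, gamma=0.5):
    """Expected likelihood estimate: add gamma to every bin, then normalize."""
    return (count + gamma) / (total + gamma * bins)

# 'best' appears in 0 of the 5 negative tweets.
print(ele_prob(0, 5, 3))

# 'amazing' appears in 1 of the 5 positive tweets.
print(ele_prob(1, 5, 3))
```

The first value is about 0.077. The second is about 0.23, which is the smoothed version of the raw 1 in 5 frequency.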

We can display the most informative features for our classifier using the method show_most_informative_features. Here, we see that if the input does not contain the word ‘not’ then the positive to negative ratio is 1.6 to 1.

print classifier.show_most_informative_features(32)
Most Informative Features
           contains(not) = False          positi : negati =      1.6 : 1.0
         contains(tired) = False          positi : negati =      1.2 : 1.0
       contains(excited) = False          negati : positi =      1.2 : 1.0
         contains(great) = False          negati : positi =      1.2 : 1.0
       contains(looking) = False          positi : negati =      1.2 : 1.0
          contains(like) = False          positi : negati =      1.2 : 1.0
          contains(love) = False          negati : positi =      1.2 : 1.0
       contains(amazing) = False          negati : positi =      1.2 : 1.0
         contains(enemy) = False          positi : negati =      1.2 : 1.0
         contains(about) = False          negati : positi =      1.2 : 1.0
          contains(best) = False          negati : positi =      1.2 : 1.0
       contains(forward) = False          positi : negati =      1.2 : 1.0
        contains(friend) = False          negati : positi =      1.2 : 1.0
      contains(horrible) = False          positi : negati =      1.2 : 1.0
...

Classify

Now that we have our classifier initialized, we can try to classify a tweet and see what the sentiment type output is. Our classifier is able to detect that this tweet has a positive sentiment because of the word ‘friend’ which is associated with the positive tweet ‘He is my best friend’.

tweet = 'Larry is my friend'
print classifier.classify(extract_features(tweet.split()))
positive

Let’s take a look at how the classify method works internally in the NLTK library. What we pass to the classify method is the feature set of the tweet we want to analyze. The feature set dictionary indicates that the tweet contains the word ‘friend’.

print extract_features(tweet.split())
{'contains(not)': False,
 'contains(view)': False,
 'contains(best)': False,
 'contains(excited)': False,
 'contains(morning)': False,
 'contains(about)': False,
 'contains(horrible)': False,
 'contains(like)': False,
 'contains(this)': False,
 'contains(friend)': True,
 'contains(concert)': False,
 'contains(feel)': False,
 'contains(love)': False,
 'contains(looking)': False,
 'contains(tired)': False,
 'contains(forward)': False,
 'contains(car)': False,
 'contains(the)': False,
 'contains(amazing)': False,
 'contains(enemy)': False,
 'contains(great)': False}

def classify(self, featureset):
    # Discard any feature names that we've never seen before.
    # Find the log probability of each label, given the features.
    # Then add in the log probability of features given labels.
    # Generate a probability distribution dictionary using the dict logprob
    # Return the sample with the greatest probability from the probability
    # distribution dictionary
    ...

Let’s go through that method using our example. The parameter passed to the method classify is the feature set dictionary we saw above. The first step is to discard any feature names that are not known by the classifier. This step does nothing in our case so the feature set stays the same.

The next step is to find the log probability for each label. The probability of each label (‘positive’ and ‘negative’) is 0.5. The log probability is the log base 2 of that which is -1. We end up with logprob containing the following:

{'positive': -1.0, 'negative': -1.0}

The log probability of features given labels is then added to logprob. This means that for each label, we go through the items in the feature set and we add the log probability of each item to logprob[label]. For example, we have the feature name ‘friend’ and the feature value True. Its log probability for the label ‘positive’ in our classifier is -2.12. This value is added to logprob['positive']. We end up with the following logprob dictionary.

{'positive': -5.4785441837188511, 'negative': -14.784261334886439}

The probability distribution dictionary of type DictionaryProbDist is generated:

DictionaryProbDist(logprob, normalize=True, log=True)

The label with the greatest probability is returned, which is ‘positive’. Our classifier finds out that this tweet has a positive sentiment based on the training we did.
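The steps above can be sketched in a few lines of Python 3. This is a simplified illustration, not the NLTK source: the classify function and the two probability tables are hypothetical, and the feature probabilities assume the smoothed estimates 1.5/6.5 and 0.5/6.5 for the word ‘friend’ (the first one gives the -2.12 log probability mentioned above).

```python
import math

def classify(featureset, label_prob, feature_prob):
    """Return (best_label, posterior) for a dict of feature name -> bool."""
    logprob = {}
    for label, prior in label_prob.items():
        # Start from the log (base 2) prior of the label.
        total = math.log2(prior)
        for fname, fval in featureset.items():
            # Skip features this toy classifier has no estimate for.
            if (label, fname) not in feature_prob:
                continue
            p_true = feature_prob[(label, fname)]
            total += math.log2(p_true if fval else 1.0 - p_true)
        logprob[label] = total
    # Normalize the log scores into a probability distribution.
    norm = sum(2.0 ** value for value in logprob.values())
    posterior = {label: (2.0 ** value) / norm for label, value in logprob.items()}
    return max(posterior, key=posterior.get), posterior

# Assumed values: equal priors and smoothed estimates for one feature.
label_prob = {'positive': 0.5, 'negative': 0.5}
feature_prob = {
    ('positive', 'contains(friend)'): 1.5 / 6.5,
    ('negative', 'contains(friend)'): 0.5 / 6.5,
}

label, posterior = classify({'contains(friend)': True}, label_prob, feature_prob)
print(label, posterior)
```

With a single feature the numbers differ from the full example, but the mechanism is the same: sum log probabilities per label, normalize, pick the largest.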

Another example is the tweet ‘My house is not great’. The word ‘great’ weighs more on the positive side but the word ‘not’ is part of two negative tweets in our training set so the output from the classifier is ‘negative’. Of course, the following tweet: ‘The movie is not bad’ would return ‘negative’ even if it is ‘positive’. Again, a large and well-chosen sample will help with the accuracy of the classifier.

Take the following test tweet: ‘Your song is annoying’. The classifier thinks it is positive. The reason is that we don’t have any information on the feature name ‘annoying’. The larger the set of training tweets is, the better the classifier will be.

tweet = 'Your song is annoying'
print classifier.classify(extract_features(tweet.split()))
positive

There is an accuracy method we can use to check the quality of our classifier by using our test tweets. We get 0.8 in our case which is high because we picked our test tweets for this article. The key is to have a very large number of manually classified positive and negative tweets.

Voilà. Don’t hesitate to post a comment if you have any feedback.

Python string objects implementation

June 19, 2011

This article describes how string objects are managed by Python internally and how string search is done.

PyStringObject structure
New string object
Sharing string objects
String search

PyStringObject structure

A string object in Python is represented internally by the structure PyStringObject. “ob_shash” is the hash of the string if calculated. “ob_sval” contains the string of size “ob_size”. The string is null terminated. The initial size of “ob_sval” is 1 byte and ob_sval[0] = 0. If you are wondering where “ob_size” is defined, take a look at PyObject_VAR_HEAD in object.h. “ob_sstate” indicates if the string object is in the interned dictionary which we are going to see later.

typedef struct {
    PyObject_VAR_HEAD
    long ob_shash;
    int ob_sstate;
    char ob_sval[1];
} PyStringObject;

New string object

What happens when you assign a new string to a variable like this one?

>>> s1 = 'abc'

The internal C function “PyString_FromString” is called and the pseudo code looks like this:

arguments: string object: 'abc'
returns: Python string object with ob_sval = 'abc'
PyString_FromString(string):
    size = length of string
    allocate string object + size for 'abc'. ob_sval will be of size: size + 1
    copy string to ob_sval
    return object

Each time a new string is used, a new string object is allocated.

Sharing string objects

There is a neat feature where small strings are shared between variables. This reduces the amount of memory used. Small strings are strings of size 0 or 1 byte. The global variable “interned” is a dictionary referencing those small strings. The array “characters” is also used to reference the strings of length 1 byte: i.e. single characters. We will see later how the array “characters” is used.

static PyStringObject *characters[UCHAR_MAX + 1];
static PyObject *interned;

Let’s see what happens when a new small string is assigned to a variable in your Python script.

>>> s2 = 'a'

The string object containing ‘a’ is added to the dictionary “interned”. The key is a pointer to the string object and the value is the same pointer. This new string object is also referenced in the array “characters” at the offset 97 because the value of ‘a’ is 97 in ASCII. The variable “s2” is pointing to this string object.

Python string object internals

What happens when a different variable is assigned to the same string ‘a’?

>>> s3 = 'a'

The same string object previously created is returned so both variables are pointing to the same string object. The “characters” array is used during that process to check if the string already exists and returns the pointer to the string object.

if (size == 1 && (op = characters[*str & UCHAR_MAX]) != NULL)
{
    ...
    return (PyObject *)op;
}
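The sharing mechanism can be modeled in pure Python 3. This is a toy model, not CPython’s code: ToyString and from_string are hypothetical names, and the model only covers the 1-byte case handled by the “characters” array (the shared empty string and the “interned” dictionary are left out).

```python
UCHAR_MAX = 255

class ToyString:
    """Stand-in for PyStringObject: it only holds the bytes."""
    def __init__(self, value):
        self.ob_sval = value

# One slot per possible byte value, filled lazily.
characters = [None] * (UCHAR_MAX + 1)

def from_string(value):
    """Return a shared object for 1-byte strings, a fresh object otherwise."""
    if len(value) == 1:
        index = value[0] & UCHAR_MAX
        cached = characters[index]
        if cached is not None:
            return cached
        obj = ToyString(value)
        characters[index] = obj
        return obj
    return ToyString(value)

s2 = from_string(b'a')
s3 = from_string(b'a')
s1 = from_string(b'abc')
s1_again = from_string(b'abc')

print(s2 is s3)              # same object, taken from characters[97]
print(s1 is s1_again)        # two separate objects
print(characters[97] is s2)
```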

Python string object internals

Let’s create a new small string containing the character ‘c’.

>>> s4 = 'c'

We end up with the following:

Python string object internals

We also find the “characters” array in use when a string’s item is requested like in the following Python script:

>>> s5 = 'abc'
>>> c = s5[0]
>>> c
'a'

Instead of creating a new string containing ‘a’, the pointer at the offset 97 of the “characters” array is returned. Here is the code of the function “string_item” which is called when we request a character from a string. The argument “a” is the string object containing ‘abc’ and the argument “i” is the index requested: 0 in our case. A pointer to a string object is returned.

static PyObject *
string_item(PyStringObject *a, register Py_ssize_t i)
{
    char pchar;
    PyObject *v;
    ...
    pchar = a->ob_sval[i];
    v = (PyObject *)characters[pchar & UCHAR_MAX];
    if (v == NULL)
        // allocate string
    else {
        ...
        Py_INCREF(v);
    }
    return v;
}

String search

Let’s take a look at what happens when you perform a string search like in the following Python code:

>>> s = 'adcabcdbdabcabd'
>>> s.find('abcab')
9

The “find” function returns the index where the string ‘abcab’ is found in the string “s”. It returns -1 if the string is not found.

So, what happens internally? The function “fastsearch” is called. It is a mix between the Boyer-Moore and Horspool algorithms plus a couple of neat tricks.

Let’s call “s” the string to search in and “p” the string to search for. s = ‘adcabcdbdabcabd’ and p = ‘abcab’. “n” is the length of “s” and “m” is the length of “p”. n = 15 and m = 5.

The first check in the code is obvious: if m > n then we know that we won’t be able to find the index so the function returns -1 right away as we can see in the following code:

w = n - m;
if (w < 0)
    return -1;

When m = 1, the code goes through “s” one character at a time and returns the index when there is a match. mode = FAST_SEARCH in our case as we are looking for the index where the string is first found and not the number of times the string is found.

if (m <= 1) {
    ...
    if (mode == FAST_COUNT) {
        ...
    } else {
        for (i = 0; i < n; i++)
            if (s[i] == p[0])
                return i;
    }
    return -1;
}

For the other cases, i.e. m > 1, the first step is to create a compressed Boyer-Moore delta 1 table. Two variables will be assigned during that step: “mask” and “skip”.

“mask” is a 32-bit bitmask, using the 5 least significant bits of the character as the key. It is generated using the string to search “p”. It is a bloom filter which is used to test if a character is present in this string. It is really fast but there are false positives. You can read more about bloom filters here. This is how the bitmask is generated in our case:

mlast = m - 1;
/* process pattern[:-1] */
for (mask = i = 0; i < mlast; i++) {
    mask |= (1 << (p[i] & 0x1F));
}
/* process pattern[-1] outside the loop */
mask |= (1 << (p[mlast] & 0x1F));

First character of “p” is ‘a’. Value of ‘a’ is 97 = 1100001 in binary format. Using the 5 least significant bits, we get 00001 so “mask” is first set to: 1 << 1 = 10 in binary. Once the entire string "p" is processed, mask = 1110 in binary.

How do we use this bitmask? By using the following test where "c" is the character to look for in the string "p".

if ((mask & (1 << (c & 0x1F))))

Is 'a' in "p" where p = 'abcab'? Is 1110 & (1 << ('a' & 0x1F)) true? 1110 & (1 << ('a' & 0x1F)) = 1110 & 10 = 10. So, yes 'a' may be in 'abcab'. If we test with 'd', we get false, and likewise for the characters from 'e' to 'z', so this filter works pretty well in our case.
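The bitmask test is easy to try out in Python 3. This is a small sketch of the same idea, with build_mask and maybe_in as hypothetical helper names. It also shows a false positive: 'A' shares its 5 least significant bits with 'a'.

```python
def build_mask(p):
    """Set one bit per character of p, keyed on its 5 least significant bits."""
    mask = 0
    for ch in p:
        mask |= 1 << (ord(ch) & 0x1F)
    return mask

def maybe_in(mask, ch):
    """False means ch is definitely not in p; True means it might be."""
    return bool(mask & (1 << (ord(ch) & 0x1F)))

mask = build_mask('abcab')
print(bin(mask))            # 0b1110
print(maybe_in(mask, 'a'))  # True
print(maybe_in(mask, 'd'))  # False
print(maybe_in(mask, 'A'))  # True: false positive, 65 & 0x1F == 1
```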

"skip" records how far the search window can be shifted when the last character of "p" matches but the rest of the window does not. It is derived from the previous occurrence of the last character inside "p": skip = mlast - i - 1, where "i" is the index of that occurrence. It defaults to mlast - 1 if the last character does not appear anywhere else in "p". The last character in the string to search for is 'b', which is also found at index 1, so "skip" is set to 4 - 1 - 1 = 2. This variable is used in a skip method called the bad-character skip method.

In the following example: p = 'abcab' and s = 'adcabcaba'. The search starts at index 4 of "s" and checks backward if there is a string match. This first test fails at index = 1 where 'b' is different than 'd'. We know that the character 'b' in "p" is also found 3 characters down starting from the end. Because 'c' is part of "p", we skip to the following 'b'. This is the bad-character skip.

Python string object internals

Next is the search loop itself (real code is in C instead of Python):

for i = 0 to n - m = 10:
    if s[i+m-1] == p[m-1]:
        if s[i:i+mlast] == p[0:mlast]:
            return i
        if s[i+m] not in p:
            i += m
        else:
            i += skip
    else:
        if s[i+m] not in p:
            i += m
return -1

The test "s[i+m] not in p" is done using the bitmask. "i += skip" is the bad-character skip. "i += m" is done when the next character is not found in "p".
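Here is a runnable Python 3 sketch of the loop above, including the mask and skip setup. It follows the logic described in this article rather than copying the C source. The name fastsearch is reused for convenience, the empty pattern is handled the way str.find does, and reading one character past the end of "s" is emulated with a null character (the C code relies on the string’s null terminator there).

```python
def fastsearch(s, p):
    """Return the index of the first occurrence of p in s, or -1."""
    n, m = len(s), len(p)
    w = n - m
    if w < 0:
        return -1
    if m == 0:
        return 0  # assumption: mirror str.find for an empty pattern
    if m == 1:
        for i in range(n):
            if s[i] == p[0]:
                return i
        return -1

    mlast = m - 1
    skip = mlast - 1
    mask = 0
    # Build the bloom mask and the bad-character skip from pattern[:-1].
    for i in range(mlast):
        mask |= 1 << (ord(p[i]) & 0x1F)
        if p[i] == p[mlast]:
            skip = mlast - i - 1
    mask |= 1 << (ord(p[mlast]) & 0x1F)

    def maybe_in_pattern(index):
        # Past the end of s we pretend to read the null terminator.
        ch = s[index] if index < n else '\0'
        return bool(mask & (1 << (ord(ch) & 0x1F)))

    i = 0
    while i <= w:
        if s[i + m - 1] == p[m - 1]:
            # Candidate: the last character matches, compare the rest.
            if s[i:i + mlast] == p[:mlast]:
                return i
            if not maybe_in_pattern(i + m):
                i += m      # next character is not in p: jump over it
            else:
                i += skip   # bad-character skip
        elif not maybe_in_pattern(i + m):
            i += m
        i += 1
    return -1

print(fastsearch('adcabcdbdabcabd', 'abcab'))
```

For the example string, the loop visits i = 0, 3 and 9 only and returns 9, the same result as s.find('abcab').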

Let's see how this search algorithm works with our strings "p" and "s". The first 3 steps are familiar. After that, the character 'd' is not in the string "p" so we skip the length of "p" and quickly find a match after that.

Python string object internals

You can take a look at the code for the string objects here.

That's it for now. I hope you enjoyed the article. Please write a comment if you have any feedback. If you need help with a project written in Python or with building a new web service, I am available as a freelancer: LinkedIn profile. Follow me on Twitter @laurentluce.

Python integer objects implementation

May 15, 2011

This article describes how integer objects are managed by Python internally.

An integer object in Python is represented internally by the structure PyIntObject. Its value is an attribute of type long.

typedef struct {
    PyObject_HEAD
    long ob_ival;
} PyIntObject;

To avoid allocating a new integer object each time a new integer object is needed, Python allocates a block of free unused integer objects in advance.

The following structure is used by Python to allocate integer objects, also called PyIntObjects. Once this structure is initialized, the integer objects are ready to be used when new integer values are assigned to objects in a Python script. This structure is called “PyIntBlock” and is defined as:

struct _intblock {
    struct _intblock *next;
    PyIntObject objects[N_INTOBJECTS];
};
typedef struct _intblock PyIntBlock;

When a block of integer objects is allocated by Python, the objects have no value assigned to them yet. We call them free integer objects ready to be used. A value will be assigned to the next free object when a new integer value is used in your program. No memory allocation will be required when a free integer object’s value is set so it will be fast.

The integer objects inside the block are linked together back to front using their internal pointer called ob_type. As noted in the source code, this is an abuse of this internal pointer so do not pay too much attention to the name.

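Each block of integers contains the number of integer objects which can fit in a block of about 1K bytes. The exact count depends on the size of PyIntObject on the platform: roughly 40 objects on a typical 64-bit build and roughly 80 on a 32-bit build. When all the integer objects inside a block are used, a new block is allocated with a new list of integer objects available.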

A singly-linked list is used to keep track of the integers blocks allocated. It is called “block_list” internally.

Python integer object internals

A specific structure is used to refer to small integers and share them so access is fast. It is an array of 262 pointers to integer objects. Those integer objects are allocated during initialization in a block of integer objects we saw above. The small integers range is from -5 to 256 inclusive (5 negative values plus the 257 values from 0 to 256). Many Python programs spend a lot of time using integers in that range so this is a smart decision.

#define NSMALLPOSINTS           257
#define NSMALLNEGINTS           5
static PyIntObject *small_ints[NSMALLNEGINTS + NSMALLPOSINTS];

Python integer object internals

The integer object representing the integer -5 is at the offset 0 inside the small integers array. The integer object representing -4 is at offset 1 …

What happens when an integer is defined in a Python script like this one?

>>> a=1
>>> a
1

When you execute the first line, the function PyInt_FromLong is called and its logic is the following:

if -5 <= integer value < 257:
    return the integer object pointed by the small integers array at the 
    offset (value + 5).
else:
    if no free integer object available:
        allocate new block of integer objects 
    set value of the next free integer object in the current block 
    of integers.
    return integer object

With our example: integer 1 object is pointed by the small integers array at offset: 1+5 = 6. A pointer to this integer object will be returned and the variable “a” will be pointing to that integer object.
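The lookup logic can be modeled in Python 3 as follows. This is a toy model with hypothetical names (ToyInt, int_from_long); the block of free integer objects is left out, so values outside the small range simply get a fresh object.

```python
NSMALLPOSINTS = 257
NSMALLNEGINTS = 5

class ToyInt:
    """Stand-in for PyIntObject: it only holds the value."""
    def __init__(self, value):
        self.ob_ival = value

# 262 preallocated objects for the values -5 to 256.
small_ints = [ToyInt(v) for v in range(-NSMALLNEGINTS, NSMALLPOSINTS)]

def int_from_long(value):
    """Return the shared object for small values, a new object otherwise."""
    if -NSMALLNEGINTS <= value < NSMALLPOSINTS:
        return small_ints[value + NSMALLNEGINTS]
    return ToyInt(value)

a = int_from_long(1)
b = int_from_long(1)
c = int_from_long(300)
d = int_from_long(300)

print(len(small_ints))     # 262
print(a is small_ints[6])  # True: offset 1 + 5
print(a is b)              # True: shared
print(c is d)              # False: one object per call in this model
```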

Python integer object internals

Let’s take a look at a different example:

>>> a=300
>>> a
300

300 is not in the range of the small integers array so the next free integer object’s value is set to 300.

Python integer object internals

If you take a look at the file intobject.c in the Python 2.6 source code, you will see a long list of functions taking care of operations like addition, multiplication, conversion… The comparison function looks like this:

static int
int_compare(PyIntObject *v, PyIntObject *w)
{
    register long i = v->ob_ival;
    register long j = w->ob_ival;
    return (i < j) ? -1 : (i > j) ? 1 : 0;
}

The value of an integer object is stored in its ob_ival attribute which is of type long. Each value is placed in a register to optimize access and the comparison is done between those 2 registers. -1 is returned if the integer object pointed by v is less than the one pointed by w. 1 is returned for the opposite and 0 is returned if they are equal.

That’s it for now. I hope you enjoyed the article. Please write a comment if you have any feedback.

Python and cryptography with pycrypto

April 22, 2011

We are going to talk about the toolkit pycrypto and how it can help us speed up development when cryptography is involved.

Hash functions
Encryption algorithms
Public-key algorithms

Hash functions

A hash function takes a string and produces a fixed-length string based on the input. The output string is called the hash value. Ideal hash functions obey the following:

  • It should be very difficult to guess the input string based on the output string.
  • It should be very difficult to find 2 different input strings having the same hash output.
  • It should be very difficult to modify the input string without modifying the output hash value.

Cryptography and Python

Hash functions can be used to calculate the checksum of some data. They can also be used in digital signatures and authentication. We will see some applications in detail later on.

Let’s look at one example of a hash function: MD5.

MD5

This hash function dates back to 1991. The hash output value is 128 bits long.

The algorithm’s steps are as follows:

  • Pad the string to a length congruent to 448 bits, modulo 512.
  • Append a 64-bit representation of the length of the input string.
  • Process the message in 16-word blocks. There are 4 rounds instead of 3 compared to MD4.

You can get more details regarding the algorithm here.

Hashing a value using MD5 is done this way:

>>> from Crypto.Hash import MD5
>>> MD5.new('abc').hexdigest()
'900150983cd24fb0d6963f7d28e17f72'
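The padding step listed earlier can be checked with a little arithmetic. The Python 3 sketch below uses a hypothetical helper, md5_padded_length_bits, to compute the total length after padding and appending the 64-bit length field. It also uses the standard library’s hashlib (swapped in for pycrypto here) to confirm the digest of 'abc'.

```python
import hashlib

def md5_padded_length_bits(message_len_bytes):
    """Length in bits after padding to 448 mod 512 and appending 64 bits."""
    bits = message_len_bytes * 8
    # Padding is always applied: a single 1 bit, then 0 bits as needed.
    pad = (448 - (bits + 1)) % 512 + 1
    return bits + pad + 64

print(md5_padded_length_bits(3))   # 'abc' fits in one 512-bit block
print(md5_padded_length_bits(56))  # 448 bits of input need a second block
print(hashlib.md5(b'abc').hexdigest())
```

The result is always a multiple of 512 bits, which is what allows the message to be processed in 16-word (16 x 32 bits) blocks.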

It is important to know that the MD5 is vulnerable to collision attacks. A collision attack is when 2 different inputs result in the same hash output. It is also vulnerable to some preimage attacks found in 2004 and 2008. A preimage attack is: given a hash h, you can find a message m where hash(m) = h.

Applications

Hash functions can be used in password management and storage. Web sites usually store the hash of a password and not the password itself so only the user knows the real password. When the user logs in, the hash of the password input is generated and compared to the hash value stored in the database. If it matches, the user is granted access. The code looks like this:

from Crypto.Hash import MD5
def check_password(clear_password, password_hash):
    return MD5.new(clear_password).hexdigest() == password_hash

It is recommended to use a module like py-bcrypt to hash passwords as it is more secure than using MD5.
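If an extra dependency is not an option, a salted and deliberately slow hash can also be built with the Python 3 standard library. The sketch below uses PBKDF2 from hashlib, which is a different technique than bcrypt. The function names and the iteration count are illustrative assumptions, not a recommendation from pycrypto or py-bcrypt.

```python
import hashlib
import hmac
import os

ITERATIONS = 100000  # illustrative work factor, tune it for your hardware

def hash_password(clear_password, salt=None):
    """Return (salt, derived_key) for a password given as bytes."""
    if salt is None:
        salt = os.urandom(16)  # a fresh random salt per password
    key = hashlib.pbkdf2_hmac('sha256', clear_password, salt, ITERATIONS)
    return salt, key

def check_password(clear_password, salt, expected_key):
    """Recompute the key with the stored salt and compare in constant time."""
    _, key = hash_password(clear_password, salt)
    return hmac.compare_digest(key, expected_key)

salt, stored_key = hash_password(b'secret')
print(check_password(b'secret', salt, stored_key))
print(check_password(b'wrong', salt, stored_key))
```

The salt makes identical passwords produce different stored values, and the iteration count makes brute-force guessing slower than with a single MD5 call.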

Another application is file integrity checking. Many downloadable files include an MD5 checksum to verify the integrity of the file once downloaded. Here is the code to calculate the MD5 checksum of a file. We work on chunks to avoid using too much memory when the file is large.

import os
from Crypto.Hash import MD5
def get_file_checksum(filename):
    h = MD5.new()
    chunk_size = 8192 
    with open(filename, 'rb') as f:
        while True:
            chunk = f.read(chunk_size)
            if len(chunk) == 0:
                break
            h.update(chunk)
    return h.hexdigest()

Hash functions comparison

Hash function    Hash output size (bits)    Secure?
MD2              128                        No
MD4              128                        No
MD5              128                        No
SHA-1            160                        No
SHA-256          256                        Yes

Encryption algorithms

Encryption algorithms take some text as input and produce ciphertext using a variable key. You have 2 types of ciphers: block and stream. Block ciphers work on blocks of a fixed size (8 or 16 bytes). Stream ciphers work byte-by-byte. Knowing the key, you can decrypt the ciphertext.

Block ciphers

Let’s look at one of the block ciphers: DES. The key size used by this cipher is 8 bytes and the block of data it works with is 8 bytes long. The simplest mode for this block cipher is the electronic code book mode where each block is encrypted independently to form the encrypted text.

Cryptography and Python

It is easy to encrypt text using DES/ECB with pycrypto. The key ‘01234567’ is 8 bytes and the text’s length needs to be a multiple of 8 bytes. We picked ‘abcdefgh’ in this example.

>>> from Crypto.Cipher import DES
>>> des = DES.new('01234567', DES.MODE_ECB)
>>> text = 'abcdefgh'
>>> cipher_text = des.encrypt(text)
>>> cipher_text
'\xec\xc2\x9e\xd9] a\xd0'
>>> des.decrypt(cipher_text)
'abcdefgh'

A stronger mode is CFB (Cipher feedback) which combines the plain block with the previous cipher block before encrypting it.

Cryptography and Python

Here is how to use DES CFB mode. The plain text is 16 bytes long (multiple of 8 bytes). We need to specify an initial feedback value: we use a random string 8 bytes long, same size as the block size. It is better to use a random string for each new encryption to avoid chosen-ciphertext attacks. Note how we use 2 DES objects, 1 to encrypt and 1 to decrypt. This is required because of the feedback value getting modified each time a block is encrypted.

>>> from Crypto.Cipher import DES
>>> from Crypto import Random
>>> iv = Random.get_random_bytes(8)
>>> des1 = DES.new('01234567', DES.MODE_CFB, iv)
>>> des2 = DES.new('01234567', DES.MODE_CFB, iv)
>>> text = 'abcdefghijklmnop'
>>> cipher_text = des1.encrypt(text)
>>> cipher_text
"?\\\x8e\x86\xeb\xab\x8b\x97'\xa1W\xde\x89!\xc3d"
>>> des2.decrypt(cipher_text)
'abcdefghijklmnop'
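To see why the feedback value matters, here is a toy Python 3 model of full-block cipher feedback. It is only a sketch under loud assumptions: toy_block_encrypt is a hypothetical keyed function built from SHA-256, not DES and not a real cipher, and the feedback works on whole 8-byte blocks, which may differ from the segment size pycrypto uses by default.

```python
import hashlib

BLOCK = 8

def toy_block_encrypt(key, block):
    """Stand-in for the block cipher: NOT secure, illustration only."""
    return hashlib.sha256(key + block).digest()[:BLOCK]

def xor(a, b):
    return bytes(x ^ y for x, y in zip(a, b))

def cfb_encrypt(key, iv, plaintext):
    feedback, out = iv, b''
    for i in range(0, len(plaintext), BLOCK):
        block = plaintext[i:i + BLOCK]
        cipher_block = xor(block, toy_block_encrypt(key, feedback))
        out += cipher_block
        feedback = cipher_block  # the next block depends on this one
    return out

def cfb_decrypt(key, iv, ciphertext):
    feedback, out = iv, b''
    for i in range(0, len(ciphertext), BLOCK):
        block = ciphertext[i:i + BLOCK]
        out += xor(block, toy_block_encrypt(key, feedback))
        feedback = block  # decryption feeds back the ciphertext block
    return out

key = b'01234567'
iv = b'\x00' * BLOCK  # fixed here for reproducibility; use a random IV in practice
plain = b'abcdefghabcdefgh'  # two identical plaintext blocks
cipher = cfb_encrypt(key, iv, plain)
print(cipher[:BLOCK] != cipher[BLOCK:])  # identical blocks encrypt differently
print(cfb_decrypt(key, iv, cipher) == plain)
```

Because each call updates the feedback value, encryption and decryption must both start from the same initial value, which is why the example above creates two cipher objects.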

Stream ciphers

Those algorithms work on a byte-by-byte basis. The block size is always 1 byte. 2 algorithms are supported by pycrypto: ARC4 and XOR. Only one mode is available: ECB.

Let’s look at an example with the algorithm ARC4 using the key ‘01234567’.

>>> from Crypto.Cipher import ARC4
>>> obj1 = ARC4.new('01234567')
>>> obj2 = ARC4.new('01234567')
>>> text = 'abcdefghijklmnop'
>>> cipher_text = obj1.encrypt(text)
>>> cipher_text
'\xf0\xb7\x90{#ABXY9\xd06\x9f\xc0\x8c '
>>> obj2.decrypt(cipher_text)
'abcdefghijklmnop'

Applications

It is easy to write code to encrypt and decrypt a file using pycrypto ciphers. Let’s do it using DES3 (Triple DES). We encrypt and decrypt data by chunks to avoid using too much memory when the file is large. In case the chunk length is not a multiple of 16 bytes, we pad it with spaces before encrypting it.

import os
from Crypto.Cipher import DES3

def encrypt_file(in_filename, out_filename, chunk_size, key, iv):
    des3 = DES3.new(key, DES3.MODE_CFB, iv)

    # Binary mode keeps the bytes intact on every platform.
    with open(in_filename, 'rb') as in_file:
        with open(out_filename, 'wb') as out_file:
            while True:
                chunk = in_file.read(chunk_size)
                if len(chunk) == 0:
                    break
                elif len(chunk) % 16 != 0:
                    chunk += ' ' * (16 - len(chunk) % 16)
                out_file.write(des3.encrypt(chunk))

def decrypt_file(in_filename, out_filename, chunk_size, key, iv):
    des3 = DES3.new(key, DES3.MODE_CFB, iv)

    # Binary mode keeps the bytes intact on every platform.
    with open(in_filename, 'rb') as in_file:
        with open(out_filename, 'wb') as out_file:
            while True:
                chunk = in_file.read(chunk_size)
                if len(chunk) == 0:
                    break
                out_file.write(des3.decrypt(chunk))

Next is a usage example of the 2 functions defined above:

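from Crypto import Random
# Example key (an assumption for this snippet): DES3 expects 16 or 24 bytes.
key = '0123456789abcdef'
iv = Random.get_random_bytes(8)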
with open('to_enc.txt', 'r') as f:
    print 'to_enc.txt: %s' % f.read()
encrypt_file('to_enc.txt', 'to_enc.enc', 8192, key, iv)
with open('to_enc.enc', 'r') as f:
    print 'to_enc.enc: %s' % f.read()
decrypt_file('to_enc.enc', 'to_enc.dec', 8192, key, iv)
with open('to_enc.dec', 'r') as f:
    print 'to_enc.dec: %s' % f.read()

The output of this script:

to_enc.txt: this content needs to be encrypted.

to_enc.enc: ??~?E??.??]!=)??"t?
                                JpDw???R?UN0?=??R?UN0?}0r?FV9
to_enc.dec: this content needs to be encrypted.
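Padding with spaces cannot be undone reliably, since the original text may itself end with spaces. A common alternative is PKCS#7-style padding, where each padding byte stores the padding length. The Python 3 sketch below is not part of pycrypto; pad and unpad are hypothetical helpers.

```python
def pad(data, block_size=16):
    """Append n bytes of value n so the length becomes a multiple of block_size."""
    n = block_size - len(data) % block_size
    return data + bytes([n]) * n

def unpad(data):
    """Remove the padding added by pad(), checking that it is well formed."""
    n = data[-1]
    if n < 1 or n > len(data) or data[-n:] != bytes([n]) * n:
        raise ValueError('invalid padding')
    return data[:-n]

padded = pad(b'this content needs to be encrypted.')
print(len(padded))
print(unpad(padded))
```

A full block of padding is added when the input length is already a multiple of the block size, so unpad always has something to remove.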

Public-key algorithms

One disadvantage with the encryption algorithms seen above is that both sides need to know the key. With public-key algorithms, there are 2 different keys: 1 to encrypt and 1 to decrypt. You only need to share the encryption key and only you can decrypt the message with your private decryption key.

Cryptography and Python

Public/private key pair

It is easy to generate a private/public key pair with pycrypto. We need to specify the size of the key in bits: we picked 1024 bits. Larger is more secure. We also need to specify a random number generator function, we use the Random module of pycrypto for that.

>>> from Crypto.PublicKey import RSA
>>> from Crypto import Random
>>> random_generator = Random.new().read
>>> key = RSA.generate(1024, random_generator)
>>> key
<_RSAobj @0x7f60cf1b57e8 n(1024),e,d,p,q,u,private>

Let’s take a look at some methods supported by this key object. can_encrypt() checks the capability of encrypting data using this algorithm. can_sign() checks the capability of signing messages. has_private() returns True if the private key is present in the object.

>>> key.can_encrypt()
True
>>> key.can_sign()
True
>>> key.has_private()
True

Encrypt

Now that we have our key pair, we can encrypt some data. First, we extract the public key from the key pair and use it to encrypt some data. The second argument (32) is a random parameter that the encrypt() method takes for compatibility with other algorithms; to my knowledge it is ignored for RSA. This step simulates us publishing the encryption key and someone using it to encrypt some data before sending it to us.

>>> public_key = key.publickey()
>>> enc_data = public_key.encrypt('abcdefgh', 32)
>>> enc_data
('\x11\x86\x8b\xfa\x82\xdf\xe3sN ~@\xdbP\x85
\x93\xe6\xb9\xe9\x95I\xa7\xadQ\x08\xe5\xc8$9\x81K\xa0\xb5\xee\x1e\xb5r
\x9bH)\xd8\xeb\x03\xf3\x86\xb5\x03\xfd\x97\xe6%\x9e\xf7\x11=\xa1Y<\xdc
\x94\xf0\x7f7@\x9c\x02suc\xcc\xc2j\x0c\xce\x92\x8d\xdc\x00uL\xd6.
\x84~/\xed\xd7\xc5\xbe\xd2\x98\xec\xe4\xda\xd1L\rM`\x88\x13V\xe1M\n X
\xce\x13 \xaf\x10|\x80\x0e\x14\xbc\x14\x1ec\xf6Rs\xbb\x93\x06\xbe',)

Decrypt

We have the private decryption key so it is easy to decrypt the data.

>>> key.decrypt(enc_data)
'abcdefgh'

Sign

Signing a message can be useful to check the author of a message and make sure we can trust its origin. Next is an example on how to sign a message. The hash for this message is calculated first and then passed to the sign() method of the RSA key. You can use other algorithms like DSA or ElGamal.

>>> from Crypto.Hash import MD5
>>> from Crypto.PublicKey import RSA
>>> from Crypto import Random
>>> random_generator = Random.new().read
>>> key = RSA.generate(1024, random_generator)
>>> text = 'abcdefgh'
>>> hash = MD5.new(text).digest()
>>> hash
'\xe8\xdc@\x81\xb144\xb4Q\x89\xa7 \xb7{h\x18'
>>> signature = key.sign(hash, '')
>>> signature
(1549358700992033008647390368952919655009213441715588267926189797
14352832388210003027089995136141364041133696073722879839526120115
25996986614087200336035744524518268136542404418603981729787438986
50177007820700181992412437228386361134849096112920177007759309019
6400328917297225219942913552938646767912958849053L,)

Verify

Knowing the public key, it is easy to verify a message. The plain text is sent to the user along with the signature. The receiving side calculates the hash value and then uses the public key verify() method to validate its origin.

>>> text = 'abcdefgh'
>>> hash = MD5.new(text).digest()
>>> public_key = key.publickey()
>>> public_key.verify(hash, signature)
True
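The arithmetic behind encrypt, decrypt, sign and verify can be illustrated with textbook RSA on tiny numbers. This Python 3 sketch is for illustration only: the primes and exponents are assumptions picked for readability, there is no padding, and the function names are hypothetical, so it is not how pycrypto implements RSA.

```python
# Tiny textbook parameters (never use keys this small in practice).
p, q = 61, 53
n = p * q                  # public modulus: 3233
phi = (p - 1) * (q - 1)    # 3120
e = 17                     # public exponent
d = 2753                   # private exponent: (e * d) % phi == 1

def encrypt(m):
    return pow(m, e, n)    # anyone can do this with the public key (e, n)

def decrypt(c):
    return pow(c, d, n)    # only the private key holder can do this

def sign(h):
    return pow(h, d, n)    # sign a (small) hash value with the private key

def verify(h, s):
    return pow(s, e, n) == h  # check the signature with the public key

c = encrypt(65)
print(c)                       # 2790
print(decrypt(c))              # 65
print(verify(123, sign(123)))  # True
print(verify(124, sign(123)))  # False
```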

That’s it for now. I hope you enjoyed the article. Please write a comment if you have any feedback.

Python list implementation

March 10, 2011

This post describes how lists are implemented in the Python language.

Lists in Python are powerful and it is interesting to see how they are implemented internally.

Following is a simple Python script appending some integers to a list and printing them.

>>> l = []
>>> l.append(1)
>>> l.append(2)
>>> l.append(3)
>>> l
[1, 2, 3]
>>> for e in l:
...   print e
... 
1
2
3

As you can see, lists are iterable.

List object C structure

A list object in Python is represented by the following C structure. ob_item is a list of pointers to the list elements. allocated is the number of slots allocated in memory.

typedef struct {
    PyObject_VAR_HEAD
    PyObject **ob_item;
    Py_ssize_t allocated;
} PyListObject;

List initialization

Let’s look at what happens when we initialize an empty list. e.g. l = [].

arguments: size of the list = 0
returns: list object = []
PyList_New:
    nbytes = size * size of global Python object = 0
    allocate new list object
    allocate list of pointers (ob_item) of size nbytes = 0
    clear ob_item
    set list's allocated var to 0 = 0 slots
    return list object 

It is important to notice the difference between allocated slots and the size of the list. The size of a list is the same as len(l). The number of allocated slots is what has been allocated in memory. Often, you will see that allocated can be greater than size. This is to avoid calling realloc each time a new element is appended to the list. We will see more about that later.

Append

We append an integer to the list: l.append(1). What happens? The internal C function app1() is called:

arguments: list object, new element
returns: 0 if OK, -1 if not
app1:
    n = size of list
    call list_resize() to resize the list to size n+1 = 0 + 1 = 1
    list[n] = list[0] = new element
    return 0

Let’s look at list_resize(). It over-allocates memory to avoid reallocating each time an element is added. The growth pattern of the list is: 0, 4, 8, 16, 25, 35, 46, 58, 72, 88, …

arguments: list object, new size
returns: 0 if OK, -1 if not
list_resize:
    new_allocated = (newsize >> 3) + (newsize < 9 ? 3 : 6) = 3
    new_allocated += newsize = 3 + 1 = 4
    resize ob_item (list of pointers) to size new_allocated
    return 0
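The pseudocode above can be sketched in Python to reproduce the growth pattern. This is only a model of the formula shown in this post: the function name and the way the allocation is tracked are mine, and more recent CPython releases use a slightly different formula.

```python
def list_resize(allocated, newsize):
    """Return the number of slots allocated after resizing to newsize."""
    # Enough room and not too much slack: keep the current allocation.
    if allocated >= newsize and newsize >= (allocated >> 1):
        return allocated
    # An empty list does not keep any slot.
    if newsize == 0:
        return 0
    # Over-allocate proportionally to the requested size.
    return (newsize >> 3) + (3 if newsize < 9 else 6) + newsize


allocated = 0
growth = [allocated]
for size in range(1, 89):            # simulate 88 successive appends
    new_allocated = list_resize(allocated, size)
    if new_allocated != allocated:   # a reallocation happened
        growth.append(new_allocated)
    allocated = new_allocated

print(growth)   # [0, 4, 8, 16, 25, 35, 46, 58, 72, 88]
```

Only 9 reallocations are needed for 88 appends, which is the whole point of over-allocating.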

4 slots are now allocated to contain elements and the first one is the integer 1. You can see on the following diagram that l[0] points to the integer object that we just appended. The dashed squares represent the slots allocated but not used yet.

Append operation complexity is amortized O(1): most appends use a slot which is already allocated.

Python lists

We continue by adding one more element: l.append(2). list_resize is called with n+1 = 2 but because the allocated size is 4, there is no need to allocate more memory. Same thing happens when we add 2 more integers: l.append(3), l.append(4). The following diagram shows what we have so far.

Python lists

Insert

Let’s insert a new integer (5) at position 1: l.insert(1,5) and look at what happens internally. ins1() is called:

arguments: list object, where, new element
returns: 0 if OK, -1 if not
ins1:
    resize list to size n+1 = 5 -> 4 more slots will be allocated
    starting at the last element up to the offset where, right shift each element 
    set new element at offset where
    return 0
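Here is a small Python model of the shifting done by ins1(). It assumes the slot array already has a free slot at the end (the resize step is left out) and uses None to represent unused slots. The signature is an illustration, not the C one.

```python
def ins1(slots, size, where, value):
    """Insert value at offset where in a slot array holding size elements.

    slots must have at least one unused slot after the last element.
    Returns the new size of the list.
    """
    # Clamp the offset the same way list.insert() does.
    if where < 0:
        where = max(where + size, 0)
    where = min(where, size)
    # Starting at the last element, shift each element one slot to the right.
    for i in range(size - 1, where - 1, -1):
        slots[i + 1] = slots[i]
    slots[where] = value
    return size + 1


slots = [1, 2, 3, 4, None, None, None, None]   # 8 slots allocated, 4 used
size = ins1(slots, 4, 1, 5)
print(slots[:size])   # [1, 5, 2, 3, 4]
```

The loop moves up to n elements, which is why inserting at the front of a long list is expensive.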

Python lists

The dashed squares represent the slots allocated but not used yet. Here, 8 slots are allocated but the size or length of the list is only 5.

Insert operation complexity is O(n).

Pop

When you pop the last element: l.pop(), listpop() is called. list_resize is called inside listpop() and if the new size is less than half of the allocated size then the list is shrunk.

arguments: list object
returns: element popped
listpop:
    if list empty:
        return null
    resize list with size 5 - 1 = 4. 4 is not less than 8/2 so no shrinkage
    set list object size to 4
    return last element

Pop operation complexity is O(1).

Python lists

You can observe that slot 4 still points to the integer but the important thing is the size of the list which is now 4.

Let’s pop one more element. In list_resize(), size - 1 = 4 - 1 = 3 is less than half of the allocated slots so the list is shrunk to 6 slots and the new size of the list is now 3.

You can observe that slots 3 and 4 still point to some integers but the important thing is the size of the list which is now 3.
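The allocated field is not visible from Python, but sys.getsizeof() gives an indirect view of it. The following sketch assumes CPython, where the reported size includes the array of slots. The exact byte counts depend on the interpreter version, so none are shown here.

```python
import sys

l = list(range(1000))
size_full = sys.getsizeof(l)          # bytes used by the list and its slots

# Pop one element: the list is still more than half full, no shrinkage.
l.pop()
size_after_one_pop = sys.getsizeof(l)

# Pop until only 10 elements remain: the slot array gets shrunk on the way.
while len(l) > 10:
    l.pop()
size_after_many_pops = sys.getsizeof(l)

print(size_full, size_after_one_pop, size_after_many_pops)
```

The first two values are expected to be equal and the third one to be much smaller.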

Python lists

Remove

The Python list object has a method to remove a specific element: l.remove(5). listremove() is called.

arguments: list object, element to remove
returns none if OK, null if not
listremove:
    loop through each list element:
        if correct element:
            slice list between element's slot and element's slot + 1
            return none
    return null

To slice the list and remove the element, list_ass_slice() is called and it is interesting to see how it works. Here, low offset is 1 and high offset is 2 as we are removing the element 5 at position 1.

arguments: list object, low offset, high offset
returns: 0 if OK
list_ass_slice:
    copy integer 5 to recycle list to dereference it
    shift elements from slot 2 to slot 1
    resize list to 5 slots
    return 0
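The two pseudocode functions above boil down to the following Python sketch. It is a simplified model: the slice deletion stands in for list_ass_slice() and the resize step is left to the interpreter.

```python
def listremove(items, value):
    """Remove the first element equal to value, like list.remove()."""
    for i, item in enumerate(items):
        if item == value:
            # Slice out items[i:i+1]: the elements located after slot i
            # are shifted one slot to the left.
            del items[i:i + 1]
            return None
    raise ValueError("list.remove(x): x not in list")


l = [1, 5, 2]
listremove(l, 5)
print(l)   # [1, 2]
```

Both the search and the shift are linear in the worst case, hence the O(n) complexity.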

Remove operation complexity is O(n).

Python lists

Sort

We are going to use a list with 130 elements to see how the sorting algorithm works. Anything below 64 elements is quickly sorted using a binary sort insertion so it is not as interesting to look at. Once you have more than 64 elements, merging is involved and it gives us a better overview of the algorithm.

Let’s create a list with 130 elements from 0 to 129. We shuffle it to make the sort worth it.

>>> import random
>>> l = [n for n in range(130)]
>>> random.shuffle(l)
>>> l
[107, 44, 97, 121, 26, 11, 24, 100, 79, 19, 109, 7, 52, 93, 70, 94, 124, 117, 92, 32, 115, 83, 9, 112, 84, 22, 65, 95, 89, 74, 64, 23, 101, 68, 119, 127, 90, 80, 91, 75, 4, 20, 114, 16, 103, 34, 96, 125, 47, 77, 81, 3, 30, 14, 25, 29, 104, 102, 98, 69, 78, 60, 33, 12, 31, 37, 76, 10, 5, 105, 35, 48, 85, 106, 63, 71, 54, 39, 8, 6, 62, 67, 42, 72, 118, 116, 27, 46, 38, 99, 126, 40, 28, 113, 43, 41, 59, 2, 56, 61, 88, 18, 45, 128, 58, 73, 1, 13, 129, 49, 0, 82, 123, 111, 57, 86, 110, 51, 15, 36, 120, 108, 66, 55, 53, 87, 122, 17, 21, 50]

The first step is to find natural runs, i.e. sequences where l[i] <= l[i+1] <= ... or l[i] > l[i+1] > ... Because we are using a very random list, natural runs are going to be short. Runs need to have a minimum length dictated by the length of the list. merge_compute_minrun() returns the minimum length of a run based on the length of the list. In our case, the minimum run is equal to 33. Any natural run of length less than 33 will be boosted by using a binary sort insertion to end up with a sorted run of length 33.
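The minimum run length can be computed with a few lines of Python. This sketch follows the description in listsort.txt: keep the 6 most significant bits of n and add 1 if any of the remaining bits is set.

```python
def merge_compute_minrun(n):
    """Return the minimum run length for a list of n elements."""
    r = 0                  # becomes 1 if any bit shifted out is set
    while n >= 64:
        r |= n & 1
        n >>= 1
    return n + r


print(merge_compute_minrun(130))   # 33
print(merge_compute_minrun(63))    # 63: short lists are a single run
```

With a minimum run of 33, our 130 elements are split into 3 runs of 33 elements and a last run of 31 elements.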

l[0] > l[1] and then it breaks as l[2] > l[1] so our first natural run has a length of 2. Not very good but we knew this would be the case as we are using random.shuffle() to shuffle our list. In real life, natural runs occur more often and it makes this sort much more efficient than in our very random example.

First natural run has a length of 2 so we boost it to length 33 using a binary sort insertion. Once this is done, the merge state structure is used to keep track of the different runs. First run starts at offset 0 and has a length of 33. It looks like this: [7, 9, 11, 19, 22, 23, 24, 26, 32, 44, 52, 64, 65, 70, 74, 79, 83, 84, 89, 92, 93, 94, 95, 97, 100, 101, 107, 109, 112, 115, 117, 121, 124].

We are now looking at l[33] and after. The natural run is only of length 3: l[33] < l[34] < l[35] so binary sort insertion is used here again and the second run looks like this after sorting: [3, 4, 12, 14, 16, 20, 25, 29, 30, 31, 33, 34, 37, 47, 60, 68, 69, 75, 77, 78, 80, 81, 90, 91, 96, 98, 102, 103, 104, 114, 119, 125, 127].
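To make the idea of a natural run concrete, here is a Python sketch of the run detection. The function name mirrors count_run() from the C sources but the signature and the returned tuple are simplifications of mine.

```python
def count_run(l, lo):
    """Return (length, descending) for the natural run starting at l[lo]."""
    hi = len(l)
    if hi - lo < 2:
        return hi - lo, False
    i = lo + 2
    if l[lo + 1] < l[lo]:
        # Strictly descending run: l[lo] > l[lo+1] > ...
        while i < hi and l[i] < l[i - 1]:
            i += 1
        return i - lo, True
    # Non-descending run: l[lo] <= l[lo+1] <= ...
    while i < hi and l[i] >= l[i - 1]:
        i += 1
    return i - lo, False


print(count_run([107, 44, 97, 121], 0))   # first elements of our list: (2, True)
print(count_run([68, 119, 127, 90], 0))   # l[33:37] of our list: (3, False)
```

Descending runs have to be strictly descending so that they can be reversed in place without breaking the stability of the sort.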

We continue and get a 3rd run of length 33 and the last run has length of 31.

1st run: [7, 9, 11, 19, 22, 23, 24, 26, 32, 44, 52, 64, 65, 70, 74, 79, 83, 84, 89, 92, 93, 94, 95, 97, 100, 101, 107, 109, 112, 115, 117, 121, 124]
2nd run: [3, 4, 12, 14, 16, 20, 25, 29, 30, 31, 33, 34, 37, 47, 60, 68, 69, 75, 77, 78, 80, 81, 90, 91, 96, 98, 102, 103, 104, 114, 119, 125, 127]
3rd run: [2, 5, 6, 8, 10, 27, 28, 35, 38, 39, 40, 41, 42, 43, 46, 48, 54, 56, 59, 62, 63, 67, 71, 72, 76, 85, 99, 105, 106, 113, 116, 118, 126]
4th run: [0, 1, 13, 15, 17, 18, 21, 36, 45, 49, 50, 51, 53, 55, 57, 58, 61, 66, 73, 82, 86, 87, 88, 108, 110, 111, 120, 122, 123, 128, 129]

Next is the merging process. The different runs are merged until there is only 1 run of length 130 left.

1st and 2nd runs are merged. The merge algorithm is quite sophisticated. The concept of gallops is used to speed up things. If during the merge, more than 7 elements are taken as winners from the same run, we enter the galloping mode where chunks are moved to the temporary merge area instead of comparing the elements one by one. You can read more about the galloping mode in listsort.txt. We end up with a run of length 66 sorted: [3, 4, 7, 9, 11, 12, 14, 16, 19, 20, 22, 23, 24, 25, 26, 29, 30, 31, 32, 33, 34, 37, 44, 47, 52, 60, 64, 65, 68, 69, 70, 74, 75, 77, 78, 79, 80, 81, 83, 84, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 100, 101, 102, 103, 104, 107, 109, 112, 114, 115, 117, 119, 121, 124, 125, 127]
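As a point of comparison, this is what a plain merge of two sorted runs looks like in Python. It is deliberately simplified: there is no galloping mode and no temporary merge area, only the element by element comparison that galloping tries to avoid.

```python
def merge_runs(a, b):
    """Merge two sorted runs into a new sorted list (stable)."""
    merged = []
    i = j = 0
    while i < len(a) and j < len(b):
        # Take from b only if it is strictly smaller: equal elements
        # keep their original order, which makes the merge stable.
        if b[j] < a[i]:
            merged.append(b[j])
            j += 1
        else:
            merged.append(a[i])
            i += 1
    # One of the runs is exhausted: copy what is left of the other one.
    merged.extend(a[i:])
    merged.extend(b[j:])
    return merged


first = [7, 9, 11, 19, 22]      # beginning of the 1st run
second = [3, 4, 12, 14, 16]     # beginning of the 2nd run
print(merge_runs(first, second))   # [3, 4, 7, 9, 11, 12, 14, 16, 19, 22]
```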

1st run is now of length 66. It is longer than the 3rd and 4th runs put together so those 2 runs are merged first, resulting in a run of length 64: [0, 1, 2, 5, 6, 8, 10, 13, 15, 17, 18, 21, 27, 28, 35, 36, 38, 39, 40, 41, 42, 43, 45, 46, 48, 49, 50, 51, 53, 54, 55, 56, 57, 58, 59, 61, 62, 63, 66, 67, 71, 72, 73, 76, 82, 85, 86, 87, 88, 99, 105, 106, 108, 110, 111, 113, 116, 118, 120, 122, 123, 126, 128, 129]

Last 2 runs are merged and we finally get 1 run sorted of length 130: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129]

Sort operation complexity is O(n log n).

That’s it for now. I hope you enjoyed the article. Please write a comment if you have any feedback. If you need help with a project written in Python or with building a new web service, I am available as a freelancer: LinkedIn profile. Follow me on Twitter @laurentluce.

Solving mazes using Python: Simple recursivity and A* search

March 10, 2011

This post describes how to solve mazes using 2 algorithms implemented in Python: a simple recursive algorithm and the A* search algorithm.

Maze

The maze we are going to use in this article is 6 cells by 6 cells. The walls are colored in blue. The starting cell is at the bottom left (x=0 and y=0) colored in green. The ending cell is at the top right (x=5 and y=5) colored in green. We can only move horizontally or vertically 1 cell at a time.

Python algorithms for mazes

Recursive walk

We use a nested list of integers to represent the maze. The values are the following:

  • 0: empty cell
  • 1: unreachable cell: e.g. wall
  • 2: ending cell
  • 3: visited cell

grid = [[0, 0, 0, 0, 0, 1],
        [1, 1, 0, 0, 0, 1],
        [0, 0, 0, 1, 0, 0],
        [0, 1, 1, 0, 0, 1],
        [0, 1, 0, 0, 1, 0],
        [0, 1, 0, 0, 0, 2]]

This is a very simple algorithm which does the job even if it is not an efficient algorithm. It walks the maze recursively by visiting each cell and avoiding walls and already visited cells.

The search function accepts the coordinates of a cell to explore. If it is the ending cell, it returns True. If it is a wall or an already visited cell, it returns False. The neighboring cells are explored recursively and if nothing is found at the end, it returns False so it backtracks to explore new paths. We start at cell x=0 and y=0.

def search(x, y):
    if grid[x][y] == 2:
        print('found at %d,%d' % (x, y))
        return True
    elif grid[x][y] == 1:
        print('wall at %d,%d' % (x, y))
        return False
    elif grid[x][y] == 3:
        print('visited at %d,%d' % (x, y))
        return False

    print('visiting %d,%d' % (x, y))

    # mark as visited
    grid[x][y] = 3

    # explore neighbors clockwise starting by the one on the right
    if ((x < len(grid)-1 and search(x+1, y))
        or (y > 0 and search(x, y-1))
        or (x > 0 and search(x-1, y))
        or (y < len(grid)-1 and search(x, y+1))):
        return True

    return False

search(0, 0)
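The function above only tells us whether the exit can be reached. A possible variant, sketched here for Python 3, returns the list of cells leading to the exit. The name find_path and its extra parameters are mine, and the neighbors are explored in the same order as in search().

```python
grid = [[0, 0, 0, 0, 0, 1],
        [1, 1, 0, 0, 0, 1],
        [0, 0, 0, 1, 0, 0],
        [0, 1, 1, 0, 0, 1],
        [0, 1, 0, 0, 1, 0],
        [0, 1, 0, 0, 0, 2]]


def find_path(grid, x=0, y=0, path=()):
    """Return the list of cells from (x, y) to the ending cell, or None."""
    if grid[x][y] == 2:
        return list(path) + [(x, y)]
    if grid[x][y] in (1, 3):            # wall or already visited cell
        return None
    grid[x][y] = 3                      # mark as visited
    # same order as search(): right, down, left, up
    for nx, ny in ((x + 1, y), (x, y - 1), (x - 1, y), (x, y + 1)):
        if 0 <= nx < len(grid) and 0 <= ny < len(grid[0]):
            found = find_path(grid, nx, ny, tuple(path) + ((x, y),))
            if found:
                return found
    return None                         # dead-end: backtrack


path = find_path(grid)
print(len(path), path)
```

The path returned is the one the walk stumbles upon first, not necessarily the shortest one.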

Let’s see what happens when we run the script.

$ python maze.py
visiting 0,0
wall at 1,0
visiting 0,1
wall at 1,1
visited at 0,0
visiting 0,2
...

First cell visited is (0,0). Its neighbors are explored starting with the one on the right (1,0). search(1,0) returns False because it is a wall. There is no cell below or on the left so the one at the top (0,1) is explored. Right of that is a wall and below is already visited so the one at the top (0,2) is explored. This is what we have so far:

Python algorithms for mazes

Because the neighbor on the right is explored first, this algorithm is going to explore the dead-end at the bottom-right first.

...
visiting 1,2
visiting 2,2
wall at 3,2
visiting 2,1
wall at 3,1
visiting 2,0
visiting 3,0
visiting 4,0
visiting 5,0
...

Python algorithms for mazes

The algorithm is going to backtrack because there is nothing else to explore as we are in a dead-end and we are going to end up at cell (1, 2) again where there is more to explore.

...
visited at 4,0
wall at 5,1
visited at 3,0
wall at 4,1
visited at 2,0
wall at 3,1
wall at 1,0
visited at 2,1
wall at 1,1
visited at 2,2
visited at 1,2
wall at 2,3
wall at 1,1
visited at 0,2
visiting 1,3
...

Python algorithms for mazes

Let’s continue, we end up in a second dead-end at cell (4, 2).

...
wall at 2,3
visited at 1,2
visiting 0,3
visited at 1,3
visited at 0,2
visiting 0,4
visiting 1,4
visiting 2,4
visiting 3,4
wall at 4,4
visiting 3,3
visiting 4,3
visiting 5,3
visiting 5,2
wall at 5,1
visiting 4,2
visited at 5,2
wall at 4,1
wall at 3,2
visited at 4,3
...

Python algorithms for mazes

Backtracking happens one more time to go back to cell (5, 3) and we are now on our way to the exit.

...
visiting 5,4
visited at 5,3
wall at 4,4
found at 5,5

Python algorithms for mazes

The full walk looks like this:

Python algorithms for mazes

A* search

We are going to look at a more sophisticated algorithm called A* search. This is based on costs to move around the grid. Let’s assume the cost to move horizontally or vertically 1 cell is equal to 10. Again, we cannot move diagonally here.

Before we start describing the algorithm, let’s define 2 variables: G and H. G is the cost to move from the starting cell to a given cell.

Python algorithms for mazes

H is an estimation of the cost to move from a given cell to the ending cell. How do we calculate that if we don’t know the path to the ending cell? To simplify, we just calculate the distance if no walls were present. There are other ways to do the estimation but this one is good enough for this example.

Python algorithms for mazes

We use 2 lists: an open list containing the cells to explore and a closed list containing the processed cells. We start with the starting cell in the open list and nothing in the closed list.

Let’s follow 1 round of this algorithm by processing our first cell from the open list. It is the starting cell. We remove it from the open list and add it to the closed list. We retrieve the list of adjacent cells and we start processing them. The starting cell has 2 adjacent cells: (1, 0) and (0, 1). (1, 0) is a wall so we drop that one. (0, 1) is reachable and not in the closed list so we process it. We calculate G and H for it. G = 10 as we just need to move 1 cell up from the starting cell. H = 90: 5 cells right and 4 cells up to reach the ending cell. We call the sum F = G + H = 10 + 90 = 100. We set the parent of this adjacent cell to be the cell we just removed from the open list, i.e. (0, 0). Finally, we add this adjacent cell to the open list. This is what we have so far. The arrow represents the pointer to the parent cell.
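The arithmetic of this first round can be checked with a few lines of Python. Cells are represented as (x, y) tuples here, which is a simplification for this example only.

```python
COST = 10                  # cost to move 1 cell horizontally or vertically


def heuristic(cell, end):
    """Estimated cost from cell to end, ignoring the walls."""
    return COST * (abs(cell[0] - end[0]) + abs(cell[1] - end[1]))


start, end = (0, 0), (5, 5)
adj = (0, 1)               # the only reachable cell adjacent to the start

g = 0 + COST               # G of the starting cell is 0
h = heuristic(adj, end)    # 5 cells right and 4 cells up
f = g + h
print(g, h, f)             # 10 90 100
```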

Python algorithms for mazes

We continue with the cell in the open list having the lowest F = G + H. Only one cell is in the open list so it makes it easy. We remove it from the open list and we get its adjacent cells. Again, only one adjacent cell is reachable: (0, 2). We end up with the following after this 2nd round.

Python algorithms for mazes

The 3rd round result looks like this. Cells in green are in the open list. Cells in red are in the closed list.

Python algorithms for mazes

Let’s detail the next round. We have 2 cells in the open list: (1, 2) and (0, 3). Both have the same F value so we pick the last one added which is (0, 3). This cell has 3 reachable adjacent cells: (1, 3), (0, 2) and (0, 4). We process (1, 3) and (0, 4). (0, 2) is in the closed list so we don’t process that one again. We end up with:

Python algorithms for mazes

Let’s fast forward to:

Python algorithms for mazes

We have (1, 2), (1, 3) and (3, 3) in the open list. (1, 3) is processed next because it is the last one added with the lowest F value = 100. (1, 3) has 1 adjacent cell which is not in the closed list. It is (1, 2) which is in the open list. When an adjacent cell is in the open list, we check if its G value, and therefore its F value, would be lower if the path taken was going through the cell currently processed, i.e. through (1, 3). Here it is not the case so we don’t update G, H and the parent of (1, 2). This check guarantees that each cell in the open list keeps the cheapest path found so far.

Let’s take a break and look at a diagram representing the algorithm steps and conditions:

Python algorithms for mazes

We continue processing the cells remaining in the open list. Fast forward to:

Python algorithms for mazes

We have 2 cells in the open list: (3, 3) and (2, 0). The next cell removed from the open list is (3, 3) because its F value, equal to 120, is the lowest. This shows why this algorithm is better than the first one we saw: we don’t end up exploring the dead end at (5, 0) and we continue walking from (3, 3) instead.

Fast forward again to:

Python algorithms for mazes

The next cell processed from the open list is (5, 5) and it is the ending cell so we have found our path. It is easy to display the path. We just have to follow the parent pointers up to the starting cell. Our path is highlighted in green on the following diagram:

Python algorithms for mazes

You can read more about this algorithm here.

A* implementation

The basic object here is a cell so we write a class for it. We store the coordinates x and y, the values of G and H plus the sum F.

class Cell(object):
    def __init__(self, x, y, reachable):
        """
        Initialize new cell

        @param x cell x coordinate
        @param y cell y coordinate
        @param reachable is cell reachable? not a wall?
        """
        self.reachable = reachable
        self.x = x
        self.y = y
        self.parent = None
        self.g = 0
        self.h = 0
        self.f = 0

Next is our main class named AStar. Attributes are the open list heapified (keep cell with lowest F at the top), the closed list which is a set for fast lookup, the cells list (grid definition) and the size of the grid.

import heapq

class AStar(object):
    def __init__(self):
        self.op = []
        heapq.heapify(self.op)
        self.cl = set()
        self.cells = []
        self.gridHeight = 6
        self.gridWidth = 6
  ...

We create a simple method initializing the list of cells to match our example with the walls at the same locations.

    def init_grid(self):
        walls = ((0, 5), (1, 0), (1, 1), (1, 5), (2, 3), 
                 (3, 1), (3, 2), (3, 5), (4, 1), (4, 4), (5, 1))
        for x in range(self.gridWidth):
            for y in range(self.gridHeight):
                if (x, y) in walls:
                    reachable = False
                else:
                    reachable = True
                self.cells.append(Cell(x, y, reachable))
        self.start = self.get_cell(0, 0)
        self.end = self.get_cell(5, 5)

Our heuristic compute method:

    def get_heuristic(self, cell):
        """
        Compute the heuristic value H for a cell: distance between
        this cell and the ending cell multiplied by 10.

        @param cell
        @returns heuristic value H
        """
        return 10 * (abs(cell.x - self.end.x) + abs(cell.y - self.end.y))

We need a method to return a particular cell based on x and y coordinates.

    def get_cell(self, x, y):
        """
        Returns a cell from the cells list

        @param x cell x coordinate
        @param y cell y coordinate
        @returns cell
        """
        return self.cells[x * self.gridHeight + y]

Next is a method to retrieve the list of adjacent cells to a specific cell.

    def get_adjacent_cells(self, cell):
        """
        Returns adjacent cells to a cell. Clockwise starting
        from the one on the right.

        @param cell get adjacent cells for this cell
        @returns adjacent cells list 
        """
        cells = []
        if cell.x < self.gridWidth-1:
            cells.append(self.get_cell(cell.x+1, cell.y))
        if cell.y > 0:
            cells.append(self.get_cell(cell.x, cell.y-1))
        if cell.x > 0:
            cells.append(self.get_cell(cell.x-1, cell.y))
        if cell.y < self.gridHeight-1:
            cells.append(self.get_cell(cell.x, cell.y+1))
        return cells

Simple method to print the path found. It follows the parent pointers to go from the ending cell to the starting cell.

    def display_path(self):
        cell = self.end
        while cell.parent is not self.start:
            cell = cell.parent
            print('path: cell: %d,%d' % (cell.x, cell.y))

We need a method to calculate G and H and set the parent cell.

    def update_cell(self, adj, cell):
        """
        Update adjacent cell

        @param adj adjacent cell to current cell
        @param cell current cell being processed
        """
        adj.g = cell.g + 10
        adj.h = self.get_heuristic(adj)
        adj.parent = cell
        adj.f = adj.h + adj.g

The main method implements the algorithm itself.

    def process(self):
        # add starting cell to open heap queue
        heapq.heappush(self.op, (self.start.f, self.start))
        while len(self.op):
            # pop cell from heap queue 
            f, cell = heapq.heappop(self.op)
            # add cell to closed list so we don't process it twice
            self.cl.add(cell)
            # if ending cell, display found path
            if cell is self.end:
                self.display_path()
                break
            # get adjacent cells for cell
            adj_cells = self.get_adjacent_cells(cell)
            for c in adj_cells:
                if c.reachable and c not in self.cl:
                    if (c.f, c) in self.op:
                        # if adj cell in open list, check if current path is
                        # better than the one previously found for this adj
                        # cell.
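                        if c.g > cell.g + 10:
                            # remove the stale heap entry before F changes,
                            # then restore the heap ordering
                            self.op.remove((c.f, c))
                            self.update_cell(c, cell)
                            self.op.append((c.f, c))
                            heapq.heapify(self.op)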
                    else:
                        self.update_cell(c, cell)
                        # add adj cell to open list
                        heapq.heappush(self.op, (c.f, c))
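For readers who want something to run right away, here is a compact, self-contained variant written for Python 3. It is a sketch and not the class above: cells are plain (x, y) tuples, G and parents live in dictionaries, and a counter is added to the heap entries so that 2 cells with the same F value never get compared directly. Those choices are assumptions of mine.

```python
import heapq
import itertools

WALLS = {(0, 5), (1, 0), (1, 1), (1, 5), (2, 3),
         (3, 1), (3, 2), (3, 5), (4, 1), (4, 4), (5, 1)}
WIDTH = HEIGHT = 6
COST = 10


def astar(start, end):
    """Return (cost, path) from start to end, or None if unreachable."""
    def h(cell):
        return COST * (abs(cell[0] - end[0]) + abs(cell[1] - end[1]))

    counter = itertools.count()        # tie-breaker for equal F values
    g = {start: 0}
    parent = {start: None}
    open_heap = [(h(start), next(counter), start)]
    closed = set()
    while open_heap:
        _, _, cell = heapq.heappop(open_heap)
        if cell in closed:
            continue                   # stale entry left by a better path
        closed.add(cell)
        if cell == end:
            path = []
            while cell is not None:    # follow the parent pointers
                path.append(cell)
                cell = parent[cell]
            return g[end], path[::-1]
        x, y = cell
        for adj in ((x + 1, y), (x, y - 1), (x - 1, y), (x, y + 1)):
            if not (0 <= adj[0] < WIDTH and 0 <= adj[1] < HEIGHT):
                continue
            if adj in WALLS or adj in closed:
                continue
            new_g = g[cell] + COST
            if adj not in g or new_g < g[adj]:
                g[adj] = new_g
                parent[adj] = cell
                heapq.heappush(open_heap, (new_g + h(adj), next(counter), adj))
    return None


cost, path = astar((0, 0), (5, 5))
print(cost, path)
```

Instead of updating an entry inside the heap, this variant pushes a new entry and skips the outdated one when it is popped, which keeps the heap handling simple.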

That’s it for now. I hope you enjoyed the article. Please write a comment if you have any feedback.

Python dictionary implementation

February 9, 2011

This post describes how dictionaries are implemented in the Python language.

Dictionaries are indexed by keys and they can be seen as associative arrays. Let’s add 3 key/value pairs to a dictionary:

>>> d = {'a': 1, 'b': 2}
>>> d['c'] = 3
>>> d
{'a': 1, 'b': 2, 'c': 3}

The values can be accessed this way:

>>> d['a']
1
>>> d['b']
2
>>> d['c']
3
>>> d['d']
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
KeyError: 'd'

The key ‘d’ does not exist so a KeyError exception is raised.

Hash tables

Python dictionaries are implemented using hash tables. A hash table is an array whose indexes are obtained using a hash function on the keys. The goal of a hash function is to distribute the keys evenly in the array. A good hash function minimizes the number of collisions, i.e. different keys having the same hash.

We are going to assume that we are using strings as keys for the rest of this post. The hash function for strings in Python is defined as:

arguments: string object
returns: hash
function string_hash:
    if hash cached:
        return it
    set len to string's length
    initialize var p pointing to 1st char of string object
    set x to value pointed by p left shifted by 7 bits
    while len > 0:
        set var x to (1000003 * x) xor value pointed by p
        increment pointer p
        decrement len
    set x to x xor length of string object
    cache x as the hash so we don't need to calculate it again
    return x as the hash

If you run hash('a') in Python, it will execute string_hash() and return 12416037344. Here we assume we are using a 64-bit machine.

If an array of size x is used to store the key/value pairs then we use a mask equal to x-1 to calculate the slot index of the pair in the array. For example, if the size of the array is 8, the index for 'a' will be: hash('a') & 7 = 0. The index for 'b' is 3, the index for 'c' is 2 and the index for 'z' is 3, which is the same as 'b': here we have a collision.
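The string hash pseudocode can be emulated in pure Python, which is handy because the built-in hash() of recent Python versions uses a different, randomized algorithm. This sketch assumes a 64-bit build and ASCII strings. The masking and the signed conversion are there to imitate C integer arithmetic.

```python
def string_hash(s):
    """Emulate the string hash described above on a 64-bit machine."""
    if not s:
        return 0
    mask = (1 << 64) - 1               # keep intermediate values on 64 bits
    x = (ord(s[0]) << 7) & mask
    for ch in s:
        x = ((1000003 * x) ^ ord(ch)) & mask
    x ^= len(s)
    if x >= 1 << 63:                   # reinterpret as a signed 64-bit value
        x -= 1 << 64
    return -2 if x == -1 else x        # -1 is reserved for errors in C


for key in 'abcz':
    print(key, string_hash(key), string_hash(key) & 7)
```

The last column shows the slot indexes 0, 3, 2 and 3 for a table of size 8, with the collision between 'b' and 'z'.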

hash table

We can see that the Python hash function does a good job when the keys are consecutive which is good because it is quite common to have this type of data to work with. However, once we add the key ‘z’, there is a collision because it is not consecutive enough.

We could use a linked list to store the pairs having the same hash but it would increase the lookup time, i.e. it would not be O(1) anymore. The next section describes the collision resolution method used in the case of Python dictionaries.

Open addressing

Open addressing is a method of collision resolution where probing is used. In case of ‘z’, the slot index 3 is already used in the array so we need to probe for a different index to find one which is not already used. Adding a key/value pair might take more time because of the probing but the lookup will be O(1) and this is the desired behavior.

The probing sequence used to find a free slot is computed with a recurrence on the slot index, perturbed by the upper bits of the hash. The code is the following:

i is the current slot index
set perturb to hash
forever loop:
  set i to (i << 2) + i + perturb + 1
  set slot index to i & mask
  if slot is free:
      return it
  right shift perturb by 5 bits

Let’s see how this probing works when we start with i = 3:
3 -> 3 -> 3 -> 5 -> 5 -> 6 -> 0…
This is not a really good example because we are using a small table of size 8 and this probing starts showing its advantages on larger tables. In our case, index 5 is free so it will be picked for key ‘z’.

Just out of curiosity, let’s look at the probing sequence when the table size is 32, i.e. mask = 31:
3 -> 11 -> 19 -> 29 -> 5 -> 6 -> 16 -> 31 -> 28 -> 13 -> 2…

You can read more about this probing sequence by looking at the source code of dictobject.c. A detailed explanation of the probing mechanism can be found at the top of the file.
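The probing loop is easy to replay in Python. The generator below is a sketch which assumes a non-negative hash (in C, perturb is an unsigned value). The start parameter is my addition, only there to reproduce the example above where we start with i = 3.

```python
import itertools

PERTURB_SHIFT = 5


def probe_sequence(h, mask, start=None):
    """Yield the slot indexes probed for hash h in a table of size mask + 1."""
    i = h & mask if start is None else start
    yield i & mask
    perturb = h
    while True:
        i = (i << 2) + i + perturb + 1
        yield i & mask
        perturb >>= PERTURB_SHIFT


hash_z = 15616046971       # hash('z') on a 64-bit machine
print(list(itertools.islice(probe_sequence(hash_z, 31, start=3), 11)))
# [3, 11, 19, 29, 5, 6, 16, 31, 28, 13, 2]
```

Once perturb reaches 0, the recurrence becomes i = 5 * i + 1 which is guaranteed to visit every slot of a table whose size is a power of 2.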

open addressing

Now, let’s look at the Python internal code along with an example.

Dictionary C structures

The following C structure is used to store a dictionary entry: key/value pair. The hash, key and value are stored. PyObject is the base class of the Python objects.

typedef struct {
    Py_ssize_t me_hash;
    PyObject *me_key;
    PyObject *me_value;
} PyDictEntry;

The following structure represents a dictionary. ma_fill is the number of used slots + dummy slots. A slot is marked dummy when a key pair is removed. ma_used is the number of used slots (active). ma_mask is equal to the array’s size minus 1 and is used to calculate the slot index. ma_table is the array and ma_smalltable is the initial array of size 8.

typedef struct _dictobject PyDictObject;
struct _dictobject {
    PyObject_HEAD
    Py_ssize_t ma_fill;
    Py_ssize_t ma_used;
    Py_ssize_t ma_mask;
    PyDictEntry *ma_table;
    PyDictEntry *(*ma_lookup)(PyDictObject *mp, PyObject *key, long hash);
    PyDictEntry ma_smalltable[PyDict_MINSIZE];
};

Dictionary initialization

When you first create a dictionary, the function PyDict_New() is called. I removed some of the lines and converted the C code to pseudocode to concentrate on the key concepts.

returns new dictionary object
function PyDict_New:
    allocate new dictionary object
    clear dictionary's table
    set dictionary's number of used slots + dummy slots (ma_fill) to 0
    set dictionary's number of active slots (ma_used) to 0
    set dictionary's mask (ma_mask) to dictionary size - 1 = 7
    set dictionary's lookup function to lookdict_string
    return allocated dictionary object

Adding items

When a new key/value pair is added, PyDict_SetItem() is called. This function takes a pointer to the dictionary object and the key/value pair. It checks if the key is a string and calculates the hash or reuses the one cached if it exists. insertdict() is called to add the new key/value pair and the dictionary is resized if the number of used and dummy slots (ma_fill) reaches 2/3 of the array’s size.
Why 2/3? It is to make sure the probing sequence can find a free slot fast enough. We will look at the resizing function later.

arguments: dictionary, key, value
returns: 0 if OK or -1
function PyDict_SetItem:
    set mp to point to dictionary object
    if key's hash cached:
        use hash
    else:
        calculate hash
    set n_used to dictionary's number of active slots (ma_used)
    call insertdict with dictionary object, key, hash and value
    if key/value pair added successfully and capacity over 2/3:
        call dictresize to resize dictionary's table

insertdict() uses the lookup function to find a free slot. This is the next function we are going to examine. lookdict_string() calculates the slot index using the hash and the mask values. If it cannot find the key in the slot index = hash & mask, it probes using the perturb loop we saw above.

arguments: dictionary object, key, hash
returns: dictionary entry
function lookdict_string:
    calculate slot index based on hash and mask
    if slot's key matches or slot's key is not set:
        return slot's entry
    if slot's key marked as dummy (was active):
        set freeslot to this slot's entry
    else:
        if slot's hash equals to hash and slot's key equals to key:
            return slot's entry
        set var freeslot to null
    we are here because we couldn't find the key so we start probing
    set perturb to hash
    forever loop:
        set i to (i << 2) + i + perturb + 1
        calculate slot index based on i and mask
        if slot's key is null:
            if freeslot is null:
                return slot's entry
            else:
                return freeslot
        if slot's key is the same object as key, or slot is not marked as dummy
            and slot's hash equals to hash and slot's key equals to key:
            return slot's entry
        if slot marked as dummy and freeslot is null:
            set freeslot to slot's entry
        right shift perturb by 5 bits

We want to add the following key/value pairs: {'a': 1, 'b': 2, 'z': 26, 'y': 25, 'c': 3, 'x': 24}. This is what happens:

A dictionary structure is allocated with internal table size of 8.

  • PyDict_SetItem: key = ‘a’, value = 1
    • hash = hash(‘a’) = 12416037344
    • insertdict
      • lookdict_string
        • slot index = hash & mask = 12416037344 & 7 = 0
        • slot 0 is not used so return it
      • init entry at index 0 with key, value and hash
      • ma_used = 1, ma_fill = 1
  • PyDict_SetItem: key = ‘b’, value = 2
    • hash = hash(‘b’) = 12544037731
    • insertdict
      • lookdict_string
        • slot index = hash & mask = 12544037731 & 7 = 3
        • slot 3 is not used so return it
      • init entry at index 3 with key, value and hash
      • ma_used = 2, ma_fill = 2
  • PyDict_SetItem: key = ‘z’, value = 26
    • hash = hash(‘z’) = 15616046971
    • insertdict
      • lookdict_string
        • slot index = hash & mask = 15616046971 & 7 = 3
        • slot 3 is used so probe for a different slot: 5 is free
      • init entry at index 5 with key, value and hash
      • ma_used = 3, ma_fill = 3
  • PyDict_SetItem: key = ‘y’, value = 25
    • hash = hash(‘y’) = 15488046584
    • insertdict
      • lookdict_string
        • slot index = hash & mask = 15488046584 & 7 = 0
        • slot 0 is used so probe for a different slot: 1 is free
      • init entry at index 1 with key, value and hash
      • ma_used = 4, ma_fill = 4
  • PyDict_SetItem: key = ‘c’, value = 3
    • hash = hash(‘c’) = 12672038114
    • insertdict
      • lookdict_string
        • slot index = hash & mask = 12672038114 & 7 = 2
        • slot 2 is free so return it
      • init entry at index 2 with key, value and hash
      • ma_used = 5, ma_fill = 5
  • PyDict_SetItem: key = ‘x’, value = 24
    • hash = hash(‘x’) = 15360046201
    • insertdict
      • lookdict_string
        • slot index = hash & mask = 15360046201 & 7 = 1
        • slot 1 is used so probe for a different slot: 7 is free
      • init entry at index 7 with key, value and hash
      • ma_used = 6, ma_fill = 6
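The steps above can be replayed with a small simulation. It is a toy model: the table is a Python list of 8 slots, the hash values are the ones listed above (hard-coded in a dictionary named HASHES for this example) and dummy slots are ignored since nothing is deleted.

```python
# Hash values taken from the walk-through above (64-bit machine).
HASHES = {'a': 12416037344, 'b': 12544037731, 'z': 15616046971,
          'y': 15488046584, 'c': 12672038114, 'x': 15360046201}


def insert(table, key, value):
    """Store (key, value) in table and return the slot index used."""
    mask = len(table) - 1
    h = HASHES[key]
    i = h & mask
    perturb = h
    # Probe until we find a free slot or the slot already holding key.
    while table[i & mask] is not None and table[i & mask][0] != key:
        i = (i << 2) + i + perturb + 1
        perturb >>= 5
    table[i & mask] = (key, value)
    return i & mask


table = [None] * 8
pairs = [('a', 1), ('b', 2), ('z', 26), ('y', 25), ('c', 3), ('x', 24)]
used_slots = [insert(table, key, value) for key, value in pairs]
print(used_slots)   # [0, 3, 5, 1, 2, 7]
```

Only slots 4 and 6 are still free at this point.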

This is what we have so far:

python dictionary insert

6 slots out of 8 are used now so we are over 2/3 of the array’s capacity. dictresize() is called to allocate a larger array. This function also takes care of copying the old table entries to the new table.

dictresize() is called with minused = 24 in our case which is 4 * ma_used. 2 * ma_used is used when the number of used slots is very large (greater than 50000). Why 4 times the number of used slots? It reduces the number of resize steps and it increases sparseness.

The new table size needs to be greater than 24 and it is calculated by shifting the current size 1 bit left until it is greater than 24. It ends up being 32, i.e. 8 -> 16 -> 32.

arguments: dictionary object, (2 or 4) * active slots
returns: 0 if OK, -1 otherwise
function dictresize:
    calculate new dictionary size:
        set var newsize to dictionary size
        while newsize is less than or equal to (2 or 4) * active slots:
            set newsize to newsize left shifted by 1 bit
    set oldtable to dictionary's table
    allocate new dictionary table
    set dictionary's mask to newsize - 1
    clear dictionary's table
    set dictionary's active slots (ma_used) to 0
    set var i to dictionary's active + dummy slots (ma_fill)
    set dictionary's active + dummy slots (ma_fill) to 0
    copy oldtable entries to dictionary's table using new mask
    return 0
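The size calculation can be checked with a few lines of Python. This is a minimal sketch, not the C code: new_table_size is a hypothetical helper name, and it assumes the search starts from the minimum table size of 8 slots, which is also the current size in our example.

```python
PYDICT_MINSIZE = 8  # assumed starting size for the search

def new_table_size(ma_used):
    """Return the table size chosen for a dict with ma_used active slots."""
    # 4 * ma_used normally, 2 * ma_used for very large dictionaries
    factor = 2 if ma_used > 50000 else 4
    minused = factor * ma_used
    newsize = PYDICT_MINSIZE
    while newsize <= minused:
        newsize <<= 1
    return newsize

# 6 active slots: minused = 24, so 8 -> 16 -> 32
print(new_table_size(6))
```

Each entry is then reinserted using hash & (newsize - 1), so with a table of 32 slots the mask becomes 31.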

This is what happens with our table during resizing: a new table of size 32 is allocated. Old table entries are inserted into the new table using the new mask value which is 31. We end up with the following:

python dictionary table resizing

Removing items

PyDict_DelItem() is called to remove an entry. The hash for this key is calculated and the lookup function is called to return the entry. The key for this entry is set to the dummy key. Dummy entries are entries which contained a key in the past but have not been reused yet. In case of a collision, the probe sequence uses the dummy information to know that those entries held an active pair in the past, so it keeps probing instead of stopping there.

arguments: dictionary object, key
returns 0 if OK, -1 otherwise
function PyDict_DelItem:
    if key's hash cached:
        use hash
    else:
        calculate hash
    look for key in dictionary using hash
    if slot not found:
        return -1
    set slot's key to dummy
    set slot's value to null
    decrement dictionary active slots 
    return 0

We want to remove the key ‘c’ from our dictionary. We end up with the following array:

Python dictionary delete key
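To see why a deleted slot is marked as dummy instead of being freed, here is a toy sketch. It deliberately uses linear probing instead of the real probe sequence to keep it short, and the names FREE, DUMMY, lookup and delete are made up for this illustration. The key ‘z’ is assumed to have collided with ‘b’ on slot 3 and to have landed on slot 4.

```python
FREE = None
DUMMY = object()  # sentinel for a slot whose key was deleted

def lookup(keys, key, hash_value):
    """Return the slot index holding key, or -1 (linear probing for brevity)."""
    mask = len(keys) - 1
    i = hash_value & mask
    for _ in range(len(keys)):
        if keys[i] is FREE:
            # never used: the key cannot be further along the probe sequence
            return -1
        if keys[i] is not DUMMY and keys[i] == key:
            return i
        # dummy or different key: keep probing
        i = (i + 1) & mask
    return -1

def delete(keys, values, key, hash_value):
    """Mark the slot holding key as dummy. Return 0 if OK, -1 otherwise."""
    i = lookup(keys, key, hash_value)
    if i < 0:
        return -1
    keys[i] = DUMMY
    values[i] = None
    return 0

keys = [FREE] * 8
values = [None] * 8
keys[3], values[3] = 'b', 2
keys[4], values[4] = 'z', 26  # collided with 'b' on slot 3

delete(keys, values, 'b', 3)
found_with_dummy = lookup(keys, 'z', 3)   # probing continues past the dummy

keys[3] = FREE                            # what would happen without a dummy
found_without_dummy = lookup(keys, 'z', 3)
print(found_with_dummy, found_without_dummy)
```

With the dummy marker, ‘z’ is still found. If the slot were simply freed, the lookup would stop at slot 3 and report ‘z’ as missing.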

Note that the delete item operation doesn’t trigger an array resize, even if the number of used slots is much less than the total number of slots. However, when a key/value pair is added, the need for a resize is based on the number of used slots, so an insertion can shrink the array too. The array never shrinks below its original size: 8 by default.

That’s it for now. I hope you enjoyed the article. Please write a comment if you have any feedback. If you need help with a project written in Python or with building a new web service, I am available as a freelancer: LinkedIn profile. Follow me on Twitter @laurentluce.

Python threads synchronization: Locks, RLocks, Semaphores, Conditions, Events and Queues

February 5, 2011

This article describes the Python threading synchronization mechanisms in detail. We are going to study the following types: Lock, RLock, Semaphore, Condition, Event and Queue. We are also going to look at the Python internals behind those mechanisms.

The source code of the programs below can be found at github.com/laurentluce/python-tutorials under threads/.

First, let’s look at a simple program using the threading module with no synchronization.

Threading

We want to write a program fetching the content of some URLs and writing it to a file. We could do it serially with no threads but to speed things up, we are going to create 2 threads processing half of the URLs each.

Note: The best way here would be to use a queue with the URLs to fetch but this example is more suitable to begin our tutorial.

The class FetchUrls is thread based and it takes a list of URLs to fetch and a file object to write the content to.

import threading
import urllib2

class FetchUrls(threading.Thread):
    """
    Thread checking URLs.
    """

    def __init__(self, urls, output):
        """
        Constructor.

        @param urls list of urls to check
        @param output file to write urls output
        """
        threading.Thread.__init__(self)
        self.urls = urls
        self.output = output
    
    def run(self):
        """
        Thread run method. Check URLs one by one.
        """
        while self.urls:
            url = self.urls.pop()
            req = urllib2.Request(url)
            try:
                d = urllib2.urlopen(req)
            except urllib2.URLError, e:
                print 'URL %s failed: %s' % (url, e.reason)
                # nothing to write for this URL, move on to the next one
                continue
            self.output.write(d.read())
            print 'write done by %s' % self.name
            print 'URL %s fetched by %s' % (url, self.name)

The main function starts the 2 threads and then waits for them to finish.

def main():
    # list 1 of urls to fetch
    urls1 = ['http://www.google.com', 'http://www.facebook.com']
    # list 2 of urls to fetch
    urls2 = ['http://www.yahoo.com', 'http://www.youtube.com']
    f = open('output.txt', 'w+')
    t1 = FetchUrls(urls1, f)
    t2 = FetchUrls(urls2, f)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    f.close()

if __name__ == '__main__':
    main()

The issue is that both threads may write to the file at the same time, so their output can end up interleaved. We need to find a way to have only 1 thread writing to the file at a given time. One way to do that is to use synchronization mechanisms like locks.

Lock

Locks have 2 states: locked and unlocked. 2 methods are used to manipulate them: acquire() and release(). Those are the rules:

  • if the state is unlocked: a call to acquire() changes the state to locked.
  • if the state is locked: a call to acquire() blocks until another thread calls release().
  • if the state is unlocked: a call to release() raises an exception (thread.error in Python 2, RuntimeError in Python 3).
  • if the state is locked: a call to release() changes the state to unlocked.
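These rules can be checked from a single thread with a short sketch. It is written for Python 3, unlike the Python 2 listings in this article, and it uses a non-blocking acquire(False) so that the second attempt returns False instead of blocking forever.

```python
import threading

lock = threading.Lock()

first = lock.acquire()        # unlocked -> locked, returns True
second = lock.acquire(False)  # already locked: a non-blocking attempt fails
lock.release()                # locked -> unlocked

try:
    lock.release()            # releasing an unlocked lock is an error
    error = None
except RuntimeError as e:     # Python 3 raises RuntimeError here
    error = e

print(first, second, error)
```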

To solve our issue of 2 threads writing to the same file at the same time, we pass a lock to the FetchUrls constructor and we use it to protect the file write operation. I am just going to highlight the modifications relevant to locks. The source code can be found in threads/lock.py.

class FetchUrls(threading.Thread):
    ...

    def __init__(self, urls, output, lock):
        ...
        self.lock = lock
    
    def run(self):
        ...
        while self.urls:
            ...
            self.lock.acquire()
            print 'lock acquired by %s' % self.name
            self.output.write(d.read())
            print 'write done by %s' % self.name
            print 'lock released by %s' % self.name
            self.lock.release()
            ...

def main():
    ...
    lock = threading.Lock()
    ...
    t1 = FetchUrls(urls1, f, lock)
    t2 = FetchUrls(urls2, f, lock)
    ...

Let’s look at the program output when we run it:

$ python locks.py
lock acquired by Thread-2
write done by Thread-2
lock released by Thread-2
URL http://www.youtube.com fetched by Thread-2
lock acquired by Thread-1
write done by Thread-1
lock released by Thread-1
URL http://www.facebook.com fetched by Thread-1
lock acquired by Thread-2
write done by Thread-2
lock released by Thread-2
URL http://www.yahoo.com fetched by Thread-2
lock acquired by Thread-1
write done by Thread-1
lock released by Thread-1
URL http://www.google.com fetched by Thread-1

The write operation is now protected by a lock and we don’t have 2 threads writing to the file at the same time.

Let’s take a look at the Python internals. I am using Python 2.6.6 on Linux.

Lock() in the threading module is simply an alias for thread.allocate_lock. You can see the code in Lib/threading.py.

_allocate_lock = thread.allocate_lock
Lock = _allocate_lock

The C implementation can be found in Python/thread_pthread.h. We assume that our system supports POSIX semaphores. sem_init() initializes the semaphore at the address pointed to by lock. The initial value of the semaphore is 1, which means unlocked. The second argument (0) means the semaphore is shared between the threads of the process rather than between processes.

PyThread_type_lock
PyThread_allocate_lock(void)
{
    ...
    lock = (sem_t *)malloc(sizeof(sem_t));

    if (lock) {
        status = sem_init(lock,0,1);
        CHECK_STATUS("sem_init");
        ....
    }
    ...
}

When the acquire() method is called, the following C code is executed. waitflag is equal to 1 by default, which means the call blocks until the lock is unlocked. sem_wait() decrements the semaphore’s value or blocks until the value is greater than 0, i.e. until the lock is released by another thread.

int
PyThread_acquire_lock(PyThread_type_lock lock, int waitflag)
{
    ...
    do {
        if (waitflag)
            status = fix_status(sem_wait(thelock));
        else
            status = fix_status(sem_trywait(thelock));
    } while (status == EINTR); /* Retry if interrupted by a signal */
    ....
}

When the release() method is called, the following C code is executed. sem_post() increments the semaphore’s value, i.e. it unlocks the semaphore.

void
PyThread_release_lock(PyThread_type_lock lock)
{
    ...
    status = sem_post(thelock);
    ...
}
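The mapping between the lock and the semaphore can be modelled in pure Python. This is only a sketch for Python 3: SemLock is a made-up class, and threading.Semaphore stands in for the POSIX semaphore used by the C code.

```python
import threading

class SemLock(object):
    """Lock modelled on a counting semaphore whose initial value is 1."""

    def __init__(self):
        # 1 means unlocked, like sem_init(lock, 0, 1)
        self._sem = threading.Semaphore(1)

    def acquire(self, waitflag=True):
        # waitflag=True behaves like sem_wait(), False like sem_trywait()
        return self._sem.acquire(waitflag)

    def release(self):
        # like sem_post(): the value goes back to 1
        self._sem.release()

lock = SemLock()
print(lock.acquire())       # value 1 -> 0, the lock is taken
print(lock.acquire(False))  # value is 0, the non-blocking attempt fails
lock.release()
```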

You can also use the “with” statement. The Lock object can be used as a context manager. The advantage of using “with” is that acquire() is called when the “with” block is entered and release() is called when the block is exited, even if an exception is raised inside the block. Let’s rewrite the class FetchUrls using the “with” statement.

class FetchUrls(threading.Thread):
    ...
    def run(self):
        ...
        while self.urls:
            ...
            with self.lock:
                print 'lock acquired by %s' % self.name
                self.output.write(d.read())
                print 'write done by %s' % self.name
                print 'lock released by %s' % self.name
            ...

RLock

RLock is a reentrant lock. acquire() can be called multiple times by the same thread without blocking. Keep in mind that release() needs to be called the same number of times to unlock the resource.

Using Lock, the second call to acquire() by the same thread will block:

lock = threading.Lock()
lock.acquire()
lock.acquire()

If you use RLock, the second call to acquire() won’t block.

rlock = threading.RLock()
rlock.acquire()
rlock.acquire()
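The difference can be observed without blocking by using non-blocking acquire calls. The following sketch targets Python 3, and the helper probe() is made up for this illustration: it tries to take the RLock from a second thread to show that the lock is only free once release() has been called as many times as acquire().

```python
import threading

lock = threading.Lock()
lock.acquire()
lock_second = lock.acquire(False)    # a plain Lock is not reentrant

rlock = threading.RLock()
rlock.acquire()
rlock_second = rlock.acquire(False)  # same thread: succeeds

def probe():
    """Try to take rlock from another thread and report the result."""
    results = []

    def attempt():
        got = rlock.acquire(False)
        results.append(got)
        if got:
            rlock.release()

    t = threading.Thread(target=attempt)
    t.start()
    t.join()
    return results[0]

rlock.release()
after_one_release = probe()    # still held: acquired twice, released once
rlock.release()
after_two_releases = probe()   # now available to other threads
print(lock_second, rlock_second, after_one_release, after_two_releases)
```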

RLock also uses thread.allocate_lock() but it keeps track of the owner thread to support the reentrant feature. Following is the RLock acquire() method implementation. If the thread calling acquire() is the owner of the resource then the counter is incremented by one. If not, it tries to acquire it. The first time it acquires the lock, the owner is saved and the counter is initialized to 1.

def acquire(self, blocking=1):
    me = _get_ident()
    if self.__owner == me:
        self.__count = self.__count + 1
        ...
        return 1
    rc = self.__block.acquire(blocking)
    if rc:
        self.__owner = me
        self.__count = 1
        ...
    ...
    return rc

Let’s look at the RLock release() method. First is a check to make sure the thread calling the method is the owner of the lock. The counter is decremented and if it is equal to 0 then the resource is unlocked and available to be acquired by another thread.

def release(self):
    if self.__owner != _get_ident():
        raise RuntimeError("cannot release un-acquired lock")
    self.__count = count = self.__count - 1
    if not count:
        self.__owner = None
        self.__block.release()
        ...
    ...

Condition

This is a synchronization mechanism where a thread waits for a specific condition and another thread signals that this condition has happened. Once the condition has happened, the waiting thread reacquires the lock to get exclusive access to the shared resource.

A good way to illustrate this mechanism is by looking at a producer/consumer example. The producer appends random integers to a list and the consumer retrieves those integers from the list. The source code can be found in threads/condition.py.

Let’s look at the producer class. The producer acquires the lock, appends an integer, notifies the consumer thread that there is something to retrieve and releases the lock. It does that forever, pausing for one second between append operations.

class Producer(threading.Thread):
    """
    Produces random integers to a list
    """

    def __init__(self, integers, condition):
        """
        Constructor.

        @param integers list of integers
        @param condition condition synchronization object
        """
        threading.Thread.__init__(self)
        self.integers = integers
        self.condition = condition
    
    def run(self):
        """
        Thread run method. Append random integers to the integers list
        at random time.
        """
        while True:
            integer = random.randint(0, 256)
            self.condition.acquire()
            print 'condition acquired by %s' % self.name
            self.integers.append(integer) 
            print '%d appended to list by %s' % (integer, self.name)
            print 'condition notified by %s' % self.name
            self.condition.notify()
            print 'condition released by %s' % self.name
            self.condition.release()
            time.sleep(1)

Next is the consumer class. It acquires the lock and checks if there is an integer in the list. If there is nothing, it waits to be notified by the producer. Once the element is retrieved from the list, it releases the lock.

Note that a call to wait() releases the lock so the producer can acquire the resource and do its work.

class Consumer(threading.Thread):
    """
    Consumes random integers from a list
    """

    def __init__(self, integers, condition):
        """
        Constructor.

        @param integers list of integers
        @param condition condition synchronization object
        """
        threading.Thread.__init__(self)
        self.integers = integers
        self.condition = condition
    
    def run(self):
        """
        Thread run method. Consumes integers from list
        """
        while True:
            self.condition.acquire()
            print 'condition acquired by %s' % self.name
            while True:
                if self.integers:
                    integer = self.integers.pop()
                    print '%d popped from list by %s' % (integer, self.name)
                    break
                print 'condition wait by %s' % self.name
                self.condition.wait()
            print 'condition released by %s' % self.name
            self.condition.release()

We need to write our main creating 2 threads and starting them:

def main():
    integers = []
    condition = threading.Condition()
    t1 = Producer(integers, condition)
    t2 = Consumer(integers, condition)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

if __name__ == '__main__':
    main()

The output of this program looks like this:

$ python condition.py
condition acquired by Thread-1
159 appended to list by Thread-1
condition notified by Thread-1
condition released by Thread-1
condition acquired by Thread-2
159 popped from list by Thread-2
condition released by Thread-2
condition acquired by Thread-2
condition wait by Thread-2
condition acquired by Thread-1
116 appended to list by Thread-1
condition notified by Thread-1
condition released by Thread-1
116 popped from list by Thread-2
condition released by Thread-2
condition acquired by Thread-2
condition wait by Thread-2

Thread-1 appends 159 to the list then notifies the consumer and releases the lock. Thread-2 acquires the lock, retrieves 159 and releases the lock. The producer is still waiting at that time because of the time.sleep(1) so the consumer acquires the lock again then waits to get notified by the producer. When wait() is called, it unlocks the resource so the producer can acquire it and append a new integer to the list before notifying the consumer.
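The program above runs forever. For experimenting, here is a bounded variant written for Python 3 that terminates after a fixed number of items. It is a sketch, not the code from the repository: run_demo and its count parameter are made up, and the sleep calls and print statements are left out.

```python
import threading

def run_demo(count=5):
    """Produce count integers in one thread and consume them in another."""
    integers = []
    consumed = []
    condition = threading.Condition()

    def producer():
        for value in range(count):
            with condition:
                integers.append(value)
                condition.notify()

    def consumer():
        for _ in range(count):
            with condition:
                # re-check the list after every wake-up
                while not integers:
                    condition.wait()
                consumed.append(integers.pop())

    threads = [threading.Thread(target=consumer),
               threading.Thread(target=producer)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return consumed

print(sorted(run_demo()))
```

The order in which the integers are consumed depends on thread scheduling, which is why the result is sorted before printing.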

Let’s look at the Python internals for this condition synchronization mechanism. The condition’s constructor creates an RLock object if no existing lock is passed to the constructor. This lock will be used when acquire() and release() are called.

class _Condition(_Verbose):

    def __init__(self, lock=None, verbose=None):
        _Verbose.__init__(self, verbose)
        if lock is None:
            lock = RLock()
        self.__lock = lock

Next is the wait() method. We assume that we are calling wait() with no timeout value to simplify the explanation of the wait() method’s code. A new lock named waiter is created and its state is set to locked. The waiter lock is used for communication between the threads: the producer notifies the consumer by releasing this waiter lock. The waiter lock is added to the waiters list and the method blocks at the second waiter.acquire() call. Note that the condition lock is released, with its state saved, before blocking and is reacquired, with its state restored, when wait() returns.

def wait(self, timeout=None):
    ...
    waiter = _allocate_lock()
    waiter.acquire()
    self.__waiters.append(waiter)
    saved_state = self._release_save()
    try:    # restore state no matter what (e.g., KeyboardInterrupt)
        if timeout is None:
            waiter.acquire()
            ...
        ...
    finally:
        self._acquire_restore(saved_state)

The notify() method is used to release the waiter lock. The producer calls notify() to notify the consumer blocked on wait().

def notify(self, n=1):
    ...
    __waiters = self.__waiters
    waiters = __waiters[:n]
    ...
    for waiter in waiters:
        waiter.release()
        try:
            __waiters.remove(waiter)
        except ValueError:
            pass
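The waiter lock technique is easier to follow in a stripped-down form. Below is a sketch for Python 3 of a minimal condition built from plain locks. MiniCondition is a made-up class for this illustration: it ignores timeouts and RLock support and it is not the standard library implementation.

```python
import threading

class MiniCondition(object):
    """Simplified condition: one locked waiter lock per waiting thread."""

    def __init__(self):
        self.lock = threading.Lock()
        self.waiters = []

    def wait(self):
        # the caller must hold self.lock
        waiter = threading.Lock()
        waiter.acquire()             # the waiter lock starts locked
        self.waiters.append(waiter)
        self.lock.release()          # let other threads in
        try:
            waiter.acquire()         # blocks until notify() releases it
        finally:
            self.lock.acquire()      # reacquire before returning

    def notify(self, n=1):
        # the caller must hold self.lock
        for waiter in self.waiters[:n]:
            waiter.release()
            self.waiters.remove(waiter)

cond = MiniCondition()
items = []
received = []

def consumer():
    with cond.lock:
        while not items:
            cond.wait()
        received.append(items.pop())

t = threading.Thread(target=consumer)
t.start()
with cond.lock:
    items.append(42)
    cond.notify()
t.join()
print(received)
```

The consumer re-checks the list in a loop, so the program works whether the producer or the consumer gets the lock first.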

You can also use the “with” statement with the Condition object so acquire() and release() are called for us. Let’s rewrite the producer class and the consumer class using “with”.

class Producer(threading.Thread):
    ...
    def run(self):
        while True:
            integer = random.randint(0, 256)
            with self.condition:
                print 'condition acquired by %s' % self.name
                self.integers.append(integer) 
                print '%d appended to list by %s' % (integer, self.name)
                print 'condition notified by %s' % self.name
                self.condition.notify()
                print 'condition released by %s' % self.name
            time.sleep(1)

class Consumer(threading.Thread):
    ... 
    def run(self):
        while True:
            with self.condition:
                print 'condition acquired by %s' % self.name
                while True:
                    if self.integers:
                        integer = self.integers.pop()
                        print '%d popped from list by %s' % (integer, self.name)
                        break
                    print 'condition wait by %s' % self.name
                    self.condition.wait()
                print 'condition released by %s' % self.name

Semaphore

A semaphore is based on an internal counter which is decremented each time acquire() is called and incremented each time release() is called. If the counter is equal to 0 then acquire() blocks. It is the Python implementation of the Dijkstra semaphore concept: P() and V(). Using a semaphore makes sense when you want to control access to a resource with limited capacity like a server.

Here is a simple example:

semaphore = threading.Semaphore()
semaphore.acquire()
# work on a shared resource
...
semaphore.release()
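To illustrate the limited capacity use case, here is a sketch for Python 3 where 6 worker threads compete for a resource that accepts at most 2 users at a time. The function run_workers and its parameters are made up for this example; it returns the highest number of workers seen inside the protected section at the same time.

```python
import threading
import time

def run_workers(worker_count=6, max_concurrent=2):
    """Return the peak number of workers holding the semaphore at once."""
    semaphore = threading.Semaphore(max_concurrent)
    state_lock = threading.Lock()
    state = {'active': 0, 'peak': 0}

    def worker():
        with semaphore:
            with state_lock:
                state['active'] += 1
                state['peak'] = max(state['peak'], state['active'])
            time.sleep(0.01)  # pretend to use the shared resource
            with state_lock:
                state['active'] -= 1

    threads = [threading.Thread(target=worker) for _ in range(worker_count)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return state['peak']

# never more than max_concurrent
print(run_workers())
```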

Let’s look at the Python internals. The constructor takes a value which is the counter initial value. This value defaults to 1. A condition instance is created with a lock to protect the counter and to notify the other thread when the semaphore is released.

class _Semaphore(_Verbose):
    ...    
    def __init__(self, value=1, verbose=None):
        _Verbose.__init__(self, verbose)
        self.__cond = Condition(Lock())
        self.__value = value
        ...

Next is the acquire() method. If the semaphore’s counter is equal to 0, it blocks on the condition’s wait() method until it gets notified by a different thread. If the semaphore’s counter is greater than 0, it decrements the value.

def acquire(self, blocking=1):
    rc = False
    self.__cond.acquire()
    while self.__value == 0:
        ...
        self.__cond.wait()
    else:
        self.__value = self.__value - 1
        rc = True
    self.__cond.release()
    return rc

The semaphore’s release() method increments the counter and then notifies the other thread.

def release(self):
    self.__cond.acquire()
    self.__value = self.__value + 1
    self.__cond.notify()
    self.__cond.release()

Note that there is also a bounded semaphore you can use to make sure you never call release() too many times. Here is the Python internal code used for it:

class _BoundedSemaphore(_Semaphore):
    """Semaphore that checks that # releases is <= # acquires"""
    def __init__(self, value=1, verbose=None):
        _Semaphore.__init__(self, value, verbose)
        self._initial_value = value

    def release(self):
        if self._Semaphore__value >= self._initial_value:
            raise ValueError, "Semaphore released too many times"
        return _Semaphore.release(self)
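The difference between the two semaphore types shows up as soon as release() is called once too often. A short sketch for Python 3:

```python
import threading

bounded = threading.BoundedSemaphore(1)
bounded.acquire()
bounded.release()
try:
    bounded.release()  # one release too many
    raised = False
except ValueError:
    raised = True

plain = threading.Semaphore(1)
plain.release()        # no error: the counter silently becomes 2
both_acquired = plain.acquire(False) and plain.acquire(False)
print(raised, both_acquired)
```

With the plain semaphore, the extra release() goes unnoticed and two threads could then enter a section meant for one.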

You can also use the “with” statement with the Semaphore object so acquire() and release() are called for us.

semaphore = threading.Semaphore()
with semaphore:
    # work on a shared resource
    ...

Event

This is a simple mechanism. A thread signals an event and the other thread(s) wait for it.

Let’s go back to our producer and consumer example and convert it to use an event instead of a condition. The source code can be found in threads/event.py.

First the producer class. We pass an Event instance to the constructor instead of a Condition instance. Each time an integer is added to the list, the event is set then cleared right away to notify the consumer. The event instance is cleared by default.

class Producer(threading.Thread):
    """
    Produces random integers to a list
    """

    def __init__(self, integers, event):
        """
        Constructor.

        @param integers list of integers
        @param event event synchronization object
        """
        threading.Thread.__init__(self)
        self.integers = integers
        self.event = event
    
    def run(self):
        """
        Thread run method. Append random integers to the integers list
        at random time.
        """
        while True:
            integer = random.randint(0, 256)
            self.integers.append(integer) 
            print '%d appended to list by %s' % (integer, self.name)
            print 'event set by %s' % self.name
            self.event.set()
            self.event.clear()
            print 'event cleared by %s' % self.name
            time.sleep(1)

Next is the consumer class. We also pass an Event instance to the constructor. The consumer blocks on wait() until the event is set, indicating that there is an integer to consume.

class Consumer(threading.Thread):
    """
    Consumes random integers from a list
    """

    def __init__(self, integers, event):
        """
        Constructor.

        @param integers list of integers
        @param event event synchronization object
        """
        threading.Thread.__init__(self)
        self.integers = integers
        self.event = event
    
    def run(self):
        """
        Thread run method. Consumes integers from list
        """
        while True:
            self.event.wait()
            try:
                integer = self.integers.pop()
                print '%d popped from list by %s' % (integer, self.name)
            except IndexError:
                # catch pop on empty list
                time.sleep(1)

This is the output when we run the program. Thread-1 appends 124 to the list and then sets the event to notify the consumer. The consumer’s call to wait() stops blocking and the integer is retrieved from the list.

$ python event.py 
124 appended to list by Thread-1
event set by Thread-1
event cleared by Thread-1
124 popped from list by Thread-2
223 appended to list by Thread-1
event set by Thread-1
event cleared by Thread-1
223 popped from list by Thread-2

Let’s look at the Python internals. First is the Event constructor. A condition instance is created with a lock to protect the event flag value and to notify the other thread when the event has been set.

class _Event(_Verbose):
    def __init__(self, verbose=None):
        _Verbose.__init__(self, verbose)
        self.__cond = Condition(Lock())
        self.__flag = False

Following is the set() method. It sets the flag to True and notifies the other threads. The condition object is used to protect the critical part when the flag’s value is changed.

def set(self):
    self.__cond.acquire()
    try:
        self.__flag = True
        self.__cond.notify_all()
    finally:
        self.__cond.release()

Its opposite is the clear() method setting the flag to False.

def clear(self):
    self.__cond.acquire()
    try:
        self.__flag = False
    finally:
        self.__cond.release()

The wait() method blocks until the set() method is called. It returns immediately if the flag is already set.

def wait(self, timeout=None):
    self.__cond.acquire()
    try:
        if not self.__flag:
            self.__cond.wait(timeout)
    finally:
        self.__cond.release()
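The three methods can be exercised with a small sketch written for Python 3, where wait() returns the value of the flag. The timeout values are arbitrary.

```python
import threading

event = threading.Event()
results = []

def waiter():
    event.wait()               # blocks while the flag is False
    results.append('woken')

t = threading.Thread(target=waiter)
t.start()
event.set()                    # flag becomes True, the waiter is released
t.join()

already_set = event.wait(0.1)  # flag is set: returns right away
event.clear()
timed_out = event.wait(0.05)   # flag is cleared: returns after the timeout
print(results, already_set, timed_out)
```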

Queue

Queues are a great mechanism when we need to exchange information between threads as they take care of locking for us.

We are interested in the following 4 Queue methods:

  • put: Put an item into the queue.
  • get: Remove and return an item from the queue.
  • task_done: Needs to be called each time an item has been processed.
  • join: Blocks until all items have been processed.

Let’s convert our producer/consumer program to use a queue. The source code can be found in threads/queue.py.

First the producer class. We don’t need to pass the integers list because we are using the queue to store the integers generated. The thread generates and puts the integers in the queue in a forever loop.

class Producer(threading.Thread):
    """
    Puts random integers in a queue
    """

    def __init__(self, queue):
        """
        Constructor.

        @param queue queue synchronization object
        """
        threading.Thread.__init__(self)
        self.queue = queue
    
    def run(self):
        """
        Thread run method. Put random integers in the queue every second.
        """
        while True:
            integer = random.randint(0, 256)
            self.queue.put(integer) 
            print '%d put to queue by %s' % (integer, self.name)
            time.sleep(1)

Next is our consumer class. The thread gets the integer from the queue and indicates that it is done working on it using task_done().

class Consumer(threading.Thread):
    """
    Consumes random integers from a queue
    """

    def __init__(self, queue):
        """
        Constructor.

        @param queue queue synchronization object
        """
        threading.Thread.__init__(self)
        self.queue = queue
    
    def run(self):
        """
        Thread run method. Consumes integers from the queue
        """
        while True:
            integer = self.queue.get()
            print '%d popped from list by %s' % (integer, self.name)
            self.queue.task_done()

Here is the output of the program.

$ python queue.py 
61 put to queue by Thread-1
61 popped from list by Thread-2
6 put to queue by Thread-1
6 popped from list by Thread-2

The Queue module takes care of locking for us which is a great advantage. It is interesting to look at the Python internals to understand how the locking mechanism works underneath.

The Queue constructor creates a lock to protect the queue when an element is added or removed. Condition objects sharing that lock are created to signal the following events: the queue is not empty (get() call stops blocking), the queue is not full (put() call stops blocking) and all items have been processed (join() call stops blocking).

class Queue:
    def __init__(self, maxsize=0):
        ...
        self.mutex = threading.Lock()
        self.not_empty = threading.Condition(self.mutex)
        self.not_full = threading.Condition(self.mutex)
        self.all_tasks_done = threading.Condition(self.mutex)
        self.unfinished_tasks = 0

The put() method adds an item or waits if the queue is full. It notifies a thread blocked on get() that the queue is not empty. See the Condition section above for more details.

def put(self, item, block=True, timeout=None):
    ...
    self.not_full.acquire()
    try:
        if self.maxsize > 0:
            ...
            elif timeout is None:
                while self._qsize() == self.maxsize:
                    self.not_full.wait()
        self._put(item)
        self.unfinished_tasks += 1
        self.not_empty.notify()
    finally:
        self.not_full.release()

The get() method removes an element from the queue or waits if the queue is empty. It notifies the threads blocked on put() that the queue is not full.

def get(self, block=True, timeout=None):
    ...
    self.not_empty.acquire()
    try:
        ...
        elif timeout is None:
            while not self._qsize():
                self.not_empty.wait()
        item = self._get()
        self.not_full.notify()
        return item
    finally:
        self.not_empty.release()

When the method task_done() is called, the number of unfinished tasks is decremented. If the counter is equal to 0 then the threads waiting on the queue join() method continue their execution.

def task_done(self):
    self.all_tasks_done.acquire()
    try:
        unfinished = self.unfinished_tasks - 1
        if unfinished <= 0:
            if unfinished < 0:
                raise ValueError('task_done() called too many times')
            self.all_tasks_done.notify_all()
        self.unfinished_tasks = unfinished
    finally:
        self.all_tasks_done.release()

def join(self):
    self.all_tasks_done.acquire()
    try:
        while self.unfinished_tasks:
            self.all_tasks_done.wait()
    finally:
        self.all_tasks_done.release()
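Here is how task_done() and join() work together in a program that terminates. This sketch targets Python 3, where the module is named queue. The function run_queue_demo is made up for this example, and using None as a stop marker for the consumer is a convention chosen here, not something provided by the module.

```python
import queue
import threading

def run_queue_demo(count=5):
    """Put count integers in a queue and consume them in another thread."""
    q = queue.Queue()
    consumed = []

    def consumer():
        while True:
            item = q.get()
            if item is None:   # stop marker chosen for this sketch
                q.task_done()
                break
            consumed.append(item)
            q.task_done()

    t = threading.Thread(target=consumer)
    t.start()
    for i in range(count):
        q.put(i)
    q.join()                   # blocks until every item was marked done
    q.put(None)
    t.join()
    return consumed

print(run_queue_demo())
```

With a single consumer, the items come out in the order they were put in because Queue is a FIFO queue.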


OpenStack Nova internals of instance launching

January 30, 2011

This article describes the internals of launching an instance in OpenStack Nova.

Overview

Launching a new instance involves multiple components inside OpenStack Nova:

  • API server: handles requests from the user and relays them to the cloud controller.
  • Cloud controller: handles the communication between the compute nodes, the networking controllers, the API server and the scheduler.
  • Scheduler: selects a host to run a command.
  • Compute worker: manages computing instances: launch/terminate instance, attach/detach volumes…
  • Network controller: manages networking resources: allocate fixed IP addresses, configure VLANs…

Note: There are more components in Nova like the authentication manager, the object store and the volume controller but we are not going to study them as we are focusing on instance launching in this article.

The flow of launching an instance goes like this: The API server receives a run_instances command from the user. The API server relays the message to the cloud controller (1). Authentication is performed to make sure this user has the required permissions. The cloud controller sends the message to the scheduler (2). The scheduler casts the message to a random host and asks it to start a new instance (3). The compute worker on the host grabs the message (4). The compute worker needs a fixed IP to launch a new instance so it sends a message to the network controller (5,6,7,8). The compute worker continues with spawning a new instance. We are going to see all those steps in detail next.

API

You can use the OpenStack API or EC2 API to launch a new instance. We are going to use the EC2 API. We add a new key pair and we use it to launch an instance of type m1.tiny.

cd /tmp/
euca-add-keypair test > test.pem
euca-run-instances -k test -t m1.tiny ami-tiny

run_instances() in api/ec2/cloud.py is called, which in turn calls the compute API create() in compute/api.py.

def run_instances(self, context, **kwargs):
  ...
  instances = self.compute_api.create(context,
            instance_type=instance_types.get_by_type(
                kwargs.get('instance_type', None)),
            image_id=kwargs['image_id'],
            ...

Compute API create() does the following:

  • Check if the maximum number of instances of this type has been reached.
  • Create a security group if it doesn’t exist.
  • Generate MAC addresses and hostnames for the new instances.
  • Send a message to the scheduler to run the instances.
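As an illustration of the MAC address generation step, here is a minimal sketch. It assumes a fixed three-byte prefix (02:16:3e, taken from the MAC address that appears in the libvirt XML later in this article) followed by three random bytes. The function name and the way the random bytes are drawn are assumptions for illustration, not Nova's actual code.

```python
import random


def generate_mac(prefix=(0x02, 0x16, 0x3E), rng=random):
    """Return a MAC address string: fixed 3-byte prefix + 3 random bytes.

    Hypothetical helper: the prefix matches the example instance in this
    article; the random part is an assumption.
    """
    tail = [rng.randint(0x00, 0xFF) for _ in range(3)]
    return ":".join("%02x" % octet for octet in list(prefix) + tail)


mac = generate_mac()
print(mac)
```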

Cast

Let’s pause for a minute and look at how the message is sent to the scheduler. This type of message delivery in OpenStack is defined as RPC casting. RabbitMQ is used here for delivery. The publisher (API) sends the message to a topic exchange (scheduler topic). A consumer (Scheduler worker) retrieves the message from the queue. No response is expected as it is a cast and not a call. We will see call later.

Here is the code casting that message:

LOG.debug(_("Casting to scheduler for %(pid)s/%(uid)s's"
        " instance %(instance_id)s") % locals())
rpc.cast(context,
         FLAGS.scheduler_topic,
         {"method": "run_instance",
          "args": {"topic": FLAGS.compute_topic,
                   "instance_id": instance_id,
                   "availability_zone": availability_zone}})

You can see that the scheduler topic is used and that the message arguments indicate what we want the scheduler to use for its own delivery. In this case, we want the scheduler to forward the message using the compute topic.
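To make the message format more concrete, here is a small sketch of how a consumer could turn such a message into a method call: look up the "method" name on a handler object and pass "args" as keyword arguments. The SchedulerManager class and the dispatch() function are hypothetical stand-ins written for this example; they are not Nova's classes.

```python
class SchedulerManager(object):
    """Hypothetical handler for messages received on the scheduler topic."""

    def run_instance(self, context, topic, instance_id,
                     availability_zone=None):
        return "schedule instance %s on topic %s" % (instance_id, topic)


def dispatch(manager, context, message):
    """Call the manager method named in the message with the message args."""
    method = getattr(manager, message["method"])
    return method(context, **message.get("args", {}))


message = {"method": "run_instance",
           "args": {"topic": "compute",
                    "instance_id": 1,
                    "availability_zone": None}}
result = dispatch(SchedulerManager(), {}, message)
print(result)
```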

Scheduler

The scheduler receives the message and sends the run_instance message to a random host. The chance scheduler is used here. There are more scheduler types like the zone scheduler (pick a random host which is up in a specific availability zone) or the simple scheduler (pick the least loaded host). Now that a host has been selected, the following code is executed to send the message to a compute worker on the host.

rpc.cast(context,
         db.queue_get_for(context, topic, host),
         {"method": method,
          "args": kwargs})
LOG.debug(_("Casting to %(topic)s %(host)s for %(method)s") % locals())
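The difference between the chance and simple schedulers can be sketched as follows. This is an illustration under assumptions: the host records, their field names and the function names are invented for the example, and the "topic.host" queue name format is inferred from the description of host-specific delivery in this article.

```python
import random


def queue_name_for(topic, host):
    """Build a host-specific queue name of the form 'topic.host' (assumed)."""
    return "%s.%s" % (topic, host)


def chance_schedule(hosts, rng=random):
    """Chance scheduler: pick any host that is up, at random."""
    up = [h for h in hosts if h["up"]]
    if not up:
        raise RuntimeError("no hosts available")
    return rng.choice(up)["name"]


def simple_schedule(hosts):
    """Simple scheduler: pick the least loaded host that is up."""
    up = [h for h in hosts if h["up"]]
    if not up:
        raise RuntimeError("no hosts available")
    return min(up, key=lambda h: h["running_instances"])["name"]


# Hypothetical host records.
hosts = [
    {"name": "node1", "up": True, "running_instances": 4},
    {"name": "node2", "up": True, "running_instances": 1},
    {"name": "node3", "up": False, "running_instances": 0},
]
print(queue_name_for("compute", chance_schedule(hosts)))
print(queue_name_for("compute", simple_schedule(hosts)))
```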

Compute

The Compute worker receives the message and the following method in compute/manager.py is called:

def run_instance(self, context, instance_id, **_kwargs):
  """Launch a new instance with specified options."""
  ...

run_instance() does the following:

  • Check if the instance is already running.
  • Allocate a fixed IP address.
  • Set up a VLAN and a bridge if they are not already set up.
  • Spawn the instance using the virtualization driver.

Call to network controller

An RPC call is used to allocate a fixed IP. An RPC call differs from an RPC cast in two ways: it uses a topic.host exchange, meaning that a specific host is targeted, and a response is expected.
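The cast/call distinction can be illustrated with an in-process queue standing in for RabbitMQ. This is only a sketch: the cast() and call() functions, the handler names and the reply-queue mechanism are assumptions made for the example, and the returned address is simply the one used by the example instance in this article.

```python
import queue
import threading


def cast(q, message):
    """Fire and forget: enqueue the message and return immediately."""
    q.put(message)


def call(q, message, timeout=5):
    """Enqueue the message with a private reply queue and wait for the answer."""
    reply = queue.Queue(maxsize=1)
    q.put(dict(message, _reply=reply))
    return reply.get(timeout=timeout)


def worker(q, handlers):
    """Consume messages until a None sentinel arrives."""
    while True:
        message = q.get()
        if message is None:
            break
        result = handlers[message["method"]](**message["args"])
        reply = message.get("_reply")
        if reply is not None:  # only a call carries a reply queue
            reply.put(result)


launched = []
handlers = {  # hypothetical handlers
    "run_instance": lambda instance_id: launched.append(instance_id),
    "allocate_fixed_ip": lambda instance_id: "10.0.0.3",
}

q = queue.Queue()
t = threading.Thread(target=worker, args=(q, handlers))
t.start()
cast(q, {"method": "run_instance", "args": {"instance_id": 1}})
fixed_ip = call(q, {"method": "allocate_fixed_ip",
                    "args": {"instance_id": 1}})
q.put(None)
t.join()
print(fixed_ip)
```

The caller of cast() never learns whether the handler succeeded, whereas call() blocks until the worker posts a result or the timeout expires.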

Spawn instance

Next is the instance spawning process performed by the virtualization driver. libvirt is used in our case. The code we are going to look at is located in virt/libvirt_conn.py.

The first thing that needs to be done is the creation of the libvirt XML used to launch the instance. The to_xml() method is used to retrieve the XML content. Following is the XML for our instance.

<domain type='qemu'>
    <name>instance-00000001</name>
    <memory>524288</memory>
    <os>
        <type>hvm</type>
        <kernel>/opt/novascript/trunk/nova/..//instances/instance-00000001/kernel</kernel>
        <cmdline>root=/dev/vda console=ttyS0</cmdline>
        <initrd>/opt/novascript/trunk/nova/..//instances/instance-00000001/ramdisk</initrd>
    </os>
    <features>
        <acpi/>
    </features>
    <vcpu>1</vcpu>
    <devices>
        <disk type='file'>
            <driver type='qcow2'/>
            <source file='/opt/novascript/trunk/nova/..//instances/instance-00000001/disk'/>
            <target dev='vda' bus='virtio'/>
        </disk>
        <interface type='bridge'>
            <source bridge='br100'/>
            <mac address='02:16:3e:17:35:39'/>
            <!--   <model type='virtio'/>  CANT RUN virtio network right now -->
            <filterref filter="nova-instance-instance-00000001">
                <parameter name="IP" value="10.0.0.3" />
                <parameter name="DHCPSERVER" value="10.0.0.1" />
                <parameter name="RASERVER" value="fe80::1031:39ff:fe04:58f5/64" />
                <parameter name="PROJNET" value="10.0.0.0" />
                <parameter name="PROJMASK" value="255.255.255.224" />
                <parameter name="PROJNETV6" value="fd00::" />
                <parameter name="PROJMASKV6" value="64" />
            </filterref>
        </interface>

        <!-- The order is significant here.  File must be defined first -->
        <serial type="file">
            <source path='/opt/novascript/trunk/nova/..//instances/instance-00000001/console.log'/>
            <target port='1'/>
        </serial>

        <console type='pty' tty='/dev/pts/2'>
            <source path='/dev/pts/2'/>
            <target port='0'/>
        </console>

        <serial type='pty'>
            <source path='/dev/pts/2'/>
            <target port='0'/>
        </serial>

    </devices>
</domain>

The hypervisor used is qemu. The memory element is expressed in KiB, so the memory allocated for the guest will be 524288 KiB, i.e. 512 MB. The guest OS will boot from a kernel and initrd stored on the host OS.

The number of virtual CPUs allocated for the guest OS is 1. ACPI is enabled for power management.

Multiple devices are defined:

  • The disk image is a file on the host OS using the driver qcow2. qcow2 is a qemu disk image copy-on-write format.
  • The network interface is a bridge visible to the guest. We define network filtering parameters like IP which means this interface will always use 10.0.0.3 as the source IP address.
  • Device logfile. All data sent to the character device is written to console.log.
  • Pseudo TTY: virsh console can be used to connect to the serial port locally.
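To double-check those values, the domain XML can be parsed with the standard library. The sketch below uses a trimmed copy of the XML above (only the elements being read are kept) and relies on the fact that libvirt expresses the memory element in KiB by default. The variable names are chosen for this example.

```python
import ipaddress
import xml.etree.ElementTree as ET

# Trimmed copy of the domain XML shown above.
DOMAIN_XML = """
<domain type='qemu'>
    <name>instance-00000001</name>
    <memory>524288</memory>
    <vcpu>1</vcpu>
    <devices>
        <disk type='file'>
            <driver type='qcow2'/>
            <target dev='vda' bus='virtio'/>
        </disk>
        <interface type='bridge'>
            <source bridge='br100'/>
            <filterref filter="nova-instance-instance-00000001">
                <parameter name="IP" value="10.0.0.3" />
                <parameter name="PROJNET" value="10.0.0.0" />
                <parameter name="PROJMASK" value="255.255.255.224" />
            </filterref>
        </interface>
    </devices>
</domain>
"""

root = ET.fromstring(DOMAIN_XML)

memory_mib = int(root.findtext("memory")) // 1024  # KiB -> MiB
vcpus = int(root.findtext("vcpu"))
disk_format = root.find("devices/disk/driver").get("type")
bridge = root.find("devices/interface/source").get("bridge")

# Network filter parameters, keyed by name.
params = {p.get("name"): p.get("value") for p in root.iter("parameter")}
project_net = ipaddress.ip_network(
    "%s/%s" % (params["PROJNET"], params["PROJMASK"]))

print(memory_mib, vcpus, disk_format, bridge, project_net)
```

The project network computed from PROJNET and PROJMASK is 10.0.0.0/27, which is the source range used by the security group rules in the firewall section.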

Next is the preparation of the network filtering. The firewall driver used by default is iptables. The rules are defined in apply_ruleset() in the class IptablesFirewallDriver. Let’s take a look at the firewall chains and rules for this instance.

*filter
...
:nova-ipv4-fallback - [0:0]
:nova-local - [0:0]
:nova-inst-1 - [0:0]
:nova-sg-1 - [0:0]
-A nova-ipv4-fallback -j DROP
-A FORWARD -j nova-local
-A nova-local -d 10.0.0.3 -j nova-inst-1
-A nova-inst-1 -m state --state INVALID -j DROP
-A nova-inst-1 -m state --state ESTABLISHED,RELATED -j ACCEPT
-A nova-inst-1 -j nova-sg-1
-A nova-inst-1 -s 10.1.3.254 -p udp --sport 67 --dport 68
-A nova-inst-1 -j nova-ipv4-fallback
-A nova-sg-1 -p tcp -s 10.0.0.0/27 -m multiport --dports 1:65535 -j ACCEPT
-A nova-sg-1 -p udp -s 10.0.0.0/27 -m multiport --dports 1:65535 -j ACCEPT
-A nova-sg-1 -p icmp -s 10.0.0.0/27 -m icmp --icmp-type 1/65535 -j ACCEPT
COMMIT

First you have the chains: nova-local, nova-inst-1, nova-sg-1, nova-ipv4-fallback and then the rules.

Let’s look at the different chains and rules:

Packets routed through the virtual network are handled by the chain nova-local.

-A FORWARD -j nova-local

If the destination is 10.0.0.3 then it is for our instance so we jump to the chain nova-inst-1.

-A nova-local -d 10.0.0.3 -j nova-inst-1

If the packet could not be identified, drop it.

-A nova-inst-1 -m state --state INVALID -j DROP

If the packet belongs to an established connection, or starts a new connection that is related to an existing one, accept it.

-A nova-inst-1 -m state --state ESTABLISHED,RELATED -j ACCEPT

Allow DHCP responses.

-A nova-inst-1 -s 10.0.0.254 -p udp --sport 67 --dport 68

Jump to the security group chain to check the packet against its rules.

-A nova-inst-1 -j nova-sg-1

Security group chain. Accept all TCP packets coming from 10.0.0.0/27 with a destination port between 1 and 65535.

-A nova-sg-1 -p tcp -s 10.0.0.0/27 -m multiport --dports 1:65535 -j ACCEPT

Accept all UDP packets coming from 10.0.0.0/27 with a destination port between 1 and 65535.

-A nova-sg-1 -p udp -s 10.0.0.0/27 -m multiport --dports 1:65535 -j ACCEPT

Accept ICMP packets coming from 10.0.0.0/27. ICMP has no ports: the rule matches on the ICMP type/code given by --icmp-type instead.

-A nova-sg-1 -p icmp -s 10.0.0.0/27 -m icmp --icmp-type 1/65535 -j ACCEPT

Jump to fallback chain.

-A nova-inst-1 -j nova-ipv4-fallback

This is the fallback chain’s rule where we drop the packet.

-A nova-ipv4-fallback -j DROP

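Here is an example of the path taken by a packet opening a new TCP connection to 10.0.0.3 from another address in 10.0.0.0/27: it enters FORWARD and jumps to nova-local, matches the destination 10.0.0.3 and jumps to nova-inst-1, is neither INVALID nor ESTABLISHED/RELATED so it continues to nova-sg-1, and is accepted there by the TCP rule. A packet coming from outside 10.0.0.0/27 matches no rule in nova-sg-1, returns to nova-inst-1 and is dropped by nova-ipv4-fallback.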
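The traversal of the chains can be sketched with a tiny rule evaluator in Python. This is a simplified model written for illustration, not how iptables is implemented: packets are plain dictionaries, the rule order follows the ruleset listing above, and the DHCP rule (which has no target in the listing) and the ICMP rule are left out.

```python
import ipaddress

PROJECT_NET = ipaddress.ip_network("10.0.0.0/27")


def from_project_net(pkt):
    return ipaddress.ip_address(pkt["src"]) in PROJECT_NET


# Each rule is (match function, target); a target is a verdict or a chain.
CHAINS = {
    "FORWARD": [(lambda p: True, "nova-local")],
    "nova-local": [(lambda p: p["dst"] == "10.0.0.3", "nova-inst-1")],
    "nova-inst-1": [
        (lambda p: p["state"] == "INVALID", "DROP"),
        (lambda p: p["state"] in ("ESTABLISHED", "RELATED"), "ACCEPT"),
        (lambda p: True, "nova-sg-1"),
        (lambda p: True, "nova-ipv4-fallback"),
    ],
    "nova-sg-1": [
        (lambda p: p["proto"] in ("tcp", "udp") and from_project_net(p)
         and 1 <= p["dport"] <= 65535, "ACCEPT"),
    ],
    "nova-ipv4-fallback": [(lambda p: True, "DROP")],
}


def traverse(chain, pkt, path):
    """Walk a chain; return 'ACCEPT', 'DROP', or None if no rule decided."""
    path.append(chain)
    for matches, target in CHAINS[chain]:
        if not matches(pkt):
            continue
        if target in ("ACCEPT", "DROP"):
            return target
        verdict = traverse(target, pkt, path)
        if verdict is not None:
            return verdict
    return None  # end of chain: go back to the calling chain


inside = {"src": "10.0.0.5", "dst": "10.0.0.3", "proto": "tcp",
          "dport": 22, "state": "NEW"}
outside = dict(inside, src="192.168.1.10")

inside_path, outside_path = [], []
inside_verdict = traverse("FORWARD", inside, inside_path)
outside_verdict = traverse("FORWARD", outside, outside_path)
print(inside_verdict, inside_path)
print(outside_verdict, outside_path)
```

A None verdict at the top level would mean the packet falls through to the policy of the built-in chain, which is not modeled here.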

Following the firewall rules preparation is the image creation. This happens in _create_image().

def _create_image(self, inst, libvirt_xml, suffix='', disk_images=None):
  ...

In this method, libvirt.xml is created based on the XML we generated above.

Copies of the kernel, ramdisk (initrd) and disk images are made for the hypervisor to use.

If the flat network manager is used then a network configuration is injected into the guest OS image. We are using the VLAN manager in this example.

The instance’s SSH key is injected into the image. Let’s look at this part in more detail. The disk inject_data() method is called.

disk.inject_data(basepath('disk'), key, net,
                 partition=target_partition,
                 nbd=FLAGS.use_cow_images)

basepath('disk') is where the instance’s disk image is located on the host OS. key is the SSH key string. net is not set in our case because we don’t inject a networking configuration. partition is None because we are using a kernel image; otherwise, we could use a partitioned disk image. Let’s look inside inject_data().

First thing happening here is linking the image to a device. This happens in _link_device().

device = _allocate_device()
utils.execute('sudo qemu-nbd -c %s %s' % (device, image))
# NOTE(vish): this forks into another process, so give it a chance
#             to set up before continuing
for i in xrange(10):
    if os.path.exists("/sys/block/%s/pid" % os.path.basename(device)):
        return device
    time.sleep(1)
raise exception.Error(_('nbd device %s did not show up') % device)

_allocate_device() returns the next available nbd device: /dev/nbdx where x is between 0 and 15. qemu-nbd is a QEMU disk network block device server. Once this is done, we get the device, let’s say /dev/nbd0.
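The body of _allocate_device() is not shown in this article. A minimal sketch of one way to find a free device is given below, assuming that a device is in use when its sysfs entry contains a pid file (the same file the code above waits for after running qemu-nbd). The function name, its parameters and the fake sysfs tree are assumptions made so the example can run without root privileges.

```python
import os
import tempfile


def allocate_device(sys_block="/sys/block", max_devices=16):
    """Return the first /dev/nbdN whose sysfs entry has no 'pid' file."""
    for i in range(max_devices):
        if not os.path.exists(os.path.join(sys_block, "nbd%d" % i, "pid")):
            return "/dev/nbd%d" % i
    raise RuntimeError("no free nbd devices")


# Demonstration against a fake sysfs tree where nbd0 is already in use.
fake_sys = tempfile.mkdtemp()
os.makedirs(os.path.join(fake_sys, "nbd0"))
open(os.path.join(fake_sys, "nbd0", "pid"), "w").close()

device = allocate_device(sys_block=fake_sys)
print(device)
```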

We disable the filesystem check for this device. mapped_device here is "/dev/nbd0".

out, err = utils.execute('sudo tune2fs -c 0 -i 0 %s' % mapped_device)

We mount the file system to a temporary directory and we add the SSH key to the ssh authorized_keys file.

sshdir = os.path.join(fs, 'root', '.ssh')
utils.execute('sudo mkdir -p %s' % sshdir)  # existing dir doesn't matter
utils.execute('sudo chown root %s' % sshdir)
utils.execute('sudo chmod 700 %s' % sshdir)
keyfile = os.path.join(sshdir, 'authorized_keys')
utils.execute('sudo tee -a %s' % keyfile, '\n' + key.strip() + '\n')

In the code above, fs is the temporary directory.
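The same key injection steps can be sketched in pure Python against a temporary directory standing in for the mounted filesystem. This is an illustration only: it skips sudo and the chown to root, which require privileges, and the inject_key() name and the placeholder key string are made up for the example.

```python
import os
import stat
import tempfile


def inject_key(fs, key):
    """Append an SSH public key to root's authorized_keys under fs."""
    sshdir = os.path.join(fs, "root", ".ssh")
    os.makedirs(sshdir, exist_ok=True)  # an existing directory is fine
    os.chmod(sshdir, 0o700)
    keyfile = os.path.join(sshdir, "authorized_keys")
    with open(keyfile, "a") as f:  # append, like 'tee -a'
        f.write("\n" + key.strip() + "\n")
    return keyfile


fs = tempfile.mkdtemp()  # stands in for the mounted image
placeholder_key = "ssh-rsa PLACEHOLDERKEY user@example"
keyfile = inject_key(fs, placeholder_key)

with open(keyfile) as f:
    content = f.read()
mode = stat.S_IMODE(os.stat(os.path.dirname(keyfile)).st_mode)
print(keyfile, oct(mode))
```

Opening the file in append mode preserves any keys already present in the image, which is what tee -a does in the code above.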

Finally, we unmount the filesystem and unlink the device. This concludes the image creation and setup.

The next step in the virtualization driver spawn() method is the instance launch itself, using the driver’s createXML() binding. After that, the firewall rules are applied.


 