Python list implementation

March 10, 2011

This post describes the CPython implementation of the list object. CPython is the most widely used Python implementation.

Lists in Python are powerful and it is interesting to see how they are implemented internally.

Following is a simple Python script appending some integers to a list and printing them.

>>> l = []
>>> l.append(1)
>>> l.append(2)
>>> l.append(3)
>>> l
[1, 2, 3]
>>> for e in l:
...   print e
... 
1
2
3

As you can see, lists are iterable.

List object C structure

A list object in CPython is represented by the following C structure. ob_item is an array of pointers to the list elements. allocated is the number of slots allocated in memory.

typedef struct {
    PyObject_VAR_HEAD
    PyObject **ob_item;
    Py_ssize_t allocated;
} PyListObject;

List initialization

Let’s look at what happens when we initialize an empty list, e.g. l = [].

arguments: size of the list = 0
returns: list object = []
PyListNew:
    nbytes = size * size of a pointer to a Python object = 0
    allocate new list object
    allocate list of pointers (ob_item) of size nbytes = 0
    clear ob_item
    set list's allocated var to 0 = 0 slots
    return list object 

It is important to notice the difference between allocated slots and the size of the list. The size of a list is the same as len(l). The number of allocated slots is what has been allocated in memory. Often, you will see that allocated can be greater than size. This is to avoid calling realloc each time a new element is appended to the list. We will see more about that later.

Append

We append an integer to the list: l.append(1). What happens? The internal C function app1() is called:

arguments: list object, new element
returns: 0 if OK, -1 if not
app1:
    n = size of list
    call list_resize() to resize the list to size n+1 = 0 + 1 = 1
    list[n] = list[0] = new element
    return 0

Let’s look at list_resize(). It over-allocates memory to avoid calling realloc too many times. The growth pattern of the list is: 0, 4, 8, 16, 25, 35, 46, 58, 72, 88, …

arguments: list object, new size
returns: 0 if OK, -1 if not
list_resize:
    new_allocated = (newsize >> 3) + (newsize < 9 ? 3 : 6) = 3
    new_allocated += newsize = 3 + 1 = 4
    resize ob_item (list of pointers) to size new_allocated
    return 0
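The over-allocation above can be reproduced in pure Python. The following sketch mirrors the CPython 2.x list_resize() formula (the function names are mine, not CPython's):

```python
def new_allocated(newsize):
    # over-allocation formula from CPython 2.x list_resize():
    # (newsize >> 3) + (newsize < 9 ? 3 : 6) + newsize
    return (newsize >> 3) + (3 if newsize < 9 else 6) + newsize

def growth_pattern(n_appends):
    # simulate appending n_appends elements to an empty list,
    # recording the allocated slot count after each reallocation
    size, allocated, pattern = 0, 0, [0]
    for _ in range(n_appends):
        size += 1
        if size > allocated:
            allocated = new_allocated(size)
            pattern.append(allocated)
    return pattern

print(growth_pattern(90))
# [0, 4, 8, 16, 25, 35, 46, 58, 72, 88, 106]
```

Reallocations get rarer as the list grows, which is what makes the amortized cost of append constant.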

4 slots are now allocated to contain elements and the first one is the integer 1. You can see on the following diagram that l[0] points to the integer object that we just appended. The dashed squares represent the slots allocated but not used yet.

Append operation complexity is amortized O(1).

Python lists

We continue by adding one more element: l.append(2). list_resize is called with n+1 = 2 but because the allocated size is 4, there is no need to allocate more memory. Same thing happens when we add 2 more integers: l.append(3), l.append(4). The following diagram shows what we have so far.

Python lists

Insert

Let’s insert a new integer (5) at position 1: l.insert(1,5) and look at what happens internally. ins1() is called:

arguments: list object, where, new element
returns: 0 if OK, -1 if not
ins1:
    resize list to size n+1 = 5 -> 4 more slots will be allocated
    starting at the last element up to the offset where, right shift each element 
    set new element at offset where
    return 0
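At the Python level, the effect of the right shift is easy to observe:

```python
l = [1, 2, 3, 4]
l.insert(1, 5)   # elements at offsets >= 1 are shifted one slot to the right
print(l)         # [1, 5, 2, 3, 4]
```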

Python lists

The dashed squares represent the slots allocated but not used yet. Here, 8 slots are allocated but the size or length of the list is only 5.

Insert operation complexity is O(n).

Pop

When you pop the last element: l.pop(), listpop() is called. list_resize is called inside listpop() and if the new size is less than half of the allocated size then the list is shrunk.

arguments: list object
returns: element popped
listpop:
    if list empty:
        return null
    resize list with size 5 - 1 = 4. 4 is not less than 8/2 so no shrinkage
    set list object size to 4
    return last element

Pop operation complexity is O(1).

Python lists

You can observe that slot 4 still points to the integer but the important thing is the size of the list which is now 4.

Let’s pop one more element. In list_resize(), size - 1 = 4 - 1 = 3 is less than half of the allocated slots (8/2 = 4), so the list is shrunk to 6 slots and the new size of the list is now 3.

You can observe that slot 3 and 4 still point to some integers but the important thing is the size of the list which is now 3.
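The shrink decision can also be sketched in pure Python (a simplification of the CPython 2.x list_resize() logic; resize() here is an illustrative name, not the C function):

```python
def resize(allocated, newsize):
    # if there is room and the list is not wastefully large, keep the allocation
    if allocated >= newsize >= (allocated >> 1):
        return allocated
    # otherwise reallocate using the over-allocation formula
    return (newsize >> 3) + (3 if newsize < 9 else 6) + newsize

print(resize(8, 4))  # 8: 4 is not less than 8/2, no shrink
print(resize(8, 3))  # 6: 3 is less than 8/2, shrink to 6 slots
```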

Python lists

Remove

The Python list object has a method to remove a specific element: l.remove(5). listremove() is called.

arguments: list object, element to remove
returns: None if OK, NULL if not
listremove:
    loop through each list element:
        if correct element:
            slice list between element's slot and element's slot + 1
            return none
    return null

To slice the list and remove the element, list_ass_slice() is called and it is interesting to see how it works. Here, low offset is 1 and high offset is 2 as we are removing the element 5 at position 1.

arguments: list object, low offset, high offset
returns: 0 if OK
list_ass_slice:
    copy integer 5 to recycle list to dereference it
    shift elements from slot 2 to slot 1
    resize list to 5 slots
    return 0

Remove operation complexity is O(n).

Python lists

That’s it for now. I hope you enjoyed the article. Please write a comment if you have any feedback.

Solving mazes using Python: Simple recursivity and A* search

March 10, 2011

This post describes how to solve mazes using 2 algorithms implemented in Python: a simple recursive algorithm and the A* search algorithm.

Maze

The maze we are going to use in this article is 6 cells by 6 cells. The walls are colored in blue. The starting cell is at the bottom left (x=0 and y=0) colored in green. The ending cell is at the top right (x=5 and y=5) colored in green. We can only move horizontally or vertically 1 cell at a time.

Python algorithms for mazes

Recursive walk

We use a nested list of integers to represent the maze. The values are the following:

  • 0: empty cell
  • 1: unreachable cell: e.g. wall
  • 2: ending cell
  • 3: visited cell
grid = [[0, 0, 0, 0, 0, 1],
        [1, 1, 0, 0, 0, 1],
        [0, 0, 0, 1, 0, 0],
        [0, 1, 1, 0, 0, 1],
        [0, 1, 0, 0, 1, 0],
        [0, 1, 0, 0, 0, 2]]

This is a very simple algorithm which does the job even if it is not the most efficient one. It walks the maze recursively by visiting each cell and avoiding walls and already visited cells.

The search function accepts the coordinates of a cell to explore. If it is the ending cell, it returns True. If it is a wall or an already visited cell, it returns False. The neighboring cells are explored recursively and if nothing is found at the end, it returns False so it backtracks to explore new paths. We start at cell x=0 and y=0.

def search(x, y):
    if grid[x][y] == 2:
        print 'found at %d,%d' % (x, y)
        return True
    elif grid[x][y] == 1:
        print 'wall at %d,%d' % (x, y)
        return False
    elif grid[x][y] == 3:
        print 'visited at %d,%d' % (x, y)
        return False
    
    print 'visiting %d,%d' % (x, y)

    # mark as visited
    grid[x][y] = 3

    # explore neighbors clockwise starting by the one on the right
    if ((x < len(grid)-1 and search(x+1, y))
        or (y > 0 and search(x, y-1))
        or (x > 0 and search(x-1, y))
        or (y < len(grid)-1 and search(x, y+1))):
        return True

    return False

search(0, 0)

Let’s see what happens when we run the script.

$ python maze.py
visiting 0,0
wall at 1,0
visiting 0,1
wall at 1,1
visited at 0,0
visiting 0,2
...

First cell visited is (0,0). Its neighbors are explored starting by the one on the right (1,0). search(1,0) returns False because it is a wall. There is no cell below and on the left so the one at the top (0,1) is explored. Right of that is a wall and below is already visited so the one at the top (0,2) is explored. This is what we have so far:

Python algorithms for mazes

Because the neighbor on the right is explored first, this algorithm is going to explore the dead-end at the bottom-right first.

...
visiting 1,2
visiting 2,2
wall at 3,2
visiting 2,1
wall at 3,1
visiting 2,0
visiting 3,0
visiting 4,0
visiting 5,0
...

Python algorithms for mazes

Since we are in a dead-end with nothing left to explore, the algorithm backtracks until it ends up at cell (1, 2) again, where there is more to explore.

...
visited at 4,0
wall at 5,1
visited at 3,0
wall at 4,1
visited at 2,0
wall at 3,1
wall at 1,0
visited at 2,1
wall at 1,1
visited at 2,2
visited at 1,2
wall at 2,3
wall at 1,1
visited at 0,2
visiting 1,3
...

Python algorithms for mazes

Let’s continue, we end up in a second dead-end at cell (4, 2).

...
wall at 2,3
visited at 1,2
visiting 0,3
visited at 1,3
visited at 0,2
visiting 0,4
visiting 1,4
visiting 2,4
visiting 3,4
wall at 4,4
visiting 3,3
visiting 4,3
visiting 5,3
visiting 5,2
wall at 5,1
visiting 4,2
visited at 5,2
wall at 4,1
wall at 3,2
visited at 4,3
...

Python algorithms for mazes

Backtracking happens one more time to go back to cell (5, 3) and we are now on our way to the exit.

...
visiting 5,4
visited at 5,3
wall at 4,4
found at 5,5

Python algorithms for mazes

The full walk looks like this:

Python algorithms for mazes

A* search

We are going to look at a more sophisticated algorithm called A* search. This is based on costs to move around the grid. Let’s assume the cost to move horizontally or vertically 1 cell is equal to 10. Again, we cannot move diagonally here.

Before we start describing the algorithm, let’s define 2 variables: G and H. G is the cost to move from the starting cell to a given cell.

Python algorithms for mazes

H is an estimation of the cost to move from a given cell to the ending cell. How do we calculate that if we don’t know the path to the ending cell? To simplify, we just calculate the distance if no walls were present. There are other ways to do the estimation but this one is good enough for this example.
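This estimate is simply the Manhattan distance to the ending cell, scaled by the move cost. A small sketch (the function name and defaults are mine):

```python
def heuristic(x, y, end_x=5, end_y=5, move_cost=10):
    # Manhattan distance to the ending cell, ignoring walls
    return move_cost * (abs(end_x - x) + abs(end_y - y))

print(heuristic(0, 1))  # 90: 5 cells right + 4 cells up, 10 per move
print(heuristic(5, 5))  # 0: already at the ending cell
```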

Python algorithms for mazes

We use 2 lists: an open list containing the cells to explore and a closed list containing the processed cells. We start with the starting cell in the open list and nothing in the closed list.

Let’s follow 1 round of this algorithm by processing our first cell from the open list. It is the starting cell. We remove it from the list and append it to the closed list. We retrieve the list of adjacent cells and we start processing them. The starting cell has 2 adjacent cells: (1, 0) and (0, 1). (1, 0) is a wall so we drop that one. (0, 1) is reachable and not in the closed list so we process it. We calculate G and H for it. G = 10 as we just need to move 1 cell up from the starting cell. H = 90: 5 cells right and 4 cells up to reach the ending cell. We call the sum F = G + H = 10 + 90 = 100. We set the parent of this adjacent cell to be the cell we just removed from the open list: e.g. (0, 0). Finally, we add this adjacent cell to the open list. This is what we have so far. The arrow represents the pointer to the parent cell.

Python algorithms for mazes

We continue with the cell in the open list having the lowest F = G + H. Only one cell is in the open list so it makes it easy. We remove it from the open list and we get its adjacent cells. Again, only one adjacent cell is reachable: (0, 2). We end up with the following after this 2nd round.

Python algorithms for mazes

The 3rd round result looks like this. Cells in green are in the open list. Cells in red are in the closed list.

Python algorithms for mazes

Let’s detail the next round. We have 2 cells in the open list: (1, 2) and (0, 3). Both have the same F value so we pick the last one added which is (0, 3). This cell has 3 reachable adjacent cells: (1, 3), (0, 2) and (0, 4). We process (1, 3) and (0, 4). (0, 2) is in the closed list so we don’t process that one again. We end up with:

Python algorithms for mazes

Let’s fast forward to:

Python algorithms for mazes

We have (1, 2), (1, 3) and (3, 3) in the open list. (1, 3) is processed next because it is the last one added with the lowest F value = 100. (1, 3) has 1 adjacent cell which is not in the closed list. It is (1, 2) which is in the open list. When an adjacent cell is in the open list, we check if its F value would be less if the path taken was going through the cell currently processed e.g. through (1, 3). Here it is not the case so we don’t update G and H of (1, 2) and its parent. This trick makes the algorithm more efficient when this condition exists.

Let’s take a break and look at a diagram representing the algorithm steps and conditions:

Python algorithms for mazes

We continue processing the cells remaining in the open list. Fast forward to:

Python algorithms for mazes

We have 2 cells in the open list: (3, 3) and (2, 0). The next cell removed from the open list is (3, 3) because its F is equal to 120. This shows why this algorithm is better than the first one we saw: we don’t end up exploring the dead end at (5, 0) and we continue walking from (3, 3) instead.

Fast forward again to:

Python algorithms for mazes

The next cell processed from the open list is (5, 5) and it is the ending cell so we have found our path. It is easy to display the path. We just have to follow the parent pointers up to the starting cell. Our path is highlighted in green on the following diagram:

Python algorithms for mazes

You can read more about this algorithm here.

A* implementation

The basic object here is a cell so we write a class for it. We store the coordinates x and y, the values of G and H plus the sum F.

class Cell(object):
    def __init__(self, x, y, reachable):
        """
        Initialize new cell

        @param x cell x coordinate
        @param y cell y coordinate
        @param reachable is cell reachable? not a wall?
        """
        self.reachable = reachable
        self.x = x
        self.y = y
        self.parent = None
        self.g = 0
        self.h = 0
        self.f = 0

Next is our main class named AStar. Its attributes are the open list, kept as a heap so the cell with the lowest F stays at the top; the closed list, a set for fast lookup; the cells list (grid definition); and the size of the grid.

class AStar(object):
    def __init__(self):
        self.opened = []
        heapq.heapify(self.opened)
        self.closed = set()
        self.cells = []
        self.grid_height = 6
        self.grid_width = 6
  ...

We create a simple method initializing the list of cells to match our example with the walls at the same locations.

    def init_grid(self):
        walls = ((0, 5), (1, 0), (1, 1), (1, 5), (2, 3), 
                 (3, 1), (3, 2), (3, 5), (4, 1), (4, 4), (5, 1))
        for x in range(self.grid_width):
            for y in range(self.grid_height):
                if (x, y) in walls:
                    reachable = False
                else:
                    reachable = True
                self.cells.append(Cell(x, y, reachable))
        self.start = self.get_cell(0, 0)
        self.end = self.get_cell(5, 5)

Our heuristic compute method:

    def get_heuristic(self, cell):
        """
        Compute the heuristic value H for a cell: distance between
        this cell and the ending cell multiplied by 10.

        @param cell
        @returns heuristic value H
        """
        return 10 * (abs(cell.x - self.end.x) + abs(cell.y - self.end.y))

We need a method to return a particular cell based on x and y coordinates.

    def get_cell(self, x, y):
        """
        Returns a cell from the cells list

        @param x cell x coordinate
        @param y cell y coordinate
        @returns cell
        """
        return self.cells[x * self.grid_height + y]

Next is a method to retrieve the list of adjacent cells to a specific cell.

    def get_adjacent_cells(self, cell):
        """
        Returns adjacent cells to a cell. Clockwise starting
        from the one on the right.

        @param cell get adjacent cells for this cell
        @returns adjacent cells list 
        """
        cells = []
        if cell.x < self.grid_width-1:
            cells.append(self.get_cell(cell.x+1, cell.y))
        if cell.y > 0:
            cells.append(self.get_cell(cell.x, cell.y-1))
        if cell.x > 0:
            cells.append(self.get_cell(cell.x-1, cell.y))
        if cell.y < self.grid_height-1:
            cells.append(self.get_cell(cell.x, cell.y+1))
        return cells

Simple method to print the path found. It follows the parent pointers to go from the ending cell to the starting cell.

    def display_path(self):
        cell = self.end
        while cell.parent is not self.start:
            cell = cell.parent
            print 'path: cell: %d,%d' % (cell.x, cell.y)

We need a method to calculate G and H and set the parent cell.

    def update_cell(self, adj, cell):
        """
        Update adjacent cell

        @param adj adjacent cell to current cell
        @param cell current cell being processed
        """
        adj.g = cell.g + 10
        adj.h = self.get_heuristic(adj)
        adj.parent = cell
        adj.f = adj.h + adj.g

The main method implements the algorithm itself.

    def process(self):
        # add starting cell to open heap queue
        heapq.heappush(self.opened, (self.start.f, self.start))
        while len(self.opened):
            # pop cell from heap queue 
            f, cell = heapq.heappop(self.opened)
            # add cell to closed list so we don't process it twice
            self.closed.add(cell)
            # if ending cell, display found path
            if cell is self.end:
                self.display_path()
                break
            # get adjacent cells for cell
            adj_cells = self.get_adjacent_cells(cell)
            for adj_cell in adj_cells:
                if adj_cell.reachable and adj_cell not in self.closed:
                    if (adj_cell.f, adj_cell) in self.opened:
                        # if adj cell in open list, check if current path is
                        # better than the one previously found for this adj
                        # cell.
                        if adj_cell.g > cell.g + 10:
                            self.update_cell(adj_cell, cell)
                    else:
                        self.update_cell(adj_cell, cell)
                        # add adj cell to open list
                        heapq.heappush(self.opened, (adj_cell.f, adj_cell))

You can check out the code on GitHub: git clone https://laurentluce@github.com/laurentluce/python-algorithms.git.

That’s it for now. I hope you enjoyed the article. Please write a comment if you have any feedback.

Python threads synchronization: Locks, RLocks, Semaphores, Conditions, Events and Queues

February 5, 2011

This article describes the Python threading synchronization mechanisms in detail. We are going to study the following types: Lock, RLock, Semaphore, Condition, Event and Queue. Also, we are going to look at the Python internals behind those mechanisms.

The source code of the programs below can be found at github.com/laurentluce/python-tutorials under threads/.

First, let’s look at a simple program using the threading module with no synchronization.

Threading

We want to write a program fetching the content of some URLs and writing it to a file. We could do it serially with no threads, but to speed things up we are going to create 2 threads, each processing half of the URLs.

Note: The best way here would be to use a queue with the URLs to fetch but this example is more suitable to begin our tutorial.

The class FetchUrls is thread based and it takes a list of URLs to fetch and a file object to write the content to.

class FetchUrls(threading.Thread):
    """
    Thread checking URLs.
    """

    def __init__(self, urls, output):
        """
        Constructor.

        @param urls list of urls to check
        @param output file to write urls output
        """
        threading.Thread.__init__(self)
        self.urls = urls
        self.output = output
    
    def run(self):
        """
        Thread run method. Check URLs one by one.
        """
        while self.urls:
            url = self.urls.pop()
            req = urllib2.Request(url)
            try:
                d = urllib2.urlopen(req)
            except urllib2.URLError, e:
                print 'URL %s failed: %s' % (url, e.reason)
                continue
            self.output.write(d.read())
            print 'write done by %s' % self.name
            print 'URL %s fetched by %s' % (url, self.name)

The main function starts the 2 threads and then waits for them to finish.

def main():
    # list 1 of urls to fetch
    urls1 = ['http://www.google.com', 'http://www.facebook.com']
    # list 2 of urls to fetch
    urls2 = ['http://www.yahoo.com', 'http://www.youtube.com']
    f = open('output.txt', 'w+')
    t1 = FetchUrls(urls1, f)
    t2 = FetchUrls(urls2, f)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    f.close()

if __name__ == '__main__':
    main()

The issue is that both threads are going to write to the file at the same time, resulting in a big mess. We need to find a way to only have 1 thread writing to the file at a given time. To do that, one way is to use synchronization mechanisms like locks.

Lock

Locks have 2 states: locked and unlocked. 2 methods are used to manipulate them: acquire() and release(). Those are the rules:

  • if the state is unlocked: a call to acquire() changes the state to locked.
  • if the state is locked: a call to acquire() blocks until another thread calls release().
  • if the state is unlocked: a call to release() raises a RuntimeError exception.
  • if the state is locked: a call to release() changes the state to unlocked.
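These rules are easy to verify interactively. The following sketch uses Python 3’s threading module, where releasing an unlocked Lock raises RuntimeError:

```python
import threading

lock = threading.Lock()
lock.acquire()      # unlocked -> locked
lock.release()      # locked -> unlocked
try:
    lock.release()  # releasing an already unlocked lock
except RuntimeError as e:
    print('RuntimeError:', e)
```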

To solve our issue of 2 threads writing to the same file at the same time, we pass a lock to the FetchUrls constructor and we use it to protect the file write operation. I am just going to highlight the modifications relevant to locks. The source code can be found in threads/lock.py.

class FetchUrls(threading.Thread):
    ...

    def __init__(self, urls, output, lock):
        ...
        self.lock = lock
    
    def run(self):
        ...
        while self.urls:
            ...
            self.lock.acquire()
            print 'lock acquired by %s' % self.name
            self.output.write(d.read())
            print 'write done by %s' % self.name
            print 'lock released by %s' % self.name
            self.lock.release()
            ...

def main():
    ...
    lock = threading.Lock()
    ...
    t1 = FetchUrls(urls1, f, lock)
    t2 = FetchUrls(urls2, f, lock)
    ...

Let’s look at the program output when we run it:

$ python locks.py
lock acquired by Thread-2
write done by Thread-2
lock released by Thread-2
URL http://www.youtube.com fetched by Thread-2
lock acquired by Thread-1
write done by Thread-1
lock released by Thread-1
URL http://www.facebook.com fetched by Thread-1
lock acquired by Thread-2
write done by Thread-2
lock released by Thread-2
URL http://www.yahoo.com fetched by Thread-2
lock acquired by Thread-1
write done by Thread-1
lock released by Thread-1
URL http://www.google.com fetched by Thread-1

The write operation is now protected by a lock and we don’t have 2 threads writing to the file at the same time.

Let’s take a look at the Python internals. I am using Python 2.6.6 on Linux.

The method Lock() of the threading module is equal to thread.allocate_lock. You can see the code in Lib/threading.py.

_allocate_lock = thread.allocate_lock
Lock = _allocate_lock

The C implementation can be found in Python/thread_pthread.h. We assume that our system supports POSIX semaphores. sem_init() initializes the semaphore at the address pointed to by lock. The initial value of the semaphore is 1, which means unlocked. The semaphore is shared between the threads of the process.

PyThread_type_lock
PyThread_allocate_lock(void)
{
    ...
    lock = (sem_t *)malloc(sizeof(sem_t));

    if (lock) {
        status = sem_init(lock,0,1);
        CHECK_STATUS("sem_init");
        ....
    }
    ...
}

When the acquire() method is called, the following C code is executed. waitflag is equal to 1 by default, which means the call blocks until the lock is unlocked. sem_wait() decrements the semaphore’s value or blocks until the value is greater than 0, i.e. until another thread unlocks it.

int
PyThread_acquire_lock(PyThread_type_lock lock, int waitflag)
{
    ...
    do {
        if (waitflag)
            status = fix_status(sem_wait(thelock));
        else
            status = fix_status(sem_trywait(thelock));
    } while (status == EINTR); /* Retry if interrupted by a signal */
    ....
}

When the release() method is called, the following C code is executed. sem_post() increments the semaphore’s value, i.e. unlocks the semaphore.

void
PyThread_release_lock(PyThread_type_lock lock)
{
    ...
    status = sem_post(thelock);
    ...
}

You can also use the “with” statement. The Lock object can be used as a context manager. The advantage of using “with” is that the acquire() method will be called when the “with” block is entered and release() will be called when the block is exited. Let’s rewrite the class FetchUrls using the “with” statement.

class FetchUrls(threading.Thread):
    ...
    def run(self):
        ...
        while self.urls:
            ...
            with self.lock:
                print 'lock acquired by %s' % self.name
                self.output.write(d.read())
                print 'write done by %s' % self.name
                print 'lock released by %s' % self.name
            ...

RLock

RLock is a reentrant lock. acquire() can be called multiple times by the same thread without blocking. Keep in mind that release() needs to be called the same number of times to unlock the resource.

Using Lock, the second call to acquire() by the same thread will block:

lock = threading.Lock()
lock.acquire()
lock.acquire()

If you use RLock, the second call to acquire() won’t block.

rlock = threading.RLock()
rlock.acquire()
rlock.acquire()
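You can observe the difference without deadlocking by using the non-blocking form of acquire(), which returns a boolean immediately instead of blocking:

```python
import threading

lock = threading.Lock()
lock.acquire()
print(lock.acquire(False))   # False: a Lock is not reentrant
lock.release()

rlock = threading.RLock()
rlock.acquire()
print(rlock.acquire(False))  # True: the same thread may re-acquire an RLock
rlock.release()
rlock.release()              # release() once per acquire()
```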

RLock also uses thread.allocate_lock() but it keeps track of the owning thread to support the reentrant feature. Following is the RLock acquire() method implementation. If the thread calling acquire() is the owner of the resource, the counter is incremented by one. If not, it tries to acquire the lock. The first time it acquires it, the owner is saved and the counter is initialized to 1.

def acquire(self, blocking=1):
    me = _get_ident()
    if self.__owner == me:
        self.__count = self.__count + 1
        ...
        return 1
    rc = self.__block.acquire(blocking)
    if rc:
        self.__owner = me
        self.__count = 1
        ...
    ...
    return rc

Let’s look at the RLock release() method. First is a check to make sure the thread calling the method is the owner of the lock. The counter is decremented, and if it reaches 0 then the resource is unlocked and up for grabs by another thread.

def release(self):
    if self.__owner != _get_ident():
        raise RuntimeError("cannot release un-acquired lock")
    self.__count = count = self.__count - 1
    if not count:
        self.__owner = None
        self.__block.release()
        ...
    ...

Condition

This is a synchronization mechanism where a thread waits for a specific condition and another thread signals that this condition has happened. Once the condition happened, the thread acquires the lock to get exclusive access to the shared resource.

A good way to illustrate this mechanism is by looking at a producer/consumer example. The producer appends random integers to a list at random time and the consumer retrieves those integers from the list. The source code can be found in threads/condition.py.

Let’s look at the producer class. The producer acquires the lock, appends an integer, notifies the consumer thread that there is something to retrieve and releases the lock. It does that forever with a pause in between each append operation.

class Producer(threading.Thread):
    """
    Produces random integers to a list
    """

    def __init__(self, integers, condition):
        """
        Constructor.

        @param integers list of integers
        @param condition condition synchronization object
        """
        threading.Thread.__init__(self)
        self.integers = integers
        self.condition = condition
    
    def run(self):
        """
        Thread run method. Append random integers to the integers list
        at random time.
        """
        while True:
            integer = random.randint(0, 256)
            self.condition.acquire()
            print 'condition acquired by %s' % self.name
            self.integers.append(integer) 
            print '%d appended to list by %s' % (integer, self.name)
            print 'condition notified by %s' % self.name
            self.condition.notify()
            print 'condition released by %s' % self.name
            self.condition.release()
            time.sleep(1)

Next is the consumer class. It acquires the lock, checks if there is an integer in the list, if there is nothing, it waits to be notified by the producer. Once the element is retrieved from the list, it releases the lock.

Note that a call to wait() releases the lock so the producer can acquire the resource and do its work.

class Consumer(threading.Thread):
    """
    Consumes random integers from a list
    """

    def __init__(self, integers, condition):
        """
        Constructor.

        @param integers list of integers
        @param condition condition synchronization object
        """
        threading.Thread.__init__(self)
        self.integers = integers
        self.condition = condition
    
    def run(self):
        """
        Thread run method. Consumes integers from list
        """
        while True:
            self.condition.acquire()
            print 'condition acquired by %s' % self.name
            while True:
                if self.integers:
                    integer = self.integers.pop()
                    print '%d popped from list by %s' % (integer, self.name)
                    break
                print 'condition wait by %s' % self.name
                self.condition.wait()
            print 'condition released by %s' % self.name
            self.condition.release()

We need to write our main creating 2 threads and starting them:

def main():
    integers = []
    condition = threading.Condition()
    t1 = Producer(integers, condition)
    t2 = Consumer(integers, condition)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

if __name__ == '__main__':
    main()

The output of this program looks like this:

$ python condition.py
condition acquired by Thread-1
159 appended to list by Thread-1
condition notified by Thread-1
condition released by Thread-1
condition acquired by Thread-2
159 popped from list by Thread-2
condition released by Thread-2
condition acquired by Thread-2
condition wait by Thread-2
condition acquired by Thread-1
116 appended to list by Thread-1
condition notified by Thread-1
condition released by Thread-1
116 popped from list by Thread-2
condition released by Thread-2
condition acquired by Thread-2
condition wait by Thread-2

Thread-1 appends 159 to the list then notifies the consumer and releases the lock. Thread-2 acquires the lock, retrieves 159 and releases the lock. The producer is still waiting at that time because of the time.sleep(1) so the consumer acquires the lock again then waits to get notified by the producer. When wait() is called, it unlocks the resource so the producer can acquire it and append a new integer to the list before notifying the consumer.
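The whole pattern can be condensed into a short, self-contained sketch using the “with” statement (the names and the count of 3 items are illustrative, not from the original program):

```python
import threading

items = []
cond = threading.Condition()

def producer(n):
    # append n integers, waking the consumer after each one
    for i in range(n):
        with cond:
            items.append(i)
            cond.notify()

def consumer(n, out):
    # pop n integers, waiting whenever the list is empty
    for _ in range(n):
        with cond:
            while not items:
                cond.wait()
            out.append(items.pop(0))

out = []
t = threading.Thread(target=consumer, args=(3, out))
t.start()
producer(3)
t.join()
print(out)  # [0, 1, 2]
```

Note the `while not items` loop around wait(): the consumer always rechecks the condition after waking up, which protects against spurious wakeups and lost notifications.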

Let’s look at the Python internals of this condition synchronization mechanism. The condition’s constructor creates an RLock object if no existing lock is passed to it. This lock is used when acquire() and release() are called.

class _Condition(_Verbose):

    def __init__(self, lock=None, verbose=None):
        _Verbose.__init__(self, verbose)
        if lock is None:
            lock = RLock()
        self.__lock = lock

Next is the wait() method. We assume that we are calling wait() with no timeout value to simplify the explanation of the wait() method’s code. A new lock named waiter is created and the state is set to locked. The waiter lock is used for communication between the threads so the producer can notify the consumer by releasing this waiter lock. The lock object is added to the waiters list and the method is blocking at waiter.acquire(). Note that the condition lock state is saved at the beginning and restored when wait() returns.

def wait(self, timeout=None):
    ...
    waiter = _allocate_lock()
    waiter.acquire()
    self.__waiters.append(waiter)
    saved_state = self._release_save()
    try:    # restore state no matter what (e.g., KeyboardInterrupt)
        if timeout is None:
            waiter.acquire()
            ...
        ...
    finally:
        self._acquire_restore(saved_state)

The notify() method is used to release the waiter lock. The producer calls notify() to notify the consumer blocked on wait().

def notify(self, n=1):
    ...
    __waiters = self.__waiters
    waiters = __waiters[:n]
    ...
    for waiter in waiters:
        waiter.release()
        try:
            __waiters.remove(waiter)
        except ValueError:
            pass

You can also use the “with” statement with the Condition object so acquire() and release() are called for us. Let’s rewrite the producer class and the consumer class using “with”.

class Producer(threading.Thread):
    ...
    def run(self):
        while True:
            integer = random.randint(0, 256)
            with self.condition:
                print 'condition acquired by %s' % self.name
                self.integers.append(integer) 
                print '%d appended to list by %s' % (integer, self.name)
                print 'condition notified by %s' % self.name
                self.condition.notify()
                print 'condition released by %s' % self.name
            time.sleep(1)

class Consumer(threading.Thread):
    ... 
    def run(self):
        while True:
            with self.condition:
                print 'condition acquired by %s' % self.name
                while True:
                    if self.integers:
                        integer = self.integers.pop()
                        print '%d popped from list by %s' % (integer, self.name)
                        break
                    print 'condition wait by %s' % self.name
                    self.condition.wait()
                print 'condition released by %s' % self.name

Semaphore

A semaphore is based on an internal counter which is decremented each time acquire() is called and incremented each time release() is called. If the counter is equal to 0, acquire() blocks. This is Python’s implementation of Dijkstra’s semaphore concept: P() and V(). Using a semaphore makes sense when you want to control access to a resource with limited capacity, such as a server.

Here is a simple example:

semaphore = threading.Semaphore()
semaphore.acquire()
# work on a shared resource
...
semaphore.release()
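To see the counter actually limiting concurrency, here is a minimal sketch (Python 3 syntax) where a semaphore initialized to 2 never lets more than two threads work on the resource at the same time:

```python
import threading
import time

# A semaphore initialized to 2: at most two threads can be "inside" at once.
sem = threading.Semaphore(2)
lock = threading.Lock()       # protects the counters below
active = [0]                  # threads currently holding the semaphore
max_seen = [0]                # peak concurrency observed

def worker():
    with sem:                 # acquire() blocks if two threads are already in
        with lock:
            active[0] += 1
            max_seen[0] = max(max_seen[0], active[0])
        time.sleep(0.1)       # simulate work on the shared resource
        with lock:
            active[0] -= 1

threads = [threading.Thread(target=worker) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(max_seen[0])            # never exceeds 2
```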

Let’s look at the Python internals. The constructor takes a value which is the counter’s initial value; it defaults to 1. A condition instance is created with a lock to protect the counter and to notify other threads when the semaphore is released.

class _Semaphore(_Verbose):
    ...    
    def __init__(self, value=1, verbose=None):
        _Verbose.__init__(self, verbose)
        self.__cond = Condition(Lock())
        self.__value = value
        ...

Next is the acquire() method. If the semaphore’s counter is equal to 0, it blocks on the condition’s wait() method until it gets notified by a different thread. If the semaphore’s counter is greater than 0, it decrements the value.

def acquire(self, blocking=1):
    rc = False
    self.__cond.acquire()
    while self.__value == 0:
        ...
        self.__cond.wait()
    else:
        self.__value = self.__value - 1
        rc = True
    self.__cond.release()
    return rc

The semaphore’s release() method increments the counter and then notifies the other thread.

def release(self):
    self.__cond.acquire()
    self.__value = self.__value + 1
    self.__cond.notify()
    self.__cond.release()

Note that there is also a bounded semaphore you can use to make sure you never call release() too many times. Here is the Python internal code used for it:

class _BoundedSemaphore(_Semaphore):
    """Semaphore that checks that # releases is <= # acquires"""
    def __init__(self, value=1, verbose=None):
        _Semaphore.__init__(self, value, verbose)
        self._initial_value = value

    def release(self):
        if self._Semaphore__value >= self._initial_value:
            raise ValueError, "Semaphore released too many times"
        return _Semaphore.release(self)
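A quick sketch (Python 3 syntax) showing the bounded semaphore raising ValueError on the extra release:

```python
import threading

# BoundedSemaphore raises ValueError when released more times than acquired.
sem = threading.BoundedSemaphore(1)
sem.acquire()
sem.release()                # counter is back at its initial value of 1
try:
    sem.release()            # one release too many
    raised = False
except ValueError:
    raised = True
print(raised)                # True
```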

You can also use the “with” statement with the Semaphore object so acquire() and release() are called for us.

semaphore = threading.Semaphore()
with semaphore:
  # work on a shared resource
  ...

Event

This is a simple mechanism. A thread signals an event and the other thread(s) wait for it.
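A minimal sketch of the mechanism (Python 3 syntax): one thread blocks on wait() until another thread calls set():

```python
import threading

event = threading.Event()
results = []

def waiter():
    event.wait()             # blocks until another thread calls set()
    results.append('woke up')

t = threading.Thread(target=waiter)
t.start()
event.set()                  # signal the event; the blocked wait() returns
t.join()
print(results)
```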

Let’s go back to our producer and consumer example and convert it to use an event instead of a condition. The source code can be found in threads/event.py.

First the producer class. We pass an Event instance to the constructor instead of a Condition instance. Each time an integer is added to the list, the event is set then cleared right away to notify the consumer. The event instance is cleared by default.

class Producer(threading.Thread):
    """
    Produces random integers to a list
    """

    def __init__(self, integers, event):
        """
        Constructor.

        @param integers list of integers
        @param event event synchronization object
        """
        threading.Thread.__init__(self)
        self.integers = integers
        self.event = event
    
    def run(self):
        """
        Thread run method. Append random integers to the integers list
        at random time.
        """
        while True:
            integer = random.randint(0, 256)
            self.integers.append(integer) 
            print '%d appended to list by %s' % (integer, self.name)
            print 'event set by %s' % self.name
            self.event.set()
            self.event.clear()
            print 'event cleared by %s' % self.name
            time.sleep(1)

Next is the consumer class. We also pass an Event instance to the constructor. The consumer instance is blocking on wait() until the event is set indicating that there is an integer to consume.

class Consumer(threading.Thread):
    """
    Consumes random integers from a list
    """

    def __init__(self, integers, event):
        """
        Constructor.

        @param integers list of integers
        @param event event synchronization object
        """
        threading.Thread.__init__(self)
        self.integers = integers
        self.event = event
    
    def run(self):
        """
        Thread run method. Consumes integers from list
        """
        while True:
            self.event.wait()
            try:
                integer = self.integers.pop()
                print '%d popped from list by %s' % (integer, self.name)
            except IndexError:
                # catch pop on empty list
                time.sleep(1)

This is the output when we run the program. Thread-1 appends 124 to the list and then sets the event to notify the consumer. The consumer’s call to wait() stops blocking and the integer is retrieved from the list.

$ python event.py 
124 appended to list by Thread-1
event set by Thread-1
event cleared by Thread-1
124 popped from list by Thread-2
223 appended to list by Thread-1
event set by Thread-1
event cleared by Thread-1
223 popped from list by Thread-2

Let’s look at the Python internals. First is the Event constructor. A condition instance is created with a lock to protect the event flag value and to notify the other thread when the event has been set.

class _Event(_Verbose):
    def __init__(self, verbose=None):
        _Verbose.__init__(self, verbose)
        self.__cond = Condition(Lock())
        self.__flag = False

Following is the set() method. It sets the flag to True and notifies the other threads. The condition object is used to protect the critical part when the flag’s value is changed.

def set(self):
    self.__cond.acquire()
    try:
        self.__flag = True
        self.__cond.notify_all()
    finally:
        self.__cond.release()

Its opposite is the clear() method setting the flag to False.

def clear(self):
    self.__cond.acquire()
    try:
        self.__flag = False
    finally:
        self.__cond.release()

The wait() method blocks until set() is called. If the flag is already set, wait() returns immediately.

def wait(self, timeout=None):
    self.__cond.acquire()
    try:
        if not self.__flag:
            self.__cond.wait(timeout)
    finally:
        self.__cond.release()
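We can verify this behavior with a small sketch (Python 3 syntax): when the flag is already set, wait() returns immediately even with a long timeout:

```python
import threading
import time

e = threading.Event()
e.set()                      # flag is now True
start = time.time()
e.wait(5)                    # flag already set: returns immediately
elapsed = time.time() - start
print(elapsed < 1)           # True: we did not wait for the 5s timeout
```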

Queue

Queues are a great mechanism for exchanging information between threads because the Queue class takes care of the locking for us.

We are interested in the following 4 Queue methods:

  • put: Put an item into the queue.
  • get: Remove and return an item from the queue.
  • task_done: Needs to be called each time an item has been processed.
  • join: Blocks until all items have been processed.
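These four methods fit together as follows (Python 3 syntax, where the module is named queue instead of Python 2’s Queue):

```python
import threading
import queue   # named Queue in Python 2

q = queue.Queue()
processed = []

def worker():
    while True:
        item = q.get()        # blocks until an item is available
        processed.append(item)
        q.task_done()         # mark this item as fully processed

threading.Thread(target=worker, daemon=True).start()

for i in range(5):
    q.put(i)                  # put: add an item to the queue
q.join()                      # blocks until task_done() was called 5 times
print(sorted(processed))
```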

Let’s convert our producer/consumer program to use a queue. The source code can be found in threads/queue.py.

First the producer class. We don’t need to pass the integers list anymore because the queue stores the generated integers. The thread generates integers and puts them in the queue in an infinite loop.

class Producer(threading.Thread):
    """
    Produces random integers and puts them in a queue
    """

    def __init__(self, queue):
        """
        Constructor.

        @param queue queue synchronization object
        """
        threading.Thread.__init__(self)
        self.queue = queue
    
    def run(self):
        """
        Thread run method. Generate random integers and put them in the
        queue at random times.
        """
        while True:
            integer = random.randint(0, 256)
            self.queue.put(integer) 
            print '%d put to queue by %s' % (integer, self.name)
            time.sleep(1)

Next is our consumer class. The thread gets the integer from the queue and indicates that it is done working on it using task_done().

class Consumer(threading.Thread):
    """
    Consumes random integers from a queue
    """

    def __init__(self, queue):
        """
        Constructor.

        @param queue queue synchronization object
        """
        threading.Thread.__init__(self)
        self.queue = queue
    
    def run(self):
        """
        Thread run method. Consumes integers from the queue
        """
        while True:
            integer = self.queue.get()
            print '%d popped from list by %s' % (integer, self.name)
            self.queue.task_done()

Here is the output of the program.

$ python queue.py 
61 put to queue by Thread-1
61 popped from list by Thread-2
6 put to queue by Thread-1
6 popped from list by Thread-2

The Queue module takes care of locking for us which is a great advantage. It is interesting to look at the Python internals to understand how the locking mechanism works underneath.

The Queue constructor creates a lock to protect the queue when an element is added or removed. Several condition objects are created to signal events: the queue is not empty (a blocked get() call can proceed), the queue is not full (a blocked put() call can proceed) and all items have been processed (a blocked join() call can return).

class Queue:
    def __init__(self, maxsize=0):
        ...
        self.mutex = threading.Lock()
        self.not_empty = threading.Condition(self.mutex)
        self.not_full = threading.Condition(self.mutex)
        self.all_tasks_done = threading.Condition(self.mutex)
        self.unfinished_tasks = 0

The put() method adds an item to the queue, or waits if the queue is full. It notifies the threads blocked on get() that the queue is not empty. See the Condition section above for more details.

def put(self, item, block=True, timeout=None):
    ...
    self.not_full.acquire()
    try:
        if self.maxsize > 0:
            ...
            elif timeout is None:
                while self._qsize() == self.maxsize:
                    self.not_full.wait()
        self._put(item)
        self.unfinished_tasks += 1
        self.not_empty.notify()
    finally:
        self.not_full.release()

The get() method removes an element from the queue or waits if the queue is empty. It notifies the threads blocked on put() that the queue is not full.

def get(self, block=True, timeout=None):
    ...
    self.not_empty.acquire()
    try:
        ...
        elif timeout is None:
            while not self._qsize():
                self.not_empty.wait()
        item = self._get()
        self.not_full.notify()
        return item
    finally:
        self.not_empty.release()

When task_done() is called, the number of unfinished tasks is decremented. When the counter reaches 0, the threads blocked on the queue’s join() method resume their execution.

def task_done(self):
    self.all_tasks_done.acquire()
    try:
        unfinished = self.unfinished_tasks - 1
        if unfinished <= 0:
            if unfinished < 0:
                raise ValueError('task_done() called too many times')
            self.all_tasks_done.notify_all()
        self.unfinished_tasks = unfinished
    finally:
        self.all_tasks_done.release()

def join(self):
    self.all_tasks_done.acquire()
    try:
        while self.unfinished_tasks:
            self.all_tasks_done.wait()
    finally:
        self.all_tasks_done.release()

That’s it for now. I hope you enjoyed this article. Please write a comment if you have any feedback. If you need help with a project written in Python or with building a new web service, I am available as a freelancer: LinkedIn profile. Follow me on Twitter @laurentluce.

OpenStack Nova internals of instance launching

January 30, 2011

This article describes the internals of launching an instance in OpenStack Nova.

Overview

Launching a new instance involves multiple components inside OpenStack Nova:

  • API server: handles requests from the user and relays them to the cloud controller.
  • Cloud controller: handles the communication between the compute nodes, the networking controllers, the API server and the scheduler.
  • Scheduler: selects a host to run a command.
  • Compute worker: manages computing instances: launch/terminate instance, attach/detach volumes…
  • Network controller: manages networking resources: allocate fixed IP addresses, configure VLANs…

Note: There are more components in Nova like the authentication manager, the object store and the volume controller but we are not going to study them as we are focusing on instance launching in this article.

The flow of launching an instance goes like this: the API server receives a run_instances command from the user and relays the message to the cloud controller (1). Authentication is performed to make sure this user has the required permissions. The cloud controller sends the message to the scheduler (2). The scheduler casts the message to a random host and asks it to start a new instance (3). The compute worker on that host grabs the message (4). The compute worker needs a fixed IP to launch a new instance so it sends a message to the network controller (5,6,7,8). The compute worker then continues with spawning the new instance. We are going to look at all of those steps in detail next.

API

You can use the OpenStack API or EC2 API to launch a new instance. We are going to use the EC2 API. We add a new key pair and we use it to launch an instance of type m1.tiny.

cd /tmp/
euca-add-keypair test > test.pem
euca-run-instances -k test -t m1.tiny ami-tiny

run_instances() in api/ec2/cloud.py is called which results in compute API create() in compute/API.py being called.

def run_instances(self, context, **kwargs):
  ...
  instances = self.compute_api.create(context,
            instance_type=instance_types.get_by_type(
                kwargs.get('instance_type', None)),
            image_id=kwargs['image_id'],
            ...

Compute API create() does the following:

  • Check if the maximum number of instances of this type has been reached.
  • Create a security group if it doesn’t exist.
  • Generate MAC addresses and hostnames for the new instances.
  • Send a message to the scheduler to run the instances.

Cast

Let’s pause for a minute and look at how the message is sent to the scheduler. This type of message delivery in OpenStack is defined as RPC casting. RabbitMQ is used for delivery. The publisher (the API server) sends the message to a topic exchange (the scheduler topic). A consumer (the scheduler worker) retrieves the message from the queue. No response is expected because it is a cast, not a call. We will look at RPC calls later.

Here is the code casting that message:

LOG.debug(_("Casting to scheduler for %(pid)s/%(uid)s's"
        " instance %(instance_id)s") % locals())
rpc.cast(context,
         FLAGS.scheduler_topic,
         {"method": "run_instance",
          "args": {"topic": FLAGS.compute_topic,
                   "instance_id": instance_id,
                   "availability_zone": availability_zone}})

You can see that the scheduler topic is used and that the message arguments indicate which topic the scheduler should use for its own delivery. In this case, we want the scheduler to send the message using the compute topic.
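To illustrate the fire-and-forget nature of a cast, here is a toy sketch using a plain Python queue standing in for the RabbitMQ topic exchange. The names scheduler_topic and run_instance mirror the article; this is not the Nova rpc API:

```python
import queue
import threading

# The queue stands in for the RabbitMQ topic exchange.
scheduler_topic = queue.Queue()

def cast(topic, message):
    topic.put(message)        # publish and return immediately: no reply

received = []

def scheduler_worker():
    msg = scheduler_topic.get()   # consumer retrieves the message
    received.append(msg["method"])

t = threading.Thread(target=scheduler_worker)
t.start()
cast(scheduler_topic, {"method": "run_instance", "args": {"instance_id": 1}})
t.join()
print(received)               # ['run_instance']
```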

Scheduler

The scheduler receives the message and sends the run_instance message to a random host. The chance scheduler is used here. There are more scheduler types like the zone scheduler (pick a random host which is up in a specific availability zone) or the simple scheduler (pick the least loaded host). Now that a host has been selected, the following code is executed to send the message to a compute worker on the host.

rpc.cast(context,
         db.queue_get_for(context, topic, host),
         {"method": method,
          "args": kwargs})
LOG.debug(_("Casting to %(topic)s %(host)s for %(method)s") % locals())

Compute

The Compute worker receives the message and the following method in compute/manager.py is called:

def run_instance(self, context, instance_id, **_kwargs):
  """Launch a new instance with specified options."""
  ...

run_instance() does the following:

  • Check if the instance is already running.
  • Allocate a fixed IP address.
  • Setup a VLAN and a bridge if not already setup.
  • Spawn the instance using the virtualization driver.

Call to network controller

An RPC call is used to allocate a fixed IP. An RPC call differs from an RPC cast in that it uses a topic.host exchange, meaning a specific host is targeted, and a response is expected.

Spawn instance

Next is the instance spawning process performed by the virtualization driver. libvirt is used in our case. The code we are going to look at is located in virt/libvirt_conn.py.

The first thing that needs to be done is the creation of the libvirt XML used to launch the instance. The to_xml() method is used to retrieve the XML content. Following is the XML for our instance.

<domain type='qemu'>
    <name>instance-00000001</name>
    <memory>524288</memory>
    <os>
        <type>hvm</type>
        <kernel>/opt/novascript/trunk/nova/..//instances/instance-00000001/kernel</kernel>
        <cmdline>root=/dev/vda console=ttyS0</cmdline>
        <initrd>/opt/novascript/trunk/nova/..//instances/instance-00000001/ramdisk</initrd>
    </os>
    <features>
        <acpi/>
    </features>
    <vcpu>1</vcpu>
    <devices>
        <disk type='file'>
            <driver type='qcow2'/>
            <source file='/opt/novascript/trunk/nova/..//instances/instance-00000001/disk'/>
            <target dev='vda' bus='virtio'/>
        </disk>
        <interface type='bridge'>
            <source bridge='br100'/>
            <mac address='02:16:3e:17:35:39'/>
            <!--   <model type='virtio'/>  CANT RUN virtio network right now -->
            <filterref filter="nova-instance-instance-00000001">
                <parameter name="IP" value="10.0.0.3" />
                <parameter name="DHCPSERVER" value="10.0.0.1" />
                <parameter name="RASERVER" value="fe80::1031:39ff:fe04:58f5/64" />
                <parameter name="PROJNET" value="10.0.0.0" />
                <parameter name="PROJMASK" value="255.255.255.224" />
                <parameter name="PROJNETV6" value="fd00::" />
                <parameter name="PROJMASKV6" value="64" />
            </filterref>
        </interface>

        <!-- The order is significant here.  File must be defined first -->
        <serial type="file">
            <source path='/opt/novascript/trunk/nova/..//instances/instance-00000001/console.log'/>
            <target port='1'/>
        </serial>

        <console type='pty' tty='/dev/pts/2'>
            <source path='/dev/pts/2'/>
            <target port='0'/>
        </console>

        <serial type='pty'>
            <source path='/dev/pts/2'/>
            <target port='0'/>
        </serial>

    </devices>
</domain>

The hypervisor used is qemu. The memory allocated for the guest is 524,288 KiB (512 MB); libvirt expresses <memory> in kibibytes. The guest OS will boot from a kernel and initrd stored on the host OS.

One virtual CPU is allocated to the guest OS. ACPI is enabled for power management.

Multiple devices are defined:

  • The disk image is a file on the host OS using the driver qcow2. qcow2 is a qemu disk image copy-on-write format.
  • The network interface is a bridge visible to the guest. We define network filtering parameters like IP which means this interface will always use 10.0.0.3 as the source IP address.
  • Device logfile. All data sent to the character device is written to console.log.
  • Pseudo TTY: virsh console can be used to connect to the serial port locally.

Next is the preparation of the network filtering. The firewall driver used by default is iptables. The rules are defined in apply_ruleset() in the class IptablesFirewallDriver. Let’s take a look at the firewall chains and rules for this instance.

*filter
...
:nova-ipv4-fallback - [0:0]
:nova-local - [0:0]
:nova-inst-1 - [0:0]
:nova-sg-1 - [0:0]
-A nova-ipv4-fallback -j DROP
-A FORWARD -j nova-local
-A nova-local -d 10.0.0.3 -j nova-inst-1
-A nova-inst-1 -m state --state INVALID -j DROP
-A nova-inst-1 -m state --state ESTABLISHED,RELATED -j ACCEPT
-A nova-inst-1 -j nova-sg-1
-A nova-inst-1 -s 10.1.3.254 -p udp --sport 67 --dport 68
-A nova-inst-1 -j nova-ipv4-fallback
-A nova-sg-1 -p tcp -s 10.0.0.0/27 -m multiport --dports 1:65535 -j ACCEPT
-A nova-sg-1 -p udp -s 10.0.0.0/27 -m multiport --dports 1:65535 -j ACCEPT
-A nova-sg-1 -p icmp -s 10.0.0.0/27 -m icmp --icmp-type 1/65535 -j ACCEPT
COMMIT

First you have the chains: nova-local, nova-inst-1, nova-sg-1, nova-ipv4-fallback and then the rules.

Let’s look at the different chains and rules:

Packets routed through the virtual network are handled by the chain nova-local.

-A FORWARD -j nova-local

If the destination is 10.0.0.3 then it is for our instance so we jump to the chain nova-inst-1.

-A nova-local -d 10.0.0.3 -j nova-inst-1

If the packet could not be identified, drop it.

-A nova-inst-1 -m state --state INVALID -j DROP

If the packet is associated with an established connection or is starting a new connection but associated with an existing connection, accept it.

-A nova-inst-1 -m state --state ESTABLISHED,RELATED -j ACCEPT

Allow DHCP responses.

-A nova-inst-1 -s 10.0.0.254 -p udp --sport 67 --dport 68

Jump to the security group chain to check the packet against its rules.

-A nova-inst-1 -j nova-sg-1

Security group chain. Accept all TCP packets from 10.0.0.0/27 and ports 1 to 65535.

-A nova-sg-1 -p tcp -s 10.0.0.0/27 -m multiport --dports 1:65535 -j ACCEPT

Accept all UDP packets from 10.0.0.0/27 and ports 1 to 65535.

-A nova-sg-1 -p udp -s 10.0.0.0/27 -m multiport --dports 1:65535 -j ACCEPT

Accept ICMP packets from 10.0.0.0/27; ICMP has no ports, so the --icmp-type match covers the type/code range instead.

-A nova-sg-1 -p icmp -s 10.0.0.0/27 -m icmp --icmp-type 1/65535 -j ACCEPT

Jump to fallback chain.

-A nova-inst-1 -j nova-ipv4-fallback

This is the fallback chain’s rule where we drop the packet.

-A nova-ipv4-fallback -j DROP

Following the firewall rules preparation is the image creation. This happens in _create_image().

def _create_image(self, inst, libvirt_xml, suffix='', disk_images=None):
  ...

In this method, libvirt.xml is created based on the XML we generated above.

A copy of the ramdisk, initrd and disk images are made for the hypervisor to use.

If the flat network manager is used then a network configuration is injected into the guest OS image. We are using the VLAN manager in this example.

The instance’s SSH key is injected into the image. Let’s look at this part in more details. The disk inject_data() method is called.

disk.inject_data(basepath('disk'), key, net,
                 partition=target_partition,
                 nbd=FLAGS.use_cow_images)

basepath('disk') is where the instance’s disk image is located on the host OS. key is the SSH key string. net is not set in our case because we don’t inject a networking configuration. partition is None because we are using a kernel image, otherwise we could use a partitioned disk image. Let’s look inside inject_data().

The first thing that happens here is linking the image to a device. This happens in _link_device().

device = _allocate_device()
utils.execute('sudo qemu-nbd -c %s %s' % (device, image))
# NOTE(vish): this forks into another process, so give it a chance
#             to set up before continuuing
for i in xrange(10):
    if os.path.exists("/sys/block/%s/pid" % os.path.basename(device)):
        return device
    time.sleep(1)
raise exception.Error(_('nbd device %s did not show up') % device)

_allocate_device() returns the next available nbd device: /dev/nbdx where x is between 0 and 15. qemu-nbd is a QEMU disk network block device server. Once this is done, we have a device, say /dev/nbd0.

We disable the filesystem check for this device. mapped_device here is "/dev/nbd0".

out, err = utils.execute('sudo tune2fs -c 0 -i 0 %s' % mapped_device)

We mount the file system to a temporary directory and we add the SSH key to the ssh authorized_keys file.

sshdir = os.path.join(fs, 'root', '.ssh')
utils.execute('sudo mkdir -p %s' % sshdir)  # existing dir doesn't matter
utils.execute('sudo chown root %s' % sshdir)
utils.execute('sudo chmod 700 %s' % sshdir)
keyfile = os.path.join(sshdir, 'authorized_keys')
utils.execute('sudo tee -a %s' % keyfile, '\n' + key.strip() + '\n')

In the code above, fs is the temporary directory.

Finally, we unmount the filesystem and unlink the device. This concludes the image creation and setup.

The next step in the virtualization driver’s spawn() method is the instance launch itself using the libvirt createXML() binding, followed by applying the firewall rules.

That’s it for now. I hope you enjoyed this article. Please write a comment if you have any feedback. If you need help with a project written in Python or with building a new web service, I am available as a freelancer: LinkedIn profile. Follow me on Twitter @laurentluce.

Cars monitoring client/server application using Python Twisted

January 23, 2011

This article describes how to use Twisted to build a client/server cars monitoring system. We are going to focus on the client/server communication.

The client and server source code can be retrieved using Git:

git clone https://github.com/laurentluce/twisted-examples.git

Overview

Some researchers invented a system capable of monitoring cars and detecting the brand and color of each car. We are in charge of building a client/server software solution to retrieve the list of cars from different monitoring locations.

The first element is a server that monitors the cars and listens for client connections, replying with the list of cars. The second element is the client retrieving the list of cars from the servers. We will use the Deferred feature to handle completion and failure callbacks.

Twisted is an asynchronous networking framework. It uses an event loop called the reactor. When this loop detects an event, it uses callbacks to report those events. Events include: connection made, data received, connection lost…
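Before diving into the code, here is a toy, hypothetical miniature of the Deferred idea, just to show callback chaining. The real class is twisted.internet.defer.Deferred, whose API is much richer (error-backs, chaining of Deferreds, etc.):

```python
# Hypothetical miniature of a Deferred: callbacks are chained and each
# one receives the previous callback's return value.
class TinyDeferred(object):
    def __init__(self):
        self.callbacks = []

    def addCallback(self, fn):
        self.callbacks.append(fn)
        return self

    def callback(self, result):
        # Fire the chain: pass the result through each callback in turn.
        for fn in self.callbacks:
            result = fn(result)
        return result

d = TinyDeferred()
d.addCallback(lambda data: data.split('.'))   # parse a "car.car" payload
d.addCallback(len)                            # count the cars
result = d.callback('t1:audi:red.t2:bmw:blue')
print(result)   # 2
```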

Server

The server listens for connections and writes the cars data when a connection is initiated.

First, we have our server application class. The constructor takes care of the following:

  • Logging facility initialization.
  • Cars list initialization.
  • Create a server protocol factory. This factory produces a protocol instance for each connection.
  • Start a thread monitoring cars.

The class has a method named “listen” to start listening for new TCP connections on a specific host and port.

class TrafficServer(object):
    """
    Server main class
    """
    def __init__(self):
        """
        Constructor
        """
        # init logging facility: log to client.log
        logging.basicConfig(filename='server.log', level=logging.DEBUG)
        # cars list
        self.cars = []
        # server listening interface
        self.interface = 'server1.monitoring.com'
        # server port number
        self.port = 8000
        # Factory class for connections
        self.factory = TrafficFactory(self.cars)
        # Thread monitoring for new cars
        self.watchcars = WatchCars(self.cars)
        self.watchcars.start()
 
    def listen(self):
        """
        Call reactor's listen to listen for client's connections
        """
        port = reactor.listenTCP(self.port or 0, self.factory,
            interface=self.interface)

This is our server factory class, creating a protocol instance each time a connection is made. We pass the list of cars so it can be accessed through the factory from the protocol instances, which we are going to see next.

class TrafficFactory(ServerFactory):
    """
    Factory to create protocol instances.
    """
        
    protocol = TrafficProtocol

    def __init__(self, cars):
        """
        Constructor.

        @param cars cars list
        """
        logging.debug('Traffic factory init')
        self.cars = cars

Next is the protocol class itself. Our protocol overrides connectionMade(), which is called when a new connection is made. We use this callback to write the cars data to the client.

class TrafficProtocol(Protocol):
    """
    Protocol class to handle data between the client and the server.
    """

    def connectionMade(self):
        """
        Callback when a connection is made. Write cars data to the client then
        close the connection.
        """
        logging.debug('Connection made')
        data = '.'.join(self.factory.cars)
        self.transport.write(data)
        self.transport.loseConnection()

The WatchCars thread watches for new cars and appends them to the cars list. Assume that get_next_car() is a blocking call returning the cars one by one.

class WatchCars(Thread):
    """
    Thread monitoring the cars.
    """

    def __init__(self, cars):
        """
        Constructor.

        @param cars cars list
        """
        Thread.__init__(self)
        self.cars = cars
    
    def run(self):
        """
        Thread run. Get new cars and add them to the cars list.
        """
        while True:
            t, brand, color = get_next_car()
            self.cars.append('%s:%s:%s' % (t, brand, color))
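The shared-list handoff between the watcher thread and the reactor can be exercised with a bounded stand-in. Here get_next_car() is stubbed with made-up data so the sketch terminates; in the article it blocks until a real car event arrives:

```python
import threading

# Hypothetical sample data standing in for real car events.
SAMPLE = [(1, 'peugeot', 'red'), (2, 'renault', 'green')]

def get_next_car(it=iter(SAMPLE)):
    # Stub: the real get_next_car() blocks until a car arrives.
    return next(it)

class WatchCars(threading.Thread):
    def __init__(self, cars, count):
        threading.Thread.__init__(self)
        self.cars = cars
        self.count = count  # bounded so this sketch terminates

    def run(self):
        for _ in range(self.count):
            t, brand, color = get_next_car()
            # list.append is atomic in CPython, so a plain list is a
            # workable handoff between this thread and the reactor thread.
            self.cars.append('%s:%s:%s' % (t, brand, color))

cars = []
w = WatchCars(cars, len(SAMPLE))
w.start()
w.join()
# cars is now ['1:peugeot:red', '2:renault:green']
```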

Finally, we have a simple main() function creating an instance of the server and starting the reactor loop.

def main():
    server = TrafficServer()
    server.listen()
    reactor.run()

if __name__ == '__main__':
    main()

Let’s start the server alone (server.py) and look at the logging output:

python server.py &

cat server.log
DEBUG:root:Traffic server init
DEBUG:root:Traffic factory init
DEBUG:root:Watch cars thread init
DEBUG:root:Watch cars thread run
DEBUG:root:Traffic server listen

We can see the server initializing the factory to instantiate protocol objects each time a connection is made. The server is listening for connections from the client.

Client

The client retrieves the list of cars from the different servers.

First is our client class doing the following:

  • Initialize the logging facility.
  • Initialize the Deferred object to handle callbacks and failures.
  • Create a client protocol factory. This factory produces instances of our protocol each time a connection is made.
  • Initialize the cars list.
  • Initialize the list of server addresses.

class TrafficClient(object):
    """
    Client
    """
    def __init__(self):
        """
        Constructor
        """
        # init logging facility: log to client.log
        logging.basicConfig(filename='client.log', level=logging.DEBUG)
        # init deferred object to handle callbacks and failures
        self.deferred = defer.Deferred()
        # init factory to create protocol instances
        self.factory = TrafficClientFactory(self.deferred)
        # list of cars
        self.cars = []
        # keep track of servers replying so we know when the overall work
        # is finished
        self.addr_count = 0
        # list of servers to get cars list from
        self.addresses = [('server1.monitoring.com', 8000),
            ('server2.monitoring.com', 8000)]
        logging.debug('Init traffic client')
    ... 

Let’s look at the methods of the client class.

First is get_cars(), which retrieves the list of cars from a server. It uses the reactor's connectTCP() method to initiate a connection to the server. We will see later how we detect that we received data from the server. The deferred object allows us to register success and failure callbacks instead of handling the errors ourselves. We are going to register some callbacks in the main loop.

    def get_cars(self, host, port):
        """
        Connect to server to retrieve list of cars

        @param host server's hostname
        @param port server's port
        """
        reactor.connectTCP(host, port, self.factory)

Next is the main loop used to retrieve the list of cars from all the servers. We register callbacks to handle both the returned list and any errors: the addCallbacks() method lets us specify one of each. We also register, via addBoth(), a done method to be called no matter what happens. We are going to see those callback methods next.

    def update_cars(self):
        """
        Retrieve list of cars from all servers. Set callbacks to handle
        success and failure.
        """
        for address in self.addresses:
            host, port = address
            self.get_cars(host, port)
            self.deferred.addCallbacks(self.got_cars, self.get_cars_failed)
            self.deferred.addBoth(self.cars_done)
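The callback/errback semantics of addCallbacks() and addBoth() can be pictured with a toy stand-in. MiniDeferred below is my own simplified model, not Twisted's implementation:

```python
# Toy stand-in for Twisted's Deferred -- just enough to show how an
# addCallbacks/addBoth chain fires. Not the real implementation.
class MiniDeferred(object):
    def __init__(self):
        self.chain = []  # list of (on_success, on_failure) pairs

    def addCallbacks(self, callback, errback):
        self.chain.append((callback, errback))

    def addBoth(self, both):
        # addBoth is just addCallbacks with the same handler twice.
        self.chain.append((both, both))

    def callback(self, result):
        ok = True
        for success, failure in self.chain:
            try:
                result = (success if ok else failure)(result)
                ok = True
            except Exception as e:
                result, ok = e, False
        return result

log = []
d = MiniDeferred()
d.addCallbacks(lambda cars: log.append(('got', cars)) or cars,
               lambda err: log.append(('failed', err)))
d.addBoth(lambda res: log.append(('done', res)) or res)
d.callback(['1:peugeot:red'])
# log: [('got', ['1:peugeot:red']), ('done', ['1:peugeot:red'])]
```

On success, each success handler runs in order; if one raises, the next errback in the chain receives the failure instead.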

The method got_cars() is called when we are done receiving the data from the server. It is called by the protocol instance handling the data between the client and the server. We are going to see the factory and the protocol classes later.

    def got_cars(self, cars):
        """
        Callback when cars retrieval is successful

        @param cars data returned by server
        """
        logging.debug('Got cars: %s' % cars)
        self.cars.extend(cars)

The method get_cars_failed() is called when an error happens in the reactor loop.

    def get_cars_failed(self, err):
        """
        Callback when retrieval from server failed. Log error.

        @param err server error
        """
        logging.debug('Get cars failed: %s' % err)

The method cars_done() is called once the car lists from all the servers have been retrieved. At that point, we also tell the reactor to stop.

    def cars_done(self, cars):
        """
        Callback when retrieval operation is finished for all servers.
        Log cars list and stop Twisted reactor loop which is listening to events
        """
        self.addr_count += 1
        if self.addr_count == len(self.addresses):
            logging.debug('Cars done: %s' % self.cars)
            reactor.stop()

Next is our protocol class to handle the data between the client and the server. We need to specify a method to be called when some data is received. We also specify a method to be called when the connection is lost. This happens normally when the server closes the connection after sending the list of cars.

class TrafficProtocol(Protocol):
    """
    Protocol class to handle data between the client and the server.
    """

    data = ''

    def dataReceived(self, data):
        """
        Callback when some data is received from server.

        @param data data received from server
        """
        logging.debug('Data received: %s' % data)
        self.data += data

    def connectionLost(self, reason):
        """
        Callback when connection is lost with server. At that point, the
        cars have been received.

        @param reason failure object
        """
        logging.debug('Connection lost: %s' % reason)
        self.cars = []
        for c in self.data.split('.'):
            self.cars.append(c)
        self.carsReceived(self.cars)

    def carsReceived(self, cars):
        """
        Called when the cars data are received.

        @param cars data received from the server
        """
        self.factory.get_cars_finished(cars)

We need to create a factory class to produce protocol instances. The method get_cars_finished() is called by the protocol instance when the connection is lost with the server. We also define clientConnectionFailed() to handle connection errors. Note how we use the deferred callbacks methods registered by the client class.

class TrafficClientFactory(ClientFactory):
    """
    Factory to create protocol instances
    """

    protocol = TrafficProtocol

    def __init__(self, deferred):
        """
        Constructor.

        @param deferred callbacks to handle completion and failures
        """
        self.deferred = deferred

    def get_cars_finished(self, cars):
        """
        Callback when the cars data is retrieved from the server successfully

        @param cars data received from the server
        """
        if self.deferred:
            d, self.deferred = self.deferred, None
            d.callback(cars)

    def clientConnectionFailed(self, connector, reason):
        """
        Callback when connection fails

        @param connector connection object.
        @param reason failure object
        """
        if self.deferred:
            d, self.deferred = self.deferred, None
            d.errback(reason)

Here is the flow on the client side:

Our simple main() function instantiates a client object and starts the reactor loop.

def main():
    client = TrafficClient()
    client.update_cars()

    reactor.run()

if __name__ == '__main__':
    main()

Let’s start the client alone (client.py) and look at the logging output:

python client.py &

cat client.log
DEBUG:root:Traffic client init
DEBUG:root:Traffic client factory init: <Deferred at 0x21945a8>
DEBUG:root:Update cars
DEBUG:root:Get cars: server1.monitoring.com - 8000
DEBUG:root:Get cars: server2.monitoring.com - 8000
DEBUG:root:Client connection failed: <twisted.internet.tcp.Connector instance at 0x2a28638> - [Failure instance: Traceback (failure with no frames): <class 'twisted.internet.error.ConnectionRefusedError'>: Connection was refused by other side: 111: Connection refused.
]
DEBUG:root:Get cars failed: [Failure instance: Traceback (failure with no frames): <class 'twisted.internet.error.ConnectionRefusedError'>: Connection was refused by other side: 111: Connection refused.
]
DEBUG:root:Client connection failed: <twisted.internet.tcp.Connector instance at 0x2a28668> - [Failure instance: Traceback (failure with no frames): <class 'twisted.internet.error.ConnectionRefusedError'>: Connection was refused by other side: 111: Connection refused.
]
DEBUG:root:Get cars failed: [Failure instance: Traceback (failure with no frames): <class 'twisted.internet.error.ConnectionRefusedError'>: Connection was refused by other side: 111: Connection refused.
]
DEBUG:root:Cars done: []

The client tries to connect to server1.monitoring.com and server2.monitoring.com. clientConnectionFailed() is called because no server is listening, which is the expected behavior here. This results in get_cars_failed() being called, followed by cars_done(), since that is the callback chain we defined on the Deferred object.

Testing

Let’s start 2 servers and 1 client and see what happens:

python server.py &

cat server.log
DEBUG:root:Traffic server init
DEBUG:root:Traffic factory init
DEBUG:root:Watch cars thread init
DEBUG:root:Watch cars thread run
DEBUG:root:Traffic server listen
DEBUG:root:Connection made

On the server side, we can see that a connection is made from the client. The server writes data to the client and closes the connection.

python client.py &

cat client.log
DEBUG:root:Traffic client init
DEBUG:root:Traffic client factory init: <Deferred at 0x24975a8>
DEBUG:root:Update cars
DEBUG:root:Get cars: server1.monitoring.com - 8000
DEBUG:root:Get cars: server2.monitoring.com - 8000
DEBUG:root:Data received: 97264836:peugeot:red
DEBUG:root:Connection lost: [Failure instance: Traceback (failure with no frames): <class 'twisted.internet.error.ConnectionDone'>: Connection was closed cleanly.
]
DEBUG:root:Cars received: ['97264836:peugeot:red']
DEBUG:root:Get cars finished: ['97264836:peugeot:red']
DEBUG:root:Got cars: ['97264836:peugeot:red']
DEBUG:root:Cars done: ['97264836:peugeot:red', '97264846:renault:green']
DEBUG:root:Data received: 97264836:renault:green
DEBUG:root:Connection lost: [Failure instance: Traceback (failure with no frames): <class 'twisted.internet.error.ConnectionDone'>: Connection was closed cleanly.
]
DEBUG:root:Cars received: ['97264846:renault:green']
DEBUG:root:Get cars finished: ['97264846:renault:green']
DEBUG:root:Got cars: ['97264846:renault:green']
DEBUG:root:Cars done: ['97264836:peugeot:red', '97264846:renault:green']

On the client side, two connections are made: one to server1.monitoring.com and one to server2.monitoring.com. ’97264836:peugeot:red’ is received from server1 and ’97264846:renault:green’ from server2.

That’s it for now. I hope you enjoyed this article. Please write a comment if you have any feedback.

OpenStack Nova nova.sh script explained

January 14, 2011

Update 11/24/2011: Updated article based on the latest nova.sh script.

This article describes the internals of the script nova.sh used to get the OpenStack Nova source code, install it and run it. Nova is a cloud computing fabric controller, the main part of an IaaS system.

The script can be retrieved using Git:

git clone https://github.com/vishvananda/novascript.git

Arguments

The script takes 1 mandatory argument and 2 optional arguments:

  • command: “branch”, “install”, “run”, “terminate”, “clean”, “scrub”.
  • source branch (branch command only): defaults to “lp:nova”, the location of the source code on Launchpad.
  • install directory: defaults to “nova”.

Note: You will need to use sudo to run the script.

Initialization

The arguments are grabbed from the command line or set to their defaults:

CMD=$1
if [ "$CMD" = "branch" ]; then
    SOURCE_BRANCH=${2:-lp:nova}
    DIRNAME=${3:-nova}
else
    DIRNAME=${2:-nova}
fi
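The `${N:-default}` form expands to the Nth positional parameter, or to the default when that parameter is unset or empty. A quick demonstration of the idiom used above:

```shell
# ${N:-default} expands to the Nth argument, or "default" if it is
# unset or empty -- the same idiom nova.sh uses for its arguments.
set -- branch lp:nova
CMD=$1
SOURCE_BRANCH=${2:-lp:nova}
DIRNAME=${3:-nova}          # no third argument, so the default applies
echo "$CMD $SOURCE_BRANCH $DIRNAME"
# prints: branch lp:nova nova
```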

By default, sqlite is used, but you can use MySQL instead by setting the environment variables USE_MYSQL and MYSQL_PASS.

USE_MYSQL=${USE_MYSQL:-0}
MYSQL_PASS=${MYSQL_PASS:-nova}

Next is the interface used as the public interface and the VLAN interface in the nova configuration file.

INTERFACE=${INTERFACE:-eth0}

Floating IP addresses are used for HA. One VM can grab a floating IP address as it is taking over.

FLOATING_RANGE=${FLOATING_RANGE:-10.6.0.0/27}

Fixed IP addresses are attached to the different interfaces.

FIXED_RANGE=${FIXED_RANGE:-10.0.0.0/24}

You can set the path where instances data are stored:

INSTANCES_PATH=${INSTANCES_PATH:-$NOVA_DIR/instances}

You can also force the script to run some unit tests (Python unit tests) when you use the command “run”. For example, it will test the api, authentication, compute, network modules and much more. You can take a look at the folder nova/tests/ to see all the tests.

TEST=${TEST:-0}

LDAP can be used for authentication, but it is not enabled by default; authentication data is stored in the database instead.

USE_LDAP=${USE_LDAP:-0}

OpenDJ can be used instead of OpenLDAP when LDAP is enabled. OpenDJ is an LDAPv3-compliant directory service developed for the Java platform, providing a high performance, highly available and secure store for the identities managed by enterprises.

USE_OPENDJ=${USE_OPENDJ:-0}

IPv6 support can be enabled:

USE_IPV6=${USE_IPV6:-0}

Nova has support for libvirt and you can set LIBVIRT_TYPE to something if you don’t like the default qemu. You can set it to “uml” and a different libvirt XML template will be used. Libvirt is a virtualization API.

LIBVIRT_TYPE=${LIBVIRT_TYPE:-qemu}

Next is the network manager type. It defaults to VlanManager where a host-managed VLAN will be created for each project. Other types are FlatManager, FlatDHCPManager. See Network Manager Documentation for more details.

NET_MAN=${NET_MAN:-VlanManager}

In case you are using FlatDHCP on multiple hosts, you need to set the env variable FLAT_INTERFACE to a network interface with no defined IP.

FLAT_INTERFACE=ethx

The IP address of the first network interface is grabbed using the ifconfig command. As the script notes, if you have more than one network interface, you should set the environment variable HOST_IP explicitly.

if [ ! -n "$HOST_IP" ]; then
HOST_IP=`LC_ALL=C ifconfig  | grep -m 1 'inet addr:'| cut -d: -f2 | awk '{print $1}'`
fi
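The pipeline keeps only the first `inet addr:` line. Running it on canned ifconfig output (rather than a live interface) makes the extraction visible; the address below is an example value:

```shell
# The same grep/cut/awk pipeline, run on canned ifconfig output so the
# extraction is visible without touching a live interface.
sample='eth0      Link encap:Ethernet  HWaddr 12:31:39:04:58:f5
          inet addr:10.240.95.3  Bcast:10.240.95.255  Mask:255.255.255.0'
HOST_IP=$(echo "$sample" | grep -m 1 'inet addr:' | cut -d: -f2 | awk '{print $1}')
echo "$HOST_IP"
# prints: 10.240.95.3
```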

The connection to the database is defined the following way. It will be MySQL or sqlite based on your choice.

if [ "$USE_MYSQL" == 1 ]; then
    SQL_CONN=mysql://root:$MYSQL_PASS@localhost/nova
else
    SQL_CONN=sqlite:///$NOVA_DIR/nova.sqlite
fi

The authentication driver is set based on your choice: LDAP or not. If LDAP is not selected (by default), it will use the database to store the authentication data.

if [ "$USE_LDAP" == 1 ]; then
    AUTH=ldapdriver.LdapDriver
else
    AUTH=dbdriver.DbDriver
fi

Branch command

This command installs Bazaar (bzr), a distributed version control system, initializes the repository, retrieves the latest source code for Nova and places it in the Nova folder you just defined. It also creates the “instances” and “networks” folders.

if [ "$CMD" == "branch" ]; then
    sudo apt-get install -y bzr
    if [ ! -e "$DIR/.bzr" ]; then
        bzr init-repo $DIR
    fi
    rm -rf $NOVA_DIR
    bzr branch $SOURCE_BRANCH $NOVA_DIR
    cd $NOVA_DIR
    mkdir -p $NOVA_DIR/instances
    mkdir -p $NOVA_DIR/networks
    exit
fi

LXC setup

The libvirt LXC driver manages “Linux Containers”. Containers are sets of processes with private namespaces which can (but don’t always) look like separate machines, but do not have their own OS. If you use “lxc” for the libvirt type, some cgroups controllers need to be mounted on the host OS.

has_fsmp() {
  # has_fsmp(mountpoint,file): does file have an fstab entry for mountpoint
  awk '$1 !~ /#/ && $2 == mp { e=1; } ; END { exit(!e); }' "mp=$1" "$2" ;
}

function lxc_setup() {
  local mntline cmd=""
  mntline="none /cgroups cgroup cpuacct,memory,devices,cpu,freezer,blkio 0 0"
  has_fsmp "/cgroups" /etc/fstab ||
     cmd="$cmd && mkdir -p /cgroups && echo '$mntline' >> /etc/fstab"
  has_fsmp "/cgroups" /proc/mounts ||
     cmd="$cmd && mount /cgroups"

  [ -z "$cmd" ] && return 0
  sudo sh -c ": $cmd"
}

[ "$LIBVIRT_TYPE" != "lxc" ] || lxc_setup || fail "failed to setup lxc"
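has_fsmp() exits 0 only when a non-comment fstab line's second field equals the mount point. Exercising it against a throwaway fstab file shows the matching rule:

```shell
# Exercise has_fsmp against a throwaway fstab: skip comment lines,
# compare field 2 against the requested mount point.
has_fsmp() {
  awk '$1 !~ /#/ && $2 == mp { e=1; } ; END { exit(!e); }' "mp=$1" "$2"
}

tmp=$(mktemp)
cat > "$tmp" <<'EOF'
# /etc/fstab
proc            /proc    proc    defaults 0 0
none            /cgroups cgroup  cpuacct,memory 0 0
EOF

has_fsmp /cgroups "$tmp" && echo "found"      # prints: found
has_fsmp /mnt/data "$tmp" || echo "missing"   # prints: missing
```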

Install command

The following Debian packages are installed if not already installed:

  • python-software-properties: This software provides an abstraction of the used apt repositories. It allows you to easily manage your distribution and independent software vendor software sources.
  • dnsmasq-base: A small caching DNS proxy and DHCP/TFTP server.
  • kpartx: create device mappings for partitions.
  • kvm: Full virtualization on x86 hardware.
  • gawk: a pattern scanning and processing language.
  • iptables: administration tools for packet filtering and NAT.
  • ebtables: Ethernet bridge frame table administration.
  • user-mode-linux: User-mode Linux (kernel).
  • libvirt-bin: the programs for the libvirt library.
  • screen: terminal multiplexor with VT100/ANSI terminal emulation.
  • euca2ools: managing cloud instances for Eucalyptus.
  • vlan: user mode programs to enable VLANs on your ethernet devices.
  • curl: Get a file from an HTTP, HTTPS or FTP server.
  • rabbitmq-server: An AMQP server written in Erlang.
  • lvm2: The Linux Logical Volume Manager.
  • iscsitarget: iSCSI Enterprise Target userland tools.
  • open-iscsi: High performance, transport independent iSCSI implementation.
  • socat: multipurpose relay for bidirectional data transfer.
  • unzip: De-archiver for .zip files.
  • glance: The Glance project provides an image registration and discovery service (Parallax) and an image delivery service (Teller).
  • radvd: Router Advertisement Daemon.
  • python-twisted: Event-based framework for internet applications.
  • python-sqlalchemy: SQL toolkit and Object Relational Mapper for Python.
  • python-suds: Lightweight SOAP client for Python.
  • python-lockfile: file locking library for Python.
  • python-mox: a mock object framework for Python.
  • python-lxml: pythonic binding for the libxml2 and libxslt libraries.
  • python-kombu: AMQP Messaging Framework for Python.
  • python-greenlet: Lightweight in-process concurrent programming.
  • python-carrot: An AMQP messaging queue framework.
  • python-migrate: Database schema migration for SQLAlchemy.
  • python-eventlet: Eventlet is a concurrent networking library for Python.
  • python-gflags: Python implementation of the Google command line flags module.
  • python-novaclient: client library for OpenStack Compute API.
  • python-ipy: Python module for handling IPv4 and IPv6 addresses and networks.
  • python-cheetah: text-based template engine and Python code generator.
  • python-libvirt: libvirt Python bindings.
  • python-libxml2: Python bindings for the GNOME XML library.
  • python-routes: Routing Recognition and Generation Tools.
  • python-paste: Tools for using a Web Server Gateway Interface stack.
  • python-netaddr: manipulation of various common network address notations.
  • python-tempita: very small text templating language.
  • python-pastedeploy: Load, configure, and compose WSGI applications and servers.
  • python-glance: OpenStack Image Registry and Delivery Service.

The script also adds an APT repository: “ppa:nova-core/trunk” which contains some patched versions of some of the packages above.

The modules kvm and ndb are loaded. iscsitarget and libvirt-bin are restarted and a test image is downloaded and uncompressed.

If you enable IPv6 support, radvd will be installed, IPv6 forwarding will be enabled and router advertisement messages will be ignored.

If you chose to use MySQL, the root password is set for you based on the environment variable MYSQL_PASS and the following 2 packages are installed: mysql-server and python-mysqldb.

if [ "$CMD" == "install" ]; then
    sudo apt-get install -y python-software-properties
    sudo add-apt-repository ppa:nova-core/trunk
    sudo apt-get update
    sudo apt-get install -y dnsmasq-base kpartx kvm gawk iptables ebtables
    sudo apt-get install -y user-mode-linux kvm libvirt-bin
    # Bypass  RabbitMQ "OK" dialog
    echo "rabbitmq-server rabbitmq-server/upgrade_previous note" | sudo debconf-set-selections
    sudo apt-get install -y screen euca2ools vlan curl rabbitmq-server
    sudo apt-get install -y lvm2 iscsitarget open-iscsi
    sudo apt-get install -y socat unzip glance
    echo "ISCSITARGET_ENABLE=true" | sudo tee /etc/default/iscsitarget
    sudo /etc/init.d/iscsitarget restart
    sudo modprobe kvm
    sudo /etc/init.d/libvirt-bin restart
    sudo modprobe ndb
    sudo apt-get install -y python-mox python-lxml python-kombu python-paste
    sudo apt-get install -y python-migrate python-gflags python-greenlet
    sudo apt-get install -y python-libvirt python-libxml2 python-routes
    sudo apt-get install -y python-netaddr python-pastedeploy python-eventlet
    sudo apt-get install -y python-novaclient python-glance python-cheetah
    sudo apt-get install -y python-carrot python-tempita python-sqlalchemy
    sudo apt-get install -y python-suds python-lockfile python-netaddr
    
    if [ "$USE_IPV6" == 1 ]; then
        sudo apt-get install -y radvd
        sudo bash -c "echo 1 > /proc/sys/net/ipv6/conf/all/forwarding"
        sudo bash -c "echo 0 > /proc/sys/net/ipv6/conf/all/accept_ra"
    fi

    if [ "$USE_MYSQL" == 1 ]; then
        cat <<MYSQL_PRESEED | debconf-set-selections
mysql-server-5.1 mysql-server/root_password password $MYSQL_PASS
mysql-server-5.1 mysql-server/root_password_again password $MYSQL_PASS
mysql-server-5.1 mysql-server/start_on_boot boolean true
MYSQL_PRESEED
        apt-get install -y mysql-server python-mysqldb
    fi
    exit
fi

Run command

A lot is happening in this section.

First, a new screen session is started in detached mode with the session name specified in the environment variable SCREEN_NAME. The following code also checks whether a screen session with the same name already exists and, if so, asks the user to kill it first. Screen is a full-screen window manager that multiplexes a physical terminal between several processes.

# check for existing screen, exit if present
  found=$(screen -ls | awk '-F\t' '$2 ~ m {print $2}' "m=[0-9]+[.]$SCREEN_NAME")
  if [ -n "$found" ]; then
    {
    echo "screen named '$SCREEN_NAME' already exists!"
    echo " kill it with: screen -r '$SCREEN_NAME' -x -X quit"
    echo " attach to it with: screen -d -r '$SCREEN_NAME'"
    exit 1;
    } 2>&1
  fi
  screen -d -m -S $SCREEN_NAME -t nova
  sleep 1
  if [ "$SCREEN_STATUS" != "0" ]; then
    screen -r "$SCREEN_NAME" -X hardstatus alwayslastline "%-Lw%{= BW}%50>%n%f* %t%{-}%+Lw%< %= %H"
  fi

Based on the environment variables set above, the script writes the nova flags to nova.conf and creates the nova folder in /etc/.

cat >$NOVA_DIR/bin/nova.conf << NOVA_CONF_EOF
--verbose
--nodaemon
--dhcpbridge_flagfile=$NOVA_DIR/bin/nova.conf
--network_manager=nova.network.manager.$NET_MAN
--my_ip=$HOST_IP
--public_interface=$INTERFACE
--vlan_interface=$INTERFACE
--sql_connection=$SQL_CONN
--auth_driver=nova.auth.$AUTH
--libvirt_type=$LIBVIRT_TYPE
--fixed_range=$FIXED_RANGE
--lock_path=$LOCK_PATH
--instances_path=$INSTANCES_PATH
--flat_network_bridge=br100
NOVA_CONF_EOF

if [ -n "$FLAT_INTERFACE" ]; then
    echo "--flat_interface=$FLAT_INTERFACE" >>$NOVA_DIR/bin/nova.conf
fi

if [ "$USE_IPV6" == 1 ]; then
    echo "--use_ipv6" >>$NOVA_DIR/bin/nova.conf
fi

Next, all dnsmasq processes are killed: DNS cache proxy + DHCP server.

killall dnsmasq

In case IPv6 support is enabled, radvd is killed:

if [ "$USE_IPV6" == 1 ]; then
   killall radvd
fi

The script recreates the database “nova”.

if [ "$USE_MYSQL" == 1 ]; then
    mysql -p$MYSQL_PASS -e 'DROP DATABASE nova;'
    mysql -p$MYSQL_PASS -e 'CREATE DATABASE nova;'
else
    rm $NOVA_DIR/nova.sqlite
fi

If you decided to use LDAP, OpenLDAP or OpenDJ needs to be configured:

if [ "$USE_LDAP" == 1 ]; then
    if [ "$USE_OPENDJ" == 1 ]; then
        echo '--ldap_user_dn=cn=Directory Manager' >> \
            /etc/nova/nova-manage.conf
        sudo $NOVA_DIR/nova/auth/opendj.sh
    else
        sudo $NOVA_DIR/nova/auth/slap.sh
    fi
fi

The script also recreates the instances and networks folders.

rm -rf $NOVA_DIR/instances
mkdir -p $NOVA_DIR/instances
rm -rf $NOVA_DIR/networks
mkdir -p $NOVA_DIR/networks

If test mode is enabled (see above), the unit tests are run:

if [ "$TEST" == 1 ]; then
    cd $NOVA_DIR
    python $NOVA_DIR/run_tests.py
    cd $DIR
fi

The database schema is then created:

$NOVA_DIR/bin/nova-manage db sync

A new admin user is added:

$NOVA_DIR/bin/nova-manage user admin admin admin admin

A new project “admin” managed by “admin” is created:

$NOVA_DIR/bin/nova-manage project create admin admin

A small network is created with 32 IPs from the fixed range:

$NOVA_DIR/bin/nova-manage network create private $FIXED_RANGE 1 32
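The command asks for one network of 32 addresses carved out of FIXED_RANGE (10.0.0.0/24 by default). A quick sanity check of that arithmetic with Python's ipaddress module, using the script's default range:

```python
import ipaddress

# "network create private $FIXED_RANGE 1 32" asks for 1 network of
# 32 addresses out of the fixed range (10.0.0.0/24 by default).
fixed_range = ipaddress.ip_network('10.0.0.0/24')

# 32 addresses per network means a /27, since 2**(32-27) == 32.
nets = list(fixed_range.subnets(new_prefix=27))
first = nets[0]

assert first.num_addresses == 32
assert str(first.netmask) == '255.255.255.224'
```

This is the same /27 netmask (255.255.255.224) that shows up later on the br100 bridge and in the routing table.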

Create some floating IPs using the floating range.

$NOVA_DIR/bin/nova-manage floating create $FLOATING_RANGE

Download an image from ansolabs and untar it in the images dir.

if [ ! -d $DIR/images ]; then
   mkdir -p $DIR/images
   wget -c http://images.ansolabs.com/tty.tgz
   tar -C $DIR/images -zxf tty.tgz
fi

If the ami-tty image is not already registered with the image service, the images in the directory are converted from the old (Bexar) format to the new one.

if ! glance details | grep ami-tty; then
    $NOVA_DIR/bin/nova-manage image convert $DIR/images
fi

The file novarc looks like this for me. I am running things on Amazon EC2 right now: 10.240.95.3 is my EC2 instance's private IP. The EC2 API server listens on port 8773, the OpenStack API on port 8774, and the object store on port 3333.

The different servers, controllers and stores are started and we can browse them through the screen session created above. See Nova Concepts and Introduction for more details on the Nova architecture.

screen_it api "$NOVA_DIR/bin/nova-api"
screen_it objectstore "$NOVA_DIR/bin/nova-objectstore"
screen_it compute "$NOVA_DIR/bin/nova-compute"
screen_it network "$NOVA_DIR/bin/nova-network"
screen_it scheduler "$NOVA_DIR/bin/nova-scheduler"
screen_it volume "$NOVA_DIR/bin/nova-volume"
screen_it ajax_console_proxy "$NOVA_DIR/bin/nova-ajax-console-proxy"
sleep 2
$NOVA_DIR/bin/nova-manage project zipfile admin admin $NOVA_DIR/nova.zip
unzip -o $NOVA_DIR/nova.zip -d $NOVA_DIR/
screen_it test "export PATH=$NOVA_DIR/bin:$PATH;. $NOVA_DIR/novarc"
if [ "$CMD" != "run_detached" ]; then
  screen -S nova -x
fi
NOVA_KEY_DIR=$(pushd $(dirname $BASH_SOURCE)>/dev/null; pwd; popd>/dev/null)
export EC2_ACCESS_KEY="admin:admin"
export EC2_SECRET_KEY="admin"
export EC2_URL="http://10.240.95.3:8773/services/Cloud"
export S3_URL="http://10.240.95.3:3333"
export EC2_USER_ID=42 # nova does not use user id, but bundling requires it
export EC2_PRIVATE_KEY=${NOVA_KEY_DIR}/pk.pem
export EC2_CERT=${NOVA_KEY_DIR}/cert.pem
export NOVA_CERT=${NOVA_KEY_DIR}/cacert.pem
export EUCALYPTUS_CERT=${NOVA_CERT} # euca-bundle-image seems to require this set
alias ec2-bundle-image="ec2-bundle-image --cert ${EC2_CERT} --privatekey ${EC2_PRIVATE_KEY} --user 42 --ec2cert ${NOVA_CERT}"
alias ec2-upload-bundle="ec2-upload-bundle -a ${EC2_ACCESS_KEY} -s ${EC2_SECRET_KEY} --url ${S3_URL} --ec2cert ${NOVA_CERT}"
export CLOUD_SERVERS_API_KEY="admin"
export CLOUD_SERVERS_USERNAME="admin"
export CLOUD_SERVERS_URL="http://10.240.95.3:8774/v1.0/"

Once the nova session is finished, the instances are terminated and the volumes deleted. This code is also run when the command “terminate” is used.

if [ "$CMD" == "run" ] || [ "$CMD" == "terminate" ]; then
    # shutdown instances
    . $NOVA_DIR/novarc; euca-describe-instances | grep i- | cut -f2 | xargs euca-terminate-instances
    sleep 2
    # delete volumes
    . $NOVA_DIR/novarc; euca-describe-volumes | grep vol- | cut -f2 | xargs -n1 euca-delete-volume
    sleep 2
fi

The screen session is forced to shutdown. This part is also called when the command “clean” is executed.

if [ "$CMD" == "run" ] || [ "$CMD" == "clean" ]; then
    screen -S nova -X quit
    rm *.pid*
fi

There is one last command called “scrub” which is used to remove the bridges and VLANs configuration. It also destroys the domains.

if [ "$CMD" == "scrub" ]; then
    $NOVA_DIR/tools/clean-vlans
    if [ "$LIBVIRT_TYPE" == "uml" ]; then
        virsh -c uml:///system list | grep i- | awk '{print \$1}' | xargs -n1 virsh -c uml:///system destroy
    else
        virsh list | grep i- | awk '{print \$1}' | xargs -n1 virsh destroy
    fi
fi

When you run the branch, install and run commands, you end up with the different components (API server, object store, scheduler…) running which is really neat.

You can use the screen windows to check each component's status. If you are not familiar with screen, run “man screen” to learn how to navigate between the different windows.

API server:

# /opt/novascript/trunk/bin/nova-api
(nova.root 2011.1-LOCALBRANCH:LOCALREVISION): AUDIT [N/A] Starting /opt/novascript/trunk/bin/nova-api on 0.0.0.0:8774
(nova.root 2011.1-LOCALBRANCH:LOCALREVISION): AUDIT [N/A] Starting /opt/novascript/trunk/bin/nova-api on 0.0.0.0:8773

Object store:

(nova.root 2011.1-LOCALBRANCH:LOCALREVISION): DEBUG [N/A] network_topic : network from MainProcess (pid=13763) serve /opt/novascript/trunk/nova/twistd.py:266
(nova.root 2011.1-LOCALBRANCH:LOCALREVISION): AUDIT [N/A] Starting nova-objectstore
2011-01-14 22:15:09+0000 [-] Log opened.
2011-01-14 22:15:09+0000 [-] twistd 10.0.0 (/usr/bin/python 2.6.5) starting up.
2011-01-14 22:15:09+0000 [-] reactor class: twisted.internet.selectreactor.SelectReactor.
2011-01-14 22:15:09+0000 [-] twisted.web.server.Site starting on 3333
2011-01-14 22:15:09+0000 [-] Starting factory <twisted.web.server.Site instance at 0xaaf5bec>

You can launch a new instance this way:

cd /tmp/
euca-add-keypair test > test.pem
euca-run-instances -k test -t m1.tiny ami-tty

You can list your instances this way:

# euca-describe-instances
RESERVATION     r-ky6gm38t      admin
INSTANCE        i-00000001      ami-tty        10.0.0.3        10.0.0.3        running test (admin, domU-12-31-39-04-58-F5)    0               m1.tiny 2011-01-14 22:32:03.466420      nova

Running ifconfig, we can now see the bridge and the VLAN that were created:

br100     Link encap:Ethernet  HWaddr 12:31:39:04:58:f5  
          inet addr:10.0.0.1  Bcast:10.0.0.31  Mask:255.255.255.224
...
vlan100   Link encap:Ethernet  HWaddr 12:31:39:04:58:f5  
          inet6 addr: fe80::1031:39ff:fe04:58f5/64 Scope:Link
...

A network route has been set up for us to use the bridge interface when the packets are sent to 10.0.0.x:

# route -n
Kernel IP routing table
Destination     Gateway         Genmask         Flags Metric Ref    Use Iface
10.0.0.0        0.0.0.0         255.255.255.224 U     0      0        0 br100
...

Voilà. Don’t hesitate to post a comment if you have any feedback.

Distributed messaging using RabbitMQ and Python

January 8, 2011

This tutorial shows how to distribute messages to multiple servers for processing. We will be using RabbitMQ which is based on AMQP.

The way messaging works is that you have producers producing messages and consumers consuming them. We are going to have 1 producer generating random integers and 2 consumers: 1 consuming the odd integers and 1 consuming the even integers.

The producer sends messages to the exchange and the consumers create and bind queues to the exchange to receive the messages they are interested in. The way messages are routed is using routing keys for the messages and binding keys for the queues.
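Direct-exchange routing can be pictured as matching a message's routing key against queue binding keys. The toy model below mirrors the odd/even consumer setup without touching RabbitMQ itself; class and variable names are my own:

```python
# Toy model of direct-exchange routing: a queue receives a message when
# its binding key equals the message's routing key.
class DirectExchange(object):
    def __init__(self):
        self.bindings = {}  # binding key -> list of bound queues

    def bind(self, queue, binding_key):
        self.bindings.setdefault(binding_key, []).append(queue)

    def publish(self, message, routing_key):
        # Deliver to every queue bound with a matching key.
        for queue in self.bindings.get(routing_key, []):
            queue.append(message)

odd_queue, even_queue = [], []
exchange = DirectExchange()
exchange.bind(odd_queue, 'odd')
exchange.bind(even_queue, 'even')

for n in [3, 8, 5, 4]:
    exchange.publish(n, 'odd' if n % 2 else 'even')

# odd_queue: [3, 5], even_queue: [8, 4]
```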

The producer will be running on the server “producer.messaging.com”, 1 consumer will be running on “consumer1.messaging.com” and the other one on “consumer2.messaging.com”.

Requirements

We need to install RabbitMQ server on producer.messaging.com and py-amqplib on the 3 servers.

Here, I am using Ubuntu on all the servers.

sudo apt-get install rabbitmq-server
sudo apt-get install python-amqplib

After installing rabbitmq-server, the server should be up and running.

Producer

The first thing to do is to create a Python class for the producer. The constructor initiates a connection to the RabbitMQ server. It also takes as an argument the name of the exchange which will receive all the messages from the producer.

from amqplib import client_0_8 as amqp

class Producer(object):
    def __init__(self, exchange_name, host, userid, password):
        """
        Constructor. Initiate connection with the RabbitMQ server.

        @param exchange_name name of the exchange to send messages to
        @param host RabbitMQ server host 
        @param userid RabbitMQ server username
        @param password RabbitMQ server user's password
        """
        self.exchange_name = exchange_name
        self.connection = amqp.Connection(host=host, userid=userid,
            password=password, virtual_host="/", insist=False)
        self.channel = self.connection.channel()
    ...

We also need a method to publish a message to the exchange. This method takes the message and its routing key as arguments. The content type of the message here is text but you can change it. Also, we mark the message as persistent (delivery_mode = 2) so it can survive a broker restart.

    def publish(self, message, routing_key):
        """
        Publish message to exchange using routing key

        @param message message to publish
        @param routing_key message routing key
        """
        msg = amqp.Message(message)
        msg.properties["content_type"] = "text/plain"
        msg.properties["delivery_mode"] = 2
        self.channel.basic_publish(exchange=self.exchange_name,
                                   routing_key=routing_key, msg=msg)

At last, we add a method to close the channel and the connection.

    def close(self):
        """
        Close channel and connection
        """
        self.channel.close()
        self.connection.close()

We will come back to this class when we look at our example.

Consumer

Let’s create our Consumer class to help us create consumers.

from amqplib import client_0_8 as amqp

class Consumer(object):
    def __init__(self, host, userid, password):
        """
        Constructor. Initiate a connection to the RabbitMQ server.

        @param host RabbitMQ server host 
        @param userid RabbitMQ server username
        @param password RabbitMQ server user's password
        """
        self.connection = amqp.Connection(host=host, userid=userid,
            password=password, virtual_host="/", insist=False)
        self.channel = self.connection.channel()

Our close method:

    def close(self):
        """
        Close channel and connection
        """
        self.channel.close()
        self.connection.close()

Next is a method to create an exchange. We use the exchange type “direct” which means the broker routes the message using an exact match on the routing key. If we bind a queue using the key ‘test’, only the messages with the routing key ‘test’ will be sent to it.

    def declare_exchange(self, exchange_name, durable=True, auto_delete=False):
        """
        Create exchange.

        @param exchange_name name of the exchange
        @param durable will the exchange survive a server restart
        @param auto_delete should the server delete the exchange when it is
        no longer in use
        """
        self.exchange_name = exchange_name
        self.channel.exchange_declare(exchange=self.exchange_name,
            type='direct', durable=durable, auto_delete=auto_delete)

Followed by a method to create a queue and bind it to the exchange. We need to indicate the binding key so the exchange knows which messages to send to the queue based on their routing key.

    def declare_queue(self, queue_name, routing_key, durable=True, exclusive=False, auto_delete=False):
        """
        Create a queue and bind it to the exchange.

        @param queue_name Name of the queue to create
        @param routing_key binding key
        @param durable will the queue survive a server restart
        @param exclusive only 1 client can work with it
        @param auto_delete should the server delete the queue when it is
                no longer in use
        """
        self.queue_name = queue_name
        self.routing_key = routing_key
        self.channel.queue_declare(queue=self.queue_name, durable=durable,
                                   exclusive=exclusive, auto_delete=auto_delete)
        self.channel.queue_bind(queue=self.queue_name,
            exchange=self.exchange_name, routing_key=self.routing_key)

At this point, we can define a method to start a consumer and register a callback to be called for each message delivered.

    def start_consuming(self, callback, queue_name=None, consumer_tag='consumer'):
        """
        Start a consumer and register a function to be called when a message is consumed

        @param callback function to call
        @param queue_name name of the queue
        @param consumer_tag a client-generated consumer tag to establish context
        """
        if hasattr(self, 'queue_name') or queue_name:
            self.channel.basic_consume(queue=getattr(self, 'queue_name',
                    queue_name),
                callback=callback, consumer_tag=consumer_tag)

We need the opposite of the previous method.

    def stop_consuming(self, consumer_tag='consumer'):
        """
        Cancel a consumer.

        @param consumer_tag a client-generated consumer tag to establish context
        """
        self.channel.basic_cancel(consumer_tag)

Last, we need a wait method asking the consumer to wait for activity on the channel, i.e. messages being delivered.

    def wait(self):
        """
        Wait for activity on the channel.
        """
        while True:
            self.channel.wait()

Example

Let’s put our classes into action. We are going to define a producer and 2 consumers as described at the beginning of this tutorial.

First, our producer will generate random integers and send them to the exchange. If the integer is odd, it will use the routing key “odd” and if the integer is even, it will use the routing key “even”. This code will be running on “producer.messaging.com”.

import random, time
import producer
p = producer.Producer(exchange_name='integers', host='producer.messaging.com', userid='guest', password='guest')
while True:
    # generate a random integer between 1 and 100 included
    i = random.randint(1, 100)
    if i % 2 == 0:
        key = 'even'
    else:
        key = 'odd'
    p.publish(str(i), key)
    print 'integer: %d' % i 
    time.sleep(1) 

Next is our first consumer running on “consumer1.messaging.com”. We declare an exchange named “integers” and create a queue and bind it to receive messages with the routing key “odd”. If the exchange and queue already exist, calling declare_exchange() and declare_queue() does nothing.

We define a callback function which prints the odd integer and acknowledges the message so the broker knows it has been delivered properly.

import consumer

c = consumer.Consumer(host='producer.messaging.com', userid='guest', password='guest')
c.declare_exchange(exchange_name='integers')
c.declare_queue(queue_name='odds', routing_key='odd')

def message_callback(message):
    print 'odd integer: %s' % message.body
    c.channel.basic_ack(message.delivery_tag)

c.start_consuming(message_callback)
c.wait()

Similar to the first consumer, we create another consumer on “consumer2.messaging.com” to consume the even integers.

import consumer

c = consumer.Consumer(host='producer.messaging.com', userid='guest', password='guest')
c.declare_exchange(exchange_name='integers')
c.declare_queue(queue_name='evens', routing_key='even')

def message_callback(message):
    print 'even integer: %s' % message.body
    c.channel.basic_ack(message.delivery_tag)

c.start_consuming(message_callback)
c.wait()

Let’s start our 2 consumers and our producer. This is the output of each:

Producer:

integer: 14
integer: 47
integer: 6
integer: 83
...

Consumer 1:

odd integer: 47
odd integer: 83

Consumer 2:

even integer: 14
even integer: 6 

On the server where RabbitMQ is running, we can use rabbitmqctl to take a look at the objects created.

List the exchanges. We can see our exchange “integers” listed in the direct category.

$ sudo rabbitmqctl list_exchanges
Listing exchanges ...
amq.direct             direct
amq.topic              topic
amq.rabbitmq.logging   topic
amq.fanout             fanout
integers               direct
amq.headers            headers
                       direct
amq.match              headers
...done.

List the queues. We can check the number of messages to be consumed.

$ sudo rabbitmqctl list_queues
Listing queues ...
evens    4
odds     3
...done.

If you want to remove the exchange and all the queues, you have to first stop the app, then issue a reset followed by a start.

$ sudo rabbitmqctl stop_app
$ sudo rabbitmqctl reset
$ sudo rabbitmqctl start_app

I hope you enjoyed this tutorial. You can see that RabbitMQ is really easy to use and very powerful. Do not hesitate to add a comment if you have any feedback. If you need help with a project written in Python or with building a new web service, I am available as a freelancer: LinkedIn profile. Follow me on Twitter @laurentluce.

Add REST resources to a database using Python and SQLAlchemy

December 26, 2010

This tutorial shows how to download REST resources to your database using Python and SQLAlchemy. We are going to download manufacturer and car resources to our database. We will also show how easy it is to query them once they are stored.

Requirements

We need to install the SQLAlchemy and simplejson Python libraries:

easy_install SQLAlchemy
easy_install simplejson

You can use any database supported by SQLAlchemy: List of databases. I am using MySQL.

Imports

We need to import the following modules for our code to work.

import urllib2
import simplejson
from sqlalchemy import Column, Integer, String, ForeignKey, create_engine
from sqlalchemy.orm import sessionmaker, relationship, backref
from sqlalchemy.ext.declarative import declarative_base

Tables, classes and mappers

SQLAlchemy’s declarative configuration style allows us to define our tables, our resource classes and the mappings between the classes and the tables just by declaring 2 classes derived from the Base class.

Base = declarative_base()

Let’s define the manufacturers table, class and mapper.

class Manufacturer(Base):
  """
  Manufacturer resource
  """
  __tablename__ = 'manufacturers'
  
  id = Column(Integer, primary_key=True)
  name = Column(String(32), unique=True)
  country = Column(String(32))

  def __init__(self, resource):
    """
    Class instantiation

    @param resource resource JSON attribute
    """
    self.name = resource['name']
    self.country = resource['country']
    
  def __repr__(self):
    return "<manufacturer('%s in %s')>" % (self.name, self.country)

Let’s define the cars table, class and mapper. Note how the relationship between the car and its manufacturer is defined.

class Car(Base):
  """
  Car resource
  """
  __tablename__ = 'cars'
        
  id = Column(Integer, primary_key=True)
  name = Column(String(32), unique=True)
  max_speed = Column(Integer)
  year_released = Column(Integer)
  manufacturer_id = Column(Integer, ForeignKey('manufacturers.id'))

  manufacturer = relationship(Manufacturer, backref=backref('cars', order_by=id))

  def __init__(self, resource):
    """
    Class instantiation

    @param resource resource JSON attribute
    """
    self.name = resource['name']
    self.max_speed = resource['max_speed']
    self.year_released = resource['year_released']
    
  def __repr__(self):
    return "<car('%s - max speed: %d, released in %s')>" % (self.name, self.max_speed, self.year_released)

Our main class is named “Resources” and we will add methods to set up a connection with the database, create the tables and download the resources.

class Resources:
  """
  Main class 
  """
  def __init__(self):
    """
    Class instantiation

    @param settings dict of settings
    """
    # database engine
    self.engine = None
    # Collection of tables and their associated schema constructs
    self.metadata = None
    # database session
    self.session = None

    self.setup_connection()
    self.setup_tables()
    self.process()

  def setup_connection(self):
    """
    Create DB session
    """
    ...

  def setup_tables(self):
    """
    Create tables in database.
    """
    ...

  def process(self):
    """
    Get resources and add them to DB
    """
    ...

Let’s look at how we can set up a connection to our database. We call create_engine() and pass the database dialect and connection arguments. In our case, we use the string ‘mysql://root:password@localhost:3306/vroom’ where vroom is the name of the database. This allows us to define our session using the Session factory. This session is the ORM’s handle to the database.

  def setup_connection(self):
    """
    Create DB session
    """
    s = 'mysql://root:password@localhost:3306/vroom'
    self.engine = create_engine(s)
    Session = sessionmaker(bind=self.engine)
    self.session = Session()

Next is the table creation using our table definitions. create_all() can be called multiple times safely and only missing tables will be created.

  def setup_tables(self):
    """
    Create tables in database.
    """
    self.metadata = Base.metadata
    self.metadata.create_all(self.engine)

Using simplejson and urllib2, it is really easy to fetch resources and decode them into a dictionary. Let’s get the list of manufacturers.

  def process(self):
    ...
    data = simplejson.load(urllib2.urlopen("http://api_url/manufacturers"))

data will contain something like this:

{"manufacturers": [
  {"name": "Peugeot", "country": "France"},
  {"name": "Chevrolet", "country": "USA"},
  ]
}
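As a quick sanity check, this is how such a payload decodes into a Python dictionary (using the stdlib json module here for brevity; simplejson exposes the same loads/load API):

```python
import json

payload = '''{"manufacturers": [
  {"name": "Peugeot", "country": "France"},
  {"name": "Chevrolet", "country": "USA"}
]}'''

# decode the JSON document into nested dicts and lists
data = json.loads(payload)
for m in data['manufacturers']:
    print(m['name'], m['country'])
```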

For each manufacturer, we need to get the list of cars so we can add the resources to the database. SQLAlchemy will take care of setting up the foreign key values for us, i.e. the relations between the manufacturers and the cars.

    for m in data['manufacturers']:
      # create Manufacturer object
      manufacturer = Manufacturer(m)
      # get cars for this manufacturer
      data2 = simplejson.load(urllib2.urlopen("http://api_url/manufacturers/%s" % manufacturer.name))
      # fill-up list of Car objects
      cars = []
      for c in data2['cars']:
        # create Car object and add it to cars list
        cars.append(Car(c))
      # add cars list to manufacturer's cars list
      manufacturer.cars = cars
      # add manufacturer object to DB
      self.session.add(manufacturer)
    # commit changes to DB
    self.session.commit()

Once the objects are in the database, it is easy to query them using the ORM. For example, let’s print the list of manufacturers along with their cars.

for m in self.session.query(Manufacturer).order_by(Manufacturer.id):
  print m
  for c in m.cars:
    print '-- %s' % c

The output should be something like this:

<manufacturer('Peugeot in France')>
-- <car('307 - max speed: 180, released in 2002')>
<manufacturer('Chevrolet in USA')>
-- <car('Malibu - max speed: 190, released in 2010')>

That’s it for now. Please add a comment if you have any feedback.

Python module to download Twilio REST resources to your database

December 21, 2010

Are you looking for an easy way to download your Twilio resources (calls, SMS messages, notifications, recordings…) to your own database so you can access them faster, even when you are offline? Twilio Resources DB is a Python module doing just that.

Features

  • Download Twilio resources to a database.
  • Download recordings audio files.
  • Support for MySQL, PostgreSQL and Redis.
  • Automatic database table creation for SQL databases.
  • Handles table relations for SQL databases.
  • Option to download new resources continuously.

This is the list of resource types we support:

  • Accounts
  • Calls
  • Notifications
  • Recordings
  • Transcriptions
  • Sms Messages
  • Conferences
  • Incoming Phone Numbers
  • Outgoing Caller Ids

Requirements

We need to install simplejson so we can use the JSON format for the HTTP request results, i.e. resources returned as JSON instead of XML.

sudo easy_install simplejson

Depending on the database used, we need to install SQLAlchemy for MySQL/PostgreSQL or redis for Redis.

easy_install SQLAlchemy
easy_install redis

We also need the Twilio Python library helper to help us with the HTTP requests to the Twilio server:

git clone https://github.com/twilio/twilio-python.git
cd twilio-python
python setup.py install

At last, we need to install the Twilio resources DB library:

git clone https://github.com/laurentluce/twilio-python-utils.git
cd twilio-python-utils
python setup.py install

Download resources to a database

Here is an example of a simple Python script using the library to download all the resources from the Twilio server to a MySQL database.

A “Resources” object needs to be created with a dictionary of settings:

from twilioresourcesdb import resources

# settings
settings = {}
# Twilio account
settings['account_sid'] = 'Twilio account SID'
settings['account_token'] = 'Twilio account token'
# DB
settings['database_type'] = 'mysql'
settings['database_user'] = 'username'
settings['database_password'] = 'password'
settings['database_host'] = 'hostname or ip address'
settings['database_name'] = 'database name'
# Resources options
settings['check_frequency'] = 300 # check for new resources every 5 minutes
# Download recordings audio files
settings['download_recordings'] = True
settings['recording_path'] = '/data/recordings/'
settings['recording_format'] = 'wav'

# Instantiate resources object.
# This will setup a connection with the DB and create the table if necessary.
r = resources.Resources(settings)

Here is the full list of options:

  • account_sid : Twilio account SID – ‘ACxxxxx’
  • account_token : Twilio account token – ‘xxxxx’
  • database_type : type of database – ‘mysql’, ‘postgresql’, ‘redis’
  • database_user : database username – ‘xxx’ (default: ‘root’, not required for Redis)
  • database_password : database user password – ‘xxxx’ (default: None)
  • database_host : database ip address or hostname – ‘xxxx’ (default: ‘localhost’)
  • database_port : database port number – xxxx (default: 3306 for MySQL, 5432 for PostgreSQL, 6379 for Redis)
  • database_name : database name – ‘xxxx’ (not required for Redis database)
  • check_frequency : frequency in seconds on how often to check for new resources – xx (default: 5)
  • page_size : number of resources to download at each request – xxxx (default: 50)
  • download_recordings : enable recordings downloads (audio files) – True/False (default: False)
    • recording_path : absolute path to store recordings audio files – ‘/xxx/xxx/xxx…’
    • recording_format : audio file format – ‘wav’, ‘mp3’

Next, we can download all the resources and then stop or we can start a thread downloading the resources continuously.

First option – blocking call – download all resources then stop:

r.process()

Second option – start a thread to download resources continuously:

r.start()

Here is some code you may want to re-use to stop the thread when the script is interrupted:

while True:
  try:
    time.sleep(1)
  except (KeyboardInterrupt, SystemExit):
    r.stop = True
    r.join()
    break

When an in-progress resource is received from the server, it is placed in a temporary list and checked regularly for completion, so we only save completed resources in the database. For example, a queued SMS message is placed in this temporary list and only saved to the database once it reaches a terminal state, i.e. completed, failed, dropped…
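That completion check can be sketched as follows. This is a simplified model, not the module’s actual code; the status field and state names are assumptions for illustration:

```python
# States in which a resource is considered final and safe to persist.
TERMINAL_STATES = {'completed', 'failed', 'dropped'}

def flush_completed(pending, save):
    """Persist resources that reached a terminal state; keep the rest pending."""
    still_pending = []
    for resource in pending:
        if resource['status'] in TERMINAL_STATES:
            save(resource)  # e.g. insert the row into the database
        else:
            still_pending.append(resource)
    return still_pending

saved = []
pending = [{'sid': 'SM1', 'status': 'queued'},
           {'sid': 'SM2', 'status': 'completed'}]
pending = flush_completed(pending, saved.append)
# SM2 is saved, SM1 stays in the temporary list for the next check
```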

MySQL and PostgreSQL database tables

When you create the Resources object, the tables are automatically created for you (unless they already exist).

Here is the list of tables created automatically:

  • accounts
  • calls
  • notifications
  • recordings
  • transcriptions
  • sms_messages
  • conferences
  • incoming_phone_numbers
  • outgoing_caller_ids

And those are the relationships using foreign keys:

  • calls -> accounts
  • sms_messages -> accounts
  • conferences -> accounts
  • incoming_phone_numbers -> accounts
  • outgoing_caller_ids -> accounts
  • notifications -> accounts
  • notifications -> calls
  • recordings -> accounts
  • recordings -> calls
  • transcriptions -> accounts
  • transcriptions -> recordings

Redis keys and values

In case of Redis, we add the resources using keys and values.

The resource’s key is the resource SID and the value is the JSON resource object.

Call resource: CAxxxx
SMS message resource: SMxxxx
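The key/value layout can be illustrated with a plain dict standing in for Redis (a sketch only; the module itself goes through the redis client, as hinted by the comments):

```python
import json

store = {}  # stands in for the Redis database: SID -> JSON-encoded resource

call = {'sid': 'CA123', 'duration': 42}
store[call['sid']] = json.dumps(call)  # with redis: r.set(sid, json.dumps(call))

# fetch a resource back by its SID
resource = json.loads(store['CA123'])  # with redis: json.loads(r.get(sid))
```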

That’s it for now. Don’t hesitate to add comments if you have any feedback.

Binary Search Tree library in Python

December 18, 2010

This article is about a Python library I created to manage binary search trees. I will go over the following:

  • Node class
  • Insert method
  • Lookup method
  • Delete method
  • Print method
  • Comparing 2 trees
  • Generator returning the tree elements one by one

You can check out the library code on GitHub: git clone https://laurentluce@github.com/laurentluce/python-algorithms.git. The repository contains more libraries but we are just going to focus on the binary search tree one.

As a reminder, here is a binary search tree definition (Wikipedia).

A binary search tree (BST) or ordered binary tree is a node-based binary tree data structure which has the following properties:

  • The left subtree of a node contains only nodes with keys less than the node’s key.
  • The right subtree of a node contains only nodes with keys greater than the node’s key.
  • Both the left and right subtrees must also be binary search trees.
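These properties can be checked mechanically. Here is a small standalone checker (it uses its own minimal node tuples, so it is independent of the Node class defined below):

```python
from collections import namedtuple

BSTNode = namedtuple('BSTNode', 'left right data')

def is_bst(node, lo=float('-inf'), hi=float('inf')):
    """Verify that every node's data falls in the open interval (lo, hi)
    implied by its ancestors."""
    if node is None:
        return True
    if not (lo < node.data < hi):
        return False
    return (is_bst(node.left, lo, node.data) and
            is_bst(node.right, node.data, hi))

leaf = lambda d: BSTNode(None, None, d)
good = BSTNode(leaf(3), leaf(10), 8)   # valid: 3 < 8 < 10
bad = BSTNode(leaf(9), None, 8)        # invalid: 9 sits in 8's left subtree
print(is_bst(good), is_bst(bad))  # True False
```

Note the interval bounds: it is not enough to compare each node with its direct children, since a violation can occur deeper in a subtree.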

Here is an example of a binary search tree (the one we will build in this article):

      8
     / \
    3   10
   / \    \
  1   6    14
     / \   /
    4   7 13

Node class

We need to represent a tree node. To do that, we create a new class named Node with 3 attributes:

  • Left node
  • Right node
  • Node’s data (same as the key in the definition above)

class Node:
    """
    Tree node: left and right child + data which can be any object
    """
    def __init__(self, data):
        """
        Node constructor

        @param data node data object
        """
        self.left = None
        self.right = None
        self.data = data

Let’s create a tree node containing the integer 8. You can pass any object as the data so it is flexible. When you create a node, both its left and right children are set to None.

root = Node(8)

Note that we just created a tree with a single node.

Insert method

We need a method to help us populate our tree. This method takes the node’s data as an argument and inserts a new node in the tree.

class Node:
    ...
    def insert(self, data):
        """
        Insert new node with data

        @param data node data object to insert
        """
        if data < self.data:
            if self.left is None:
                self.left = Node(data)
            else:
                self.left.insert(data)
        elif data > self.data:
            if self.right is None:
                self.right = Node(data)
            else:
                self.right.insert(data)

insert() is called recursively to locate the place where the new node needs to be attached.

Let’s add 3 nodes to our root node which we created above and let’s look at what the code does.

root.insert(3)
root.insert(10)
root.insert(1)

This is what happens when we add the second node (3):

  • 1- root node’s method insert() is called with data = 3.
  • 2- 3 is less than 8 and left child is None so we attach the new node to it.

This is what happens when we add the third node (10):

  • 1- root node’s method insert() is called with data = 10.
  • 2- 10 is greater than 8 and right child is None so we attach the new node to it.

This is what happens when we add the fourth node (1):

  • 1- root node’s method insert() is called with data = 1.
  • 2- 1 is less than 8 so the root’s left child (3) insert() method is called with data = 1. Note how we call the method on a subtree.
  • 3- 1 is less than 3 and left child is None so we attach the new node to it.

This is how the tree looks now:

    8
   / \
  3   10
 /
1

Let’s continue and complete our tree so we can move on to the next section which is about looking up nodes in the tree.

root.insert(6)
root.insert(4)
root.insert(7)
root.insert(14)
root.insert(13)

The complete tree looks like this:

      8
     / \
    3   10
   / \    \
  1   6    14
     / \   /
    4   7 13

Lookup method

We need a way to look for a specific node in the tree. We add a new method named lookup which takes a node’s data as an argument and returns the node if found or None if not. We also return the node’s parent for convenience.

class Node:
    ...
    def lookup(self, data, parent=None):
        """
        Lookup node containing data

        @param data node data object to look up
        @param parent node's parent
        @returns node and node's parent if found or None, None
        """
        if data < self.data:
            if self.left is None:
                return None, None
            return self.left.lookup(data, self)
        elif data > self.data:
            if self.right is None:
                return None, None
            return self.right.lookup(data, self)
        else:
            return self, parent

Let’s look up the node containing 6.

node, parent = root.lookup(6)

This is what happens when lookup() is called:

  • 1- lookup() is called with data = 6, and default value parent = None.
  • 2- data = 6 is less than root’s data which is 8.
  • 3- root’s left child lookup() method is called with data = 6, parent = current node. Notice how we call lookup() on a subtree.
  • 4- data = 6 is greater than node’s data which is now 3.
  • 5- node’s right child lookup() method is called with data = 6 and parent = current node
  • 6- node’s data is equal to 6 so we return it and its parent which is node 3.

Delete method

The method delete() takes the data of the node to remove as an argument.

class Node:
    ...
    def delete(self, data):
        """
        Delete node containing data

        @param data node's content to delete
        """
        # get node containing data
        node, parent = self.lookup(data)
        if node is not None:
            children_count = node.children_count()
        ...

There are 3 possibilities to handle:

  • 1- The node to remove has no child.
  • 2- The node to remove has 1 child.
  • 3- The node to remove has 2 children.

Let’s tackle the first possibility which is the easiest. We look for the node to remove and we set its parent’s left or right child to None.

    def delete(self, data):
        ...
        if children_count == 0:
            # if node has no children, just remove it
            if parent:
                if parent.left is node:
                    parent.left = None
                else:
                    parent.right = None
            del node
        ...

Note: children_count() returns the number of children of a node.

Here is the function children_count:

class Node:
    ...
    def children_count(self):
        """
        Returns the number of children

        @returns number of children: 0, 1, 2
        """
        cnt = 0
        if self.left:
            cnt += 1
        if self.right:
            cnt += 1
        return cnt

For example, we want to remove node 1. Node 3’s left child will be set to None.

root.delete(1)

Let’s look at the second possibility: the node to be removed has 1 child. In this case, we replace the node with its child by pointing the parent’s left or right attribute at that child.

    def delete(self, data):
        ...
        elif children_count == 1:
            # if node has 1 child
            # replace node by its child
            if node.left:
                n = node.left
            else:
                n = node.right
            if parent:
                if parent.left is node:
                    parent.left = n
                else:
                    parent.right = n
            del node
        ...

For example, we want to remove node 14. It has one child, node 13, so node 10’s right child is set to node 13.

root.delete(14)

Let’s look at the last possibility which is the node to be removed has 2 children. We replace its data with its successor’s data and we fix the successor’s parent’s child.

    def delete(self, data):
        ...
        else:
            # if node has 2 children
            # find its successor
            parent = node
            successor = node.right
            while successor.left:
                parent = successor
                successor = successor.left
            # replace node data by its successor data
            node.data = successor.data
            # fix successor's parent's child
            if parent.left == successor:
                parent.left = successor.right
            else:
                parent.right = successor.right

For example, we want to remove node 3. We look for its successor by going to its right child and then following left children until we reach a node without a left child. Its successor is node 4. We replace 3 with 4. Node 4 doesn’t have a child so we set node 6’s left child to None.

root.delete(3)

Print method

We add a method to print the tree inorder. This method takes no argument. We use recursion inside print_tree() to walk the tree depth-first. We first traverse the left subtree, then we print the node’s data, then we traverse the right subtree.

class Node:
    ...
    def print_tree(self):
        """
        Print tree content inorder
        """
        if self.left:
            self.left.print_tree()
        print self.data,
        if self.right:
            self.right.print_tree()

Let’s print our tree:

root.print_tree()

The output will be: 1 3 4 6 7 8 10 13 14

Comparing 2 trees

To compare 2 trees, we add a method which compares each subtree recursively. It returns False as soon as one node is not the same in both trees: either the node is missing in one tree or its data is different. We need to pass the root of the tree to compare against as an argument.

class Node:
    ...
    def compare_trees(self, node):
        """
        Compare 2 trees

        @param node tree's root node to compare to
        @returns True if the tree passed is identical to this tree
        """
        if node is None:
            return False
        if self.data != node.data:
            return False
        res = True
        if self.left is None:
            if node.left:
                return False
        else:
            res = self.left.compare_trees(node.left)
        if res is False:
            return False
        if self.right is None:
            if node.right:
                return False
        else:
            res = self.right.compare_trees(node.right)
        return res

For example, we want to compare tree (3, 8, 10) with tree (3, 8, 11)

# root2 is the root of tree 2
root.compare_trees(root2)

This is what happens in the code when we call compare_trees().

  • 1- compare_trees() is called on the root node of the first tree, with the root node of the second tree as argument.
  • 2- The root node has a left child, so we call the left child's compare_trees() method.
  • 3- The left subtree comparison returns True.
  • 4- The root node has a right child, so we call the right child's compare_trees() method.
  • 5- The right subtree comparison returns False because the data is different (10 vs 11).
  • 6- compare_trees() returns False.
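The walkthrough above can be reproduced with a small self-contained sketch (Python 3 syntax; the constructor taking optional children is a shortcut for this example, not part of the tutorial's class):

```python
class Node:
    def __init__(self, data, left=None, right=None):
        self.data = data
        self.left = left
        self.right = right

    def compare_trees(self, node):
        # same logic as the compare_trees() method above
        if node is None:
            return False
        if self.data != node.data:
            return False
        res = True
        if self.left is None:
            if node.left:
                return False
        else:
            res = self.left.compare_trees(node.left)
        if res is False:
            return False
        if self.right is None:
            if node.right:
                return False
        else:
            res = self.right.compare_trees(node.right)
        return res

tree1 = Node(8, Node(3), Node(10))
tree2 = Node(8, Node(3), Node(11))
print(tree1.compare_trees(tree2))                      # False: 10 != 11
print(tree1.compare_trees(Node(8, Node(3), Node(10)))) # True: identical trees
```

Note that the method compares structure and data, not just the inorder sequence: two trees with the same values arranged differently are reported as different.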

Generator returning the tree elements one by one

It is sometimes useful to create a generator which returns the tree nodes' values one by one. It is memory efficient as it doesn't have to build the full list of nodes up front: each time the generator is resumed, it produces the next node's value.

To do that, we use the yield keyword, which returns a value and suspends the function right there, so execution continues from that point the next time the generator is resumed.

Plain recursion doesn't combine easily with yield (each recursive call would be its own generator), so instead we traverse the tree iteratively using an explicit stack.

Here is the code:

class Node:
    ...
    def tree_data(self):
        """
        Generator to get the tree nodes data
        """
        # we use a stack to traverse the tree in a non-recursive way
        stack = []
        node = self
        while stack or node: 
            if node:
                stack.append(node)
                node = node.left
            else: # we are returning so we pop the node and we yield it
                node = stack.pop()
                yield node.data
                node = node.right

For example, we want to access the tree nodes using a for loop:

for data in root.tree_data():
    print data

Let’s look at what happens in the code with the same example we have been using:

  • 1- tree_data() is called on the root node.
  • 2- Node 8 is pushed on the stack. We go to the left child of 8.
  • 3- Node 3 is pushed on the stack. We go to the left child of 3.
  • 4- Node 1 is pushed on the stack. node is set to None because there is no left child.
  • 5- We pop Node 1 off the stack and yield it (the generator returns 1 and suspends until it is resumed).
  • 6- The for loop resumes the generator to get the next value.
  • 7- node is set to None because Node 1 doesn't have a right child.
  • 8- We pop Node 3 off the stack and yield it (the generator returns 3 and suspends until it is resumed).
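To see the full traversal end to end, here is a self-contained sketch in Python 3 syntax, again assuming the BST insert() method from earlier in the tutorial; list() drains the generator so we can look at all the values at once:

```python
class Node:
    def __init__(self, data):
        self.data = data
        self.left = None
        self.right = None

    def insert(self, data):
        # binary search tree insert: smaller values go left, others go right
        if data < self.data:
            if self.left is None:
                self.left = Node(data)
            else:
                self.left.insert(data)
        else:
            if self.right is None:
                self.right = Node(data)
            else:
                self.right.insert(data)

    def tree_data(self):
        """Generator yielding the tree nodes' data inorder."""
        stack = []
        node = self
        while stack or node:
            if node:
                # descend as far left as possible, remembering the path
                stack.append(node)
                node = node.left
            else:
                # no left child left to visit: yield the node, then go right
                node = stack.pop()
                yield node.data
                node = node.right

root = Node(8)
for value in (3, 10, 1, 6, 14, 4, 7, 13):
    root.insert(value)
print(list(root.tree_data()))  # [1, 3, 4, 6, 7, 8, 10, 13, 14]
```

The generator yields the same inorder sequence that print_tree() prints, but the caller controls the pace: a for loop can stop early without ever visiting the rest of the tree.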

Here you go, I hope you enjoyed this tutorial. Don’t hesitate to add comments if you have any feedback.

 