Andy's insights

Opinions, thoughts, achievements

Mon, 06 Mar 2006

Building Finite State Machines (FSM) with Python

This is a problem I have run into many times: I wanted to implement a finite state machine, but there was no convenient framework for doing so. Of course there are powerful RAD tools (e.g. for SDL), but I didn’t want that dependency. The framework should be simple, clean and easy to use, but still powerful enough to describe the problem. I know there is always the switch-statement FSM, and that is good enough for simple state machines.
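For comparison, a switch-statement FSM might look roughly like the sketch below. Python has no switch statement, so it uses an if/elif chain. The function name next_state() and the plain-string states and events are made up for illustration only.

```python
# Hypothetical hard-coded FSM: every state and event is spelled out in one function.
def next_state(state, event):
    """Return the state that follows `state` when `event` occurs."""
    if state == 'A':
        if event in ('e1', 'e3'):
            return 'B'
    elif state == 'B':
        if event == 'e1':
            return 'B'
        if event == 'e2':
            return 'C'
    elif state == 'C':
        if event == 'e1':
            return 'A'
    return state  # unknown event: stay where we are

state = 'A'
for event in ['e1', 'e2', 'e1']:
    state = next_state(state, event)
```

This works, but every new state, event or side effect means editing that one function again.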

I ended up implementing something myself. The first try was a C++ module (see SIPSec). Two years later I played a bit with an implementation in Python. That one worked, but it didn’t satisfy me; it was still too complex.

Once again, nearly a year later, I had the idea. I’ll provide the fully documented source later. First I’ll develop the ideas, mixed with concrete code.

For a full state machine we need:

Further, we should have (though an FSM by definition doesn’t strictly need them):

Let’s start at the end. We would like to implement an example FSM that looks like this:

  +-----------------+
  |       +-+e1     |
  V e1,e3 V | e2    |e1
 (A) ---> (B) ---> (C) <-+
  ^        ^        |e2  |
  |        |        V    |
  |        |       /x\   |
  |        |      /0_1\  |
  |        |     /0 1 2\ |
  |        |      | | |  |
  |        +------+ | +--+ 
  +-----------------+

The first thing we need is states. We can define them like this:

a = State('A') 
b = State('B')
c = State('C')

Fine, but what does a state look like? It has one method, process(), that takes an event and decides whether to take a transition and, if so, which one. Then there are some methods needed to construct the state machine by adding transitions and operations. Have a look at the code:


class State(object):
    ON_ENTER = 'on_enter'
    ON_LEAVE = 'on_leave'
    def __init__(self, name):
        self.name = name
        self.on_enter = []  # callbacks run when the state is entered
        self.on_leave = []  # callbacks run when the state is left
        self.events = {}    # event -> transition
    def __repr__(self):
        return self.name

    def add_operation(self, when, operation):
        # `when` is ON_ENTER or ON_LEAVE, i.e. the name of a callback list
        getattr(self, when).append(operation)
        return self
    def add_event(self, event, transition):
        self.events[event] = transition
        return self
    def enter(self):
        for op in self.on_enter:
            op(self)
    def leave(self):
        for op in self.on_leave:
            op(self)
    def process(self, event):
        if event not in self.events:
            return self  # sink: unknown events are ignored
        self.leave()
        return self.events[event].process(event)

For the building phase the add_event() method is important: it adds a transition, triggered by an event, to the internal lookup table (a dict). When an event occurs it is passed to the process() method. This method looks up the event in the table. If an entry exists, it calls leave() and then runs the transition’s process() method; otherwise it returns the state itself without further processing. The enter() and leave() methods simply run the registered callbacks.
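To see the lookup and the sink behaviour in isolation, here is a small sketch. It repeats a trimmed-down State (no operations) and uses a made-up stand-in class Goto in place of a real transition. Neither the trimming nor Goto is part of the framework, and plain strings serve as events here.

```python
class State(object):
    """Trimmed copy of the State class: only the event table and process()."""
    def __init__(self, name):
        self.name = name
        self.events = {}          # event -> transition
    def __repr__(self):
        return self.name
    def add_event(self, event, transition):
        self.events[event] = transition
        return self               # allows chaining add_event() calls
    def process(self, event):
        if event not in self.events:
            return self           # sink: unknown events leave the state unchanged
        return self.events[event].process(event)

class Goto(object):
    """Hypothetical stand-in for a transition: just returns its target state."""
    def __init__(self, to_state):
        self.to_state = to_state
    def process(self, event):
        return self.to_state

a = State('A')
b = State('B')
a.add_event('go', Goto(b))

after_known = a.process('go')       # 'go' is in the table, so we move to B
after_unknown = a.process('other')  # 'other' is not, so we stay in A
```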

We have talked about transitions but never seen one. May I introduce them? A transition is what happens between two states when the machine changes state: after the old state is left and before the new one is entered. A transition defines which state comes next, and it also provides a hook for operations to execute during the transition. Like the state, the transition consists of methods to build the machine and methods used at execution time. When we instantiate a transition we define the target state we will enter.

class Transition(object):
    def __init__(self, to_state):
        self.to_state = to_state
        self.operations = []
    def add_operation(self, op):
        self.operations.append(op)
        return self
    def process(self, event):
        for op in self.operations:
            op(event)
        self.to_state.enter()
        return self.to_state

The process() method returns the state we end in. To connect two states, we do something like this:

b.add_event(e2, Transition(c).add_operation(Printer("Transition b -> c")))

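This adds a transition from state B to state C on event e2 that executes the callable Printer() during the transition. (Printer is a small helper class defined with the full example further down.)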
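The order in which things happen on a transition can be checked with a short sketch. It repeats the Transition class together with a trimmed State that only has enter(), and it records calls in a list named calls instead of printing. The list, the lambdas and the string event are illustrative assumptions, not part of the framework.

```python
calls = []  # records the order in which hooks run

class State(object):
    """Trimmed state: only what Transition.process() needs."""
    def __init__(self, name):
        self.name = name
    def enter(self):
        calls.append('enter ' + self.name)

class Transition(object):
    def __init__(self, to_state):
        self.to_state = to_state
        self.operations = []
    def add_operation(self, op):
        self.operations.append(op)
        return self  # returning self is what makes the chaining below possible
    def process(self, event):
        for op in self.operations:
            op(event)
        self.to_state.enter()
        return self.to_state

c = State('C')
t = (Transition(c)
     .add_operation(lambda event: calls.append('first op on ' + event))
     .add_operation(lambda event: calls.append('second op on ' + event)))

result = t.process('e2')
```

The operations run in the order they were added, and only then is the target state entered.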

Finally we are missing the events. They are nothing special. We could have chosen to differentiate by class, or just used a plain object (a dictionary, for example). But I think it’s best to have a single Event class and differentiate based on an ID given at construction time. The ID should be a string (but could also be an integer). We can still subclass Event if we need to give an event behaviour and not just attributes. So look at the implementation; it’s somewhat pythonic:

class Event(object):
    def __init__(self, event_id):
        self.event_id = event_id
    def __hash__(self):
        return hash(self.event_id)
    def __eq__(self, other):
        # compare by ID; `other` may be an Event or a bare ID
        if isinstance(other, Event):
            return self.event_id == other.event_id
        return self.event_id == other
    def __ne__(self, other):
        return not self == other
    def __repr__(self):
        return "<Event %s>" % repr(self.event_id)

Finally we define some events for our example:

e1 = Event('e1')
e2 = Event('e2')
e3 = Event('e3')
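Because an Event hashes and compares by its ID, two separate Event objects with the same ID hit the same entry in a state’s lookup table. The sketch below shows this with a plain dict. The Event class is repeated in reduced form with the comparison written as __eq__, which is an assumption made so that the sketch also runs on Python 3. The table contents are placeholders.

```python
class Event(object):
    def __init__(self, event_id):
        self.event_id = event_id
    def __hash__(self):
        return hash(self.event_id)
    def __eq__(self, other):
        # compare by ID; `other` may be an Event or a bare ID
        if isinstance(other, Event):
            return self.event_id == other.event_id
        return self.event_id == other

# placeholder table: a real State maps events to Transition objects
table = {Event('e1'): 'transition for e1'}

same_id = Event('e1')   # a different object with the same ID
same_id.x = 42          # attributes can be attached freely

found_by_event = table[same_id]
found_by_string = table.get('e1')      # a bare ID matches as well
missing = table.get(Event('e2'))       # no entry for this ID
```

This is also why the example can set e2.x before each call: the attribute travels with the event object, while the lookup only cares about the ID.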

This is nearly all we need for our example FSM. But we still need an entry point, an interface to access the whole thing. I’ll call it FSM because it represents an FSM (even if it’s dead simple!):

class FSM(object):
    def __init__(self, start):
        self.state = self.start = start
    def process(self, event):
        self.state = self.state.process(event)
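All the FSM class asks of a state is a process(event) method that returns the next state. A minimal sketch, using a made-up two-state stub class Lamp instead of the real State class:

```python
class FSM(object):
    def __init__(self, start):
        self.state = self.start = start
    def process(self, event):
        self.state = self.state.process(event)

class Lamp(object):
    """Hypothetical two-state stub: any object with process(event) will do."""
    def __init__(self, name):
        self.name = name
        self.other = None
    def process(self, event):
        # 'toggle' switches to the other state, everything else is ignored
        return self.other if event == 'toggle' else self

off, on = Lamp('off'), Lamp('on')
off.other, on.other = on, off

fsm = FSM(off)
for event in ['toggle', 'noise', 'toggle', 'toggle']:
    fsm.process(event)
```

After the loop fsm.state is the on state, while fsm.start still points at the state the machine began in.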

If you look at our example you see that there is still one missing brick: the conditional. I implemented it as generically as possible: first, it subclasses our transition, and second, it ends in transitions (there must be at least one). The Conditional has a finite number of ends (I call them paths) and a decision that chooses one of those paths. The paths are simply implemented by yet another lookup table (a dict), and the decision is just a callable object (function, method, lambda or callable class). Once again we don’t have to subclass the Conditional class for every single task.


class Conditional(Transition):
    def __init__(self, decision, paths):
        Transition.__init__(self, None)
        self.decision = decision
        self.paths = paths

    def process(self, event):
        for op in self.operations:
            op(event)
        return self.paths[self.decision(event)].process(event)

Simple, isn’t it?
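The decision does not have to be a lambda that reads an attribute. As a sketch, here is a Conditional driven by an ordinary function. The function size_class(), the path keys 'small' and 'large' and the string events are invented for this illustration, and State is trimmed to a no-op enter().

```python
class State(object):
    """Trimmed state: just a name and a no-op enter()."""
    def __init__(self, name):
        self.name = name
    def enter(self):
        pass

class Transition(object):
    def __init__(self, to_state):
        self.to_state = to_state
        self.operations = []
    def process(self, event):
        for op in self.operations:
            op(event)
        self.to_state.enter()
        return self.to_state

class Conditional(Transition):
    def __init__(self, decision, paths):
        Transition.__init__(self, None)
        self.decision = decision
        self.paths = paths
    def process(self, event):
        for op in self.operations:
            op(event)
        # the decision result is the key of the path to follow
        return self.paths[self.decision(event)].process(event)

def size_class(event):
    """Hypothetical decision: classify the event by its length."""
    return 'small' if len(event) < 4 else 'large'

small, large = State('Small'), State('Large')
cond = Conditional(size_class, {
    'small': Transition(small),
    'large': Transition(large),
})

first = cond.process('abc')      # length 3 -> 'small'
second = cond.process('abcdef')  # length 6 -> 'large'
```

Note that a decision result with no matching key in paths raises a KeyError, so the decision and the path table have to agree.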

Let’s see some action. Here is the code to implement the FSM shown above:


class Printer(object):
    def __init__(self, txt):
        self.txt = txt
    def __call__(self, *args):
        # print the label followed by whatever the hook was called with
        print("%s %s" % (self.txt, str(args)))

a = State('A')
b = State('B')
c = State('C')
e1 = Event('e1')
e2 = Event('e2')
e3 = Event('e3')

t = Transition(b).add_operation(Printer("Transition a -> b"))
a.add_event(e1, t).add_event(e3, t)
b.add_event(e1, Transition(b).add_operation(Printer("Transition b -> b"))
                             .add_operation(Printer("2nd Operation")))
b.add_event(e2, Transition(c).add_operation(Printer("Transition b -> c")))
c.add_event(e1, Transition(a).add_operation(Printer("Transition c -> a")))
c.add_event(e2, Conditional(lambda event: event.x, {
    0: Transition(b).add_operation(Printer("Transition c -> b")),
    1: Transition(a).add_operation(Printer("Transition c -> a")),
    2: Transition(c).add_operation(Printer("Transition c -> c")),
}).add_operation(Printer("Decision (x = ?) -> a,b,c")))

a.add_operation('on_enter', Printer("Enter State A"))
a.add_operation('on_leave', Printer("Leave State A"))
b.add_operation('on_enter', Printer("Enter State B"))
b.add_operation('on_leave', Printer("Leave State B"))
c.add_operation('on_enter', Printer("Enter State C"))
c.add_operation('on_leave', Printer("Leave State C"))

fsm = FSM(a)

That’s it. Next we jump around a bit just to see if it works:

fsm.process(e1)
fsm.process(e1)
fsm.process(e2)
fsm.process(e1)
fsm.process(e3)
fsm.process(e2)
e2.x = 2
fsm.process(e2)
e2.x = 0
fsm.process(e2)
fsm.process(e2)
e2.x = 1
fsm.process(e2)

The output should look like this:

Leave State A (A,)
Transition a -> b (<Event 'e1'>,)
Enter State B (B,)
Leave State B (B,)
Transition b -> b (<Event 'e1'>,)
2nd Operation (<Event 'e1'>,)
Enter State B (B,)
Leave State B (B,)
Transition b -> c (<Event 'e2'>,)
Enter State C (C,)
Leave State C (C,)
Transition c -> a (<Event 'e1'>,)
Enter State A (A,)
Leave State A (A,)
Transition a -> b (<Event 'e3'>,)
Enter State B (B,)              
Leave State B (B,)              
Transition b -> c (<Event 'e2'>,)
Enter State C (C,)              
Leave State C (C,)              
Decision (x = ?) -> a,b,c (<Event 'e2'>,)
Transition c -> c (<Event 'e2'>,)
Enter State C (C,)              
Leave State C (C,)              
Decision (x = ?) -> a,b,c (<Event 'e2'>,)
Transition c -> b (<Event 'e2'>,)
Enter State B (B,)              
Leave State B (B,)              
Transition b -> c (<Event 'e2'>,)
Enter State C (C,)              
Leave State C (C,)              
Decision (x = ?) -> a,b,c (<Event 'e2'>,)
Transition c -> a (<Event 'e2'>,)
Enter State A (A,)
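Reading such a trace by eye gets tedious. One possible way to check a run automatically is to record the hooks in a list instead of printing them. The sketch below does that for a reduced version of the example (no e3, no leave hooks). The helper record(), the bare Token class used as an event and the lists log and visited are assumptions made for this sketch; the four framework classes are repeated in condensed form so that it runs on its own.

```python
class State(object):
    def __init__(self, name):
        self.name = name
        self.on_enter, self.on_leave, self.events = [], [], {}
    def __repr__(self):
        return self.name
    def add_operation(self, when, operation):
        getattr(self, when).append(operation)
        return self
    def add_event(self, event, transition):
        self.events[event] = transition
        return self
    def enter(self):
        for op in self.on_enter:
            op(self)
    def leave(self):
        for op in self.on_leave:
            op(self)
    def process(self, event):
        if event not in self.events:
            return self
        self.leave()
        return self.events[event].process(event)

class Transition(object):
    def __init__(self, to_state):
        self.to_state, self.operations = to_state, []
    def add_operation(self, op):
        self.operations.append(op)
        return self
    def process(self, event):
        for op in self.operations:
            op(event)
        self.to_state.enter()
        return self.to_state

class Conditional(Transition):
    def __init__(self, decision, paths):
        Transition.__init__(self, None)
        self.decision, self.paths = decision, paths
    def process(self, event):
        for op in self.operations:
            op(event)
        return self.paths[self.decision(event)].process(event)

class FSM(object):
    def __init__(self, start):
        self.state = self.start = start
    def process(self, event):
        self.state = self.state.process(event)

class Token(object):
    """Hypothetical bare event: hashed by identity, carries an attribute x."""
    x = 0

log = []  # the recorded trace

def record(text):
    """Hypothetical replacement for Printer: remember the text instead of printing it."""
    return lambda *args: log.append(text)

a, b, c = State('A'), State('B'), State('C')
e1, e2 = Token(), Token()
for s in (a, b, c):
    s.add_operation('on_enter', record('enter %s' % s))
a.add_event(e1, Transition(b))
b.add_event(e2, Transition(c))
c.add_event(e2, Conditional(lambda event: event.x, {
    0: Transition(b), 1: Transition(a), 2: Transition(c),
}).add_operation(record('decide')))

fsm = FSM(a)
visited = []  # state name after each processed event
for event, x in [(e1, 0), (e2, 0), (e2, 2), (e2, 0), (e2, 0), (e2, 1)]:
    event.x = x
    fsm.process(event)
    visited.append(fsm.state.name)
```

With the trace in a list, the whole run can be compared against an expected sequence in a single assert.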

posted at: 23:48 | path: /python