MPipe cookbook

A pipeline algorithm is implemented using classes from the mpipe module. The building blocks of pipelines map to specific Python objects:

Framework element    Python construct
task, result         any Python picklable object
worker               single-argument function, OrderedWorker or UnorderedWorker
stage                Stage, OrderedStage or UnorderedStage
pipeline             Pipeline

It may be useful to keep in mind that MPipe is built using classes from Python’s standard multiprocessing module. It is a layer on top, encapsulating classes like Process, Queue and Connection with behavior specific to the pipeline workflow.

The procedure of building and running a pipeline is a sequence of five steps:

1. Define workers

Start by defining the work that will be performed by individual workers of your stages. The easiest way is to write a function that takes a single task parameter:

def doSomething(task):
   result = f(task)
   return result

The function’s return value becomes the result of the stage. If the function returns nothing (or returns None), the stage is considered a dead-end stage that produces no output.
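As a concrete illustration of the two cases, here is a minimal sketch of one worker function that produces a result and one that does not. The names word_count and log_task are hypothetical, chosen only for this example; any single-argument function would do.

```python
def word_count(task):
    """Worker for a producing stage: the return value becomes the stage result."""
    return len(task.split())


def log_task(task):
    """Worker for a dead-end stage: nothing is returned, so no result is produced."""
    print("processed:", task)
```

Both are ordinary Python functions and can be called and tested directly before being handed to a stage.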

The other way is to subclass from OrderedWorker or UnorderedWorker and put the actual work inside the doTask() method:

import mpipe

class MyWorker(mpipe.OrderedWorker):
   def doTask(self, task):
      result = f(task)
      return result

Just like when using a standalone function, the stage result is the return value of doTask(). Alternatively, the worker can call putResult() itself. This is useful when the worker should continue processing after registering the stage result:

class MyWorker(mpipe.OrderedWorker):
   def doTask(self, task):
      result = f(task)
      self.putResult(result)
      # Do some more stuff.

2. Create stage objects

Having defined your workers, the next step is to instantiate stage objects. With standalone work functions, the stage is created with OrderedStage or UnorderedStage:

stage1 = mpipe.OrderedStage(doSomething, 3)

When using worker classes, create a Stage object instead:

stage2 = mpipe.Stage(MyWorker, 4)

In both cases the second argument is the number of processes devoted to the particular stage.

4. Create pipeline object

A pipeline is created by passing the root upstream stage to the Pipeline constructor:

pipe = mpipe.Pipeline(stage1)

Once built, the pipeline has allocated and started all designated processes. At this point the pipeline is waiting for input, its worker processes idle and ready.

5. Operate the pipeline

From this point on, operating the pipeline is solely accomplished by manipulating the Pipeline object. Input tasks are fed using put():

pipe.put(something)

Output results, if any, are fetched using get():

result = pipe.get()

Alternatively, one can iterate over the output stream with the results() method:

for result in pipe.results():
   print(result)

At some point in manipulating the pipeline, the special task None should be put on it:

pipe.put(None)

This signals the end of input stream and eventually terminates all worker processes, effectively “closing” the pipeline to further input.

The None task can be thought of as a “stop” request. It becomes part of the sequence of input tasks streaming into the pipeline and, like other tasks, it propagates through all stages. However, it is processed in a special way: when it arrives at a stage, it signals all worker processes within to complete any task they are currently running and then terminate. Before the last worker terminates, it propagates the “stop” request to the next downstream stage (or stages, if forked).
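The stop protocol described above can be modeled with a short sketch. This is not MPipe's implementation: it uses threads and queue.Queue from the standard library as stand-ins for worker processes and inter-process queues, and the names start_stage and run_two_stages are hypothetical. It shows only the idea that each worker exits on seeing None and that the last worker to exit forwards None downstream.

```python
import queue
import threading


def start_stage(func, size, inbox, outbox):
    """Start `size` worker threads reading from inbox and writing to outbox."""
    remaining = [size]  # number of workers still running in this stage
    lock = threading.Lock()

    def work():
        while True:
            task = inbox.get()
            if task is None:  # "stop" request
                with lock:
                    remaining[0] -= 1
                    is_last = remaining[0] == 0
                if is_last:
                    outbox.put(None)  # last worker forwards the request downstream
                else:
                    inbox.put(None)   # pass the request on to a sibling worker
                return
            outbox.put(func(task))

    threads = [threading.Thread(target=work) for _ in range(size)]
    for thread in threads:
        thread.start()
    return threads


def run_two_stages(tasks):
    """Push tasks through two chained stages and collect the final results."""
    q_in, q_mid, q_out = queue.Queue(), queue.Queue(), queue.Queue()
    threads = start_stage(lambda x: x + 1, 3, q_in, q_mid)
    threads += start_stage(lambda x: x * 2, 2, q_mid, q_out)

    for task in tasks:
        q_in.put(task)
    q_in.put(None)  # the stop request enters after all real tasks

    results = []
    while True:
        result = q_out.get()
        if result is None:  # stop request has passed through every stage
            break
        results.append(result)
    for thread in threads:
        thread.join()
    return results
```

A worker only handles the stop request after finishing its current task, so every result of a stage is queued before that stage's None is forwarded. The order of results is not guaranteed in this model, since workers run concurrently.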

The None task should be the last input to the pipeline. After it is added to the stream of tasks, the pipeline continues to process any previous tasks still in the system. After worker processes terminate, results can still be accessed in the usual way (using get() or results()) until the pipeline is emptied. However, any “real” task (i.e. not None) put on the pipeline following the “stop” request will not be processed.