MPipe API

class mpipe.OrderedWorker

An OrderedWorker object operates in a stage where the order of output results always matches that of corresponding input tasks.

A worker is linked to its two nearest neighbors, the previous worker and the next, so that all workers in the stage are connected in a ring. Input tasks are fetched in ring order. Before publishing its result, a worker first waits for its previous neighbor to publish its own.
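The ring coordination described above can be modeled with standard-library threads. The following is a minimal sketch and not MPipe's implementation: the function run_ordered_ring, the STOP sentinel and the two circulating tokens (one for fetching, one for publishing) are all assumptions made for illustration.

```python
import queue
import threading
import time

STOP = object()  # hypothetical sentinel meaning "no more tasks"


def run_ordered_ring(tasks, work, size=3):
    """Run `work` on each task with `size` threads; results keep input order."""
    inputs = queue.Queue()
    for task in tasks:
        inputs.put(task)
    outputs = []

    # One fetch token and one publish token circulate around the ring.
    # Worker 0 starts out holding both.
    fetch_turn = [threading.Semaphore(1 if i == 0 else 0) for i in range(size)]
    publish_turn = [threading.Semaphore(1 if i == 0 else 0) for i in range(size)]

    def worker(index):
        nxt = (index + 1) % size
        while True:
            fetch_turn[index].acquire()      # wait for this worker's turn to fetch
            try:
                task = inputs.get_nowait()
            except queue.Empty:
                task = STOP
            fetch_turn[nxt].release()        # let the next neighbor fetch
            if task is STOP:
                return
            result = work(task)              # runs concurrently with neighbors
            publish_turn[index].acquire()    # wait for the previous neighbor to publish
            outputs.append(result)
            publish_turn[nxt].release()      # let the next neighbor publish

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(size)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return outputs


def slow_square(n):
    time.sleep(0.01 * (5 - n % 5))  # uneven durations, so workers finish out of order
    return n * n


if __name__ == "__main__":
    print(run_ordered_ring(range(10), slow_square))
```

Although the tasks take different amounts of time, each result is appended only after the previous neighbor has appended its own, so the output list follows the input order.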

doInit()

Implement this method in the subclass if additional initialization is needed after process startup. Because this class inherits from multiprocessing.Process, its constructor executes in the spawning (parent) process. This method lets additional code run in the forked (child) process, before the worker begins processing input tasks.

doTask(task)

Implement this method in the subclass with the work to be executed on each task object. The implementation can publish its output result in one of two ways: 1) call putResult() and return None, or 2) return the result (any value other than None).
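The two ways of publishing can be illustrated without MPipe installed. In the sketch below, SketchWorker is a hypothetical stand-in for the worker base class, and its handle() method encodes an assumed calling convention (publish whatever non-None value doTask() returns); neither is part of MPipe.

```python
class SketchWorker:
    """Hypothetical stand-in for a worker base class (not part of MPipe)."""

    def __init__(self):
        self.published = []  # stands in for the output tubes

    def putResult(self, result):
        self.published.append(result)

    def doTask(self, task):
        raise NotImplementedError

    def handle(self, task):
        """Assumed calling convention: publish a non-None return value."""
        result = self.doTask(task)
        if result is not None:
            self.putResult(result)


class Doubler(SketchWorker):
    def doTask(self, task):
        return task * 2  # way 2: return the result


class EarlyPublisher(SketchWorker):
    def __init__(self):
        super().__init__()
        self.log = []

    def doTask(self, task):
        self.putResult(task * 2)  # way 1: publish explicitly
        self.log.append(task)     # further work can follow publication
        return None               # returning None avoids a second publication
```

One plausible reason to prefer the explicit call is that the result becomes available downstream before the remaining work in doTask() has finished.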

putResult(result)

Register the result by putting it on all the output tubes.

class mpipe.UnorderedWorker

An UnorderedWorker object operates independently of other workers in the stage, fetching the first available task, and publishing its result whenever it is done (without coordinating with neighboring workers). Consequently, the order of output results may not match that of corresponding input tasks.
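The difference between ordered and unordered publication has a close analogy in the standard library. The sketch below uses concurrent.futures threads rather than MPipe processes; slow_identity, ordered_results and unordered_results are names invented for the illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def slow_identity(n):
    time.sleep(0.02 * (3 - n % 4))  # smaller inputs take longer
    return n


def ordered_results(tasks, size=4):
    """Results in input order, like a stage of ordered workers."""
    with ThreadPoolExecutor(max_workers=size) as pool:
        return list(pool.map(slow_identity, tasks))


def unordered_results(tasks, size=4):
    """Results in completion order, like a stage of unordered workers."""
    with ThreadPoolExecutor(max_workers=size) as pool:
        futures = [pool.submit(slow_identity, task) for task in tasks]
        return [future.result() for future in as_completed(futures)]
```

Both functions return the same set of results; only the second lets a fast task overtake a slow one that was submitted earlier.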

doInit()

Implement this method in the subclass if additional initialization is needed after process startup. Because this class inherits from multiprocessing.Process, its constructor executes in the spawning (parent) process. This method lets additional code run in the forked (child) process, before the worker begins processing input tasks.

doTask(task)

Implement this method in the subclass with the work to be executed on each task object. The implementation can publish its output result in one of two ways: 1) call putResult() and return None, or 2) return the result (any value other than None).

putResult(result)

Register the result by putting it on all the output tubes.


class mpipe.Stage(worker_class, size=1, disable_result=False, do_stop_task=False, input_tube=None, **worker_args)

The Stage is an assembly of workers of identical functionality.

Create a stage of workers of the given worker_class implementation, with size giving the number of workers in the stage. If disable_result is set, any result produced by the worker implementation is discarded rather than propagated downstream (equivalent to the worker producing a None result).

do_stop_task indicates whether the incoming “stop” signal (None value) will actually be passed to the worker as a task. When using this option, implement your worker so that, in addition to regular incoming tasks, it handles the None value as well. This will be the worker’s final task before the process exits.

Any worker initialization arguments are given in worker_args.
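The effect of do_stop_task can be modeled in a few lines of plain Python. This is a sketch of the behavior described above, not MPipe code: run_worker is a hypothetical single-worker loop and RunningTotal is a hypothetical worker body that holds back its result until the stop signal arrives.

```python
def run_worker(tasks, do_task, do_stop_task=False):
    """Feed tasks, then the None stop signal, to `do_task`; collect results."""
    results = []
    for task in list(tasks) + [None]:
        if task is None and not do_stop_task:
            break  # the stop signal is consumed, not delivered to the worker
        result = do_task(task)
        if result is not None:
            results.append(result)
        if task is None:
            break  # the stop task was the final task
    return results


class RunningTotal:
    """Callable worker body that accumulates tasks and reports on stop."""

    def __init__(self):
        self.total = 0

    def __call__(self, task):
        if task is None:   # final task: flush the accumulated state
            return self.total
        self.total += task
        return None        # nothing to publish for regular tasks
```

With do_stop_task enabled, the worker gets one last call in which it can publish whatever it has accumulated; without it, that state is never seen downstream.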

get(timeout=None)

Retrieve results from all the output tubes.

link(next_stage)

Link to the given downstream stage next_stage by adding its input tube to the list of this stage’s output tubes. Return this stage.

put(task)

Put task on the stage’s input tube.

class mpipe.OrderedStage(target, size=1, disable_result=False)

A specialized Stage, internally creating OrderedWorker objects.

The constructor takes a function (target) that implements OrderedWorker.doTask().

class mpipe.UnorderedStage(target, size=1, disable_result=False, max_backlog=None)

A specialized Stage, internally creating UnorderedWorker objects.

The constructor takes a function (target) that implements UnorderedWorker.doTask().


class mpipe.Pipeline(input_stage)

A pipeline of stages.

The constructor takes the root upstream stage as input_stage.

get(timeout=None)

Return result from the pipeline.

put(task)

Put task on the pipeline.

results()

Return a generator to iterate over results from the pipeline.
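A short end-to-end sketch shows how the pieces above might fit together: two ordered stages, linked and wrapped in a pipeline. The functions increment, double and run_pipeline are illustrative names; the linking method is assumed to be named link(), and the sketch assumes MPipe is installed.

```python
def increment(value):
    return value + 1


def double(value):
    return value * 2


def run_pipeline(numbers):
    """Send each number through increment, then double, and collect results."""
    # Imported here so the plain functions above stay usable without MPipe.
    from mpipe import OrderedStage, Pipeline

    stage1 = OrderedStage(increment, size=3)
    stage2 = OrderedStage(double, size=3)
    stage1.link(stage2)          # results of stage1 become tasks of stage2
    pipe = Pipeline(stage1)

    for number in numbers:
        pipe.put(number)
    pipe.put(None)               # None is the "stop" signal

    return list(pipe.results())  # iterate over the pipeline's results
```

Because the stages start worker processes, run_pipeline() is best called from under an `if __name__ == "__main__":` guard, as with other multiprocessing programs.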


class mpipe.FilterWorker(stages, max_tasks=1, drop_results=False, cache_results=False)

FilterWorker filters input to sub-pipelines.

The constructor takes an iterable of Stage objects and creates one pipeline for each stage. The filter then offers each input task to every pipeline, but limits the number of tasks allowed in the stream of each pipeline to max_tasks. A task that arrives while a pipeline is at that limit is not added to it.

For every input task (even one not propagated to any sub-pipeline) the filter stage produces a result. By default this result is a tuple (task, results), where results is a list of results from all pipelines. If drop_results is True, the stage ignores any sub-pipeline results and propagates only the input task.

If drop_results is False, the cache_results flag may be used to save (i.e. cache) the last result from each pipeline. A cached result is then reused as that pipeline’s result whenever the pipeline does not produce a new one for the current input task.
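The max_tasks limit amounts to a per-pipeline admission check. The sketch below models only that bookkeeping in plain Python; AdmissionGate and its methods are hypothetical and do not correspond to anything in MPipe.

```python
class AdmissionGate:
    """Hypothetical helper: cap the number of unfinished tasks per pipeline."""

    def __init__(self, pipeline_names, max_tasks=1):
        self.max_tasks = max_tasks
        self.in_flight = {name: 0 for name in pipeline_names}

    def offer(self, task):
        """Return the names of the pipelines that accept `task`."""
        accepted = []
        for name, count in self.in_flight.items():
            if count < self.max_tasks:  # room left in this pipeline
                self.in_flight[name] = count + 1
                accepted.append(name)
        return accepted                 # topped-out pipelines skip the task

    def finished(self, name):
        """Record that pipeline `name` has produced a result for one task."""
        self.in_flight[name] -= 1
```

With max_tasks=1 and two pipelines, a first task is accepted by both, a second is accepted by neither, and a third is accepted only by whichever pipeline has finished in the meantime.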

class mpipe.FilterStage(stages, max_tasks=1, drop_results=False, cache_results=False, do_stop_task=True)

A single-worker stage running FilterWorker.