MPipe API
class mpipe.OrderedWorker

An OrderedWorker object operates in a stage where the order of output results always matches that of the corresponding input tasks.

A worker is linked to its two nearest neighbors, the previous worker and the next, so that all workers in the stage are connected in circular fashion. Input tasks are fetched in this order. Before publishing its result, a worker first waits for its previous neighbor to publish its own.
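The neighbor-to-neighbor handoff described above can be modeled with threads. This is a minimal sketch under stated assumptions, not mpipe's implementation:

- threads stand in for worker processes;
- `run_ordered()` and `slow_double()` are hypothetical names;
- tasks are pre-assigned round-robin rather than fetched in turn from a shared input tube.

Each worker computes freely but publishes only after its previous neighbor has. That alone keeps output order equal to input order.

```python
import random
import threading
import time


def run_ordered(tasks, num_workers, work):
    """Hypothetical thread-based model of the OrderedWorker ordering protocol."""
    results = []
    # turn[i] is set when worker i may publish its next result.
    turn = [threading.Event() for _ in range(num_workers)]
    turn[0].set()

    def worker(index):
        next_index = (index + 1) % num_workers
        # Simplification: worker i takes tasks i, i + n, i + 2n, ... up front
        # instead of fetching them in turn from a shared input tube.
        for task in tasks[index::num_workers]:
            result = work(task)       # computed concurrently, finishing in any order
            turn[index].wait()        # wait for the previous neighbor to publish
            turn[index].clear()
            results.append(result)    # publish
            turn[next_index].set()    # let the next neighbor publish

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(num_workers)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return results


def slow_double(x):
    time.sleep(random.uniform(0, 0.01))  # uneven work scrambles finish times
    return x * 2


print(run_ordered(list(range(10)), 3, slow_double))
# prints [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```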
doInit()

Implement this method in the subclass if additional initialization is needed after process startup. Because this class inherits from multiprocessing.Process, its constructor executes in the spawning process. This method lets additional code run in the forked process before the worker begins processing input tasks.
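To see why a separate hook is useful, the sketch below records the process ID in two places of a plain `multiprocessing.Process` subclass: the constructor and a `doInit()` method. It is an illustration, not mpipe code. `InitDemoWorker` and `demo()` are hypothetical names. The sketch explicitly requests the POSIX-only "fork" start method to match the forked-process wording above.

```python
import multiprocessing
import os

# Assumption for this sketch: the POSIX-only "fork" start method, matching
# the forked-process wording above. Not available on Windows.
ctx = multiprocessing.get_context("fork")


class InitDemoWorker(ctx.Process):
    """Hypothetical worker showing where __init__ and a doInit() hook run."""

    def __init__(self, report):
        super().__init__()
        self.report = report
        self.constructed_pid = os.getpid()   # runs in the spawning process

    def doInit(self):
        # Runs in the child, so per-process resources (sockets, DB
        # connections, ...) opened here are not shared with the parent.
        self.init_pid = os.getpid()

    def run(self):
        self.doInit()                        # once, before any task work
        self.report.put((self.constructed_pid, self.init_pid))


def demo():
    report = ctx.Queue()
    worker = InitDemoWorker(report)
    worker.start()
    pids = report.get()                      # read before join() to avoid blocking
    worker.join()
    return pids


if __name__ == "__main__":
    parent_pid, child_pid = demo()
    print(parent_pid == os.getpid(), child_pid != parent_pid)   # True True
```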
doTask(task)

Implement this method in the subclass with the work to be executed on each task object. The implementation can publish the output result in one of two ways: either 1) by calling putResult() and returning None, or 2) by returning the result (any value other than None).
putResult(result)

Register the result by putting it on all the output tubes.
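The two publishing styles of `doTask()` can be compared with a single-process model. In this sketch, `SketchWorker`, its `handle()` driver and the list-based output tubes are hypothetical stand-ins. Only two behaviors come from the description above:

- a non-None return value is published;
- `putResult()` writes to every output tube.

```python
class SketchWorker:
    """Hypothetical single-process model of how a worker publishes results."""

    def __init__(self, output_tubes):
        self.output_tubes = output_tubes   # here: one list per downstream consumer

    def putResult(self, result):
        # Register the result on *all* output tubes.
        for tube in self.output_tubes:
            tube.append(result)

    def doTask(self, task):
        raise NotImplementedError

    def handle(self, task):
        # A non-None return value is published on the worker's behalf;
        # None means there is nothing (more) to publish.
        result = self.doTask(task)
        if result is not None:
            self.putResult(result)


class ReturningWorker(SketchWorker):
    def doTask(self, task):
        return task * 2                    # way 2: return the result


class ExplicitWorker(SketchWorker):
    def doTask(self, task):
        self.putResult(task * 2)           # way 1: call putResult() ...
        return None                        # ... and return None


first, second = [], []
ReturningWorker([first, second]).handle(3)
ExplicitWorker([first, second]).handle(4)
print(first, second)   # [6, 8] [6, 8]
```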
class mpipe.UnorderedWorker

An UnorderedWorker object operates independently of the other workers in the stage. It fetches the first available task and publishes its result as soon as it is done, without coordinating with neighboring workers. Consequently, the order of output results may not match that of the corresponding input tasks.
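For contrast with the ordered case, here is a thread-based sketch of unordered scheduling. It is again hypothetical: `run_unordered()` is not part of mpipe, and threads stand in for processes. Workers pull the first available task from a shared queue and publish immediately. Only the set of results is guaranteed, not their order.

```python
import queue
import random
import threading
import time


def run_unordered(tasks, num_workers, work):
    """Hypothetical thread-based model of UnorderedWorker scheduling."""
    input_tube = queue.Queue()
    output_tube = queue.Queue()
    for task in tasks:
        input_tube.put(task)
    for _ in range(num_workers):
        input_tube.put(None)              # one stop signal per worker (sketch only)

    def worker():
        while True:
            task = input_tube.get()       # first available task, no turn-taking
            if task is None:
                return
            output_tube.put(work(task))   # publish as soon as it is done

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return [output_tube.get_nowait() for _ in range(len(tasks))]


def slow_double(x):
    time.sleep(random.uniform(0, 0.01))
    return x * 2


results = run_unordered(list(range(10)), 3, slow_double)
print(sorted(results) == [2 * x for x in range(10)])   # True; raw order may vary
```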
doInit()

Implement this method in the subclass if additional initialization is needed after process startup. Because this class inherits from multiprocessing.Process, its constructor executes in the spawning process. This method lets additional code run in the forked process before the worker begins processing input tasks.
doTask(task)

Implement this method in the subclass with the work to be executed on each task object. The implementation can publish the output result in one of two ways: either 1) by calling putResult() and returning None, or 2) by returning the result (any value other than None).
putResult(result)

Register the result by putting it on all the output tubes.
class mpipe.Stage(worker_class, size=1, disable_result=False, do_stop_task=False, input_tube=None, **worker_args)

The Stage is an assembly of workers of identical functionality.

Create a stage of workers of the given worker_class implementation, with size indicating the number of workers within the stage.

disable_result overrides any result defined in the worker implementation and does not propagate it downstream (equivalent to the worker producing a None result).

do_stop_task indicates whether the incoming “stop” signal (a None value) will actually be passed to the worker as a task. When using this option, implement your worker so that, in addition to regular incoming tasks, it also handles the None value. This will be the worker’s final task before the process exits.

Any worker initialization arguments are given in worker_args.
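The interplay of `worker_args`, `do_stop_task` and `disable_result` can be modeled with a single worker and no processes. `Scaler` and `run_stage()` are hypothetical. The sketch makes two assumptions, and the real Stage may differ in both:

- `worker_args` are passed as keyword arguments to the worker's constructor;
- the result of the final None task is not published.

In mpipe terms, the first call below corresponds roughly to `Stage(Scaler, 1, factor=3)`, with `Scaler` written as a worker subclass.

```python
class Scaler:
    """Hypothetical worker: multiplies each task by a factor given via worker_args."""

    def __init__(self, factor):
        self.factor = factor
        self.stopped = False

    def doTask(self, task):
        if task is None:            # only delivered when do_stop_task=True
            self.stopped = True     # e.g. flush buffers or close files here
            return None
        return task * self.factor


def run_stage(worker_class, tasks, disable_result=False, do_stop_task=False, **worker_args):
    """Hypothetical single-worker model of the Stage options described above."""
    worker = worker_class(**worker_args)   # assumption: worker_args go to the constructor
    published = []
    for task in tasks:
        if task is None and not do_stop_task:
            break                          # plain stop: doTask() never sees None
        result = worker.doTask(task)
        if task is None:
            break                          # final task; assumption: its result is not published
        if result is not None and not disable_result:
            published.append(result)       # disable_result behaves like a None result
    return worker, published


worker, out = run_stage(Scaler, [1, 2, None], factor=3)
print(out, worker.stopped)   # [3, 6] False
worker, out = run_stage(Scaler, [1, 2, None], do_stop_task=True, factor=3)
print(out, worker.stopped)   # [3, 6] True
```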
get(timeout=None)

Retrieve results from all the output tubes.

link(next_stage)

Link to the given downstream stage next_stage by adding its input tube to the list of this stage’s output tubes. Return this stage.

put(task)

Put task on the stage’s input tube.
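Because `link()` returns the stage it was called on, calls can be chained to fan one stage's output out to several downstream stages. The single-threaded `MiniStage` below is a hypothetical model of just that bookkeeping, not mpipe's Stage class. Its `step()` method stands in for the stage's workers.

```python
import queue


class MiniStage:
    """Hypothetical single-threaded model of Stage's put()/link() bookkeeping."""

    def __init__(self, func):
        self.func = func
        self.input_tube = queue.Queue()
        self.output_tubes = []

    def link(self, next_stage):
        # The downstream stage's input tube becomes one of our output tubes.
        self.output_tubes.append(next_stage.input_tube)
        return self                     # returning self is what makes chaining work

    def put(self, task):
        self.input_tube.put(task)

    def step(self):
        # Stand-in for the stage's workers: process one task, then
        # put the result on every output tube.
        result = self.func(self.input_tube.get())
        for tube in self.output_tubes:
            tube.put(result)


source = MiniStage(lambda x: x + 1)
left, right = MiniStage(str), MiniStage(repr)
source.link(left).link(right)           # fan-out: both receive every result
source.put(41)
source.step()
print(left.input_tube.get(), right.input_tube.get())   # 42 42
```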
class mpipe.OrderedStage(target, size=1, disable_result=False)

A specialized Stage, internally creating OrderedWorker objects. The constructor takes a function implementing OrderedWorker.doTask().
class mpipe.UnorderedStage(target, size=1, disable_result=False, max_backlog=None)

A specialized Stage, internally creating UnorderedWorker objects. The constructor takes a function implementing UnorderedWorker.doTask().
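Both specialized stages take a plain function where `Stage` takes a worker class. One way to picture this is a factory that wraps the function in a subclass whose `doTask()` delegates to it. The names here (`BaseWorker`, `worker_class_from_function()`) are hypothetical, and this is not a claim about how mpipe builds its workers internally.

```python
class BaseWorker:
    """Hypothetical minimal worker base, standing in for OrderedWorker/UnorderedWorker."""

    def doTask(self, task):
        raise NotImplementedError


def worker_class_from_function(target, base=BaseWorker):
    """Return a worker subclass whose doTask() simply delegates to *target*."""

    class FunctionWorker(base):
        def doTask(self, task):
            return target(task)

    return FunctionWorker


def square(x):
    return x * x


SquareWorker = worker_class_from_function(square)
print(SquareWorker().doTask(7))   # 49
```

Seen this way, passing a function saves writing a one-method subclass by hand.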
class mpipe.Pipeline(input_stage)

A pipeline of stages. The constructor takes the root upstream stage.
get(timeout=None)

Return a result from the pipeline.

put(task)

Put task on the pipeline.

results()

Return a generator to iterate over results from the pipeline.
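With mpipe itself, a typical flow is presumably:

1. link stages with `link()`;
2. wrap the first one in `Pipeline`;
3. `put()` each task, then `put(None)` as the stop signal;
4. iterate over `results()`.

The thread-based `MiniPipeline` below is a hypothetical model of that flow, not mpipe's implementation. It uses one thread per stage and a queue between stages. The None stop signal is forwarded downstream, and None results are dropped rather than propagated.

```python
import queue
import threading


class MiniPipeline:
    """Hypothetical thread-based model of Pipeline.put()/get()/results()."""

    def __init__(self, *funcs):
        # One queue feeding each stage, plus a final output queue.
        self.tubes = [queue.Queue() for _ in range(len(funcs) + 1)]
        for func, inbox, outbox in zip(funcs, self.tubes, self.tubes[1:]):
            threading.Thread(target=self._run_stage, args=(func, inbox, outbox), daemon=True).start()

    @staticmethod
    def _run_stage(func, inbox, outbox):
        while True:
            task = inbox.get()
            if task is None:             # stop signal: forward it, then exit
                outbox.put(None)
                return
            result = func(task)
            if result is not None:       # None results are not propagated
                outbox.put(result)

    def put(self, task):
        self.tubes[0].put(task)

    def get(self, timeout=None):
        return self.tubes[-1].get(timeout=timeout)

    def results(self):
        # Yield results until the stop signal reaches the end of the pipeline.
        while True:
            result = self.get()
            if result is None:
                return
            yield result


pipe = MiniPipeline(lambda x: x + 1, lambda x: x * 2)
for number in range(5):
    pipe.put(number)
pipe.put(None)
print(list(pipe.results()))   # [2, 4, 6, 8, 10]
```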
class mpipe.FilterWorker(stages, max_tasks=1, drop_results=False, cache_results=False)

FilterWorker filters input to sub-pipelines.

The constructor takes an iterable of Stage objects and creates one pipeline for each stage. The filter then propagates its input task as input into each pipeline. This input is filtered by limiting the number of tasks allowed in the stream of a pipeline, given as the max_tasks parameter. Any task in excess is not added to a topped-out pipeline.

For every input task (even tasks not propagated to sub-pipelines), the filter stage produces a result. By default, this result is a tuple (task, results), where results is a list of results from all pipelines. If drop_results is True, the filter stage instead ignores any sub-pipeline result and propagates only the input task.

If drop_results is False, the cache_results flag may be used to save (i.e. cache) the last results from pipelines. These are then used as repeated pipeline results when a pipeline does not produce a result upon the current input task.
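The throttling and caching rules above are easier to follow in a deterministic, single-threaded model. `FakePipeline` and `FilterSketch` are hypothetical, and pipelines finish tasks only when `finish_one()` is called. Pipelines that have no result for the current task are simply left out of the results list. That is an assumption about a detail the description leaves open.

```python
from collections import deque


class FakePipeline:
    """Hypothetical stand-in for a sub-pipeline; tasks finish only when told to."""

    def __init__(self, func):
        self.func = func
        self.pending = deque()   # tasks accepted but not yet finished
        self.done = deque()      # finished results waiting to be collected

    def put(self, task):
        self.pending.append(task)

    def finish_one(self):
        # Simulate the pipeline finishing its oldest in-flight task.
        self.done.append(self.func(self.pending.popleft()))

    def drain(self):
        results = list(self.done)
        self.done.clear()
        return results


class FilterSketch:
    """Hypothetical model of the FilterWorker behavior described above."""

    def __init__(self, pipelines, max_tasks=1, drop_results=False, cache_results=False):
        self.pipelines = pipelines
        self.max_tasks = max_tasks
        self.drop_results = drop_results
        self.cache_results = cache_results
        self.in_flight = [0] * len(pipelines)
        self.last = {}                        # pipeline index -> latest result

    def do_task(self, task):
        if not self.cache_results:
            self.last = {}                    # forget results from earlier tasks
        for i, pipe in enumerate(self.pipelines):
            finished = pipe.drain()
            self.in_flight[i] -= len(finished)
            if finished:
                self.last[i] = finished[-1]   # keep only the latest result
            if self.in_flight[i] < self.max_tasks:
                pipe.put(task)                # room left in this pipeline's stream
                self.in_flight[i] += 1
            # otherwise the task is not added to this topped-out pipeline
        if self.drop_results:
            return task
        return task, [self.last[i] for i in sorted(self.last)]


times_ten = FakePipeline(lambda x: x * 10)
filt = FilterSketch([times_ten], max_tasks=1, cache_results=True)
print(filt.do_task(1))   # (1, [])    task 1 enters the pipeline
print(filt.do_task(2))   # (2, [])    pipeline topped out: task 2 is not added
times_ten.finish_one()   # the pipeline finishes task 1
print(filt.do_task(3))   # (3, [10])  fresh result; task 3 enters
print(filt.do_task(4))   # (4, [10])  no new result, the cached one is repeated
```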
class mpipe.FilterStage(stages, max_tasks=1, drop_results=False, cache_results=False, do_stop_task=True)

Single-worker stage running FilterWorker.