Source code for step_pipeline.main
"""This module contains the pipeline(..) function which is the main gateway for users to access the functionality in the
step_pipeline library"""
import configargparse
from .constants import Backend
from .batch import BatchPipeline
from .wdl import WdlPipeline
# for debugging (from https://stackoverflow.com/questions/132058/showing-the-stack-trace-from-a-running-python-application)
# pkill -SIGHUP -f mypythonapp to print stack trace
# pkill -SIGUSR1 -f mypythonapp to print stack trace
import signal
import traceback
#import faulthandler
# Debugging aid: print the current stack trace when the process receives SIGUSR1 or SIGHUP.
def _print_stack_on_signal(_signum, frame):
    """Signal handler that prints the stack of the frame that was interrupted by the signal."""
    traceback.print_stack(frame)


for _sig_name in "SIGUSR1", "SIGHUP":
    # These signals are not defined on every platform (e.g. Windows), so look them up defensively
    # rather than letting an AttributeError break the import of this module.
    sig = getattr(signal, _sig_name, None)
    if sig is None:
        continue
    try:
        signal.signal(sig, _print_stack_on_signal)
    except ValueError:
        # signal.signal() can only be called from the main thread of the main interpreter. If this module
        # is first imported elsewhere, skip installing the (purely optional) debug handler.
        pass
[docs]
def pipeline(name=None, backend=Backend.HAIL_BATCH_SERVICE, config_file_path="~/.step_pipeline"):
    """Creates a pipeline object.

    Usage::

        with pipeline("my pipeline") as sp:
            s = sp.new_step(..)
            ... step definitions ...

        # or alternatively:

        sp = pipeline("my pipeline")
        s = sp.new_step(..)
        ... step definitions ...
        sp.run()

    The backend is resolved in this order: a ``--backend`` value supplied on the command line or in the config
    file takes precedence; otherwise the ``backend`` argument is used.

    Args:
        name (str): Pipeline name.
        backend (Backend): The backend to use for executing the pipeline when ``--backend`` isn't specified on
            the command line or in the config file. Values that aren't a Backend (such as None) fall back to
            Backend.HAIL_BATCH_SERVICE.
        config_file_path (str): path of a configargparse config file.

    Return:
        Pipeline: An object that you can use to create Steps by calling `.new_step(..)` and then execute the pipeline by
        calling `.run()`

    Raises:
        ValueError: If the resolved backend isn't handled by any of the available pipeline implementations.
    """
    config_arg_parser = configargparse.ArgumentParser(
        add_config_file_help=True,
        add_env_var_help=True,
        formatter_class=configargparse.ArgumentDefaultsHelpFormatter,
        default_config_files=[config_file_path],
        ignore_unknown_config_file_keys=True,
        config_file_parser_class=configargparse.YAMLConfigFileParser,
    )

    # Use the caller's backend as the parser default. A hard-coded default would make args.backend always
    # truthy, which silently discarded the `backend` argument passed to this function.
    default_backend = backend if isinstance(backend, Backend) else Backend.HAIL_BATCH_SERVICE
    config_arg_parser.add_argument(
        "--backend",
        help="The backend system to use for executing the pipeline.",
        default=default_backend.name,
        choices=[e.name for e in Backend],
    )

    # Only the known args are parsed here (and help flags are ignored) since the same parser is handed to the
    # pipeline object below.
    args, _ = config_arg_parser.parse_known_args(ignore_help_args=True)

    # create and return the pipeline
    backend = Backend[args.backend] if args.backend else default_backend
    if backend in (Backend.HAIL_BATCH_SERVICE, Backend.HAIL_BATCH_LOCAL):
        return BatchPipeline(name=name, config_arg_parser=config_arg_parser, backend=backend)
    if backend in (Backend.TERRA, Backend.CROMWELL):
        return WdlPipeline(name=name, config_arg_parser=config_arg_parser, backend=backend)

    valid_options = ", ".join(e.name for e in Backend)
    raise ValueError(f"Unknown backend: {backend}. Valid options are: {valid_options}")