"""This module contains Hail Batch-specific extensions of the Pipeline and Step classes"""
import os
import stat
import tempfile
from enum import Enum
import hailtop.batch as hb
import hailtop.fs as hfs
from .constants import Backend
from .io import InputSpec, InputValueSpec, InputType
from .pipeline import Pipeline, Step, Localize, Delocalize
from .utils import check_gcloud_storage_region, MARK_FILE_SUFFIX
# TODO get latest tag via https://hub.docker.com/v2/repositories/hailgenetics/genetics/tags/ ?
DEFAULT_BASH_IMAGE = DEFAULT_PYTHON_IMAGE = "hailgenetics/hail:0.2.77"
class BatchStepType(Enum):
"""Constants that represent different Batch Step types."""
PYTHON = "python"
BASH = "bash"
class BatchPipeline(Pipeline):
"""This class contains Hail Batch-specific extensions of the Pipeline class"""
def __init__(self, name=None, config_arg_parser=None, backend=Backend.HAIL_BATCH_SERVICE):
"""
BatchPipeline constructor
Args:
name (str): Pipeline name
config_arg_parser (configargparse): The configargparse.ArgumentParser object to use for defining
command-line args
backend (Backend): Either Backend.HAIL_BATCH_SERVICE or Backend.HAIL_BATCH_LOCAL
"""
super().__init__(name=name, config_arg_parser=config_arg_parser)
batch_args = self.get_config_arg_parser_group("hail batch")
batch_args.add_argument(
"--batch-billing-project",
env_var="BATCH_BILLING_PROJECT",
help="Batch requires a billing project to charge for compute costs. To set up a billing project, email the "
"hail team."
)
batch_args.add_argument(
"--batch-remote-tmpdir",
env_var="BATCH_REMOTE_TMPDIR",
help="Batch requires a temp cloud storage path that it can use to store intermediate files. The Batch "
"service account must have Admin access to this directory. To get the name of your Batch "
"service account, go to https://auth.hail.is/user. Then, to grant Admin permissions, run "
"gsutil iam ch serviceAccount:[SERVICE_ACCOUNT_NAME]:objectAdmin gs://[BUCKET_NAME]"
)
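# Illustrative example (hypothetical project and bucket names): these settings can be provided on the
# command line or via the BATCH_BILLING_PROJECT / BATCH_REMOTE_TMPDIR environment variables, e.g.
#
#   python3 my_pipeline.py --batch-billing-project my-billing-project --batch-remote-tmpdir gs://my-bucket/tmp/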
args = self.parse_known_args()
self._backend = backend
self._gcloud_project = args.gcloud_project
self._cancel_after_n_failures = None
self._default_image = DEFAULT_BASH_IMAGE
self._default_python_image = DEFAULT_PYTHON_IMAGE
self._default_memory = None
self._default_cpu = None
self._default_storage = None
self._default_timeout = None
self._default_custom_machine_type = None
self._default_preemptible = None
self._backend_obj = None
@property
def backend(self):
"""Returns either Backend.HAIL_BATCH_SERVICE or Backend.HAIL_BATCH_LOCAL"""
return self._backend
def new_step(
self,
name=None,
step_number=None,
arg_suffix=None,
depends_on=None,
image=None,
cpu=None,
memory=None,
storage=None,
always_run=False,
timeout=None,
custom_machine_type=None,
preemptible=None,
output_dir=None,
reuse_job_from_previous_step=None,
localize_by=Localize.COPY,
delocalize_by=Delocalize.COPY,
add_force_command_line_args=True,
add_skip_command_line_args=True,
add_run_subset_command_line_args=True,
all_inputs_precached=False,
all_outputs_precached=False,
):
"""Creates a new pipeline Step.
Args:
name (str): A short name for this Step.
step_number (int): Optional Step number which serves as another alias for this step in addition to name.
arg_suffix (str): Optional suffix for the command-line args that will be created for forcing or skipping
execution of this Step.
depends_on (Step): Optional upstream Step that this Step depends on.
image (str): Docker image to use for this Step.
cpu (str, float, int): CPU requirements. Units are number of CPUs if the value is numeric.
memory (str, float, int): Memory requirements. The memory expression must be of the form {number}{suffix}
where valid optional suffixes are K, Ki, M, Mi, G, Gi, T, Ti, P, and Pi. Omitting a suffix means
the value is in bytes. For the ServiceBackend, the values ‘lowmem’, ‘standard’, and ‘highmem’ are also
valid arguments. ‘lowmem’ corresponds to approximately 1 Gi/core, ‘standard’ corresponds to
approximately 4 Gi/core, and ‘highmem’ corresponds to approximately 7 Gi/core. The default value
is ‘standard’.
storage (str, int): Disk size. The storage expression must be of the form {number}{suffix} where valid
optional suffixes are K, Ki, M, Mi, G, Gi, T, Ti, P, and Pi. Omitting a suffix means the value is in
bytes. For the ServiceBackend, jobs requesting one or more cores receive 5 GiB of storage for the root
file system /. Jobs requesting a fraction of a core receive the same fraction of 5 GiB of storage.
If you need additional storage, you can explicitly request more storage using this method and the extra
storage space will be mounted at /io. Batch automatically writes all ResourceFile to /io.
The default storage size is 0 Gi. The minimum storage size is 0 Gi and the maximum storage size is
64 Ti. If storage is set to a value between 0 Gi and 10 Gi, the storage request is rounded up to 10 Gi.
All values are rounded up to the nearest Gi.
always_run (bool): Set the Step to always run, even if dependencies fail.
timeout (float, int): Set the maximum amount of time this job can run for before being killed.
custom_machine_type (str): Use a custom Cloud machine type, e.g. 'n1-highmem-32'
preemptible (bool): Whether to use a preemptible machine type.
output_dir (str): Optional default output directory for Step outputs.
reuse_job_from_previous_step (Step): Optionally, reuse the batch.Job object from this other upstream Step.
localize_by (Localize): If specified, this will be the default Localize approach used by Step inputs.
delocalize_by (Delocalize): If specified, this will be the default Delocalize approach used by Step outputs.
add_force_command_line_args (bool): Whether to add command line args for forcing execution of this Step.
add_skip_command_line_args (bool): Whether to add command line args for skipping execution of this Step.
add_run_subset_command_line_args (bool): Whether to add command line args for running a subset of
parallel jobs from this Step (--run-n-step1, --run-offset-step1).
all_inputs_precached (bool): If True, all inputs for this Step will be assumed to have been checked and
pre-cached already via call(s) to pipeline.precache_file_paths(..). This allows for much faster
processing when deciding which steps need to run and which can be skipped because their outputs
already exist and are newer than their inputs.
all_outputs_precached (bool): Same as the all_inputs_precached argument, but for outputs.
Return:
BatchStep: The new BatchStep object.
"""
step = BatchStep(
self,
name=name,
step_number=step_number,
arg_suffix=arg_suffix,
image=image,
cpu=cpu,
memory=memory,
storage=storage,
always_run=always_run,
timeout=timeout,
custom_machine_type=custom_machine_type,
preemptible=preemptible,
output_dir=output_dir or self._default_output_dir,
reuse_job_from_previous_step=reuse_job_from_previous_step,
localize_by=localize_by,
delocalize_by=delocalize_by,
add_force_command_line_args=add_force_command_line_args,
add_skip_command_line_args=add_skip_command_line_args,
add_run_subset_command_line_args=add_run_subset_command_line_args,
all_inputs_precached=all_inputs_precached,
all_outputs_precached=all_outputs_precached,
)
if depends_on:
step.depends_on(depends_on)
# register the Step
self._all_steps.append(step)
return step
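# Example usage (an illustrative sketch; the exact step.input(..)/step.output(..) signatures and the
# bucket paths below are placeholders, not part of this module):
#
#   bp = BatchPipeline(name="my pipeline", backend=Backend.HAIL_BATCH_SERVICE)
#   s1 = bp.new_step("step 1", image=DEFAULT_BASH_IMAGE, cpu=1, output_dir="gs://my-bucket/outputs")
#   s1.command("echo hello > hello.txt")
#   s1.output("hello.txt")   # delocalized to the Step's output_dir
#   bp.run()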
def gcloud_project(self, gcloud_project):
"""Set the requester-pays project.
Args:
gcloud_project (str): The name of the Google Cloud project to be billed when accessing requester-pays
buckets.
"""
self._gcloud_project = gcloud_project
return self
def cancel_after_n_failures(self, cancel_after_n_failures):
"""Set the cancel_after_n_failures value.
Args:
cancel_after_n_failures: (int): Automatically cancel the batch after N failures have occurred.
"""
self._cancel_after_n_failures = cancel_after_n_failures
return self
def default_image(self, default_image):
"""Set the default Docker image to use for Steps in this pipeline.
Args:
default_image (str): Default docker image to use for Bash jobs. This must be the full name
of the image including any repository prefix and tags if desired (default tag is latest).
"""
self._default_image = default_image
return self
def default_python_image(self, default_python_image):
"""Set the default image for Python Jobs.
Args:
default_python_image (str): The Docker image to use for Python jobs. The image specified must have the dill
package installed. If default_python_image is not specified, then a Docker image will automatically be
created for you with the base image hailgenetics/python-dill:[major_version].[minor_version]-slim and
the Python packages specified by python_requirements will be installed. The default name of the image
is batch-python with a random string for the tag unless python_build_image_name is specified. If the
ServiceBackend is the backend, the locally built image will be pushed to the repository specified by
image_repository.
"""
self._default_python_image = default_python_image
return self
def default_memory(self, default_memory):
"""Set the default memory usage.
Args:
default_memory (int, str): Memory setting to use by default if not specified by a Step.
Only applicable if a docker image is specified for the LocalBackend or the ServiceBackend.
See Job.memory().
"""
self._default_memory = default_memory
return self
def default_cpu(self, default_cpu):
"""Set the default cpu requirement.
Args:
default_cpu (float, int, str): CPU setting to use by default if not specified by a job.
Only applicable if a docker image is specified for the LocalBackend or the ServiceBackend.
See Job.cpu().
"""
self._default_cpu = default_cpu
return self
def default_storage(self, default_storage):
"""Set the default storage disk size.
Args:
default_storage (str, int): Storage setting to use by default if not specified by a job. Only applicable
for the ServiceBackend. See Job.storage().
"""
self._default_storage = default_storage
return self
def default_timeout(self, default_timeout):
"""Set the default job timeout duration.
Args:
default_timeout (int): Maximum time in seconds for a job to run before being killed. Only applicable for the
ServiceBackend. If None, there is no timeout.
"""
self._default_timeout = default_timeout
return self
def default_custom_machine_type(self, default_custom_machine_type):
"""Set the default custom machine type.
Args:
default_custom_machine_type (str): Cloud machine type, e.g. 'n1-highmem-32'
"""
self._default_custom_machine_type = default_custom_machine_type
return self
def default_preemptible(self, default_preemptible):
"""Set whether the custom machine should be preemptible.
Args:
default_preemptible (bool): Whether the custom machine is preemptible.
"""
self._default_preemptible = default_preemptible
return self
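# The default_* setters above each return the BatchPipeline object, so calls can be chained, e.g.:
#
#   bp.default_image(DEFAULT_BASH_IMAGE).default_cpu(1).default_memory("standard").default_timeout(3600)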
def run(self):
"""Batch-specific code for submitting the pipeline to the Hail Batch backend"""
print(f"Starting {self.name or ''} pipeline")
super().run()
try:
self._create_batch_obj()
num_steps_transferred = self._transfer_all_steps()
if num_steps_transferred > 0:
result = self._run_batch_obj()
else:
result = None
print("No steps to run.")
self._download_output_files()
return result
finally:
if isinstance(self._backend_obj, hb.ServiceBackend):
try:
self._backend_obj.close()
except Exception:
pass
def _get_localization_root_dir(self, localize_by):
"""Return the top-level root directory where localized files will be copied"""
return "/io"
def _create_batch_obj(self):
"""Instantiate the Hail Batch Backend."""
args = self.parse_known_args()
if self._backend == Backend.HAIL_BATCH_LOCAL:
self._backend_obj = hb.LocalBackend()
elif self._backend == Backend.HAIL_BATCH_SERVICE:
if not args.batch_billing_project:
raise ValueError("--batch-billing-project must be set when --cluster is used")
if not args.batch_remote_tmpdir:
raise ValueError("--batch-remote-tmpdir must be set when --cluster is used")
self._backend_obj = hb.ServiceBackend(
google_project=args.gcloud_project,
billing_project=args.batch_billing_project,
remote_tmpdir=args.batch_remote_tmpdir)
else:
raise Exception(f"Unexpected _backend: {self._backend}")
if args.verbose:
print("Args:")
for key, value in sorted(vars(args).items(), key=lambda x: x[0]):
print(f" {key}: {value}")
print("Creating Batch with the following parameters:")
if self.name: print(f" name: {self.name}")
if self._backend_obj: print(f" backend: {self._backend_obj}")
if self._gcloud_project: print(f" requester_pays_project: {self._gcloud_project}")
if self._cancel_after_n_failures: print(f" cancel_after_n_failures: {self._cancel_after_n_failures}")
if self._default_image: print(f" default_image: {self._default_image}")
if self._default_python_image: print(f" default_python_image: {self._default_python_image}")
if self._default_memory: print(f" default_memory: {self._default_memory}")
if self._default_cpu: print(f" default_cpu: {self._default_cpu}")
if self._default_storage: print(f" default_storage: {self._default_storage}")
if self._default_timeout: print(f" default_timeout: {self._default_timeout}")
self._batch = hb.Batch(
backend=self._backend_obj,
name=self.name,
requester_pays_project=self._gcloud_project, # The name of the Google project to be billed when accessing requester pays buckets.
cancel_after_n_failures=self._cancel_after_n_failures, # Automatically cancel the batch after N failures have occurred
default_image=self._default_image, #(Optional[str]) – Default docker image to use for Bash jobs. This must be the full name of the image including any repository prefix and tags if desired (default tag is latest).
default_python_image=self._default_python_image,
default_memory=self._default_memory, # (Union[int, str, None]) – Memory setting to use by default if not specified by a job. Only applicable if a docker image is specified for the LocalBackend or the ServiceBackend. See Job.memory().
default_cpu=self._default_cpu, # (Union[float, int, str, None]) – CPU setting to use by default if not specified by a job. Only applicable if a docker image is specified for the LocalBackend or the ServiceBackend. See Job.cpu().
default_storage=self._default_storage, # Storage setting to use by default if not specified by a job. Only applicable for the ServiceBackend. See Job.storage().
default_timeout=self._default_timeout, # Maximum time in seconds for a job to run before being killed. Only applicable for the ServiceBackend. If None, there is no timeout.
)
def _run_batch_obj(self):
"""Launch the Hail Batch pipeline and return the result."""
args = self.parse_args()
if self._backend == Backend.HAIL_BATCH_LOCAL:
# Hail Batch LocalBackend mode doesn't support some of the args supported by the ServiceBackend
result = self._batch.run(dry_run=args.dry_run, verbose=args.verbose)
elif self._backend == Backend.HAIL_BATCH_SERVICE:
result = self._batch.run(
dry_run=args.dry_run,
verbose=False, # always set to False since hail verbose output is too detailed
delete_scratch_on_exit=None, # If True, delete temporary directories with intermediate files
wait=True, # If True, wait for the batch to finish executing before returning
open=False, # If True, open the UI page for the batch
disable_progress_bar=False, # If True, disable the progress bar.
callback=None, # If not None, a URL that will receive at most one POST request after the entire batch completes.
)
else:
raise Exception(f"Unexpected _backend: {self._backend}")
try:
self._backend_obj.close()
except Exception:
pass
# The Batch pipeline returns an undocumented result object which can be used to retrieve the Job's status code
# and logs
return result
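# A minimal sketch of inspecting the returned object (assumes the ServiceBackend, where Batch.run()
# returns a hailtop batch handle; result.status() is an assumption about that handle, not part of
# this module's API):
#
#   result = bp.run()
#   if result is not None:
#       print(result.status().get("state"))   # e.g. "success" or "failure"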
def _transfer_all_steps(self):
"""This method performs the core task of executing a pipeline. It traverses the execution graph (DAG) of
user-defined Steps and decides which steps can be skipped, and which should be executed (i.e. transferred to
the execution backend).
"""
num_steps_transferred = super()._transfer_all_steps()
# handle --slack-when-done by adding an always-run job
args = self.parse_known_args()
if args.slack_when_done and num_steps_transferred > 0:
post_to_slack_job = self._batch.new_job(name="post to slack when done")
for step in self._all_steps:
if step._job:
post_to_slack_job.depends_on(step._job)
post_to_slack_job.always_run()
post_to_slack_job.cpu(0.25)
slack_message = f"{self.name} pipeline finished"
post_to_slack_job.command("python3 -m pip install slacker")
post_to_slack_job.command(self._generate_post_to_slack_command(slack_message))
return num_steps_transferred
class BatchStep(Step):
"""This class contains Hail Batch-specific extensions of the Step class"""
def __init__(
self,
pipeline,
name=None,
step_number=None,
arg_suffix=None,
image=None,
cpu=None,
memory=None,
storage=None,
always_run=False,
timeout=None,
custom_machine_type=None,
preemptible=None,
output_dir=None,
reuse_job_from_previous_step=None,
localize_by=Localize.COPY,
delocalize_by=Delocalize.COPY,
add_force_command_line_args=True,
add_skip_command_line_args=True,
add_run_subset_command_line_args=True,
all_inputs_precached=False,
all_outputs_precached=False,
):
"""Step constructor.
Args:
pipeline (BatchPipeline): The pipeline that this Step is a part of.
name (str): Step name
step_number (int): optional Step number which serves as another alias for this step in addition to name.
arg_suffix (str): optional suffix for the command-line args that will be created for forcing or skipping
execution of this Step.
image (str): Docker image to use for this Step
cpu (str, float, int): CPU requirements. Units are number of CPUs if the value is numeric.
memory (str, float, int): Memory requirements. The memory expression must be of the form {number}{suffix}
where valid optional suffixes are K, Ki, M, Mi, G, Gi, T, Ti, P, and Pi. Omitting a suffix means
the value is in bytes. For the ServiceBackend, the values ‘lowmem’, ‘standard’, and ‘highmem’ are also
valid arguments. ‘lowmem’ corresponds to approximately 1 Gi/core, ‘standard’ corresponds to
approximately 4 Gi/core, and ‘highmem’ corresponds to approximately 7 Gi/core. The default value
is ‘standard’.
storage (str, int): Disk size. The storage expression must be of the form {number}{suffix} where valid
optional suffixes are K, Ki, M, Mi, G, Gi, T, Ti, P, and Pi. Omitting a suffix means the value is in
bytes. For the ServiceBackend, jobs requesting one or more cores receive 5 GiB of storage for the root
file system /. Jobs requesting a fraction of a core receive the same fraction of 5 GiB of storage.
If you need additional storage, you can explicitly request more storage using this method and the extra
storage space will be mounted at /io. Batch automatically writes all ResourceFile to /io.
The default storage size is 0 Gi. The minimum storage size is 0 Gi and the maximum storage size is
64 Ti. If storage is set to a value between 0 Gi and 10 Gi, the storage request is rounded up to 10 Gi.
All values are rounded up to the nearest Gi.
always_run (bool): Set the Step to always run, even if dependencies fail.
timeout (float, int): Set the maximum amount of time this job can run for before being killed.
custom_machine_type (str): Optional custom Cloud machine type, e.g. 'n1-highmem-32'.
preemptible (bool): Whether to use a preemptible machine type.
output_dir (str): Optional default output directory for Step outputs.
reuse_job_from_previous_step (Step): Optionally, reuse the batch.Job object from this other upstream Step.
localize_by (Localize): If specified, this will be the default Localize approach used by Step inputs.
delocalize_by (Delocalize): If specified, this will be the default Delocalize approach used by Step outputs.
add_force_command_line_args (bool): Whether to add command line args for forcing execution of this Step.
add_skip_command_line_args (bool): Whether to add command line args for skipping execution of this Step.
add_run_subset_command_line_args (bool): Whether to add command line args for running a subset of
parallel jobs from this Step (--run-n-step1, --run-offset-step1).
all_inputs_precached (bool): If True, all inputs for this Step will be assumed to have been checked and
pre-cached already via call(s) to pipeline.precache_file_paths(..). This allows for much faster
processing when deciding which steps need to run and which can be skipped because their outputs
already exist and are newer than their inputs.
all_outputs_precached (bool): Same as the all_inputs_precached argument, but for outputs.
"""
super().__init__(
pipeline,
name,
step_number=step_number,
arg_suffix=arg_suffix,
output_dir=output_dir,
localize_by=localize_by,
delocalize_by=delocalize_by,
add_force_command_line_args=add_force_command_line_args,
add_skip_command_line_args=add_skip_command_line_args,
add_run_subset_command_line_args=add_run_subset_command_line_args,
all_inputs_precached=all_inputs_precached,
all_outputs_precached=all_outputs_precached,
)
self._image = image
self._cpu = cpu
self._memory = memory
self._storage = storage
self._set_storage_to_fit_all_inputs = False
self._storage_to_fit_all_inputs_margin = 0
self._always_run = always_run
self._timeout = timeout
self._custom_machine_type = custom_machine_type
self._preemptible = preemptible
self._reuse_job_from_previous_step = reuse_job_from_previous_step
self._job = None
self._output_file_counter = 0
self._paths_localized_via_temp_bucket = set()
self._buckets_mounted_via_cloudfuse = set()
self._step_type = BatchStepType.BASH
self._write_commands_to_script = False
self._regions = None
self._localize_by_copy_already_created_dirs_set = set()
def regions(self, *region):
"""Set one or more compute regions.
Args:
region (str): One or more compute region names, e.g. "us-central1".
"""
self._regions = region
return self
def cpu(self, cpu):
"""Set the CPU requirement for this Step.
Args:
cpu (str, float, int): CPU requirements. Units are number of CPUs if the value is numeric.
"""
self._cpu = cpu
return self
def memory(self, memory):
"""Set the memory requirement for this Step.
Args:
memory (str, float, int): Memory requirements. The memory expression must be of the form {number}{suffix}
where valid optional suffixes are K, Ki, M, Mi, G, Gi, T, Ti, P, and Pi. Omitting a suffix means
the value is in bytes. For the ServiceBackend, the values ‘lowmem’, ‘standard’, and ‘highmem’ are also
valid arguments. ‘lowmem’ corresponds to approximately 1 Gi/core, ‘standard’ corresponds to
approximately 4 Gi/core, and ‘highmem’ corresponds to approximately 7 Gi/core. The default value
is ‘standard’.
"""
self._memory = memory
return self
def storage(self, storage):
"""Set the disk size for this Step.
Args:
storage (str, int): Disk size. The storage expression must be of the form {number}{suffix} where valid
optional suffixes are K, Ki, M, Mi, G, Gi, T, Ti, P, and Pi. Omitting a suffix means the value is in
bytes. For the ServiceBackend, jobs requesting one or more cores receive 5 GiB of storage for the root
file system /. Jobs requesting a fraction of a core receive the same fraction of 5 GiB of storage.
If you need additional storage, you can explicitly request more storage using this method and the extra
storage space will be mounted at /io. Batch automatically writes all ResourceFile to /io.
The default storage size is 0 Gi. The minimum storage size is 0 Gi and the maximum storage size is
64 Ti. If storage is set to a value between 0 Gi and 10 Gi, the storage request is rounded up to 10 Gi.
All values are rounded up to the nearest Gi.
"""
if self._set_storage_to_fit_all_inputs:
raise ValueError(f"storage(..) call conflicts with previous set_storage_to_fit_all_inputs(..) call")
self._storage = storage
return self
def always_run(self, always_run):
"""Set the always_run parameter for this Step.
Args:
always_run (bool): Set the Step to always run, even if dependencies fail.
"""
self._always_run = always_run
return self
def timeout(self, timeout):
"""Set the timeout for this Step.
Args:
timeout (float, int): Set the maximum amount of time this job can run for before being killed.
"""
self._timeout = timeout
return self
def custom_machine_type(self, custom_machine_type):
"""Set a custom machine type.
Args:
custom_machine_type (str): Cloud machine type, e.g. 'n1-highmem-32'
"""
self._custom_machine_type = custom_machine_type
return self
def preemptible(self, preemptible):
"""Set whether the custom machine should be preemptible.
Args:
preemptible (bool): Whether the custom machine is preemptible.
"""
self._preemptible = preemptible
return self
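# Like the pipeline-level default_* setters, these per-Step setters return the Step object and so can
# be chained, e.g.:
#
#   step.cpu(4).memory("highmem").storage("20Gi").timeout(2 * 3600)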
def _transfer_step(self):
"""Submit this Step to the Batch backend. This method is only called if the Step isn't skipped."""
# create Batch Job object
batch = self._pipeline._batch
args = self._pipeline.parse_known_args()
if self._reuse_job_from_previous_step:
# reuse previous Job
if self._reuse_job_from_previous_step._job is None:
raise Exception(f"self._reuse_job_from_previous_step._job object is None")
self._job = self._reuse_job_from_previous_step._job
else:
# create new job
if self._step_type == BatchStepType.PYTHON:
self._job = batch.new_python_job(name=self.name)
elif self._step_type == BatchStepType.BASH:
self._job = batch.new_bash_job(name=self.name)
else:
raise ValueError(f"Unexpected BatchStepType: {self._step_type}")
if self._set_storage_to_fit_all_inputs:
# calculate storage needed to fit all inputs
total_size_in_bytes = self._get_size_of_all_inputs_localized_by_copy()
self._storage = f"{int(total_size_in_bytes / 2**30) + self._storage_to_fit_all_inputs_margin}Gi"
# set execution parameters
if self._image:
self._job.image(self._image)
if self._regions is not None:
self._job.regions(self._regions)
else:
# set the default region to us-central1 to avoid unexpected cross-region egress charges
self._job.regions(["us-central1"])
if self._cpu is not None:
if self._cpu < 0.25 or self._cpu > 16:
raise ValueError(f"CPU arg is {self._cpu}. This is outside the range of 0.25 to 16 CPUs")
self._job.cpu(self._cpu) # Batch default is 1
if self._memory is not None:
if isinstance(self._memory, int) or isinstance(self._memory, float):
if self._memory < 0.1 or self._memory > 60:
raise ValueError(f"Memory arg is {self._memory}. This is outside the range of 0.1 to 60 Gb")
self._job.memory(f"{self._memory}Gi") # Batch default is 3.75G
elif isinstance(self._memory, str):
self._job.memory(self._memory)
else:
raise ValueError(f"Unexpected memory arg type: {type(self._memory)}")
custom_machine_type_requested = any(p is not None for p in [
self._custom_machine_type, self._pipeline._default_custom_machine_type,
])
if custom_machine_type_requested:
if self._cpu or self._memory:
raise ValueError("Both a custom_machine_type or preemptible as well as cpu or memory "
"arguments were specified. Only one or the other should be provided.")
self._job._machine_type = self._custom_machine_type or self._pipeline._default_custom_machine_type
if self._preemptible is not None or self._pipeline._default_preemptible is not None:
self._job._preemptible = self._preemptible if self._preemptible is not None else self._pipeline._default_preemptible
if self._storage:
self._job.storage(self._storage)
if self._timeout is not None:
self._job.timeout(self._timeout)
if self._always_run:
self._job.always_run(self._always_run)
# transfer job dependencies
for upstream_step in self._upstream_steps:
if upstream_step._job:
self._job.depends_on(upstream_step._job)
# transfer inputs
for input_spec in self._input_specs:
if args.verbose: print(" "*4 + f"Input: {input_spec.original_source_path} ({input_spec.localize_by})")
self._transfer_input_spec(input_spec)
# transfer commands
if self._write_commands_to_script or len(" ".join(self._commands)) > 5*10**4:
# the combined commands are too long to embed directly in the job (or _write_commands_to_script was set), so write them to a temp script file
script_file = tempfile.NamedTemporaryFile("wt", prefix="script_", suffix=".sh", encoding="UTF-8", delete=True)
for command in self._commands:
script_file.write(f"{command}\n")
script_file.flush()
# upload script to the temp bucket or output dir
script_temp_gcloud_path = os.path.join(
args.batch_remote_tmpdir,
f"pipeline_{self._pipeline._unique_pipeline_instance_id}/step_{self._unique_step_instance_id}",
os.path.basename(script_file.name))
os.chmod(script_file.name, mode=stat.S_IREAD | stat.S_IEXEC)
hfs.copy(script_file.name, script_temp_gcloud_path)
script_file.close()
if args.verbose: print(" "*4 + f"Will run commands from: {script_temp_gcloud_path}")
script_input_obj = self._job._batch.read_input(script_temp_gcloud_path)
self._job.command(f"bash -c 'source {script_input_obj}'")
else:
for command in self._commands:
command_summary = command
command_summary_line_count = len(command_summary.split("\n"))
if command_summary_line_count > 5:
command_summary = "\n".join(command_summary.split("\n")[:5]) + f"\n... {command_summary_line_count-5} more line(s)"
if args.verbose: print(" "*4 + f"Adding command: {command_summary}")
self._job.command(command)
# transfer outputs
for output_spec in self._output_specs:
self._transfer_output_spec(output_spec)
if args.verbose: print(" "*4 + f"Output: {output_spec} ({output_spec.delocalize_by})")
# clean up any files that were copied to the temp bucket
if self._paths_localized_via_temp_bucket:
cleanup_job_name = f"clean up {len(self._paths_localized_via_temp_bucket)} files"
if self.name:
cleanup_job_name += f" from {self.name}"
cleanup_job = self._pipeline._batch.new_job(name=cleanup_job_name)
cleanup_job.image("docker.io/hailgenetics/genetics:0.2.77")
cleanup_job.depends_on(self._job)
cleanup_job.always_run()
cleanup_job.command("set -x")
cleanup_job.command(f"gcloud auth activate-service-account --key-file /gsa-key/key.json")
for temp_file_path in self._paths_localized_via_temp_bucket:
cleanup_job.command(f"gsutil -m rm -r {temp_file_path}")
self._paths_localized_via_temp_bucket = set()
def _get_supported_localize_by_choices(self):
"""Returns the set of Localize options supported by BatchStep"""
return super()._get_supported_localize_by_choices() | {
Localize.COPY,
Localize.GSUTIL_COPY,
Localize.HAIL_BATCH_CLOUDFUSE,
#Localize.HAIL_BATCH_CLOUDFUSE_VIA_TEMP_BUCKET,
}
def _get_supported_delocalize_by_choices(self):
"""Returns the set of Delocalize options supported by BatchStep"""
return super()._get_supported_delocalize_by_choices() | {
Delocalize.COPY,
Delocalize.GSUTIL_COPY,
}
def _preprocess_input_spec(self, input_spec):
"""This method is called by step.input(..) immediately when the input is first specified, regardless of whether
the Step runs or not. It validates the input_spec's localize_by value and adds any commands to the
Step necessary for performing this localization.
Args:
input_spec (InputSpec): The input to localize.
"""
input_spec = super()._preprocess_input_spec(input_spec)
if input_spec.localize_by == Localize.GSUTIL_COPY:
if not input_spec.original_source_path.startswith("gs://"):
raise ValueError(f"Expected gs:// path but instead found '{input_spec.local_dir}'")
self.gcloud_auth_activate_service_account()
self.command(f"mkdir -p '{input_spec.local_dir}'")
self.command(self._generate_gcloud_copy_command(
input_spec.original_source_path, output_dir=input_spec.local_dir))
self.command(f"ls -lh '{input_spec.local_path}'") # check that file was copied successfully
elif input_spec.localize_by in (
Localize.COPY,
Localize.HAIL_BATCH_CLOUDFUSE):
pass # these will be handled in _transfer_input_spec(..)
elif input_spec.localize_by == Localize.HAIL_BATCH_CLOUDFUSE_VIA_TEMP_BUCKET:
raise ValueError("Localize.HAIL_BATCH_CLOUDFUSE_VIA_TEMP_BUCKET is no longer supported due to changes in gcloud egress charges")
args = self._pipeline.parse_known_args()
source_path = input_spec.source_path
source_path_without_protocol = input_spec.source_path_without_protocol
if not args.batch_remote_tmpdir:
raise ValueError("--batch-remote-tmpdir not specified.")
temp_dir = os.path.join(
args.batch_remote_tmpdir,
f"pipeline_{self._pipeline._unique_pipeline_instance_id}/step_{self._unique_step_instance_id}",
os.path.dirname(source_path_without_protocol).strip("/")+"/")
temp_file_path = os.path.join(temp_dir, input_spec.filename)
if temp_file_path in self._paths_localized_via_temp_bucket:
raise ValueError(f"{source_path} has already been localized via temp bucket.")
self._paths_localized_via_temp_bucket.add(temp_file_path)
# copy file to temp bucket
gsutil_command = self._generate_gcloud_copy_command(source_path, output_dir=temp_dir)
self.command(gsutil_command)
# create an InputSpec with the updated source path
input_spec = InputSpec(
source_path=temp_file_path,
name=input_spec.name,
localize_by=input_spec.localize_by,
localization_root_dir=input_spec.localization_root_dir,
original_source_path=input_spec.source_path,
)
elif input_spec.localize_by not in super()._get_supported_localize_by_choices():
raise ValueError(
f"The hail Batch backend doesn't support input_spec.localize_by={input_spec.localize_by}")
return input_spec
def _transfer_input_spec(self, input_spec):
"""When a Step isn't skipped and is being transferred to the execution backend, this method is called for
each input to the Step. It performs the steps necessary to localize this input.
Args:
input_spec (InputSpec): The input to localize.
"""
super()._transfer_input_spec(input_spec)
args = self._pipeline.parse_known_args()
if args.acceptable_storage_regions and input_spec.source_path.startswith("gs:"):
check_gcloud_storage_region(
input_spec.source_path,
expected_regions=args.acceptable_storage_regions,
gcloud_project=args.gcloud_project,
ignore_access_denied_exception=True,
verbose=args.verbose)
if input_spec.localize_by == Localize.GSUTIL_COPY:
pass # All necessary steps for this option were already handled by self._preprocess_input(..)
elif input_spec.localize_by == Localize.COPY:
if self._step_type == BatchStepType.BASH and input_spec.local_dir not in self._localize_by_copy_already_created_dirs_set:
self._job.command(f"mkdir -p '{input_spec.local_dir}'")
self._localize_by_copy_already_created_dirs_set.add(input_spec.local_dir)
if input_spec.source_path.startswith("http") or input_spec.source_path.startswith("ftp"):
self._job.command(f"curl -L '{input_spec.source_path}' > '{input_spec.local_path}'")
else:
input_spec.read_input_obj = self._job._batch.read_input(input_spec.source_path)
if self._step_type == BatchStepType.BASH:
self._job.command(f"ln -s -f {input_spec.read_input_obj} '{input_spec.local_path}'") # needed to trigger download
echo_done_command = 'echo "Done localizing files via COPY"'
if echo_done_command not in self._commands:
self._commands.insert(0, echo_done_command)
elif input_spec.localize_by in (
Localize.HAIL_BATCH_CLOUDFUSE,
Localize.HAIL_BATCH_CLOUDFUSE_VIA_TEMP_BUCKET):
self._handle_input_transfer_using_cloudfuse(input_spec)
elif input_spec.localize_by == Localize.HAIL_HADOOP_COPY:
self._add_commands_for_hail_hadoop_copy(input_spec.source_path, input_spec.local_dir)
def _get_size_of_all_inputs_localized_by_copy(self):
"""Returns the total size of all the Step's inputs"""
total_size_bytes = 0
for input_spec in self._input_specs:
if input_spec.localize_by not in (Localize.GSUTIL_COPY, Localize.COPY):
continue
input_path = input_spec.source_path
for file_stat in self._pipeline.check_input_glob(input_path):
total_size_bytes += file_stat["size_bytes"]
return total_size_bytes
def _generate_gcloud_copy_command(self, source_path, output_dir=None, output_path=None, ignore_nonzero_exit_code=False):
"""Utility method that puts together the gsutil command for copying the given source path to an output path
or directory. Either the output path or the output directory must be provided.
Args:
source_path (str): The source path.
output_dir (str): Output directory.
output_path (str): Output file path.
ignore_nonzero_exit_code (bool): If true, any non-zero exit codes from the copy command will be ignored.
Return:
str: gcloud copy command string
"""
args = self._pipeline.parse_known_args()
gcloud_copy_command = f"gcloud "
if args.gcloud_project:
gcloud_copy_command += f"--project {args.gcloud_project} "
gcloud_copy_command += "storage cp "
if output_path:
destination = output_path
elif output_dir:
destination = output_dir.rstrip("/") + "/"
else:
raise ValueError("Neither output_path nor output_dir arg was specified")
full_gcloud_copy_command = f"{gcloud_copy_command} --recursive \"{source_path}\" \"{destination}\""
if args.gcloud_project and "gs://gcp-public-data" not in source_path:
# try again, this time specifying the billing project for requester-pays buckets
full_gcloud_copy_command = f"({full_gcloud_copy_command} || true) && "
# if the first copy failed, try again with --billing-project and --no-clobber
full_gcloud_copy_command += f"{gcloud_copy_command} "
full_gcloud_copy_command += f"--no-clobber "
full_gcloud_copy_command += f"--billing-project {args.gcloud_project} "
full_gcloud_copy_command += f"--recursive \"{source_path}\" \"{destination}\""
if ignore_nonzero_exit_code:
# try the copy. If it fails, touch a MARK file and copy that to the destination instead, to indicate
# that the upload was attempted, and therefore the step can still be considered done.
gcloud_command_with_error_handling = (
f"({full_gcloud_copy_command}) || (touch {os.path.basename(source_path)}{MARK_FILE_SUFFIX} && "
f"{gcloud_copy_command} '{os.path.basename(source_path)}{MARK_FILE_SUFFIX}' '{destination}{MARK_FILE_SUFFIX}' "
f") || true"
)
return gcloud_command_with_error_handling
else:
return full_gcloud_copy_command
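# Illustrative example of the generated command string (hypothetical project/bucket names; shown
# wrapped across comment lines here, but generated as a single line, with ignore_nonzero_exit_code=False):
#
#   (gcloud --project my-project storage cp --recursive "gs://src-bucket/a.vcf" "gs://dest-bucket/dir/" || true)
#       && gcloud --project my-project storage cp --no-clobber --billing-project my-project
#       --recursive "gs://src-bucket/a.vcf" "gs://dest-bucket/dir/"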
def _handle_input_transfer_using_cloudfuse(self, input_spec):
"""Utility method that implements localizing an input via cloudfuse.
Args:
input_spec (InputSpec): The input to localize.
"""
localize_by = input_spec.localize_by
source_bucket = input_spec.source_bucket
local_root_dir = self._pipeline._get_localization_root_dir(localize_by)
local_mount_dir = os.path.join(local_root_dir, localize_by.get_subdir_name(), source_bucket)
if source_bucket not in self._buckets_mounted_via_cloudfuse:
self._job.command(f"mkdir -p {local_mount_dir}")
self._job.cloudfuse(source_bucket, local_mount_dir, read_only=True)
self._buckets_mounted_via_cloudfuse.add(source_bucket)
def _add_commands_for_hail_hadoop_copy(self, source_path, output_dir):
"""Utility method that implements localizing an input via hl.hadoop_copy.
Args:
source_path (str): The source path.
output_dir (str): Output directory.
"""
#if not hasattr(self, "_already_installed_hail"):
# self.command("python3 -m pip install hail")
#self._already_installed_hail = True
#self.command(f"mkdir -p {output_dir}")
self.command(f"""python3 <<EOF
import hail as hl
hl.init(log='/dev/null', quiet=True)
hl.hadoop_copy("{source_path}", "{output_dir}")
EOF""")
def _preprocess_output_spec(self, output_spec):
"""This method is called by step.output(..) immediately when the output is first specified, regardless of
whether the Step runs or not. It validates the output_spec.
Args:
output_spec (OutputSpec): The output to preprocess.
"""
if output_spec.delocalize_by not in self._get_supported_delocalize_by_choices():
raise ValueError(f"Unexpected output_spec.delocalize_by value: {output_spec.delocalize_by}")
super()._preprocess_output_spec(output_spec)
if output_spec.optional and output_spec.delocalize_by in (Delocalize.COPY, Delocalize.HAIL_HADOOP_COPY):
raise ValueError(f"Delocalize.{output_spec.delocalize_by} isn't supported for optional output: {output_spec}")
if output_spec.delocalize_by == Delocalize.COPY:
# validate path since Batch delocalization doesn't work for gs:// paths with a Local backend.
if self._pipeline.backend == Backend.HAIL_BATCH_LOCAL and any(
output_spec.output_path.startswith(s) for s in ("gs://", "hail-az://")):
raise ValueError("The hail Batch Local backend doesn't support Delocalize.COPY for non-local path: "
f"{output_spec.output_path}")
if not output_spec.filename:
raise ValueError(f"{output_spec} filename isn't specified. It is required for Delocalize.COPY")
elif output_spec.delocalize_by == Delocalize.GSUTIL_COPY:
if not output_spec.output_path.startswith("gs://"):
raise ValueError(f"{output_spec.output_path} Destination path must start with gs://")
self.gcloud_auth_activate_service_account()
self.command(self._generate_gcloud_copy_command(
output_spec.local_path,
output_path=output_spec.output_path,
ignore_nonzero_exit_code=output_spec.optional))
def _transfer_output_spec(self, output_spec):
"""When a Step isn't skipped and is being transferred to the execution backend, this method is called for
each output of the Step. It performs the steps necessary to delocalize the output using the approach requested
by the user via the delocalize_by parameter.
Args:
output_spec (OutputSpec): The output to delocalize.
"""
super()._transfer_output_spec(output_spec)
if output_spec.delocalize_by == Delocalize.COPY:
self._output_file_counter += 1
output_file_obj = self._job[f"ofile{self._output_file_counter}"]
self._job.command(f"cp '{output_spec.local_path}' {output_file_obj}")
if not output_spec.output_dir:
raise ValueError(f"{output_spec} output directory is required for Delocalize.COPY")
if not output_spec.filename:
raise ValueError(f"{output_spec} output filename is required for Delocalize.COPY")
destination_path = os.path.join(output_spec.output_dir, output_spec.filename)
self._job.command(f'echo Copying {output_spec.local_path} to {destination_path}')
self._job._batch.write_output(output_file_obj, destination_path)
elif output_spec.delocalize_by == Delocalize.GSUTIL_COPY:
pass # GSUTIL_COPY was already handled in _preprocess_output_spec(..)
elif output_spec.delocalize_by == Delocalize.HAIL_HADOOP_COPY:
self._add_commands_for_hail_hadoop_copy(output_spec.local_path, output_spec.output_dir)