Source code for step_pipeline.io
"""This module contains classes and methods related to data input & output."""
from abc import ABC, abstractmethod
from enum import Enum
import os
import re
import uuid
[docs]
class Localize(Enum):
"""Constants that represent different options for how to localize files into the running container.
Each 2-tuple consists of a name for the localization approach, and a subdirectory where to put files.
"""
COPY = ("copy", "local_copy")
"""COPY uses the execution backend's default approach to localizing files"""
GSUTIL_COPY = ("gsutil_copy", "local_copy")
"""GSUTIL_COPY runs 'gsutil cp' to localize file(s) from a google bucket path. This requires gsutil to be available
inside the execution container.
"""
HAIL_HADOOP_COPY = ("hail_hadoop_copy", "local_copy")
"""HAIL_HADOOP_COPY uses the Hail hadoop API to copy file(s) from a google bucket path. This requires python3 and
Hail to be installed inside the execution container.
"""
HAIL_BATCH_CLOUDFUSE = ("hail_batch_cloudfuse", "cloudfuse")
"""HAIL_BATCH_CLOUDFUSE use the Hail Batch cloudfuse function to mount a google bucket into the execution container
as a network drive, without copying the files. This Hail Batch service account must have read access to the bucket.
"""
HAIL_BATCH_CLOUDFUSE_VIA_TEMP_BUCKET = ("hail_batch_cloudfuse_via_temp_bucket", "cloudfuse")
"""HAIL_BATCH_CLOUDFUSE_VIA_TEMP_BUCKET is useful for situations where you'd like to use cloudfuse to localize files and
your personal gcloud account has read access to the source bucket, but the Hail Batch service account cannot be
granted read access to that bucket. Since it's possible to run 'gsutil cp' under your personal credentials within
the execution container, but Hail Batch cloudfuse always runs under the Hail Batch service account credentials, this
workaround 1) runs 'gsutil cp' under your personal credentials to copy the source files to a temporary bucket that
you control, and where you have granted read access to the Hail Batch service account 2) uses cloudfuse to mount the
temporary bucket 3) performs computational steps on the mounted data 4) deletes the source files from the temporary
bucket when the Batch job completes.
This localization approach may be useful for situations where you need a large number of jobs and each job processes
a very small piece of a large data file (eg. a few loci in a cram file).
Copying the large file(s) from the source bucket to a temp bucket in the same region is fast and inexpensive, and
only needs to happen once before the jobs run. Each job can then avoid allocating a large disk, and waiting for the
large file to be copied into the container. This approach requires gsutil to be available inside the execution
container.
"""
def __init__(self, label, subdir="local_copy"):
"""Enum constructor.
Args:
label (str): a name
subdir (str): subdirectory where files will be localized within the execution container
"""
self._label = label
self._subdir = subdir
def __str__(self):
return self._label
def __repr__(self):
return self._label
[docs]
def get_subdir_name(self):
"""Returns the subdirectory name passed to the constructor"""
return self._subdir
[docs]
class Delocalize(Enum):
"""Constants that represent different options for how to delocalize file(s) from a running container."""
COPY = "copy"
"""COPY uses the execution backend's default approach to delocalizing files"""
GSUTIL_COPY = "gsutil_copy"
"""GSUTIL_COPY runs 'gsutil cp' to copy the path to a google bucket destination. This requires gsutil to be
available inside the execution container.
"""
HAIL_HADOOP_COPY = "hail_hadoop_copy"
"""HAIL_HADOOP_COPY uses the hail hadoop API to copy file(s) to a google bucket path. This requires python3 and
hail to be installed inside the execution container.
"""
def __str__(self):
return self.value
def __repr__(self):
return self.value
[docs]
class InputType(Enum):
"""Constants that represent the type of a step.input_value(..) arg."""
STRING = "string"
FLOAT = "float"
INT = "int"
BOOL = "boolean"
[docs]
class InputSpecBase(ABC):
"""This is the InputSpec parent class, with subclasses implementing specific types of input specs which contain
metadata about inputs to a Pipeline Step.
"""
def __init__(self, name=None):
"""InputSpec constructor
Args:
name (str): Optional name for this input.
"""
self._uuid = str(uuid.uuid4())
self._name = name
@property
def name(self):
return self._name
@property
def uuid(self):
return self._uuid
@abstractmethod
def __str__(self):
return self._uuid
[docs]
class InputValueSpec(InputSpecBase):
"""An InputValueSpec stores metadata about an input that's not a file path"""
def __init__(
self,
value=None,
name=None,
input_type=InputType.STRING):
"""InputValueSpec constructor
Args:
value: The value.
name (str): Optional name for this input.
input_type (InputType): The input value's type.
"""
super().__init__(name=name)
self._value = value
self._input_type = input_type
def __str__(self):
if self.value is not None:
return self.value
else:
return f"[input:{self._uuid}]"
@property
def value(self):
return self._value
@property
def input_type(self):
return self._input_type
[docs]
class InputSpec(InputSpecBase):
"""An InputSpec stores metadata about an input file or directory"""
def __init__(
self,
source_path=None,
name=None,
localize_by=None,
localization_root_dir=None,
original_source_path=None,
):
"""InputSpec constructor
Args:
source_path (str): Source file or directory to localize. It can be a gs://, http(s)://, or a filesystem path.
The path can include * wildcards where appropriate.
name (str): Optional name for this input.
localize_by (Localize): Approach to use to localize this path.
localization_root_dir (str): This input will be localized to this directory within the container filesystem.
original_source_path (str): Sometimes the file is copied to a different cloud location as part of
localization. This is an optional arg to records its original location.
"""
super().__init__(name=name)
self._source_path = source_path
self._localize_by = localize_by
self._localization_root_dir = localization_root_dir
self._original_source_path = original_source_path or source_path
# these fields are computed based on the source_path
self._source_bucket = None
self._source_path_without_protocol = None
self._source_dir = None
self._filename = None
self._local_dir = None
self._local_path = None
if localize_by and not isinstance(localize_by, Localize):
raise ValueError(f"localize_by arg: {localize_by} is not an instance of the Localize enum")
if localization_root_dir and not isinstance(localization_root_dir, str):
raise ValueError(f"localization_root_dir arg: {localization_root_dir} is not a string")
if source_path is not None:
match = re.match("^([a-zA-Z-_]+)://(.*)", source_path)
if match:
self._source_path_without_protocol = match.group(2)
self._source_bucket = self._source_path_without_protocol.split("/")[0]
elif source_path.startswith("http://") or source_path.startswith("https://"):
self._source_path_without_protocol = re.sub("^http[s]?://", "", source_path).split("?")[0]
else:
self._source_path_without_protocol = source_path
self._source_dir = os.path.dirname(self._source_path_without_protocol)
self._filename = os.path.basename(self._source_path_without_protocol).replace("*", "_._")
self._name = self._name or self._filename
subdir = localize_by.get_subdir_name()
output_dir = os.path.join(localization_root_dir, subdir, self.source_dir.strip("/"))
output_dir = output_dir.replace("*", "___")
self._local_dir = output_dir
self._local_path = os.path.join(output_dir, self.filename)
def __str__(self):
if self.local_path is not None:
return self.local_path
else:
return f"[input:{self._uuid}]"
@property
def source_path(self):
if self._source_path is None:
raise ValueError("source_path not available for this input")
return self._source_path
@property
def original_source_path(self):
if self._original_source_path is None:
raise ValueError("original_source_path not available for this input")
return self._original_source_path
@property
def source_bucket(self):
if self._source_bucket is None:
raise ValueError("source_path not available for this input")
return self._source_bucket
@property
def source_path_without_protocol(self):
if self._source_path_without_protocol is None:
raise ValueError("source_path not available for this input")
return self._source_path_without_protocol
@property
def source_dir(self):
if self._source_dir is None:
raise ValueError("source_path not available for this input")
return self._source_dir
@property
def filename(self):
if self._filename is None:
raise ValueError("source_path not available for this input")
return self._filename
@property
def local_path(self):
if self._local_path is None:
raise ValueError("source_path not available for this input")
return self._local_path
@property
def local_dir(self):
if self._local_dir is None:
raise ValueError("source_path not available for this input")
return self._local_dir
@property
def localize_by(self):
return self._localize_by
@property
def localization_root_dir(self):
return self._localization_root_dir
[docs]
class OutputSpec:
"""An OutputSpec stores metadata about an output file or directory from a Step"""
def __init__(
self,
local_path=None,
output_dir=None,
output_path=None,
name=None,
delocalize_by=None,
optional=False,
download_to_dir=None,
):
"""OutputSpec constructor
Args:
local_path (str): Local (within container) path of file or directory to delocalize. The path can include *
wildcards.
output_dir (str): Optional destination directory.
output_path (str): Optional destination path - either absolute, or relative to output_dir.
name (str): Optional name for this output.
delocalize_by (Delocalize): Approach to use to delocalize this path.
optional (bool): Whether this output is optional.
download_to_dir (str): Optional directory to download the output to.
"""
self._local_path = local_path
self._local_dir = os.path.dirname(local_path)
self._name = name
self._delocalize_by = delocalize_by
self._optional = optional
self._download_to_dir = download_to_dir
if delocalize_by and not isinstance(delocalize_by, Delocalize):
raise ValueError(f"localize_by arg: {delocalize_by} is not an instance of the Delocalize enum")
# define self._output_filename and self._output_filename_including_any_wildcards
if output_path:
self._output_filename = os.path.basename(output_path)
self._output_filename_including_any_wildcards = self._output_filename
elif "*" not in local_path:
self._output_filename = os.path.basename(local_path)
self._output_filename_including_any_wildcards = self._output_filename
else:
self._output_filename = None
self._output_filename_including_any_wildcards = os.path.basename(local_path)
# define self._output_dir and self._output_path and self._output_path_including_any_wildcards
self._output_path_including_any_wildcards = None
if output_path:
self._output_path = output_path
self._output_dir = os.path.dirname(self._output_path)
self._output_path_including_any_wildcards = output_path
elif output_dir:
self._output_dir = output_dir
assert not output_path
if self._output_filename:
self._output_path = os.path.join(output_dir, self._output_filename)
else:
self._output_path = output_dir
if self._output_filename_including_any_wildcards:
self._output_path_including_any_wildcards = os.path.join(
output_dir, self._output_filename_including_any_wildcards)
else:
raise ValueError("Neither output_dir nor output_path were specified.")
if "*" in self._output_path:
raise ValueError(f"output path ({output_path}) cannot contain wildcards (*)")
def __str__(self):
return self.output_path
@property
def output_path(self):
return self._output_path
@property
def output_path_including_any_wildcards(self):
return self._output_path_including_any_wildcards
@property
def output_dir(self):
return self._output_dir
@property
def filename(self):
return self._output_filename
@property
def name(self):
return self._name
@property
def local_path(self):
return self._local_path
@property
def local_dir(self):
return self._local_dir
@property
def delocalize_by(self):
return self._delocalize_by
@property
def optional(self):
return self._optional
@property
def download_to_dir(self):
return self._download_to_dir