Skip to content

blockutils.common

Common methods shared between blocks, especially useful for directory handling and block parameter/query input.

BlockModes (Enum)

Types of block modes: DRY_RUN or DEFAULT.

Important

Find out more about job modes/block modes in our documentation.

TestDirectoryContext (ContextDecorator)

Yields the test directory making sure folders exist and cleans up when context is closed.

__enter__(self) special

Context entry point.

Returns:

Type Description
Path

Temporary test directory.

Source code in blockutils/common.py
def __enter__(self) -> Path:
    """Context entry point.

    Returns:
        Temporary test directory.
    """
    setup_test_directories(self.test_dir)
    return self.test_dir

__exit__(self, *exc) special

Context exit point. Cleans up test subdirs.

Source code in blockutils/common.py
def __exit__(self, *exc):
    """Context exit point. Cleans up test subdirs."""
    setup_test_directories(test_dir=self.test_dir)
    return False

__init__(self, test_dir) special

Examples:

with TestDirectoryContext(Path("/tmp")) as test_dir:
    block.process(dir=Path("/tmp"))

Parameters:

Name Type Description Default
test_dir Path

A directory to store temporary files (usually /tmp or /tmp/e2e_test)

required
Source code in blockutils/common.py
def __init__(self, test_dir: Path):
    """
    Example:
        ```python
        with TestDirectoryContext(Path("/tmp")) as test_dir:
            block.process(dir=Path("/tmp"))
        ```

    Arguments:
        test_dir: A directory to store temporary files (usually `/tmp`
            or `/tmp/e2e_test`)
    """
    self.test_dir = test_dir

decode_str_base64(string)

Inverter function for encode_str_base64

Parameters:

Name Type Description Default
string str

A base64-encoded string

required

Returns:

Type Description
str

A decoded string

Source code in blockutils/common.py
def decode_str_base64(string: str) -> str:
    """
    Inverter function for encode_str_base64

    Arguments:
        string: A base64-encoded string
    Returns:
        A decoded string
    """

    str_encoded_byte = bytes(string, "utf-8")
    str_decoded_byte = base64.b64decode(str_encoded_byte)
    return str_decoded_byte.decode("utf-8")

encode_str_base64(string)

A function that encodes strings in base64. The primary use case is passing complex environment variables into docker containers. In cases where these env variables are complex json objects including a number of different special characters the process of getting them unharmed into a docker container sometimes fails. Encoding them in base64 is a save method to solve this problem.

Parameters:

Name Type Description Default
string str

A, potentially complex, non-encoded string

required

Returns:

Type Description
str

A base64-encoded string

Source code in blockutils/common.py
def encode_str_base64(string: str) -> str:
    """
    A function that encodes strings in base64. The primary use case is passing complex environment variables into
    docker containers. In cases where these env variables are complex json objects including a number of different
    special characters the process of getting them unharmed into a docker container sometimes fails. Encoding them
    in base64 is a save method to solve this problem.

    Arguments:
        string: A, potentially complex, non-encoded string
    Returns:
        A base64-encoded string
    """
    return base64.b64encode(string.encode("ascii")).decode("ascii")

ensure_data_directories_exist()

Creates required directories for any block input and output (/tmp/input, tmp/output, /tmp/quicklooks).

Source code in blockutils/common.py
def ensure_data_directories_exist():
    """Creates required directories for any block input and output (`/tmp/input`,
    `tmp/output`, `/tmp/quicklooks`).
    """
    Path("/tmp/input/").mkdir(parents=True, exist_ok=True)
    Path("/tmp/output/").mkdir(parents=True, exist_ok=True)
    Path("/tmp/quicklooks/").mkdir(parents=True, exist_ok=True)

get_block_info()

Gets the Block Info variable as a dictionary.

Returns:

Type Description
dict

Block info as a dict.

Source code in blockutils/common.py
def get_block_info() -> dict:
    """Gets the Block Info variable as a dictionary.

    Returns:
        Block info as a dict.
    """
    value_str = str(os.environ.get("UP42_BLOCK_INFO"))
    value_dict = json.loads(value_str)

    return value_dict

get_block_mode()

Gets the task mode from environment variables. If no task mode is set, DEFAULT mode will be returned.

Important

Find out more about job modes/block modes in our documentation.

Returns:

Type Description
str

Block mode.

Source code in blockutils/common.py
def get_block_mode() -> str:
    """Gets the task mode from environment variables. If no task mode is set,
    DEFAULT mode will be returned.

    Important:
        Find out more about job modes/block modes in our
        [documentation](https://docs.up42.com/reference/block-envvars.html#up42-job-mode).

    Returns:
        Block mode.
    """
    value = os.environ.get("UP42_JOB_MODE", BlockModes.DEFAULT.value)
    if value not in [mode.value for mode in BlockModes]:
        value = "DEFAULT"
    return value

get_job_info()

Gets the Job Info variable as a dictionary.

Returns:

Type Description
dict

Job info as a dict.

Source code in blockutils/common.py
def get_job_info() -> dict:
    """Gets the Job Info variable as a dictionary.

    Returns:
        Job info as a dict.
    """
    value_str = str(os.environ.get("UP42_JOB_INFO", "{}"))
    value_dict = json.loads(value_str)

    return value_dict

get_timeperiod(duration=365, start=-365, start_date=None, end_date=None)

Generates time period string from any combinations of duration(days) & start(days), duration(days) & start_date or start_date & end_date parameters. Defaults to 1 year till today.

Most relevant for selecting timeperiods for testing of rolling archives.

Parameters:

Name Type Description Default
duration int

Time period duration in days.

365
start int

Time period start from today in days.

-365
start_date str

Time period start date, format string YYYY-MM-DD.

None
end_date str

Time period end date, format string YYYY-MM-DD.

None

Returns:

Type Description
time period string with variable time and fixed clock time, in the format "2019-07-01T00
Source code in blockutils/common.py
def get_timeperiod(
    duration: int = 365,
    start: int = -365,
    start_date: str = None,
    end_date: str = None,
) -> str:
    """
    Generates time period string from any combinations of duration(days) & start(days),
    duration(days) & start_date or start_date & end_date parameters. Defaults to 1 year till today.

    Most relevant for selecting timeperiods for testing of rolling archives.

    Args:
        duration: Time period duration in days.
        start: Time period start from today in days.
        start_date: Time period start date, format string YYYY-MM-DD.
        end_date: Time period end date, format string YYYY-MM-DD.

    Returns:
        time period string with variable time and fixed clock time, in the format
        "2019-07-01T00:00:00+00:00/2029-07-10T23:59:59+00:00"
    """
    if start_date is not None and end_date is not None:
        start_date = parse(start_date).replace(
            hour=0, minute=0, second=0, microsecond=0
        )
        end_date = parse(end_date).replace(hour=0, minute=0, second=0, microsecond=0)
    elif start_date is not None and duration is not None:
        start_date = parse(start_date).replace(
            hour=0, minute=0, second=0, microsecond=0
        )
        end_date = start_date + timedelta(days=+duration)
    elif duration is not None and start is not None:
        today = datetime.today().replace(hour=0, minute=0, second=0, microsecond=0)
        start_date = today + timedelta(days=+start)
        end_date = start_date + timedelta(days=+duration)
    else:
        raise ValueError(
            "Only the combinations `1. duration(months) & start(months), "
            "2. duration(months) & start_date or 3. start_date & end_date` are allowed."
        )

    start_date = start_date.strftime("%Y-%m-%d")
    end_date = end_date.strftime("%Y-%m-%d")

    time = f"{start_date}T00:00:00+00:00/{end_date}T23:59:59+00:00"
    return time

load_metadata()

Get the geojson metadata input.

Returns:

Type Description
FeatureCollection

Object defining input features for block.

Source code in blockutils/common.py
def load_metadata() -> FeatureCollection:
    """Get the geojson metadata input.

    Returns:
        Object defining input features for block.
    """
    ensure_data_directories_exist()
    if os.path.exists("/tmp/input/data.json"):
        with open("/tmp/input/data.json", encoding="utf-8") as fp:
            data = json.loads(fp.read())

        features = []
        for feature in data["features"]:
            features.append(Feature(**feature))

        return FeatureCollection(features)
    else:
        return FeatureCollection([])

load_params()

Get the parameters for the current task directly from the task parameters parameters in UP42_TASK_PARAMETERS environment variable.

Returns:

Type Description
dict

Dictionary of task parameters.

Source code in blockutils/common.py
def load_params() -> dict:
    """Get the parameters for the current task directly from the task
    parameters parameters in `UP42_TASK_PARAMETERS` environment variable.

    Returns:
        Dictionary of task parameters.
    """
    data: str = os.environ.get("UP42_TASK_PARAMETERS", "{}")
    logger.debug(f"Fetching parameters for this block: {data}")
    if data == "":
        data = "{}"
    return json.loads(data)

load_query(validator=<function <lambda> at 0x11aa7c3a0>)

Get the query for the current task directly from the task parameters in UP42_TASK_PARAMETERS environment variable.

Examples:

def val(data: dict) -> bool:
    # Ensure bbox is defined.
    return "bbox" in data

query = load_query(val)

Parameters:

Name Type Description Default
validator Callable

Callable that returns if the loaded query is valid.

<function <lambda> at 0x11aa7c3a0>

Returns:

Type Description
STACQuery

A STACQuery object initialized with the parameters if valid.

Source code in blockutils/common.py
def load_query(validator: Callable = lambda x: True) -> STACQuery:
    """Get the query for the current task directly from the task parameters
    in `UP42_TASK_PARAMETERS` environment variable.

    Example:
        ```python
        def val(data: dict) -> bool:
            # Ensure bbox is defined.
            return "bbox" in data

        query = load_query(val)
        ```

    Arguments:
        validator: Callable that returns if the loaded query is valid.

    Returns:
        A `STACQuery` object initialized with the parameters if valid.
    """
    data: str = os.environ.get("UP42_TASK_PARAMETERS", "{}")
    logger.debug(f"Raw task parameters from UP42_TASK_PARAMETERS are: {data}")
    query_data = json.loads(data)

    return STACQuery.from_dict(query_data, validator)

save_metadata(result)

Save the geojson metadata output.

Parameters:

Name Type Description Default
result FeatureCollection

Output feature collection.

required
Source code in blockutils/common.py
def save_metadata(result: FeatureCollection):
    """Save the geojson metadata output.

    Arguments:
        result: Output feature collection.
    """
    ensure_data_directories_exist()
    with open("/tmp/output/data.json", "w", encoding="utf-8") as fp:
        fp.write(json.dumps(result))

setup_test_directories(test_dir, clean_subdirs=True)

Creates given test directory and empty subdirs "input", "output", "quicklooks".

Parameters:

Name Type Description Default
test_dir Union[str, pathlib.Path]

A directory to store temporary files (usually /tmp or /tmp/e2e_test)

required
clean_subdirs bool

Remove all files in the subdirs "input", "output", "quicklooks".

True

Returns:

Type Description
List[pathlib.Path]

List of sub directories Paths depending onthe selection in sub_dirs.

Source code in blockutils/common.py
def setup_test_directories(
    test_dir: Union[str, Path], clean_subdirs: bool = True
) -> List[Path]:
    """Creates given test directory and empty subdirs "input", "output", "quicklooks".

    Args:
        test_dir: A directory to store temporary files (usually `/tmp` or `/tmp/e2e_test`)
        clean_subdirs: Remove all files in the subdirs "input", "output", "quicklooks".
    Returns:
        List of sub directories Paths depending onthe selection in sub_dirs.
    """
    test_dir = Path(test_dir)
    test_dir.mkdir(parents=True, exist_ok=True)

    subdirs = ["input", "output", "quicklooks"]
    subdir_paths = [test_dir / subdir for subdir in subdirs]
    for subdir_path in subdir_paths:
        if clean_subdirs:
            # Should not remove other files/dirs in test_dir, since often /tmp.
            subdir_path.mkdir(parents=True, exist_ok=True)
            try:
                shutil.rmtree(subdir_path)
            # Deleting subfolder sometimes does not work in temp, then remove all subfiles.
            except (PermissionError, OSError):
                files_to_delete = subdir_path.rglob("*.*")
                for file_path in files_to_delete:
                    file_path.unlink()
        subdir_path.mkdir(parents=True, exist_ok=True)

    return subdir_paths

update_extents(feat_coll)

Updates all geometry extents to reflect actual images

Parameters:

Name Type Description Default
feat_coll FeatureCollection

geojson Feature Collection

required
Source code in blockutils/common.py
def update_extents(feat_coll: FeatureCollection) -> FeatureCollection:
    """
    Updates all geometry extents to reflect actual images

    Arguments:
        feat_coll: geojson Feature Collection

    Returns: A FeatureCollection where image extents reflect actual images
    """
    for feature in feat_coll.features:
        with rio.open(
            os.path.join("/tmp/output", feature.properties["up42.data_path"])
        ) as img_file:
            img_bounds = img_file.bounds
        bounds_trans = warp.transform_bounds(
            img_file.crs, {"init": "epsg:4326"}, *img_bounds
        )

        geom = box(*bounds_trans)
        feature["geometry"] = mapping(geom)
        feature["bbox"] = geom.bounds

    return feat_coll