Skip to content

blockutils.wmts

Helper for downloading, processing and merging tiles from tile based data sources.

MultiTileMergeHelper

This class is a helper class that allows to pass list of TileMergeHelper object. This is mostly relevant when the datasets has multiple layers (e.g. SENTINEL1GRD with two polarization). Via this object, each layer will be handled separately (fetching tiles, processing them, and merging them) with an additional final step which appends this layers to one tif file to a create a single multi-bands tif. Also, the end result will be in COG format.

TileMergeHelper (TileMergeHelperSettings)

This class is a helper class that allows to fetch tiles via a req function, process them via a process function (a custom process function can be called, or the default process function will be used) ,and finally merge them into a single image in COG format.

__init__(self, tiles, req, process=None, **kwargs) special

Initialize self. See help(type(self)) for accurate signature.

Source code in blockutils/wmts.py
def __init__(
    self,
    tiles: List[Tile],
    req: Callable,
    process: Callable = None,
    **kwargs,
):
    self.tiles = tiles
    self.req = req
    self.process = process
    super().__init__(**kwargs)

get_final_image(self, output_path, return_cog=True)

The main method which merges list of tiles and produces the final merged image.

Source code in blockutils/wmts.py
def get_final_image(self, output_path: Path, return_cog: bool = True):
    """The main method which merges list of tiles and produces the final merged image."""
    with self.tile_dataset() as (tile_tif_list, valid_tiles):
        merged_array, merged_transform, meta = self.merge_tiles(tile_tif_list)

    self.write_merged_tiles(
        output_path,
        merged_array,
        merged_transform,
        meta,
        self.tile_size,
        return_cog,
    )
    return valid_tiles

loop_over_tiles(self)

This method iterates over a list of tiles and applies tile_worker method to fetch and process them. In the case of not being able to fetch the tile (API issues) a retrying mechanism will happen to retry fetching those tiles again. Also for speeding up the fetching process a parallelization option has been implemented. Moreover, if a tile is empty, it will be skipped.

Returns:

Type Description
tile_tif_list

List of path to tiles which are written in tif format. valid_tiles: List of tiles that are NOT empty.

Source code in blockutils/wmts.py
def loop_over_tiles(self):
    """
    This method iterates over a list of tiles and applies tile_worker method to fetch and process them.
    In the case of not being able to fetch the tile (API issues) a retrying mechanism will happen to
    retry fetching those tiles again. Also for speeding up the fetching process a parallelization option
    has been implemented. Moreover, if a tile is empty, it will be skipped.
    Returns:
        tile_tif_list: List of path to tiles which are written in tif format.
        valid_tiles: List of tiles that are NOT empty.
    """
    tile_tif_list = []
    valid_tiles = []
    tiles_not_fetched = []

    active_tiles = self.tiles.copy()

    retry_round = 0
    while retry_round <= self.retries_for_no_fetched_tiles:
        # It initially starts with active_tiles, and among them, there are some tiles that are not fetched
        # which will be saved to tiles_not_fetched list. Then when retrying started, the tile in this list
        # will be assigned to active_tiles (active_tiles will be rewritten) to do the whole fetching process again.
        if retry_round:
            if tiles_not_fetched:
                time.sleep(self.polling_cycle)
                active_tiles = tiles_not_fetched.copy()
                tiles_not_fetched = []
                logger.info(
                    f"Now starting {retry_round}. round of retrying failures"
                )
                logger.info(f"{len(active_tiles)}. tiles will be retried")
            else:
                break

        if not self.parallelize:
            for tile in active_tiles:
                self.tile_worker(
                    tile, tile_tif_list, valid_tiles, tiles_not_fetched
                )
        else:
            with concurrent.futures.ThreadPoolExecutor(
                max_workers=self.max_threads
            ) as executor:
                future_list = []
                for tile in active_tiles:
                    logger.info(f"Tile {tile} added to executor...")
                    future_list.append(
                        executor.submit(
                            lambda t: self.tile_worker(*t),
                            [tile, tile_tif_list, valid_tiles, tiles_not_fetched],
                        )
                    )
                for future in concurrent.futures.as_completed(future_list):
                    try:
                        future.result()
                    except Exception as e:
                        raise e
        retry_round += 1

    logger.info(
        f"There are {len(valid_tiles)} valid data tiles out of {len(self.tiles)}"
    )
    if tiles_not_fetched:
        raise UP42Error(
            SupportedErrors.API_CONNECTION_ERROR,
            "Retrying was unsuccessful. API returned error code 500",  # TODO: Is this assumption from empty list?
        )
    return tile_tif_list, valid_tiles

merge_tiles(tile_file_list) staticmethod

This method merges the list of tif files and returns the merged array, transform and meta for the final writing.

Parameters:

Name Type Description Default
tile_file_list

List of path to tif files. For instance: ["/a/b/1234.tif", "/a/c/5678.tif"]

required
Source code in blockutils/wmts.py
@staticmethod
def merge_tiles(tile_file_list):
    """
    This method merges the list of tif files and returns the merged array,
    transform and meta for the final writing.
    Args:
        tile_file_list: List of path to tif files. For instance: ["/a/b/1234.tif", "/a/c/5678.tif"]
    """
    if not tile_file_list:
        raise UP42Error(SupportedErrors.NO_INPUT_ERROR, "All tiles are empty.")

    tile_file_list_str = [str(tif_file) for tif_file in tile_file_list]
    merged_array, merged_transform = riomerge.merge(tile_file_list_str)
    with rasterio.open(tile_file_list[0]) as src:
        temp_meta = src.meta.copy()

    return merged_array, merged_transform, temp_meta

tile_worker(self, tile, tile_tif_list, valid_tiles, tiles_not_fetched)

This method fetches the tile from a specific url and applies processing on it. Final result should be a path to the tile written in tif format. If user did not specify a process method, the default _process method will called.

Parameters:

Name Type Description Default
tile Tile

A tuple of x,y,z information for the tile. For instance: Tile(x=41955, y=101467, z=18)

required
tile_tif_list List[pathlib.Path]

List for saving the tiles that were fetched and stored in the tif format.

required
valid_tiles List[mercantile.Tile]

List of tiles that are NOT empty.

required
tiles_not_fetched List[mercantile.Tile]

List of tiles that were not fetched due to API issues.

required
Source code in blockutils/wmts.py
def tile_worker(
    self,
    tile: Tile,
    tile_tif_list: List[Path],
    valid_tiles: List[Tile],
    tiles_not_fetched: List[Tile],
):
    """
    This method fetches the tile from a specific url and applies processing on it.
    Final result should be a path to the tile written in tif format. If user did not specify a process
    method, the default _process method will called.
    Args:
        tile: A tuple of x,y,z information for the tile. For instance: Tile(x=41955, y=101467, z=18)
        tile_tif_list: List for saving the tiles that were fetched and stored in the tif format.
        valid_tiles: List of tiles that are NOT empty.
        tiles_not_fetched: List of tiles that were not fetched due to API issues.
    """
    try:
        response = self.req(tile, **self.req_kwargs)
        try:
            if self.process:
                if "tile_size" in self.process.__code__.co_varnames:
                    tile_tif_list.append(
                        self.process(response, tile, tile_size=self.tile_size)
                    )
                else:
                    tile_tif_list.append(self.process(response, tile))
            else:
                tile_tif_list.append(self._process(response, tile))
            valid_tiles.append(tile)
        except TileIsEmptyError:
            logger.info(f"{tile} is emtpy, Skipping ...")
    except TileNotFetchedError:
        tiles_not_fetched.append(tile)

write_merged_tiles(self, output_path, merged_array, merged_transform, meta, tile_size, apply_to_cog=True)

This method writes the merged tiles in a file. The final result will be in COG format.

Parameters:

Name Type Description Default
output_path

Path to save the final image.

required
merged_array

Numpy array of merged tiles.

required
merged_transform

Transform for merged tiles.

required
meta

Metadata for writing the final image.

required
tile_size

Size of the tile. In most cases 256.

required
apply_to_cog bool

Whether convert a tif to cog tif. Default is true.

True
Source code in blockutils/wmts.py
def write_merged_tiles(
    self,
    output_path,
    merged_array,
    merged_transform,
    meta,
    tile_size,
    apply_to_cog: bool = True,
):
    """
    This method writes the merged tiles in a file. The final result will be in COG format.
    Args:
        output_path: Path to save the final image.
        merged_array: Numpy array of merged tiles.
        merged_transform: Transform for merged tiles.
        meta: Metadata for writing the final image.
        tile_size: Size of the tile. In most cases 256.
        apply_to_cog: Whether convert a tif to cog tif. Default is true.

    """
    _merged_shape: List = list(merged_array.shape)
    if _merged_shape[1] > tile_size and _merged_shape[2] > tile_size:
        while _merged_shape[1] % tile_size != 0:
            # X dimension not divisible by tile_size
            _merged_shape[1] -= 1
            # Remove one pixel
        while _merged_shape[2] % tile_size != 0:
            # Y dimension not divisible by tile_size
            _merged_shape[2] -= 1
            # Remove one pixel

    merged_shape = tuple(_merged_shape)

    meta.update(
        {
            "driver": "GTiff",
            "height": merged_shape[1],
            "width": merged_shape[2],
            "transform": merged_transform,
            "crs": self.crs,
        }
    )
    with rasterio.open(output_path, "w", **meta) as output:
        output.write(merged_array[:, : merged_shape[1], : merged_shape[2]])

    if apply_to_cog:
        to_cog(output_path)

TileMergeHelperSettings dataclass

This class wraps all defaultable attributes of the TileMergeHelper.