blockutils.wmts
¶
Helper for downloading, processing and merging tiles from tile based data sources.
MultiTileMergeHelper
¶
This class is a helper class that allows to pass list of TileMergeHelper object. This is mostly relevant when the datasets has multiple layers (e.g. SENTINEL1GRD with two polarization). Via this object, each layer will be handled separately (fetching tiles, processing them, and merging them) with an additional final step which appends this layers to one tif file to a create a single multi-bands tif. Also, the end result will be in COG format.
TileMergeHelper (TileMergeHelperSettings)
¶
This class is a helper class that allows to fetch tiles via a req function, process them via a process function (a custom process function can be called, or the default process function will be used) ,and finally merge them into a single image in COG format.
__init__(self, tiles, req, process=None, **kwargs)
special
¶
Initialize self. See help(type(self)) for accurate signature.
Source code in blockutils/wmts.py
def __init__(
self,
tiles: List[Tile],
req: Callable,
process: Callable = None,
**kwargs,
):
self.tiles = tiles
self.req = req
self.process = process
super().__init__(**kwargs)
get_final_image(self, output_path, return_cog=True)
¶
The main method which merges list of tiles and produces the final merged image.
Source code in blockutils/wmts.py
def get_final_image(self, output_path: Path, return_cog: bool = True):
"""The main method which merges list of tiles and produces the final merged image."""
with self.tile_dataset() as (tile_tif_list, valid_tiles):
merged_array, merged_transform, meta = self.merge_tiles(tile_tif_list)
self.write_merged_tiles(
output_path,
merged_array,
merged_transform,
meta,
self.tile_size,
return_cog,
)
return valid_tiles
loop_over_tiles(self)
¶
This method iterates over a list of tiles and applies tile_worker method to fetch and process them. In the case of not being able to fetch the tile (API issues) a retrying mechanism will happen to retry fetching those tiles again. Also for speeding up the fetching process a parallelization option has been implemented. Moreover, if a tile is empty, it will be skipped.
Returns:
Type | Description |
---|---|
tile_tif_list |
List of path to tiles which are written in tif format. valid_tiles: List of tiles that are NOT empty. |
Source code in blockutils/wmts.py
def loop_over_tiles(self):
"""
This method iterates over a list of tiles and applies tile_worker method to fetch and process them.
In the case of not being able to fetch the tile (API issues) a retrying mechanism will happen to
retry fetching those tiles again. Also for speeding up the fetching process a parallelization option
has been implemented. Moreover, if a tile is empty, it will be skipped.
Returns:
tile_tif_list: List of path to tiles which are written in tif format.
valid_tiles: List of tiles that are NOT empty.
"""
tile_tif_list = []
valid_tiles = []
tiles_not_fetched = []
active_tiles = self.tiles.copy()
retry_round = 0
while retry_round <= self.retries_for_no_fetched_tiles:
# It initially starts with active_tiles, and among them, there are some tiles that are not fetched
# which will be saved to tiles_not_fetched list. Then when retrying started, the tile in this list
# will be assigned to active_tiles (active_tiles will be rewritten) to do the whole fetching process again.
if retry_round:
if tiles_not_fetched:
time.sleep(self.polling_cycle)
active_tiles = tiles_not_fetched.copy()
tiles_not_fetched = []
logger.info(
f"Now starting {retry_round}. round of retrying failures"
)
logger.info(f"{len(active_tiles)}. tiles will be retried")
else:
break
if not self.parallelize:
for tile in active_tiles:
self.tile_worker(
tile, tile_tif_list, valid_tiles, tiles_not_fetched
)
else:
with concurrent.futures.ThreadPoolExecutor(
max_workers=self.max_threads
) as executor:
future_list = []
for tile in active_tiles:
logger.info(f"Tile {tile} added to executor...")
future_list.append(
executor.submit(
lambda t: self.tile_worker(*t),
[tile, tile_tif_list, valid_tiles, tiles_not_fetched],
)
)
for future in concurrent.futures.as_completed(future_list):
try:
future.result()
except Exception as e:
raise e
retry_round += 1
logger.info(
f"There are {len(valid_tiles)} valid data tiles out of {len(self.tiles)}"
)
if tiles_not_fetched:
raise UP42Error(
SupportedErrors.API_CONNECTION_ERROR,
"Retrying was unsuccessful. API returned error code 500", # TODO: Is this assumption from empty list?
)
return tile_tif_list, valid_tiles
merge_tiles(tile_file_list)
staticmethod
¶
This method merges the list of tif files and returns the merged array, transform and meta for the final writing.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tile_file_list |
List of path to tif files. For instance: ["/a/b/1234.tif", "/a/c/5678.tif"] |
required |
Source code in blockutils/wmts.py
@staticmethod
def merge_tiles(tile_file_list):
"""
This method merges the list of tif files and returns the merged array,
transform and meta for the final writing.
Args:
tile_file_list: List of path to tif files. For instance: ["/a/b/1234.tif", "/a/c/5678.tif"]
"""
if not tile_file_list:
raise UP42Error(SupportedErrors.NO_INPUT_ERROR, "All tiles are empty.")
tile_file_list_str = [str(tif_file) for tif_file in tile_file_list]
merged_array, merged_transform = riomerge.merge(tile_file_list_str)
with rasterio.open(tile_file_list[0]) as src:
temp_meta = src.meta.copy()
return merged_array, merged_transform, temp_meta
tile_worker(self, tile, tile_tif_list, valid_tiles, tiles_not_fetched)
¶
This method fetches the tile from a specific url and applies processing on it. Final result should be a path to the tile written in tif format. If user did not specify a process method, the default _process method will called.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tile |
Tile |
A tuple of x,y,z information for the tile. For instance: Tile(x=41955, y=101467, z=18) |
required |
tile_tif_list |
List[pathlib.Path] |
List for saving the tiles that were fetched and stored in the tif format. |
required |
valid_tiles |
List[mercantile.Tile] |
List of tiles that are NOT empty. |
required |
tiles_not_fetched |
List[mercantile.Tile] |
List of tiles that were not fetched due to API issues. |
required |
Source code in blockutils/wmts.py
def tile_worker(
self,
tile: Tile,
tile_tif_list: List[Path],
valid_tiles: List[Tile],
tiles_not_fetched: List[Tile],
):
"""
This method fetches the tile from a specific url and applies processing on it.
Final result should be a path to the tile written in tif format. If user did not specify a process
method, the default _process method will called.
Args:
tile: A tuple of x,y,z information for the tile. For instance: Tile(x=41955, y=101467, z=18)
tile_tif_list: List for saving the tiles that were fetched and stored in the tif format.
valid_tiles: List of tiles that are NOT empty.
tiles_not_fetched: List of tiles that were not fetched due to API issues.
"""
try:
response = self.req(tile, **self.req_kwargs)
try:
if self.process:
if "tile_size" in self.process.__code__.co_varnames:
tile_tif_list.append(
self.process(response, tile, tile_size=self.tile_size)
)
else:
tile_tif_list.append(self.process(response, tile))
else:
tile_tif_list.append(self._process(response, tile))
valid_tiles.append(tile)
except TileIsEmptyError:
logger.info(f"{tile} is emtpy, Skipping ...")
except TileNotFetchedError:
tiles_not_fetched.append(tile)
write_merged_tiles(self, output_path, merged_array, merged_transform, meta, tile_size, apply_to_cog=True)
¶
This method writes the merged tiles in a file. The final result will be in COG format.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
output_path |
Path to save the final image. |
required | |
merged_array |
Numpy array of merged tiles. |
required | |
merged_transform |
Transform for merged tiles. |
required | |
meta |
Metadata for writing the final image. |
required | |
tile_size |
Size of the tile. In most cases 256. |
required | |
apply_to_cog |
bool |
Whether convert a tif to cog tif. Default is true. |
True |
Source code in blockutils/wmts.py
def write_merged_tiles(
self,
output_path,
merged_array,
merged_transform,
meta,
tile_size,
apply_to_cog: bool = True,
):
"""
This method writes the merged tiles in a file. The final result will be in COG format.
Args:
output_path: Path to save the final image.
merged_array: Numpy array of merged tiles.
merged_transform: Transform for merged tiles.
meta: Metadata for writing the final image.
tile_size: Size of the tile. In most cases 256.
apply_to_cog: Whether convert a tif to cog tif. Default is true.
"""
_merged_shape: List = list(merged_array.shape)
if _merged_shape[1] > tile_size and _merged_shape[2] > tile_size:
while _merged_shape[1] % tile_size != 0:
# X dimension not divisible by tile_size
_merged_shape[1] -= 1
# Remove one pixel
while _merged_shape[2] % tile_size != 0:
# Y dimension not divisible by tile_size
_merged_shape[2] -= 1
# Remove one pixel
merged_shape = tuple(_merged_shape)
meta.update(
{
"driver": "GTiff",
"height": merged_shape[1],
"width": merged_shape[2],
"transform": merged_transform,
"crs": self.crs,
}
)
with rasterio.open(output_path, "w", **meta) as output:
output.write(merged_array[:, : merged_shape[1], : merged_shape[2]])
if apply_to_cog:
to_cog(output_path)
TileMergeHelperSettings
dataclass
¶
This class wraps all defaultable attributes of the TileMergeHelper.