import os
from typing import Optional
import pystac
from pystac.layout import BestPracticesLayoutStrategy
from pystac.utils import is_absolute_href, make_relative_href
from shapely.geometry import mapping, shape
from stactools.core.copy import copy_catalog, move_asset_file
from stactools.core.copy import move_assets as do_move_assets
[docs]
def merge_items(
    source_item: pystac.Item,
    target_item: pystac.Item,
    move_assets: bool = False,
    ignore_conflicts: bool = False,
) -> None:
    """Merges the assets from ``source_item`` into ``target_item``.

    Every asset of ``source_item`` whose key is not already present on
    ``target_item`` is cloned and added to ``target_item``. The geometry and
    bounding box of the items will also be merged: the target receives the
    union of both geometries and the bounds of that union.

    Args:
        source_item (pystac.Item):
            The Item that will be merged into target_item. This item is not
            mutated in this operation.
        target_item (pystac.Item):
            The target item that will be merged into. This item will be mutated
            in this operation.
        move_assets (bool): If true, move the asset files alongside the target item.
        ignore_conflicts (bool):
            If True, assets with the same keys will not be merged, and asset
            files that would be moved to overwrite an existing file will not be
            moved. If False, either of these situations will throw an error.

    Raises:
        ValueError: If ``target_item`` has no self HREF, or if the absolute
            HREF of a source asset cannot be determined.
        Exception: If ``target_item`` already has an asset with the same key
            as a source asset and ``ignore_conflicts`` is False.
    """
    target_item_href = target_item.get_self_href()
    if target_item_href is None:
        raise ValueError(f"Target Item {target_item.id} must have an HREF for merge")

    for key, asset in source_item.assets.items():
        if key in target_item.assets:
            if ignore_conflicts:
                continue
            raise Exception(
                "Target item {} already has asset with key {}, "
                "cannot merge asset in from {}".format(target_item, key, source_item)
            )

        asset_href = asset.get_absolute_href()
        if asset_href is None:
            # Identify the asset by its key: ``asset.title`` is optional and
            # is frequently None, which made the message uninformative.
            raise ValueError(f"Asset {key} must have an HREF for merge")

        if move_assets:
            new_asset_href = move_asset_file(
                target_item, asset_href, ignore_conflicts=ignore_conflicts
            )
        else:
            # ``asset_href`` is absolute at this point. If the source asset
            # stored a relative href, re-express it relative to the target
            # item so the merged asset keeps a relative href.
            if not is_absolute_href(asset.href):
                asset_href = make_relative_href(asset_href, target_item_href)
            new_asset_href = asset_href

        # Clone so that the source item's asset is left untouched.
        new_asset = asset.clone()
        new_asset.href = new_asset_href
        target_item.add_asset(key, new_asset)

    # A STAC Item may carry a null geometry. With nothing to contribute from
    # the source, the target's geometry and bbox are left as they are.
    if source_item.geometry is None:
        return

    source_geom = shape(source_item.geometry)
    if target_item.geometry is None:
        # Only the source has a footprint, so it becomes the merged footprint.
        merged_geom = source_geom
    else:
        # NOTE(review): buffer(0) is presumably used to clean up the result of
        # the union (a common shapely idiom) -- confirm before changing.
        merged_geom = source_geom.union(shape(target_item.geometry)).buffer(0)

    target_item.geometry = mapping(merged_geom)
    target_item.bbox = list(merged_geom.bounds)
[docs]
def merge_all_items(
    source_catalog: pystac.Catalog,
    target_catalog: pystac.Catalog,
    move_assets: bool = False,
    ignore_conflicts: bool = False,
    as_child: bool = False,
    child_folder: Optional[str] = None,
) -> pystac.Catalog:
    """Merge all items from ``source_catalog`` into ``target_catalog``.

    Calls :py:func:`stactools.core.merge.merge_items` on any items that have the
    same ID between the two catalogs. Any items that don't exist in the
    target_catalog will be added to the target_catalog. If the target_catalog
    is a Collection, it will be set as the collection of any new items, and
    its extent is updated from its items before returning.

    When ``as_child`` is True no per-item merging takes place: the source
    catalog is copied into a subfolder next to the target catalog and the
    copy is attached to the target catalog as a child.

    Args:
        source_catalog (pystac.Catalog or pystac.Collection):
            The catalog or collection that items will be drawn from to merge
            into the target catalog. This catalog is not mutated in this
            operation.
        target_catalog (pystac.Catalog or pystac.Collection):
            The target catalog that will be merged into. This catalog will
            be mutated in this operation. Its ``self_href`` is read to
            determine where new items or the child catalog are placed.
        move_assets (bool): If true, move the asset files alongside the target item.
        ignore_conflicts (bool):
            If True, assets with the same keys will not be merged, and asset
            files that would be moved to overwrite an existing file will not be
            moved. If False, either of these situations will throw an error.
        as_child (bool):
            If True, a child catalog will be added with the content of the
            source catalog. Otherwise, items will be added directly to the
            destination catalog.
        child_folder (Optional[str]):
            Name of the subfolder to use in case the as_child option is set to
            True. If None, the id of the catalog will be used as folder name.

    Returns:
        pystac.Catalog or pystac.Collection: The ``target_catalog``

    Raises:
        ValueError: If ``as_child`` is True and the object read back from the
            copied catalog location is not a ``pystac.Catalog``.
    """
    parent_dir = os.path.dirname(target_catalog.self_href)

    # Resolved once: used both for assigning new items and for the final
    # extent update.
    target_collection = (
        target_catalog if isinstance(target_catalog, pystac.Collection) else None
    )

    if as_child:
        child_dir = os.path.join(parent_dir, child_folder or source_catalog.id)
        # NOTE(review): ``move_assets`` is forwarded as copy_catalog's fourth
        # positional argument; confirm against copy_catalog's signature
        # whether that means the assets are copied or moved.
        copy_catalog(
            source_catalog, child_dir, source_catalog.catalog_type, move_assets
        )

        # Re-read the copy from disk so that the child attached to the target
        # is the relocated catalog rather than the original source.
        child_catalog_path = os.path.join(
            child_dir, os.path.basename(source_catalog.self_href)
        )
        child_catalog = pystac.read_file(child_catalog_path)
        if not isinstance(child_catalog, pystac.Catalog):
            raise ValueError(
                f"Child catalog {child_catalog_path} is not a STAC Catalog"
            )
        target_catalog.add_child(child_catalog, child_catalog.title)
    else:
        # Only this branch needs the id lookup, so the source catalog is
        # walked here rather than unconditionally.
        ids_to_items = {
            item.id: item for item in source_catalog.get_items(recursive=True)
        }

        # Merge source items into target items that share the same ID; each
        # matched source item is removed from the lookup.
        for item in target_catalog.get_items(recursive=True):
            source_item = ids_to_items.pop(item.id, None)
            if source_item is not None:
                merge_items(
                    source_item,
                    item,
                    move_assets=move_assets,
                    ignore_conflicts=ignore_conflicts,
                )

        # Process source items that did not match existing target items
        layout_strategy = BestPracticesLayoutStrategy()
        for item in ids_to_items.values():
            # Work on a clone so the source catalog's item is not modified.
            item_copy = item.clone()
            item_copy.set_self_href(
                layout_strategy.get_item_href(item_copy, parent_dir)
            )
            target_catalog.add_item(item_copy)
            # Point the item at the target collection, or clear its
            # collection when the target is a plain catalog.
            item_copy.set_collection(target_collection)
            if move_assets:
                do_move_assets(item_copy, copy=False)

    if target_collection is not None:
        target_collection.update_extent_from_items()

    return target_catalog