12 job splitters #45

Merged
merged 27 commits on Mar 11, 2024
Changes from 21 commits
Commits (27)
54aeadc
First implementation of job splitter and started job manager
GriffinBabe Feb 5, 2024
843e51c
Updated manager/splitter
GriffinBabe Feb 7, 2024
97e31cb
Finished job manager and post-job actions
GriffinBabe Feb 12, 2024
eff07d2
Fix! Blackified
GriffinBabe Feb 12, 2024
16c9e20
One-liner mistake
GriffinBabe Feb 12, 2024
44405de
Added example notebook for the use of the pipeline
GriffinBabe Feb 12, 2024
2d34b24
First version of a creating a STAC collection after feature extractio…
VincentVerelst Feb 22, 2024
0208856
added CDSE staging backend for Sentinel 1 and 2 #18
VincentVerelst Feb 22, 2024
46c5ea9
generated STAC metadata with item for netcdfs with time series #18
VincentVerelst Feb 23, 2024
9b7d853
formatting
VincentVerelst Feb 23, 2024
2c3c9f5
enriched the STAC collection metadata #18
VincentVerelst Feb 27, 2024
fd050b5
formatting
VincentVerelst Feb 27, 2024
3dab8b4
Multiple fixes on the pipeline
GriffinBabe Mar 4, 2024
eadb4d4
Merge branch '12-job-splitters' into 18-generate-stac
GriffinBabe Mar 4, 2024
cff7e1f
Changed on_job_done function to support post_job_action playing with …
GriffinBabe Mar 7, 2024
d3fb0c6
Merge pull request #52 from Open-EO/18-generate-stac
GriffinBabe Mar 7, 2024
0b54d35
Fix isort
GriffinBabe Mar 7, 2024
1642244
Fix black
GriffinBabe Mar 7, 2024
9a9ab08
Merge branch 'main' into 12-job-splitters
GriffinBabe Mar 7, 2024
1388d54
Blackified according to main branch
GriffinBabe Mar 7, 2024
0809623
Added metadata on post-job action
GriffinBabe Mar 7, 2024
5b775ea
Removed hardcoded title and ID for STAC catalogue in GFMAP manager fo…
GriffinBabe Mar 8, 2024
03aa686
Added processing level to S2
GriffinBabe Mar 8, 2024
ca7b90f
In STAC band names renamed WORLDCEREAL to more generic LABEL
GriffinBabe Mar 8, 2024
dbe6116
Added processing level for S1 too
GriffinBabe Mar 8, 2024
e7c31be
Changed imports for UDF in feature_extractor.py file
GriffinBabe Mar 11, 2024
86bd7c4
output labels for UDF results are now dynamic and can access the inpu…
GriffinBabe Mar 11, 2024
902 changes: 902 additions & 0 deletions examples/extraction_pipelines/S2_extraction_example.ipynb

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion pyproject.toml
@@ -34,7 +34,8 @@ dependencies = [
"openeo[localprocessing]",
"cftime",
"pyarrow",
"fastparquet"
"fastparquet",
"h3",
]

[project.urls]
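The only dependency change is the new `h3` package, presumably in support of grid-based job splitting. A purely illustrative sketch of how locations could be bucketed into H3 cells, assuming the h3-py v3 API (`h3.geo_to_h3`); none of these names or values come from the gfmap code itself:

```python
# Illustrative only: group sample locations into H3 cells so that each cell
# can become one extraction (sub)job. Assumes h3-py v3 (h3.geo_to_h3); the
# coordinates and resolution below are placeholders, not values from this PR.
import h3

resolution = 3  # coarse hexagons -> fewer, larger jobs
jobs = {}
for lat, lon in [(51.0, 4.5), (51.1, 4.6), (45.0, 8.0)]:
    cell = h3.geo_to_h3(lat, lon, resolution)
    jobs.setdefault(cell, []).append((lat, lon))

# Each key of `jobs` would then map to one job handled by the job manager.
```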
10 changes: 10 additions & 0 deletions src/openeo_gfmap/backend.py
@@ -19,6 +19,7 @@ class Backend(Enum):
TERRASCOPE = "terrascope"
EODC = "eodc" # Dask implementation. Do not test on this yet.
CDSE = "cdse" # Terrascope implementation (pyspark) #URL: openeo.dataspace.copernicus.eu (need to register)
CDSE_STAGING = "cdse-staging"
LOCAL = "local" # Based on the same components of EODc


@@ -82,6 +83,14 @@ def cdse_connection() -> openeo.Connection:
)


def cdse_staging_connection() -> openeo.Connection:
"""Performs a connection to the CDSE backend using oidc authentication."""
return _create_connection(
url="openeo-staging.dataspace.copernicus.eu",
env_var_suffix="CDSE_STAGING",
)


def eodc_connection() -> openeo.Connection:
"""Perfroms a connection to the EODC backend using the oidc authentication."""
return _create_connection(
@@ -93,4 +102,5 @@ def eodc_connection() -> openeo.Connection:
BACKEND_CONNECTIONS: Dict[Backend, Callable] = {
Backend.TERRASCOPE: vito_connection,
Backend.CDSE: cdse_connection,
Backend.CDSE_STAGING: cdse_staging_connection,
}
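A short usage sketch for the new staging backend, assuming `openeo_gfmap` is installed and the OIDC credentials for the `CDSE_STAGING` environment-variable suffix are configured as the diff suggests:

```python
# Minimal sketch: open a connection to the new CDSE staging backend.
from openeo_gfmap.backend import BACKEND_CONNECTIONS, Backend, cdse_staging_connection

# Call the dedicated helper directly...
connection = cdse_staging_connection()

# ...or resolve it generically through the registry this PR extends.
connection = BACKEND_CONNECTIONS[Backend.CDSE_STAGING]()
```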
11 changes: 0 additions & 11 deletions src/openeo_gfmap/extractions/__init__.py

This file was deleted.

41 changes: 0 additions & 41 deletions src/openeo_gfmap/extractions/commons.py

This file was deleted.

78 changes: 0 additions & 78 deletions src/openeo_gfmap/extractions/extraction.py

This file was deleted.

129 changes: 0 additions & 129 deletions src/openeo_gfmap/extractions/s2.py

This file was deleted.

25 changes: 18 additions & 7 deletions src/openeo_gfmap/fetching/commons.py
@@ -111,12 +111,16 @@ def load_collection(
properties=load_collection_parameters,
)
elif fetch_type == FetchType.POLYGON:
assert isinstance(
spatial_extent, GeoJSON
), "Please provide only a GeoJSON FeatureCollection for point based fetching."
assert (
spatial_extent["type"] == "FeatureCollection"
), "Please provide a FeatureCollection type of GeoJSON"
if isinstance(spatial_extent, GeoJSON):
assert (
spatial_extent["type"] == "FeatureCollection"
), "Please provide a FeatureCollection type of GeoJSON"
elif isinstance(spatial_extent, str):
assert spatial_extent.startswith("https://") or spatial_extent.startswith(
"http://"
), "Please provide a valid URL or a path to a GeoJSON file."
else:
raise ValueError("Please provide a valid URL to a GeoParquet or GeoJSON file.")
cube = connection.load_collection(
collection_id=collection_name,
temporal_extent=[temporal_extent.start_date, temporal_extent.end_date],
@@ -133,6 +137,13 @@ def load_collection(
cube = cube.mask(pre_mask.resample_cube_spatial(cube))

if fetch_type == FetchType.POLYGON:
cube = cube.filter_spatial(spatial_extent)
if isinstance(spatial_extent, str):
geometry = connection.load_url(
spatial_extent,
format="Parquet" if ".parquet" in spatial_extent else "GeoJSON",
)
cube = cube.filter_spatial(geometry)
else:
cube = cube.filter_spatial(spatial_extent)

return cube
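The hunks above let `load_collection` accept a URL to a GeoParquet or GeoJSON file as the polygon spatial extent, in addition to an inline `FeatureCollection`. A minimal sketch of that URL path in plain openEO client calls, assuming an authenticated `connection` and an already-built `cube`; the URL below is a placeholder:

```python
# Placeholder URL pointing at a GeoParquet file with the extraction polygons.
spatial_extent = "https://example.com/extraction_polygons.parquet"

# Mirror of the logic added above: load the vector file by URL, picking the
# format from the file extension, then spatially filter the data cube with it.
geometry = connection.load_url(
    spatial_extent,
    format="Parquet" if ".parquet" in spatial_extent else "GeoJSON",
)
cube = cube.filter_spatial(geometry)
```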
4 changes: 4 additions & 0 deletions src/openeo_gfmap/fetching/s1.py
@@ -130,6 +130,10 @@ def s1_grd_default_processor(cube: openeo.DataCube, **params):
"default": partial(get_s1_grd_default_fetcher, collection_name="SENTINEL1_GRD"),
"preprocessor": partial(get_s1_grd_default_processor, collection_name="SENTINEL1_GRD"),
},
Backend.CDSE_STAGING: {
"default": partial(get_s1_grd_default_fetcher, collection_name="SENTINEL1_GRD"),
"preprocessor": partial(get_s1_grd_default_processor, collection_name="SENTINEL1_GRD"),
},
}


8 changes: 4 additions & 4 deletions src/openeo_gfmap/fetching/s2.py
@@ -105,10 +105,6 @@ def s2_l2a_fetch_default(
**params,
)

# Apply if the collection is a GeoJSON Feature collection
if isinstance(spatial_extent, GeoJSON):
cube = cube.filter_spatial(spatial_extent)

return cube

return s2_l2a_fetch_default
@@ -190,6 +186,10 @@ def s2_l2a_default_processor(cube: openeo.DataCube, **params):
"fetch": partial(get_s2_l2a_default_fetcher, collection_name="SENTINEL2_L2A"),
"preprocessor": partial(get_s2_l2a_default_processor, collection_name="SENTINEL2_L2A"),
},
Backend.CDSE_STAGING: {
"fetch": partial(get_s2_l2a_default_fetcher, collection_name="SENTINEL2_L2A"),
"preprocessor": partial(get_s2_l2a_default_processor, collection_name="SENTINEL2_L2A"),
},
}


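The S1 and S2 registries above now carry a `Backend.CDSE_STAGING` entry built from the same partials as `Backend.CDSE`. An illustrative lookup, where `S2_L2A_BACKEND_FUNCTIONS` stands in for the registry dict whose actual name sits above the visible hunk:

```python
# Illustrative only: resolve the S2 fetcher and preprocessor for the staging
# backend. "S2_L2A_BACKEND_FUNCTIONS" is a placeholder name for the dict
# extended above; the keys ("fetch", "preprocessor") are taken from the diff.
entry = S2_L2A_BACKEND_FUNCTIONS[Backend.CDSE_STAGING]

fetch = entry["fetch"]()              # the partial returns the fetch function
preprocess = entry["preprocessor"]()  # the partial returns the preprocessor
```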
6 changes: 6 additions & 0 deletions src/openeo_gfmap/manager/__init__.py
@@ -0,0 +1,6 @@
"""OpenEO GFMAP Manager submodule. Implements the logic of splitting the jobs into subjobs and
managing the subjobs.
"""
import logging

_log = logging.getLogger(__name__)
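Since the new manager submodule logs through a module-level `_log`, its output can be surfaced with standard `logging` configuration; a minimal sketch:

```python
# Minimal sketch: make the manager submodule's log messages visible.
import logging

logging.basicConfig(level=logging.INFO)
logging.getLogger("openeo_gfmap.manager").setLevel(logging.DEBUG)
```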