# 1. Importing Required Packages
import json
import openeo
import pandas as pd
import shapely
from openeo.extra.job_management import MultiBackendJobManager, CsvJobDatabase
Upscaling of openEO parametrized processes
This notebook demonstrates how to authenticate with the OpenEO backend, create a spatial grid for a specific region, prepare jobs for geospatial analysis, run them in parallel, and visualize the job statuses using interactive maps.
We will go through the following steps:
1. Importing Required Packages
2. Authentication and Backend Initialization
3. Generating a Spatial Grid for the Antwerp Region
4. Visualizing the spatial grid
5. Preparing Jobs for processing
6. Visualizing Job Status Using Plotly Maps
2. Authentication and Backend Initialization
We start by connecting to the Copernicus Dataspace OpenEO backend and authenticating using OpenID Connect. The MultiBackendJobManager is initialized to manage jobs across multiple backends.
# Authenticate and add the backend
connection = openeo.connect(url="openeofed.dataspace.copernicus.eu").authenticate_oidc()
# initialize the job manager
manager = MultiBackendJobManager()
manager.add_backend("cdse", connection=connection, parallel_jobs=2)Authenticated using refresh token.
3. Generating a Spatial Grid for the Antwerp Region
We define a bounding box for Antwerp in WGS84 coordinates and convert it to UTM (Universal Transverse Mercator) coordinates. A grid is created using these UTM coordinates and then converted back to WGS84 for further processing.
We also save the grid as a GeoJSON file for future use.
# 3. Generate the grid for Antwerp
import os

import geopandas as gpd
import numpy as np
from pyproj import Transformer
from shapely.geometry import box

# Transformers between WGS84 (EPSG:4326) and UTM (EPSG:32631).
# always_xy=True fixes the axis order to (x, y), i.e. (lon, lat) for WGS84.
transformer_to_utm = Transformer.from_crs("epsg:4326", "epsg:32631", always_xy=True)
transformer_to_latlon = Transformer.from_crs("epsg:32631", "epsg:4326", always_xy=True)

# Bounding box corners in lon/lat, projected to UTM so the grid can be laid
# out in metres.
min_lon, min_lat = 4.35, 51.10
max_lon, max_lat = 4.45, 51.20
minx, miny = transformer_to_utm.transform(min_lon, min_lat)
maxx, maxy = transformer_to_utm.transform(max_lon, max_lat)

# Lower-left corner coordinates of the grid cells, spaced grid_size_m apart.
grid_size_m = 5000
x_coords = np.arange(minx, maxx, grid_size_m)
y_coords = np.arange(miny, maxy, grid_size_m)

# One full-size square per corner. Because every cell is grid_size_m wide and
# high, the last row/column can extend beyond the bounding box.
polygons = [box(x, y, x + grid_size_m, y + grid_size_m) for x in x_coords for y in y_coords]

# Build the grid in UTM, reproject it to WGS84 and number the cells.
grid_gdf_utm = gpd.GeoDataFrame({'geometry': polygons}, crs="EPSG:32631")
grid_gdf_latlon = grid_gdf_utm.to_crs("EPSG:4326")
grid_gdf_latlon['id'] = range(len(grid_gdf_latlon))

# Make sure the output directory exists. exist_ok=True keeps this cell
# re-runnable: os.mkdir would raise FileExistsError on the second run.
os.makedirs("resources", exist_ok=True)
grid_gdf_latlon.to_file("resources/antwerp_grid_5km.geojson", driver="GeoJSON")from plotly.offline import init_notebook_mode, iplot
init_notebook_mode()
import plotly.io as pio
pio.renderers.default = 'iframe'4. Visualizing the Spatial Grid
Using Plotly, we visualize the spatial grid we just created.
# 4. Visualizing the grid using Plotly
import plotly.express as px
bboxes = gpd.read_file("./resources/antwerp_grid_5km.geojson")
fig = px.choropleth_mapbox(
bboxes,
geojson=bboxes.geometry,
locations=bboxes.index,
mapbox_style="carto-positron",
center={"lat": 51.15, "lon": 4.4},
zoom=8,
title="Spatial Grid for Antwerp Region"
)
fig.update_geos(fitbounds="locations")
fig.update_layout(margin={"r":0,"t":0,"l":0,"b":0})
fig.show()/var/folders/50/09_2zmx12zj6ks4fdl4y9wgc0000gn/T/ipykernel_88411/2536672382.py:6: DeprecationWarning:
*choropleth_mapbox* is deprecated! Use *choropleth_map* instead. Learn more at: https://plotly.com/python/mapbox-to-maplibre/
5. Preparing Jobs for processing
For an existing openEO process, we use default parameters as well as our dataframe with spatial extents to initialize the jobs to process.
More documentation on this concept is available in the openEO Python client documentation on job management (see `ProcessBasedJobCreator`).
from openeo.extra.job_management import (
MultiBackendJobManager,
create_job_db,
ProcessBasedJobCreator,
)
# Job creator, based on a parameterized openEO process
# (specified by the remote process definition at given URL)
job_starter = ProcessBasedJobCreator(
namespace="https://raw.githubusercontent.com/ESA-APEx/apex_algorithms/refs/heads/main/algorithm_catalog/vito/bap_composite/openeo_udp/bap_composite.json",
parameter_defaults={
"bands": ["B02", "B03"],
"temporal_extent": ["2023-05-01","2023-07-01"]
},
)
job_db = create_job_db("job_tracker.csv",grid_gdf_latlon,on_exists="skip")
job_db.read()| geometry | id | backend_name | status | start_time | running_start_time | cpu | memory | duration | costs | |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | POLYGON ((4.42139 51.09915, 4.42277 51.1441, 4... | 0 | NaN | not_started | NaN | NaN | NaN | NaN | NaN | NaN |
| 1 | POLYGON ((4.42277 51.1441, 4.42415 51.18905, 4... | 1 | NaN | not_started | NaN | NaN | NaN | NaN | NaN | NaN |
| 2 | POLYGON ((4.42415 51.18905, 4.42554 51.234, 4.... | 2 | NaN | not_started | NaN | NaN | NaN | NaN | NaN | NaN |
| 3 | POLYGON ((4.49277 51.09826, 4.49422 51.14321, ... | 3 | NaN | not_started | NaN | NaN | NaN | NaN | NaN | NaN |
| 4 | POLYGON ((4.49422 51.14321, 4.49568 51.18816, ... | 4 | NaN | not_started | NaN | NaN | NaN | NaN | NaN | NaN |
| 5 | POLYGON ((4.49568 51.18816, 4.49713 51.2331, 4... | 5 | NaN | not_started | NaN | NaN | NaN | NaN | NaN | NaN |
6. Visualizing Job Status
Set up a threaded approach to run the jobs and visualise at the same time.
We load the job tracker file and visualize the status of each job in the spatial grid using a Plotly choropleth map.
# 6. Run the jobs in a background thread and redraw the status map meanwhile
import plotly.express as px
import time
from plotly import offline
from IPython.display import clear_output

# Map colour for each job status; the None entry is the default colour for
# "no status".
color_dict = {
    "not_started": 'lightgrey',
    "created": 'gold',
    "queued": 'lightsteelblue',
    "running": 'navy',
    "finished": 'lime',
    "error": 'darkred',
    "skipped": 'darkorange',
    None: 'grey'  # Default color for no status
}

# A job in any of these statuses still has work ahead of it.
unfinished_statuses = ["not_started", "created", "queued", "running"]

# Start the job manager in a separate thread so this cell can keep drawing.
manager.start_job_thread(start_job=job_starter, job_db=job_db)

# Visualization loop: redraw until the job thread has been asked to stop.
# NOTE(review): _stop_thread is a private attribute of the job manager —
# confirm there is no public equivalent before relying on it long-term.
while not manager._stop_thread:
    try:
        # Read the current job statuses from the tracker.
        status_df = job_db.read()

        # Centre the map on the middle of the grid's bounding box.
        minx, miny, maxx, maxy = status_df.total_bounds
        center_lat = (miny + maxy) / 2
        center_lon = (minx + maxx) / 2

        # Colour each grid cell by its 'status' value using color_dict.
        fig = px.choropleth_mapbox(
            status_df,
            geojson=status_df.geometry.__geo_interface__,
            locations=status_df.index,
            color='status',
            color_discrete_map=color_dict,
            mapbox_style="carto-positron",
            center={"lat": center_lat, "lon": center_lon},
            zoom=8,
            title="Job Status Overview",
            labels={'status': 'Job Status'}
        )
        fig.update_geos(fitbounds="locations")
        fig.update_layout(margin={"r": 0, "t": 0, "l": 0, "b": 0})

        # wait=True postpones clearing until the new figure is ready, so the
        # cell does not go blank between refreshes.
        clear_output(wait=True)
        offline.iplot(fig)

        # Once no job is unfinished, stop the job thread and leave right away
        # instead of sleeping through one more refresh interval.
        if not status_df['status'].isin(unfinished_statuses).any():
            manager.stop_job_thread()
            break

        time.sleep(15)  # Wait before the next update
    except KeyboardInterrupt:
        # Allow the user to end the visualization with an interrupt.
        break