# 1. Importing Required Packages
import json
import openeo
import pandas as pd
import shapely
from openeo.extra.job_management import MultiBackendJobManager, CsvJobDatabase
Upscaling of openEO parametrized proces
This notebook demonstrates how to authenticate with the OpenEO backend, create a spatial grid for a specific region, prepare jobs for geospatial analysis, run them in parallel, and visualize the job statuses using interactive maps.
We will go through the following steps: 1. Importing Required Packages 2. Authentication and Backend Initialization 3. Generating a Spatial Grid for the Antwerp Region 4. Visualizing the spatial grid 5. Preparing Jobs for processing 6. Visualizing Job Status Using Plotly Maps
2. Authentication and Backend Initialization
We start by connecting to the Copernicus Dataspace OpenEO backend and authenticating using OpenID Connect. The MultiBackendJobManager
is initialized to manage jobs across multiple backends.
# Authenticate and add the backend
= openeo.connect(url="openeofed.dataspace.copernicus.eu").authenticate_oidc()
connection
# initialize the job manager
= MultiBackendJobManager()
manager "cdse", connection=connection, parallel_jobs=2) manager.add_backend(
Authenticated using refresh token.
3. Generating a Spatial Grid for the Antwerp Region
We define a bounding box for Antwerp in WGS84 coordinates and convert it to UTM (Universal Transverse Mercator) coordinates. A grid is created using these UTM coordinates and then converted back to WGS84 for further processing.
We also save the grid as a GeoJSON file for future use.
# 3. Generate the grid for Antwerp
import geopandas as gpd
from shapely.geometry import box
import numpy as np
from pyproj import Transformer
# Define the bounding box, transformers, and grid size
= Transformer.from_crs("epsg:4326", "epsg:32631", always_xy=True)
transformer_to_utm = Transformer.from_crs("epsg:32631", "epsg:4326", always_xy=True)
transformer_to_latlon
= 4.35, 51.10
min_lon, min_lat = 4.45, 51.20
max_lon, max_lat = transformer_to_utm.transform(min_lon, min_lat)
minx, miny = transformer_to_utm.transform(max_lon, max_lat)
maxx, maxy = 5000
grid_size_m
= np.arange(minx, maxx, grid_size_m)
x_coords = np.arange(miny, maxy, grid_size_m)
y_coords
# Create polygons for the grid
= [box(x, y, x + grid_size_m, y + grid_size_m) for x in x_coords for y in y_coords]
polygons
# Create a GeoDataFrame and save it
= gpd.GeoDataFrame({'geometry': polygons}, crs="EPSG:32631")
grid_gdf_utm = grid_gdf_utm.to_crs("EPSG:4326")
grid_gdf_latlon 'id'] = range(len(grid_gdf_latlon))
grid_gdf_latlon[import os
#os.mkdir("resources")
"resources/antwerp_grid_5km.geojson", driver="GeoJSON") grid_gdf_latlon.to_file(
from plotly.offline import init_notebook_mode, iplot
init_notebook_mode() import plotly.io as pio
= 'iframe' pio.renderers.default
4. Visualizing the Spatial Grid
Using Plotly, we visualize the spatial grid we just created.
# 4. Visualizing the grid using Plotly
import plotly.express as px
= gpd.read_file("./resources/antwerp_grid_5km.geojson")
bboxes
= px.choropleth_mapbox(
fig
bboxes,=bboxes.geometry,
geojson=bboxes.index,
locations="carto-positron",
mapbox_style={"lat": 51.15, "lon": 4.4},
center=8,
zoom="Spatial Grid for Antwerp Region"
title
)
="locations")
fig.update_geos(fitbounds={"r":0,"t":0,"l":0,"b":0})
fig.update_layout(margin fig.show()
5. Preparing Jobs for processing
For an existing openEO process, we use default parameters as well as our dataframe with spatial extents to initialize the jobs to process.
More documentation on this concept is available here
from openeo.extra.job_management import (
MultiBackendJobManager,
create_job_db,
ProcessBasedJobCreator,
)
# Job creator, based on a parameterized openEO process
# (specified by the remote process definition at given URL)
= ProcessBasedJobCreator(
job_starter ="https://raw.githubusercontent.com/ESA-APEx/apex_algorithms/refs/heads/main/openeo_udp/bap_composite.json",
namespace={
parameter_defaults"bands": ["B02", "B03"],
"temporal_extent": ["2023-05-01","2023-07-01"]
},
)
= create_job_db("job_tracker.csv",grid_gdf_latlon,on_exists="skip")
job_db job_db.read()
geometry | id | status | start_time | running_start_time | cpu | memory | duration | backend_name | |
---|---|---|---|---|---|---|---|---|---|
0 | POLYGON ((4.42139 51.09915, 4.42277 51.14410, ... | cdse-j-241105f07d1744dfbffb7df433bf0593 | finished | 2024-11-05T14:22:06Z | 2024-11-05T14:23:52Z | 225 cpu-seconds | 1137013 mb-seconds | 154 seconds | cdse |
1 | POLYGON ((4.42277 51.14410, 4.42415 51.18905, ... | cdse-j-2411059214eb4c699616820e3a6cec63 | finished | 2024-11-05T14:22:36Z | 2024-11-05T14:24:38Z | 357 cpu-seconds | 2171980 mb-seconds | 163 seconds | cdse |
2 | POLYGON ((4.42415 51.18905, 4.42554 51.23400, ... | cdse-j-2411051f49a6403c887d564dbda02931 | finished | 2024-11-05T14:25:55Z | 2024-11-05T14:27:14Z | 170 cpu-seconds | 908083 mb-seconds | 155 seconds | cdse |
3 | POLYGON ((4.49277 51.09826, 4.49422 51.14321, ... | NaN | not_started | 2024-11-05T14:26:11Z | NaN | NaN | NaN | NaN | cdse |
4 | POLYGON ((4.49422 51.14321, 4.49568 51.18816, ... | NaN | not_started | 2024-11-05T14:27:15Z | NaN | NaN | NaN | NaN | cdse |
5 | POLYGON ((4.49568 51.18816, 4.49713 51.23310, ... | NaN | not_started | 2024-11-05T14:27:30Z | NaN | NaN | NaN | NaN | cdse |
6. Visualizing Job Status
Set up a threaded approach to run the jobs and visualise at the same time.
We load the job tracker file and visualize the status of each job in the spatial grid using a Plotly choropleth map.
# Step 5: Initialize job database
import plotly.express as px
from shapely import geometry, from_wkt
import json
import geopandas as gpd
import pandas as pd
import time
from plotly import offline
from IPython.display import clear_output
# Update colors based on job status
= {
color_dict "not_started": 'lightgrey',
"created": 'gold',
"queued": 'lightsteelblue',
"running": 'navy',
"finished": 'lime',
"error": 'darkred',
"skipped": 'darkorange',
None: 'grey' # Default color for no status
}
# Step 6: Start job manager in a separate thread
=job_starter, job_db=job_db)
manager.start_job_thread(start_job
# Step 7: Visualization Loop
# Initialize the figure outside the loop
while not manager._stop_thread:
try:
# Read job statuses from the tracker
= job_db.read()
status_df
# Use the 'status' column to determine colors, with a fallback for NaNs or None
'color'] = status_df['status'].map(color_dict).fillna(color_dict[None])
status_df[
= status_df.total_bounds
minx, miny, maxx, maxy = (miny + maxy) / 2
center_lat = (minx + maxx) / 2
center_lon
= px.choropleth_mapbox(
fig
status_df,=status_df.geometry.__geo_interface__, # Use the correct GeoJSON representation
geojson=status_df.index,
locations='status', # Use 'status' for the color
color=color_dict, # Map colors directly from the dictionary
color_discrete_map="carto-positron",
mapbox_style={"lat": center_lat, "lon": center_lon}, # Center on your area of interest
center=8,
zoom="Job Status Overview",
title={'status': 'Job Status'}
labels
)="locations")
fig.update_geos(fitbounds={"r": 0, "t": 0, "l": 0, "b": 0})
fig.update_layout(margin
# Display the updated figure
clear_output()
offline.iplot(fig)
# Check if all jobs are done
if status_df['status'].isin(["not_started", "created", "queued", "running"]).sum() == 0:
manager.stop_job_thread()
15) # Wait before the next update
time.sleep(
except KeyboardInterrupt:
break