The European Space Agency
APEx Application Propagation Environments
APEx - Documentation Portal

Upscaling of an openEO parameterized process

This notebook demonstrates how to authenticate with an openEO backend, create a spatial grid for a specific region, prepare geospatial processing jobs, run them in parallel, and visualize the job statuses on interactive maps.

We will go through the following steps:

  1. Importing Required Packages
  2. Authentication and Backend Initialization
  3. Generating a Spatial Grid for the Antwerp Region
  4. Visualizing the Spatial Grid
  5. Preparing Jobs for Processing
  6. Visualizing Job Status Using Plotly Maps

1. Importing Required Packages

import json
import openeo
import pandas as pd
import shapely
from openeo.extra.job_management import MultiBackendJobManager, CsvJobDatabase

2. Authentication and Backend Initialization

We start by connecting to the Copernicus Data Space Ecosystem openEO backend and authenticating using OpenID Connect. The MultiBackendJobManager is then initialized to manage jobs across multiple backends.

# Authenticate and add the backend
connection = openeo.connect(url="openeofed.dataspace.copernicus.eu").authenticate_oidc()

# initialize the job manager
manager = MultiBackendJobManager()
manager.add_backend("cdse", connection=connection, parallel_jobs=2)
Authenticated using refresh token.

3. Generating a Spatial Grid for the Antwerp Region

We define a bounding box for Antwerp in WGS84 coordinates and convert it to UTM (Universal Transverse Mercator) coordinates. A grid is created using these UTM coordinates and then converted back to WGS84 for further processing.

We also save the grid as a GeoJSON file for future use.

# 3. Generate the grid for Antwerp
import geopandas as gpd
from shapely.geometry import box
import numpy as np
from pyproj import Transformer

# Define the bounding box, transformers, and grid size
transformer_to_utm = Transformer.from_crs("epsg:4326", "epsg:32631", always_xy=True)
transformer_to_latlon = Transformer.from_crs("epsg:32631", "epsg:4326", always_xy=True)

min_lon, min_lat = 4.35, 51.10
max_lon, max_lat = 4.45, 51.20
minx, miny = transformer_to_utm.transform(min_lon, min_lat)
maxx, maxy = transformer_to_utm.transform(max_lon, max_lat)
grid_size_m = 5000

x_coords = np.arange(minx, maxx, grid_size_m)
y_coords = np.arange(miny, maxy, grid_size_m)

# Create polygons for the grid
polygons = [box(x, y, x + grid_size_m, y + grid_size_m) for x in x_coords for y in y_coords]

# Create a GeoDataFrame and save it
grid_gdf_utm = gpd.GeoDataFrame({'geometry': polygons}, crs="EPSG:32631")
grid_gdf_latlon = grid_gdf_utm.to_crs("EPSG:4326")
grid_gdf_latlon['id'] = range(len(grid_gdf_latlon))
import os

# Ensure the output directory exists before writing the GeoJSON file
os.makedirs("resources", exist_ok=True)
grid_gdf_latlon.to_file("resources/antwerp_grid_5km.geojson", driver="GeoJSON")

# Configure Plotly for inline rendering in the notebook
from plotly.offline import init_notebook_mode, iplot
import plotly.io as pio

init_notebook_mode()
pio.renderers.default = 'iframe'
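The tiling arithmetic above can be sanity-checked without any geospatial libraries: with a 5000 m step, `numpy.arange` yields one tile origin per grid cell along each axis. A minimal sketch using made-up UTM bounds of roughly the same extent as the Antwerp bounding box (about 7 km by 11 km):

```python
import numpy as np

# Made-up UTM-like bounds, stand-ins for the transformed Antwerp bbox
minx, maxx = 594_000.0, 601_000.0      # ~7 km wide
miny, maxy = 5_661_000.0, 5_672_000.0  # ~11 km tall
grid_size_m = 5000

x_coords = np.arange(minx, maxx, grid_size_m)  # tile origins along x
y_coords = np.arange(miny, maxy, grid_size_m)  # tile origins along y

# 2 origins along x (7 km / 5 km) and 3 along y (11 km / 5 km),
# giving 6 tiles in total
print(len(x_coords), len(y_coords), len(x_coords) * len(y_coords))  # prints: 2 3 6
```

This matches the six grid cells (and hence six jobs) that appear in the job tracker further below.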

4. Visualizing the Spatial Grid

Using Plotly, we visualize the spatial grid we just created.

# 4. Visualizing the grid using Plotly
import plotly.express as px

bboxes = gpd.read_file("./resources/antwerp_grid_5km.geojson")

fig = px.choropleth_mapbox(
    bboxes,
    geojson=bboxes.geometry,
    locations=bboxes.index,
    mapbox_style="carto-positron",
    center={"lat": 51.15, "lon": 4.4},
    zoom=8,
    title="Spatial Grid for Antwerp Region"
)

fig.update_geos(fitbounds="locations")
fig.update_layout(margin={"r":0,"t":0,"l":0,"b":0})
fig.show()

5. Preparing Jobs for processing

For an existing parameterized openEO process, we combine a set of default parameter values with our dataframe of spatial extents to initialize the jobs to be processed.

More documentation on this concept is available in the openEO Python client documentation on job management.

from openeo.extra.job_management import (
    MultiBackendJobManager,
    create_job_db,
    ProcessBasedJobCreator,
)

# Job creator, based on a parameterized openEO process
# (specified by the remote process definition at given URL)
job_starter = ProcessBasedJobCreator(
    namespace="https://raw.githubusercontent.com/ESA-APEx/apex_algorithms/refs/heads/main/openeo_udp/bap_composite.json",
    parameter_defaults={
        "bands": ["B02", "B03"],
        "temporal_extent": ["2023-05-01","2023-07-01"]
    },
)

# Create (or reuse) the CSV-backed job database, one row per grid tile
job_db = create_job_db("job_tracker.csv", grid_gdf_latlon, on_exists="skip")
job_db.read()
geometry id status start_time running_start_time cpu memory duration backend_name
0 POLYGON ((4.42139 51.09915, 4.42277 51.14410, ... cdse-j-241105f07d1744dfbffb7df433bf0593 finished 2024-11-05T14:22:06Z 2024-11-05T14:23:52Z 225 cpu-seconds 1137013 mb-seconds 154 seconds cdse
1 POLYGON ((4.42277 51.14410, 4.42415 51.18905, ... cdse-j-2411059214eb4c699616820e3a6cec63 finished 2024-11-05T14:22:36Z 2024-11-05T14:24:38Z 357 cpu-seconds 2171980 mb-seconds 163 seconds cdse
2 POLYGON ((4.42415 51.18905, 4.42554 51.23400, ... cdse-j-2411051f49a6403c887d564dbda02931 finished 2024-11-05T14:25:55Z 2024-11-05T14:27:14Z 170 cpu-seconds 908083 mb-seconds 155 seconds cdse
3 POLYGON ((4.49277 51.09826, 4.49422 51.14321, ... NaN not_started 2024-11-05T14:26:11Z NaN NaN NaN NaN cdse
4 POLYGON ((4.49422 51.14321, 4.49568 51.18816, ... NaN not_started 2024-11-05T14:27:15Z NaN NaN NaN NaN cdse
5 POLYGON ((4.49568 51.18816, 4.49713 51.23310, ... NaN not_started 2024-11-05T14:27:30Z NaN NaN NaN NaN cdse
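Between runs, the tracker can also be inspected with plain pandas. A minimal sketch, using a made-up `status` column standing in for the real contents of job_tracker.csv:

```python
import pandas as pd

# Hypothetical stand-in for the "status" column of job_tracker.csv,
# mirroring the table above (3 finished, 3 not yet started)
statuses = pd.Series(
    ["finished", "finished", "finished",
     "not_started", "not_started", "not_started"]
)

# Count jobs per status, e.g. {'finished': 3, 'not_started': 3}
counts = statuses.value_counts().to_dict()

# Number of jobs that still need attention
pending = statuses.isin(["not_started", "created", "queued", "running"]).sum()
print(pending)  # prints: 3
```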

6. Visualizing Job Status

We set up a threaded approach so that the jobs run in the background while we visualize their progress at the same time.

We load the job tracker file and visualize the status of each job in the spatial grid using a Plotly choropleth map.

# Visualization setup for the live job status map
import time
import plotly.express as px
from plotly import offline
from IPython.display import clear_output

# Update colors based on job status
color_dict = {
    "not_started": 'lightgrey', 
    "created": 'gold', 
    "queued": 'lightsteelblue', 
    "running": 'navy', 
    "finished": 'lime',
    "error": 'darkred',
    "skipped": 'darkorange',
    None: 'grey'  # Default color for no status
}

# Start the job manager in a separate (non-blocking) thread
manager.start_job_thread(start_job=job_starter, job_db=job_db)

# Visualization loop: rebuild the status map every 15 seconds
# until no jobs remain in an active state

while not manager._stop_thread:
    try:
        # Read job statuses from the tracker
        status_df = job_db.read()

        # Use the 'status' column to determine colors, with a fallback for NaNs or None
        status_df['color'] = status_df['status'].map(color_dict).fillna(color_dict[None])

        minx, miny, maxx, maxy = status_df.total_bounds
        center_lat = (miny + maxy) / 2
        center_lon = (minx + maxx) / 2

        fig = px.choropleth_mapbox(
            status_df,
            geojson=status_df.geometry.__geo_interface__,  # Use the correct GeoJSON representation
            locations=status_df.index,
            color='status',  # Use 'status' for the color
            color_discrete_map=color_dict,  # Map colors directly from the dictionary
            mapbox_style="carto-positron",
            center={"lat": center_lat, "lon": center_lon},  # Center on your area of interest
            zoom=8,
            title="Job Status Overview",
            labels={'status': 'Job Status'} 
        )
        fig.update_geos(fitbounds="locations")
        fig.update_layout(margin={"r": 0, "t": 0, "l": 0, "b": 0})

        # Display the updated figure
        clear_output()
        offline.iplot(fig)

        # Check if all jobs are done
        if status_df['status'].isin(["not_started", "created", "queued", "running"]).sum() == 0:
            manager.stop_job_thread()

        time.sleep(15)  # Wait before the next update

    except KeyboardInterrupt:
        break
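The stop condition used in the loop above can be factored into a small pure-Python helper; `ACTIVE_STATES` and `all_jobs_done` are names introduced here for illustration, not part of the openEO API:

```python
# Statuses that mean the batch is still in progress
ACTIVE_STATES = {"not_started", "created", "queued", "running"}

def all_jobs_done(statuses):
    """Return True when no job is in an active state."""
    return not any(s in ACTIVE_STATES for s in statuses)

print(all_jobs_done(["finished", "error", "skipped"]))  # prints: True
print(all_jobs_done(["finished", "running"]))           # prints: False
```

With such a helper, the check inside the loop reduces to `if all_jobs_done(status_df['status']): manager.stop_job_thread()`.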