APEx - Documentation Portal

Geospatial Job Management and Visualization with OpenEO

When executing algorithms across large spatial areas, it is often necessary to divide the area of interest into smaller regions and run the algorithm on each region independently. To streamline this process and manage multiple jobs simultaneously, the MultiBackendJobManager was developed.

In this example, we demonstrate how to process an algorithm across a grid of smaller tiles and visualize job statuses using interactive maps. Our use case involves calculating Best Available Pixel Composites, using an openEO Process hosted in the APEx Algorithm Catalogue.

We will go through the following steps:

  1. Import the required packages
  2. Generate a Spatial Grid for the Antwerp Region
  3. Prepare Jobs for Parallel Processing
  4. Prepare Job visualization with a Custom Color Mapping
  5. Run the Jobs with MultiBackendJobManager

NOTE You need access to the Copernicus Data Space Ecosystem in order to execute this notebook.

1. Import the required packages

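Before we start, we import the packages needed for this notebook example. Apart from the standard-library modules time and copy, these packages must already be installed in your environment.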

import time
import copy

import geopandas as gpd
from shapely import wkt

import openeo
from openeo.extra.job_management import (
    MultiBackendJobManager,
    create_job_db,
    get_job_db,
    split_area,
)
from openeo.extra.job_management.process_based import ProcessBasedJobCreator

import folium
from IPython.display import clear_output, display

2. Generate a Spatial Grid for the Antwerp Region

To manage our large-scale task efficiently, we split a larger area of interest into smaller tiles with the built-in split_area() helper from the openEO Python client.

split_area() returns a GeoDataFrame in the tiling projection. In this notebook, we convert the tiles to WGS 84 afterwards so the geometries can be visualized with Folium and passed cleanly to the parameterized process used by ProcessBasedJobCreator.

TILING_PROJECTION = "EPSG:32631"
AREA_OF_INTEREST = {
    "west": 590_000.0,
    "south": 5_660_000.0,
    "east": 610_000.0,
    "north": 5_680_000.0,
    "crs": TILING_PROJECTION,
}
TILE_SIZE = 5_000

grid_df = split_area(
    aoi=AREA_OF_INTEREST,
    tile_size=TILE_SIZE,
    projection=TILING_PROJECTION,
)

# Convert tiles to WGS 84 for Folium and GeoJSON-style process parameters.
grid_df = grid_df.to_crs("EPSG:4326")
grid_df["id"] = range(len(grid_df))

grid_df
geometry id
0 POLYGON ((4.35633 51.08363, 4.35765 51.12858, ... 0
1 POLYGON ((4.35765 51.12858, 4.35897 51.17353, ... 1
2 POLYGON ((4.35897 51.17353, 4.36029 51.21848, ... 2
3 POLYGON ((4.36029 51.21848, 4.36161 51.26343, ... 3
4 POLYGON ((4.42769 51.08278, 4.42908 51.12773, ... 4
5 POLYGON ((4.42908 51.12773, 4.43047 51.17268, ... 5
6 POLYGON ((4.43047 51.17268, 4.43186 51.21762, ... 6
7 POLYGON ((4.43186 51.21762, 4.43326 51.26257, ... 7
8 POLYGON ((4.49905 51.08189, 4.50051 51.12683, ... 8
9 POLYGON ((4.50051 51.12683, 4.50196 51.17178, ... 9
10 POLYGON ((4.50196 51.17178, 4.50343 51.21673, ... 10
11 POLYGON ((4.50343 51.21673, 4.50489 51.26167, ... 11
12 POLYGON ((4.57041 51.08095, 4.57193 51.1259, 4... 12
13 POLYGON ((4.57193 51.1259, 4.57346 51.17084, 4... 13
14 POLYGON ((4.57346 51.17084, 4.57499 51.21578, ... 14
15 POLYGON ((4.57499 51.21578, 4.57653 51.26073, ... 15
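
The tile count in the table can be checked by hand: a 20 km by 20 km area cut into 5 km tiles gives 4 x 4 = 16 tiles. The sketch below reproduces that arithmetic on plain bounding-box tuples. It is not the implementation of split_area(); the function name split_bbox is hypothetical, and the column-by-column ordering is an assumption chosen to match the table above.

```python
def split_bbox(west, south, east, north, tile_size):
    """Split a bounding box into (west, south, east, north) tiles.

    Tiles are emitted column by column (west to east), and from south to
    north within each column. Edge tiles are clipped to the bounding box.
    """
    tiles = []
    x = west
    while x < east:
        y = south
        while y < north:
            tiles.append((x, y, min(x + tile_size, east), min(y + tile_size, north)))
            y += tile_size
        x += tile_size
    return tiles


# Same extent and tile size as in the notebook (metres, EPSG:32631).
tiles = split_bbox(590_000, 5_660_000, 610_000, 5_680_000, 5_000)
print(len(tiles))
print(tiles[0])
```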

Visualize the tiling grid

We use Folium to create an interactive visualization of the spatial grid. This allows us to examine the layout of tiles across the area of interest and ensure that the grid aligns correctly with our region.

minx, miny, maxx, maxy = grid_df.total_bounds
center_lat = (miny + maxy) / 2
center_lon = (minx + maxx) / 2

grid_map = folium.Map(location=[center_lat, center_lon], zoom_start=9, tiles="CartoDB positron")
folium.GeoJson(
    data=grid_df.__geo_interface__,
    style_function=lambda _: {"color": "#1f77b4", "weight": 1, "fillOpacity": 0.10},
    tooltip=folium.GeoJsonTooltip(fields=["id"], aliases=["Tile ID"]),
).add_to(grid_map)
grid_map.fit_bounds([[miny, minx], [maxy, maxx]])
grid_map

3. Prepare Jobs for Parallel Processing

In this example, we create Best Available Pixel composites for every tile. For the compositing workflow itself, we directly use the standardized implementation hosted in the APEx Algorithm Catalogue. The information that we need is the link to the openEO process and the openEO backend, both of which you can find in the Execution Information tab on the catalogue.

APEx Algorithm Catalogue - BAP Execution Information

The jobs database is initialized from the GeoDataFrame returned by split_area() and persisted with create_job_db(), which adds the bookkeeping columns needed by the MultiBackendJobManager.

For more details, see the cookbook sections on Preparing the job database, Job Database, and Job creation based on parameterized processes.

# Make use of the Best Available Pixel openEO process to obtain Sentinel-2 composites
process_graph_url = "https://github.com/ESA-APEx/apex_algorithms/raw/refs/heads/main/algorithm_catalog/vito/bap_composite/openeo_udp/bap_composite.json"
backend = "https://openeofed.dataspace.copernicus.eu"

start_job = ProcessBasedJobCreator(
    namespace=process_graph_url,
    parameter_defaults={
        "temporal_extent": ["2024-06-01", "2024-09-01"],
    },
)

# Initiate MultiBackendJobManager
job_manager = MultiBackendJobManager()
connection = openeo.connect(url=backend).authenticate_oidc()
job_manager.add_backend("cdse", connection=connection, parallel_jobs=2)

# Create the job tracker file
job_tracker = "jobs.csv"
job_db = create_job_db(job_tracker, df=grid_df, on_exists="skip")
Authenticated using refresh token.
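
To illustrate how one job per tile can be derived from a row of the job database, here is a minimal sketch of a parameter lookup: a value from the row wins, otherwise the shared default is used. This is a simplified illustration, not the code of ProcessBasedJobCreator; the helper resolve_parameters and the parameter name geometry are assumptions, while temporal_extent is the default set above.

```python
def resolve_parameters(row, parameter_names, parameter_defaults):
    """Pick a value for each process parameter (hypothetical helper).

    Lookup order: a column of the same name in the row first,
    then the shared defaults. Unresolved parameters raise an error.
    """
    resolved = {}
    for name in parameter_names:
        if name in row:
            resolved[name] = row[name]
        elif name in parameter_defaults:
            resolved[name] = parameter_defaults[name]
        else:
            raise ValueError(f"No value found for parameter {name!r}")
    return resolved


# One row of the tile grid (geometry shortened to a placeholder string).
row = {"id": 0, "geometry": "POLYGON (...)"}
defaults = {"temporal_extent": ["2024-06-01", "2024-09-01"]}

params = resolve_parameters(row, ["geometry", "temporal_extent"], defaults)
print(params)
```

With this pattern, values that differ per tile live in the dataframe, and values shared by all jobs are stated once as defaults.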

4. Prepare Job visualization with a Custom Color Mapping

To effectively monitor the progress of geospatial processing tasks, we define a function to visualize job statuses on an interactive map. This visualization uses Folium, with custom color mappings for each job status, providing a clear overview of the current state of all jobs.

colors = {
    "not_started": "lightgrey",
    "created": "gold",
    "queued": "lightsteelblue",
    "running": "navy",
    "finished": "lime",
    "error": "darkred",
    "skipped": "darkorange",
    "start_failed": "red",
    None: "grey",  # Default color for any undefined status
}
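
The None entry acts as a fallback: any status without its own colour resolves to grey. The sketch below mirrors the color_dict.get(...) lookup used in plot_job_status; the helper name status_color and the shortened dictionary are for illustration only.

```python
# Shortened copy of the mapping, for illustration only.
example_colors = {
    "finished": "lime",
    "error": "darkred",
    None: "grey",  # fallback for statuses without their own colour
}


def status_color(status, color_dict):
    """Return the colour for a status, falling back to the None entry."""
    return color_dict.get(status, color_dict[None])


print(status_color("finished", example_colors))
print(status_color("canceled", example_colors))  # not in the mapping
```

Statuses such as canceled, which have no entry in the mapping, are therefore drawn in grey.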

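This color scheme makes it easy to distinguish between different statuses. The plot_job_status function generates an interactive map, and the helper functions normalize, add_cog_layer and visualise_finished_jobs overlay the result of each finished job on that map as an RGB image.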

import base64
import io
from pathlib import Path

import numpy as np
import rasterio
from PIL import Image
from rasterio.warp import transform_bounds

S2_RGB_GAMMA = 2.2
S2_RGB_STRETCH = {
    "red": (0.02, 0.40),
    "green": (0.02, 0.40),
    "blue": (0.02, 0.45),
}


# Plot the job statuses on an interactive map, colored by status
def plot_job_status(status_df, color_dict):
    status_plot = copy.deepcopy(status_df)
    if len(status_plot) > 0 and isinstance(status_plot["geometry"].iloc[0], str):
        status_plot["geometry"] = status_plot["geometry"].apply(wkt.loads)
    status_plot = gpd.GeoDataFrame(status_plot, geometry="geometry", crs="EPSG:4326")

    if status_plot.empty:
        return folium.Map(location=[51.15, 4.4], zoom_start=8, tiles="CartoDB positron")

    minx, miny, maxx, maxy = status_plot.total_bounds
    center_lat = (miny + maxy) / 2
    center_lon = (minx + maxx) / 2

    status_map = folium.Map(location=[center_lat, center_lon], zoom_start=8, tiles="CartoDB positron")

    folium.GeoJson(
        data=status_plot.__geo_interface__,
        style_function=lambda feature: {
            "fillColor": color_dict.get(feature["properties"].get("status"), color_dict[None]),
            "color": "#333333",
            "weight": 1,
            "fillOpacity": 0 if feature["properties"].get("status") == "finished" else 0.6,
        },
        tooltip=folium.GeoJsonTooltip(fields=["status"], aliases=["Status"]),
    ).add_to(status_map)

    status_map.fit_bounds([[miny, minx], [maxy, maxx]])

    return status_map

def normalize(
    array: np.ndarray, min_reflectance: float, max_reflectance: float
) -> np.ndarray:
    """Apply one fixed Sentinel-2 reflectance stretch across all tiles."""
    reflectance = array.astype(np.float32) / 10_000.0
    clipped = np.clip(reflectance, min_reflectance, max_reflectance)
    normalized = (clipped - min_reflectance) / (max_reflectance - min_reflectance)
    corrected = np.power(normalized, 1 / S2_RGB_GAMMA)
    return (corrected * 255).astype(np.uint8)

def add_cog_layer(cog_url, name=None, m=None):
    """Add a Cloud-Optimized GeoTIFF as an RGB overlay on a folium map.

    Assumes a COG with at least three bands, read here as
    blue (band 1), green (band 2) and red (band 3).
    """
    with rasterio.open(cog_url) as src:
        # Reproject the raster bounds to WGS 84 for folium.
        bounds = transform_bounds(src.crs, "EPSG:4326", *src.bounds)

        # Read the bands and apply the fixed per-band stretch.
        b = normalize(src.read(1), *S2_RGB_STRETCH["blue"])
        g = normalize(src.read(2), *S2_RGB_STRETCH["green"])
        r = normalize(src.read(3), *S2_RGB_STRETCH["red"])

        # Stack into an RGB image (height, width, 3).
        rgb_image = np.stack([r, g, b], axis=2)
        img = Image.fromarray(rgb_image)

    # Convert to a PNG data URI.
    buf = io.BytesIO()
    img.save(buf, format="PNG")
    data_url = "data:image/png;base64," + base64.b64encode(buf.getvalue()).decode("utf-8")

    # bounds is (west, south, east, north); folium expects [[south, west], [north, east]].
    bbox = [[bounds[1], bounds[0]], [bounds[3], bounds[2]]]
    overlay = folium.raster_layers.ImageOverlay(
        image=data_url,
        bounds=bbox,
        name=name or "COG",
        opacity=0.8,
    )
    if m is not None:
        overlay.add_to(m)
    return overlay


def visualise_finished_jobs(output_df, status_map):
    finished_jobs = output_df[output_df["status"] == "finished"]
    for _, job in finished_jobs.iterrows():
        image_path = Path(f"job_{job['id']}/openEO_2024-06-01Z.tif")
        if not image_path.exists():
            print(f"{image_path} does not exist for {job['id']}")
            continue
        add_cog_layer(image_path, f"Result {job['id']}", status_map)
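
To make the fixed stretch in normalize easier to follow, the sketch below applies the same steps to a single pixel value in plain Python: scale to reflectance, clip to the stretch range, rescale to 0..1, apply gamma correction, and convert to an 8-bit value. The function name stretch_pixel is hypothetical; the constants are the ones defined above.

```python
def stretch_pixel(dn, min_reflectance, max_reflectance, gamma=2.2):
    """Scalar version of the normalize() steps for one pixel value."""
    reflectance = dn / 10_000.0
    clipped = min(max(reflectance, min_reflectance), max_reflectance)
    normalized = (clipped - min_reflectance) / (max_reflectance - min_reflectance)
    corrected = normalized ** (1 / gamma)
    return int(corrected * 255)  # truncates, like astype(np.uint8)


# Red/green stretch range from S2_RGB_STRETCH: (0.02, 0.40).
print(stretch_pixel(0, 0.02, 0.40))       # below the range: clipped to black
print(stretch_pixel(2_100, 0.02, 0.40))   # mid-range reflectance of 0.21
print(stretch_pixel(10_000, 0.02, 0.40))  # above the range: clipped to white
```

Because the same range is used for every tile, neighbouring tiles keep a consistent brightness on the map instead of each being stretched to its own minimum and maximum.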
    
    

5. Running the Jobs with MultiBackendJobManager

Finally, we run the jobs using MultiBackendJobManager, which allows us to manage multiple job executions across one or more openEO backends. As a standard user, you can run 2 jobs in parallel at any time, which is why the backend was registered with parallel_jobs=2.
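
The effect of a parallel-job limit can be sketched as follows: new jobs are only started while fewer than the allowed number are active. This is a simplified illustration under stated assumptions, not the scheduling code of MultiBackendJobManager; the helper select_jobs_to_start and the choice of which statuses count as active are hypothetical.

```python
def select_jobs_to_start(statuses, parallel_jobs):
    """Return the ids of waiting jobs that fit within the parallel limit."""
    # Assumption: these statuses occupy a slot on the backend.
    occupying = {"created", "queued", "running"}
    n_active = sum(1 for status in statuses.values() if status in occupying)
    free_slots = max(parallel_jobs - n_active, 0)
    waiting = [job_id for job_id, status in statuses.items() if status == "not_started"]
    return waiting[:free_slots]


# Two slots taken: nothing new is started.
print(select_jobs_to_start({0: "running", 1: "queued", 2: "not_started"}, parallel_jobs=2))

# One slot free: the first waiting job is selected.
print(select_jobs_to_start(
    {0: "finished", 1: "running", 2: "not_started", 3: "not_started"}, parallel_jobs=2
))
```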

Threading is applied to enable the visualization of job statuses while concurrently running openEO jobs. This approach allows the jobs to execute in parallel with the status updates, ensuring that the map is refreshed regularly without blocking job execution. In total there are two threads:

  • Job Execution: The jobs are initiated using the job manager that runs in its own thread. This allows the jobs to be executed asynchronously.
  • Visualization: At the same time, we reopen the persisted job database and update the visualization from the current tracked statuses.

For the public thread lifecycle and stop behavior, see Running in a Background Thread and Job Status Tracking.
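
The two-thread pattern described above can be sketched with the standard library alone. In this toy version, a worker thread stands in for the job manager and a polling loop stands in for the map refresh. All names (worker, monitor, the status dictionary) are hypothetical, and no openEO calls are made.

```python
import threading
import time

ACTIVE = {"not_started", "running"}


def worker(statuses, lock):
    """Stand-in for the job manager thread: processes jobs one by one."""
    for job_id in list(statuses):
        with lock:
            statuses[job_id] = "running"
        time.sleep(0.01)  # pretend the job takes some time
        with lock:
            statuses[job_id] = "finished"


def monitor(statuses, lock, poll_interval=0.005):
    """Stand-in for the visualization loop: polls until nothing is active."""
    while True:
        with lock:
            snapshot = dict(statuses)  # copy, so rendering never blocks the worker
        if not any(status in ACTIVE for status in snapshot.values()):
            return snapshot
        time.sleep(poll_interval)


statuses = {job_id: "not_started" for job_id in range(4)}
lock = threading.Lock()

thread = threading.Thread(target=worker, args=(statuses, lock), daemon=True)
thread.start()
final = monitor(statuses, lock)  # runs in the main thread
thread.join()
print(final)
```

The monitor only reads a snapshot of the shared state, which is the same idea as reopening the persisted job database on every refresh in the notebook.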

# Start a threaded job manager
job_manager.start_job_thread(start_job=start_job, job_db=job_db)

tracked_statuses = [
    "not_started",
    "queued_for_start",
    "created",
    "queued",
    "running",
    "finished",
    "error",
    "canceled",
    "skipped",
    "start_failed",
]
active_statuses = {"not_started", "queued_for_start", "created", "queued", "running"}


try:
    while True:
        status_df = get_job_db(job_tracker).get_by_status(statuses=tracked_statuses)
        status_map = plot_job_status(status_df=status_df, color_dict=colors)
        clear_output(wait=True)
        visualise_finished_jobs(status_df, status_map)
        display(status_map)

        # Stop once no jobs are left in an active state.
        if status_df["status"].isin(active_statuses).sum() == 0:
            job_manager.stop_job_thread()
            break

        time.sleep(60)  # Wait before the next update

except KeyboardInterrupt:
    job_manager.stop_job_thread()
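
Once the loop has ended, a quick tally of the final statuses helps to spot failed tiles. The sketch below counts statuses from a CSV file using only the standard library. It assumes the tracker file has a status column, as used throughout this notebook; the helper count_statuses and the in-memory sample are illustrative.

```python
import csv
import io
from collections import Counter


def count_statuses(csv_file):
    """Count how many rows of a job tracker CSV are in each status."""
    reader = csv.DictReader(csv_file)
    return Counter(row["status"] for row in reader)


# In-memory sample standing in for the tracker file.
sample = io.StringIO("id,status\n0,finished\n1,finished\n2,error\n")
summary = count_statuses(sample)
print(summary)
```

For the tracker written by this notebook, the same function could be applied with `with open("jobs.csv", newline="") as f: count_statuses(f)`.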