Prepare and launch a DPS batch of jobs for a particular algorithm

Goal
Provide a template for DPS job submission that can be adapted to the specific algorithm being run in DPS.

Motivation
It’s easier to learn how to run many jobs of your script, each with some input that changes, if you can first see an example.
Paul Montesano, PhD
June 2024
[1]:
from maap.maap import MAAP
maap = MAAP()
[3]:
import os
import pandas as pd
import glob
import datetime
import sys

Use the MAAP registration call in a notebook cell to register the DPS algorithm

  • You need to register the DPS algorithm before you loop over the jobs that will use it.

  • If you register your algorithm using the Register Algorithm UI in Jupyter, a configuration file (in yml format) will be placed in your workspace home folder. You can reuse that file as a template.

[ ]:
maap.register_algorithm_from_yaml_file("/projects/.../.../<my_algorithms_yaml_file>.yml").text

Build a dictionary of the argument names and values needed to run the algorithm in the way you want

This can be called a parameters dictionary

  • These will be arguments that the .sh wrapper (which calls your .py or .R code) is hard-coded to accept.

  • The .yml file that you use to Register your algorithm is what connects this parameters dictionary to your .sh wrapper.

  • This combo of the parameters dictionary, the .yml, and the .sh provides a specific (and repeatable) way of running your .py or .R code.

Note: make sure the keys of in_params_dict match the arguments of your underlying Python/R code.

[3]:
in_params_dict = {
    'arg_name_1': 'some_value',
    'arg_name_2': 'another_value',
    'in_tile_num': 1
}
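
A mismatch between the keys of the parameters dictionary and the input names the algorithm was registered with is easy to miss until jobs fail. A minimal sketch of a pre-submission check follows. `EXPECTED_INPUT_NAMES`, `check_params`, and `example_params` are hypothetical names, and the expected names are assumed to be copied by hand from your own .yml file.

```python
# Hypothetical: input names copied by hand from the inputs listed in your .yml file
EXPECTED_INPUT_NAMES = {'arg_name_1', 'arg_name_2', 'in_tile_num'}

def check_params(params, expected_names):
    """Return (missing, unexpected) key sets for a parameters dictionary."""
    keys = set(params)
    missing = expected_names - keys
    unexpected = keys - expected_names
    return missing, unexpected

# Illustrative dictionary with the same shape as in_params_dict
example_params = {
    'arg_name_1': 'some_value',
    'arg_name_2': 'another_value',
    'in_tile_num': 1,
}

missing, unexpected = check_params(example_params, EXPECTED_INPUT_NAMES)
if missing or unexpected:
    print(f"Missing: {sorted(missing)} | Unexpected: {sorted(unexpected)}")
else:
    print("Parameter names match the expected inputs")
```

A check like this catches small slips, such as a space typed in place of an underscore in a key, before any job is submitted.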

Set up a list of items you want to run across - this is an example of some algorithm input that will vary according to job

In this example, we are using geographic tiles to break up our processing. These tiles are defined by vector polygons and have ids that our .sh, .py, and .yml files are set up to take in as arguments. We use these ids as the basis for a loop that will sequentially submit our jobs to DPS.

There are many ways one could decide to split up their DPS jobs - so this use of tiles here is just for the purposes of this example.

[15]:
# Just an example of a list of some input parameter to your script that needs to vary for each job, thus creating multiple jobs
DPS_INPUT_TILE_NUM_LIST = [1,3,5,7,13,17,19]
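
The submission loop in this notebook updates a single parameters dictionary in place for each tile. As an alternative, the sketch below builds one independent dictionary per tile, so each job's parameters can be inspected before anything is submitted. `base_params`, `tile_nums`, and `per_job_params` are hypothetical names used only for illustration.

```python
# Hypothetical base parameters shared by every job in the batch
base_params = {'arg_name_1': 'some_value', 'arg_name_2': 'another_value'}
tile_nums = [1, 3, 5, 7, 13, 17, 19]

# One independent dict per tile: the shared values plus the tile-specific input
per_job_params = [{**base_params, 'in_tile_num': tile_num} for tile_num in tile_nums]

print(len(per_job_params))
print(per_job_params[1])
```

Because each dictionary is a fresh copy, changing one job's parameters does not affect the others or the shared base.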

Set up the general submission variables that will be applied to all runs of this DPS batch

These also determine the path of the DPS output under /projects/my-private-bucket/dps_output:
/projects/my-private-bucket/dps_output/<ALGO_ID>/<ALGO_VERSION>/<IDENTIFIER>
[7]:
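# Run identifier, then the MAAP algorithm version name and algorithm ID
IDENTIFIER = 'BIOMASS_2020'
ALGO_VERSION = 'my_biomass_algorithm_v2024_1'
ALGO_ID = "run_my_biomass_algorithm"
# User name and DPS worker (queue) type recorded with each submission
USER = 'montesano'
WORKER_TYPE = 'maap-dps-worker-8gb'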
[8]:
RUN_NAME = IDENTIFIER
RUN_NAME
[8]:
'BIOMASS_2020'
[13]:
DPS_INPUT_TILE_NUM_LIST[0:2]
[13]:
[1, 3]
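
The output path pattern given at the start of this section can be assembled from the submission variables. This is a minimal sketch that assumes the pattern holds exactly as written. `dps_output_dir` is a hypothetical helper, not part of the MAAP library.

```python
import posixpath

def dps_output_dir(algo_id, algo_version, identifier,
                   root='/projects/my-private-bucket/dps_output'):
    """Join the pieces of the pattern <root>/<ALGO_ID>/<ALGO_VERSION>/<IDENTIFIER>."""
    return posixpath.join(root, algo_id, algo_version, identifier)

print(dps_output_dir('run_my_biomass_algorithm',
                     'my_biomass_algorithm_v2024_1',
                     'BIOMASS_2020'))
```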

Set up a dir to hold the metadata output table from the DPS submission

[ ]:
DPS_SUBMISSION_RESULTS_DIR = '/projects/my-public-bucket/dps_submission_results'
!mkdir -p $DPS_SUBMISSION_RESULTS_DIR
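
The `!mkdir -p` line relies on the notebook's shell escape. In a plain Python script, `os.makedirs` with `exist_ok=True` does the same job. The sketch below uses a temporary directory as a stand-in base so that it runs anywhere; in practice you would pass your own results directory.

```python
import os
import tempfile

# Stand-in base directory for illustration; replace with your own results path
base_dir = tempfile.mkdtemp()
results_dir = os.path.join(base_dir, 'dps_submission_results')

# exist_ok=True makes the call safe to repeat, like `mkdir -p`
os.makedirs(results_dir, exist_ok=True)
os.makedirs(results_dir, exist_ok=True)

print(os.path.isdir(results_dir))
```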

The submission is done as a loop.

Since submission is fast, this doesn’t need to be parallelized. The jobs will start soon after submission and will be processed in parallel depending on administrator settings.

[ ]:
%%time

import json

submit_results_df_list = []
len_input_list = len(DPS_INPUT_TILE_NUM_LIST)
print(f"# of input tiles for DPS: {len_input_list}")

for i, INPUT_TILE_NUM in enumerate(DPS_INPUT_TILE_NUM_LIST):

    # Just a way to keep track of the job number associated with this submission's loop
    DPS_num = i+1

    # Update the in_params_dict with the current INPUT_TILE_NUM from this loop
    in_params_dict['in_tile_num'] = INPUT_TILE_NUM

    submit_result = maap.submitJob(
            identifier=IDENTIFIER,
            algo_id=ALGO_ID,
            version=ALGO_VERSION,
            queue=WORKER_TYPE,
            **in_params_dict
        )

    # Build a dataframe of submission details - this holds metadata about your DPS job
    submit_result_df = pd.DataFrame(
        {
                'dps_num':[DPS_num],
                'tile_num':[INPUT_TILE_NUM],
                'submit_time':[datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')],
                'dps_job_hour': [datetime.datetime.now().hour],
                'algo_id': [ALGO_ID],
                'user': [USER],
                'worker_type': [WORKER_TYPE],
                'job_id': [submit_result.id],
                'submit_status': [submit_result.status],

        }
    )

    # Append to a list of data frames of DPS submission results
    submit_results_df_list.append(submit_result_df)

    if DPS_num in [1, 5, 10, 50, 100, 250, 500, 750, 1000, 1500, 2000, 2500, 3000, 3500, 4000, 4500, 5000, 7000, 9000, 11000, 13000, 15000, 17000, 19000, 21000, 24000, len_input_list]:
        print(f"DPS run #: {DPS_num}\t| tile num: {INPUT_TILE_NUM}\t| submit status: {submit_result.status}\t| job id: {submit_result.id}")

# Build a final submission results data frame and save
submit_results_df = pd.concat(submit_results_df_list, ignore_index=True)
submit_results_df['run_name'] = RUN_NAME
nowtime = pd.Timestamp.now().strftime('%Y%m%d%H%M')
print(f"Current time:\t{nowtime}")

# This creates a CSV of the metadata associated with the DPS jobs you have just submitted
submit_results_df.to_csv(f'{DPS_SUBMISSION_RESULTS_DIR}/DPS_{ALGO_ID}_{RUN_NAME}_submission_results_{len_input_list}_{nowtime}.csv')
submit_results_df.info()
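
To try the bookkeeping logic without submitting real jobs, the loop can be exercised as a dry run. The sketch below is an illustration under stated assumptions: `fake_submit` is a hypothetical stand-in that returns a plain dictionary rather than a DPS job object, and the standard-library `csv` module replaces pandas so the example has no dependencies.

```python
import csv
import datetime
import io

def fake_submit(**params):
    """Hypothetical stand-in for a real submission; returns a dict, not a DPS job."""
    return {'id': f"dry-run-{params['in_tile_num']}", 'status': 'dry_run'}

tile_nums = [1, 3, 5]
base_params = {'arg_name_1': 'some_value'}
rows = []

for i, tile_num in enumerate(tile_nums, start=1):
    result = fake_submit(**base_params, in_tile_num=tile_num)
    # One metadata record per submitted job
    rows.append({
        'dps_num': i,
        'tile_num': tile_num,
        'submit_time': datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S'),
        'job_id': result['id'],
        'submit_status': result['status'],
    })

# Write the records as CSV text; an in-memory buffer stands in for a file on disk
buffer = io.StringIO()
writer = csv.DictWriter(buffer, fieldnames=list(rows[0]))
writer.writeheader()
writer.writerows(rows)
csv_text = buffer.getvalue()
print(csv_text)
```

Once the records look right, the stand-in can be swapped for the real submission call and the buffer for a file in your results directory.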