Job Management with maap-py
Set up maap-py
Open a Jupyter Notebook, then click the MAAP button in the notebook toolbar. This will paste the code snippet below into your notebook.
Provide the MAAP host. For normal operations, this would be api.maap-project.org.
from maap.maap import MAAP
maap = MAAP()
Passing Credentials for Other Services into Jobs (Secrets Management)
You may have an Algorithm that downloads data from or uploads data to other systems (e.g. Google Earth Engine). For your Algorithm to access those systems during a DPS run, your login credentials or login token must be passed into the DPS Job. This is done via “secrets”, which keep your credentials encrypted during transmission.
You may store and retrieve secrets using maap-py, which allows you to use those secrets during a Job that you run.
Typically you will use a Jupyter notebook to manage your secrets using the methods below. An example notebook for Secrets Management can be viewed and downloaded here.
Inside your Algorithm code, use the maap-py secrets.get_secret("SECRET_NAME") method shown below to retrieve the value of your secret. The secret name may be hard-coded in your Algorithm, or it may be passed in at runtime; the latter is important if different scientists have named their secrets differently.
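As a minimal sketch of the runtime-name pattern described above, an Algorithm could read the secret name from its command-line arguments and fall back to a hard-coded default. The helper name resolve_secret, the --secret-name argument, and the default name MY_TOKEN are illustrative assumptions; only secrets.get_secret comes from maap-py.

```python
import argparse


def resolve_secret(maap, argv=None, default_name="MY_TOKEN"):
    """Return the value of the secret named at runtime, or of the default name.

    `maap` is expected to be a maap-py client (e.g. MAAP()).
    The --secret-name flag is a hypothetical argument used only in this sketch.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--secret-name", default=default_name)
    # parse_known_args ignores any other arguments the Algorithm receives
    args, _unknown = parser.parse_known_args(argv)
    return maap.secrets.get_secret(args.secret_name)
```

Inside an Algorithm this might be called as resolve_secret(MAAP()), so that each scientist can pass their own secret name without editing the code.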
Add a Secret
maap.secrets.add_secret("<SECRET_NAME>", "<SECRET_VALUE>")
Example:
response = maap.secrets.add_secret("MY_TOKEN", "98aj48j(774hh9*H")
print(response)
>>> {'secret_name': 'MY_TOKEN'}
Get a List of Your Secrets
maap.secrets.get_secrets()
Example:
response = maap.secrets.get_secrets()
print(response)
>>> [{'secret_name': 'MY_TOKEN'}]
Get the Value of a Specific Secret
maap.secrets.get_secret("<SECRET_NAME>")
Example:
response = maap.secrets.get_secret("MY_TOKEN")
print(response)
>>> 98aj48j(774hh9*H
Delete a Secret
maap.secrets.delete_secret("<SECRET_NAME>")
Example:
response = maap.secrets.delete_secret("MY_TOKEN")
print(response)
>>> {'code': 200, 'message': 'Successfully deleted secret MY_TOKEN'}
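The four calls above can be combined so that an existing secret is not silently overwritten. The sketch below assumes get_secrets() returns a list of dictionaries with a secret_name key, as in the example output above; secret_exists and add_secret_if_missing are hypothetical helper names, not part of maap-py.

```python
def secret_exists(maap, name):
    """Return True if `name` appears in the list returned by get_secrets()."""
    return any(item.get("secret_name") == name for item in maap.secrets.get_secrets())


def add_secret_if_missing(maap, name, value):
    """Add the secret only when no secret with that name exists yet.

    Returns True if a secret was added, False if one already existed.
    """
    if secret_exists(maap, name):
        return False
    maap.secrets.add_secret(name, value)
    return True
```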
Submit a Job
Use the submitJob method and provide your algorithm inputs. The example below will run the run-dps-test_ubuntu algorithm. Be sure to put your username in the submitJob() call where indicated below.
Note
Experimental feature: The output data will be put into a folder named for your algo_id and the identifier. You can use the same identifier on several jobs (e.g. in a batch) to group related output data in one place. In the View & Submit Jobs GUI, this is the Job Tag field in the Submit form.
maap.submitJob(identifier="test-job",
               algo_id="run-dps-test_ubuntu",
               version="delay10",
               username="YOUR_USERNAME_HERE",
               queue="maap-dps-worker-8gb",
               input_file="https://raw.githubusercontent.com/MAAP-Project/dps-unit-test/main/README.md")
Run the notebook to submit the job. The cell output for a job that was submitted successfully will look similar to this:
{'status': 'success',
 'http_status_code': 200,
 'job_id': '86fbac52-24b0-4963-8b67-59d0fc09946a'}
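To group a batch of jobs under one identifier, as the note above describes, the submission can be wrapped in a loop. This is a sketch under stated assumptions: submit_batch is a hypothetical helper, and it assumes submitJob returns a dictionary shaped like the output above. Some maap-py versions may return a different object, in which case the status check would need adjusting.

```python
def submit_batch(maap, input_files, identifier, **job_kwargs):
    """Submit one job per input file, all sharing the same identifier.

    `job_kwargs` carries the remaining submitJob arguments
    (algo_id, version, username, queue). Returns the list of job IDs.
    """
    job_ids = []
    for url in input_files:
        response = maap.submitJob(identifier=identifier, input_file=url, **job_kwargs)
        # Assumes the dict-shaped response shown in the example output
        if response.get("status") != "success":
            raise RuntimeError(f"Submission failed for {url}: {response}")
        job_ids.append(response["job_id"])
    return job_ids
```

For example, submit_batch(maap, urls, "test-batch", algo_id="run-dps-test_ubuntu", version="delay10", username="YOUR_USERNAME_HERE", queue="maap-dps-worker-8gb") would submit one job per URL in urls.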
Monitor a Job
Use the getJobStatus method and provide the job ID that was created upon job submission.
r = maap.getJobStatus("86fbac52-24b0-4963-8b67-59d0fc09946a")
r.text
Run the notebook to get the job status. The output should resemble the XML snippet below. In this example, the job status is Succeeded.
'<wps:StatusInfo xmlns:ows="http://www.opengis.net/ows/2.0" xmlns:schemaLocation="http://schemas.opengis.net/wps/2.0/wps.xsd" xmlns:wps="http://www.opengis.net/wps/2.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"><wps:JobID>86fbac52-24b0-4963-8b67-59d0fc09946a</wps:JobID><wps:Status>Succeeded</wps:Status></wps:StatusInfo>'
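The status value can be pulled out of this XML with the Python standard library. This is a minimal sketch: parse_job_status is a hypothetical helper, not part of maap-py, and it assumes the response body has the wps:StatusInfo shape shown above.

```python
import xml.etree.ElementTree as ET

# Namespace URI taken from the xmlns:wps attribute in the response
WPS_NS = {"wps": "http://www.opengis.net/wps/2.0"}


def parse_job_status(status_xml):
    """Return (job_id, status) from a wps:StatusInfo document."""
    root = ET.fromstring(status_xml)
    job_id = root.findtext("wps:JobID", namespaces=WPS_NS)
    status = root.findtext("wps:Status", namespaces=WPS_NS)
    return job_id, status
```

With the response from the example above, this would be called as job_id, status = parse_job_status(r.text).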
Job Status
Job status terms may differ between the HySDS Figaro job-monitoring dashboard and the Jobs UI. Below is a mapping of status terms:
MAAP <- HySDS
Accepted <- job-queued
Running <- job-started
Success <- job-completed
Failed <- job-offline or job-failed
job-revoked <- job-revoked (when a queued or running job is stopped before completion)
HySDS state not valid/used in MAAP: job-deduped
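When comparing output from the two systems in code, the mapping above can be written as a lookup table. The dictionary below only mirrors the list as given; the names HYSDS_TO_MAAP_STATUS and to_maap_status are illustrative and not part of maap-py.

```python
# HySDS status -> MAAP status, copied from the mapping above
HYSDS_TO_MAAP_STATUS = {
    "job-queued": "Accepted",
    "job-started": "Running",
    "job-completed": "Success",
    "job-offline": "Failed",
    "job-failed": "Failed",
    "job-revoked": "job-revoked",
}


def to_maap_status(hysds_status):
    """Translate a HySDS status to the MAAP term.

    Returns None for states with no MAAP equivalent, such as job-deduped.
    """
    return HYSDS_TO_MAAP_STATUS.get(hysds_status)
```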
Get Job Results
Use the getJobResult method and provide the job ID that was created upon job submission.
r = maap.getJobResult("86fbac52-24b0-4963-8b67-59d0fc09946a")
r.text
Run the notebook to get the job result. The output should resemble the XML snippet below.
<wps:Result xmlns:ows="http://www.opengis.net/ows/2.0" xmlns:schemaLocation="http://schemas.opengis.net/wps/2.0/wps.xsd" xmlns:wps="http://www.opengis.net/wps/2.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"><wps:JobID>86fbac52-24b0-4963-8b67-59d0fc09946a</wps:JobID><wps:Output id="output-2023-05-10T15:39:51.905070"><wps:Data>http://maap-dit-workspace.s3-website-us-west-2.amazonaws.com/anonymous/dps_output/run-dps-test_ubuntu/delay10/2023/05/10/15/39/51/905070</wps:Data><wps:Data>s3://s3-us-west-2.amazonaws.com:80/maap-dit-workspace/anonymous/dps_output/run-dps-test_ubuntu/delay10/2023/05/10/15/39/51/905070</wps:Data><wps:Data>https://s3.console.aws.amazon.com/s3/buckets/maap-dit-workspace/anonymous/dps_output/run-dps-test_ubuntu/delay10/2023/05/10/15/39/51/905070/?region=us-east-1&tab=overview</wps:Data></wps:Output></wps:Result>
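The output locations can be collected from the wps:Data elements. The sketch below uses a hypothetical helper, parse_job_outputs, and assumes the response is well-formed XML. Note that a raw & inside a URL, as it appears in the printed snippet above, is not valid XML and would cause ET.fromstring to raise a ParseError; whether the actual response escapes it as &amp; is not confirmed here.

```python
import xml.etree.ElementTree as ET

# Namespace URI taken from the xmlns:wps attribute in the response
WPS_NS = {"wps": "http://www.opengis.net/wps/2.0"}


def parse_job_outputs(result_xml):
    """Return the text of every wps:Data element in a wps:Result document."""
    root = ET.fromstring(result_xml)
    return [el.text for el in root.findall("wps:Output/wps:Data", WPS_NS)]
```

To keep only the S3 location, the returned list can be filtered with [u for u in parse_job_outputs(r.text) if u.startswith("s3://")].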
Cancel Job
Use the cancelJob method and provide the job ID that was created upon job submission.
r = maap.cancelJob("fac2904d-b45d-4cf3-971f-45586a6bc78a")
print(r)
Run the notebook to submit the cancellation request. The output should resemble the XML snippet below.
<wps:StatusInfo xmlns:wps="http://www.opengis.net/wps/2.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:schemaLocation="http://schemas.opengis.net/wps/2.0/wps.xsd" xmlns:ows="http://www.opengis.net/ows/2.0"><wps:JobID>fac2904d-b45d-4cf3-971f-45586a6bc78a</wps:JobID><wps:Status>Accepted</wps:Status></wps:StatusInfo>
Note
This method submits a request to cancel a job, so it may take several minutes before the cancellation takes effect. Users may check the job’s status to confirm cancellation. Jobs cancelled after starting will return a status of Dismissed. Queued jobs that are cancelled will be purged from the system.
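Because cancellation is asynchronous, one way to confirm it is to poll the job status after calling cancelJob. The sketch below is an assumption-laden illustration: wait_until_dismissed is a hypothetical helper, it assumes getJobStatus returns an object with a .text attribute as in the Monitor a Job example, and the timeout and interval values are arbitrary. How a purged queued job responds to a status request is not covered here, so the function simply returns the last status it could read.

```python
import re
import time


def wait_until_dismissed(maap, job_id, timeout_s=600, interval_s=30, sleep=time.sleep):
    """Poll getJobStatus until the job reports Dismissed or the timeout elapses.

    Returns the last status string seen, or None if no status could be read.
    """
    waited = 0
    while True:
        text = maap.getJobStatus(job_id).text
        match = re.search(r"<wps:Status>([^<]*)</wps:Status>", text)
        status = match.group(1) if match else None
        if status == "Dismissed" or waited >= timeout_s:
            return status
        sleep(interval_s)
        waited += interval_s
```

A typical sequence would be maap.cancelJob(job_id) followed by wait_until_dismissed(maap, job_id).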
List Jobs
Use the listJobs method, specifying a username, to retrieve a list of jobs that user has submitted.
r = maap.listJobs("mlucas")
print(r.text)
Here is a sample truncated output:
{"code": 200, "jobs": [{"b308b3cd-8848-4154-b682-aaf3d39734ee": {"resource": "job", "payload_hash": "0f217c35a6bdacf46b164106af9ab473", "job_id": "job-dps_tutorial_mlucas_test__main-20240626T172521.763123Z", "status": "job-completed", "context": {"tag": "test-new", "_prov": {"wasGeneratedBy": "task_id:a4761c87-dc68-4511-9a3e-e628446f1e44", "wasDerivedFrom": ["url:https://photojournal.jpl.nasa.gov/tiff/PIA00127.tif"]}, "container_image_name": "container-dps_tutorial:main", "outsize": 30, "container_specification": {"version": "main", "digest": "sha256:e539186c9e76fb923da7b3bfb6a3c700403e5302a71c3e8ba8635b7f216ab7e3", "id": "container-dps_tutorial:main", "url": "s3://s3-us-west-2.amazonaws.com/maap-dit-registry/container-dps_tutorial:main.tar.gz"}, "container_mappings": {"$HOME/.aws": "/home/ops/.aws", "$HOME/.netrc": "/home/ops/.netrc", "/tmp": ["/tmp", "rw"]}, "_command": "/app/dps_wrapper.sh '/app/dps_tutorial/gdal_wrapper/run_gdal.sh' 'output.tif' '30'", "localize_urls": [{"url": "https://photojournal.jpl.nasa.gov/tiff/PIA00127.tif"}], "output_file": "output.tif", "_disk_usage": "1GB", "container_image_url": "s3://s3-us-west-2.amazonaws.com/maap-dit-registry/container-dps_tutorial:main.tar.gz", "container_image_id": "sha256:e539186c9e76fb923da7b3bfb6a3c700403e5302a71c3e8ba8635b7f216ab7e3", "job_specification": {"post": ["hysds.triage.triage"], "resource": "jobspec", "container": "container-dps_tutorial:main", "soft_time_limit": 86400, "time_limit": 86400, "recommended-queues": ["maap-dps-worker-8gb"], "job-version": "main", "command": "/app/dps_wrapper.sh '/app/dps_tutorial/gdal_wrapper/run_gdal.sh'", "params": [{"destination": "localize", "name": "input_file", "value": "https://photojournal.jpl.nasa.gov/tiff/PIA00127.tif"}, {"destination": "positional", "name": "output_file", "value": "output.tif"}...
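The detailed listing is verbose, so a short summary is often more useful. The sketch below counts jobs per HySDS status. It assumes each entry in jobs is a one-key dictionary mapping a job UUID to its details, as the truncated sample above suggests; count_jobs_by_status is a hypothetical helper, not part of maap-py.

```python
import json
from collections import Counter


def count_jobs_by_status(list_jobs_text):
    """Count jobs per HySDS status in a detailed listJobs response body."""
    body = json.loads(list_jobs_text)
    counts = Counter()
    for entry in body.get("jobs", []):
        # Each entry is assumed to look like {job UUID: {details}}
        for details in entry.values():
            counts[details.get("status")] += 1
    return dict(counts)
```

With the response from the example above, this would be called as count_jobs_by_status(r.text).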
Users may provide optional query parameters to filter the list on desired fields. The following example will return a list of jobs submitted by user mlucas that all ran the job-dps_tutorial_mlucas:main algorithm:
r = maap.listJobs("mlucas", algo_id="job-dps_tutorial_mlucas", version="main")
Users may also request just the list of job IDs and their corresponding tags rather than full job details. This is controlled using the get_job_details option. By default get_job_details is set to True; if set to False, listJobs returns a compact list, as shown in this example:
r = maap.listJobs("mlucas", algo_id="job-dps_tutorial_mlucas", version="main", get_job_details=False)
print(r.text)
{"code": 200, "jobs": [{"id": "e6b6b27d-d409-4f6b-8aa2-505c2dc150fd", "tags": ["test"]}, {"id": "fd504b51-096a-42fe-8f1d-a157352dfad0", "tags": ["test"]}, {"id": "69dc095a-4895-43a2-951a-763f3a33b0bf", "tags": ["test"]}, {"id": "c7983efc-689f-4008-b87a-bc3450213152", "tags": ["test"]}, {"id": "41288295-6744-424e-ae26-c1dd9e26c288", "tags": ["test", "test"]}], "message": "success"}
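The compact form is convenient for grouping job IDs by tag. A minimal sketch, assuming the response body has the shape shown above; job_ids_by_tag is a hypothetical helper name.

```python
import json
from collections import defaultdict


def job_ids_by_tag(list_jobs_text):
    """Group job IDs by tag from a compact (get_job_details=False) response body."""
    body = json.loads(list_jobs_text)
    grouped = defaultdict(list)
    for job in body.get("jobs", []):
        # set() avoids listing a job twice when the same tag is repeated
        for tag in set(job.get("tags", [])):
            grouped[tag].append(job["id"])
    return dict(grouped)
```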
These are the available query parameters:
Parameter | Description | Type | Optional/Required? | Acceptable Values |
---|---|---|---|---|
algo_id | algorithm id | string | optional | Valid string |
end_time | job end time | string | optional | Valid time string |
get_job_details | Flag that determines whether to return full job details or just a list of job IDs with their corresponding tags. Default is True. | bool | optional | |
offset | pagination offset | number | optional | Integer |
page_size | pagination page size | number | optional | Integer |
priority | job priority | number | optional | 0-9 |
queue | resource queue | string | optional | Valid string |
start_time | job start time | string | optional | Valid string |
status | job status | string | optional | |
tag | user-specified tag | string | optional | Valid string |
version | algorithm version, i.e. GitHub branch | string | optional | Valid string |
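A misspelled filter name is easy to miss, so it can help to check filters against the table above before calling listJobs. The sketch below is a client-side convenience only: check_list_jobs_filters is a hypothetical helper, and it makes no claim about how the API itself treats unknown or out-of-range parameters.

```python
# Parameter names copied from the table above
ALLOWED_FILTERS = {
    "algo_id", "end_time", "get_job_details", "offset", "page_size",
    "priority", "queue", "start_time", "status", "tag", "version",
}


def check_list_jobs_filters(**filters):
    """Validate filter names and the priority range; return the filters unchanged."""
    unknown = set(filters) - ALLOWED_FILTERS
    if unknown:
        raise ValueError(f"Unknown listJobs filter(s): {sorted(unknown)}")
    priority = filters.get("priority")
    if priority is not None and not 0 <= priority <= 9:
        raise ValueError("priority must be between 0 and 9")
    return filters
```

The validated filters can then be passed through, for example maap.listJobs("mlucas", **check_list_jobs_filters(tag="test", page_size=10)).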