Periodic Run

Concurrent includes the ability to run DAGs periodically, on a schedule.

Periodic Run CLI

The Concurrent Periodic Run CLI is part of the concurrent plugin. Its help message is:

$ python -m concurrent_plugin.periodic_run -h
usage: periodic_run.py [-h] [--periodic_run_name PERIODIC_RUN_NAME] [--schedule SCHEDULE] [--schedule_type {once,hourly,daily,weekly,monthly,yearly}] [--experiment_id EXPERIMENT_ID] [--dagid DAGID] {add,delete,list}

positional arguments:
  {add,delete,list}

optional arguments:
  -h, --help            show this help message and exit
  --periodic_run_name PERIODIC_RUN_NAME
  --schedule SCHEDULE   format is a_b_c_d_e_f where a=minutes(0-59), b=hour(0-23), c=day_of_month(1-31), d=month(1-12), e=day_of_week(0-7, 0 is Sunday)
  --schedule_type {once,hourly,daily,weekly,monthly,yearly}
  --experiment_id EXPERIMENT_ID
  --dagid DAGID

Example: python -m concurrent_plugin.periodic_run add --periodic_run_name test1 --schedule "06_22_*_*_*_*" --schedule_type once --dagid DAG1665114786385 --experiment_id 7
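
To make the schedule format concrete, here is a minimal Python sketch that splits a schedule string into named fields. It is not part of the concurrent plugin: parse_schedule, FIELD_NAMES and FIELD_RANGES are hypothetical names introduced for illustration. The first five field names and ranges come from the help text; the help text does not describe the sixth position, so the sketch keeps it under the placeholder name "unspecified". Treating "*" as "any value" is an assumption based on the examples.

```python
# Hypothetical helper, not part of concurrent_plugin.
# Names for the first five positions come from the CLI help text; the sixth
# position is not described there, so it is kept under a placeholder name.
FIELD_NAMES = ["minute", "hour", "day_of_month", "month", "day_of_week", "unspecified"]

# Value ranges as stated in the help text (day_of_week: 0-7, 0 is Sunday).
FIELD_RANGES = {
    "minute": (0, 59),
    "hour": (0, 23),
    "day_of_month": (1, 31),
    "month": (1, 12),
    "day_of_week": (0, 7),
}


def parse_schedule(schedule: str) -> dict:
    """Split an underscore-separated schedule string into named fields.

    "*" is mapped to None (assumed to mean "any value"). Numeric fields are
    range-checked; the undocumented sixth field is returned as a raw string.
    """
    parts = schedule.split("_")
    if len(parts) != len(FIELD_NAMES):
        raise ValueError(f"expected {len(FIELD_NAMES)} fields, got {len(parts)}")

    parsed = {}
    for name, value in zip(FIELD_NAMES, parts):
        if value == "*":
            parsed[name] = None
        elif name in FIELD_RANGES:
            number = int(value)
            low, high = FIELD_RANGES[name]
            if not low <= number <= high:
                raise ValueError(f"{name}={number} is outside {low}-{high}")
            parsed[name] = number
        else:
            parsed[name] = value  # sixth field: meaning not documented
    return parsed


if __name__ == "__main__":
    print(parse_schedule("06_22_*_*_*_*"))
```

A check like this can catch a malformed schedule string before it is passed to the add command.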

Create

python -m concurrent_plugin.periodic_run add --periodic_run_name cw-hourly --schedule "53_*_*_*_*_*" --schedule_type hourly --dagid DAG1670863769990 --experiment_id 15

Delete

python -m concurrent_plugin.periodic_run delete --periodic_run_name cw-hourly

List

python -m concurrent_plugin.periodic_run list

Periodic Run Data

Periodic runs are useful for processing data that was received or ingested since the last run. Concurrent supports this in two ways, depending on how the input to a DAG node is specified.

InfinSlice

If the input to any node in the DAG is specified as a time slice, for example an InfinSlice, the Concurrent server automatically adjusts the slice's period to cover the interval from the previous run to the start of the current run.

Non InfinSlice

For all other types of storage or data source, Concurrent sets two environment variables in every DAG node:

PERIDOIC_RUN_FREQUENCY
PERIDOIC_RUN_START_TIME

PERIDOIC_RUN_FREQUENCY is one of once, hourly, daily, weekly, monthly, or yearly.

PERIDOIC_RUN_START_TIME is the time in milliseconds since the epoch.
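
As an illustration of how code inside a DAG node might consume these variables, here is a minimal Python sketch. The function name read_periodic_run_env is hypothetical, the variable names are spelled exactly as in the text above, and interpreting the timestamp as UTC is an assumption. Returning None when the variables are absent is also an assumption, intended for nodes that may run outside a periodic run.

```python
import os
from datetime import datetime, timezone

# Allowed values for the frequency variable, as listed in the text above.
VALID_FREQUENCIES = {"once", "hourly", "daily", "weekly", "monthly", "yearly"}


def read_periodic_run_env(environ=os.environ):
    """Return (frequency, start_time) for a periodic run, or None if unset.

    Hypothetical helper: reads the two environment variables described above
    and converts the millisecond timestamp to a timezone-aware datetime
    (UTC is assumed here).
    """
    frequency = environ.get("PERIDOIC_RUN_FREQUENCY")
    start_ms = environ.get("PERIDOIC_RUN_START_TIME")
    if frequency is None or start_ms is None:
        return None  # assumed to mean: not running as a periodic run

    if frequency not in VALID_FREQUENCIES:
        raise ValueError(f"unexpected frequency: {frequency!r}")

    # Milliseconds since the epoch -> seconds -> datetime.
    start_time = datetime.fromtimestamp(int(start_ms) / 1000, tz=timezone.utc)
    return frequency, start_time


if __name__ == "__main__":
    info = read_periodic_run_env()
    if info is None:
        print("No periodic run variables set")
    else:
        frequency, start_time = info
        print(f"{frequency} run, start time {start_time.isoformat()}")
```

A node could use the returned start time and frequency to select only the data relevant to the current period from its own storage.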