How to control the parallelism or concurrency of an Airflow installation?
Check the Airflow configuration to see which core.executor is in use. SequentialExecutor runs tasks one at a time, so switch to LocalExecutor or CeleryExecutor, which run tasks in parallel. After that, you can use the other options mentioned by @hexacyanide.
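As a rough sketch of that first check, the snippet below parses an airflow.cfg-style text with Python's standard configparser and reports whether the configured executor can run tasks in parallel. The sample config text and the helper name runs_tasks_in_parallel are assumptions made for illustration, not part of Airflow.

```python
import configparser

# Hypothetical excerpt of an airflow.cfg file; a real file has many more keys.
SAMPLE_CFG = """
[core]
executor = SequentialExecutor
parallelism = 32
"""


def runs_tasks_in_parallel(cfg_text: str) -> bool:
    """Return False when the configured executor is SequentialExecutor."""
    # Interpolation is disabled so literal '%' characters in values are accepted.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    executor = parser.get("core", "executor")
    # SequentialExecutor runs one task at a time, whatever the other limits say.
    return executor != "SequentialExecutor"


print(runs_tasks_in_parallel(SAMPLE_CFG))
```

With the sample text above the check reports that tasks will not run in parallel, which is the case where the other concurrency settings have no visible effect.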
[Figure: the three major concurrency control variables]
From Airflow 2.2 onward, the task_concurrency parameter is deprecated in favor of max_active_tis_per_dag.
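If the same DAG code has to work on both sides of that rename, one possible approach is to choose the argument name from the Airflow version. The helper per_task_limit_kwarg below is hypothetical, and the version is passed in as a plain string so the sketch runs without Airflow installed.

```python
def per_task_limit_kwarg(airflow_version: str) -> str:
    """Pick the operator argument that caps one task across DAG runs."""
    major, minor = (int(part) for part in airflow_version.split(".")[:2])
    # task_concurrency was deprecated in 2.2 in favor of max_active_tis_per_dag.
    if (major, minor) >= (2, 2):
        return "max_active_tis_per_dag"
    return "task_concurrency"


# Build operator keyword arguments for an assumed Airflow 2.3.0 installation.
operator_kwargs = {"pool": "my_custom_pool", per_task_limit_kwarg("2.3.0"): 12}
print(operator_kwargs)
```

The resulting dictionary can then be unpacked into an operator constructor with `**operator_kwargs`.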
https://airflow.apache.org/docs/stable/faq.html#how-can-my-airflow-dag-run-faster
Here's an expanded list of configuration options that are available since Airflow v1.10.2. Some can be set on a per-DAG or per-operator basis, but may also fall back to the setup-wide defaults when they are not specified.
Options that can be specified on a per-DAG basis:
concurrency: the number of task instances allowed to run concurrently across all active runs of the DAG this is set on. Defaults to core.dag_concurrency if not set.
max_active_runs: maximum number of active runs for this DAG. The scheduler will not create new active DAG runs once this limit is hit. Defaults to core.max_active_runs_per_dag if not set.
Examples:
from airflow import DAG

# Only allow one run of this DAG to be running at any given time
dag = DAG('my_dag_id', max_active_runs=1)
# Allow a maximum of 10 tasks to be running across a max of 2 active DAG runs
dag = DAG('example2', concurrency=10, max_active_runs=2)
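To see how the two per-DAG limits interact, here is a deliberately simplified model in plain Python. It is not how the scheduler is implemented: the function name tasks_allowed_to_run and the numbers are made up, and it only computes an upper bound from the two settings described above.

```python
def tasks_allowed_to_run(ready_per_run, concurrency, max_active_runs):
    """Simplified upper bound on simultaneously running tasks for one DAG.

    ready_per_run: number of ready tasks in each pending DAG run, oldest first.
    """
    # Runs beyond max_active_runs are not started, so their tasks cannot run.
    active_runs = ready_per_run[:max_active_runs]
    # concurrency caps task instances across all active runs of the DAG.
    return min(sum(active_runs), concurrency)


# Three pending runs with 8 ready tasks each (made-up numbers).
print(tasks_allowed_to_run([8, 8, 8], concurrency=10, max_active_runs=2))  # 10
print(tasks_allowed_to_run([8, 8, 8], concurrency=10, max_active_runs=1))  # 8
```

In the first call concurrency is the binding limit; in the second, max_active_runs is.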
Options that can be specified on a per-operator basis:
pool: the pool to execute the task in. Pools can be used to limit parallelism for only a subset of tasks.
task_concurrency: concurrency limit for the same task across multiple DAG runs.
Example:
from airflow.models import BaseOperator
t1 = BaseOperator(task_id='t1', pool='my_custom_pool', task_concurrency=12)  # task_id is required; 't1' is a placeholder
Options that are specified across an entire Airflow setup:
core.parallelism: maximum number of tasks running across an entire Airflow installation
core.dag_concurrency: maximum number of tasks that can be running per DAG (across multiple DAG runs)
core.non_pooled_task_slot_count: number of task slots allocated to tasks not running in a pool
core.max_active_runs_per_dag: maximum number of active DAG runs, per DAG
scheduler.max_threads: how many threads the scheduler process should use to schedule DAGs
celery.worker_concurrency: maximum number of task instances that a worker will process at a time if using CeleryExecutor
celery.sync_parallelism: number of processes CeleryExecutor should use to sync task state
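Since each of these settings is an upper bound, the tightest one decides how many tasks of a single DAG can actually run at once. The sketch below illustrates that with plain arithmetic. The function dag_task_ceiling and the example numbers are assumptions, and pools and per-task limits are left out to keep it short.

```python
from typing import Optional


def dag_task_ceiling(parallelism: int, dag_concurrency: int,
                     workers: Optional[int] = None,
                     worker_concurrency: Optional[int] = None) -> int:
    """Rough upper bound on concurrently running tasks for one DAG."""
    # core.parallelism and core.dag_concurrency always apply.
    limits = [parallelism, dag_concurrency]
    # With CeleryExecutor, total worker capacity is another bound.
    if workers is not None and worker_concurrency is not None:
        limits.append(workers * worker_concurrency)
    return min(limits)


# Made-up values: the DAG-level limit is the tightest bound here.
print(dag_task_ceiling(parallelism=32, dag_concurrency=16))  # 16
# Two Celery workers with 4 slots each become the bottleneck.
print(dag_task_ceiling(32, 16, workers=2, worker_concurrency=4))  # 8
```

When tasks are not running as parallel as expected, comparing the configured values this way can point to the setting worth raising first.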