Skip to main content

Configure worker queues

By default, all tasks using the Celery executor run in a default worker queue. You can create additional worker queues to enable multiple worker types or configurations for different groups of tasks, and assign tasks to these queues in your DAG code. For more information about Airflow executors on Astro, see Manage Airflow executors on Astro.

Use worker queues to create optimized execution environments for different types of tasks in the same Deployment. You can use worker queues to:

  • Separate resource-intensive tasks, such as those that execute machine learning models, from tasks that require minimal resources, such as those that execute SQL queries.
  • Separate short-running tasks from long-running tasks.
  • Isolate a single task from other tasks in your Deployment.
  • Allow some workers to scale to zero but keep a minimum of 1 for other types of workers.

Benefits

Worker queues can enhance performance, decrease cost, and increase the reliability of task execution in your Deployment. Specifically:

  • Executing a task with dedicated hardware that best fits the needs of that task can result in faster performance. In some cases, this can decrease the duration of a DAG by up to 50%.
  • Paying for larger workers only for select tasks means that you can lower your infrastructure cost by not paying for that worker when your tasks don't need it.
  • Separating tasks that have different characteristics often means they're less likely to result in a failed or zombie state.

Example

By configuring multiple worker queues and assigning tasks to these queues based on the requirements of the tasks, you can enhance the performance, reliability, and throughput of your Deployment. For example, consider the following scenario:

  • You are running Task A and Task B in a Deployment.
  • Task A and Task B are dependent on each other, so they need to run in the same Deployment.
  • Task A is a long-running task that uses a lot of CPU and memory, while Task B is a short-running task that uses minimal amounts of CPU and memory.

You can assign Task A to a worker queue that is configured to use the A20 worker type, which is optimized for running compute-heavy tasks. Then, you can assign Task B to a worker queue that is configured to use the A5 worker type, which is smaller and optimized for general usage.

Default worker queue

Each Deployment requires a worker queue named default to run tasks. Tasks that are not assigned to a worker queue in your DAG code are executed by workers in the default worker queue.

You can change all settings of the default worker queue except for its name.

Worker queue settings

You can configure each worker queue on Astro with the following settings:

  • Name: The name of your worker queue. Use this name to assign tasks to the worker queue in your DAG code. Worker queue names must consist only of lowercase letters and hyphens. For example, machine-learning-tasks or short-running-tasks or high-cpu.
  • Concurrency: The maximum number of tasks that a single worker can run at a time. If the number of queued and running tasks exceeds this number, a new worker is added to run the remaining tasks. This value is equivalent to worker concurrency in Apache Airflow. The default for your worker type is suitable for most use cases.
  • Min # Workers / Max # Workers: The minimum and maximum number of workers that can run at a time. The number of workers autoscales based on Concurrency and the current number of tasks in a queued or running state.
  • Worker Type: The size of workers in the worker queue, for example A5 or A20. A worker’s total available CPU, memory, and storage is defined by its worker size. For more information, see the following section on hosted worker types.

Hosted worker types

Each Deployment worker queue has a worker type that determines how many resources are available to your Airflow workers for running tasks. A worker type is a virtualized instance of CPU and memory on your cluster that is specific to the Astro platform. The underlying node instance type running your worker can vary based on how Astro optimizes resource usage on your cluster.

Each virtualized instance of your worker type is a worker. Celery workers can run multiple tasks at once, while Kubernetes workers only scale up and down to run a single task at a time.

The following table lists all available worker types on Astro Deployments.

Worker TypevCPUMemoryEphemeral storageDefault task concurrencyMax task concurrency
A512GiB10 GiB515
A1024GiB10 GiB1030
A2048GiB10 GiB2060
A40816GiB10 GiB40120
A601224GiB10 GiB60180
A1202448GiB10 GiB120360
A1603264GiB10 GiB160480

All worker types additionally have 10 GiB of ephemeral storage that your tasks can use when storing small amounts of data within the worker.

Hybrid worker type setup

On Astro Hybrid clusters, worker type is defined as a node instance type that is supported by the cloud provider of your cluster. For example, a worker type might be m5.2xlarge or c6i.4xlarge for a Deployment running on a Hybrid AWS cluster hosted on your cloud. Actual worker size is equivalent to the total capacity of the worker type minus Astro’s system overhead.

Your Organization can enable up to 10 additional different worker types for each Hybrid cluster. After a worker type is enabled on an Astro Hybrid cluster, the worker type becomes available to any Deployment in that cluster and appears in the Worker Type menu of the Cloud UI.

  1. Review the list of supported worker types for your cloud provider. See AWS, Azure, or GCP resource references.

  2. Contact Astronomer support and provide the following information:

    • The name of your cluster.
    • The name of the worker type(s) you want to enable for your cluster. For example, m6i.2xlarge.

For more information on requesting cluster changes, see Manage Hybrid clusters.

Create a worker queue

cli

If you prefer, you can also run the astro deployment worker-queue create command in the Astro CLI to create a worker queue. See the CLI Command Reference.

  1. In the Cloud UI, select a Workspace, click Deployments, and then select a Deployment.

  2. Click the Options menu of the Deployment you want to update, and select Edit Deployment.

    Edit Deployment in options menu

  3. In the Execution section, click Add Queue to create a new worker queue, and then configure its related attributes. Note that you can't change the name of a worker queue after you create it.

  4. When you're done creating your worker queue, click Update Deployment to save your changes.

tip

You can create, update, and delete multiple worker queues at once using a Deployment file. See Deployments as Code.

Assign tasks to a worker queue

By default, all tasks run in the default worker queue. To run tasks on a different worker queue, assign the task to the worker queue in your DAG code.

Step 1: Copy the name of the worker queue

  1. In the Cloud UI, select a Workspace and select a Deployment.
  2. Click the Worker Queues tab.
  3. Copy the name of the worker queue name you want to assign a task to.

Step 2: Assign the task in your DAG code

In your DAG code, add a queue='<worker-queue-name>' argument to the definition of the task. If a task is assigned to a queue that does not exist or is not referenced properly, the task might remain in a queued state and fail to execute. Make sure that the name of the queue in your DAG code matches the name of the queue in the Cloud UI.

Astronomer recommends using Apache Airflow's Taskflow API to define your task argument. The Taskflow API is feature in Airflow 2 that includes a task decorator and makes DAGs easier to write. In the following examples, all instances of the task will run in the machine-learning-tasks queue. Choose an example based on whether or not you use the Taskflow API.

 train_model = PythonOperator(
task_id='train_model',
python_callable=train_model_flights
queue='machine-learning-tasks'
)

Update a worker queue

cli

If you prefer, you can run the astro deployment worker-queue update command in the Astro CLI to update a worker queue. See the CLI Command Reference.

You can update worker queue configurations at any time. The worker queue name can't be changed.

If you need to change the worker type of an existing worker queue, Astronomer recommends making the change at a time when it will not affect production pipelines. After you've changed a worker type, Astronomer recommends waiting a minimum of five minutes before pushing new code to your Deployment.

  1. In the Cloud UI, select a Workspace, click Deployments, and then select a Deployment.

  2. Click the Options menu of the Deployment you want to update, and select Edit Deployment.

    Edit Deployment in options menu

  3. Expand the Execution section if it is not already expanded.

  4. Update the worker queue settings, and then click Update Deployment.

    The Airflow components of your Deployment automatically restart to apply the updated resource allocations. This action is equivalent to deploying code to your Deployment and does not impact running tasks that have 24 hours to complete before running workers are terminated. See What happens during a code deploy.

Update Airflow Pool

If you see tasks getting stuck, it might be because the worker queue configuration doesn't align with the Airflow Pools, a component that allows you to control execution parallelism. Make sure to update your Airflow Pools to match the changes in potential maximum task parallelism caused by changes to the worker queue.

For more information on limited parallelism, see Airflow Pools.

Delete a worker queue

cli

If you prefer, you can also run the astro deployment worker-queue delete command in the Astro CLI to delete a worker queue. See the CLI Command Reference.

All scheduled tasks that are assigned to a worker queue after the worker queue is deleted remain in a queued state indefinitely and won't execute. To avoid stalled task runs, ensure that you reassign all tasks from a worker queue before deleting it. You can either remove the worker queue argument or assign the task to a different queue.

  1. In the Cloud UI, select a Workspace, click Deployments, and then select a Deployment.

  2. Click the Options menu of the Deployment you want to update, and select Edit Deployment.

    Edit Deployment in options menu

  3. Expand the Execution section if it is not already expanded.

  4. Click Remove queue to delete the worker queue, and then click Update Deployment.

Was this page helpful?

Sign up for Developer Updates

Get a summary of new Astro features once a month.

You can unsubscribe at any time.
By proceeding you agree to our Privacy Policy, our Website Terms and to receive emails from Astronomer.