Configure worker queues
By default, all tasks run in a default worker queue that does not require configuration or code. If you're using the Celery executor, you can create additional worker queues to enable multiple worker types or configurations for different groups of tasks, and assign tasks to queues in your DAG code. For more information about Airflow executors on Astro, see Manage executors.
Use worker queues to create optimized execution environments for different types of tasks in the same Deployment. You can use worker queues to:
- Separate resource-intensive tasks, such as those that execute machine learning models, from tasks that require minimal resources, such as those that execute SQL queries.
- Separate short-running tasks from long-running tasks.
- Isolate a single task from other tasks in your Deployment.
- Allow workers in some queues to scale to zero while keeping a minimum of one worker in other queues.
Benefits
Worker queues can enhance performance, decrease cost, and increase the reliability of task execution in your Deployment. Specifically:
- Executing a task with dedicated hardware that best fits the needs of that task can result in faster performance. In some cases, this can decrease the duration of a DAG by up to 50%.
- Running larger workers only for the tasks that need them lowers your infrastructure cost, because you don't pay for those workers when your tasks don't require them.
- Separating tasks that have different resource characteristics often makes them less likely to end up in a failed or zombie state.
Example
By configuring multiple worker queues and assigning tasks to these queues based on the requirements of the tasks, you can enhance the performance, reliability, and throughput of your Deployment. For example, consider the following scenario:
- You are running Task A and Task B in a Deployment on an AWS cluster.
- Task A and Task B are dependent on each other, so they need to run in the same Deployment.
- Task A is a long-running task that uses a lot of CPU and little memory, while Task B is a short-running task that uses minimal amounts of CPU and memory.
You can assign Task A to a worker queue that is configured to use the c6i.4xlarge worker type, which is optimized for compute. Then, you can assign Task B to a worker queue that is configured to use the m5.xlarge worker type, which is smaller and optimized for general usage.
Worker queue settings
You can configure each worker queue on Astro with the following settings:
- Name: The name of your worker queue. Use this name to assign tasks to the worker queue in your DAG code. Worker queue names must consist only of lowercase letters and hyphens. For example, machine-learning-tasks or short-running-tasks.
- Worker Type: The size and type of workers in the worker queue, defined as a node instance type that is supported by the cloud provider of your cluster. For example, a worker type might be m5.2xlarge or c6i.4xlarge for a Deployment running on an AWS cluster. A worker's total available CPU, memory, storage, and GPU are defined by its worker type. Actual worker size is equivalent to the total capacity of the worker type minus Astro's system overhead.
- Max Tasks per Worker: The maximum number of tasks that a single worker can run at a time. If the number of queued and running tasks exceeds this number, a new worker is added to run the remaining tasks. This value is equivalent to worker concurrency in Apache Airflow. It is 16 by default.
- Worker Count: The minimum and maximum number of workers that can run at a time. The number of running workers changes regularly based on Max Tasks per Worker and the current number of tasks in a queued or running state. By default, the minimum number of workers is 1 and the maximum is 10.
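As a rough illustration of the settings and defaults listed above, the following sketch models a worker queue's configuration in plain Python. The WorkerQueueSettings class and its validation are hypothetical and not part of any Astro API or CLI; the naming rule and default values come from the list above, while the check that the minimum worker count does not exceed the maximum is an assumption.

```python
import re
from dataclasses import dataclass

# Naming rule from the settings list: only lowercase letters and hyphens.
QUEUE_NAME_PATTERN = re.compile(r"[a-z-]+")


@dataclass
class WorkerQueueSettings:
    """Hypothetical model of a worker queue's settings (not an Astro API)."""

    name: str
    worker_type: str
    max_tasks_per_worker: int = 16  # default Max Tasks per Worker
    min_workers: int = 1  # default minimum Worker Count
    max_workers: int = 10  # default maximum Worker Count

    def __post_init__(self) -> None:
        if not QUEUE_NAME_PATTERN.fullmatch(self.name):
            raise ValueError(
                f"invalid queue name {self.name!r}: use only lowercase letters and hyphens"
            )
        if self.max_tasks_per_worker < 1:
            raise ValueError("max_tasks_per_worker must be at least 1")
        # Assumption: a queue may scale to zero, but min can't exceed max.
        if not 0 <= self.min_workers <= self.max_workers:
            raise ValueError("worker count must satisfy 0 <= min <= max")


# A compute-heavy queue that keeps the default concurrency and worker count.
ml_queue = WorkerQueueSettings(name="machine-learning-tasks", worker_type="c6i.4xlarge")
print(ml_queue.max_tasks_per_worker, ml_queue.min_workers, ml_queue.max_workers)
```

A name such as ML_Tasks raises a ValueError, mirroring the naming rule in the Name setting.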
Default worker queue
Each Deployment requires a worker queue named default to run tasks. Tasks that are not assigned to a worker queue in your DAG code are executed by workers in the default worker queue.
If you don’t change any settings in the default worker queue:
- A maximum of 16 tasks can run at one time per worker. If more than 16 tasks are queued or running, a new worker is added to run the remaining tasks.
- A maximum of 10 workers can run at once, meaning that a maximum of 160 tasks can be in a running state at a time. Remaining tasks stay in a queued or scheduled state until running tasks complete.
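The limits above follow from simple arithmetic. The sketch below estimates how many workers a queue runs for a given number of queued and running tasks, assuming (as a simplification) that the worker count equals the number of tasks divided by Max Tasks per Worker, rounded up and clamped to the queue's worker count range. Astro's actual autoscaling behavior may differ, and both function names are hypothetical.

```python
import math


def estimated_workers(active_tasks: int, max_tasks_per_worker: int = 16,
                      min_workers: int = 1, max_workers: int = 10) -> int:
    """Estimate the worker count for a number of queued plus running tasks.

    Simplified model: one worker per max_tasks_per_worker tasks, clamped to
    [min_workers, max_workers]. This is not Astro's exact scaling algorithm.
    """
    needed = math.ceil(active_tasks / max_tasks_per_worker)
    return max(min_workers, min(needed, max_workers))


def max_running_tasks(max_tasks_per_worker: int = 16, max_workers: int = 10) -> int:
    """Upper bound on the number of tasks in a running state at once."""
    return max_tasks_per_worker * max_workers


print(estimated_workers(0))    # 1: the default minimum keeps one worker
print(estimated_workers(17))   # 2: a 17th task triggers a second worker
print(estimated_workers(500))  # 10: capped at the default maximum
print(max_running_tasks())     # 160
```

With the default settings, any tasks beyond 160 wait in a queued or scheduled state, as described in the list above.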
You can change all settings of the default worker queue except for its name.
Request a worker type
Your organization can enable up to 10 different worker types for each cluster. After a worker type is enabled on an Astro cluster, the worker type becomes available to any Deployment in that cluster and appears in the Worker Type menu of the Cloud UI.
- Review the list of supported worker types for your cloud provider. See AWS, Azure, or GCP.
- Contact Astronomer support with the name of the worker type(s) you want to enable for your cluster. For example, m6i.2xlarge.
For more information on requesting cluster changes, see Modify a cluster.
Create a worker queue
If you prefer, you can also run the astro deployment worker-queue create command in the Astro CLI to create a worker queue. See the CLI Command Reference.
In the Cloud UI, select a Workspace, click Deployments, and then select a Deployment.
Click the Worker Queues tab and then click Worker Queue.
Configure the worker queue’s settings. You can't change the name of a worker queue after you create it.
Click Create Queue.
You can create, update, and delete multiple worker queues at once using a Deployment file. See Deployments as Code.
Assign tasks to a worker queue
By default, all tasks run in the default worker queue. To run tasks on a different worker queue, assign the task to the worker queue in your DAG code.
To assign an Airflow task to a worker queue:
In the Cloud UI, select a Workspace and select a Deployment.
Click the Worker Queues tab.
Copy the name of the worker queue that you want to assign a task to.
In your DAG code, add a queue='<worker-queue-name>' argument to the definition of the task. If a task is assigned to a queue that does not exist or is not referenced properly, the task might remain in a queued state and fail to execute. Make sure that the name of the queue in your DAG code matches the name of the queue in the Cloud UI.
For example, all instances of this task run in the short-running-tasks queue:
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

# Defined inside a DAG: queue routes every instance of this task to the short-running-tasks worker queue.
feature_engineering = DatabricksSubmitRunOperator(
    task_id='feature_engineering_notebook_task',
    notebook_task={
        'notebook_path': "/Users/{{ var.value.databricks_user }}/feature-eng_census-pred"
    },
    queue='short-running-tasks',
)
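Because a task assigned to a nonexistent queue can stay queued, it can help to compare the queue names used in your DAG code against the queues configured for the Deployment before you deploy. The sketch below is a hypothetical, pure-Python check: it assumes you have already collected a mapping of task IDs to queue names (for example, from each task's queue attribute) and the set of queue names copied from the Worker Queues tab. Neither input comes from an Astro API here, and the task names are illustrative.

```python
def find_unknown_queues(task_queues: dict[str, str], deployment_queues: set[str]) -> dict[str, str]:
    """Return the tasks whose queue name doesn't match any queue in the Deployment."""
    return {
        task_id: queue
        for task_id, queue in task_queues.items()
        if queue not in deployment_queues
    }


# Hypothetical inputs: task-to-queue assignments and queue names from the Cloud UI.
task_queues = {
    "feature_engineering_notebook_task": "short-running-tasks",
    "train_model": "machine-learning-task",  # typo: missing the final "s"
    "load_results": "default",
}
deployment_queues = {"default", "short-running-tasks", "machine-learning-tasks"}

print(find_unknown_queues(task_queues, deployment_queues))
```

Any task this check reports would likely remain queued after deployment, so fixing the name in the DAG code before deploying avoids the stalled run.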
Update a worker queue
If you prefer, you can run the astro deployment worker-queue update command in the Astro CLI to update a worker queue. See the CLI Command Reference.
You can update worker queue configurations at any time. The worker queue name can't be changed.
If you need to change the worker type of an existing worker queue, Astronomer recommends making the change at a time when it will not affect production pipelines. After you've changed a worker type, Astronomer recommends waiting a minimum of five minutes before pushing new code to your Deployment.
In the Cloud UI, select a Workspace, click Deployments, and then select a Deployment.
Click the Worker Queues tab.
Click Edit for the worker queue that you want to update.
Update the worker queue settings, and then click Update Queue.
The Airflow components of your Deployment automatically restart to apply the updated resource allocations. This action is equivalent to deploying code to your Deployment: it does not interrupt running tasks, which have 24 hours to complete before their workers are terminated. See What happens during a code deploy.
Delete a worker queue
If you prefer, you can also run the astro deployment worker-queue delete command in the Astro CLI to delete a worker queue. See the CLI Command Reference.
After a worker queue is deleted, any scheduled tasks that are still assigned to it remain in a queued state indefinitely and won't execute. To avoid stalled task runs, reassign all tasks from a worker queue before deleting it. You can either remove the queue argument from the task or assign the task to a different queue.
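Before deleting a queue, you can list the tasks that still reference it and decide where each should go. The following hypothetical sketch (not an Astro or Airflow API) takes a mapping of task IDs to queue names and returns the tasks that need reassignment, along with an updated mapping that moves them to a fallback queue, assumed here to be default. Removing the queue argument has the same effect as choosing default, because unassigned tasks run in the default worker queue.

```python
def plan_queue_deletion(task_queues: dict[str, str], queue_to_delete: str,
                        fallback: str = "default") -> tuple[list[str], dict[str, str]]:
    """List tasks assigned to queue_to_delete and return a reassigned mapping."""
    if fallback == queue_to_delete:
        raise ValueError("fallback queue must differ from the queue being deleted")
    # Tasks that would stall in a queued state if the queue were deleted now.
    affected = sorted(t for t, q in task_queues.items() if q == queue_to_delete)
    # Every affected task moves to the fallback; other assignments are unchanged.
    updated = {t: (fallback if q == queue_to_delete else q) for t, q in task_queues.items()}
    return affected, updated


# Hypothetical task-to-queue assignments collected from DAG code.
task_queues = {
    "extract": "short-running-tasks",
    "train_model": "machine-learning-tasks",
    "load_results": "short-running-tasks",
}
affected, updated = plan_queue_deletion(task_queues, "short-running-tasks")
print(affected)  # ['extract', 'load_results']
print(updated["extract"], updated["train_model"])  # default machine-learning-tasks
```

Once every affected task has been updated in your DAG code and deployed, the queue can be deleted without leaving scheduled tasks stuck in a queued state.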
In the Cloud UI, select a Workspace, click Deployments, and then select a Deployment.
Click the Worker Queues tab.
Click Delete for the worker queue that you want to delete.
Enter "Delete <worker-queue-name>" and then click Yes, Continue.