A "Workflow" is an optional way to combine multiple tasks together in a way that can be gracefully retried from the point of failure.
They're most helpful when you have multiple tasks in a row and want each task to be retried individually if it fails.
If a task within a workflow fails, the Workflow will automatically "pick back up" on the task where it failed and will not re-execute any prior tasks that have already completed successfully.
The most important aspect of a Workflow is the handler, where you declare when and how the tasks should run by calling the task-running functions passed to the handler (the `tasks` and `inlineTask` arguments shown in the examples below). If any task within the workflow fails, the entire handler function will re-run.
However, importantly, tasks that have already completed successfully will simply re-return their cached, saved output without running again. The Workflow will pick back up where it failed, and only the tasks from the failure point onward will be re-executed.
To define a JS-based workflow, simply add a workflow to the jobs.workflows array in your Payload config. A workflow consists of the following fields:
| Option | Description |
| --- | --- |
| `slug` | Define a slug-based name for this workflow. This slug needs to be unique among both tasks and workflows. |
| `handler` | The function that should be responsible for running the workflow. You can either pass a string-based path to the workflow function file, or the workflow function itself. If you are using large dependencies within your workflow, you might prefer to pass the string path because that will avoid bundling large dependencies in your Next.js app. Passing a string path is an advanced feature that may require a sophisticated build pipeline in order to work. |
| `inputSchema` | Define the input field schema - Payload will generate a type for this schema. |
| `interfaceName` | You can use `interfaceName` to change the name of the interface that is generated for this workflow. By default, this is "Workflow" + the capitalized workflow slug. |
| `label` | Define a human-friendly label for this workflow. |
| `queue` | Optionally, define the queue name that this workflow should be tied to. Defaults to "default". |
| `retries` | You can define retries on the workflow level, which will enforce that the workflow can only fail up to that number of retries. If a task does not have retries specified, it will inherit the retry count specified on the workflow. You can specify 0 as the workflow retries, which will disregard all task retry specifications and fail the entire workflow on any task failure. You can also leave workflow retries as undefined, in which case the workflow will respect whatever each task dictates as its own retry count. Defaults to undefined, meaning workflow retries are defined by their tasks. |
| `concurrency` | Control how jobs with the same concurrency key are handled. Jobs with the same key will run exclusively (one at a time). Requires `jobs.enableConcurrencyControl: true` to be set. See Concurrency Controls below for details. |
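For instance, the `label`, `queue`, and `retries` options might be combined as in the sketch below. This is illustrative only; the `generateReport` slug, the `nightlyReports` queue name, and the field names are made up:

```ts
import { buildConfig } from 'payload'

export default buildConfig({
  // ...
  jobs: {
    tasks: [
      // ...
    ],
    workflows: [
      {
        slug: 'generateReport', // hypothetical slug for illustration
        label: 'Generate Report',
        queue: 'nightlyReports', // only runners working this queue will pick it up
        retries: 2, // tasks without their own `retries` inherit this count
        inputSchema: [
          {
            name: 'reportID',
            type: 'text',
            required: true,
          },
        ],
        handler: async ({ job, tasks }) => {
          // ...call your predefined tasks here
        },
      },
    ],
  },
})
```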
Example:
```ts
import { buildConfig } from 'payload'

export default buildConfig({
  // ...
  jobs: {
    tasks: [
      // ...
    ],
    workflows: [
      {
        slug: 'createPostAndUpdate',

        // The arguments that the workflow will accept
        inputSchema: [
          {
            name: 'title',
            type: 'text',
            required: true,
          },
        ],

        // The handler that defines the "control flow" of the workflow
        // Notice how it uses the `tasks` argument to execute your predefined tasks.
        // These are strongly typed!
        handler: async ({ job, tasks }) => {
          // This workflow first runs a task called `createPost`.

          // You need to define a unique ID for this task invocation
          // that will always be the same if this workflow fails
          // and is re-executed in the future. Here, we hard-code it to '1'
          const output = await tasks.createPost('1', {
            input: {
              title: job.input.title,
            },
          })

          // Once the prior task completes, it will run a task
          // called `updatePost`
          await tasks.updatePost('2', {
            input: {
              post: job.taskStatus.createPost['1'].output.postID, // or output.postID
            },
          })
        },
      },
    ],
  },
})
```
In the above example, our workflow was executing tasks that we already had defined in our Payload config. But you can also run tasks without predefining them.
To do this, you can use the `inlineTask` function.
The drawbacks of this approach are that tasks cannot be re-used across workflows as easily, and the task data stored in the job will not be typed. In the following example, the inline task data will be stored on the job under `job.taskStatus.inline['2']`, but it will be completely untyped, as types for dynamic tasks like these cannot be generated beforehand.
Example:
```ts
import { buildConfig } from 'payload'

export default buildConfig({
  // ...
  jobs: {
    tasks: [
      // ...
    ],
    workflows: [
      {
        slug: 'createPostAndUpdate',
        inputSchema: [
          {
            name: 'title',
            type: 'text',
            required: true,
          },
        ],
        handler: async ({ job, tasks, inlineTask }) => {
          // Here, we run a predefined task.
          // The `createPost` handler arguments and return type
          // are both strongly typed
          const output = await tasks.createPost('1', {
            input: {
              title: job.input.title,
            },
          })

          // Here, this task is not defined in the Payload config
          // and is "inline". Its output will be stored on the Job in the database
          // under `job.taskStatus.inline['2']`, but it will not be typed.
          await inlineTask('2', {
            task: async () => {
              // ...run any async work here. Whatever you return under
              // `output` is saved on the job document.
              return {
                output: {
                  postID: output.postID,
                },
              }
            },
          })
        },
      },
    ],
  },
})
```
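Once a workflow like this is defined, it can be queued and then processed with Payload's Local API. The sketch below assumes you already have an initialized `payload` instance in scope (for example inside a hook, a custom endpoint, or a script); where and how often you trigger the run depends on your setup:

```ts
// Queue one run of the `createPostAndUpdate` workflow defined above
const queuedJob = await payload.jobs.queue({
  workflow: 'createPostAndUpdate',
  input: {
    title: 'My new post',
  },
})

// Process queued jobs. In production this is typically handled by autorun,
// a cron-triggered endpoint, or a dedicated worker rather than being
// called inline like this.
await payload.jobs.run()
```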
When multiple jobs operate on the same resource, race conditions can occur. For example, if a user creates a document and then quickly updates it, two jobs might be queued that both try to process the same document simultaneously, leading to unexpected results.
The concurrency option allows you to prevent this by ensuring that jobs with the same "key" run exclusively (one at a time).
Important: To use concurrency controls, you must first enable them in your Payload config by setting jobs.enableConcurrencyControl: true. This adds an indexed concurrencyKey field to your jobs collection schema and may require a database migration depending on your database adapter.
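In config form, enabling the flag looks roughly like this (a minimal sketch; your existing tasks and workflows stay as shown earlier):

```ts
import { buildConfig } from 'payload'

export default buildConfig({
  // ...
  jobs: {
    // Adds the indexed `concurrencyKey` field to the jobs collection.
    // This must be enabled before any workflow can use the `concurrency` option.
    enableConcurrencyControl: true,
    tasks: [
      // ...
    ],
    workflows: [
      // ...
    ],
  },
})
```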
When queuing: The concurrency key is computed from the job's input and stored on the job document.
When running: The job runner enforces exclusive execution through two mechanisms:
1. It first checks which concurrency keys are currently being processed and excludes pending jobs with those keys from the query.
2. If multiple pending jobs with the same key are picked up in the same batch, only the first one (by creation order) runs. The others are released back to processing: false and will be picked up on subsequent runs.
Result: Jobs with the same concurrency key are guaranteed to run sequentially, never in parallel. All jobs are preserved and will eventually complete - they just wait their turn.
Key uniqueness: The concurrency key should uniquely identify the resource being operated on. Include all relevant identifiers (collection slug, document ID, locale, etc.).
Global by default: By default, concurrency is global across all queues. A job with key sync:doc1 in the default queue will block a job with the same key in the emails queue. Include the queue name in your key if you want queue-specific concurrency.
No concurrency key = no restrictions: Jobs without a concurrency configuration run in parallel as before.
Pending jobs wait: Jobs that can't run due to concurrency constraints remain in the queue with processing: false and will be picked up on subsequent runs.
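To make the key-uniqueness and queue-scoping guidance concrete, here is a small helper that builds a key following those rules. This helper is hypothetical and not part of Payload's API; it only illustrates what a well-formed key should contain, however your concurrency configuration ends up producing it:

```ts
// Hypothetical helper (not part of Payload's API) that builds a concurrency
// key scoped to a single queue and uniquely identifying one document.
type ConcurrencyKeyParts = {
  queue: string
  collection: string
  docID: string | number
  locale?: string
}

const buildConcurrencyKey = ({
  queue,
  collection,
  docID,
  locale,
}: ConcurrencyKeyParts): string =>
  // Including `queue` keeps the key queue-specific; omit it if you want the
  // key to block matching jobs globally across all queues (the default).
  // 'sync' is an arbitrary namespace for the kind of operation being performed.
  [queue, 'sync', collection, String(docID), locale ?? 'all'].join(':')

// Two jobs touching the same post in the same queue produce the same key,
// so they are guaranteed to run one at a time:
buildConcurrencyKey({ queue: 'default', collection: 'posts', docID: 'doc1' })
// => 'default:sync:posts:doc1:all'
```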