Go SDK developer's guide - Features
The Features section of the Temporal Developer's guide provides basic implementation guidance on how to use many of the development features available to Workflows and Activities in the Temporal Platform.
This guide is a work in progress. Some sections may be incomplete or missing for some languages. Information may change at any time.
If you can't find what you are looking for in the Developer's guide, it could be in older docs for SDKs.
In this section you can find the following:
- How to develop Signals
- How to develop Queries
- How to start a Child Workflow Execution
- How to start a Temporal Cron Job
- How to use Continue-As-New
- How to set Workflow timeouts & retries
- How to set Activity timeouts & retries
- How to Heartbeat an Activity
- How to Asynchronously complete an Activity
- How to register Namespaces
- How to use custom payload conversion
Signals
A Signal is an asynchronous message sent to a running Workflow Execution.
Signals are defined in your code and handled in your Workflow Definition. Signals can be sent to Workflow Executions from a Temporal Client or from another Workflow Execution.
Define Signal
A Signal has a name and can have arguments.
- The name, also called a Signal type, is a string.
- The arguments must be serializable.
Structs should be used to define Signals and carry data, as long as the struct is serializable via the Data Converter.
The Receive() method on the Signal Channel uses the Data Converter to decode the data into the struct within the Workflow.
Only exported (public) fields are serializable.
type MySignal struct {
	Message string // serializable (exported)
	message string // not serializable (unexported)
}
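The effect of field visibility can be checked outside of a Workflow. The sketch below assumes the default Data Converter encodes structs as JSON and uses encoding/json as a stand-in for it; encodeSignal is a hypothetical helper written only for this illustration.

```go
package main

import "encoding/json"

// MySignal mirrors the struct above: one exported and one unexported field.
type MySignal struct {
	Message string // exported, so it is encoded
	message string // unexported, so encoding/json skips it
}

// encodeSignal is a hypothetical helper that encodes a MySignal as JSON,
// standing in for what a JSON-based Data Converter would produce.
func encodeSignal(s MySignal) (string, error) {
	b, err := json.Marshal(s)
	if err != nil {
		return "", err
	}
	return string(b), nil
}
```

Encoding MySignal{Message: "hello", message: "hidden"} yields {"Message":"hello"}; the unexported field is dropped without an error, which is why a lowercase field silently arrives empty on the receiving side.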
Handle Signal
Workflows listen for Signals by the Signal's name.
Use the GetSignalChannel() API from the go.temporal.io/sdk/workflow package to get the Signal Channel.
func YourWorkflowDefinition(ctx workflow.Context, param YourWorkflowParam) error {
// ...
var signal MySignal
signalChan := workflow.GetSignalChannel(ctx, "your-signal-name")
signalChan.Receive(ctx, &signal)
if len(signal.Message) > 0 && signal.Message != "SOME_VALUE" {
return errors.New("signal")
}
// ...
}
In the example above, the Workflow code uses workflow.GetSignalChannel to open a workflow.ReceiveChannel for the Signal type (identified by the Signal name).
Before completing the Workflow or using Continue-As-New, make sure to do an asynchronous drain on the Signal channel.
Otherwise, the Signals will be lost.
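The drain itself is a loop that keeps reading until nothing is buffered, without ever blocking. The sketch below shows that pattern with a plain buffered Go channel standing in for the Signal channel (an assumption made so the example runs without the SDK); inside a Workflow, the channel's ReceiveAsync method would play the role of the select with a default branch.

```go
package main

// drainSignals performs a non-blocking drain of a buffered channel.
// The plain Go channel is a stand-in for the Workflow's Signal channel.
func drainSignals(ch <-chan string) []string {
	var drained []string
	for {
		select {
		case msg, ok := <-ch:
			if !ok {
				return drained // channel closed, nothing more to read
			}
			drained = append(drained, msg)
		default:
			return drained // buffer is empty; return without blocking
		}
	}
}
```

Whatever the drain returns would then be handled, or passed along as input to the next run, before the Workflow closes.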
Send Signal from Client
When a Signal is sent successfully from the Temporal Client, the WorkflowExecutionSignaled Event appears in the Event History of the Workflow that receives the Signal.
Use the SignalWorkflow() method on an instance of the Go SDK Temporal Client to send a Signal to a Workflow Execution.
Pass in both the Workflow Id and Run Id to uniquely identify the Workflow Execution.
If only the Workflow Id is supplied (provide an empty string as the Run Id param), the currently Running Workflow Execution receives the Signal.
// ...
signal := MySignal {
Message: "Some important data",
}
err = temporalClient.SignalWorkflow(context.Background(), "your-workflow-id", runID, "your-signal-name", signal)
if err != nil {
log.Fatalln("Error sending the Signal", err)
return
}
// ...
Possible errors:
- serviceerror.NotFound
- serviceerror.Internal
- serviceerror.Unavailable
Send Signal from Workflow
A Workflow can send a Signal to another Workflow, in which case it's called an External Signal.
When an External Signal is sent:
- A SignalExternalWorkflowExecutionInitiated Event appears in the sender's Event History.
- A WorkflowExecutionSignaled Event appears in the recipient's Event History.
A Signal can be sent from within a Workflow to a different Workflow Execution using the SignalExternalWorkflow API from the go.temporal.io/sdk/workflow package.
// ...
func YourWorkflowDefinition(ctx workflow.Context, param YourWorkflowParam) error {
//...
signal := MySignal {
Message: "Some important data",
}
err := workflow.SignalExternalWorkflow(ctx, "some-workflow-id", "", "your-signal-name", signal).Get(ctx, nil)
if err != nil {
// ...
}
// ...
}
Signal-With-Start
Signal-With-Start is used from the Client. It takes a Workflow Id, Workflow arguments, a Signal name, and Signal arguments.
If there's a Workflow running with the given Workflow Id, it will be signaled. If there isn't, a new Workflow will be started and immediately signaled.
Use the SignalWithStartWorkflow() API on the Go SDK Temporal Client to start a Workflow Execution (if not already running) and pass it the Signal at the same time.
Because the Workflow Execution might not exist, this API does not take a Run ID as a parameter.
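// ...
signal := MySignal{
	Message: "Some important data",
}
// "your-task-queue" and yourWorkflowParam are placeholders for your own values.
workflowOptions := client.StartWorkflowOptions{
	TaskQueue: "your-task-queue",
}
// Signals the running Workflow Execution, or starts YourWorkflowDefinition and then signals it.
workflowRun, err := temporalClient.SignalWithStartWorkflow(context.Background(), "your-workflow-id", "your-signal-name", signal, workflowOptions, YourWorkflowDefinition, yourWorkflowParam)
if err != nil {
	log.Fatalln("Error sending the Signal", err)
}
// workflowRun identifies the Workflow Execution that received the Signal.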
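The signal-or-start decision described above can be modeled in a few lines. The following is a toy in-memory simulation, not SDK code: the cluster and execution types and the signalWithStart method are hypothetical names invented for this sketch.

```go
package main

// execution is a hypothetical in-memory stand-in for a running Workflow Execution.
type execution struct {
	args    string
	signals []string
}

// cluster is a hypothetical registry of running executions keyed by Workflow Id.
type cluster struct {
	running map[string]*execution
}

func newCluster() *cluster {
	return &cluster{running: make(map[string]*execution)}
}

// signalWithStart delivers the signal to the running execution with the given
// Workflow Id, starting one first if none exists. It reports whether a new
// execution was started.
func (c *cluster) signalWithStart(workflowID, workflowArgs, signal string) (started bool) {
	exec, ok := c.running[workflowID]
	if !ok {
		exec = &execution{args: workflowArgs}
		c.running[workflowID] = exec
		started = true
	}
	exec.signals = append(exec.signals, signal)
	return started
}
```

Note that in this model the Workflow arguments only matter on the first call; a second call with the same Workflow Id only appends the Signal.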
Queries
A Query is a synchronous operation that is used to get the state of a Workflow Execution.
Define Query
A Query has a name and can have arguments.
- The name, also called a Query type, is a string.
- The arguments must be serializable.
In Go, a Query type, also called a Query name, is a string value.
queryType := "your_query_name"
Handle Query
Queries are handled by your Workflow.
Don't include any logic that causes Command generation within a Query handler (such as executing Activities).
Including such logic causes unexpected behavior.
Use the SetQueryHandler API from the go.temporal.io/sdk/workflow package to set a Query Handler that listens for a Query by name.
The handler must be a function that returns two values:
- A serializable result
- An error
The handler function can receive any number of input parameters, but all input parameters must be serializable.
The following sample code sets up a Query Handler that handles the current_state Query type:
func YourWorkflow(ctx workflow.Context, input string) error {
currentState := "started" // This could be any serializable struct.
queryType := "current_state"
err := workflow.SetQueryHandler(ctx, queryType, func() (string, error) {
return currentState, nil
})
if err != nil {
currentState = "failed to register query handler"
return err
}
// Your normal Workflow code begins here, and you update the currentState as the code makes progress.
currentState = "waiting timer"
err = workflow.NewTimer(ctx, time.Hour).Get(ctx, nil)
if err != nil {
currentState = "timer failed"
return err
}
currentState = "waiting activity"
ctx = workflow.WithActivityOptions(ctx, yourActivityOptions)
err = workflow.ExecuteActivity(ctx, YourActivity, "your_input").Get(ctx, nil)
if err != nil {
currentState = "activity failed"
return err
}
currentState = "done"
return nil
}
For example, suppose your query handler function takes two parameters:
err := workflow.SetQueryHandler(ctx, "current_state", func(prefix string, suffix string) (string, error) {
return prefix + currentState + suffix, nil
})
Send Query
Queries are sent from a Temporal Client.
Use the QueryWorkflow() API or the QueryWorkflowWithOptions() API on the Temporal Client to send a Query to a Workflow Execution.
// ...
response, err := temporalClient.QueryWorkflow(context.Background(), workflowID, runID, queryType)
if err != nil {
// ...
}
// ...
You can pass an arbitrary number of arguments to the QueryWorkflow() function.
// ...
response, err := temporalClient.QueryWorkflow(context.Background(), workflowID, runID, queryType, "foo", "baz")
if err != nil {
// ...
}
// ...
The QueryWorkflowWithOptions() API provides similar functionality, but with the ability to set additional configurations through QueryWorkflowWithOptionsRequest.
When using this API, you also receive a structured response of type QueryWorkflowWithOptionsResponse.
// ...
response, err := temporalClient.QueryWorkflowWithOptions(context.Background(), &client.QueryWorkflowWithOptionsRequest{
WorkflowID: workflowID,
RunID: runID,
QueryType: queryType,
Args: args,
})
if err != nil {
// ...
}
Workflow timeouts
Each Workflow timeout controls the maximum duration of a different aspect of a Workflow Execution.
Workflow timeouts are set when starting the Workflow Execution.
- Workflow Execution Timeout: restricts the maximum amount of time that a single Workflow Execution can be executed, including retries and any usage of Continue-As-New.
- Workflow Run Timeout: restricts the maximum amount of time that a single Workflow Run can last.
- Workflow Task Timeout: restricts the maximum amount of time that a Worker can execute a Workflow Task.
Create an instance of StartWorkflowOptions from the go.temporal.io/sdk/client package, set a timeout, and pass the instance to the ExecuteWorkflow call.
Available timeouts are:
- WorkflowExecutionTimeout
- WorkflowRunTimeout
- WorkflowTaskTimeout
workflowOptions := client.StartWorkflowOptions{
	// ...
	// Set Workflow Timeout duration
	WorkflowExecutionTimeout: time.Hour * 24 * 365 * 10,
	// WorkflowRunTimeout: time.Hour * 24 * 365 * 10,
	// WorkflowTaskTimeout: time.Second * 10,
	// ...
}
workflowRun, err := temporalClient.ExecuteWorkflow(context.Background(), workflowOptions, YourWorkflowDefinition)
if err != nil {
	// ...
}
Workflow retries
A Retry Policy can work in cooperation with the timeouts to provide fine controls to optimize the execution experience.
Use a Retry Policy to retry a Workflow Execution in the event of a failure.
Workflow Executions do not retry by default, and Retry Policies should be used with Workflow Executions only in certain situations.
Create an instance of a RetryPolicy from the go.temporal.io/sdk/temporal package and provide it as the value to the RetryPolicy field of the instance of StartWorkflowOptions.
- Type: RetryPolicy
- Default: None
retrypolicy := &temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: time.Second * 100,
}
workflowOptions := client.StartWorkflowOptions{
RetryPolicy: retrypolicy,
// ...
}
workflowRun, err := temporalClient.ExecuteWorkflow(context.Background(), workflowOptions, YourWorkflowDefinition)
if err != nil {
// ...
}
Activity timeouts
Each Activity timeout controls the maximum duration of a different aspect of an Activity Execution.
The following timeouts are available in the Activity Options.
- Schedule-To-Close Timeout: is the maximum amount of time allowed for the overall Activity Execution.
- Start-To-Close Timeout: is the maximum time allowed for a single Activity Task Execution.
- Schedule-To-Start Timeout: is the maximum amount of time that is allowed from when an Activity Task is scheduled to when a Worker starts that Activity Task.
An Activity Execution must have either the Start-To-Close or the Schedule-To-Close Timeout set.
To set an Activity Timeout in Go, create an instance of ActivityOptions from the go.temporal.io/sdk/workflow package, set the Activity Timeout field, and then use the WithActivityOptions() API to apply the options to the instance of workflow.Context.
Available timeouts are:
- StartToCloseTimeout
- ScheduleToCloseTimeout
- ScheduleToStartTimeout
activityoptions := workflow.ActivityOptions{
// Set Activity Timeout duration
ScheduleToCloseTimeout: 10 * time.Second,
// StartToCloseTimeout: 10 * time.Second,
// ScheduleToStartTimeout: 10 * time.Second,
}
ctx = workflow.WithActivityOptions(ctx, activityoptions)
var yourActivityResult YourActivityResult
err = workflow.ExecuteActivity(ctx, YourActivityDefinition, yourActivityParam).Get(ctx, &yourActivityResult)
if err != nil {
// ...
}
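The rule that either the Start-To-Close or the Schedule-To-Close Timeout must be set can be expressed as a small pre-flight check. This is a sketch only: activityTimeouts and validate are hypothetical names, not SDK types, and the check simply treats a zero duration as "not set".

```go
package main

import (
	"errors"
	"time"
)

// activityTimeouts is a hypothetical struct holding the three timeouts.
type activityTimeouts struct {
	ScheduleToClose time.Duration
	StartToClose    time.Duration
	ScheduleToStart time.Duration
}

var errMissingTimeout = errors.New("either Start-To-Close or Schedule-To-Close Timeout must be set")

// validate enforces the rule stated above: at least one of the two
// required timeouts must be greater than zero.
func (t activityTimeouts) validate() error {
	if t.StartToClose <= 0 && t.ScheduleToClose <= 0 {
		return errMissingTimeout
	}
	return nil
}
```

Setting only ScheduleToStart fails this check, because that timeout bounds the time spent waiting in the Task Queue and not the execution itself.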
Activity retries
A Retry Policy works in cooperation with the timeouts to provide fine controls to optimize the execution experience.
Activity Executions are automatically associated with a default Retry Policy if a custom one is not provided.
To set a RetryPolicy, create an instance of ActivityOptions from the go.temporal.io/sdk/workflow package, set the RetryPolicy field, and then use the WithActivityOptions() API to apply the options to the instance of workflow.Context.
- Type: RetryPolicy
- Default:
retrypolicy := &temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: time.Second * 100, // 100 * InitialInterval
MaximumAttempts: 0, // Unlimited
NonRetryableErrorTypes: []string{}, // empty
}
Providing a Retry Policy here is a customization, and overwrites individual Field defaults.
retrypolicy := &temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: time.Second * 100,
}
activityoptions := workflow.ActivityOptions{
RetryPolicy: retrypolicy,
}
ctx = workflow.WithActivityOptions(ctx, activityoptions)
var yourActivityResult YourActivityResult
err = workflow.ExecuteActivity(ctx, YourActivityDefinition, yourActivityParam).Get(ctx, &yourActivityResult)
if err != nil {
// ...
}
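To get a feel for how these three fields interact, the sketch below computes the wait before each retry, assuming the interval grows as InitialInterval multiplied by BackoffCoefficient raised to (retry - 1) and is capped at MaximumInterval. The retryPolicy type, examplePolicy value, and intervalBeforeRetry method are hypothetical and only mirror the field names used above.

```go
package main

import (
	"math"
	"time"
)

// retryPolicy is a hypothetical, trimmed-down mirror of the fields used above.
type retryPolicy struct {
	InitialInterval    time.Duration
	BackoffCoefficient float64
	MaximumInterval    time.Duration
}

// examplePolicy uses the same values as the Retry Policy shown above.
var examplePolicy = retryPolicy{
	InitialInterval:    time.Second,
	BackoffCoefficient: 2.0,
	MaximumInterval:    100 * time.Second,
}

// intervalBeforeRetry returns the wait before the given retry (1 = first retry),
// assuming exponential backoff capped at MaximumInterval.
func (p retryPolicy) intervalBeforeRetry(retry int) time.Duration {
	if retry < 1 {
		return 0
	}
	d := float64(p.InitialInterval) * math.Pow(p.BackoffCoefficient, float64(retry-1))
	if p.MaximumInterval > 0 && d > float64(p.MaximumInterval) {
		return p.MaximumInterval
	}
	return time.Duration(d)
}
```

With examplePolicy the waits run 1s, 2s, 4s, and so on; at the eighth retry the uncapped value would be 128s, so the 100s cap applies from there on.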
Activity Heartbeats
An Activity Heartbeat is a ping from the Worker Process that is executing the Activity to the Temporal Cluster.
Each Heartbeat informs the Temporal Cluster that the Activity Execution is making progress and the Worker has not crashed.
If the Cluster does not receive a Heartbeat within a Heartbeat Timeout time period, the Activity will be considered failed and another Activity Task Execution may be scheduled according to the Retry Policy.
Heartbeats may not always be sent to the Cluster; they may be throttled by the Worker.
Activity Cancellations are delivered to Activities from the Cluster when they Heartbeat. Activities that don't Heartbeat can't receive a Cancellation. Heartbeat throttling may lead to Cancellation getting delivered later than expected.
Heartbeats can contain a details field describing the Activity's current progress.
If an Activity gets retried, the Activity can access the details from the last Heartbeat that was sent to the Cluster.
To Heartbeat in an Activity in Go, use the RecordHeartbeat API.
import (
	// ...
	"go.temporal.io/sdk/activity"
	// ...
)
func YourActivityDefinition(ctx context.Context, param YourActivityDefinitionParam) (YourActivityDefinitionResult, error) {
	// ...
	activity.RecordHeartbeat(ctx, details)
	// ...
}
When an Activity Task Execution times out due to a missed Heartbeat, the last value of the details variable above is returned to the calling Workflow in the details field of TimeoutError with TimeoutType set to Heartbeat.
You can also Heartbeat an Activity from an external source:
// The client is a heavyweight object that should be created once per process.
temporalClient, err := client.Dial(client.Options{})
if err != nil {
	// ...
}
// Record heartbeat.
err = temporalClient.RecordActivityHeartbeat(ctx, taskToken, details)
The parameters of the RecordActivityHeartbeat function are:
- taskToken: The value of the binary TaskToken field of the ActivityInfo struct retrieved inside the Activity.
- details: The serializable payload containing progress information.
If an Activity Execution Heartbeats its progress before it failed, the retry attempt will have access to the progress information, so that the Activity Execution can resume from the failed state. Here's an example of how this can be implemented:
func SampleActivity(ctx context.Context, inputArg InputParams) error {
	startIdx := inputArg.StartIndex
	if activity.HasHeartbeatDetails(ctx) {
		// Recover from finished progress.
		var finishedIndex int
		if err := activity.GetHeartbeatDetails(ctx, &finishedIndex); err == nil {
			startIdx = finishedIndex + 1 // Start from next one.
		}
	}
	// Normal Activity logic...
	for i := startIdx; i < inputArg.EndIndex; i++ {
		// Code for processing item i goes here...
		activity.RecordHeartbeat(ctx, i) // Report progress.
	}
	return nil
}
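The resume logic can be exercised without a Cluster by simulating a crash and a retry. In the sketch below, heartbeatStore is a hypothetical stand-in for the Heartbeat details the Cluster keeps between attempts, and processRange is a hypothetical function that follows the same shape as the Activity above.

```go
package main

import "errors"

// heartbeatStore is a hypothetical stand-in for the Cluster's record of the
// last Heartbeat details for one Activity Execution.
type heartbeatStore struct {
	hasDetails bool
	lastIndex  int
}

// record saves the index of the most recently finished item.
func (h *heartbeatStore) record(i int) {
	h.hasDetails = true
	h.lastIndex = i
}

var errCrashed = errors.New("simulated worker crash")

// processRange processes items in [start, end), resuming after the last
// heartbeated index if one exists. It returns errCrashed when it reaches
// failAt; pass a negative failAt to disable the simulated crash.
func processRange(h *heartbeatStore, start, end, failAt int, processed *[]int) error {
	if h.hasDetails {
		start = h.lastIndex + 1 // resume from the item after the last reported one
	}
	for i := start; i < end; i++ {
		if i == failAt {
			return errCrashed
		}
		*processed = append(*processed, i)
		h.record(i) // report progress
	}
	return nil
}
```

Running it once with a crash at index 3 and then again with the same store processes items 0 through 4 exactly once each: the second attempt starts at 3 because the last recorded index was 2.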
Heartbeat Timeout
A Heartbeat Timeout works in conjunction with Activity Heartbeats.
To set a Heartbeat Timeout, create an instance of ActivityOptions from the go.temporal.io/sdk/workflow package, set the HeartbeatTimeout field, and then use the WithActivityOptions() API to apply the options to the instance of workflow.Context.
activityoptions := workflow.ActivityOptions{
HeartbeatTimeout: 10 * time.Second,
}
ctx = workflow.WithActivityOptions(ctx, activityoptions)
var yourActivityResult YourActivityResult
err = workflow.ExecuteActivity(ctx, YourActivityDefinition, yourActivityParam).Get(ctx, &yourActivityResult)
if err != nil {
// ...
}
Asynchronous Activity Completion
Asynchronous Activity Completion enables the Activity Function to return without the Activity Execution completing.
There are three steps to follow:
1. The Activity provides the external system with identifying information needed to complete the Activity Execution. Identifying information can be a Task Token, or a combination of Namespace, Workflow Id, and Activity Id.
2. The Activity Function completes in a way that identifies it as waiting to be completed by an external system.
3. The Temporal Client is used to Heartbeat and complete the Activity.
- Provide the external system with a Task Token to complete the Activity Execution. To do this, use the GetInfo() API from the go.temporal.io/sdk/activity package.
// Retrieve the Activity information needed to asynchronously complete the Activity.
activityInfo := activity.GetInfo(ctx)
taskToken := activityInfo.TaskToken
// Send the taskToken to the external service that will complete the Activity.
- Return an activity.ErrResultPending error to indicate that the Activity is completing asynchronously.
return "", activity.ErrResultPending
- Use the Temporal Client to complete the Activity using the Task Token.
// Instantiate a Temporal service client.
// The same client can be used to complete or fail any number of Activities.
// The client is a heavyweight object that should be created once per process.
temporalClient, err := client.Dial(client.Options{})
if err != nil {
	// ...
}
// Complete the Activity.
err = temporalClient.CompleteActivity(context.Background(), taskToken, result, nil)
The following are the parameters of the CompleteActivity function:
- taskToken: The value of the binary TaskToken field of the ActivityInfo struct retrieved inside the Activity.
- result: The return value to record for the Activity. The type of this value must match the type of the return value declared by the Activity function.
- err: The error to return if the Activity terminates with an error.
If err is not nil, the value of the result field is ignored.
To fail the Activity, you would do the following:
// Fail the Activity.
temporalClient.CompleteActivity(context.Background(), taskToken, nil, err)
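The hand-off between the Activity and the external system can be modeled with an in-memory registry. This is a toy simulation rather than SDK code: pendingActivities, outcome, and errResultPending are hypothetical names, with errResultPending only modeled on the idea of activity.ErrResultPending.

```go
package main

import (
	"errors"
	"sync"
)

// errResultPending is a local sentinel modeled on activity.ErrResultPending.
var errResultPending = errors.New("result pending")

// outcome is what the external system eventually reports for a token.
type outcome struct {
	result string
	err    error
}

// pendingActivities is a hypothetical registry of Activities that are
// waiting for external completion, keyed by task token.
type pendingActivities struct {
	mu      sync.Mutex
	waiting map[string]chan outcome
}

func newPendingActivities() *pendingActivities {
	return &pendingActivities{waiting: make(map[string]chan outcome)}
}

// start registers the token (step 1) and returns errResultPending (step 2).
func (p *pendingActivities) start(token string) (<-chan outcome, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	ch := make(chan outcome, 1)
	p.waiting[token] = ch
	return ch, errResultPending
}

// complete is called by the external system (step 3). When err is non-nil
// the result is discarded, matching the rule described above.
func (p *pendingActivities) complete(token, result string, err error) error {
	p.mu.Lock()
	ch, ok := p.waiting[token]
	delete(p.waiting, token)
	p.mu.Unlock()
	if !ok {
		return errors.New("unknown task token")
	}
	if err != nil {
		result = ""
	}
	ch <- outcome{result: result, err: err}
	return nil
}
```

Each token can be completed once; a second call for the same token, or a call with a token that was never registered, returns an error in this model.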
Child Workflows
A Child Workflow Execution is a Workflow Execution that is scheduled from within another Workflow using a Child Workflow API.
When using a Child Workflow API, Child Workflow related Events (StartChildWorkflowExecutionInitiated, ChildWorkflowExecutionStarted, ChildWorkflowExecutionCompleted, etc.) are logged in the Workflow Execution Event History.
Always block progress until the ChildWorkflowExecutionStarted Event is logged to the Event History to ensure the Child Workflow Execution has started.
After that, Child Workflow Executions may be abandoned using the Abandon Parent Close Policy set in the Child Workflow Options.
To be sure that the Child Workflow Execution has started, first obtain the Future for the Child Workflow Execution itself from the Child Workflow Future.
Then wait on that Future; it resolves once the Child Workflow Execution has spawned.
To spawn a Child Workflow Execution in Go, use the ExecuteChildWorkflow API, which is available from the go.temporal.io/sdk/workflow package.
The ExecuteChildWorkflow call requires an instance of workflow.Context, with an instance of workflow.ChildWorkflowOptions applied to it, the Workflow Type, and any parameters that should be passed to the Child Workflow Execution.
workflow.ChildWorkflowOptions contains many of the same fields as client.StartWorkflowOptions.
Workflow Option fields automatically inherit their values from the Parent Workflow Options if they are not explicitly set.
If a custom WorkflowID is not set, one is generated when the Child Workflow Execution is spawned.
Use the WithChildOptions API to apply Child Workflow Options to the instance of workflow.Context.
The ExecuteChildWorkflow call returns an instance of a ChildWorkflowFuture.
Call the .Get() method on the instance of ChildWorkflowFuture to wait for the result.
func YourWorkflowDefinition(ctx workflow.Context, params ParentParams) (ParentResp, error) {
childWorkflowOptions := workflow.ChildWorkflowOptions{}
ctx = workflow.WithChildOptions(ctx, childWorkflowOptions)
var result ChildResp
err := workflow.ExecuteChildWorkflow(ctx, YourOtherWorkflowDefinition, ChildParams{}).Get(ctx, &result)
if err != nil {
// ...
}
// ...
return resp, nil
}
func YourOtherWorkflowDefinition(ctx workflow.Context, params ChildParams) (ChildResp, error) {
// ...
return resp, nil
}
To asynchronously spawn a Child Workflow Execution, the Child Workflow must have an "Abandon" Parent Close Policy set in the Child Workflow Options.
Additionally, the Parent Workflow Execution must wait for the ChildWorkflowExecutionStarted Event to appear in its Event History before it completes.
If the Parent makes the ExecuteChildWorkflow call and then immediately completes, the Child Workflow Execution does not spawn.
To be sure that the Child Workflow Execution has started, first call the GetChildWorkflowExecution method on the instance of the ChildWorkflowFuture, which will return a different Future.
Then call the Get() method on that Future, which is what will wait until the Child Workflow Execution has spawned.
import (
// ...
"go.temporal.io/api/enums/v1"
)
func YourWorkflowDefinition(ctx workflow.Context, params ParentParams) (ParentResp, error) {
childWorkflowOptions := workflow.ChildWorkflowOptions{
ParentClosePolicy: enums.PARENT_CLOSE_POLICY_ABANDON,
}
ctx = workflow.WithChildOptions(ctx, childWorkflowOptions)
childWorkflowFuture := workflow.ExecuteChildWorkflow(ctx, YourOtherWorkflowDefinition, ChildParams{})
// Wait for the Child Workflow Execution to spawn
var childWE workflow.Execution
if err := childWorkflowFuture.GetChildWorkflowExecution().Get(ctx, &childWE); err != nil {
return ParentResp{}, err
}
// ...
return resp, nil
}
func YourOtherWorkflowDefinition(ctx workflow.Context, params ChildParams) (ChildResp, error) {
// ...
return resp, nil
}
Parent Close Policy
A Parent Close Policy determines what happens to a Child Workflow Execution if its Parent changes to a Closed status (Completed, Failed, or Timed Out).
The default Parent Close Policy option is set to terminate the Child Workflow Execution.
In Go, a Parent Close Policy is set on the ParentClosePolicy field of an instance of workflow.ChildWorkflowOptions.
The possible values can be obtained from the go.temporal.io/api/enums/v1 package.
- PARENT_CLOSE_POLICY_ABANDON
- PARENT_CLOSE_POLICY_TERMINATE
- PARENT_CLOSE_POLICY_REQUEST_CANCEL
The Child Workflow Options are then applied to the instance of workflow.Context by using the WithChildOptions API, which is then passed to the ExecuteChildWorkflow() call.
- Type: ParentClosePolicy
- Default: PARENT_CLOSE_POLICY_TERMINATE
import (
// ...
"go.temporal.io/api/enums/v1"
)
func YourWorkflowDefinition(ctx workflow.Context, params ParentParams) (ParentResp, error) {
// ...
childWorkflowOptions := workflow.ChildWorkflowOptions{
// ...
ParentClosePolicy: enums.PARENT_CLOSE_POLICY_ABANDON,
}
ctx = workflow.WithChildOptions(ctx, childWorkflowOptions)
childWorkflowFuture := workflow.ExecuteChildWorkflow(ctx, YourOtherWorkflowDefinition, ChildParams{})
// ...
}
func YourOtherWorkflowDefinition(ctx workflow.Context, params ChildParams) (ChildResp, error) {
// ...
return resp, nil
}
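As a compact summary of the three policies, the sketch below maps each one to what happens to a still-running Child when its Parent closes. The parentClosePolicy type and its constants are a hypothetical local enum for illustration, not the values from the enums package, and the returned strings are informal descriptions.

```go
package main

// parentClosePolicy is a hypothetical local enum mirroring the three policies.
type parentClosePolicy int

const (
	policyTerminate     parentClosePolicy = iota // the default
	policyAbandon                                // Child is left alone
	policyRequestCancel                          // Child is asked to cancel
)

// childStatusAfterParentClose describes what happens to a still-running
// Child Workflow Execution when its Parent reaches a Closed status.
func childStatusAfterParentClose(p parentClosePolicy) string {
	switch p {
	case policyAbandon:
		return "keeps running"
	case policyRequestCancel:
		return "cancellation requested"
	default:
		return "terminated"
	}
}
```

Because terminate is the zero value here, a Child whose policy was never set is terminated, which mirrors the default stated above.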
Continue-As-New
Continue-As-New enables a Workflow Execution to close successfully and create a new Workflow Execution in a single atomic operation if the number of Events in the Event History is becoming too large. The Workflow Execution spawned from the use of Continue-As-New has the same Workflow Id, a new Run Id, and a fresh Event History and is passed all the appropriate parameters.
To cause a Workflow Execution to Continue-As-New, the Workflow API should return the result of the NewContinueAsNewError() function available from the go.temporal.io/sdk/workflow package.
func SimpleWorkflow(ctx workflow.Context, value string) error {
	// ...
	return workflow.NewContinueAsNewError(ctx, SimpleWorkflow, value)
}
To check whether a Workflow Execution was spawned as a result of Continue-As-New, check whether workflow.GetInfo(ctx).ContinuedExecutionRunID is not empty (that is, not "").
Notes
- To prevent Signal loss, be sure to perform an asynchronous drain on the Signal channel. Failure to do so can result in buffered Signals being ignored and lost.
- Make sure that the previous Workflow and the Continue-As-New Workflow are referenced by the same alias. Failure to do so can cause the Workflow to Continue-As-New on an entirely different Workflow.
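The first note above asks for an asynchronous (non-blocking) drain. As a rough illustration of that pattern only, the sketch below uses a plain buffered Go channel as a stand-in for a Signal channel; `drainPending` is a hypothetical helper, not an SDK function. In Workflow code the equivalent step would presumably use the SDK's non-blocking receive on the Signal channel rather than Go's `select`.

```go
package main

import "fmt"

// drainPending removes every value that is already buffered on ch without
// ever blocking, and returns the drained values in arrival order.
// It is a hypothetical helper used only to illustrate the pattern.
func drainPending(ch <-chan string) []string {
	var drained []string
	for {
		select {
		case v := <-ch:
			drained = append(drained, v)
		default:
			// Nothing is buffered right now, so stop instead of waiting.
			return drained
		}
	}
}

func main() {
	// Stand-in for Signals that arrived but were not yet handled.
	signals := make(chan string, 8)
	signals <- "approve"
	signals <- "cancel"

	// Drain before handing over, so the pending values can be carried forward.
	carried := drainPending(signals)
	fmt.Println(carried, len(signals))
}
```

The drained values can then be handled, or passed as arguments to the next run, before the Workflow returns its Continue-As-New error.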
Timers
A Workflow can set a durable timer for a fixed time period.
In some SDKs, the function is called sleep(), and in others, it's called timer().
A Workflow can sleep for months.
Timers are persisted, so even if your Worker or Temporal Cluster is down when the time period completes, as soon as your Worker and Cluster are back up, the sleep() call will resolve and your code will continue executing.
Sleeping is a resource-light operation: it does not tie up the process, and you can run millions of Timers off a single Worker.
To set a Timer in Go, use the NewTimer() function and pass the duration you want to wait before continuing.
timer := workflow.NewTimer(timerCtx, duration)
To set a sleep duration in Go, use the Sleep() function and pass the duration you want to wait before continuing.
A zero or negative sleep duration causes the function to return immediately.
err := workflow.Sleep(ctx, 10*time.Second)
For more information, see the Timer example in the Go Samples repository.
Schedule a Workflow
Scheduling Workflows is a crucial aspect of any automation process, especially when dealing with time-sensitive tasks. By scheduling a Workflow, you can automate repetitive tasks, reduce the need for manual intervention, and ensure timely execution of your business processes.
Use any of the following actions to Schedule a Workflow Execution and take control of your automation process.
Create Schedule
A Schedule enables the scheduling of Workflow Executions.
Schedules are initiated with the `create` call. The user generates a unique Schedule ID for each new Schedule.
To create a Schedule in Go, use Create() on the ScheduleClient returned by the Temporal Client.
Schedules must be initialized with a Schedule ID, Spec, and Action in client.ScheduleOptions{}.
func main() {
	// ...
	// Create Schedule and Workflow IDs
	scheduleID := "schedule_" + uuid.New()
	workflowID := "schedule_workflow_" + uuid.New()
	// Create the schedule.
	scheduleHandle, err := temporalClient.ScheduleClient().Create(ctx, client.ScheduleOptions{
		ID:   scheduleID,
		Spec: client.ScheduleSpec{},
		Action: &client.ScheduleWorkflowAction{
			ID:        workflowID,
			Workflow:  schedule.ScheduleWorkflow,
			TaskQueue: "schedule",
		},
	})
	// ...
}
Backfill Schedule
Backfilling a Schedule executes Actions ahead of the Schedule's specified time range.
This is useful for executing a missed or delayed Action, or for testing the Workflow ahead of time.
To backfill a Schedule in Go, use Backfill() on the ScheduleHandle.
Specify the start and end times to execute the Workflow, along with the Overlap Policy.
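func main() {
	// ...
	now := time.Now()
	// Backfill two adjacent two-minute windows that end at the current time.
	err = scheduleHandle.Backfill(ctx, client.ScheduleBackfillOptions{
		Backfill: []client.ScheduleBackfill{
			{
				Start:   now.Add(-4 * time.Minute),
				End:     now.Add(-2 * time.Minute),
				Overlap: enums.SCHEDULE_OVERLAP_POLICY_ALLOW_ALL,
			},
			{
				Start:   now.Add(-2 * time.Minute),
				End:     now,
				Overlap: enums.SCHEDULE_OVERLAP_POLICY_ALLOW_ALL,
			},
		},
	})
	// ...
}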
Delete Schedule
Deleting a Schedule erases a Schedule.
Deletion does not affect any Workflows started by the Schedule.
To delete a Schedule, use Delete() on the ScheduleHandle.
func main() {
// ...
scheduleHandle.Delete(ctx)
}
// ...
Describe Schedule
Describe retrieves information about the current Schedule configuration.
This can include details about the Schedule Spec (such as Intervals or CronExpressions) and Schedule State.
To describe a Schedule, use Describe() on the ScheduleHandle.
func main() {
// ...
// describe schedule
scheduleHandle.Describe(ctx)
}
// ...
List Schedules
The List action returns a list of existing Schedules and their respective Schedule IDs.
To return information on all Schedules, use ScheduleClient.List().
func main() {
// ...
// list schedules
listView, _ := temporalClient.ScheduleClient().List(ctx, client.ScheduleListOptions{
PageSize: 1,
})
// ...
}
// ...
Pause Schedule
Pause and Unpause stop or resume all future Workflow Runs on a given Schedule.
Pausing a Schedule halts all future Workflow Runs.
Pause a Schedule by setting State.Paused to true, or by using Pause() on the ScheduleHandle.
Unpausing a Schedule allows the Workflow to execute as planned.
To unpause a Schedule, use Unpause() on the ScheduleHandle, or set State.Paused to false.
func main() {
// ...
scheduleHandle, err := temporalClient.ScheduleClient().Create(ctx, client.ScheduleOptions{
// ...
Paused: true,
})
// ...
scheduleHandle.Unpause(ctx, client.ScheduleUnpauseOptions{
Note: "The Schedule has been unpaused.",
})
scheduleHandle.Pause(ctx, client.SchedulePauseOptions{
Note: "The Schedule has been paused.",
})
}
// ...
Trigger Schedule
Triggering a Schedule immediately executes an Action defined in that Schedule.
By default, trigger is subject to the AllowAll Overlap Policy.
To trigger a Scheduled Workflow Execution, use Trigger() on the ScheduleHandle.
func main() {
// ...
// Create a Schedule to trigger
scheduleHandle, _ := temporalClient.ScheduleClient().Create(ctx, client.ScheduleOptions{
ID: "trigger-schedule",
Spec: client.ScheduleSpec{},
Action: &client.ScheduleWorkflowAction{},
Paused: true,
Overlap: enums.SCHEDULE_OVERLAP_POLICY_ALLOW_ALL,
})
// ...
// Trigger Schedule
for i := 0; i < 5; i++ {
scheduleHandle.Trigger(ctx, client.ScheduleTriggerOptions{
Overlap: enums.SCHEDULE_OVERLAP_POLICY_ALLOW_ALL,
})
time.Sleep(2 * time.Second)
}
}
// ...
Update Schedule
Updating a Schedule changes the configuration of an existing Schedule.
These changes can be made to Actions, Action parameters, Memos, and the Workflow's Cancellation Policy.
Use Update() on the ScheduleHandle to modify a Schedule.
func main() {
// ...
updateSchedule := func(input client.ScheduleUpdateInput) (*client.ScheduleUpdate, error) {
return &client.ScheduleUpdate{
Schedule: &input.Description.Schedule,
}, nil
}
_ = scheduleHandle.Update(ctx, client.ScheduleUpdateOptions{
DoUpdate: updateSchedule,
})
}
// ...
Temporal Cron Jobs
A Temporal Cron Job is the series of Workflow Executions that occur when a Cron Schedule is provided in the call to spawn a Workflow Execution.
A Cron Schedule is provided as an option when the call to spawn a Workflow Execution is made.
Create an instance of StartWorkflowOptions from the go.temporal.io/sdk/client package, set the CronSchedule field, and pass the instance to the ExecuteWorkflow call.
- Type: string
- Default: None
workflowOptions := client.StartWorkflowOptions{
CronSchedule: "15 8 * * *",
// ...
}
workflowRun, err := c.ExecuteWorkflow(context.Background(), workflowOptions, YourWorkflowDefinition)
if err != nil {
// ...
}
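The CronSchedule string above follows the usual five-field cron layout. As a reading aid, the following sketch labels each field of such an expression using only the standard library; `describeCron` and `cronFieldNames` are hypothetical names for this illustration, and the helper does not validate ranges or talk to Temporal.

```go
package main

import (
	"fmt"
	"strings"
)

// cronFieldNames lists the five fields of a standard cron expression, in order.
var cronFieldNames = []string{"minute", "hour", "day-of-month", "month", "day-of-week"}

// describeCron splits a five-field cron expression and pairs each value with
// its field name. It does not validate ranges; it only labels the fields.
func describeCron(spec string) (map[string]string, error) {
	fields := strings.Fields(spec)
	if len(fields) != len(cronFieldNames) {
		return nil, fmt.Errorf("expected %d fields, got %d", len(cronFieldNames), len(fields))
	}
	out := make(map[string]string, len(fields))
	for i, name := range cronFieldNames {
		out[name] = fields[i]
	}
	return out, nil
}

func main() {
	parts, err := describeCron("15 8 * * *")
	if err != nil {
		panic(err)
	}
	// Minute 15 of hour 8, on every day of every month.
	fmt.Println("minute:", parts["minute"], "hour:", parts["hour"])
}
```

Read this way, "15 8 * * *" asks for one Workflow Execution per day at 08:15.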
Side Effects
Side Effects are used to execute non-deterministic code, such as generating a UUID or a random number, without compromising determinism in the Workflow. This is done by storing the non-deterministic results of the Side Effect into the Workflow Event History.
A Side Effect does not re-execute during a Replay. Instead, it returns the recorded result from the Workflow Execution Event History.
Side Effects should not fail. An exception that is thrown from the Side Effect causes failure and retry of the current Workflow Task.
An Activity or a Local Activity may also be used instead of a Side Effect, as its result is also persisted in Workflow Execution History.
You shouldn't modify the Workflow state inside a Side Effect function, because it is not re-executed during Replay. A Side Effect function should only be used to return a value.
Use the SideEffect function from the go.temporal.io/sdk/workflow package to execute a Side Effect directly in your Workflow.
Pass it an instance of workflow.Context and the function to execute.
The SideEffect API returns an instance of converter.EncodedValue.
Use the Get method on that value to retrieve the result of the Side Effect.
Correct implementation
The following example demonstrates the correct way to use SideEffect:
encodedRandom := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
	return rand.Intn(100)
})
var random int
encodedRandom.Get(&random)
// ...
Incorrect implementation
The following example demonstrates how NOT to use SideEffect:
// Warning: This is an incorrect example.
// This code is non-deterministic.
var random int
workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
	random = rand.Intn(100)
	return nil
})
// random will always be 0 in replay, so this code is non-deterministic.
On replay, the provided function is not executed, so the random number is always 0 and the Workflow Execution could take a different path, breaking determinism.
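To make the difference between the two implementations concrete, here is a toy model of record-and-replay written with the standard library only. The `history` type and its `sideEffect` method are hypothetical stand-ins, not the SDK's implementation, and a fixed value (42) replaces the random number so the outcome is easy to follow.

```go
package main

import "fmt"

// history is a toy stand-in for a Workflow Event History: it stores the
// results recorded by side effects, in call order.
type history struct {
	recorded []int
	next     int  // index of the next side-effect call during replay
	replay   bool // true while re-running against an existing history
}

// sideEffect runs f once during the original execution and records its
// result. During replay it skips f and returns the recorded result instead.
func (h *history) sideEffect(f func() int) int {
	if h.replay {
		v := h.recorded[h.next]
		h.next++
		return v
	}
	v := f()
	h.recorded = append(h.recorded, v)
	return v
}

// correct uses only the value returned by the side effect.
func correct(h *history) int {
	return h.sideEffect(func() int { return 42 })
}

// incorrect passes the value out through a captured variable, so nothing
// useful is recorded and replay sees the zero value.
func incorrect(h *history) int {
	var random int
	h.sideEffect(func() int {
		random = 42
		return 0
	})
	return random
}

// runTwice executes wf once for real and once in replay mode.
func runTwice(wf func(*history) int) (first, replayed int) {
	h := &history{}
	first = wf(h)
	h.replay, h.next = true, 0
	replayed = wf(h)
	return first, replayed
}

func main() {
	fmt.Println(runTwice(correct))   // 42 42
	fmt.Println(runTwice(incorrect)) // 42 0
}
```

In the model, the correct version yields the same value on the first run and on replay, while the incorrect version diverges because the function body that set the variable never runs again.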
Mutable Side Effects
A Mutable Side Effect executes the provided function and then looks up the value recorded in the History for the given ID.
- If there is no existing value, it records the function result as the value for that ID in the History.
- If there is an existing value, it compares the existing value from the History with the new function result by calling the equals function.
  - If the values are equal, it returns the value without recording a new Marker Event.
  - If the values aren't equal, it records the new value with the same ID in the History.
During a Workflow Execution, every new Side Effect call results in a new Marker recorded on the Workflow History, whereas a Mutable Side Effect records a new Marker only if the value for the Side Effect ID changes or is set for the first time.
During a Replay, a Mutable Side Effect does not execute the function again. Instead, it returns the exact same value that was returned during the Workflow Execution.
To use MutableSideEffect() in Go, provide an ID that is unique within the scope of the Workflow.
if err := workflow.MutableSideEffect(ctx, "configureNumber", get, eq).Get(&number); err != nil {
panic("can't decode number:" + err.Error())
}
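The snippet above leaves `get` and `eq` undefined. The following standard-library sketch models the "record a Marker only when the value changes" rule and shows what such a getter and equality function might look like. The `markerLog` type is a hypothetical stand-in for the Workflow History, not SDK code.

```go
package main

import "fmt"

// markerLog is a toy stand-in for the Markers a Workflow History would hold.
type markerLog struct {
	latest  map[string]int // last recorded value per ID
	markers int            // how many Markers have been written
}

// mutableSideEffect evaluates get, then records a new Marker only when no
// value exists for id yet or when eq reports that the value has changed.
func (m *markerLog) mutableSideEffect(id string, get func() int, eq func(a, b int) bool) int {
	current := get()
	if previous, ok := m.latest[id]; ok && eq(previous, current) {
		return previous // unchanged: no new Marker
	}
	m.latest[id] = current
	m.markers++
	return current
}

func main() {
	hist := &markerLog{latest: map[string]int{}}
	config := 10
	get := func() int { return config }
	eq := func(a, b int) bool { return a == b }

	hist.mutableSideEffect("configureNumber", get, eq) // first call: Marker 1
	hist.mutableSideEffect("configureNumber", get, eq) // same value: no new Marker
	config = 20
	v := hist.mutableSideEffect("configureNumber", get, eq) // changed: Marker 2

	fmt.Println(v, hist.markers) // 20 2
}
```

Three calls produce only two Markers here, which is the saving a Mutable Side Effect offers over a plain Side Effect for values that are read often but change rarely.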
Namespaces
You can create, update, deprecate, or delete your Namespaces using either tctl or SDK APIs.
A Namespace is a unit of isolation within the Temporal Platform.
Use Namespaces to isolate your Workflow Executions according to your needs.
For example, you can use Namespaces to match the development lifecycle by having separate dev and prod Namespaces.
You could also use them to ensure Workflow Executions between different teams never communicate, such as ensuring that the teamA Namespace never impacts the teamB Namespace.
On Temporal Cloud, use the Temporal Cloud UI to create and manage a Namespace from the UI, or tcld commands to manage Namespaces from the command-line interface.
On self-hosted Temporal Cluster, you can register and manage your Namespaces using tctl (recommended) or programmatically using APIs. Note that these APIs and tctl commands will not work with Temporal Cloud.
Use a custom Authorizer on your Frontend Service in the Temporal Cluster to set restrictions on who can create, update, or deprecate Namespaces.
You must register a Namespace with the Temporal Cluster before setting it in the Temporal Client.
Register Namespace
Registering a Namespace creates a Namespace on the Temporal Cluster or Temporal Cloud.
On Temporal Cloud, use the Temporal Cloud UI or tcld commands to create Namespaces.
On self-hosted Temporal Cluster, you can register your Namespaces using tctl (recommended) or programmatically using APIs. Note that these APIs and tctl commands will not work with Temporal Cloud.
Use a custom Authorizer on your Frontend Service in the Temporal Cluster to set restrictions on who can create, update, or deprecate Namespaces.
Use the Register API with the NamespaceClient interface to register a Namespace and set the Retention Period for the Workflow Execution Event History for the Namespace.
A Retention Period is the amount of time a Workflow Execution Event History remains in the Cluster's persistence store.
You can also register Namespaces using the tctl namespace register command.
client, err := client.NewNamespaceClient(client.Options{HostPort: ts.config.ServiceAddr})
//...
err = client.Register(ctx, &workflowservice.RegisterNamespaceRequest{
	Namespace:                        "your-namespace-name",
	WorkflowExecutionRetentionPeriod: &retention,
})
The Retention Period setting using WorkflowExecutionRetentionPeriod is mandatory.
The minimum value you can set for this period is 1 day.
Once registered, set the Namespace using Dial in a Workflow Client to run your Workflow Executions within that Namespace.
See how to set Namespace in a Client in Go for details.
Note that Namespace registration using this API takes up to 10 seconds to complete. Ensure that you wait for this registration to complete before starting the Workflow Execution against the Namespace.
To update your Namespace, use the Update API with the NamespaceClient.
To update your Namespace using tctl, use the tctl namespace update command.
Manage Namespaces
You can get details for your Namespaces, update Namespace configuration, and deprecate or delete your Namespaces.
On Temporal Cloud, use the Temporal Cloud UI or tcld commands to manage Namespaces.
On self-hosted Temporal Cluster, you can manage your registered Namespaces using tctl (recommended) or programmatically using APIs. Note that these APIs and tctl commands will not work with Temporal Cloud.
Use a custom Authorizer on your Frontend Service in the Temporal Cluster to set restrictions on who can create, update, or deprecate Namespaces.
You must register a Namespace with the Temporal Cluster before setting it in the Temporal Client.
Update information and configuration for a registered Namespace on your Temporal Cluster:
- With tctl: tctl namespace update
- Use the UpdateNamespace API to update configuration on a Namespace. Example:
//...
err = client.Update(context.Background(), &workflowservice.UpdateNamespaceRequest{
	Namespace: "your-namespace-name",
	UpdateInfo: &namespace.UpdateNamespaceInfo{ // updates info for the namespace "your-namespace-name"
		Description: "updated namespace description",
		OwnerEmail:  "newowner@mail.com",
		//Data: nil,
		//State: 0,
	},
	/* other details that you can update:
	Config: &namespace.NamespaceConfig{ // updates the configuration of the namespace with the following options
		//WorkflowExecutionRetentionTtl: nil,
		//BadBinaries: nil,
		//HistoryArchivalState: 0,
		//HistoryArchivalUri: "",
		//VisibilityArchivalState: 0,
		//VisibilityArchivalUri: "",
	},
	ReplicationConfig: &replication.NamespaceReplicationConfig{ // updates the replication configuration for the namespace
		//ActiveClusterName: "",
		//Clusters: nil,
		//State: 0,
	},
	SecurityToken: "",
	DeleteBadBinary: "",
	PromoteNamespace: false,
	*/
})
//...
Get details for a registered Namespace on your Temporal Cluster:
- With tctl: tctl namespace describe
- Use the DescribeNamespace API to return information and configuration details for a registered Namespace. Example:
//...
client, err := client.NewNamespaceClient(client.Options{})
//...
client.Describe(context.Background(), "default")
//...
Get details for all registered Namespaces on your Temporal Cluster:
- With tctl: tctl namespace list
- Use the ListNamespaces API to return information and configuration details for all registered Namespaces on your Temporal Cluster. Example:
//...
// Lists one page (1-100) of Namespaces on the active Cluster.
// You can set a large PageSize or loop until NextPageToken is nil.
namespace.Handler.ListNamespaces(context.Background(), &workflowservice.ListNamespacesRequest{
	//PageSize: 0,
	//NextPageToken: nil,
	//NamespaceFilter: nil,
})
//...
Delete a Namespace: The DeleteNamespace API deletes a Namespace. Deleting a Namespace deletes all running and completed Workflow Executions on the Namespace, and removes them from the persistence store and the visibility store. Example:
//...
client.OperatorService().DeleteNamespace(ctx, &operatorservice.DeleteNamespaceRequest{...
//...
Custom payload conversion
Temporal SDKs provide a Payload Converter that can be customized to convert a custom data type to a Payload and back.
A Payload Converter serializes data, converting objects or values to bytes and back. A Payload represents binary data such as input and output from Activities and Workflows.
Implementing custom Payload conversion is optional.
It is needed only if the default Data Converter does not support your custom values.
To support custom Payload conversion, create a custom Payload Converter and configure the Data Converter to use it in your Client options.
The order in which your encoding Payload Converters are applied depends on the order given to the Data Converter. You can set multiple encoding Payload Converters to run your conversions. When the Data Converter receives a value for conversion, it passes the value through each Payload Converter in sequence until the converter that handles the data type does the conversion.
Use CompositeDataConverter to apply custom, type-specific Payload Converters in a specified order.
NewCompositeDataConverter creates a new instance of CompositeDataConverter from an ordered list of type-specific Payload Converters.
The following type-specific Payload Converters are available in the Go SDK, listed in the order that they are applied by the default Data Converter:
- NewNilPayloadConverter()
- NewByteSlicePayloadConverter()
- NewProtoJSONPayloadConverter()
- NewProtoPayloadConverter()
- NewJSONPayloadConverter()
The order in which the Payload Converters are applied is important because during serialization the Data Converter tries the Payload Converters in that specific order until a Payload Converter returns a non-nil Payload.
A custom PayloadConverter must implement the functions FromPayload (for a single value) or FromPayloads (for a list of values) to convert to values from a Payload, and ToPayload (for a single value) or ToPayloads (for a list of values) to convert values to a Payload.
To set your custom Payload Converter, use NewCompositeDataConverter and set it as the Data Converter in the Client options.
To replace the default Data Converter with a custom NewCompositeDataConverter, use the following.
dataConverter := converter.NewCompositeDataConverter(YourCustomPayloadConverter())
To add your custom type conversion to the default Data Converter, use the following to keep the defaults but set yours just before the default JSON fall through.
dataConverter := converter.NewCompositeDataConverter(
	converter.NewNilPayloadConverter(),
	converter.NewByteSlicePayloadConverter(),
	converter.NewProtoJSONPayloadConverter(),
	converter.NewProtoPayloadConverter(),
	YourCustomPayloadConverter(),
	converter.NewJSONPayloadConverter(),
)
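Why the position of a converter in the list matters can be seen in a small standard-library model of the "first non-nil Payload wins" rule described above. The `payload` and `payloadConverter` types, the three converters, and the encoding labels are simplified assumptions for this sketch and do not reproduce the SDK's interfaces.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// payload is a toy stand-in for a Payload: an encoding label plus bytes.
type payload struct {
	encoding string
	data     []byte
}

// payloadConverter returns nil when it does not handle the given value.
type payloadConverter func(v interface{}) *payload

// nilConverter handles only nil values.
func nilConverter(v interface{}) *payload {
	if v == nil {
		return &payload{encoding: "binary/null"}
	}
	return nil
}

// bytesConverter handles only []byte values.
func bytesConverter(v interface{}) *payload {
	if b, ok := v.([]byte); ok {
		return &payload{encoding: "binary/plain", data: b}
	}
	return nil
}

// jsonConverter is the fall-through: it accepts anything json.Marshal accepts.
func jsonConverter(v interface{}) *payload {
	b, err := json.Marshal(v)
	if err != nil {
		return nil
	}
	return &payload{encoding: "json/plain", data: b}
}

// toPayload tries each converter in order and returns the first non-nil result.
func toPayload(v interface{}, chain ...payloadConverter) *payload {
	for _, c := range chain {
		if p := c(v); p != nil {
			return p
		}
	}
	return nil
}

func main() {
	chain := []payloadConverter{nilConverter, bytesConverter, jsonConverter}
	for _, v := range []interface{}{nil, []byte("raw"), map[string]int{"n": 1}} {
		p := toPayload(v, chain...)
		fmt.Printf("%s %q\n", p.encoding, p.data)
	}
}
```

If the JSON converter were placed first in this model, it would claim the byte slice and the nil value as well, which is why a custom converter is placed before the JSON fall-through.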
Worker Sessions
- This feature is currently available only in the Go SDK.
A Worker Session is a feature that provides a straightforward API for Task Routing to ensure that Activity Tasks are executed with the same Worker without requiring you to manually specify Task Queue names.
Task Routing is when a Task Queue is paired with one or more Worker Processes, primarily for Activity Task Executions.
Enable Sessions
Enable the Worker to use Sessions.
Set EnableSessionWorker to true in the Worker options.
// ...
func main() {
// ...
// Enable Sessions for this Worker.
workerOptions := worker.Options{
EnableSessionWorker: true,
// ...
}
w := worker.New(temporalClient, "fileprocessing", workerOptions)
w.RegisterWorkflow(sessions.SomeFileProcessingWorkflow)
w.RegisterActivity(&sessions.FileActivities{})
err = w.Run(worker.InterruptCh())
// ...
}
Max concurrent Sessions
You can adjust the maximum concurrent Sessions of a Worker.
To limit the number of concurrent Sessions running on a Worker, set the MaxConcurrentSessionExecutionSize field of worker.Options to the desired value.
By default, this field is set to a very large value, so there's no need to manually set it if no limitation is needed.
If a Worker hits this limitation, it won't accept any new CreateSession() requests until one of the existing Sessions is completed.
If the Session can't be created within CreationTimeout, CreateSession() returns an error.
func main() {
	// ...
	workerOptions := worker.Options{
		// ...
		// This configures the maximum allowed concurrent sessions.
		// Customize this value only if you need to.
		MaxConcurrentSessionExecutionSize: 1000,
		// ...
	}
	// ...
}
Create a Session
Within the Workflow code, use the Workflow APIs to create a Session with whichever Worker picks up the first Activity Task.
Use the CreateSession API to create a Context object that can be passed to calls to spawn Activity Executions.
Pass an instance of workflow.Context and SessionOptions to the CreateSession API call and get a Session Context that contains metadata information of the Session.
Use the Session Context to spawn all Activity Executions that should belong to the Session.
All associated Activity Tasks are then processed by the same Worker Entity.
When the CreateSession API is called, the Task Queue name that is specified in ActivityOptions (or in StartWorkflowOptions if the Task Queue name is not specified in ActivityOptions) is used, and a Session is created with one of the Workers polling that Task Queue.
The Session Context is canceled if the Worker executing this Session dies or CompleteSession() is called.
When using the returned Session Context to spawn Activity Executions, a workflow.ErrSessionFailed error is returned if the Session framework detects that the Worker executing this Session has died.
The failure of Activity Executions won't affect the state of the Session, so you still need to handle the errors returned from your Activities and call CompleteSession() if necessary.
If the context passed in already contains an open Session, CreateSession() returns an error.
If all the Workers are currently busy and unable to handle a new Session, the framework keeps retrying until the CreationTimeout period you specified in SessionOptions has passed before returning an error.
(For more details, see the "Max concurrent Sessions" section.)
CompleteSession() releases the resources reserved on the Worker, so it's important to call it as soon as you no longer need the Session.
It cancels the Session Context and therefore all the Activity Executions using that Session Context.
It is safe to call CompleteSession() on a failed Session, meaning that you can call it from a defer function after the Session is successfully created.
If the Worker goes down between Activities, any scheduled Activities meant for the Session Worker are canceled.
If not, you get a workflow.ErrSessionFailed error when the next call of workflow.ExecuteActivity() is made from that Workflow.
package sessions
import (
"time"
"go.temporal.io/sdk/workflow"
)
// ...
// SomeFileProcessingWorkflow is a Workflow Definition.
func SomeFileProcessingWorkflow(ctx workflow.Context, param FileProcessingWFParam) error {
activityOptions := workflow.ActivityOptions{
StartToCloseTimeout: time.Minute,
}
ctx = workflow.WithActivityOptions(ctx, activityOptions)
// ...
sessionOptions := &workflow.SessionOptions{
CreationTimeout: time.Minute,
ExecutionTimeout: time.Minute,
}
// Create a Session with the Worker so that all Activities execute with the same Worker.
sessionCtx, err := workflow.CreateSession(ctx, sessionOptions)
if err != nil {
return err
}
defer workflow.CompleteSession(sessionCtx)
// ...
err = workflow.ExecuteActivity(sessionCtx, a.DownloadFile, param).Get(sessionCtx, &downloadResult)
// ...
err = workflow.ExecuteActivity(sessionCtx, a.ProcessFile, processParam).Get(sessionCtx, &processResult)
// ...
err = workflow.ExecuteActivity(sessionCtx, a.UploadFile, uploadParam).Get(sessionCtx, nil)
// ...
}
Session metadata
type SessionInfo struct {
// A unique Id for the session
SessionID string
// The hostname of the worker that is executing the session
HostName string
// ... other unexported fields
}
func GetSessionInfo(ctx Context) *SessionInfo
The Session Context also stores some Session metadata, which can be retrieved by the GetSessionInfo() API.
If the Context passed in doesn't contain any Session metadata, this API will return a nil pointer.
Recreate Session
For long-running Sessions, you may want to use the ContinueAsNew feature to split the Workflow into multiple runs when all Activities need to be executed by the same Worker.
The RecreateSession() API is designed for such a use case.
func RecreateSession(ctx Context, recreateToken []byte, sessionOptions *SessionOptions) (Context, error)
Its usage is the same as CreateSession() except that it also takes in a recreateToken, which is needed to create a new Session on the same Worker as the previous one.
You can get the token by calling the GetRecreateToken() method of the SessionInfo object.
token := workflow.GetSessionInfo(sessionCtx).GetRecreateToken()
Is there a complete example?
Yes, the file processing example in the temporalio/samples-go repo has been updated to use the session framework.
What happens to my Activity if the Worker dies?
If your Activity has already been scheduled, it will be canceled.
If not, you will get a workflow.ErrSessionFailed error when you call workflow.ExecuteActivity().
Is the concurrent session limitation per process or per host?
It's per Worker Process, so make sure there's only one Worker Process running on the host if you plan to use this feature.
Future Work
Right now, a Session is considered failed if the Worker Process dies. However, for some use cases, you may only care whether the Worker host is alive or not. For these use cases, the Session should be automatically re-established if the Worker Process is restarted.
The current implementation assumes that all Sessions are consuming the same type of resource and there's only one global limitation. Our plan is to allow you to specify what type of resource your Session will consume and enforce different limitations on different types of resources.
Error Handling in Go
An Activity, or a Child Workflow, might fail, and you could handle errors differently based on the different error cases.
If the Activity returns an error as errors.New() or fmt.Errorf(), that error is converted into *temporal.ApplicationError.
If the Activity returns an error as temporal.NewNonRetryableApplicationError("error message", details), that error is returned as *temporal.ApplicationError.
There are other types of errors such as *temporal.TimeoutError, *temporal.CanceledError, and *temporal.PanicError.
Here's an example of handling Activity errors within Workflow code that differentiates between different error types.
// Assumes these imports: "errors", "fmt", "go.temporal.io/sdk/temporal", "go.temporal.io/sdk/workflow",
// and enumspb "go.temporal.io/api/enums/v1".
err := workflow.ExecuteActivity(ctx, YourActivity, ...).Get(ctx, nil)
if err != nil {
	var applicationErr *temporal.ApplicationError
	if errors.As(err, &applicationErr) {
		// retrieve error message
		fmt.Println(applicationErr.Error())
		// handle Activity errors (created via NewApplicationError() API)
		var detailMsg string // assuming the Activity returned temporal.NewApplicationError("message", "CustomErrTypeA", "string details")
		_ = applicationErr.Details(&detailMsg) // extract strongly typed details
		// handle Activity errors (errors created other than using NewApplicationError() API)
		switch applicationErr.Type() {
		case "CustomErrTypeA":
			// handle CustomErrTypeA
		case "CustomErrTypeB":
			// handle CustomErrTypeB
		default:
			// a newer version of the Activity could return new errors that the Workflow is not aware of.
		}
	}
	var canceledErr *temporal.CanceledError
	if errors.As(err, &canceledErr) {
		// handle cancellation
	}
	var timeoutErr *temporal.TimeoutError
	if errors.As(err, &timeoutErr) {
		// handle timeout; check the timeout type with timeoutErr.TimeoutType()
		switch timeoutErr.TimeoutType() {
		case enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START:
			// Handle ScheduleToStart timeout.
		case enumspb.TIMEOUT_TYPE_START_TO_CLOSE:
			// Handle StartToClose timeout.
		case enumspb.TIMEOUT_TYPE_HEARTBEAT:
			// Handle heartbeat timeout.
		default:
		}
	}
	var panicErr *temporal.PanicError
	if errors.As(err, &panicErr) {
		// handle panic; message and stack trace are available via panicErr.Error() and panicErr.StackTrace()
	}
}
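The handler above depends on errors.As finding a typed error anywhere in a chain of wrapped errors. The following standard-library sketch imitates that classification so the mechanism can be tried without a Temporal Cluster. The types appError and timeoutError and the functions classify and wrapActivityFailure are hypothetical stand-ins for illustration; they are not part of the Temporal SDK.

```go
package main

import (
	"errors"
	"fmt"
)

// appError is a hypothetical stand-in for *temporal.ApplicationError.
type appError struct {
	msg     string
	errType string
}

func (e *appError) Error() string { return e.msg }
func (e *appError) Type() string  { return e.errType }

// timeoutError is a hypothetical stand-in for *temporal.TimeoutError.
type timeoutError struct{ kind string }

func (e *timeoutError) Error() string { return "timeout: " + e.kind }

// wrapActivityFailure wraps err with %w so the typed error stays reachable.
func wrapActivityFailure(err error) error {
	return fmt.Errorf("activity failed: %w", err)
}

// classify mirrors the branching of the Workflow handler: errors.As walks the
// wrapped chain and stops at the first error assignable to the target type.
func classify(err error) string {
	var appErr *appError
	if errors.As(err, &appErr) {
		return "application:" + appErr.Type()
	}
	var toErr *timeoutError
	if errors.As(err, &toErr) {
		return "timeout:" + toErr.kind
	}
	return "unknown"
}

func main() {
	wrapped := wrapActivityFailure(&appError{msg: "bad input", errType: "CustomErrTypeA"})
	fmt.Println(classify(wrapped))
	fmt.Println(classify(&timeoutError{kind: "StartToClose"}))
	fmt.Println(classify(errors.New("plain error")))
}
```

Checking each type with its own errors.As call, rather than a type switch on err, is what lets the match succeed even when the typed error is wrapped by another error.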
Selectors
Overview
In Go, the select statement lets a goroutine wait on multiple communication operations.
A select blocks until one of its cases can run, then it executes that case.
It chooses one at random if multiple are ready.
However, a normal Go select statement cannot be used directly inside Workflows because of this random choice.
Temporal's Go SDK Selectors are similar and act as a replacement.
They can block on sending and receiving from Channels, and they can also wait on Futures that represent deferred work.
Using Selectors to defer and process work (in place of Go's select) is necessary to ensure deterministic Workflow code execution (though using select in Activity code is fine).
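To see why the native statement is a problem for deterministic code, here is a small standard-library sketch (not Workflow code). It runs a select repeatedly while both channels are ready and counts which case was chosen. The function name countReadyCases is an illustrative assumption.

```go
package main

import "fmt"

// countReadyCases runs a native select n times while both channels always
// hold a value, and reports how often each case was chosen.
func countReadyCases(n int) (first, second int) {
	a := make(chan int, 1)
	b := make(chan int, 1)
	for i := 0; i < n; i++ {
		// Both channels are ready before every select.
		a <- i
		b <- i
		select {
		case <-a:
			first++
			<-b // drain the other channel so the next round starts clean
		case <-b:
			second++
			<-a
		}
	}
	return first, second
}

func main() {
	first, second := countReadyCases(1000)
	// The split varies from run to run because the runtime picks a ready
	// case pseudo-randomly.
	fmt.Printf("case a: %d, case b: %d\n", first, second)
}
```

Because the split between the two cases changes between runs, code that branches on the outcome could take a different path when it is executed again, which is what Workflow code has to avoid.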
Full API Example
The API is sufficiently different from select that it bears documenting:
func SampleWorkflow(ctx workflow.Context) error {
	// standard Workflow setup code omitted...
	// API Example: declare a new selector
	selector := workflow.NewSelector(ctx)
	// API Example: defer code execution until the Future that represents Activity result is ready
	work := workflow.ExecuteActivity(ctx, ExampleActivity)
	selector.AddFuture(work, func(f workflow.Future) {
		// deferred code omitted...
	})
	// more parallel timers and activities initiated...
	// API Example: receive information from a Channel
	var signalVal string
	channel := workflow.GetSignalChannel(ctx, channelName)
	selector.AddReceive(channel, func(c workflow.ReceiveChannel, more bool) {
		// matching on the channel doesn't consume the message,
		// so it has to be explicitly consumed here
		c.Receive(ctx, &signalVal)
		// do something with received information
	})
	// API Example: block until the next Future is ready to run
	// important! none of the deferred code runs until you call selector.Select
	selector.Select(ctx)
	return nil
}
Using Selectors with Futures
You usually add Futures after Activities:
// API Example: defer code execution until after an activity is done
work := workflow.ExecuteActivity(ctx, ExampleActivity)
selector.AddFuture(work, func(f workflow.Future) {
	// deferred code omitted...
})
selector.Select(ctx) is the primary mechanism that blocks on and executes Future work.
It is intentionally flexible; you may call it conditionally or multiple times:
// API Example: blocking conditionally
if somecondition != nil {
	selector.Select(ctx)
}
// API Example: popping off all remaining Futures
for i := 0; i < len(someArray); i++ {
	selector.Select(ctx) // this will wait for one branch
	// you can interrupt execution here
}
A Future matches only once per Selector instance even if Select is called multiple times. If multiple items are available, the order of matching is not defined.
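Because each Future matches only once, one way to wait for several Activities started in parallel is to call Select once per Future added. The following is a minimal sketch under stated assumptions: ExampleActivity and FanOutWorkflow are hypothetical names, the one-minute StartToCloseTimeout is arbitrary, and the code needs the go.temporal.io/sdk module.

```go
package sample

import (
	"context"
	"time"

	"go.temporal.io/sdk/workflow"
)

// ExampleActivity is a hypothetical Activity used only for illustration.
func ExampleActivity(ctx context.Context, item string) (string, error) {
	return "processed " + item, nil
}

// FanOutWorkflow starts one Activity per item and collects the results in
// the order in which the Futures become ready.
func FanOutWorkflow(ctx workflow.Context, items []string) ([]string, error) {
	ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
		StartToCloseTimeout: time.Minute, // arbitrary value for the sketch
	})

	selector := workflow.NewSelector(ctx)
	results := make([]string, 0, len(items))
	var firstErr error

	for _, item := range items {
		f := workflow.ExecuteActivity(ctx, ExampleActivity, item)
		selector.AddFuture(f, func(f workflow.Future) {
			var out string
			if err := f.Get(ctx, &out); err != nil {
				if firstErr == nil {
					firstErr = err // remember the first failure only
				}
				return
			}
			results = append(results, out)
		})
	}

	// Each Select call handles exactly one ready Future, so call it once
	// per Future that was added.
	for range items {
		selector.Select(ctx)
	}
	return results, firstErr
}
```

Since the order of matching is not defined when several Futures are ready, the sketch does not rely on the results slice having the same order as items.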
Using Selectors with Timers
An important use case of futures is setting up a race between a timer and a pending activity, effectively adding a "soft" timeout that doesn't result in any errors or retries of that activity.
For example, the Timer sample shows how you can write a long running order processing operation where:
- if processing takes too long, we send out a notification email to the user about the delay, but we won't cancel the operation
- if the operation finishes before the timer fires, then we want to cancel the timer.
// childCtx lets us cancel the timer without canceling the order processing
childCtx, cancelHandler := workflow.WithCancel(ctx)
selector := workflow.NewSelector(ctx)
var processingDone bool
f := workflow.ExecuteActivity(ctx, OrderProcessingActivity)
selector.AddFuture(f, func(f workflow.Future) {
	processingDone = true
	// cancel timerFuture
	cancelHandler()
})
// use timer future to send notification email if processing takes too long
timerFuture := workflow.NewTimer(childCtx, processingTimeThreshold)
selector.AddFuture(timerFuture, func(f workflow.Future) {
	if !processingDone {
		// processing is not done yet when timer fires, send notification email
		_ = workflow.ExecuteActivity(ctx, SendEmailActivity).Get(ctx, nil)
	}
})
// wait for the timer or the order processing to finish
selector.Select(ctx)
// if the timer fired first, keep waiting for the order processing to finish
if !processingDone {
	selector.Select(ctx)
}
We create timers with the workflow.NewTimer API.
Using Selectors with Channels
selector.AddReceive(channel, func(c workflow.ReceiveChannel, more bool) {}) is the primary mechanism that receives messages from Channels.
// API Example: receive information from a Channel
var signalVal string
channel := workflow.GetSignalChannel(ctx, channelName)
selector.AddReceive(channel, func(c workflow.ReceiveChannel, more bool) {
	c.Receive(ctx, &signalVal)
	// do something with received information
})
Merely matching on the channel doesn't consume the message; it has to be explicitly consumed with a c.Receive(ctx, &signalVal) call.
Querying Selector State
You can use the selector.HasPending API to ensure that Signals are not lost when a Workflow is closed (e.g. by ContinueAsNew).
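One way to apply this is to drain the Selector right before continuing as new. The following is a minimal sketch under stated assumptions: CountingWorkflow and signalName are hypothetical, the batch size of 100 is arbitrary, and the code needs the go.temporal.io/sdk module.

```go
package sample

import "go.temporal.io/sdk/workflow"

// signalName is a hypothetical Signal name used only for illustration.
const signalName = "your-signal-name"

// CountingWorkflow is a hypothetical Workflow that counts Signals and then
// continues as new, carrying the count into the next run.
func CountingWorkflow(ctx workflow.Context, count int) error {
	selector := workflow.NewSelector(ctx)
	ch := workflow.GetSignalChannel(ctx, signalName)
	selector.AddReceive(ch, func(c workflow.ReceiveChannel, more bool) {
		var msg string
		c.Receive(ctx, &msg) // explicitly consume the Signal
		count++
	})

	// Handle a batch of Signals (the batch size is arbitrary).
	for i := 0; i < 100; i++ {
		selector.Select(ctx)
	}

	// Drain Signals that have already arrived so that none are lost
	// when this run closes.
	for selector.HasPending() {
		selector.Select(ctx)
	}

	return workflow.NewContinueAsNewError(ctx, CountingWorkflow, count)
}
```

HasPending reports whether a call to Select would proceed without blocking, so the drain loop ends as soon as nothing more is buffered.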
Learn More
Usage of Selectors is best learned by example:
- Setting up a race between an Activity and a Timer, and conditionally executing code (Timer example)
- Receiving information in a Channel (Mutex example)
- Looping through a list of work and scheduling them all in parallel (DSL example)
- Executing Activities in parallel, picking the first result, and canceling the remainder (Pick First example)