Python SDK developer's guide - Foundations
The Foundations section of the Temporal Developer's guide covers the minimum set of concepts and implementation details needed to build and run a Temporal Application; that is, all the relevant steps to start a Workflow Execution that executes an Activity.
This guide is a work in progress. Some sections may be incomplete or missing for some languages. Information may change at any time.
If you can't find what you are looking for in the Developer's guide, it could be in older docs for SDKs.
In this section you can find the following:
- Run a development Cluster
- Install your SDK
- Connect to a dev Cluster
- Connect to Temporal Cloud
- Develop a Workflow
- Develop an Activity
- Start an Activity Execution
- Run a dev Worker
- Run a Temporal Cloud Worker
- Start a Workflow Execution
Run a development server
This section describes how to install the Temporal CLI and run a development Cluster. The local development Cluster comes packaged with the Temporal Web UI.
For information on deploying and running a production Cluster, see the Cluster deployment guide, or sign up for Temporal Cloud and let us run your production Cluster for you.
Temporal CLI is a tool for interacting with a Temporal Cluster from the command line and it includes a distribution of the Temporal Server and Web UI. This local development Cluster runs as a single process with zero runtime dependencies and it supports persistence to disk and in-memory mode through SQLite.
Install the Temporal CLI
Choose one of the following install methods to install the Temporal CLI.
macOS:
- Install the Temporal CLI with Homebrew.
  brew install temporal
- Install the Temporal CLI with cURL.
  curl -sSf https://temporal.download/cli.sh | sh
- Install the Temporal CLI from CDN.
  - Select the platform and architecture needed.
  - Extract the downloaded archive.
  - Add the temporal binary to your PATH.
Linux:
- Install the Temporal CLI with cURL.
  curl -sSf https://temporal.download/cli.sh | sh
- Install the Temporal CLI from CDN.
  - Select the platform and architecture needed.
  - Extract the downloaded archive.
  - Add the temporal binary to your PATH.
Windows:
- Install the Temporal CLI from CDN.
  - Select the platform and architecture needed and download the binary.
  - Extract the downloaded archive.
  - Add the temporal.exe binary to your PATH.
Start the Temporal Development Server
Start the Temporal Development Server by using the server start-dev command.
temporal server start-dev
This command automatically starts the Web UI, creates the default Namespace, and uses an in-memory database.
The Temporal Server should be available on localhost:7233 and the Temporal Web UI should be accessible at http://localhost:8233.
The server's startup configuration can be customized using command line options. For a full list of options, run:
temporal server start-dev --help
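Once the server is running, one quick way to confirm that something is listening on the expected port is to attempt a TCP connection. The following is a minimal sketch that uses only the Python standard library; the helper name is_port_open is hypothetical and is not part of the Temporal CLI or SDK. A successful connection only shows that a process accepts connections on that port, not that it is a healthy Temporal Server.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False


if __name__ == "__main__":
    # localhost:7233 is the development server address described above.
    print("Temporal Server reachable:", is_port_open("localhost", 7233))
```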
Install a Temporal SDK
A Temporal SDK provides a framework for Temporal Application development.
An SDK provides you with the following:
- A Temporal Client to communicate with a Temporal Cluster.
- APIs to develop Workflows.
- APIs to create and manage Worker Processes.
- APIs to author Activities.
To install the latest version of the Temporal Python package, run the following command.
pip install temporalio
API reference
The Temporal Python SDK API reference is published on python.temporal.io.
Code samples
You can find a complete list of executable code samples in Temporal's GitHub repository.
Additionally, several of the Tutorials are backed by a fully executable template application.
Connect to a dev Cluster
A Temporal Client enables you to communicate with the Temporal Cluster.
Communication with a Temporal Cluster includes, but isn't limited to, the following:
- Starting Workflow Executions.
- Sending Signals to Workflow Executions.
- Sending Queries to Workflow Executions.
- Getting the results of a Workflow Execution.
- Providing an Activity Task Token.
A Temporal Client cannot be initialized and used inside a Workflow. However, it is acceptable and common to use a Temporal Client inside an Activity to communicate with a Temporal Cluster.
When you are running a Cluster locally (such as Temporalite), the number of connection options you must provide is minimal.
Many SDKs default to the local host or IP address and port that Temporalite and Docker Compose serve (127.0.0.1:7233).
Use the connect() method on the Client class to create a Temporal Client and connect it to the Temporal Cluster.
import asyncio

from temporalio.client import Client

# ...

async def main():
    client = await Client.connect("localhost:7233")
    result = await client.execute_workflow(
        YourWorkflow.run,
        "your name",
        id="your-workflow-id",
        task_queue="your-task-queue",
    )
    print(f"Result: {result}")

if __name__ == "__main__":
    asyncio.run(main())
Connect to Temporal Cloud
When you connect to Temporal Cloud, you need to provide additional connection and client options that include the following:
- An address that includes your Cloud Namespace Name and a port number: <Namespace>.<ID>.tmprl.cloud:<port>.
- mTLS CA certificate.
- mTLS private key.
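The address format in the list above can be assembled with a small helper. This is only an illustrative sketch: the function name cloud_address and the sample values are hypothetical, and the default port of 7233 is an assumption taken from the connection example in this section.

```python
def cloud_address(namespace: str, account_id: str, port: int = 7233) -> str:
    """Build an address of the form <Namespace>.<ID>.tmprl.cloud:<port>."""
    return f"{namespace}.{account_id}.tmprl.cloud:{port}"


# Placeholder values; substitute your own Namespace name and ID.
print(cloud_address("your-namespace", "your-id"))
```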
For more information about managing and generating client certificates for Temporal Cloud, see How to manage certificates in Temporal Cloud.
For more information about configuring TLS to secure inter- and intra-network communication for a Temporal Cluster, see Temporal Customization Samples.
Use the connect() method on the Client class to create a Temporal Client and connect it to the Temporal Cluster.
Then specify the TLSConfig arguments to connect to a Temporal Cluster with TLS enabled.
The client_cert must be combined with client_private_key to authenticate the Client.
from temporalio.client import Client, TLSConfig

# ...

async def main():
    with open("client-cert.pem", "rb") as f:
        client_cert = f.read()
    with open("client-private-key.pem", "rb") as f:
        client_private_key = f.read()
    client = await Client.connect(
        "your-custom-namespace.tmprl.cloud:7233",
        namespace="your-custom-namespace",
        tls=TLSConfig(
            client_cert=client_cert,
            client_private_key=client_private_key,
            # domain=domain, # TLS domain
            # server_root_ca_cert=server_root_ca_cert, # ROOT CA to validate the server cert
        ),
    )
Develop Workflows
Workflows are the fundamental unit of a Temporal Application, and it all starts with the development of a Workflow Definition.
In the Temporal Python SDK programming model, Workflows are defined as classes.
Specify the @workflow.defn decorator on the Workflow class to identify a Workflow.
Use the @workflow.run decorator to mark the entry point method to be invoked. This must be set on one asynchronous method defined on the same class as @workflow.defn. Run methods have positional parameters.
from datetime import timedelta

from temporalio import workflow

# ...

@workflow.defn(name="YourWorkflow")
class YourWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        return await workflow.execute_activity(
            your_activity,
            YourParams("Hello", name),
            start_to_close_timeout=timedelta(seconds=10),
        )
Workflow parameters
Temporal Workflows may have any number of custom parameters. However, we strongly recommend that objects are used as parameters, so that the object's individual fields may be altered without breaking the signature of the Workflow. All Workflow Definition parameters must be serializable.
Workflow parameters are the method parameters of the singular method decorated with @workflow.run.
These can be any data type Temporal can convert, including dataclasses when properly type-annotated.
Technically this can be multiple parameters, but Temporal strongly encourages a single dataclass parameter containing all input fields.
from dataclasses import dataclass

# ...

@dataclass
class YourParams:
    greeting: str
    name: str
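To illustrate why a single dataclass parameter is encouraged, the sketch below adds a field with a default value and shows that an existing caller keeps working without changes. It uses plain Python only: the language field and the run function are hypothetical stand-ins for illustration, not Temporal APIs.

```python
from dataclasses import dataclass


@dataclass
class YourParams:
    greeting: str
    name: str
    # Hypothetical field added later; the default keeps older callers working.
    language: str = "en"


def run(params: YourParams) -> str:
    """Stand-in for a Workflow run method that takes one dataclass argument."""
    return f"{params.greeting}, {params.name}! [{params.language}]"


# A caller written before `language` existed still works unchanged.
old_style = run(YourParams("Hello", "World"))
# A newer caller can opt in to the new field.
new_style = run(YourParams("Hola", "Mundo", language="es"))
print(old_style)
print(new_style)
```

Had greeting, name, and language been separate positional parameters, adding the third one would have changed the method signature for every caller.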
Workflow return values
Workflow return values must also be serializable. Returning results, returning errors, or throwing exceptions is fairly idiomatic in each language that is supported. However, Temporal APIs that must be used to get the result of a Workflow Execution will only ever receive one of either the result or the error.
To return a value from the Workflow, use return to return an object.
To return the results of a Workflow Execution, use either the start_workflow() or execute_workflow() asynchronous method.
from datetime import timedelta

from temporalio import workflow

# ...

@workflow.defn(name="YourWorkflow")
class YourWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        return await workflow.execute_activity(
            your_activity,
            YourParams("Hello", name),
            start_to_close_timeout=timedelta(seconds=10),
        )
Workflow Type
Workflows have a Type, which is referred to as the Workflow name.
The following examples demonstrate how to set a custom name for your Workflow Type.
You can customize the Workflow name with a custom name in the decorator argument. For example, @workflow.defn(name="your-workflow-name"). If the name parameter is not specified, the Workflow name defaults to the unqualified class name.
from datetime import timedelta

from temporalio import workflow

# ...

@workflow.defn(name="YourWorkflow")
class YourWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        return await workflow.execute_activity(
            your_activity,
            YourParams("Hello", name),
            start_to_close_timeout=timedelta(seconds=10),
        )
Workflow logic requirements
Workflow logic is constrained by deterministic execution requirements.
Therefore, each language is limited to the use of certain idiomatic techniques.
However, each Temporal SDK provides a set of APIs that can be used inside your Workflow to interact with external (to the Workflow) application code.
Workflow code must be deterministic. This means:
- no threading
- no randomness
- no external calls to processes
- no network I/O
- no global state mutation
- no system date or time
All Workflow-safe APIs in the temporalio.workflow module must run in the implicit asyncio event loop and be deterministic.
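The difference between deterministic and non-deterministic logic can be sketched in plain Python. The example below is a conceptual illustration only, with hypothetical function names: a value drawn from the process-wide random generator can differ on every run, whereas a value derived solely from its inputs is the same each time the code runs again.

```python
import random


def pick_with_global_rng() -> int:
    """Uses the process-wide RNG, so each call can return a different value."""
    return random.randint(1, 1_000_000)


def pick_with_seeded_rng(seed: int) -> int:
    """Uses an RNG derived only from its input, so the result is reproducible."""
    return random.Random(seed).randint(1, 1_000_000)


first_run = pick_with_seeded_rng(42)
second_run = pick_with_seeded_rng(42)
print(first_run == second_run)  # True: same input, same decision
```

Inside a real Workflow, prefer the helpers in temporalio.workflow, such as workflow.random() and workflow.now(), over the standard library equivalents; consult the API reference for their exact behavior.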
Develop Activities
One of the primary things that Workflows do is orchestrate the execution of Activities.
An Activity is a normal function or method execution that's intended to execute a single, well-defined action (either short or long-running), such as querying a database, calling a third-party API, or transcoding a media file.
An Activity can interact with the world outside the Temporal Platform or use a Temporal Client to interact with a Cluster.
For the Workflow to be able to execute the Activity, we must define the Activity Definition.
You can develop an Activity Definition by using the @activity.defn decorator.
Register the function as an Activity with a custom name through a decorator argument, for example @activity.defn(name="your_activity").
from temporalio import activity

# ...

@activity.defn(name="your_activity")
async def your_activity(input: YourParams) -> str:
    return f"{input.greeting}, {input.name}!"
Activity parameters
There is no explicit limit to the total number of parameters that an Activity Definition may support.
However, there is a limit on the total size of the data that ends up encoded into a gRPC message Payload.
A single argument is limited to a maximum size of 2 MB, and the total size of a gRPC message, which includes all the arguments, is limited to a maximum of 4 MB.
Also, keep in mind that all Payload data is recorded in the Workflow Execution Event History, and large Event Histories can affect Worker performance.
This is because the entire Event History could be transferred to a Worker Process with a Workflow Task.
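As a rough way to reason about these limits, the sketch below estimates the encoded size of an argument by serializing it as UTF-8 JSON. It rests on several assumptions: the helper names are hypothetical, 2 MB is interpreted as 2 * 1024 * 1024 bytes, and the actual Payload produced by your configured data converter may differ in size because it carries additional metadata.

```python
import json
from dataclasses import asdict, dataclass

# Assumed interpretation of the 2 MB per-argument limit stated above.
MAX_ARGUMENT_BYTES = 2 * 1024 * 1024


@dataclass
class YourParams:
    greeting: str
    name: str


def estimated_payload_bytes(params: YourParams) -> int:
    """Approximate the encoded size by serializing the dataclass as UTF-8 JSON."""
    return len(json.dumps(asdict(params)).encode("utf-8"))


def fits_in_single_argument(params: YourParams) -> bool:
    """Return True if the estimate is within the assumed per-argument limit."""
    return estimated_payload_bytes(params) <= MAX_ARGUMENT_BYTES


small = YourParams("Hello", "World")
large = YourParams("Hello", "x" * (3 * 1024 * 1024))
print(fits_in_single_argument(small))  # True
print(fits_in_single_argument(large))  # False
```

When an estimate approaches the limit, a common alternative is to store the data externally and pass a reference to it as the argument.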
Some SDKs require that you pass context objects, others do not. When it comes to your application data—that is, data that is serialized and encoded into a Payload—we recommend that you use a single object as an argument that wraps the application data passed to Activities. This is so that you can change what data is passed to the Activity without breaking a function or method signature.
Activity parameters are the function parameters of the function decorated with @activity.defn.
These can be any data type Temporal can convert, including dataclasses when properly type-annotated.
Technically this can be multiple parameters, but Temporal strongly encourages a single dataclass parameter containing all input fields.
from temporalio import activity
from your_dataobject_dacx import YourParams

# ...

@activity.defn(name="your_activity")
async def your_activity(input: YourParams) -> str:
    return f"{input.greeting}, {input.name}!"
Activity return values
All data returned from an Activity must be serializable.
There is no explicit limit to the amount of data that can be returned by an Activity, but keep in mind that all return values are recorded in a Workflow Execution Event History.
An Activity Execution can return inputs and other Activity values.
The following example defines an Activity that takes a YourParams object as input and returns a string.
# ...
@activity.defn(name="your_activity")
async def your_activity(input: YourParams) -> str:
    return f"{input.greeting}, {input.name}!"
Activity Type
Activities have a Type, which is referred to as the Activity name. The following examples demonstrate how to set a custom name for your Activity Type.
You can customize the Activity name with a custom name in the decorator argument. For example, @activity.defn(name="your-activity").
If the name parameter is not specified, the Activity name defaults to the function name.
# ...
@activity.defn(name="your_activity")
async def your_activity(input: YourParams) -> str:
    return f"{input.greeting}, {input.name}!"
Activity Execution
Calls to spawn Activity Executions are written within a Workflow Definition.
The call to spawn an Activity Execution generates the ScheduleActivityTask Command.
This results in the set of three Activity Task related Events (ActivityTaskScheduled, ActivityTaskStarted, and ActivityTask[Closed]) in your Workflow Execution Event History.
A single instance of the Activities implementation is shared across multiple simultaneous Activity invocations. Therefore, the Activity implementation code must be stateless.
The values passed to Activities through invocation parameters or returned through a result value are recorded in the Execution history. The entire Execution history is transferred from the Temporal service to Workflow Workers when a Workflow state needs to recover. A large Execution history can thus adversely impact the performance of your Workflow.
Therefore, be mindful of the amount of data you transfer through Activity invocation parameters or Return Values. Otherwise, no additional limitations exist on Activity implementations.
To spawn an Activity Execution, use the execute_activity() operation from within your Workflow Definition.
execute_activity() is a shortcut for start_activity() that waits on its result.
To get just the handle to wait and cancel separately, use start_activity().
In most cases, use execute_activity() unless advanced task capabilities are needed.
A single argument to the Activity is positional. Multiple arguments are not supported in the type-safe form of start_activity() or execute_activity() and must be supplied by the args keyword argument.
from datetime import timedelta

from temporalio import workflow

# ...

@workflow.defn(name="YourWorkflow")
class YourWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        return await workflow.execute_activity(
            your_activity,
            YourParams("Hello", name),
            start_to_close_timeout=timedelta(seconds=10),
        )
Required timeout
Activity Execution semantics rely on several parameters.
The only required value that needs to be set is either a Schedule-To-Close Timeout or a Start-To-Close Timeout.
These values are set in the Activity Options.
Activity options are set as keyword arguments after the Activity arguments.
Available timeouts are:
- schedule_to_close_timeout
- schedule_to_start_timeout
- start_to_close_timeout
@workflow.defn
class YourWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        return await workflow.execute_activity(
            your_activity,
            name,
            schedule_to_close_timeout=timedelta(seconds=5),
            # schedule_to_start_timeout=timedelta(seconds=5),
            # start_to_close_timeout=timedelta(seconds=5),
        )
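The rule that at least one of the two required timeouts must be set can be expressed as a small check. The helper below is a hypothetical illustration written in plain Python; it is not the SDK's own validation, and it only mirrors the requirement described in this section.

```python
from datetime import timedelta
from typing import Optional


def check_required_timeout(
    schedule_to_close_timeout: Optional[timedelta] = None,
    start_to_close_timeout: Optional[timedelta] = None,
) -> None:
    """Raise ValueError unless at least one of the two required timeouts is set."""
    if schedule_to_close_timeout is None and start_to_close_timeout is None:
        raise ValueError("Set schedule_to_close_timeout or start_to_close_timeout")


# Passes: one of the two required timeouts is provided.
check_required_timeout(start_to_close_timeout=timedelta(seconds=10))

# Fails: neither required timeout is provided.
try:
    check_required_timeout()
except ValueError as err:
    print(f"Rejected: {err}")
```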
Get Activity results
The call to spawn an Activity Execution generates the ScheduleActivityTask Command and provides the Workflow with an Awaitable.
Workflow Executions can either block progress until the result is available through the Awaitable or continue progressing, making use of the result when it becomes available.
Use start_activity() to start an Activity and return its handle, ActivityHandle. Use execute_activity() to return the results.
You must provide either schedule_to_close_timeout or start_to_close_timeout.
execute_activity() is a shortcut for await start_activity(): it is an asynchronous helper that takes the same arguments as start_activity() and awaits the result. execute_activity() should be used in most cases unless advanced task capabilities are needed.
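The two ways of consuming an Awaitable described above can be illustrated with a plain asyncio analogy. This sketch deliberately avoids the Temporal API: fake_activity is a hypothetical stand-in, awaiting it directly is comparable to execute_activity(), and keeping a task handle to await later is comparable to start_activity().

```python
import asyncio
from typing import List


async def fake_activity(name: str) -> str:
    """Stand-in for an Activity; it is not a Temporal API."""
    await asyncio.sleep(0.01)
    return f"Hello, {name}!"


async def block_until_result() -> str:
    # Block progress until the result is available.
    return await fake_activity("World")


async def continue_then_collect() -> List[str]:
    # Keep a handle, make other progress, then use the result when needed.
    handle = asyncio.create_task(fake_activity("World"))
    progress = ["did other work"]
    progress.append(await handle)
    return progress


print(asyncio.run(block_until_result()))
print(asyncio.run(continue_then_collect()))
```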
from datetime import timedelta

from temporalio import workflow

# ...

@workflow.defn(name="YourWorkflow")
class YourWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        return await workflow.execute_activity(
            your_activity,
            YourParams("Hello", name),
            start_to_close_timeout=timedelta(seconds=10),
        )
Run a dev Worker
The Worker Process is where Workflow Functions and Activity Functions are executed.
- Each Worker Entity in the Worker Process must register the exact Workflow Types and Activity Types it may execute.
- Each Worker Entity must also associate itself with exactly one Task Queue.
- Each Worker Entity polling the same Task Queue must be registered with the same Workflow Types and Activity Types.
A Worker Entity is the component within a Worker Process that listens to a specific Task Queue.
Although multiple Worker Entities can be in a single Worker Process, a Worker Process with a single Worker Entity may be perfectly sufficient. For more information, see the Worker tuning guide.
A Worker Entity contains both a Workflow Worker and an Activity Worker so that it can make progress for either a Workflow Execution or an Activity Execution.
To develop a Worker, use the Worker() constructor and add your Client, Task Queue, Workflows, and Activities as arguments.
The following code example creates a Worker that polls for tasks from the Task Queue and executes the Workflow.
When a Worker is created, it accepts a list of Workflows in the workflows parameter, a list of Activities in the activities parameter, or both.
import asyncio

from temporalio.client import Client
from temporalio.worker import Worker

# ...

async def main():
    client = await Client.connect("localhost:7233")
    worker = Worker(
        client,
        task_queue="your-task-queue",
        workflows=[YourWorkflow],
        activities=[your_activity],
    )
    await worker.run()

if __name__ == "__main__":
    asyncio.run(main())
Register types
All Workers listening to the same Task Queue name must be registered to handle the exact same Workflow Types and Activity Types.
If a Worker polls a Task for a Workflow Type or Activity Type it does not know about, it fails that Task. However, the failure of the Task does not cause the associated Workflow Execution to fail.
When a Worker is created, it accepts a list of Workflows in the workflows parameter, a list of Activities in the activities parameter, or both.
# ...
async def main():
    client = await Client.connect("localhost:7233")
    worker = Worker(
        client,
        task_queue="your-task-queue",
        workflows=[YourWorkflow],
        activities=[your_activity],
    )
    await worker.run()

if __name__ == "__main__":
    asyncio.run(main())
Start Workflow Execution
Workflow Execution semantics rely on several parameters—that is, to start a Workflow Execution you must supply a Task Queue that will be used for the Tasks (one that a Worker is polling), the Workflow Type, language-specific contextual data, and Workflow Function parameters.
In the examples below, all Workflow Executions are started using a Temporal Client. To spawn Workflow Executions from within another Workflow Execution, use either the Child Workflow or External Workflow APIs.
See the Customize Workflow Type section to see how to customize the name of the Workflow Type.
A request to spawn a Workflow Execution causes the Temporal Cluster to create the first Event (WorkflowExecutionStarted) in the Workflow Execution Event History. The Temporal Cluster then creates the first Workflow Task, resulting in the first WorkflowTaskScheduled Event.
To start a Workflow Execution in Python, use either the start_workflow() or execute_workflow() asynchronous methods in the Client.
# ...
async def main():
    client = await Client.connect("localhost:7233")
    result = await client.execute_workflow(
        YourWorkflow.run,
        "your name",
        id="your-workflow-id",
        task_queue="your-task-queue",
    )
    print(f"Result: {result}")

if __name__ == "__main__":
    asyncio.run(main())
Set Task Queue
In most SDKs, the only Workflow Option that must be set is the name of the Task Queue.
For any code to execute, a Worker Process must be running that contains a Worker Entity that is polling the same Task Queue name.
To set a Task Queue in Python, specify the task_queue argument when executing a Workflow with either the start_workflow() or execute_workflow() method.
# ...
async def main():
    client = await Client.connect("localhost:7233")
    result = await client.execute_workflow(
        YourWorkflow.run,
        "your name",
        id="your-workflow-id",
        task_queue="your-task-queue",
    )
    print(f"Result: {result}")

if __name__ == "__main__":
    asyncio.run(main())
Workflow Id
Although it is not required, we recommend providing your own Workflow Id that maps to a business process or business entity identifier, such as an order identifier or customer identifier.
To set a Workflow Id in Python, specify the id argument when executing a Workflow with either the start_workflow() or execute_workflow() method.
The id argument should be a unique identifier for the Workflow Execution.
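One simple way to map a business entity to a Workflow Id is to derive the Id from the entity's own identifier. The helper below is a hypothetical sketch; the "order-" prefix and the function name are illustrative assumptions, not a Temporal convention.

```python
def workflow_id_for_order(order_id: str) -> str:
    """Derive a Workflow Id from an order identifier."""
    if not order_id:
        raise ValueError("order_id must not be empty")
    return f"order-{order_id}"


# The returned string can be passed as the id argument when starting a Workflow.
print(workflow_id_for_order("12345"))
```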
# ...
async def main():
    client = await Client.connect("localhost:7233")
    result = await client.execute_workflow(
        YourWorkflow.run,
        "your name",
        id="your-workflow-id",
        task_queue="your-task-queue",
    )
    print(f"Result: {result}")

if __name__ == "__main__":
    asyncio.run(main())
Get Workflow results
If the call to start a Workflow Execution is successful, you will gain access to the Workflow Execution's Run Id.
The Workflow Id, Run Id, and Namespace may be used to uniquely identify a Workflow Execution in the system and get its result.
It's possible to either block progress on the result (synchronous execution) or get the result at some other point in time (asynchronous execution).
In the Temporal Platform, it's also acceptable to use Queries as the preferred method for accessing the state and results of Workflow Executions.
Use start_workflow() or get_workflow_handle() to return a Workflow handle.
Then use the result method to await the result of the Workflow.
To get a handle for an existing Workflow by its Id, you can use get_workflow_handle(), or use get_workflow_handle_for() for type safety.
Then use describe() to get the current status of the Workflow.
If the Workflow does not exist, this call fails.
# ...
async def main():
    client = await Client.connect("localhost:7233")
    handle = client.get_workflow_handle(
        workflow_id="your-workflow-id",
    )
    results = await handle.result()
    print(f"Result: {results}")

if __name__ == "__main__":
    asyncio.run(main())