Build a durable AI agent with Temporal

Build the agent Workflow and Worker

~60 minutes | Intermediate | Python
  1. Build the toolkit
  2. Define agent behavior
  3. Workflow & Worker
  4. Run and observe

With your goal, Activities, and prompts in place, you'll now build the Temporal Workflow that orchestrates the agent's multi-turn conversation, then set up a Worker to execute it.

Building the agent Workflow

Agents need to manage conversations that involve multiple turns including user interaction, tool execution, and state management. The challenge is maintaining coherence across these sessions while handling failures, retries, and long-running interactions. Your agent must coordinate several concurrent concerns such as validating user input against conversation context, determining when to execute tools, managing user input for tool execution, and maintaining conversation history that persists in the event of system failures. A traditional application would lose conversation state during failures, but Temporal Workflows provide durable execution that preserves context through any system interruption.

In this step, you will create the Temporal Workflow that orchestrates your agent's conversation loop. This Workflow handles user interactions, validates prompts, manages tool execution, and maintains conversation state, all while providing durability to the agent.

Creating the workflows submodule

First, create the directory structure for your Workflow implementations:

mkdir workflows

Next, create an empty __init__.py file in the directory to enable it as a submodule:

touch workflows/__init__.py

Now that your workflows directory is a submodule, you will create a few helper functions for your Workflow.

Implementing a few Workflow helper functions

Before implementing the Workflow, you will implement a few helper functions that perform repetitive operations like tool execution, argument validation, and conversation continuation.

First, create workflows/workflow_helpers.py and add the following import statements:

workflows/workflow_helpers.py
from datetime import timedelta
from typing import Any, Callable, Deque, Dict

from temporalio import workflow
from temporalio.common import RetryPolicy
from temporalio.exceptions import ActivityError

with workflow.unsafe.imports_passed_through():
    from activities.activities import AgentActivities
    from models.requests import ConversationHistory, ToolData, ToolPromptInput
    from prompts.agent_prompt_generators import (
        generate_missing_args_prompt,
        generate_tool_completion_prompt,
    )

As with earlier imports, this section pulls in modules from the Python standard library and the Temporal SDK. The remaining imports are wrapped in a with workflow.unsafe.imports_passed_through() block. Use this block when importing third-party modules, including ones you implement, into a Workflow file (or, as here, into a file that the Workflow itself imports). Passing these imports through keeps the Workflow sandbox from reloading them for every Workflow run, which improves performance; it is safe only for modules that behave deterministically. You can read more about this in the Temporal documentation.

Next, declare the following timeout constants:

workflows/workflow_helpers.py
TOOL_ACTIVITY_START_TO_CLOSE_TIMEOUT = timedelta(seconds=30)
TOOL_ACTIVITY_SCHEDULE_TO_CLOSE_TIMEOUT = timedelta(minutes=30)
LLM_ACTIVITY_START_TO_CLOSE_TIMEOUT = timedelta(seconds=30)
LLM_ACTIVITY_SCHEDULE_TO_CLOSE_TIMEOUT = timedelta(minutes=30)

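These constants set limits for tool execution and LLM calls. The Start-To-Close Timeout bounds a single attempt of an Activity, while the Schedule-To-Close Timeout bounds the whole Activity Execution, including retries. Together they give each call enough time to respond while ensuring the Workflow detects a failure within a reasonable amount of time.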
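
To get a feel for how the two timeouts interact, the sketch below estimates a worst case under a deliberately simplified model: every attempt runs for the full Start-To-Close Timeout, and retries wait a fixed interval. The worst_case_attempts function is illustrative and not part of the tutorial code, and the 5-second interval is an assumption that mirrors the Retry Policy used later in this file. This is plain arithmetic, not Temporal's scheduler.

```python
import math
from datetime import timedelta

START_TO_CLOSE = timedelta(seconds=30)     # limit for a single attempt
SCHEDULE_TO_CLOSE = timedelta(minutes=30)  # limit for all attempts combined


def worst_case_attempts(
    schedule_to_close: timedelta,
    start_to_close: timedelta,
    retry_interval: timedelta,
) -> int:
    """Rough upper bound on how many attempts can start before the overall limit."""
    per_attempt = start_to_close + retry_interval
    # Dividing two timedeltas yields a float ratio.
    return math.ceil(schedule_to_close / per_attempt)


attempts = worst_case_attempts(
    SCHEDULE_TO_CLOSE, START_TO_CLOSE, timedelta(seconds=5)
)
print(attempts)
```

Under these assumptions, a tool that keeps timing out is retried a few dozen times before the Workflow sees the failure, rather than retrying forever.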

Defining the tool execution Activity invocation function

The first function you'll implement is the handle_tool_execution function. Add the function header to the file:

workflows/workflow_helpers.py
async def handle_tool_execution(
    current_tool: str,
    tool_data: ToolData,
    add_message_callback: Callable[..., Any],
    prompt_queue: Deque[str],
) -> None:

This function takes in the current tool to execute, the tool data, a callback that stores the conversation history, and a queue for prompts that the agent will execute later to continue its goal. The function executes the tool as a dynamic Activity, and processes the results for the LLM to handle.

Add the code to invoke the Activity and process the results:

workflows/workflow_helpers.py
    """Execute a tool after confirmation and handle its result."""
    workflow.logger.info(f"Confirmed. Proceeding with tool: {current_tool}")
    try:
        dynamic_result = await workflow.execute_activity(
            current_tool,
            tool_data["args"],
            schedule_to_close_timeout=TOOL_ACTIVITY_SCHEDULE_TO_CLOSE_TIMEOUT,
            start_to_close_timeout=TOOL_ACTIVITY_START_TO_CLOSE_TIMEOUT,
            retry_policy=RetryPolicy(
                initial_interval=timedelta(seconds=5), backoff_coefficient=1
            ),
        )
        dynamic_result["tool"] = current_tool
    except ActivityError as e:
        workflow.logger.error(f"Tool execution failed: {str(e)}")
        dynamic_result = {"error": str(e), "tool": current_tool}

    add_message_callback("tool_result", dynamic_result)
    prompt_queue.append(generate_tool_completion_prompt(current_tool, dynamic_result))

The function executes the tool by passing the tool's name to workflow.execute_activity, and the dynamic Activity you implemented earlier handles the call. The call sets the Activity timeouts using the constants you defined, and its Retry Policy uses a backoff coefficient of 1, so retries are spaced at a constant 5 seconds. Whether the Activity succeeds or fails, the result is stored in the conversation history using the add_message_callback that was passed in. The function then calls generate_tool_completion_prompt with the current_tool and the result of the tool execution to create a prompt, and appends it to the prompt_queue for the agent to handle on its next iteration.
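
The Retry Policy in the call above sets initial_interval to 5 seconds and backoff_coefficient to 1. As a rough illustration of what the coefficient does, the sketch below computes the wait before each retry as the initial interval multiplied by the coefficient raised to the retry index. The retry_intervals helper is hypothetical, and it ignores the policy's maximum interval cap for simplicity.

```python
from datetime import timedelta
from typing import List


def retry_intervals(
    initial: timedelta, backoff_coefficient: float, retries: int
) -> List[timedelta]:
    """Wait before each retry: initial * coefficient ** n (no maximum cap applied)."""
    return [initial * (backoff_coefficient**n) for n in range(retries)]


# A coefficient of 1 keeps the wait constant, as in the tutorial's policy.
constant = retry_intervals(timedelta(seconds=5), 1, 3)

# A coefficient of 2 doubles the wait after every failed attempt.
doubling = retry_intervals(timedelta(seconds=5), 2, 3)

print([d.total_seconds() for d in constant])
print([d.total_seconds() for d in doubling])
```

A constant interval keeps retries predictable for short tool calls; a larger coefficient would back off more aggressively against a struggling downstream service.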

Defining the missing argument handler function

Next you'll create the function that checks and handles missing tool arguments. Add the function header with the following arguments:

workflows/workflow_helpers.py
async def handle_missing_args(
    current_tool: str,
    args: Dict[str, Any],
    tool_data: Dict[str, Any],
    prompt_queue: Deque[str],
) -> bool:

This function takes in the current_tool, the args that were passed to the tool, the tool_data containing all data related to the tool, and the prompt_queue containing prompts the LLM still needs to act on.

Add the remaining code to check for any missing arguments:

workflows/workflow_helpers.py
    """Check for missing arguments and handle them if found."""
    missing_args = [key for key, value in args.items() if value is None]

    if missing_args:
        prompt_queue.append(
            generate_missing_args_prompt(current_tool, tool_data, missing_args)
        )
        workflow.logger.info(
            f"Missing arguments for tool: {current_tool}: {' '.join(missing_args)}"
        )
        return True
    return False

The tool arguments are checked, and if any are missing, the generate_missing_args_prompt is invoked and the result is added to the prompt_queue for the agent to execute on its next turn. The function then returns True. Otherwise, no arguments were missing and the function returns False.
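
The check itself is a single list comprehension, and it is worth seeing what it does and does not catch. The sketch below isolates it in a hypothetical find_missing_args function with made-up argument names; it is not part of the tutorial code.

```python
from typing import Any, Dict, List


def find_missing_args(args: Dict[str, Any]) -> List[str]:
    """Return the names of arguments whose value is None."""
    return [key for key, value in args.items() if value is None]


# Hypothetical arguments returned by the LLM for a flight search tool.
args = {"origin": "SFO", "destination": None, "date": None, "passengers": 0}
missing = find_missing_args(args)
print(missing)
```

Only keys that are present with a value of None count as missing. Falsy values such as 0 or an empty string pass the check, and a key that is absent from the dict altogether is not detected.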

Defining the history formatting function

Next you'll define a function for formatting the conversation history.

Add the following function to your code:

workflows/workflow_helpers.py
def format_history(conversation_history: ConversationHistory) -> str:
    """Format the conversation history into a single string."""
    return " ".join(str(msg["response"]) for msg in conversation_history["messages"])

This function joins the response of every message in the conversation history into a single space-separated string.
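
As a quick illustration, the sketch below runs the same join over a small, made-up history. The ConversationHistory alias is a stand-in for the type you defined in models/requests.py, and the sample messages are invented.

```python
from typing import Any, Dict, List

# Stand-in for the tutorial's ConversationHistory type.
ConversationHistory = Dict[str, List[Dict[str, Any]]]


def format_history(conversation_history: ConversationHistory) -> str:
    """Format the conversation history into a single string."""
    return " ".join(str(msg["response"]) for msg in conversation_history["messages"])


history: ConversationHistory = {
    "messages": [
        {"actor": "user", "response": "Find flights to Sydney"},
        {"actor": "agent", "response": {"next": "question", "tool": None}},
    ]
}
formatted = format_history(history)
print(formatted)
```

Note that the actor is dropped and dict responses are rendered with str(), so the result is a loose transcript meant for an LLM to summarize rather than a format you can parse back.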

Defining the history summarization prompt function

Now you'll use the previous function to generate a prompt for the LLM to summarize the conversation.

Add the following function to your code:

workflows/workflow_helpers.py
def prompt_summary_with_history(
    conversation_history: ConversationHistory,
) -> tuple[str, str]:
    """Generate a prompt for summarizing the conversation.
    Used only for continue as new of the workflow."""
    history_string = format_history(conversation_history)
    context_instructions = f"Here is the conversation history between a user and a chatbot: {history_string}"
    actual_prompt = (
        "Please produce a two sentence summary of this conversation. "
        'Put the summary in the format { "summary": "<plain text>" }'
    )
    return (context_instructions, actual_prompt)

The code calls the format_history function, then creates two variables, one containing the history and another containing the prompt. It then returns both variables as a tuple.

Defining the function to handle long Event Histories

Temporal limits the length and size of a single Workflow Execution's Event History. To stay under these limits, a Workflow can Continue-As-New: the current Workflow Execution closes and a new one starts with a fresh Event History. This does not happen automatically; the Workflow code has to request it. Because LLM responses can be long, you will implement a function that decides when to Continue-As-New.

First, define the function header:

workflows/workflow_helpers.py
async def continue_as_new_if_needed(
    conversation_history: ConversationHistory,
    prompt_queue: Deque[str],
    agent_goal: Any,
    max_turns: int,
    add_message_callback: Callable[..., Any],
) -> None:

The function receives the conversation_history as your custom type, the prompt_queue deque used to control the flow of prompts, the agent's goal, the maximum number of messages allowed before continuing as new, and a callback that adds this interaction to the conversation history.

Next, add the function implementation:

workflows/workflow_helpers.py
    """Handle workflow continuation if message limit is reached."""
    if len(conversation_history["messages"]) >= max_turns:
        summary_context, summary_prompt = prompt_summary_with_history(
            conversation_history
        )
        summary_input = ToolPromptInput(
            prompt=summary_prompt, context_instructions=summary_context
        )
        conversation_summary = await workflow.start_activity_method(
            AgentActivities.agent_toolPlanner,
            summary_input,
            schedule_to_close_timeout=LLM_ACTIVITY_SCHEDULE_TO_CLOSE_TIMEOUT,
        )
        workflow.logger.info(f"Continuing as new after {max_turns} turns.")
        add_message_callback("conversation_summary", conversation_summary)
        workflow.continue_as_new(
            args=[
                {
                    "tool_params": {
                        "conversation_summary": conversation_summary,
                        "prompt_queue": prompt_queue,
                    },
                    "agent_goal": agent_goal,
                }
            ]
        )

The function first checks whether the number of messages in the conversation history is greater than or equal to max_turns. If so, it begins the Continue-As-New process. It calls prompt_summary_with_history to build the history context and the summarization prompt, then wraps both in a ToolPromptInput. Next, it starts the agent_toolPlanner Activity with this input and awaits the result, so the LLM produces a short summary of the conversation. It then calls add_message_callback to record the summary in the conversation history. Finally, it calls workflow.continue_as_new, which closes the current Workflow Execution and starts a new one with a fresh Event History, passing along the summary, the remaining prompt_queue, and the agent's goal.
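
The decision and the hand-off payload can be pictured without Temporal at all. The sketch below is a plain-Python approximation: build_continuation_input is a hypothetical helper, the summary string stands in for the LLM's response, and the queue is copied to a list only to keep the example's output simple (the tutorial code passes the deque itself).

```python
from collections import deque
from typing import Any, Deque, Dict, List, Optional


def build_continuation_input(
    messages: List[Any],
    prompt_queue: Deque[str],
    agent_goal: Any,
    summary: str,
    max_turns: int,
) -> Optional[Dict[str, Any]]:
    """Return the input for the next run, or None if the limit is not reached."""
    if len(messages) < max_turns:
        return None
    return {
        "tool_params": {
            "conversation_summary": summary,
            "prompt_queue": list(prompt_queue),  # copied for illustration only
        },
        "agent_goal": agent_goal,
    }


not_yet = build_continuation_input(["m1"], deque(), "demo goal", "", 2)
payload = build_continuation_input(
    ["m1", "m2"], deque(["### pending prompt"]), "demo goal", "A short summary.", 2
)
print(not_yet)
print(payload)
```

The new run starts with only the summary, the unprocessed prompts, and the goal, which is what keeps its Event History small.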

Defining the prompt entity identification function

Finally, add a function that returns a boolean indicating whether the prompt came from a user or not:

workflows/workflow_helpers.py
# LLM-tagged prompts start with "###"
# all others are from the user
def is_user_prompt(prompt) -> bool:
    if prompt.startswith("###"):
        return False
    else:
        return True

LLM prompts start with ###, so any prompt that doesn't begin with that character sequence is a user prompt.
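
For example, here is how the check classifies two made-up prompts. This sketch uses an equivalent one-line form of the function; the prompt strings are invented.

```python
def is_user_prompt(prompt: str) -> bool:
    """Prompts tagged with a leading "###" come from the LLM; all others are from the user."""
    return not prompt.startswith("###")


from_user = is_user_prompt("Book me a flight to Sydney")
from_llm = is_user_prompt("### The tool completed. Decide the next step.")
print(from_user, from_llm)
```

The check only looks at the start of the string, so a user message that happens to begin with ### would be treated as an LLM prompt and skip validation.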

The workflows/workflow_helpers.py file is now complete and needs no further revisions. You can review and copy the complete file here.
workflows/workflow_helpers.py
from datetime import timedelta
from typing import Any, Callable, Deque, Dict

from temporalio import workflow
from temporalio.common import RetryPolicy
from temporalio.exceptions import ActivityError

with workflow.unsafe.imports_passed_through():
    from activities.activities import AgentActivities
    from models.requests import ConversationHistory, ToolData, ToolPromptInput
    from prompts.agent_prompt_generators import (
        generate_missing_args_prompt,
        generate_tool_completion_prompt,
    )


TOOL_ACTIVITY_START_TO_CLOSE_TIMEOUT = timedelta(seconds=30)
TOOL_ACTIVITY_SCHEDULE_TO_CLOSE_TIMEOUT = timedelta(minutes=30)
LLM_ACTIVITY_START_TO_CLOSE_TIMEOUT = timedelta(seconds=30)
LLM_ACTIVITY_SCHEDULE_TO_CLOSE_TIMEOUT = timedelta(minutes=30)


async def handle_tool_execution(
    current_tool: str,
    tool_data: ToolData,
    add_message_callback: Callable[..., Any],
    prompt_queue: Deque[str],
) -> None:
    """Execute a tool after confirmation and handle its result."""
    workflow.logger.info(f"Confirmed. Proceeding with tool: {current_tool}")

    try:
        dynamic_result = await workflow.execute_activity(
            current_tool,
            tool_data["args"],
            schedule_to_close_timeout=TOOL_ACTIVITY_SCHEDULE_TO_CLOSE_TIMEOUT,
            start_to_close_timeout=TOOL_ACTIVITY_START_TO_CLOSE_TIMEOUT,
            retry_policy=RetryPolicy(
                initial_interval=timedelta(seconds=5), backoff_coefficient=1
            ),
        )
        dynamic_result["tool"] = current_tool
    except ActivityError as e:
        workflow.logger.error(f"Tool execution failed: {str(e)}")
        dynamic_result = {"error": str(e), "tool": current_tool}

    add_message_callback("tool_result", dynamic_result)
    prompt_queue.append(generate_tool_completion_prompt(current_tool, dynamic_result))


async def handle_missing_args(
    current_tool: str,
    args: Dict[str, Any],
    tool_data: Dict[str, Any],
    prompt_queue: Deque[str],
) -> bool:
    """Check for missing arguments and handle them if found."""
    missing_args = [key for key, value in args.items() if value is None]

    if missing_args:
        prompt_queue.append(
            generate_missing_args_prompt(current_tool, tool_data, missing_args)
        )
        workflow.logger.info(
            f"Missing arguments for tool: {current_tool}: {' '.join(missing_args)}"
        )
        return True
    return False


def format_history(conversation_history: ConversationHistory) -> str:
    """Format the conversation history into a single string."""
    return " ".join(str(msg["response"]) for msg in conversation_history["messages"])


def prompt_summary_with_history(
    conversation_history: ConversationHistory,
) -> tuple[str, str]:
    """Generate a prompt for summarizing the conversation.
    Used only for continue as new of the workflow."""
    history_string = format_history(conversation_history)
    context_instructions = f"Here is the conversation history between a user and a chatbot: {history_string}"
    actual_prompt = (
        "Please produce a two sentence summary of this conversation. "
        'Put the summary in the format { "summary": "<plain text>" }'
    )
    return (context_instructions, actual_prompt)


async def continue_as_new_if_needed(
    conversation_history: ConversationHistory,
    prompt_queue: Deque[str],
    agent_goal: Any,
    max_turns: int,
    add_message_callback: Callable[..., Any],
) -> None:
    """Handle workflow continuation if message limit is reached."""
    if len(conversation_history["messages"]) >= max_turns:
        summary_context, summary_prompt = prompt_summary_with_history(
            conversation_history
        )
        summary_input = ToolPromptInput(
            prompt=summary_prompt, context_instructions=summary_context
        )
        conversation_summary = await workflow.start_activity_method(
            AgentActivities.agent_toolPlanner,
            summary_input,
            schedule_to_close_timeout=LLM_ACTIVITY_SCHEDULE_TO_CLOSE_TIMEOUT,
        )
        workflow.logger.info(f"Continuing as new after {max_turns} turns.")
        add_message_callback("conversation_summary", conversation_summary)
        workflow.continue_as_new(
            args=[
                {
                    "tool_params": {
                        "conversation_summary": conversation_summary,
                        "prompt_queue": prompt_queue,
                    },
                    "agent_goal": agent_goal,
                }
            ]
        )


# LLM-tagged prompts start with "###"
# all others are from the user
def is_user_prompt(prompt) -> bool:
    if prompt.startswith("###"):
        return False
    else:
        return True

Now that you have built out the supporting functions, you can build the agent Workflow.

Preparing the core agent Workflow

The core agent Workflow is the primary driver of your agent. It orchestrates LLM and tool execution, maintains conversation state, and makes decisions about what step to take next. The Workflow will consist of the primary Workflow class and method, as well as a few Signals, Queries, and class methods.

First, create workflows/agent_goal_workflow.py, and add the necessary imports:

workflows/agent_goal_workflow.py
from collections import deque
from datetime import timedelta
from typing import Any, Deque, Dict, Optional, Union

from temporalio import workflow
from temporalio.common import RetryPolicy

from models.core import AgentGoal
from models.requests import (
    ConversationHistory,
    CurrentTool,
    EnvLookupInput,
    EnvLookupOutput,
    ToolData,
    ValidationInput,
)
from workflows import workflow_helpers as helpers
from workflows.workflow_helpers import (
    LLM_ACTIVITY_SCHEDULE_TO_CLOSE_TIMEOUT,
    LLM_ACTIVITY_START_TO_CLOSE_TIMEOUT,
)

with workflow.unsafe.imports_passed_through():
    from activities.activities import AgentActivities
    from models.requests import CombinedInput, ToolPromptInput
    from prompts.agent_prompt_generators import generate_genai_prompt

# Constants
MAX_TURNS_BEFORE_CONTINUE = 250

These imports bring in the types, helper functions, and constants you have defined so far, as well as modules from the Temporal SDK and the Python standard library. You also added the MAX_TURNS_BEFORE_CONTINUE constant and set its value to 250. The agent passes this value to the continue_as_new_if_needed helper function you implemented to decide whether it should Continue-As-New. For this agent and its goal, 250 turns should be adequate.

Defining the agent class and constructor

You define a Temporal Workflow by creating a Python class. Create the AgentGoalWorkflow class, decorate it with @workflow.defn, and define the __init__ method:

workflows/agent_goal_workflow.py
@workflow.defn
class AgentGoalWorkflow:
    """Workflow that manages tool execution with user confirmation and conversation history."""

    def __init__(self) -> None:
        self.conversation_history: ConversationHistory = {"messages": []}
        self.prompt_queue: Deque[str] = deque()
        self.chat_ended: bool = False
        self.tool_data: Optional[ToolData] = None
        self.goal: Optional[AgentGoal] = None
        self.waiting_for_confirm: bool = False
        self.show_tool_args_confirmation: bool = (
            True  # set from env file in activity lookup_wf_env_settings
        )
        self.confirmed: bool = (
            False  # indicates that we have confirmation to proceed to run tool
        )

Your Workflow must be decorated with the @workflow.defn decorator. This is what distinguishes it as a Temporal Workflow. While a Workflow isn't required to have an __init__ method, your agent will benefit from the following instance variables.

| Variable | Description |
| --- | --- |
| conversation_history | A record of the entire chat conversation history |
| prompt_queue | A queue maintaining tasks left for the agent to process |
| chat_ended | A boolean to determine if the chat has ended or not |
| tool_data | A record of the current tool data |
| goal | The agent's goal |
| waiting_for_confirm | A boolean signifying if the agent is ready to execute the tool |
| show_tool_args_confirmation | A boolean to determine if extra confirmation is necessary before executing tools |
| confirmed | A boolean for determining if the agent is confirmed to proceed |

Next, you'll begin implementing the main Workflow method.

Defining the agent control variables

Every Temporal Workflow has a single entry point, also known as the Workflow method. This method is decorated with the @workflow.run decorator. Your Workflow method will contain the primary business logic for your agent.

Declare the method header for your agent's Workflow method:

workflows/agent_goal_workflow.py
    @workflow.run
    async def run(self, combined_input: CombinedInput) -> str:

The Workflow method must be decorated with the @workflow.run decorator and must be defined as a coroutine using async def. This method takes one argument, a type you defined named CombinedInput, and returns a str. Recall that CombinedInput contains the AgentGoal and AgentGoalWorkflowParams types.

Add the next few lines of code to the run method to assign values to a few parameters:

workflows/agent_goal_workflow.py
        """Main workflow execution method."""
        # setup phase, starts with blank tool_params and agent_goal prompt as defined in tools/goal_registry.py
        params = combined_input.tool_params
        self.goal = combined_input.agent_goal

        await self.lookup_wf_env_settings()

The last line calls a method, lookup_wf_env_settings, that hasn't been defined yet, so define that as a method within the AgentGoalWorkflow class but not within the scope of your run method:

workflows/agent_goal_workflow.py
    # look up env settings in an activity so they're part of history
    async def lookup_wf_env_settings(self) -> None:
        env_lookup_input = EnvLookupInput(
            show_confirm_env_var_name="SHOW_CONFIRM",
            show_confirm_default=True,
        )
        env_output: EnvLookupOutput = await workflow.execute_activity_method(
            AgentActivities.get_wf_env_vars,
            env_lookup_input,
            start_to_close_timeout=LLM_ACTIVITY_START_TO_CLOSE_TIMEOUT,
            retry_policy=RetryPolicy(
                initial_interval=timedelta(seconds=5), backoff_coefficient=1
            ),
        )
        self.show_tool_args_confirmation = env_output.show_confirm

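This method invokes the get_wf_env_vars Activity to read the environment settings and stores the result in self.show_tool_args_confirmation. Reading the environment inside an Activity records the value in the Event History, so the Workflow sees the same value if it is replayed.

Next, add the final lines of code to finish initializing the instance and local variables within the run method: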

workflows/agent_goal_workflow.py
        if params and params.prompt_queue:
            self.prompt_queue.extend(params.prompt_queue)

        waiting_for_confirm: bool = False
        current_tool: Optional[CurrentTool] = None

If the parameters include queued prompts, they are added to the prompt_queue for the agent to process. The prompt_queue is the source of truth for tasks that the agent needs to execute to complete its goal. Tasks will be added throughout the lifecycle, which drives execution forward.

Finally, you set the local waiting_for_confirm variable to False and current_tool to None. These variables will change as the agent processes the various tasks to complete its goal.

Now that you've defined the class and instantiated the control variables, you can build the core agent loop.

Implementing the core agent loop

The core of the agent's logic, processing, and validation takes place within a single main loop. Every iteration of the loop is considered a turn. The agent may perform an action in a turn, or set up an action to be performed on the next turn, and continue the loop to end its turn. This loop will run indefinitely until the agent determines it achieved its goal and returns the final result.

Handling the await conditions and exit condition

The first step is to create the loop and handle the await and exit conditions. Add the following lines of code within the run method, directly after the current_tool declaration you just added:

workflows/agent_goal_workflow.py
        while True:
            # wait indefinitely for input from signals - user_prompt, end_chat, or confirm as defined below
            await workflow.wait_condition(
                lambda: bool(self.prompt_queue) or self.chat_ended or self.confirmed
            )

            # handle chat should end. When chat ends, push conversation history to workflow results.
            if self.chat_ended:
                return f"{self.conversation_history}"

This section creates the loop and immediately waits for a condition to become true before proceeding. The agent waits until one of three things happens: a prompt is added to the prompt_queue so the agent has something to process, the chat ends (either later in the loop or via a Signal), or the user confirms tool execution. Once any of these conditions is met, execution continues. The agent then checks whether the self.chat_ended instance variable is True, indicating that it can halt. If so, the agent returns the conversation history stored in the self.conversation_history instance variable, and the Workflow Execution closes.
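
If the wait-then-check pattern is unfamiliar, the sketch below reproduces its shape with plain asyncio instead of Temporal. It is only an analogy: ConversationState, its methods, and demo are hypothetical names, and asyncio.Condition stands in for workflow.wait_condition, which Temporal re-evaluates for you whenever Workflow state changes.

```python
import asyncio
from collections import deque
from typing import Callable, Deque, List


class ConversationState:
    """Plain-asyncio stand-in for the Workflow's instance variables."""

    def __init__(self) -> None:
        self.prompt_queue: Deque[str] = deque()
        self.chat_ended = False
        self.confirmed = False
        self._changed = asyncio.Condition()

    async def _update(self, mutate: Callable[[], None]) -> None:
        # Change state, then wake anything waiting on the condition.
        async with self._changed:
            mutate()
            self._changed.notify_all()

    async def user_prompt(self, prompt: str) -> None:
        await self._update(lambda: self.prompt_queue.append(prompt))

    async def end_chat(self) -> None:
        await self._update(lambda: setattr(self, "chat_ended", True))

    async def wait_for_work(self) -> None:
        async with self._changed:
            await self._changed.wait_for(
                lambda: bool(self.prompt_queue) or self.chat_ended or self.confirmed
            )


async def demo() -> List[str]:
    state = ConversationState()
    seen: List[str] = []

    async def agent_loop() -> None:
        while True:
            await state.wait_for_work()
            if state.chat_ended:  # exit condition is checked first
                return
            seen.append(state.prompt_queue.popleft())

    task = asyncio.create_task(agent_loop())
    await state.user_prompt("hello")
    while not seen:  # let the loop process the prompt
        await asyncio.sleep(0)
    await state.end_chat()
    await task
    return seen


result = asyncio.run(demo())
print(result)
```

As in the Workflow, the loop sleeps until something changes, and the end-of-chat check runs before any other work in the iteration.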

Executing the tool

Next, your agent will determine if it is appropriate to execute a tool, and if it is, invoke an Activity to do so.

Continue by adding the following code to execute the tool:

workflows/agent_goal_workflow.py
            # Execute the tool
            if self.ready_for_tool_execution() and current_tool is not None:
                await self.execute_tool(current_tool)
                continue

Before the agent executes a tool, the agent confirms that the tool meets the requirements for execution and that the current tool is not None. If both of these checks evaluate to True, the agent executes the tool. Once the tool has completed execution, it continues the loop, meaning it skips all further execution and returns to the top of the loop, ready to begin another iteration.

Adding in a few more helper methods

Next, implement three helper methods that the tool execution code depends on but that you haven't written yet.

The first checks if the tool is ready for execution. Leave the run method and append this new method to your class:

workflows/agent_goal_workflow.py
    # define if we're ready for tool execution
    def ready_for_tool_execution(self) -> bool:
        return (
            self.confirmed and self.waiting_for_confirm and self.tool_data is not None
        )

This method checks if the user confirmed execution via self.confirmed, if the agent has confirmed it has the data it needs to execute via self.waiting_for_confirm, and if self.tool_data is set. If this evaluates to True, the tool is ready for execution and the method returns True.
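
To make the gate explicit, the sketch below evaluates the same expression over every combination of the three inputs. It is a standalone version with the flags passed as parameters rather than read from self, and the sample tool data is made up.

```python
from itertools import product
from typing import Optional


def ready_for_tool_execution(
    confirmed: bool, waiting_for_confirm: bool, tool_data: Optional[dict]
) -> bool:
    """Standalone copy of the method's check, with the flags passed in explicitly."""
    return confirmed and waiting_for_confirm and tool_data is not None


# Collect every combination of inputs for which the gate opens.
ready_states = [
    (confirmed, waiting, tool_data)
    for confirmed, waiting, tool_data in product(
        [True, False], [True, False], [None, {"tool": "demo"}]
    )
    if ready_for_tool_execution(confirmed, waiting, tool_data)
]
print(ready_states)
```

Of the eight combinations, only one passes: the user has confirmed, the agent is waiting for that confirmation, and tool data exists.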

The second method executes the tool. Leave the run method and append this new method to your class:

workflows/agent_goal_workflow.py
    # execute the tool - set self.waiting_for_confirm to False if we're not waiting for confirm anymore
    # (always the case if it works successfully)
    async def execute_tool(self, current_tool: CurrentTool) -> None:
        workflow.logger.info(
            f"workflow step: user has confirmed, executing the tool {current_tool}"
        )
        self.confirmed = False
        confirmed_tool_data = self.tool_data.copy()
        confirmed_tool_data["next"] = "confirm"
        self.add_message("user_confirmed_tool_run", confirmed_tool_data)

        # execute the tool by key as defined in tools/__init__.py
        await helpers.handle_tool_execution(
            current_tool,
            self.tool_data,
            self.add_message,
            self.prompt_queue,
        )

        self.waiting_for_confirm = False

This method resets self.confirmed, copies the tool data, sets the copy's next field to confirm, and adds a message containing the copy to the conversation history. It then uses the handle_tool_execution function to invoke the tool as an Activity. The method does not return a value. Once handle_tool_execution returns, it sets the self.waiting_for_confirm instance variable to False, preparing the agent for its next turn in the conversation.

Finally, execute_tool calls one more helper method, add_message, which adds messages to the conversation history. Append it to your class:

workflows/agent_goal_workflow.py
    def add_message(self, actor: str, response: Union[str, Dict[str, Any]]) -> None:
        """Add a message to the conversation history.

        Args:
            actor: The entity that generated the message (e.g., "user", "agent")
            response: The message content, either as a string or structured data
        """
        if isinstance(response, dict):
            response_str = str(response)
            workflow.logger.debug(f"Adding {actor} message: {response_str[:100]}...")
        else:
            workflow.logger.debug(f"Adding {actor} message: {response[:100]}...")

        self.conversation_history["messages"].append(
            {"actor": actor, "response": response}
        )

The method logs a debug message containing the first 100 characters of the response, converting the response to a string first if it is a dict. This truncation applies only to the log line. The complete response is then appended, along with the actor, to the self.conversation_history instance variable.
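
The difference between what is logged and what is stored is easy to verify in isolation. The sketch below is a simplified, standalone version of add_message: it keeps log lines in a list instead of calling workflow.logger, and history and log_lines are module-level stand-ins for the Workflow's state.

```python
from typing import Any, Dict, List, Union

history: Dict[str, List[Dict[str, Any]]] = {"messages": []}
log_lines: List[str] = []  # stand-in for workflow.logger.debug output


def add_message(actor: str, response: Union[str, Dict[str, Any]]) -> None:
    """Log a short preview, then store the complete response."""
    preview = str(response)[:100]  # truncation affects the log line only
    log_lines.append(f"Adding {actor} message: {preview}...")
    history["messages"].append({"actor": actor, "response": response})


long_reply = "x" * 300
add_message("agent", long_reply)

stored = history["messages"][0]["response"]
print(len(stored))
print(len(log_lines[0]))
```

The stored response keeps all 300 characters, while the log line carries only a 100-character preview.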

Validating user prompts

Before processing any input from the user, the agent needs to validate it. You defined Activities in a prior section to validate the data, and now your Workflow will invoke them.

Continue by adding the prompt processing logic within the core agent loop:

workflows/agent_goal_workflow.py
            # process forward on the prompt queue if any
            if self.prompt_queue:
                # get the next prompt from the front of the queue (oldest first)
                prompt = self.prompt_queue.popleft()
                workflow.logger.info(
                    f"workflow step: processing message on the prompt queue, message is {prompt}"
                )

                # Validate user-provided prompts
                if helpers.is_user_prompt(prompt):
                    self.add_message("user", prompt)

                    # Validate the prompt before proceeding
                    validation_input = ValidationInput(
                        prompt=prompt,
                        conversation_history=self.conversation_history,
                        agent_goal=self.goal,
                    )
                    validation_result = await workflow.execute_activity_method(
                        AgentActivities.agent_validatePrompt,
                        args=[validation_input],
                        schedule_to_close_timeout=LLM_ACTIVITY_SCHEDULE_TO_CLOSE_TIMEOUT,
                        start_to_close_timeout=LLM_ACTIVITY_START_TO_CLOSE_TIMEOUT,
                        retry_policy=RetryPolicy(
                            initial_interval=timedelta(seconds=5), backoff_coefficient=1
                        ),
                    )

                    # If validation fails, provide that feedback to the user - i.e., "your words make no sense, puny human" end this iteration of processing
                    if not validation_result.validationResult:
                        workflow.logger.warning(
                            f"Prompt validation failed: {validation_result.validationFailedReason}"
                        )
                        self.add_message(
                            "agent", validation_result.validationFailedReason
                        )
                        continue

The validation code first checks whether there are any prompts in the queue. If so, it removes the prompt at the front of the queue, which is the oldest one, for processing. Next, it calls the is_user_prompt helper function you defined earlier to determine who authored the prompt. If the prompt is from the agent, the validation step is skipped. If the prompt is from a user, it is added to the conversation history and validated. The agent creates a ValidationInput variable containing the prompt, the conversation history, and the agent's goal, then executes the agent_validatePrompt Activity with that variable as input. If validation passes, the Workflow continues. If validation fails, the agent logs a warning, adds the failure reason to the conversation history so the user can see it, and uses continue to return to the top of the loop, where it awaits a response.
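
Because prompts are added with append and removed with popleft, the queue is processed first-in, first-out. A minimal sketch with made-up prompts:

```python
from collections import deque
from typing import Deque

prompt_queue: Deque[str] = deque()

# Prompts arrive in this order.
prompt_queue.append("Find flights to Sydney")
prompt_queue.append("### The tool completed. Decide the next step.")

# popleft removes from the front, so the oldest prompt comes out first.
first = prompt_queue.popleft()
remaining = list(prompt_queue)
print(first)
print(remaining)
```

This ordering means the agent handles prompts in the order they were queued, so a tool-completion prompt added during one turn is not skipped ahead of an earlier user message.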

It's important to recall that within agent_validatePrompt, regardless of success the Activity calls the agent_toolPlanner method. This provides a reason why the validation failed, if necessary.

Generating a context-aware prompt

Upon successful validation, the Workflow calls a prompt-generation function to build a context-aware prompt for the LLM to use. This is a regular function call rather than an Activity.

Continue by adding the call to the generate_genai_prompt function you implemented in the prompts submodule to your code:

workflows/agent_goal_workflow.py
                # If valid, proceed with generating the context and prompt
                context_instructions = generate_genai_prompt(
                    agent_goal=self.goal,
                    conversation_history=self.conversation_history,
                    raw_json=self.tool_data,
                )

This function call takes the agent's goal, the current conversation history, and the tool's data as input to generate the prompt. Recall that the tool data may be blank, for example, on the first iteration as a tool hasn't been selected. The prompt template handles this and only renders the data if it exists.

Executing the tool planner

Now that the prompt is constructed, you can use the LLM to plan which tool to use.

Add the following code to call the agent_toolPlanner Activity and process the results:

workflows/agent_goal_workflow.py
                prompt_input = ToolPromptInput(
                    prompt=prompt, context_instructions=context_instructions
                )

                # connect to LLM and execute to get next steps
                tool_data = await workflow.execute_activity_method(
                    AgentActivities.agent_toolPlanner,
                    prompt_input,
                    schedule_to_close_timeout=LLM_ACTIVITY_SCHEDULE_TO_CLOSE_TIMEOUT,
                    start_to_close_timeout=LLM_ACTIVITY_START_TO_CLOSE_TIMEOUT,
                    retry_policy=RetryPolicy(
                        initial_interval=timedelta(seconds=5), backoff_coefficient=1
                    ),
                )

                tool_data["force_confirm"] = self.show_tool_args_confirmation
                self.tool_data = ToolData(**tool_data)

                # process the tool as dictated by the prompt response - what to do next, and with which tool
                next_step = tool_data.get("next")
                current_tool: Optional[CurrentTool] = tool_data.get("tool")

                workflow.logger.info(
                    f"next_step: {next_step}, current tool is {current_tool}"
                )

Before the agent executes the Activity, it creates a variable using your type ToolPromptInput that contains the prompt and context. It then invokes the agent_toolPlanner Activity, passing in this variable. The Activity makes a call to the LLM with the prompt to determine what tool the agent should use to proceed with the next step of its goal, and returns the response as a dict. If the SHOW_CONFIRM environment variable was set to True, then the force_confirm key is also set to True. Next, the self.tool_data instance variable is updated with the data returned from the Activity execution. It then sets the next_step and current_tool variables to prepare for the next phase of execution.
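
The RetryPolicy in this call sets initial_interval to 5 seconds and backoff_coefficient to 1, which means the wait between retries stays constant instead of growing. The following sketch shows the arithmetic using only the standard library. The retry_intervals function is hypothetical and is not part of the Temporal SDK, and the default cap of 100 times the initial interval is an assumption made for this illustration.

```python
from datetime import timedelta
from typing import List, Optional


def retry_intervals(
    initial_interval: timedelta,
    backoff_coefficient: float,
    retries: int,
    maximum_interval: Optional[timedelta] = None,
) -> List[timedelta]:
    """Return the wait before each of the first `retries` retries."""
    # Assumption: with no explicit cap, limit waits to 100x the initial interval
    cap = maximum_interval or initial_interval * 100
    intervals = []
    for n in range(retries):
        # Each retry multiplies the previous wait by the coefficient
        wait = initial_interval * (backoff_coefficient ** n)
        intervals.append(min(wait, cap))
    return intervals


constant = retry_intervals(timedelta(seconds=5), 1, retries=4)
doubling = retry_intervals(timedelta(seconds=5), 2, retries=4)
print([w.total_seconds() for w in constant])
print([w.total_seconds() for w in doubling])
```

With a coefficient of 1, a failing LLM call is retried every 5 seconds. Since the policy in the Workflow sets no maximum number of attempts, the schedule_to_close_timeout is what ultimately bounds how long retries continue.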

Determining the next_step

The next_step variable contains the next action the LLM decided the agent should take to achieve its goal. This variable can only contain one of three values: question, confirm, or done. The agent interprets the value and acts on it. When the value is question, the agent asks the user a clarifying question, such as requesting a missing parameter. This is handled automatically via the prompt. However, confirm and done require custom logic.

Add the following code to implement the path for these options:

workflows/agent_goal_workflow.py
                # make sure we're ready to run the tool & have everything we need
                if next_step == "confirm" and current_tool:
                    args = tool_data.get("args", {})
                    # if we're missing arguments, ask for them
                    if await helpers.handle_missing_args(
                        current_tool, args, tool_data, self.prompt_queue
                    ):
                        continue

                    self.waiting_for_confirm = True

                    # We have needed arguments, if we want to force the user to confirm, set that up
                    if self.show_tool_args_confirmation:
                        self.confirmed = False  # set that we're not confirmed
                        workflow.logger.info("Waiting for user confirm signal...")
                    # if we have all needed arguments (handled above) and not holding for a debugging confirm, proceed:
                    else:
                        self.confirmed = True

                # else if the next step is to be done with the conversation such as if the user requests it via asking to "end conversation"
                elif next_step == "done":
                    self.add_message("agent", tool_data)

                    # here we could send conversation to AI for analysis

                    # end the workflow
                    return str(self.conversation_history)

If next_step is set to confirm and current_tool is set, the LLM has selected a tool and considers it ready to run. The agent first checks for missing arguments using the handle_missing_args function. Recall that if handle_missing_args determines an argument is missing, it adds a new prompt to the prompt_queue, and the Workflow uses continue so that the user is asked for the missing argument on the next turn. If no argument is missing, self.waiting_for_confirm is set to True, which indicates that a tool is staged for execution.

The agent then checks whether self.show_tool_args_confirmation is set. If so, self.confirmed is set to False, and the tool does not run until the user sends the confirm Signal. Otherwise, self.confirmed is set to True, and the tool runs on the next iteration of the loop without waiting for the user.

However, if next_step is set to done, the LLM determined that the goal is complete, and no more work is necessary. The agent wraps up by adding a final message to the conversation history, and then returns the conversation history, closing the Workflow Execution.
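
The branching described above can be summarized as a small decision function. This is a sketch for reasoning about the paths, not code to add to your Workflow; decide_action and the strings it returns are hypothetical names chosen for this illustration.

```python
from typing import Optional


def decide_action(
    next_step: Optional[str],
    has_tool: bool,
    missing_args: bool,
    show_confirm: bool,
) -> str:
    """Summarize which branch the agent loop takes for a planner response."""
    if next_step == "confirm" and has_tool:
        if missing_args:
            # A follow-up prompt is queued and the loop restarts via continue
            return "ask_for_missing_args"
        if show_confirm:
            # confirmed stays False until the confirm Signal arrives
            return "wait_for_confirm_signal"
        # confirmed is set to True, so the tool runs on the next iteration
        return "run_tool_next_iteration"
    if next_step == "done":
        return "end_workflow"
    # Any other value, such as "question", waits for the user's reply
    return "await_user_reply"


print(decide_action("confirm", has_tool=True, missing_args=False, show_confirm=True))
print(decide_action("done", has_tool=False, missing_args=False, show_confirm=True))
```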

Handling a long running execution

The final segment of the agent loop handles long running execution. Temporal Workflows have a limit on the size of a single Workflow Execution's Event History. If the Event History is too long, then the agent should perform a Continue-As-New operation to prevent a potential failure.

Add the following code to check and execute a Continue-As-New if necessary:

workflows/agent_goal_workflow.py
                self.add_message("agent", tool_data)
                await helpers.continue_as_new_if_needed(
                    self.conversation_history,
                    self.prompt_queue,
                    self.goal,
                    MAX_TURNS_BEFORE_CONTINUE,
                    self.add_message,
                )

First, the current tool data is added to the conversation history. Next, the Workflow calls continue_as_new_if_needed, the helper function you defined earlier, to determine whether the Workflow should perform the Continue-As-New operation. The helper makes its decision by comparing the number of turns the agent has completed against MAX_TURNS_BEFORE_CONTINUE, which this Workflow sets to 250. If the number of turns is greater, the agent performs the Continue-As-New operation.
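
The threshold check at the heart of this decision can be sketched as follows. The should_continue_as_new function is hypothetical, and counting messages in the history as a proxy for turns is an assumption; the comparison in your own continue_as_new_if_needed helper is the one that actually applies.

```python
from typing import Any, Dict, List

# Simplified stand-in for the tutorial's ConversationHistory type
ConversationHistory = Dict[str, List[Dict[str, Any]]]


def should_continue_as_new(history: ConversationHistory, max_turns: int) -> bool:
    """Return True once the history holds more messages than max_turns."""
    return len(history["messages"]) > max_turns


short_history: ConversationHistory = {
    "messages": [{"actor": "user", "response": "hi"}]
}
long_history: ConversationHistory = {
    "messages": [{"actor": "user", "response": str(i)} for i in range(251)]
}
print(should_continue_as_new(short_history, 250))
print(should_continue_as_new(long_history, 250))
```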

Finally, you are going to implement a method for external Temporal Clients to send and retrieve information to and from the Workflow Execution while it's running.

Communicating with the Workflow

Temporal Workflows allow data to be sent and retrieved during a running execution. These features are known as Signals and Queries.

Look back at the core event loop in the Workflow, specifically the await line at the very top of the loop:

workflows/agent_goal_workflow.py
        while True:
            # wait indefinitely for input from signals - user_prompt, end_chat, or confirm as defined below
            await workflow.wait_condition(
                lambda: bool(self.prompt_queue) or self.chat_ended or self.confirmed
            )

You may have noticed the chat_ended variable was never changed, or the user's input was never added to the prompt_queue. This is done via sending Signals to your running Workflow Execution.

Accepting the user's input

To accept user input and add it to the prompt_queue, define a Signal handler as a method within your agent_goal_workflow.py file, outside of the run function, and underneath your other helper functions.

Add the Signal handler to your code:

workflows/agent_goal_workflow.py
    # Signal that comes from api/main.py via a post to /send-prompt
    @workflow.signal
    async def user_prompt(self, prompt: str) -> None:
        """Signal handler for receiving user prompts."""
        workflow.logger.info(f"signal received: user_prompt, prompt is {prompt}")
        if self.chat_ended:
            workflow.logger.info(f"Message dropped due to chat closed: {prompt}")
            return
        self.prompt_queue.append(prompt)

A Signal handler is a method decorated with the @workflow.signal decorator; in this Workflow, the handlers are defined as async methods. When the Signal is received, it is logged, and then the agent checks whether the chat has ended. If it has, the prompt is dropped, as no more processing should take place. This handles the edge case of a prompt arriving in the short window between the chat ending and the Workflow Execution closing. Otherwise, the prompt is added to the end of the prompt_queue for the agent to process.

Confirming the user's request

Another Signal to implement is the user confirming the use of a tool, specifically when SHOW_CONFIRM is set to True.

Add the following Signal handler to the bottom of your file:

workflows/agent_goal_workflow.py
    # Signal that comes from api/main.py via a post to /confirm
    @workflow.signal
    async def confirm(self) -> None:
        """Signal handler for user confirmation of tool execution."""
        workflow.logger.info("Received user signal: confirmation")
        self.confirmed = True

This code implements the Signal handler method, decorates it with @workflow.signal, and logs that the Signal was received. It then sets the self.confirmed instance variable to True, which will unblock the main agent loop.

Ending the chat

The last Signal handler your agent needs is to allow the user to end the chat.

Add the following Signal handler to the bottom of your file:

workflows/agent_goal_workflow.py
    # Signal that comes from api/main.py via a post to /end-chat
    @workflow.signal
    async def end_chat(self) -> None:
        """Signal handler for ending the chat session."""
        workflow.logger.info("signal received: end_chat")
        self.chat_ended = True

Similar to the previous Signal handler, this is a decorated method that sets an instance variable to True, in this case the self.chat_ended variable.
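
To see how handlers that only mutate state can unblock the wait at the top of the loop, here is a model written with plain asyncio. It is not Temporal code: ChatLoopModel and demo are hypothetical, the confirm path is omitted, and an asyncio.Condition stands in for workflow.wait_condition. The model offers none of the durability that a Workflow provides.

```python
import asyncio
from collections import deque
from typing import Deque, List


class ChatLoopModel:
    """Plain-asyncio stand-in for the Workflow's wait loop (not Temporal code)."""

    def __init__(self) -> None:
        self.prompt_queue: Deque[str] = deque()
        self.chat_ended = False
        self.processed: List[str] = []
        self._changed = asyncio.Condition()

    async def user_prompt(self, prompt: str) -> None:
        # Mirrors the Signal handler: drop input once the chat has ended
        if self.chat_ended:
            return
        async with self._changed:
            self.prompt_queue.append(prompt)
            self._changed.notify_all()

    async def end_chat(self) -> None:
        async with self._changed:
            self.chat_ended = True
            self._changed.notify_all()

    async def run(self) -> List[str]:
        while True:
            async with self._changed:
                # Block until a handler changes state, like wait_condition
                await self._changed.wait_for(
                    lambda: bool(self.prompt_queue) or self.chat_ended
                )
            if self.chat_ended:
                return self.processed
            self.processed.append(self.prompt_queue.popleft())


async def demo() -> List[str]:
    model = ChatLoopModel()
    loop_task = asyncio.create_task(model.run())
    await model.user_prompt("hello")
    await asyncio.sleep(0)  # yield so the loop can process the prompt
    await model.end_chat()
    await model.user_prompt("too late")  # dropped: chat already ended
    return await loop_task


result = asyncio.run(demo())
print(result)
```

The loop never polls. It sleeps until a handler changes one of the values in the condition, which is the same shape as the Workflow's main loop.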

Sending information to a Workflow may not be the only action you want to perform. You may also want to retrieve information during its execution. Temporal provides this capability with Queries.

Retrieving the conversation history

Implementing a Query is similar to implementing a Signal: You define a method and decorate it. However, the method can't be async, and the decorator is @workflow.query.

Add the following Query to the bottom of your file, to retrieve the conversation history:

workflows/agent_goal_workflow.py
    @workflow.query
    def get_conversation_history(self) -> ConversationHistory:
        """Query handler to retrieve the full conversation history."""
        return self.conversation_history

This Query returns the current conversation history that is stored in the self.conversation_history instance variable.

Retrieving the latest tool data

The final Query returns the latest tool data.

Add the following code to the bottom of your file to implement it:

workflows/agent_goal_workflow.py
    @workflow.query
    def get_latest_tool_data(self) -> Optional[ToolData]:
        """Query handler to retrieve the latest tool data response if available."""
        return self.tool_data

This Query returns the current tool data, if available, that is stored in the self.tool_data instance variable.

Your Workflow now has the necessary Signals and Queries for a client API to properly communicate with it and implement a user interface on top of it.

The workflows/agent_goal_workflow.py is complete and will need no more revisions. You can review the complete file and copy the code here.
workflows/agent_goal_workflow.py
from collections import deque
from datetime import timedelta
from typing import Any, Deque, Dict, Optional, Union

from temporalio import workflow
from temporalio.common import RetryPolicy

from models.core import AgentGoal
from models.requests import (
    ConversationHistory,
    CurrentTool,
    EnvLookupInput,
    EnvLookupOutput,
    ToolData,
    ValidationInput,
)
from workflows import workflow_helpers as helpers
from workflows.workflow_helpers import (
    LLM_ACTIVITY_SCHEDULE_TO_CLOSE_TIMEOUT,
    LLM_ACTIVITY_START_TO_CLOSE_TIMEOUT,
)

with workflow.unsafe.imports_passed_through():
    from activities.activities import AgentActivities
    from models.requests import CombinedInput, ToolPromptInput
    from prompts.agent_prompt_generators import generate_genai_prompt

# Constants
MAX_TURNS_BEFORE_CONTINUE = 250


@workflow.defn
class AgentGoalWorkflow:
    """Workflow that manages tool execution with user confirmation and conversation history."""

    def __init__(self) -> None:
        self.conversation_history: ConversationHistory = {"messages": []}
        self.prompt_queue: Deque[str] = deque()
        self.chat_ended: bool = False
        self.tool_data: Optional[ToolData] = None
        self.goal: Optional[AgentGoal] = None
        self.waiting_for_confirm: bool = False
        self.show_tool_args_confirmation: bool = (
            True  # set from env file in activity lookup_wf_env_settings
        )
        self.confirmed: bool = (
            False  # indicates that we have confirmation to proceed to run tool
        )

    # see ../api/main.py#temporal_client.start_workflow() for how the input parameters are set
    @workflow.run
    async def run(self, combined_input: CombinedInput) -> str:
        """Main workflow execution method."""
        # setup phase, starts with blank tool_params and agent_goal prompt as defined in tools/goal_registry.py
        params = combined_input.tool_params
        self.goal = combined_input.agent_goal

        await self.lookup_wf_env_settings()

        if params and params.prompt_queue:
            self.prompt_queue.extend(params.prompt_queue)

        current_tool: Optional[CurrentTool] = None

        while True:
            await workflow.wait_condition(
                lambda: bool(self.prompt_queue) or self.chat_ended or self.confirmed
            )

            if self.chat_ended:
                workflow.logger.info("Chat-end signal received. Chat ending.")
                return f"{self.conversation_history}"

            if self.ready_for_tool_execution() and current_tool is not None:
                await self.execute_tool(current_tool)
                continue

            if self.prompt_queue:
                prompt = self.prompt_queue.popleft()
                workflow.logger.info(
                    f"workflow step: processing message on the prompt queue, message is {prompt}"
                )

                if helpers.is_user_prompt(prompt):
                    self.add_message("user", prompt)
                    validation_input = ValidationInput(
                        prompt=prompt,
                        conversation_history=self.conversation_history,
                        agent_goal=self.goal,
                    )
                    validation_result = await workflow.execute_activity_method(
                        AgentActivities.agent_validatePrompt,
                        args=[validation_input],
                        schedule_to_close_timeout=LLM_ACTIVITY_SCHEDULE_TO_CLOSE_TIMEOUT,
                        start_to_close_timeout=LLM_ACTIVITY_START_TO_CLOSE_TIMEOUT,
                        retry_policy=RetryPolicy(
                            initial_interval=timedelta(seconds=5), backoff_coefficient=1
                        ),
                    )

                    if not validation_result.validationResult:
                        workflow.logger.warning(
                            f"Prompt validation failed: {validation_result.validationFailedReason}"
                        )
                        self.add_message(
                            "agent", validation_result.validationFailedReason
                        )
                        continue

                context_instructions = generate_genai_prompt(
                    agent_goal=self.goal,
                    conversation_history=self.conversation_history,
                    raw_json=self.tool_data,
                )

                prompt_input = ToolPromptInput(
                    prompt=prompt, context_instructions=context_instructions
                )

                tool_data = await workflow.execute_activity_method(
                    AgentActivities.agent_toolPlanner,
                    prompt_input,
                    schedule_to_close_timeout=LLM_ACTIVITY_SCHEDULE_TO_CLOSE_TIMEOUT,
                    start_to_close_timeout=LLM_ACTIVITY_START_TO_CLOSE_TIMEOUT,
                    retry_policy=RetryPolicy(
                        initial_interval=timedelta(seconds=5), backoff_coefficient=1
                    ),
                )

                tool_data["force_confirm"] = self.show_tool_args_confirmation
                self.tool_data = ToolData(**tool_data)

                next_step = tool_data.get("next")
                current_tool: Optional[CurrentTool] = tool_data.get("tool")

                workflow.logger.info(
                    f"next_step: {next_step}, current tool is {current_tool}"
                )

                if next_step == "confirm" and current_tool:
                    args = tool_data.get("args", {})
                    if await helpers.handle_missing_args(
                        current_tool, args, tool_data, self.prompt_queue
                    ):
                        continue

                    self.waiting_for_confirm = True

                    if self.show_tool_args_confirmation:
                        self.confirmed = False
                        workflow.logger.info("Waiting for user confirm signal...")
                    else:
                        self.confirmed = True

                elif next_step == "done":
                    self.add_message("agent", tool_data)
                    return str(self.conversation_history)

                self.add_message("agent", tool_data)
                await helpers.continue_as_new_if_needed(
                    self.conversation_history,
                    self.prompt_queue,
                    self.goal,
                    MAX_TURNS_BEFORE_CONTINUE,
                    self.add_message,
                )

    async def lookup_wf_env_settings(self) -> None:
        env_lookup_input = EnvLookupInput(
            show_confirm_env_var_name="SHOW_CONFIRM",
            show_confirm_default=True,
        )
        env_output: EnvLookupOutput = await workflow.execute_activity_method(
            AgentActivities.get_wf_env_vars,
            env_lookup_input,
            start_to_close_timeout=LLM_ACTIVITY_START_TO_CLOSE_TIMEOUT,
            retry_policy=RetryPolicy(
                initial_interval=timedelta(seconds=5), backoff_coefficient=1
            ),
        )
        self.show_tool_args_confirmation = env_output.show_confirm

    def ready_for_tool_execution(self) -> bool:
        return (
            self.confirmed and self.waiting_for_confirm and self.tool_data is not None
        )

    async def execute_tool(self, current_tool: CurrentTool) -> None:
        workflow.logger.info(
            f"workflow step: user has confirmed, executing the tool {current_tool}"
        )
        self.confirmed = False
        confirmed_tool_data = self.tool_data.copy()
        confirmed_tool_data["next"] = "confirm"
        self.add_message("user_confirmed_tool_run", confirmed_tool_data)

        await helpers.handle_tool_execution(
            current_tool,
            self.tool_data,
            self.add_message,
            self.prompt_queue,
        )

        self.waiting_for_confirm = False

    def add_message(self, actor: str, response: Union[str, Dict[str, Any]]) -> None:
        if isinstance(response, dict):
            response_str = str(response)
            workflow.logger.debug(f"Adding {actor} message: {response_str[:100]}...")
        else:
            workflow.logger.debug(f"Adding {actor} message: {response[:100]}...")

        self.conversation_history["messages"].append(
            {"actor": actor, "response": response}
        )

    @workflow.signal
    async def user_prompt(self, prompt: str) -> None:
        workflow.logger.info(f"signal received: user_prompt, prompt is {prompt}")
        if self.chat_ended:
            workflow.logger.info(f"Message dropped due to chat closed: {prompt}")
            return
        self.prompt_queue.append(prompt)

    @workflow.signal
    async def confirm(self) -> None:
        workflow.logger.info("Received user signal: confirmation")
        self.confirmed = True

    @workflow.signal
    async def end_chat(self) -> None:
        workflow.logger.info("signal received: end_chat")
        self.chat_ended = True

    @workflow.query
    def get_conversation_history(self) -> ConversationHistory:
        return self.conversation_history

    @workflow.query
    def get_latest_tool_data(self) -> Optional[ToolData]:
        return self.tool_data

This Workflow demonstrates the key patterns for building durable AI agents. It is event-driven, handling interactions with Signals and Queries, it validates user prompts and implements guardrails, it requires confirmation for tool execution, it maintains state and context across failures, and it's observable. The duration of the Workflow Execution is irrelevant. Thanks to Temporal, the session could go on for minutes, hours, days, or even weeks.

Before moving on to the next section, verify that your files and directory structure are correct.
temporal-ai-agent/
├── .env
├── .gitignore
├── .python-version
├── README.md
├── pyproject.toml
├── uv.lock
├── activities/
│   ├── __init__.py
│   └── activities.py
├── models/
│   ├── __init__.py
│   ├── core.py
│   └── requests.py
├── prompts/
│   ├── __init__.py
│   ├── agent_prompt_generators.py
│   └── prompts.py
├── scripts/
│   ├── create_invoice_test.py
│   ├── find_events_test.py
│   └── search_flights_test.py
├── tools/
│   ├── __init__.py
│   ├── create_invoice.py
│   ├── find_events.py
│   ├── goal_registry.py
│   ├── search_flights.py
│   ├── tool_registry.py
│   └── data/
│       └── find_events_data.json
└── workflows/
    ├── __init__.py
    ├── agent_goal_workflow.py
    └── workflow_helpers.py

In the next section, you will implement the Temporal Worker, which is responsible for executing your Workflow and Activities.

Building the Temporal Worker

Temporal Workflows are not run by executing the agent_goal_workflow.py file. Workflows, Activities, Signal and Query handling, and all Temporal operations are handled by Temporal Workers.

Creating the Temporal client

A Worker uses a Temporal client to communicate with the Temporal service to coordinate execution. A Temporal client is also used to request execution of Temporal Workflows. Since this application will require multiple Temporal clients, you will implement a shared submodule that other modules can call to create a Temporal client. Centralizing this logic avoids duplicated code and reduces the risk of a client being configured with the wrong settings, such as an incorrect Task Queue.

First, create the shared directory and a blank __init__.py file to create the submodule:

mkdir shared
touch shared/__init__.py

Next, create the file config.py within the shared directory and add the following import statements:

shared/config.py
import os

from dotenv import load_dotenv
from temporalio.client import Client
from temporalio.service import TLSConfig

You'll then load in the environment variables you specified earlier. If you are running this tutorial using the local development server, these are commented out in your .env file and will use the default settings.

shared/config.py
load_dotenv(override=True)

# Temporal connection settings
TEMPORAL_ADDRESS = os.getenv("TEMPORAL_ADDRESS", "localhost:7233")
TEMPORAL_NAMESPACE = os.getenv("TEMPORAL_NAMESPACE", "default")
TEMPORAL_TASK_QUEUE = os.getenv("TEMPORAL_TASK_QUEUE", "agent-task-queue")

# Authentication settings
TEMPORAL_TLS_CERT = os.getenv("TEMPORAL_TLS_CERT", "")
TEMPORAL_TLS_KEY = os.getenv("TEMPORAL_TLS_KEY", "")
TEMPORAL_API_KEY = os.getenv("TEMPORAL_API_KEY", "")
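
One detail of os.getenv is worth keeping in mind when editing your .env file: the default applies only when the variable is absent, not when it is present but empty. The sketch below demonstrates this with a hypothetical variable name, DEMO_TEMPORAL_ADDRESS, so that it does not touch your real settings.

```python
import os

# Hypothetical variable name used only for this demonstration
NAME = "DEMO_TEMPORAL_ADDRESS"

# Case 1: the variable is not set at all, so the default is returned
os.environ.pop(NAME, None)
unset_value = os.getenv(NAME, "localhost:7233")

# Case 2: the variable is set to an empty string, so the default is NOT used
os.environ[NAME] = ""
empty_value = os.getenv(NAME, "localhost:7233")

# Clean up so the demonstration leaves the environment unchanged
os.environ.pop(NAME, None)

print(repr(unset_value))
print(repr(empty_value))
```

Commenting a line out of .env, as the tutorial does for the local development server, produces the first case.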

Finally, add the code to configure a Temporal client:

shared/config.py
async def get_temporal_client() -> Client:
    """
    Creates a Temporal client based on environment configuration.
    Supports local server, mTLS, and API key authentication methods.
    """
    # Default to no TLS for local development
    tls_config = False
    print(f"Address: {TEMPORAL_ADDRESS}, Namespace {TEMPORAL_NAMESPACE}")
    print("(If unset, then will try to connect to local server)")

    # Configure mTLS if certificate and key are provided
    if TEMPORAL_TLS_CERT and TEMPORAL_TLS_KEY:
        print(f"TLS cert: {TEMPORAL_TLS_CERT}")
        print(f"TLS key: {TEMPORAL_TLS_KEY}")
        with open(TEMPORAL_TLS_CERT, "rb") as f:
            client_cert = f.read()
        with open(TEMPORAL_TLS_KEY, "rb") as f:
            client_key = f.read()
        tls_config = TLSConfig(
            client_cert=client_cert,
            client_private_key=client_key,
        )

    # Use API key authentication if provided
    if TEMPORAL_API_KEY:
        print(f"API key: {TEMPORAL_API_KEY}")
        return await Client.connect(
            TEMPORAL_ADDRESS,
            namespace=TEMPORAL_NAMESPACE,
            api_key=TEMPORAL_API_KEY,
            tls=True,  # Always use TLS with API key
        )

    # Use mTLS or local connection
    return await Client.connect(
        TEMPORAL_ADDRESS,
        namespace=TEMPORAL_NAMESPACE,
        tls=tls_config,
    )

This code checks whether or not you configured TLS certs for secure connection or a Temporal API key for connection to Temporal Cloud. It then returns a configured Temporal client, ready to communicate with the Temporal service.
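
The order of the checks in get_temporal_client determines which settings win when more than one is configured. The following sketch mirrors that order without opening a connection; select_auth_mode and the sample file names are hypothetical and exist only to make the precedence easy to see.

```python
def select_auth_mode(tls_cert: str, tls_key: str, api_key: str) -> str:
    """Mirror the branch order of get_temporal_client without connecting."""
    if api_key:
        # The API key branch returns first, even if a cert and key are also set
        return "api_key"
    if tls_cert and tls_key:
        # mTLS requires both the certificate and the private key
        return "mtls"
    # Nothing configured: plain connection to the local development server
    return "local"


print(select_auth_mode("", "", ""))
print(select_auth_mode("client.pem", "client.key", ""))
print(select_auth_mode("client.pem", "client.key", "my-api-key"))
```

Note that setting only one of the certificate or key falls through to the local connection, so a partially configured .env file will not use mTLS.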

Configuring the Worker

Now that you have a reusable way of creating a Temporal client, you can use that to configure your Temporal Worker.

Start by creating the worker directory:

mkdir worker

Then, create the file worker.py in the worker directory and add the following import statements:

worker/worker.py
import asyncio
import concurrent.futures
import logging
import os

from dotenv import load_dotenv
from temporalio.worker import Worker

from activities.activities import AgentActivities, dynamic_tool_activity
from shared.config import TEMPORAL_TASK_QUEUE, get_temporal_client
from workflows.agent_goal_workflow import AgentGoalWorkflow

These import statements include modules from the standard library, third-party packages such as dotenv and temporalio, and a few of the modules you implemented. A Worker must register the Workflows and Activities it intends to execute, so it must import them, as well as the function for creating the Temporal client.

Next, create the main method and add the code responsible for initializing a few variables, including creating the Temporal client and creating an instance of your AgentActivities class.

worker/worker.py
async def main():
    # Load environment variables
    load_dotenv(override=True)

    # Print LLM configuration info
    llm_model = os.environ.get("LLM_MODEL", "openai/gpt-4")
    print(f"Worker will use LLM model: {llm_model}")

    # Create the client
    client = await get_temporal_client()

    # Initialize the activities class
    activities = AgentActivities()
    print(f"AgentActivities initialized with LLM model: {llm_model}")

    print("Worker ready to process tasks!")
    logging.basicConfig(level=logging.WARN)

This code loads in the environment variables from your .env file. It uses the LLM_MODEL environment variable to print which model the agent will call, defaulting to OpenAI's GPT-4 if none is set. It then creates a Temporal client, and an instance of your AgentActivities class before setting the log level to WARN.

Finally, add the code to configure and start your Worker:

worker/worker.py
    # Run the worker
    with concurrent.futures.ThreadPoolExecutor(max_workers=100) as activity_executor:
        worker = Worker(
            client,
            task_queue=TEMPORAL_TASK_QUEUE,
            workflows=[AgentGoalWorkflow],
            activities=[
                # The Workflow executes this Activity, so it must be registered too
                activities.agent_validatePrompt,
                activities.agent_toolPlanner,
                activities.get_wf_env_vars,
                dynamic_tool_activity,
            ],
            activity_executor=activity_executor,
        )

        print(f"Starting worker, connecting to task queue: {TEMPORAL_TASK_QUEUE}")
        print("Ready to begin processing...")
        await worker.run()


if __name__ == "__main__":
    asyncio.run(main())

The code creates a ThreadPoolExecutor for the Worker to use as the activity_executor. Because an agent's tools can be either async or synchronous functions, the Worker needs an executor that can run synchronous Activities safely, and a thread pool is one of the options the Python SDK supports. You can read more about this in the Python SDK documentation.

Next, the Worker object is created, passing in the client, the task_queue, and the activity_executor, and registering the individual Workflows and Activities the Worker can execute. The Worker is then started with await worker.run(), which creates a long-running process that polls the Temporal service and executes Workflows and Activities when they are requested.
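
The general reason for pairing an event loop with a thread pool can be shown with the standard library alone. The sketch below is not how the Temporal Worker is implemented internally; blocking_tool and run_tools are hypothetical names, and the example only illustrates that blocking functions handed to a ThreadPoolExecutor run on pool threads while the event loop stays free.

```python
import asyncio
import concurrent.futures
import threading
import time
from typing import List


def blocking_tool(name: str) -> str:
    """Stand-in for a synchronous tool that blocks its thread."""
    time.sleep(0.1)
    return f"{name} ran on {threading.current_thread().name}"


async def run_tools() -> List[str]:
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor(
        max_workers=4, thread_name_prefix="tool"
    ) as executor:
        # Each blocking call runs on a pool thread, not on the event loop
        futures = [
            loop.run_in_executor(executor, blocking_tool, f"tool-{i}")
            for i in range(3)
        ]
        # gather preserves the order in which the calls were submitted
        return await asyncio.gather(*futures)


results = asyncio.run(run_tools())
print(results)
```

If a blocking function ran directly on the event loop instead, every other task on that loop would stall until it returned.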

Finally, the standard if __name__ == "__main__" calls the main function when you run worker.py, starting the Worker.

Now that you have implemented your Worker, verify that it runs.

Testing the Worker

Before starting the Worker, you need to start a Temporal service. To start the local development server, open a terminal and run the following command:

temporal server start-dev

This starts a local Temporal service running on port 7233 with the web UI running on port 8233. The output of this command should resemble (the exact version numbers may not match):

CLI 1.1.1 (Server 1.25.1, UI 2.31.2)

Server: localhost:7233
UI: http://localhost:8233
Metrics: http://localhost:53697/metrics

Next, open another terminal and run your Worker:

uv run worker/worker.py

Your Worker should start, and the output should be:

Worker will use LLM model: openai/gpt-4o
Address: localhost:7233, Namespace default
(If unset, then will try to connect to local server)
AgentActivities initialized with LLM model: openai/gpt-4o
Worker ready to process tasks!
Starting worker, connecting to task queue: agent-task-queue
Ready to begin processing...

The command will not exit; this is expected. The Worker is waiting for Workflow and Activity tasks to execute. If your Worker is running successfully, that's as much as you can test for the moment. Stop both the Worker and the Temporal service by pressing CTRL-C in each terminal.

Before moving on to the next section, verify that your files and directory structure are correct.
temporal-ai-agent/
├── .env
├── .gitignore
├── .python-version
├── README.md
├── pyproject.toml
├── uv.lock
├── activities/
│   ├── __init__.py
│   └── activities.py
├── models/
│   ├── __init__.py
│   ├── core.py
│   └── requests.py
├── prompts/
│   ├── __init__.py
│   ├── agent_prompt_generators.py
│   └── prompts.py
├── scripts/
│   ├── create_invoice_test.py
│   ├── find_events_test.py
│   └── search_flights_test.py
├── shared/
│   ├── __init__.py
│   └── config.py
├── tools/
│   ├── __init__.py
│   ├── create_invoice.py
│   ├── find_events.py
│   ├── goal_registry.py
│   ├── search_flights.py
│   ├── tool_registry.py
│   └── data/
│       └── find_events_data.json
├── worker/
│   └── worker.py
└── workflows/
    ├── __init__.py
    ├── agent_goal_workflow.py
    └── workflow_helpers.py

Next, you will implement a REST API that will serve as the backend service for invoking your agent.
