Skip to content
Open
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ Some examples require extra dependencies. See each sample's directory for specif
* [gevent_async](gevent_async) - Combine gevent and Temporal.
* [hello_standalone_activity](hello_standalone_activity) - Use activities without using a workflow.
* [langchain](langchain) - Orchestrate workflows for LangChain.
* [langgraph_plugin](langgraph_plugin) - Run LangGraph workflows as durable Temporal workflows (Graph API and Functional API).
* [message_passing/introduction](message_passing/introduction/) - Introduction to queries, signals, and updates.
* [message_passing/safe_message_handlers](message_passing/safe_message_handlers/) - Safely handling updates and signals.
* [message_passing/update_with_start/lazy_initialization](message_passing/update_with_start/lazy_initialization/) - Use update-with-start to update a Shopping Cart, starting it if it does not exist.
Expand Down
67 changes: 67 additions & 0 deletions langgraph_plugin/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# LangGraph Plugin Samples

These samples demonstrate the [Temporal LangGraph plugin](https://github.com/temporalio/sdk-python/pull/1448), which runs LangGraph workflows as durable Temporal workflows. Each LangGraph graph node (Graph API) or `@task` (Functional API) executes as a Temporal activity with automatic retries, timeouts, and crash recovery.

Samples are organized by API style:

- **Graph API** (`graph_api/`) -- Define workflows as `StateGraph` with nodes and edges.
- **Functional API** (`functional_api/`) -- Define workflows with `@task` and `@entrypoint` decorators for an imperative programming style.

## Samples

| Sample | Graph API | Functional API | Description |
|--------|:---------:|:--------------:|-------------|
| **Hello World** | [graph_api/hello_world](graph_api/hello_world) | [functional_api/hello_world](functional_api/hello_world) | Minimal sample -- single node/task that processes a query string. Start here. |
| **Human-in-the-loop** | [graph_api/human_in_the_loop](graph_api/human_in_the_loop) | [functional_api/human_in_the_loop](functional_api/human_in_the_loop) | Chatbot that uses `interrupt()` to pause for human approval, Temporal signals to receive feedback, and queries to expose the pending draft. |
| **Continue-as-new** | [graph_api/continue_as_new](graph_api/continue_as_new) | [functional_api/continue_as_new](functional_api/continue_as_new) | Multi-stage data pipeline that uses `continue-as-new` with task result caching so previously-completed stages are not re-executed. |
| **ReAct Agent** | [graph_api/react_agent](graph_api/react_agent) | [functional_api/react_agent](functional_api/react_agent) | Tool-calling agent loop. Graph API uses conditional edges; Functional API uses a `while` loop. |
| **Control Flow** | -- | [functional_api/control_flow](functional_api/control_flow) | Demonstrates parallel task execution, `for` loops, and `if/else` branching -- patterns that are natural in the Functional API. |
Comment thread
DABH marked this conversation as resolved.
| **LangSmith Tracing** | [graph_api/langsmith_tracing](graph_api/langsmith_tracing) | [functional_api/langsmith_tracing](functional_api/langsmith_tracing) | Combines `LangGraphPlugin` with Temporal's `LangSmithPlugin` for durable execution + full observability of LLM calls. Requires API keys. |

## Prerequisites

1. Install dependencies:

```bash
uv sync --group langgraph
```

Comment thread
DABH marked this conversation as resolved.
2. Start a [Temporal dev server](https://docs.temporal.io/cli#start-dev-server):

```bash
temporal server start-dev
```

## Running a Sample

Each sample has two scripts -- start the worker first, then the workflow starter in a separate terminal.

```bash
# Terminal 1: start the worker
uv run langgraph_plugin/<api>/<sample>/run_worker.py

# Terminal 2: start the workflow
uv run langgraph_plugin/<api>/<sample>/run_workflow.py
```

For example, to run the Graph API human-in-the-loop chatbot:

```bash
# Terminal 1
uv run langgraph_plugin/graph_api/human_in_the_loop/run_worker.py

# Terminal 2
uv run langgraph_plugin/graph_api/human_in_the_loop/run_workflow.py
```

## Key Features Demonstrated

- **Durable execution** -- Every graph node / `@task` runs as a Temporal activity with configurable timeouts and retry policies.
- **Human-in-the-loop** -- LangGraph's `interrupt()` pauses the graph; Temporal signals deliver human input; queries expose pending state to UIs.
- **Continue-as-new with caching** -- `get_cache()` captures completed task results; passing the cache to the next execution avoids re-running them.
- **Conditional routing** -- Graph API's `add_conditional_edges` and Functional API's native `if/else`/`while` for agent loops.
- **Parallel execution** -- Functional API launches multiple tasks concurrently by creating futures before awaiting them.

## Related

- [SDK PR: LangGraph plugin](https://github.com/temporalio/sdk-python/pull/1448)
1 change: 1 addition & 0 deletions langgraph_plugin/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Temporal LangGraph plugin samples."""
1 change: 1 addition & 0 deletions langgraph_plugin/functional_api/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""LangGraph Functional API samples using @task and @entrypoint."""
36 changes: 36 additions & 0 deletions langgraph_plugin/functional_api/continue_as_new/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Continue-as-New with Caching (Functional API)

Same pattern as the Graph API version, using `@task` and `@entrypoint` decorators.

## What This Sample Demonstrates

- Task result caching across continue-as-new boundaries with `get_cache()`
- Restoring cached results with `entrypoint(name, cache=...)`
- Each `@task` executes exactly once despite multiple workflow invocations

## How It Works

1. Three tasks run sequentially: `extract` (x2) -> `transform` (+50) -> `load` (x3).
2. After the first invocation, the workflow continues-as-new with the cache.
3. On subsequent invocations, all tasks return cached results instantly.
4. Input 10 -> 20 -> 70 -> 210.

## Running the Sample

Prerequisites: `uv sync --group langgraph` and a running Temporal dev server.

```bash
# Terminal 1
uv run langgraph_plugin/functional_api/continue_as_new/run_worker.py

# Terminal 2
uv run langgraph_plugin/functional_api/continue_as_new/run_workflow.py
```

## Files

| File | Description |
|------|-------------|
| `workflow.py` | `@task` functions, `@entrypoint`, `PipelineInput`, and `PipelineFunctionalWorkflow` |
| `run_worker.py` | Registers tasks and entrypoint with `LangGraphPlugin`, starts worker |
| `run_workflow.py` | Executes the pipeline workflow and prints the result |
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Continue-as-new pipeline with task result caching."""
37 changes: 37 additions & 0 deletions langgraph_plugin/functional_api/continue_as_new/run_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
"""Worker for the continue-as-new pipeline (Functional API)."""

import asyncio
import os

from temporalio.client import Client
from temporalio.contrib.langgraph import LangGraphPlugin
from temporalio.worker import Worker

from langgraph_plugin.functional_api.continue_as_new.workflow import (
PipelineFunctionalWorkflow,
activity_options,
all_tasks,
pipeline_entrypoint,
)


async def main() -> None:
client = await Client.connect(os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"))
plugin = LangGraphPlugin(
entrypoints={"pipeline": pipeline_entrypoint},
tasks=all_tasks,
activity_options=activity_options,
)

worker = Worker(
client,
task_queue="langgraph-pipeline-functional",
workflows=[PipelineFunctionalWorkflow],
plugins=[plugin],
)
print("Worker started. Ctrl+C to exit.")
await worker.run()


if __name__ == "__main__":
asyncio.run(main())
31 changes: 31 additions & 0 deletions langgraph_plugin/functional_api/continue_as_new/run_workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""Start the continue-as-new pipeline workflow (Functional API)."""

import asyncio
import os
from datetime import timedelta

from temporalio.client import Client

from langgraph_plugin.functional_api.continue_as_new.workflow import (
PipelineFunctionalWorkflow,
PipelineInput,
)


async def main() -> None:
client = await Client.connect(os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"))

result = await client.execute_workflow(
PipelineFunctionalWorkflow.run,
PipelineInput(data=10),
id="pipeline-functional-workflow",
task_queue="langgraph-pipeline-functional",
execution_timeout=timedelta(seconds=60),
)

# 10*2=20 -> 20+50=70 -> 70*3=210
print(f"Pipeline result: {result}")


if __name__ == "__main__":
asyncio.run(main())
81 changes: 81 additions & 0 deletions langgraph_plugin/functional_api/continue_as_new/workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
"""Continue-as-new with caching using the LangGraph Functional API with Temporal.

Same pattern as the Graph API version, but using @task and @entrypoint decorators.
"""

from dataclasses import dataclass
from datetime import timedelta
from typing import Any

from langgraph.func import entrypoint as lg_entrypoint
from langgraph.func import task
from temporalio import workflow
from temporalio.contrib.langgraph import entrypoint, get_cache


@task
def extract(data: int) -> int:
"""Stage 1: Extract -- simulate data extraction by doubling the input."""
return data * 2


@task
def transform(data: int) -> int:
"""Stage 2: Transform -- simulate transformation by adding 50."""
return data + 50


@task
def load(data: int) -> int:
"""Stage 3: Load -- simulate loading by tripling the result."""
return data * 3


@lg_entrypoint()
async def pipeline_entrypoint(data: int) -> dict:
"""Run the 3-stage pipeline: extract -> transform -> load."""
extracted = await extract(data)
transformed = await transform(extracted)
loaded = await load(transformed)
return {"result": loaded}


all_tasks = [extract, transform, load]

activity_options = {
t.func.__name__: {"start_to_close_timeout": timedelta(seconds=30)}
for t in all_tasks
}


@dataclass
class PipelineInput:
data: int
cache: dict[str, Any] | None = None
phase: int = 1


@workflow.defn
class PipelineFunctionalWorkflow:
"""Runs the pipeline, continuing-as-new after each phase.

Input 10: 10*2=20 -> 20+50=70 -> 70*3=210
Each task executes once; phases 2 and 3 use cached results.
"""

@workflow.run
async def run(self, input_data: PipelineInput) -> dict[str, Any]:
result = await entrypoint("pipeline", cache=input_data.cache).ainvoke(
input_data.data
)

if input_data.phase < 3:
workflow.continue_as_new(
PipelineInput(
data=input_data.data,
cache=get_cache(),
phase=input_data.phase + 1,
)
)

return result
Comment thread
DABH marked this conversation as resolved.
37 changes: 37 additions & 0 deletions langgraph_plugin/functional_api/control_flow/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Control Flow (Functional API)

Demonstrates the Functional API's strength for complex control flow: parallel execution, sequential loops, and conditional branching — all as natural Python code.

## What This Sample Demonstrates

- **Parallel execution**: launching multiple tasks concurrently by creating futures before awaiting
- **For loops**: processing items sequentially with `for item in items`
- **If/else branching**: routing items based on classification results
- Why the Functional API is ideal for programmatic composition patterns

## How It Works

1. A batch of items is validated **in parallel** — all `validate_item` tasks launch concurrently.
2. Valid items are processed **sequentially** in a for loop.
3. Each item is classified, then routed via **if/else** to `process_urgent` or `process_normal`.
4. Results are aggregated with a `summarize` task.

## Running the Sample

Prerequisites: `uv sync --group langgraph` and a running Temporal dev server.

```bash
# Terminal 1
uv run langgraph_plugin/functional_api/control_flow/run_worker.py

# Terminal 2
uv run langgraph_plugin/functional_api/control_flow/run_workflow.py
```

## Files

| File | Description |
|------|-------------|
| `workflow.py` | `@task` functions (validate, classify, process, summarize), `@entrypoint`, and `ControlFlowWorkflow` |
| `run_worker.py` | Registers tasks and entrypoint with `LangGraphPlugin`, starts worker |
| `run_workflow.py` | Sends a batch of items and prints processing results |
1 change: 1 addition & 0 deletions langgraph_plugin/functional_api/control_flow/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Control flow: parallel execution, for loops, and if/else branching."""
37 changes: 37 additions & 0 deletions langgraph_plugin/functional_api/control_flow/run_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
"""Worker for the control flow pipeline (Functional API)."""

import asyncio
import os

from temporalio.client import Client
from temporalio.contrib.langgraph import LangGraphPlugin
from temporalio.worker import Worker

from langgraph_plugin.functional_api.control_flow.workflow import (
ControlFlowWorkflow,
activity_options,
all_tasks,
control_flow_pipeline,
)


async def main() -> None:
client = await Client.connect(os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"))
plugin = LangGraphPlugin(
entrypoints={"control_flow": control_flow_pipeline},
tasks=all_tasks,
activity_options=activity_options,
)

worker = Worker(
client,
task_queue="langgraph-control-flow",
workflows=[ControlFlowWorkflow],
plugins=[plugin],
)
print("Worker started. Ctrl+C to exit.")
await worker.run()


if __name__ == "__main__":
asyncio.run(main())
38 changes: 38 additions & 0 deletions langgraph_plugin/functional_api/control_flow/run_workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
"""Start the control flow pipeline workflow (Functional API)."""

import asyncio
import os

from temporalio.client import Client

from langgraph_plugin.functional_api.control_flow.workflow import (
ControlFlowWorkflow,
)


async def main() -> None:
client = await Client.connect(os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"))

items = [
"Fix login bug",
"URGENT: Production outage in payments",
"Update README",
"INVALID:",
"Urgent: Security patch needed",
"Refactor test suite",
]

result = await client.execute_workflow(
ControlFlowWorkflow.run,
items,
id="control-flow-workflow",
task_queue="langgraph-control-flow",
)

print(f"Summary: {result['summary']}")
for r in result["results"]:
print(f" {r}")


if __name__ == "__main__":
asyncio.run(main())
Loading
Loading