pipefunc.mcp module#

Autogenerated MCP server for Pipefunc pipelines.

class pipefunc.mcp.JobInfo(runner, started_at, run_folder, status, pipeline_name)[source]#

Bases: object

Information about an async pipeline job.

runner: AsyncMap#
started_at: datetime#
run_folder: str#
status: Literal['running', 'completed', 'cancelled']#
pipeline_name: str#
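The fields above can be pictured with a small stand-in registry. This is a hypothetical sketch only (the real JobInfo also holds a live AsyncMap runner, omitted here), illustrating how a global job registry keyed by job id might track status:

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Literal
import uuid

# Stand-in for pipefunc.mcp.JobInfo; the real class also stores the
# AsyncMap runner, which cannot be shown without a running pipeline.
@dataclass
class JobRecord:
    started_at: datetime
    run_folder: str
    status: Literal["running", "completed", "cancelled"]
    pipeline_name: str

# A global registry keyed by job id, mirroring how the server tracks
# jobs across MCP tool calls.
jobs: dict[str, JobRecord] = {}

job_id = str(uuid.uuid4())
jobs[job_id] = JobRecord(
    started_at=datetime.now(timezone.utc),
    run_folder=f"runs/job_{job_id}",
    status="running",
    pipeline_name="physics-simulation",
)

# When the runner finishes, the record is updated in place.
jobs[job_id].status = "completed"
```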
pipefunc.mcp.build_mcp_server(pipeline, **fast_mcp_kwargs)[source]#

Build an MCP (Model Context Protocol) server for a Pipefunc pipeline.

This function creates a FastMCP server that exposes your Pipefunc pipeline as an MCP tool, allowing AI assistants and other MCP clients to execute your computational workflows. The server automatically generates parameter validation and documentation, and provides parallel execution capabilities.

Parameters:
  • pipeline (Pipeline) – A Pipefunc Pipeline object containing the computational workflow to expose. The pipeline’s functions, parameters, and mapspecs will be automatically analyzed to generate the MCP tool interface.

  • **fast_mcp_kwargs (Any) – Additional keyword arguments to pass to the FastMCP server. See fastmcp.FastMCP for more details.

Returns:

A configured FastMCP server instance ready to run. The server includes:

  • Automatic parameter validation using Pydantic models

  • Documentation based on the pipeline’s functions’ docstrings and parameter annotations

  • Synchronous and asynchronous execution capabilities

  • Job management for async pipeline execution with progress tracking

  • JSON-serializable output formatting

Return type:

fastmcp.FastMCP

Examples

Basic Usage:

Create and run an MCP server from a pipeline:

# my_mcp.py
from physics_pipeline import pipeline_charge  # import from module to enable correct serialization
from pipefunc.mcp import build_mcp_server

if __name__ == "__main__":  # Important to use this 'if' for parallel execution!
    mcp = build_mcp_server(pipeline_charge)
    mcp.run(path="/charge", port=8000, transport="streamable-http")

Client Configuration:

Register the server with an MCP client (e.g., Cursor IDE .cursor/mcp.json):

{
  "mcpServers": {
    "physics-simulation": {
      "url": "http://127.0.0.1:8000/charge"
    }
  }
}

Alternative Transport Methods:

# HTTP server (recommended for development)
mcp = build_mcp_server(pipeline)
mcp.run(path="/api", port=8000, transport="streamable-http")

# Standard I/O (for CLI integration)
mcp = build_mcp_server(pipeline)
mcp.run(transport="stdio")

# Server-Sent Events
mcp = build_mcp_server(pipeline)
mcp.run(transport="sse")

Pipeline Requirements:

Your pipeline should be configured with JSON-serializable inputs and proper type annotations:

from pipefunc import pipefunc, Pipeline

@pipefunc(output_name="result")
def calculate(x: float, y: float) -> float:
    return x * y + 2

pipeline = Pipeline([calculate])
mcp = build_mcp_server(pipeline)

Async Pipeline Execution:

The server provides tools for asynchronous pipeline execution with job management:

# Start an async job
execute_pipeline_async(inputs={"x": [1, 2, 3], "y": [4, 5, 6]})
# Returns: {"job_id": "uuid-string", "run_folder": "runs/job_uuid-string"}

# Check job status and progress
check_job_status(job_id="uuid-string")
# Returns status, progress, and results when complete

# Cancel a running job
cancel_job(job_id="uuid-string")

# List all tracked jobs
list_jobs()
# Returns summary of all jobs with their status

Execution Modes:

The server provides two execution patterns:

  1. Synchronous execution (execute_pipeline_sync): Uses pipeline.map() - blocks until completion, returns results immediately. Best for small-to-medium pipelines when you need results right away.

  2. Asynchronous execution (execute_pipeline_async): Uses pipeline.map_async() - returns immediately with job tracking. Best for long-running pipelines, background processing, and when you need progress monitoring or cancellation capabilities.

Notes

  • The server automatically handles type validation using the pipeline’s Pydantic model

  • Output arrays are converted to JSON-compatible lists

  • Parallel execution is enabled by default but can be disabled per request

  • Async execution provides job management with progress tracking and cancellation capabilities

  • Job registry is maintained globally across all MCP tool calls

See also

run_mcp_server

Convenience function to build and run server in one call

Pipeline.map

The underlying method used to execute pipeline workflows