pipefunc.mcp module#
Autogenerated MCP server for Pipefunc pipelines.
- class pipefunc.mcp.JobInfo(runner, started_at, run_folder, status, pipeline_name)[source]#
Bases: object
Information about an async pipeline job.
- runner: AsyncMap#
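The attribute listing above is truncated in this rendering; conceptually, a JobInfo record bundles the AsyncMap runner with the job's metadata. Here is a minimal stdlib sketch of such a record (field names come from the signature above; every type except `runner: AsyncMap` is an assumption, and `JobInfoSketch` is not the real class):

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Any


@dataclass
class JobInfoSketch:
    """Illustrative stand-in for pipefunc.mcp.JobInfo; not the real class."""

    runner: Any           # the AsyncMap handle in the real class
    started_at: datetime  # assumed type
    run_folder: str       # assumed type
    status: str           # assumed type, e.g. "running" or "completed"
    pipeline_name: str    # assumed type


job = JobInfoSketch(
    runner=None,
    started_at=datetime.now(),
    run_folder="runs/job_123",
    status="running",
    pipeline_name="charge",
)
print(job.status, job.pipeline_name)  # → running charge
```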
- pipefunc.mcp.build_mcp_server(pipeline, **fast_mcp_kwargs)[source]#
Build an MCP (Model Context Protocol) server for a Pipefunc pipeline.
This function creates a FastMCP server that exposes your Pipefunc pipeline as an MCP tool, allowing AI assistants and other MCP clients to execute your computational workflows. The server automatically generates parameter validation, documentation, and provides parallel execution capabilities.
- Parameters:
- pipeline (Pipeline) – A Pipefunc Pipeline object containing the computational workflow to expose. The pipeline's functions, parameters, and mapspecs will be automatically analyzed to generate the MCP tool interface.
- **fast_mcp_kwargs (Any) – Additional keyword arguments to pass to the FastMCP server. See fastmcp.FastMCP for more details.
- Returns:
A configured FastMCP server instance ready to run. The server includes:
- Automatic parameter validation using Pydantic models
- Documentation based on the pipeline's function docstrings and parameter annotations
- Synchronous and asynchronous execution capabilities
- Job management for async pipeline execution with progress tracking
- JSON-serializable output formatting
- Return type:
fastmcp.FastMCP
Examples
Basic Usage:
Create and run an MCP server from a pipeline:
```python
# my_mcp.py
from physics_pipeline import pipeline_charge  # import from module to enable correct serialization

from pipefunc.mcp import build_mcp_server

if __name__ == "__main__":  # Important to use this 'if' for parallel execution!
    mcp = build_mcp_server(pipeline_charge)
    mcp.run(path="/charge", port=8000, transport="streamable-http")
```
Client Configuration:
Register the server with an MCP client (e.g., Cursor IDE, .cursor/mcp.json):

```json
{
  "mcpServers": {
    "physics-simulation": {
      "url": "http://127.0.0.1:8000/charge"
    }
  }
}
```
Alternative Transport Methods:
```python
# HTTP server (recommended for development)
mcp = build_mcp_server(pipeline)
mcp.run(path="/api", port=8000, transport="streamable-http")

# Standard I/O (for CLI integration)
mcp = build_mcp_server(pipeline)
mcp.run(transport="stdio")

# Server-Sent Events
mcp = build_mcp_server(pipeline)
mcp.run(transport="sse")
```
Pipeline Requirements:
Your pipeline's inputs should be JSON-serializable and carry proper type annotations:

```python
from pipefunc import Pipeline, pipefunc
from pipefunc.mcp import build_mcp_server


@pipefunc(output_name="result")
def calculate(x: float, y: float) -> float:
    return x * y + 2


pipeline = Pipeline([calculate])
mcp = build_mcp_server(pipeline)
```
Async Pipeline Execution:
The server provides tools for asynchronous pipeline execution with job management:
```python
# Start an async job
execute_pipeline_async(inputs={"x": [1, 2, 3], "y": [4, 5, 6]})
# Returns: {"job_id": "uuid-string", "run_folder": "runs/job_uuid-string"}

# Check job status and progress
check_job_status(job_id="uuid-string")
# Returns status, progress, and results when complete

# Cancel a running job
cancel_job(job_id="uuid-string")

# List all tracked jobs
list_jobs()
# Returns summary of all jobs with their status
```
Execution Modes:
The server provides two execution patterns:
- Synchronous execution (execute_pipeline_sync): uses pipeline.map() – blocks until completion and returns results immediately. Best for small-to-medium pipelines when you need results right away.
- Asynchronous execution (execute_pipeline_async): uses pipeline.map_async() – returns immediately with job tracking. Best for long-running pipelines, background processing, and when you need progress monitoring or cancellation capabilities.
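The two modes follow a familiar pattern: a blocking call versus a background task tracked by a job id. A stdlib-only sketch of that pattern (the function and registry here are toys for illustration, not pipefunc internals):

```python
import uuid
from concurrent.futures import Future, ThreadPoolExecutor

jobs: dict[str, Future] = {}  # toy global job registry


def run_sync(x: float, y: float) -> float:
    """Blocking, like execute_pipeline_sync: returns the result directly."""
    return x * y + 2


def run_async(executor: ThreadPoolExecutor, x: float, y: float) -> str:
    """Non-blocking, like execute_pipeline_async: returns a job id to poll."""
    job_id = str(uuid.uuid4())
    jobs[job_id] = executor.submit(run_sync, x, y)
    return job_id


with ThreadPoolExecutor() as executor:
    print(run_sync(3.0, 4.0))     # blocks until done → 14.0
    job_id = run_async(executor, 3.0, 4.0)
    print(jobs[job_id].result())  # wait/poll later, like check_job_status → 14.0
```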
Notes
- The server automatically handles type validation using the pipeline's Pydantic model
- Output arrays are converted to JSON-compatible lists
- Parallel execution is enabled by default but can be disabled per request
- Async execution provides job management with progress tracking and cancellation capabilities
- Job registry is maintained globally across all MCP tool calls
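The array-to-list conversion noted above can be sketched with a toy converter; the real mechanism is internal to pipefunc.mcp, and FakeArray here merely stands in for a NumPy array so the example stays stdlib-only:

```python
import json


def to_jsonable(obj):
    """Toy output formatter: anything exposing .tolist() (e.g. a NumPy
    array) becomes a plain list; other values pass through unchanged."""
    return obj.tolist() if hasattr(obj, "tolist") else obj


class FakeArray:
    """Stand-in for a NumPy array, to keep the sketch dependency-free."""

    def __init__(self, data):
        self.data = data

    def tolist(self):
        return list(self.data)


out = {"result": to_jsonable(FakeArray([1, 2, 3])), "scalar": to_jsonable(4.5)}
print(json.dumps(out))  # → {"result": [1, 2, 3], "scalar": 4.5}
```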
See also
- run_mcp_server – Convenience function to build and run the server in one call
- Pipeline.map – The underlying method used to execute pipeline workflows