Documentation Index
Fetch the complete documentation index at: https://mintlify.com/Arize-ai/openinference/llms.txt
Use this file to discover all available pages before exploring further.
This example demonstrates how to build a RAG (Retrieval-Augmented Generation) pipeline with DSPy and instrument it with OpenInference tracing.
Prerequisites
- Python 3.9+
- OpenAI API key
- Phoenix or another OpenTelemetry collector
Installation
Install dependencies
pip install dspy-ai \
openinference-instrumentation-dspy \
opentelemetry-sdk \
opentelemetry-exporter-otlp \
python-dotenv
Set environment variables
export OPENAI_API_KEY="your-api-key"
export COLLECTOR_ENDPOINT="http://localhost:6006/v1/traces"
Instrumentation Setup
Create an instrumentation module named `instrument.py` (the examples below import it with `from instrument import instrument`):
import os
from dotenv import load_dotenv
from openinference.instrumentation.dspy import DSPyInstrumentor
from opentelemetry import trace as trace_api
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk import trace as trace_sdk
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
# Load variables from a .env file before the endpoint is looked up.
load_dotenv()

# Trace collector endpoint; falls back to the localhost:6006 URL when
# COLLECTOR_ENDPOINT is not set.
collector_endpoint = os.environ.get(
    "COLLECTOR_ENDPOINT",
    "http://localhost:6006/v1/traces",
)
def instrument():
    """Set up OpenTelemetry tracing and enable the DSPy instrumentor.

    Builds a tracer provider whose spans are exported to
    ``collector_endpoint`` through a ``SimpleSpanProcessor``, registers it as
    the global tracer provider, and instruments DSPy against that provider.
    """
    resource = Resource(attributes={})
    tracer_provider = trace_sdk.TracerProvider(resource=resource)
    span_exporter = OTLPSpanExporter(endpoint=collector_endpoint)
    span_processor = SimpleSpanProcessor(span_exporter=span_exporter)
    tracer_provider.add_span_processor(span_processor=span_processor)
    trace_api.set_tracer_provider(tracer_provider=tracer_provider)
    # Pass the provider explicitly. If a global provider was already
    # registered, set_tracer_provider() does not replace it, and the
    # instrumentor would otherwise send spans to a provider that lacks the
    # exporter configured above.
    DSPyInstrumentor().instrument(tracer_provider=tracer_provider)
Basic DSPy RAG Module
import dspy
from instrument import instrument
# Initialize instrumentation
instrument()

# Configure DSPy with an OpenAI language model and a retrieval model.
# The RAG module below calls dspy.Retrieve, which needs a configured `rm`;
# with only `lm` set, the retrieval step has nothing to query.
lm = dspy.OpenAI(model="gpt-3.5-turbo", max_tokens=300)
rm = dspy.ColBERTv2(url="http://20.102.90.50:2017/wiki17_abstracts")
dspy.settings.configure(lm=lm, rm=rm)
# Signature for the answer-generation step of the RAG pipeline
class GenerateAnswer(dspy.Signature):
    """Answer questions with short factoid answers."""

    # Inputs: the retrieved context and the question being asked.
    context = dspy.InputField(
        desc="may contain relevant facts",
    )
    question = dspy.InputField()

    # Output: the generated answer.
    answer = dspy.OutputField(
        desc="often between 1 and 5 words",
    )
# RAG module: retrieve passages, then generate an answer from them
class RAG(dspy.Module):
    def __init__(self, num_passages=3):
        super().__init__()
        # Retriever created with k=num_passages.
        self.retrieve = dspy.Retrieve(k=num_passages)
        # Chain-of-thought predictor built from the GenerateAnswer signature.
        self.generate_answer = dspy.ChainOfThought(GenerateAnswer)

    def forward(self, question):
        """Return a prediction holding the retrieved context and the answer."""
        passages = self.retrieve(question).passages
        generated = self.generate_answer(context=passages, question=question)
        return dspy.Prediction(context=passages, answer=generated.answer)
# Run the RAG module on a sample question and print both result fields
rag = RAG()
response = rag(question="What is the capital of France?")
for label, value in (("Answer", response.answer), ("Context", response.context)):
    print(f"{label}: {value}")
Complete FastAPI Example
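Here’s a complete FastAPI application with DSPy. Save it as `main.py` (the `uvicorn.run` call at the bottom refers to `main:app`) and install the additional dependencies with `pip install fastapi uvicorn`: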
import os
from dotenv import load_dotenv
import uvicorn
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import dspy
from instrument import instrument
# Load .env values before the environment lookups below
load_dotenv()

# Instrumentation runs unless INSTRUMENT_DSPY is exactly "false"
do_not_instrument = os.environ.get("INSTRUMENT_DSPY", "true") == "false"
if not do_not_instrument:
    instrument()

# Language model and retrieval model shared by every request
lm = dspy.OpenAI(model="gpt-3.5-turbo", max_tokens=300)
rm = dspy.ColBERTv2(url="http://20.102.90.50:2017/wiki17_abstracts")
dspy.settings.configure(lm=lm, rm=rm)

app = FastAPI(title="DSPy x FastAPI")

environment = os.environ.get("ENVIRONMENT", "dev")
if environment == "dev":
    # Fully permissive CORS is registered in the dev environment only.
    cors_options = dict(
        allow_origins=["*"],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )
    app.add_middleware(CORSMiddleware, **cors_options)
# Request body accepted by the query endpoint
class MessageData(BaseModel):
    # Question to answer (required).
    question: str
    # Requested passage count; defaults to 3.
    num_passages: int = 3
# Response body returned by the query endpoint
class RAGResponse(BaseModel):
    # Generated answer text.
    answer: str
    # Context passages returned alongside the answer.
    context: list[str]
# Signature for the answer-generation step of the RAG module
class GenerateAnswer(dspy.Signature):
    """Answer questions with short factoid answers."""

    # Inputs: the retrieved context and the question being asked.
    context = dspy.InputField(
        desc="may contain relevant facts",
    )
    question = dspy.InputField()

    # Output: the generated answer.
    answer = dspy.OutputField(
        desc="often between 1 and 5 words",
    )
class RAG(dspy.Module):
    """Retrieve passages for a question and generate a short answer from them."""

    def __init__(self, num_passages=3):
        super().__init__()
        self.retrieve = dspy.Retrieve(k=num_passages)
        self.generate_answer = dspy.ChainOfThought(GenerateAnswer)

    def forward(self, question, num_passages=None):
        """Answer ``question`` from retrieved context.

        Args:
            question: The question to answer.
            num_passages: Optional per-call passage count. When ``None`` the
                retriever keeps the ``k`` it was constructed with.

        Returns:
            A ``dspy.Prediction`` with ``context`` and ``answer`` fields.
        """
        context = self.retrieve(question, k=num_passages).passages
        prediction = self.generate_answer(context=context, question=question)
        return dspy.Prediction(context=context, answer=prediction.answer)


# Module instance shared by all requests; created on application startup
rag_module = None


@app.on_event("startup")
async def startup_event():
    global rag_module
    rag_module = RAG()


@app.post("/api/rag/query", response_model=RAGResponse)
async def query(payload: MessageData):
    # Forward the requested passage count; the request model accepts
    # `num_passages`, so it must reach the retriever rather than be dropped.
    response = rag_module(
        question=payload.question,
        num_passages=payload.num_passages,
    )
    return RAGResponse(answer=response.answer, context=response.context)
@app.get("/api/rag/healthcheck")
async def healthcheck():
    # Fixed payload; no dependency is checked here.
    status = {"message": "All systems go."}
    return status
if __name__ == "__main__":
    # Hot reload is a development aid, so enable it only when ENVIRONMENT is
    # "dev" (the default) instead of unconditionally.
    uvicorn.run(
        app="main:app",
        host="0.0.0.0",
        port=8000,
        reload=environment == "dev",
    )
Optimizing with DSPy Compiler
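DSPy’s key feature is automatic optimization. The snippet below reuses the `RAG` module defined above and assumes `dspy.settings` has already been configured with a language model and a retrieval model: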
import dspy
from dspy.teleprompt import BootstrapFewShot
# Training examples built from (question, answer) pairs; "question" is the input
trainset = [
    dspy.Example(question=question, answer=answer).with_inputs("question")
    for question, answer in (
        ("What is the capital of France?", "Paris"),
        ("Who wrote Romeo and Juliet?", "Shakespeare"),
        ("What is the largest planet?", "Jupiter"),
    )
]
# Define validation metric
def validate_answer(example, pred, trace=None):
    """Return True when the expected answer appears in the predicted answer.

    The comparison is a case-insensitive substring match.

    Args:
        example: Object whose ``answer`` attribute holds the expected answer.
        pred: Object whose ``answer`` attribute holds the predicted answer.
        trace: Accepted for the metric call signature; not used here.

    Returns:
        ``True`` on a match. ``False`` on a mismatch, or when either answer
        is missing or ``None``.
    """
    expected = getattr(example, "answer", None)
    predicted = getattr(pred, "answer", None)
    # A prediction without an answer is a miss, not an AttributeError.
    if expected is None or predicted is None:
        return False
    return expected.lower() in predicted.lower()
# Compile the RAG module with BootstrapFewShot and the validation metric
compiler = BootstrapFewShot(
    metric=validate_answer,
    max_bootstrapped_demos=2,
)
compiled_rag = compiler.compile(RAG(), trainset=trainset)

# Query the compiled module
response = compiled_rag(question="What is the capital of Germany?")
print(response.answer)
Key Features
Automatic Module Tracing
DSPy instrumentation captures:
- Module execution: All DSPy module forwards
- LM calls: Language model predictions with prompts
- Retrieval: Document retrieval operations
- Optimization: Compiler operations and few-shot selection
Signature Tracking
The instrumentation records:
- Input and output fields
- Field descriptions and constraints
- Type annotations
Compilation Observability
When using DSPy optimizers:
- Bootstrap demonstration selection
- Metric evaluations
- Prompt evolution
Next Steps