AI Infrastructure for Production LLM Apps

AI infrastructure for LLM apps covers more than model calls. Learn the serving, caching, observability, and reliability patterns that keep production systems stable.

May 31, 2026 13 min read
Teams that build their first LLM prototype usually spend 80% of their time on the model interaction — the prompt, the model choice, the output format. When they take that prototype to production, they discover that model calls are maybe 20% of the actual engineering work. The other 80% is infrastructure: routing, caching, rate limiting, fallbacks, observability, cost control, and the operational discipline to keep it all running.

AI infrastructure is the layer between your application code and the model APIs. Get it right and you can ship changes quickly, control costs, and debug failures in minutes. Get it wrong and you're flying blind on costs, your application goes down every time a model provider has an outage, and your team spends nights firefighting.

At Laxaar, we've built production LLM systems across use cases ranging from document processing to customer-facing assistants. The infrastructure patterns we keep reaching for are the ones in this guide.

The AI infrastructure stack: what it actually includes

The full AI infrastructure stack has more layers than most teams plan for upfront:

| Layer | Responsibility | Common tools |
|---|---|---|
| Model gateway | Request routing, auth, rate limiting, logging | LiteLLM, Portkey, custom proxy |
| Prompt management | Versioning, storage, A/B testing | Files in git, Langfuse, PromptLayer |
| Caching | Semantic and exact-match response caching | Redis, GPTCache, provider-native |
| Retrieval | Vector search, document storage | Qdrant, Pinecone, pgvector |
| Orchestration | Agent loops, tool calling, flow control | LangGraph, custom async Python |
| Observability | Tracing, cost tracking, latency monitoring | Langfuse, OpenTelemetry, Datadog |
| Evaluation | Offline evals, regression detection | Custom harness, Braintrust |
| Secrets | API key management, rotation | AWS Secrets Manager, Vault |

The most common omission is the gateway layer. Most teams start by calling provider APIs directly from application code — each service with its own API key, no central logging, no rate limiting. That works at prototype scale and creates serious problems at production scale.

LLM gateway: the most important piece you're probably missing

An LLM gateway is a service that sits between your application code and model provider APIs. It handles authentication, routing, logging, rate limiting, and fallbacks in one place. Every production LLM system should have one.

The gateway pattern solves a specific set of problems that compound as you scale:

  • Credential management — one place to rotate API keys instead of updating each service
  • Unified logging — every LLM call logged with input, output, model, latency, and cost before you write any application code
  • Rate limit enforcement — per-user, per-feature, or per-tenant limits enforced centrally
  • Provider abstraction — swap models or providers without changing application code

LiteLLM is the open-source option Laxaar uses most often. It speaks the OpenAI API spec and translates to 100+ models and providers under the hood.

# LiteLLM proxy configuration (config.yaml)
# litellm --config config.yaml --port 4000

model_list:
  - model_name: gpt-4o
    litellm_params:
      model: openai/gpt-4o-2024-08-06
      api_key: os.environ/OPENAI_API_KEY

  - model_name: claude-opus
    litellm_params:
      model: anthropic/claude-opus-4-5
      api_key: os.environ/ANTHROPIC_API_KEY

  - model_name: claude-haiku
    litellm_params:
      model: anthropic/claude-haiku-4-5
      api_key: os.environ/ANTHROPIC_API_KEY

general_settings:
  master_key: os.environ/LITELLM_MASTER_KEY

litellm_settings:
  success_callback: ["langfuse"]
  failure_callback: ["langfuse"]
  cache: true
  cache_params:
    type: redis
    host: os.environ/REDIS_HOST
    port: 6379

# Application code calls the gateway, not the provider directly
import os

from openai import OpenAI

# Same client code works for any model behind the gateway
client = OpenAI(
    api_key=os.environ["LITELLM_MASTER_KEY"],  # Gateway key read from the environment, never hardcoded
    base_url="http://your-gateway:4000",
)

response = client.chat.completions.create(
    model="claude-opus",   # Gateway routes this to Anthropic
    messages=[{"role": "user", "content": "Summarize this contract."}],
)

For teams with strict compliance requirements or who want more control, a custom gateway built on FastAPI gives you full flexibility. The custom approach adds engineering cost but removes dependency on a third-party proxy in your critical path.
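The core of a custom gateway is small: authenticate the caller, then map a stable model alias to a concrete provider and model. The sketch below shows only that routing core, without a web framework. `Route`, `ROUTES`, and `resolve` are hypothetical names chosen for illustration, and the alias table mirrors the model names used elsewhere in this guide.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Route:
    provider: str
    model: str


# Alias table (illustrative): application code only ever sees the alias
ROUTES = {
    "claude-opus": Route("anthropic", "claude-opus-4-5"),
    "gpt-4o": Route("openai", "gpt-4o-2024-08-06"),
}


def resolve(alias: str, caller_key: str, valid_keys: set[str]) -> Route:
    """Authenticate the caller, then map a model alias to a provider route."""
    if caller_key not in valid_keys:
        raise PermissionError("unknown gateway key")
    try:
        return ROUTES[alias]
    except KeyError:
        raise LookupError(f"no route for model alias {alias!r}") from None
```

Swapping providers then means editing one entry in the alias table rather than touching every calling service. A real gateway would wrap this in an HTTP handler and add logging and rate limiting around it.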

Caching: the highest-impact cost control

LLM inference is expensive. Caching is the fastest way to cut that cost without changing anything about model quality.

Two types of caching apply to LLM systems:

Exact-match caching returns a cached response when the input is identical to a previous request. Works well for FAQ-style queries, code generation for common patterns, and any workflow where users ask the same questions repeatedly. Implementation is straightforward — hash the prompt, check the cache, return the cached response if it exists.

Semantic caching returns a cached response when the input is semantically similar to a previous request, even if the text differs. More complex but captures more cache hits. GPTCache implements this using embedding similarity over a vector store of cached (query, response) pairs.
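To make the lookup concrete, here is a minimal sketch of a semantic cache. It is not GPTCache's API. `toy_embed` is a stand-in bag-of-words embedding so the example runs without a model; in practice you would call a real embedding model and keep the vectors in a vector store. `SemanticCache`, `toy_embed`, and the 0.8 threshold are illustrative assumptions.

```python
import math
from collections import Counter
from typing import Optional


def toy_embed(text: str) -> Counter:
    """Hypothetical stand-in for an embedding model: lowercase word counts."""
    return Counter(text.lower().split())


def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse word-count vectors."""
    dot = sum(a[w] * b[w] for w in a if w in b)
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


class SemanticCache:
    def __init__(self, threshold: float = 0.8):
        self.threshold = threshold
        self.entries: list[tuple[Counter, str]] = []

    def put(self, query: str, response: str) -> None:
        self.entries.append((toy_embed(query), response))

    def get(self, query: str) -> Optional[str]:
        """Return the response of the most similar cached query, if similar enough."""
        q = toy_embed(query)
        best_score, best_response = 0.0, None
        for vec, response in self.entries:
            score = cosine(q, vec)
            if score > best_score:
                best_score, best_response = score, response
        return best_response if best_score >= self.threshold else None
```

The threshold is the main design choice. Set it too low and users receive answers to questions they did not ask; set it too high and the cache behaves like exact-match caching with extra cost.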

import hashlib
import json
import redis
from anthropic import Anthropic

client = Anthropic()
cache = redis.Redis(host="localhost", port=6379, decode_responses=True)

def cached_llm_call(
    system: str,
    user_message: str,
    model: str = "claude-opus-4-5",
    ttl_seconds: int = 3600,
) -> str:
    # Build cache key from model + system + user message
    cache_key = hashlib.sha256(
        json.dumps({"model": model, "system": system, "user": user_message}, sort_keys=True).encode()
    ).hexdigest()

    cached = cache.get(f"llm:{cache_key}")
    if cached is not None:  # An empty string is still a valid cached response
        return cached  # Cache hit, no API call

    # Cache miss — call the API
    response = client.messages.create(
        model=model,
        max_tokens=1024,
        system=system,
        messages=[{"role": "user", "content": user_message}],
    )
    result = response.content[0].text

    cache.setex(f"llm:{cache_key}", ttl_seconds, result)
    return result

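Beyond application-level caching, use provider-native prompt caching. Anthropic's prompt caching reduces input token cost by ~90% for the cached portion on cache reads; writing to the cache costs somewhat more than a normal input token, so it pays off when the same prefix is reused. OpenAI offers automatic prompt caching for prompts of 1,024 tokens or more. Neither requires any cache infrastructure on your side: keep the stable part of the prompt at the start of the request, and for Anthropic mark it with cache_control.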

# Anthropic prompt caching — mark the stable system prompt for caching
response = client.messages.create(
    model="claude-opus-4-5",
    max_tokens=1024,
    system=[
        {
            "type": "text",
            "text": very_long_stable_system_prompt,
            "cache_control": {"type": "ephemeral"},
        }
    ],
    messages=[{"role": "user", "content": per_request_input}],
)
# cache_read_input_tokens in response.usage tells you how many tokens were served from cache

In our experience at Laxaar, exact-match caching plus prompt caching cuts 30–50% off inference costs on typical production workloads, with no quality trade-off.

Rate limiting and cost guardrails

Without rate limiting, a single misbehaving user or runaway process can exhaust your API quota or generate an enormous bill before you notice. Both happen in production.

Rate limiting for LLM systems has a wrinkle: the meaningful unit is tokens, not requests. A request that sends a 100k-token document is orders of magnitude more expensive than a request with a 50-token query. Request-count rate limits don't capture this; token-count limits do.

import redis
import time
from typing import Optional

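class TokenRateLimiter:
    """
    Fixed-window rate limiter based on token consumption.
    Counters are bucketed per clock minute and per UTC day, so a burst that
    straddles a window boundary can briefly exceed the nominal limit.
    Limits: tokens per minute and tokens per day, per user.
    """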

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    def check_and_consume(
        self,
        user_id: str,
        estimated_tokens: int,
        limit_per_minute: int = 100_000,
        limit_per_day: int = 1_000_000,
    ) -> tuple[bool, Optional[str]]:
        now = int(time.time())
        minute_key = f"rl:{user_id}:min:{now // 60}"
        day_key = f"rl:{user_id}:day:{now // 86400}"

        pipe = self.redis.pipeline()
        pipe.incrby(minute_key, estimated_tokens)
        pipe.expire(minute_key, 120)  # Keep for 2 minutes
        pipe.incrby(day_key, estimated_tokens)
        pipe.expire(day_key, 172800)  # Keep for 2 days
        results = pipe.execute()

        minute_total, _, day_total, _ = results

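        if minute_total > limit_per_minute or day_total > limit_per_day:
            # Roll back the reservation so rejected requests don't consume quota
            rollback = self.redis.pipeline()
            rollback.decrby(minute_key, estimated_tokens)
            rollback.decrby(day_key, estimated_tokens)
            rollback.execute()
            if minute_total > limit_per_minute:
                return False, f"Rate limit: {minute_total} tokens this minute (limit {limit_per_minute})"
            return False, f"Rate limit: {day_total} tokens today (limit {limit_per_day})"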

        return True, None

def estimate_tokens(text: str) -> int:
    """Rough token estimate: 1 token ≈ 4 characters."""
    return max(1, len(text) // 4)

Cost guardrails are a separate concern from rate limiting. They answer the question: what happens when your total monthly spend approaches the budget? Implement hard stops (pause all LLM calls above a threshold), soft alerts (notify the team at 70% of budget), and per-feature cost attribution (so you know which feature is responsible for the spike).

One practice worth implementing early: tag every LLM call with the feature or workflow that generated it. When a cost anomaly appears, you need to know which part of the system is responsible.
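A minimal sketch of how the hard stop, soft alert, and per-feature attribution fit together. `BudgetGuard` and its method names are hypothetical, and the in-memory dictionary is an assumption made to keep the example self-contained; a production version would keep month-to-date spend in a shared store such as Redis or a database.

```python
from collections import defaultdict
from typing import Optional


class BudgetGuard:
    """Tracks month-to-date spend per feature and reports a budget status."""

    def __init__(self, monthly_budget_usd: float, alert_fraction: float = 0.7):
        self.monthly_budget_usd = monthly_budget_usd
        self.alert_fraction = alert_fraction
        self.spend_by_feature: dict[str, float] = defaultdict(float)

    @property
    def total_spend(self) -> float:
        return sum(self.spend_by_feature.values())

    def record(self, feature: str, cost_usd: float) -> None:
        """Attribute the cost of one LLM call to the feature that made it."""
        self.spend_by_feature[feature] += cost_usd

    def status(self) -> str:
        """'blocked' = hard stop, 'alert' = soft alert threshold crossed, else 'ok'."""
        if self.total_spend >= self.monthly_budget_usd:
            return "blocked"
        if self.total_spend >= self.alert_fraction * self.monthly_budget_usd:
            return "alert"
        return "ok"

    def top_feature(self) -> Optional[str]:
        """The feature responsible for the most spend so far, if any."""
        if not self.spend_by_feature:
            return None
        return max(self.spend_by_feature, key=self.spend_by_feature.get)
```

Call `status()` before each LLM request and `record()` after it, using the same feature tag you attach to your traces.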

Fallback routing and provider resilience

Model providers have outages. When they do, your application shouldn't go down with them. Fallback routing automatically redirects failed requests to an alternative model or provider.

The simplest version: if the primary provider returns a 5xx error or times out, retry once, then fall back to a secondary provider.

import time
from anthropic import Anthropic, APIStatusError, APITimeoutError
from openai import OpenAI, APIError as OpenAIError

anthropic_client = Anthropic()
openai_client = OpenAI()

def resilient_completion(
    system: str,
    user_message: str,
    primary_model: str = "claude-opus-4-5",
    fallback_model: str = "gpt-4o-2024-08-06",
    timeout: float = 30.0,
) -> str:
    """Try primary model, fall back to secondary on failure."""

    # Primary: Anthropic
    for attempt in range(2):
        try:
            response = anthropic_client.messages.create(
                model=primary_model,
                max_tokens=1024,
                system=system,
                messages=[{"role": "user", "content": user_message}],
                timeout=timeout,
            )
            return response.content[0].text
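        except (APIStatusError, APITimeoutError) as e:
            # Retry and fall back only on 5xx errors and timeouts; re-raise 4xx client errors
            if isinstance(e, APIStatusError) and e.status_code < 500:
                raise
            if attempt == 0:
                time.sleep(1)  # Brief pause before retry
                continue
            # Both attempts failed, so fall through to the fallback
            print(f"Primary model failed after 2 attempts: {e}")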

    # Fallback: OpenAI
    try:
        response = openai_client.chat.completions.create(
            model=fallback_model,
            messages=[
                {"role": "system", "content": system},
                {"role": "user", "content": user_message},
            ],
            timeout=timeout,
        )
        return response.choices[0].message.content or ""  # content can be None
    except OpenAIError as e:
        raise RuntimeError(f"Both primary and fallback models failed. Last error: {e}") from e

For more sophisticated routing, LiteLLM's router supports weighted load balancing, cooldown periods for failing providers, and latency-based routing. Worth using over a hand-rolled solution once your routing logic gets complex.
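To illustrate the cooldown idea, here is a minimal sketch of a tracker that takes a provider out of rotation after repeated failures and lets it back in once a timer expires. This is not LiteLLM's implementation. `ProviderCooldown`, its thresholds, and the injectable `clock` parameter are assumptions made for illustration and testability.

```python
import time
from typing import Callable, Optional


class ProviderCooldown:
    def __init__(
        self,
        max_failures: int = 3,
        cooldown_seconds: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.max_failures = max_failures
        self.cooldown_seconds = cooldown_seconds
        self.clock = clock
        self.failures: dict[str, int] = {}
        self.cooling_until: dict[str, float] = {}

    def record_failure(self, provider: str) -> None:
        """Count a failure; start a cooldown once the threshold is reached."""
        count = self.failures.get(provider, 0) + 1
        self.failures[provider] = count
        if count >= self.max_failures:
            self.cooling_until[provider] = self.clock() + self.cooldown_seconds
            self.failures[provider] = 0

    def record_success(self, provider: str) -> None:
        self.failures[provider] = 0

    def is_available(self, provider: str) -> bool:
        return self.clock() >= self.cooling_until.get(provider, 0.0)

    def pick(self, providers: list[str]) -> Optional[str]:
        """Return the first provider in preference order that is not cooling down."""
        for provider in providers:
            if self.is_available(provider):
                return provider
        return None
```

Compared with retrying the primary on every request, a cooldown stops you from paying the timeout penalty on each call while a provider is known to be down.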

One design decision worth stating clearly: not all requests need a fallback. A best-effort summarization feature can degrade gracefully when the model is unavailable. A request that gates access to a critical business operation needs a fallback. Design your fallback coverage around the criticality of each use case.

Observability: tracing LLM calls end to end

You can't debug what you can't see. LLM observability means capturing enough data about every model call to answer: what did we send, what did we get back, how long did it take, what did it cost, and was the output any good?

Standard metrics to capture on every call:

from dataclasses import dataclass, field
from datetime import datetime, timezone
import time
import uuid

@dataclass
class LLMCallTrace:
    trace_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    # datetime.utcnow() is deprecated since Python 3.12; use an explicit UTC timestamp
    timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
    model: str = ""
    feature: str = ""          # Which feature/workflow generated this call
    user_id: str = ""
    input_tokens: int = 0
    output_tokens: int = 0
    cached_tokens: int = 0
    latency_ms: float = 0.0
    cost_usd: float = 0.0
    success: bool = True
    error_type: str = ""
    # Don't log full input/output by default — handle PII carefully
    input_hash: str = ""       # Hash for deduplication, not content logging
    output_length: int = 0

# USD per token. Rates are point-in-time; verify against each provider's current pricing page.
TOKEN_COSTS = {
    "claude-opus-4-5": {"input": 5.0 / 1_000_000, "output": 25.0 / 1_000_000, "cache_read": 0.50 / 1_000_000},
    "claude-haiku-4-5": {"input": 1.0 / 1_000_000, "output": 5.0 / 1_000_000, "cache_read": 0.10 / 1_000_000},
    "gpt-4o-2024-08-06": {"input": 2.50 / 1_000_000, "output": 10.0 / 1_000_000, "cache_read": 1.25 / 1_000_000},
}

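def calculate_cost(model: str, input_tokens: int, output_tokens: int, cached_tokens: int = 0) -> float:
    # input_tokens is expected to include cached tokens (OpenAI's prompt_tokens convention).
    # Anthropic reports input_tokens excluding cache reads, so add cache_read_input_tokens first.
    rates = TOKEN_COSTS.get(model, {"input": 0, "output": 0, "cache_read": 0})
    non_cached_input = max(0, input_tokens - cached_tokens)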
    return (
        non_cached_input * rates["input"]
        + cached_tokens * rates["cache_read"]
        + output_tokens * rates["output"]
    )

For the observability backend, Langfuse is the purpose-built option — it captures LLM traces natively, supports nested span tracking for agent calls, and integrates with LiteLLM out of the box. For teams already running OpenTelemetry, the opentelemetry-instrumentation-openai and opentelemetry-instrumentation-anthropic packages add LLM-specific attributes to your existing traces.

Dashboards should answer three questions at a glance: what's the current cost run rate, what's the p95 latency, and are there any error rate spikes. Everything else is drill-down.
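A minimal sketch of computing those three numbers from a batch of recorded traces. It assumes each trace is a dict with `cost_usd`, `latency_ms`, and `success` keys (matching the trace fields above). The function names and the nearest-rank percentile method are illustrative choices, not what any particular observability backend does.

```python
def p95(values: list[float]) -> float:
    """Nearest-rank 95th percentile; returns 0.0 for an empty list."""
    if not values:
        return 0.0
    ordered = sorted(values)
    rank = (95 * len(ordered) + 99) // 100  # ceil(0.95 * n) in integer arithmetic
    return ordered[rank - 1]


def summarize(traces: list[dict], window_hours: float) -> dict:
    """Cost run rate, p95 latency, and error rate over one time window."""
    total_cost = sum(t["cost_usd"] for t in traces)
    failures = sum(1 for t in traces if not t["success"])
    return {
        "cost_per_hour_usd": total_cost / window_hours,
        "p95_latency_ms": p95([t["latency_ms"] for t in traces]),
        "error_rate": failures / len(traces) if traces else 0.0,
    }
```

Running `summarize` per feature tag as well as globally gives you the first level of drill-down for free.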

Async patterns for high-throughput workloads

Synchronous LLM calls are a throughput bottleneck. A single call might take 2–10 seconds, so processing 1,000 documents one at a time takes roughly 30 minutes to nearly 3 hours. Async processing with a job queue cuts that to minutes.

The standard pattern: enqueue work, process in parallel workers up to your API rate limit, store results.

import asyncio
from anthropic import AsyncAnthropic

client = AsyncAnthropic()

async def process_document(session_semaphore: asyncio.Semaphore, doc: dict) -> dict:
    """Process one document, respecting the concurrency limit."""
    async with session_semaphore:
        try:
            response = await client.messages.create(
                model="claude-haiku-4-5",  # Use a faster model for batch jobs
                max_tokens=512,
                system="Extract the key points from this document as a JSON array of strings.",
                messages=[{"role": "user", "content": doc["text"]}],
            )
            return {"id": doc["id"], "result": response.content[0].text, "success": True}
        except Exception as e:
            return {"id": doc["id"], "error": str(e), "success": False}

async def batch_process(documents: list[dict], max_concurrent: int = 20) -> list[dict]:
    """Process documents in parallel, up to max_concurrent at once."""
    semaphore = asyncio.Semaphore(max_concurrent)
    tasks = [process_document(semaphore, doc) for doc in documents]
    results = await asyncio.gather(*tasks, return_exceptions=False)
    return list(results)

# Usage: documents is a list of {"id": ..., "text": ...} dicts loaded elsewhere
results = asyncio.run(batch_process(documents, max_concurrent=20))

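Set max_concurrent based on your provider's rate limits, not your server's CPU count. Limits vary by usage tier and model, from tens of requests per minute on entry tiers to several thousand on higher ones, so check the limits on your own account. To size concurrency, multiply your allowed requests per second by your average request duration: 600 requests per minute (10 per second) at 5 seconds per request supports about 50 in-flight requests. Token-per-minute limits often bind before request limits do, so check both.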
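The sizing rule follows from Little's law: the number of requests in flight equals the request rate multiplied by the average time each request takes. A minimal sketch, where `max_concurrency` and the `headroom` safety factor are illustrative assumptions rather than provider guidance:

```python
import math


def max_concurrency(
    requests_per_minute: int,
    avg_latency_seconds: float,
    headroom: float = 0.8,
) -> int:
    """Little's law: in-flight requests = requests per second x seconds per request.

    `headroom` keeps the result below the hard limit to leave room for retries.
    """
    per_second = requests_per_minute / 60
    return max(1, math.floor(per_second * avg_latency_seconds * headroom))
```

For example, `max_concurrency(600, 5.0, headroom=1.0)` gives 50, and the default headroom of 0.8 lowers that to 40.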

For document processing pipelines, choosing the right model per task matters as much as concurrency. Use Claude Haiku or GPT-4o-mini for classification, extraction, and other high-volume tasks where speed and cost matter. Reserve Opus and GPT-4o for complex reasoning tasks where output quality justifies the cost. The cost difference ranges from several-fold to more than an order of magnitude depending on the model pair; the quality difference on simple tasks is often negligible.

For the retrieval infrastructure that feeds these pipelines, see our AI data pipelines expertise. And if you're building agent systems on top of this infrastructure, building production AI agents covers the orchestration layer.

Frequently Asked Questions

Do I need all of these infrastructure pieces from day one?

No. Start with logging (you always need to be able to debug) and prompt caching (free cost savings). Add the gateway layer as soon as you have more than one service making LLM calls. Add rate limiting when you have external users. Add fallback routing when availability becomes a hard requirement. Infrastructure should match your scale, not anticipate a scale you might never reach.

What's the right model for cost-sensitive batch processing?

Claude Haiku 4.5 and GPT-4o-mini are the workhorses for high-volume tasks: classification, extraction, summarization, formatting. They are several times to more than an order of magnitude cheaper than frontier models, depending on the pair, and perform near-identically on well-scoped tasks. The pattern Laxaar uses: run a quality eval on your specific task with the cheap model before committing to it. If the cheap model scores within 5% of the frontier model, use the cheap model.
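The decision rule can be written down directly. This sketch assumes "within 5%" means a relative gap measured against the frontier model's eval score, which is one reasonable reading; `choose_model` and its parameters are hypothetical names.

```python
def choose_model(cheap_score: float, frontier_score: float, tolerance: float = 0.05) -> str:
    """Return "cheap" when the cheap model's eval score is within `tolerance`
    (relative to the frontier score) of the frontier model, else "frontier"."""
    if frontier_score <= 0:
        raise ValueError("frontier_score must be positive")
    relative_gap = (frontier_score - cheap_score) / frontier_score
    return "cheap" if relative_gap <= tolerance else "frontier"
```

Run both models on the same eval set and feed the two aggregate scores into the function. Re-run the comparison whenever the prompt or the task definition changes.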

How do I handle API keys securely across multiple environments?

Store API keys in a secrets manager (AWS Secrets Manager, Google Secret Manager, HashiCorp Vault) and inject them into your application at runtime via environment variables. Never commit API keys to source control or include them in Docker images. For local development, use a .env file that's listed in .gitignore. Rotate keys on a schedule: most key leaks come from exposed keys in old commits or accidental pastes, not from API-level vulnerabilities.
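On the application side, a small helper that fails fast at startup makes a missing or empty key obvious before the first request. A minimal sketch; `require_secret` is a hypothetical helper name, and it assumes the secrets manager has already injected the value as an environment variable.

```python
import os


def require_secret(name: str) -> str:
    """Read a secret from the environment, failing loudly if it is missing or blank."""
    value = os.environ.get(name, "").strip()
    if not value:
        raise RuntimeError(f"Missing required secret: {name}")
    return value
```

Calling `require_secret("ANTHROPIC_API_KEY")` once during startup surfaces configuration mistakes at deploy time instead of as a failed user request.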

What's the right caching TTL for LLM responses?

It depends on how often the underlying data changes. For FAQ responses or static document summaries, 24–48 hours is reasonable. For responses that incorporate user-specific data or real-time information, cache for minutes or don't cache at all. When in doubt, use a shorter TTL — a stale response that confidently gives wrong information is worse than a cache miss.

How do I trace LLM calls through a multi-step agent workflow?

Use a parent trace ID generated at the start of each user request, and pass it through every LLM call in the workflow as metadata. Tools like Langfuse support hierarchical tracing — each LLM call becomes a child span under the parent trace, so you can see the full sequence of calls, their inputs and outputs, and the total cost and latency for the complete workflow. This is essential for debugging agents where a failure in step 4 might be caused by a bad output in step 2.
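A minimal sketch of propagating a parent trace ID without threading it through every function signature, using Python's `contextvars`. This is not Langfuse's API. `start_trace`, `record_span`, and the span dictionary layout are illustrative assumptions, and spans are appended to a plain list where a real system would send them to a tracing backend.

```python
import contextvars
import uuid

# Holds the trace ID for the current request; isolated per thread and per asyncio task
_current_trace_id: contextvars.ContextVar[str] = contextvars.ContextVar("trace_id", default="")


def start_trace() -> str:
    """Call once at the start of each user request."""
    trace_id = str(uuid.uuid4())
    _current_trace_id.set(trace_id)
    return trace_id


def record_span(name: str, spans: list[dict]) -> dict:
    """Record one LLM call as a child span of the current trace."""
    span = {
        "span_id": str(uuid.uuid4()),
        "parent_trace_id": _current_trace_id.get(),
        "name": name,
    }
    spans.append(span)
    return span
```

Because every span carries the same parent ID, you can reassemble the full sequence of calls for one request and see which earlier step fed a bad output into a later one.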

When should I build a custom gateway vs. use LiteLLM or Portkey?

Use LiteLLM or Portkey when you want a gateway running in weeks, not months. Build custom when you have requirements they don't support — unusual auth patterns, deep integration with proprietary internal systems, regulatory requirements around where traffic is logged. The custom gateway is a significant ongoing maintenance burden; make sure the requirements genuinely exceed what open-source options provide before committing.


Production LLM infrastructure is engineering work, not a DevOps afterthought. If you're building an LLM system that needs to be reliable, cost-controlled, and observable, talk to the Laxaar team. This infrastructure layer is something we design from the ground up on every engagement.
