Skip to main content

Multi-Connection Strategies

A/B Testing AI Providers

Test multiple AI providers simultaneously to find the best fit. Setup:
  1. Create connection with Provider A
  2. Create connection with Provider B
  3. Route percentage of calls to each
  4. Compare metrics in dashboard
Comparison Framework:
# Track per-provider metrics collected during the A/B testing window.
metrics = {
    'openai': {'latency': [], 'cost': [], 'quality': []},
    'elevenlabs': {'latency': [], 'cost': [], 'quality': []}
}

# Analyze after testing period
for provider, data in metrics.items():
    # Guard against an empty sample set: a provider that received no
    # traffic would otherwise crash the report with ZeroDivisionError.
    samples = data['latency']
    avg_latency = sum(samples) / len(samples) if samples else 0.0
    total_cost = sum(data['cost'])
    print(f"{provider}: {avg_latency}ms latency, ${total_cost} cost")
Decision Factors:
  • Average latency
  • Cost per minute
  • User satisfaction/quality scores
  • Error rates
  • Throughput/concurrency limits

Geo-Distributed Architecture

Deploy with regional redundancy. Setup:
Caller in US  →  US Region  →  Telepath  →  OpenAI
Caller in EU  →  EU Region  →  Telepath  →  ElevenLabs
Implementation:
  1. Route calls based on caller location
  2. Use regional AI providers if available
  3. Set up regional carrier connections
  4. Monitor latency per region
Benefits:
  • Lower latency for users
  • Better compliance (data residency)
  • Redundancy if one region fails

Fallback Chains

Gracefully handle provider failures.
# Weighted provider table: the primary connection receives ~70% of the
# traffic and the fallback ~30%.
providers = [
    {
        'name': 'openai',
        'connection_id': 'conn_primary',
        'weight': 0.7
    },
    {
        'name': 'elevenlabs',
        'connection_id': 'conn_fallback',
        'weight': 0.3
    }
]

def select_provider(failed_providers=None):
    """Select a provider at random by weight, excluding failed ones.

    Args:
        failed_providers: optional iterable of provider names to skip.
    Returns:
        One provider dict from ``providers``.
    Raises:
        RuntimeError: if every provider is in ``failed_providers``.
    """
    import random  # stdlib; imported locally so the snippet is self-contained

    # A mutable default ([]) would be shared across calls; use None instead.
    if failed_providers is None:
        failed_providers = []

    available = [p for p in providers if p['name'] not in failed_providers]
    if not available:
        raise RuntimeError("All providers have failed; nothing to select")

    # Re-normalize the remaining weights so they sum to 1.
    total_weight = sum(p['weight'] for p in available)
    normalized = [p['weight'] / total_weight for p in available]

    return random.choices(available, weights=normalized)[0]
Advantages:
  • Automatic failover if provider fails
  • Graceful degradation
  • No service interruption

Custom WebSocket Implementation

Advanced Audio Processing

Process audio before sending to AI agent.
import asyncio
import websockets
import numpy as np
from scipy import signal

class AudioProcessor:
    """Enhances inbound telephony audio (int16 PCM) before it reaches the
    AI agent: spectral noise suppression, peak normalization, and a
    300 Hz high-pass filter to remove low-frequency rumble."""

    def __init__(self):
        # Noise floor estimate (RMS of the first chunk's opening 10%);
        # lazily computed on the first inbound chunk.
        self.noise_profile = None
        self.sample_rate = 16000

    async def process_inbound(self, audio_chunk):
        """Enhance audio quality before AI processing.

        Args:
            audio_chunk: raw little-endian int16 PCM bytes.
        Returns:
            Processed audio as int16 PCM bytes. Length may differ slightly
            from the input because of the STFT round-trip.
        """
        # Convert bytes to a float array for DSP work.
        audio = np.frombuffer(audio_chunk, dtype=np.int16).astype(np.float32)

        # Noise suppression (profile estimated once, from the first chunk).
        if self.noise_profile is None:
            self.noise_profile = self._estimate_noise(audio)
        audio = self._suppress_noise(audio)

        # Peak normalization; guard against an all-zero (silent) chunk,
        # which previously divided by zero and produced NaNs.
        peak = np.max(np.abs(audio))
        if peak > 0:
            audio = audio / peak

        # High-pass filter to remove rumble below ~300 Hz.
        sos = signal.butter(4, 300, 'hp', fs=self.sample_rate, output='sos')
        audio = signal.sosfilt(sos, audio)

        # Clip before scaling: the filter can overshoot slightly past 1.0,
        # which would wrap around when cast to int16.
        audio = np.clip(audio, -1.0, 1.0)
        return (audio * 32767).astype(np.int16).tobytes()

    def _estimate_noise(self, audio):
        """Estimate the noise level from the first 10% of the chunk,
        assumed to be relatively quiet (simplified RMS estimate)."""
        return np.std(audio[:int(len(audio) * 0.1)])

    def _suppress_noise(self, audio):
        """Spectral subtraction for noise reduction.

        Subtracts the estimated noise magnitude in the magnitude domain and
        re-applies the original phase. (The previous version subtracted a
        real value from the complex spectrum and clamped complex numbers
        with np.maximum, which corrupts phase information.)

        NOTE(review): self.noise_profile is estimated but not used here;
        the noise magnitude is re-estimated from the first 10 STFT frames
        of each chunk — confirm which estimate is intended.
        """
        f, t, Zxx = signal.stft(audio, fs=self.sample_rate)

        # Noise magnitude estimated per frequency bin from the first 10 frames.
        noise_mag = np.mean(np.abs(Zxx[:, :10]), axis=1, keepdims=True)

        # Magnitude-domain subtraction, floored at zero, phase preserved.
        magnitude = np.maximum(np.abs(Zxx) - noise_mag, 0.0)
        Zxx_cleaned = magnitude * np.exp(1j * np.angle(Zxx))

        # Inverse STFT back to the time domain.
        _, audio_cleaned = signal.istft(Zxx_cleaned, fs=self.sample_rate)
        return audio_cleaned

Dynamic Codec Negotiation

Handle multiple codec formats.
class TelephonyBridge:
    """Negotiates and converts between carrier audio codecs.

    NOTE(review): the codec encode/decode helpers (_decode_g722,
    _encode_pcmu, ...) are not defined in this snippet and are assumed to
    exist elsewhere on this class — confirm before use.
    """

    # Codec capabilities: sample rate (Hz) and bit depth.
    SUPPORTED_CODECS = {
        'PCMU': {'rate': 8000, 'bits': 8},
        'PCMA': {'rate': 8000, 'bits': 8},
        'G722': {'rate': 16000, 'bits': 16}
    }

    async def negotiate_codec(self, carrier_codecs):
        """Choose the best codec from the carrier's offerings.

        Prefers wideband G.722 for quality, then falls back to G.711
        mu-law (PCMU) and A-law (PCMA).

        Raises:
            ValueError: if the carrier offered no codecs at all (the old
                fallback raised an opaque IndexError in that case).
        """
        preferred_order = ['G722', 'PCMU', 'PCMA']

        for codec in preferred_order:
            if codec in carrier_codecs:
                return codec

        if not carrier_codecs:
            raise ValueError("Carrier offered no codecs to negotiate")

        # Only unknown codecs offered: take whatever the carrier listed first.
        return carrier_codecs[0]

    async def convert_audio(self, audio, from_codec, to_codec):
        """Convert audio between codec formats via a linear PCM middle step."""

        # Decode from source codec to linear PCM.
        if from_codec == 'G722':
            decoded = self._decode_g722(audio)
        elif from_codec == 'PCMU':
            decoded = self._decode_pcmu(audio)
        else:
            decoded = self._decode_pcma(audio)

        # Encode linear PCM to the target codec.
        if to_codec == 'G722':
            encoded = self._encode_g722(decoded)
        elif to_codec == 'PCMU':
            encoded = self._encode_pcmu(decoded)
        else:
            encoded = self._encode_pcma(decoded)

        return encoded

Stateful Agent Implementation

Maintain conversation state across calls.
from datetime import datetime, timedelta
import json

class StatefulAgent:
    """AI call agent that persists per-caller context between calls.

    Profiles are cached in memory for `cache_ttl` and backed by a database
    client. NOTE(review): `self.database` and `self._forward_to_ai` are not
    defined in this snippet, and `self.conversation_history` is never
    populated here — all assumed to be provided elsewhere; confirm.
    """

    def __init__(self, agent_id):
        self.agent_id = agent_id
        # call_id -> conversation data (not used in this snippet)
        self.conversation_history = {}
        # phone_number -> {'data': profile, 'timestamp': datetime} cache
        self.user_profiles = {}
        # How long a cached profile stays fresh before a database reload.
        self.cache_ttl = timedelta(hours=24)

    async def handle_call(self, call_id, caller_number):
        """Handle a call with state awareness: load the caller's profile,
        build a context-aware system prompt, forward to the AI, and record
        the outcome back onto the profile."""

        # Load user profile (cached or from the database)
        profile = await self.load_user_profile(caller_number)

        # Build context from previous calls; every key has a safe default
        # so first-time callers work too.
        context = {
            'user_name': profile.get('name', 'Guest'),
            'account_type': profile.get('type', 'free'),
            'previous_issues': profile.get('issues', []),
            'preferred_language': profile.get('language', 'en')
        }

        # Build system prompt with context
        system_prompt = self._build_context_prompt(context)

        # Handle call
        response = await self._forward_to_ai(call_id, system_prompt)

        # Update user profile with interaction results
        await self.update_user_profile(
            caller_number,
            response.get('sentiment'),
            response.get('resolution')
        )

        return response

    def _build_context_prompt(self, context):
        """Build the system prompt string from the caller's context dict."""
        base = f"""You are a helpful support agent.

User Information:
- Name: {context['user_name']}
- Account Type: {context['account_type']}
- Preferred Language: {context['preferred_language']}
"""

        if context['previous_issues']:
            base += f"\nPrevious Issues:\n"
            # Each issue is expected to be a dict with 'type' and
            # 'description' keys — TODO confirm against the profile schema.
            for issue in context['previous_issues'][-3:]:  # Last 3 issues
                base += f"- {issue['type']}: {issue['description']}\n"

        return base

    async def load_user_profile(self, phone_number):
        """Load a user profile, preferring the in-memory cache when the
        cached entry is younger than `cache_ttl`."""
        # Check cache first
        if phone_number in self.user_profiles:
            cached = self.user_profiles[phone_number]
            if datetime.now() - cached['timestamp'] < self.cache_ttl:
                return cached['data']

        # Cache miss or stale entry: load from database
        profile = await self.database.get_user_by_phone(phone_number)

        # Cache it with the load time for TTL checks
        self.user_profiles[phone_number] = {
            'data': profile,
            'timestamp': datetime.now()
        }

        return profile

    async def update_user_profile(self, phone_number, sentiment, resolution):
        """Fold the call outcome (sentiment, resolution) into the profile
        and persist it. Note this mutates the cached profile dict in place."""
        profile = await self.load_user_profile(phone_number)

        # Count positive-sentiment calls as satisfaction points.
        if sentiment == 'positive':
            profile['satisfaction'] = profile.get('satisfaction', 0) + 1

        if resolution:
            # Record each distinct resolution once; the inner assignment
            # just materializes the list when it does not exist yet.
            if resolution not in profile.get('resolved_issues', []):
                profile['resolved_issues'] = profile.get('resolved_issues', [])
                profile['resolved_issues'].append(resolution)

        await self.database.update_user(phone_number, profile)

High-Volume Scaling

Connection Pooling

Reuse connections for efficiency.
from concurrent.futures import ThreadPoolExecutor
import queue

class ConnectionPool:
    """Fixed-size pool of reusable Telepath API connections.

    All connections are created eagerly at construction time.
    NOTE(review): acquire/release pairing is the caller's responsibility;
    the pool does not detect double-release or foreign connections.
    """

    def __init__(self, size=10):
        self.size = size
        self.available = queue.Queue(maxsize=size)
        self.in_use = set()

        # Eagerly fill the pool so acquire() never pays connection-setup cost.
        for _ in range(size):
            self.available.put(self._create_connection())

    def _create_connection(self):
        """Create a new Telepath API connection.

        Reads the API key from the TELEPATH_API_KEY environment variable
        (raises KeyError if unset).
        """
        return TelephonyBridge(
            api_key=os.environ['TELEPATH_API_KEY']
        )

    def acquire(self, timeout=5):
        """Get a connection from the pool, blocking up to `timeout` seconds.

        Raises:
            RuntimeError: if no connection frees up within the timeout.
                (Was a bare Exception; RuntimeError lets callers target
                pool exhaustion specifically while remaining catchable as
                Exception.)
        """
        try:
            conn = self.available.get(timeout=timeout)
        except queue.Empty:
            raise RuntimeError("Connection pool exhausted") from None
        self.in_use.add(conn)
        return conn

    def release(self, conn):
        """Return a previously acquired connection to the pool."""
        self.in_use.remove(conn)
        self.available.put(conn)

Rate Limiting

Control API call rates.
from typing import Optional
import time

class RateLimiter:
    """Sliding-window rate limiter for outbound API calls (async)."""

    def __init__(self, calls_per_minute=6000):
        self.calls_per_minute = calls_per_minute
        # time.time() timestamps of calls made within the last 60 seconds.
        self.call_times = []
        # Average spacing between calls; kept for backward compatibility
        # (wait_if_needed does not use it).
        self.min_interval = 60 / calls_per_minute

    async def wait_if_needed(self):
        """Sleep just long enough to stay under calls_per_minute, then
        record this call in the window."""
        now = time.time()
        minute_ago = now - 60

        # Drop timestamps that have aged out of the 60-second window.
        self.call_times = [t for t in self.call_times if t > minute_ago]

        # If the window is full, wait until the oldest entry expires.
        if len(self.call_times) >= self.calls_per_minute:
            sleep_time = self.call_times[0] + 60 - now
            if sleep_time > 0:
                await asyncio.sleep(sleep_time)
            # Re-read the clock so we record when the call actually
            # proceeds. (The original appended the stale pre-sleep time,
            # which let sustained bursts exceed the limit.)
            now = time.time()

        self.call_times.append(now)

# Usage
# Module-level limiter shared by all outbound calls; 5000/min leaves
# headroom under the limiter's 6000/min default.
limiter = RateLimiter(calls_per_minute=5000)

async def make_call(number):
    # Throttle before dialing. NOTE(review): `telepath` is the API client,
    # assumed to be initialized elsewhere — confirm.
    await limiter.wait_if_needed()
    return await telepath.call(number)

Monitoring & Observability

Custom Metrics Collection

Track application-specific metrics.
from prometheus_client import Counter, Histogram, Gauge

# Define metrics
# Counter of completed calls, labeled by AI provider and final status so
# success/failure rates can be compared per provider.
call_counter = Counter(
    'calls_total',
    'Total calls processed',
    ['provider', 'status']
)

# Latency distribution; bucket edges span sub-second to multi-second calls.
latency_histogram = Histogram(
    'call_latency_seconds',
    'Call latency distribution',
    buckets=[0.1, 0.5, 1.0, 2.0, 5.0]
)

# Number of calls currently in flight.
active_calls = Gauge(
    'active_calls',
    'Currently active calls'
)

# Use metrics
def on_call_start(call_id, provider):
    """Mark a call as in flight (call_id and provider are currently unused)."""
    active_calls.inc()

def on_call_end(call_id, provider, status, duration):
    """Record a finished call: drop the in-flight gauge, observe its
    latency, and count it under its provider/status labels."""
    active_calls.dec()
    latency_histogram.observe(duration)
    call_counter.labels(provider=provider, status=status).inc()

Distributed Tracing

Track calls across systems.
from opentelemetry import trace, context
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# Setup Jaeger
# Export spans to a local Jaeger agent over UDP (Thrift compact protocol).
jaeger_exporter = JaegerExporter(
    agent_host_name="localhost",
    agent_port=6831,
)

trace.set_tracer_provider(TracerProvider())
# Batch spans before export to reduce per-span overhead.
trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(jaeger_exporter)
)

tracer = trace.get_tracer(__name__)

# Instrument code
# NOTE(review): call_id, caller_number, provider, authenticate_call and
# route_to_ai are assumed to come from the surrounding application — this
# is an illustrative snippet, not runnable standalone.
with tracer.start_as_current_span("handle_call") as span:
    span.set_attribute("call.id", call_id)
    span.set_attribute("call.from", caller_number)

    # Child span: authentication step
    with tracer.start_as_current_span("authenticate") as auth_span:
        authenticate_call(call_id)

    # Child span: routing step
    with tracer.start_as_current_span("route_to_ai") as routing_span:
        route_to_ai(call_id, provider)

Error Recovery Patterns

Automatic Retry with Backoff

import asyncio
from functools import wraps

def retry_with_backoff(max_retries=3, base_delay=1):
    """Decorator: retry an async function with exponential backoff.

    Makes up to `max_retries` attempts, sleeping base_delay * 2**attempt
    seconds between failures; the final failure is re-raised unchanged.
    """
    def decorator(func):
        # `wraps` is imported at the top of the file but was never applied,
        # so wrapped coroutines lost their __name__/__doc__ — fixed.
        @wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    # Out of attempts: surface the original exception.
                    if attempt == max_retries - 1:
                        raise

                    delay = base_delay * (2 ** attempt)
                    # NOTE(review): `logger` is assumed to be configured at
                    # module level elsewhere in this file — confirm.
                    logger.warning(
                        f"Attempt {attempt+1} failed, retrying in {delay}s: {e}"
                    )
                    await asyncio.sleep(delay)

        return wrapper
    return decorator

# Example: retry transient provider failures with 1 s, 2 s, 4 s backoff.
@retry_with_backoff(max_retries=3, base_delay=1)
async def call_ai_provider(prompt):
    """Send a single user message to the realtime model and return the
    raw completion. NOTE(review): `openai` client assumed configured
    elsewhere — confirm."""
    return await openai.chat.completions.create(
        model="gpt-4o-realtime-preview",
        messages=[{"role": "user", "content": prompt}]
    )

Circuit Breaker Pattern

from enum import Enum
from datetime import datetime, timedelta

class CircuitState(Enum):
    """Lifecycle states of the circuit breaker."""
    CLOSED = "closed"      # Normal operation
    OPEN = "open"          # Failing, reject calls
    HALF_OPEN = "half_open"  # Testing if recovered

class CircuitBreaker:
    """Classic circuit breaker around an async callable.

    Opens after `failure_threshold` consecutive failures. Once `timeout`
    seconds have elapsed it lets one probe call through (HALF_OPEN) and
    fully closes again on the first success.
    """

    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout  # seconds the circuit stays open
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = None

    async def call(self, func, *args, **kwargs):
        """Invoke func(*args, **kwargs) through the breaker.

        Raises:
            Exception: with message "Circuit breaker is OPEN" while the
                circuit is open and the timeout has not elapsed; otherwise
                re-raises whatever `func` raises.
        """
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit breaker is OPEN")

        try:
            result = await func(*args, **kwargs)
        except Exception:
            self._on_failure()
            raise
        self._on_success()
        return result

    def _on_success(self):
        # Any success fully resets the breaker.
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def _on_failure(self):
        self.failure_count += 1
        self.last_failure_time = datetime.now()

        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

    def _should_attempt_reset(self):
        # BUGFIX: timedelta.seconds is only the seconds-within-a-day
        # component and wraps at 24 h; total_seconds() is the true elapsed
        # time. Only called while OPEN, so last_failure_time is set.
        elapsed = (datetime.now() - self.last_failure_time).total_seconds()
        return elapsed > self.timeout

Security Best Practices

Encrypted Configuration

from cryptography.fernet import Fernet
import json

class ConfigManager:
    """Reads and writes JSON configuration files encrypted with Fernet
    symmetric encryption."""

    def __init__(self, encryption_key):
        # Fernet key (url-safe base64, 32 bytes) supplied by the caller.
        self.cipher = Fernet(encryption_key)
        self.config = {}

    def save_encrypted(self, data, filename):
        """Serialize `data` to JSON, encrypt it, and write it to disk."""
        payload = json.dumps(data).encode()
        with open(filename, 'wb') as out:
            out.write(self.cipher.encrypt(payload))

    def load_encrypted(self, filename):
        """Read an encrypted config file and return the decrypted object."""
        with open(filename, 'rb') as src:
            blob = src.read()
        return json.loads(self.cipher.decrypt(blob).decode())

See Also