Multi-Connection Strategies
A/B Testing AI Providers
Test multiple AI providers simultaneously to find the best fit. Setup:
- Create connection with Provider A
- Create connection with Provider B
- Route percentage of calls to each
- Compare metrics in dashboard
Copy
Ask AI
# Track per-provider metrics collected during the A/B testing window.
# Each provider maps to lists of per-call samples.
metrics = {
    'openai': {'latency': [], 'cost': [], 'quality': []},
    'elevenlabs': {'latency': [], 'cost': [], 'quality': []},
}

# Analyze after testing period.
for provider, data in metrics.items():
    # Guard against an empty sample list: the unconditional sum/len
    # division raised ZeroDivisionError before any calls were recorded.
    if data['latency']:
        avg_latency = sum(data['latency']) / len(data['latency'])
    else:
        avg_latency = 0.0
    total_cost = sum(data['cost'])
    print(f"{provider}: {avg_latency}ms latency, ${total_cost} cost")
Metrics to compare:
- Average latency
- Cost per minute
- User satisfaction/quality scores
- Error rates
- Throughput/concurrency limits
Geo-Distributed Architecture
Deploy with regional redundancy. Setup:
Ask AI
Caller in US → US Region → Telepath → OpenAI
Caller in EU → EU Region → Telepath → ElevenLabs
- Route calls based on caller location
- Use regional AI providers if available
- Set up regional carrier connections
- Monitor latency per region
- Lower latency for users
- Better compliance (data residency)
- Redundancy if one region fails
Fallback Chains
Gracefully handle provider failures.
Ask AI
import random

# Weighted routing table: the primary connection receives 70% of traffic.
providers = [
    {
        'name': 'openai',
        'connection_id': 'conn_primary',
        'weight': 0.7,
    },
    {
        'name': 'elevenlabs',
        'connection_id': 'conn_fallback',
        'weight': 0.3,
    },
]


def select_provider(failed_providers=None):
    """Select a provider at random by weight, excluding failed ones.

    Args:
        failed_providers: iterable of provider names to skip. Defaults to
            none. (The original used a mutable default ``[]``, which is
            shared across calls and a classic Python pitfall.)

    Returns:
        One provider dict from ``providers``.
    """
    if failed_providers is None:
        failed_providers = []
    available = [p for p in providers if p['name'] not in failed_providers]
    # Renormalize the remaining weights so they sum to 1.
    total_weight = sum(p['weight'] for p in available)
    normalized = [p['weight'] / total_weight for p in available]
    return random.choices(available, weights=normalized)[0]
- Automatic failover if provider fails
- Graceful degradation
- No service interruption
Custom WebSocket Implementation
Advanced Audio Processing
Process audio before sending to AI agent.Copy
Ask AI
import asyncio
import websockets
import numpy as np
from scipy import signal
class AudioProcessor:
    """Enhances inbound telephony audio (noise suppression, peak
    normalization, high-pass filtering) before it reaches the AI agent.

    Assumes 16 kHz mono, little-endian 16-bit PCM input -- TODO confirm
    against the actual media-stream configuration.
    """

    def __init__(self):
        # Scalar noise-floor estimate, computed lazily from the first chunk.
        self.noise_profile = None
        self.sample_rate = 16000

    async def process_inbound(self, audio_chunk):
        """Enhance one audio chunk before AI processing.

        Args:
            audio_chunk: raw int16 PCM bytes.

        Returns:
            Processed audio as int16 PCM bytes (length may differ slightly
            due to the STFT round trip).
        """
        audio = np.frombuffer(audio_chunk, dtype=np.int16).astype(np.float32)
        if audio.size == 0:
            # Nothing to do; also avoids np.max on an empty array below.
            return audio_chunk

        # Estimate the noise profile once, from the first chunk seen.
        if self.noise_profile is None:
            self.noise_profile = self._estimate_noise(audio)
        audio = self._suppress_noise(audio)

        # Peak-normalize, guarding against an all-zero (silent) chunk which
        # would otherwise divide by zero.
        peak = np.max(np.abs(audio))
        if peak > 0:
            audio = audio / peak

        # 4th-order Butterworth high-pass at 300 Hz to remove rumble.
        sos = signal.butter(4, 300, 'hp', fs=self.sample_rate, output='sos')
        audio = signal.sosfilt(sos, audio)

        # Clip before requantizing: filter overshoot past +/-1.0 would wrap
        # around when cast to int16.
        audio = np.clip(audio, -1.0, 1.0)
        return (audio * 32767).astype(np.int16).tobytes()

    def _estimate_noise(self, audio):
        """Estimate a scalar noise level from the first 10% of samples
        (assumed to be near-silence -- TODO confirm for this call flow)."""
        return np.std(audio[:int(len(audio) * 0.1)])

    def _suppress_noise(self, audio):
        """Spectral subtraction for noise reduction.

        Subtracts the estimated noise magnitude from the STFT *magnitude*
        and preserves the phase. The original subtracted the real-valued
        noise estimate directly from the complex STFT coefficients, which
        shifts real parts instead of attenuating magnitudes.
        """
        _, _, Zxx = signal.stft(audio, fs=self.sample_rate)
        # Per-bin noise magnitude estimated from the first 10 frames.
        noise_mag = np.mean(np.abs(Zxx[:, :10]), axis=1, keepdims=True)
        mag = np.abs(Zxx)
        # Floor at zero so over-subtraction cannot produce negative magnitudes.
        cleaned_mag = np.maximum(mag - noise_mag, 0.0)
        # Recombine with the original phase.
        phase = np.exp(1j * np.angle(Zxx))
        _, audio_cleaned = signal.istft(cleaned_mag * phase, fs=self.sample_rate)
        return audio_cleaned
Dynamic Codec Negotiation
Handle multiple codec formats.Copy
Ask AI
class TelephonyBridge:
    """Bridges carrier audio to the AI side: negotiates a shared codec and
    converts audio between codec formats."""

    # Codec capabilities: sample rate (Hz) and bit depth.
    SUPPORTED_CODECS = {
        'PCMU': {'rate': 8000, 'bits': 8},
        'PCMA': {'rate': 8000, 'bits': 8},
        'G722': {'rate': 16000, 'bits': 16},
    }

    async def negotiate_codec(self, carrier_codecs):
        """Choose the best codec from the carrier's offering.

        Prefers G.722 for quality, then the G.711 variants; if none of the
        preferred codecs is offered, falls back to the carrier's first.
        """
        preference = ('G722', 'PCMU', 'PCMA')
        chosen = next((c for c in preference if c in carrier_codecs), None)
        if chosen is not None:
            return chosen
        # Should never happen, but keep a fallback.
        return carrier_codecs[0]

    async def convert_audio(self, audio, from_codec, to_codec):
        """Convert audio between codec formats via an intermediate decode."""
        # Select the decoder for the source codec (PCMA is the default branch).
        if from_codec == 'G722':
            decode = self._decode_g722
        elif from_codec == 'PCMU':
            decode = self._decode_pcmu
        else:
            decode = self._decode_pcma
        decoded = decode(audio)

        # Select the encoder for the target codec (PCMA is the default branch).
        if to_codec == 'G722':
            encode = self._encode_g722
        elif to_codec == 'PCMU':
            encode = self._encode_pcmu
        else:
            encode = self._encode_pcma
        return encode(decoded)
Stateful Agent Implementation
Maintain conversation state across calls.Copy
Ask AI
from datetime import datetime, timedelta
import json
class StatefulAgent:
    """Agent that maintains per-caller state (profile, history) across calls."""

    def __init__(self, agent_id, database=None):
        """
        Args:
            agent_id: identifier for this agent instance.
            database: async store exposing ``get_user_by_phone`` and
                ``update_user``. The original read ``self.database`` without
                ever assigning it (guaranteed AttributeError on first use);
                it is now injectable here, defaulting to None for
                backward compatibility.
        """
        self.agent_id = agent_id
        self.conversation_history = {}
        # phone_number -> {'data': profile-dict, 'timestamp': cached-at}
        self.user_profiles = {}
        self.cache_ttl = timedelta(hours=24)
        self.database = database

    async def handle_call(self, call_id, caller_number):
        """Handle one call with state awareness and return the AI response."""
        # DB may return None for an unknown caller; treat as empty profile.
        profile = await self.load_user_profile(caller_number) or {}
        # Build context from previous calls.
        context = {
            'user_name': profile.get('name', 'Guest'),
            'account_type': profile.get('type', 'free'),
            'previous_issues': profile.get('issues', []),
            'preferred_language': profile.get('language', 'en'),
        }
        system_prompt = self._build_context_prompt(context)
        # NOTE(review): _forward_to_ai is not defined in this snippet; it is
        # assumed to exist on the concrete agent implementation.
        response = await self._forward_to_ai(call_id, system_prompt)
        # Persist this interaction's sentiment/resolution into the profile.
        await self.update_user_profile(
            caller_number,
            response.get('sentiment'),
            response.get('resolution'),
        )
        return response

    def _build_context_prompt(self, context):
        """Build the system prompt embedding the caller's context."""
        base = f"""You are a helpful support agent.
User Information:
- Name: {context['user_name']}
- Account Type: {context['account_type']}
- Preferred Language: {context['preferred_language']}
"""
        if context['previous_issues']:
            base += "\nPrevious Issues:\n"
            for issue in context['previous_issues'][-3:]:  # Last 3 issues
                base += f"- {issue['type']}: {issue['description']}\n"
        return base

    async def load_user_profile(self, phone_number):
        """Return the cached profile if still fresh, else reload from the DB."""
        if phone_number in self.user_profiles:
            cached = self.user_profiles[phone_number]
            if datetime.now() - cached['timestamp'] < self.cache_ttl:
                return cached['data']
        profile = await self.database.get_user_by_phone(phone_number)
        self.user_profiles[phone_number] = {
            'data': profile,
            'timestamp': datetime.now(),
        }
        return profile

    async def update_user_profile(self, phone_number, sentiment, resolution):
        """Fold this call's outcome into the profile and persist it."""
        profile = await self.load_user_profile(phone_number)
        if sentiment == 'positive':
            profile['satisfaction'] = profile.get('satisfaction', 0) + 1
        if resolution:
            resolved = profile.setdefault('resolved_issues', [])
            if resolution not in resolved:
                resolved.append(resolution)
        # Refresh the cache so subsequent reads see what we just wrote
        # (the original left a stale cached copy in place).
        self.user_profiles[phone_number] = {
            'data': profile,
            'timestamp': datetime.now(),
        }
        await self.database.update_user(phone_number, profile)
High-Volume Scaling
Connection Pooling
Reuse connections for efficiency.Copy
Ask AI
from concurrent.futures import ThreadPoolExecutor
import queue
class ConnectionPool:
    """Fixed-size pool of reusable Telepath API connections.

    ``queue.Queue`` provides the thread-safe hand-off of connections.
    """

    def __init__(self, size=10):
        self.size = size
        self.available = queue.Queue(maxsize=size)
        self.in_use = set()
        # Pre-create the whole pool up front.
        for _ in range(size):
            self.available.put(self._create_connection())

    def _create_connection(self):
        """Create a new Telepath API connection."""
        # Local import: this snippet never imported os at module level.
        import os
        return TelephonyBridge(
            api_key=os.environ['TELEPATH_API_KEY']
        )

    def acquire(self, timeout=5):
        """Get a connection, blocking up to ``timeout`` seconds.

        Raises:
            Exception: if no connection becomes available in time.
        """
        try:
            conn = self.available.get(timeout=timeout)
        except queue.Empty as exc:
            # Chain the cause so callers can see the timeout origin.
            raise Exception("Connection pool exhausted") from exc
        self.in_use.add(conn)
        return conn

    def release(self, conn):
        """Return a connection to the pool.

        Uses ``discard`` rather than ``remove``: the original raised
        KeyError on a double-release (or a connection not obtained via
        ``acquire``) and the connection was then lost from the pool.
        """
        self.in_use.discard(conn)
        self.available.put(conn)
Rate Limiting
Control API call rates.Copy
Ask AI
from typing import Optional
import asyncio  # required by wait_if_needed; missing from the original snippet
import time


class RateLimiter:
    """Sliding-window rate limiter for outbound API calls."""

    def __init__(self, calls_per_minute=6000):
        self.calls_per_minute = calls_per_minute
        # time.time() timestamps of calls inside the trailing 60 s window.
        self.call_times = []
        # Average spacing between calls at the configured rate; not used
        # internally, kept for backward compatibility with external readers.
        self.min_interval = 60 / calls_per_minute

    async def wait_if_needed(self):
        """Sleep just long enough to stay under the limit, then record
        this call's timestamp."""
        now = time.time()
        minute_ago = now - 60
        # Drop timestamps that have aged out of the window.
        self.call_times = [t for t in self.call_times if t > minute_ago]
        if len(self.call_times) >= self.calls_per_minute:
            sleep_time = self.call_times[0] + 60 - now
            if sleep_time > 0:
                await asyncio.sleep(sleep_time)
                # Re-read the clock: the original recorded the pre-sleep
                # timestamp, under-counting the window after a wait.
                now = time.time()
        self.call_times.append(now)
# Usage: share a single limiter across all outbound calls.
limiter = RateLimiter(calls_per_minute=5000)


async def make_call(number):
    """Place a call through Telepath, honoring the shared rate limit."""
    await limiter.wait_if_needed()
    result = await telepath.call(number)
    return result
Monitoring & Observability
Custom Metrics Collection
Track application-specific metrics.Copy
Ask AI
from prometheus_client import Counter, Histogram, Gauge
# Define metrics
# --- Prometheus metric definitions ---

# Total calls processed, labelled by AI provider and final status.
call_counter = Counter(
    'calls_total',
    'Total calls processed',
    ['provider', 'status'],
)

# End-to-end call latency distribution, in seconds.
latency_histogram = Histogram(
    'call_latency_seconds',
    'Call latency distribution',
    buckets=[0.1, 0.5, 1.0, 2.0, 5.0],
)

# Number of calls currently in flight.
active_calls = Gauge(
    'active_calls',
    'Currently active calls',
)


def on_call_start(call_id, provider):
    """Record that a call has begun (bumps the in-flight gauge)."""
    active_calls.inc()


def on_call_end(call_id, provider, status, duration):
    """Record a finished call: count it, observe its duration, drop the gauge."""
    call_counter.labels(provider=provider, status=status).inc()
    latency_histogram.observe(duration)
    active_calls.dec()
Distributed Tracing
Track calls across systems.Copy
Ask AI
from opentelemetry import trace, context
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# Setup Jaeger: export spans to a local Jaeger agent over UDP/Thrift.
jaeger_exporter = JaegerExporter(
    agent_host_name="localhost",
    agent_port=6831,
)

# Install the SDK tracer provider and attach a batching span exporter.
trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(jaeger_exporter)
)

tracer = trace.get_tracer(__name__)

# Instrument code: one root span per call, with child spans per stage.
# NOTE(review): call_id, caller_number, provider, authenticate_call and
# route_to_ai are assumed to be defined by the surrounding application.
with tracer.start_as_current_span("handle_call") as span:
    span.set_attribute("call.id", call_id)
    span.set_attribute("call.from", caller_number)
    with tracer.start_as_current_span("authenticate") as auth_span:
        authenticate_call(call_id)
    with tracer.start_as_current_span("route_to_ai") as routing_span:
        route_to_ai(call_id, provider)
Error Recovery Patterns
Automatic Retry with Backoff
Copy
Ask AI
import asyncio
import logging
from functools import wraps

# Module-level logger for retry diagnostics. The original referenced an
# undefined ``logger`` name, so the retry path raised NameError instead of
# retrying.
logger = logging.getLogger(__name__)


def retry_with_backoff(max_retries=3, base_delay=1):
    """Decorator: retry an async function with exponential backoff.

    Args:
        max_retries: total attempts before the final exception is re-raised.
        base_delay: delay (seconds) before the first retry; doubles each
            subsequent attempt.
    """
    def decorator(func):
        # ``wraps`` was imported but never applied in the original,
        # losing the wrapped function's name and docstring.
        @wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries - 1:
                        raise
                    delay = base_delay * (2 ** attempt)
                    logger.warning(
                        f"Attempt {attempt+1} failed, retrying in {delay}s: {e}"
                    )
                    await asyncio.sleep(delay)
        return wrapper
    return decorator
@retry_with_backoff(max_retries=3, base_delay=1)
async def call_ai_provider(prompt):
    """Send ``prompt`` to the realtime model, retrying on transient failure."""
    messages = [{"role": "user", "content": prompt}]
    return await openai.chat.completions.create(
        model="gpt-4o-realtime-preview",
        messages=messages,
    )
Circuit Breaker Pattern
Copy
Ask AI
from enum import Enum
from datetime import datetime, timedelta
class CircuitState(Enum):
    """Lifecycle states of a circuit breaker."""
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing, reject calls
    HALF_OPEN = "half_open"  # Testing if recovered


class CircuitBreaker:
    """Stops forwarding calls to a failing dependency.

    After ``failure_threshold`` consecutive failures the breaker opens and
    rejects calls immediately; once ``timeout`` seconds have elapsed it
    lets a probe call through (HALF_OPEN) and fully closes on success.
    """

    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout  # seconds to stay OPEN before probing
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = None

    async def call(self, func, *args, **kwargs):
        """Invoke ``func(*args, **kwargs)`` through the breaker.

        Raises:
            Exception: "Circuit breaker is OPEN" when rejecting, or
            whatever ``func`` itself raises.
        """
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit breaker is OPEN")
        try:
            result = await func(*args, **kwargs)
        except Exception:
            self._on_failure()
            raise
        self._on_success()
        return result

    def _on_success(self):
        # Any success fully closes the breaker and clears the failure streak.
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def _on_failure(self):
        self.failure_count += 1
        self.last_failure_time = datetime.now()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

    def _should_attempt_reset(self):
        """True once ``timeout`` seconds have passed since the last failure.

        Uses ``total_seconds()``: the original read ``timedelta.seconds``,
        which wraps at 24 hours and ignores the days component, so a
        breaker open for more than a day could refuse to probe.
        """
        if self.last_failure_time is None:
            # No recorded failure; allow the probe.
            return True
        elapsed = (datetime.now() - self.last_failure_time).total_seconds()
        return elapsed > self.timeout
Security Best Practices
Encrypted Configuration
Copy
Ask AI
from cryptography.fernet import Fernet
import json
class ConfigManager:
    """Reads and writes JSON configuration files encrypted with Fernet."""

    def __init__(self, encryption_key):
        # Symmetric cipher built from the caller-supplied key.
        self.cipher = Fernet(encryption_key)
        self.config = {}

    def save_encrypted(self, data, filename):
        """Serialize ``data`` to JSON, encrypt it, and write it to disk."""
        payload = self.cipher.encrypt(json.dumps(data).encode())
        with open(filename, 'wb') as fh:
            fh.write(payload)

    def load_encrypted(self, filename):
        """Read ``filename``, decrypt its contents, and return parsed JSON."""
        with open(filename, 'rb') as fh:
            ciphertext = fh.read()
        plaintext = self.cipher.decrypt(ciphertext)
        return json.loads(plaintext.decode())

