
Resilient REST API Integration Cheat Sheet — Retry Logic, Caching & Error Handling

This cheat sheet provides practical patterns for building production-ready applications that depend on external REST APIs. Use it as a quick reference when implementing resilience features or troubleshooting API integration issues.

For comprehensive explanations, see: Building Resilient Applications with REST API Integration

Error Classification: Retry vs. Don't Retry

Retry these errors:

  • Network timeouts — Connection or read timeouts
  • HTTP 429 — Rate limiting (use longer backoff)
  • HTTP 5xx — Server errors (500, 502, 503, 504)
  • Connection errors — DNS failures, connection refused (transient)

Don’t retry these errors:

  • HTTP 4xx — Client errors (400, 401, 403, 404) except 429
    • 400: Bad request (your code is wrong)
    • 401: Unauthorized (invalid credentials)
    • 403: Forbidden (insufficient permissions)
    • 404: Not found (resource doesn’t exist)
  • Validation errors — Schema validation failures
  • Authentication failures — Invalid API keys, expired tokens
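
As a quick sketch, this classification can live in a small helper (is_retryable is illustrative, not part of any library; network-level errors are handled separately in the examples below):

def is_retryable(status_code: int) -> bool:
    """True for errors worth retrying: 429 and all 5xx."""
    return status_code == 429 or 500 <= status_code < 600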

Exponential Backoff Retry

Use for: Transient errors, timeouts, 5xx responses

Pattern:

  • Attempt 1: Immediate
  • Attempt 2: Wait 0.5-1 second
  • Attempt 3: Wait 2 seconds
  • Attempt 4: Wait 4 seconds
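
In production it is common to add random jitter to these delays so that many clients don't retry in lockstep. A minimal sketch (the backoff_delay helper is illustrative and not used in the examples below):

import random

def backoff_delay(attempt: int, base_delay: float = 0.5, cap: float = 30.0) -> float:
    """Exponential backoff with full jitter: uniform in [0, min(cap, base * 2^attempt)]."""
    return random.uniform(0, min(cap, base_delay * (2 ** attempt)))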

Python example:

import time
import requests
from typing import Optional

def fetch_with_retry(
    url: str,
    max_retries: int = 3,
    base_delay: float = 0.5,
    headers: Optional[dict] = None,
) -> Optional[dict]:
    """
    Fetch data with exponential backoff retry logic.

    Args:
        url: API endpoint URL
        max_retries: Maximum number of attempts
        base_delay: Initial delay in seconds (doubles each retry)
        headers: Optional HTTP headers (e.g., for authentication)

    Returns:
        Response JSON, or None if all attempts failed
    """
    for attempt in range(max_retries):
        try:
            response = requests.get(url, headers=headers, timeout=10)

            # Success
            if response.status_code == 200:
                return response.json()

            # Retry on 5xx or 429
            if response.status_code >= 500 or response.status_code == 429:
                if attempt < max_retries - 1:
                    delay = base_delay * (2 ** attempt)
                    print(f"Error {response.status_code}, retrying in {delay}s...")
                    time.sleep(delay)
                    continue
                print(f"Error {response.status_code}, max retries reached")
                return None

            # Don't retry 4xx errors
            print(f"Client error {response.status_code}, not retrying")
            return None
        except requests.Timeout:
            if attempt < max_retries - 1:
                delay = base_delay * (2 ** attempt)
                print(f"Timeout, retrying in {delay}s...")
                time.sleep(delay)
            else:
                print("Max retries reached")
                return None
        except requests.RequestException as e:
            print(f"Request failed: {e}")
            return None
    return None

# Usage (generic example)
data = fetch_with_retry('https://api.example.com/data')

# Usage with authentication
headers = {'X-API-Key': 'YOUR_API_KEY'}
data = fetch_with_retry('https://api.newsdatahub.com/v1/news', headers=headers)

JavaScript example:

/**
 * Fetch data with exponential backoff retry logic.
 * @param {string} url - API endpoint URL
 * @param {number} maxRetries - Maximum number of attempts
 * @param {number} baseDelay - Initial delay in milliseconds
 * @returns {Promise<object|null>} Response data or null
 */
async function fetchWithRetry(url, maxRetries = 3, baseDelay = 500) {
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    try {
      const response = await fetch(url, {
        signal: AbortSignal.timeout(10000) // 10s timeout
      });

      // Success
      if (response.ok) {
        return await response.json();
      }

      // Retry on 5xx or 429
      if (response.status >= 500 || response.status === 429) {
        if (attempt < maxRetries - 1) {
          const delay = baseDelay * Math.pow(2, attempt);
          console.log(`Error ${response.status}, retrying in ${delay}ms...`);
          await new Promise(resolve => setTimeout(resolve, delay));
          continue;
        }
        console.log(`Error ${response.status}, max retries reached`);
        return null;
      }

      // Don't retry 4xx
      console.log(`Client error ${response.status}, not retrying`);
      return null;
    } catch (error) {
      if (error.name === 'TimeoutError' || error.name === 'AbortError') {
        if (attempt < maxRetries - 1) {
          const delay = baseDelay * Math.pow(2, attempt);
          console.log(`Timeout, retrying in ${delay}ms...`);
          await new Promise(resolve => setTimeout(resolve, delay));
        } else {
          console.log('Max retries reached');
          return null;
        }
      } else {
        console.error('Request failed:', error);
        return null;
      }
    }
  }
  return null;
}

// Usage
const data = await fetchWithRetry('https://api.example.com/data');

Rate Limit Backoff

Use for: HTTP 429 rate limit errors

Pattern:

  • Calculate the delay from the rate limit window
  • Example: at 5 requests/minute, wait 12-15 seconds between retries (60s ÷ 5 = 12s, plus a small buffer)

Python example:

import time
import requests
from typing import Optional

def fetch_with_rate_limit_handling(
    url: str,
    requests_per_minute: int = 5,
    max_retries: int = 3,
) -> Optional[dict]:
    """Handle rate limits with appropriate backoff (retries capped to avoid looping forever)."""
    for _ in range(max_retries):
        try:
            response = requests.get(url, timeout=10)
            if response.status_code == 429:
                # If the API provides a Retry-After header, honor it
                retry_after = response.headers.get('Retry-After')
                if retry_after:
                    delay = int(retry_after)
                else:
                    # Otherwise, derive the delay from the rate limit
                    delay = (60 / requests_per_minute) + 2  # add a 2s buffer
                print(f"Rate limited, waiting {delay}s...")
                time.sleep(delay)
                continue
            return response.json()
        except requests.RequestException as e:
            print(f"Error: {e}")
            return None
    return None
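
Note that Retry-After may be either an integer number of seconds or an HTTP date. A small parsing helper covering both forms (parse_retry_after is illustrative, not part of requests):

from datetime import datetime, timezone
from email.utils import parsedate_to_datetime

def parse_retry_after(value: str, default: float = 15.0) -> float:
    """Parse a Retry-After header: either delta-seconds or an HTTP-date."""
    try:
        return float(value)
    except ValueError:
        pass
    try:
        target = parsedate_to_datetime(value)
        return max(0.0, (target - datetime.now(timezone.utc)).total_seconds())
    except (TypeError, ValueError):
        return default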

Circuit Breaker

Use for: Preventing cascading failures when an API is consistently down

Pattern:

  • Closed: Normal operation, requests pass through
  • Open: Too many failures, block requests temporarily
  • Half-Open: Test whether the service has recovered

Python example:

import requests
from datetime import datetime, timedelta
from enum import Enum

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout  # Seconds before attempting recovery
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    def call(self, func, *args, **kwargs):
        """Execute function with circuit breaker protection."""
        # If open, check whether the recovery timeout has expired
        if self.state == CircuitState.OPEN:
            if datetime.now() - self.last_failure_time > timedelta(seconds=self.timeout):
                self.state = CircuitState.HALF_OPEN
                print("Circuit breaker: Half-open, testing...")
            else:
                raise Exception("Circuit breaker is OPEN")
        try:
            result = func(*args, **kwargs)
            # Success - close the circuit if we were testing recovery
            if self.state == CircuitState.HALF_OPEN:
                self.state = CircuitState.CLOSED
                print("Circuit breaker: Closed, service recovered")
            self.failure_count = 0
            return result
        except Exception:
            self.failure_count += 1
            self.last_failure_time = datetime.now()
            # A failure while half-open, or too many failures, opens the circuit
            if self.state == CircuitState.HALF_OPEN or self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN
                print(f"Circuit breaker: OPEN after {self.failure_count} failure(s)")
            raise

# Usage
breaker = CircuitBreaker(failure_threshold=3, timeout=30)

def fetch_data():
    response = requests.get('https://api.example.com/data', timeout=5)
    response.raise_for_status()
    return response.json()

try:
    data = breaker.call(fetch_data)
except Exception as e:
    print(f"Request blocked or failed: {e}")

Two-Tier Caching (Fresh + Stale)

Fresh cache: Return data only if recent (e.g., less than 10 minutes old)
Stale cache: Return any cached data, regardless of age

Python example:

import json
import time
import requests  # used in the usage example below
from pathlib import Path
from typing import Optional

class TwoTierCache:
    def __init__(self, cache_file: str, fresh_ttl: int = 600):
        """
        Args:
            cache_file: Path to cache file
            fresh_ttl: Fresh data TTL in seconds (default 10 minutes)
        """
        self.cache_file = Path(cache_file)
        self.fresh_ttl = fresh_ttl

    def get_fresh(self, key: str) -> Optional[dict]:
        """Get data only if fresh (within TTL)."""
        if not self.cache_file.exists():
            return None
        with open(self.cache_file, 'r') as f:
            cache = json.load(f)
        if key not in cache:
            return None
        entry = cache[key]
        age = time.time() - entry['timestamp']
        if age < self.fresh_ttl:
            return entry['data']
        return None

    def get_stale(self, key: str) -> Optional[dict]:
        """Get data regardless of age (fallback)."""
        if not self.cache_file.exists():
            return None
        with open(self.cache_file, 'r') as f:
            cache = json.load(f)
        return cache.get(key, {}).get('data')

    def set(self, key: str, data: dict):
        """Save data to cache with timestamp."""
        cache = {}
        if self.cache_file.exists():
            with open(self.cache_file, 'r') as f:
                cache = json.load(f)
        cache[key] = {
            'data': data,
            'timestamp': time.time()
        }
        with open(self.cache_file, 'w') as f:
            json.dump(cache, f)

# Usage
cache = TwoTierCache('news_cache.json', fresh_ttl=600)

# Try fresh cache first
data = cache.get_fresh('tech_news')
if not data:
    # Fresh cache miss, fetch from API
    try:
        # Note: Include authentication headers for NewsDataHub API
        headers = {
            'X-API-Key': 'YOUR_API_KEY',
            'User-Agent': 'resilient-api-integration-cheat-sheet/1.0-py'
        }
        response = requests.get(
            'https://api.newsdatahub.com/v1/news',
            headers=headers,
            params={'topic': 'technology'},
            timeout=10
        )
        if response.status_code == 200:
            data = response.json()
            cache.set('tech_news', data)
    except Exception:
        # API failed, try stale cache as fallback
        data = cache.get_stale('tech_news')
        if data:
            print("Using stale cache data due to API failure")

Cache Invalidation Strategies

Time-based (TTL):

# Recommended TTL by data type
CACHE_TTL = {
    'breaking_news': 60,        # 1 minute
    'recent_news': 300,         # 5 minutes
    'historical_data': 3600,    # 1 hour
    'source_metadata': 86400,   # 24 hours
}
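
The table can drive per-key freshness when constructing caches (a sketch reusing TwoTierCache from above; get_cache_for and the key names are illustrative):

def get_cache_for(data_type: str) -> TwoTierCache:
    """Build a cache whose fresh TTL matches the data type."""
    ttl = CACHE_TTL.get(data_type, 300)  # default to 5 minutes
    return TwoTierCache(f'{data_type}_cache.json', fresh_ttl=ttl)

breaking_cache = get_cache_for('breaking_news')  # fresh for only 60 seconds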

Event-based:

def invalidate_cache_on_event(cache, event_type):
    """Invalidate specific cache entries on events."""
    if event_type == 'new_article_published':
        cache.delete('latest_news')
        cache.delete('breaking_news')
    elif event_type == 'source_updated':
        cache.delete('source_list')
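
This assumes the cache object exposes a delete method. TwoTierCache above doesn't define one; a minimal sketch of what it could look like as an extra method on that class:

def delete(self, key: str):
    """Remove a single entry from the cache file (no-op if absent)."""
    if not self.cache_file.exists():
        return
    with open(self.cache_file, 'r') as f:
        cache = json.load(f)
    if cache.pop(key, None) is not None:
        with open(self.cache_file, 'w') as f:
            json.dump(cache, f)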

Size-based (LRU):

from functools import lru_cache

@lru_cache(maxsize=100)
def get_article(article_id):
    """Cache with LRU eviction (keeps the 100 most recently used entries)."""
    return fetch_article_from_api(article_id)  # fetch_article_from_api: your underlying API call
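
lru_cache evicts by size only and has no notion of age. A common workaround (illustrative, building on the example above) is to pass a coarse time bucket as an extra argument so entries effectively expire:

import time
from functools import lru_cache

@lru_cache(maxsize=100)
def _get_article_bucketed(article_id, _bucket):
    return fetch_article_from_api(article_id)

def get_article_with_ttl(article_id, ttl=300):
    """LRU cache whose entries expire roughly every `ttl` seconds."""
    return _get_article_bucketed(article_id, int(time.time() // ttl))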

Combining the Patterns

Python example (uses fetch_with_retry and TwoTierCache from above):

def resilient_api_call(url: str, cache_key: str, headers: dict = None) -> Optional[dict]:
    """
    Complete resilient API call with caching and retry logic.

    Flow:
    1. Check fresh cache
    2. On a miss, attempt the API call with retries
    3. If the API fails, fall back to stale cache
    4. If everything fails, return None
    """
    cache = TwoTierCache('api_cache.json', fresh_ttl=600)

    # Step 1: Try fresh cache
    data = cache.get_fresh(cache_key)
    if data:
        print("Fresh cache hit")
        return data

    # Step 2: Try API with retry logic
    try:
        # Note: Pass authentication headers for APIs that require them
        data = fetch_with_retry(url, max_retries=3, headers=headers)
        if data:
            cache.set(cache_key, data)
            print("API call successful")
            return data
    except Exception as e:
        print(f"API call failed: {e}")

    # Step 3: Fall back to stale cache
    data = cache.get_stale(cache_key)
    if data:
        print("Using stale cache (API failed)")
        return data

    # Step 4: All strategies failed
    print("All strategies failed")
    return None

# Usage
headers = {'X-API-Key': 'YOUR_API_KEY'}
news_data = resilient_api_call(
    'https://api.newsdatahub.com/v1/news?topic=technology',
    'tech_news',
    headers
)

HTTP Status Code Quick Reference

Code | Meaning             | Action
-----|---------------------|----------------------------------------
200  | Success             | Return data
400  | Bad request         | Don't retry, fix the request
401  | Unauthorized        | Check API key, don't retry
403  | Forbidden           | Check permissions, don't retry
404  | Not found           | Resource doesn't exist, don't retry
429  | Rate limited        | Retry with long backoff (12-60s)
500  | Server error        | Retry with exponential backoff
502  | Bad gateway         | Retry with exponential backoff
503  | Service unavailable | Retry with exponential backoff
504  | Gateway timeout     | Retry with exponential backoff
Python exception handling example:

import requests
from requests.exceptions import (
    Timeout,
    ConnectionError,
    HTTPError,
    RequestException
)

try:
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    data = response.json()
except Timeout:
    # Retry recommended
    print("Request timed out")
except ConnectionError:
    # Retry recommended (transient network issue)
    print("Connection failed")
except HTTPError as e:
    if e.response.status_code >= 500:
        # Retry recommended
        print(f"Server error: {e.response.status_code}")
    elif e.response.status_code == 429:
        # Retry with long backoff
        print("Rate limited")
    else:
        # Don't retry (4xx client error)
        print(f"Client error: {e.response.status_code}")
except RequestException as e:
    # Generic error, log and handle
    print(f"Request failed: {e}")

Graceful Degradation

def get_news_with_degradation(topic: str, api_key: str) -> dict:
    """
    Get news with graceful degradation.

    Priority:
    1. Fresh API data
    2. Cached data (with a staleness indicator)
    3. Static fallback content
    """
    cache = TwoTierCache('news_cache.json')  # see TwoTierCache above
    try:
        # Try the API with authentication
        headers = {
            'X-API-Key': api_key,
            'User-Agent': 'resilient-api-integration-cheat-sheet/1.0-py'
        }
        response = requests.get(
            'https://api.newsdatahub.com/v1/news',
            headers=headers,
            params={'topic': topic},
            timeout=10
        )
        if response.status_code == 200:
            data = response.json()
            return {
                'articles': data['data'],
                'status': 'live',
                'message': 'Showing latest news'
            }
    except Exception:
        pass

    # Try cache
    cached_data = cache.get_stale(f'news_{topic}')
    if cached_data:
        return {
            'articles': cached_data['data'],
            'status': 'degraded',
            'message': 'Showing cached news (API temporarily unavailable)'
        }

    # Final fallback
    return {
        'articles': [],
        'status': 'unavailable',
        'message': 'News service temporarily unavailable. Please try again later.'
    }

Monitoring Metrics

API Performance:

  • Response time (p50, p95, p99)
  • Success rate (%)
  • Error rate by type (4xx, 5xx, timeout)
  • Retry rate
  • Cache hit rate

Degraded Mode Indicators:

  • Stale cache usage frequency
  • Circuit breaker open events
  • Failed request count
  • Time in degraded state

Business Metrics:

  • API quota usage
  • Cost per request
  • Data freshness age
  • User-facing errors

Structured logging example:

import logging
import json
import time

logger = logging.getLogger(__name__)

def log_api_call(url, status_code, duration, from_cache=False, error=None):
    """Structured logging for API calls."""
    log_data = {
        'url': url,
        'status_code': status_code,
        'duration_ms': duration,
        'from_cache': from_cache,
        'error': str(error) if error else None,
        'timestamp': time.time()
    }
    # Guard against status_code=None (e.g., timeouts never got a response)
    if error or (status_code and status_code >= 400):
        logger.error(json.dumps(log_data))
    else:
        logger.info(json.dumps(log_data))
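
To feed metrics such as cache hit rate, a simple in-process counter can sit alongside the logger (ApiMetrics is an illustrative sketch, not from any library):

class ApiMetrics:
    """Basic counters for computing hit/retry/error rates."""
    def __init__(self):
        self.requests = 0
        self.cache_hits = 0
        self.retries = 0
        self.errors = 0

    def cache_hit_rate(self) -> float:
        return self.cache_hits / self.requests if self.requests else 0.0

metrics = ApiMetrics()
metrics.requests += 1
metrics.cache_hits += 1
print(f"Cache hit rate: {metrics.cache_hit_rate():.0%}")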

Alert thresholds:

Metric               | Warning | Critical
---------------------|---------|---------------
Error rate           | >5%     | >10%
Response time (p95)  | >2s     | >5s
Cache hit rate       | <50%    | <30%
Retry rate           | >20%    | >40%
Circuit breaker open | 1 event | >3 events/hour
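
A sketch of wiring these thresholds into an alert check (check_thresholds and the metric names are illustrative; metrics where lower is worse, such as cache hit rate, would invert the comparison):

THRESHOLDS = {
    'error_rate': {'warning': 0.05, 'critical': 0.10},
    'retry_rate': {'warning': 0.20, 'critical': 0.40},
}

def check_thresholds(name: str, value: float) -> str:
    """Return 'ok', 'warning', or 'critical' for a metric value."""
    levels = THRESHOLDS[name]
    if value >= levels['critical']:
        return 'critical'
    if value >= levels['warning']:
        return 'warning'
    return 'ok'

print(check_thresholds('error_rate', 0.07))  # warning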

Background Refresh Mode

Use when: Traffic is unpredictable, quota management is critical

Pattern:

  • A scheduled job refreshes the cache every N hours
  • The application always serves from cache
  • API usage stays predictable and constant

Python example (using APScheduler):

from apscheduler.schedulers.background import BackgroundScheduler
import requests
import json

def refresh_cache():
    """Background job to refresh the cache."""
    topics = ['technology', 'finance', 'politics']
    for topic in topics:
        try:
            response = requests.get(
                'https://api.newsdatahub.com/v1/news',
                headers={
                    'X-API-Key': 'YOUR_KEY',
                    'User-Agent': 'resilient-api-integration-cheat-sheet/1.0-py'
                },
                params={'topic': topic, 'per_page': 100},
                timeout=10
            )
            if response.status_code == 200:
                with open(f'cache_{topic}.json', 'w') as f:
                    json.dump(response.json(), f)
                print(f"Refreshed {topic} cache")
            else:
                print(f"Failed to refresh {topic}: {response.status_code}")
        except Exception as e:
            print(f"Error refreshing {topic}: {e}")

# Schedule refresh every 3 hours
scheduler = BackgroundScheduler()
scheduler.add_job(refresh_cache, 'interval', hours=3)
scheduler.start()

# Application always reads from cache
def get_news(topic):
    """Read from cache (no direct API calls)."""
    try:
        with open(f'cache_{topic}.json', 'r') as f:
            return json.load(f)
    except FileNotFoundError:
        return {'data': [], 'message': 'Cache not yet populated'}

Decision Tree: Choosing an Integration Strategy

Need to call external API?
├─ Is the data critical in real time?
│  ├─ YES → Use request-response with retry + fresh cache
│  └─ NO  → Consider background refresh mode
├─ How predictable is traffic?
│  ├─ Predictable   → Request-response OK
│  └─ Unpredictable → Background refresh safer for quota
├─ What if the API is down?
│  ├─ Can show stale data       → Implement stale cache fallback
│  ├─ Can show partial features → Graceful degradation
│  └─ Cannot function           → Circuit breaker, clear error messages
└─ How many API calls expected?
   ├─ Less than 1,000/day  → Free tier, simple retry logic
   ├─ 1,000-10,000/day     → Paid tier, caching + retry
   └─ More than 10,000/day → Enterprise tier, background refresh + circuit breaker

Production Checklist

Before Launch:

  • Retry logic implemented with exponential backoff
  • Rate limit handling (429 errors)
  • Fresh cache layer (TTL: 5-15 minutes)
  • Stale cache fallback (for API failures)
  • Graceful degradation (clear user messaging)
  • Error classification (retry vs don’t retry)
  • Timeout configurations (10-30 seconds)
  • Circuit breaker (if high-volume)
  • Structured logging for all API calls
  • Monitoring and alerting configured
  • API quota tracking
  • Secrets management (API keys not in code)
  • User-facing error messages tested

Monitoring Setup:

  • Track error rates (4xx, 5xx, timeout)
  • Track response times (p50, p95, p99)
  • Track cache hit rate
  • Track retry rate
  • Alert on degraded mode
  • Alert on quota nearing limit
  • Dashboard for real-time visibility

Complete Example: Resilient News API Client

import requests
import time
import json
from typing import Optional
from pathlib import Path

class ResilientNewsAPI:
    """Production-ready NewsDataHub API client."""

    def __init__(self, api_key: str, cache_dir: str = './cache'):
        self.api_key = api_key
        self.base_url = 'https://api.newsdatahub.com/v1'
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(exist_ok=True)

    def get_news(self, topic: str, per_page: int = 20) -> Optional[dict]:
        """
        Get news with full resilience:
        - Fresh cache check
        - Retry with exponential backoff
        - Stale cache fallback
        - Graceful degradation
        """
        cache_key = f'news_{topic}_{per_page}'

        # Try fresh cache
        fresh_data = self._get_fresh_cache(cache_key)
        if fresh_data:
            return {'data': fresh_data, 'source': 'fresh_cache'}

        # Try API with retry
        api_data = self._fetch_with_retry(
            f'{self.base_url}/news',
            params={'topic': topic, 'per_page': per_page}
        )
        if api_data:
            self._save_cache(cache_key, api_data)
            return {'data': api_data, 'source': 'api'}

        # Fall back to stale cache
        stale_data = self._get_stale_cache(cache_key)
        if stale_data:
            return {'data': stale_data, 'source': 'stale_cache'}

        # All strategies failed
        return None

    def _fetch_with_retry(self, endpoint: str, params: dict, max_retries: int = 3) -> Optional[dict]:
        """Fetch from the API with exponential backoff retry."""
        for attempt in range(max_retries):
            try:
                response = requests.get(
                    endpoint,
                    headers={
                        'X-API-Key': self.api_key,
                        'User-Agent': 'resilient-api-integration-cheat-sheet/1.0-py'
                    },
                    params=params,
                    timeout=10
                )
                if response.status_code == 200:
                    return response.json()
                if response.status_code >= 500 or response.status_code == 429:
                    if attempt < max_retries - 1:
                        delay = 0.5 * (2 ** attempt)
                        print(f"Error {response.status_code}, retry in {delay}s")
                        time.sleep(delay)
                        continue
                print(f"API error {response.status_code}")
                return None
            except requests.Timeout:
                if attempt < max_retries - 1:
                    delay = 0.5 * (2 ** attempt)
                    print(f"Timeout, retry in {delay}s")
                    time.sleep(delay)
                else:
                    print("Max retries reached")
                    return None
            except Exception as e:
                print(f"Request failed: {e}")
                return None
        return None

    def _get_fresh_cache(self, key: str, ttl: int = 600) -> Optional[dict]:
        """Return cached data only if it is less than `ttl` seconds old."""
        cache_file = self.cache_dir / f'{key}.json'
        if not cache_file.exists():
            return None
        age = time.time() - cache_file.stat().st_mtime
        if age < ttl:
            with open(cache_file, 'r') as f:
                return json.load(f)
        return None

    def _get_stale_cache(self, key: str) -> Optional[dict]:
        """Return cached data regardless of age."""
        cache_file = self.cache_dir / f'{key}.json'
        if cache_file.exists():
            with open(cache_file, 'r') as f:
                return json.load(f)
        return None

    def _save_cache(self, key: str, data: dict):
        """Save data to the cache directory."""
        cache_file = self.cache_dir / f'{key}.json'
        with open(cache_file, 'w') as f:
            json.dump(data, f)

# Usage
api = ResilientNewsAPI(api_key='YOUR_API_KEY')
result = api.get_news('technology', per_page=50)
if result:
    print(f"Source: {result['source']}")
    print(f"Articles: {len(result['data'].get('data', []))}")
else:
    print("Failed to get news data")
