Build robust API integrations using Lyceum’s cloud infrastructure. Handle authentication, rate limiting, and data processing at scale.
Basic API Requests
Make HTTP requests to external APIs:
import requests
import json
import time
from datetime import datetime

def basic_api_examples():
    """Demonstrate basic API request patterns"""
    print("=== BASIC API REQUESTS ===\n")

    # Example 1: GET request with JSON response
    print("1. Fetching user data from JSONPlaceholder...")
    try:
        response = requests.get('https://jsonplaceholder.typicode.com/users/1')
        response.raise_for_status()  # Raise exception for bad status codes
        user_data = response.json()
        print(f"User: {user_data['name']} ({user_data['email']})")
        print(f"Company: {user_data['company']['name']}")
    except requests.exceptions.RequestException as e:
        print(f"Error fetching user data: {e}")

    # Example 2: POST request with data
    print("\n2. Creating a new post...")
    try:
        new_post = {
            'title': 'My API Test Post',
            'body': 'This is a test post created via API',
            'userId': 1
        }
        response = requests.post(
            'https://jsonplaceholder.typicode.com/posts',
            json=new_post,
            headers={'Content-Type': 'application/json'}
        )
        response.raise_for_status()
        created_post = response.json()
        print(f"Created post with ID: {created_post['id']}")
        print(f"Title: {created_post['title']}")
    except requests.exceptions.RequestException as e:
        print(f"Error creating post: {e}")

    # Example 3: API with query parameters
    print("\n3. Searching posts...")
    try:
        params = {
            'userId': 1,
            '_limit': 5
        }
        response = requests.get(
            'https://jsonplaceholder.typicode.com/posts',
            params=params
        )
        response.raise_for_status()
        posts = response.json()
        print(f"Found {len(posts)} posts:")
        for post in posts:
            print(f"  - {post['title'][:50]}...")
    except requests.exceptions.RequestException as e:
        print(f"Error searching posts: {e}")

# Run basic examples
basic_api_examples()
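One detail worth adding to all of these calls: requests waits indefinitely by default, so pass an explicit timeout. A minimal sketch (the 5-second connect / 15-second read split below is an illustrative choice, not a requirement):

import requests

try:
    # (connect timeout, read timeout) in seconds; tune for your API
    response = requests.get(
        'https://jsonplaceholder.typicode.com/users/1',
        timeout=(5, 15)
    )
    response.raise_for_status()
    print(response.json()['name'])
except requests.exceptions.Timeout:
    print("Request timed out")
except requests.exceptions.RequestException as e:
    print(f"Request failed: {e}")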
Authentication Patterns
Handle different authentication methods:
import requests
import json
import base64
import hashlib
import hmac
from datetime import datetime
import os

class APIAuthenticator:
    """Handle various API authentication methods"""

    def __init__(self):
        self.session = requests.Session()

    def basic_auth_example(self):
        """Basic authentication example"""
        print("=== BASIC AUTHENTICATION ===")

        # Method 1: Using requests auth
        username = "test_user"
        password = "test_password"
        response = requests.get(
            'https://httpbin.org/basic-auth/test_user/test_password',
            auth=(username, password)
        )
        if response.status_code == 200:
            print("✅ Basic auth successful")
            print(response.json())
        else:
            print(f"❌ Basic auth failed: {response.status_code}")

    def bearer_token_example(self):
        """Bearer token authentication"""
        print("\n=== BEARER TOKEN AUTHENTICATION ===")

        # Simulate getting a token (replace with actual OAuth flow)
        token = "sample_bearer_token_12345"
        headers = {
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json'
        }

        # Example API call with bearer token
        response = requests.get(
            'https://httpbin.org/bearer',
            headers=headers
        )
        print(f"Status: {response.status_code}")
        if response.status_code == 200:
            print("✅ Bearer token accepted")

    def api_key_example(self):
        """API key authentication"""
        print("\n=== API KEY AUTHENTICATION ===")

        api_key = "your-api-key-here"

        # Method 1: API key in headers
        headers = {'X-API-Key': api_key}
        response = requests.get(
            'https://httpbin.org/headers',
            headers=headers
        )
        print("Headers sent:")
        result = response.json()
        print(json.dumps(result['headers'], indent=2))

        # Method 2: API key in query parameters
        params = {'api_key': api_key, 'format': 'json'}
        response = requests.get(
            'https://httpbin.org/get',
            params=params
        )
        print("\nQuery parameters:")
        result = response.json()
        print(f"URL: {result['url']}")

    def oauth2_flow_simulation(self):
        """Simulate OAuth2 flow"""
        print("\n=== OAUTH2 FLOW SIMULATION ===")

        # This is a simulation - in real scenarios, use proper OAuth2 libraries
        client_id = "your_client_id"
        client_secret = "your_client_secret"

        # Step 1: Get access token (normally done via authorization code)
        token_data = {
            'grant_type': 'client_credentials',
            'client_id': client_id,
            'client_secret': client_secret
        }
        print("1. Simulating token request...")
        # In real scenarios, replace with actual OAuth2 endpoint
        print(f"Token request data: {token_data}")

        # Simulate token response
        simulated_token_response = {
            'access_token': 'simulated_access_token_12345',
            'token_type': 'Bearer',
            'expires_in': 3600,
            'scope': 'read write'
        }
        print("2. Simulated token response:")
        print(json.dumps(simulated_token_response, indent=2))

        # Step 2: Use access token
        access_token = simulated_token_response['access_token']
        headers = {'Authorization': f'Bearer {access_token}'}
        print("3. Using access token for API calls...")
        response = requests.get(
            'https://httpbin.org/bearer',
            headers=headers
        )
        print(f"API call status: {response.status_code}")

# Run authentication examples
auth = APIAuthenticator()
auth.basic_auth_example()
auth.bearer_token_example()
auth.api_key_example()
auth.oauth2_flow_simulation()
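For real integrations, prefer a maintained OAuth2 library over simulating the flow. A minimal client-credentials sketch with requests-oauthlib (an assumed dependency; the token URL is a placeholder for your provider's endpoint):

# pip install requests-oauthlib
from oauthlib.oauth2 import BackendApplicationClient
from requests_oauthlib import OAuth2Session

client_id = "your_client_id"
client_secret = "your_client_secret"

# Client-credentials grant: exchange the id/secret pair for an access token
client = BackendApplicationClient(client_id=client_id)
oauth = OAuth2Session(client=client)
token = oauth.fetch_token(
    token_url='https://provider.example.com/oauth/token',  # placeholder
    client_id=client_id,
    client_secret=client_secret,
)

# The session now attaches the bearer token to requests automatically
response = oauth.get('https://provider.example.com/api/resource')
print(response.status_code)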
Rate Limiting and Retry Logic
Handle API rate limits gracefully:
import requests
import time
import random
from functools import wraps

class RateLimitedAPIClient:
    """API client with rate limiting and retry logic"""

    def __init__(self, base_url, rate_limit=10, retry_attempts=3):
        self.base_url = base_url
        self.rate_limit = rate_limit  # requests per second
        self.retry_attempts = retry_attempts
        self.session = requests.Session()
        self.last_request_time = 0

    def rate_limit_decorator(func):
        """Decorator to enforce rate limiting (applied at class-definition time)"""
        @wraps(func)
        def wrapper(self, *args, **kwargs):
            # Calculate time since last request
            current_time = time.time()
            time_since_last = current_time - self.last_request_time
            min_interval = 1.0 / self.rate_limit
            if time_since_last < min_interval:
                sleep_time = min_interval - time_since_last
                print(f"Rate limiting: sleeping for {sleep_time:.2f} seconds")
                time.sleep(sleep_time)
            self.last_request_time = time.time()
            return func(self, *args, **kwargs)
        return wrapper

    @rate_limit_decorator
    def make_request(self, method, endpoint, **kwargs):
        """Make API request with retry logic"""
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        for attempt in range(self.retry_attempts):
            try:
                response = self.session.request(method, url, **kwargs)
                # Handle different status codes
                if response.ok:  # any 2xx counts as success (e.g. 201 for POST)
                    return response
                elif response.status_code == 429:  # Too Many Requests
                    retry_after = int(response.headers.get('Retry-After', 60))
                    print(f"Rate limited. Waiting {retry_after} seconds...")
                    time.sleep(retry_after)
                    continue
                elif response.status_code >= 500:  # Server errors
                    wait_time = (2 ** attempt) + random.uniform(0, 1)
                    print(f"Server error {response.status_code}. Retrying in {wait_time:.1f}s...")
                    time.sleep(wait_time)
                    continue
                else:
                    response.raise_for_status()  # 4xx client errors are not retried
            except requests.exceptions.ConnectionError as e:
                wait_time = (2 ** attempt) + random.uniform(0, 1)
                print(f"Connection error: {e}. Retrying in {wait_time:.1f}s...")
                time.sleep(wait_time)
            except requests.exceptions.Timeout as e:
                wait_time = (2 ** attempt) + random.uniform(0, 1)
                print(f"Timeout error: {e}. Retrying in {wait_time:.1f}s...")
                time.sleep(wait_time)
        raise Exception(f"Failed after {self.retry_attempts} attempts")

    def get(self, endpoint, **kwargs):
        return self.make_request('GET', endpoint, **kwargs)

    def post(self, endpoint, **kwargs):
        return self.make_request('POST', endpoint, **kwargs)

# Example usage
def test_rate_limited_client():
    """Test the rate-limited API client"""
    print("=== RATE LIMITED API CLIENT ===\n")

    client = RateLimitedAPIClient(
        base_url='https://jsonplaceholder.typicode.com',
        rate_limit=2,  # 2 requests per second
        retry_attempts=3
    )

    # Make multiple requests to test rate limiting
    endpoints = [
        'posts/1',
        'posts/2',
        'posts/3',
        'users/1',
        'users/2'
    ]

    results = []
    start_time = time.time()
    for endpoint in endpoints:
        try:
            print(f"Requesting {endpoint}...")
            response = client.get(endpoint)
            data = response.json()
            results.append({
                'endpoint': endpoint,
                'status': 'success',
                'title': data.get('title', data.get('name', 'N/A'))
            })
        except Exception as e:
            results.append({
                'endpoint': endpoint,
                'status': 'error',
                'error': str(e)
            })

    total_time = time.time() - start_time
    print(f"\nCompleted {len(endpoints)} requests in {total_time:.2f} seconds")
    print("\nResults:")
    for result in results:
        if result['status'] == 'success':
            print(f"✅ {result['endpoint']}: {result['title']}")
        else:
            print(f"❌ {result['endpoint']}: {result['error']}")
    return results

# Run rate limiting test
results = test_rate_limited_client()
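The hand-rolled loop above is useful for understanding the mechanics, but requests' transport layer can handle much of the retry logic for you. A minimal sketch using urllib3's Retry (the retry count and status list are illustrative choices):

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Retry transient failures with exponential backoff at the transport layer;
# Retry also honors Retry-After headers on 429 responses by default.
retry = Retry(
    total=3,
    backoff_factor=1,  # exponential backoff between attempts
    status_forcelist=[429, 500, 502, 503, 504],
)
session = requests.Session()
session.mount('https://', HTTPAdapter(max_retries=retry))

response = session.get('https://jsonplaceholder.typicode.com/posts/1', timeout=10)
print(response.status_code)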
Webhook Handler
Process incoming webhook data:
import json
import hashlib
import hmac
from datetime import datetime
import base64

class WebhookProcessor:
    """Process incoming webhook data"""

    def __init__(self, secret_key=None):
        self.secret_key = secret_key
        self.processed_webhooks = []

    def verify_signature(self, payload, signature, algorithm='sha256'):
        """Verify webhook signature for security"""
        if not self.secret_key:
            return True  # Skip verification if no secret
        expected_signature = hmac.new(
            self.secret_key.encode('utf-8'),
            payload.encode('utf-8'),
            getattr(hashlib, algorithm)
        ).hexdigest()
        # Remove algorithm prefix if present (e.g., "sha256=")
        if '=' in signature:
            signature = signature.split('=', 1)[1]
        return hmac.compare_digest(expected_signature, signature)

    def process_github_webhook(self, payload):
        """Process GitHub webhook"""
        print("=== GITHUB WEBHOOK ===")
        try:
            data = json.loads(payload) if isinstance(payload, str) else payload
            event_type = data.get('action', 'unknown')
            repository = data.get('repository', {}).get('name', 'unknown')
            print(f"Event: {event_type}")
            print(f"Repository: {repository}")

            if 'commits' in data:
                commits = data['commits']
                print(f"Commits: {len(commits)}")
                for commit in commits[:3]:  # Show first 3 commits
                    print(f"  - {commit['message'][:50]}... by {commit['author']['name']}")

            if 'pull_request' in data:
                pr = data['pull_request']
                print(f"PR #{pr['number']}: {pr['title']}")
                print(f"Author: {pr['user']['login']}")
                print(f"State: {pr['state']}")

            # Log webhook
            self.log_webhook('github', event_type, data)
        except Exception as e:
            print(f"Error processing GitHub webhook: {e}")

    def process_stripe_webhook(self, payload):
        """Process Stripe webhook"""
        print("=== STRIPE WEBHOOK ===")
        try:
            data = json.loads(payload) if isinstance(payload, str) else payload
            event_type = data.get('type', 'unknown')
            event_id = data.get('id', 'unknown')
            print(f"Event: {event_type}")
            print(f"Event ID: {event_id}")

            if event_type == 'payment_intent.succeeded':
                payment = data['data']['object']
                amount = payment['amount'] / 100  # Convert from cents
                currency = payment['currency'].upper()
                print(f"Payment succeeded: {currency} {amount}")
            elif event_type == 'customer.created':
                customer = data['data']['object']
                print(f"New customer: {customer.get('email', 'no email')}")
            elif event_type == 'invoice.payment_failed':
                invoice = data['data']['object']
                print(f"Payment failed for invoice: {invoice['id']}")

            # Log webhook
            self.log_webhook('stripe', event_type, data)
        except Exception as e:
            print(f"Error processing Stripe webhook: {e}")

    def process_custom_webhook(self, payload, webhook_type='custom'):
        """Process custom webhook"""
        print(f"=== {webhook_type.upper()} WEBHOOK ===")
        try:
            data = json.loads(payload) if isinstance(payload, str) else payload
            print(f"Webhook type: {webhook_type}")
            print(f"Payload keys: {list(data.keys())}")

            # Extract common fields
            timestamp = data.get('timestamp', datetime.now().isoformat())
            event = data.get('event', 'unknown')
            user_id = data.get('user_id', data.get('userId', 'unknown'))
            print(f"Event: {event}")
            print(f"User ID: {user_id}")
            print(f"Timestamp: {timestamp}")

            # Process based on event type
            if event == 'user_signup':
                print(f"New user signup: {data.get('email', 'no email')}")
            elif event == 'order_completed':
                print(f"Order completed: ${data.get('amount', 0)}")
            elif event == 'subscription_cancelled':
                print(f"Subscription cancelled: {data.get('plan', 'unknown plan')}")

            # Log webhook
            self.log_webhook(webhook_type, event, data)
        except Exception as e:
            print(f"Error processing {webhook_type} webhook: {e}")

    def log_webhook(self, source, event_type, data):
        """Log webhook for debugging and audit"""
        log_entry = {
            'timestamp': datetime.now().isoformat(),
            'source': source,
            'event_type': event_type,
            'data_keys': list(data.keys()) if isinstance(data, dict) else [],
            'data_size': len(str(data))
        }
        self.processed_webhooks.append(log_entry)
        # Save to storage for persistence
        with open('/lyceum/storage/webhook_log.json', 'w') as f:
            json.dump(self.processed_webhooks, f, indent=2)

# Example webhook processing
def demo_webhook_processing():
    """Demonstrate webhook processing"""
    processor = WebhookProcessor(secret_key='my_webhook_secret')

    # Simulate GitHub webhook
    github_payload = {
        "action": "opened",
        "repository": {
            "name": "my-awesome-project",
            "full_name": "user/my-awesome-project"
        },
        "pull_request": {
            "number": 42,
            "title": "Add new feature",
            "state": "open",
            "user": {
                "login": "developer123"
            }
        }
    }
    processor.process_github_webhook(github_payload)

    # Simulate Stripe webhook
    stripe_payload = {
        "id": "evt_1234567890",
        "type": "payment_intent.succeeded",
        "data": {
            "object": {
                "id": "pi_1234567890",
                "amount": 2000,  # $20.00
                "currency": "usd",
                "status": "succeeded"
            }
        }
    }
    processor.process_stripe_webhook(stripe_payload)

    # Simulate custom webhook
    custom_payload = {
        "event": "user_signup",
        "user_id": "user_12345",
        "email": "[email protected]",
        "timestamp": datetime.now().isoformat(),
        "plan": "premium",
        "source": "web"
    }
    processor.process_custom_webhook(custom_payload, 'user_events')

    print("\n=== WEBHOOK SUMMARY ===")
    print(f"Processed {len(processor.processed_webhooks)} webhooks")
    for webhook in processor.processed_webhooks:
        print(f"- {webhook['source']}: {webhook['event_type']} at {webhook['timestamp']}")

# Run webhook demo
demo_webhook_processing()
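Note that the demo above never exercises verify_signature. Before trusting a real payload you would check its signature header first; here is a minimal sketch against the class above (the "sha256=" prefix mirrors conventions like GitHub's X-Hub-Signature-256 header):

import hmac
import hashlib

secret = 'my_webhook_secret'
raw_body = '{"event": "user_signup", "user_id": "user_12345"}'

# Simulate the signature a provider would attach to the request
sent_signature = 'sha256=' + hmac.new(
    secret.encode('utf-8'), raw_body.encode('utf-8'), hashlib.sha256
).hexdigest()

processor = WebhookProcessor(secret_key=secret)
if processor.verify_signature(raw_body, sent_signature):
    processor.process_custom_webhook(raw_body)
else:
    print("Rejected: signature mismatch")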
API Data Pipeline
Build a complete data pipeline using APIs:
import requests
import pandas as pd
import time
from datetime import datetime, timedelta
import json

class APIDataPipeline:
    """Complete API data pipeline"""

    def __init__(self):
        self.session = requests.Session()
        self.data_cache = {}

    def extract_user_data(self):
        """Extract user data from API"""
        print("=== EXTRACTING USER DATA ===")
        try:
            response = self.session.get('https://jsonplaceholder.typicode.com/users')
            response.raise_for_status()
            users = response.json()
            users_df = pd.DataFrame(users)

            # Flatten the nested company and address dicts, keeping the user id
            flat = pd.json_normalize(users, sep='_')
            companies = flat[['id', 'company_name', 'company_catchPhrase', 'company_bs']]
            addresses = flat[['id', 'address_street', 'address_suite',
                              'address_city', 'address_zipcode']]

            print(f"Extracted {len(users_df)} users")
            print(f"Companies: {len(companies)} records")
            print(f"Addresses: {len(addresses)} records")

            self.data_cache['users'] = users_df
            self.data_cache['companies'] = companies
            self.data_cache['addresses'] = addresses
            return users_df
        except Exception as e:
            print(f"Error extracting user data: {e}")
            return None

    def extract_post_data(self):
        """Extract post data from API"""
        print("\n=== EXTRACTING POST DATA ===")
        try:
            response = self.session.get('https://jsonplaceholder.typicode.com/posts')
            response.raise_for_status()
            posts = response.json()
            posts_df = pd.DataFrame(posts)

            # Get comments for each post
            comments_data = []
            print("Fetching comments...")
            for post_id in posts_df['id'].head(10):  # Limit to first 10 for demo
                comment_response = self.session.get(
                    f'https://jsonplaceholder.typicode.com/posts/{post_id}/comments'
                )
                if comment_response.status_code == 200:
                    comments = comment_response.json()
                    for comment in comments:
                        comment['postId'] = post_id
                        comments_data.append(comment)
                time.sleep(0.1)  # Rate limiting

            comments_df = pd.DataFrame(comments_data)
            print(f"Extracted {len(posts_df)} posts")
            print(f"Extracted {len(comments_df)} comments")

            self.data_cache['posts'] = posts_df
            self.data_cache['comments'] = comments_df
            return posts_df, comments_df
        except Exception as e:
            print(f"Error extracting post data: {e}")
            return None, None

    def transform_data(self):
        """Transform and clean the extracted data"""
        print("\n=== TRANSFORMING DATA ===")
        if 'users' not in self.data_cache or 'posts' not in self.data_cache:
            print("No data to transform")
            return

        users_df = self.data_cache['users'].copy()
        posts_df = self.data_cache['posts'].copy()
        comments_df = self.data_cache.get('comments', pd.DataFrame())

        # Transform users
        users_df['full_name'] = users_df['name']
        users_df['domain'] = users_df['email'].str.split('@').str[1]
        users_df['phone_clean'] = users_df['phone'].str.replace(r'[^\d]', '', regex=True)

        # Transform posts
        posts_df['title_length'] = posts_df['title'].str.len()
        posts_df['body_length'] = posts_df['body'].str.len()
        posts_df['word_count'] = posts_df['body'].str.split().str.len()

        # Aggregate comments per post
        if not comments_df.empty:
            comment_stats = comments_df.groupby('postId').agg({
                'id': 'count',
                'body': lambda x: sum(len(body.split()) for body in x)
            }).rename(columns={'id': 'comment_count', 'body': 'comment_words'})
            posts_df = posts_df.merge(comment_stats, left_on='id',
                                      right_index=True, how='left')
            posts_df['comment_count'] = posts_df['comment_count'].fillna(0)
            posts_df['comment_words'] = posts_df['comment_words'].fillna(0)

        # Join users and posts
        user_posts = posts_df.merge(
            users_df[['id', 'name', 'email', 'domain']],
            left_on='userId', right_on='id', suffixes=('_post', '_user')
        )
        print(f"Transformed data: {len(user_posts)} user-post combinations")

        # Calculate metrics
        user_metrics = user_posts.groupby(['userId', 'name']).agg({
            'id_post': 'count',
            'word_count': 'sum',
            'comment_count': 'sum',
            'title_length': 'mean',
            'body_length': 'mean'
        }).round(2)
        user_metrics.columns = ['total_posts', 'total_words', 'total_comments',
                                'avg_title_length', 'avg_body_length']

        print("\nUser metrics calculated:")
        print(user_metrics.head())

        self.data_cache['user_posts'] = user_posts
        self.data_cache['user_metrics'] = user_metrics
        return user_posts, user_metrics

    def load_data(self):
        """Load transformed data to storage"""
        print("\n=== LOADING DATA ===")

        # Save all datasets
        for name, data in self.data_cache.items():
            if isinstance(data, pd.DataFrame):
                filename = f'/lyceum/storage/{name}.csv'
                data.to_csv(filename, index=False)
                print(f"Saved {name}: {len(data)} rows")

        # Create summary report
        summary = {
            'pipeline_run': datetime.now().isoformat(),
            'datasets': {
                name: {
                    'rows': len(data),
                    'columns': len(data.columns) if hasattr(data, 'columns') else 0
                }
                for name, data in self.data_cache.items()
                if isinstance(data, pd.DataFrame)
            },
            # reset_index() turns the (userId, name) MultiIndex into columns
            # so the records are JSON-serializable (tuple keys are not)
            'top_contributors': (
                self.data_cache['user_metrics'].head(5).reset_index().to_dict('records')
                if 'user_metrics' in self.data_cache else []
            )
        }
        with open('/lyceum/storage/pipeline_summary.json', 'w') as f:
            json.dump(summary, f, indent=2, default=str)

        print("\nPipeline completed successfully!")
        print(f"Total datasets: {len(summary['datasets'])}")
        return summary

    def run_pipeline(self):
        """Run the complete ETL pipeline"""
        print("=== STARTING API DATA PIPELINE ===\n")
        start_time = time.time()

        # Extract
        users_df = self.extract_user_data()
        posts_df, comments_df = self.extract_post_data()
        if users_df is None or posts_df is None:
            print("Pipeline failed during extraction")
            return

        # Transform
        user_posts, user_metrics = self.transform_data()

        # Load
        summary = self.load_data()

        execution_time = time.time() - start_time
        print(f"\nPipeline completed in {execution_time:.2f} seconds")
        return summary

# Run the complete pipeline
pipeline = APIDataPipeline()
result = pipeline.run_pipeline()
Use environment variables to store API keys and secrets. Access them with os.environ.get('API_KEY') to keep credentials secure.
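For example, a minimal sketch (API_KEY is an assumed variable name; use whatever your deployment defines):

import os
import requests

# Read the credential from the environment instead of hard-coding it
api_key = os.environ.get('API_KEY')
if api_key is None:
    raise RuntimeError("API_KEY environment variable is not set")

response = requests.get(
    'https://httpbin.org/headers',
    headers={'X-API-Key': api_key},
    timeout=10
)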
Always implement proper error handling and rate limiting when working with external APIs. Respect API quotas and terms of service.
API responses and processed data are automatically saved to your storage. Use these patterns as starting points for your own API integrations.

