Skip to main content
Build robust API integrations using Lyceum’s cloud infrastructure. Handle authentication, rate limiting, and data processing at scale.

Basic API Requests

Make HTTP requests to external APIs:
import requests
import json
import time
from datetime import datetime

def basic_api_examples(timeout=10):
    """Demonstrate basic API request patterns against JSONPlaceholder.

    Runs three independent examples and prints their results: a GET that
    returns JSON, a POST with a JSON body, and a GET with query parameters.
    A failure in one example is printed and does not stop the others.

    Args:
        timeout: Seconds to wait on each request before giving up. Without a
            timeout a request can block indefinitely on an unresponsive
            server.
    """
    # Reported instead of raised: transport/HTTP failures, a body that is not
    # valid JSON (ValueError) and JSON that lacks an expected field (KeyError).
    handled_errors = (requests.exceptions.RequestException, ValueError, KeyError)

    print("=== BASIC API REQUESTS ===\n")

    # Example 1: GET request with JSON response
    print("1. Fetching user data from JSONPlaceholder...")
    try:
        response = requests.get(
            'https://jsonplaceholder.typicode.com/users/1',
            timeout=timeout,
        )
        response.raise_for_status()  # Raise exception for bad status codes

        user_data = response.json()
        print(f"User: {user_data['name']} ({user_data['email']})")
        print(f"Company: {user_data['company']['name']}")

    except handled_errors as e:
        print(f"Error fetching user data: {e}")

    # Example 2: POST request with data
    print("\n2. Creating a new post...")
    try:
        new_post = {
            'title': 'My API Test Post',
            'body': 'This is a test post created via API',
            'userId': 1,
        }

        response = requests.post(
            'https://jsonplaceholder.typicode.com/posts',
            json=new_post,
            headers={'Content-Type': 'application/json'},
            timeout=timeout,
        )
        response.raise_for_status()

        created_post = response.json()
        print(f"Created post with ID: {created_post['id']}")
        print(f"Title: {created_post['title']}")

    except handled_errors as e:
        print(f"Error creating post: {e}")

    # Example 3: API with query parameters
    print("\n3. Searching posts...")
    try:
        params = {
            'userId': 1,
            '_limit': 5,
        }

        response = requests.get(
            'https://jsonplaceholder.typicode.com/posts',
            params=params,
            timeout=timeout,
        )
        response.raise_for_status()

        posts = response.json()
        print(f"Found {len(posts)} posts:")
        for post in posts:
            # Titles are cut to 50 characters to keep the listing compact.
            print(f"  - {post['title'][:50]}...")

    except handled_errors as e:
        print(f"Error searching posts: {e}")

# Run basic examples (issues live HTTP requests to jsonplaceholder.typicode.com)
basic_api_examples()

Authentication Patterns

Handle different authentication methods:
import base64
import hashlib
import hmac
import json
import os
from datetime import datetime

import requests

class APIAuthenticator:
    """Handle various API authentication methods.

    Each ``*_example`` method prints what it does and sends its requests
    through one shared ``requests.Session`` with a timeout. Request failures
    are printed rather than raised, so one failing example does not stop the
    ones called after it.

    Args:
        timeout: Seconds to wait for each HTTP request before giving up.
    """

    def __init__(self, timeout=10):
        self.session = requests.Session()
        self.timeout = timeout

    def _get(self, url, **kwargs):
        """Send a GET through the shared session, applying the default timeout."""
        kwargs.setdefault('timeout', self.timeout)
        return self.session.get(url, **kwargs)

    def basic_auth_example(self):
        """Basic authentication example"""
        print("=== BASIC AUTHENTICATION ===")

        # Method 1: Using requests auth
        username = "test_user"
        password = "test_password"

        try:
            response = self._get(
                'https://httpbin.org/basic-auth/test_user/test_password',
                auth=(username, password)
            )
        except requests.exceptions.RequestException as e:
            print(f"❌ Basic auth request failed: {e}")
            return

        if response.status_code == 200:
            print("✅ Basic auth successful")
            print(response.json())
        else:
            print(f"❌ Basic auth failed: {response.status_code}")

    def bearer_token_example(self):
        """Bearer token authentication"""
        print("\n=== BEARER TOKEN AUTHENTICATION ===")

        # Simulate getting a token (replace with actual OAuth flow)
        token = "sample_bearer_token_12345"

        headers = {
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json'
        }

        # Example API call with bearer token
        try:
            response = self._get('https://httpbin.org/bearer', headers=headers)
        except requests.exceptions.RequestException as e:
            print(f"❌ Bearer token request failed: {e}")
            return

        print(f"Status: {response.status_code}")
        if response.status_code == 200:
            print("✅ Bearer token accepted")

    def api_key_example(self):
        """API key authentication"""
        print("\n=== API KEY AUTHENTICATION ===")

        # Placeholder only: this example echoes the key back to stdout, so a
        # real credential must not be used here.
        api_key = "your-api-key-here"

        try:
            # Method 1: API key in headers
            headers = {'X-API-Key': api_key}
            response = self._get('https://httpbin.org/headers', headers=headers)
            response.raise_for_status()

            print("Headers sent:")
            result = response.json()
            print(json.dumps(result['headers'], indent=2))

            # Method 2: API key in query parameters
            params = {'api_key': api_key, 'format': 'json'}
            response = self._get('https://httpbin.org/get', params=params)
            response.raise_for_status()

            print("\nQuery parameters:")
            result = response.json()
            print(f"URL: {result['url']}")

        except (requests.exceptions.RequestException, ValueError, KeyError) as e:
            # ValueError: body was not JSON; KeyError: expected field missing.
            print(f"❌ API key example failed: {e}")

    def oauth2_flow_simulation(self):
        """Simulate OAuth2 flow"""
        print("\n=== OAUTH2 FLOW SIMULATION ===")

        # This is a simulation - in real scenarios, use proper OAuth2 libraries
        client_id = "your_client_id"
        client_secret = "your_client_secret"

        # Step 1: Get access token (normally done via authorization code)
        token_data = {
            'grant_type': 'client_credentials',
            'client_id': client_id,
            'client_secret': client_secret
        }

        print("1. Simulating token request...")
        # In real scenarios, replace with actual OAuth2 endpoint.
        # The secret is masked so it never reaches logs or captured output.
        printable_token_data = {**token_data, 'client_secret': '***'}
        print(f"Token request data: {printable_token_data}")

        # Simulate token response
        simulated_token_response = {
            'access_token': 'simulated_access_token_12345',
            'token_type': 'Bearer',
            'expires_in': 3600,
            'scope': 'read write'
        }

        print("2. Simulated token response:")
        print(json.dumps(simulated_token_response, indent=2))

        # Step 2: Use access token
        access_token = simulated_token_response['access_token']
        headers = {'Authorization': f'Bearer {access_token}'}

        print("3. Using access token for API calls...")
        try:
            response = self._get('https://httpbin.org/bearer', headers=headers)
        except requests.exceptions.RequestException as e:
            print(f"❌ API call failed: {e}")
            return

        print(f"API call status: {response.status_code}")

# Run authentication examples (each call sends live HTTP requests to httpbin.org)
auth = APIAuthenticator()
auth.basic_auth_example()
auth.bearer_token_example()
auth.api_key_example()
auth.oauth2_flow_simulation()

Rate Limiting and Retry Logic

Handle API rate limits gracefully:
import requests
import time
import random
from functools import wraps

class RateLimitedAPIClient:
    """API client with rate limiting and retry logic.

    Calls to ``make_request`` (and the ``get``/``post`` shortcuts) are spaced
    at least ``1 / rate_limit`` seconds apart. Inside one call, HTTP 429,
    HTTP 5xx, connection errors and timeouts are retried up to
    ``retry_attempts`` times.

    Args:
        base_url: Prefix joined with every endpoint; a trailing slash is
            tolerated.
        rate_limit: Maximum number of ``make_request`` calls per second.
        retry_attempts: Total number of attempts per call.
        timeout: Default per-request timeout in seconds, used when the caller
            does not pass ``timeout`` explicitly.
    """

    def __init__(self, base_url, rate_limit=10, retry_attempts=3, timeout=10):
        self.base_url = base_url
        self.rate_limit = rate_limit  # requests per second
        self.retry_attempts = retry_attempts
        self.timeout = timeout
        self.session = requests.Session()
        self.last_request_time = 0

    def rate_limit_decorator(func):
        """Decorator to enforce rate limiting.

        Used while the class body is being built, so ``func`` is the plain
        function being decorated; the wrapper receives the instance as
        ``self`` when the method is called.
        """
        @wraps(func)
        def wrapper(self, *args, **kwargs):
            # Calculate time since last request
            current_time = time.time()
            time_since_last = current_time - self.last_request_time
            min_interval = 1.0 / self.rate_limit

            if time_since_last < min_interval:
                sleep_time = min_interval - time_since_last
                print(f"Rate limiting: sleeping for {sleep_time:.2f} seconds")
                time.sleep(sleep_time)

            self.last_request_time = time.time()
            return func(self, *args, **kwargs)
        return wrapper

    @staticmethod
    def _retry_after_seconds(headers, default=60):
        """Return the delay in seconds requested by a ``Retry-After`` header.

        The header may hold either a number of seconds or an HTTP-date.
        ``default`` is returned when the header is absent or unparseable.
        """
        raw = headers.get('Retry-After')
        if raw is None:
            return default
        try:
            return max(0, int(raw))
        except (TypeError, ValueError):
            pass  # not delay-seconds; try the HTTP-date form below

        from datetime import datetime, timezone
        from email.utils import parsedate_to_datetime

        try:
            retry_at = parsedate_to_datetime(raw)
        except (TypeError, ValueError):
            return default
        if retry_at.tzinfo is None:
            retry_at = retry_at.replace(tzinfo=timezone.utc)
        remaining = (retry_at - datetime.now(timezone.utc)).total_seconds()
        return max(0.0, remaining)

    @rate_limit_decorator
    def make_request(self, method, endpoint, **kwargs):
        """Make API request with retry logic.

        Args:
            method: HTTP method name passed to ``Session.request``.
            endpoint: Path appended to ``base_url``.
            **kwargs: Forwarded to ``Session.request``; ``timeout`` defaults
                to the client's timeout.

        Returns:
            The response, for any status below 400.

        Raises:
            requests.exceptions.HTTPError: For 4xx statuses other than 429
                (not retried).
            Exception: When every attempt failed with 429, 5xx, a connection
                error or a timeout.
        """
        url = f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/')}"
        kwargs.setdefault('timeout', self.timeout)
        last_error = None

        for attempt in range(self.retry_attempts):
            try:
                response = self.session.request(method, url, **kwargs)
            except (requests.exceptions.ConnectionError,
                    requests.exceptions.Timeout) as e:
                last_error = e
                if isinstance(e, requests.exceptions.ConnectionError):
                    failure = f"Connection error: {e}"
                else:
                    failure = f"Timeout error: {e}"
                wait_time = (2 ** attempt) + random.uniform(0, 1)
                notice = f"{failure}. Retrying in {wait_time:.1f}s..."
            else:
                last_error = None
                status = response.status_code

                # Any non-error status is a success (201 Created, 204 No
                # Content, ...), not only 200; re-sending a successful POST
                # would duplicate its side effects.
                if status < 400:
                    return response

                if status == 429:  # Too Many Requests
                    wait_time = self._retry_after_seconds(response.headers)
                    failure = "Rate limited"
                    notice = f"Rate limited. Waiting {wait_time:g} seconds..."
                elif status >= 500:  # Server errors
                    wait_time = (2 ** attempt) + random.uniform(0, 1)
                    failure = f"Server error {status}"
                    notice = f"{failure}. Retrying in {wait_time:.1f}s..."
                else:
                    # Other client errors will not succeed on retry.
                    response.raise_for_status()
                    return response

            # Do not sleep when there is no further attempt to wait for.
            if attempt + 1 >= self.retry_attempts:
                print(f"{failure}. No attempts left.")
                break
            print(notice)
            time.sleep(wait_time)

        raise Exception(f"Failed after {self.retry_attempts} attempts") from last_error

    def get(self, endpoint, **kwargs):
        """Send a GET request to ``endpoint`` via ``make_request``."""
        return self.make_request('GET', endpoint, **kwargs)

    def post(self, endpoint, **kwargs):
        """Send a POST request to ``endpoint`` via ``make_request``."""
        return self.make_request('POST', endpoint, **kwargs)

# Example usage
def test_rate_limited_client():
    """Exercise RateLimitedAPIClient against JSONPlaceholder.

    Requests five endpoints at 2 requests per second, prints the elapsed
    time and a per-endpoint outcome, and returns the list of outcome dicts
    (``status`` is ``'success'`` with a ``title`` or ``'error'`` with an
    ``error`` message).
    """
    print("=== RATE LIMITED API CLIENT ===\n")

    api = RateLimitedAPIClient(
        base_url='https://jsonplaceholder.typicode.com',
        rate_limit=2,  # 2 requests per second
        retry_attempts=3,
    )

    # Several requests in a row so the rate limiter has something to do
    paths = ['posts/1', 'posts/2', 'posts/3', 'users/1', 'users/2']

    outcomes = []
    began = time.time()

    for path in paths:
        try:
            print(f"Requesting {path}...")
            payload = api.get(path).json()
            # Posts carry a 'title'; users carry a 'name' instead
            label = payload.get('title', payload.get('name', 'N/A'))
        except Exception as err:
            outcomes.append({
                'endpoint': path,
                'status': 'error',
                'error': str(err),
            })
        else:
            outcomes.append({
                'endpoint': path,
                'status': 'success',
                'title': label,
            })

    elapsed = time.time() - began
    print(f"\nCompleted {len(paths)} requests in {elapsed:.2f} seconds")

    print("\nResults:")
    for outcome in outcomes:
        if outcome['status'] != 'success':
            print(f"❌ {outcome['endpoint']}: {outcome['error']}")
            continue
        print(f"✅ {outcome['endpoint']}: {outcome['title']}")

    return outcomes

# Run rate limiting test; `results` holds one outcome dict per requested endpoint
results = test_rate_limited_client()

Webhook Handler

Process incoming webhook data:
import json
import hashlib
import hmac
from datetime import datetime
import base64

class WebhookProcessor:
    """Process incoming webhook data.

    Payloads may be passed as an already-parsed dict or as JSON text
    (``str`` or ``bytes``). Every processed webhook is summarised in
    ``processed_webhooks`` and that list is written to ``log_path``.

    Args:
        secret_key: Shared secret for HMAC signature checks. When empty,
            ``verify_signature`` accepts everything.
        log_path: File that receives the JSON log of processed webhooks.
    """

    def __init__(self, secret_key=None, log_path='/lyceum/storage/webhook_log.json'):
        self.secret_key = secret_key
        self.log_path = log_path
        self.processed_webhooks = []

    @staticmethod
    def _parse_payload(payload):
        """Return ``payload`` as parsed JSON when it is raw text, else unchanged."""
        if isinstance(payload, (str, bytes, bytearray)):
            return json.loads(payload)
        return payload

    def verify_signature(self, payload, signature, algorithm='sha256'):
        """Verify webhook signature for security.

        Args:
            payload: Raw request body, as ``str`` or bytes.
            signature: Hex digest sent by the caller, optionally prefixed
                with the algorithm (e.g. ``"sha256=..."``).
            algorithm: Name of a ``hashlib`` constructor.

        Returns:
            True when the signature matches, or when no secret is configured;
            False when the signature is missing or does not match.
        """
        if not self.secret_key:
            # NOTE: fails open by design - without a secret nothing is checked.
            return True  # Skip verification if no secret

        if not signature:
            return False
        if isinstance(signature, (bytes, bytearray)):
            signature = bytes(signature).decode('utf-8', errors='replace')

        # Sign the bytes as received; only text needs encoding first.
        body = payload.encode('utf-8') if isinstance(payload, str) else bytes(payload)

        expected_signature = hmac.new(
            self.secret_key.encode('utf-8'),
            body,
            getattr(hashlib, algorithm)
        ).hexdigest()

        # Remove algorithm prefix if present (e.g., "sha256=")
        if '=' in signature:
            signature = signature.split('=', 1)[1]

        # Compare as bytes: compare_digest rejects non-ASCII str arguments,
        # and the signature is attacker-controlled input.
        return hmac.compare_digest(
            expected_signature.encode('utf-8'),
            signature.encode('utf-8')
        )

    def process_github_webhook(self, payload):
        """Process GitHub webhook.

        Prints the action, repository and any commit / pull-request details
        found in the payload, then logs it. Errors are printed, not raised.
        """
        print("=== GITHUB WEBHOOK ===")

        try:
            data = self._parse_payload(payload)

            event_type = data.get('action', 'unknown')
            repository = data.get('repository', {}).get('name', 'unknown')

            print(f"Event: {event_type}")
            print(f"Repository: {repository}")

            if 'commits' in data:
                commits = data['commits']
                print(f"Commits: {len(commits)}")
                for commit in commits[:3]:  # Show first 3 commits
                    print(f"  - {commit['message'][:50]}... by {commit['author']['name']}")

            if 'pull_request' in data:
                pr = data['pull_request']
                print(f"PR #{pr['number']}: {pr['title']}")
                print(f"Author: {pr['user']['login']}")
                print(f"State: {pr['state']}")

            # Log webhook
            self.log_webhook('github', event_type, data)

        except Exception as e:
            # Handler boundary: a malformed payload must not crash the caller.
            print(f"Error processing GitHub webhook: {e}")

    def process_stripe_webhook(self, payload):
        """Process Stripe webhook.

        Prints the event type and id plus details for the event types handled
        below, then logs it. Errors are printed, not raised.
        """
        print("=== STRIPE WEBHOOK ===")

        try:
            data = self._parse_payload(payload)

            event_type = data.get('type', 'unknown')
            event_id = data.get('id', 'unknown')

            print(f"Event: {event_type}")
            print(f"Event ID: {event_id}")

            if event_type == 'payment_intent.succeeded':
                payment = data['data']['object']
                amount = payment['amount'] / 100  # Convert from cents
                currency = payment['currency'].upper()
                print(f"Payment succeeded: {currency} {amount}")

            elif event_type == 'customer.created':
                customer = data['data']['object']
                print(f"New customer: {customer.get('email', 'no email')}")

            elif event_type == 'invoice.payment_failed':
                invoice = data['data']['object']
                print(f"Payment failed for invoice: {invoice['id']}")

            # Log webhook
            self.log_webhook('stripe', event_type, data)

        except Exception as e:
            # Handler boundary: a malformed payload must not crash the caller.
            print(f"Error processing Stripe webhook: {e}")

    def process_custom_webhook(self, payload, webhook_type='custom'):
        """Process custom webhook.

        Prints the common fields (``event``, ``user_id``/``userId``,
        ``timestamp``) and event-specific details, then logs the webhook
        under ``webhook_type``. Errors are printed, not raised.
        """
        print(f"=== {webhook_type.upper()} WEBHOOK ===")

        try:
            data = self._parse_payload(payload)

            print(f"Webhook type: {webhook_type}")
            print(f"Payload keys: {list(data.keys())}")

            # Extract common fields
            timestamp = data.get('timestamp', datetime.now().isoformat())
            event = data.get('event', 'unknown')
            user_id = data.get('user_id', data.get('userId', 'unknown'))

            print(f"Event: {event}")
            print(f"User ID: {user_id}")
            print(f"Timestamp: {timestamp}")

            # Process based on event type
            if event == 'user_signup':
                print(f"New user signup: {data.get('email', 'no email')}")
            elif event == 'order_completed':
                print(f"Order completed: ${data.get('amount', 0)}")
            elif event == 'subscription_cancelled':
                print(f"Subscription cancelled: {data.get('plan', 'unknown plan')}")

            # Log webhook
            self.log_webhook(webhook_type, event, data)

        except Exception as e:
            # Handler boundary: a malformed payload must not crash the caller.
            print(f"Error processing {webhook_type} webhook: {e}")

    def log_webhook(self, source, event_type, data):
        """Log webhook for debugging and audit.

        Appends a summary entry (timestamp, source, event type, payload keys
        and size) to ``processed_webhooks`` and rewrites ``log_path`` with the
        whole list. A storage failure is reported as a warning and does not
        discard the in-memory entry.
        """
        log_entry = {
            'timestamp': datetime.now().isoformat(),
            'source': source,
            'event_type': event_type,
            'data_keys': list(data.keys()) if isinstance(data, dict) else [],
            'data_size': len(str(data))
        }

        self.processed_webhooks.append(log_entry)

        # Save to storage for persistence
        try:
            with open(self.log_path, 'w', encoding='utf-8') as f:
                json.dump(self.processed_webhooks, f, indent=2)
        except OSError as e:
            # Keep storage problems distinct from webhook processing errors.
            print(f"Warning: could not persist webhook log to {self.log_path}: {e}")

# Example webhook processing
def demo_webhook_processing():
    """Demonstrate webhook processing.

    Feeds three simulated payloads (a GitHub pull-request event, a Stripe
    payment event and a custom signup event) through one WebhookProcessor
    and prints a summary of what was logged.
    """

    processor = WebhookProcessor(secret_key='my_webhook_secret')

    # Simulate GitHub webhook
    github_payload = {
        "action": "opened",
        "repository": {
            "name": "my-awesome-project",
            "full_name": "user/my-awesome-project"
        },
        "pull_request": {
            "number": 42,
            "title": "Add new feature",
            "state": "open",
            "user": {
                "login": "developer123"
            }
        }
    }

    processor.process_github_webhook(github_payload)

    # Simulate Stripe webhook
    stripe_payload = {
        "id": "evt_1234567890",
        "type": "payment_intent.succeeded",
        "data": {
            "object": {
                "id": "pi_1234567890",
                "amount": 2000,  # $20.00
                "currency": "usd",
                "status": "succeeded"
            }
        }
    }

    processor.process_stripe_webhook(stripe_payload)

    # Simulate custom webhook
    custom_payload = {
        "event": "user_signup",
        "user_id": "user_12345",
        "email": "user@example.com",
        "timestamp": datetime.now().isoformat(),
        "plan": "premium",
        "source": "web"
    }

    processor.process_custom_webhook(custom_payload, 'user_events')

    print("\n=== WEBHOOK SUMMARY ===")
    print(f"Processed {len(processor.processed_webhooks)} webhooks")

    for webhook in processor.processed_webhooks:
        print(f"- {webhook['source']}: {webhook['event_type']} at {webhook['timestamp']}")

# Run webhook demo (processes three simulated payloads: GitHub, Stripe and a custom event)
demo_webhook_processing()

API Data Pipeline

Build a complete data pipeline using APIs:
import requests
import pandas as pd
import time
from datetime import datetime, timedelta
import json

class APIDataPipeline:
    """Complete API data pipeline.

    Extracts users, posts and comments from JSONPlaceholder, derives
    per-post and per-user metrics, and writes every dataset plus a JSON
    summary to ``storage_dir``. Intermediate DataFrames are kept in
    ``data_cache`` keyed by dataset name.

    Args:
        storage_dir: Directory that receives the CSV files and the summary.
        timeout: Seconds to wait for each HTTP request before giving up.
    """

    def __init__(self, storage_dir='/lyceum/storage', timeout=10):
        self.session = requests.Session()
        self.data_cache = {}
        self.storage_dir = storage_dir
        self.timeout = timeout

    @staticmethod
    def _flatten_nested(records, field):
        """Return one flat row per record from its nested ``field`` dict.

        Nested keys are joined with ``_`` and the record's own ``id`` is
        carried along so the rows can be joined back to their parent.
        """
        rows = [
            {**(record.get(field) or {}), 'id': record.get('id')}
            for record in records
        ]
        return pd.json_normalize(rows, sep='_')

    def extract_user_data(self):
        """Extract user data from API.

        Caches ``users``, ``companies`` and ``addresses`` DataFrames.

        Returns:
            The users DataFrame, or None when extraction fails (the error is
            printed).
        """
        print("=== EXTRACTING USER DATA ===")

        try:
            response = self.session.get(
                'https://jsonplaceholder.typicode.com/users',
                timeout=self.timeout,
            )
            response.raise_for_status()

            users = response.json()
            users_df = pd.DataFrame(users)

            # Extract nested data. 'company' and 'address' are dicts, not
            # lists, so they cannot be used as a json_normalize record_path.
            companies = self._flatten_nested(users, 'company')
            addresses = self._flatten_nested(users, 'address')

            print(f"Extracted {len(users_df)} users")
            print(f"Companies: {len(companies)} records")
            print(f"Addresses: {len(addresses)} records")

            self.data_cache['users'] = users_df
            self.data_cache['companies'] = companies
            self.data_cache['addresses'] = addresses

            return users_df

        except Exception as e:
            # Pipeline boundary: report and let run_pipeline decide.
            print(f"Error extracting user data: {e}")
            return None

    def extract_post_data(self):
        """Extract post data from API.

        Fetches all posts and the comments of the first 10 posts, and caches
        them as ``posts`` and ``comments``.

        Returns:
            ``(posts_df, comments_df)``, or ``(None, None)`` when extraction
            fails (the error is printed).
        """
        print("\n=== EXTRACTING POST DATA ===")

        try:
            response = self.session.get(
                'https://jsonplaceholder.typicode.com/posts',
                timeout=self.timeout,
            )
            response.raise_for_status()

            posts = response.json()
            posts_df = pd.DataFrame(posts)

            # Get comments for each post
            comments_data = []
            print("Fetching comments...")

            for post_id in posts_df['id'].head(10):  # Limit to first 10 for demo
                comment_response = self.session.get(
                    f'https://jsonplaceholder.typicode.com/posts/{post_id}/comments',
                    timeout=self.timeout,
                )
                if comment_response.status_code == 200:
                    comments = comment_response.json()
                    for comment in comments:
                        comment['postId'] = post_id
                        comments_data.append(comment)

                time.sleep(0.1)  # Rate limiting

            comments_df = pd.DataFrame(comments_data)

            print(f"Extracted {len(posts_df)} posts")
            print(f"Extracted {len(comments_df)} comments")

            self.data_cache['posts'] = posts_df
            self.data_cache['comments'] = comments_df

            return posts_df, comments_df

        except Exception as e:
            # Pipeline boundary: report and let run_pipeline decide.
            print(f"Error extracting post data: {e}")
            return None, None

    def transform_data(self):
        """Transform and clean the extracted data.

        Adds derived columns to users and posts, attaches per-post comment
        statistics, joins posts to their authors and aggregates per-user
        metrics. Caches ``user_posts`` and ``user_metrics``.

        Returns:
            ``(user_posts, user_metrics)``, or ``(None, None)`` when users or
            posts have not been extracted. ``user_metrics`` is indexed by
            ``(userId, name)``.
        """
        print("\n=== TRANSFORMING DATA ===")

        if 'users' not in self.data_cache or 'posts' not in self.data_cache:
            print("No data to transform")
            return None, None

        users_df = self.data_cache['users'].copy()
        posts_df = self.data_cache['posts'].copy()
        comments_df = self.data_cache.get('comments', pd.DataFrame())

        # Transform users
        users_df['full_name'] = users_df['name']
        users_df['domain'] = users_df['email'].str.split('@').str[1]
        users_df['phone_clean'] = users_df['phone'].str.replace(r'[^\d]', '', regex=True)

        # Transform posts
        posts_df['title_length'] = posts_df['title'].str.len()
        posts_df['body_length'] = posts_df['body'].str.len()
        posts_df['word_count'] = posts_df['body'].str.split().str.len()

        # Aggregate comments per post
        if comments_df.empty:
            # The per-user aggregation below needs these columns even when
            # no comments were fetched.
            posts_df['comment_count'] = 0
            posts_df['comment_words'] = 0
        else:
            comment_stats = comments_df.groupby('postId').agg({
                'id': 'count',
                'body': lambda x: sum(len(body.split()) for body in x)
            }).rename(columns={'id': 'comment_count', 'body': 'comment_words'})

            posts_df = posts_df.merge(comment_stats, left_on='id', right_index=True, how='left')
            posts_df['comment_count'] = posts_df['comment_count'].fillna(0)
            posts_df['comment_words'] = posts_df['comment_words'].fillna(0)

        # Join users and posts; both have an 'id' column, hence the suffixes
        user_posts = posts_df.merge(users_df[['id', 'name', 'email', 'domain']],
                                    left_on='userId', right_on='id', suffixes=('_post', '_user'))

        print(f"Transformed data: {len(user_posts)} user-post combinations")

        # Calculate metrics
        user_metrics = user_posts.groupby(['userId', 'name']).agg({
            'id_post': 'count',
            'word_count': 'sum',
            'comment_count': 'sum',
            'title_length': 'mean',
            'body_length': 'mean'
        }).round(2)

        # Same order as the aggregation dict above
        user_metrics.columns = ['total_posts', 'total_words', 'total_comments', 'avg_title_length', 'avg_body_length']

        print("\nUser metrics calculated:")
        print(user_metrics.head())

        self.data_cache['user_posts'] = user_posts
        self.data_cache['user_metrics'] = user_metrics

        return user_posts, user_metrics

    def load_data(self):
        """Load transformed data to storage.

        Writes each cached DataFrame to ``<storage_dir>/<name>.csv`` and a
        run summary to ``<storage_dir>/pipeline_summary.json``.

        Returns:
            The summary dict. ``top_contributors`` is a list of up to five
            records (one dict per user, ordered by ``total_posts``
            descending), empty when no user metrics are cached.
        """
        print("\n=== LOADING DATA ===")

        datasets = {
            name: data
            for name, data in self.data_cache.items()
            if isinstance(data, pd.DataFrame)
        }

        # Save all datasets
        for name, data in datasets.items():
            filename = f'{self.storage_dir}/{name}.csv'
            # A named index holds real data (user_metrics is indexed by
            # userId/name) and must be written; a default index is noise.
            keep_index = any(level is not None for level in data.index.names)
            data.to_csv(filename, index=keep_index)
            print(f"Saved {name}: {len(data)} rows")

        top_contributors = []
        metrics = datasets.get('user_metrics')
        if metrics is not None and not metrics.empty:
            ranked = metrics
            if 'total_posts' in metrics.columns:
                ranked = metrics.sort_values('total_posts', ascending=False, kind='stable')
            # Round-trip through to_json: yields plain JSON types and turns
            # the (userId, name) index into ordinary fields. to_dict() would
            # produce tuple keys, which json.dump rejects.
            top_contributors = json.loads(
                ranked.head(5).reset_index().to_json(orient='records')
            )

        # Create summary report
        summary = {
            'pipeline_run': datetime.now().isoformat(),
            'datasets': {
                name: {'rows': len(data), 'columns': len(data.columns)}
                for name, data in datasets.items()
            },
            'top_contributors': top_contributors
        }

        with open(f'{self.storage_dir}/pipeline_summary.json', 'w') as f:
            json.dump(summary, f, indent=2, default=str)

        print("\nPipeline completed successfully!")
        print(f"Total datasets: {len(summary['datasets'])}")

        return summary

    def run_pipeline(self):
        """Run the complete ETL pipeline.

        Returns:
            The summary dict from ``load_data``, or None when extraction or
            transformation produced no data.
        """
        print("=== STARTING API DATA PIPELINE ===\n")

        start_time = time.time()

        # Extract
        users_df = self.extract_user_data()
        posts_df, comments_df = self.extract_post_data()

        if users_df is None or posts_df is None:
            print("Pipeline failed during extraction")
            return

        # Transform
        user_posts, user_metrics = self.transform_data()
        if user_posts is None:
            print("Pipeline failed during transformation")
            return

        # Load
        summary = self.load_data()

        execution_time = time.time() - start_time
        print(f"\nPipeline completed in {execution_time:.2f} seconds")

        return summary

# Run the complete pipeline (extract, transform, load); `result` is the
# summary dict, or None when extraction fails
pipeline = APIDataPipeline()
result = pipeline.run_pipeline()
Use environment variables to store API keys and secrets. Access them with os.environ.get('API_KEY') to keep credentials secure.
Always implement proper error handling and rate limiting when working with external APIs. Respect API quotas and terms of service.
API responses and processed data are automatically saved to your storage. Use these patterns as starting points for your own API integrations.