Skip to main content
Process large datasets, perform ETL operations, and analyze data at scale using Lyceum’s cloud infrastructure.

CSV Data Analysis

Process and analyze CSV files with pandas:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# Load data from storage
try:
    # Parse the date column while reading so a stored file and freshly
    # generated sample data produce a frame with the same dtypes.
    df = pd.read_csv('/lyceum/storage/sales_data.csv', parse_dates=['date'])
    print(f"Loaded dataset with {len(df)} rows and {len(df.columns)} columns")
except FileNotFoundError:
    # Create sample sales data if file doesn't exist
    print("Creating sample sales data...")
    np.random.seed(42)  # fixed seed keeps the sample reproducible

    dates = pd.date_range('2023-01-01', '2023-12-31', freq='D')
    products = ['Product_A', 'Product_B', 'Product_C', 'Product_D', 'Product_E']
    regions = ['North', 'South', 'East', 'West']

    data = []
    for date in dates:
        # randint excludes its upper bound: 5-19 sales per day
        for _ in range(np.random.randint(5, 20)):
            data.append({
                'date': date,
                'product': np.random.choice(products),
                'region': np.random.choice(regions),
                'sales_amount': np.random.lognormal(4, 1),  # Log-normal distribution
                'quantity': np.random.randint(1, 10),  # 1-9 units
                'customer_type': np.random.choice(['B2B', 'B2C'], p=[0.3, 0.7])
            })

    # 'date' already holds pandas Timestamps taken from date_range, so the
    # column is datetime64 without any further conversion.
    df = pd.DataFrame(data)
    df.to_csv('/lyceum/storage/sales_data.csv', index=False)
    print(f"Generated {len(df)} sales records")

# Data exploration
print("\n=== DATA OVERVIEW ===")
# DataFrame.info() writes its summary to stdout itself and returns None,
# so wrapping it in print() would add a stray "None" line to the output.
df.info()
print("\nFirst 5 rows:")
print(df.head())

print("\n=== SUMMARY STATISTICS ===")
print(df.describe())

# Data quality checks
print("\n=== DATA QUALITY ===")
print("Missing values:")
print(df.isnull().sum())

print("\nDuplicate rows:", df.duplicated().sum())

# Business analytics
print("\n=== BUSINESS INSIGHTS ===")

# Make sure 'date' is datetime, then derive the reporting periods from it
df['date'] = pd.to_datetime(df['date'])
for period_column, period_code in (('month', 'M'), ('quarter', 'Q')):
    df[period_column] = df['date'].dt.to_period(period_code)

# Revenue per product, best seller first
revenue_per_product = df.groupby('product')['sales_amount'].sum()
product_revenue = revenue_per_product.sort_values(ascending=False)
print("Top products by revenue:")
print(product_revenue)

# Revenue (total / mean / transaction count) and units sold per region
region_aggregations = {
    'sales_amount': ['sum', 'mean', 'count'],
    'quantity': 'sum',
}
region_stats = df.groupby('region').agg(region_aggregations).round(2)
print("\nRegional performance:")
print(region_stats)

# Revenue per calendar month; only the last ten months are printed
sales_by_month = df.groupby('month')
monthly_sales = sales_by_month['sales_amount'].sum()
print("\nMonthly sales trend:")
print(monthly_sales.tail(10))

# Revenue and units sold per customer type
customer_aggregations = {
    'sales_amount': ['sum', 'mean'],
    'quantity': 'sum',
}
customer_analysis = df.groupby('customer_type').agg(customer_aggregations).round(2)
print("\nCustomer type analysis:")
print(customer_analysis)

# Create visualizations: a 2x2 grid saved as a single PNG
fig, axes = plt.subplots(2, 2, figsize=(15, 12))

# 1. Monthly sales trend
monthly_sales.plot(kind='line', ax=axes[0, 0])
axes[0, 0].set_title('Monthly Sales Trend')
axes[0, 0].set_ylabel('Sales Amount')

# 2. Product revenue
product_revenue.plot(kind='bar', ax=axes[0, 1])
axes[0, 1].set_title('Revenue by Product')
axes[0, 1].set_ylabel('Total Revenue')
axes[0, 1].tick_params(axis='x', rotation=45)

# 3. Regional distribution
region_revenue = df.groupby('region')['sales_amount'].sum()
axes[1, 0].pie(region_revenue.values, labels=region_revenue.index, autopct='%1.1f%%')
axes[1, 0].set_title('Revenue Distribution by Region')

# 4. Sales amount distribution
axes[1, 1].hist(df['sales_amount'], bins=50, alpha=0.7)
axes[1, 1].set_title('Sales Amount Distribution')
axes[1, 1].set_xlabel('Sales Amount')
axes[1, 1].set_ylabel('Frequency')

fig.tight_layout()
fig.savefig('/lyceum/storage/sales_analysis.png', dpi=300, bbox_inches='tight')
# pyplot keeps every figure registered until it is closed explicitly;
# release this one now that it has been written to disk.
plt.close(fig)

# Generate summary report
import json

sales_amounts = df['sales_amount']
first_sale_day = df['date'].min()
last_sale_day = df['date'].max()
b2b_mask = df['customer_type'] == 'B2B'

# Keys are inserted in the order they should appear in the JSON file
report = {}
report['total_revenue'] = sales_amounts.sum()
report['total_transactions'] = len(df)
report['avg_transaction_value'] = sales_amounts.mean()
report['date_range'] = {
    'start': first_sale_day.strftime('%Y-%m-%d'),
    'end': last_sale_day.strftime('%Y-%m-%d'),
}
report['top_product'] = product_revenue.index[0]  # series is sorted, best first
report['top_region'] = region_revenue.idxmax()
report['b2b_percentage'] = b2b_mask.mean() * 100  # share of rows flagged B2B

with open('/lyceum/storage/sales_report.json', 'w') as f:
    # default=str is the fallback for any value json cannot encode natively
    json.dump(report, f, indent=2, default=str)

reported_range = report['date_range']
print("\n=== FINAL REPORT ===")
print(f"Total Revenue: ${report['total_revenue']:,.2f}")
print(f"Total Transactions: {report['total_transactions']:,}")
print(f"Average Transaction: ${report['avg_transaction_value']:.2f}")
print(f"Date Range: {reported_range['start']} to {reported_range['end']}")
print(f"Top Product: {report['top_product']}")
print(f"Top Region: {report['top_region']}")
print(f"B2B Percentage: {report['b2b_percentage']:.1f}%")

print("\nFiles saved to storage:")
for saved_line in (
    "- sales_data.csv (processed data)",
    "- sales_analysis.png (visualizations)",
    "- sales_report.json (summary report)",
):
    print(saved_line)

JSON Data Processing

Process JSON data from APIs or files:
import json
import pandas as pd
import requests
from datetime import datetime, timedelta

def process_api_data(storage_path='/lyceum/storage/'):
    """Simulate an API payload, analyse it with pandas and persist the results.

    Builds 100 users and 500 transactions in memory, converts them to
    DataFrames, joins each transaction to its user, prints user and
    transaction statistics, and writes ``users.csv``, ``transactions.csv``,
    ``user_transactions.csv`` and ``analytics.json`` into *storage_path*.

    Args:
        storage_path: Directory that receives the output files; it is
            created if missing. Defaults to the Lyceum storage mount.

    Returns:
        pandas.DataFrame: One row per transaction, left-joined with the
        columns of the matching user.
    """
    import os  # local import: this snippet does not import os at the top

    # Take one reference time so every generated timestamp is offset from
    # the same instant and all ISO strings share the same layout.
    now = datetime.now()
    cities = ["NYC", "LA", "Chicago", "Houston"]
    categories = ["food", "transport", "entertainment", "shopping"]

    # Simulate API data (replace with actual API calls)
    api_data = {
        "users": [
            {"id": i, "name": f"User_{i}", "email": f"user{i}@example.com",
             "age": 20 + (i % 50), "city": cities[i % 4],
             "signup_date": (now - timedelta(days=i * 30)).isoformat()}
            for i in range(1, 101)
        ],
        "transactions": [
            {"user_id": (i % 100) + 1, "amount": round(10 + (i * 1.5), 2),
             "transaction_date": (now - timedelta(days=i)).isoformat(),
             "category": categories[i % 4]}
            for i in range(500)
        ],
    }

    print("Processing JSON API data...")

    # Convert to DataFrames
    users_df = pd.DataFrame(api_data['users'])
    transactions_df = pd.DataFrame(api_data['transactions'])

    # Data type conversions: ISO strings -> datetime64
    users_df['signup_date'] = pd.to_datetime(users_df['signup_date'])
    transactions_df['transaction_date'] = pd.to_datetime(transactions_df['transaction_date'])

    print(f"Loaded {len(users_df)} users and {len(transactions_df)} transactions")

    # Left join keeps every transaction even if its user were missing
    merged_df = transactions_df.merge(users_df, left_on='user_id', right_on='id', how='left')

    # Analytics
    print("\n=== USER ANALYTICS ===")
    city_stats = users_df['city'].value_counts()
    print("Users by city:")
    print(city_stats)

    age_groups = pd.cut(users_df['age'], bins=[0, 25, 35, 45, 100],
                        labels=['18-25', '26-35', '36-45', '46+'])
    print("\nUsers by age group:")
    print(age_groups.value_counts())

    print("\n=== TRANSACTION ANALYTICS ===")
    category_spending = merged_df.groupby('category')['amount'].sum().sort_values(ascending=False)
    print("Spending by category:")
    print(category_spending)

    user_spending = merged_df.groupby(['user_id', 'name'])['amount'].sum().sort_values(ascending=False)
    print("\nTop 10 spenders:")
    print(user_spending.head(10))

    city_spending = merged_df.groupby('city')['amount'].mean()
    print("\nAverage spending by city:")
    print(city_spending)

    # Save processed data
    os.makedirs(storage_path, exist_ok=True)
    users_df.to_csv(os.path.join(storage_path, 'users.csv'), index=False)
    transactions_df.to_csv(os.path.join(storage_path, 'transactions.csv'), index=False)
    merged_df.to_csv(os.path.join(storage_path, 'user_transactions.csv'), index=False)

    # Save analytics
    analytics = {
        'users_by_city': city_stats.to_dict(),
        'spending_by_category': category_spending.to_dict(),
        'avg_spending_by_city': city_spending.to_dict(),
        'total_users': len(users_df),
        'total_transactions': len(transactions_df),
        # plain float so the value is written as a JSON number
        'total_revenue': float(merged_df['amount'].sum()),
        'processed_at': now.isoformat()
    }

    with open(os.path.join(storage_path, 'analytics.json'), 'w') as f:
        json.dump(analytics, f, indent=2, default=str)

    print(f"\nProcessing complete! Total revenue: ${analytics['total_revenue']:,.2f}")
    return merged_df

# Process the data
result_df = process_api_data()

# Advanced analytics
print("\n=== ADVANCED ANALYTICS ===")

# Monthly transaction trends
result_df['month'] = result_df['transaction_date'].dt.to_period('M')
monthly_trends = result_df.groupby('month').agg({
    'amount': ['sum', 'count', 'mean']
}).round(2)

print("Monthly trends:")
print(monthly_trends)

# User cohorts (by signup month)
result_df['signup_month'] = result_df['signup_date'].dt.to_period('M')
cohort_analysis = result_df.groupby(['signup_month', 'month']).size().reset_index(name='transactions')

print("\nCohort analysis (sample):")
print(cohort_analysis.head(10))

# RFM Analysis (Recency, Frequency, Monetary)
# Recency is measured against the newest transaction in the data set.
current_date = result_df['transaction_date'].max()
rfm = result_df.groupby('user_id').agg({
    'transaction_date': lambda x: (current_date - x.max()).days,  # Recency
    'amount': ['count', 'sum']  # Frequency, Monetary
}).round(2)

rfm.columns = ['recency', 'frequency', 'monetary']


def _quintile_score(values, labels):
    """Score *values* 1-5 by quintile and return the scores as integers.

    Ranking with method='first' breaks ties so the quintile edges are always
    unique. pd.qcut with labels returns a Categorical, which does not
    support arithmetic, hence the cast to int.
    """
    return pd.qcut(values.rank(method='first'), 5, labels=labels).astype(int)


# Low recency (a recent purchase) is good, so its labels run 5 -> 1;
# for frequency and monetary value, higher is better.
rfm['rfm_score'] = (
    _quintile_score(rfm['recency'], [5, 4, 3, 2, 1])
    + _quintile_score(rfm['frequency'], [1, 2, 3, 4, 5])
    + _quintile_score(rfm['monetary'], [1, 2, 3, 4, 5])
)

print("\nRFM Analysis (top 10 customers):")
print(rfm.sort_values('monetary', ascending=False).head(10))

rfm.to_csv('/lyceum/storage/rfm_analysis.csv')

File Processing Pipeline

Process multiple files in batch:
import os
import pandas as pd
import glob
from pathlib import Path

def batch_file_processor(storage_path='/lyceum/storage/'):
    """Combine the quarterly ``data_2023_*.csv`` files and summarise them.

    Creates any missing quarterly sample file, loads every CSV in
    *storage_path* whose file name contains ``data_2023``, concatenates
    them, prints monthly, regional and quarterly aggregates, and writes
    ``combined_data.csv``, ``monthly_summary.csv``, ``quarterly_summary.csv``
    and ``processing_report.json`` into *storage_path*.

    Args:
        storage_path: Directory that holds the input files and receives the
            outputs; it is created if missing. Defaults to the Lyceum
            storage mount.

    Returns:
        None. Results are printed and written to disk. Returns early,
        without writing anything, when no file could be loaded.
    """
    # Local imports: this snippet's import block provides neither module.
    import json

    import numpy as np

    os.makedirs(storage_path, exist_ok=True)

    # Create sample files if none exist
    sample_files = ['data_2023_q1.csv', 'data_2023_q2.csv', 'data_2023_q3.csv', 'data_2023_q4.csv']

    for i, filename in enumerate(sample_files):
        filepath = os.path.join(storage_path, filename)
        if not os.path.exists(filepath):
            # Cover the whole quarter (90-92 days) instead of a fixed 90
            # days, which would drop the last days of Q2-Q4.
            quarter = pd.Period(f'2023Q{i + 1}', freq='Q')
            dates = pd.date_range(quarter.start_time, quarter.end_time.normalize(), freq='D')
            sample_data = pd.DataFrame({
                'date': dates,
                'revenue': np.random.lognormal(4, 0.5, len(dates)),
                'customers': np.random.poisson(50, len(dates)),
                'region': np.random.choice(['North', 'South', 'East', 'West'], len(dates))
            })
            sample_data.to_csv(filepath, index=False)
            print(f"Created sample file: {filename}")

    # Find the quarterly CSV files. Match on the file name only (not the
    # directory part) and sort so the processing order is deterministic.
    csv_files = glob.glob(os.path.join(storage_path, '*.csv'))
    data_files = sorted(f for f in csv_files if 'data_2023' in os.path.basename(f))

    print(f"Found {len(data_files)} data files to process")

    all_data = []
    file_stats = []

    for file_path in data_files:
        filename = os.path.basename(file_path)
        print(f"Processing {filename}...")

        try:
            df = pd.read_csv(file_path)
            df['source_file'] = filename
            df['date'] = pd.to_datetime(df['date'])

            # Calculate file statistics
            file_stat = {
                'filename': filename,
                'rows': len(df),
                'columns': len(df.columns),
                'date_range': f"{df['date'].min().date()} to {df['date'].max().date()}",
                'total_revenue': df['revenue'].sum() if 'revenue' in df.columns else 0,
                'avg_customers': df['customers'].mean() if 'customers' in df.columns else 0
            }
        except (OSError, KeyError, ValueError) as e:
            # Best effort: report an unreadable or malformed file and move on.
            print(f"  - Error processing {filename}: {e}")
            continue

        file_stats.append(file_stat)
        all_data.append(df)
        print(f"  - {len(df)} rows, revenue: ${file_stat['total_revenue']:,.2f}")

    if not all_data:
        print("No data files found or processed successfully")
        return

    # Combine all data
    combined_df = pd.concat(all_data, ignore_index=True)
    print(f"\nCombined dataset: {len(combined_df)} rows")

    # Aggregate analysis
    print("\n=== COMBINED ANALYSIS ===")

    # Monthly aggregation
    combined_df['month'] = combined_df['date'].dt.to_period('M')
    monthly_agg = combined_df.groupby('month').agg({
        'revenue': 'sum',
        'customers': 'sum'
    }).round(2)

    print("Monthly aggregated data:")
    print(monthly_agg)

    # Regional analysis
    regional_agg = combined_df.groupby('region').agg({
        'revenue': ['sum', 'mean'],
        'customers': ['sum', 'mean']
    }).round(2)

    print("\nRegional analysis:")
    print(regional_agg)

    # Quarterly comparison
    combined_df['quarter'] = combined_df['date'].dt.to_period('Q')
    quarterly_agg = combined_df.groupby('quarter').agg({
        'revenue': 'sum',
        'customers': 'sum'
    }).round(2)

    print("\nQuarterly comparison:")
    print(quarterly_agg)

    # Save results
    combined_df.to_csv(os.path.join(storage_path, 'combined_data.csv'), index=False)
    monthly_agg.to_csv(os.path.join(storage_path, 'monthly_summary.csv'))
    quarterly_agg.to_csv(os.path.join(storage_path, 'quarterly_summary.csv'))

    # File processing report
    report = {
        'processing_summary': {
            # count the files that actually loaded, not the files found
            'files_processed': len(all_data),
            'total_rows': len(combined_df),
            'date_range': f"{combined_df['date'].min().date()} to {combined_df['date'].max().date()}",
            'total_revenue': float(combined_df['revenue'].sum()),
            # native int so it is written as a JSON number, not a string
            'total_customers': int(combined_df['customers'].sum())
        },
        'file_details': file_stats,
        # json only accepts str/int/float/bool/None keys and default= is not
        # applied to keys, so the Period index is converted to strings first.
        'monthly_totals': monthly_agg.rename(index=str).to_dict(),
        'quarterly_totals': quarterly_agg.rename(index=str).to_dict()
    }

    with open(os.path.join(storage_path, 'processing_report.json'), 'w') as f:
        json.dump(report, f, indent=2, default=str)

    print("\n=== PROCESSING COMPLETE ===")
    print(f"Total Revenue: ${report['processing_summary']['total_revenue']:,.2f}")
    print(f"Total Customers: {report['processing_summary']['total_customers']:,}")
    print("\nGenerated files:")
    print("- combined_data.csv")
    print("- monthly_summary.csv")
    print("- quarterly_summary.csv")
    print("- processing_report.json")

# Run the batch processor: creates any missing quarterly sample files, then
# writes the combined/summary CSVs and processing_report.json to storage.
batch_file_processor()

Web Scraping and Data Collection

Collect data from web sources:
import requests
import pandas as pd
from bs4 import BeautifulSoup
import time
import json

def scrape_sample_data(storage_path='/lyceum/storage/'):
    """Collect sample data from a public API and simulate stock prices.

    Part 1 downloads posts and users from the JSONPlaceholder API, joins
    them, prints the number of posts per user and saves ``posts.csv`` and
    ``users_api.csv``. Part 2 generates one year of random-walk prices for
    five symbols, adds moving averages and returns, and saves
    ``stock_data.csv`` plus ``collection_summary.json``. Each part is best
    effort: a failure is printed and does not stop the other part.

    Args:
        storage_path: Directory that receives the output files. Defaults to
            the Lyceum storage mount.

    Returns:
        None. Results are printed and written to disk.
    """
    import os
    from datetime import datetime

    import numpy as np

    print("=== WEB DATA COLLECTION ===")

    saved_files = []  # names of the files actually written, for the final listing
    posts_df = None   # stay None when the API download fails
    users_df = None

    # Example 1: API data collection
    print("\n1. Collecting data from JSONPlaceholder API...")

    try:
        # A timeout keeps a stalled connection from hanging the job, and
        # raise_for_status turns HTTP error responses into exceptions
        # instead of parsing an error body as data.
        posts_response = requests.get('https://jsonplaceholder.typicode.com/posts', timeout=10)
        posts_response.raise_for_status()
        users_response = requests.get('https://jsonplaceholder.typicode.com/users', timeout=10)
        users_response.raise_for_status()

        # Convert to DataFrames
        posts_df = pd.DataFrame(posts_response.json())
        users_df = pd.DataFrame(users_response.json())

        print(f"Collected {len(posts_df)} posts and {len(users_df)} users")

        # Both frames have an 'id' column; the suffixes keep them apart
        merged_df = posts_df.merge(users_df, left_on='userId', right_on='id', suffixes=('_post', '_user'))

        # Analysis
        user_post_counts = merged_df['name'].value_counts()
        print("\nPosts per user:")
        print(user_post_counts.head())

        # Save data
        posts_df.to_csv(os.path.join(storage_path, 'posts.csv'), index=False)
        saved_files.append('posts.csv')
        users_df.to_csv(os.path.join(storage_path, 'users_api.csv'), index=False)
        saved_files.append('users_api.csv')

    except Exception as e:
        # Best-effort boundary: the stock simulation below still runs.
        print(f"Error collecting API data: {e}")

    # Example 2: Public dataset processing
    print("\n2. Processing public dataset...")

    try:
        # This is a sample - replace with actual public data sources
        # For demo, we'll simulate financial data

        # Generate sample stock-like data
        dates = pd.date_range('2023-01-01', '2023-12-31', freq='D')
        np.random.seed(42)

        # Simulate multiple stocks
        stocks = ['AAPL', 'GOOGL', 'MSFT', 'AMZN', 'TSLA']
        stock_data = []

        for stock in stocks:
            price = np.random.uniform(100, 500)  # starting price

            for i, date in enumerate(dates):
                if i > 0:
                    # Random walk with slight upward trend:
                    # 0.1% drift, 2% volatility
                    price = price * (1 + np.random.normal(0.001, 0.02))

                stock_data.append({
                    'date': date,
                    'symbol': stock,
                    'price': round(price, 2),
                    'volume': np.random.randint(1000000, 10000000)
                })

        stock_df = pd.DataFrame(stock_data)

        print(f"Generated {len(stock_df)} stock price records")

        # Technical analysis, computed per symbol in date order. transform()
        # returns values aligned on the row index, so the results land on
        # the right rows whatever the row order of stock_df is.
        prices = stock_df.sort_values('date').groupby('symbol')['price']
        stock_df['ma_20'] = prices.transform(lambda s: s.rolling(window=20).mean())
        stock_df['ma_50'] = prices.transform(lambda s: s.rolling(window=50).mean())
        stock_df['price_change'] = prices.transform(lambda s: s.pct_change())
        stock_df['cumulative_return'] = prices.transform(
            lambda s: (1 + s.pct_change()).cumprod() - 1
        )

        # Analysis
        final_returns = stock_df.groupby('symbol')['cumulative_return'].last().sort_values(ascending=False)
        print("\nStock performance (1-year returns):")
        print(final_returns)

        volatility = stock_df.groupby('symbol')['price_change'].std().sort_values(ascending=False)
        print("\nStock volatility:")
        print(volatility)

        # Save data
        stock_df.to_csv(os.path.join(storage_path, 'stock_data.csv'), index=False)
        saved_files.append('stock_data.csv')

        # Create summary
        summary = {
            'collection_date': datetime.now().isoformat(),
            'data_sources': [
                'JSONPlaceholder API (posts and users)',
                'Simulated stock data'
            ],
            'records_collected': {
                'posts': len(posts_df) if posts_df is not None else 0,
                'users': len(users_df) if users_df is not None else 0,
                'stock_prices': len(stock_df)
            },
            'stock_analysis': {
                'best_performer': final_returns.index[0],
                'best_return': float(final_returns.iloc[0]),
                'most_volatile': volatility.index[0],
                'highest_volatility': float(volatility.iloc[0])
            }
        }

        with open(os.path.join(storage_path, 'collection_summary.json'), 'w') as f:
            json.dump(summary, f, indent=2, default=str)
        saved_files.append('collection_summary.json')

        print("\n=== COLLECTION COMPLETE ===")
        print(f"Best performer: {summary['stock_analysis']['best_performer']} ({summary['stock_analysis']['best_return']:.2%})")
        print(f"Most volatile: {summary['stock_analysis']['most_volatile']}")

        # List only the files that were really written in this run
        print("\nFiles saved:")
        for saved_name in saved_files:
            print(f"- {saved_name}")

    except Exception as e:
        # Best-effort boundary, kept from the original example.
        print(f"Error in data processing: {e}")

# Run data collection
scrape_sample_data()

print("\n=== DATA QUALITY REPORT ===")

# Gather the name of every CSV file currently sitting in storage
storage_path = '/lyceum/storage/'
files = []
for entry in os.listdir(storage_path):
    if entry.endswith('.csv'):
        files.append(entry)

# Print size and completeness figures for each file; a file that cannot be
# read is reported and the loop carries on with the next one.
for file in files:
    try:
        csv_path = os.path.join(storage_path, file)
        df = pd.read_csv(csv_path)
        print(f"\n{file}:")
        print(f"  Rows: {len(df):,}")
        print(f"  Columns: {len(df.columns)}")
        # isnull().sum() counts per column; the second sum() totals the frame
        print(f"  Missing values: {df.isnull().sum().sum()}")
        # deep=True also measures the contents of object columns
        print(f"  Memory usage: {df.memory_usage(deep=True).sum() / 1024:.1f} KB")
    except Exception as e:
        print(f"  Error reading {file}: {e}")
Large datasets can be processed in chunks: pd.read_csv(path, chunksize=10000) returns an iterator that yields DataFrames of up to 10,000 rows each, so the whole file never has to fit in memory at once.
When scraping websites, respect robots.txt and implement rate limiting to avoid overwhelming servers. Always check the website’s terms of service.
Files that your code writes under /lyceum/storage/ are saved to your storage and remain available after execution. Download the results or use them in subsequent processing steps.