Process large datasets, perform ETL operations, and analyze data at scale using Lyceum’s cloud infrastructure.
## CSV Data Analysis

Process and analyze CSV files with pandas:
```python
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# Load data from storage
try:
    df = pd.read_csv('/lyceum/storage/sales_data.csv')
    print(f"Loaded dataset with {len(df)} rows and {len(df.columns)} columns")
except FileNotFoundError:
    # Create sample sales data if file doesn't exist
    print("Creating sample sales data...")
    np.random.seed(42)
    dates = pd.date_range('2023-01-01', '2023-12-31', freq='D')
    products = ['Product_A', 'Product_B', 'Product_C', 'Product_D', 'Product_E']
    regions = ['North', 'South', 'East', 'West']
    data = []
    for date in dates:
        for _ in range(np.random.randint(5, 20)):  # 5-20 sales per day
            data.append({
                'date': date,
                'product': np.random.choice(products),
                'region': np.random.choice(regions),
                'sales_amount': np.random.lognormal(4, 1),  # Log-normal distribution
                'quantity': np.random.randint(1, 10),
                'customer_type': np.random.choice(['B2B', 'B2C'], p=[0.3, 0.7])
            })
    df = pd.DataFrame(data)
    df['date'] = pd.to_datetime(df['date'])
    df.to_csv('/lyceum/storage/sales_data.csv', index=False)
    print(f"Generated {len(df)} sales records")

# Data exploration
print("\n=== DATA OVERVIEW ===")
print(df.info())
print("\nFirst 5 rows:")
print(df.head())
print("\n=== SUMMARY STATISTICS ===")
print(df.describe())

# Data quality checks
print("\n=== DATA QUALITY ===")
print("Missing values:")
print(df.isnull().sum())
print("\nDuplicate rows:", df.duplicated().sum())

# Business analytics
print("\n=== BUSINESS INSIGHTS ===")
# Convert date column
df['date'] = pd.to_datetime(df['date'])
df['month'] = df['date'].dt.to_period('M')
df['quarter'] = df['date'].dt.to_period('Q')

# Top products by revenue
product_revenue = df.groupby('product')['sales_amount'].sum().sort_values(ascending=False)
print("Top products by revenue:")
print(product_revenue)

# Regional performance
region_stats = df.groupby('region').agg({
    'sales_amount': ['sum', 'mean', 'count'],
    'quantity': 'sum'
}).round(2)
print("\nRegional performance:")
print(region_stats)

# Monthly trends
monthly_sales = df.groupby('month')['sales_amount'].sum()
print("\nMonthly sales trend:")
print(monthly_sales.tail(10))

# Customer type analysis
customer_analysis = df.groupby('customer_type').agg({
    'sales_amount': ['sum', 'mean'],
    'quantity': 'sum'
}).round(2)
print("\nCustomer type analysis:")
print(customer_analysis)

# Create visualizations
fig, axes = plt.subplots(2, 2, figsize=(15, 12))

# 1. Monthly sales trend
monthly_sales.plot(kind='line', ax=axes[0, 0])
axes[0, 0].set_title('Monthly Sales Trend')
axes[0, 0].set_ylabel('Sales Amount')

# 2. Product revenue
product_revenue.plot(kind='bar', ax=axes[0, 1])
axes[0, 1].set_title('Revenue by Product')
axes[0, 1].set_ylabel('Total Revenue')
axes[0, 1].tick_params(axis='x', rotation=45)

# 3. Regional distribution
region_revenue = df.groupby('region')['sales_amount'].sum()
axes[1, 0].pie(region_revenue.values, labels=region_revenue.index, autopct='%1.1f%%')
axes[1, 0].set_title('Revenue Distribution by Region')

# 4. Sales amount distribution
axes[1, 1].hist(df['sales_amount'], bins=50, alpha=0.7)
axes[1, 1].set_title('Sales Amount Distribution')
axes[1, 1].set_xlabel('Sales Amount')
axes[1, 1].set_ylabel('Frequency')

plt.tight_layout()
plt.savefig('/lyceum/storage/sales_analysis.png', dpi=300, bbox_inches='tight')

# Generate summary report
report = {
    'total_revenue': df['sales_amount'].sum(),
    'total_transactions': len(df),
    'avg_transaction_value': df['sales_amount'].mean(),
    'date_range': {
        'start': df['date'].min().strftime('%Y-%m-%d'),
        'end': df['date'].max().strftime('%Y-%m-%d')
    },
    'top_product': product_revenue.index[0],
    'top_region': region_revenue.idxmax(),
    'b2b_percentage': (df['customer_type'] == 'B2B').mean() * 100
}

import json
with open('/lyceum/storage/sales_report.json', 'w') as f:
    json.dump(report, f, indent=2, default=str)

print("\n=== FINAL REPORT ===")
print(f"Total Revenue: ${report['total_revenue']:,.2f}")
print(f"Total Transactions: {report['total_transactions']:,}")
print(f"Average Transaction: ${report['avg_transaction_value']:.2f}")
print(f"Date Range: {report['date_range']['start']} to {report['date_range']['end']}")
print(f"Top Product: {report['top_product']}")
print(f"Top Region: {report['top_region']}")
print(f"B2B Percentage: {report['b2b_percentage']:.1f}%")
print("\nFiles saved to storage:")
print("- sales_data.csv (processed data)")
print("- sales_analysis.png (visualizations)")
print("- sales_report.json (summary report)")
```
## JSON Data Processing

Process JSON data from APIs or files:
```python
import json
import pandas as pd
import requests
from datetime import datetime, timedelta

def process_api_data():
    """Process JSON data from APIs"""
    # Simulate API data (replace with actual API calls)
    api_data = {
        "users": [
            {"id": i, "name": f"User_{i}", "email": f"user{i}@example.com",
             "age": 20 + (i % 50), "city": ["NYC", "LA", "Chicago", "Houston"][i % 4],
             "signup_date": (datetime.now() - timedelta(days=i*30)).isoformat()}
            for i in range(1, 101)
        ],
        "transactions": [
            {"user_id": (i % 100) + 1, "amount": round(10 + (i * 1.5), 2),
             "transaction_date": (datetime.now() - timedelta(days=i)).isoformat(),
             "category": ["food", "transport", "entertainment", "shopping"][i % 4]}
            for i in range(500)
        ]
    }
    print("Processing JSON API data...")

    # Convert to DataFrames
    users_df = pd.DataFrame(api_data['users'])
    transactions_df = pd.DataFrame(api_data['transactions'])

    # Data type conversions
    users_df['signup_date'] = pd.to_datetime(users_df['signup_date'])
    transactions_df['transaction_date'] = pd.to_datetime(transactions_df['transaction_date'])
    print(f"Loaded {len(users_df)} users and {len(transactions_df)} transactions")

    # Join data
    merged_df = transactions_df.merge(users_df, left_on='user_id', right_on='id', how='left')

    # Analytics
    print("\n=== USER ANALYTICS ===")
    city_stats = users_df['city'].value_counts()
    print("Users by city:")
    print(city_stats)

    age_groups = pd.cut(users_df['age'], bins=[0, 25, 35, 45, 100],
                        labels=['18-25', '26-35', '36-45', '46+'])
    print("\nUsers by age group:")
    print(age_groups.value_counts())

    print("\n=== TRANSACTION ANALYTICS ===")
    category_spending = merged_df.groupby('category')['amount'].sum().sort_values(ascending=False)
    print("Spending by category:")
    print(category_spending)

    user_spending = merged_df.groupby(['user_id', 'name'])['amount'].sum().sort_values(ascending=False)
    print("\nTop 10 spenders:")
    print(user_spending.head(10))

    city_spending = merged_df.groupby('city')['amount'].mean()
    print("\nAverage spending by city:")
    print(city_spending)

    # Save processed data
    users_df.to_csv('/lyceum/storage/users.csv', index=False)
    transactions_df.to_csv('/lyceum/storage/transactions.csv', index=False)
    merged_df.to_csv('/lyceum/storage/user_transactions.csv', index=False)

    # Save analytics
    analytics = {
        'users_by_city': city_stats.to_dict(),
        'spending_by_category': category_spending.to_dict(),
        'avg_spending_by_city': city_spending.to_dict(),
        'total_users': len(users_df),
        'total_transactions': len(transactions_df),
        'total_revenue': merged_df['amount'].sum(),
        'processed_at': datetime.now().isoformat()
    }
    with open('/lyceum/storage/analytics.json', 'w') as f:
        json.dump(analytics, f, indent=2, default=str)

    print(f"\nProcessing complete! Total revenue: ${analytics['total_revenue']:,.2f}")
    return merged_df

# Process the data
result_df = process_api_data()

# Advanced analytics
print("\n=== ADVANCED ANALYTICS ===")

# Monthly transaction trends
result_df['month'] = result_df['transaction_date'].dt.to_period('M')
monthly_trends = result_df.groupby('month').agg({
    'amount': ['sum', 'count', 'mean']
}).round(2)
print("Monthly trends:")
print(monthly_trends)

# User cohorts (by signup month)
result_df['signup_month'] = result_df['signup_date'].dt.to_period('M')
cohort_analysis = result_df.groupby(['signup_month', 'month']).size().reset_index(name='transactions')
print("\nCohort analysis (sample):")
print(cohort_analysis.head(10))

# RFM Analysis (Recency, Frequency, Monetary)
current_date = result_df['transaction_date'].max()
rfm = result_df.groupby('user_id').agg({
    'transaction_date': lambda x: (current_date - x.max()).days,  # Recency
    'amount': ['count', 'sum']  # Frequency, Monetary
}).round(2)
rfm.columns = ['recency', 'frequency', 'monetary']
# Score each dimension 1-5 and concatenate into a combined RFM score; the quintile
# labels are converted to strings because categorical labels cannot be added directly
r_score = pd.qcut(rfm['recency'], 5, labels=[5, 4, 3, 2, 1]).astype(str)
f_score = pd.qcut(rfm['frequency'].rank(method='first'), 5, labels=[1, 2, 3, 4, 5]).astype(str)
m_score = pd.qcut(rfm['monetary'], 5, labels=[1, 2, 3, 4, 5]).astype(str)
rfm['rfm_score'] = r_score + f_score + m_score
print("\nRFM Analysis (top 10 customers):")
print(rfm.sort_values('monetary', ascending=False).head(10))
rfm.to_csv('/lyceum/storage/rfm_analysis.csv')
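The simulated payload above is already flat. Real API responses are often nested, in which case `pd.json_normalize` can flatten them before analysis. A minimal sketch with a made-up nested payload (the field names are illustrative, not from the example above):

```python
import pandas as pd

# Hypothetical nested API response, used only to illustrate flattening
payload = [
    {"id": 1, "user": {"name": "Ada", "address": {"city": "NYC"}},
     "orders": [{"amount": 12.5}, {"amount": 7.0}]},
    {"id": 2, "user": {"name": "Lin", "address": {"city": "LA"}},
     "orders": [{"amount": 3.2}]},
]

# Flatten nested objects into dotted column names (id, user.name, user.address.city, ...)
users = pd.json_normalize(payload)
print(users[['id', 'user.name', 'user.address.city']])

# Expand the list of orders into one row per order, carrying the parent id along
orders = pd.json_normalize(payload, record_path='orders', meta=['id'])
print(orders)
```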
## File Processing Pipeline

Process multiple files in batch:
```python
import os
import glob
import json
from pathlib import Path

import numpy as np
import pandas as pd
def batch_file_processor():
    """Process multiple files in batch"""
    storage_path = '/lyceum/storage/'

    # Create sample files if none exist
    sample_files = ['data_2023_q1.csv', 'data_2023_q2.csv', 'data_2023_q3.csv', 'data_2023_q4.csv']
    for i, filename in enumerate(sample_files):
        filepath = os.path.join(storage_path, filename)
        if not os.path.exists(filepath):
            # Create sample quarterly data
            dates = pd.date_range(f'2023-{i*3+1:02d}-01', periods=90, freq='D')
            sample_data = pd.DataFrame({
                'date': dates,
                'revenue': np.random.lognormal(4, 0.5, len(dates)),
                'customers': np.random.poisson(50, len(dates)),
                'region': np.random.choice(['North', 'South', 'East', 'West'], len(dates))
            })
            sample_data.to_csv(filepath, index=False)
            print(f"Created sample file: {filename}")

    # Find all CSV files
    csv_files = glob.glob(os.path.join(storage_path, '*.csv'))
    data_files = [f for f in csv_files if 'data_2023' in f]
    print(f"Found {len(data_files)} data files to process")

    all_data = []
    file_stats = []
    for file_path in data_files:
        filename = os.path.basename(file_path)
        print(f"Processing {filename}...")
        try:
            df = pd.read_csv(file_path)
            df['source_file'] = filename
            df['date'] = pd.to_datetime(df['date'])
            # Calculate file statistics
            file_stat = {
                'filename': filename,
                'rows': len(df),
                'columns': len(df.columns),
                'date_range': f"{df['date'].min().date()} to {df['date'].max().date()}",
                'total_revenue': df['revenue'].sum() if 'revenue' in df.columns else 0,
                'avg_customers': df['customers'].mean() if 'customers' in df.columns else 0
            }
            file_stats.append(file_stat)
            all_data.append(df)
            print(f" - {len(df)} rows, revenue: ${file_stat['total_revenue']:,.2f}")
        except Exception as e:
            print(f" - Error processing {filename}: {e}")

    if not all_data:
        print("No data files found or processed successfully")
        return

    # Combine all data
    combined_df = pd.concat(all_data, ignore_index=True)
    print(f"\nCombined dataset: {len(combined_df)} rows")

    # Aggregate analysis
    print("\n=== COMBINED ANALYSIS ===")

    # Monthly aggregation
    combined_df['month'] = combined_df['date'].dt.to_period('M')
    monthly_agg = combined_df.groupby('month').agg({
        'revenue': 'sum',
        'customers': 'sum'
    }).round(2)
    print("Monthly aggregated data:")
    print(monthly_agg)

    # Regional analysis
    regional_agg = combined_df.groupby('region').agg({
        'revenue': ['sum', 'mean'],
        'customers': ['sum', 'mean']
    }).round(2)
    print("\nRegional analysis:")
    print(regional_agg)

    # Quarterly comparison
    combined_df['quarter'] = combined_df['date'].dt.to_period('Q')
    quarterly_agg = combined_df.groupby('quarter').agg({
        'revenue': 'sum',
        'customers': 'sum'
    }).round(2)
    print("\nQuarterly comparison:")
    print(quarterly_agg)

    # Save results
    combined_df.to_csv(f'{storage_path}combined_data.csv', index=False)
    monthly_agg.to_csv(f'{storage_path}monthly_summary.csv')
    quarterly_agg.to_csv(f'{storage_path}quarterly_summary.csv')

    # File processing report
    report = {
        'processing_summary': {
            'files_processed': len(data_files),
            'total_rows': len(combined_df),
            'date_range': f"{combined_df['date'].min().date()} to {combined_df['date'].max().date()}",
            'total_revenue': combined_df['revenue'].sum(),
            'total_customers': combined_df['customers'].sum()
        },
        'file_details': file_stats,
        # Convert Period index labels to strings so the report serializes cleanly to JSON
        'monthly_totals': monthly_agg.rename(index=str).to_dict(),
        'quarterly_totals': quarterly_agg.rename(index=str).to_dict()
    }
    with open(f'{storage_path}processing_report.json', 'w') as f:
        json.dump(report, f, indent=2, default=str)

    print("\n=== PROCESSING COMPLETE ===")
    print(f"Total Revenue: ${report['processing_summary']['total_revenue']:,.2f}")
    print(f"Total Customers: {report['processing_summary']['total_customers']:,}")
    print("\nGenerated files:")
    print("- combined_data.csv")
    print("- monthly_summary.csv")
    print("- quarterly_summary.csv")
    print("- processing_report.json")

# Run the batch processor
batch_file_processor()
```
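When a pipeline re-reads its intermediates many times, a columnar format can be faster and smaller than CSV. A minimal sketch, assuming the pipeline above has already written `combined_data.csv` and that a Parquet engine (pyarrow or fastparquet) is installed in the environment:

```python
import pandas as pd

# Re-load the combined output produced by the pipeline above
combined_df = pd.read_csv('/lyceum/storage/combined_data.csv', parse_dates=['date'])

# Parquet preserves dtypes and compresses well, so repeat reads are cheaper than CSV
combined_df.to_parquet('/lyceum/storage/combined_data.parquet', index=False)
roundtrip = pd.read_parquet('/lyceum/storage/combined_data.parquet')
print(roundtrip.dtypes)
```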
## Web Scraping and Data Collection

Collect data from web sources:
```python
import os
import requests
import pandas as pd
from bs4 import BeautifulSoup
import time
import json
def scrape_sample_data():
    """Scrape sample data from web sources"""
    print("=== WEB DATA COLLECTION ===")

    # Example 1: API data collection
    print("\n1. Collecting data from JSONPlaceholder API...")
    try:
        # Fetch posts
        posts_response = requests.get('https://jsonplaceholder.typicode.com/posts')
        posts_data = posts_response.json()
        # Fetch users
        users_response = requests.get('https://jsonplaceholder.typicode.com/users')
        users_data = users_response.json()

        # Convert to DataFrames
        posts_df = pd.DataFrame(posts_data)
        users_df = pd.DataFrame(users_data)
        print(f"Collected {len(posts_df)} posts and {len(users_df)} users")

        # Join data
        merged_df = posts_df.merge(users_df, left_on='userId', right_on='id', suffixes=('_post', '_user'))

        # Analysis
        user_post_counts = merged_df['name'].value_counts()
        print("\nPosts per user:")
        print(user_post_counts.head())

        # Save data
        posts_df.to_csv('/lyceum/storage/posts.csv', index=False)
        users_df.to_csv('/lyceum/storage/users_api.csv', index=False)
    except Exception as e:
        print(f"Error collecting API data: {e}")

    # Example 2: Public dataset processing
    print("\n2. Processing public dataset...")
    try:
        # This is a sample - replace with actual public data sources
        # For demo, we'll simulate financial data
        import numpy as np
        from datetime import datetime, timedelta

        # Generate sample stock-like data
        dates = pd.date_range('2023-01-01', '2023-12-31', freq='D')
        np.random.seed(42)

        # Simulate multiple stocks
        stocks = ['AAPL', 'GOOGL', 'MSFT', 'AMZN', 'TSLA']
        stock_data = []
        for stock in stocks:
            base_price = np.random.uniform(100, 500)
            prices = []
            for i, date in enumerate(dates):
                if i == 0:
                    price = base_price
                else:
                    # Random walk with slight upward trend
                    change = np.random.normal(0.001, 0.02)  # 0.1% drift, 2% volatility
                    price = prices[-1] * (1 + change)
                prices.append(price)
                stock_data.append({
                    'date': date,
                    'symbol': stock,
                    'price': round(price, 2),
                    'volume': np.random.randint(1000000, 10000000)
                })
        stock_df = pd.DataFrame(stock_data)
        print(f"Generated {len(stock_df)} stock price records")

        # Technical analysis
        for stock in stocks:
            stock_subset = stock_df[stock_df['symbol'] == stock].copy()
            stock_subset = stock_subset.sort_values('date')
            # Moving averages
            stock_subset['ma_20'] = stock_subset['price'].rolling(window=20).mean()
            stock_subset['ma_50'] = stock_subset['price'].rolling(window=50).mean()
            # Price changes
            stock_subset['price_change'] = stock_subset['price'].pct_change()
            stock_subset['cumulative_return'] = (1 + stock_subset['price_change']).cumprod() - 1
            # Update main dataframe
            stock_df.loc[stock_df['symbol'] == stock, 'ma_20'] = stock_subset['ma_20'].values
            stock_df.loc[stock_df['symbol'] == stock, 'ma_50'] = stock_subset['ma_50'].values
            stock_df.loc[stock_df['symbol'] == stock, 'price_change'] = stock_subset['price_change'].values
            stock_df.loc[stock_df['symbol'] == stock, 'cumulative_return'] = stock_subset['cumulative_return'].values

        # Analysis
        final_returns = stock_df.groupby('symbol')['cumulative_return'].last().sort_values(ascending=False)
        print("\nStock performance (1-year returns):")
        print(final_returns)

        volatility = stock_df.groupby('symbol')['price_change'].std().sort_values(ascending=False)
        print("\nStock volatility:")
        print(volatility)

        # Save data
        stock_df.to_csv('/lyceum/storage/stock_data.csv', index=False)

        # Create summary
        summary = {
            'collection_date': datetime.now().isoformat(),
            'data_sources': [
                'JSONPlaceholder API (posts and users)',
                'Simulated stock data'
            ],
            'records_collected': {
                'posts': len(posts_df) if 'posts_df' in locals() else 0,
                'users': len(users_df) if 'users_df' in locals() else 0,
                'stock_prices': len(stock_df)
            },
            'stock_analysis': {
                'best_performer': final_returns.index[0],
                'best_return': final_returns.iloc[0],
                'most_volatile': volatility.index[0],
                'highest_volatility': volatility.iloc[0]
            }
        }
        with open('/lyceum/storage/collection_summary.json', 'w') as f:
            json.dump(summary, f, indent=2, default=str)

        print("\n=== COLLECTION COMPLETE ===")
        print(f"Best performer: {summary['stock_analysis']['best_performer']} ({summary['stock_analysis']['best_return']:.2%})")
        print(f"Most volatile: {summary['stock_analysis']['most_volatile']}")
        print("\nFiles saved:")
        print("- posts.csv")
        print("- users_api.csv")
        print("- stock_data.csv")
        print("- collection_summary.json")
    except Exception as e:
        print(f"Error in data processing: {e}")

# Run data collection
scrape_sample_data()

print("\n=== DATA QUALITY REPORT ===")
# Check all generated files
storage_path = '/lyceum/storage/'
files = [f for f in os.listdir(storage_path) if f.endswith('.csv')]
for file in files:
    try:
        df = pd.read_csv(os.path.join(storage_path, file))
        print(f"\n{file}:")
        print(f" Rows: {len(df):,}")
        print(f" Columns: {len(df.columns)}")
        print(f" Missing values: {df.isnull().sum().sum()}")
        print(f" Memory usage: {df.memory_usage(deep=True).sum() / 1024:.1f} KB")
    except Exception as e:
        print(f" Error reading {file}: {e}")
```
Large datasets can be processed in chunks using `pd.read_csv(chunksize=10000)` to manage memory usage efficiently.

When scraping websites, respect robots.txt and implement rate limiting to avoid overwhelming servers. Always check the website’s terms of service.
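A minimal sketch of the chunked approach, assuming the `sales_data.csv` file generated earlier on this page:

```python
import pandas as pd

total_revenue = 0.0
row_count = 0
# Aggregate chunk by chunk so only ~10,000 rows are held in memory at a time
for chunk in pd.read_csv('/lyceum/storage/sales_data.csv', chunksize=10000):
    total_revenue += chunk['sales_amount'].sum()
    row_count += len(chunk)

print(f"Processed {row_count:,} rows, total revenue ${total_revenue:,.2f}")
```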
All processed data is automatically saved to your storage and remains available after execution. Download the results or use them in subsequent processing steps.

