ETL: Extract, Transform, Load
ETL is a data integration pattern used to consolidate data from multiple sources, transform it into a usable format, and load it into a destination system. This pattern is fundamental in data warehousing, analytics, and data migration scenarios.
Pattern Overview
The ETL pattern consists of three distinct phases:
- Extract - Retrieve data from various sources (APIs, databases, files, etc.)
- Transform - Clean, validate, normalize, and enrich the data
- Load - Store the processed data in the target system
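Conceptually the phases compose into one flow, with each stage's output feeding the next. A toy end-to-end run (the inline stub functions are placeholders; the sections below build out real implementations):

```javascript
// A toy end-to-end run: extract feeds transform, transform feeds load.
// These inline stubs are placeholders for the real classes developed below.
const extract = async () => [{ name: ' Ada Lovelace ', email: 'ADA@EXAMPLE.COM' }];
const transform = rows => rows.map(r => ({ name: r.name.trim(), email: r.email.toLowerCase() }));
const load = async rows => console.log(`Loaded ${rows.length} record(s)`);

await load(transform(await extract()));
```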
1. Extract Phase
Extract data from multiple heterogeneous sources. Handle different data formats and protocols.
```javascript
// EXTRACT - Get data from various sources
class DataExtractor {
  async extractFromAPI(url) {
    const response = await fetch(url);
    if (!response.ok) {
      throw new Error(`Extract failed: HTTP ${response.status}`);
    }
    return response.json();
  }

  extractFromCSV(csvString) {
    // Ignore blank lines (e.g. a trailing newline) so they don't become empty records
    const lines = csvString.split('\n').filter(line => line.trim());
    if (lines.length === 0) return [];
    const headers = lines[0].split(',');
    return lines.slice(1).map(line => {
      const values = line.split(',');
      return headers.reduce((obj, header, index) => {
        obj[header.trim()] = values[index]?.trim();
        return obj;
      }, {});
    });
  }

  extractFromLocalStorage(key) {
    const data = localStorage.getItem(key);
    return data ? JSON.parse(data) : null;
  }

  async extractMultipleSources(csvString = '') {
    // The CSV source is supplied by the caller; it defaults to an empty document
    const [apiData, csvData, localData] = await Promise.all([
      this.extractFromAPI('/api/users'),
      this.extractFromCSV(csvString),
      this.extractFromLocalStorage('user-preferences')
    ]);
    return { apiData, csvData, localData };
  }
}
```
Key Responsibilities
- Connect to various data sources (REST APIs, databases, files)
- Handle authentication and authorization
- Manage rate limiting and pagination (see the sketch after this list)
- Extract metadata along with data
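The extractor above fetches a single endpoint in one call. Many real sources require walking pages and throttling requests; a minimal sketch, assuming a hypothetical endpoint that returns `{ items, nextPage }` where `nextPage` is `null` on the last page:

```javascript
// A minimal paginated extract with a simple delay between requests.
// The { items, nextPage } response shape and the page query parameter are
// assumptions for illustration; adapt them to the source API's pagination scheme.
async function extractAllPages(baseUrl, delayMs = 250) {
  const all = [];
  let page = 1;
  while (page !== null) {
    const response = await fetch(`${baseUrl}?page=${page}`);
    if (!response.ok) {
      throw new Error(`Extract failed: HTTP ${response.status}`);
    }
    const { items, nextPage } = await response.json();
    all.push(...items);
    page = nextPage; // null signals the last page
    // Crude rate limiting: pause before the next request
    await new Promise(resolve => setTimeout(resolve, delayMs));
  }
  return all;
}
```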
2. Transform Phase
The transformation phase is where data quality is established. Clean, validate, normalize, and enrich data to meet business requirements.
```javascript
// TRANSFORM - Clean, validate, and reshape data
class DataTransformer {
  // Normalize field names
  normalizeFields(data) {
    return data.map(item => ({
      id: item.id || item.userId || item.ID,
      name: item.name || item.fullName || item.userName,
      email: item.email?.toLowerCase(),
      createdAt: new Date(item.created_at || item.createdDate)
    }));
  }

  // Filter invalid records
  filterValid(data) {
    return data.filter(item => {
      return item.email &&
        item.email.includes('@') &&
        item.name &&
        item.name.length > 0 &&
        // Drop records whose timestamp could not be parsed into a valid Date
        item.createdAt instanceof Date &&
        !isNaN(item.createdAt.getTime());
    });
  }

  // Enrich data
  enrichData(data) {
    return data.map(item => ({
      ...item,
      domain: item.email.split('@')[1],
      initials: item.name.split(' ').map(n => n[0]).join(''),
      accountAge: Math.floor(
        (Date.now() - item.createdAt.getTime()) / (1000 * 60 * 60 * 24)
      )
    }));
  }

  // Aggregate data
  aggregateByDomain(data) {
    return data.reduce((acc, item) => {
      const domain = item.domain;
      if (!acc[domain]) {
        acc[domain] = { domain, users: [], count: 0 };
      }
      acc[domain].users.push(item);
      acc[domain].count++;
      return acc;
    }, {});
  }

  // Complete transformation pipeline
  transform(rawData) {
    return this.enrichData(
      this.filterValid(
        this.normalizeFields(rawData)
      )
    );
  }
}
```
Common Transformations
- Normalization - Standardize field names and formats
- Validation - Remove invalid or incomplete records
- Enrichment - Add derived or computed fields
- Aggregation - Group and summarize data
- Deduplication - Remove duplicate records
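Deduplication appears in this list but not in the DataTransformer above. A minimal sketch, assuming the lowercased email is a reasonable identity key (swap in whatever uniquely identifies a record in your data), that keeps the first occurrence of each key:

```javascript
// Remove duplicate records, keeping the first occurrence per key.
// Using the lowercased email as the key is an assumption for illustration.
function deduplicate(data, keyFn = item => item.email?.toLowerCase()) {
  const seen = new Set();
  return data.filter(item => {
    const key = keyFn(item);
    if (!key || seen.has(key)) return false;
    seen.add(key);
    return true;
  });
}
```

A step like this usually slots in between filterValid and enrichData, so derived fields are computed only once per unique record.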
3. Load Phase
Store transformed data efficiently in the target system. Consider performance, consistency, and fault tolerance.
```javascript
// LOAD - Store transformed data
class DataLoader {
  async loadToDatabase(data) {
    // Batch insert for performance
    const batchSize = 100;
    const batches = [];
    for (let i = 0; i < data.length; i += batchSize) {
      batches.push(data.slice(i, i + batchSize));
    }
    for (const batch of batches) {
      const response = await fetch('/api/users/batch', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(batch)
      });
      if (!response.ok) {
        throw new Error(`Load failed: HTTP ${response.status}`);
      }
    }
  }

  loadToLocalStorage(key, data) {
    localStorage.setItem(key, JSON.stringify(data));
  }

  loadToIndexedDB(storeName, data) {
    return new Promise((resolve, reject) => {
      const request = indexedDB.open('AppDB', 1);
      // Create the object store on first open (assumes records are keyed by id)
      request.onupgradeneeded = (event) => {
        const db = event.target.result;
        if (!db.objectStoreNames.contains(storeName)) {
          db.createObjectStore(storeName, { keyPath: 'id' });
        }
      };
      request.onerror = () => reject(request.error);
      request.onsuccess = (event) => {
        const db = event.target.result;
        const transaction = db.transaction([storeName], 'readwrite');
        const store = transaction.objectStore(storeName);
        data.forEach(item => store.put(item));
        transaction.oncomplete = () => resolve();
        transaction.onerror = () => reject(transaction.error);
      };
    });
  }

  async loadToMultipleTargets(data) {
    // Load to different destinations based on data type
    const userData = data.filter(item => item.type === 'user');
    const metrics = data.filter(item => item.type === 'metric');
    await Promise.all([
      this.loadToDatabase(userData),
      this.loadToLocalStorage('metrics', metrics),
      this.loadToIndexedDB('users', userData)
    ]);
  }
}
```
Loading Strategies
- Full Load - Replace all existing data
- Incremental Load - Add only new/changed records (sketched after this list)
- Batch Load - Group inserts for better performance
- Streaming Load - Real-time continuous loading
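The DataLoader above always pushes every record, which amounts to a full load. A minimal sketch of an incremental load, assuming each record carries an updatedAt timestamp and using a localStorage watermark (the 'etl:lastLoadedAt' key is an arbitrary name chosen here):

```javascript
// Incremental load: only send records that changed since the last successful run.
// The updatedAt field and the 'etl:lastLoadedAt' watermark key are assumptions.
async function loadIncrementally(data) {
  const lastRun = Number(localStorage.getItem('etl:lastLoadedAt') || 0);
  const changed = data.filter(item => new Date(item.updatedAt).getTime() > lastRun);
  if (changed.length > 0) {
    const response = await fetch('/api/users/batch', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(changed)
    });
    if (!response.ok) {
      throw new Error(`Incremental load failed: HTTP ${response.status}`);
    }
  }
  // Advance the watermark only after a successful write
  localStorage.setItem('etl:lastLoadedAt', String(Date.now()));
  return changed.length;
}
```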
Complete ETL Pipeline
Orchestrate all three phases into a cohesive pipeline with error handling and monitoring.
```javascript
// Complete ETL Pipeline
class ETLPipeline {
  constructor() {
    this.extractor = new DataExtractor();
    this.transformer = new DataTransformer();
    this.loader = new DataLoader();
    this.stats = { extracted: 0, transformed: 0, loaded: 0, errors: [] };
  }

  async run() {
    try {
      // EXTRACT
      console.log('Extracting data...');
      const rawData = await this.extractor.extractMultipleSources();
      this.stats.extracted = rawData.apiData.length;

      // TRANSFORM
      console.log('Transforming data...');
      const transformedData = this.transformer.transform(rawData.apiData);
      this.stats.transformed = transformedData.length;

      // LOAD
      console.log('Loading data...');
      await this.loader.loadToMultipleTargets(transformedData);
      this.stats.loaded = transformedData.length;

      console.log('ETL Pipeline Complete:', this.stats);
      return this.stats;
    } catch (error) {
      this.stats.errors.push(error.message);
      console.error('ETL Pipeline Failed:', error);
      throw error;
    }
  }

  // Schedule regular ETL runs
  schedule(intervalMs = 3600000) { // Default: 1 hour
    // Catch failures so a rejected run doesn't become an unhandled rejection
    setInterval(() => this.run().catch(() => {}), intervalMs);
  }
}

// Usage
const pipeline = new ETLPipeline();
await pipeline.run();
pipeline.schedule(3600000); // Run every hour
```
Error Handling and Resilience
Production ETL pipelines need robust error handling, retry logic, and monitoring.
```javascript
// Error Handling and Logging
class RobustETLPipeline extends ETLPipeline {
  async runWithRetry(maxRetries = 3) {
    for (let attempt = 1; attempt <= maxRetries; attempt++) {
      try {
        return await this.run();
      } catch (error) {
        console.warn(`Attempt ${attempt} failed:`, error.message);
        if (attempt === maxRetries) {
          // Final attempt failed - log and notify
          await this.logFailure(error);
          await this.notifyAdmins(error);
          throw error;
        }
        // Wait before retry (exponential backoff)
        await this.sleep(Math.pow(2, attempt) * 1000);
      }
    }
  }

  async logFailure(error) {
    try {
      await fetch('/api/logs/etl-failures', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          timestamp: new Date().toISOString(),
          error: error.message,
          stats: this.stats
        })
      });
    } catch {
      // A failed log write must not mask the original pipeline error
    }
  }

  async notifyAdmins(error) {
    // Send notification to monitoring service
    console.error('ETL FAILURE - Admins notified:', error);
  }

  sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}
```
When to Use ETL
- Data Integration - Combining data from multiple sources
- Data Migration - Moving data between systems
- Data Warehousing - Building analytics databases
- Batch Processing - Regular scheduled data updates
- Data Quality - Cleaning and standardizing data
Benefits
- Data Quality - Centralized validation and cleaning
- Separation of Concerns - Clear phases with distinct responsibilities
- Reusability - Transform logic can be reused across pipelines
- Monitoring - Easy to track progress and failures at each phase
- Scalability - Can process large volumes of data in batches
Considerations
- Performance - Balance between speed and data quality
- Error Recovery - Handle partial failures gracefully
- Data Lineage - Track data transformation history
- Idempotency - Re-running the pipeline should produce the same results (see the sketch after this list)
- Schema Evolution - Handle changing data structures over time
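Idempotency usually comes down to writing by a stable key instead of appending. One minimal sketch, assuming a hypothetical key-addressed /api/users/:id endpoint that accepts PUT, so re-running the load overwrites existing rows rather than duplicating them:

```javascript
// Idempotent load: PUT each record to a key-addressed endpoint so repeated
// runs overwrite instead of duplicate. The /api/users/:id route is an
// assumption for illustration; the same idea applies to database upserts.
async function upsertUsers(data) {
  for (const item of data) {
    const response = await fetch(`/api/users/${encodeURIComponent(item.id)}`, {
      method: 'PUT',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(item)
    });
    if (!response.ok) {
      throw new Error(`Upsert of ${item.id} failed: HTTP ${response.status}`);
    }
  }
}
```

Note that loadToIndexedDB above already behaves this way, since store.put overwrites any existing record with the same key.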
Modern Variations
While ETL remains popular, modern architectures often use variations:
- ELT - Extract-Load-Transform (transform in the database)
- Streaming ETL - Real-time continuous processing
- Micro-batch - Small frequent batches instead of large periodic ones (see the sketch after this list)
- Event-Driven ETL - Triggered by data changes instead of schedules
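As a small illustration of the micro-batch variation, the sketch below buffers incoming records and flushes them in small batches every few seconds instead of running one large scheduled job. The queue, the five-second interval, and the reuse of the batch endpoint are assumptions for illustration:

```javascript
// Micro-batch sketch: buffer records as they arrive and flush small batches
// on a short timer. The interval and endpoint are illustrative assumptions.
const queue = [];

function enqueue(record) {
  queue.push(record);
}

async function flush() {
  if (queue.length === 0) return;
  const batch = queue.splice(0, queue.length); // drain the current buffer
  const response = await fetch('/api/users/batch', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(batch)
  });
  if (!response.ok) {
    throw new Error(`Micro-batch flush failed: HTTP ${response.status}`);
  }
}

setInterval(() => flush().catch(console.error), 5000); // flush every 5 seconds
```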