The Daily Deadline
Every morning at 8:30 AM, a clock starts ticking.
Our data provider uploads fresh vehicle listings to an S3 bucket: several gzip-compressed CSV files, around 200GB total when uncompressed.
By 10:00 AM, users expect to search those listings.
90 minutes to:
- Download files from S3
- Decompress them
- Deduplicate 30-40 million rows down to 3-4 million
- Transform 91 CSV columns into clean documents
- Index everything into Elasticsearch
- Delete stale records that disappeared from the feed
Miss the window? Users see yesterday's prices. Yesterday's inventory. Cars that were sold 12 hours ago.
What We're Actually Dealing With
DAILY DATA FEED:
┌─────────────────────────────────────────────────────────────┐
│ S3 Bucket: ecarfeedback │
│ │
│ mc_us_new_20250115.csv.gz (~15GB compressed) │
│ mc_us_used_20250115.csv.gz (~45GB compressed) │
│ mc_us_fsbo_20250115.csv.gz (~8GB compressed) │
│ │
│ Total uncompressed: ~200GB │
│ Total rows: 30-40 million │
│ Unique vehicles (after dedup): 3-4 million │
└─────────────────────────────────────────────────────────────┘

Why so many duplicates? The same car appears in multiple dealer feeds. The same VIN gets scraped 5-10 times from different sources. We only want the most recent scrape for each vehicle.
The Pipeline That Actually Works
After months of trial and error, here's what runs every morning:
THE DAILY PIPELINE (8:30 AM PST):
┌─────────────┐
│ S3 Bucket │
└──────┬──────┘
│
▼
┌─────────────────────────────────────┐
│ STEP 1: Multi-Threaded Download │
│ │
│ • Check which files are new today │
│ • Spawn thread per file │
│ • Download in parallel │
│ • Track in database to skip dupes │
│ │
│ Time: ~8 minutes │
└──────────────┬──────────────────────┘
│
▼
┌─────────────────────────────────────┐
│ STEP 2: Decompress + Stream │
│ │
│ • gunzip on-the-fly │
│ • Never store full uncompressed │
│ • Stream rows as they decompress │
│ │
│ Memory: ~2GB peak (not 200GB) │
└──────────────┬──────────────────────┘
│
▼
┌─────────────────────────────────────┐
│ STEP 3: VIN Deduplication │
│ │
│ • Miller CLI for streaming sort │
│ • Keep only latest scrape per VIN │
│ • 30-40M rows → 3-4M unique │
│ │
│ Time: ~12 minutes │
└──────────────┬──────────────────────┘
│
▼
┌─────────────────────────────────────┐
│ STEP 4: Transform 91 → 50 fields │
│ │
│ • Parse dealer info │
│ • Extract 21 feature flags │
│ • Calculate price brackets │
│ • Normalize colors, trims │
│ • Build geo coordinates │
│ │
│ (Happens per-row during indexing) │
└──────────────┬──────────────────────┘
│
▼
┌─────────────────────────────────────┐
│ STEP 5: Bulk Index to ES │
│ │
│ • Batch 20,000 documents │
│ • Route by make (64 shards) │
│ • 4 indices: new, used, fsbo, cert │
│ │
│ Time: ~45 minutes │
└──────────────┬──────────────────────┘
│
▼
┌─────────────────────────────────────┐
│ STEP 6: Cleanup Stale Records │
│ │
│ • Compare current IDs vs indexed │
│ • Delete anything not in new feed │
│ • Batch delete 5,000 at a time │
│ │
│ Time: ~15 minutes │
└─────────────────────────────────────┘

Total time: ~80 minutes. Just under the wire.
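Before digging into each step, here's a rough sketch of how they chain together. It's a map, not the production job: the helper names mirror the snippets later in this post (plus a dict-based row streamer sketched in Step 2), and the index names, temp-file handling, and error handling are simplified.

# Minimal orchestration sketch; helper names follow the code later in this post.
INDEX_NAMES = ("new", "used", "fsbo", "cert")  # illustrative placeholders

def run_daily_pipeline():
    current_ids = set()
    for key in get_file_from_s3():                    # Step 1: parallel download
        gz_path = f"/tmp/{key}"                       # where the download thread saved it
        vin_to_id = fetch_latest_vins(gz_path)        # Step 3: latest scrape per VIN
        keep_ids = {str(v) for v in vin_to_id.values()}  # str(): Miller may emit numeric ids
        current_ids |= keep_ids
        docs = (
            reformat_data(row)                        # Step 4: 91 columns -> clean doc
            for row in stream_rows_as_dicts(gz_path)  # Step 2: stream-decompress
            if row["id"] in keep_ids                  # drop older duplicate scrapes
        )
        index_documents(docs)                         # Step 5: bulk index
    for index_name in INDEX_NAMES:
        cleanup_stale_records(current_ids, index_name)  # Step 6: delete sold vehicles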
Step 1: The Download Problem
First attempt: sequential downloads.
# SLOW VERSION - Don't do this
def download_files():
    for file in s3_files:
        s3.download_file(BUCKET, file, f"/tmp/{file}")

# 15 minutes per file × 3 files = 45 minutes just downloading

45 minutes just to download. That's half our time budget gone.
What actually works: Multi-threaded downloads.
import threading

import boto3
import pytz
from django.utils import timezone  # the pipeline runs inside a Django app

BUCKET_NAME = "ecarfeedback"
s3 = boto3.client("s3")

def download_file_from_s3(key, downloaded_keys):
    """Download a single file (runs in thread)"""
    local_path = f"/tmp/{key}"
    s3.download_file(BUCKET_NAME, key, local_path)
    downloaded_keys.append(key)
    print(f"Downloaded: {key}")

def get_file_from_s3():
    pst_timezone = pytz.timezone("US/Pacific")
    today = timezone.now().astimezone(pst_timezone).date()
    response = s3.list_objects_v2(Bucket=BUCKET_NAME)
    downloaded_keys = []
    threads = []
    for obj in response.get("Contents", []):
        key = obj["Key"]
        last_modified = obj["LastModified"].astimezone(pst_timezone).date()
        # Only download today's files
        if last_modified != today:
            continue
        # Skip if already processed (check database)
        if S3Files.objects.filter(name=key).exists():  # Django model tracking processed files
            continue
        # Spawn download thread
        thread = threading.Thread(
            target=download_file_from_s3,
            args=(key, downloaded_keys)
        )
        thread.start()
        threads.append(thread)
    # Wait for all downloads
    for thread in threads:
        thread.join()
    return downloaded_keys

3 files downloading in parallel. 8 minutes instead of 45.
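The same fan-out can also be written with concurrent.futures, which handles the join bookkeeping for you. A sketch reusing download_file_from_s3 from above (an equivalent, not what we actually run):

from concurrent.futures import ThreadPoolExecutor

def download_keys(keys):
    """Parallel download via a thread pool instead of hand-rolled threads."""
    downloaded_keys = []
    with ThreadPoolExecutor(max_workers=8) as pool:
        futures = [pool.submit(download_file_from_s3, key, downloaded_keys)
                   for key in keys]
        for future in futures:
            future.result()  # surfaces any download exception
    return downloaded_keys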
Step 2: The Memory Problem
My first instinct: decompress everything, then process.
# THIS KILLS YOUR SERVER
import gzip

with gzip.open('mc_us_used_20250115.csv.gz', 'rt') as f:
    data = f.read()  # 200GB in RAM

Our server had 64GB of RAM. The uncompressed data was 200GB. The math doesn't work.
What actually works: Stream decompression.
import gzip
import csv

def stream_rows(gzip_path):
    """Yield rows one at a time, never load full file"""
    with gzip.open(gzip_path, 'rt', encoding='utf-8') as f:
        reader = csv.reader(f)
        header = next(reader)  # Skip header
        for row in reader:
            yield row  # One row at a time

# Memory stays flat at ~2GB regardless of file size
for row in stream_rows('mc_us_used_20250115.csv.gz'):
    process_row(row)

The key insight: gzip supports streaming. You don't need to decompress the whole file to read row 1,000,000. Just decompress as you go.
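One practical note: the transform step later in this post looks fields up by name (source.get("price")), so a dict-yielding variant of the streamer is handy. A minimal sketch:

def stream_rows_as_dicts(gzip_path):
    """Like stream_rows, but yields {column_name: value} dicts keyed by the CSV
    header, so downstream code can use field names instead of positions."""
    with gzip.open(gzip_path, 'rt', encoding='utf-8') as f:
        for row in csv.DictReader(f):
            yield row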
Step 3: The Deduplication Nightmare
Here's the ugly truth about vehicle data feeds:
RAW DATA (same car, multiple scrapes):
VIN: 1HGCV1F31LA012345
├── scraped_at: 2025-01-15 02:00:00, price: $25,999
├── scraped_at: 2025-01-15 06:00:00, price: $25,499 ← Price dropped!
├── scraped_at: 2025-01-14 18:00:00, price: $25,999
├── scraped_at: 2025-01-14 10:00:00, price: $26,500
└── scraped_at: 2025-01-13 22:00:00, price: $26,500
WE ONLY WANT: 2025-01-15 06:00:00 (latest scrape)

30-40 million rows. We need the latest scrape per VIN.
What Actually Worked: Miller CLI
I discovered Miller - a command-line tool for CSV/JSON processing. It streams data without loading everything into memory.
import subprocess
import json

def fetch_latest_vins(csv_file):
    """Use Miller to deduplicate by VIN, keeping latest scrape"""
    # Step 1: Extract only columns we need (faster processing)
    minimize_cmd = f"""
    mlr --icsv --ocsv cut -f vin,scraped_at,id {csv_file} > /tmp/minimal.csv
    """
    subprocess.run(minimize_cmd, shell=True)

    # Step 2: Sort by VIN, then by scraped_at descending
    # Miller streams - never loads full file
    miller_cmd = """
    mlr --c2j sort -r vin,scraped_at /tmp/minimal.csv
    """
    result = subprocess.run(miller_cmd, shell=True, capture_output=True, text=True)

    # Step 3: Build VIN -> latest ID mapping
    vin_to_id = {}
    last_vin = None
    for line in result.stdout.strip().split('\n'):
        row = json.loads(line)
        # First occurrence of each VIN is the latest (we sorted descending)
        if row['vin'] != last_vin:
            vin_to_id[row['vin']] = row['id']
            last_vin = row['vin']
    return vin_to_id  # 3-4M entries

40 million rows deduplicated in 12 minutes. Memory usage: flat at 500MB.
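The mapping then acts as a filter while streaming: a row survives only if its id is the one recorded as the latest for its VIN. Roughly, as a sketch built on the dict-based streamer from Step 2 (not the exact production wiring):

def latest_rows_only(gzip_path, vin_to_id):
    """Yield only the newest scrape per VIN; older duplicate rows are skipped."""
    for row in stream_rows_as_dicts(gzip_path):
        # str() because Miller may emit numeric-looking ids as JSON numbers
        if str(vin_to_id.get(row.get("vin"))) == row.get("id"):
            yield row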
Step 4: The Transformation Gauntlet
The raw CSV has 91 columns. Many are messy:
RAW CSV ROW (91 columns):
Column 0: "2705840915073" → id
Column 1: "1HGCV1F31LA012345" → vin
Column 4: "25999.00" → price (string!)
Column 6: "32450.5" → miles (float string!)
Column 10: "Honda" → make
Column 11: "Accord" → model
Column 42: "Modern Steel Metallic" → exterior_color
Column 48: "1" → is_certified (string "1"!)
Column 82: "ABS|Bluetooth|Leather|GPS" → features (pipe-delimited!)
Column 86: "url1|url2|url3" → photos (pipe-delimited!)
...

We need clean, typed, nested documents for Elasticsearch.
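The transform below calls a return_features helper that turns the pipe-delimited feature text into boolean flags. The real version checks 21 flags; here's a trimmed sketch with a few illustrative keywords (the flag names are placeholders, not our actual schema):

# Trimmed sketch of the feature-flag helper; keyword lists are illustrative.
FEATURE_KEYWORDS = {
    "bluetooth": ("bluetooth",),
    "leather_seats": ("leather",),
    "navigation": ("gps", "navigation"),
    "abs": ("abs",),
}

def return_features(raw_features):
    """Map pipe-delimited feature text to boolean flags."""
    text = (raw_features or "").lower()
    return {flag: any(kw in text for kw in keywords)
            for flag, keywords in FEATURE_KEYWORDS.items()}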
def reformat_data(source):
    """Transform raw fields into clean, typed document"""
    # Parse price (handle nulls, empty strings, floats)
    price = 0
    if source.get("price"):
        try:
            price = int(float(source["price"]))
        except (TypeError, ValueError):
            price = 0

    # Parse mileage
    miles = 0
    if source.get("miles"):
        try:
            miles = int(float(source["miles"]))
        except (TypeError, ValueError):
            miles = 0

    # Boolean conversion (CSV has "1", "0", "", "true", "false")
    is_certified = str(source.get("is_certified", "")).lower() in ("1", "true")

    # Split pipe-delimited fields
    photos = source.get("photo_links", "").split("|") if source.get("photo_links") else []

    # Extract feature flags from text
    features = return_features(source.get("features", ""))

    return {
        "id": source["id"],
        "vin": source["vin"],
        "price": price,
        "miles": miles,
        "is_certified": is_certified,
        "build": {
            "year": int(source.get("year", 0) or 0),
            "make": source.get("make", "").strip(),
            "model": source.get("model", "").strip(),
            "body_type": source.get("body_type", ""),
            "transmission": source.get("transmission", ""),
        },
        "features": features,
        "media": {
            "photo_links": [p for p in photos if p],
        },
    }

Step 5: The Indexing Challenge
3-4 million documents. How do you get them into Elasticsearch fast?
What works: Bulk API with batching
from elasticsearch.helpers import bulk

# Assumes an Elasticsearch client `es`, plus the get_index_name / get_routing_key
# helpers (index choice and make-based routing) defined elsewhere in the project.

def index_documents(documents, batch_size=20000):
    """Bulk index with optimal batch size"""
    batch = []
    indexed_count = 0
    for doc in documents:
        # Prepare bulk action
        action = {
            "_index": get_index_name(doc),
            "_id": doc["id"],
            "_routing": get_routing_key(doc["build"]["make"]),
            "_source": doc,
        }
        batch.append(action)
        # Flush batch when full
        if len(batch) >= batch_size:
            success, errors = bulk(es, batch, raise_on_error=False)
            indexed_count += success
            batch = []
    # Final batch
    if batch:
        success, _ = bulk(es, batch, raise_on_error=False)
        indexed_count += success
    return indexed_count

Why 20,000? We tested batch sizes from 1,000 to 100,000:
Batch Size    Time for 3M docs    Memory Peak
─────────────────────────────────────────────
1,000         2h 15m              1.2 GB
5,000         1h 10m              2.1 GB
10,000        52m                 3.4 GB
20,000        45m                 5.8 GB    ← Sweet spot
50,000        44m                 12.3 GB
100,000       46m                 24.1 GB   ← Diminishing returns

20,000 was the sweet spot: fast indexing without blowing up memory.
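The bulk actions above reference two helpers we haven't shown: get_index_name picks one of the four indices, and get_routing_key returns the make-based routing value. A rough sketch of the idea; the index names and the listing_type field are illustrative, and the real routing rules are covered in the post linked under Related Reading:

def get_index_name(doc):
    """Pick the target index; 'listing_type' and the index names are illustrative."""
    if doc.get("is_certified"):
        return "vehicles_cert"
    return f"vehicles_{doc.get('listing_type', 'used')}"  # new / used / fsbo

def get_routing_key(make):
    """All documents of the same make land in the same shard group."""
    return (make or "unknown").strip().lower()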
Step 6: The Stale Data Problem
Here's something nobody tells you about daily data feeds:
Cars get sold. They disappear from the feed.
If a car was in yesterday's feed but not today's, it's probably sold. We need to delete it from our index.
def cleanup_stale_records(current_ids, index_name):
    """Delete records that are no longer in the feed"""
    # Get all IDs currently in Elasticsearch
    indexed_ids = set()
    # Use scroll API for large result sets
    response = es.search(
        index=index_name,
        body={"query": {"match_all": {}}, "_source": False},
        scroll="10m",
        size=10000
    )
    while True:
        hits = response["hits"]["hits"]
        if not hits:
            break
        for hit in hits:
            indexed_ids.add(hit["_id"])
        response = es.scroll(scroll_id=response["_scroll_id"], scroll="10m")

    # Find IDs to delete
    stale_ids = indexed_ids - current_ids
    print(f"Found {len(stale_ids)} stale records to delete")

    # Batch delete
    delete_batch = []
    for doc_id in stale_ids:
        delete_batch.append({
            "_op_type": "delete",
            "_index": index_name,
            "_id": doc_id,
        })
        if len(delete_batch) >= 5000:
            bulk(es, delete_batch, raise_on_error=False)
            delete_batch = []
    if delete_batch:
        bulk(es, delete_batch, raise_on_error=False)

We typically delete 50,000-100,000 stale records daily. Cars sell fast.
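A variation worth knowing: if every document carried a stamp for the feed run that produced it, Elasticsearch's delete_by_query could drop stale records without scrolling all the IDs first. A sketch under that assumption (the feed_date field is hypothetical; our documents don't have it today):

def cleanup_by_feed_date(index_name, todays_feed_date):
    """Alternative cleanup: delete anything not refreshed by today's run.
    Assumes a (hypothetical) feed_date field set during transformation."""
    es.delete_by_query(
        index=index_name,
        body={"query": {"range": {"feed_date": {"lt": todays_feed_date}}}},
        conflicts="proceed",       # keep going past version conflicts
        wait_for_completion=True,
    )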
The Timezone Bug That Wasted a Week
For a week, our pipeline kept reprocessing the same files. Every day.
The bug:
# We were checking file dates in UTC
last_modified = obj["LastModified"].date()  # UTC

# But comparing to "today" in PST
today = timezone.now().date()  # Also UTC, but we THOUGHT it was PST

# At 8:30 AM PST, it's 4:30 PM UTC
# Around the UTC day boundary the two calendars disagree: a file uploaded in
# the PST evening already carries the next day's UTC date
# So we kept skipping today's files!

The fix:
# Explicitly convert to PST
pst_timezone = pytz.timezone("US/Pacific")
today_pst = timezone.now().astimezone(pst_timezone).date()
file_date_pst = obj["LastModified"].astimezone(pst_timezone).date()

if file_date_pst == today_pst:
    # Now we're comparing apples to apples
    ...

Lesson: Always be explicit about timezones. "Today" means different things in different timezones.
Key Lessons
Lesson 1: Stream Everything
200GB doesn't fit in RAM. But you don't need it to. Stream rows one at a time. Decompress on the fly. Your memory stays flat.
Lesson 2: The Right Tool for Deduplication
Pandas? Crashes. PostgreSQL? Too slow. Miller CLI? 12 minutes, 500MB RAM.
Sometimes a Unix command-line tool beats a database.
Lesson 3: Batch Size Matters
1,000 documents per batch = slow. 100,000 documents per batch = memory explosion. 20,000 = sweet spot for us.
Benchmark your own system.
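A rough harness for that benchmark (a sketch; make_actions stands in for whatever yields your bulk actions, and it only measures wall-clock time, not memory):

import time
from elasticsearch.helpers import bulk

def benchmark_chunk_sizes(es, make_actions, sizes=(1000, 5000, 10000, 20000, 50000)):
    """Time a bulk load at several chunk sizes to find your own sweet spot."""
    for size in sizes:
        start = time.monotonic()
        bulk(es, make_actions(), chunk_size=size, raise_on_error=False)
        print(f"chunk_size={size:>6}: {time.monotonic() - start:.1f}s")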
Lesson 4: Don't Forget Cleanup
Stale data is worse than no data. If a car sold yesterday, remove it today. Users get angry when they call about a car that's already gone.
Lesson 5: Timezones Will Bite You
If your data source is in a different timezone than your server, be paranoid. Convert everything explicitly. "Today" is ambiguous.
Quick Reference
Stream gzip without loading:
with gzip.open('file.csv.gz', 'rt') as f:
    for line in f:
        process(line)

Deduplicate with Miller:

mlr --csv sort -r vin,scraped_at file.csv | mlr --csv head -n 1 -g vin

Bulk index to Elasticsearch:

from elasticsearch.helpers import bulk
bulk(es, actions, chunk_size=20000)

That's how we process 200GB of vehicle data every morning before users wake up.
Related Reading
- Deduplicating 3M Records Without Running Out of Memory - Deep dive on Miller CLI
- Elasticsearch Routing by Make - How we made searches 5x faster
- Building a Search API with 30+ Filters - The API that queries this data
