
How Do You Process 200GB of Vehicle Data Every Day?

#ETL #Python #AWS #S3 #Elasticsearch #Celery

The Daily Deadline

Every morning at 8:30 AM, a clock starts ticking.

Our data provider uploads fresh vehicle listings to an S3 bucket. Gzip-compressed CSV files. Multiple files. Around 200GB total when uncompressed.

By 10:00 AM, users expect to search those listings.

90 minutes to:

  • Download files from S3
  • Decompress them
  • Deduplicate 30-40 million rows down to 3-4 million
  • Transform 91 CSV columns into clean documents
  • Index everything into Elasticsearch
  • Delete stale records that disappeared from the feed

Miss the window? Users see yesterday's prices. Yesterday's inventory. Cars that were sold 12 hours ago.


What We're Actually Dealing With

DAILY DATA FEED:
 
┌─────────────────────────────────────────────────────────────┐
│  S3 Bucket: ecarfeedback                                    │
│                                                             │
│  mc_us_new_20250115.csv.gz      (~15GB compressed)         │
│  mc_us_used_20250115.csv.gz     (~45GB compressed)         │
│  mc_us_fsbo_20250115.csv.gz     (~8GB compressed)          │
│                                                             │
│  Total uncompressed: ~200GB                                 │
│  Total rows: 30-40 million                                  │
│  Unique vehicles (after dedup): 3-4 million                 │
└─────────────────────────────────────────────────────────────┘

Why so many duplicates? The same car appears in multiple dealer feeds. The same VIN gets scraped 5-10 times from different sources. We only want the most recent scrape for each vehicle.


The Pipeline That Actually Works

After months of trial and error, here's what runs every morning:

THE DAILY PIPELINE (8:30 AM PST):
 
┌─────────────┐
│  S3 Bucket  │
└──────┬──────┘


┌─────────────────────────────────────┐
│  STEP 1: Multi-Threaded Download    │
│                                     │
│  • Check which files are new today  │
│  • Spawn thread per file            │
│  • Download in parallel             │
│  • Track in database to skip dupes  │
│                                     │
│  Time: ~8 minutes                   │
└──────────────┬──────────────────────┘


┌─────────────────────────────────────┐
│  STEP 2: Decompress + Stream        │
│                                     │
│  • gunzip on-the-fly                │
│  • Never store full uncompressed    │
│  • Stream rows as they decompress   │
│                                     │
│  Memory: ~2GB peak (not 200GB)      │
└──────────────┬──────────────────────┘


┌─────────────────────────────────────┐
│  STEP 3: VIN Deduplication          │
│                                     │
│  • Miller CLI for streaming sort    │
│  • Keep only latest scrape per VIN  │
│  • 30-40M rows → 3-4M unique        │
│                                     │
│  Time: ~12 minutes                  │
└──────────────┬──────────────────────┘


┌─────────────────────────────────────┐
│  STEP 4: Transform 91 CSV fields    │
│                                     │
│  • Parse dealer info                │
│  • Extract 21 feature flags         │
│  • Calculate price brackets         │
│  • Normalize colors, trims          │
│  • Build geo coordinates            │
│                                     │
│  (Happens per-row during indexing)  │
└──────────────┬──────────────────────┘


┌─────────────────────────────────────┐
│  STEP 5: Bulk Index to ES           │
│                                     │
│  • Batch 20,000 documents           │
│  • Route by make (64 shards)        │
│  • 4 indices: new, used, fsbo, cert │
│                                     │
│  Time: ~45 minutes                  │
└──────────────┬──────────────────────┘


┌─────────────────────────────────────┐
│  STEP 6: Cleanup Stale Records      │
│                                     │
│  • Compare current IDs vs indexed   │
│  • Delete anything not in new feed  │
│  • Batch delete 5,000 at a time     │
│                                     │
│  Time: ~15 minutes                  │
└─────────────────────────────────────┘

Total time: ~80 minutes. Just under the wire.
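
The whole run is kicked off as one scheduled job each morning; Celery shows up in the tags, so here is a rough sketch of how the steps could be chained as Celery tasks, assuming a Redis broker and Celery beat. Task names, the broker URL, and the per-step helpers (other than get_file_from_s3, shown later) are illustrative, not the real ones.

# Hedged sketch: chaining the pipeline steps as Celery tasks.
from celery import Celery, chain
from celery.schedules import crontab

app = Celery("vehicle_etl", broker="redis://localhost:6379/0")  # broker URL is illustrative
app.conf.timezone = "US/Pacific"     # the 8:30 deadline is a PST deadline

@app.task
def download_feed():
    return get_file_from_s3()        # Step 1: returns today's downloaded keys

@app.task
def process_feed(downloaded_keys):
    for key in downloaded_keys:      # Steps 2-5: stream, dedupe, transform, index
        process_feed_file(key)       # hypothetical per-file worker
    return downloaded_keys

@app.task
def cleanup_feed(downloaded_keys):
    remove_stale_records()           # Step 6: hypothetical wrapper around the cleanup step

@app.task(name="pipeline.run_daily_pipeline")
def run_daily_pipeline():
    chain(download_feed.s(), process_feed.s(), cleanup_feed.s()).apply_async()

app.conf.beat_schedule = {
    "daily-vehicle-feed": {
        "task": "pipeline.run_daily_pipeline",
        "schedule": crontab(hour=8, minute=30),
    },
}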


Step 1: The Download Problem

First attempt: sequential downloads.

# SLOW VERSION - Don't do this
def download_files():
    for file in s3_files:
        s3.download_file(BUCKET, file, local_path)
        # 15 minutes per file × 3 files = 45 minutes just downloading

45 minutes just to download. That's half our time budget gone.

What actually works: Multi-threaded downloads.

import threading

import boto3
import pytz
from django.utils import timezone

s3 = boto3.client("s3")  # BUCKET_NAME and the S3Files tracking model are defined elsewhere
 
def download_file_from_s3(key, downloaded_keys):
    """Download a single file (runs in thread)"""
    local_path = f"/tmp/{key}"
    s3.download_file(BUCKET_NAME, key, local_path)
    downloaded_keys.append(key)
    print(f"Downloaded: {key}")
 
def get_file_from_s3():
    pst_timezone = pytz.timezone("US/Pacific")
    today = timezone.now().astimezone(pst_timezone).date()
 
    response = s3.list_objects_v2(Bucket=BUCKET_NAME)
    downloaded_keys = []
    threads = []
 
    for obj in response.get("Contents", []):
        key = obj["Key"]
        last_modified = obj["LastModified"].date()
 
        # Only download today's files
        if last_modified != today:
            continue
 
        # Skip if already processed (check database)
        if S3Files.objects.filter(name=key).exists():
            continue
 
        # Spawn download thread
        thread = threading.Thread(
            target=download_file_from_s3,
            args=(key, downloaded_keys)
        )
        thread.start()
        threads.append(thread)
 
    # Wait for all downloads
    for thread in threads:
        thread.join()
 
    return downloaded_keys

3 files downloading in parallel. 8 minutes instead of 45.
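
If you'd rather not manage threads by hand, concurrent.futures gives the same parallelism with a bounded pool. A minimal equivalent of the download step, assuming BUCKET_NAME as above; max_workers is arbitrary:

from concurrent.futures import ThreadPoolExecutor, as_completed

import boto3

s3 = boto3.client("s3")

def download_all(keys, max_workers=4):
    """Same parallel download, with a bounded thread pool."""
    downloaded = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # One future per S3 key; boto3 releases the GIL during network I/O
        futures = {
            pool.submit(s3.download_file, BUCKET_NAME, key, f"/tmp/{key}"): key
            for key in keys
        }
        for future in as_completed(futures):
            future.result()                      # re-raise any download error
            downloaded.append(futures[future])
    return downloaded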


Step 2: The Memory Problem

My first instinct: decompress everything, then process.

# THIS KILLS YOUR SERVER
import gzip
 
with gzip.open('mc_us_used_20250115.csv.gz', 'rt') as f:
    data = f.read()  # 200GB in RAM

Our server had 64GB RAM. The uncompressed data was 200GB. Math doesn't work.

What actually works: Stream decompression.

import gzip
import csv
 
def stream_rows(gzip_path):
    """Yield rows one at a time, never load full file"""
    with gzip.open(gzip_path, 'rt', encoding='utf-8') as f:
        reader = csv.reader(f)
        header = next(reader)  # Skip header
 
        for row in reader:
            yield row  # One row at a time
 
# Memory stays flat at ~2GB regardless of file size
for row in stream_rows('mc_us_used_20250115.csv.gz'):
    process_row(row)

The key insight: gzip supports streaming. You don't need to decompress the whole file to read row 1,000,000. Just decompress as you go.
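
One small variation that pays off later: yield each row as a dict keyed by the CSV header instead of a bare list, since the transform step in Step 4 looks fields up by name. Same flat memory profile:

import csv
import gzip

def stream_dicts(gzip_path):
    """Yield each row as a dict keyed by the CSV header, still streaming."""
    with gzip.open(gzip_path, 'rt', encoding='utf-8') as f:
        for row in csv.DictReader(f):    # header row becomes the dict keys
            yield row

# Each row looks like {"vin": "...", "price": "25999.00", "miles": "32450.5", ...}
for source in stream_dicts('mc_us_used_20250115.csv.gz'):
    process_row(source)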


Step 3: The Deduplication Nightmare

Here's the ugly truth about vehicle data feeds:

RAW DATA (same car, multiple scrapes):
 
VIN: 1HGCV1F31LA012345
├── scraped_at: 2025-01-15 02:00:00, price: $25,999
├── scraped_at: 2025-01-15 06:00:00, price: $25,499  ← Price dropped!
├── scraped_at: 2025-01-14 18:00:00, price: $25,999
├── scraped_at: 2025-01-14 10:00:00, price: $26,500
└── scraped_at: 2025-01-13 22:00:00, price: $26,500
 
WE ONLY WANT: 2025-01-15 06:00:00 (latest scrape)

30-40 million rows. We need the latest scrape per VIN.

What Actually Worked: Miller CLI

I discovered Miller - a command-line tool for CSV/JSON processing. It streams data without loading everything into memory.

import subprocess
import json
 
def fetch_latest_vins(csv_file):
    """Use Miller to deduplicate by VIN, keeping latest scrape"""
 
    # Step 1: Extract only columns we need (faster processing)
    minimize_cmd = f"""
        mlr --icsv --ocsv cut -f vin,scraped_at,id {csv_file} > /tmp/minimal.csv
    """
    subprocess.run(minimize_cmd, shell=True)
 
    # Step 2: Sort by VIN, then by scraped_at descending
    # Miller streams - never loads the full file
    # --ojsonl emits one JSON object per line, so we can parse line by line below
    miller_cmd = """
        mlr --icsv --ojsonl sort -r vin,scraped_at /tmp/minimal.csv
    """
 
    result = subprocess.run(miller_cmd, shell=True, capture_output=True, text=True)
 
    # Step 3: Build VIN -> latest ID mapping
    vin_to_id = {}
    last_vin = None
 
    for line in result.stdout.strip().split('\n'):
        row = json.loads(line)
 
        # First occurrence of each VIN is the latest (we sorted descending)
        if row['vin'] != last_vin:
            vin_to_id[row['vin']] = row['id']
            last_vin = row['vin']
 
    return vin_to_id  # 3-4M entries

40 million rows deduplicated in 12 minutes. Memory usage: flat at 500MB.
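
The vin_to_id mapping then acts as a filter on the second streaming pass: only the row whose id matches the latest scrape for its VIN gets transformed and indexed. A sketch, reusing the dict-based reader from Step 2 (compare as strings, since Miller type-infers numeric-looking ids while the csv module yields strings):

def latest_rows(gzip_path, vin_to_id):
    """Second streaming pass: keep only the winning row per VIN."""
    for source in stream_dicts(gzip_path):
        # str() both sides: Miller may emit the id as a number, csv gives a string
        if str(vin_to_id.get(source["vin"])) == str(source["id"]):
            yield source

# Survivors feed straight into transform + bulk indexing (Steps 4 and 5)
unique_rows = latest_rows('mc_us_used_20250115.csv.gz', vin_to_id)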


Step 4: The Transformation Gauntlet

The raw CSV has 91 columns. Many are messy:

RAW CSV ROW (91 columns):
 
Column 0:  "2705840915073"              → id
Column 1:  "1HGCV1F31LA012345"          → vin
Column 4:  "25999.00"                    → price (string!)
Column 6:  "32450.5"                     → miles (float string!)
Column 10: "Honda"                       → make
Column 11: "Accord"                      → model
Column 42: "Modern Steel Metallic"       → exterior_color
Column 48: "1"                           → is_certified (string "1"!)
Column 82: "ABS|Bluetooth|Leather|GPS"   → features (pipe-delimited!)
Column 86: "url1|url2|url3"              → photos (pipe-delimited!)
...

We need clean, typed, nested documents for Elasticsearch.

def reformat_data(source):
    """Transform raw fields into clean, typed document"""
 
    # Parse price (handle nulls, empty strings, floats)
    price = 0
    if source.get("price"):
        try:
            price = int(float(source["price"]))
        except (TypeError, ValueError):
            price = 0
 
    # Parse mileage
    miles = 0
    if source.get("miles"):
        try:
            miles = int(float(source["miles"]))
        except (TypeError, ValueError):
            miles = 0
 
    # Boolean conversion (CSV has "1", "0", "", "true", "false")
    is_certified = str(source.get("is_certified", "")).lower() in ("1", "true")
 
    # Split pipe-delimited fields
    photos = source.get("photo_links", "").split("|") if source.get("photo_links") else []
 
    # Extract feature flags from text
    features = return_features(source.get("features", ""))
 
    return {
        "id": source["id"],
        "vin": source["vin"],
        "price": price,
        "miles": miles,
        "is_certified": is_certified,
 
        "build": {
            "year": int(source.get("year", 0) or 0),
            "make": source.get("make", "").strip(),
            "model": source.get("model", "").strip(),
            "body_type": source.get("body_type", ""),
            "transmission": source.get("transmission", ""),
        },
 
        "features": features,
 
        "media": {
            "photo_links": [p for p in photos if p],
        },
    }
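
The return_features helper referenced above maps the pipe-delimited feature text to the 21 boolean flags mentioned in the pipeline diagram. The real mapping covers more features; flag names and keywords below are illustrative:

# Sketch of the feature-flag extraction; the real helper covers 21 flags.
FEATURE_KEYWORDS = {
    "has_bluetooth": ("bluetooth",),
    "has_leather": ("leather",),
    "has_navigation": ("gps", "navigation"),
    "has_sunroof": ("sunroof", "moonroof"),
}

def return_features(raw_features):
    """Map 'ABS|Bluetooth|Leather|GPS' style text to boolean flags."""
    tokens = {t.strip().lower() for t in (raw_features or "").split("|") if t.strip()}
    return {
        flag: any(kw in token for token in tokens for kw in keywords)
        for flag, keywords in FEATURE_KEYWORDS.items()
    }
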

Step 5: The Indexing Challenge

3-4 million documents. How do you get them into Elasticsearch fast?

What works: Bulk API with batching

from elasticsearch.helpers import bulk

# es is the Elasticsearch client configured elsewhere; get_index_name() and
# get_routing_key() pick the target index and shard routing (sketched below)
def index_documents(documents, batch_size=20000):
    """Bulk index with optimal batch size"""
 
    batch = []
    indexed_count = 0
 
    for doc in documents:
        # Prepare bulk action
        action = {
            "_index": get_index_name(doc),
            "_id": doc["id"],
            "_routing": get_routing_key(doc["build"]["make"]),
            "_source": doc,
        }
        batch.append(action)
 
        # Flush batch when full
        if len(batch) >= batch_size:
            success, errors = bulk(es, batch, raise_on_error=False)
            indexed_count += success
            batch = []
 
    # Final batch
    if batch:
        success, _ = bulk(es, batch)
        indexed_count += success
 
    return indexed_count
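
get_index_name() and get_routing_key() are small helpers. Per the pipeline diagram, documents land in one of four indices (new, used, fsbo, cert) and are routed by make across 64 shards. A sketch of the shape, with illustrative index names and a simplified certified check:

def get_index_name(doc, inventory_type="used"):
    """Pick one of the four indices: new, used, fsbo, cert."""
    # inventory_type would come from which feed file the row was read from
    return "vehicles_cert" if doc["is_certified"] else f"vehicles_{inventory_type}"

def get_routing_key(make):
    """Route by make so one make's listings are colocated on the same shards."""
    return (make or "unknown").strip().lower()

Because the routing value is derived from the make, a search that passes the same routing value only has to touch a subset of the 64 shards.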

Why 20,000? We tested batch sizes from 1,000 to 100,000:

Batch Size    Time for 3M docs    Memory Peak
─────────────────────────────────────────────
1,000         2h 15m              1.2 GB
5,000         1h 10m              2.1 GB
10,000        52m                 3.4 GB
20,000        45m                 5.8 GB      ← Sweet spot
50,000        44m                 12.3 GB
100,000       46m                 24.1 GB     ← Diminishing returns

20,000 was the sweet spot: fast indexing without blowing up memory.


Step 6: The Stale Data Problem

Here's something nobody tells you about daily data feeds:

Cars get sold. They disappear from the feed.

If a car was in yesterday's feed but not today's, it's probably sold. We need to delete it from our index.

def cleanup_stale_records(current_ids, index_name):
    """Delete records that are no longer in the feed"""
 
    # Get all IDs currently in Elasticsearch
    indexed_ids = set()
 
    # Use scroll API for large result sets
    response = es.search(
        index=index_name,
        body={"query": {"match_all": {}}, "_source": False},
        scroll="10m",
        size=10000
    )
 
    while True:
        hits = response["hits"]["hits"]
        if not hits:
            break
 
        for hit in hits:
            indexed_ids.add(hit["_id"])
 
        response = es.scroll(scroll_id=response["_scroll_id"], scroll="10m")
 
    # Find IDs to delete
    stale_ids = indexed_ids - current_ids
 
    print(f"Found {len(stale_ids)} stale records to delete")
 
    # Batch delete
    delete_batch = []
    for doc_id in stale_ids:
        delete_batch.append({
            "_op_type": "delete",
            "_index": index_name,
            "_id": doc_id,
        })
 
        if len(delete_batch) >= 5000:
            bulk(es, delete_batch, raise_on_error=False)
            delete_batch = []
 
    if delete_batch:
        bulk(es, delete_batch, raise_on_error=False)

We typically delete 50,000-100,000 stale records daily. Cars sell fast.
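
The current_ids argument is simply the set of document IDs indexed in Step 5, collected while the bulk indexer runs. A sketch of the glue, where documents is the transformed-document stream from Step 4 and the index name is illustrative:

# Collect IDs while indexing, then hand them to the cleanup step.
current_ids = set()

def tracked(documents):
    """Wrap the document stream so we remember every ID indexed today."""
    for doc in documents:
        current_ids.add(str(doc["id"]))    # Elasticsearch returns _id values as strings
        yield doc

index_documents(tracked(documents))
cleanup_stale_records(current_ids, index_name="vehicles_used")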


The Timezone Bug That Wasted a Week

For a week, our pipeline kept reprocessing the same files. Every day.

The bug:

# We were checking file dates in UTC
last_modified = obj["LastModified"].date()  # UTC
 
# But comparing to "today" in PST
today = timezone.now().date()  # Also UTC, but we THOUGHT it was PST
 
# At 8:30 AM PST, it's 4:30 PM UTC
# Files uploaded at 2 AM PST = 10 AM UTC = "yesterday" in UTC
# So we kept skipping today's files!

The fix:

# Explicitly convert to PST
pst_timezone = pytz.timezone("US/Pacific")
today_pst = timezone.now().astimezone(pst_timezone).date()
file_date_pst = obj["LastModified"].astimezone(pst_timezone).date()
 
if file_date_pst == today_pst:
    # Now we're comparing apples to apples
    download_file_from_s3(key, downloaded_keys)

Lesson: Always be explicit about timezones. "Today" means different things in different timezones.


Key Lessons

Lesson 1: Stream Everything

200GB doesn't fit in RAM. But you don't need it to. Stream rows one at a time. Decompress on the fly. Your memory stays flat.

Lesson 2: The Right Tool for Deduplication

Pandas? Crashes. PostgreSQL? Too slow. Miller CLI? 12 minutes, 500MB RAM.

Sometimes a Unix command-line tool beats a database.

Lesson 3: Batch Size Matters

1,000 documents per batch = slow. 100,000 documents per batch = memory explosion. 20,000 = sweet spot for us.

Benchmark your own system.
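
A quick way to find your own number: time the same sample of bulk actions at a few chunk sizes. A sketch (keep the sample as a list so it can be re-indexed; re-indexing the same _ids just overwrites them, which is fine for a benchmark):

import time

from elasticsearch.helpers import bulk

def benchmark_chunk_sizes(es, sample_actions, sizes=(1000, 5000, 10000, 20000, 50000)):
    """Index the same sample at different chunk sizes and print timings."""
    for size in sizes:
        start = time.perf_counter()
        success, _ = bulk(es, sample_actions, chunk_size=size, raise_on_error=False)
        elapsed = time.perf_counter() - start
        print(f"chunk_size={size:>6}: {success} docs in {elapsed:.1f}s")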

Lesson 4: Don't Forget Cleanup

Stale data is worse than no data. If a car sold yesterday, remove it today. Users get angry when they call about a car that's already gone.

Lesson 5: Timezones Will Bite You

If your data source is in a different timezone than your server, be paranoid. Convert everything explicitly. "Today" is ambiguous.


Quick Reference

Stream gzip without loading:

with gzip.open('file.csv.gz', 'rt') as f:
    for line in f:
        process(line)

Deduplicate with Miller:

mlr --csv sort -r vin,scraped_at file.csv | mlr --csv head -n 1 -g vin

Bulk index to Elasticsearch:

from elasticsearch.helpers import bulk
bulk(es, actions, chunk_size=20000)

That's how we process 200GB of vehicle data every morning before users wake up.


Aamir Shahzad


Author

Software Engineer with 7+ years of experience building scalable data systems. Specializing in Django, Python, and applied AI.