

Email Ingestion Data Warehouse

Production-grade email ingestion pipeline with deduplication, semantic chunking, vector embeddings, and LLM classification. A 7-stage pipeline that transforms raw Gmail data into a searchable, classified, relationship-aware knowledge base.

Open Source · Python 3.11 · PostgreSQL 17 · pgvector · CPU-first ML

7 STAGES IN THE INGESTION PIPELINE, from raw Gmail API fetch to embedded semantic chunks.

Language: Python
License: MIT
Status: Active
Repository: GitHub

Originally developed for Bushwick Daily's AI-powered newsroom. This system serves as the upstream data source for the Local News Multiverse Agent Platform.

Why This Exists

The Problem

Local newsrooms receive hundreds of emails daily: story leads, press releases, event listings, advertising inquiries, and the inevitable flood of newsletters. Manually triaging this volume is unsustainable, yet buried in that noise are the tips and sources that become great stories.

The Constraints

I built this on a MacBook Pro without dedicated ML infrastructure. Every architectural decision reflects this reality:

  • CPU-first design: all-MiniLM-L6-v2 (384-dim) embeddings run quickly on CPU; no GPU required
  • PostgreSQL as backbone: pgvector stores the embeddings, so no external vector database is needed
  • Local-first philosophy: your emails stay on your servers, not in someone else's cloud
  • Token efficiency: deterministic rules bypass the LLM for obvious cases, saving 30-50% on API costs

The Solution

A 7-stage pipeline that transforms raw Gmail data into a searchable, classified, relationship-aware knowledge base:

Gmail API → Deduplication → Chunking → Embedding → Classification → Participants → Profiles

Architecture Overview

                        1. EXTRACTION
                              |
        [Gmail API] --> [Service Account Auth] --> [Email Extraction] --> [Fingerprint Generation]
                              |
                        2. DEDUPLICATION
                              |
        [Content Normalization] --> [Multi-Hash Fingerprints] --> [Duplicate Groups]
                              |
                        3. PROCESSING
                              |
        [Semantic Chunking] --> [Vector Embeddings: all-MiniLM-L6-v2] --> [LLM Classification: Gemini Flash]
                              |
                        4. INTELLIGENCE
                              |
        [Participant Extraction] --> [Sender Profiles] --> [Relationship Mapping]
                              |
                        5. STORAGE
                              |
                    [PostgreSQL + pgvector]
                              |
                      [HNSW Vector Index]
                              |
                        6. CONSUMPTION
                              |
        [RAG Search API] | [Multiverse Agents] | [Email Assistant]

Database Schema

Core Tables

Table                    Purpose                               Key Columns
classified_emails        Primary email storage                 gmail_id, thread_id, body_text, content_fingerprint
email_chunks             Semantic chunks with embeddings       email_id, text, embedding VECTOR(384)
email_participants       Normalized sender/recipient records   participant_email, role, fingerprint
sender_profiles          Aggregated sender intelligence        email, is_journalist, influence_score
email_duplicate_groups   Deduplication clusters                content_fingerprint, member_count
email_fingerprints_v2    Multi-hash fingerprints               new_content_hash, structure_hash

Vector Storage

-- email_chunks with HNSW index for fast semantic search
CREATE TABLE email_chunks (
    id SERIAL PRIMARY KEY,
    email_id INTEGER REFERENCES classified_emails(id),
    chunk_index INTEGER,
    text TEXT,
    embedding VECTOR(384),  -- all-MiniLM-L6-v2 output
    metadata JSONB
);

-- HNSW index for approximate nearest neighbor search
CREATE INDEX idx_email_chunks_embedding_hnsw
    ON email_chunks
    USING hnsw (embedding vector_cosine_ops)
    WITH (m = 16, ef_construction = 64);

Sender Intelligence

-- sender_profiles aggregates sender behavior
CREATE TABLE sender_profiles (
    fingerprint VARCHAR(32) PRIMARY KEY,  -- MD5 of email
    email VARCHAR(255) UNIQUE,
    is_journalist BOOLEAN,
    is_government BOOLEAN,
    is_media BOOLEAN,
    email_count INTEGER,
    influence_score FLOAT,
    topics TEXT[],
    frequent_contacts JSONB
);

Pipeline Stages

Stage 1: Gmail API Extraction

python scripts/gmail_extractor.py --max-results 500

Uses a Google Cloud service account with domain-wide delegation to access Gmail. Extracts:

  • Full email metadata (headers, labels, size)
  • Body content (text and HTML)
  • Threading information (message-id, in-reply-to, references)
  • Attachment metadata (not content)

Stage 2: Content Normalization & Fingerprinting

The deduplication system uses multi-hash fingerprinting:

# Content normalization replaces URLs and emails with placeholders
"Check out https://example.com for details"
→ "check out [URL] for details"

# Multiple hash types for different dedup strategies
{
    "new_content_hash": "abc123...",     # Fresh content only
    "quoted_content_hash": "def456...",  # Quoted text only
    "full_content_hash": "ghi789...",    # Complete message
    "structure_hash": "jkl012...",       # Email structure
}
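The multi-hash scheme can be sketched in a few lines of stdlib Python. This helper is illustrative only: the field names match the example above, but the real quote detection and normalization are more involved.

```python
import hashlib
import re

QUOTED_LINE = re.compile(r"^>.*$", re.MULTILINE)

def _md5(text: str) -> str:
    return hashlib.md5(text.encode("utf-8")).hexdigest()

def fingerprint_email(body: str) -> dict:
    quoted = "\n".join(QUOTED_LINE.findall(body))  # quoted reply text only
    fresh = QUOTED_LINE.sub("", body).strip()      # body minus quoted text
    layout = re.sub(r"\w+", "W", body)             # words collapsed, layout kept
    return {
        "new_content_hash": _md5(fresh),
        "quoted_content_hash": _md5(quoted),
        "full_content_hash": _md5(body),
        "structure_hash": _md5(layout),
    }
```

Two replies that quote different text share a new_content_hash but differ on full_content_hash, which is exactly the signal forward detection needs.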

Gmail Alias Resolution: The system normalizes email addresses according to provider rules:

  • Gmail: Ignores dots, removes plus suffixes (john.doe+test@gmail.com → johndoe@gmail.com)
  • Yahoo: Removes dash suffixes
  • ProtonMail: Preserves plus addressing (they use it for unique addresses)
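A minimal sketch of these provider rules; the exact suffix handling here is an assumption, not the project's actual logic:

```python
def normalize_address(addr: str) -> str:
    """Provider-aware alias normalization (illustrative rules)."""
    local, _, domain = addr.lower().partition("@")
    if domain == "gmail.com":
        local = local.split("+", 1)[0].replace(".", "")  # drop +suffix, ignore dots
    elif domain.endswith("yahoo.com"):
        local = local.split("-", 1)[0]                   # drop -suffix
    # ProtonMail and others: plus addressing is significant, leave intact
    return f"{local}@{domain}"
```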

Stage 3: Semantic Chunking

python scripts/batch_chunking.py

Emails are chunked using a word-based strategy optimized for email content:

  • Chunk size: ~500 characters
  • Overlap: 50 characters (preserves context)
  • Short emails (<50 chars): Combined with subject as single chunk
  • Long emails (>50k chars): Truncated with marker
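A word-based chunker along these lines might look like the following (sizes in characters; chunk_text is a hypothetical name, not the script's actual function):

```python
def chunk_text(text: str, size: int = 500, overlap: int = 50) -> list[str]:
    """Pack whole words up to ~size chars, carrying ~overlap chars into the next chunk."""
    chunks, current, length = [], [], 0
    for word in text.split():
        if length + len(word) + 1 > size and current:
            chunks.append(" ".join(current))
            # keep trailing words as overlap so context spans chunk boundaries
            tail, tail_len = [], 0
            for w in reversed(current):
                if tail_len + len(w) + 1 > overlap:
                    break
                tail.insert(0, w)
                tail_len += len(w) + 1
            current, length = tail, tail_len
        current.append(word)
        length += len(word) + 1
    if current:
        chunks.append(" ".join(current))
    return chunks
```

Splitting on words rather than raw offsets means a chunk never cuts mid-word, which keeps the downstream embeddings cleaner.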

Stage 4: Vector Embeddings

Uses sentence-transformers/all-MiniLM-L6-v2:

  • 384 dimensions (efficient for CPU)
  • ~80ms per embedding on M1 MacBook
  • Cosine similarity for semantic search
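Cosine similarity is just the normalized dot product of two embedding vectors:

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Similarity in [-1, 1]; 1.0 means identical direction."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0
```

In production the database does this: pgvector's vector_cosine_ops orders by cosine distance (1 − similarity), which is what the HNSW index accelerates.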

Why this model? It's the sweet spot for CPU-constrained environments:

  • 5x smaller than larger models
  • 2x faster inference
  • 95% of quality for email-length text

Stage 5: LLM Classification

python scripts/batch_classifier.py --all --batch-size 50

16 classification categories tuned for newsroom workflows:

Category                       Example
story_lead_or_tip              Anonymous tips, source referrals
freelance_pitch                Article proposals from writers
press_release                  Official announcements
community_event_listing        Local events
sales_or_advertising_inquiry   Ad space requests
financial_admin                Receipts, invoices

Deterministic Pre-filtering: Obvious cases bypass LLM to save tokens:

  • @venmo.com → financial_admin (no LLM needed)
  • globenewswire.com → press_release (no LLM needed)

Stage 6: Participant Extraction

python scripts/participant_extractor.py --incremental

Extracts and normalizes participants from email headers:

  • Sender: Who sent the email
  • Recipients: TO field (tracks primary)
  • CC: Carbon copy (tracks position)
  • BCC: Blind carbon copy

Each participant gets an MD5 fingerprint enabling cross-email analysis.
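The fingerprint is a hash of the normalized address, which is also why sender_profiles keys on a VARCHAR(32): an MD5 hex digest is exactly 32 characters. A sketch:

```python
import hashlib

def participant_fingerprint(email: str) -> str:
    """MD5 over the trimmed, lowercased address: a stable join key across emails."""
    return hashlib.md5(email.strip().lower().encode("utf-8")).hexdigest()
```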

Stage 7: Sender Profile Enrichment

Aggregated intelligence about each sender:

  • Classification flags: is_journalist, is_government, is_media
  • Volume metrics: email_count, thread_count, response_rate
  • Temporal data: first_seen, last_seen, days_active
  • Network data: frequent_contacts, communication patterns

Hashing Strategy

Why Multiple Hashes?

Email deduplication is harder than document deduplication. The same content appears in:

  • Direct sends
  • Forwards (with "FYI" prefix)
  • Replies (with quoted text)
  • CC'd copies to different recipients

Hash Types

Hash                 Purpose                 Ignores
full_content_hash    Basic dedup             Nothing
new_content_hash     Forward detection       Quoted text
structure_hash       Layout matching         Content
thread_hash          Conversation grouping   Message body
recipient_set_hash   Audience matching       Message body
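Grouping by any one of these hashes yields the duplicate clusters stored in email_duplicate_groups. A sketch (field names follow the tables above):

```python
from collections import defaultdict

def group_duplicates(emails: list[dict], hash_key: str = "new_content_hash") -> dict[str, list[str]]:
    """Cluster email ids sharing a fingerprint; only groups of size > 1 are duplicates."""
    groups = defaultdict(list)
    for email in emails:
        groups[email[hash_key]].append(email["id"])
    return {h: ids for h, ids in groups.items() if len(ids) > 1}
```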

Normalization Pipeline

  1. Remove tracking parameters (UTM, pixels)
  2. Replace URLs → [URL]
  3. Replace emails → [EMAIL]
  4. Remove zero-width characters
  5. Normalize whitespace
  6. Normalize Unicode (smart quotes → regular)
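The six steps above can be sketched with stdlib regexes. Steps are reordered slightly so the [URL]/[EMAIL] placeholders survive the lowercasing seen in the Stage 2 example, and the exact patterns here are illustrative:

```python
import re
import unicodedata

def normalize_content(text: str) -> str:
    """Illustrative normalization pipeline (steps 1-6 above, reordered)."""
    text = unicodedata.normalize("NFKC", text.lower())                   # step 6: Unicode + lowercase
    text = text.replace("\u2019", "'").replace("\u201c", '"').replace("\u201d", '"')  # step 6: smart quotes
    text = re.sub(r"[?&]utm_[a-z]+=[^&\s]+", "", text)                   # step 1: tracking parameters
    text = re.sub(r"https?://\S+", "[URL]", text)                        # step 2: URLs
    text = re.sub(r"\b[\w.+-]+@[\w.-]+\.\w+\b", "[EMAIL]", text)         # step 3: email addresses
    text = re.sub(r"[\u200b\u200c\u200d\ufeff]", "", text)               # step 4: zero-width characters
    return re.sub(r"\s+", " ", text).strip()                             # step 5: whitespace
```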

Integration with Multiverse Platform

This system serves as the upstream data source for the Local News Multiverse Agent Platform.

Agent Tools

The multiverse agents use these tables:

# lookup_sender_entity - queries sender_profiles
sender = lookup_sender_entity("john@example.com")
# Returns: is_journalist, influence_score, email_count, topics

# search_emails - vector search on email_chunks
results = search_emails("city council budget meeting", limit=10)
# Returns: similar emails with relevance scores
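Under the hood, a search_emails-style tool boils down to a single pgvector query. A sketch assuming the schema above; build_search_query is a hypothetical helper, and <=> is pgvector's cosine-distance operator:

```python
def build_search_query(limit: int = 10) -> str:
    """Parameterized SQL for vector search; %s placeholders take the query embedding."""
    return f"""
        SELECT e.gmail_id,
               c.text,
               1 - (c.embedding <=> %s::vector) AS similarity
        FROM email_chunks c
        JOIN classified_emails e ON e.id = c.email_id
        ORDER BY c.embedding <=> %s::vector
        LIMIT {int(limit)}
    """
```

Ordering by the <=> expression is what lets PostgreSQL use the HNSW index instead of a sequential scan.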

Required Schema

For multiverse compatibility, ensure:

  1. classified_emails has: id, gmail_id, thread_id, sender_email, body_text
  2. email_chunks has: embedding VECTOR(384) with HNSW index
  3. sender_profiles has: fingerprint, is_journalist, is_government, is_media

Quick Start

1. Prerequisites

# PostgreSQL with pgvector
brew install postgresql@17
psql -c "CREATE EXTENSION vector;"

# Python 3.10+
python --version  # Should be 3.10+

2. Clone and Setup

git clone https://github.com/alecmeeeker/email-ingestion-data-warehouse.git
cd email-ingestion-data-warehouse

# Create virtual environment
python -m venv venv
source venv/bin/activate

# Install dependencies
pip install -r requirements.txt

3. Configure Environment

# Copy example config
cp .env.example .env

# Edit with your values
nano .env

Required variables:

  • GOOGLE_SERVICE_ACCOUNT_FILE: Path to service account JSON
  • GMAIL_DELEGATE_EMAIL: Email to impersonate
  • GEMINI_API_KEY: For LLM classification

4. Run Migrations

# Create database
createdb email_warehouse

# Run migrations in order
for f in migrations/*.sql; do psql -d email_warehouse -f "$f"; done

5. Run Pipeline

# Extract emails
python scripts/gmail_extractor.py --max-results 100

# Create chunks and embeddings
python scripts/batch_chunking.py

# Classify emails
python scripts/batch_classifier.py --all

# Extract participants
python scripts/participant_extractor.py

Configuration Reference

Database

Variable      Default           Description
DB_NAME       email_warehouse   PostgreSQL database name
DB_USER       postgres          Database user
DB_HOST       localhost         Database host
DB_PASSWORD   (empty)           Database password
DB_PORT       5432              Database port

Gmail API

Variable                      Required   Description
GOOGLE_SERVICE_ACCOUNT_FILE   Yes        Path to service account JSON
GMAIL_DELEGATE_EMAIL          Yes        Email to impersonate

LLM

Variable         Required   Description
GEMINI_API_KEY   Yes        Google Gemini API key

Model

Variable          Default                                  Description
EMBEDDING_MODEL   sentence-transformers/all-MiniLM-L6-v2   Embedding model
HF_HUB_OFFLINE    0                                        Set to 1 to use cached models only
BATCH_SIZE        200                                      Processing batch size

Performance Considerations

Why These Choices Work on MacBook

Choice                   Why
all-MiniLM-L6-v2         384-dim vs 768-dim = 2x faster, 50% memory
HNSW index               O(log n) search, no rebuild on insert
Deterministic rules      30-50% of emails skip LLM entirely
Batch commits            Resume from failure without reprocessing
Incremental extraction   Only process new emails
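The batch-commit pattern is simple to sketch; process_in_batches is a hypothetical helper, not the scripts' actual structure:

```python
def process_in_batches(ids: list[int], handle, commit, batch_size: int = 200) -> int:
    """Commit after each batch so a crash loses only the in-flight batch, not the run."""
    done = 0
    for start in range(0, len(ids), batch_size):
        batch = ids[start:start + batch_size]
        for item_id in batch:
            handle(item_id)
        commit()  # checkpoint: a rerun skips already-committed rows
        done += len(batch)
    return done
```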

Benchmarks (M1 MacBook Pro)

Operation                Throughput
Email extraction         ~100 emails/min
Chunking + embedding     ~200 emails/min
LLM classification       ~30 emails/min
Participant extraction   ~500 emails/min

Scaling Notes

For larger deployments:

  1. Increase BATCH_SIZE for better throughput
  2. Use connection pooling (pgbouncer)
  3. Consider Gemini Flash Lite for classification
  4. Run embedding and classification in parallel

Contributing

Contributions welcome! Please:

  1. Fork the repository
  2. Create a feature branch
  3. Submit a pull request

Areas of interest:

  • Additional email providers (Outlook, IMAP)
  • Alternative embedding models
  • Classification category extensions
  • Performance optimizations


Built with care by Alec Meeker for Bushwick Daily.