Data Platform Warehouse and Lake Project Structure

Overview

The data platform consists of two main domains: Data Lake and Data Warehouse. These work together to collect, process, and serve data for analytics and search capabilities.

Data Flow:
External Sources → Data Lake (Bronze → Silver → Gold) → Data Warehouse (OpenSearch, ClickHouse)

Data Lake Domain

The Data Lake domain handles raw data collection, transformation, and storage in Apache Iceberg format.

Project Structure

packages/data-lake/
├── data-lake-collector/ # Data collection (scrapers & API integrations)
├── data-lake-etl/ # ETL processing (Bronze → Silver → Gold)
├── data-lake-infra/ # Infrastructure as Code (Pulumi)
└── data-lake-orchestration/ # Dagster orchestration

Components

1. data-lake-collector

Purpose: Collects raw data from external sources (APIs, web scraping)

Key Files:

  • data_lake_collector/spiders/ - Scrapy spider implementations
  • data_lake_collector/api_integrations/ - API client integrations
  • data_lake_collector/settings.py - Configuration and S3 feed paths
  • data_lake_collector/run.sh - Universal runner script

Supported Collectors:

  • API Integrations: Stack Exchange, Reddit
  • Scrapy Spiders: Atlassian Forums
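
The collectors follow standard Scrapy and API-client patterns. As a hedged illustration only (the spider name, start URL, and item fields below are hypothetical placeholders, not the actual Atlassian Forums implementation), a minimal forum spider might look like this:

# Minimal sketch of a forum spider; class name, URL, and CSS selectors are
# illustrative placeholders, not code from data_lake_collector/spiders/.
import scrapy


class ExampleForumSpider(scrapy.Spider):
    name = "example_forum"
    start_urls = ["https://forum.example.com/latest"]

    def parse(self, response):
        # Yield one item per post; Scrapy's feed exports handle serialization.
        for post in response.css("div.post"):
            yield {
                "title": post.css("h2::text").get(),
                "body": post.css("div.body::text").get(),
                "url": response.urljoin(post.css("a::attr(href)").get() or ""),
            }

Scraped items are written to the S3 feed paths configured in data_lake_collector/settings.py, which the bronze ETL jobs later pick up.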

Deployment: ECS tasks triggered by EventBridge schedules

Reference: packages/data-lake/data-lake-collector/README.md

2. data-lake-etl

Purpose: Processes raw data through Bronze → Silver → Gold layers

Key Directories:

  • data_lake_etl/bronze/ - Raw data ingestion (API sources, forums)
  • data_lake_etl/silver/ - Data cleaning and transformation
  • data_lake_etl/gold/ - Aggregated and enriched data
  • data_lake_etl/one_step_etl/ - Single-step ETL jobs (e.g., offerings)

Key Files:

  • bronze/api_source.py - Processes API integration outputs
  • bronze/forums.py - Processes Scrapy spider outputs
  • silver/*.py - Silver layer transformations
  • gold/internet_posts.py - Internet posts aggregation
  • gold/add_col.py - Column enrichment (embeddings, language detection)
  • gold/offering.py - Offerings processing
  • shared_utils/langfuse_integration.py - LLM tracing integration

Storage Format: Apache Iceberg tables in S3
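
As a rough sketch of the write pattern (the catalog alias, S3 path layout, and table name are assumptions, not taken from bronze/api_source.py), a bronze job reads raw collector output from S3 and writes it as an Iceberg table registered in the Glue catalog:

# Illustrative bronze-style Iceberg write; the Iceberg/Glue catalog settings are
# assumed to be supplied via EMR Serverless job parameters and Spark config.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("bronze-example").getOrCreate()

# Path and table name are placeholders following the bucket naming in this doc.
raw = spark.read.json("s3://bronze-dl-{id}/raw/example_source/")
(
    raw.writeTo("glue_catalog.bronze.example_source_posts")  # Iceberg table via the Glue catalog
    .using("iceberg")
    .createOrReplace()
)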

Deployment: EMR Serverless Spark jobs

Reference: packages/data-lake/data-lake-etl/README.md

3. data-lake-infra

Purpose: Infrastructure as Code for Data Lake components

Key Files:

  • src/index.ts - Main infrastructure entry point
  • src/dataLakeDB.ts - Glue database and S3 bucket creation
  • src/bronze/ - Bronze layer infrastructure components
  • src/silver/ - Silver layer infrastructure components
  • src/gold/ - Gold layer infrastructure components
  • src/running-flow/ - Step Functions workflow definitions
  • src/{source}.ts - Per-source infrastructure (e.g., redditApi.ts, stackExchangeApi.ts)

Key Components:

  • DataLakeDB - Creates Glue database and S3 bucket
  • BronzeInfra - Bronze layer tables and EMR jobs
  • SilverInfra - Silver layer tables and EMR jobs
  • GoldInfra - Gold layer tables and EMR jobs
  • DataEtlFlow - Step Functions state machine for ETL pipeline
  • DataCollector - ECS task definitions for collectors

Pulumi Stacks: dev, prod

Reference: packages/data-lake/data-lake-infra/src/index.ts

4. data-lake-orchestration

Purpose: Dagster orchestration for data pipeline management

Key Files:

  • src/data_lake_orchestration/defs.py - Dagster definitions
  • src/data_lake_orchestration/defs/assets.py - Asset definitions
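
As a hedged sketch of the asset pattern (the asset name and body are placeholders, not the contents of defs/assets.py), a Dagster asset wrapping one pipeline step might look like:

# Hypothetical Dagster asset and definitions; names are placeholders.
import dagster as dg


@dg.asset
def example_bronze_asset() -> None:
    """Track or trigger one bronze-layer processing step."""
    # A real asset would kick off or observe the corresponding EMR or ECS job.
    ...


defs = dg.Definitions(assets=[example_bronze_asset])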

Deployment: Dagster code location in ECS

Data Warehouse Domain

The Data Warehouse domain takes processed data from the Data Lake and loads it into specialized storage systems for analytics and search.

Project Structure

packages/data-warehouse/
├── data-warehouse-ingest/ # ETL jobs for warehouse destinations
├── data-warehouse-infra/ # Infrastructure as Code (Pulumi)
├── data-warehouse-opensearch-init/ # OpenSearch index initialization
└── data-warehouse-orchestration/ # Dagster orchestration

Components

1. data-warehouse-ingest

Purpose: Extracts data from Iceberg tables and loads it into warehouse destinations

Key Directories:

  • data_warehouse_ingest/connectors/ - Connection modules (Iceberg, OpenSearch, ClickHouse)
  • data_warehouse_ingest/generic_ingest_flows/ - Reusable ingestion pipelines
  • data_warehouse_ingest/use_cases/ - Use case specific processing (clustering)
  • data_warehouse_ingest/funnel_events/ - Funnel events ingestion

Key Files:

  • connectors/iceberg_connector.py - Iceberg table reader
  • connectors/open_search_connector.py - OpenSearch writer
  • connectors/click_house_connector.py - ClickHouse writer
  • generic_ingest_flows/opensearch_ingest.py - OpenSearch ingestion pipeline
  • generic_ingest_flows/offerings_os_ingest.py - Offerings to OpenSearch pipeline
  • generic_ingest_flows/clickhouse_ingest.py - ClickHouse ingestion pipeline
  • use_cases/use_case_ingest.py - Use cases clustering pipeline
  • use_cases/incremental_clustering_opensearch.py - Incremental DBSCAN clustering

Warehouse Destinations:

  • OpenSearch: Semantic search with KNN indexes for embeddings
  • ClickHouse: High-performance analytics and real-time queries
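
As an illustrative sketch of the connector/flow split (the function names and write strategy below are assumptions, not the actual modules), a generic ingestion flow reads a gold Iceberg table and hands rows to a writer for OpenSearch:

# Hypothetical shape of a generic ingest flow; the real modules under
# data_warehouse_ingest/ may expose different class and function names.
from opensearchpy import OpenSearch, helpers
from pyspark.sql import DataFrame, SparkSession


def read_gold_table(spark: SparkSession, table: str) -> DataFrame:
    # Catalog alias "glue_catalog" is an assumption about the Spark session config.
    return spark.read.table(f"glue_catalog.gold.{table}")


def write_to_opensearch(df: DataFrame, client: OpenSearch, index: str) -> None:
    # Simplified driver-side bulk write; the real connector would likely write
    # from executors and handle batching, retries, and authentication.
    actions = ({"_index": index, "_source": row.asDict()} for row in df.toLocalIterator())
    helpers.bulk(client, actions)


if __name__ == "__main__":
    spark = SparkSession.builder.appName("opensearch-ingest-example").getOrCreate()
    client = OpenSearch(hosts=[{"host": "localhost", "port": 9200}])
    write_to_opensearch(read_gold_table(spark, "internet_posts"), client, "internet-posts")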

Deployment: EMR Serverless Spark jobs

Reference: packages/data-warehouse/data-warehouse-ingest/README.md

2. data-warehouse-infra

Purpose: Infrastructure as Code for Data Warehouse components

Key Files:

  • src/index.ts - Main infrastructure entry point
  • src/clickHouseInfra.ts - ClickHouse infrastructure setup
  • src/db-ingest/openSearchIngest.ts - OpenSearch ingestion infrastructure
  • src/db-ingest/useCasesIngest.ts - Use cases ingestion infrastructure
  • src/db-ingest/clickHouseIngest.ts - ClickHouse ingestion infrastructure

Key Components:

  • EmrServerlessApp - EMR Serverless application for Spark jobs
  • OpenSearchIngest - Scheduled OpenSearch ingestion jobs
  • UseCasesIngest - Scheduled use cases clustering jobs
  • ClickHouseInfra - ClickHouse infrastructure and materialized views
  • KnnIndex - OpenSearch KNN index definitions
  • OpenSearchUser - OpenSearch user and role management

Pulumi Stacks: dev, prod

Reference: packages/data-warehouse/data-warehouse-infra/README.md

3. data-warehouse-opensearch-init

Purpose: OpenSearch index initialization scripts

Key Files:

  • src/scripts/init-opensearch.sh - Index creation script
  • src/indices/wordpress-offerings.json - Index mapping definitions
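
As a hedged sketch of what a KNN-enabled index creation can look like (the index name, field names, and embedding dimension are placeholders, not the contents of wordpress-offerings.json), using the opensearch-py client:

# Illustrative KNN index creation; mapping values are assumptions, not the
# actual wordpress-offerings.json definition.
from opensearchpy import OpenSearch

client = OpenSearch(hosts=[{"host": "localhost", "port": 9200}])

body = {
    "settings": {"index": {"knn": True}},
    "mappings": {
        "properties": {
            "title": {"type": "text"},
            "embedding": {"type": "knn_vector", "dimension": 768},
        }
    },
}

client.indices.create(index="example-offerings", body=body)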

Deployment: One-time initialization job

4. data-warehouse-orchestration

Purpose: Dagster orchestration for warehouse jobs

Key Files:

  • src/data_lake_orchestration/defs.py - Dagster definitions
  • src/data_lake_orchestration/defs/assets.py - Asset definitions

Deployment: Dagster code location in ECS

Data Flow Architecture

Collection → Processing → Storage

1. Collection (data-lake-collector)

Raw JSON files → S3 bronze bucket

2. Bronze Processing (data-lake-etl)

Raw files converted to Iceberg tables → S3 bronze bucket

3. Silver Processing (data-lake-etl)

Cleaned Iceberg tables → S3 silver bucket

4. Gold Processing (data-lake-etl)

Enriched Iceberg tables → S3 gold bucket

5. Warehouse Ingestion (data-warehouse-ingest)

Gold Iceberg tables → OpenSearch indexes + ClickHouse tables

Layer Descriptions

Bronze Layer

  • Purpose: Raw data storage
  • Format: Iceberg tables
  • Location: bronze-dl-{id} S3 bucket
  • Tables: bronze.{source}_{entity} (e.g., bronze.stack_overflow_questions)

Silver Layer

  • Purpose: Cleaned and standardized data
  • Format: Iceberg tables
  • Location: silver-dl-{id} S3 bucket
  • Tables: silver.{entity} (e.g., silver.internet_posts)

Gold Layer

  • Purpose: Aggregated and enriched data
  • Format: Iceberg tables
  • Location: gold-dl-{id} S3 bucket
  • Tables: gold.{entity} (e.g., gold.internet_posts, gold.offering)
  • Enrichments: Embeddings, language detection, marketplace flags
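
Because every layer follows the {layer}.{entity} naming convention in the Glue catalog, any Spark session can address the tables directly. A small sketch (the catalog alias is an assumption about the session configuration):

# Querying a gold table by its layer.entity name from a Spark session.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("layer-query-example").getOrCreate()
recent = spark.sql("SELECT * FROM glue_catalog.gold.internet_posts LIMIT 10")
recent.show()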

Key Technologies

Processing

  • Apache Spark: Distributed data processing (via EMR Serverless)
  • Apache Iceberg: Table format for data lake storage
  • PySpark: Python API for Spark

Storage

  • S3: Object storage for Iceberg tables
  • AWS Glue: Data catalog and metadata management
  • OpenSearch: Search and analytics engine with KNN support
  • ClickHouse: Columnar database for analytics

Infrastructure

  • Pulumi: Infrastructure as Code
  • EMR Serverless: Serverless Spark execution
  • ECS: Container orchestration for collectors and Dagster code locations

Orchestration

  • Step Functions: ETL pipeline workflows
  • EventBridge: Scheduled triggers
  • Dagster: Asset-based data orchestration

Common Patterns

Adding a New Data Source

  1. Create Collector (data-lake-collector)

    • Add API client or Scrapy spider
    • Configure S3 output paths
  2. Create ETL Jobs (data-lake-etl)

    • Bronze: Raw ingestion
    • Silver: Cleaning and standardization
    • Gold: Aggregation and enrichment
  3. Create Infrastructure (data-lake-infra)

    • Add source-specific infrastructure component
    • Configure Step Functions workflow
    • Set up EventBridge schedules
  4. Deploy

    • Build and push Docker images
    • Deploy Pulumi infrastructure

Reference: See building-new-collectors.md for detailed steps

Adding a New Warehouse Destination

  1. Create Connector (data-warehouse-ingest)

    • Implement connector class
    • Add write logic
  2. Create Ingestion Flow (data-warehouse-ingest)

    • Implement ingestion pipeline
    • Add error handling and monitoring
  3. Create Infrastructure (data-warehouse-infra)

    • Add ingestion job component
    • Configure EMR Serverless job
    • Set up EventBridge schedule
  4. Deploy

    • Build and push Docker images
    • Deploy Pulumi infrastructure
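
As a rough skeleton of steps 1 and 2 above (the class and function names are placeholders chosen for illustration, not an existing interface in data-warehouse-ingest):

# Hypothetical skeleton for a new destination connector plus a tiny ingest flow.
from dataclasses import dataclass

from pyspark.sql import DataFrame, SparkSession


@dataclass
class ExampleWarehouseConnector:
    """Placeholder writer for a new warehouse destination."""

    endpoint: str

    def write(self, df: DataFrame, table: str) -> None:
        # A real connector would batch rows, handle retries, and emit metrics.
        print(f"writing {df.count()} rows to {self.endpoint}/{table}")


def run_example_ingest(spark: SparkSession, connector: ExampleWarehouseConnector) -> None:
    # Read a gold table (catalog alias assumed) and push it to the new destination.
    df = spark.read.table("glue_catalog.gold.internet_posts")
    connector.write(df, "internet_posts")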

Monitoring and Debugging

Key Locations

  • Step Functions: Pipeline execution status
  • CloudWatch Logs: Job execution logs
  • EMR Serverless: Spark job details and Spark UI
  • S3 Buckets: Data storage inspection
  • Athena: Query Iceberg tables directly

Reference: See monitoring-etl-pipelines.md for detailed monitoring guide

Configuration Management

Pulumi Configuration

Configuration is managed per stack (dev/prod):

# View configuration
cd packages/data-lake/data-lake-infra
pulumi config --stack dev

# Set configuration
pulumi config set --stack dev --secret apiKey value

# View outputs
pulumi stack output --stack dev

Environment Variables

  • Collectors: Configured in data_lake_collector/settings.py
  • ETL Jobs: Passed via EMR Serverless job parameters
  • Warehouse Jobs: Configured in Pulumi infrastructure
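
For example, one common pattern is for an ETL entry point to parse the parameters passed by the EMR Serverless job definition; the flag names below are illustrative, not the actual job arguments:

# Sketch of reading EMR Serverless job parameters; flag names are assumptions.
import argparse


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Example ETL job arguments")
    parser.add_argument("--source-bucket", required=True)
    parser.add_argument("--target-table", required=True)
    parser.add_argument("--run-date", default=None)
    return parser.parse_args()


if __name__ == "__main__":
    args = parse_args()
    print(f"processing {args.source_bucket} into {args.target_table}")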

Testing

Unit Tests

# Data Lake Collector
pnpm nx test @application/data-lake-collector

# Data Lake ETL
pnpm nx test @application/data-lake-etl

# Data Warehouse Ingest
pnpm nx test @application/data-warehouse-ingest

E2E Tests

# Data Warehouse E2E (requires Docker)
cd packages/data-warehouse/data-warehouse-ingest
pnpm nx test @application/data-warehouse-ingest --testPathPattern=e2e

Deployment

Building Images

# Build all data platform images
pnpm nx package data-lake-collector
pnpm nx package data-lake-etl
pnpm nx package data-warehouse-ingest
pnpm nx package data-warehouse-orchestration

Deploying Infrastructure

# Data Lake Infrastructure
cd packages/data-lake/data-lake-infra
NX_TUI=false pnpm nx pulumi data-lake-infra -- preview --stack dev
NX_TUI=false pnpm nx pulumi data-lake-infra -- up --stack dev

# Data Warehouse Infrastructure
cd packages/data-warehouse/data-warehouse-infra
NX_TUI=false pnpm nx pulumi data-warehouse-infra -- preview --stack dev
NX_TUI=false pnpm nx pulumi data-warehouse-infra -- up --stack dev

Reference: See deploying-infrastructure-updates.md for detailed deployment guide

References

  • Data Lake Collector: packages/data-lake/data-lake-collector/README.md
  • Data Lake ETL: packages/data-lake/data-lake-etl/README.md
  • Data Warehouse Ingest: packages/data-warehouse/data-warehouse-ingest/README.md
  • Data Warehouse Infra: packages/data-warehouse/data-warehouse-infra/README.md
  • Building New Collectors: building-new-collectors.md
  • Monitoring ETL Pipelines: monitoring-etl-pipelines.md
  • Deploying Infrastructure: deploying-infrastructure-updates.md