Data Platform Warehouse and Lake Project Structure
Overview
The data platform consists of two main domains: Data Lake and Data Warehouse. These work together to collect, process, and serve data for analytics and search capabilities.
Data Flow:
External Sources → Data Lake (Bronze → Silver → Gold) → Data Warehouse (OpenSearch, ClickHouse)
Data Lake Domain
The Data Lake domain handles raw data collection, transformation, and storage in Apache Iceberg format.
Project Structure
packages/data-lake/
├── data-lake-collector/ # Data collection (scrapers & API integrations)
├── data-lake-etl/ # ETL processing (Bronze → Silver → Gold)
├── data-lake-infra/ # Infrastructure as Code (Pulumi)
└── data-lake-orchestration/ # Dagster orchestration
Components
1. data-lake-collector
Purpose: Collects raw data from external sources (APIs, web scraping)
Key Files:
- data_lake_collector/spiders/ - Scrapy spider implementations
- data_lake_collector/api_integrations/ - API client integrations
- data_lake_collector/settings.py - Configuration and S3 feed paths
- data_lake_collector/run.sh - Universal runner script
Supported Collectors:
- API Integrations: Stack Exchange, Reddit
- Scrapy Spiders: Atlassian Forums
Deployment: ECS tasks triggered by EventBridge schedules
Reference: packages/data-lake/data-lake-collector/README.md
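To illustrate the collector pattern, here is a minimal, hypothetical Scrapy spider sketch; the spider name, start URL, and selectors are placeholders and do not mirror the real Atlassian Forums spider. In the actual package, feed exports configured in data_lake_collector/settings.py route the yielded items to the S3 feed path.

```python
import scrapy


class ExampleForumSpider(scrapy.Spider):
    """Hypothetical spider; real spiders live in data_lake_collector/spiders/."""

    name = "example_forum"
    start_urls = ["https://community.example.com/questions"]

    def parse(self, response):
        # Yield one raw item per question; Scrapy feed exports (settings.py)
        # write these items as JSON to the configured S3 feed path.
        for question in response.css("article.question"):
            yield {
                "title": question.css("h2::text").get(),
                "url": response.urljoin(question.css("a::attr(href)").get()),
            }
```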
2. data-lake-etl
Purpose: Processes raw data through Bronze → Silver → Gold layers
Key Directories:
- data_lake_etl/bronze/ - Raw data ingestion (API sources, forums)
- data_lake_etl/silver/ - Data cleaning and transformation
- data_lake_etl/gold/ - Aggregated and enriched data
- data_lake_etl/one_step_etl/ - Single-step ETL jobs (e.g., offerings)
Key Files:
- bronze/api_source.py - Processes API integration outputs
- bronze/forums.py - Processes Scrapy spider outputs
- silver/*.py - Silver layer transformations
- gold/internet_posts.py - Internet posts aggregation
- gold/add_col.py - Column enrichment (embeddings, language detection)
- gold/offering.py - Offerings processing
- shared_utils/langfuse_integration.py - LLM tracing integration
Storage Format: Apache Iceberg tables in S3
Deployment: EMR Serverless Spark jobs
Reference: packages/data-lake/data-lake-etl/README.md
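As a rough sketch of a bronze-layer job, the snippet below reads raw JSON written by a collector and appends it to an Iceberg table. The S3 path, catalog name ("glue"), and table name are assumptions, and the Iceberg/Glue settings are normally supplied by the EMR Serverless job configuration rather than hard-coded.

```python
from pyspark.sql import SparkSession

# Catalog settings shown inline for completeness; in practice they come from
# the EMR Serverless job configuration.
spark = (
    SparkSession.builder.appName("bronze-example")
    .config("spark.sql.catalog.glue", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.glue.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    .getOrCreate()
)

# Read the raw collector output (hypothetical path) and append it to an
# existing Iceberg table registered in the Glue catalog.
raw_df = spark.read.json("s3://bronze-dl-example/raw/stack_exchange/questions/")
raw_df.writeTo("glue.bronze.stack_overflow_questions").append()
```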
3. data-lake-infra
Purpose: Infrastructure as Code for Data Lake components
Key Files:
- src/index.ts - Main infrastructure entry point
- src/dataLakeDB.ts - Glue database and S3 bucket creation
- src/bronze/ - Bronze layer infrastructure components
- src/silver/ - Silver layer infrastructure components
- src/gold/ - Gold layer infrastructure components
- src/running-flow/ - Step Functions workflow definitions
- src/{source}.ts - Per-source infrastructure (e.g., redditApi.ts, stackExchangeApi.ts)
Key Components:
- DataLakeDB - Creates the Glue database and S3 bucket
- BronzeInfra - Bronze layer tables and EMR jobs
- SilverInfra - Silver layer tables and EMR jobs
- GoldInfra - Gold layer tables and EMR jobs
- DataEtlFlow - Step Functions state machine for the ETL pipeline
- DataCollector - ECS task definitions for collectors
Pulumi Stacks: dev, prod
Reference: packages/data-lake/data-lake-infra/src/index.ts
4. data-lake-orchestration
Purpose: Dagster orchestration for data pipeline management
Key Files:
- src/data_lake_orchestration/defs.py - Dagster definitions
- src/data_lake_orchestration/defs/assets.py - Asset definitions
Deployment: Dagster code location in ECS
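A minimal sketch of what an asset in defs/assets.py might look like; the asset name and behavior are hypothetical, since real assets would typically trigger or track the corresponding EMR Serverless job or Step Functions execution rather than compute locally.

```python
import dagster as dg


@dg.asset
def bronze_internet_posts(context: dg.AssetExecutionContext) -> None:
    # Placeholder body: a real asset would launch or observe the
    # corresponding EMR Serverless job / Step Functions execution.
    context.log.info("Bronze ingestion triggered")


defs = dg.Definitions(assets=[bronze_internet_posts])
```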
Data Warehouse Domain
The Data Warehouse domain takes processed data from the Data Lake and loads it into specialized storage systems for analytics and search.
Project Structure
packages/data-warehouse/
├── data-warehouse-ingest/ # ETL jobs for warehouse destinations
├── data-warehouse-infra/ # Infrastructure as Code (Pulumi)
├── data-warehouse-opensearch-init/ # OpenSearch index initialization
└── data-warehouse-orchestration/ # Dagster orchestration
Components
1. data-warehouse-ingest
Purpose: Extracts data from Iceberg tables and loads into warehouse destinations
Key Directories:
- data_warehouse_ingest/connectors/ - Connection modules (Iceberg, OpenSearch, ClickHouse)
- data_warehouse_ingest/generic_ingest_flows/ - Reusable ingestion pipelines
- data_warehouse_ingest/use_cases/ - Use-case-specific processing (clustering)
- data_warehouse_ingest/funnel_events/ - Funnel events ingestion
Key Files:
- connectors/iceberg_connector.py - Iceberg table reader
- connectors/open_search_connector.py - OpenSearch writer
- connectors/click_house_connector.py - ClickHouse writer
- generic_ingest_flows/opensearch_ingest.py - OpenSearch ingestion pipeline
- generic_ingest_flows/offerings_os_ingest.py - Offerings-to-OpenSearch pipeline
- generic_ingest_flows/clickhouse_ingest.py - ClickHouse ingestion pipeline
- use_cases/use_case_ingest.py - Use cases clustering pipeline
- use_cases/incremental_clustering_opensearch.py - Incremental DBSCAN clustering
Warehouse Destinations:
- OpenSearch: Semantic search with KNN indexes for embeddings
- ClickHouse: High-performance analytics and real-time queries
Deployment: EMR Serverless Spark jobs
Reference: packages/data-warehouse/data-warehouse-ingest/README.md
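The sketch below condenses the shape of an OpenSearch ingestion flow: read a gold Iceberg table with Spark and bulk-index the rows. The host, index name, id column, and the collect-to-driver shortcut are simplifications; the real logic lives in the connectors and generic_ingest_flows modules.

```python
from opensearchpy import OpenSearch, helpers
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("opensearch-ingest-example").getOrCreate()

# Simplification: collect a small batch to the driver; the real pipeline
# processes partitions in a distributed fashion.
rows = spark.table("glue.gold.internet_posts").limit(1000).collect()

client = OpenSearch(
    hosts=[{"host": "opensearch.example.internal", "port": 443}],
    use_ssl=True,
)
actions = (
    {"_index": "internet-posts", "_id": row["id"], "_source": row.asDict()}
    for row in rows
)
helpers.bulk(client, actions)
```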
2. data-warehouse-infra
Purpose: Infrastructure as Code for Data Warehouse components
Key Files:
- src/index.ts - Main infrastructure entry point
- src/clickHouseInfra.ts - ClickHouse infrastructure setup
- src/db-ingest/openSearchIngest.ts - OpenSearch ingestion infrastructure
- src/db-ingest/useCasesIngest.ts - Use cases ingestion infrastructure
- src/db-ingest/clickHouseIngest.ts - ClickHouse ingestion infrastructure
Key Components:
- EmrServerlessApp - EMR Serverless application for Spark jobs
- OpenSearchIngest - Scheduled OpenSearch ingestion jobs
- UseCasesIngest - Scheduled use cases clustering jobs
- ClickHouseInfra - ClickHouse infrastructure and materialized views
- KnnIndex - OpenSearch KNN index definitions
- OpenSearchUser - OpenSearch user and role management
Pulumi Stacks: dev, prod
Reference: packages/data-warehouse/data-warehouse-infra/README.md
3. data-warehouse-opensearch-init
Purpose: OpenSearch index initialization scripts
Key Files:
- src/scripts/init-opensearch.sh - Index creation script
- src/indices/wordpress-offerings.json - Index mapping definitions
Deployment: One-time initialization job
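For illustration, an equivalent initialization step in Python might create an index with a knn_vector field for embeddings; the index name, dimension, and mapping below are placeholders, not the contents of wordpress-offerings.json.

```python
from opensearchpy import OpenSearch

client = OpenSearch(hosts=[{"host": "localhost", "port": 9200}])

# Create a KNN-enabled index with an embedding field (illustrative mapping).
client.indices.create(
    index="wordpress-offerings-example",
    body={
        "settings": {"index": {"knn": True}},
        "mappings": {
            "properties": {
                "title": {"type": "text"},
                "embedding": {"type": "knn_vector", "dimension": 768},
            }
        },
    },
)
```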
4. data-warehouse-orchestration
Purpose: Dagster orchestration for warehouse jobs
Key Files:
- src/data_lake_orchestration/defs.py - Dagster definitions
- src/data_lake_orchestration/defs/assets.py - Asset definitions
Deployment: Dagster code location in ECS
Data Flow Architecture
Collection → Processing → Storage
1. Collection (data-lake-collector): raw JSON files → S3 bronze bucket
2. Bronze processing (data-lake-etl): Iceberg tables → S3 bronze bucket
3. Silver processing (data-lake-etl): cleaned Iceberg tables → S3 silver bucket
4. Gold processing (data-lake-etl): enriched Iceberg tables → S3 gold bucket
5. Warehouse ingestion (data-warehouse-ingest): OpenSearch indexes + ClickHouse tables
Layer Descriptions
Bronze Layer
- Purpose: Raw data storage
- Format: Iceberg tables
- Location: bronze-dl-{id} S3 bucket
- Tables: bronze.{source}_{entity} (e.g., bronze.stack_overflow_questions)
Silver Layer
- Purpose: Cleaned and standardized data
- Format: Iceberg tables
- Location: silver-dl-{id} S3 bucket
- Tables: silver.{entity} (e.g., silver.internet_posts)
Gold Layer
- Purpose: Aggregated and enriched data
- Format: Iceberg tables
- Location: gold-dl-{id} S3 bucket
- Tables: gold.{entity} (e.g., gold.internet_posts, gold.offering)
- Enrichments: Embeddings, language detection, marketplace flags
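To make the naming convention concrete, an illustrative Spark SQL query against a gold table is shown below; the catalog name ("glue") and the language column are assumptions.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("layer-query-example").getOrCreate()

# Count gold-layer posts per detected language (column name assumed).
spark.sql(
    """
    SELECT language, COUNT(*) AS posts
    FROM glue.gold.internet_posts
    GROUP BY language
    ORDER BY posts DESC
    """
).show()
```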
Key Technologies
Processing
- Apache Spark: Distributed data processing (via EMR Serverless)
- Apache Iceberg: Table format for data lake storage
- PySpark: Python API for Spark
Storage
- S3: Object storage for Iceberg tables
- AWS Glue: Data catalog and metadata management
- OpenSearch: Search and analytics engine with KNN support
- ClickHouse: Columnar database for analytics
Infrastructure
- Pulumi: Infrastructure as Code
- EMR Serverless: Serverless Spark execution
- ECS: Container orchestration for collectors
- Step Functions: Workflow orchestration
- EventBridge: Scheduled triggers
- Dagster: Data orchestration platform
Orchestration
- Step Functions: ETL pipeline workflows
- EventBridge: Scheduled triggers
- Dagster: Asset-based data orchestration
Common Patterns
Adding a New Data Source
1. Create Collector (data-lake-collector)
   - Add an API client or Scrapy spider (a sketch follows this list)
   - Configure S3 output paths
2. Create ETL Jobs (data-lake-etl)
   - Bronze: raw ingestion
   - Silver: cleaning and standardization
   - Gold: aggregation and enrichment
3. Create Infrastructure (data-lake-infra)
   - Add a source-specific infrastructure component
   - Configure the Step Functions workflow
   - Set up EventBridge schedules
4. Deploy
   - Build and push Docker images
   - Deploy Pulumi infrastructure
Reference: See building-new-collectors.md for detailed steps
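A hypothetical API-integration skeleton for step 1; the class name, endpoint, and output key are placeholders rather than project conventions.

```python
import json

import boto3
import requests


class ExampleApiCollector:
    """Hypothetical collector: fetch one page and land the raw payload in S3."""

    def __init__(self, bucket: str, prefix: str):
        self.s3 = boto3.client("s3")
        self.bucket = bucket
        self.prefix = prefix

    def run(self) -> None:
        response = requests.get("https://api.example.com/v1/posts", timeout=30)
        response.raise_for_status()
        # The bronze ETL job later picks up this raw JSON from the landing prefix.
        self.s3.put_object(
            Bucket=self.bucket,
            Key=f"{self.prefix}/posts.json",
            Body=json.dumps(response.json()).encode("utf-8"),
        )
```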
Adding a New Warehouse Destination
1. Create Connector (data-warehouse-ingest)
   - Implement a connector class (a sketch follows this list)
   - Add write logic
2. Create Ingestion Flow (data-warehouse-ingest)
   - Implement the ingestion pipeline
   - Add error handling and monitoring
3. Create Infrastructure (data-warehouse-infra)
   - Add an ingestion job component
   - Configure the EMR Serverless job
   - Set up an EventBridge schedule
4. Deploy
   - Build and push Docker images
   - Deploy Pulumi infrastructure
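A hypothetical connector skeleton for step 1; it follows the pattern described above (a class that wraps the write logic) but is not the project's actual interface, and it uses a generic Spark JDBC write purely for illustration.

```python
from pyspark.sql import DataFrame


class ExampleWarehouseConnector:
    """Hypothetical destination connector; real ones live in connectors/."""

    def __init__(self, jdbc_url: str, table: str):
        self.jdbc_url = jdbc_url
        self.table = table

    def write(self, df: DataFrame) -> None:
        # Append a batch over JDBC; a production connector would add retries,
        # batching, and monitoring hooks.
        (
            df.write.format("jdbc")
            .option("url", self.jdbc_url)
            .option("dbtable", self.table)
            .mode("append")
            .save()
        )
```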
Monitoring and Debugging
Key Locations
- Step Functions: Pipeline execution status
- CloudWatch Logs: Job execution logs
- EMR Serverless: Spark job details and Spark UI
- S3 Buckets: Data storage inspection
- Athena: Query Iceberg tables directly
Reference: See monitoring-etl-pipelines.md for detailed monitoring guide
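For ad hoc checks, Iceberg tables can also be queried through Athena; a minimal boto3 sketch follows (database, table, and results location are placeholders).

```python
import boto3

athena = boto3.client("athena")

# Kick off an ad hoc count against a gold table; results land in the
# (hypothetical) output bucket and can be fetched with get_query_results.
athena.start_query_execution(
    QueryString="SELECT COUNT(*) FROM gold.internet_posts",
    QueryExecutionContext={"Database": "gold"},
    ResultConfiguration={"OutputLocation": "s3://athena-results-example/"},
)
```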
Configuration Management
Pulumi Configuration
Configuration is managed per stack (dev/prod):
# View configuration
cd packages/data-lake/data-lake-infra
pulumi config --stack dev
# Set configuration
pulumi config set --stack dev --secret apiKey value
# View outputs
pulumi stack output --stack dev
Environment Variables
- Collectors: Configured in data_lake_collector/settings.py
- ETL Jobs: Passed via EMR Serverless job parameters (see the sketch below)
- Warehouse Jobs: Configured in Pulumi infrastructure
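As a sketch of the second point, an ETL entry point might read its parameters from the job arguments passed by EMR Serverless; the argument names here are hypothetical.

```python
import argparse

parser = argparse.ArgumentParser(description="Example ETL entry point")
parser.add_argument("--source", required=True)
parser.add_argument("--target-table", required=True)
args = parser.parse_args()

# The values arrive via the EMR Serverless job's entryPointArguments.
print(f"Running ETL for source={args.source} into {args.target_table}")
```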
Testing
Unit Tests
# Data Lake Collector
pnpm nx test @application/data-lake-collector
# Data Lake ETL
pnpm nx test @application/data-lake-etl
# Data Warehouse Ingest
pnpm nx test @application/data-warehouse-ingest
E2E Tests
# Data Warehouse E2E (requires Docker)
cd packages/data-warehouse/data-warehouse-ingest
pnpm nx test @application/data-warehouse-ingest --testPathPattern=e2e
Deployment
Building Images
# Build all data platform images
pnpm nx package data-lake-collector
pnpm nx package data-lake-etl
pnpm nx package data-warehouse-ingest
pnpm nx package data-warehouse-orchestration
Deploying Infrastructure
# Data Lake Infrastructure
cd packages/data-lake/data-lake-infra
NX_TUI=false pnpm nx pulumi data-lake-infra -- preview --stack dev
NX_TUI=false pnpm nx pulumi data-lake-infra -- up --stack dev
# Data Warehouse Infrastructure
cd packages/data-warehouse/data-warehouse-infra
NX_TUI=false pnpm nx pulumi data-warehouse-infra -- preview --stack dev
NX_TUI=false pnpm nx pulumi data-warehouse-infra -- up --stack dev
Reference: See deploying-infrastructure-updates.md for detailed deployment guide
References
- Data Lake Collector: packages/data-lake/data-lake-collector/README.md
- Data Lake ETL: packages/data-lake/data-lake-etl/README.md
- Data Warehouse Ingest: packages/data-warehouse/data-warehouse-ingest/README.md
- Data Warehouse Infra: packages/data-warehouse/data-warehouse-infra/README.md
- Building New Collectors: building-new-collectors.md
- Monitoring ETL Pipelines: monitoring-etl-pipelines.md
- Deploying Infrastructure: deploying-infrastructure-updates.md