# Building New Collectors

## Collector Types

### API Integration
- Direct API calls (e.g., Stack Exchange, Reddit)
- Structured JSON output
- Examples: `stack_overflow`, `reddit`
### Scrapy Spider

- Web scraping with the Scrapy framework
- Handles dynamic content and pagination
- Examples: `atlassian_forums`
## Step-by-Step Checklist

### 1. Create Collector Code

Location: `packages/data-lake/data-lake-collector/`
For API Integration:

- Add module: `data_lake_collector/api_integrations/{name}.py`
- Implement the API client class
- Add to `api_integrations/__init__.py` (see the sketch below)
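What "adding to `__init__.py`" looks like depends on the package's conventions; a minimal sketch, assuming registration is simply re-exporting the new client so callers can import it from the package root:

```python
# data_lake_collector/api_integrations/__init__.py
# Hypothetical sketch: assumes registering a client means re-exporting it here.
from data_lake_collector.api_integrations.{name} import {Name}Client
```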
For Scrapy Spider:

- Add spider: `data_lake_collector/spiders/{name}.py`
- Define a spider class inheriting from `scrapy.Spider`
- Configure in `settings.py` if needed
### 2. Update Settings (if needed)

File: `packages/data-lake/data-lake-collector/data_lake_collector/settings.py`

- Add environment variables
- Configure S3 feed paths
- Add API keys/secrets
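For illustration, a hedged sketch of what the settings additions might look like, assuming the project reads secrets from environment variables (all names here are hypothetical, not the project's actual ones):

```python
# settings.py (hypothetical additions for a new source)
import os

# API credentials, injected via environment variables at deploy time.
EXAMPLE_SOURCE_API_KEY = os.environ.get("EXAMPLE_SOURCE_API_KEY", "")

# S3 feed path where the raw bronze output lands.
EXAMPLE_SOURCE_FEED_URI = os.environ.get(
    "EXAMPLE_SOURCE_FEED_URI", "s3://example-bucket/bronze/example_source/"
)
```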
### 3. Create Infrastructure Component

Location: `packages/data-lake/data-lake-infra/src/{name}.ts`

Base Structure:

```typescript
import * as pulumi from '@pulumi/pulumi';

import { EmrServerlessApp } from '@application/infra-core';
import { BronzeInfra } from './bronze/bronzeInfra';
import { DataCollector } from './bronze/dataCollector';
import { DataEtlFlow } from './running-flow/dataEtlFlow';
import { SilverInfra } from './silver/silverInfra';

export interface {Name}Args {
  // Required configuration for the new source
}

export class {Name} extends pulumi.ComponentResource {
  constructor(name: string, args: {Name}Args, opts?: pulumi.ComponentResourceOptions) {
    // Register the component before creating child resources;
    // the type token here is illustrative — follow the project's existing components.
    super('data-lake:{Name}', name, args, opts);
    // Implementation
  }
}
```
Reference Examples:

- API: `packages/data-lake/data-lake-infra/src/redditApi.ts`
- API: `packages/data-lake/data-lake-infra/src/stackExchangeApi.ts`
- Scrapy: `packages/data-lake/data-lake-infra/src/atlassianForums.ts`
### 4. Configure in Index

File: `packages/data-lake/data-lake-infra/src/index.ts`

Add a new instance:

```typescript
new {Name}('{source-name}', {
  dataArchiveEmrAppInstance: dataArchiveEmrAppInstance,
  sparkJobLocationPrefix: dataArchiveSparkJobLocationPrefix,
  alarmSnsTopicArn: alarmSnsTopicArn,
  athenaCatalogName: athenaCatalog.name,
  env: env,
  vpcId: vpcId,
  ecsClusterArn: ecsClusterArn,
  scraperImage: dataLakeCollectorImage.imageName,
  // [PLACEHOLDER: additional required config]
});
```
### 5. Add Pulumi Configuration

Set the required secrets (run once per stack, i.e. `dev` and `prod`):

```bash
cd packages/data-lake/data-lake-infra
pulumi config set --stack {stack} --secret {apiKeyName} {value}
pulumi config set --stack {stack} --secret {otherSecretName} {value}
```
### 6. Deploy Infrastructure

Make sure the current PR's new image tag is configured, so the EMR Serverless instance runs the most up-to-date Python code.

```bash
NX_TUI=false pnpm nx pulumi data-lake-infra -- preview --stack dev
NX_TUI=false pnpm nx pulumi data-lake-infra -- up --stack dev
```
## Code Examples

### API Integration Collector

Collector Code (`api_integrations/{name}.py`):

```python
from data_lake_collector.api_integrations.model import ApiIntegration

class {Name}Client:
    def __init__(self, api_key: str):
        # Initialize API client
        pass

    def fetch_data(self, **kwargs):
        # Fetch and return data
        pass
```
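To make the skeleton concrete, here is a hedged sketch of a paginated `fetch_data` built on `requests`; the endpoint, query parameters, and response shape are hypothetical stand-ins for whatever the real API exposes:

```python
import requests

class ExampleClient:
    """Hypothetical client for a generic paginated JSON API."""

    BASE_URL = "https://api.example.com/v1/items"  # hypothetical endpoint

    def __init__(self, api_key: str):
        self.session = requests.Session()
        self.session.headers["Authorization"] = f"Bearer {api_key}"

    def fetch_data(self, page_size: int = 100):
        # Walk the paginated endpoint, yielding one JSON record at a time.
        page = 1
        while True:
            resp = self.session.get(
                self.BASE_URL, params={"page": page, "per_page": page_size}
            )
            resp.raise_for_status()
            items = resp.json().get("items", [])
            if not items:
                break
            yield from items
            page += 1
```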
Infrastructure (`{name}.ts`):

- Follow the `RedditApi` or `StackExchangeApi` pattern
- Use `BronzeInfra` with the `api_source.py` Spark job
- Use `SilverInfra` with the `api_source.py` Spark job
### Scrapy Spider Collector

Spider Code (`spiders/{name}.py`):

```python
import scrapy

class {Name}Spider(scrapy.Spider):
    name = '{name}'

    def start_requests(self):
        # Define starting URLs
        pass

    def parse(self, response):
        # Parse and yield items
        pass
```
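As a concrete illustration of the skeleton, a spider that yields one item per row and follows pagination could look like the sketch below; the URL and CSS selectors are hypothetical and must be adapted to the real site's markup:

```python
import scrapy

class ExampleForumSpider(scrapy.Spider):
    """Hypothetical spider; the URL and selectors are illustrative only."""

    name = "example_forum"

    def start_requests(self):
        yield scrapy.Request("https://forum.example.com/topics", callback=self.parse)

    def parse(self, response):
        # Yield one structured item per topic row on the listing page.
        for row in response.css("div.topic"):
            yield {
                "title": row.css("a.title::text").get(),
                "url": response.urljoin(row.css("a.title::attr(href)").get()),
            }
        # Follow pagination until there is no "next" link.
        next_page = response.css("a.next::attr(href)").get()
        if next_page:
            yield response.follow(next_page, callback=self.parse)
```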
Infrastructure (`{name}.ts`):

- Follow the `AtlassianForums` pattern
- Use `BronzeInfra` with the `forums.py` Spark job
- Use `SilverInfra` with the `forums.py` Spark job
## Testing

### Local Testing

API Integration:

```bash
cd packages/data-lake/data-lake-collector
python -m data_lake_collector.api_integrations.{name} --args
```

Scrapy Spider:

```bash
SCRAPER_MODE=spider SPIDER={name} ./data_lake_collector/run.sh
```
### Test Data Validation
- Verify JSON output format
- Check S3 bucket contents
- Validate schema matches expected structure
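One lightweight way to check the output before deploying is a small script that asserts each record carries the expected keys; a sketch, assuming JSON Lines output and hypothetical field names:

```python
import json

REQUIRED_KEYS = {"id", "title", "body"}  # hypothetical expected schema

def validate_output(path: str) -> None:
    # Assumes one JSON object per line (JSON Lines).
    with open(path) as f:
        for line_no, line in enumerate(f, start=1):
            record = json.loads(line)
            missing = REQUIRED_KEYS - record.keys()
            assert not missing, f"line {line_no}: missing keys {missing}"

validate_output("output.jsonl")  # hypothetical local output file
```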
## Deployment

- Build collector: `pnpm nx package data-lake-collector`
- Deploy infrastructure: see step 6 above
- Verify: check the Step Functions execution and CloudWatch logs
## References

- Collector README: `packages/data-lake/data-lake-collector/README.md`
- Collector settings: `packages/data-lake/data-lake-collector/data_lake_collector/settings.py`
- Infrastructure index: `packages/data-lake/data-lake-infra/src/index.ts`
- DataCollector class: `packages/data-lake/data-lake-infra/src/bronze/dataCollector.ts`
- Naming conventions: [PLACEHOLDER: collector naming conventions]