Building New Collectors

Collector Types

API Integration

  • Direct API calls (e.g., Stack Exchange, Reddit)
  • Structured JSON output
  • Examples: stack_overflow, reddit

Scrapy Spider

  • Web scraping with Scrapy framework
  • Handles dynamic content, pagination
  • Examples: atlassian_forums

Step-by-Step Checklist

1. Create Collector Code

Location: packages/data-lake/data-lake-collector/

For API Integration:

  • Add module: data_lake_collector/api_integrations/{name}.py
  • Implement API client class
  • Add to api_integrations/__init__.py
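
The registration in __init__.py might look like the following sketch; example_source and ExampleSourceClient are placeholder names, and the repository's actual export convention may differ:

# data_lake_collector/api_integrations/__init__.py
# Hypothetical registration for a new integration module.
from data_lake_collector.api_integrations.example_source import ExampleSourceClient

__all__ = ["ExampleSourceClient"]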

For Scrapy Spider:

  • Add spider: data_lake_collector/spiders/{name}.py
  • Define spider class inheriting from scrapy.Spider
  • Configure in settings.py if needed
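
If only per-spider configuration is needed, Scrapy's custom_settings class attribute can override project settings without touching settings.py; a minimal sketch with illustrative values:

import scrapy

class ExampleForumSpider(scrapy.Spider):
    name = 'example_forum'

    # Per-spider overrides; the values here are illustrative
    custom_settings = {
        'DOWNLOAD_DELAY': 1.0,
        'CONCURRENT_REQUESTS_PER_DOMAIN': 2,
    }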

2. Update Settings (if needed)

File: packages/data-lake/data-lake-collector/data_lake_collector/settings.py

  • Add environment variables
  • Configure S3 feed paths
  • Add API keys/secrets
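
As an illustration, the additions could look like this; the EXAMPLE_SOURCE_* and DATA_LAKE_BUCKET names and the bucket layout are assumptions, not the repository's actual settings:

# data_lake_collector/settings.py -- illustrative additions only
import os

# API credentials come from the environment, never hard-coded
EXAMPLE_SOURCE_API_KEY = os.environ.get("EXAMPLE_SOURCE_API_KEY")

# S3 feed export for the new source (standard Scrapy FEEDS setting;
# the bucket and prefix are assumptions)
DATA_LAKE_BUCKET = os.environ.get("DATA_LAKE_BUCKET", "example-bucket")
FEEDS = {
    f"s3://{DATA_LAKE_BUCKET}/bronze/example_source/%(time)s.jsonl": {
        "format": "jsonlines",
    },
}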

3. Create Infrastructure Component

Location: packages/data-lake/data-lake-infra/src/{name}.ts

Base Structure:

import * as pulumi from '@pulumi/pulumi';
import { EmrServerlessApp } from '@application/infra-core';
import { BronzeInfra } from './bronze/bronzeInfra';
import { DataCollector } from './bronze/dataCollector';
import { DataEtlFlow } from './running-flow/dataEtlFlow';
import { SilverInfra } from './silver/silverInfra';

export interface {Name}Args {
  // Required configuration for the component
}

export class {Name} extends pulumi.ComponentResource {
  constructor(name: string, args: {Name}Args, opts?: pulumi.ComponentResourceOptions) {
    // Implementation: register the component, then wire up the
    // DataCollector, BronzeInfra, SilverInfra, and DataEtlFlow
  }
}

Reference Examples:

  • API: packages/data-lake/data-lake-infra/src/redditApi.ts
  • API: packages/data-lake/data-lake-infra/src/stackExchangeApi.ts
  • Scrapy: packages/data-lake/data-lake-infra/src/atlassianForums.ts

4. Configure in Index

File: packages/data-lake/data-lake-infra/src/index.ts

Add new instance:

new {Name}('{source-name}', {
  dataArchiveEmrAppInstance: dataArchiveEmrAppInstance,
  sparkJobLocationPrefix: dataArchiveSparkJobLocationPrefix,
  alarmSnsTopicArn: alarmSnsTopicArn,
  athenaCatalogName: athenaCatalog.name,
  env: env,
  vpcId: vpcId,
  ecsClusterArn: ecsClusterArn,
  scraperImage: dataLakeCollectorImage.imageName,
  // [PLACEHOLDER: additional required config]
});

5. Add Pulumi Configuration

Set required secrets:

cd packages/data-lake/data-lake-infra
pulumi config set --stack {dev|prod} --secret {apiKeyName} {value}
pulumi config set --stack {dev|prod} --secret {otherSecretName} {value}

6. Deploy Infrastructure

Make sure to set the image tag from the current PR so the EMR Serverless instance runs the most up-to-date Python code:

NX_TUI=false pnpm nx pulumi data-lake-infra -- preview --stack dev
NX_TUI=false pnpm nx pulumi data-lake-infra -- up --stack dev

Code Examples

API Integration Collector

Collector Code (api_integrations/{name}.py):

from data_lake_collector.api_integrations.model import ApiIntegration


class {Name}Client:
    def __init__(self, api_key: str):
        # Initialize API client
        pass

    def fetch_data(self, **kwargs):
        # Fetch and return data
        pass
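
For a more concrete picture, here is a minimal client sketch built on requests with cursor-based pagination; the endpoint, parameter names, and response shape are all assumptions for illustration:

import requests

class ExampleSourceClient:
    # Hypothetical endpoint; replace with the real API base URL
    BASE_URL = "https://api.example.com/v1/items"

    def __init__(self, api_key: str):
        self.session = requests.Session()
        self.session.headers["Authorization"] = f"Bearer {api_key}"

    def fetch_data(self, page_size: int = 100, **kwargs):
        """Yield raw JSON records, following cursor-based pagination."""
        cursor = None
        while True:
            params = {"limit": page_size, **kwargs}
            if cursor:
                params["cursor"] = cursor
            payload = self.session.get(self.BASE_URL, params=params).json()
            yield from payload.get("items", [])
            cursor = payload.get("next_cursor")
            if not cursor:
                break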

Infrastructure ({name}.ts):

  • Follow RedditApi or StackExchangeApi pattern
  • Use BronzeInfra with api_source.py Spark job
  • Use SilverInfra with api_source.py Spark job

Scrapy Spider Collector

Spider Code (spiders/{name}.py):

import scrapy


class {Name}Spider(scrapy.Spider):
    name = '{name}'

    def start_requests(self):
        # Define starting URLs
        pass

    def parse(self, response):
        # Parse and yield items
        pass
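
A filled-in version might look like this; the seed URL, CSS selectors, and field names are hypothetical:

import scrapy

class ExampleForumSpider(scrapy.Spider):
    name = 'example_forum'

    def start_requests(self):
        # Hypothetical seed URL for the forum listing
        yield scrapy.Request('https://forum.example.com/topics?page=1')

    def parse(self, response):
        # Emit one item per topic row (selectors are assumptions)
        for topic in response.css('div.topic'):
            yield {
                'title': topic.css('a.title::text').get(),
                'url': response.urljoin(topic.css('a.title::attr(href)').get()),
            }
        # Follow pagination until no "next" link remains
        next_page = response.css('a.next::attr(href)').get()
        if next_page:
            yield response.follow(next_page, callback=self.parse)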

Infrastructure ({name}.ts):

  • Follow AtlassianForums pattern
  • Use BronzeInfra with forums.py Spark job
  • Use SilverInfra with forums.py Spark job

Testing

Local Testing

API Integration:

cd packages/data-lake/data-lake-collector
python -m data_lake_collector.api_integrations.{name} --args
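
Running the module with python -m assumes it exposes a CLI entry point; a minimal sketch reusing the hypothetical ExampleSourceClient from above (the flag and environment variable names are assumptions):

# At the bottom of api_integrations/example_source.py
import argparse
import json
import os

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--limit", type=int, default=10)
    args = parser.parse_args()

    client = ExampleSourceClient(api_key=os.environ["EXAMPLE_SOURCE_API_KEY"])
    for i, record in enumerate(client.fetch_data()):
        if i >= args.limit:
            break
        print(json.dumps(record))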

Scrapy Spider:

SCRAPER_MODE=spider SPIDER={name} ./data_lake_collector/run.sh

Test Data Validation

  • Verify JSON output format
  • Check S3 bucket contents
  • Validate schema matches expected structure
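
A quick local check along these lines can catch malformed output early; the file name and required keys are assumptions about the source's schema:

# validate_output.py -- sanity-check a JSON Lines file from a local run
import json

REQUIRED_KEYS = {"id", "title", "body", "created_at"}

with open("output.jsonl") as f:
    for line_no, line in enumerate(f, 1):
        record = json.loads(line)
        missing = REQUIRED_KEYS - record.keys()
        assert not missing, f"line {line_no} missing keys: {missing}"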

Deployment

  1. Build collector: pnpm nx package data-lake-collector
  2. Deploy infrastructure: See step 6 above
  3. Verify: Check Step Functions execution and CloudWatch logs

References

  • Collector README: packages/data-lake/data-lake-collector/README.md
  • Collector settings: packages/data-lake/data-lake-collector/data_lake_collector/settings.py
  • Infrastructure index: packages/data-lake/data-lake-infra/src/index.ts
  • DataCollector class: packages/data-lake/data-lake-infra/src/bronze/dataCollector.ts
  • Naming conventions: [PLACEHOLDER: collector naming conventions]