Multiple Repositories
This guide demonstrates best practices for structuring and managing multiple data pipelines using the Datablast Data Platform. It shows how to organize pipelines, shared utilities, and scalable data architecture.
Overview
This repository serves as a comprehensive example of how to structure and manage multiple data pipelines using our data platform. It demonstrates best practices for pipeline organization, shared utilities, and scalable data architecture.
Repository Structure
Root Level Organization
```
client-data-pipeline/
├── src/                    # Shared utilities and scripts
├── client-internal/        # Main client pipeline
├── analytics/              # Analytics pipeline
├── pipeline_test/          # Testing pipeline
├── requirements.txt        # Global Python dependencies
└── README.md
```

Pipeline Structure Pattern
Each pipeline follows a consistent structure:
```
pipeline_name/
├── pipeline.yml              # Pipeline configuration
├── tasks/                    # Task definitions
│   ├── external_data_raw/    # Raw data ingestion
│   ├── external_data/        # Processed external data
│   ├── dashboard/            # Dashboard tables
│   ├── mapping/              # Data mapping tables
│   └── tableau_reports/      # Reporting tables
└── func/                     # Pipeline-specific functions (optional)
```

Complete Example Structure
```
client-data-pipeline/
├── requirements.txt                      # Global dependencies
├── README.md                             # Project documentation
├── src/                                  # Shared utilities and scripts
│   ├── bigquery_functions.py
│   ├── api_utils.py
│   └── report_generator.py
├── client-internal/                      # Main client pipeline
│   ├── pipeline.yml                      # Pipeline configuration
│   ├── tasks/                            # Task definitions
│   │   ├── external_data_raw/            # Raw data ingestion
│   │   │   ├── app_sales.task.yaml
│   │   │   ├── app_sales.py
│   │   │   └── requirements.txt          # Pipeline-specific dependencies
│   │   ├── external_data/                # Processed external data
│   │   │   ├── app_sales.task.yaml
│   │   │   └── app_sales.sql
│   │   ├── dashboard/                    # Dashboard tables
│   │   │   ├── daily_metrics.task.yaml
│   │   │   └── daily_metrics.sql
│   │   └── mapping/                      # Data mapping tables
│   │       ├── country_codes.task.yaml
│   │       └── country_codes.sql
│   └── func/                             # Pipeline-specific functions
│       └── common_functions.sql
├── analytics/                            # Analytics pipeline
│   ├── pipeline.yml                      # Pipeline configuration
│   ├── tasks/                            # Task definitions
│   │   ├── reports/                      # Reporting tables
│   │   │   ├── daily_report.task.yaml
│   │   │   ├── daily_report.sql
│   │   │   └── requirements.txt          # Pipeline-specific dependencies
│   │   └── ml_models/                    # Machine learning
│   │       ├── churn_model.task.yaml
│   │       └── churn_model.py
│   └── func/                             # Pipeline-specific functions
│       └── analytics_functions.sql
└── pipeline_test/                        # Testing pipeline
    ├── pipeline.yml                      # Pipeline configuration
    ├── tasks/                            # Task definitions
    │   ├── test_data/                    # Test data generation
    │   │   ├── sample_data.task.yaml
    │   │   └── sample_data.sql
    │   └── validation/                   # Data validation
    │       ├── data_quality.task.yaml
    │       └── data_quality.py
    └── func/                             # Pipeline-specific functions
        └── test_functions.sql
```

Pipeline Organization
1. Data Source Pipelines
Multiple pipelines can be created for different data sources such as:
- Mobile app store data (iOS/Android sales and analytics)
- Attribution platforms (marketing analytics and user acquisition)
- Analytics platforms (user events and behavior tracking)
- Ad networks (revenue and performance data)
- Third-party APIs (various external data sources)
2. Analytics Pipelines
Cross-functional analytics pipelines including:
- Marketing analytics (cross-platform performance metrics)
- Financial reporting (revenue, costs, and KPI calculations)
- Machine learning models (predictive analytics and insights)
- Business intelligence (operational and performance metrics)
3. Specialized Pipelines
Domain-specific pipelines for:
- Social media performance tracking
- Customer feedback and rating analysis
- Periodic business metrics and reporting
- Custom business logic and calculations
Shared Utilities
Root src/ Directory (Best Practice)
As a best practice, create a root-level src/ directory (or a similarly named folder such as utils/, shared/, or common/) to hold shared utilities that every pipeline in the repository can import.
For comprehensive guidance on creating and managing shared utilities, see our dedicated Shared Utilities Best Practices guide, which covers:
- Core utility categories: BigQuery functions, API utilities, data processing, reporting
- Advanced patterns: Modular organization, configuration management, error handling
- Best practices: Function design, testing, performance optimization
- Integration: How to use utilities with Datablast platform features
- Maintenance: Version control, documentation, and updates
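As a rough illustration of the idea, a task script could locate the root-level shared directory and add it to the import path before importing helpers such as `api_utils`. This is only a sketch: the helper names `find_repo_root` and `enable_shared_imports` are hypothetical, and the platform may already make `src/` importable on its own, in which case none of this is needed.

```python
import sys
from pathlib import Path


def find_repo_root(start: Path, marker: str = "src") -> Path:
    """Walk upward from `start` until a directory containing `marker/` is found."""
    current = start.resolve()
    for candidate in (current, *current.parents):
        if (candidate / marker).is_dir():
            return candidate
    raise FileNotFoundError(f"no '{marker}/' directory found at or above {start}")


def enable_shared_imports(start: Path, marker: str = "src") -> Path:
    """Put the shared directory on sys.path so its modules can be imported."""
    shared_dir = find_repo_root(start, marker) / marker
    if str(shared_dir) not in sys.path:
        sys.path.insert(0, str(shared_dir))
    return shared_dir
```

A task under `client-internal/tasks/external_data_raw/` might call `enable_shared_imports(Path(__file__).parent)` and then `import api_utils`. Searching upward for the directory, rather than hard-coding `../../..`, keeps the script working if the task is moved to a different depth.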
Pipeline-Specific Functions
Some pipelines have their own func/ directories for shared SQL functions:
Common Functions (client-internal/func/common_functions.sql):
- get_param_str(): Extract string parameters from event arrays
- get_param_int(): Extract integer parameters from event arrays
- get_param_bool(): Extract boolean parameters from event arrays
- get_param_double(): Extract double parameters from event arrays
- parse_version(): Parse version strings for comparison
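To make the intent of these helpers concrete, here is a rough Python analogue of two of them. It is not the SQL implementation, only an illustration of what the functions compute; the record layout (`key` plus a nested `value.string_value`) is assumed from the event-array shape the SQL functions operate on.

```python
from __future__ import annotations

from typing import Optional


def get_param_str(event_params: list[dict], param_name: str) -> Optional[str]:
    """Return the string value stored under `param_name`, or None if absent."""
    for param in event_params:
        if param.get("key") == param_name:
            return param.get("value", {}).get("string_value")
    return None


def parse_version(version_string: str) -> tuple[int, int, int]:
    """Split 'major.minor.patch' into integers so versions compare numerically."""
    major, minor, patch = (int(part) for part in version_string.split(".")[:3])
    return major, minor, patch
```

Parsing matters because plain string comparison orders versions incorrectly: `"1.10.0" < "1.9.3"` is true as text, while `parse_version("1.10.0") > parse_version("1.9.3")` gives the intended result.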
Best Practices
1. Pipeline Configuration (pipeline.yml)
```yaml
id: pipeline-name
schedule: "0 8 * * *"            # Cron schedule
start_date: "2023-01-01"         # Pipeline start date
default_connections:
  gcpConnectionId: client-gcp    # Default GCP connection
notifications:
  slack:
    - name: client-notifications
      connection: client-slack
      # Quoted because the messages contain ": ", which is not valid in an unquoted YAML value
      success: ":tada: Pipeline has finished successfully!"
      failure: ":red_circle: Pipeline has failed!"
description: |
  Detailed description of the pipeline's purpose,
  data sources, and schedule.
```

2. Task Organization
Tasks are organized by data processing stage:
External Data Raw (external_data_raw/)
- Python scripts for API data ingestion
- Handle authentication and data retrieval
- Store raw data in BigQuery
External Data (external_data/)
- SQL transformations of raw data
- Data cleaning and standardization
- Dependencies on external_data_raw tasks
Dashboard (dashboard/)
- Aggregated data for dashboards
- Daily, weekly, monthly summaries
- Business metrics and KPIs
Mapping (mapping/)
- Data mapping and lookup tables
- Country codes, app IDs, SKU mappings
- Reference data for other tasks
3. Task Configuration (task.yaml)
```yaml
name: task_name
type: bq.sql                     # or python
run: script_name.sql             # or script_name.py
depends:                         # Task dependencies
  - upstream_task_1
  - upstream_task_2
tests:                           # Data quality tests
  columns:
    column_name:
      - name: not_null
      - name: accepted_values
        equal_to:
          value: [value1, value2]
description: |
  Detailed description of the task, including
  example queries and column documentation.
```

4. Python Task Annotations
Python tasks use special annotations for configuration:
```python
# @blast.name: task_name
# @blast.type: python
# @blast.depends: upstream_task
# @blast.secrets: SECRET_NAME:SECRET_NAME
# @blast.instance: d1.large
```

5. Data Quality Testing
Comprehensive testing framework:
- Column-level tests (not_null, accepted_values)
- Custom SQL tests for business logic
- Data freshness checks
- Referential integrity tests
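The two column-level tests named above can be pictured with a small in-memory sketch. On the platform these checks run against warehouse tables; the functions below are hypothetical stand-ins that only show the semantics, and treating nulls as outside the scope of `accepted_values` is an assumption rather than documented behavior.

```python
from __future__ import annotations

from typing import Any, Iterable


def check_not_null(rows: list[dict[str, Any]], column: str) -> list[int]:
    """Return the indexes of rows where `column` is missing or None."""
    return [i for i, row in enumerate(rows) if row.get(column) is None]


def check_accepted_values(
    rows: list[dict[str, Any]], column: str, accepted: Iterable[Any]
) -> list[int]:
    """Return the indexes of rows whose non-null `column` value is not accepted."""
    allowed = set(accepted)
    return [
        i
        for i, row in enumerate(rows)
        # Assumption: nulls are left to the not_null test and skipped here.
        if row.get(column) is not None and row[column] not in allowed
    ]
```

Returning the failing row indexes, rather than a single boolean, makes it easier to report which records broke a rule when a test fails.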
Pipeline Examples
1. Data Ingestion Pipeline Example

Purpose: Ingest and process external data from various sources
Schedule: Daily at 16:00 UTC
Data Flow:

```
External API → external_data_raw → external_data → dashboard
```

Key Tasks:
- external_data_raw.data_source: Python script for API data retrieval
- external_data.data_source: SQL transformation and cleaning
- dashboard.daily_metrics: Daily aggregated metrics
2. Marketing Analytics Pipeline Example
Purpose: Attribution and marketing analytics
Schedule: Daily at 08:00 UTC
Data Flow:

```
Marketing API → external_data_raw → external_data → dashboard → reports
```

Key Features:
- Handles large datasets with chunking strategy
- Multiple table partitioning for BigQuery quotas
- Retry logic for failed downloads
- Data mapping and transformation
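The chunking strategy is not spelled out here, so the following is only one plausible reading: split a long date range into fixed-size windows and download each window separately. The function name `date_chunks` and the choice of date-based windows are assumptions made for illustration.

```python
from __future__ import annotations

from datetime import date, timedelta
from typing import Iterator


def date_chunks(start: date, end: date, days_per_chunk: int) -> Iterator[tuple[date, date]]:
    """Yield inclusive (chunk_start, chunk_end) windows covering start..end."""
    if days_per_chunk < 1:
        raise ValueError("days_per_chunk must be at least 1")
    chunk_start = start
    while chunk_start <= end:
        # The last window is clipped so it never runs past `end`.
        chunk_end = min(chunk_start + timedelta(days=days_per_chunk - 1), end)
        yield chunk_start, chunk_end
        chunk_start = chunk_end + timedelta(days=1)
```

Because the windows are inclusive and non-overlapping, each one can be requested and loaded independently, and a failed window can be retried without re-downloading the rest.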
3. Machine Learning Pipeline Example
Purpose: Machine learning for predictive analytics
Schedule: Daily at 23:00 UTC
Components:
- Python ML models (XGBoost, scikit-learn)
- Feature engineering
- Model training and prediction
- Segment-specific models
Task Management
Dependencies

Tasks are organized with clear dependencies:

```yaml
depends:
  - external_data_raw.app_sales
  - external_data_raw.client_a_sales
  - external_data_raw.client_b_sales
```

Error Handling
- Retry logic for API failures
- Graceful handling of missing data
- Comprehensive logging
- Slack notifications for failures
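Retry logic with logging could look roughly like the sketch below, which retries a callable with exponential backoff. The helper `call_with_retry` and its parameters are hypothetical; a real task would normally catch only the exception types its API client raises instead of every `Exception`.

```python
import logging
import time
from typing import Callable, TypeVar

T = TypeVar("T")
logger = logging.getLogger("pipeline.retry")


def call_with_retry(
    func: Callable[[], T],
    attempts: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `func`, retrying failures with delays of base_delay * 2**n seconds."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except Exception as exc:  # broad on purpose in this sketch
            if attempt == attempts:
                logger.error("giving up after %d attempts: %s", attempts, exc)
                raise
            delay = base_delay * 2 ** (attempt - 1)
            logger.warning("attempt %d failed (%s); retrying in %.1fs", attempt, exc, delay)
            sleep(delay)
    raise AssertionError("unreachable")
```

The `sleep` parameter exists so the waiting behavior can be swapped out in tests; in normal use the default `time.sleep` applies. Re-raising the final exception lets the platform mark the task as failed and trigger its failure notification.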
Resource Management
- Instance sizing for large data processing
- BigQuery quota management
- Memory optimization for large datasets
Data Flow Patterns
1. Raw → Processed → Aggregated

```
API/External Source → Raw Tables → Processed Tables → Dashboard Tables
```

2. Multi-Source Integration

```
Source A ┐
Source B ├→ Mapping Tables → Unified Tables → Reports
Source C ┘
```

3. ML Pipeline Pattern

```
Raw Data → Feature Engineering → Model Training → Predictions → Business Metrics
```

Key Features
1. Scalability
- Modular pipeline design
- Shared utilities reduce code duplication
- Consistent patterns across pipelines
2. Maintainability
- Clear separation of concerns
- Comprehensive documentation
- Standardized configuration
3. Reliability
- Comprehensive testing
- Error handling and retry logic
- Monitoring and alerting
4. Flexibility
- Easy to add new data sources
- Configurable schedules and dependencies
- Extensible architecture
Getting Started
1. Setting Up a New Pipeline
- Create pipeline directory structure
- Define pipeline.yml configuration
- Implement tasks following established patterns
- Add data quality tests
- Configure notifications
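The first two steps can be scripted. The sketch below creates the directory layout described in this guide and a minimal `pipeline.yml`; the function `scaffold_pipeline` is a hypothetical helper rather than a platform command, and the generated file contains only the `id` and `schedule` keys, so the remaining settings still have to be filled in by hand.

```python
from pathlib import Path

# Stage directories taken from the pipeline structure pattern in this guide.
STAGE_DIRS = ("external_data_raw", "external_data", "dashboard", "mapping")


def scaffold_pipeline(repo_root: Path, name: str, schedule: str = "0 8 * * *") -> Path:
    """Create <repo_root>/<name> with tasks/, func/ and a starter pipeline.yml."""
    pipeline_dir = repo_root / name
    for stage in STAGE_DIRS:
        (pipeline_dir / "tasks" / stage).mkdir(parents=True, exist_ok=True)
    (pipeline_dir / "func").mkdir(exist_ok=True)

    config = pipeline_dir / "pipeline.yml"
    if not config.exists():  # never overwrite an existing configuration
        config.write_text(f'id: {name}\nschedule: "{schedule}"\n', encoding="utf-8")
    return pipeline_dir
```

Running it twice is harmless: existing directories are reused and an existing `pipeline.yml` is left untouched.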
2. Adding New Data Sources
- Create external_data_raw task for API integration
- Implement data transformation in external_data
- Add mapping tables if needed
- Create dashboard aggregations
- Add comprehensive tests
3. Best Practices Checklist
- Use shared utilities from src/ directory
- Follow consistent naming conventions
- Implement comprehensive testing
- Add detailed documentation
- Configure proper dependencies
- Set up monitoring and alerting
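One checklist item that lends itself to automation is dependency configuration. The sketch below takes a mapping of task names to their upstream tasks, rejects references to undefined tasks, and returns a valid execution order using the standard-library `graphlib` module. The function `execution_order` and the input format are assumptions; the platform resolves dependencies itself, so this is only a local sanity check.

```python
from __future__ import annotations

from graphlib import CycleError, TopologicalSorter


def execution_order(depends: dict[str, list[str]]) -> list[str]:
    """Return task names ordered so every task comes after its upstream tasks.

    Raises ValueError for undefined upstream tasks and graphlib.CycleError
    (a ValueError subclass) for circular dependencies.
    """
    unknown = {up for ups in depends.values() for up in ups if up not in depends}
    if unknown:
        raise ValueError(f"undefined upstream tasks: {sorted(unknown)}")
    return list(TopologicalSorter(depends).static_order())
```

For example, passing `{"external_data_raw.app_sales": [], "external_data.app_sales": ["external_data_raw.app_sales"]}` places the raw ingestion task before the transformation that depends on it.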
Real-World Example Structure
Based on the repository structure, here’s how requirements are organized:

```
integration/test-pipeline-2/
├── tasks/
│   ├── metabase_checker/
│   │   ├── metabase_checker.py
│   │   ├── metabase_api.py
│   │   └── requirements.txt    # requests==2.28.1
│   └── export/
│       ├── export_task.py
│       └── requirements.txt    # sshtunnel, pandas, etc.
```

Shared Utilities Example

For detailed examples and implementation patterns, see the Shared Utilities Best Practices guide.
Pipeline-Specific Functions Example
```sql
-- func/common_functions.sql
CREATE OR REPLACE FUNCTION get_param_str(
  event_params ARRAY<STRUCT<key STRING, value STRUCT<string_value STRING>>>,
  param_name STRING
)
RETURNS STRING AS (
  (SELECT value.string_value FROM UNNEST(event_params) WHERE key = param_name)
);

CREATE OR REPLACE FUNCTION get_param_int(
  event_params ARRAY<STRUCT<key STRING, value STRUCT<int_value INT64>>>,
  param_name STRING
)
RETURNS INT64 AS (
  (SELECT value.int_value FROM UNNEST(event_params) WHERE key = param_name)
);

CREATE OR REPLACE FUNCTION get_param_bool(
  event_params ARRAY<STRUCT<key STRING, value STRUCT<bool_value BOOL>>>,
  param_name STRING
)
RETURNS BOOL AS (
  (SELECT value.bool_value FROM UNNEST(event_params) WHERE key = param_name)
);

CREATE OR REPLACE FUNCTION get_param_double(
  event_params ARRAY<STRUCT<key STRING, value STRUCT<double_value FLOAT64>>>,
  param_name STRING
)
RETURNS FLOAT64 AS (
  (SELECT value.double_value FROM UNNEST(event_params) WHERE key = param_name)
);

CREATE OR REPLACE FUNCTION parse_version(version_string STRING)
RETURNS STRUCT<major INT64, minor INT64, patch INT64> AS (
  STRUCT(
    CAST(SPLIT(version_string, '.')[OFFSET(0)] AS INT64) AS major,
    CAST(SPLIT(version_string, '.')[OFFSET(1)] AS INT64) AS minor,
    CAST(SPLIT(version_string, '.')[OFFSET(2)] AS INT64) AS patch
  )
);
```

Advanced Patterns
1. Cross-Pipeline Dependencies

```yaml
# Pipeline A depends on Pipeline B completion
depends:
  - pipeline_b.final_task
```

2. Conditional Execution
```yaml
# Run task only on specific conditions
schedule:
  days: [monday, wednesday, friday]
  # Quoted so the leading "{{" is not parsed as a YAML flow mapping
  condition: "{{ ds }} >= '2024-01-01'"
```
### 3. Dynamic Task Generation

```python
# Generate tasks dynamically based on data sources
def generate_tasks(data_sources):
    tasks = []
    for source in data_sources:
        tasks.append({
            'name': f'ingest_{source}',
            'type': 'python',
            'run': f'ingest_{source}.py'
        })
    return tasks
```

4. Multi-Environment Support
```yaml
environments:
  development:
    schedule: "0 2 * * *"
    connections:
      gcp: dev-gcp-conn
  production:
    schedule: "0 1 * * *"
    connections:
      gcp: prod-gcp-conn
```
## Monitoring and Alerting
### 1. Pipeline Health Monitoring

```yaml
# pipeline.yml
monitoring:
  health_checks:
    - name: data_freshness
      query: SELECT COUNT(*) FROM staging.events WHERE dt = '{{ ds }}'
      threshold: 1000
    - name: data_quality
      query: SELECT COUNT(*) FROM staging.events WHERE user_id IS NULL
      threshold: 0
```

2. Cost Monitoring
```yaml
cost_monitoring:
  bigquery:
    daily_threshold: 100.0
    hourly_threshold: 50.0
    alerts:
```
### 3. Performance Monitoring

```yaml
# pipeline.yml
performance_monitoring:
  slow_tasks:
    threshold: 3600    # 1 hour
    alerts:
      - slack: data-team
  resource_usage:
    memory_threshold: 80%
    cpu_threshold: 90%
```

Security Best Practices
1. Secret Management
```yaml
secrets:
  - API_KEY:API_KEY
  - DB_PASSWORD:DB_PASSWORD
  - ENCRYPTION_KEY:ENCRYPTION_KEY
```
### 2. Access Control

```yaml
# pipeline.yml
access_control:
  roles:
    - name: data_engineer
      permissions: [read, write, execute]
    - name: data_analyst
      permissions: [read]
  environments:
    production:
      required_approval: true
      approvers: [data_team_lead]
```

3. Data Encryption
```yaml
data_protection:
  encryption:
    at_rest: true
    in_transit: true
  masking:
    sensitive_columns: [email, phone, ssn]
    masking_strategy: hash
```
## Conclusion
This repository demonstrates a mature, production-ready data platform implementation with:
- **Clear separation of concerns** between pipelines
- **Shared utilities** for common operations
- **Comprehensive testing** and quality assurance
- **Scalable architecture** for growing data needs
- **Best practices** for maintainability and reliability
The structure and patterns shown here can serve as a template for other clients and use cases, ensuring consistency and quality across all data platform implementations.
## Next Steps
- Learn about [Project Structure](/docs/project/project-structure)
- Explore [Shared Utilities Best Practices](/docs/guides/shared-utilities)
- Review [Task Configuration](/docs/project/task-configuration)
- Check out [Python Development](/docs/guides/python-development)
- Explore [SQL Development](/docs/guides/sql-development)