
Multiple Repositories

This guide demonstrates best practices for structuring and managing multiple data pipelines using the Datablast Data Platform. It shows how to organize pipelines, shared utilities, and scalable data architecture.

client-data-pipeline/
├── src/ # Shared utilities and scripts
├── client-internal/ # Main client pipeline
├── analytics/ # Analytics pipeline
├── pipeline_test/ # Testing pipeline
├── requirements.txt # Global Python dependencies
└── README.md

Each pipeline follows a consistent structure:

pipeline_name/
├── pipeline.yml # Pipeline configuration
├── tasks/ # Task definitions
│   ├── external_data_raw/ # Raw data ingestion
│   ├── external_data/ # Processed external data
│   ├── dashboard/ # Dashboard tables
│   ├── mapping/ # Data mapping tables
│   └── tableau_reports/ # Reporting tables
└── func/ # Pipeline-specific functions (optional)

A complete repository layout might look like this:

client-data-pipeline/
├── requirements.txt # Global dependencies
├── README.md # Project documentation
├── src/ # Shared utilities and scripts
│   ├── bigquery_functions.py
│   ├── api_utils.py
│   └── report_generator.py
├── client-internal/ # Main client pipeline
│   ├── pipeline.yml # Pipeline configuration
│   ├── tasks/ # Task definitions
│   │   ├── external_data_raw/ # Raw data ingestion
│   │   │   ├── app_sales.task.yaml
│   │   │   ├── app_sales.py
│   │   │   └── requirements.txt # Pipeline-specific dependencies
│   │   ├── external_data/ # Processed external data
│   │   │   ├── app_sales.task.yaml
│   │   │   └── app_sales.sql
│   │   ├── dashboard/ # Dashboard tables
│   │   │   ├── daily_metrics.task.yaml
│   │   │   └── daily_metrics.sql
│   │   └── mapping/ # Data mapping tables
│   │       ├── country_codes.task.yaml
│   │       └── country_codes.sql
│   └── func/ # Pipeline-specific functions
│       └── common_functions.sql
├── analytics/ # Analytics pipeline
│   ├── pipeline.yml # Pipeline configuration
│   ├── tasks/ # Task definitions
│   │   ├── reports/ # Reporting tables
│   │   │   ├── daily_report.task.yaml
│   │   │   ├── daily_report.sql
│   │   │   └── requirements.txt # Pipeline-specific dependencies
│   │   └── ml_models/ # Machine learning
│   │       ├── churn_model.task.yaml
│   │       └── churn_model.py
│   └── func/ # Pipeline-specific functions
│       └── analytics_functions.sql
└── pipeline_test/ # Testing pipeline
    ├── pipeline.yml # Pipeline configuration
    ├── tasks/ # Task definitions
    │   ├── test_data/ # Test data generation
    │   │   ├── sample_data.task.yaml
    │   │   └── sample_data.sql
    │   └── validation/ # Data validation
    │       ├── data_quality.task.yaml
    │       └── data_quality.py
    └── func/ # Pipeline-specific functions
        └── test_functions.sql

Multiple pipelines can be created for different data sources such as:

  • Mobile app store data (iOS/Android sales and analytics)
  • Attribution platforms (marketing analytics and user acquisition)
  • Analytics platforms (user events and behavior tracking)
  • Ad networks (revenue and performance data)
  • Third-party APIs (various external data sources)

Cross-functional analytics pipelines can include:

  • Marketing analytics (cross-platform performance metrics)
  • Financial reporting (revenue, costs, and KPI calculations)
  • Machine learning models (predictive analytics and insights)
  • Business intelligence (operational and performance metrics)

Domain-specific pipelines can cover:

  • Social media performance tracking
  • Customer feedback and rating analysis
  • Periodic business metrics and reporting
  • Custom business logic and calculations

As a best practice, create a root-level src/ directory (or a similarly named folder such as utils/, shared/, or common/) for shared utilities that any pipeline in the same repository can import.
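
One way to make a root-level src/ folder importable from a task script is to put the repository root on sys.path before importing. The sketch below is an assumption about how this could be wired up by hand; the platform may already handle import paths for you. The helper names `find_repo_root` and `add_repo_root_to_path` are hypothetical.

```python
import sys
from pathlib import Path


def find_repo_root(start: Path, marker: str = "src") -> Path:
    """Walk upward from `start` until a directory containing `marker/` is found."""
    current = start.resolve()
    for candidate in (current, *current.parents):
        if (candidate / marker).is_dir():
            return candidate
    raise FileNotFoundError(f"No '{marker}/' directory found above {start}")


def add_repo_root_to_path(task_file: str, marker: str = "src") -> Path:
    """Put the repository root on sys.path so `import src.<module>` can resolve."""
    root = find_repo_root(Path(task_file).parent, marker)
    if str(root) not in sys.path:
        sys.path.insert(0, str(root))
    return root
```

In a task script this would be called as `add_repo_root_to_path(__file__)`, followed by an import such as `from src import api_utils`.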

For comprehensive guidance on creating and managing shared utilities, see our dedicated Shared Utilities Best Practices guide, which covers:

  • Core utility categories: BigQuery functions, API utilities, data processing, reporting
  • Advanced patterns: Modular organization, configuration management, error handling
  • Best practices: Function design, testing, performance optimization
  • Integration: How to use utilities with Datablast platform features
  • Maintenance: Version control, documentation, and updates

Some pipelines have their own func/ directories for shared SQL functions:

Common Functions (client-internal/func/common_functions.sql):

  • get_param_str(): Extract string parameters from event arrays
  • get_param_int(): Extract integer parameters from event arrays
  • get_param_bool(): Extract boolean parameters from event arrays
  • get_param_double(): Extract double parameters from event arrays
  • parse_version(): Parse version strings for comparison
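
To see why a helper like parse_version() is useful, consider what happens when version strings are compared as plain text. The Python sketch below mirrors the intent of the SQL helper for illustration only; it is not the platform's implementation, and it assumes every version has exactly three numeric parts.

```python
def parse_version(version_string: str) -> tuple[int, int, int]:
    """Split 'major.minor.patch' into integers so versions compare numerically."""
    # Unpacking raises ValueError if there are not exactly three parts.
    major, minor, patch = (int(part) for part in version_string.split("."))
    return major, minor, patch


# Plain string comparison is lexicographic, so "1.10.0" sorts before "1.9.0".
string_says_newer = "1.10.0" > "1.9.0"

# Comparing parsed tuples orders the versions numerically.
parsed_says_newer = parse_version("1.10.0") > parse_version("1.9.0")
```

Here `string_says_newer` is False while `parsed_says_newer` is True, which is the reason to parse before comparing.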

Each pipeline is configured in pipeline.yml:

id: pipeline-name
schedule: "0 8 * * *" # Cron schedule
start_date: "2023-01-01" # Pipeline start date
default_connections:
  gcpConnectionId: client-gcp # Default GCP connection
notifications:
  slack:
    - name: client-notifications
      connection: client-slack
      success: ":tada: Pipeline has finished successfully!"
      failure: ":red_circle: Pipeline has failed!"
description: |
  Detailed description of the pipeline's purpose,
  data sources, and schedule.
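
The schedule field uses a five-field cron expression (minute, hour, day of month, month, day of week). As a rough illustration of how to read one, the sketch below splits an expression into named fields and extracts the run time for plain daily schedules. `split_cron` and `daily_run_time` are hypothetical helpers, not part of the platform.

```python
from typing import NamedTuple


class CronFields(NamedTuple):
    minute: str
    hour: str
    day_of_month: str
    month: str
    day_of_week: str


def split_cron(expression: str) -> CronFields:
    """Split a standard five-field cron expression into named fields."""
    parts = expression.split()
    if len(parts) != 5:
        raise ValueError(f"Expected 5 cron fields, got {len(parts)}: {expression!r}")
    return CronFields(*parts)


def daily_run_time(expression: str) -> str:
    """Return 'HH:MM' for expressions that run once a day, otherwise raise."""
    fields = split_cron(expression)
    runs_every_day = (fields.day_of_month, fields.month, fields.day_of_week) == ("*", "*", "*")
    fixed_time = fields.minute.isdigit() and fields.hour.isdigit()
    if not (runs_every_day and fixed_time):
        raise ValueError(f"Not a plain daily schedule: {expression!r}")
    return f"{int(fields.hour):02d}:{int(fields.minute):02d}"
```

For the configuration above, `daily_run_time("0 8 * * *")` gives "08:00".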

Tasks are organized by data processing stage:

external_data_raw/ (raw data ingestion):

  • Python scripts for API data ingestion
  • Handle authentication and data retrieval
  • Store raw data in BigQuery

external_data/ (processed external data):

  • SQL transformations of raw data
  • Data cleaning and standardization
  • Dependencies on external_data_raw tasks

dashboard/ (dashboard tables):

  • Aggregated data for dashboards
  • Daily, weekly, monthly summaries
  • Business metrics and KPIs

mapping/ (data mapping tables):

  • Data mapping and lookup tables
  • Country codes, app IDs, SKU mappings
  • Reference data for other tasks

Each task is defined in a .task.yaml file:

name: task_name
type: bq.sql # or python
run: script_name.sql # or script_name.py
depends: # Task dependencies
  - upstream_task_1
  - upstream_task_2
tests: # Data quality tests
  columns:
    column_name:
      - name: not_null
      - name: accepted_values
        equal_to:
          value: [value1, value2]
description: |
  Detailed description of the task,
  including example queries and column documentation.
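
To make the two column tests concrete, the sketch below shows what not_null and accepted_values check, using in-memory rows instead of a BigQuery table. It is an illustration of the idea, not the platform's test runner. One assumption to note: accepted_values here ignores null values and leaves them to not_null, which may differ from the platform's behavior.

```python
from typing import Any, Iterable, Mapping

Row = Mapping[str, Any]


def not_null(rows: Iterable[Row], column: str) -> list[int]:
    """Return the indexes of rows where `column` is missing or None."""
    return [i for i, row in enumerate(rows) if row.get(column) is None]


def accepted_values(rows: Iterable[Row], column: str, allowed: Iterable[Any]) -> list[int]:
    """Return the indexes of rows whose non-null `column` value is not in `allowed`."""
    allowed_set = set(allowed)
    return [
        i
        for i, row in enumerate(rows)
        if row.get(column) is not None and row[column] not in allowed_set
    ]
```

A test passes when the returned list is empty; otherwise the indexes point at the offending rows.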

Python tasks use special annotations for configuration:

# @blast.name: task_name
# @blast.type: python
# @blast.depends: upstream_task
# @blast.secrets: SECRET_NAME:SECRET_NAME
# @blast.instance: d1.large
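
Because the annotations are ordinary comments, they are easy to inspect with a small script, for example in a repository lint check. The sketch below collects `# @blast.<key>: <value>` lines from a file's source text. It is an illustrative reader written for this guide, not the parser the platform uses, and `read_annotations` is a hypothetical name.

```python
import re

# Matches lines such as "# @blast.type: python" and captures key and value.
ANNOTATION = re.compile(r"^#\s*@blast\.(\w+):\s*(.+?)\s*$")


def read_annotations(source: str) -> dict[str, list[str]]:
    """Collect annotation values per key, keeping repeated keys in order."""
    found: dict[str, list[str]] = {}
    for line in source.splitlines():
        match = ANNOTATION.match(line.strip())
        if match:
            key, value = match.groups()
            found.setdefault(key, []).append(value)
    return found
```

Values are kept as raw strings, so an entry such as `SECRET_NAME:SECRET_NAME` is returned unchanged for the caller to interpret.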

The testing framework supports:

  • Column-level tests (not_null, accepted_values)
  • Custom SQL tests for business logic
  • Data freshness checks
  • Referential integrity tests

Purpose: Ingest and process external data from various sources
Schedule: Daily at 16:00 UTC
Data Flow:

External API → external_data_raw → external_data → dashboard

Key Tasks:

  • external_data_raw.data_source: Python script for API data retrieval
  • external_data.data_source: SQL transformation and cleaning
  • dashboard.daily_metrics: Daily aggregated metrics

Purpose: Attribution and marketing analytics
Schedule: Daily at 08:00 UTC
Data Flow:

Marketing API → external_data_raw → external_data → dashboard → reports

Key Features:

  • Handles large datasets with chunking strategy
  • Multiple table partitioning for BigQuery quotas
  • Retry logic for failed downloads
  • Data mapping and transformation
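
The chunking and retry ideas from the list above can be sketched in a few lines of Python. This is a minimal illustration under stated assumptions, not the pipeline's actual code: `date_chunks` splits a date range into fixed-size windows so each download stays small, and `with_retries` re-runs a failing call with exponential backoff. Both names and the default values are hypothetical.

```python
import time
from datetime import date, timedelta
from typing import Callable, Iterator, TypeVar

T = TypeVar("T")


def date_chunks(start: date, end: date, days: int) -> Iterator[tuple[date, date]]:
    """Yield inclusive (chunk_start, chunk_end) windows covering start..end."""
    if days < 1:
        raise ValueError("days must be at least 1")
    current = start
    while current <= end:
        chunk_end = min(current + timedelta(days=days - 1), end)
        yield current, chunk_end
        current = chunk_end + timedelta(days=1)


def with_retries(
    fn: Callable[[], T],
    attempts: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying on any exception and doubling the delay each time."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == attempts:
                raise  # Out of attempts: surface the last error.
            sleep(base_delay * 2 ** (attempt - 1))
    raise AssertionError("unreachable")
```

A download loop could then iterate over `date_chunks(...)` and wrap each request in `with_retries(...)`. Catching every exception is a simplification; real code would usually retry only on transient network errors.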

Purpose: Machine learning for predictive analytics
Schedule: Daily at 23:00 UTC
Components:

  • Python ML models (XGBoost, scikit-learn)
  • Feature engineering
  • Model training and prediction
  • Segment-specific models

Tasks are organized with clear dependencies:

depends:
  - external_data_raw.app_sales
  - external_data_raw.client_a_sales
  - external_data_raw.client_b_sales
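
A depends list defines a directed graph, and a valid run order is any topological ordering of that graph. The sketch below uses Python's standard graphlib module to compute one. It illustrates the concept only and says nothing about how the platform's scheduler is implemented; the downstream task name `external_data.app_sales` is taken from the example layout and is used here purely for illustration.

```python
from graphlib import CycleError, TopologicalSorter


def execution_order(depends: dict[str, list[str]]) -> list[str]:
    """Return task names so that every task comes after the tasks it depends on."""
    try:
        return list(TopologicalSorter(depends).static_order())
    except CycleError as exc:
        # args[1] holds the list of nodes that form the cycle.
        raise ValueError(f"Circular dependency: {exc.args[1]}") from exc


# Map each task to its upstream tasks.
tasks = {
    "external_data.app_sales": [
        "external_data_raw.app_sales",
        "external_data_raw.client_a_sales",
        "external_data_raw.client_b_sales",
    ],
}
order = execution_order(tasks)
```

The three raw tasks have no dependencies on each other, so they may appear in any order, but all of them come before `external_data.app_sales`.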

Error handling:

  • Retry logic for API failures
  • Graceful handling of missing data
  • Comprehensive logging
  • Slack notifications for failures

Performance and resources:

  • Instance sizing for large data processing
  • BigQuery quota management
  • Memory optimization for large datasets

Common data flow patterns:

API/External Source → Raw Tables → Processed Tables → Dashboard Tables

Source A ┐
Source B ├→ Mapping Tables → Unified Tables → Reports
Source C ┘

Raw Data → Feature Engineering → Model Training → Predictions → Business Metrics

This structure provides:

  • Modular pipeline design
  • Shared utilities reduce code duplication
  • Consistent patterns across pipelines
  • Clear separation of concerns
  • Comprehensive documentation
  • Standardized configuration
  • Comprehensive testing
  • Error handling and retry logic
  • Monitoring and alerting
  • Easy to add new data sources
  • Configurable schedules and dependencies
  • Extensible architecture

To add a new pipeline:

  1. Create pipeline directory structure
  2. Define pipeline.yml configuration
  3. Implement tasks following established patterns
  4. Add data quality tests
  5. Configure notifications

To add a new data source:

  1. Create external_data_raw task for API integration
  2. Implement data transformation in external_data
  3. Add mapping tables if needed
  4. Create dashboard aggregations
  5. Add comprehensive tests

Best practices:

  • Use shared utilities from src/ directory
  • Follow consistent naming conventions
  • Implement comprehensive testing
  • Add detailed documentation
  • Configure proper dependencies
  • Set up monitoring and alerting

Based on the repository structure, here’s how requirements are organized:

integration/test-pipeline-2/
├── tasks/
│   ├── metabase_checker/
│   │   ├── metabase_checker.py
│   │   ├── metabase_api.py
│   │   └── requirements.txt # requests==2.28.1
│   └── export/
│       ├── export_task.py
│       └── requirements.txt # sshtunnel, pandas, etc.
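
With requirements files at several levels, it helps to know which one sits closest to a given task. The sketch below walks upward from a task file and returns the nearest requirements.txt. The guide does not describe the platform's actual lookup rule, so treat the "nearest file wins" behavior as an assumption made for illustration; `nearest_requirements` is a hypothetical helper.

```python
from pathlib import Path
from typing import Optional


def nearest_requirements(task_file: Path, repo_root: Path) -> Optional[Path]:
    """Return the closest requirements.txt between the task's folder and repo_root."""
    repo_root = repo_root.resolve()
    folder = task_file.resolve().parent
    while True:
        candidate = folder / "requirements.txt"
        if candidate.is_file():
            return candidate
        # Stop at the repository root, or at the filesystem root as a safety net.
        if folder == repo_root or folder == folder.parent:
            return None
        folder = folder.parent
```

For the layout above, a lookup for tasks/export/export_task.py would return tasks/export/requirements.txt.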

For detailed examples and implementation patterns, see the Shared Utilities Best Practices guide.

-- func/common_functions.sql

-- Extract a string parameter from an event parameter array
CREATE OR REPLACE FUNCTION get_param_str(
  event_params ARRAY<STRUCT<key STRING, value STRUCT<string_value STRING>>>,
  param_name STRING
)
RETURNS STRING AS (
  (SELECT value.string_value FROM UNNEST(event_params) WHERE key = param_name)
);

-- Extract an integer parameter from an event parameter array
CREATE OR REPLACE FUNCTION get_param_int(
  event_params ARRAY<STRUCT<key STRING, value STRUCT<int_value INT64>>>,
  param_name STRING
)
RETURNS INT64 AS (
  (SELECT value.int_value FROM UNNEST(event_params) WHERE key = param_name)
);

-- Extract a boolean parameter from an event parameter array
CREATE OR REPLACE FUNCTION get_param_bool(
  event_params ARRAY<STRUCT<key STRING, value STRUCT<bool_value BOOL>>>,
  param_name STRING
)
RETURNS BOOL AS (
  (SELECT value.bool_value FROM UNNEST(event_params) WHERE key = param_name)
);

-- Extract a double parameter from an event parameter array
CREATE OR REPLACE FUNCTION get_param_double(
  event_params ARRAY<STRUCT<key STRING, value STRUCT<double_value FLOAT64>>>,
  param_name STRING
)
RETURNS FLOAT64 AS (
  (SELECT value.double_value FROM UNNEST(event_params) WHERE key = param_name)
);

-- Parse a 'major.minor.patch' version string for comparison
CREATE OR REPLACE FUNCTION parse_version(version_string STRING)
RETURNS STRUCT<major INT64, minor INT64, patch INT64> AS (
  STRUCT(
    CAST(SPLIT(version_string, '.')[OFFSET(0)] AS INT64) AS major,
    CAST(SPLIT(version_string, '.')[OFFSET(1)] AS INT64) AS minor,
    CAST(SPLIT(version_string, '.')[OFFSET(2)] AS INT64) AS patch
  )
);
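### 1. Cross-Pipeline Dependencies
```yaml
# Pipeline A depends on Pipeline B completion
depends:
  - pipeline_b.final_task
```
### 2. Conditional Execution
```yaml
# Run task only on specific conditions
schedule:
  days: [monday, wednesday, friday]
  # Quoted so the leading "{{" is not parsed as a YAML flow mapping
  condition: "{{ ds }} >= '2024-01-01'"
```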
### 3. Dynamic Task Generation
```python
# Generate tasks dynamically based on data sources
def generate_tasks(data_sources):
    """Build one Python ingestion task definition per data source."""
    tasks = []
    for source in data_sources:
        tasks.append({
            'name': f'ingest_{source}',
            'type': 'python',
            'run': f'ingest_{source}.py',
        })
    return tasks
```
### 4. Environment-Specific Configuration
```yaml
# pipeline.yml
environments:
  development:
    schedule: "0 2 * * *"
    connections:
      gcp: dev-gcp-conn
  production:
    schedule: "0 1 * * *"
    connections:
      gcp: prod-gcp-conn
```
## Monitoring and Alerting
### 1. Pipeline Health Monitoring
```yaml
# pipeline.yml
monitoring:
  health_checks:
    - name: data_freshness
      query: SELECT COUNT(*) FROM staging.events WHERE dt = '{{ ds }}'
      threshold: 1000
    - name: data_quality
      query: SELECT COUNT(*) FROM staging.events WHERE user_id IS NULL
      threshold: 0
```
### 2. Cost Monitoring
```yaml
# pipeline.yml
cost_monitoring:
  bigquery:
    daily_threshold: 100.0
    hourly_threshold: 50.0
  alerts:
    - slack: data-team
      email: [email protected]
```
### 3. Performance Monitoring
```yaml
# pipeline.yml
performance_monitoring:
  slow_tasks:
    threshold: 3600 # 1 hour
    alerts:
      - slack: data-team
  resource_usage:
    memory_threshold: 80%
    cpu_threshold: 90%
```
## Security
### 1. Secrets Management
```yaml
# task.yaml
secrets:
  - API_KEY:API_KEY
  - DB_PASSWORD:DB_PASSWORD
  - ENCRYPTION_KEY:ENCRYPTION_KEY
```
### 2. Access Control
```yaml
# pipeline.yml
access_control:
  roles:
    - name: data_engineer
      permissions: [read, write, execute]
    - name: data_analyst
      permissions: [read]
  environments:
    production:
      required_approval: true
      approvers: [data_team_lead]
```
### 3. Data Protection
```yaml
# pipeline.yml
data_protection:
  encryption:
    at_rest: true
    in_transit: true
  masking:
    sensitive_columns: [email, phone, ssn]
    masking_strategy: hash
```
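
As an illustration of what a hash masking strategy can mean in practice, the sketch below replaces sensitive column values with their SHA-256 digest. The platform's actual masking behavior is not described in this guide, so the algorithm choice, the optional salt, and the helper names `mask_value` and `mask_row` are all assumptions.

```python
import hashlib
from typing import Any

# Column names taken from the configuration example above.
SENSITIVE_COLUMNS = frozenset({"email", "phone", "ssn"})


def mask_value(value: str, salt: str = "") -> str:
    """Return the hex SHA-256 digest of the salted value."""
    return hashlib.sha256((salt + value).encode("utf-8")).hexdigest()


def mask_row(
    row: dict[str, Any],
    sensitive: frozenset[str] = SENSITIVE_COLUMNS,
    salt: str = "",
) -> dict[str, Any]:
    """Return a copy of row with non-null sensitive columns replaced by hashes."""
    return {
        key: mask_value(str(value), salt) if key in sensitive and value is not None else value
        for key, value in row.items()
    }
```

Hashing keeps equal inputs equal, so masked columns can still be used as join keys. Without a secret salt, short or guessable values such as phone numbers can be recovered by brute force, so an unsalted hash should not be treated as strong anonymization.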
## Conclusion
This repository demonstrates a mature, production-ready data platform implementation with:
- **Clear separation of concerns** between pipelines
- **Shared utilities** for common operations
- **Comprehensive testing** and quality assurance
- **Scalable architecture** for growing data needs
- **Best practices** for maintainability and reliability
The structure and patterns shown here can serve as a template for other clients and use cases, ensuring consistency and quality across all data platform implementations.
## Next Steps
- Learn about [Project Structure](/docs/project/project-structure)
- Explore [Shared Utilities Best Practices](/docs/guides/shared-utilities)
- Review [Task Configuration](/docs/project/task-configuration)
- Check out [Python Development](/docs/guides/python-development)
- Explore [SQL Development](/docs/guides/sql-development)