
Project Structure

Running tasks on Datablast requires a Git repository with a well-organized structure. This guide covers both single-pipeline and multi-pipeline repository patterns.

Single-pipeline repository:

your-project/
├── pipeline.yml  # Main pipeline configuration
├── requirements.txt  # Global Python dependencies (optional)
├── README.md  # Project documentation
└── tasks/  # Task definitions
    ├── staging/  # Raw data processing
    │   ├── users.task.yaml
    │   ├── users.sql
    │   ├── events.task.yaml
    │   └── events.sql
    ├── core_model/  # Core business logic
    │   ├── user_metrics.task.yaml
    │   ├── user_metrics.sql
    │   ├── engagement.task.yaml
    │   └── engagement.sql
    ├── analytics/  # Analytics and reporting
    │   ├── daily_report.task.yaml
    │   ├── daily_report.sql
    │   ├── weekly_summary.task.yaml
    │   └── weekly_summary.sql
    ├── ml_models/  # Machine learning
    │   ├── churn_model.task.yaml
    │   ├── churn_model.py
    │   ├── requirements.txt  # ML-specific dependencies
    │   ├── feature_engineering.task.yaml
    │   └── feature_engineering.py
    └── export/  # Data export
        ├── csv_export.task.yaml
        ├── csv_export.py
        ├── api_export.task.yaml
        └── api_export.py

Multi-pipeline repository:

your-project/
├── requirements.txt  # Global dependencies
├── README.md  # Project documentation
├── src/  # Shared utilities and scripts
│   ├── bigquery_functions.py
│   ├── api_utils.py
│   └── report_generator.py
├── client-internal/  # Main client pipeline
│   ├── pipeline.yml  # Pipeline configuration
│   ├── tasks/  # Task definitions
│   │   ├── staging/
│   │   │   ├── users.task.yaml
│   │   │   ├── users.sql
│   │   │   └── requirements.txt  # Pipeline-specific dependencies
│   │   └── core_model/
│   │       ├── metrics.task.yaml
│   │       └── metrics.sql
│   └── func/  # Pipeline-specific functions (optional)
│       └── common_functions.sql
├── analytics/  # Analytics pipeline
│   ├── pipeline.yml  # Pipeline configuration
│   ├── tasks/  # Task definitions
│   │   ├── reports/
│   │   │   ├── daily_report.task.yaml
│   │   │   ├── daily_report.sql
│   │   │   └── requirements.txt  # Pipeline-specific dependencies
│   │   └── ml_models/
│   │       ├── model.task.yaml
│   │       └── model.py
│   └── func/  # Pipeline-specific functions (optional)
│       └── analytics_functions.sql
└── pipeline_test/  # Testing pipeline
    ├── pipeline.yml  # Pipeline configuration
    ├── tasks/  # Task definitions
    │   ├── test_data/
    │   │   ├── sample_data.task.yaml
    │   │   └── sample_data.sql
    │   └── validation/
    │       ├── data_quality.task.yaml
    │       └── data_quality.py
    └── func/  # Pipeline-specific functions (optional)
        └── test_functions.sql
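
To see how a multi-pipeline layout can be inspected, here is a minimal Python sketch that lists each pipeline in a repository together with its task files. It assumes a directory counts as a pipeline when it directly contains pipeline.yml and that task definitions live under that directory's tasks/ folder, as in the trees above. The function name find_pipelines is hypothetical and not part of Datablast.

```python
from pathlib import Path


def find_pipelines(repo_root: Path) -> dict[str, list[str]]:
    """Map each pipeline directory to its sorted .task.yaml files.

    Assumption: a pipeline is any directory that directly contains
    pipeline.yml, with task definitions under its tasks/ folder.
    """
    pipelines: dict[str, list[str]] = {}
    for config in sorted(repo_root.rglob("pipeline.yml")):
        pipeline_dir = config.parent
        tasks_dir = pipeline_dir / "tasks"
        tasks: list[str] = []
        if tasks_dir.is_dir():
            tasks = sorted(
                path.relative_to(pipeline_dir).as_posix()
                for path in tasks_dir.rglob("*.task.yaml")
            )
        # A single-pipeline repository yields the key "." (the root itself).
        key = pipeline_dir.relative_to(repo_root).as_posix()
        pipelines[key] = tasks
    return pipelines
```

Calling find_pipelines(Path("your-project")) on the multi-pipeline layout would return one entry each for client-internal, analytics, and pipeline_test. The shared src/ directory is skipped because it has no pipeline.yml.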

Organize tasks by data processing stage:

  • staging/: Raw data ingestion and initial processing
  • core_model/: Core business logic and transformations
  • analytics/: Analytics and reporting tables
  • ml_models/: Machine learning models and features
  • export/: Data export and external integrations

Group tasks by business function:

  • marketing/: Marketing analytics and attribution
  • finance/: Financial reporting and metrics
  • product/: Product analytics and user behavior
  • operations/: Operational metrics and monitoring

Organize by data source:

  • mobile_app/: Mobile app analytics
  • web_analytics/: Web analytics data
  • attribution/: Attribution platform data
  • external_apis/: Third-party API integrations

Name files by type:

  • YAML Task Files: Must end with .task.yaml (e.g., currency_rates_eu.task.yaml)
  • SQL Assets: Use .sql extension (e.g., currency_rates.sql, users.sql)
  • Python Assets: Use .py extension (e.g., churn_model.py, data_processor.py)

Name files and directories consistently:

  • Use lowercase with underscores: user_metrics, daily_reports
  • Be descriptive: marketing_attribution instead of mkt_attr
  • Use consistent patterns: {layer}_{entity}_{metric}

Name tasks hierarchically:

  • Use hierarchical names: staging.users, core_model.user_metrics
  • Be descriptive: user_engagement_metrics instead of uem
  • Use consistent patterns: {layer}.{entity}_{metric}
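
The naming conventions above can be checked mechanically, for example in a pre-commit hook. The following is a minimal sketch, not a Datablast feature: the helper names and the exact regular expression are assumptions, and the platform's own validation (if any) may accept or reject different names.

```python
import re

# Lowercase words separated by underscores, e.g. "user_metrics".
SNAKE_CASE = re.compile(r"[a-z][a-z0-9]*(?:_[a-z0-9]+)*")

# Suffixes named in the conventions above; ".task.yaml" is listed explicitly
# because it contains two dots.
ASSET_SUFFIXES = (".task.yaml", ".sql", ".py")


def is_valid_file_name(file_name: str) -> bool:
    """Check for a snake_case stem followed by a recognised suffix."""
    for suffix in ASSET_SUFFIXES:
        if file_name.endswith(suffix):
            stem = file_name[: -len(suffix)]
            return SNAKE_CASE.fullmatch(stem) is not None
    return False


def is_valid_task_name(task_name: str) -> bool:
    """Check for the hierarchical form layer.entity, e.g. "staging.users"."""
    parts = task_name.split(".")
    return len(parts) >= 2 and all(
        SNAKE_CASE.fullmatch(part) is not None for part in parts
    )
```

With these helpers, users.task.yaml and core_model.user_metrics pass, while Users.sql, mkt-attr.sql, and a task named plain users (no layer prefix) are rejected.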

Example pipeline.yml:

id: my-pipeline
schedule: "0 1 * * *"  # Daily at 1 AM
start_date: "2023-01-01"
description: Daily data processing pipeline

# Default connections for all tasks
default_connections:
  gcpConnectionId: my-gcp-conn
  aws_conn_id: aws-123

# Notification settings
notifications:
  slack:
    - name: data-team
      connection: slack-123
      success: ✅ Pipeline completed successfully!
      failure: ❌ Pipeline failed!
  discord:
    - name: alerts
      connection: discord-webhook
      success: Pipeline completed successfully!
      failure: Pipeline failed!
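
The schedule value in the pipeline configuration reads as a standard five-field cron expression. As a small illustration, the sketch below splits such an expression into named fields so that "0 1 * * *" can be confirmed as minute 0 of hour 1 every day. It assumes Datablast uses the conventional five-field format; the source does not state which time zone applies, and describe_cron is a hypothetical helper.

```python
# Field order of a conventional five-field cron expression.
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def describe_cron(expression: str) -> dict[str, str]:
    """Split a five-field cron expression into a field-name mapping."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(
            f"expected {len(CRON_FIELDS)} cron fields, got {len(parts)}"
        )
    return dict(zip(CRON_FIELDS, parts))


fields = describe_cron("0 1 * * *")
print(fields["hour"], fields["minute"])  # hour and minute of the daily run
```

This only labels the fields. It does not validate ranges or compute run times.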

Example task definition (.task.yaml):

name: core_model.users
type: bq.sql
description: Create users table from activity data
depends:
  - core_model.activity_daily
root_dir: tasks/core_model
run: users.sql

# Materialization settings
materialization:
  type: table
  strategy: create+replace
  partition_by: dt
  cluster_by: [user_pseudo_id, dt]

# Data quality tests
tests:
  columns:
    user_pseudo_id:
      - name: not_null  # blocking: true (default)
      - name: unique
        blocking: false  # Must specify to override default
    install_dt:
      - name: not_null  # blocking: true (default)
  custom:
    - name: valid_dates
      equal_to: 0
      query: |
        SELECT count(*) FROM users WHERE install_dt > CURRENT_DATE()
      # blocking: true (default)

# Schedule settings
schedule:
  days: [monday, wednesday, friday]  # Optional: run only on specific days
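
The depends list in a task definition implies an execution order: a task can only run after the tasks it depends on. How Datablast resolves this internally is not described here, so the following is only a sketch of the idea using Python's standard graphlib module. The task analytics.daily_report and its dependency are hypothetical examples.

```python
from graphlib import TopologicalSorter

# Task name -> names listed under its "depends" key.
# "analytics.daily_report" and its dependency are made up for illustration.
TASKS = {
    "core_model.activity_daily": [],
    "core_model.users": ["core_model.activity_daily"],
    "analytics.daily_report": ["core_model.users"],
}


def execution_order(depends: dict[str, list[str]]) -> list[str]:
    """Return task names ordered so that dependencies come first.

    Raises graphlib.CycleError if the tasks depend on each other in a loop.
    """
    return list(TopologicalSorter(depends).static_order())


print(execution_order(TASKS))
```

A circular dependency (for example, two tasks that list each other under depends) raises graphlib.CycleError, which is a useful check to run before pushing a change.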

Create a root-level src/ directory for shared utilities that can be imported across all pipelines:

# src/bigquery_functions.py
def get_bq_credentials(envName: str):
    """Get BigQuery credentials from environment variables"""
    pass


def delete_bq_table(credentials, project, dataset, table):
    """Delete BigQuery table safely"""
    pass


def upload_to_bq(dataframe, project, dataset, table, credentials, if_exists):
    """Upload pandas DataFrame to BigQuery"""
    pass


# src/api_utils.py
def authenticate_api(api_key: str):
    """Generic API authentication"""
    pass


def fetch_data_from_api(endpoint: str, params: dict):
    """Generic API data retrieval"""
    pass

Some pipelines have their own func/ directories for shared SQL functions:

-- func/common_functions.sql
CREATE OR REPLACE FUNCTION get_param_str(event_params ARRAY<STRUCT<key STRING, value STRUCT<string_value STRING>>>, param_name STRING)
RETURNS STRING AS (
(SELECT value.string_value FROM UNNEST(event_params) WHERE key = param_name)
);
CREATE OR REPLACE FUNCTION get_param_int(event_params ARRAY<STRUCT<key STRING, value STRUCT<int_value INT64>>>, param_name STRING)
RETURNS INT64 AS (
(SELECT value.int_value FROM UNNEST(event_params) WHERE key = param_name)
);

The platform searches for requirements.txt files hierarchically:

  1. Task Directory: Look for requirements.txt in the same directory as your Python task
  2. Parent Directories: Search upward through parent directories
  3. Repository Root: Check the root directory of your repository

your-project/
├── requirements.txt  # Global dependencies
└── tasks/
    ├── ml_models/
    │   ├── churn_model.py
    │   └── requirements.txt  # ML-specific dependencies
    └── export/
        ├── csv_export.py
        └── requirements.txt  # Export-specific dependencies
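
The hierarchical search described above can be sketched as a walk from the task's directory up to the repository root. This is an illustration under one stated assumption: the nearest requirements.txt wins. The source does not say whether the platform stops at the first match or merges files from several levels, and find_requirements is a hypothetical name.

```python
from pathlib import Path
from typing import Optional


def find_requirements(task_file: Path, repo_root: Path) -> Optional[Path]:
    """Return the nearest requirements.txt between a task and the repo root.

    Assumption: the first file found while walking upward is the one used.
    """
    repo_root = repo_root.resolve()
    current = task_file.resolve().parent
    while True:
        candidate = current / "requirements.txt"
        if candidate.is_file():
            return candidate
        # Stop at the repository root, or at the filesystem root if the
        # task file is not inside the repository at all.
        if current == repo_root or current == current.parent:
            return None
        current = current.parent
```

In the layout above, tasks/ml_models/churn_model.py resolves to tasks/ml_models/requirements.txt, while a Python task in a directory without its own file would fall back to the requirements.txt at the repository root.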

Keep pipelines modular:

  • Break pipelines into small, focused tasks
  • Use clear separation of concerns
  • Implement reusable components

Name and organize consistently:

  • Use descriptive, hierarchical task names
  • Follow consistent file naming conventions
  • Maintain clear directory structure

Document the project:

  • Include comprehensive README files
  • Document task purposes and dependencies
  • Provide usage examples

Test and validate:

  • Implement data quality tests
  • Use comprehensive validation
  • Include error handling

Monitor and alert:

  • Set up proper notifications
  • Implement logging and alerting
  • Monitor pipeline performance