Project Structure
Running tasks on Datablast requires a Git repository with a well-organized structure. This guide covers both single-pipeline and multi-pipeline repository patterns.
Basic Repository Structure
Single Pipeline Repository
your-project/
├── pipeline.yml              # Main pipeline configuration
├── requirements.txt          # Global Python dependencies (optional)
├── README.md                 # Project documentation
└── tasks/                    # Task definitions
    ├── staging/              # Raw data processing
    │   ├── users.task.yaml
    │   ├── users.sql
    │   ├── events.task.yaml
    │   └── events.sql
    ├── core_model/           # Core business logic
    │   ├── user_metrics.task.yaml
    │   ├── user_metrics.sql
    │   ├── engagement.task.yaml
    │   └── engagement.sql
    ├── analytics/            # Analytics and reporting
    │   ├── daily_report.task.yaml
    │   ├── daily_report.sql
    │   ├── weekly_summary.task.yaml
    │   └── weekly_summary.sql
    ├── ml_models/            # Machine learning
    │   ├── churn_model.task.yaml
    │   ├── churn_model.py
    │   ├── requirements.txt  # ML-specific dependencies
    │   ├── feature_engineering.task.yaml
    │   └── feature_engineering.py
    └── export/               # Data export
        ├── csv_export.task.yaml
        ├── csv_export.py
        ├── api_export.task.yaml
        └── api_export.py
Multiple Pipeline Repository
your-project/
├── requirements.txt                  # Global dependencies
├── README.md                         # Project documentation
├── src/                              # Shared utilities and scripts
│   ├── bigquery_functions.py
│   ├── api_utils.py
│   └── report_generator.py
├── client-internal/                  # Main client pipeline
│   ├── pipeline.yml                  # Pipeline configuration
│   ├── tasks/                        # Task definitions
│   │   ├── staging/
│   │   │   ├── users.task.yaml
│   │   │   ├── users.sql
│   │   │   └── requirements.txt      # Pipeline-specific dependencies
│   │   └── core_model/
│   │       ├── metrics.task.yaml
│   │       └── metrics.sql
│   └── func/                         # Pipeline-specific functions (optional)
│       └── common_functions.sql
├── analytics/                        # Analytics pipeline
│   ├── pipeline.yml                  # Pipeline configuration
│   ├── tasks/                        # Task definitions
│   │   ├── reports/
│   │   │   ├── daily_report.task.yaml
│   │   │   ├── daily_report.sql
│   │   │   └── requirements.txt      # Pipeline-specific dependencies
│   │   └── ml_models/
│   │       ├── model.task.yaml
│   │       └── model.py
│   └── func/                         # Pipeline-specific functions (optional)
│       └── analytics_functions.sql
└── pipeline_test/                    # Testing pipeline
    ├── pipeline.yml                  # Pipeline configuration
    ├── tasks/                        # Task definitions
    │   ├── test_data/
    │   │   ├── sample_data.task.yaml
    │   │   └── sample_data.sql
    │   └── validation/
    │       ├── data_quality.task.yaml
    │       └── data_quality.py
    └── func/                         # Pipeline-specific functions (optional)
        └── test_functions.sql
Directory Organization Patterns
1. Data Processing Layers
Organize tasks by data processing stage:
- staging/: Raw data ingestion and initial processing
- core_model/: Core business logic and transformations
- analytics/: Analytics and reporting tables
- ml_models/: Machine learning models and features
- export/: Data export and external integrations
2. Functional Organization
Group tasks by business function:
- marketing/: Marketing analytics and attribution
- finance/: Financial reporting and metrics
- product/: Product analytics and user behavior
- operations/: Operational metrics and monitoring
3. Data Source Organization
Organize by data source:
- mobile_app/: Mobile app analytics
- web_analytics/: Web analytics data
- attribution/: Attribution platform data
- external_apis/: Third-party API integrations
File Naming Conventions
Task Files
- YAML Task Files: Must end with .task.yaml (e.g., currency_rates_eu.task.yaml)
- SQL Assets: Use the .sql extension (e.g., currency_rates.sql, users.sql)
- Python Assets: Use the .py extension (e.g., churn_model.py, data_processor.py)
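The suffix rules above are easy to check mechanically, for example in a local pre-commit script. The sketch below is a hypothetical helper (classify_task_file is not part of Datablast); it also assumes that any other .yaml or .yml file under tasks/ is a task definition that is missing the .task part. The compound .task.yaml suffix is tested first, because a path's suffix attribute only reports the last extension.

```python
from pathlib import PurePosixPath


def classify_task_file(path: str) -> str:
    """Classify a file under tasks/ according to the naming conventions."""
    name = PurePosixPath(path).name
    # Check the compound suffix first: PurePosixPath.suffix would only report ".yaml".
    if name.endswith(".task.yaml"):
        return "task_definition"
    if name.endswith(".sql"):
        return "sql_asset"
    if name.endswith(".py"):
        return "python_asset"
    if name.endswith((".yaml", ".yml")):
        # Assumption: any other YAML file under tasks/ is a misnamed task definition.
        return "misnamed_task_file"
    return "other"


for f in [
    "tasks/staging/users.task.yaml",
    "tasks/staging/users.sql",
    "tasks/ml_models/churn_model.py",
    "tasks/staging/users.yaml",
    "tasks/ml_models/requirements.txt",
]:
    print(f, "->", classify_task_file(f))
```

Files such as requirements.txt fall through to "other", since they are allowed next to tasks but are not assets.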
Directory Names
- Use lowercase with underscores: user_metrics, daily_reports
- Be descriptive: marketing_attribution instead of mkt_attr
- Use consistent patterns: {layer}_{entity}_{metric}
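The lowercase-with-underscores rule can be expressed as a single regular expression. This is a minimal sketch (is_valid_directory_name is a hypothetical helper) that accepts lowercase words separated by single underscores and rejects capitals, hyphens, doubled underscores, and leading underscores. It cannot judge whether a name is descriptive enough; that part stays a review concern.

```python
import re

# Lowercase words (letters/digits, starting with a letter) joined by single underscores.
SNAKE_CASE = re.compile(r"[a-z][a-z0-9]*(?:_[a-z0-9]+)*")


def is_valid_directory_name(name: str) -> bool:
    """Return True if a directory name follows the lowercase-with-underscores rule."""
    return SNAKE_CASE.fullmatch(name) is not None


for name in ["user_metrics", "daily_reports", "marketing_attribution",
             "UserMetrics", "daily-reports", "user__metrics"]:
    print(f"{name}: {is_valid_directory_name(name)}")
```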
Task Names
- Use hierarchical names: staging.users, core_model.user_metrics
- Be descriptive: user_engagement_metrics instead of uem
- Use consistent patterns: {layer}.{entity}_{metric}
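A hierarchical task name can be validated the same way. The sketch below checks that a name is exactly two snake_case parts joined by a dot, and additionally assumes that the layer prefix should match the directory the task file lives in, as in the task configuration example later in this guide (name: core_model.users with root_dir: tasks/core_model). That second check is an assumed convention, not something Datablast is documented to enforce; check_task_name is a hypothetical helper.

```python
import re
from pathlib import PurePosixPath

PART = r"[a-z][a-z0-9]*(?:_[a-z0-9]+)*"
# "{layer}.{entity}_{metric}" style: exactly one dot between two snake_case parts.
TASK_NAME = re.compile(rf"(?P<layer>{PART})\.(?P<entity>{PART})")


def check_task_name(name: str, task_file: str) -> list[str]:
    """Return a list of convention problems for a task name (empty list means OK)."""
    match = TASK_NAME.fullmatch(name)
    if match is None:
        return [f"{name!r} is not of the form layer.entity_metric"]
    problems = []
    # Assumed convention: the layer prefix equals the task's parent directory name.
    directory = PurePosixPath(task_file).parent.name
    if match.group("layer") != directory:
        problems.append(f"layer {match.group('layer')!r} does not match directory {directory!r}")
    return problems


print(check_task_name("core_model.user_metrics", "tasks/core_model/user_metrics.task.yaml"))  # []
print(check_task_name("uem", "tasks/core_model/uem.task.yaml"))
print(check_task_name("staging.users", "tasks/core_model/users.task.yaml"))
```

The second call reports the missing layer prefix, and the third reports that staging does not match the core_model directory.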
Configuration Files
Pipeline Configuration (pipeline.yml)
id: my-pipeline
schedule: "0 1 * * *"  # Daily at 1 AM
start_date: "2023-01-01"
description: Daily data processing pipeline

# Default connections for all tasks
default_connections:
  gcpConnectionId: my-gcp-conn
  aws_conn_id: aws-123

# Notification settings
notifications:
  slack:
    - name: data-team
      connection: slack-123
      success: ✅ Pipeline completed successfully!
      failure: ❌ Pipeline failed!
  discord:
    - name: alerts
      connection: discord-webhook
      success: Pipeline completed successfully!
      failure: Pipeline failed!
Task Configuration (task.yaml)
name: core_model.users
type: bq.sql
description: Create users table from activity data
depends:
  - core_model.activity_daily
root_dir: tasks/core_model
run: users.sql

# Materialization settings
materialization:
  type: table
  strategy: create+replace
  partition_by: dt
  cluster_by: [user_pseudo_id, dt]

# Data quality tests
tests:
  columns:
    user_pseudo_id:
      - name: not_null  # blocking: true (default)
      - name: unique
        blocking: false  # Must specify to override default
    install_dt:
      - name: not_null  # blocking: true (default)
  custom:
    - name: valid_dates  # blocking: true (default)
      equal_to: 0
      query: |
        SELECT count(*) FROM users WHERE install_dt > CURRENT_DATE()

# Schedule settings
schedule:
  days: [monday, wednesday, friday]  # Optional: run only on specific days
Shared Utilities
Root src/ Directory (Best Practice)
Create a root-level src/ directory for shared utilities that can be imported across all pipelines:
# src/bigquery_functions.py
def get_bq_credentials(envName: str):
    """Get BigQuery credentials from environment variables"""
    pass


def delete_bq_table(credentials, project, dataset, table):
    """Delete BigQuery table safely"""
    pass


def upload_to_bq(dataframe, project, dataset, table, credentials, if_exists):
    """Upload pandas DataFrame to BigQuery"""
    pass


# src/api_utils.py
def authenticate_api(api_key: str):
    """Generic API authentication"""
    pass


def fetch_data_from_api(endpoint: str, params: dict):
    """Generic API data retrieval"""
    pass
Pipeline-Specific Functions
Some pipelines have their own func/ directories for shared SQL functions:
-- func/common_functions.sql
CREATE OR REPLACE FUNCTION get_param_str(
  event_params ARRAY<STRUCT<key STRING, value STRUCT<string_value STRING>>>,
  param_name STRING
)
RETURNS STRING AS (
  (SELECT value.string_value FROM UNNEST(event_params) WHERE key = param_name)
);

CREATE OR REPLACE FUNCTION get_param_int(
  event_params ARRAY<STRUCT<key STRING, value STRUCT<int_value INT64>>>,
  param_name STRING
)
RETURNS INT64 AS (
  (SELECT value.int_value FROM UNNEST(event_params) WHERE key = param_name)
);
Python Dependencies
Requirements File Location
The platform searches for requirements.txt files hierarchically:
- Task Directory: Looks for requirements.txt in the same directory as your Python task
- Parent Directories: Searches upward through the parent directories
- Repository Root: Checks the root directory of your repository
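The lookup order above amounts to walking from the task's directory toward the repository root and stopping at the first requirements.txt. The sketch below assumes the nearest file wins; the guide does not say whether files found at different levels are merged. find_requirements is a hypothetical helper, and the usage example deliberately leaves out tasks/export/requirements.txt so the export task falls back to the root file.

```python
import tempfile
from pathlib import Path
from typing import Optional


def find_requirements(task_file: Path, repo_root: Path) -> Optional[Path]:
    """Return the nearest requirements.txt for a Python task, or None.

    Walks from the task's directory up to the repository root (inclusive),
    assuming the first file found is the one that applies.
    """
    task_dir = task_file.resolve().parent
    root = repo_root.resolve()
    if root != task_dir and root not in task_dir.parents:
        raise ValueError(f"{task_file} is not inside {repo_root}")
    for directory in [task_dir, *task_dir.parents]:
        candidate = directory / "requirements.txt"
        if candidate.is_file():
            return candidate
        if directory == root:
            break  # never look above the repository root
    return None


with tempfile.TemporaryDirectory() as tmp:
    repo = Path(tmp)
    (repo / "tasks" / "ml_models").mkdir(parents=True)
    (repo / "tasks" / "export").mkdir(parents=True)
    (repo / "requirements.txt").write_text("pandas\n")
    (repo / "tasks" / "ml_models" / "requirements.txt").write_text("scikit-learn\n")
    ml = find_requirements(repo / "tasks" / "ml_models" / "churn_model.py", repo)
    export = find_requirements(repo / "tasks" / "export" / "csv_export.py", repo)
    print(ml.relative_to(repo.resolve()).as_posix())      # tasks/ml_models/requirements.txt
    print(export.relative_to(repo.resolve()).as_posix())  # requirements.txt
```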
Example Requirements Organization
your-project/
├── requirements.txt          # Global dependencies
└── tasks/
    ├── ml_models/
    │   ├── churn_model.py
    │   └── requirements.txt  # ML-specific dependencies
    └── export/
        ├── csv_export.py
        └── requirements.txt  # Export-specific dependencies
Best Practices
1. Modular Design
- Break pipelines into small, focused tasks
- Use clear separation of concerns
- Implement reusable components
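Small tasks are connected through the depends list in each task file (see the task configuration example above). As a rough illustration of why explicit dependencies keep a modular pipeline manageable, the sketch below derives a valid run order with the standard library's graphlib and reports cycles. The task names reuse this guide's examples, but the dependency edges are hypothetical, and this is not a description of how Datablast itself schedules tasks.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical depends lists, keyed by task name (mirrors the task.yaml field).
depends = {
    "staging.users": [],
    "staging.events": [],
    "core_model.activity_daily": ["staging.events"],
    "core_model.users": ["core_model.activity_daily", "staging.users"],
    "analytics.daily_report": ["core_model.users"],
}


def run_order(graph: dict[str, list[str]]) -> list[str]:
    """Return an order in which every task runs after all of its dependencies."""
    try:
        return list(TopologicalSorter(graph).static_order())
    except CycleError as exc:
        # The second element of exc.args lists the nodes that form the cycle.
        raise ValueError(f"dependency cycle: {exc.args[1]}") from exc


print(run_order(depends))
```

Several orders can be valid (the two staging tasks are independent), so code that consumes the result should rely only on the dependency guarantee, not on a specific sequence.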
2. Consistent Naming
- Use descriptive, hierarchical task names
- Follow consistent file naming conventions
- Maintain clear directory structure
3. Documentation
- Include comprehensive README files
- Document task purposes and dependencies
- Provide usage examples
4. Testing
- Implement data quality tests
- Use comprehensive validation
- Include error handling
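The tests block in the task configuration treats every test as blocking unless blocking: false is set, and the custom valid_dates test compares its query result with equal_to. The sketch below models those two rules in plain Python under stated assumptions: test results are already computed, a failing blocking test fails the task, and a failing non-blocking test is only reported as a warning. CheckResult, custom_test_passed, and evaluate are hypothetical names, not Datablast's API.

```python
from dataclasses import dataclass


@dataclass
class CheckResult:
    name: str
    passed: bool
    blocking: bool = True  # mirrors the documented default: blocking unless set to false


def custom_test_passed(query_value: int, equal_to: int) -> bool:
    """A custom test such as valid_dates passes when its query returns equal_to."""
    return query_value == equal_to


def evaluate(results: list[CheckResult]) -> tuple[bool, list[str], list[str]]:
    """Return (task_succeeded, blocking_failures, warnings) under the assumed semantics."""
    blocking_failures = [r.name for r in results if not r.passed and r.blocking]
    warnings = [r.name for r in results if not r.passed and not r.blocking]
    return not blocking_failures, blocking_failures, warnings


results = [
    CheckResult("user_pseudo_id.not_null", passed=True),
    CheckResult("user_pseudo_id.unique", passed=False, blocking=False),
    # Three rows with a future install_dt: the query returns 3, not the expected 0.
    CheckResult("valid_dates", passed=custom_test_passed(query_value=3, equal_to=0)),
]
ok, failures, warnings = evaluate(results)
print(ok, failures, warnings)  # False ['valid_dates'] ['user_pseudo_id.unique']
```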
5. Monitoring
- Set up proper notifications
- Implement logging and alerting
- Monitor pipeline performance
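The notifications section of pipeline.yml pairs each Slack or Discord entry with a success and a failure message. A minimal sketch of picking the right message per entry once a run finishes, assuming the YAML has already been parsed into a dict; parsing and actual delivery are out of scope, messages_for_run is a hypothetical helper, and skipping entries without a message for the outcome is an assumption.

```python
def messages_for_run(notifications: dict, succeeded: bool) -> list[tuple[str, str, str, str]]:
    """Return (channel_type, name, connection, message) for every configured entry."""
    key = "success" if succeeded else "failure"
    messages = []
    for channel_type, entries in notifications.items():
        for entry in entries:
            # Assumption: entries without a message for this outcome are skipped.
            if key in entry:
                messages.append((channel_type, entry["name"], entry["connection"], entry[key]))
    return messages


# Same shape as the notifications block in the pipeline.yml example.
notifications = {
    "slack": [{"name": "data-team", "connection": "slack-123",
               "success": "✅ Pipeline completed successfully!", "failure": "❌ Pipeline failed!"}],
    "discord": [{"name": "alerts", "connection": "discord-webhook",
                 "success": "Pipeline completed successfully!", "failure": "Pipeline failed!"}],
}

for channel_type, name, connection, message in messages_for_run(notifications, succeeded=False):
    print(f"[{channel_type}/{name} via {connection}] {message}")
```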
Next Steps
- Learn about Task Types
- Explore Pipeline Configuration
- Review SQL & Python Assets Guide
- Check out Multiple Repositories Best Practices