Skip to content

SQL Task Overview

SQL tasks in Datablast allow you to execute data transformation and analysis queries against various databases. This guide covers the supported SQL dialects and basic configuration methods.

Task TypeDatabaseDescription
bq.sqlBigQueryGoogle Cloud BigQuery with automatic materialization
sf.sqlSnowflakeSnowflake data warehouse
athena.sqlAWS AthenaAmazon Athena for S3 data analysis
pg.sqlPostgreSQLPostgreSQL databases

Define task information directly in your SQL file using annotations:

-- @blast.name: core_model.users
-- @blast.type: bq.sql
-- @blast.description: Create users table from activity data
-- @blast.depends: core_model.activity_daily
-- @blast.materialization.type: table
-- @blast.materialization.strategy: create+replace
-- @blast.materialization.partition_by: dt
-- @blast.materialization.cluster_by: user_id,event_type
-- @blast.tests.columns.user_id: not_null,unique
-- @blast.tests.columns.daily_active_users: not_null,positive
SELECT
user_id,
event_type,
COUNT(*) as event_count,
CURRENT_TIMESTAMP() as created_at
FROM staging.events
WHERE event_date = '{{ ds }}'
GROUP BY 1, 2

Define task information in a separate YAML file:

name: "core_model.users"
type: "bq.sql"
description: "Create users table from activity data"
depends:
- core_model.activity_daily
run: "users.sql"
# Materialization settings
materialization:
type: "table"
strategy: "create+replace"
partition_by: "dt"
cluster_by: ["user_id", "event_type"]
# Data quality tests
tests:
columns:
user_id:
- name: "not_null" # blocking: true (default)
- name: "unique"
blocking: false # Must specify to override default
daily_active_users:
- name: "not_null" # blocking: true (default)
- name: "positive" # blocking: true (default)
name: "task.name" # Unique task identifier
type: "bq.sql" # Task type
description: "Task description" # Human-readable description
run: "script.sql" # Script file to execute
depends:
- task1
- task2
root_dir: "tasks/analytics" # Root directory for task files
connections:
gcp: "my-gcp-conn" # Database connections
depends:
- task1
- task2
depends:
- "task1"
- "task2"
condition: "task1.status == 'success'"
connections:
gcp: "my-gcp-conn" # BigQuery connection
snowflake: "my-snowflake-conn" # Snowflake connection
aws: "my-aws-conn" # AWS connection
postgres: "my-pg-conn" # PostgreSQL connection
  1. Single Responsibility: Each task should have one clear purpose
  2. Idempotency: Tasks should be safe to run multiple times
  3. Error Handling: Implement proper error handling and logging
  4. Resource Efficiency: Use appropriate resources for task complexity
  1. Incremental Processing: Use incremental strategies for large datasets
  2. Partitioning: Implement appropriate partitioning strategies
  3. Clustering: Use clustering for frequently queried columns
  4. Resource Right-sizing: Match resources to task requirements