Sensor Overview

Sensor tasks in Datablast pause execution until an external condition is met. This guide covers the available sensor types and the basic ways to configure them.

  • Table Sensor: Wait for BigQuery tables to be available
  • Partition Sensor: Wait for specific table partitions
  • Query Sensor: Wait for query results to meet conditions
  • GCS Object Sensor: Wait for Google Cloud Storage objects
  • S3 Key Sensor: Wait for Amazon S3 objects
  • Custom Logic: Implement custom sensor logic
  • API Sensors: Wait for API responses
  • Database Sensors: Wait for database conditions
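
Conceptually, each sensor type above repeats one check until it passes. This guide does not describe how Datablast implements that, so the following is only a minimal sketch of the general pattern. The names `wait_for`, `poke_interval`, and `timeout` are hypothetical and are not Datablast parameters.

```python
import time
from typing import Callable


def wait_for(condition: Callable[[], bool],
             poke_interval: float = 60.0,
             timeout: float = 3600.0,
             sleep: Callable[[float], None] = time.sleep,
             clock: Callable[[], float] = time.monotonic) -> bool:
    """Poll `condition` until it returns True or `timeout` seconds elapse.

    All names here are illustrative assumptions, not Datablast settings.
    """
    deadline = clock() + timeout
    while True:
        if condition():
            return True   # condition met: the waiting task may proceed
        if clock() + poke_interval > deadline:
            return False  # give up instead of waiting indefinitely
        sleep(poke_interval)
```

Bounding the loop with a deadline is a deliberate choice in this sketch: a sensor without one can wait forever when the awaited data never arrives.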

A sensor can be defined in YAML:

name: "wait.for.users.table"
type: "bq.sensor.table"
description: "Wait for users table to be available"
parameters:
  table_id: "project.dataset.users"
  project_id: "my-project"

The same sensor expressed as comment annotations:

# @blast.name: wait.for.users.table
# @blast.type: bq.sensor.table
# @blast.description: Wait for users table to be available
# @blast.parameters.table_id: project.dataset.users
# @blast.parameters.project_id: my-project
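
To show how the `# @blast.` annotations line up with the YAML keys, here is a rough sketch that folds annotation lines into a nested dictionary. The parser, its regular expression, and the name `parse_annotations` are assumptions made for illustration. Datablast's own parsing rules are not documented here and may differ.

```python
import re

# Matches lines such as "# @blast.parameters.table_id: project.dataset.users".
# The key path ends at the first colon; everything after it is the value.
ANNOTATION = re.compile(r"^#\s*@blast\.([\w.]+):\s*(.*)$")


def parse_annotations(text: str) -> dict:
    """Build a nested dict from '# @blast.<path>: <value>' lines (illustrative)."""
    config: dict = {}
    for line in text.splitlines():
        match = ANNOTATION.match(line.strip())
        if not match:
            continue  # ignore anything that is not an annotation
        path = match.group(1).split(".")
        value = match.group(2).strip()
        node = config
        for key in path[:-1]:
            node = node.setdefault(key, {})  # descend, creating levels as needed
        node[path[-1]] = value
    return config


header = """\
# @blast.name: wait.for.users.table
# @blast.type: bq.sensor.table
# @blast.description: Wait for users table to be available
# @blast.parameters.table_id: project.dataset.users
# @blast.parameters.project_id: my-project
"""
config = parse_annotations(header)
```

Only the key path is split on dots, so dotted values such as `wait.for.users.table` stay intact.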

Wait for BigQuery tables to be available.

name: "wait.for.users.table"
type: "bq.sensor.table"
description: "Wait for users table to be available"
parameters:
  table_id: "project.dataset.users"
  project_id: "my-project"

Wait for specific table partitions to be available.

name: "wait.for.daily.partition"
type: "bq.sensor.partition"
description: "Wait for today's partition to be available"
parameters:
  table_id: "project.dataset.events"
  partition_id: "{{ ds }}" # Today's date
  project_id: "my-project"

Wait for query results to meet specific conditions.

name: "wait.for.data.availability"
type: "bq.sensor.query"
description: "Wait for data to be available"
parameters:
  sql: "SELECT COUNT(*) FROM project.dataset.events WHERE dt = '{{ ds }}'"
  project_id: "my-project"
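
The example above does not say which query result counts as "met". One plausible reading, assumed here, is that the sensor passes when the first column of the first row is truthy, for example a non-zero `COUNT(*)`. The sketch below illustrates that rule with an in-memory SQLite database standing in for BigQuery. The function name `query_condition_met` is hypothetical.

```python
import sqlite3


def query_condition_met(conn: sqlite3.Connection, sql: str) -> bool:
    """Return True when the first column of the first row is truthy.

    This success rule is an assumption, not documented Datablast behavior.
    """
    row = conn.execute(sql).fetchone()
    return bool(row and row[0])


# SQLite is used only so the sketch runs locally without credentials.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (dt TEXT)")
sql = "SELECT COUNT(*) FROM events WHERE dt = '2024-01-15'"

before = query_condition_met(conn, sql)   # no rows for that date yet
conn.execute("INSERT INTO events VALUES ('2024-01-15')")
after = query_condition_met(conn, sql)    # one row now matches
```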

Wait for Google Cloud Storage objects to be available.

name: "wait.for.gcs.files"
type: "gcs.sensor.object_sensor_with_prefix"
description: "Wait for files to be uploaded to GCS"
parameters:
  bucket: "my-data-bucket"
  prefix: "incoming/data/{{ ds }}"
  project_id: "my-project"
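
The type name `gcs.sensor.object_sensor_with_prefix` suggests that the sensor passes once at least one object name starts with the given prefix. That is an inference, not something stated in this guide. The sketch below applies that rule to a plain list of object names. The helper `objects_with_prefix` and the sample listing are hypothetical.

```python
from typing import Iterable, List


def objects_with_prefix(object_names: Iterable[str], prefix: str) -> List[str]:
    """Return the object names that begin with `prefix` (plain string match)."""
    return [name for name in object_names if name.startswith(prefix)]


# Hypothetical bucket listing, used only for illustration.
listing = [
    "incoming/data/2024-01-14/part-0.csv",
    "incoming/data/2024-01-15/part-0.csv",
    "incoming/data/2024-01-15_backup/part-0.csv",
]

matches = objects_with_prefix(listing, "incoming/data/2024-01-15")
strict_matches = objects_with_prefix(listing, "incoming/data/2024-01-15/")
```

Because prefix matching is a plain string comparison, a prefix without a trailing slash also matches sibling paths such as `2024-01-15_backup`. Ending the prefix with `/` restricts the match to a single folder-like path.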

Wait for Amazon S3 objects to be available.

name: "wait.for.s3.file"
type: "s3.sensor.key_sensor"
description: "Wait for S3 file to be available"
parameters:
  bucket_name: "my-s3-bucket"
  bucket_key: "data/{{ ds }}/events.parquet"

Parameters fall into three groups: connection settings, sensor-specific settings, and templated values.

parameters:
  # Connection parameters
  project_id: "my-project"
  connection_id: "my-connection"
  # Sensor-specific parameters
  table_id: "project.dataset.table"
  partition_id: "{{ ds }}"
  bucket: "my-bucket"
  prefix: "data/{{ ds }}"
  # Jinja template support
  date_filter: "{{ ds }}"
  time_filter: "{{ ts }}"

Sensors support Jinja templates for dynamic parameter values:

parameters:
  table_id: "project.dataset.events"
  partition_id: "{{ ds }}" # Today's date
  date_filter: "{{ prev_ds }}" # Previous day
  time_filter: "{{ ts }}" # Current timestamp
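
To make the substitution concrete, the sketch below renders `{{ name }}` placeholders against a small context. It uses a regular expression as a stand-in for a real Jinja engine and handles only simple variable references. The exact formats of `ds`, `prev_ds`, and `ts` are assumptions based on the comments in the example above, and every function name is hypothetical.

```python
import re
from datetime import datetime, timedelta

# Matches simple "{{ name }}" placeholders only; real Jinja supports far more.
PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def build_context(run_time: datetime) -> dict:
    """Assumed variable formats: ISO dates for ds/prev_ds, ISO timestamp for ts."""
    return {
        "ds": run_time.strftime("%Y-%m-%d"),
        "prev_ds": (run_time - timedelta(days=1)).strftime("%Y-%m-%d"),
        "ts": run_time.isoformat(),
    }


def render(value: str, context: dict) -> str:
    """Replace each placeholder; an unknown variable raises KeyError."""
    return PLACEHOLDER.sub(lambda m: context[m.group(1)], value)


def render_parameters(parameters: dict, context: dict) -> dict:
    return {key: render(value, context) for key, value in parameters.items()}


context = build_context(datetime(2024, 3, 1, 6, 30))
rendered = render_parameters(
    {
        "table_id": "project.dataset.events",
        "partition_id": "{{ ds }}",
        "date_filter": "{{ prev_ds }}",
        "time_filter": "{{ ts }}",
    },
    context,
)
```

Raising on an unknown variable, instead of leaving the placeholder in place, surfaces typos in template names before the sensor starts waiting on a value that can never match.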

  1. Clear purpose: Each sensor should have a clear, specific purpose
  2. Appropriate conditions: Choose conditions that accurately reflect data availability
  3. Error handling: Implement proper error handling and logging
  4. Performance: Optimize sensor queries for efficiency

  1. Use templates: Leverage Jinja templates for dynamic values
  2. Validate parameters: Ensure parameter values are correct
  3. Document purpose: Include clear descriptions of sensor behavior
  4. Test thoroughly: Validate sensor behavior in different scenarios

  1. Track execution: Monitor sensor execution times and success rates
  2. Set alerts: Configure alerts for sensor failures
  3. Log results: Maintain detailed logs of sensor behavior
  4. Review regularly: Periodically review and optimize sensors
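
The "Validate parameters" practice above could be sketched as a small pre-flight check run on already-rendered parameters. The rules below are assumptions drawn from the examples in this guide: values are non-empty strings, no `{{ }}` markers remain after rendering, and `table_id` has three dot-separated parts. The name `validate_parameters` is hypothetical.

```python
import re

# Assumed shape "project.dataset.table", inferred from the examples in this guide.
TABLE_ID = re.compile(r"^[\w-]+\.\w+\.\w+$")
UNRENDERED = re.compile(r"\{\{|\}\}")


def validate_parameters(parameters: dict) -> list:
    """Return a list of human-readable problems; an empty list means no issues found."""
    problems = []
    for key, value in parameters.items():
        if not isinstance(value, str) or not value.strip():
            problems.append(f"{key}: expected a non-empty string")
        elif UNRENDERED.search(value):
            problems.append(f"{key}: template was not rendered: {value!r}")
    table_id = parameters.get("table_id")
    if isinstance(table_id, str) and table_id.strip() and not TABLE_ID.match(table_id):
        problems.append("table_id: expected the form 'project.dataset.table'")
    return problems


ok = validate_parameters(
    {"table_id": "project.dataset.events", "partition_id": "2024-03-01"}
)
bad = validate_parameters(
    {"table_id": "dataset.users", "partition_id": "{{ ds }}"}
)
```

Returning every problem at once, instead of stopping at the first, keeps the feedback loop short when several parameters are wrong.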

  • Issue: Sensor waits indefinitely
  • Solution: Check parameter values and data availability
  • Debug: Review sensor logs and data sources

  • Issue: Invalid parameter values
  • Solution: Validate parameter formats and values
  • Debug: Check parameter syntax and templates

  • Issue: Sensor cannot connect to data source
  • Solution: Check connection configurations and credentials
  • Debug: Test connections independently