
BigQuery Sensors

BigQuery sensors allow tasks to wait for BigQuery tables, partitions, or query results before proceeding. This guide covers BigQuery-specific sensor types and their configuration.
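
The waiting behavior described above can be pictured as a polling loop. The following is a minimal sketch, not the tool's actual implementation: `wait_for`, `poke_interval`, and `timeout` are hypothetical names introduced here for illustration, and `check` stands in for whatever BigQuery lookup a given sensor type performs.

```python
import time
from typing import Callable


def wait_for(
    check: Callable[[], bool],
    poke_interval: float = 60.0,   # hypothetical: seconds between checks
    timeout: float = 3600.0,       # hypothetical: give up after this many seconds
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> bool:
    """Call check() repeatedly until it returns True or the timeout elapses."""
    deadline = clock() + timeout
    while True:
        if check():
            return True
        # Stop if the next check would start after the deadline.
        if clock() + poke_interval > deadline:
            return False
        sleep(poke_interval)
```

A bounded timeout like this keeps a sensor from waiting forever when the upstream data never arrives.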

Wait for BigQuery tables to be available.

name: "wait.for.users.table"
type: "bq.sensor.table"
description: "Wait for users table to be available"
parameters:
  table_id: "project.dataset.users"
  project_id: "my-project"

Wait for specific table partitions to be available.

name: "wait.for.daily.partition"
type: "bq.sensor.partition"
description: "Wait for today's partition to be available"
parameters:
  table_id: "project.dataset.events"
  partition_id: "{{ ds }}" # Today's date
  project_id: "my-project"

Wait for query results to meet specific conditions.

name: "wait.for.data.availability"
type: "bq.sensor.query"
description: "Wait for data to be available"
parameters:
  sql: "SELECT COUNT(*) FROM project.dataset.events WHERE dt = '{{ ds }}'"
  project_id: "my-project"

Add optional conditions to a table sensor.

name: "wait.for.users.table"
type: "bq.sensor.table"
description: "Wait for users table to be available"
parameters:
  table_id: "project.dataset.users"
  project_id: "my-project"
  # Optional: Check for specific conditions
  check_conditions:
    - "row_count > 0"
    - "last_modified > '{{ prev_ds }}'"

Add optional conditions to a partition sensor.

name: "wait.for.daily.partition"
type: "bq.sensor.partition"
description: "Wait for today's partition to be available"
parameters:
  table_id: "project.dataset.events"
  partition_id: "{{ ds }}"
  project_id: "my-project"
  # Optional: Check partition conditions
  check_conditions:
    - "row_count > 1000"
    - "data_quality_score > 0.95"

Add optional conditions on the result of a query sensor.

name: "wait.for.data.availability"
type: "bq.sensor.query"
description: "Wait for data to be available"
parameters:
  sql: |
    SELECT
      COUNT(*) as row_count,
      MIN(event_timestamp) as min_timestamp,
      MAX(event_timestamp) as max_timestamp
    FROM project.dataset.events
    WHERE dt = '{{ ds }}'
  project_id: "my-project"
  # Optional: Check query result conditions
  check_conditions:
    - "row_count > 0"
    - "min_timestamp >= '{{ ds }}'"
    - "max_timestamp < '{{ next_ds }}'"
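
To make the `check_conditions` idea concrete, here is a rough sketch of how conditions of the form `column operator value` could be evaluated against a single result row. This document does not specify how conditions are actually parsed, so the grammar, the helper names (`parse_value`, `conditions_met`), and the rule that all conditions must hold are assumptions made for illustration.

```python
import operator
import re

# Assumed grammar: "<column> <operator> <literal>", nothing more complex.
_OPS = {
    ">": operator.gt, ">=": operator.ge,
    "<": operator.lt, "<=": operator.le,
    "=": operator.eq, "!=": operator.ne,
}
_PATTERN = re.compile(r"^\s*(\w+)\s*(>=|<=|!=|>|<|=)\s*(.+?)\s*$")


def parse_value(text):
    """Turn a literal into a str (single-quoted), int, or float."""
    if len(text) >= 2 and text[0] == text[-1] == "'":
        return text[1:-1]
    try:
        return int(text)
    except ValueError:
        return float(text)


def conditions_met(row, conditions):
    """Return True only if every condition holds for the given row."""
    for condition in conditions:
        match = _PATTERN.match(condition)
        if match is None:
            raise ValueError(f"unsupported condition: {condition!r}")
        column, op, raw = match.groups()
        if not _OPS[op](row[column], parse_value(raw)):
            return False
    return True


row = {
    "row_count": 42,
    "min_timestamp": "2024-01-01 00:05:00",
    "max_timestamp": "2024-01-01 23:59:00",
}
ok = conditions_met(row, [
    "row_count > 0",
    "min_timestamp >= '2024-01-01'",
    "max_timestamp < '2024-01-02'",
])
print(ok)  # True
```

Timestamps are compared as strings here, which only works because ISO-style date strings sort chronologically.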

Use template variables for dynamic parameter values.

parameters:
  table_id: "project.dataset.events"
  partition_id: "{{ ds }}" # Today's date
  date_filter: "{{ prev_ds }}" # Previous day
  time_filter: "{{ ts }}" # Current timestamp

Template variables can also be used inside SQL.

parameters:
  table_id: "project.dataset.events"
  partition_id: "{{ ds }}"
  # Check for data from the previous day through the current day
  sql: |
    SELECT COUNT(*) as row_count
    FROM project.dataset.events
    WHERE dt BETWEEN '{{ prev_ds }}' AND '{{ ds }}'
      AND event_type = 'user_action'
  project_id: "my-project"
  check_conditions:
    - "row_count > 10000"
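
The template variables used above can be illustrated with a small rendering sketch. It assumes a daily schedule where `ds` is the run date in `YYYY-MM-DD` form, `prev_ds` and `next_ds` are the adjacent days, and `ts` is an ISO timestamp; the real template engine and its exact variable semantics may differ, and `template_context` and `render` are hypothetical helpers.

```python
import re
from datetime import datetime, timedelta


def template_context(run_at: datetime) -> dict:
    """Build the assumed template variables for one run."""
    day = run_at.date()
    return {
        "ds": day.isoformat(),
        "prev_ds": (day - timedelta(days=1)).isoformat(),
        "next_ds": (day + timedelta(days=1)).isoformat(),
        "ts": run_at.isoformat(),
    }


def render(text: str, context: dict) -> str:
    """Replace each {{ name }} placeholder; unknown names raise KeyError."""
    return re.sub(r"\{\{\s*(\w+)\s*\}\}", lambda m: context[m.group(1)], text)


ctx = template_context(datetime(2024, 3, 1, 6, 30))
print(render("WHERE dt BETWEEN '{{ prev_ds }}' AND '{{ ds }}'", ctx))
# WHERE dt BETWEEN '2024-02-29' AND '2024-03-01'
```

Rendering a query this way before running it by hand shows exactly which dates the sensor will look at.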

# Wait for source data before processing
name: "wait.for.source.data"
type: "bq.sensor.table"
description: "Wait for source data to be available"
parameters:
  table_id: "external_project.source_dataset.raw_events"
  project_id: "external_project"

# Wait for daily partition before processing
name: "wait.for.daily.partition"
type: "bq.sensor.partition"
description: "Wait for daily partition to be available"
parameters:
  table_id: "project.dataset.events"
  partition_id: "{{ ds }}"
  project_id: "my-project"

# Wait for data quality conditions
name: "wait.for.data.quality"
type: "bq.sensor.query"
description: "Wait for data quality conditions"
parameters:
  sql: |
    SELECT
      COUNT(*) as total_rows,
      COUNT(DISTINCT user_id) as unique_users,
      COUNT(CASE WHEN event_type IS NULL THEN 1 END) as null_events
    FROM project.dataset.events
    WHERE dt = '{{ ds }}'
  project_id: "my-project"
  check_conditions:
    - "total_rows > 0"
    - "unique_users > 0"
    - "null_events = 0"

# Wait for data from external project
name: "wait.for.external.data"
type: "bq.sensor.table"
description: "Wait for external data to be available"
parameters:
  table_id: "external_project.dataset.table"
  project_id: "external_project"
  # Optional: Check for recent updates
  check_conditions:
    - "last_modified > '{{ prev_ds }}'"

Best practices for sensor design:

  1. Clear purpose: Give each sensor a single, specific purpose
  2. Appropriate conditions: Choose conditions that accurately reflect data availability
  3. Error handling: Implement error handling and logging
  4. Performance: Optimize sensor queries for efficiency

Best practices for parameters:

  1. Use templates: Leverage Jinja templates for dynamic values
  2. Validate parameters: Ensure parameter values are correct
  3. Document purpose: Include clear descriptions of sensor behavior
  4. Test thoroughly: Validate sensor behavior in different scenarios
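
As one way to act on the "validate parameters" advice, the sketch below checks a parameter dictionary before a sensor runs. The rules are assumptions drawn from the examples in this guide (a three-part `project.dataset.table` ID, a `YYYY-MM-DD` partition ID, and no unrendered `{{ ... }}` placeholders); `validate_parameters` is a hypothetical helper, not part of any real API.

```python
from datetime import datetime


def validate_parameters(params: dict) -> list:
    """Return a list of human-readable problems; empty means no issues found."""
    errors = []

    # Assumed format: exactly three non-empty, dot-separated parts.
    table_id = params.get("table_id")
    if table_id is not None:
        parts = table_id.split(".")
        if len(parts) != 3 or not all(parts):
            errors.append(f"table_id {table_id!r} is not project.dataset.table")

    # A leftover placeholder means the template was never rendered.
    for key, value in params.items():
        if isinstance(value, str) and ("{{" in value or "}}" in value):
            errors.append(f"{key} contains an unrendered template")

    # Assumed format: rendered partition IDs look like YYYY-MM-DD.
    partition_id = params.get("partition_id")
    if isinstance(partition_id, str) and "{{" not in partition_id:
        try:
            datetime.strptime(partition_id, "%Y-%m-%d")
        except ValueError:
            errors.append(f"partition_id {partition_id!r} is not YYYY-MM-DD")

    return errors


print(validate_parameters({"table_id": "dataset.events"}))
# ["table_id 'dataset.events' is not project.dataset.table"]
```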
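
Best practices for performance:

  1. Efficient queries: Select only the columns and aggregates that the conditions need
  2. Appropriate filters: Use WHERE clauses on the partition column to limit data scanning
  3. Partitioning and clustering: BigQuery prunes data through partitioning and clustering rather than traditional indexes, so filter on those columns
  4. Cost optimization: Minimize BigQuery costs for sensor queries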
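
A quick back-of-the-envelope calculation shows why limiting scanned data matters for a query that is run repeatedly. The sketch assumes on-demand billing by bytes processed; the price per TiB is passed in as a parameter because it varies, and the byte counts and the figure of 5.0 are made-up illustration values, not real prices or measurements.

```python
TIB = 1024 ** 4  # bytes in one tebibyte


def estimate_polling_cost(bytes_per_poll: int, polls: int, usd_per_tib: float) -> float:
    """Rough cost of running the same sensor query `polls` times."""
    return bytes_per_poll * polls / TIB * usd_per_tib


# Illustration only: a full scan of 1 TiB versus a pruned scan of 1 GiB,
# each checked 24 times, at an assumed price of 5.0 per TiB.
full_scan = estimate_polling_cost(TIB, polls=24, usd_per_tib=5.0)
pruned = estimate_polling_cost(1024 ** 3, polls=24, usd_per_tib=5.0)
print(full_scan)  # 120.0
print(pruned)     # 0.1171875
```

Real bytes processed can be read from the query job statistics, which makes this estimate easy to check against actual runs.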

Common issues:

  • Issue: Sensor waits indefinitely
  • Solution: Check parameter values and data availability
  • Debug: Review sensor logs and data sources

  • Issue: Invalid parameter values
  • Solution: Validate parameter formats and values
  • Debug: Check parameter syntax and templates

  • Issue: Sensor cannot connect to BigQuery
  • Solution: Check connection configurations and credentials
  • Debug: Test BigQuery connections independently

  • Issue: Sensor queries run slowly
  • Solution: Optimize queries and use appropriate filters
  • Debug: Review query execution plans

Debugging tips:

  1. Test queries manually: Run sensor queries manually to verify results
  2. Check logs: Review sensor execution logs for errors
  3. Validate parameters: Ensure parameter values are correct
  4. Monitor costs: Track BigQuery costs for sensor queries