BigQuery Sensors
BigQuery sensors allow tasks to wait for BigQuery tables, partitions, or query results before proceeding. This guide covers BigQuery-specific sensor types and their configuration.
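The waiting behavior described above can be pictured as a polling loop. The following is a minimal sketch rather than the actual sensor implementation: the function name `wait_until` and the `poke_interval` and `timeout` parameters are hypothetical, and `check` stands in for whatever BigQuery lookup a given sensor performs.

```python
import time
from typing import Callable


def wait_until(check: Callable[[], bool],
               poke_interval: float = 30.0,
               timeout: float = 3600.0,
               sleep: Callable[[float], None] = time.sleep,
               clock: Callable[[], float] = time.monotonic) -> bool:
    """Poll `check` until it returns True or `timeout` seconds elapse.

    Returns True if the condition was met, False if the deadline passed.
    `poke_interval` and `timeout` are illustrative names, not settings
    taken from this guide.
    """
    deadline = clock() + timeout
    while True:
        if check():
            return True
        # Stop if another full interval would run past the deadline.
        if clock() + poke_interval > deadline:
            return False
        sleep(poke_interval)
```

A caller would pass a function that returns True once the table, partition, or query result is ready. Bounding the loop with a deadline keeps a misconfigured sensor from waiting forever.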
Sensor Types
Table Sensor (bq.sensor.table)
Wait for BigQuery tables to be available.
    name: "wait.for.users.table"
    type: "bq.sensor.table"
    description: "Wait for users table to be available"
    parameters:
      table_id: "project.dataset.users"
      project_id: "my-project"

Partition Sensor (bq.sensor.partition)
Wait for specific table partitions to be available.
    name: "wait.for.daily.partition"
    type: "bq.sensor.partition"
    description: "Wait for today's partition to be available"
    parameters:
      table_id: "project.dataset.events"
      partition_id: "{{ ds }}"  # Today's date
      project_id: "my-project"

Query Sensor (bq.sensor.query)
Wait for query results to meet specific conditions.
    name: "wait.for.data.availability"
    type: "bq.sensor.query"
    description: "Wait for data to be available"
    parameters:
      sql: "SELECT COUNT(*) FROM project.dataset.events WHERE dt = '{{ ds }}'"
      project_id: "my-project"

Configuration
Basic Table Sensor
    name: "wait.for.users.table"
    type: "bq.sensor.table"
    description: "Wait for users table to be available"
    parameters:
      table_id: "project.dataset.users"
      project_id: "my-project"

Advanced Table Sensor
    name: "wait.for.users.table"
    type: "bq.sensor.table"
    description: "Wait for users table to be available"
    parameters:
      table_id: "project.dataset.users"
      project_id: "my-project"
      # Optional: Check for specific conditions
      check_conditions:
        - "row_count > 0"
        - "last_modified > '{{ prev_ds }}'"

Partition Sensor with Conditions
    name: "wait.for.daily.partition"
    type: "bq.sensor.partition"
    description: "Wait for today's partition to be available"
    parameters:
      table_id: "project.dataset.events"
      partition_id: "{{ ds }}"
      project_id: "my-project"
      # Optional: Check partition conditions
      check_conditions:
        - "row_count > 1000"
        - "data_quality_score > 0.95"

Query Sensor with Complex Logic
    name: "wait.for.data.availability"
    type: "bq.sensor.query"
    description: "Wait for data to be available"
    parameters:
      sql: |
        SELECT
          COUNT(*) as row_count,
          MIN(event_timestamp) as min_timestamp,
          MAX(event_timestamp) as max_timestamp
        FROM project.dataset.events
        WHERE dt = '{{ ds }}'
      project_id: "my-project"
      # Optional: Check query result conditions
      check_conditions:
        - "row_count > 0"
        - "min_timestamp >= '{{ ds }}'"
        - "max_timestamp < '{{ next_ds }}'"

Jinja Template Support
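The date variables used in this guide's examples (`ds`, `prev_ds`, `next_ds`) can be pictured as plain string substitutions performed before a sensor runs. The sketch below is a stand-in for a real Jinja engine and only handles bare `{{ name }}` placeholders. It assumes `ds` is the run date in `YYYY-MM-DD` form and that `prev_ds` and `next_ds` are the adjacent days. The exact values the sensor framework supplies may differ, and the helper names `date_context` and `render` are hypothetical.

```python
import re
from datetime import date, timedelta

# Matches simple placeholders such as "{{ ds }}" (no filters or expressions).
_VAR = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def date_context(run_date: date) -> dict:
    """Build the assumed date variables for one run date."""
    one_day = timedelta(days=1)
    return {
        "ds": run_date.isoformat(),
        "prev_ds": (run_date - one_day).isoformat(),
        "next_ds": (run_date + one_day).isoformat(),
    }


def render(template: str, context: dict) -> str:
    """Replace each {{ name }} with its value; fail loudly on unknown names."""
    def substitute(match):
        name = match.group(1)
        if name not in context:
            raise KeyError(f"unknown template variable: {name}")
        return context[name]

    return _VAR.sub(substitute, template)


if __name__ == "__main__":
    ctx = date_context(date(2024, 3, 1))
    print(render("SELECT COUNT(*) FROM project.dataset.events WHERE dt = '{{ ds }}'", ctx))
```

Raising on unknown names, rather than leaving the placeholder in place, surfaces typos such as `{{ dss }}` before a query is sent to BigQuery.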
Dynamic Date References
    parameters:
      table_id: "project.dataset.events"
      partition_id: "{{ ds }}"      # Today's date
      date_filter: "{{ prev_ds }}"  # Previous day
      time_filter: "{{ ts }}"       # Current timestamp

Complex Template Logic
    parameters:
      table_id: "project.dataset.events"
      partition_id: "{{ ds }}"
      # Check for data from the previous day through today
      sql: |
        SELECT COUNT(*) as row_count
        FROM project.dataset.events
        WHERE dt BETWEEN '{{ prev_ds }}' AND '{{ ds }}'
          AND event_type = 'user_action'
      project_id: "my-project"
      check_conditions:
        - "row_count > 10000"

Use Cases
Data Pipeline Dependencies
    # Wait for source data before processing
    name: "wait.for.source.data"
    type: "bq.sensor.table"
    description: "Wait for source data to be available"
    parameters:
      table_id: "external_project.source_dataset.raw_events"
      project_id: "external_project"

Incremental Processing
    # Wait for daily partition before processing
    name: "wait.for.daily.partition"
    type: "bq.sensor.partition"
    description: "Wait for daily partition to be available"
    parameters:
      table_id: "project.dataset.events"
      partition_id: "{{ ds }}"
      project_id: "my-project"

Data Quality Checks
    # Wait for data quality conditions
    name: "wait.for.data.quality"
    type: "bq.sensor.query"
    description: "Wait for data quality conditions"
    parameters:
      sql: |
        SELECT
          COUNT(*) as total_rows,
          COUNT(DISTINCT user_id) as unique_users,
          COUNT(CASE WHEN event_type IS NULL THEN 1 END) as null_events
        FROM project.dataset.events
        WHERE dt = '{{ ds }}'
      project_id: "my-project"
      check_conditions:
        - "total_rows > 0"
        - "unique_users > 0"
        - "null_events = 0"

Cross-Project Dependencies
    # Wait for data from external project
    name: "wait.for.external.data"
    type: "bq.sensor.table"
    description: "Wait for external data to be available"
    parameters:
      table_id: "external_project.dataset.table"
      project_id: "external_project"
      # Optional: Check for recent updates
      check_conditions:
        - "last_modified > '{{ prev_ds }}'"

Best Practices
Sensor Design
- Clear purpose: Each sensor should have a clear, specific purpose
- Appropriate conditions: Choose conditions that accurately reflect data availability
- Error handling: Implement proper error handling and logging
- Performance: Optimize sensor queries for efficiency
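To make "appropriate conditions" concrete, the sketch below shows one way strings such as `row_count > 0` could be evaluated against a single result row. This is an assumption about how `check_conditions` behaves, not the framework's implementation: the grammar (field, comparison operator, numeric or single-quoted literal) and the function names are hypothetical.

```python
import operator
import re

_OPS = {
    ">": operator.gt, ">=": operator.ge,
    "<": operator.lt, "<=": operator.le,
    "=": operator.eq, "!=": operator.ne,
}
# field, operator, literal; two-character operators are tried first.
_CONDITION = re.compile(r"^\s*(\w+)\s*(>=|<=|!=|>|<|=)\s*(.+?)\s*$")


def parse_literal(text: str):
    """Single-quoted text stays a string; anything else must be a number."""
    if len(text) >= 2 and text[0] == text[-1] == "'":
        return text[1:-1]
    return float(text)


def check_conditions(row: dict, conditions: list) -> list:
    """Return the conditions that `row` does NOT satisfy (empty list = ready)."""
    failed = []
    for condition in conditions:
        match = _CONDITION.match(condition)
        if match is None:
            raise ValueError(f"cannot parse condition: {condition!r}")
        field, op, literal = match.groups()
        if not _OPS[op](row[field], parse_literal(literal)):
            failed.append(condition)
    return failed
```

Returning the failed conditions instead of a bare boolean gives the sensor log something specific to report on each unsuccessful check.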
Parameter Configuration
- Use templates: Leverage Jinja templates for dynamic values
- Validate parameters: Ensure parameter values are correct
- Document purpose: Include clear descriptions of sensor behavior
- Test thoroughly: Validate sensor behavior in different scenarios
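As one example of parameter validation, the sketch below checks that a `table_id` has the `project.dataset.table` shape used throughout this guide before a sensor is deployed. It is a loose structural check only. The `TableRef` type and `parse_table_id` function are hypothetical helpers, and BigQuery's full naming rules are stricter than what is tested here.

```python
from typing import NamedTuple


class TableRef(NamedTuple):
    project: str
    dataset: str
    table: str


def parse_table_id(table_id: str) -> TableRef:
    """Split 'project.dataset.table' into its parts or raise ValueError."""
    parts = table_id.strip().split(".")
    if len(parts) != 3 or not all(parts):
        raise ValueError(
            f"expected 'project.dataset.table', got {table_id!r}"
        )
    if any(ch.isspace() for ch in table_id.strip()):
        raise ValueError(f"table_id must not contain whitespace: {table_id!r}")
    return TableRef(*parts)


if __name__ == "__main__":
    print(parse_table_id("project.dataset.users"))
```

Running a check like this in a test suite or pre-deployment step catches a missing project or dataset segment earlier than a sensor that never succeeds.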
Query Optimization
- Efficient queries: Use efficient queries for sensor conditions
- Appropriate filters: Use WHERE clauses to limit data scanning
- Partitioning and clustering: BigQuery does not rely on traditional indexes, so filter on partition and clustering columns to let queries prune data
- Cost optimization: Minimize BigQuery costs for sensor queries
Troubleshooting
Common Issues
Sensor Timeout
- Issue: Sensor waits indefinitely
- Solution: Check parameter values and data availability
- Debug: Review sensor logs and data sources
Parameter Errors
- Issue: Invalid parameter values
- Solution: Validate parameter formats and values
- Debug: Check parameter syntax and templates
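One quick way to check template syntax is to scan the rendered parameters for leftover `{{` or `}}` fragments, which usually indicate a placeholder that was never substituted or was left unclosed. The sketch below is a hypothetical helper, not part of the sensor framework, and assumes parameters arrive as a flat dictionary of values.

```python
import re

# A complete "{{ ... }}" placeholder, or a stray opening/closing marker.
_PLACEHOLDER = re.compile(r"\{\{.*?\}\}|\{\{|\}\}")


def find_unrendered(parameters: dict) -> dict:
    """Map each parameter name to the template fragments still present in it."""
    problems = {}
    for name, value in parameters.items():
        if isinstance(value, str):
            leftovers = _PLACEHOLDER.findall(value)
            if leftovers:
                problems[name] = leftovers
    return problems


if __name__ == "__main__":
    rendered = {
        "table_id": "project.dataset.events",
        "partition_id": "{{ ds }}",  # was never rendered
    }
    print(find_unrendered(rendered))
```

An empty result means no template markers survived rendering. It does not prove the substituted values are the ones intended.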
Connection Issues
- Issue: Sensor cannot connect to BigQuery
- Solution: Check connection configurations and credentials
- Debug: Test BigQuery connections independently
Query Performance
- Issue: Sensor queries run slowly
- Solution: Optimize queries and use appropriate filters
- Debug: Review query execution plans
Debugging Tips
- Test queries manually: Run sensor queries manually to verify results
- Check logs: Review sensor execution logs for errors
- Validate parameters: Ensure parameter values are correct
- Monitor costs: Track BigQuery costs for sensor queries