Jinja Date Templates
This guide explains how to use Jinja date templates in SQL and Python tasks, and covers pipeline interval logic and best practices.
Overview
Jinja date templates allow you to dynamically reference dates and times in your SQL and Python tasks. These templates are automatically populated by Airflow based on your pipeline’s schedule and execution context.
Key Concepts
- Data Interval: The time period your task is processing
- Logical Date: The date that identifies a scheduled run; per the variable tables, {{ ds }} renders it and it equals the data interval start
- Execution Date: When the task actually runs
- Template Rendering: Jinja templates are rendered before task execution
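The last point, render first and execute second, can be sketched with a tiny stand-in for Jinja built on the standard library. `render_placeholders` and the sample context are hypothetical names used only for illustration; the platform itself uses Jinja and supplies the values.

```python
import re

# Matches simple placeholders such as {{ ds }}; real Jinja supports far more.
PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render_placeholders(template: str, context: dict) -> str:
    """Replace each {{ name }} with context[name] (illustrative stand-in for Jinja)."""
    def substitute(match: re.Match) -> str:
        name = match.group(1)
        if name not in context:
            raise KeyError(f"no value for template variable {name!r}")
        return str(context[name])

    return PLACEHOLDER.sub(substitute, template)


# The query engine only ever sees the rendered string, never the placeholder.
sql = "SELECT * FROM raw.events WHERE event_date = '{{ ds }}'"
rendered = render_placeholders(sql, {"ds": "2024-01-15"})
print(rendered)
```

Because substitution happens before execution, the rendered value is a plain string literal inside the SQL text, which is why the examples in this guide wrap templates in quotes.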
Available Date Variables
Core Date Variables
| Variable | Format | Description | Example |
|---|---|---|---|
| {{ ds }} | YYYY-MM-DD | Logical date (data interval start) | 2024-01-15 |
| {{ ds_nodash }} | YYYYMMDD | Logical date without dashes | 20240115 |
| {{ ts }} | YYYY-MM-DDTHH:MM:SS+00:00 | Logical timestamp with timezone | 2024-01-15T00:00:00+00:00 |
| {{ ts_nodash }} | YYYYMMDDTHHMMSS | Logical timestamp without dashes | 20240115T000000 |
| {{ ts_nodash_with_tz }} | YYYYMMDDTHHMMSS+0000 | Timestamp with timezone, no dashes or colons | 20240115T000000+0000 |
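As a rough illustration of how the first four formats relate to one another, the sketch below derives them from a single timezone-aware `datetime`. The function name `core_date_variables` is hypothetical, and this is not the platform's rendering code; it only reproduces the formats listed in the table.

```python
from datetime import datetime, timezone


def core_date_variables(logical_date: datetime) -> dict:
    """Format a timezone-aware logical date using the patterns from the table."""
    return {
        "ds": logical_date.strftime("%Y-%m-%d"),
        "ds_nodash": logical_date.strftime("%Y%m%d"),
        "ts": logical_date.isoformat(),
        "ts_nodash": logical_date.strftime("%Y%m%dT%H%M%S"),
    }


values = core_date_variables(datetime(2024, 1, 15, tzinfo=timezone.utc))
for name, value in values.items():
    print(f"{name} = {value}")
```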
Data Interval Variables
| Variable | Format | Description | Example |
|---|---|---|---|
| {{ data_interval_start }} | Pendulum DateTime | Start of data interval | 2024-01-15T00:00:00+00:00 |
| {{ data_interval_end }} | Pendulum DateTime | End of data interval | 2024-01-16T00:00:00+00:00 |
| {{ start_date }} | YYYY-MM-DD | Data interval start date | 2024-01-15 |
| {{ end_date }} | YYYY-MM-DD | Data interval end date | 2024-01-16 |
| {{ start_date_nodash }} | YYYYMMDD | Start date without dashes | 20240115 |
| {{ end_date_nodash }} | YYYYMMDD | End date without dashes | 20240116 |
| {{ start_datetime }} | YYYY-MM-DDTHH:MM:SS | Start datetime without timezone | 2024-01-15T00:00:00 |
| {{ end_datetime }} | YYYY-MM-DDTHH:MM:SS | End datetime without timezone | 2024-01-16T00:00:00 |
| {{ start_datetime_with_tz }} | YYYY-MM-DDTHH:MM:SSZ | Start datetime with UTC timezone | 2024-01-15T00:00:00Z |
| {{ end_datetime_with_tz }} | YYYY-MM-DDTHH:MM:SSZ | End datetime with UTC timezone | 2024-01-16T00:00:00Z |
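The string variables in this table are all views of the same two interval boundaries. The sketch below, using a hypothetical helper `interval_variables`, derives them from a start and end `datetime`. It assumes values are expressed in UTC, as the examples in the table suggest, and is not the platform's own implementation.

```python
from datetime import datetime, timezone


def interval_variables(start: datetime, end: datetime) -> dict:
    """Derive the table's string variables from timezone-aware interval bounds."""
    result = {}
    for prefix, moment in (("start", start), ("end", end)):
        utc = moment.astimezone(timezone.utc)  # assumption: values are shown in UTC
        result[f"{prefix}_date"] = utc.strftime("%Y-%m-%d")
        result[f"{prefix}_date_nodash"] = utc.strftime("%Y%m%d")
        result[f"{prefix}_datetime"] = utc.strftime("%Y-%m-%dT%H:%M:%S")
        result[f"{prefix}_datetime_with_tz"] = utc.strftime("%Y-%m-%dT%H:%M:%SZ")
    return result


variables = interval_variables(
    datetime(2024, 1, 15, tzinfo=timezone.utc),
    datetime(2024, 1, 16, tzinfo=timezone.utc),
)
for name, value in sorted(variables.items()):
    print(f"{name} = {value}")
```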
SQL Usage
Basic SQL Examples

-- Daily data processing
SELECT *
FROM raw.events
WHERE event_date = '{{ ds }}'
  AND created_at >= '{{ start_datetime_with_tz }}'
  AND created_at < '{{ end_datetime_with_tz }}';

-- Date range processing
-- Note: BETWEEN is inclusive at both ends, so rows dated end_date are also matched
SELECT *
FROM staging.user_activity
WHERE activity_date BETWEEN '{{ start_date }}' AND '{{ end_date }}';

-- Partitioned table query
SELECT *
FROM analytics.daily_metrics
WHERE dt = '{{ ds }}';

Advanced SQL Patterns
-- Incremental processing with date calculations
DECLARE run_dt DATE DEFAULT '{{ ds }}';
DECLARE start_dt DATE DEFAULT DATE_SUB(run_dt, INTERVAL 7 DAY);

-- Delete and insert pattern
DELETE FROM core_model.user_metrics
WHERE dt BETWEEN start_dt AND run_dt;

-- Insert one row per user per day so every deleted dt is repopulated
INSERT INTO core_model.user_metrics
SELECT
  user_id,
  event_date as dt,
  COUNT(*) as daily_events,
  SUM(revenue) as daily_revenue
FROM staging.events
WHERE event_date BETWEEN start_dt AND run_dt
GROUP BY 1, 2;

-- Dynamic table naming
SELECT *
FROM `project.dataset.table_{{ ds_nodash }}`;

-- Conditional logic based on date
SELECT
  *,
  CASE
    WHEN '{{ ds }}' = '2024-01-01' THEN 'New Year Processing'
    WHEN '{{ ds }}' = '2024-12-31' THEN 'Year End Processing'
    ELSE 'Regular Processing'
  END as processing_type
FROM raw.events
WHERE event_date = '{{ ds }}';

SQL with Date Utility Functions
-- Using date utility functions
SELECT *
FROM staging.events
WHERE event_date = '{{ utils.date_add(ds, -1) }}'  -- Previous day
  AND created_at >= '{{ utils.date_format(start_datetime, "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d %H:%M:%S") }}';

-- Custom date calculations
SELECT
  user_id,
  '{{ ds }}' as processing_date,
  DATE_DIFF('{{ ds }}', install_date, DAY) as days_since_install
FROM core_model.users
WHERE install_date <= '{{ ds }}';

Python Usage
Environment Variables in Python
Python tasks receive date information through environment variables:

import os
from datetime import datetime
import pandas as pd

# Access date variables through environment variables
data_interval_start = os.getenv('BLAST_DATA_INTERVAL_START')
data_interval_end = os.getenv('BLAST_DATA_INTERVAL_END')
start_date = os.getenv('BLAST_START_DATE')
end_date = os.getenv('BLAST_END_DATE')
start_date_nodash = os.getenv('BLAST_START_DATE_NODASH')
end_date_nodash = os.getenv('BLAST_END_DATE_NODASH')

# Convert to datetime objects
start_dt = datetime.fromisoformat(data_interval_start.replace('Z', '+00:00'))
end_dt = datetime.fromisoformat(data_interval_end.replace('Z', '+00:00'))

print(f"Processing data from {start_dt} to {end_dt}")

Python Task Example
import os
from datetime import datetime, timedelta
import pandas as pd
from google.cloud import bigquery

# Get date information
start_date = os.getenv('BLAST_START_DATE')
end_date = os.getenv('BLAST_END_DATE')
start_date_nodash = os.getenv('BLAST_START_DATE_NODASH')

print(f"Processing period: {start_date} to {end_date}")

# Initialize BigQuery client
client = bigquery.Client()

# Query with date parameters
query = f"""
SELECT
  user_id,
  event_type,
  COUNT(*) as event_count
FROM `project.dataset.events`
WHERE event_date BETWEEN '{start_date}' AND '{end_date}'
GROUP BY 1, 2
"""

# Execute query
df = client.query(query).to_dataframe()

# Process data
processed_data = df.groupby('user_id')['event_count'].sum().reset_index()

# Save results
output_table = f"project.dataset.user_metrics_{start_date_nodash}"
processed_data.to_gbq(output_table, if_exists='replace')

print(f"Results saved to {output_table}")

Python with Date Calculations
import os
from datetime import datetime, timedelta
import pandas as pd


def calculate_date_range(base_date_str, days_back=7):
    """Calculate date range for processing"""
    base_date = datetime.strptime(base_date_str, '%Y-%m-%d')
    start_date = base_date - timedelta(days=days_back)
    return start_date.strftime('%Y-%m-%d'), base_date.strftime('%Y-%m-%d')


# Defined before it is called; calling it first would raise a NameError
def process_data_window(start_date, end_date):
    """Process data for a specific date window"""
    # Implementation here
    pass


# Get current processing date
current_date = os.getenv('BLAST_START_DATE')

# Calculate date range
start_date, end_date = calculate_date_range(current_date, days_back=7)

print(f"Processing 7-day window: {start_date} to {end_date}")

# Your processing logic here
process_data_window(start_date, end_date)

Pipeline Interval Logic
Understanding Data Intervals
The data interval is the time period a task run is responsible for. It starts at {{ data_interval_start }} (inclusive) and ends at {{ data_interval_end }} (exclusive), and it determines which rows the run should read.
Daily Pipeline Example
schedule: "0 4 * * *"  # Run daily at 4 AM UTC

For a task running on 2024-01-15:
- {{ ds }} = 2024-01-15 (logical date)
- {{ data_interval_start }} = 2024-01-15T00:00:00+00:00
- {{ data_interval_end }} = 2024-01-16T00:00:00+00:00
- Data to process: Events from 2024-01-15 (inclusive) to 2024-01-16 (exclusive)
Hourly Pipeline Example
schedule: "0 * * * *"  # Run hourly

For a task running at 2024-01-15 14:00:
- {{ ds }} = 2024-01-15 (logical date)
- {{ data_interval_start }} = 2024-01-15T14:00:00+00:00
- {{ data_interval_end }} = 2024-01-15T15:00:00+00:00
- Data to process: Events from 14:00 to 15:00 on 2024-01-15
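The two examples above follow the same rule: the interval starts at the logical date and covers one schedule period, with an inclusive start and an exclusive end. A minimal sketch of that rule follows; `data_interval` and `in_interval` are hypothetical helpers, and the fixed-length period is a simplifying assumption that ignores cron details.

```python
from datetime import datetime, timedelta, timezone


def data_interval(logical_date: datetime, period: timedelta):
    """Return the half-open interval [start, end) that begins at the logical date."""
    return logical_date, logical_date + period


def in_interval(moment: datetime, start: datetime, end: datetime) -> bool:
    """Start is inclusive and end is exclusive, so adjacent runs never overlap."""
    return start <= moment < end


daily = data_interval(datetime(2024, 1, 15, tzinfo=timezone.utc), timedelta(days=1))
hourly = data_interval(datetime(2024, 1, 15, 14, tzinfo=timezone.utc), timedelta(hours=1))

print("daily: ", daily[0].isoformat(), "to", daily[1].isoformat())
print("hourly:", hourly[0].isoformat(), "to", hourly[1].isoformat())
```

An event stamped exactly at the end boundary belongs to the next run, which is why the SQL examples pair `>=` on the start with `<` on the end.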
Common Interval Patterns
1. Daily Processing (Most Common)

-- Process one day of data (the run's data interval)
SELECT *
FROM raw.events
WHERE event_date = '{{ ds }}'
  AND created_at >= '{{ start_datetime_with_tz }}'
  AND created_at < '{{ end_datetime_with_tz }}';

2. Weekly Processing

-- Process the trailing week of data
-- Note: BETWEEN is inclusive, so an offset of -7 spans 8 calendar days; use -6 for exactly 7
SELECT *
FROM raw.events
WHERE event_date BETWEEN '{{ utils.date_add(ds, -7) }}' AND '{{ ds }}';

3. Monthly Processing

-- Process the calendar month that contains the logical date
SELECT *
FROM raw.events
WHERE event_date >= DATE_TRUNC('{{ ds }}', MONTH)
  AND event_date < DATE_ADD(DATE_TRUNC('{{ ds }}', MONTH), INTERVAL 1 MONTH);

4. Incremental Processing
-- Process data incrementally
DECLARE run_dt DATE DEFAULT '{{ ds }}';
DECLARE start_dt DATE DEFAULT DATE_SUB(run_dt, INTERVAL 1 DAY);

-- Delete existing data for the period
DELETE FROM core_model.user_metrics
WHERE dt BETWEEN start_dt AND run_dt;

-- Insert new data
INSERT INTO core_model.user_metrics
SELECT
  user_id,
  event_date as dt,
  COUNT(*) as event_count
FROM staging.events
WHERE event_date BETWEEN start_dt AND run_dt
GROUP BY 1, 2;

Date Utility Functions
Available Utility Functions
| Function | Description | Example |
|---|---|---|
| date_add(ds, days) | Add/subtract days from date | {{ utils.date_add(ds, -7) }} |
| date_format(date, input_format, output_format) | Format date string | {{ utils.date_format(ds, "%Y-%m-%d", "%Y/%m/%d") }} |
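To give a feel for what these two helpers return, here is a plain-Python approximation built on `datetime`. It is an assumption about their behavior inferred from the signatures and examples in the table, not the platform's source; in particular it assumes `ds` is always a `YYYY-MM-DD` string.

```python
from datetime import datetime, timedelta


def date_add(ds: str, days: int) -> str:
    """Shift a YYYY-MM-DD string by a number of days (negative values go back)."""
    shifted = datetime.strptime(ds, "%Y-%m-%d") + timedelta(days=days)
    return shifted.strftime("%Y-%m-%d")


def date_format(date: str, input_format: str, output_format: str) -> str:
    """Parse a date string with input_format and re-emit it with output_format."""
    return datetime.strptime(date, input_format).strftime(output_format)


print(date_add("2024-01-15", -7))
print(date_format("2024-01-15", "%Y-%m-%d", "%Y/%m/%d"))
```

The same approximation is handy in Python tasks, where the `utils` namespace is not available and dates arrive as environment variable strings.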
SQL Usage Examples
-- Get previous day
SELECT * FROM events WHERE event_date = '{{ utils.date_add(ds, -1) }}';

-- Get 7 days ago
SELECT * FROM events WHERE event_date = '{{ utils.date_add(ds, -7) }}';

-- Get next day
SELECT * FROM events WHERE event_date = '{{ utils.date_add(ds, 1) }}';

-- Format date differently
SELECT * FROM events WHERE event_date = '{{ utils.date_format(ds, "%Y-%m-%d", "%Y/%m/%d") }}';

Python Usage Examples
import os
from datetime import datetime, timedelta

current_date = os.getenv('BLAST_START_DATE')

# Calculate previous day
prev_date = datetime.strptime(current_date, '%Y-%m-%d') - timedelta(days=1)
prev_date_str = prev_date.strftime('%Y-%m-%d')

# Calculate 7 days ago
week_ago = datetime.strptime(current_date, '%Y-%m-%d') - timedelta(days=7)
week_ago_str = week_ago.strftime('%Y-%m-%d')

print(f"Current: {current_date}")
print(f"Previous: {prev_date_str}")
print(f"Week ago: {week_ago_str}")

Best Practices
1. Always Use Data Interval Variables
✅ Good:

SELECT *
FROM events
WHERE created_at >= '{{ start_datetime_with_tz }}'
  AND created_at < '{{ end_datetime_with_tz }}';

❌ Avoid:

SELECT *
FROM events
WHERE created_at >= '{{ ds }}'
  AND created_at < '{{ next_ds }}';

2. Use Appropriate Date Formats
✅ Good:

-- For date comparisons
WHERE event_date = '{{ ds }}'

-- For datetime comparisons
WHERE created_at >= '{{ start_datetime_with_tz }}'

❌ Avoid:

-- Mixing date and datetime formats
WHERE event_date = '{{ start_datetime }}'

3. Handle Timezones Consistently
✅ Good:

-- Use timezone-aware timestamps
WHERE created_at >= '{{ start_datetime_with_tz }}'
  AND created_at < '{{ end_datetime_with_tz }}';

❌ Avoid:

-- Mixing timezone-aware and naive timestamps
WHERE created_at >= '{{ start_datetime }}'
  AND created_at < '{{ end_datetime_with_tz }}';

4. Use Declare Statements for Complex Logic
✅ Good:

DECLARE run_dt DATE DEFAULT '{{ ds }}';
DECLARE start_dt DATE DEFAULT DATE_SUB(run_dt, INTERVAL 7 DAY);

SELECT *
FROM events
WHERE event_date BETWEEN start_dt AND run_dt;

❌ Avoid:

-- Complex inline date calculations
SELECT *
FROM events
WHERE event_date BETWEEN DATE_SUB('{{ ds }}', INTERVAL 7 DAY) AND '{{ ds }}';

Common Patterns
1. Daily Incremental Processing

-- tasks/core_model/daily_metrics.sql
DECLARE run_dt DATE DEFAULT '{{ ds }}';

-- Delete existing data for the day
DELETE FROM core_model.daily_metrics WHERE dt = run_dt;

-- Insert new data
INSERT INTO core_model.daily_metrics
SELECT
  user_id,
  run_dt as dt,
  COUNT(*) as daily_events,
  SUM(revenue) as daily_revenue
FROM staging.events
WHERE event_date = run_dt
GROUP BY 1, 2;

2. Weekly Aggregation
-- tasks/analytics/weekly_summary.sql
DECLARE run_dt DATE DEFAULT '{{ ds }}';
DECLARE week_start DATE DEFAULT DATE_SUB(run_dt, INTERVAL 6 DAY);

SELECT
  user_id,
  run_dt as week_ending,
  COUNT(*) as weekly_events,
  SUM(revenue) as weekly_revenue,
  AVG(session_duration) as avg_session_duration
FROM staging.events
WHERE event_date BETWEEN week_start AND run_dt
GROUP BY 1, 2;

3. Monthly Rollup
-- tasks/analytics/monthly_rollup.sql
DECLARE run_dt DATE DEFAULT '{{ ds }}';
DECLARE month_start DATE DEFAULT DATE_TRUNC(run_dt, MONTH);

SELECT
  user_id,
  month_start as month,
  COUNT(*) as monthly_events,
  SUM(revenue) as monthly_revenue
FROM staging.events
WHERE event_date >= month_start
  AND event_date < DATE_ADD(month_start, INTERVAL 1 MONTH)
GROUP BY 1, 2;

4. Python ML Training
import os
from datetime import datetime, timedelta


# Defined before it is called; calling it first would raise a NameError
def train_model(query, training_date):
    # Implementation here
    pass


# Get training date
training_date = os.getenv('BLAST_START_DATE')

# Calculate training window (last 30 days)
train_start = datetime.strptime(training_date, '%Y-%m-%d') - timedelta(days=30)
train_start_str = train_start.strftime('%Y-%m-%d')

print(f"Training model with data from {train_start_str} to {training_date}")

# Query training data
query = f"""
SELECT *
FROM core_model.user_features
WHERE feature_date BETWEEN '{train_start_str}' AND '{training_date}'
"""

# Train model
train_model(query, training_date)

Troubleshooting
Common Issues
1. Template Not Rendering
Problem: Jinja template shows as literal text

-- This shows as literal text
WHERE event_date = '{{ ds }}'

Solution: Ensure you’re using the correct task type and the template is properly formatted: matching {{ and }} delimiters around a variable name listed in the tables above.
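One way to catch this early is to scan the final query text for leftover Jinja delimiters before it is sent anywhere. The check below is a hypothetical helper, not a platform feature, and it uses a deliberately simple pattern that only looks for `{{ ... }}` and `{% ... %}` on a single line.

```python
import re

# Leftover Jinja expressions or statements indicate the text was never rendered.
UNRENDERED = re.compile(r"\{\{.*?\}\}|\{%.*?%\}")


def find_unrendered(text: str) -> list:
    """Return every Jinja-looking fragment still present in the text."""
    return UNRENDERED.findall(text)


leftovers = find_unrendered("WHERE event_date = '{{ ds }}'")
if leftovers:
    print(f"Unrendered templates found: {leftovers}")
```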
2. Date Format Mismatch
Problem: Date comparison fails due to format differences

-- This might fail
WHERE event_date = '{{ start_datetime }}'  -- Includes time

Solution: Use appropriate date format

-- Use date-only format
WHERE event_date = '{{ start_date }}'

3. Timezone Issues
Problem: Data appears to be missing due to timezone differences

-- This might miss data
WHERE created_at >= '{{ start_datetime }}'  -- No timezone

Solution: Use timezone-aware timestamps

-- Use timezone-aware format
WHERE created_at >= '{{ start_datetime_with_tz }}'

4. Python Environment Variables Not Available
Problem: Environment variables are None or empty

start_date = os.getenv('BLAST_START_DATE')  # Returns None

Solution: Ensure you’re using the correct environment variable names and the task is properly configured.
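Failing fast with a clear message is usually easier to debug than a `None` that surfaces later as a `TypeError` or `AttributeError`. The helper below is a small sketch of that idea; `require_env` is a hypothetical name, and the `setdefault` line only simulates the platform so the snippet runs locally.

```python
import os


def require_env(name: str) -> str:
    """Return the variable's value, or raise a descriptive error if it is unset or empty."""
    value = os.getenv(name)
    if not value:
        raise RuntimeError(
            f"Environment variable {name} is missing or empty; "
            "check the variable name and the task configuration."
        )
    return value


# Simulate the platform setting the variable so the sketch runs outside a task.
os.environ.setdefault("BLAST_START_DATE", "2024-01-15")

start_date = require_env("BLAST_START_DATE")
print(f"Start date: {start_date}")
```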
Debugging Tips
- Log Template Values:

import os
print(f"Start date: {os.getenv('BLAST_START_DATE')}")
print(f"End date: {os.getenv('BLAST_END_DATE')}")

- Test SQL Queries:

-- Add debugging output
SELECT
  '{{ ds }}' as logical_date,
  '{{ start_date }}' as start_date,
  '{{ end_date }}' as end_date,
  COUNT(*) as record_count
FROM events
WHERE event_date = '{{ ds }}';

- Validate Date Ranges:

-- Check date range
SELECT
  MIN(event_date) as min_date,
  MAX(event_date) as max_date,
  COUNT(*) as record_count
FROM events
WHERE event_date BETWEEN '{{ start_date }}' AND '{{ end_date }}';

Next Steps
- Learn about SQL & Python Assets
- Explore BigQuery Integration
- Review Task Types
- Check out Project Structure