Skip to content

Jinja Date Templates

This guide provides comprehensive information about using Jinja date templates in SQL and Python tasks, including pipeline interval logic and best practices.

Jinja date templates allow you to dynamically reference dates and times in your SQL and Python tasks. These templates are automatically populated by Airflow based on your pipeline’s schedule and execution context.

  • Data Interval: The time period your task is processing
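  • Logical Date: The date that identifies a run; it marks the start of the data interval (this is what {{ ds }} renders), not the wall-clock time at which the task executes
  • Execution Date: Airflow's legacy name for the logical date; despite the name, it is not the time the task actually runs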
  • Template Rendering: Jinja templates are rendered before task execution
| Variable | Format | Description | Example |
|---|---|---|---|
| `{{ ds }}` | YYYY-MM-DD | Logical date (data interval start) | 2024-01-15 |
| `{{ ds_nodash }}` | YYYYMMDD | Logical date without dashes | 20240115 |
| `{{ ts }}` | YYYY-MM-DDTHH:MM:SS+00:00 | Logical timestamp with timezone | 2024-01-15T00:00:00+00:00 |
| `{{ ts_nodash }}` | YYYYMMDDTHHMMSS | Logical timestamp without dashes | 20240115T000000 |
| `{{ ts_nodash_with_tz }}` | YYYYMMDDTHHMMSS+00:00 | Timestamp with timezone, no dashes | 20240115T000000+00:00 |
| Variable | Format | Description | Example |
|---|---|---|---|
| `{{ data_interval_start }}` | Pendulum DateTime | Start of data interval | 2024-01-15T00:00:00+00:00 |
| `{{ data_interval_end }}` | Pendulum DateTime | End of data interval | 2024-01-16T00:00:00+00:00 |
| `{{ start_date }}` | YYYY-MM-DD | Data interval start date | 2024-01-15 |
| `{{ end_date }}` | YYYY-MM-DD | Data interval end date | 2024-01-16 |
| `{{ start_date_nodash }}` | YYYYMMDD | Start date without dashes | 20240115 |
| `{{ end_date_nodash }}` | YYYYMMDD | End date without dashes | 20240116 |
| `{{ start_datetime }}` | YYYY-MM-DDTHH:MM:SS | Start datetime without timezone | 2024-01-15T00:00:00 |
| `{{ end_datetime }}` | YYYY-MM-DDTHH:MM:SS | End datetime without timezone | 2024-01-16T00:00:00 |
| `{{ start_datetime_with_tz }}` | YYYY-MM-DDTHH:MM:SSZ | Start datetime with UTC timezone | 2024-01-15T00:00:00Z |
| `{{ end_datetime_with_tz }}` | YYYY-MM-DDTHH:MM:SSZ | End datetime with UTC timezone | 2024-01-16T00:00:00Z |
-- Daily data processing
SELECT *
FROM raw.events
WHERE event_date = '{{ ds }}'
AND created_at >= '{{ start_datetime_with_tz }}'
AND created_at < '{{ end_datetime_with_tz }}';
-- Date range processing
-- The data interval is half-open: start_date is inclusive, end_date is exclusive.
-- BETWEEN is inclusive on both ends and would also pull in the first day of the
-- next interval, so use explicit >= / < bounds instead.
SELECT *
FROM staging.user_activity
WHERE activity_date >= '{{ start_date }}'
AND activity_date < '{{ end_date }}';
-- Partitioned table query
SELECT *
FROM analytics.daily_metrics
WHERE dt = '{{ ds }}';
-- Incremental processing with date calculations
-- run_dt is the logical date; start_dt opens a 7-day look-back before it.
DECLARE run_dt DATE DEFAULT '{{ ds }}';
DECLARE start_dt DATE DEFAULT DATE_SUB(run_dt, INTERVAL 7 DAY);
-- Delete and insert pattern
-- Remove every day in the window so the insert below can rebuild it without duplicates.
DELETE FROM core_model.user_metrics
WHERE dt BETWEEN start_dt AND run_dt;
-- Rebuild one row per user per day. Grouping by event_date (not the constant run_dt)
-- restores each deleted day and keeps daily_events / daily_revenue truly daily.
INSERT INTO core_model.user_metrics
SELECT
user_id,
event_date as dt,
COUNT(*) as daily_events,
SUM(revenue) as daily_revenue
FROM staging.events
WHERE event_date BETWEEN start_dt AND run_dt
GROUP BY 1, 2;
-- Dynamic table naming
SELECT *
FROM `project.dataset.table_{{ ds_nodash }}`;
-- Conditional logic based on date
SELECT
*,
CASE
WHEN '{{ ds }}' = '2024-01-01' THEN 'New Year Processing'
WHEN '{{ ds }}' = '2024-12-31' THEN 'Year End Processing'
ELSE 'Regular Processing'
END as processing_type
FROM raw.events
WHERE event_date = '{{ ds }}';
-- Using date utility functions
SELECT *
FROM staging.events
WHERE event_date = '{{ utils.date_add(ds, -1) }}' -- Previous day
AND created_at >= '{{ utils.date_format(start_datetime, "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d %H:%M:%S") }}';
-- Custom date calculations
SELECT
user_id,
'{{ ds }}' as processing_date,
DATE_DIFF('{{ ds }}', install_date, DAY) as days_since_install
FROM core_model.users
WHERE install_date <= '{{ ds }}';

Python tasks receive date information through environment variables:

import os
from datetime import datetime
import pandas as pd
# Access date variables through environment variables
data_interval_start = os.getenv('BLAST_DATA_INTERVAL_START')
data_interval_end = os.getenv('BLAST_DATA_INTERVAL_END')
start_date = os.getenv('BLAST_START_DATE')
end_date = os.getenv('BLAST_END_DATE')
start_date_nodash = os.getenv('BLAST_START_DATE_NODASH')
end_date_nodash = os.getenv('BLAST_END_DATE_NODASH')
# Fail fast with a clear message: os.getenv returns None for a missing
# variable, and calling .replace() on None below would raise an opaque
# AttributeError instead of naming the real problem.
if not data_interval_start or not data_interval_end:
    raise RuntimeError(
        "BLAST_DATA_INTERVAL_START and BLAST_DATA_INTERVAL_END must be set"
    )
# Convert to datetime objects. datetime.fromisoformat() only accepts a
# trailing 'Z' from Python 3.11 on, so normalise it to an explicit UTC offset.
start_dt = datetime.fromisoformat(data_interval_start.replace('Z', '+00:00'))
end_dt = datetime.fromisoformat(data_interval_end.replace('Z', '+00:00'))
print(f"Processing data from {start_dt} to {end_dt}")
import os
from datetime import datetime, timedelta
import pandas as pd
from google.cloud import bigquery
# Get date information
start_date = os.getenv('BLAST_START_DATE')
end_date = os.getenv('BLAST_END_DATE')
start_date_nodash = os.getenv('BLAST_START_DATE_NODASH')
# Fail fast: a missing variable would otherwise be interpolated into the
# query and the table name as the literal string 'None'.
if not (start_date and end_date and start_date_nodash):
    raise RuntimeError(
        "BLAST_START_DATE, BLAST_END_DATE and BLAST_START_DATE_NODASH must be set"
    )
print(f"Processing period: {start_date} to {end_date}")
# Initialize BigQuery client
client = bigquery.Client()
# Query with date parameters.
# The data interval is half-open (start inclusive, end exclusive), so use
# >= / < rather than BETWEEN, which would also include the end date.
query = f"""
SELECT
user_id,
event_type,
COUNT(*) as event_count
FROM `project.dataset.events`
WHERE event_date >= '{start_date}'
AND event_date < '{end_date}'
GROUP BY 1, 2
"""
# Execute query
df = client.query(query).to_dataframe()
# Process data: collapse the per-event-type counts into one total per user
processed_data = df.groupby('user_id')['event_count'].sum().reset_index()
# Save results to a table suffixed with the interval start date
output_table = f"project.dataset.user_metrics_{start_date_nodash}"
# NOTE(review): DataFrame.to_gbq is deprecated in recent pandas releases in
# favour of pandas_gbq.to_gbq — confirm against the pinned pandas version.
processed_data.to_gbq(output_table, if_exists='replace')
print(f"Results saved to {output_table}")
import os
from datetime import datetime, timedelta
import pandas as pd
def calculate_date_range(base_date_str: str, days_back: int = 7) -> tuple[str, str]:
    """Calculate date range for processing.

    Args:
        base_date_str: End of the range, formatted as ``YYYY-MM-DD``.
        days_back: Number of days to step back from the base date.

    Returns:
        A ``(start_date, end_date)`` tuple of ``YYYY-MM-DD`` strings, where
        ``end_date`` is the base date and ``start_date`` is ``days_back``
        days before it.

    Raises:
        ValueError: If ``base_date_str`` is not in ``YYYY-MM-DD`` format.
    """
    base_date = datetime.strptime(base_date_str, '%Y-%m-%d')
    start_date = base_date - timedelta(days=days_back)
    return start_date.strftime('%Y-%m-%d'), base_date.strftime('%Y-%m-%d')
# Defined before it is used: a module-level call that runs ahead of the
# function's `def` statement raises NameError.
def process_data_window(start_date, end_date):
    """Process data for a specific date window"""
    # Implementation here
    pass


# Get current processing date
current_date = os.getenv('BLAST_START_DATE')
# Fail fast with a clear message instead of an opaque TypeError from strptime
if not current_date:
    raise RuntimeError("BLAST_START_DATE is not set")
# Calculate date range
start_date, end_date = calculate_date_range(current_date, days_back=7)
print(f"Processing 7-day window: {start_date} to {end_date}")
# Your processing logic here
process_data_window(start_date, end_date)

The data interval represents the time period your task is processing. This is crucial for understanding what data your task should process.

pipeline.yml
schedule: "0 4 * * *" # Run daily at 4 AM UTC

For a task running on 2024-01-15:

  • {{ ds }} = 2024-01-15 (logical date)
  • {{ data_interval_start }} = 2024-01-15T00:00:00+00:00
  • {{ data_interval_end }} = 2024-01-16T00:00:00+00:00
  • Data to process: Events from 2024-01-15 (inclusive) to 2024-01-16 (exclusive)
pipeline.yml
schedule: "0 * * * *" # Run hourly

For a task running at 2024-01-15 14:00:

  • {{ ds }} = 2024-01-15 (logical date)
  • {{ data_interval_start }} = 2024-01-15T14:00:00+00:00
  • {{ data_interval_end }} = 2024-01-15T15:00:00+00:00
  • Data to process: Events from 14:00 to 15:00 on 2024-01-15
-- Process yesterday's data
SELECT *
FROM raw.events
WHERE event_date = '{{ ds }}'
AND created_at >= '{{ start_datetime_with_tz }}'
AND created_at < '{{ end_datetime_with_tz }}';
-- Process the 7 days before the logical date through the logical date itself
-- (BETWEEN is inclusive on both ends, so this spans 8 calendar days)
SELECT *
FROM raw.events
WHERE event_date BETWEEN '{{ utils.date_add(ds, -7) }}' AND '{{ ds }}';
-- Process the calendar month that contains the logical date
-- (from the first day of that month up to, but not including, the first day of the next)
SELECT *
FROM raw.events
WHERE event_date >= DATE_TRUNC('{{ ds }}', MONTH)
AND event_date < DATE_ADD(DATE_TRUNC('{{ ds }}', MONTH), INTERVAL 1 MONTH);
-- Process data incrementally
DECLARE run_dt DATE DEFAULT '{{ ds }}';
DECLARE start_dt DATE DEFAULT DATE_SUB(run_dt, INTERVAL 1 DAY);
-- Delete existing data for the period
DELETE FROM core_model.user_metrics
WHERE dt BETWEEN start_dt AND run_dt;
-- Insert new data
INSERT INTO core_model.user_metrics
SELECT
user_id,
event_date as dt,
COUNT(*) as event_count
FROM staging.events
WHERE event_date BETWEEN start_dt AND run_dt
GROUP BY 1, 2;
| Function | Description | Example |
|---|---|---|
| `date_add(ds, days)` | Add/subtract days from date | `{{ utils.date_add(ds, -7) }}` |
| `date_format(date, input_format, output_format)` | Format date string | `{{ utils.date_format(ds, "%Y-%m-%d", "%Y/%m/%d") }}` |
-- Get previous day
SELECT * FROM events WHERE event_date = '{{ utils.date_add(ds, -1) }}';
-- Get 7 days ago
SELECT * FROM events WHERE event_date = '{{ utils.date_add(ds, -7) }}';
-- Get next day
SELECT * FROM events WHERE event_date = '{{ utils.date_add(ds, 1) }}';
-- Format date differently
SELECT * FROM events WHERE event_date = '{{ utils.date_format(ds, "%Y-%m-%d", "%Y/%m/%d") }}';
import os
from datetime import datetime, timedelta
current_date = os.getenv('BLAST_START_DATE')
# Fail fast with a clear message instead of an opaque TypeError from strptime
if not current_date:
    raise RuntimeError("BLAST_START_DATE is not set")
# Parse once; both offsets below are derived from the same value
current_dt = datetime.strptime(current_date, '%Y-%m-%d')
# Calculate previous day
prev_date = current_dt - timedelta(days=1)
prev_date_str = prev_date.strftime('%Y-%m-%d')
# Calculate 7 days ago
week_ago = current_dt - timedelta(days=7)
week_ago_str = week_ago.strftime('%Y-%m-%d')
print(f"Current: {current_date}")
print(f"Previous: {prev_date_str}")
print(f"Week ago: {week_ago_str}")

Good:

SELECT *
FROM events
WHERE created_at >= '{{ start_datetime_with_tz }}'
AND created_at < '{{ end_datetime_with_tz }}';

Avoid:

SELECT *
FROM events
WHERE created_at >= '{{ ds }}'
AND created_at < '{{ next_ds }}';

Good:

-- For date comparisons
WHERE event_date = '{{ ds }}'
-- For datetime comparisons
WHERE created_at >= '{{ start_datetime_with_tz }}'

Avoid:

-- Mixing date and datetime formats
WHERE event_date = '{{ start_datetime }}'

Good:

-- Use timezone-aware timestamps
WHERE created_at >= '{{ start_datetime_with_tz }}'
AND created_at < '{{ end_datetime_with_tz }}';

Avoid:

-- Mixing timezone-aware and naive timestamps
WHERE created_at >= '{{ start_datetime }}'
AND created_at < '{{ end_datetime_with_tz }}';

4. Use Declare Statements for Complex Logic

Section titled “4. Use Declare Statements for Complex Logic”

Good:

DECLARE run_dt DATE DEFAULT '{{ ds }}';
DECLARE start_dt DATE DEFAULT DATE_SUB(run_dt, INTERVAL 7 DAY);
SELECT *
FROM events
WHERE event_date BETWEEN start_dt AND run_dt;

Avoid:

-- Complex inline date calculations
SELECT *
FROM events
WHERE event_date BETWEEN DATE_SUB('{{ ds }}', INTERVAL 7 DAY) AND '{{ ds }}';
-- tasks/core_model/daily_metrics.sql
DECLARE run_dt DATE DEFAULT '{{ ds }}';
-- Delete existing data for the day
DELETE FROM core_model.daily_metrics WHERE dt = run_dt;
-- Insert new data
INSERT INTO core_model.daily_metrics
SELECT
user_id,
run_dt as dt,
COUNT(*) as daily_events,
SUM(revenue) as daily_revenue
FROM staging.events
WHERE event_date = run_dt
GROUP BY 1, 2;
-- tasks/analytics/weekly_summary.sql
DECLARE run_dt DATE DEFAULT '{{ ds }}';
DECLARE week_start DATE DEFAULT DATE_SUB(run_dt, INTERVAL 6 DAY);
SELECT
user_id,
run_dt as week_ending,
COUNT(*) as weekly_events,
SUM(revenue) as weekly_revenue,
AVG(session_duration) as avg_session_duration
FROM staging.events
WHERE event_date BETWEEN week_start AND run_dt
GROUP BY 1, 2;
-- tasks/analytics/monthly_rollup.sql
DECLARE run_dt DATE DEFAULT '{{ ds }}';
DECLARE month_start DATE DEFAULT DATE_TRUNC(run_dt, MONTH);
SELECT
user_id,
month_start as month,
COUNT(*) as monthly_events,
SUM(revenue) as monthly_revenue
FROM staging.events
WHERE event_date >= month_start
AND event_date < DATE_ADD(month_start, INTERVAL 1 MONTH)
GROUP BY 1, 2;
tasks/ml_models/train_model.py
import os
from datetime import datetime, timedelta
# Defined before it is used: a module-level call that runs ahead of the
# function's `def` statement raises NameError.
def train_model(query, training_date):
    """Placeholder: train the model from the rows selected by ``query``."""
    # Implementation here
    pass


# Get training date
training_date = os.getenv('BLAST_START_DATE')
# Fail fast with a clear message instead of an opaque TypeError from strptime
if not training_date:
    raise RuntimeError("BLAST_START_DATE is not set")
# Calculate training window (last 30 days)
train_start = datetime.strptime(training_date, '%Y-%m-%d') - timedelta(days=30)
train_start_str = train_start.strftime('%Y-%m-%d')
print(f"Training model with data from {train_start_str} to {training_date}")
# Query training data
query = f"""
SELECT *
FROM core_model.user_features
WHERE feature_date BETWEEN '{train_start_str}' AND '{training_date}'
"""
# Train model
train_model(query, training_date)

Problem: Jinja template shows as literal text

-- This shows as literal text
WHERE event_date = '{{ ds }}'

Solution: Ensure you’re using the correct task type and the template is properly formatted.

Problem: Date comparison fails due to format differences

-- This might fail
WHERE event_date = '{{ start_datetime }}' -- Includes time

Solution: Use appropriate date format

-- Use date-only format
WHERE event_date = '{{ start_date }}'

Problem: Data appears to be missing due to timezone differences

-- This might miss data
WHERE created_at >= '{{ start_datetime }}' -- No timezone

Solution: Use timezone-aware timestamps

-- Use timezone-aware format
WHERE created_at >= '{{ start_datetime_with_tz }}'

4. Python Environment Variables Not Available

Section titled “4. Python Environment Variables Not Available”

Problem: Environment variables are None or empty

start_date = os.getenv('BLAST_START_DATE') # Returns None

Solution: Ensure you’re using the correct environment variable names and the task is properly configured.

  1. Log Template Values:
import os
print(f"Start date: {os.getenv('BLAST_START_DATE')}")
print(f"End date: {os.getenv('BLAST_END_DATE')}")
  2. Test SQL Queries:
-- Add debugging output
SELECT
'{{ ds }}' as logical_date,
'{{ start_date }}' as start_date,
'{{ end_date }}' as end_date,
COUNT(*) as record_count
FROM events
WHERE event_date = '{{ ds }}';
  3. Validate Date Ranges:
-- Check date range
-- The data interval is half-open (start_date inclusive, end_date exclusive), so
-- use >= / < bounds; BETWEEN would also count rows dated on end_date.
SELECT
MIN(event_date) as min_date,
MAX(event_date) as max_date,
COUNT(*) as record_count
FROM events
WHERE event_date >= '{{ start_date }}'
AND event_date < '{{ end_date }}';