Skip to content

Shared Utilities Best Practices

This guide provides comprehensive best practices for creating, organizing, and managing shared utility functions (src/ directory) across multiple pipelines in Datablast projects.

Shared utilities are the backbone of maintainable data pipelines. They provide reusable functions, reduce code duplication, and ensure consistency across all pipelines in a repository. This guide covers everything from basic organization to advanced patterns.

your-project/
├── src/ # Shared utilities (recommended)
├── utils/ # Alternative naming
├── shared/ # Alternative naming
├── common/ # Alternative naming
├── pipeline-1/
├── pipeline-2/
└── requirements.txt
src/
├── __init__.py # Package initialization
├── bigquery_functions.py # BigQuery operations
├── api_utils.py # API utilities
├── data_utils.py # Data processing utilities
├── report_generator.py # Reporting utilities
├── auth/ # Authentication modules
│ ├── __init__.py
│ ├── gcp_auth.py
│ └── api_auth.py
├── database/ # Database utilities
│ ├── __init__.py
│ ├── bigquery_client.py
│ └── postgresql_client.py
└── monitoring/ # Monitoring utilities
    ├── __init__.py
    ├── logging_utils.py
    └── alerting.py

1. BigQuery Functions (bigquery_functions.py)

Section titled “1. BigQuery Functions (bigquery_functions.py)”

Essential BigQuery operations for data pipelines:

import os
from google.cloud import bigquery
from google.oauth2 import service_account
import pandas as pd
from typing import Optional, Dict, Any
def get_bq_credentials(env_name: str = "GCP_CONNECTION") -> service_account.Credentials:
    """
    Look up the Datablast connection used for BigQuery credentials.

    Args:
        env_name: Name of the environment variable holding the connection ID

    Returns:
        Currently None: the credential lookup itself is a placeholder that
        the surrounding guide says the platform handles automatically.

    Raises:
        ValueError: If the environment variable is unset or empty
    """
    connection_id = os.environ.get(env_name)
    if connection_id:
        # Placeholder: retrieving credentials from the Datablast connection
        # is described as being handled by the platform automatically.
        return None
    raise ValueError(f"Environment variable {env_name} not found")
def delete_bq_table(
    credentials: service_account.Credentials,
    project: str,
    dataset: str,
    table: str,
    if_exists: bool = True
) -> bool:
    """
    Delete a BigQuery table, reporting failures instead of raising.

    Args:
        credentials: BigQuery credentials
        project: GCP project ID
        dataset: BigQuery dataset ID
        table: BigQuery table ID
        if_exists: When True a missing table is not treated as an error;
            when False a missing table makes the call fail

    Returns:
        True if the delete call completed without error (this includes the
        "table was already absent" case when if_exists is True), False if
        the call raised
    """
    client = bigquery.Client(credentials=credentials, project=project)
    # Build the reference directly; Client.dataset() is deprecated.
    table_ref = bigquery.DatasetReference(project, dataset).table(table)
    try:
        # not_found_ok=False is the library default, so one call covers
        # both branches of the if_exists flag.
        client.delete_table(table_ref, not_found_ok=if_exists)
    except Exception as e:
        print(f"Error deleting table {project}.{dataset}.{table}: {e}")
        return False
    return True
def upload_to_bq(
    dataframe: pd.DataFrame,
    project: str,
    dataset: str,
    table: str,
    credentials: service_account.Credentials,
    if_exists: str = "replace",
    write_disposition: str = "WRITE_TRUNCATE"
) -> bool:
    """
    Upload pandas DataFrame to BigQuery.

    Args:
        dataframe: Pandas DataFrame to upload
        project: GCP project ID
        dataset: BigQuery dataset ID
        table: BigQuery table ID
        credentials: BigQuery credentials
        if_exists: Action if table exists ('replace', 'append', 'fail').
            Honoured when write_disposition is left at its default.
        write_disposition: BigQuery write disposition. A non-default value
            always takes precedence over if_exists.

    Returns:
        True if upload successful, False otherwise
    """
    # Previously if_exists was accepted but never used, so asking for
    # 'append' still truncated the table. Translate it unless the caller
    # explicitly chose a write disposition.
    disposition_by_mode = {
        "replace": "WRITE_TRUNCATE",
        "append": "WRITE_APPEND",
        "fail": "WRITE_EMPTY",
    }
    if if_exists != "replace" and write_disposition == "WRITE_TRUNCATE":
        # Unknown modes keep the disposition as given (previous behaviour).
        write_disposition = disposition_by_mode.get(if_exists, write_disposition)

    client = bigquery.Client(credentials=credentials, project=project)
    try:
        # Build the reference directly; Client.dataset() is deprecated.
        table_ref = bigquery.DatasetReference(project, dataset).table(table)
        job_config = bigquery.LoadJobConfig(
            write_disposition=write_disposition
        )
        job = client.load_table_from_dataframe(
            dataframe, table_ref, job_config=job_config
        )
        job.result()  # Wait for job to complete
        print(f"Successfully uploaded {len(dataframe)} rows to {project}.{dataset}.{table}")
        return True
    except Exception as e:
        print(f"Error uploading to BigQuery: {e}")
        return False
def execute_bq_query(
    query: str,
    credentials: service_account.Credentials,
    project: str,
    use_legacy_sql: bool = False
) -> pd.DataFrame:
    """
    Run a SQL statement on BigQuery and hand back the rows as a DataFrame.

    Args:
        query: SQL query to execute
        credentials: BigQuery credentials
        project: GCP project ID
        use_legacy_sql: Use legacy SQL syntax

    Returns:
        Query results as pandas DataFrame
    """
    bq = bigquery.Client(credentials=credentials, project=project)
    settings = bigquery.QueryJobConfig(use_legacy_sql=use_legacy_sql)
    return bq.query(query, job_config=settings).to_dataframe()
def get_table_schema(
    project: str,
    dataset: str,
    table: str,
    credentials: service_account.Credentials
) -> list:
    """
    Get BigQuery table schema.

    Args:
        project: GCP project ID
        dataset: BigQuery dataset ID
        table: BigQuery table ID
        credentials: BigQuery credentials

    Returns:
        List with one API-representation entry (field.to_api_repr()) per
        schema field
    """
    client = bigquery.Client(credentials=credentials, project=project)
    # Build the reference directly; Client.dataset() is deprecated.
    table_ref = bigquery.DatasetReference(project, dataset).table(table)
    # Keep the fetched Table in its own name instead of rebinding the
    # `table` string parameter.
    bq_table = client.get_table(table_ref)
    return [field.to_api_repr() for field in bq_table.schema]

2. API Utilities (api_utils.py)

Generic API authentication and data retrieval:

import requests
import json
from typing import Dict, Any, Optional, List
from datetime import datetime, timedelta
import time
class APIClient:
    """Generic API client with authentication and retry logic."""

    def __init__(
        self,
        base_url: str,
        auth_token: Optional[str] = None,
        timeout: Optional[float] = 30
    ):
        """
        Create the client and its underlying requests session.

        Args:
            base_url: Root URL that endpoints are appended to
            auth_token: Optional bearer token; when given, the session
                sends it together with a JSON content type
            timeout: Seconds to wait on each request before giving up.
                Pass None to wait indefinitely.
        """
        self.base_url = base_url
        self.auth_token = auth_token
        # Without a timeout a stalled server would block the pipeline
        # forever and the retry loop would never get a chance to run.
        self.timeout = timeout
        self.session = requests.Session()
        if auth_token:
            self.session.headers.update({
                'Authorization': f'Bearer {auth_token}',
                'Content-Type': 'application/json'
            })

    def authenticate_api(self, api_key: str) -> bool:
        """
        Authenticate with API using API key.

        Only stores the key in the session's X-API-Key header; no request
        is sent to verify it.

        Args:
            api_key: API key for authentication

        Returns:
            True if the header was stored, False if storing it raised
        """
        try:
            self.session.headers.update({
                'X-API-Key': api_key
            })
            return True
        except Exception as e:
            print(f"Authentication failed: {e}")
            return False

    def _build_url(self, endpoint: str) -> str:
        """Join base URL and endpoint with exactly one slash between them."""
        return f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/')}"

    def _request_with_retries(
        self,
        send,
        endpoint: str,
        max_retries: int,
        retry_delay: float,
        retry_note: str,
        **request_kwargs
    ) -> Optional[Dict[str, Any]]:
        """Call `send` up to max_retries + 1 times and return the parsed JSON, or None."""
        url = self._build_url(endpoint)
        for attempt in range(max_retries + 1):
            try:
                response = send(url, timeout=self.timeout, **request_kwargs)
                response.raise_for_status()
                return response.json()
            except requests.exceptions.RequestException as e:
                if attempt < max_retries:
                    print(f"Attempt {attempt + 1} failed: {e}. {retry_note}")
                    time.sleep(retry_delay)
                else:
                    print(f"All attempts failed: {e}")
        return None

    def fetch_data_from_api(
        self,
        endpoint: str,
        params: Optional[Dict[str, Any]] = None,
        max_retries: int = 3,
        retry_delay: int = 5
    ) -> Optional[Dict[str, Any]]:
        """
        Fetch data from API with retry logic.

        Args:
            endpoint: API endpoint
            params: Query parameters
            max_retries: Maximum number of retries
            retry_delay: Delay between retries in seconds

        Returns:
            API response data or None if failed
        """
        return self._request_with_retries(
            self.session.get,
            endpoint,
            max_retries,
            retry_delay,
            f"Retrying in {retry_delay} seconds...",
            params=params,
        )

    def post_data_to_api(
        self,
        endpoint: str,
        data: Dict[str, Any],
        max_retries: int = 3,
        retry_delay: int = 2
    ) -> Optional[Dict[str, Any]]:
        """
        Post data to API with retry logic.

        Args:
            endpoint: API endpoint
            data: Data to post (sent as the JSON body)
            max_retries: Maximum number of retries
            retry_delay: Delay between retries in seconds

        Returns:
            API response data or None if failed
        """
        return self._request_with_retries(
            self.session.post,
            endpoint,
            max_retries,
            retry_delay,
            "Retrying...",
            json=data,
        )
def chunk_dataframe(dataframe, chunk_size: int = 1000):
    """
    Lazily split a DataFrame into consecutive row slices.

    Args:
        dataframe: Pandas DataFrame
        chunk_size: Maximum number of rows per slice

    Yields:
        Positional (iloc) slices of at most chunk_size rows, in order
    """
    row_total = len(dataframe)
    offsets = range(0, row_total, chunk_size)
    yield from (dataframe.iloc[start:start + chunk_size] for start in offsets)
def handle_api_rate_limit(response: requests.Response, default_delay: int = 60) -> int:
    """
    Handle API rate limiting by extracting delay from response.

    For a 429 response the delay is taken from the Retry-After header when
    it is a whole number of seconds, otherwise from a `retry_after` key in
    a JSON object body. Anything else yields default_delay.

    Args:
        response: API response object
        default_delay: Default delay if rate limit info not found

    Returns:
        Delay in seconds before retry
    """
    if response.status_code != 429:
        return default_delay

    retry_after = response.headers.get('Retry-After')
    if retry_after:
        try:
            return int(retry_after)
        except (TypeError, ValueError):
            # Retry-After may also be an HTTP-date; that form is not
            # parsed here, so keep looking instead of crashing.
            pass

    try:
        error_data = response.json()
    except ValueError:
        # Body is not valid JSON.
        return default_delay

    if isinstance(error_data, dict) and 'retry_after' in error_data:
        try:
            return int(error_data['retry_after'])
        except (TypeError, ValueError):
            pass
    return default_delay

3. Data Processing Utilities (data_utils.py)

Section titled “3. Data Processing Utilities (data_utils.py)”

Common data processing functions:

import pandas as pd
import numpy as np
from typing import List, Dict, Any, Optional
from datetime import datetime, date
import logging
def clean_dataframe(df: pd.DataFrame, config: Dict[str, Any]) -> pd.DataFrame:
    """
    Apply configurable clean-up steps to a copy of a DataFrame.

    Steps run in this order: optional de-duplication, missing-value
    handling, then per-column type conversion.

    Args:
        df: Input DataFrame (left untouched)
        config: Cleaning configuration. Keys read: 'remove_duplicates',
            'missing_values' ('drop' by default, or 'fill'), 'fill_value'
            (0 by default) and 'type_mappings' (column -> dtype)

    Returns:
        Cleaned DataFrame
    """
    result = df.copy()

    if config.get('remove_duplicates', False):
        result = result.drop_duplicates()

    strategy = config.get('missing_values', 'drop')
    if strategy == 'fill':
        result = result.fillna(config.get('fill_value', 0))
    elif strategy == 'drop':
        result = result.dropna()

    for column, dtype in config.get('type_mappings', {}).items():
        if column not in result.columns:
            continue
        try:
            result[column] = result[column].astype(dtype)
        except Exception as e:
            # A failed conversion is logged and the column keeps its dtype.
            logging.warning(f"Failed to convert {column} to {dtype}: {e}")

    return result
def validate_data_quality(df: pd.DataFrame, rules: Dict[str, Any]) -> Dict[str, Any]:
    """
    Check a DataFrame against null-count and value-range rules.

    Args:
        df: Input DataFrame
        rules: Validation rules. 'null_limits' maps a column to its
            maximum allowed null count; 'ranges' maps a column to a dict
            with optional 'min' and 'max' bounds. Rules for columns that
            are not in the frame are skipped.

    Returns:
        Dict with 'passed', 'errors', 'warnings' (always empty here) and
        a 'summary' of row/column counts, null counts and dtypes
    """
    errors = []
    null_counts = df.isnull().sum()

    # Null-count limits
    for column, max_nulls in rules.get('null_limits', {}).items():
        if column not in df.columns:
            continue
        null_count = null_counts[column]
        if null_count > max_nulls:
            errors.append(f"Column {column} has {null_count} null values (limit: {max_nulls})")

    # Value ranges
    for column, range_config in rules.get('ranges', {}).items():
        if column not in df.columns:
            continue
        min_val = range_config.get('min')
        max_val = range_config.get('max')
        if min_val is not None and df[column].min() < min_val:
            errors.append(f"Column {column} has values below minimum {min_val}")
        if max_val is not None and df[column].max() > max_val:
            errors.append(f"Column {column} has values above maximum {max_val}")

    return {
        # Validation passes exactly when no error was recorded.
        'passed': not errors,
        'errors': errors,
        'warnings': [],
        'summary': {
            'total_rows': len(df),
            'total_columns': len(df.columns),
            'null_counts': null_counts.to_dict(),
            'data_types': df.dtypes.to_dict()
        }
    }
def standardize_column_names(df: pd.DataFrame) -> pd.DataFrame:
    """
    Return a copy whose column names are lower-cased and use underscores.

    Spaces and hyphens are each replaced by an underscore.

    Args:
        df: Input DataFrame (left untouched)

    Returns:
        DataFrame with standardized column names
    """
    renamed = df.copy()
    names = renamed.columns.str.lower()
    for separator in (' ', '-'):
        names = names.str.replace(separator, '_')
    renamed.columns = names
    return renamed
def add_metadata_columns(df: pd.DataFrame, execution_date: str = None) -> pd.DataFrame:
    """
    Return a copy of the frame with load-tracking columns appended.

    Args:
        df: Input DataFrame (left untouched)
        execution_date: Execution date for the pipeline; the
            '_execution_date' column is only added when this is truthy

    Returns:
        DataFrame with '_loaded_at' (current local time), optionally
        '_execution_date', and '_row_number' (1-based position)
    """
    metadata = {'_loaded_at': datetime.now()}
    if execution_date:
        metadata['_execution_date'] = execution_date
    metadata['_row_number'] = range(1, len(df) + 1)
    # assign() works on a copy and adds the columns in dict order.
    return df.assign(**metadata)

4. Reporting Utilities (report_generator.py)

Section titled “4. Reporting Utilities (report_generator.py)”

Generate reports and summaries:

import pandas as pd
from typing import Dict, Any, List, Optional
from datetime import datetime, date
import json
def generate_data_summary(df: pd.DataFrame, config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """
    Generate comprehensive data summary.

    Args:
        df: Input DataFrame
        config: Summary configuration (accepted for API compatibility;
            no option is read from it at present)

    Returns:
        Data summary dictionary with 'basic_info', 'data_types',
        'null_counts' and 'unique_counts', plus 'numeric_summary' and
        'categorical_summary' when such columns exist
    """
    if config is None:
        config = {}

    summary = {
        'basic_info': {
            'total_rows': len(df),
            'total_columns': len(df.columns),
            # Plain int so the value serializes without a custom encoder.
            'memory_usage': int(df.memory_usage(deep=True).sum()),
            'generated_at': datetime.now().isoformat()
        },
        'data_types': df.dtypes.to_dict(),
        'null_counts': df.isnull().sum().to_dict(),
        'unique_counts': df.nunique().to_dict()
    }

    # Add statistical summaries for numeric columns. The 'number' alias
    # is used instead of np.number because this module never imports
    # numpy, so the original reference raised NameError.
    numeric_columns = df.select_dtypes(include='number').columns
    if len(numeric_columns) > 0:
        summary['numeric_summary'] = df[numeric_columns].describe().to_dict()

    # Add value counts for categorical columns
    categorical_columns = df.select_dtypes(include=['object', 'category']).columns
    if len(categorical_columns) > 0:
        summary['categorical_summary'] = {
            # Limit to first 5 categorical columns, top 10 values each
            col: df[col].value_counts().head(10).to_dict()
            for col in categorical_columns[:5]
        }

    return summary
def create_data_quality_report(df: pd.DataFrame, rules: Dict[str, Any]) -> Dict[str, Any]:
    """
    Create data quality report.

    Scoring starts at 100, loses 5 points per column with more than 10%
    nulls and up to 20 points in proportion to the share of duplicate
    rows. The score never drops below 0.

    Args:
        df: Input DataFrame
        rules: Data quality rules (accepted for API compatibility; not
            consulted by the current checks)

    Returns:
        Data quality report containing only built-in Python numbers, so
        it can be serialized directly
    """
    total_records = len(df)
    report = {
        'report_date': datetime.now().isoformat(),
        'total_records': total_records,
        'quality_score': 100,
        'issues': [],
        'recommendations': []
    }

    # Check data completeness. An empty frame has no percentages to
    # compute, so the division is skipped entirely.
    if total_records:
        null_share = df.isnull().sum() / total_records * 100
        null_percentage = {column: float(pct) for column, pct in null_share.items()}
    else:
        null_percentage = {}
    high_null_columns = {k: v for k, v in null_percentage.items() if v > 10}
    if high_null_columns:
        report['issues'].append({
            'type': 'high_null_percentage',
            'description': 'Columns with high null percentage',
            'details': high_null_columns
        })
        report['quality_score'] -= len(high_null_columns) * 5

    # Check for duplicates (int() turns the numpy scalar into a plain int)
    duplicate_count = int(df.duplicated().sum())
    if duplicate_count > 0:
        report['issues'].append({
            'type': 'duplicates',
            'description': f'Found {duplicate_count} duplicate records',
            'count': duplicate_count
        })
        report['quality_score'] -= min(duplicate_count / total_records * 20, 20)

    # Many sparse columns could otherwise push the score below zero.
    report['quality_score'] = max(report['quality_score'], 0)

    # Generate recommendations
    if high_null_columns:
        report['recommendations'].append(
            "Consider data imputation strategies for columns with high null percentages"
        )
    if duplicate_count > 0:
        report['recommendations'].append(
            "Review and remove duplicate records to improve data quality"
        )

    return report
def export_summary_to_json(summary: Dict[str, Any], filepath: str) -> bool:
    """
    Write a summary dictionary to disk as indented JSON.

    Values the JSON encoder cannot handle are written as their str() form.

    Args:
        summary: Summary dictionary
        filepath: Output file path

    Returns:
        True if export successful, False if anything raised (the error is
        printed)
    """
    try:
        with open(filepath, 'w') as out_file:
            json.dump(summary, out_file, indent=2, default=str)
    except Exception as e:
        print(f"Error exporting summary: {e}")
        return False
    else:
        return True

Create specialized modules for different domains:

# src/auth/__init__.py
from .gcp_auth import GCPAuthenticator
from .api_auth import APIAuthenticator
__all__ = ['GCPAuthenticator', 'APIAuthenticator']
# src/auth/gcp_auth.py
class GCPAuthenticator:
    """GCP authentication utilities."""

    @staticmethod
    def get_service_account_credentials(service_account_path: str):
        """Get service account credentials (unimplemented stub, returns None)."""
        return None

    @staticmethod
    def get_application_default_credentials():
        """Get application default credentials (unimplemented stub, returns None)."""
        return None
# src/auth/api_auth.py
class APIAuthenticator:
    """API authentication utilities."""

    @staticmethod
    def get_bearer_token(api_key: str) -> str:
        """Get bearer token from API key (unimplemented stub, returns None)."""
        return None

    @staticmethod
    def refresh_token(refresh_token: str) -> str:
        """Refresh authentication token (unimplemented stub, returns None)."""
        return None

Centralized configuration handling:

# src/config.py
import os
import yaml
from typing import Dict, Any, Optional
class ConfigManager:
    """Centralized configuration management."""

    def __init__(self, config_path: Optional[str] = None):
        """
        Load the configuration file once at construction time.

        Args:
            config_path: Path to the YAML file. Falls back to the
                CONFIG_PATH environment variable, then to 'config.yaml'.
        """
        self.config_path = config_path or os.getenv('CONFIG_PATH', 'config.yaml')
        self._config = self._load_config()

    def _load_config(self) -> Dict[str, Any]:
        """
        Load configuration from file.

        Returns:
            The parsed mapping, or an empty dict when the file is missing,
            empty, or does not contain a mapping at its top level
        """
        try:
            with open(self.config_path, 'r', encoding='utf-8') as f:
                loaded = yaml.safe_load(f)
        except FileNotFoundError:
            return {}
        # safe_load returns None for an empty document and may return a
        # list or scalar; only a mapping satisfies the Dict contract.
        return loaded if isinstance(loaded, dict) else {}

    def get(self, key: str, default: Any = None) -> Any:
        """
        Get configuration value.

        Args:
            key: Dot-separated path, e.g. 'databases.bigquery.project'
            default: Returned when any segment of the path is missing

        Returns:
            The value stored at the path, or default
        """
        value = self._config
        for k in key.split('.'):
            if isinstance(value, dict) and k in value:
                value = value[k]
            else:
                return default
        return value

    def get_database_config(self, connection_name: str) -> Dict[str, str]:
        """Get database configuration from 'databases.<connection_name>' ({} if absent)."""
        return self.get(f'databases.{connection_name}', {})

    def get_api_config(self, api_name: str) -> Dict[str, str]:
        """Get API configuration from 'apis.<api_name>' ({} if absent)."""
        return self.get(f'apis.{api_name}', {})

Comprehensive error handling:

# src/monitoring/logging_utils.py
import functools
import logging
import os
from datetime import datetime
from typing import Any, Callable, Dict
def setup_logging(level: str = 'INFO', log_file: str = None) -> logging.Logger:
    """
    Setup logging configuration.

    Safe to call more than once: the level is updated each time, but a
    console handler (and a file handler for a given path) is attached only
    if the logger does not already have one, so records are not duplicated.

    Args:
        level: Logging level name, case-insensitive (e.g. 'INFO')
        log_file: Log file path; no file handler is added when omitted

    Returns:
        The configured 'datablast' logger

    Raises:
        AttributeError: If level is not an attribute of the logging module
    """
    logger = logging.getLogger('datablast')
    logger.setLevel(getattr(logging, level.upper()))

    # Create formatter
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )

    # Console handler. FileHandler subclasses StreamHandler, so it is
    # excluded explicitly when looking for an existing console handler.
    has_console = any(
        isinstance(handler, logging.StreamHandler)
        and not isinstance(handler, logging.FileHandler)
        for handler in logger.handlers
    )
    if not has_console:
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(formatter)
        logger.addHandler(console_handler)

    # File handler
    if log_file:
        # FileHandler stores the absolute path in baseFilename.
        target = os.path.abspath(log_file)
        has_file = any(
            isinstance(handler, logging.FileHandler)
            and handler.baseFilename == target
            for handler in logger.handlers
        )
        if not has_file:
            file_handler = logging.FileHandler(log_file)
            file_handler.setFormatter(formatter)
            logger.addHandler(file_handler)

    return logger
def log_function_call(func: Callable) -> Callable:
    """
    Wrap a function so each call is reported on the 'datablast' logger.

    The call (with its arguments) and its successful completion are logged
    at INFO level; an exception is logged at ERROR level and re-raised.

    Args:
        func: Function to decorate

    Returns:
        Decorated function
    """
    def wrapper(*args, **kwargs):
        log = logging.getLogger('datablast')
        log.info(f"Calling {func.__name__} with args={args}, kwargs={kwargs}")
        try:
            outcome = func(*args, **kwargs)
            log.info(f"Function {func.__name__} completed successfully")
        except Exception as e:
            log.error(f"Function {func.__name__} failed: {e}")
            raise
        return outcome

    # Copy name, docstring and other metadata from the wrapped function.
    return functools.wraps(func)(wrapper)
class ErrorHandler:
    """Centralized error handling."""

    @staticmethod
    def _describe(
        error_type: str,
        error: Exception,
        context: str,
        retry_recommended: bool
    ) -> Dict[str, Any]:
        """Build the payload shared by every error category."""
        return {
            'error_type': error_type,
            'message': str(error),
            'context': context,
            'timestamp': datetime.now().isoformat(),
            'retry_recommended': retry_recommended
        }

    @staticmethod
    def handle_api_error(error: Exception, context: str = "") -> Dict[str, Any]:
        """
        Handle API-related errors.

        Args:
            error: The exception that was raised
            context: Free-text description of where it happened

        Returns:
            Error payload tagged 'api_error' with retry recommended
        """
        return ErrorHandler._describe('api_error', error, context, True)

    @staticmethod
    def handle_data_error(error: Exception, context: str = "") -> Dict[str, Any]:
        """
        Handle data processing errors.

        Args:
            error: The exception that was raised
            context: Free-text description of where it happened

        Returns:
            Error payload tagged 'data_error' with retry not recommended
        """
        return ErrorHandler._describe('data_error', error, context, False)
  • Single Responsibility: Each function should have one clear purpose
  • Pure Functions: Avoid side effects when possible
  • Type Hints: Always include type annotations
  • Documentation: Comprehensive docstrings with examples
  • Error Handling: Graceful error handling with meaningful messages
# Standard library imports
import os
import json
from datetime import datetime
from typing import Dict, List, Optional
# Third-party imports
import pandas as pd
import numpy as np
from google.cloud import bigquery
# Local imports
from .config import ConfigManager
from .monitoring.logging_utils import setup_logging

Create test utilities for shared functions:

# src/testing/test_utils.py
import unittest
import pandas as pd
from unittest.mock import Mock, patch
import tempfile
import os
class TestDataFrame:
    """Test DataFrame utilities."""

    # The class name matches pytest's default `Test*` collection pattern;
    # this marks it as a helper so pytest does not collect it as a test case.
    __test__ = False

    @staticmethod
    def create_sample_dataframe() -> pd.DataFrame:
        """
        Create sample DataFrame for testing.

        Returns:
            A new five-row frame with 'id', 'name', 'age' and 'city' columns
        """
        return pd.DataFrame({
            'id': [1, 2, 3, 4, 5],
            'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
            'age': [25, 30, 35, 40, 45],
            'city': ['New York', 'London', 'Tokyo', 'Paris', 'Sydney']
        })

    @staticmethod
    def assert_dataframe_equal(df1: pd.DataFrame, df2: pd.DataFrame):
        """
        Assert two DataFrames are equal.

        Raises:
            AssertionError: If pandas' frame comparison finds a difference
        """
        pd.testing.assert_frame_equal(df1, df2)
class MockAPIClient:
    """Mock API client for testing."""

    def __init__(self):
        # Canned responses keyed by endpoint.
        self.responses = {}

    # Annotated with the builtin `dict`: this module does not import
    # Dict/Any from typing, so the generic annotation raised NameError as
    # soon as the class body was executed.
    def set_response(self, endpoint: str, response: dict):
        """
        Set mock response for endpoint.

        Args:
            endpoint: Endpoint the response is registered under
            response: Payload to hand back for that endpoint
        """
        self.responses[endpoint] = response

    def fetch_data_from_api(self, endpoint: str, **kwargs):
        """
        Mock API fetch.

        Args:
            endpoint: Endpoint to look up
            **kwargs: Accepted and ignored

        Returns:
            The registered response, or an empty dict for unknown endpoints
        """
        return self.responses.get(endpoint, {})
# src/performance/optimization.py
import pandas as pd
import numpy as np
from typing import List, Dict, Any
import time
from functools import wraps
def measure_execution_time(func):
    """
    Decorator to measure function execution time.

    Prints the elapsed time after each call that returns normally; a call
    that raises propagates the exception without printing.

    Args:
        func: Function to time

    Returns:
        Wrapped function with the same metadata and return value
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter() is monotonic and high-resolution; time.time()
        # can jump when the system clock is adjusted.
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start_time
        print(f"Function {func.__name__} took {elapsed:.2f} seconds")
        return result
    return wrapper
def optimize_dataframe_memory(df: pd.DataFrame) -> pd.DataFrame:
    """
    Optimize DataFrame memory usage.

    int64 columns are narrowed to the smallest 8/16/32-bit type that holds
    their value range (unsigned when no value is negative, signed
    otherwise), float64 columns are downcast via pandas, and object columns
    with fewer than 50% distinct values become categoricals.

    Args:
        df: Input DataFrame (left untouched)

    Returns:
        Memory-optimized DataFrame
    """
    df_optimized = df.copy()

    # Optimize integer columns
    for col in df_optimized.select_dtypes(include=['int64']).columns:
        series = df_optimized[col]
        if series.empty:
            # No values, so there is no range to size a dtype against.
            continue
        lowest, highest = series.min(), series.max()
        if lowest >= 0:
            candidates = (np.uint8, np.uint16, np.uint32)
        else:
            candidates = (np.int8, np.int16, np.int32)
        for candidate in candidates:
            bounds = np.iinfo(candidate)
            # Inclusive bounds: e.g. a maximum of exactly 255 fits uint8.
            if bounds.min <= lowest and highest <= bounds.max:
                df_optimized[col] = series.astype(candidate)
                break

    # Optimize float columns
    for col in df_optimized.select_dtypes(include=['float64']).columns:
        df_optimized[col] = pd.to_numeric(df_optimized[col], downcast='float')

    # Optimize object columns. Skipped for an empty frame, where the
    # distinct-value ratio would divide by zero.
    row_count = len(df_optimized)
    if row_count:
        for col in df_optimized.select_dtypes(include=['object']).columns:
            if df_optimized[col].nunique() / row_count < 0.5:
                df_optimized[col] = df_optimized[col].astype('category')

    return df_optimized
# In your Python task file
import sys
sys.path.append('../../src') # Add src to path
from bigquery_functions import get_bq_credentials, upload_to_bq
from api_utils import APIClient
from data_utils import clean_dataframe
import pandas as pd
def main():
    """Fetch records from the example API, clean them and load them into BigQuery."""
    # Get credentials
    credentials = get_bq_credentials("GCP_CONNECTION")

    # Initialize API client
    api_client = APIClient("https://api.example.com")
    api_client.authenticate_api("your-api-key")

    # Fetch data
    data = api_client.fetch_data_from_api("data/endpoint")
    if not data:
        # fetch_data_from_api returns None once its retries are exhausted.
        # Continuing would build an empty DataFrame and upload it with the
        # default WRITE_TRUNCATE disposition, wiping the target table.
        print("No data returned from API; skipping upload")
        return

    # Convert to DataFrame
    df = pd.DataFrame(data)

    # Clean data
    cleaning_config = {
        'remove_duplicates': True,
        'missing_values': 'fill',
        'fill_value': 0
    }
    df_clean = clean_dataframe(df, cleaning_config)

    # Upload to BigQuery
    success = upload_to_bq(
        df_clean,
        project="your-project",
        dataset="your_dataset",
        table="your_table",
        credentials=credentials
    )
    if success:
        print("Data uploaded successfully")
    else:
        print("Upload failed")


if __name__ == "__main__":
    main()
# In your Python task file
import sys
sys.path.append('../../src')
from config import ConfigManager
from bigquery_functions import BigQueryClient
from monitoring.logging_utils import setup_logging, log_function_call
import pandas as pd
# Shared logger for this task, reporting at INFO level
logger = setup_logging('INFO')
# Settings are read from the config file two directories up
config = ConfigManager(config_path='../../config.yaml')
@log_function_call
def process_data():
    """
    Process data using shared utilities.

    Reads the 'bigquery' database settings, runs the source query,
    transforms the result with process_dataframe() and uploads it to
    processed.results.
    """
    # get_bq_credentials is used below but is not among this file's
    # module-level imports, so it is imported here to avoid a NameError.
    from bigquery_functions import get_bq_credentials

    # Get database configuration
    db_config = config.get_database_config('bigquery')

    # Initialize BigQuery client
    bq_client = BigQueryClient(
        project=db_config['project'],
        credentials=get_bq_credentials()
    )

    # Execute query
    # NOTE(review): '{{ ds }}' is left as a literal template placeholder;
    # presumably it is rendered before execution. Confirm.
    query = """
SELECT * FROM `project.dataset.table`
WHERE date = '{{ ds }}'
"""
    df = bq_client.execute_query(query)

    # Process data
    processed_df = process_dataframe(df)

    # Upload results
    bq_client.upload_dataframe(
        processed_df,
        dataset='processed',
        table='results'
    )
    logger.info("Data processing completed successfully")
def process_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """Transformation hook for the pipeline; currently hands the frame back unchanged."""
    # Your processing logic here
    processed = df
    return processed


# Run the pipeline only when this file is executed as a script.
if __name__ == "__main__":
    process_data()

Datablast automatically provides environment variables for shared utilities:

import os
# Execution date of the current run
execution_date = os.environ.get('BLAST_START_DATE')
# Connection ID for GCP
gcp_connection = os.environ.get('GCP_CONNECTION')
# Project information
project_id = os.environ.get('PROJECT_ID')

Access secrets in shared utilities:

def get_api_key(secret_name: str) -> str:
    """
    Read an API key from the environment variable named secret_name.

    Returns None when the variable is not set.
    """
    # Secrets are automatically injected as environment variables
    api_key = os.environ.get(secret_name)
    return api_key

Use Datablast connections in shared utilities:

def get_bigquery_client(connection_id: str):
    """
    Build a BigQuery client for a Datablast connection.

    connection_id is passed unchanged to get_bq_credentials(), and the
    credentials it returns are handed to bigquery.Client. No project is
    passed to the client.
    """
    # Connection is automatically configured by Datablast
    return bigquery.Client(credentials=get_bq_credentials(connection_id))
  • Use semantic versioning for shared utilities
  • Tag releases for stability
  • Maintain changelog for breaking changes
  • Keep README updated with new functions
  • Include usage examples for complex functions
  • Document breaking changes
  • Write unit tests for all utility functions
  • Test integration with Datablast platform
  • Validate performance with large datasets

Shared utilities are essential for maintaining scalable, maintainable data pipelines. By following these best practices, you can create robust utility functions that:

  • Reduce code duplication across pipelines
  • Ensure consistency in data processing
  • Simplify maintenance and updates
  • Improve reliability through tested functions
  • Enhance productivity for data engineers

The key is to start simple and gradually build a comprehensive library of utilities that serve your specific needs while maintaining flexibility for future requirements.