Shared Utilities Best Practices
This guide provides comprehensive best practices for creating, organizing, and managing shared utility functions (src/ directory) across multiple pipelines in Datablast projects.
Overview
Shared utilities are the backbone of maintainable data pipelines. They provide reusable functions, reduce code duplication, and ensure consistency across all pipelines in a repository. This guide covers everything from basic organization to advanced patterns.
Directory Structure
Root-Level Organization
your-project/
├── src/                 # Shared utilities (recommended)
├── utils/               # Alternative naming
├── shared/              # Alternative naming
├── common/              # Alternative naming
├── pipeline-1/
├── pipeline-2/
└── requirements.txt

Recommended Structure
src/
├── __init__.py              # Package initialization
├── bigquery_functions.py    # BigQuery operations
├── api_utils.py             # API utilities
├── data_utils.py            # Data processing utilities
├── report_generator.py      # Reporting utilities
├── auth/                    # Authentication modules
│   ├── __init__.py
│   ├── gcp_auth.py
│   └── api_auth.py
├── database/                # Database utilities
│   ├── __init__.py
│   ├── bigquery_client.py
│   └── postgresql_client.py
└── monitoring/              # Monitoring utilities
    ├── __init__.py
    ├── logging_utils.py
    └── alerting.py

Core Utility Categories
1. BigQuery Functions (bigquery_functions.py)
Essential BigQuery operations for data pipelines:
import os
from typing import Any, Dict, List

import pandas as pd
from google.cloud import bigquery
from google.oauth2 import service_account


def get_bq_credentials(env_name: str = "GCP_CONNECTION") -> service_account.Credentials:
    """
    Get BigQuery credentials from environment variables.

    Args:
        env_name: Environment variable name containing connection ID

    Returns:
        BigQuery service account credentials
    """
    connection_id = os.getenv(env_name)
    if not connection_id:
        raise ValueError(f"Environment variable {env_name} not found")

    # Implementation to retrieve credentials from Datablast connection
    # This is handled by the platform automatically
    pass


def delete_bq_table(
    credentials: service_account.Credentials,
    project: str,
    dataset: str,
    table: str,
    if_exists: bool = True
) -> bool:
    """
    Delete BigQuery table safely.

    Args:
        credentials: BigQuery credentials
        project: GCP project ID
        dataset: BigQuery dataset ID
        table: BigQuery table ID
        if_exists: If True, a missing table is not treated as an error

    Returns:
        True if the delete call succeeded, False otherwise
    """
    client = bigquery.Client(credentials=credentials, project=project)
    # A fully qualified table ID string replaces the deprecated client.dataset() helper
    table_id = f"{project}.{dataset}.{table}"

    try:
        client.delete_table(table_id, not_found_ok=if_exists)
        return True
    except Exception as e:
        print(f"Error deleting table {table_id}: {e}")
        return False


def upload_to_bq(
    dataframe: pd.DataFrame,
    project: str,
    dataset: str,
    table: str,
    credentials: service_account.Credentials,
    write_disposition: str = "WRITE_TRUNCATE"
) -> bool:
    """
    Upload pandas DataFrame to BigQuery.

    Args:
        dataframe: Pandas DataFrame to upload
        project: GCP project ID
        dataset: BigQuery dataset ID
        table: BigQuery table ID
        credentials: BigQuery credentials
        write_disposition: BigQuery write disposition
            ('WRITE_TRUNCATE', 'WRITE_APPEND', or 'WRITE_EMPTY')

    Returns:
        True if upload successful, False otherwise
    """
    client = bigquery.Client(credentials=credentials, project=project)
    table_id = f"{project}.{dataset}.{table}"

    try:
        job_config = bigquery.LoadJobConfig(write_disposition=write_disposition)
        job = client.load_table_from_dataframe(
            dataframe, table_id, job_config=job_config
        )
        job.result()  # Wait for job to complete

        print(f"Successfully uploaded {len(dataframe)} rows to {table_id}")
        return True

    except Exception as e:
        print(f"Error uploading to BigQuery: {e}")
        return False


def execute_bq_query(
    query: str,
    credentials: service_account.Credentials,
    project: str,
    use_legacy_sql: bool = False
) -> pd.DataFrame:
    """
    Execute BigQuery query and return results as DataFrame.

    Args:
        query: SQL query to execute
        credentials: BigQuery credentials
        project: GCP project ID
        use_legacy_sql: Use legacy SQL syntax

    Returns:
        Query results as pandas DataFrame
    """
    client = bigquery.Client(credentials=credentials, project=project)

    job_config = bigquery.QueryJobConfig(use_legacy_sql=use_legacy_sql)
    query_job = client.query(query, job_config=job_config)

    return query_job.to_dataframe()


def get_table_schema(
    project: str,
    dataset: str,
    table: str,
    credentials: service_account.Credentials
) -> List[Dict[str, Any]]:
    """
    Get BigQuery table schema.

    Args:
        project: GCP project ID
        dataset: BigQuery dataset ID
        table: BigQuery table ID
        credentials: BigQuery credentials

    Returns:
        List of table schema fields
    """
    client = bigquery.Client(credentials=credentials, project=project)
    # Use a separate name so the `table` argument is not shadowed
    table_obj = client.get_table(f"{project}.{dataset}.{table}")

    return [field.to_api_repr() for field in table_obj.schema]

2. API Utilities (api_utils.py)
Generic API authentication and data retrieval:
import time
from typing import Any, Dict, Optional

import requests


class APIClient:
    """Generic API client with authentication and retry logic."""

    def __init__(self, base_url: str, auth_token: Optional[str] = None):
        # Strip a trailing slash so joined URLs do not contain "//"
        self.base_url = base_url.rstrip('/')
        self.auth_token = auth_token
        self.session = requests.Session()

        if auth_token:
            self.session.headers.update({
                'Authorization': f'Bearer {auth_token}',
                'Content-Type': 'application/json'
            })

    def authenticate_api(self, api_key: str) -> bool:
        """
        Authenticate with API using API key.

        Args:
            api_key: API key for authentication

        Returns:
            True if authentication successful
        """
        try:
            self.session.headers.update({'X-API-Key': api_key})
            return True
        except Exception as e:
            print(f"Authentication failed: {e}")
            return False

    def fetch_data_from_api(
        self,
        endpoint: str,
        params: Optional[Dict[str, Any]] = None,
        max_retries: int = 3,
        retry_delay: int = 5,
        timeout: int = 30
    ) -> Optional[Dict[str, Any]]:
        """
        Fetch data from API with retry logic.

        Args:
            endpoint: API endpoint
            params: Query parameters
            max_retries: Maximum number of retries
            retry_delay: Delay between retries in seconds
            timeout: Request timeout in seconds

        Returns:
            API response data or None if failed
        """
        url = f"{self.base_url}/{endpoint.lstrip('/')}"

        for attempt in range(max_retries + 1):
            try:
                # Without a timeout, requests can block indefinitely
                response = self.session.get(url, params=params, timeout=timeout)
                response.raise_for_status()
                return response.json()

            except requests.exceptions.RequestException as e:
                if attempt < max_retries:
                    print(f"Attempt {attempt + 1} failed: {e}. Retrying in {retry_delay} seconds...")
                    time.sleep(retry_delay)
                else:
                    print(f"All attempts failed: {e}")

        return None

    def post_data_to_api(
        self,
        endpoint: str,
        data: Dict[str, Any],
        max_retries: int = 3,
        timeout: int = 30
    ) -> Optional[Dict[str, Any]]:
        """
        Post data to API with retry logic.

        Args:
            endpoint: API endpoint
            data: Data to post
            max_retries: Maximum number of retries
            timeout: Request timeout in seconds

        Returns:
            API response data or None if failed
        """
        url = f"{self.base_url}/{endpoint.lstrip('/')}"

        for attempt in range(max_retries + 1):
            try:
                response = self.session.post(url, json=data, timeout=timeout)
                response.raise_for_status()
                return response.json()

            except requests.exceptions.RequestException as e:
                if attempt < max_retries:
                    print(f"Attempt {attempt + 1} failed: {e}. Retrying...")
                    time.sleep(2)
                else:
                    print(f"All attempts failed: {e}")

        return None


def chunk_dataframe(dataframe, chunk_size: int = 1000):
    """
    Generator to chunk DataFrame into smaller pieces.

    Args:
        dataframe: Pandas DataFrame
        chunk_size: Size of each chunk

    Yields:
        DataFrame chunks
    """
    for i in range(0, len(dataframe), chunk_size):
        yield dataframe.iloc[i:i + chunk_size]


def handle_api_rate_limit(response: requests.Response, default_delay: int = 60) -> int:
    """
    Handle API rate limiting by extracting delay from response.

    Args:
        response: API response object
        default_delay: Default delay if rate limit info not found

    Returns:
        Delay in seconds before retry
    """
    if response.status_code == 429:
        # Try to extract rate limit delay from headers
        retry_after = response.headers.get('Retry-After')
        if retry_after:
            try:
                return int(retry_after)
            except ValueError:
                pass  # Retry-After may be an HTTP date rather than seconds

        # Try to extract from response body
        try:
            error_data = response.json()
            if 'retry_after' in error_data:
                return int(error_data['retry_after'])
        except (ValueError, TypeError):
            pass  # Body is not JSON or the value is not numeric

    return default_delay

3. Data Processing Utilities (data_utils.py)
Common data processing functions:
import logging
from datetime import datetime
from typing import Any, Dict, Optional

import pandas as pd


def clean_dataframe(df: pd.DataFrame, config: Dict[str, Any]) -> pd.DataFrame:
    """
    Clean DataFrame based on configuration.

    Args:
        df: Input DataFrame
        config: Cleaning configuration

    Returns:
        Cleaned DataFrame
    """
    cleaned_df = df.copy()

    # Remove duplicates
    if config.get('remove_duplicates', False):
        cleaned_df = cleaned_df.drop_duplicates()

    # Handle missing values
    missing_strategy = config.get('missing_values', 'drop')
    if missing_strategy == 'drop':
        cleaned_df = cleaned_df.dropna()
    elif missing_strategy == 'fill':
        fill_value = config.get('fill_value', 0)
        cleaned_df = cleaned_df.fillna(fill_value)

    # Data type conversions
    type_mappings = config.get('type_mappings', {})
    for column, dtype in type_mappings.items():
        if column in cleaned_df.columns:
            try:
                cleaned_df[column] = cleaned_df[column].astype(dtype)
            except Exception as e:
                logging.warning(f"Failed to convert {column} to {dtype}: {e}")

    return cleaned_df


def validate_data_quality(df: pd.DataFrame, rules: Dict[str, Any]) -> Dict[str, Any]:
    """
    Validate data quality based on rules.

    Args:
        df: Input DataFrame
        rules: Validation rules

    Returns:
        Validation results
    """
    results = {
        'passed': True,
        'errors': [],
        'warnings': [],
        'summary': {}
    }

    # Check for null values
    null_counts = df.isnull().sum()
    for column, max_nulls in rules.get('null_limits', {}).items():
        if column in df.columns:
            null_count = null_counts[column]
            if null_count > max_nulls:
                results['errors'].append(
                    f"Column {column} has {null_count} null values (limit: {max_nulls})"
                )
                results['passed'] = False

    # Check data ranges
    for column, range_config in rules.get('ranges', {}).items():
        if column in df.columns:
            min_val = range_config.get('min')
            max_val = range_config.get('max')

            if min_val is not None and df[column].min() < min_val:
                results['errors'].append(f"Column {column} has values below minimum {min_val}")
                results['passed'] = False

            if max_val is not None and df[column].max() > max_val:
                results['errors'].append(f"Column {column} has values above maximum {max_val}")
                results['passed'] = False

    # Generate summary
    results['summary'] = {
        'total_rows': len(df),
        'total_columns': len(df.columns),
        'null_counts': null_counts.to_dict(),
        'data_types': df.dtypes.to_dict()
    }

    return results


def standardize_column_names(df: pd.DataFrame) -> pd.DataFrame:
    """
    Standardize column names to lowercase with underscores.

    Args:
        df: Input DataFrame

    Returns:
        DataFrame with standardized column names
    """
    df_copy = df.copy()
    # regex=False treats the space and hyphen as literal characters
    df_copy.columns = (
        df_copy.columns.str.lower()
        .str.replace(' ', '_', regex=False)
        .str.replace('-', '_', regex=False)
    )
    return df_copy


def add_metadata_columns(df: pd.DataFrame, execution_date: Optional[str] = None) -> pd.DataFrame:
    """
    Add metadata columns to DataFrame.

    Args:
        df: Input DataFrame
        execution_date: Execution date for the pipeline

    Returns:
        DataFrame with metadata columns
    """
    df_copy = df.copy()

    # Add execution timestamp
    df_copy['_loaded_at'] = datetime.now()

    # Add execution date if provided
    if execution_date:
        df_copy['_execution_date'] = execution_date

    # Add row number
    df_copy['_row_number'] = range(1, len(df_copy) + 1)

    return df_copy

4. Reporting Utilities (report_generator.py)
Generate reports and summaries:
import json
from datetime import datetime
from typing import Any, Dict, Optional

import numpy as np  # Needed for the np.number dtype selector below
import pandas as pd


def generate_data_summary(df: pd.DataFrame, config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """
    Generate comprehensive data summary.

    Args:
        df: Input DataFrame
        config: Summary configuration

    Returns:
        Data summary dictionary
    """
    if config is None:
        config = {}

    summary = {
        'basic_info': {
            'total_rows': len(df),
            'total_columns': len(df.columns),
            'memory_usage': df.memory_usage(deep=True).sum(),
            'generated_at': datetime.now().isoformat()
        },
        'data_types': df.dtypes.to_dict(),
        'null_counts': df.isnull().sum().to_dict(),
        'unique_counts': df.nunique().to_dict()
    }

    # Add statistical summaries for numeric columns
    numeric_columns = df.select_dtypes(include=[np.number]).columns
    if len(numeric_columns) > 0:
        summary['numeric_summary'] = df[numeric_columns].describe().to_dict()

    # Add value counts for categorical columns
    categorical_columns = df.select_dtypes(include=['object', 'category']).columns
    if len(categorical_columns) > 0:
        summary['categorical_summary'] = {}
        for col in categorical_columns[:5]:  # Limit to first 5 categorical columns
            summary['categorical_summary'][col] = df[col].value_counts().head(10).to_dict()

    return summary


def create_data_quality_report(df: pd.DataFrame, rules: Dict[str, Any]) -> Dict[str, Any]:
    """
    Create data quality report.

    Args:
        df: Input DataFrame
        rules: Data quality rules

    Returns:
        Data quality report
    """
    report = {
        'report_date': datetime.now().isoformat(),
        'total_records': len(df),
        'quality_score': 100,
        'issues': [],
        'recommendations': []
    }

    # Check data completeness
    null_percentage = (df.isnull().sum() / len(df) * 100).to_dict()
    high_null_columns = {k: v for k, v in null_percentage.items() if v > 10}

    if high_null_columns:
        report['issues'].append({
            'type': 'high_null_percentage',
            'description': 'Columns with high null percentage',
            'details': high_null_columns
        })
        report['quality_score'] -= len(high_null_columns) * 5

    # Check for duplicates
    duplicate_count = int(df.duplicated().sum())  # Plain int serializes cleanly to JSON
    if duplicate_count > 0:
        report['issues'].append({
            'type': 'duplicates',
            'description': f'Found {duplicate_count} duplicate records',
            'count': duplicate_count
        })
        report['quality_score'] -= min(duplicate_count / len(df) * 20, 20)

    # Keep the score within 0-100 even when many columns are flagged
    report['quality_score'] = max(report['quality_score'], 0)

    # Generate recommendations
    if high_null_columns:
        report['recommendations'].append(
            "Consider data imputation strategies for columns with high null percentages"
        )

    if duplicate_count > 0:
        report['recommendations'].append(
            "Review and remove duplicate records to improve data quality"
        )

    return report


def export_summary_to_json(summary: Dict[str, Any], filepath: str) -> bool:
    """
    Export summary to JSON file.

    Args:
        summary: Summary dictionary
        filepath: Output file path

    Returns:
        True if export successful
    """
    try:
        with open(filepath, 'w') as f:
            # default=str converts dtypes, timestamps, and NumPy scalars to strings
            json.dump(summary, f, indent=2, default=str)
        return True
    except Exception as e:
        print(f"Error exporting summary: {e}")
        return False

Advanced Patterns
1. Modular Organization
Create specialized modules for different domains:
# src/auth/__init__.py
from .gcp_auth import GCPAuthenticator
from .api_auth import APIAuthenticator

__all__ = ['GCPAuthenticator', 'APIAuthenticator']


# src/auth/gcp_auth.py
class GCPAuthenticator:
    """GCP authentication utilities."""

    @staticmethod
    def get_service_account_credentials(service_account_path: str):
        """Get service account credentials."""
        pass

    @staticmethod
    def get_application_default_credentials():
        """Get application default credentials."""
        pass


# src/auth/api_auth.py
class APIAuthenticator:
    """API authentication utilities."""

    @staticmethod
    def get_bearer_token(api_key: str) -> str:
        """Get bearer token from API key."""
        pass

    @staticmethod
    def refresh_token(refresh_token: str) -> str:
        """Refresh authentication token."""
        pass

2. Configuration Management
Centralized configuration handling:
import os
from typing import Any, Dict, Optional

import yaml


class ConfigManager:
    """Centralized configuration management."""

    def __init__(self, config_path: Optional[str] = None):
        self.config_path = config_path or os.getenv('CONFIG_PATH', 'config.yaml')
        self._config = self._load_config()

    def _load_config(self) -> Dict[str, Any]:
        """Load configuration from file."""
        try:
            with open(self.config_path, 'r') as f:
                # safe_load returns None for an empty file, so fall back to {}
                return yaml.safe_load(f) or {}
        except FileNotFoundError:
            return {}

    def get(self, key: str, default: Any = None) -> Any:
        """Get configuration value using a dot-separated key such as 'databases.bigquery'."""
        keys = key.split('.')
        value = self._config

        for k in keys:
            if isinstance(value, dict) and k in value:
                value = value[k]
            else:
                return default

        return value

    def get_database_config(self, connection_name: str) -> Dict[str, Any]:
        """Get database configuration."""
        return self.get(f'databases.{connection_name}', {})

    def get_api_config(self, api_name: str) -> Dict[str, Any]:
        """Get API configuration."""
        return self.get(f'apis.{api_name}', {})

3. Error Handling and Logging
Comprehensive error handling:
import functools
import logging
from datetime import datetime
from typing import Any, Callable, Dict, Optional


def setup_logging(level: str = 'INFO', log_file: Optional[str] = None) -> logging.Logger:
    """
    Setup logging configuration.

    Args:
        level: Logging level
        log_file: Log file path

    Returns:
        Configured logger
    """
    logger = logging.getLogger('datablast')
    logger.setLevel(getattr(logging, level.upper()))

    # Avoid attaching duplicate handlers when called more than once
    if logger.handlers:
        return logger

    # Create formatter
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )

    # Console handler
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)

    # File handler
    if log_file:
        file_handler = logging.FileHandler(log_file)
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)

    return logger


def log_function_call(func: Callable) -> Callable:
    """
    Decorator to log function calls.

    Args:
        func: Function to decorate

    Returns:
        Decorated function
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        logger = logging.getLogger('datablast')
        # Note: arguments are logged verbatim, so avoid decorating functions that receive secrets
        logger.info(f"Calling {func.__name__} with args={args}, kwargs={kwargs}")

        try:
            result = func(*args, **kwargs)
            logger.info(f"Function {func.__name__} completed successfully")
            return result
        except Exception as e:
            logger.error(f"Function {func.__name__} failed: {e}")
            raise

    return wrapper


class ErrorHandler:
    """Centralized error handling."""

    @staticmethod
    def handle_api_error(error: Exception, context: str = "") -> Dict[str, Any]:
        """Handle API-related errors."""
        return {
            'error_type': 'api_error',
            'message': str(error),
            'context': context,
            'timestamp': datetime.now().isoformat(),
            'retry_recommended': True
        }

    @staticmethod
    def handle_data_error(error: Exception, context: str = "") -> Dict[str, Any]:
        """Handle data processing errors."""
        return {
            'error_type': 'data_error',
            'message': str(error),
            'context': context,
            'timestamp': datetime.now().isoformat(),
            'retry_recommended': False
        }

Best Practices
1. Function Design Principles
- Single Responsibility: Each function should have one clear purpose
- Pure Functions: Avoid side effects when possible
- Type Hints: Always include type annotations
- Documentation: Comprehensive docstrings with examples
- Error Handling: Graceful error handling with meaningful messages
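As a rough illustration of these principles, the sketch below shows a small pure function with type hints, a docstring that includes an example, and an explicit error for invalid input. The name `normalize_column_name` is a hypothetical helper chosen for this example; it is not part of the modules described above.

```python
import re


def normalize_column_name(name: str) -> str:
    """
    Convert a single column label to lowercase snake_case.

    Example:
        normalize_column_name("Order-ID ") returns "order_id"

    Args:
        name: Raw column label

    Returns:
        Normalized column label

    Raises:
        ValueError: If the label is empty or not a string
    """
    if not isinstance(name, str) or not name.strip():
        raise ValueError("Column name must be a non-empty string")

    # Collapse runs of whitespace and hyphens into a single underscore
    return re.sub(r"[\s\-]+", "_", name.strip()).lower()
```

The function does one thing, does not modify its input or any global state, and fails loudly with a message that names the problem, which makes it straightforward to test in isolation.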
2. Import Organization
# Standard library imports
import os
import json
from datetime import datetime
from typing import Dict, List, Optional

# Third-party imports
import pandas as pd
import numpy as np
from google.cloud import bigquery

# Local imports
from .config import ConfigManager
from .monitoring.logging_utils import setup_logging

3. Testing Utilities
Create test utilities for shared functions:
from typing import Any, Dict

import pandas as pd


class TestDataFrame:
    """Test DataFrame utilities."""

    @staticmethod
    def create_sample_dataframe() -> pd.DataFrame:
        """Create sample DataFrame for testing."""
        return pd.DataFrame({
            'id': [1, 2, 3, 4, 5],
            'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
            'age': [25, 30, 35, 40, 45],
            'city': ['New York', 'London', 'Tokyo', 'Paris', 'Sydney']
        })

    @staticmethod
    def assert_dataframe_equal(df1: pd.DataFrame, df2: pd.DataFrame):
        """Assert two DataFrames are equal."""
        pd.testing.assert_frame_equal(df1, df2)


class MockAPIClient:
    """Mock API client for testing."""

    def __init__(self):
        self.responses: Dict[str, Dict[str, Any]] = {}

    def set_response(self, endpoint: str, response: Dict[str, Any]):
        """Set mock response for endpoint."""
        self.responses[endpoint] = response

    def fetch_data_from_api(self, endpoint: str, **kwargs):
        """Mock API fetch."""
        return self.responses.get(endpoint, {})

4. Performance Optimization
import time
from functools import wraps

import pandas as pd


def measure_execution_time(func):
    """Decorator to measure function execution time."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter is a monotonic clock intended for measuring intervals
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start_time

        print(f"Function {func.__name__} took {elapsed:.2f} seconds")
        return result

    return wrapper


def optimize_dataframe_memory(df: pd.DataFrame) -> pd.DataFrame:
    """
    Optimize DataFrame memory usage.

    Args:
        df: Input DataFrame

    Returns:
        Memory-optimized DataFrame
    """
    df_optimized = df.copy()

    # Optimize non-negative integer columns
    for col in df_optimized.select_dtypes(include=['int64']).columns:
        if df_optimized[col].min() >= 0:
            max_value = df_optimized[col].max()
            if max_value <= 255:
                df_optimized[col] = df_optimized[col].astype('uint8')
            elif max_value <= 65535:
                df_optimized[col] = df_optimized[col].astype('uint16')
            elif max_value <= 4294967295:
                df_optimized[col] = df_optimized[col].astype('uint32')

    # Optimize float columns
    for col in df_optimized.select_dtypes(include=['float64']).columns:
        df_optimized[col] = pd.to_numeric(df_optimized[col], downcast='float')

    # Optimize object columns with relatively few distinct values
    row_count = len(df_optimized)
    for col in df_optimized.select_dtypes(include=['object']).columns:
        # Guard against division by zero on empty DataFrames
        if row_count > 0 and df_optimized[col].nunique() / row_count < 0.5:
            df_optimized[col] = df_optimized[col].astype('category')

    return df_optimized

Usage Examples
1. Basic Usage in Python Tasks
# In your Python task file
import sys
sys.path.append('../../src')  # Add src to path (resolved relative to the working directory)

import pandas as pd

from bigquery_functions import get_bq_credentials, upload_to_bq
from api_utils import APIClient
from data_utils import clean_dataframe


def main():
    # Get credentials
    credentials = get_bq_credentials("GCP_CONNECTION")

    # Initialize API client
    api_client = APIClient("https://api.example.com")
    api_client.authenticate_api("your-api-key")

    # Fetch data
    data = api_client.fetch_data_from_api("data/endpoint")
    if data is None:
        # fetch_data_from_api returns None once all retries are exhausted
        print("API request failed")
        return

    # Convert to DataFrame
    df = pd.DataFrame(data)

    # Clean data
    cleaning_config = {
        'remove_duplicates': True,
        'missing_values': 'fill',
        'fill_value': 0
    }
    df_clean = clean_dataframe(df, cleaning_config)

    # Upload to BigQuery
    success = upload_to_bq(
        df_clean,
        project="your-project",
        dataset="your_dataset",
        table="your_table",
        credentials=credentials
    )

    if success:
        print("Data uploaded successfully")
    else:
        print("Upload failed")


if __name__ == "__main__":
    main()

2. Advanced Usage with Configuration
# In your Python task file
import os
import sys
sys.path.append('../../src')

import pandas as pd

from config import ConfigManager
from bigquery_functions import execute_bq_query, get_bq_credentials, upload_to_bq
from monitoring.logging_utils import setup_logging, log_function_call

# Setup logging
logger = setup_logging(level='INFO')

# Load configuration
config = ConfigManager('../../config.yaml')


def process_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """Process DataFrame with shared utilities."""
    # Your processing logic here
    return df


@log_function_call
def process_data():
    """Process data using shared utilities."""

    # Get database configuration
    db_config = config.get_database_config('bigquery')
    project = db_config['project']
    credentials = get_bq_credentials()

    # Execute query for the current execution date
    execution_date = os.getenv('BLAST_START_DATE')
    query = f"""
        SELECT * FROM `project.dataset.table`
        WHERE date = '{execution_date}'
    """
    df = execute_bq_query(query, credentials, project=project)

    # Process data
    processed_df = process_dataframe(df)

    # Upload results
    upload_to_bq(
        processed_df,
        project=project,
        dataset='processed',
        table='results',
        credentials=credentials
    )

    logger.info("Data processing completed successfully")


if __name__ == "__main__":
    process_data()

Integration with Datablast
1. Environment Variables
Datablast automatically provides environment variables for shared utilities:
import os

# Get execution date
execution_date = os.getenv('BLAST_START_DATE')

# Get connection IDs
gcp_connection = os.getenv('GCP_CONNECTION')

# Get project information
project_id = os.getenv('PROJECT_ID')

2. Secret Management
Access secrets in shared utilities:
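import os


def get_api_key(secret_name: str) -> str:
    """Get API key from Datablast secrets."""
    # Secrets are automatically injected as environment variables
    api_key = os.getenv(secret_name)
    if api_key is None:
        # Fail early instead of passing None to an API client
        raise ValueError(f"Secret {secret_name} not found in environment")
    return api_key

3. Connection Management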
Use Datablast connections in shared utilities:
from google.cloud import bigquery

from bigquery_functions import get_bq_credentials


def get_bigquery_client(connection_id: str):
    """Get BigQuery client using Datablast connection."""
    # Connection is automatically configured by Datablast
    credentials = get_bq_credentials(connection_id)
    return bigquery.Client(credentials=credentials)

Maintenance and Updates
1. Version Control
- Use semantic versioning for shared utilities
- Tag releases for stability
- Maintain changelog for breaking changes
2. Documentation
- Keep README updated with new functions
- Include usage examples for complex functions
- Document breaking changes
3. Testing
- Write unit tests for all utility functions
- Test integration with Datablast platform
- Validate performance with large datasets
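A unit test for a shared utility might look like the sketch below. To stay free of third-party dependencies it tests `chunk_records`, a hypothetical list-based stand-in for the `chunk_dataframe` generator shown earlier; the same structure should carry over to the pandas version.

```python
import unittest
from typing import Iterator, List, Sequence


def chunk_records(records: Sequence[dict], chunk_size: int = 1000) -> Iterator[List[dict]]:
    """Yield successive chunks of at most chunk_size records (hypothetical stand-in)."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    for start in range(0, len(records), chunk_size):
        yield list(records[start:start + chunk_size])


class ChunkRecordsTests(unittest.TestCase):
    def test_splits_into_expected_sizes(self):
        records = [{"id": i} for i in range(5)]
        sizes = [len(chunk) for chunk in chunk_records(records, chunk_size=2)]
        self.assertEqual(sizes, [2, 2, 1])

    def test_empty_input_yields_nothing(self):
        self.assertEqual(list(chunk_records([], chunk_size=2)), [])

    def test_rejects_non_positive_chunk_size(self):
        # The generator body runs on first iteration, so consume it inside the context
        with self.assertRaises(ValueError):
            list(chunk_records([{"id": 1}], chunk_size=0))
```

Saved as a test module, this can be run with `python -m unittest`. Covering the normal case, the empty case, and invalid input is usually a reasonable minimum for a utility that many pipelines depend on.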
Conclusion
Shared utilities are essential for maintaining scalable, maintainable data pipelines. By following these best practices, you can create robust utility functions that:
- Reduce code duplication across pipelines
- Ensure consistency in data processing
- Simplify maintenance and updates
- Improve reliability through tested functions
- Enhance productivity for data engineers
The key is to start simple and gradually build a comprehensive library of utilities that serve your specific needs while maintaining flexibility for future requirements.
Related Documentation
- Multiple Repositories - Repository organization patterns
- Project Structure - Project organization best practices
- Python Development - Python task development
- Task Configuration - Task configuration patterns