Reuse dbt™ Connection Credentials in dlt Pipelines - BigQuery
In this guide, you'll learn how to reuse your existing BigQuery connection credentials from profiles.yml to authenticate dlt pipelines — without duplicating credentials or managing separate config files.
What the Script Expects from profiles.yml
The loader reads the profile name from the profile: key in dbt_project.yml, looks that profile up in profiles.yml, and loads the target named by the profile's target: key. A typical BigQuery profile looks like this:
The script will load whichever target is set as target: in your profile — in this case prod.
The ingestion target is optional.
The get_credentials_by_environment() method uses an ingestion target as a fallback when your default target is prod, to avoid accidentally running pipelines against production. This is only relevant if you call that specific method — the rest of this guide uses get_active_credentials(), which works with any target name.
1. Set Up the Credentials Loader
Create a dlthub folder at the root of your dbt™ project and add the profile_connection_credentials.py script inside it.
2. Use the Credentials in Your dlt Pipeline
In your dlt pipeline file (e.g., my_pipeline.py), import the loader and pass the credentials to your BigQuery destination.
Note
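The loader automatically detects both paths as long as profile_connection_credentials.py is placed inside a dlthub folder at the root of your dbt™ project, as shown above: dbt_project.yml is looked up in the folder that contains dlthub, and profiles.yml in the parent directory of the dbt™ project. If either file is not found there, the loader falls back to fixed /workspace paths, so if your files live elsewhere, pass project_dir and profiles_dir explicitly to ProfileConnectionCredentialsLoader().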
your-dbt-project/
├── models/
├── dbt_project.yml
├── dlthub/
│ ├── profile_connection_credentials.py ← add this script here
│ └── my_pipeline.py
import dlt
from profile_connection_credentials import ProfileConnectionCredentialsLoader
# Resolve the connection settings of the active dbt™ target
loader = ProfileConnectionCredentialsLoader()
creds = loader.get_active_credentials()

# Build the BigQuery destination from the profile values first,
# then hand it to the pipeline
bigquery_destination = dlt.destinations.bigquery(
    project_id=creds.get("project"),
    credentials=creds.get("keyfile_json"),
    location=creds.get("location", "US"),
)
pipeline = dlt.pipeline(
    pipeline_name="my_pipeline",
    destination=bigquery_destination,
    dataset_name="my_dataset",
)


# Sample resource: yields a single batch containing two rows
@dlt.resource
def my_data():
    yield [{"id": 1, "value": "hello"}, {"id": 2, "value": "world"}]


# Run the pipeline and show the load summary
load_info = pipeline.run(my_data())
print(load_info)
#!/usr/bin/env python3
"""
Profile Connection Credentials Loader Module
A reusable utility module for fetching connection credentials from dbt's profiles.yml
on demand. Can be imported and used in different Python scripts.
This module directly parses profiles.yml without requiring dbt-core.
Usage:
from profile_connection_credentials import ProfileConnectionCredentialsLoader
loader = ProfileConnectionCredentialsLoader(
project_dir="/path/to/dbt/project",
profiles_dir="/path/to/profiles/dir"
)
# Get credentials for a specific profile and target
creds = loader.get_credentials(profile_name="my_profile", target_name="dev")
# Or get the active target credentials (from dbt_project.yml)
creds = loader.get_active_credentials()
# Access credential attributes
print(creds['type']) # e.g., "bigquery"
print(creds['project']) # adapter-specific key (BigQuery shown)
"""
import sys
from pathlib import Path
from typing import Optional, Dict, Any
try:
import yaml
except ImportError:
print("Error: PyYAML is not installed")
print("Install it with: pip install pyyaml")
sys.exit(1)
class ProfileConnectionCredentialsLoader:
    """
    Load connection credentials from profiles.yml by directly parsing YAML.

    The profile name is read from dbt_project.yml and the target from the
    matching profile in profiles.yml, so dbt-core does not need to be
    installed. Each file is parsed at most once per instance and cached.
    Values are returned exactly as parsed: this class performs no templating
    or environment-variable substitution.
    """

    def __init__(
        self,
        project_dir: Optional[str] = None,
        profiles_dir: Optional[str] = None
    ):
        """
        Initialize the credentials loader.

        Args:
            project_dir: Path to the dbt project directory
                (auto-detected if not provided)
            profiles_dir: Path to the directory containing profiles.yml
                (auto-detected if not provided)

        Raises:
            FileNotFoundError: If project_dir or profiles_dir don't exist
            FileNotFoundError: If dbt_project.yml or profiles.yml not found
        """
        # Auto-detect paths if not provided
        if not project_dir:
            project_dir = self._detect_project_dir()
        if not profiles_dir:
            profiles_dir = self._detect_profiles_dir(project_dir)

        self.project_dir = project_dir
        self.profiles_dir = profiles_dir

        # Parse caches, filled lazily by the _load_*_yaml() methods
        self._profiles_data = None
        self._project_data = None

        # Fail fast if any required directory or file is missing
        self._validate_paths()

    @staticmethod
    def _detect_project_dir() -> str:
        """
        Auto-detect the dbt project directory.

        Looks for dbt_project.yml two levels above this script
        (<project>/dlthub/<this file> -> <project>).
        Falls back to /workspace/repository/dbt if not found.

        Returns:
            Path to the dbt project directory
        """
        candidate = Path(__file__).resolve().parent.parent
        if (candidate / "dbt_project.yml").exists():
            return str(candidate.resolve())

        # Fall back to default location
        return "/workspace/repository/dbt"

    @staticmethod
    def _detect_profiles_dir(project_dir: str) -> str:
        """
        Auto-detect the profiles directory.

        Looks for profiles.yml in the parent directory of the project.
        Falls back to /workspace if not found.

        Args:
            project_dir: Path to the dbt project directory

        Returns:
            Path to the profiles directory
        """
        candidate = Path(project_dir).resolve().parent
        if (candidate / "profiles.yml").exists():
            return str(candidate.resolve())

        # Fall back to default location
        return "/workspace"

    def _validate_paths(self) -> None:
        """
        Validate that required directories and files exist.

        Raises:
            FileNotFoundError: If either directory, dbt_project.yml or
                profiles.yml is missing
        """
        project_path = Path(self.project_dir)
        if not project_path.exists():
            raise FileNotFoundError(f"Project directory does not exist: {self.project_dir}")
        if not (project_path / 'dbt_project.yml').exists():
            raise FileNotFoundError(f"dbt_project.yml not found in: {self.project_dir}")

        profiles_path = Path(self.profiles_dir)
        if not profiles_path.exists():
            raise FileNotFoundError(f"Profiles directory does not exist: {self.profiles_dir}")
        if not (profiles_path / 'profiles.yml').exists():
            raise FileNotFoundError(f"profiles.yml not found in: {self.profiles_dir}")

    @staticmethod
    def _read_yaml_mapping(path: Path, label: str) -> Dict[str, Any]:
        """
        Parse one YAML file and return its top-level mapping.

        Args:
            path: File to parse
            label: File name used in error messages

        Returns:
            The parsed mapping ({} for an empty document)

        Raises:
            RuntimeError: If the file cannot be read or parsed, or if its
                top level is not a mapping
        """
        try:
            with open(path, 'r', encoding='utf-8') as f:
                data = yaml.safe_load(f) or {}
        except Exception as e:
            # Keep the original exception attached as the cause
            raise RuntimeError(f"Failed to load {label}: {e}") from e

        # Every caller uses .get() / `in`, so reject lists and scalars here
        if not isinstance(data, dict):
            raise RuntimeError(
                f"Failed to load {label}: top-level YAML content must be a mapping"
            )
        return data

    def _load_profiles_yaml(self) -> Dict[str, Any]:
        """
        Load and parse the profiles.yml file (cached after the first call).

        Returns:
            Dictionary containing the parsed profiles.yml content

        Raises:
            RuntimeError: If the file cannot be loaded
        """
        if self._profiles_data is None:
            self._profiles_data = self._read_yaml_mapping(
                Path(self.profiles_dir) / 'profiles.yml', 'profiles.yml'
            )
        return self._profiles_data

    def _load_project_yaml(self) -> Dict[str, Any]:
        """
        Load and parse the dbt_project.yml file (cached after the first call).

        Returns:
            Dictionary containing the parsed dbt_project.yml content

        Raises:
            RuntimeError: If the file cannot be loaded
        """
        if self._project_data is None:
            self._project_data = self._read_yaml_mapping(
                Path(self.project_dir) / 'dbt_project.yml', 'dbt_project.yml'
            )
        return self._project_data

    def _resolve_credentials(
        self,
        profile_name: Optional[str],
        target_name: Optional[str]
    ) -> Dict[str, Any]:
        """
        Shared lookup behind get_active_credentials() and get_credentials().

        Args:
            profile_name: Profile to use; when falsy, the 'profile' value of
                dbt_project.yml is used
            target_name: Target to use; when falsy, the profile's 'target'
                value is used

        Returns:
            The credentials mapping stored under outputs[target_name]

        Raises:
            RuntimeError: If the profile, target or credentials cannot be
                resolved
        """
        if not profile_name:
            profile_name = self._load_project_yaml().get('profile')
            if not profile_name:
                raise RuntimeError("No 'profile' specified in dbt_project.yml")

        profiles_data = self._load_profiles_yaml()
        if profile_name not in profiles_data:
            raise RuntimeError(f"Profile '{profile_name}' not found in profiles.yml")

        profile = profiles_data[profile_name]
        # A bare "my_profile:" key parses to None; report it instead of
        # failing with AttributeError on .get()
        if not isinstance(profile, dict):
            raise RuntimeError(
                f"Profile '{profile_name}' in profiles.yml is empty or not a mapping"
            )

        if not target_name:
            target_name = profile.get('target')
            if not target_name:
                raise RuntimeError(f"No 'target' specified for profile '{profile_name}'")

        # "outputs:" may be absent or null; both mean the target is missing
        outputs = profile.get('outputs')
        if not isinstance(outputs, dict) or target_name not in outputs:
            raise RuntimeError(
                f"Target '{target_name}' not found in profile '{profile_name}'"
            )

        credentials = outputs[target_name]
        if not isinstance(credentials, dict):
            raise RuntimeError(
                f"Target '{target_name}' in profile '{profile_name}' "
                f"is empty or not a mapping"
            )
        return credentials

    def get_active_credentials(self) -> Dict[str, Any]:
        """
        Get the credentials for the active target.

        The profile name comes from dbt_project.yml and the target name from
        that profile's 'target' key in profiles.yml.

        Returns:
            Dictionary of credentials for the active target. This is the
            dict held in the parse cache, so copy it before mutating.

        Raises:
            RuntimeError: If credentials cannot be loaded
        """
        return self._resolve_credentials(None, None)

    def get_credentials(
        self,
        profile_name: Optional[str] = None,
        target_name: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Get credentials for a specific profile and target.

        If profile_name is not provided, the profile named in dbt_project.yml
        is used. If target_name is not provided, the profile's default target
        is used. With neither, this equals get_active_credentials().

        Args:
            profile_name: Name of the profile (optional)
            target_name: Name of the target within the profile (optional)

        Returns:
            Dictionary of credentials for the specified target

        Raises:
            RuntimeError: If credentials cannot be loaded
        """
        return self._resolve_credentials(profile_name, target_name)

    def get_credentials_dict(self) -> Dict[str, Any]:
        """
        Get the active credentials as a new dictionary.

        Keys starting with '_' are treated as private and left out.

        Returns:
            Dictionary of credential attributes

        Raises:
            RuntimeError: If credentials cannot be loaded
        """
        credentials = self.get_active_credentials()
        # str() keeps non-string YAML keys (e.g. integers) from crashing
        return {
            key: value for key, value in credentials.items()
            if not str(key).startswith('_')
        }

    def get_profile_info(self) -> Dict[str, Any]:
        """
        Get information about the loaded profile.

        Returns:
            Dictionary with the keys 'project_name', 'project_root',
            'profiles_dir', 'profile_name' and 'target_name'. 'target_name'
            is None when the profile is missing, empty or has no target.

        Raises:
            RuntimeError: If either YAML file cannot be loaded
        """
        project_data = self._load_project_yaml()
        profiles_data = self._load_profiles_yaml()

        profile_name = project_data.get('profile')
        profile = profiles_data.get(profile_name)
        # A missing or null profile yields no target rather than an error
        target_name = profile.get('target') if isinstance(profile, dict) else None

        return {
            'project_name': project_data.get('name'),
            'project_root': str(Path(self.project_dir).resolve()),
            'profiles_dir': str(Path(self.profiles_dir).resolve()),
            'profile_name': profile_name,
            'target_name': target_name,
        }

    def get_credential_attribute(self, attribute_name: str) -> Any:
        """
        Get a specific attribute from the active credentials.

        Args:
            attribute_name: Name of the credential attribute to retrieve

        Returns:
            The value of the requested attribute

        Raises:
            KeyError: If the attribute doesn't exist
            RuntimeError: If credentials cannot be loaded
        """
        credentials = self.get_active_credentials()
        if attribute_name not in credentials:
            raise KeyError(
                f"Credentials dictionary has no key '{attribute_name}'"
            )
        return credentials[attribute_name]

    def get_credentials_by_environment(self, non_prod_target: str = 'ingestion') -> Dict[str, Any]:
        """
        Get credentials based on the environment (target).

        If the default target is 'prod', uses the non-prod target instead.
        Otherwise, uses the default target.

        Args:
            non_prod_target: Target name to use if default is 'prod'
                (default: 'ingestion')

        Returns:
            Dictionary of credentials for the selected target

        Raises:
            RuntimeError: If credentials cannot be loaded
        """
        default_target = self.get_profile_info().get('target_name')

        # Swap 'prod' for the non-prod target
        if default_target == 'prod':
            return self.get_credentials(target_name=non_prod_target)

        # Any other default target is used as-is
        return self.get_active_credentials()
# Example usage and testing
def redact_credentials(
    credentials: Dict[str, Any],
    sensitive_markers=(
        'password', 'token', 'api_key', 'secret', 'private_key', 'keyfile_json',
    ),
) -> Dict[str, Any]:
    """
    Return a copy of credentials that is safe to print.

    A value is replaced with "[REDACTED]" when its key contains any of the
    marker substrings (case-insensitive). Substring matching covers variants
    such as 'refresh_token' and 'client_secret' that an exact-match list
    would miss.

    Args:
        credentials: Mapping of credential attributes
        sensitive_markers: Substrings identifying secret-bearing keys

    Returns:
        New dictionary with the same keys; the input is not modified
    """
    redacted = {}
    for key, value in credentials.items():
        lowered = str(key).lower()
        is_secret = any(marker in lowered for marker in sensitive_markers)
        redacted[key] = "[REDACTED]" if is_secret else value
    return redacted


if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("Usage:")
        print(" python profile_connection_credentials.py <project_dir> [profiles_dir]")
        print("\nExample:")
        print(" python profile_connection_credentials.py /workspace/repository/dbt /workspace")
        sys.exit(1)

    project_dir = sys.argv[1]
    profiles_dir = sys.argv[2] if len(sys.argv) > 2 else "/workspace"

    try:
        # Initialize the loader
        loader = ProfileConnectionCredentialsLoader(project_dir, profiles_dir)

        # Get profile info
        print("\n" + "=" * 70)
        print("PROFILE INFORMATION")
        print("=" * 70)
        profile_info = loader.get_profile_info()
        for key, value in profile_info.items():
            print(f"{key}: {value}")

        # Get active credentials
        print("\n" + "=" * 70)
        print("ACTIVE TARGET CREDENTIALS")
        print("=" * 70)
        creds = loader.get_active_credentials()
        print(f"Type: {creds.get('type', 'N/A')}")

        # Print all credential attributes, with secrets masked so keys
        # and tokens never reach the terminal or logs
        creds_dict = redact_credentials(loader.get_credentials_dict())
        for key, value in creds_dict.items():
            print(f"{key}: {value}")
        print("\n" + "=" * 70 + "\n")
    except Exception as e:
        # Top-level boundary of the demo: report, show the trace, exit non-zero
        print(f"❌ Error: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)