Python Data Pipeline using dltHub - Google Sheets to Snowflake

This guide will walk you through setting up automated data pipelines that extract data from Google Sheets and load it into Snowflake using Paradime and dltHub (a Python data loading tool).

By the end, you'll be able to:

  • Set up a Python pipeline project alongside your dbt project

  • Extract data from Google Sheets automatically

  • Load that data into Snowflake

  • Run pipelines both in development and production

Prerequisites

Before starting, make sure you have:

  • Paradime account with access to a workspace

  • Google Cloud Platform (GCP) account for Google Sheets API access

  • Snowflake credentials with permissions to create schemas and tables

  • Google spreadsheet with some data

Part 1: Understanding the Project Structure

Your project will look like this:

your-project/
├── dbt_project.yml          # Your existing dbt project configuration
├── pyproject.toml            # Python dependency management (we'll create this)
├── python_dlt/               # Folder for your Python pipelines
│   ├── gsheet_pipeline.py    # Your pipeline script (we'll create this)
│   └── ...                   # dltHub scaffolding (auto-generated)
└── models/                   # Your existing dbt models

Part 2: Set Up Google Sheets API Access

Why do we need this?

To read data from Google Sheets programmatically, you need API credentials from Google Cloud Platform.

Steps:

  1. Go to Google Cloud Console

  2. Create a Service Account (if you don't have one)

    • In the left menu, go to IAM & Admin → Service Accounts

    • Click + CREATE SERVICE ACCOUNT

    • Give it a name like "paradime-sheets-reader"

    • Click CREATE AND CONTINUE

    • Skip the optional steps and click DONE

  3. Enable Google Sheets API

    • In the search bar at the top, type "Google Sheets API"

    • Click on it and press ENABLE

  4. Create API Credentials

    • Go back to IAM & Admin → Service Accounts

    • Find your service account and click the three dots (⋮) under Actions

    • Select Manage Keys

    • Click ADD KEY → Create new key

    • Choose JSON format

    • Click CREATE - a JSON file will download automatically

  5. Share Your Google Sheet

    • Open the JSON file you just downloaded

    • Find the client_email field (looks like: your-service-account@your-project.iam.gserviceaccount.com)

    • Copy this email address

    • Go to your Google Sheet and click Share

    • Paste the service account email and give it Viewer access

  6. Extract Credentials from JSON

    Open the downloaded JSON file. You'll need these four values:

    {
      "project_id": "your-project-12345",
      "private_key": "-----BEGIN PRIVATE KEY-----\nMIIEvQIBA...",
      "client_email": "[email protected]",
      "token_uri": "https://oauth2.googleapis.com/token"
    }
  7. Encode the Private Key

    The private key needs to be encoded in Base64 format.

    On Mac/Linux:

    echo -n 'YOUR_PRIVATE_KEY_HERE' | base64

    On Windows (PowerShell):

    [Convert]::ToBase64String([Text.Encoding]::UTF8.GetBytes('YOUR_PRIVATE_KEY_HERE'))

  8. Save this encoded value - you'll use it as B64_BIGQUERY_PRIVATE_KEY. If shell quoting of the multi-line key is awkward, see the Python alternative after this list.
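If you prefer to avoid shell quoting entirely, the same encoding can be done in Python by reading the downloaded service account JSON directly. A minimal sketch (the filename is a placeholder):

import base64
import json

# Read the service account JSON downloaded in step 4 (placeholder filename)
with open("service-account.json") as f:
    private_key = json.load(f)["private_key"]

# Encode the key as-is; paste the printed value as B64_BIGQUERY_PRIVATE_KEY
print(base64.b64encode(private_key.encode("utf-8")).decode("ascii"))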


Part 3: Prepare Snowflake Credentials

Why RSA Key Authentication?

For security, we use key-pair authentication instead of passwords for automated pipelines.

Steps:

Generate RSA Key Pair (if you don't have one)

# Generate encrypted private key
openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 des3 -inform PEM -out rsa_key.p8

# Generate public key
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub

Register Public Key in Snowflake

-- Connect to Snowflake and run:
ALTER USER your_username SET RSA_PUBLIC_KEY='MIIBIjANBgkqhki...';

To get the key value, open rsa_key.pub and copy everything between the header/footer lines.

Format Private Key for Paradime

Open rsa_key.p8 in a text editor:

Original:

-----BEGIN ENCRYPTED PRIVATE KEY-----
MIIFHDBOBgkqhkiG9w0BAQEFAAOCAg8AMIICCgKCAgEA
zdLQw8fH9QxPxHFvqKVhH3zqxKgHHGxXrKJH8fH9QxPx
...more lines...
qwertyuiop
-----END ENCRYPTED PRIVATE KEY-----

Formatted (remove headers and join into one line):

MIIFHDBOBgkqhkiG9w0BAQEFAAOCAg8AMIICCgKCAgEAzdLQw8fH9QxPxHFvqKVhH3zqxKgHHGxXrKJH8fH9QxPx...qwertyuiop
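Rather than editing the file by hand, you can strip the header/footer and join the lines with a short Python snippet (a sketch, using the rsa_key.p8 filename from the openssl commands above):

# Collapse the encrypted private key into a single line without headers
with open("rsa_key.p8") as f:
    key_body = "".join(line.strip() for line in f if "PRIVATE KEY" not in line)

# Paste this value as DESTINATION__SNOWFLAKE__CREDENTIALS__PRIVATE_KEY
print(key_body)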

Gather Snowflake Connection Details

You'll need:

  • Account identifier: Found in your Snowflake URL (e.g., xy12345.us-east-1)

  • Database name: The database where data will be loaded

  • Warehouse name: The compute warehouse to use

  • Role: Your Snowflake role (e.g., ACCOUNTADMIN, SYSADMIN)

  • Username: Your Snowflake username

  • Passphrase: [optional] if set when generating the private key


Part 4: Configure Environment Variables in Paradime

Environment variables are a secure way to store credentials without hardcoding them in your scripts.

Where to Set Them:

You need to set these in TWO places in Paradime:

  1. Code IDE (for development)

    • Click Settings → Environment Variables

    • Scroll to " Code IDE"

  2. Bolt Scheduler (for production runs)

    • Click Settings → Environment Variables

    • Scroll to " Bolt"

Variables to Set:

Google Sheets Credentials

  • SOURCES__GOOGLE_SHEETS__CREDENTIALS__PROJECT_ID: project_id from the JSON (example: my-project-12345)

  • SOURCES__GOOGLE_SHEETS__CREDENTIALS__CLIENT_EMAIL: client_email from the JSON

  • SOURCES__GOOGLE_SHEETS__CREDENTIALS__TOKEN_URI: token_uri from the JSON (example: https://oauth2.googleapis.com/token)

  • B64_BIGQUERY_PRIVATE_KEY: Base64-encoded private key from Step 2.7 (example: LS0tLS1CRUdJTi...)

Snowflake Credentials

  • DESTINATION__SNOWFLAKE__CREDENTIALS__HOST: Snowflake account identifier (example: xy12345.us-east-1)

  • DESTINATION__SNOWFLAKE__CREDENTIALS__DATABASE: target database (example: ANALYTICS)

  • DESTINATION__SNOWFLAKE__CREDENTIALS__WAREHOUSE: warehouse name (example: COMPUTE_WH)

  • DESTINATION__SNOWFLAKE__CREDENTIALS__ROLE: your role (example: TRANSFORMER)

  • DESTINATION__SNOWFLAKE__CREDENTIALS__USERNAME: your Snowflake username

  • DESTINATION__SNOWFLAKE__CREDENTIALS__PRIVATE_KEY: formatted private key from Step 3.3 (example: MIIFHDBOBgkq...)

  • DESTINATION__SNOWFLAKE__CREDENTIALS__PRIVATE_KEY_PASSPHRASE: passphrase you created (example: MySecurePass123!)

Development Variable

  • DEV_SCHEMA_PREFIX: your initials or identifier (example: JD)

Note: Only set DEV_SCHEMA_PREFIX in Code IDE settings, NOT in Bolt. This ensures development data is isolated from production.


Part 5: Initialize Your Python Project

Now we'll set up the project structure and dependencies.

Step 5.1: Create pyproject.toml

In Paradime's Code IDE, at the root level (same folder as dbt_project.yml), create a new file called pyproject.toml:

[tool.poetry]
name = "python-pipelines"
version = "0.1.0"
description = "Data pipelines using dltHub"
authors = ["Your Name <[email protected]>"]
readme = "README.md"
package-mode = false

[tool.poetry.dependencies]
python = ">=3.11,<3.13"
google-api-python-client = "^2.118.0"
dlt = {extras = ["snowflake"], version = "1.5.0"}

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

What this does:

  • Poetry is a Python dependency manager (like npm for JavaScript)

  • We're specifying we need Python 3.11 or 3.12

  • We're installing dlt with Snowflake support and Google API client

Step 5.2: Install Dependencies

Open the terminal in Paradime's Code IDE and run:

poetry install

This will:

  • Create a virtual environment

  • Install dltHub and all required packages

  • Generate a poetry.lock file (commit this to git!)

Step 5.3: Initialize dltHub

Create the python_dlt folder and initialize the Google Sheets pipeline:

mkdir python_dlt
cd python_dlt
dlt init google_sheets snowflake

What this command does:

  • Creates .dlt/ folder with configuration files

  • Downloads the Google Sheets source code

  • Creates example pipeline script

  • Adds a .gitignore file

Note: We're using environment variables instead of .dlt/secrets.toml for security, so you can ignore that file.


Part 6: Create Your Pipeline Script

Replace the example google_sheets_pipeline.py or create a new file called gsheet_pipeline.py:

import os
import dlt
import base64
from google_sheets import google_spreadsheet
from typing import Sequence

def setup_credentials():
    """
    Decode and set up Google Sheets API credentials.
    
    This function retrieves the base64-encoded private key from environment
    variables, decodes it, and sets it in the format dltHub expects.
    """
    b64_private_key = os.getenv("B64_BIGQUERY_PRIVATE_KEY")
    if not b64_private_key:
        raise ValueError("Missing B64_BIGQUERY_PRIVATE_KEY environment variable")

    try:
        # Decode from base64
        private_key = base64.b64decode(b64_private_key).decode('utf-8')
        # Replace escaped newlines with actual newlines
        private_key = private_key.replace('\\n', '\n').strip()
        # Set for dltHub to use
        os.environ["SOURCES__GOOGLE_SHEETS__CREDENTIALS__PRIVATE_KEY"] = private_key
    except Exception as e:
        raise ValueError(f"Failed to decode private key: {str(e)}")

def get_dataset_name() -> str:
    """
    Determine the Snowflake schema name based on the environment.
    
    Production (scheduled runs): Uses GOOGLE_SHEETS_LOAD
    Development (manual runs): Uses {DEV_SCHEMA_PREFIX}_GOOGLE_SHEETS_LOAD
    
    Returns:
        str: The schema/dataset name to use
    
    Raises:
        ValueError: If neither environment variable is set
    """
    # Paradime automatically sets this when running scheduled jobs
    schedule_run_id = os.getenv("PARADIME_SCHEDULE_RUN_ID")
    # You manually set this in Code IDE settings
    dev_prefix = os.getenv("DEV_SCHEMA_PREFIX")
    
    if schedule_run_id:
        # We're in a scheduled production run
        return "GOOGLE_SHEETS_LOAD"
    elif dev_prefix:
        # We're in development mode
        return f"{dev_prefix}_GOOGLE_SHEETS_LOAD"
    else:
        raise ValueError(
            "Must set either PARADIME_SCHEDULE_RUN_ID (for production) "
            "or DEV_SCHEMA_PREFIX (for development) environment variable"
        )

def load_pipeline(spreadsheet_url_or_id: str, range_names: Sequence[str]) -> None:
    """
    Extract data from Google Sheets and load into Snowflake.
    
    Args:
        spreadsheet_url_or_id: The Google Sheet ID or full URL
                               Example: "1U3NQQrMgodDV8t-OPrHpf9wVgrXU2HKbrJK2effbUjA"
        range_names: List of sheet names or ranges to extract
                    Example: ["Sheet1", "Sheet2!A1:B10"]
    """
    # Set up Google credentials
    setup_credentials()
    
    # Determine which schema to use
    dataset_name = get_dataset_name()
    
    # Create the dltHub pipeline
    pipeline = dlt.pipeline(
        pipeline_name="google_sheets_pipeline",  # Identifier for this pipeline
        destination='snowflake',                 # Where to load data
        dev_mode=False,                          # Disable dev mode (set True to load into a throwaway dataset while testing)
        dataset_name=dataset_name                # Target schema name
    )
    
    # Configure what to extract from Google Sheets
    data = google_spreadsheet(
        spreadsheet_url_or_id=spreadsheet_url_or_id,
        range_names=range_names,           # Only load these specific sheets/ranges
        get_sheets=False,                  # Don't automatically load all sheets
        get_named_ranges=False             # Don't load named ranges
    )
    
    # Run the pipeline
    print(f"Loading data into dataset: {dataset_name}")
    info = pipeline.run(data)
    print(info)

if __name__ == "__main__":
    # CONFIGURE YOUR GOOGLE SHEET HERE
    
    # Option 1: Use the full URL
    spreadsheet_url_or_id = "https://docs.google.com/spreadsheets/d/1U3NQQrMgodDV8t-OPrHpf9wVgrXU2HKbrJK2effbUjA/edit"
    
    # Option 2: Or just the sheet ID (the long string in the URL)
    # spreadsheet_url_or_id = "1U3NQQrMgodDV8t-OPrHpf9wVgrXU2HKbrJK2effbUjA"
    
    # Specify which sheets or ranges to load
    range_names = ["WorldCupMatches"]  # Sheet name(s)
    # range_names = ["Sheet1!A1:D100"]  # Or specific range(s)
    # range_names = ["Sheet1", "Sheet2", "DataSheet!A:Z"]  # Multiple sheets
    
    # Run the pipeline
    load_pipeline(spreadsheet_url_or_id=spreadsheet_url_or_id, range_names=range_names)

Understanding the Code

Function Breakdown:

  1. setup_credentials(): Handles the Google API authentication

    • Retrieves the encoded private key

    • Decodes it from Base64

    • Sets it in the format dltHub expects

  2. get_dataset_name(): Smart schema naming

    • In development: Creates {YOUR_INITIALS}_GOOGLE_SHEETS_LOAD

    • In production: Creates GOOGLE_SHEETS_LOAD

    • Prevents dev data from mixing with prod data

  3. load_pipeline(): The main extraction and loading logic

    • Sets up credentials

    • Creates a dltHub pipeline object

    • Configures the Google Sheets source

    • Runs the extraction and loading

  4. if __name__ == "__main__": Configuration section

    • This is where YOU specify which Google Sheet to load

    • And which specific sheets/ranges to extract


Part 7: Run Your First Pipeline

Development Run (Testing)

  1. Open Paradime's Code IDE terminal

  2. Navigate to the pipeline folder:

    cd python_dlt
  3. Run the pipeline using Poetry:

    poetry run python gsheet_pipeline.py

What happens:

  • The script executes in the Poetry virtual environment

  • Data is extracted from your Google Sheet

  • Tables are created in Snowflake under {YOUR_PREFIX}_GOOGLE_SHEETS_LOAD schema

  • You'll see progress logs in the terminal

Expected Output:

Loading data into dataset: JD_GOOGLE_SHEETS_LOAD
Pipeline google_sheets_pipeline load step completed in X.XX seconds
1 load package(s) were loaded
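To double-check what actually landed in Snowflake, one option is to attach to the pipeline from a Python shell and query through dlt's SQL client. A minimal sketch; the table name is illustrative and depends on how dlt normalized your sheet name:

import dlt

# Re-attach to the pipeline state created by the run above
pipeline = dlt.attach(pipeline_name="google_sheets_pipeline")

# Query the target schema through dlt's SQL client
with pipeline.sql_client() as client:
    rows = client.execute_sql("SELECT count(*) FROM worldcupmatches")  # adjust to the table dlt created
    print(f"Rows loaded: {rows[0][0]}")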

Production Run (Scheduled)

  1. In Paradime, go to Bolt → Schedules

  2. Create a new schedule named "Load Google Sheets Data"

  3. Add Commands:

# Install dependencies with Poetry
poetry install

# Run your Python script
poetry run python python_dlt/gsheet_pipeline.py
  4. Schedule: Choose frequency (e.g., "Daily at 6 AM")

  5. Save and enable the schedule

What happens:

  • Paradime automatically sets PARADIME_SCHEDULE_RUN_ID

  • Data loads into GOOGLE_SHEETS_LOAD schema (without your prefix)

  • Your dbt models can now reference this data

  • Runs automatically on your chosen schedule


Part 8: Using the Loaded Data in dbt

Now that data is in Snowflake, you can create dbt models to transform it:

-- models/staging/stg_world_cup_matches.sql

with source as (
    select * from {{ source('google_sheets', 'worldcupmatches') }}
),

renamed as (
    select
        year,
        datetime,
        stage,
        stadium,
        city,
        home_team_name,
        home_team_goals,
        away_team_goals,
        away_team_name,
        win_conditions,
        attendance,
        half_time_home_goals,
        half_time_away_goals,
        referee,
        assistant_1,
        assistant_2,
        _dlt_load_id,
        _dlt_id
    from source
)

select * from renamed

Define the source in sources.yml:

version: 2

sources:
  - name: google_sheets
    schema: google_sheets_load  # Production schema
    tables:
      - name: worldcupmatches
        description: "Raw World Cup match data from Google Sheets"

Troubleshooting Common Issues

"Missing B64_BIGQUERY_PRIVATE_KEY environment variable"

Cause: Environment variable not set or not accessible

Fix:

  • Double-check spelling in Paradime settings

  • Ensure you saved the environment variable

  • Restart the Code IDE to pick up new variables (a quick check sketch is shown below)
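A quick way to confirm which variables the Code IDE can actually see (a sketch you can run in the terminal or a scratch script; extend the list to whichever variables you need):

import os

# Print whether each expected variable is visible in this environment
for var in [
    "B64_BIGQUERY_PRIVATE_KEY",
    "SOURCES__GOOGLE_SHEETS__CREDENTIALS__PROJECT_ID",
    "DESTINATION__SNOWFLAKE__CREDENTIALS__HOST",
]:
    print(var, "is set" if os.getenv(var) else "is MISSING")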

"Authentication failed" for Google Sheets

Cause: Service account doesn't have access to the sheet

Fix:

  1. Open your Google Sheet

  2. Click "Share"

  3. Add the service account email (from client_email in JSON)

  4. Give at least "Viewer" permission

"Unable to connect to Snowflake"

Cause: Incorrect Snowflake credentials or network issues

Fix:

  • Verify all Snowflake environment variables are set correctly

  • Test the private key format (no headers, single line)

  • Check your Snowflake role has CREATE SCHEMA permissions

  • Verify the warehouse is running

Tables created but empty

Cause: Wrong sheet name or range specified

Fix:

  • Double-check the sheet name spelling (case-sensitive)

  • Verify the sheet exists in the Google Sheet

  • Try using just the sheet name without range specifiers

"Command not found: poetry"

Cause: Poetry not installed in the environment

Fix:

pip install poetry

Best Practices

1. Schema Organization

Development:

  • Use personal prefixes (JD_, SARAH_, etc.)

  • Prevents conflicts when multiple developers test

  • Easy to identify and clean up dev data

Production:

  • Use clean, standard names (GOOGLE_SHEETS_LOAD)

  • Apply proper access controls

  • Document schema purposes

2. Pipeline Configuration

Keep it flexible:

# Good - easy to modify
SPREADSHEET_ID = "1U3NQQ..."
SHEET_NAMES = ["Sales2024", "Sales2023"]

# Then use in function call
load_pipeline(SPREADSHEET_ID, SHEET_NAMES)

Version control your changes:

  • Commit pyproject.toml and poetry.lock

  • Track pipeline scripts in git

  • Document any configuration changes

3. Error Handling

Add try-except blocks for robustness:

try:
    load_pipeline(spreadsheet_url_or_id, range_names)
except Exception as e:
    print(f"Pipeline failed: {str(e)}")
    # Send alert, log to monitoring system, etc.
    raise
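For transient failures (a warehouse still resuming, a flaky network call), a simple retry wrapper built on the standard library can help. A sketch that reuses load_pipeline, spreadsheet_url_or_id, and range_names from the script in Part 6; it is not a substitute for real alerting:

import time

def run_with_retries(max_attempts: int = 3, base_delay_seconds: int = 30) -> None:
    """Retry the pipeline a few times before giving up."""
    for attempt in range(1, max_attempts + 1):
        try:
            load_pipeline(spreadsheet_url_or_id=spreadsheet_url_or_id, range_names=range_names)
            return
        except Exception as e:
            print(f"Attempt {attempt}/{max_attempts} failed: {e}")
            if attempt == max_attempts:
                raise
            time.sleep(base_delay_seconds * attempt)  # back off a little longer each attempt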

4. Monitoring

Things to track:

  • Pipeline run duration

  • Number of rows loaded

  • Data freshness in Snowflake

  • Failed run alerts

Example logging:

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

logger.info(f"Starting pipeline for {spreadsheet_url_or_id}")
info = pipeline.run(data)
logger.info(f"Loaded {info.dataset_name} successfully")

5. Incremental Loading

For large sheets, you can configure incremental loading so that only new or changed rows are merged into Snowflake. One way to do this with dlt is to apply hints to the resource for a specific sheet (the resource name and the id/timestamp columns below are placeholders for your own data):

# In your pipeline configuration
data = google_spreadsheet(
    spreadsheet_url_or_id=spreadsheet_url_or_id,
    range_names=range_names,
    get_sheets=False
)

# Apply incremental hints to the resource for one sheet
# (adjust the resource name to match what dlt created for your range)
data.resources["WorldCupMatches"].apply_hints(
    primary_key="id",                                         # Unique identifier column (placeholder)
    incremental=dlt.sources.incremental("timestamp_column"),  # Column used to detect new rows (placeholder)
    write_disposition="merge"                                 # Upsert on the primary key
)

Next Steps

Now that you have a working pipeline:

  1. Add More Data Sources

    • Check dltHub documentation for other verified sources

    • Initialize additional pipelines: dlt init [source] snowflake

  2. Build Transformations

    • Create dbt™ models that use your loaded data

    • Add tests and documentation

    • Build downstream analytics models

  3. Automate Everything

    • Schedule your pipeline in Bolt

    • Set up dbt™ to run after data loads

    • Create alerts for pipeline failures

  4. Explore Advanced Features

    • Incremental loading strategies

    • Schema evolution handling

    • Custom data transformations in Python

    • Multiple destination support

