Snowflake SQLAlchemy


Snowflake SQLAlchemy runs on top of the Snowflake Connector for Python as a dialect that bridges a Snowflake database and SQLAlchemy applications.

Prerequisites

Snowflake Connector for Python

The only requirement for Snowflake SQLAlchemy is the Snowflake Connector for Python; however, you do not need to install the connector separately, because installing Snowflake SQLAlchemy installs it automatically.

Data Analytics and Web Application Frameworks (Optional)

Snowflake SQLAlchemy can be used with Pandas, Jupyter and Pyramid, which provide higher-level frameworks for data analytics and web applications. However, building a working environment from scratch is not a trivial task, particularly for novice users. Installing the frameworks requires C compilers and tools, and choosing the right tools and versions is a hurdle that might deter users from using Python applications.

An easier way to build an environment is through Anaconda, which provides a complete, precompiled technology stack for all users, including non-Python experts such as data analysts and students. For Anaconda installation instructions, see the Anaconda install documentation. The Snowflake SQLAlchemy package can then be installed on top of Anaconda using pip.

Installing Snowflake SQLAlchemy

The Snowflake SQLAlchemy package can be installed from the public PyPI repository using pip:

pip install --upgrade snowflake-sqlalchemy

pip automatically installs all required modules, including the Snowflake Connector for Python.

Verifying Your Installation

  1. Create a file (e.g. validate.py) that contains the following Python sample code, which connects to Snowflake and displays the Snowflake version:

    from sqlalchemy import create_engine, text

    engine = create_engine(
        'snowflake://{user}:{password}@{account}/'.format(
            user='<your_user_login_name>',
            password='<your_password>',
            account='<your_account_name>',
        )
    )
    try:
        with engine.connect() as connection:
            results = connection.execute(text('select current_version()')).fetchone()
            print(results[0])
    finally:
        engine.dispose()
  2. Replace <your_user_login_name>, <your_password>, and <your_account_name> with the appropriate values for your Snowflake account and user.

    For more details, see Connection Parameters.

  3. Execute the sample code. For example, if you created a file named validate.py:

    python validate.py

    The Snowflake version (e.g. 1.48.0) should be displayed.

Parameters and Behavior

As much as possible, Snowflake SQLAlchemy provides compatible functionality for SQLAlchemy applications. For information on using SQLAlchemy, see the SQLAlchemy documentation.

However, Snowflake SQLAlchemy also provides Snowflake-specific parameters and behavior, which are described in the following sections.

Connection Parameters

Snowflake SQLAlchemy uses the following syntax for the connection string used to connect to Snowflake and initiate a session:

'snowflake://<user_login_name>:<password>@<account_name>'

Where:

  • <user_login_name> is the login name for your Snowflake user.
  • <password> is the password for your Snowflake user.
  • <account_name> is the name of your Snowflake account.

If applicable, include the region in <account_name>; for details, see the Snowflake documentation on account identifiers.

You can optionally specify the initial database and schema for the Snowflake session by including them at the end of the connection string, separated by /. You can also specify the initial warehouse and role for the session as a parameter string at the end of the connection string:

'snowflake://<user_login_name>:<password>@<account_name>/<database_name>/<schema_name>?warehouse=<warehouse_name>&role=<role_name>'

Escaping Special Characters, such as % and @, in Passwords

As noted in the SQLAlchemy documentation, URLs containing special characters must be URL-encoded to be parsed correctly. This includes characters such as % and @. An unescaped password containing special characters can lead to authentication failure.

The encoding for the password can be generated using urllib.parse:

import urllib.parse
urllib.parse.quote("kx@% jj5/g")  # 'kx%40%25%20jj5/g'

Note: urllib.parse.quote_plus may also be used if there is no space in the string, as urllib.parse.quote_plus will replace space with +.
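The difference between the two functions can be checked directly; note that quote_plus also escapes / (which quote leaves unescaped by default):

```python
import urllib.parse

password = "kx@% jj5/g"

# quote encodes the space as %20 and leaves "/" unescaped by default
print(urllib.parse.quote(password))       # kx%40%25%20jj5/g

# quote_plus encodes the space as "+" and also escapes "/"
print(urllib.parse.quote_plus(password))  # kx%40%25+jj5%2Fg
```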

To create an engine with the proper encoding, either construct the URL string manually with the quoted password, or use the snowflake.sqlalchemy.URL helper method:

import urllib.parse
from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine

quoted_password = urllib.parse.quote("kx@% jj5/g")

# 1. manually constructing a URL string
url = f'snowflake://testuser1:{quoted_password}@abc123/testdb/public?warehouse=testwh&role=myrole'
engine = create_engine(url)

# 2. using the snowflake.sqlalchemy.URL helper method
engine = create_engine(URL(
    account = 'abc123',
    user = 'testuser1',
    password = quoted_password,
    database = 'testdb',
    schema = 'public',
    warehouse = 'testwh',
    role='myrole',
))

Note: After login, the initial database, schema, warehouse and role specified in the connection string can always be changed for the session.

The following example calls the create_engine method with the user name testuser1, password 0123456, account name abc123, database testdb, schema public, warehouse testwh, and role myrole:

from sqlalchemy import create_engine
engine = create_engine(
    'snowflake://testuser1:0123456@abc123/testdb/public?warehouse=testwh&role=myrole'
)

Other parameters, such as timezone, can also be specified as a URI parameter or in connect_args parameters. For example:

from sqlalchemy import create_engine
engine = create_engine(
    'snowflake://testuser1:0123456@abc123/testdb/public?warehouse=testwh&role=myrole',
    connect_args={
        'timezone': 'America/Los_Angeles',
    }
)

For convenience, you can use the snowflake.sqlalchemy.URL method to construct the connection string and connect to the database. The following example constructs the same connection string from the previous example:

from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine

engine = create_engine(URL(
    account = 'abc123',
    user = 'testuser1',
    password = '0123456',
    database = 'testdb',
    schema = 'public',
    warehouse = 'testwh',
    role='myrole',
    timezone = 'America/Los_Angeles',
))

Using a proxy server

Use the supported environment variables HTTPS_PROXY, HTTP_PROXY, and NO_PROXY to configure a proxy server.
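For example, the variables can be set in the shell or from Python before the engine is created; the proxy host, port, and bypass list below are placeholders:

```python
import os

# Placeholder proxy endpoints; replace with your proxy server's address.
os.environ["HTTPS_PROXY"] = "http://proxyserver.example.com:8080"
os.environ["HTTP_PROXY"] = "http://proxyserver.example.com:8080"
os.environ["NO_PROXY"] = "localhost,.example.com"  # hosts that bypass the proxy
```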

Using session parameters

Snowflake session parameters (such as QUERY_TAG) cannot be set directly through the URL helper. Instead, pass them via the connect_args parameter of create_engine, using the session_parameters dict — the same way you would through the Python connector:

from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine

engine = create_engine(
    URL(
        # CONNECTION_PARAMETERS
    ),
    connect_args={
        "session_parameters": {
            "QUERY_TAG": "SOME_QUERY_TAGS",
        }
    },
)

Session parameters set this way apply to all queries executed within the session. To change a session parameter for specific queries mid-session, use ALTER SESSION:

from sqlalchemy import text

with engine.connect() as conn:
    conn.execute(text("ALTER SESSION SET QUERY_TAG = 'batch_job_1'"))
    conn.execute(text("..."))  # Uses 'batch_job_1'

    conn.execute(text("ALTER SESSION SET QUERY_TAG = 'batch_job_2'"))
    conn.execute(text("..."))  # Uses 'batch_job_2'

    conn.execute(text("ALTER SESSION UNSET QUERY_TAG"))
    conn.execute(text("..."))  # No tag

Opening and Closing Connection

Open a connection by executing engine.connect(); avoid using engine.execute(). Make certain to close the connection by executing connection.close() before engine.dispose(); otherwise, the Python garbage collector removes the resources required to communicate with Snowflake, preventing the Python connector from closing the session properly.

# Avoid this.
engine = create_engine(...)
engine.execute(<SQL>)
engine.dispose()

# Better.
engine = create_engine(...)
connection = engine.connect()
try:
    connection.execute(text(<SQL>))
finally:
    connection.close()
    engine.dispose()

# Best
try:
    with engine.connect() as connection:
        connection.execute(text(<SQL>))
        # or
        connection.exec_driver_sql(<SQL>)
finally:
    engine.dispose()

Auto-increment Behavior

Auto-incrementing a value requires the Sequence object. Include the Sequence object in the primary key column to automatically increment the value as each new record is inserted. For example:

t = Table('mytable', metadata,
    Column('id', Integer, Sequence('id_seq'), primary_key=True),
    Column(...), ...
)

Object Name Case Handling

Snowflake stores all case-insensitive object names in uppercase text. In contrast, SQLAlchemy considers all lowercase object names to be case-insensitive. Snowflake SQLAlchemy converts the object name case during schema-level communication, i.e. during table and index reflection. If you use uppercase object names, SQLAlchemy assumes they are case-sensitive and encloses the names in quotes. This behavior causes mismatches against data dictionary data received from Snowflake, so unless identifier names were truly created as case-sensitive using quotes (e.g., "TestDb"), use all lowercase names on the SQLAlchemy side.
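SQLAlchemy's quoting rule can be seen in isolation; this sketch compiles DDL with SQLAlchemy's generic default dialect (not the Snowflake dialect) purely to illustrate when names get quoted:

```python
from sqlalchemy import Column, Integer, MetaData, Table
from sqlalchemy.schema import CreateTable

metadata = MetaData()
lower = Table("usertable", metadata, Column("id", Integer))
mixed = Table("UserTable", metadata, Column("id", Integer))

# Lowercase names are treated as case-insensitive and left unquoted;
# names containing uppercase characters are quoted, i.e. case-sensitive.
print(str(CreateTable(lower)))  # CREATE TABLE usertable ...
print(str(CreateTable(mixed)))  # CREATE TABLE "UserTable" ...
```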

Index Support

Indexes are supported only for Hybrid Tables in Snowflake SQLAlchemy. For more details on limitations and use cases, refer to the Create Index documentation. You can create an index using the following methods:

Single Column Index

You can create a single column index by setting the index=True parameter on the column or by explicitly defining an Index object.

hybrid_test_table_1 = HybridTable(
  "table_name",
  metadata,
  Column("column1", Integer, primary_key=True),
  Column("column2", String, index=True),
  Index("index_1", "column1", "column2")
)

metadata.create_all(engine)

Multi-Column Index

For multi-column indexes, you define the Index object specifying the columns that should be indexed.

hybrid_test_table_1 = HybridTable(
  "table_name",
  metadata,
  Column("column1", Integer, primary_key=True),
  Column("column2", String),
  Index("index_1", "column1", "column2")
)

metadata.create_all(engine)

Numpy Data Type Support

Snowflake SQLAlchemy supports binding and fetching NumPy data types. Binding is always supported. To enable fetching NumPy data types, add numpy=True to the connection parameters.

The following example shows the round trip of numpy.datetime64 data:

import numpy as np
import pandas as pd
engine = create_engine(URL(
    account = 'abc123',
    user = 'testuser1',
    password = 'pass',
    database = 'db',
    schema = 'public',
    warehouse = 'testwh',
    role='myrole',
    numpy=True,
))

specific_date = np.datetime64('2016-03-04T12:03:05.123456789')  # numpy.datetime64 is timezone-naive

with engine.connect() as connection:
    connection.exec_driver_sql(
        "CREATE OR REPLACE TABLE ts_tbl(c1 TIMESTAMP_NTZ)")
    connection.exec_driver_sql(
        "INSERT INTO ts_tbl(c1) values(%s)", (specific_date,)
    )
    df = pd.read_sql_query("SELECT * FROM ts_tbl", connection)
    assert df.c1.values[0] == specific_date

The following NumPy data types are supported:

  • numpy.int64
  • numpy.float64
  • numpy.datetime64

DECFLOAT Data Type Support

Snowflake SQLAlchemy supports the DECFLOAT data type, which provides decimal floating-point with up to 38 significant digits. For more information, see the Snowflake DECFLOAT documentation.

from sqlalchemy import Column, Integer, MetaData, Table
from snowflake.sqlalchemy import DECFLOAT

metadata = MetaData()
t = Table('my_table', metadata,
    Column('id', Integer, primary_key=True),
    Column('value', DECFLOAT()),
)
metadata.create_all(engine)

DECFLOAT Precision

The Snowflake Python connector uses Python's decimal module context when converting DECFLOAT values to Python Decimal objects. Python's default decimal context precision is 28 digits, which can truncate DECFLOAT values that use up to 38 digits.

To preserve full 38-digit precision, add enable_decfloat=True to the connection URL:

from sqlalchemy import create_engine

engine = create_engine(
    'snowflake://testuser1:0123456@abc123/testdb/public?warehouse=testwh&enable_decfloat=True'
)

Or using the snowflake.sqlalchemy.URL helper:

from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine

engine = create_engine(URL(
    account = 'abc123',
    user = 'testuser1',
    password = '0123456',
    database = 'testdb',
    schema = 'public',
    warehouse = 'testwh',
    enable_decfloat = True,
))

Note: Unlike FLOAT, DECFLOAT does not support the special values inf, -inf, and NaN.

Why is enable_decfloat not enabled by default? Enabling it sets decimal.getcontext().prec = 38, which modifies Python's thread-local decimal context and affects all Decimal operations in that thread, not just database queries. To avoid unexpected side effects on application code, the dialect emits a warning when DECFLOAT values are retrieved without full precision enabled, guiding users to opt in explicitly.
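The effect of the precision setting can be sketched with the standard decimal module alone; this illustrates the Python context behavior, not the dialect itself:

```python
from decimal import Decimal, localcontext

value = "1.2345678901234567890123456789012345678"  # 38 significant digits

with localcontext() as ctx:
    ctx.prec = 28                # Python's default precision
    truncated = +Decimal(value)  # unary plus applies the context rounding

with localcontext() as ctx:
    ctx.prec = 38                # what enable_decfloat=True sets
    full = +Decimal(value)

print(len(truncated.as_tuple().digits))  # 28 -- trailing digits were lost
print(len(full.as_tuple().digits))       # 38 -- full precision preserved
```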

VECTOR Data Type Support

Snowflake SQLAlchemy supports the VECTOR data type with varying element type and dimension. For more information, see the Snowflake documentation.

from sqlalchemy import Column, Integer, Float, MetaData, Table
from snowflake.sqlalchemy import VECTOR

metadata = MetaData()
t = Table('my_table', metadata,
    Column('id', Integer, primary_key=True),
    Column('int_vec', VECTOR(Integer, 20)),
    Column('float_vec', VECTOR(Float, 40)),
)
metadata.create_all(engine)

Timestamp and Timezone Support

Snowflake SQLAlchemy provides three Snowflake-specific timestamp types that map directly to their Snowflake counterparts:

from sqlalchemy import Column, Integer, MetaData, Table, create_engine
from snowflake.sqlalchemy import TIMESTAMP_NTZ, TIMESTAMP_TZ, TIMESTAMP_LTZ

engine = create_engine(...)
metadata = MetaData()
t = Table('events', metadata,
    Column('id', Integer, primary_key=True),
    Column('created_at', TIMESTAMP_NTZ()),   # TIMESTAMP WITHOUT TIME ZONE
    Column('scheduled_at', TIMESTAMP_TZ()),  # TIMESTAMP WITH TIME ZONE
    Column('logged_at', TIMESTAMP_LTZ()),    # TIMESTAMP WITH LOCAL TIME ZONE
)
metadata.create_all(engine)

SQLAlchemy's generic DateTime and TIMESTAMP types also support timezone-aware columns via the timezone parameter. When timezone=True is set, the dialect emits TIMESTAMP_TZ instead of the default TIMESTAMP_NTZ:

from sqlalchemy import Column, DateTime, Integer, MetaData, Table, create_engine
from sqlalchemy.types import TIMESTAMP

engine = create_engine(...)
metadata = MetaData()
t = Table('events', metadata,
    Column('id', Integer, primary_key=True),
    Column('naive_ts', DateTime()),                 # produces TIMESTAMP_NTZ
    Column('aware_ts', DateTime(timezone=True)),    # produces TIMESTAMP_TZ
    Column('naive_ts2', TIMESTAMP()),               # produces TIMESTAMP_NTZ
    Column('aware_ts2', TIMESTAMP(timezone=True)),  # produces TIMESTAMP_TZ
)
metadata.create_all(engine)

This also applies when using pandas to_sql() with timezone-aware datetime columns, which infers DateTime(timezone=True) automatically (see #199).
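For example, a timezone-aware pandas column carries its zone in the dtype, which is what triggers the DateTime(timezone=True) inference; the to_sql call is commented out here because it needs a live engine:

```python
import pandas as pd

df = pd.DataFrame({
    "event_ts": pd.to_datetime(["2024-01-01 12:00:00"]).tz_localize("America/Los_Angeles")
})

print(df["event_ts"].dtype)  # datetime64[ns, America/Los_Angeles]
# df.to_sql("events", engine, index=False)  # event_ts would map to TIMESTAMP_TZ
```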

Note on Time and timezones: SQLAlchemy's Time type accepts a timezone parameter, but Snowflake's TIME data type does not support time zones. Using Time(timezone=True) will compile to plain TIME and the timezone flag will have no effect. If you need to store time data with time-zone information, use a timestamp type such as TIMESTAMP_TZ or DateTime(timezone=True) instead.

Cache Column Metadata

SQLAlchemy provides a runtime inspection API to retrieve runtime information about various objects. One common use case is retrieving all tables and their column metadata in a schema in order to construct a schema catalog; for example, Alembic manages database schema migrations on top of SQLAlchemy this way. A typical flow (SQLAlchemy 1.4) is:

from sqlalchemy import inspect

inspector = inspect(engine)
schema = inspector.default_schema_name
for table_name in inspector.get_table_names(schema):
    column_metadata = inspector.get_columns(table_name, schema)
    primary_keys = inspector.get_pk_constraint(table_name, schema)
    foreign_keys = inspector.get_foreign_keys(table_name, schema)
    ...

In this flow, running a separate query per table can be slow for large schemas. Snowflake SQLAlchemy optimises this with schema-wide cached queries and, where appropriate, fast per-table queries.

Single-Table vs Multi-Table Reflection Performance

SQLAlchemy 2.x (automatic)

SQLAlchemy 2.x distinguishes bulk reflection from single-table inspection at the framework level:

  • MetaData.reflect() / Table(..., autoload_with=engine) — calls get_multi_columns, get_multi_pk_constraint, get_multi_foreign_keys, and get_multi_unique_constraints. Each issues one schema-wide SHOW or information_schema query and caches the result for all tables in the schema.
  • inspector.get_columns(table_name) — issues a single DESC TABLE query directly against that table. This is fast and correct for all table types including temporary tables.

No configuration is needed; the routing is handled automatically by the SA 2.x dispatch layer.

Note on reflected type representations: Because inspector.get_columns() uses DESC TABLE, reflected types always include Snowflake's resolved default sizes (e.g. BINARY(8388608) instead of BINARY, VARCHAR(16777216) instead of VARCHAR). The type objects are functionally identical; only str() output differs. Use isinstance() checks rather than string comparison for type introspection.

from sqlalchemy import MetaData, inspect, create_engine

engine = create_engine('snowflake://...')

# SA 2.x: one schema-wide query per metadata type, all tables cached at once
metadata = MetaData()
metadata.reflect(bind=engine, schema='public')

# SA 2.x: direct DESC TABLE, no schema-wide query issued
inspector = inspect(engine)
columns = inspector.get_columns('my_table', schema='public')
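The note above about reflected type representations can be checked with plain SQLAlchemy types; the size here is Snowflake's documented VARCHAR default from the note:

```python
from sqlalchemy.types import VARCHAR

declared = VARCHAR()            # what a model might declare
reflected = VARCHAR(16777216)   # what DESC TABLE reports back

print(str(declared), str(reflected))  # VARCHAR VARCHAR(16777216)

# The string forms differ, but both are the same type;
# prefer isinstance checks over string comparison.
assert str(declared) != str(reflected)
assert isinstance(reflected, VARCHAR)
```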

Per-table optimisation

On SQLAlchemy 2.x, get_pk_constraint, get_unique_constraints, get_foreign_keys, get_indexes, and get_columns automatically use per-table queries (SHOW … IN TABLE, DESC TABLE) for single-table Inspector calls (e.g. Inspector.get_pk_constraint(), pandas.read_sql_table()). MetaData.reflect() continues to use the schema-wide get_multi_* hooks, which issue one query per reflection pass.

On SQLAlchemy 1.4, MetaData.reflect() calls the singular methods per-table. Add cache_column_metadata=True to the connection URL to opt in to per-table queries for get_pk_constraint, get_unique_constraints, get_foreign_keys, get_indexes, and get_columns. Without this flag, the existing schema-wide queries are used unchanged.

engine = create_engine(URL(
    account = 'abc123',
    user = 'testuser1',
    password = 'pass',
    database = 'db',
    schema = 'public',
    warehouse = 'testwh',
    role='myrole',
    cache_column_metadata=True,  # SA 1.4 only: enables per-table reflection
))

Performance Implications

For schemas with many tables (100+), schema-wide queries issued once during MetaData.reflect() are far more efficient than per-table queries in a loop:

  • Schema-wide SHOW PRIMARY KEYS IN SCHEMA (all tables): < 1 second
  • Per-table loop over 1 000 tables: 1 000+ round-trips

For single-table inspection via Inspector, per-table queries (DESC TABLE, SHOW … IN TABLE) are faster than fetching the entire schema.

Best Practices

  1. For bulk reflection: use metadata.reflect() — schema-wide queries are issued once and cached.
  2. For single-table inspection: use inspector.get_columns() / inspector.get_pk_constraint() etc. — per-table queries are used automatically (SA 2.x) or with cache_column_metadata=True (SA 1.4).
  3. For very large schemas: reflect only the tables you need:
metadata.reflect(bind=engine, schema='public', only=['table1', 'table2'])

VARIANT, ARRAY and OBJECT Support

Snowflake SQLAlchemy supports fetching VARIANT, ARRAY and OBJECT data types. All types are converted into str in Python so that you can convert them to native data types using json.loads.

This example shows how to create a table including VARIANT, ARRAY, and OBJECT data type columns.

from sqlalchemy import Column, MetaData, Table
from snowflake.sqlalchemy import ARRAY, OBJECT, VARIANT

metadata = MetaData()
t = Table('my_semi_structured_datatype_table', metadata,
    Column('va', VARIANT),
    Column('ob', OBJECT),
    Column('ar', ARRAY))
metadata.create_all(engine)

In order to retrieve VARIANT, ARRAY, and OBJECT data type columns and convert them to the native Python data types, fetch data and call the json.loads method as follows:

import json
from sqlalchemy import select

with engine.connect() as connection:
    results = connection.execute(select(t))
    row = results.fetchone()
    data_variant = json.loads(row[0])
    data_object  = json.loads(row[1])
    data_array   = json.loads(row[2])

Structured Data Types Support

This module defines custom SQLAlchemy types for Snowflake structured data, specifically for Iceberg tables. The MAP, OBJECT, and ARRAY types allow you to store complex data structures in your SQLAlchemy models. For detailed information, refer to the Snowflake Structured data types documentation.


MAP

The MAP type represents a collection of key-value pairs, where all keys share one type and all values share another.

  • Key Type: The type of the keys (e.g., TEXT, NUMBER).
  • Value Type: The type of the values (e.g., TEXT, NUMBER).
  • Not Null: Whether NULL values are disallowed (default is False, i.e. NULLs are allowed).

Example Usage

IcebergTable(
    "table_name",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("map_col", MAP(NUMBER(10, 0), TEXT(16777216))),
    external_volume="external_volume",
    base_location="base_location",
)

OBJECT

The OBJECT type represents a semi-structured object with named fields. Each field can have a specific type, and you can also specify whether each field is nullable.

  • Items Types: A dictionary of field names and their types. Each type can optionally be given as a (type, not_null) tuple, where True marks the field as not nullable (default is False).

Example Usage

IcebergTable(
    "table_name",
    metadata,
    Column("id", Integer, primary_key=True),
    Column(
        "object_col",
        OBJECT(key1=(TEXT(16777216), False), key2=(NUMBER(10, 0), False)),
        # or, without the not-null flags:
        # OBJECT(key1=TEXT(16777216), key2=NUMBER(10, 0)),
    ),
    external_volume="external_volume",
    base_location="base_location",
)

ARRAY

The ARRAY type represents an ordered list of values, where each element has the same type. The type of the elements is defined when creating the array.

  • Value Type: The type of the elements in the array (e.g., TEXT, NUMBER).
  • Not Null: Whether NULL values are disallowed (default is False, i.e. NULLs are allowed).

Example Usage

IcebergTable(
    "table_name",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("array_col", ARRAY(TEXT(16777216))),
    external_volume="external_volume",
    base_location="base_location",
)

CLUSTER BY Support

Snowflake SQLAlchemy supports the CLUSTER BY parameter for tables. For information about the parameter, see the Snowflake CREATE TABLE documentation.

This example shows how to create a table with the id and name columns and an expression as the clustering keys:

t = Table('myuser', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String),
    snowflake_clusterby=['id', 'name', text('id > 5')],
)
metadata.create_all(engine)

Alembic Support

Alembic is a database migration tool built on top of SQLAlchemy. To make Alembic recognize the Snowflake dialect, add the following code to alembic/env.py:

from alembic.ddl.impl import DefaultImpl

class SnowflakeImpl(DefaultImpl):
    __dialect__ = 'snowflake'

See Alembic Documentation for general usage.

Key Pair Authentication Support

Snowflake SQLAlchemy supports key pair authentication through the underlying Snowflake Connector for Python. See Using Key Pair Authentication for steps to create the private and public keys.

The private key parameter is passed through connect_args as follows:

import os

from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization

with open("rsa_key.p8", "rb") as key:
    p_key = serialization.load_pem_private_key(
        key.read(),
        password=os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
        backend=default_backend()
    )

pkb = p_key.private_bytes(
    encoding=serialization.Encoding.DER,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption()
)

engine = create_engine(
    URL(
        account='abc123',
        user='testuser1',
    ),
    connect_args={
        'private_key': pkb,
    },
)

Where PRIVATE_KEY_PASSPHRASE is a passphrase to decrypt the private key file, rsa_key.p8.

Currently a private key parameter is not accepted by the snowflake.sqlalchemy.URL method.

Merge Command Support

Snowflake SQLAlchemy supports upserting with its MergeInto custom expression. See Merge for full documentation.

Use it as follows:

from sqlalchemy.orm import sessionmaker
from sqlalchemy import MetaData, create_engine
from snowflake.sqlalchemy import MergeInto

engine = create_engine(db.url, echo=False)
session = sessionmaker(bind=engine)()
connection = engine.connect()

meta = MetaData()
meta.reflect(bind=session.bind)
t1 = meta.tables['t1']
t2 = meta.tables['t2']

merge = MergeInto(target=t1, source=t2, on=t1.c.t1key == t2.c.t2key)
merge.when_matched_then_delete().where(t2.c.marked == 1)
merge.when_matched_then_update().where(t2.c.isnewstatus == 1).values(val = t2.c.newval, status=t2.c.newstatus)
merge.when_matched_then_update().values(val=t2.c.newval)
merge.when_not_matched_then_insert().values(val=t2.c.newval, status=t2.c.newstatus)
connection.execute(merge)

Bulk Insert Optimization for ORM Models

When using Session.bulk_save_objects() with models that have nullable optional columns, SQLAlchemy groups objects into separate INSERT batches based on each object's set of non-None column keys. If some objects were constructed without supplying every nullable column, they produce different key sets and SQLAlchemy emits O(N) INSERT statements instead of a single executemany batch.

Snowflake SQLAlchemy provides the following components that together solve this problem:

  • SnowflakeBase (SQLAlchemy 2.x only) — a DeclarativeBase subclass whose constructor pre-populates every plain-nullable column with None (or its scalar Python default) at construction time, so all instances share the same column-key set regardless of which kwargs the caller supplied.
  • snowflake_declarative_base() — a factory function compatible with both SQLAlchemy 1.4 and 2.x that produces a declarative base with the same pre-population behaviour.
  • SnowflakeSession — a Session subclass that passes render_nulls=True to the internal bulk-save call, preventing pre-populated None values from being stripped before grouping. Must be used together with SnowflakeBase or snowflake_declarative_base() for full effect.

SQLAlchemy 2.x example:

from sqlalchemy import Column, Integer, String, create_engine
from snowflake.sqlalchemy import SnowflakeBase, SnowflakeSession

class MyModel(SnowflakeBase):
    __tablename__ = "my_model"
    id = Column(Integer, primary_key=True)
    name = Column(String)       # nullable, no default
    status = Column(String, default="active")  # scalar default

engine = create_engine("snowflake://...")
SnowflakeBase.metadata.create_all(engine)

session = SnowflakeSession(bind=engine)
# All objects share the same column-key set — emits a single executemany INSERT
session.bulk_save_objects([
    MyModel(id=1),
    MyModel(id=2, name="foo"),
    MyModel(id=3, status="inactive"),
])
session.commit()

SQLAlchemy 1.4 and 2.x compatible example:

from sqlalchemy import Column, Integer, String, create_engine
from snowflake.sqlalchemy import snowflake_declarative_base, SnowflakeSession

Base = snowflake_declarative_base()

class MyModel(Base):
    __tablename__ = "my_model"
    id = Column(Integer, primary_key=True)
    name = Column(String)
    status = Column(String, default="active")

engine = create_engine("snowflake://...")
Base.metadata.create_all(engine)

session = SnowflakeSession(bind=engine)
session.bulk_save_objects([
    MyModel(id=1),
    MyModel(id=2, name="foo"),
])
session.commit()

Notes:

  • SnowflakeBase is only available in SQLAlchemy 2.x. Use snowflake_declarative_base() when your code must run on both SA 1.4 and 2.x.
  • Columns with server_default, callable Python defaults (default=fn), or SQL-expression defaults (default=func.now()) are intentionally left absent from pre-population. Objects that differ on such columns may still be placed in separate INSERT batches — this is the same behaviour as stock SQLAlchemy.
  • SnowflakeSession alone (without the matching base class) is not sufficient: the base class is required to unify the column-key sets before SnowflakeSession can batch them together.

CopyIntoStorage Support

Snowflake SQLAlchemy supports saving tables/query results into different stages, as well as into Azure Containers and AWS buckets with its custom CopyIntoStorage expression. See Copy into for full documentation.

Use it as follows:

from sqlalchemy.orm import sessionmaker
from sqlalchemy import MetaData, create_engine
from snowflake.sqlalchemy import CopyIntoStorage, AWSBucket, CSVFormatter

engine = create_engine(db.url, echo=False)
session = sessionmaker(bind=engine)()
connection = engine.connect()

meta = MetaData()
meta.reflect(bind=session.bind)
users = meta.tables['users']

copy_into = CopyIntoStorage(from_=users,
                            into=AWSBucket.from_uri('s3://my_private_backup').encryption_aws_sse_kms('1234abcd-12ab-34cd-56ef-1234567890ab'),
                            formatter=CSVFormatter().null_if(['null', 'Null']))
connection.execute(copy_into)

Iceberg Table with Snowflake Catalog support

Snowflake SQLAlchemy supports Iceberg Tables with the Snowflake Catalog, along with various related parameters. For detailed information about Iceberg Tables, refer to the Snowflake CREATE ICEBERG TABLE documentation.

To create an Iceberg Table using Snowflake SQLAlchemy, you can define the table using the SQLAlchemy Core syntax as follows:

table = IcebergTable(
    "myuser",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String),
    external_volume=external_volume_name,
    base_location="my_iceberg_table",
    as_query="SELECT * FROM table"
)

Alternatively, you can define the table using a declarative approach:

class MyUser(Base):
    __tablename__ = "myuser"

    @classmethod
    def __table_cls__(cls, name, metadata, *arg, **kw):
        return IcebergTable(name, metadata, *arg, **kw)

    __table_args__ = {
        "external_volume": "my_external_volume",
        "base_location": "my_iceberg_table",
        "as_query": "SELECT * FROM table",
    }

    id = Column(Integer, primary_key=True)
    name = Column(String)

Hybrid Table support

Snowflake SQLAlchemy supports Hybrid Tables with indexes. For detailed information, refer to the Snowflake CREATE HYBRID TABLE documentation.

To create a Hybrid Table and add an index, you can use the SQLAlchemy Core syntax as follows:

table = HybridTable(
    "myuser",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String),
    Index("idx_name", "name")
)

Alternatively, you can define the table using the declarative approach:

class MyUser(Base):
    __tablename__ = "myuser"

    @classmethod
    def __table_cls__(cls, name, metadata, *arg, **kw):
        return HybridTable(name, metadata, *arg, **kw)

    __table_args__ = (
        Index("idx_name", "name"),
    )

    id = Column(Integer, primary_key=True)
    name = Column(String)

Dynamic Tables support

Snowflake SQLAlchemy supports Dynamic Tables. For detailed information, refer to the Snowflake CREATE DYNAMIC TABLE documentation.

To create a Dynamic Table, you can use the SQLAlchemy Core syntax as follows:

dynamic_test_table_1 = DynamicTable(
    "dynamic_MyUser",
    metadata,
    Column("id", Integer),
    Column("name", String),
    target_lag=(1, TimeUnit.HOURS),  # Additionally, you can use SnowflakeKeyword.DOWNSTREAM
    warehouse='test_wh',
    refresh_mode=SnowflakeKeyword.FULL,
    as_query="SELECT id, name from MyUser;"
)

Alternatively, you can define a table without columns using the SQLAlchemy select() construct:

dynamic_test_table_1 = DynamicTable(
    "dynamic_MyUser",
    metadata,
    target_lag=(1, TimeUnit.HOURS),
    warehouse='test_wh',
    refresh_mode=SnowflakeKeyword.FULL,
    as_query=select(MyUser.id, MyUser.name)
)

Notes

  • Defining a primary key in a Dynamic Table is not supported; because declarative models require a primary key, Dynamic Tables cannot be defined with the declarative approach.
  • When using the as_query parameter with a string, you must explicitly define the columns. However, if you use the SQLAlchemy select() construct, you don’t need to explicitly define the columns.
  • Direct data insertion into Dynamic Tables is not supported.

Verifying Package Signatures

To ensure the authenticity and integrity of the Python package, follow the steps below to verify the package signature using cosign.

Steps to verify the signature:

  • Install cosign:
  • Download the wheel file from a repository such as PyPI:
  • Download the signature files from the release tag, replace the version number with the version you are verifying:
  • Verify signature:
    # replace the version number with the version you are verifying
    ./cosign verify-blob snowflake_sqlalchemy-1.7.3-py3-none-any.whl  \
    --certificate snowflake_sqlalchemy-1.7.3-py3-none-any.whl.crt \
    --certificate-identity https://github.com/snowflakedb/snowflake-sqlalchemy/.github/workflows/python-publish.yml@refs/tags/v1.7.3 \
    --certificate-oidc-issuer https://token.actions.githubusercontent.com \
    --signature snowflake_sqlalchemy-1.7.3-py3-none-any.whl.sig
    Verified OK

Support

Feel free to file an issue or submit a PR here for general cases. For official support, contact Snowflake support at: https://community.snowflake.com/s/article/How-To-Submit-a-Support-Case-in-Snowflake-Lodge

Known Limitations

Identity columns as primary keys

Using SQLAlchemy's Identity() construct on a primary key column is not compatible with the SQLAlchemy ORM when targeting Snowflake.

Why it fails: After an INSERT, the ORM must retrieve the generated primary key to populate the in-memory object. SQLAlchemy supports two mechanisms for this — RETURNING (not available in Snowflake) and cursor.lastrowid (the Snowflake Python connector returns None for this attribute because Snowflake has no native rowid concept — see snowflake-connector-python#1201). With neither mechanism available, the ORM receives None as the primary key and raises:

sqlalchemy.orm.exc.FlushError: Instance <MyModel at 0x...> has a NULL identity key after a flush ...

Example that fails:

from sqlalchemy import Column, Identity, Integer, String
from sqlalchemy.orm import declarative_base, Session

Base = declarative_base()

class MyModel(Base):
    __tablename__ = "my_model"
    id = Column(Integer, Identity(start=1, increment=1), primary_key=True)  # does not work with ORM
    name = Column(String)

Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(MyModel(name="test"))
    session.commit()  # raises FlushError: NULL identity key

The dialect emits a SnowflakeWarning at DDL compile time when Identity() is detected on a primary key column to surface this problem early. The warning is emitted once per unique (table, column) pair per Python process — repeated DDL compilation of the same schema does not produce duplicate output.
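The once-per-pair behaviour amounts to a module-level set guarding the warning call. A minimal sketch (a hypothetical warn_once helper, not the dialect's actual code):

```python
import warnings

_seen: set = set()

def warn_once(table: str, column: str) -> None:
    # Deduplicate per (table, column) pair for the lifetime of the process.
    key = (table, column)
    if key in _seen:
        return
    _seen.add(key)
    warnings.warn(
        f"Identity() on primary key column {table}.{column} is not "
        "ORM-compatible on Snowflake",
        UserWarning,
    )
```

Repeated DDL compilation re-invokes the check, but the membership test short-circuits the duplicate warning.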

To silence the warning entirely, use Python's standard warning filter:

import warnings
from snowflake.sqlalchemy.exc import SnowflakeWarning

warnings.filterwarnings("ignore", category=SnowflakeWarning)

Or set it via the PYTHONWARNINGS environment variable before starting your application:

PYTHONWARNINGS=ignore::snowflake.sqlalchemy.exc.SnowflakeWarning python my_app.py

Workaround — use Sequence() instead:

from sqlalchemy import Column, Integer, Sequence, String
from sqlalchemy.orm import declarative_base, Session

Base = declarative_base()

class MyModel(Base):
    __tablename__ = "my_model"
    id = Column(Integer, Sequence("my_model_id_seq"), primary_key=True)
    name = Column(String)

Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(MyModel(name="test"))
    session.commit()  # id is populated correctly

Sequence objects are fully supported by the Snowflake dialect and are the recommended way to generate auto-incrementing primary keys when using the ORM. See the Auto-increment Behavior section for more details.


Case-sensitive identifiers

Snowflake stores unquoted identifiers in UPPERCASE and treats them case-insensitively. SQLAlchemy uses lowercase for case-insensitive identifiers. The dialect bridges this gap via normalize_name / denormalize_name, but a few edge cases require explicit opt-in.

How reflection maps Snowflake names to SQLAlchemy names

When the dialect reflects a table, each column name passes through normalize_name, which produces one of three outcomes depending on how the identifier was stored in Snowflake:

  • MYCOL (stored all-uppercase; created unquoted as CREATE TABLE t (MYCOL INT)): normalize_name returns "mycol" (plain str) in both flag modes, and SQLAlchemy treats it as case-insensitive.
  • mycol (stored lowercase; created quoted as CREATE TABLE t ("mycol" INT)): normalize_name returns quoted_name("mycol", True) in both flag modes, and SQLAlchemy treats it as case-sensitive.
  • MyCol (stored mixed-case; created quoted as CREATE TABLE t ("MyCol" INT)): normalize_name returns "MyCol" (plain str) by default, and quoted_name("MyCol", True) with case_sensitive_identifiers=True; SQLAlchemy treats it as case-sensitive, and the emitted SQL is "MyCol" in both modes because _requires_quotes forces quoting for any uppercase character.

With the flag off, the only observable difference for mixed-case names is the Python type: isinstance(name, quoted_name) is False and .quote is None. Emitted SQL, dict key equality, and hashing are identical because quoted_name is a str subclass. Enable the flag when downstream code (e.g. an Alembic render_item hook or a custom preparer subclass) needs to distinguish case-sensitive reflected names by type.
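The value-semantics claim can be checked without a database, using a minimal stand-in for quoted_name (a hypothetical sketch; the real class lives in sqlalchemy.sql.elements):

```python
# Minimal stand-in: a str subclass carrying a .quote flag,
# equal and hashable purely by its string value.
class QuotedNameSketch(str):
    def __new__(cls, value, quote):
        self = super().__new__(cls, value)
        self.quote = quote
        return self

key = QuotedNameSketch("MyCol", True)
cols = {key: "column object"}

print(cols["MyCol"])   # plain-str lookup finds the same entry
print(key == "MyCol")  # True: equality is by value
print(key.quote)       # True: the only observable difference
```

This is why emitted SQL and dict lookups are unaffected by the flag; only type inspection (isinstance, .quote) changes.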

You can observe this directly via the inspector:

from sqlalchemy import inspect
from sqlalchemy.sql.elements import quoted_name

inspector = inspect(engine)
for col in inspector.get_columns("my_table"):
    name = col["name"]
    if isinstance(name, quoted_name) and name.quote:
        print(f"{name!r} — case-sensitive (was created quoted in Snowflake)")
    else:
        print(f"{name!r} — case-insensitive (Snowflake stores as {name.upper()})")

This means that after reflection, accessing a case-insensitive column requires the lowercase name. Given a table created as:

CREATE TABLE my_table (mycol INT);   -- unquoted: stored as MYCOL

metadata.reflect(bind=engine)
t = metadata.tables["my_table"]
t.c.mycol      # correct — Snowflake stored MYCOL, reflected as "mycol"
t.c["MYCOL"]   # KeyError — the reflected key is lowercase

And a case-sensitive column (quoted in Snowflake) is accessed by its exact reflected name:

t.c[quoted_name("mycol", True)]  # correct — matches the reflected quoted_name key
t.c.mycol                        # also works — quoted_name.__eq__ compares by value

Lowercase column names in CLUSTER BY

Column objects wrapped in quoted_name("mycol", True) are treated as case-sensitive by SQLAlchemy. Pass them directly to snowflake_clusterby:

from sqlalchemy.sql.elements import quoted_name
from sqlalchemy import Column, Integer, MetaData, Table

t = Table(
    "my_table",
    MetaData(),
    Column(quoted_name("mycol", True), Integer),
    snowflake_clusterby=[quoted_name("mycol", True)],
)
# Generates: CLUSTER BY ("mycol")

Without quoted_name(..., True) the column name is treated as case-insensitive and Snowflake resolves it as MYCOL.

ALL-UPPERCASE identifiers that are SQL reserved words (e.g. TABLE, SELECT)

Most identifiers are handled correctly without any flag at the SQL layer:

  • A quoted lowercase name ("mycol" in Snowflake) reflects as quoted_name("mycol", True) — already case-sensitive, in both flag modes.
  • A quoted mixed-case name ("MyCol") reflects as a plain str "MyCol" by default; with case_sensitive_identifiers=True it reflects as quoted_name("MyCol", True). Emitted SQL is "MyCol" either way because the preparer's _requires_quotes heuristic force-quotes any name containing uppercase characters, so only code that inspects .quote or isinstance(..., quoted_name) observes a difference.
  • An unquoted name (MYCOL stored as all-uppercase) reflects as "mycol" — correctly case-insensitive.

The one gap is an identifier whose all-uppercase form is also a SQL reserved word. For example, a table literally named TABLE (created as CREATE TABLE "TABLE" ...) stores as TABLE in Snowflake. When the dialect reflects it, normalize_name("TABLE") cannot tell whether TABLE is a plain uppercase column or the keyword TABLE, so by default it returns "TABLE" unchanged — an all-uppercase string that SQLAlchemy treats as case-sensitive. This causes a key mismatch: the table was reflected under the key "TABLE" but the same normalize_name call made during DDL would produce "table".

Enable case_sensitive_identifiers to fix this: the dialect will return quoted_name("table", True) for any all-uppercase identifier that is a reserved word, matching the standard SQLAlchemy convention:

from sqlalchemy import create_engine

engine = create_engine(
    "snowflake://user:pass@account/db",
    case_sensitive_identifiers=True,
)

Or via URL:

snowflake://user:pass@account/db?case_sensitive_identifiers=True

Hard limit: Enabling this flag changes the dict key used by normalize_name("TABLE") from "TABLE" to quoted_name("table", True). Because hash("TABLE") != hash("table"), any existing code that accesses metadata.tables["TABLE"] by the uppercase key will miss after the flag is enabled. Enabling the flag on an existing codebase requires auditing all string-keyed lookups into metadata.tables and table.c for names that happen to be reserved words.
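The miss is ordinary dict-key semantics, sketched here with plain strings:

```python
# Reflected while the flag was off: keyed by the uppercase form.
tables = {"TABLE": "reflected Table object"}

# With the flag on, normalize_name yields the lowercase form, which
# hashes differently and misses the old entry.
print(hash("TABLE") != hash("table"))  # True
print("table" in tables)               # False: new key misses
print("TABLE" in tables)               # True: old key still required
```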

Case-sensitive schema names — table and model definitions

The most reliable way to declare a case-sensitive schema across multiple tables is MetaData(schema=quoted_name(..., True)). All tables and ORM models that share that MetaData object inherit the schema automatically — no repetition per table:

from sqlalchemy import Column, Integer, MetaData, String, Table
from sqlalchemy.sql.elements import quoted_name

# Declare once — every Table attached to this MetaData inherits it.
metadata = MetaData(schema=quoted_name("myschema", True))

orders = Table("orders", metadata, Column("id", Integer, primary_key=True))
items  = Table("items",  metadata, Column("id", Integer, primary_key=True))
# SQL: "myschema".ORDERS and "myschema".ITEMS
# (table names are plain strings — Snowflake uppercases them)

To also make the table names case-sensitive, wrap them in quoted_name as well:

from sqlalchemy.sql.elements import quoted_name

orders = Table(quoted_name("orders", True), metadata, Column("id", Integer, primary_key=True))
items  = Table(quoted_name("items",  True), metadata, Column("id", Integer, primary_key=True))
# SQL: "myschema"."orders" and "myschema"."items"

When tables belong to different schemas, or when the MetaData is shared and you need per-table control, pass schema explicitly on the first table and reuse its .schema attribute on the rest:

orders = Table("orders", metadata,
               Column("id", Integer, primary_key=True),
               schema=quoted_name("myschema", True))

# Reuse the schema object directly — no need to repeat quoted_name.
items = Table("items", metadata,
              Column("id", Integer, primary_key=True),
              schema=orders.schema)

Table.schema is always a quoted_name instance. Assigning from another table's .schema carries the quote=True flag intact.

For ORM declarative models, set the schema on Base.metadata so every model inherits it:

from sqlalchemy import Column, Integer, MetaData
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.sql.elements import quoted_name

class Base(DeclarativeBase):
    metadata = MetaData(schema=quoted_name("myschema", True))

class Orders(Base):
    __tablename__ = "orders"
    id = Column(Integer, primary_key=True)

class Items(Base):
    __tablename__ = "items"
    id = Column(Integer, primary_key=True)

For per-model override, use __table_args__:

class Orders(Base):
    __tablename__ = "orders"
    __table_args__ = {"schema": quoted_name("myschema", True)}
    id = Column(Integer, primary_key=True)

class Items(Base):
    __tablename__ = "items"
    __table_args__ = {"schema": Orders.__table__.schema}  # reuse — no repetition
    id = Column(Integer, primary_key=True)

Plain strings with inner double-quotes — '"myschema"' — are also accepted and produce the same result as quoted_name("myschema", True) when the dialect is built with case_sensitive_identifiers=True:

engine = create_engine(
    "snowflake://user:pass@account/db",
    case_sensitive_identifiers=True,
)
Table("orders", metadata, schema='"myschema"')         # → "myschema".ORDERS
Table("orders", metadata, schema='"mydb"."myschema"')  # → "mydb"."myschema".ORDERS

Without the flag, the schema parser extracts the inner value but leaves the quote attribute at its default (None), so the preparer's _requires_quotes heuristic decides whether to re-quote it — and for an all-lowercase value like myschema the heuristic returns False, stripping the quotes from the emitted SQL:

# Flag off (default):
Table("orders", metadata, schema='"myschema"')
# SQL: myschema.ORDERS   ← Snowflake resolves this to MYSCHEMA (case-insensitive)

When case-sensitivity matters, either enable the flag or use quoted_name("myschema", True) / MetaData(schema=quoted_name(..., True)) explicitly — those forms carry quote=True on the value itself and are honoured regardless of the flag. Prefer quoted_name in code that is read often; prefer the quoted-string form together with the flag when the value comes from a config that already embeds the quotes.

Identifiers that contain a literal double-quote character use the standard SQL escape "". The parser handles this correctly — '"my""schema"' is treated as a single identifier named my"schema and round-trips to the SQL form "my""schema" in both flag modes (the embedded " is not a legal identifier character, so _requires_quotes forces quoting regardless of the flag):

Table("orders", metadata, schema='"my""schema"')   # schema name: my"schema
# SQL: "my""schema".ORDERS

Case-sensitive schema names — engine connection

create_snowflake_engine sets the default schema for the connection (equivalent to USE SCHEMA on connect). Use it when all queries in the session target the same case-sensitive schema and you do not want to qualify every table individually:

from snowflake.sqlalchemy import create_snowflake_engine

engine = create_snowflake_engine(
    "snowflake://user:pass@account/mydb",
    schema="myschema",
    case_sensitive_schema=True,
)

When the URL is stored outside Python code (environment variable, alembic.ini, Docker/Kubernetes config), create_snowflake_engine is not available and the schema must be percent-encoded directly in the URL string. Wrap the schema name in %22 (the percent-encoded form of "):

snowflake://user:pass@account/mydb/%22myschema%22

The dialect decodes %22 back to a literal " before passing the value to the Snowflake connector, which then executes USE SCHEMA "myschema" preserving case.
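The %22 round-trip is standard percent-encoding, which can be sketched with the standard library:

```python
from urllib.parse import quote, unquote

schema = '"myschema"'

encoded = quote(schema, safe="")  # the double-quote becomes %22
print(encoded)                    # %22myschema%22

decoded = unquote(encoded)        # what the connector ultimately receives
print(decoded)                    # "myschema"
```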

When building the URL programmatically with SQLAlchemy's URL() helper, you can pass the schema with embedded double-quotes directly — the helper's percent-encoding handles them automatically:

from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine

engine = create_engine(URL(
    account="abc123",
    user="testuser1",
    password="secret",
    database="mydb",
    schema='"myschema"',   # literal " characters; URL() encodes them as %22 internally
))

This is equivalent to the %22 string form but avoids manual percent-encoding.

Alembic — hand-written migrations

quoted_name works directly in op.create_table and op.add_column, so hand-written migration files need no special handling:

import sqlalchemy as sa
from alembic import op
from sqlalchemy.sql.elements import quoted_name

def upgrade():
    op.create_table(
        "my_table",
        sa.Column(quoted_name("mycol", True), sa.String()),  # quoted → "mycol" in Snowflake
        sa.Column("name", sa.String()),                      # unquoted → NAME in Snowflake
    )

    op.add_column(
        "my_table",
        sa.Column(quoted_name("extra", True), sa.Integer()),
    )

Alembic — case-sensitive schema configuration

When the Alembic version table or the migrations themselves target a case-sensitive schema, use the %22 form in every place Alembic receives a schema string, because these values are passed as plain strings and create_snowflake_engine is not available at that point:

# alembic/env.py
from sqlalchemy import create_engine

url = "snowflake://user:pass@account/mydb/%22myschema%22"

context.configure(
    url=url,
    target_metadata=target_metadata,
    version_table_schema="%22myschema%22",  # keeps alembic_version table in the same schema
)

When include_schemas=True is enabled, Alembic calls inspector.get_schema_names() and passes each result to the include_name filter. The dialect returns case-sensitive schema names as quoted_name objects (e.g. quoted_name("myschema", True)). Because quoted_name inherits from str, a plain string comparison is safe:

# alembic/env.py
from sqlalchemy.sql.elements import quoted_name

def include_name(name, type_, parent_names):
    if type_ == "schema":
        # quoted_name("myschema", True) == "myschema" → True (str subclass equality)
        return name in {'"myschema"', "myschema"}
    return True

context.configure(
    url="snowflake://user:pass@account/mydb/%22myschema%22",
    target_metadata=target_metadata,
    include_schemas=True,
    include_name=include_name,
    version_table_schema="%22myschema%22",
)

Note: inspector.get_schema_names() returns the schema as Snowflake stores it — lowercase "myschema" if it was created quoted, or uppercase "MYSCHEMA" if unquoted. After normalize_name this becomes quoted_name("myschema", True) or "myschema" (plain lowercase) respectively. Filter against the lowercase value and both cases are covered by the str equality of quoted_name.
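That str equality can be verified with a stand-in (a hypothetical QuotedNameSketch; the real type is SQLAlchemy's quoted_name):

```python
# str subclass with an extra .quote attribute, like quoted_name.
class QuotedNameSketch(str):
    def __new__(cls, value, quote):
        self = super().__new__(cls, value)
        self.quote = quote
        return self

allowed = {'"myschema"', "myschema"}

# A case-sensitive reflected schema compares equal to the bare lowercase form.
print(QuotedNameSketch("myschema", True) in allowed)  # True
print("myschema" in allowed)                          # True
```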

Alembic — autogenerate and case-sensitive columns

Alembic's default renderer serialises quoted_name("mycol", True) as the plain string "mycol", losing the case-sensitivity signal. The generated migration would create a case-insensitive MYCOL column instead of "mycol".

This also affects the comparison phase: when autogenerate detects that a reflected quoted_name("mycol", True) column differs from what it would render, it may emit a spurious alter_column on every run. The fix for both problems is the same — register the render_item hook in env.py:

from snowflake.sqlalchemy.alembic_util import render_item as snowflake_render_item

context.configure(
    ...,
    render_item=snowflake_render_item,
)

Hard limit: Alembic has no dialect-level rendering hook. The render_item callback in env.py is the only injection point and requires a two-line opt-in per project. This cannot be eliminated without upstream Alembic changes.

Patterns that do not work

Plain unquoted string as a case-sensitive schema

# WRONG — "myschema" (no quotes, no quoted_name) is denormalized to MYSCHEMA
Table("my_table", metadata, schema="myschema")

class MyModel(Base):
    __tablename__ = "my_table"
    __table_args__ = {"schema": "myschema"}  # also wrong

A plain lowercase string has no quote attribute, so denormalize_name converts it to MYSCHEMA before it reaches Snowflake. No error is raised — if a schema named MYSCHEMA happens to exist the query succeeds silently against the wrong schema.

Use quoted_name("myschema", True), '"myschema"', or MetaData(schema=quoted_name(..., True)) as shown in the section above.
