Snowflake SQLAlchemy runs on top of the Snowflake Connector for Python as a dialect that bridges a Snowflake database and SQLAlchemy applications.
Table of contents:
- Snowflake SQLAlchemy
- Prerequisites
- Installing Snowflake SQLAlchemy
- Verifying Your Installation
- Parameters and Behavior
- Connection Parameters
- Opening and Closing Connection
- Auto-increment Behavior
- Object Name Case Handling
- Index Support
- Numpy Data Type Support
- DECFLOAT Data Type Support
- VECTOR Data Type Support
- Cache Column Metadata
- VARIANT, ARRAY and OBJECT Support
- Structured Data Types Support
- CLUSTER BY Support
- Alembic Support
- Key Pair Authentication Support
- Merge Command Support
- Bulk Insert Optimization for ORM Models
- CopyIntoStorage Support
- Iceberg Table with Snowflake Catalog support
- Hybrid Table support
- Dynamic Tables support
- Notes
- Verifying Package Signatures
- Support
- Known Limitations
The only requirement for Snowflake SQLAlchemy is the Snowflake Connector for Python; however, the connector does not need to be installed because installing Snowflake SQLAlchemy automatically installs the connector.
Snowflake SQLAlchemy can be used with Pandas, Jupyter and Pyramid, which provide higher levels of application frameworks for data analytics and web applications. However, building a working environment from scratch is not a trivial task, particularly for novice users. Installing the frameworks requires C compilers and tools, and choosing the right tools and versions is a hurdle that might deter users from using Python applications.
An easier way to build an environment is through Anaconda, which provides a complete, precompiled technology stack for all users, including non-Python experts such as data analysts and students. For Anaconda installation instructions, see the Anaconda install documentation. The Snowflake SQLAlchemy package can then be installed on top of Anaconda using pip.
The Snowflake SQLAlchemy package can be installed from the public PyPI repository using pip:
pip install --upgrade snowflake-sqlalchemy

pip automatically installs all required modules, including the Snowflake Connector for Python.
- Create a file (e.g. validate.py) that contains the following Python sample code, which connects to Snowflake and displays the Snowflake version:

  from sqlalchemy import create_engine

  engine = create_engine(
      'snowflake://{user}:{password}@{account}/'.format(
          user='<your_user_login_name>',
          password='<your_password>',
          account='<your_account_name>',
      )
  )
  try:
      connection = engine.connect()
      results = connection.execute('select current_version()').fetchone()
      print(results[0])
  finally:
      connection.close()
      engine.dispose()

- Replace <your_user_login_name>, <your_password>, and <your_account_name> with the appropriate values for your Snowflake account and user. For more details, see Connection Parameters.

- Execute the sample code. For example, if you created a file named validate.py:

  python validate.py

  The Snowflake version (e.g. 1.48.0) should be displayed.
As much as possible, Snowflake SQLAlchemy provides compatible functionality for SQLAlchemy applications. For information on using SQLAlchemy, see the SQLAlchemy documentation.
However, Snowflake SQLAlchemy also provides Snowflake-specific parameters and behavior, which are described in the following sections.
Snowflake SQLAlchemy uses the following syntax for the connection string used to connect to Snowflake and initiate a session:
'snowflake://<user_login_name>:<password>@<account_name>'

Where:

- <user_login_name> is the login name for your Snowflake user.
- <password> is the password for your Snowflake user.
- <account_name> is the name of your Snowflake account.
Include the region in the <account_name> if applicable; more information is available here.
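For example, a minimal sketch of a connection string for an account in a non-default region (the account identifier and credentials below are placeholders):

from sqlalchemy import create_engine

# Account identifier including the region segment (placeholder values).
engine = create_engine('snowflake://testuser1:0123456@xy12345.us-east-1/')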
You can optionally specify the initial database and schema for the Snowflake session by including them at the end of the connection string, separated by /. You can also specify the initial warehouse and role for the session as a parameter string at the end of the connection string:
'snowflake://<user_login_name>:<password>@<account_name>/<database_name>/<schema_name>?warehouse=<warehouse_name>&role=<role_name>'

As pointed out in the SQLAlchemy documentation, URLs containing special characters (including % and @) must be URL-encoded to be parsed correctly. An unescaped password containing special characters can lead to authentication failure.
The encoding for the password can be generated using urllib.parse:
import urllib.parse
urllib.parse.quote("kx@% jj5/g")
'kx%40%25%20jj5/g'

Note: urllib.parse.quote_plus may also be used if there is no space in the string, as urllib.parse.quote_plus replaces spaces with +.
To create an engine with the proper encoding, either manually construct the URL string by formatting, or use the snowflake.sqlalchemy.URL helper method:
import urllib.parse
from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine
quoted_password = urllib.parse.quote("kx@% jj5/g")
# 1. manually constructing an url string
url = f'snowflake://testuser1:{quoted_password}@abc123/testdb/public?warehouse=testwh&role=myrole'
engine = create_engine(url)
# 2. using the snowflake.sqlalchemy.URL helper method
engine = create_engine(URL(
account = 'abc123',
user = 'testuser1',
password = quoted_password,
database = 'testdb',
schema = 'public',
warehouse = 'testwh',
role='myrole',
))

Note: After login, the initial database, schema, warehouse and role specified in the connection string can always be changed for the session.
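For example, a minimal sketch of switching the warehouse and role after connecting (the warehouse and role names are placeholders):

from sqlalchemy import text

with engine.connect() as connection:
    connection.execute(text("USE WAREHOUSE other_wh"))
    connection.execute(text("USE ROLE other_role"))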
The following example calls the create_engine method with the user name testuser1, password 0123456, account name abc123, database testdb, schema public, warehouse testwh, and role myrole:
from sqlalchemy import create_engine
engine = create_engine(
'snowflake://testuser1:0123456@abc123/testdb/public?warehouse=testwh&role=myrole'
)

Other parameters, such as timezone, can also be specified as a URI parameter or in connect_args parameters. For example:
from sqlalchemy import create_engine
engine = create_engine(
'snowflake://testuser1:0123456@abc123/testdb/public?warehouse=testwh&role=myrole',
connect_args={
'timezone': 'America/Los_Angeles',
}
)

For convenience, you can use the snowflake.sqlalchemy.URL method to construct the connection string and connect to the database. The following example constructs the same connection string as the previous example:
from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine
engine = create_engine(URL(
account = 'abc123',
user = 'testuser1',
password = '0123456',
database = 'testdb',
schema = 'public',
warehouse = 'testwh',
role='myrole',
timezone = 'America/Los_Angeles',
))

Use the supported environment variables HTTPS_PROXY, HTTP_PROXY and NO_PROXY to configure a proxy server.
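For example, a minimal sketch (the proxy host, port and exclusion list are placeholders); set the variables before creating the engine so the connector picks them up at connect time:

import os

os.environ['HTTP_PROXY'] = 'http://proxyserver.example.com:8080'
os.environ['HTTPS_PROXY'] = 'http://proxyserver.example.com:8080'
os.environ['NO_PROXY'] = '.example.com,localhost'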
Snowflake session parameters (such as QUERY_TAG) cannot be set directly through the URL helper.
Instead, pass them via the connect_args parameter of create_engine, using the session_parameters dict — the same way you would through the Python connector:
from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine
engine = create_engine(
URL(
# CONNECTION_PARAMETERS
),
connect_args={
"session_parameters": {
"QUERY_TAG": "SOME_QUERY_TAGS",
}
},
)

Session parameters set this way apply to all queries executed within the session.
To change a session parameter for specific queries mid-session, use ALTER SESSION:
from sqlalchemy import text
with engine.connect() as conn:
conn.execute(text("ALTER SESSION SET QUERY_TAG = 'batch_job_1'"))
conn.execute(text("...")) # Uses 'batch_job_1'
conn.execute(text("ALTER SESSION SET QUERY_TAG = 'batch_job_2'"))
conn.execute(text("...")) # Uses 'batch_job_2'
conn.execute(text("ALTER SESSION UNSET QUERY_TAG"))
conn.execute(text("...")) # No tagOpen a connection by executing engine.connect(); avoid using engine.execute(). Make certain to close the connection by executing connection.close() before
engine.dispose(); otherwise, the Python Garbage collector removes the resources required to communicate with Snowflake, preventing the Python connector from closing the session properly.
# Avoid this.
engine = create_engine(...)
engine.execute(<SQL>)
engine.dispose()
# Better.
engine = create_engine(...)
connection = engine.connect()
try:
connection.execute(text(<SQL>))
finally:
connection.close()
engine.dispose()
# Best
try:
with engine.connect() as connection:
connection.execute(text(<SQL>))
# or
connection.exec_driver_sql(<SQL>)
finally:
engine.dispose()

Auto-incrementing a value requires the Sequence object. Include the Sequence object in the primary key column to automatically increment the value as each new record is inserted. For example:
t = Table('mytable', metadata,
Column('id', Integer, Sequence('id_seq'), primary_key=True),
Column(...), ...
)

Snowflake stores all case-insensitive object names in uppercase text. In contrast, SQLAlchemy considers all lowercase object names to be case-insensitive. Snowflake SQLAlchemy converts the object name case during schema-level communication, i.e. during table and index reflection. If you use uppercase object names, SQLAlchemy assumes they are case-sensitive and encloses the names with quotes. This behavior causes mismatches against data dictionary data received from Snowflake, so unless identifier names have truly been created as case-sensitive using quotes (e.g., "TestDb"), all lowercase names should be used on the SQLAlchemy side.
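For example, a minimal sketch of how the two spellings are treated (table names are illustrative):

from sqlalchemy import Column, Integer, MetaData, Table

metadata = MetaData()

# Lowercase name: treated as case-insensitive and resolved to SOMETABLE in Snowflake.
t1 = Table('sometable', metadata, Column('id', Integer))

# Uppercase name: assumed case-sensitive and emitted quoted as "OTHERTABLE".
t2 = Table('OTHERTABLE', metadata, Column('id', Integer))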
Indexes are supported only for Hybrid Tables in Snowflake SQLAlchemy. For more details on limitations and use cases, refer to the Create Index documentation. You can create an index using the following methods:
You can create a single column index by setting the index=True parameter on the column or by explicitly defining an Index object.
hybrid_test_table_1 = HybridTable(
"table_name",
metadata,
Column("column1", Integer, primary_key=True),
Column("column2", String, index=True),
Index("index_1", "column1", "column2")
)
metadata.create_all(engine_testaccount)

For multi-column indexes, you define the Index object specifying the columns that should be indexed.
hybrid_test_table_1 = HybridTable(
"table_name",
metadata,
Column("column1", Integer, primary_key=True),
Column("column2", String),
Index("index_1", "column1", "column2")
)
metadata.create_all(engine_testaccount)

Snowflake SQLAlchemy supports binding and fetching NumPy data types. Binding is always supported. To enable fetching NumPy data types, add numpy=True to the connection parameters.
The following example shows the round trip of numpy.datetime64 data:
import numpy as np
import pandas as pd
engine = create_engine(URL(
account = 'abc123',
user = 'testuser1',
password = 'pass',
database = 'db',
schema = 'public',
warehouse = 'testwh',
role='myrole',
numpy=True,
))
specific_date = np.datetime64('2016-03-04T12:03:05.123456789Z')
with engine.connect() as connection:
connection.exec_driver_sql(
"CREATE OR REPLACE TABLE ts_tbl(c1 TIMESTAMP_NTZ)")
connection.exec_driver_sql(
"INSERT INTO ts_tbl(c1) values(%s)", (specific_date,)
)
df = pd.read_sql_query("SELECT * FROM ts_tbl", connection)
assert df.c1.values[0] == specific_date

The following NumPy data types are supported:
- numpy.int64
- numpy.float64
- numpy.datetime64
Snowflake SQLAlchemy supports the DECFLOAT data type, which provides decimal floating-point with up to 38 significant digits. For more information, see the Snowflake DECFLOAT documentation.
from sqlalchemy import Column, Integer, MetaData, Table
from snowflake.sqlalchemy import DECFLOAT
metadata = MetaData()
t = Table('my_table', metadata,
Column('id', Integer, primary_key=True),
Column('value', DECFLOAT()),
)
metadata.create_all(engine)

The Snowflake Python connector uses Python's decimal module context when converting DECFLOAT values to Python Decimal objects. Python's default decimal context precision is 28 digits, which can truncate DECFLOAT values that use up to 38 digits.
To preserve full 38-digit precision, add enable_decfloat=True to the connection URL:
from sqlalchemy import create_engine
engine = create_engine(
'snowflake://testuser1:0123456@abc123/testdb/public?warehouse=testwh&enable_decfloat=True'
)

Or using the snowflake.sqlalchemy.URL helper:
from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine
engine = create_engine(URL(
account = 'abc123',
user = 'testuser1',
password = '0123456',
database = 'testdb',
schema = 'public',
warehouse = 'testwh',
enable_decfloat = True,
))

Note: Unlike FLOAT, DECFLOAT does not support the special values inf, -inf, and NaN.
Why is enable_decfloat not enabled by default? Enabling it sets decimal.getcontext().prec = 38, which modifies Python's thread-local decimal context and affects all Decimal operations in that thread, not just database queries. To avoid unexpected side effects on application code, the dialect emits a warning when DECFLOAT values are retrieved without full precision enabled, guiding users to opt-in explicitly.
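A minimal sketch of that side effect, using only the standard decimal module:

import decimal

print(decimal.getcontext().prec)                 # 28 — Python's default precision
decimal.getcontext().prec = 38                   # what enable_decfloat=True does for the thread
print(decimal.Decimal(1) / decimal.Decimal(7))   # now computed with 38 significant digits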
Snowflake SQLAlchemy supports the VECTOR data type with varying element type and dimension.
For more information, see the Snowflake documentation.
from sqlalchemy import Column, Integer, Float, MetaData, Table
from snowflake.sqlalchemy import VECTOR
metadata = MetaData()
t = Table('my_table', metadata,
Column('id', Integer, primary_key=True),
Column('int_vec', VECTOR(Integer, 20)),
Column('float_vec', VECTOR(Float, 40)),
)
metadata.create_all(engine)

Snowflake SQLAlchemy provides three Snowflake-specific timestamp types that map directly to their Snowflake counterparts:
from sqlalchemy import Column, Integer, MetaData, Table, create_engine
from snowflake.sqlalchemy import TIMESTAMP_NTZ, TIMESTAMP_TZ, TIMESTAMP_LTZ
engine = create_engine(...)
metadata = MetaData()
t = Table('events', metadata,
Column('id', Integer, primary_key=True),
Column('created_at', TIMESTAMP_NTZ()), # TIMESTAMP WITHOUT TIME ZONE
Column('scheduled_at', TIMESTAMP_TZ()), # TIMESTAMP WITH TIME ZONE
Column('logged_at', TIMESTAMP_LTZ()), # TIMESTAMP WITH LOCAL TIME ZONE
)
metadata.create_all(engine)

SQLAlchemy's generic DateTime and TIMESTAMP types also support timezone-aware columns via the timezone parameter. When timezone=True is set, the dialect emits TIMESTAMP_TZ instead of the default TIMESTAMP_NTZ:
from sqlalchemy import Column, DateTime, Integer, MetaData, Table, create_engine
from sqlalchemy.types import TIMESTAMP
engine = create_engine(...)
metadata = MetaData()
t = Table('events', metadata,
Column('id', Integer, primary_key=True),
Column('naive_ts', DateTime()), # produces TIMESTAMP_NTZ
Column('aware_ts', DateTime(timezone=True)), # produces TIMESTAMP_TZ
Column('naive_ts2', TIMESTAMP()), # produces TIMESTAMP_NTZ
Column('aware_ts2', TIMESTAMP(timezone=True)), # produces TIMESTAMP_TZ
)
metadata.create_all(engine)

This also applies when using pandas to_sql() with timezone-aware datetime columns, which infers DateTime(timezone=True) automatically (see #199).
Note on Time and timezones: SQLAlchemy's Time type accepts a timezone parameter, but Snowflake's TIME data type does not support time zones. Using Time(timezone=True) will compile to plain TIME and the timezone flag will have no effect. If you need to store time data with time-zone information, use a timestamp type such as TIMESTAMP_TZ or DateTime(timezone=True) instead.
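For example, a sketch of swapping a timezone-flagged Time column for a timestamp type that does retain the offset (table and column names are illustrative):

from sqlalchemy import Column, Integer, MetaData, Table, Time
from snowflake.sqlalchemy import TIMESTAMP_TZ

metadata = MetaData()
t = Table('shifts', metadata,
    Column('id', Integer, primary_key=True),
    Column('start_time', Time(timezone=True)),   # compiles to plain TIME; the offset is lost
    Column('start_at', TIMESTAMP_TZ()),          # keeps the time-zone offset
)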
SQLAlchemy provides the runtime inspection API to get the runtime information about the various objects. One common use case is retrieving all tables and their column metadata in a schema to construct a schema catalog. For example, alembic manages database schema migrations on top of SQLAlchemy. A typical flow (SQLAlchemy 1.4) is:
inspector = inspect(engine)
schema = inspector.default_schema_name
for table_name in inspector.get_table_names(schema):
column_metadata = inspector.get_columns(table_name, schema)
primary_keys = inspector.get_pk_constraint(table_name, schema)
foreign_keys = inspector.get_foreign_keys(table_name, schema)
...

In this flow, running a separate query per table can be slow for large schemas. Snowflake SQLAlchemy optimises this with schema-wide cached queries and, where appropriate, fast per-table queries.
SQLAlchemy 2.x (automatic)
SQLAlchemy 2.x distinguishes bulk reflection from single-table inspection at the framework level:
- MetaData.reflect() / Table(..., autoload_with=engine) — calls get_multi_columns, get_multi_pk_constraint, get_multi_foreign_keys, and get_multi_unique_constraints. Each issues one schema-wide SHOW or information_schema query and caches the result for all tables in the schema.
- inspector.get_columns(table_name) — issues a single DESC TABLE query directly against that table. This is fast and correct for all table types, including temporary tables.
No configuration is needed; the routing is handled automatically by the SA 2.x dispatch layer.
Note on reflected type representations: Because inspector.get_columns() uses DESC TABLE, reflected types always include Snowflake's resolved default sizes (e.g. BINARY(8388608) instead of BINARY, VARCHAR(16777216) instead of VARCHAR). The type objects are functionally identical; only str() output differs. Use isinstance() checks rather than string comparison for type introspection.
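For example, a short sketch of the recommended isinstance() check, assuming an existing engine and an illustrative table name:

from sqlalchemy import String, inspect

inspector = inspect(engine)
for col in inspector.get_columns('my_table'):
    # Reflected as VARCHAR(16777216), but still an instance of the String hierarchy.
    if isinstance(col['type'], String):
        print(col['name'], 'is a string column')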
from sqlalchemy import MetaData, inspect, create_engine
engine = create_engine('snowflake://...')
# SA 2.x: one schema-wide query per metadata type, all tables cached at once
metadata = MetaData()
metadata.reflect(bind=engine, schema='public')
# SA 2.x: direct DESC TABLE, no schema-wide query issued
inspector = inspect(engine)
columns = inspector.get_columns('my_table', schema='public')

Per-table optimisation
On SQLAlchemy 2.x, get_pk_constraint, get_unique_constraints, get_foreign_keys, get_indexes, and get_columns automatically use per-table queries (SHOW … IN TABLE, DESC TABLE) for single-table Inspector calls (e.g. Inspector.get_pk_constraint(), pandas.read_sql_table()). MetaData.reflect() continues to use the schema-wide get_multi_* hooks, which issue one query per reflection pass.
On SQLAlchemy 1.4, MetaData.reflect() calls the singular methods per-table. Add cache_column_metadata=True to the connection URL to opt in to per-table queries for get_pk_constraint, get_unique_constraints, get_foreign_keys, get_indexes, and get_columns. Without this flag, the existing schema-wide queries are used unchanged.
engine = create_engine(URL(
account = 'abc123',
user = 'testuser1',
password = 'pass',
database = 'db',
schema = 'public',
warehouse = 'testwh',
role='myrole',
cache_column_metadata=True, # SA 1.4 only: enables per-table reflection
))

Performance Implications
For schemas with many tables (100+), schema-wide queries issued once during MetaData.reflect() are far more efficient than per-table queries in a loop:
- Schema-wide SHOW PRIMARY KEYS IN SCHEMA (all tables): < 1 second
- Per-table loop over 1,000 tables: 1,000+ round-trips
For single-table inspection via Inspector, per-table queries (DESC TABLE, SHOW … IN TABLE) are faster than fetching the entire schema.
Best Practices
- For bulk reflection: use metadata.reflect() — schema-wide queries are issued once and cached.
- For single-table inspection: use inspector.get_columns() / inspector.get_pk_constraint() etc. — per-table queries are used automatically (SA 2.x) or with cache_column_metadata=True (SA 1.4).
- For very large schemas: reflect only the tables you need:

  metadata.reflect(bind=engine, schema='public', only=['table1', 'table2'])

Snowflake SQLAlchemy supports fetching VARIANT, ARRAY and OBJECT data types. All types are converted into str in Python so that you can convert them to native data types using json.loads.
This example shows how to create a table including VARIANT, ARRAY, and OBJECT data type columns.
from snowflake.sqlalchemy import (VARIANT, ARRAY, OBJECT)
t = Table('my_semi_structured_datatype_table', metadata,
Column('va', VARIANT),
Column('ob', OBJECT),
Column('ar', ARRAY))
metadata.create_all(engine)

To retrieve VARIANT, ARRAY, and OBJECT data type columns and convert them to native Python data types, fetch the data and call the json.loads method as follows:
import json
from sqlalchemy import select

connection = engine.connect()
results = connection.execute(select([t]))
row = results.fetchone()
data_variant = json.loads(row[0])
data_object = json.loads(row[1])
data_array = json.loads(row[2])

This module defines custom SQLAlchemy types for Snowflake structured data, specifically for Iceberg tables. The types — MAP, OBJECT, and ARRAY — allow you to store complex data structures in your SQLAlchemy models. For detailed information, refer to the Snowflake Structured data types documentation.
The MAP type represents a collection of key-value pairs, where each key and value can have different types.
- Key Type: The type of the keys (e.g., TEXT, NUMBER).
- Value Type: The type of the values (e.g., TEXT, NUMBER).
- Not Null: Whether NULL values are allowed (default is False).
Example Usage
IcebergTable(
table_name,
metadata,
Column("id", Integer, primary_key=True),
Column("map_col", MAP(NUMBER(10, 0), TEXT(16777216))),
external_volume="external_volume",
base_location="base_location",
)

The OBJECT type represents a semi-structured object with named fields. Each field can have a specific type, and you can also specify whether each field is nullable.
- Items Types: A dictionary of field names and their types. The type can optionally include a nullable flag (True for not nullable, False for nullable; the default is False).
Example Usage
IcebergTable(
table_name,
metadata,
Column("id", Integer, primary_key=True),
Column(
"object_col",
OBJECT(key1=(TEXT(16777216), False), key2=(NUMBER(10, 0), False)),
OBJECT(key1=TEXT(16777216), key2=NUMBER(10, 0)), # Without nullable flag
),
external_volume="external_volume",
base_location="base_location",
)

The ARRAY type represents an ordered list of values, where each element has the same type. The type of the elements is defined when creating the array.
- Value Type: The type of the elements in the array (e.g., TEXT, NUMBER).
- Not Null: Whether NULL values are allowed (default is False).
Example Usage
IcebergTable(
table_name,
metadata,
Column("id", Integer, primary_key=True),
Column("array_col", ARRAY(TEXT(16777216))),
external_volume="external_volume",
base_location="base_location",
)

Snowflake SQLAlchemy supports the CLUSTER BY parameter for tables. For information about the parameter, see the Snowflake CREATE TABLE documentation.
This example shows how to create a table with two columns, id and name, as the clustering keys:
t = Table('myuser', metadata,
Column('id', Integer, primary_key=True),
Column('name', String),
snowflake_clusterby=['id', 'name', text('id > 5')], ...
)
metadata.create_all(engine)

Alembic is a database migration tool built on top of SQLAlchemy. Add the following code to alembic/env.py so that Alembic can recognize Snowflake SQLAlchemy:
from alembic.ddl.impl import DefaultImpl
class SnowflakeImpl(DefaultImpl):
__dialect__ = 'snowflake'

See the Alembic documentation for general usage.
Snowflake SQLAlchemy supports key pair authentication by leveraging its Snowflake Connector for Python underpinnings. See Using Key Pair Authentication for steps to create the private and public keys.
The private key parameter is passed through connect_args as follows:
...
from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric import dsa
from cryptography.hazmat.primitives import serialization
with open("rsa_key.p8", "rb") as key:
p_key= serialization.load_pem_private_key(
key.read(),
password=os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
backend=default_backend()
)
pkb = p_key.private_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption())
engine = create_engine(URL(
account='abc123',
user='testuser1',
),
connect_args={
'private_key': pkb,
},
)

Where PRIVATE_KEY_PASSPHRASE is a passphrase to decrypt the private key file, rsa_key.p8.
Currently a private key parameter is not accepted by the snowflake.sqlalchemy.URL method.
Snowflake SQLAlchemy supports upserting with its MergeInto custom expression.
See Merge for full documentation.
Use it as follows:
from sqlalchemy.orm import sessionmaker
from sqlalchemy import MetaData, create_engine
from snowflake.sqlalchemy import MergeInto
engine = create_engine(db.url, echo=False)
session = sessionmaker(bind=engine)()
connection = engine.connect()
meta = MetaData()
meta.reflect(bind=session.bind)
t1 = meta.tables['t1']
t2 = meta.tables['t2']
merge = MergeInto(target=t1, source=t2, on=t1.c.t1key == t2.c.t2key)
merge.when_matched_then_delete().where(t2.c.marked == 1)
merge.when_matched_then_update().where(t2.c.isnewstatus == 1).values(val = t2.c.newval, status=t2.c.newstatus)
merge.when_matched_then_update().values(val=t2.c.newval)
merge.when_not_matched_then_insert().values(val=t2.c.newval, status=t2.c.newstatus)
connection.execute(merge)

When using Session.bulk_save_objects() with models that have nullable optional
columns, SQLAlchemy groups objects into separate INSERT batches based on each
object's set of non-None column keys. If some objects were constructed without
supplying every nullable column, they produce different key sets and SQLAlchemy
emits O(N) INSERT statements instead of a single executemany batch.
Snowflake SQLAlchemy provides the following components that together solve this problem:

- SnowflakeBase (SQLAlchemy 2.x only) — a DeclarativeBase subclass whose constructor pre-populates every plain-nullable column with None (or its scalar Python default) at construction time, so all instances share the same column-key set regardless of which kwargs the caller supplied.
- snowflake_declarative_base() — a factory function compatible with both SQLAlchemy 1.4 and 2.x that produces a declarative base with the same pre-population behaviour.
- SnowflakeSession — a Session subclass that passes render_nulls=True to the internal bulk-save call, preventing pre-populated None values from being stripped before grouping. Must be used together with SnowflakeBase or snowflake_declarative_base() for full effect.
SQLAlchemy 2.x example:
from sqlalchemy import Column, Integer, String, create_engine
from snowflake.sqlalchemy import SnowflakeBase, SnowflakeSession
class MyModel(SnowflakeBase):
__tablename__ = "my_model"
id = Column(Integer, primary_key=True)
name = Column(String) # nullable, no default
status = Column(String, default="active") # scalar default
engine = create_engine("snowflake://...")
SnowflakeBase.metadata.create_all(engine)
session = SnowflakeSession(bind=engine)
# All objects share the same column-key set — emits a single executemany INSERT
session.bulk_save_objects([
MyModel(id=1),
MyModel(id=2, name="foo"),
MyModel(id=3, status="inactive"),
])
session.commit()

SQLAlchemy 1.4 and 2.x compatible example:
from sqlalchemy import Column, Integer, String, create_engine
from snowflake.sqlalchemy import snowflake_declarative_base, SnowflakeSession
Base = snowflake_declarative_base()
class MyModel(Base):
__tablename__ = "my_model"
id = Column(Integer, primary_key=True)
name = Column(String)
status = Column(String, default="active")
engine = create_engine("snowflake://...")
Base.metadata.create_all(engine)
session = SnowflakeSession(bind=engine)
session.bulk_save_objects([
MyModel(id=1),
MyModel(id=2, name="foo"),
])
session.commit()

Notes:
- SnowflakeBase is only available in SQLAlchemy 2.x. Use snowflake_declarative_base() when your code must run on both SA 1.4 and 2.x.
- Columns with server_default, callable Python defaults (default=fn), or SQL-expression defaults (default=func.now()) are intentionally left absent from pre-population. Objects that differ on such columns may still be placed in separate INSERT batches — this is the same behaviour as stock SQLAlchemy.
- SnowflakeSession alone (without the matching base class) is not sufficient: the base class is required to unify the column-key sets before SnowflakeSession can batch them together.
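To confirm the batching, a quick sketch (reusing the MyModel and SnowflakeSession setup above) is to enable SQL echo and check that a single executemany INSERT is logged rather than one INSERT per object:

from sqlalchemy import create_engine

engine = create_engine("snowflake://...", echo=True)  # echo=True logs the emitted SQL
session = SnowflakeSession(bind=engine)
session.bulk_save_objects([MyModel(id=i) for i in range(100)])
session.commit()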
Snowflake SQLAlchemy supports saving tables and query results into different stages, as well as into Azure containers and AWS buckets, with its custom CopyIntoStorage expression. See Copy into for full documentation.
Use it as follows:
from sqlalchemy.orm import sessionmaker
from sqlalchemy import MetaData, create_engine
from snowflake.sqlalchemy import CopyIntoStorage, AWSBucket, CSVFormatter
engine = create_engine(db.url, echo=False)
session = sessionmaker(bind=engine)()
connection = engine.connect()
meta = MetaData()
meta.reflect(bind=session.bind)
users = meta.tables['users']
copy_into = CopyIntoStorage(from_=users,
into=AWSBucket.from_uri('s3://my_private_backup').encryption_aws_sse_kms('1234abcd-12ab-34cd-56ef-1234567890ab'),
formatter=CSVFormatter().null_if(['null', 'Null']))
connection.execute(copy_into)

Snowflake SQLAlchemy supports Iceberg Tables with the Snowflake Catalog, along with various related parameters. For detailed information about Iceberg Tables, refer to the Snowflake CREATE ICEBERG documentation.
To create an Iceberg Table using Snowflake SQLAlchemy, you can define the table using the SQLAlchemy Core syntax as follows:
table = IcebergTable(
"myuser",
metadata,
Column("id", Integer, primary_key=True),
Column("name", String),
external_volume=external_volume_name,
base_location="my_iceberg_table",
as_query="SELECT * FROM table"
)

Alternatively, you can define the table using a declarative approach:
class MyUser(Base):
__tablename__ = "myuser"
@classmethod
def __table_cls__(cls, name, metadata, *arg, **kw):
return IcebergTable(name, metadata, *arg, **kw)
__table_args__ = {
"external_volume": "my_external_volume",
"base_location": "my_iceberg_table",
"as_query": "SELECT * FROM table",
}
id = Column(Integer, primary_key=True)
name = Column(String)

Snowflake SQLAlchemy supports Hybrid Tables with indexes. For detailed information, refer to the Snowflake CREATE HYBRID TABLE documentation.
To create a Hybrid Table and add an index, you can use the SQLAlchemy Core syntax as follows:
table = HybridTable(
"myuser",
metadata,
Column("id", Integer, primary_key=True),
Column("name", String),
Index("idx_name", "name")
)

Alternatively, you can define the table using the declarative approach:
class MyUser(Base):
__tablename__ = "myuser"
@classmethod
def __table_cls__(cls, name, metadata, *arg, **kw):
return HybridTable(name, metadata, *arg, **kw)
__table_args__ = (
Index("idx_name", "name"),
)
id = Column(Integer, primary_key=True)
name = Column(String)

Snowflake SQLAlchemy supports Dynamic Tables. For detailed information, refer to the Snowflake CREATE DYNAMIC TABLE documentation.
To create a Dynamic Table, you can use the SQLAlchemy Core syntax as follows:
dynamic_test_table_1 = DynamicTable(
"dynamic_MyUser",
metadata,
Column("id", Integer),
Column("name", String),
target_lag=(1, TimeUnit.HOURS), # Additionally, you can use SnowflakeKeyword.DOWNSTREAM
warehouse='test_wh',
refresh_mode=SnowflakeKeyword.FULL,
as_query="SELECT id, name from MyUser;"
)

Alternatively, you can define the table without explicit columns by using the SQLAlchemy select() construct:
dynamic_test_table_1 = DynamicTable(
"dynamic_MyUser",
metadata,
target_lag=(1, TimeUnit.HOURS),
warehouse='test_wh',
refresh_mode=SnowflakeKeyword.FULL,
as_query=select(MyUser.id, MyUser.name)
)

- Defining a primary key in a Dynamic Table is not supported, meaning declarative tables don't support Dynamic Tables.
- When using the as_query parameter with a string, you must explicitly define the columns. However, if you use the SQLAlchemy select() construct, you don't need to explicitly define the columns.
- Direct data insertion into Dynamic Tables is not supported.
To ensure the authenticity and integrity of the Python package, follow the steps below to verify the package signature using cosign.
Steps to verify the signature:
- Install cosign. This example uses the Go installation: installing-cosign-with-go.
- Download the package file from a repository such as PyPI.
- Download the signature files from the release tag, replacing the version number with the version you are verifying.
- Verify the signature:
# replace the version number with the version you are verifying
./cosign verify-blob snowflake_sqlalchemy-1.7.3-py3-none-any.whl \
  --certificate snowflake_sqlalchemy-1.7.3-py3-none-any.whl.crt \
  --certificate-identity https://github.com/snowflakedb/snowflake-sqlalchemy/.github/workflows/python-publish.yml@refs/tags/v1.7.3 \
  --certificate-oidc-issuer https://token.actions.githubusercontent.com \
  --signature snowflake_sqlalchemy-1.7.3-py3-none-any.whl.sig

Verified OK
Feel free to file an issue or submit a PR here for general cases. For official support, contact Snowflake support at: https://community.snowflake.com/s/article/How-To-Submit-a-Support-Case-in-Snowflake-Lodge
Using SQLAlchemy's Identity() construct on a primary key column is not compatible with the SQLAlchemy ORM when targeting Snowflake.
Why it fails: After an INSERT, the ORM must retrieve the generated primary key to populate the in-memory object. SQLAlchemy supports two mechanisms for this — RETURNING (not available in Snowflake) and cursor.lastrowid (the Snowflake Python connector returns None for this attribute because Snowflake has no native rowid concept — see snowflake-connector-python#1201). With neither mechanism available, the ORM receives None as the primary key and raises:
sqlalchemy.orm.exc.FlushError: Instance <MyModel at 0x...> has a NULL identity key after a flush ...
Example that fails:
from sqlalchemy import Column, Identity, Integer, String
from sqlalchemy.orm import declarative_base, Session
Base = declarative_base()
class MyModel(Base):
__tablename__ = "my_model"
id = Column(Integer, Identity(start=1, increment=1), primary_key=True) # does not work with ORM
name = Column(String)
Base.metadata.create_all(engine)
with Session(engine) as session:
session.add(MyModel(name="test"))
session.commit() # raises FlushError: NULL identity key

The dialect emits a SnowflakeWarning at DDL compile time when Identity() is detected on a primary key column to surface this problem early. The warning is emitted once per unique (table, column) pair per Python process — repeated DDL compilation of the same schema does not produce duplicate output.
To silence the warning entirely, use Python's standard warning filter:
import warnings
from snowflake.sqlalchemy.exc import SnowflakeWarning
warnings.filterwarnings("ignore", category=SnowflakeWarning)Or set it via the PYTHONWARNINGS environment variable before starting your application:
PYTHONWARNINGS=ignore::snowflake.sqlalchemy.exc.SnowflakeWarning python my_app.py

Workaround — use Sequence() instead:
from sqlalchemy import Column, Integer, Sequence, String
from sqlalchemy.orm import declarative_base, Session
Base = declarative_base()
class MyModel(Base):
__tablename__ = "my_model"
id = Column(Integer, Sequence("my_model_id_seq"), primary_key=True)
name = Column(String)
Base.metadata.create_all(engine)
with Session(engine) as session:
session.add(MyModel(name="test"))
session.commit() # id is populated correctly

Sequence objects are fully supported by the Snowflake dialect and are the recommended way to generate auto-incrementing primary keys when using the ORM. See the Auto-increment Behavior section for more details.
Snowflake stores unquoted identifiers in UPPERCASE and treats them case-insensitively. SQLAlchemy uses lowercase for case-insensitive identifiers. The dialect bridges this gap via normalize_name / denormalize_name, but a few edge cases require explicit opt-in.
When the dialect reflects a table, each column name passes through normalize_name, which produces one of three outcomes depending on how the identifier was stored in Snowflake:
| Snowflake stored form | How it was created | normalize_name returns (default) | normalize_name returns (case_sensitive_identifiers=True) | SQLAlchemy treats it as |
|---|---|---|---|---|
| MYCOL (all-uppercase) | CREATE TABLE t (MYCOL INT) — unquoted | "mycol" (plain str) | "mycol" (plain str) | case-insensitive |
| mycol (lowercase) | CREATE TABLE t ("mycol" INT) — quoted | quoted_name("mycol", True) | quoted_name("mycol", True) | case-sensitive |
| MyCol (mixed-case) | CREATE TABLE t ("MyCol" INT) — quoted | "MyCol" (plain str) | quoted_name("MyCol", True) | case-sensitive — emitted SQL is "MyCol" in both modes (_requires_quotes forces quoting for any uppercase character) |
With the flag off, the only observable difference for mixed-case names is the Python type: isinstance(name, quoted_name) is False and .quote is None. Emitted SQL, dict key equality, and hashing are identical because quoted_name is a str subclass. Enable the flag when downstream code (e.g. an Alembic render_item hook or a custom preparer subclass) needs to distinguish case-sensitive reflected names by type.
You can observe this directly via the inspector:
from sqlalchemy import inspect
from sqlalchemy.sql.elements import quoted_name
inspector = inspect(engine)
for col in inspector.get_columns("my_table"):
name = col["name"]
if isinstance(name, quoted_name) and name.quote:
print(f"{name!r} — case-sensitive (was created quoted in Snowflake)")
else:
print(f"{name!r} — case-insensitive (Snowflake stores as {name.upper()})")This means that after reflection, accessing a case-insensitive column requires the lowercase name. Given a table created as:
CREATE TABLE my_table (mycol INT); -- unquoted: stored as MYCOL

metadata.reflect(bind=engine)
t = metadata.tables["my_table"]
t.c.mycol # correct — Snowflake stored MYCOL, reflected as "mycol"
t.c["MYCOL"] # KeyError — the reflected key is lowercaseAnd a case-sensitive column (quoted in Snowflake) is accessed by its exact reflected name:
t.c[quoted_name("mycol", True)] # correct — matches the reflected quoted_name key
t.c.mycol # also works — quoted_name.__eq__ compares by value

Column objects wrapped in quoted_name("mycol", True) are treated as case-sensitive by SQLAlchemy. Pass them directly to snowflake_clusterby:
from sqlalchemy.sql.elements import quoted_name
from sqlalchemy import Column, Integer, MetaData, Table
t = Table(
"my_table",
MetaData(),
Column(quoted_name("mycol", True), Integer),
snowflake_clusterby=[quoted_name("mycol", True)],
)
# Generates: CLUSTER BY ("mycol")Without quoted_name(..., True) the column name is treated as case-insensitive and Snowflake resolves it as MYCOL.
Most identifiers are handled correctly without any flag at the SQL layer:
- A quoted lowercase name ("mycol" in Snowflake) reflects as quoted_name("mycol", True) — already case-sensitive, in both flag modes.
- A quoted mixed-case name ("MyCol") reflects as a plain str "MyCol" by default; with case_sensitive_identifiers=True it reflects as quoted_name("MyCol", True). Emitted SQL is "MyCol" either way because the preparer's _requires_quotes heuristic force-quotes any name containing uppercase characters, so only code that inspects .quote or isinstance(..., quoted_name) observes a difference.
- An unquoted name (MYCOL stored as all-uppercase) reflects as "mycol" — correctly case-insensitive.
The one gap is an identifier whose all-uppercase form is also a SQL reserved word. For example, a table literally named TABLE (created as CREATE TABLE "TABLE" ...) stores as TABLE in Snowflake. When the dialect reflects it, normalize_name("TABLE") cannot tell whether TABLE is a plain uppercase column or the keyword TABLE, so by default it returns "TABLE" unchanged — an all-uppercase string that SQLAlchemy treats as case-sensitive. This causes a key mismatch: the table was reflected under the key "TABLE" but the same normalize_name call made during DDL would produce "table".
Enable case_sensitive_identifiers to fix this: the dialect will return quoted_name("table", True) for any all-uppercase identifier that is a reserved word, matching the standard SQLAlchemy convention:
from sqlalchemy import create_engine
engine = create_engine(
"snowflake://user:pass@account/db",
case_sensitive_identifiers=True,
)

Or via URL:
snowflake://user:pass@account/db?case_sensitive_identifiers=True
Hard limit: Enabling this flag changes the dict key used by normalize_name("TABLE") from "TABLE" to quoted_name("table", True). Because hash("TABLE") != hash("table"), any existing code that accesses metadata.tables["TABLE"] by the uppercase key will miss after the flag is enabled. Enabling the flag on an existing codebase requires auditing all string-keyed lookups into metadata.tables and table.c for names that happen to be reserved words.
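A short sketch of that key mismatch, under the assumptions that case_sensitive_identifiers=True is enabled and a table literally named TABLE exists in the default schema:

from sqlalchemy import MetaData

metadata = MetaData()
metadata.reflect(bind=engine)

metadata.tables.get("TABLE")   # None — the old uppercase key no longer matches
metadata.tables.get("table")   # returns the Table object (quoted_name("table", True) == "table")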
The most reliable way to declare a case-sensitive schema across multiple tables is MetaData(schema=quoted_name(..., True)). All tables and ORM models that share that MetaData object inherit the schema automatically — no repetition per table:
from sqlalchemy import Column, Integer, MetaData, String, Table
from sqlalchemy.sql.elements import quoted_name
# Declare once — every Table attached to this MetaData inherits it.
metadata = MetaData(schema=quoted_name("myschema", True))
orders = Table("orders", metadata, Column("id", Integer, primary_key=True))
items = Table("items", metadata, Column("id", Integer, primary_key=True))
# SQL: "myschema".ORDERS and "myschema".ITEMS
# (table names are plain strings — Snowflake uppercases them)

To also make the table names case-sensitive, wrap them in quoted_name as well:
from sqlalchemy.sql.elements import quoted_name
orders = Table(quoted_name("orders", True), metadata, Column("id", Integer, primary_key=True))
items = Table(quoted_name("items", True), metadata, Column("id", Integer, primary_key=True))
# SQL: "myschema"."orders" and "myschema"."items"When tables belong to different schemas, or when the MetaData is shared and you need per-table control, pass schema explicitly on the first table and reuse its .schema attribute on the rest:
orders = Table("orders", metadata,
Column("id", Integer, primary_key=True),
schema=quoted_name("myschema", True))
# Reuse the schema object directly — no need to repeat quoted_name.
items = Table("items", metadata,
Column("id", Integer, primary_key=True),
schema=orders.schema)

Table.schema is always a quoted_name instance. Assigning from another table's .schema carries the quote=True flag intact.
For ORM declarative models, set the schema on Base.metadata so every model inherits it:
from sqlalchemy import Column, Integer, MetaData
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.sql.elements import quoted_name
class Base(DeclarativeBase):
metadata = MetaData(schema=quoted_name("myschema", True))
class Orders(Base):
__tablename__ = "orders"
id = Column(Integer, primary_key=True)
class Items(Base):
__tablename__ = "items"
id = Column(Integer, primary_key=True)

For per-model override, use __table_args__:
class Orders(Base):
__tablename__ = "orders"
__table_args__ = {"schema": quoted_name("myschema", True)}
id = Column(Integer, primary_key=True)
class Items(Base):
__tablename__ = "items"
__table_args__ = {"schema": Orders.__table__.schema} # reuse — no repetition
id = Column(Integer, primary_key=True)

Plain strings with inner double-quotes — '"myschema"' — are also accepted and produce the same result as quoted_name("myschema", True) when the dialect is built with case_sensitive_identifiers=True:
engine = create_engine(
"snowflake://user:pass@account/db",
case_sensitive_identifiers=True,
)
Table("orders", metadata, schema='"myschema"') # → "myschema".ORDERS
Table("orders", metadata, schema='"mydb"."myschema"') # → "mydb"."myschema".ORDERSWithout the flag, the schema parser extracts the inner value but leaves the quote attribute at its default (None), so the preparer's _requires_quotes heuristic decides whether to re-quote it — and for an all-lowercase value like myschema the heuristic returns False, stripping the quotes from the emitted SQL:
# Flag off (default):
Table("orders", metadata, schema='"myschema"')
# SQL: myschema.ORDERS ← Snowflake resolves this to MYSCHEMA (case-insensitive)

When case-sensitivity matters, either enable the flag or use quoted_name("myschema", True) / MetaData(schema=quoted_name(..., True)) explicitly — those forms carry quote=True on the value itself and are honoured regardless of the flag. Prefer quoted_name in code that is read often; prefer the quoted-string form together with the flag when the value comes from a config that already embeds the quotes.
Identifiers that contain a literal double-quote character use the standard SQL escape "". The parser handles this correctly — '"my""schema"' is treated as a single identifier named my"schema and round-trips to the SQL form "my""schema" in both flag modes (the embedded " is not a legal identifier character, so _requires_quotes forces quoting regardless of the flag):
Table("orders", metadata, schema='"my""schema"') # schema name: my"schema
# SQL: "my""schema".ORDERScreate_snowflake_engine sets the default schema for the connection (equivalent to USE SCHEMA on connect). Use it when all queries in the session target the same case-sensitive schema and you do not want to qualify every table individually:
from snowflake.sqlalchemy import create_snowflake_engine
engine = create_snowflake_engine(
"snowflake://user:pass@account/mydb",
schema="myschema",
case_sensitive_schema=True,
)

When the URL is stored outside Python code (environment variable, alembic.ini, Docker/Kubernetes config), create_snowflake_engine is not available and the schema must be percent-encoded directly in the URL string. Wrap the schema name in %22 (the percent-encoded form of "):
snowflake://user:pass@account/mydb/%22myschema%22
The dialect decodes %22 back to a literal " before passing the value to the Snowflake connector, which then executes USE SCHEMA "myschema" preserving case.
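When building that string programmatically, urllib.parse.quote produces the %22 encoding from the quoted name (account and credential values are placeholders):

import urllib.parse

schema = urllib.parse.quote('"myschema"')             # '%22myschema%22'
url = f'snowflake://user:pass@account/mydb/{schema}'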
When building the URL programmatically with SQLAlchemy's URL() helper, you can pass the schema with embedded double-quotes directly — the helper's percent-encoding handles them automatically:
from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine
engine = create_engine(URL(
account="abc123",
user="testuser1",
password="secret",
database="mydb",
schema='"myschema"', # literal " characters; URL() encodes them as %22 internally
))

This is equivalent to the %22 string form but avoids manual percent-encoding.
quoted_name works directly in op.create_table and op.add_column, so hand-written migration files need no special handling:
import sqlalchemy as sa
from alembic import op
from sqlalchemy.sql.elements import quoted_name
def upgrade():
op.create_table(
"my_table",
sa.Column(quoted_name("mycol", True), sa.String()), # quoted → "mycol" in Snowflake
sa.Column("name", sa.String()), # unquoted → NAME in Snowflake
)
op.add_column(
"my_table",
sa.Column(quoted_name("extra", True), sa.Integer()),
)

When the Alembic version table or the migrations themselves target a case-sensitive schema, use the %22 form in every place Alembic receives a schema string, because these values are passed as plain strings and create_snowflake_engine is not available at that point:
# alembic/env.py
from sqlalchemy import create_engine
url = "snowflake://user:pass@account/mydb/%22myschema%22"
context.configure(
url=url,
target_metadata=target_metadata,
version_table_schema="%22myschema%22", # keeps alembic_version table in the same schema
)

When include_schemas=True is enabled, Alembic calls inspector.get_schema_names() and passes each result to the include_name filter. The dialect returns case-sensitive schema names as quoted_name objects (e.g. quoted_name("myschema", True)). Because quoted_name inherits from str, a plain string comparison is safe:
# alembic/env.py
from sqlalchemy.sql.elements import quoted_name
def include_name(name, type_, parent_names):
if type_ == "schema":
# quoted_name("myschema", True) == "myschema" → True (str subclass equality)
return name in {'"myschema"', "myschema"}
return True
context.configure(
url="snowflake://user:pass@account/mydb/%22myschema%22",
target_metadata=target_metadata,
include_schemas=True,
include_name=include_name,
version_table_schema="%22myschema%22",
)

Note: inspector.get_schema_names() returns the schema as Snowflake stores it — lowercase "myschema" if it was created quoted, or uppercase "MYSCHEMA" if unquoted. After normalize_name this becomes quoted_name("myschema", True) or "myschema" (plain lowercase) respectively. Filter against the lowercase value and both cases are covered by the str equality of quoted_name.
Alembic's default renderer serialises quoted_name("mycol", True) as the plain string "mycol", losing the case-sensitivity signal. The generated migration would create a case-insensitive MYCOL column instead of "mycol".
This also affects the comparison phase: when autogenerate detects that a reflected quoted_name("mycol", True) column differs from what it would render, it may emit a spurious alter_column on every run. The fix for both problems is the same — register the render_item hook in env.py:
from snowflake.sqlalchemy.alembic_util import render_item as snowflake_render_item
context.configure(
...,
render_item=snowflake_render_item,
)

Hard limit: Alembic has no dialect-level rendering hook. The render_item callback in env.py is the only injection point and requires a two-line opt-in per project. This cannot be eliminated without upstream Alembic changes.
Plain unquoted string as a case-sensitive schema
# WRONG — "myschema" (no quotes, no quoted_name) is denormalized to MYSCHEMA
Table("my_table", metadata, schema="myschema")
class MyModel(Base):
__tablename__ = "my_table"
__table_args__ = {"schema": "myschema"} # also wrongA plain lowercase string has no quote attribute, so denormalize_name converts it to MYSCHEMA before it reaches Snowflake. No error is raised — if a schema named MYSCHEMA happens to exist the query succeeds silently against the wrong schema.
Use quoted_name("myschema", True), '"myschema"', or MetaData(schema=quoted_name(..., True)) as shown in the section above.