Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 88 additions & 32 deletions sqlmesh/core/engine_adapter/fabric.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,33 @@ def _target_catalog(self) -> t.Optional[str]:
def _target_catalog(self, value: t.Optional[str]) -> None:
self._connection_pool.set_attribute("target_catalog", value)

@property
def _connected_catalog(self) -> t.Optional[str]:
"""Catalog the currently-open thread-local connection is actually using."""
return self._connection_pool.get_attribute("connected_catalog")

@_connected_catalog.setter
def _connected_catalog(self, value: t.Optional[str]) -> None:
self._connection_pool.set_attribute("connected_catalog", value)

def _normalize_catalog(self, catalog_name: t.Optional[str]) -> t.Optional[str]:
if not catalog_name:
return None

default_catalog = self._default_catalog or self._extra_config.get("database")
if default_catalog and catalog_name == default_catalog:
return None

return catalog_name

def _catalog_state_label(self, catalog_name: t.Optional[str]) -> str:
return (
catalog_name
or self._default_catalog
or self._extra_config.get("database")
or "<default>"
)

@property
def api_client(self) -> FabricHttpClient:
# the requests Session is not guaranteed to be threadsafe
Expand Down Expand Up @@ -95,20 +122,28 @@ def _create_catalog(self, catalog_name: exp.Identifier) -> None:
def _drop_catalog(self, catalog_name: exp.Identifier) -> None:
"""Drop a catalog (warehouse) in Microsoft Fabric via REST API."""
warehouse_name = catalog_name.sql(dialect=self.dialect, identify=False)
current_catalog = self.get_current_catalog()

logger.info(f"Deleting Fabric warehouse: {warehouse_name}")
self.api_client.delete_warehouse(warehouse_name)

if warehouse_name == current_catalog:
# Somewhere around 2025-09-08, Fabric started validating the "Database=" connection argument and throwing 'Authentication failed' if the database doesnt exist
# In addition, set_current_catalog() is implemented using a threadlocal variable "target_catalog"
# So, when we drop a warehouse, and there are still threads with "target_catalog" set to reference it, any operations on those threads
# that use an either use an existing connection pointing to this warehouse or trigger a new connection
# will fail with an 'Authentication Failed' error unless we close all connections here, which also clears all the threadlocal data
# Close all connections if any thread may be using the dropped warehouse.
# We must check both the logical target and the physical connection catalog
# (falling back to the configured default when either is neutral) because
# Fabric validates the DATABASE= connection argument and raises
# 'Authentication Failed' when it points at a non-existent warehouse.
default_db = self._extra_config.get("database")
in_use = {
self.get_current_catalog() or default_db,
self._normalize_catalog(self._connected_catalog) or default_db,
}
if warehouse_name in in_use:
self.close()

def set_current_catalog(self, catalog_name: str) -> None:
def get_current_catalog(self) -> t.Optional[str]:
"""Return the explicit Fabric catalog target for the current thread."""
return self._normalize_catalog(self._target_catalog)

def set_current_catalog(self, catalog_name: t.Optional[str]) -> None:
"""
Set the current catalog for Microsoft Fabric connections.

Expand All @@ -117,7 +152,8 @@ def set_current_catalog(self, catalog_name: str) -> None:
recreate them with the new catalog in the connection configuration.

Args:
catalog_name: The name of the catalog (warehouse) to switch to
catalog_name: The name of the catalog (warehouse) to switch to.
The configured default catalog is treated as the neutral state.

Note:
Fabric doesn't support catalog switching via USE statements because each
Expand All @@ -127,33 +163,53 @@ def set_current_catalog(self, catalog_name: str) -> None:
See:
https://learn.microsoft.com/en-us/fabric/data-warehouse/sql-query-editor#limitations
"""
current_catalog = self.get_current_catalog()
target_catalog = self._normalize_catalog(catalog_name)

# If already using the requested catalog, do nothing
if current_catalog and current_catalog == catalog_name:
logger.debug(f"Already using catalog '{catalog_name}', no action needed")
# No-op: the logical catalog state already matches.
if self.get_current_catalog() == target_catalog:
logger.debug("Already using requested Fabric catalog state, no action needed")
return

logger.info(f"Switching from catalog '{current_catalog}' to '{catalog_name}'")

# commit the transaction before closing the connection to help prevent errors like:
# > Snapshot isolation transaction failed in database because the object accessed by the statement has been modified by a
# > DDL statement in another concurrent transaction since the start of this transaction
# on subsequent queries in the new connection
self._connection_pool.commit()

# note: we call close() on the connection pool instead of self.close() because self.close() calls close_all()
# on the connection pool but we just want to close the connection for this thread
self._connection_pool.close()
self._target_catalog = catalog_name # new connections will use this catalog

catalog_after_switch = self.get_current_catalog()

if catalog_after_switch != catalog_name:
# We need to raise an error if the catalog switch failed to prevent the operation that needed the catalog switch from being run against the wrong catalog
raise SQLMeshError(
f"Unable to switch catalog to {catalog_name}, catalog ended up as {catalog_after_switch}"
# Decide whether the open connection needs to be replaced.
#
# The set_catalog decorator restores the previous catalog (often None)
# after every catalog-scoped call. For Fabric, a connection close +
# reopen is expensive because each new connection goes through ODBC and
# the Fabric gateway. We therefore apply lazy connection management:
#
# * When restoring to neutral (target=None): just update _target_catalog.
# The existing connection stays alive and will be reused or replaced
# on the next real switch, avoiding a pointless bounce through the
# default catalog.
#
# * When switching to a non-neutral catalog: only close/reopen if the
# open connection is already on a different catalog. If a previous
# restore-to-neutral left the connection on the right catalog, we
# skip the close entirely.
connected_catalog = self._normalize_catalog(self._connected_catalog)
needs_reconnect = target_catalog is not None and connected_catalog != target_catalog

if needs_reconnect:
logger.info(
"Switching connection from catalog '%s' to '%s'",
self._catalog_state_label(connected_catalog),
self._catalog_state_label(target_catalog),
)
# Commit before closing to avoid snapshot-isolation errors on
# subsequent queries in the new connection.
self._connection_pool.commit()
# note: close() on the pool (not self.close()) to only affect this
# thread's connection rather than all threads.
self._connection_pool.close()
self._connected_catalog = target_catalog
else:
logger.debug(
"Updating catalog target to '%s' (connection remains on '%s')",
self._catalog_state_label(target_catalog),
self._catalog_state_label(connected_catalog),
)

self._target_catalog = target_catalog

def alter_table(
self, alter_expressions: t.Union[t.List[exp.Alter], t.List[TableAlterOperation]]
Expand Down
125 changes: 125 additions & 0 deletions tests/core/engine_adapter/test_fabric.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,131 @@ def adapter(make_mocked_engine_adapter: t.Callable) -> FabricEngineAdapter:
return make_mocked_engine_adapter(FabricEngineAdapter)


def test_get_current_catalog_uses_only_explicit_target_catalog(
make_mocked_engine_adapter: t.Callable,
):
adapter = make_mocked_engine_adapter(
FabricEngineAdapter,
database="default_catalog",
)

assert adapter.get_current_catalog() is None

adapter._target_catalog = "switched_catalog"

assert adapter.get_current_catalog() == "switched_catalog"

adapter._connection_pool.close()

assert adapter._connection_pool.get_attribute("target_catalog") is None
assert adapter.get_current_catalog() is None
adapter.cursor.execute.assert_not_called()


def test_get_current_catalog_returns_none_without_target_or_database(
make_mocked_engine_adapter: t.Callable,
):
adapter = make_mocked_engine_adapter(FabricEngineAdapter)

assert adapter.get_current_catalog() is None
adapter.cursor.execute.assert_not_called()


def test_set_current_catalog_does_not_query_database(
make_mocked_engine_adapter: t.Callable,
):
adapter = make_mocked_engine_adapter(
FabricEngineAdapter,
database="default_catalog",
)

adapter.set_current_catalog("new_catalog")

assert adapter.get_current_catalog() == "new_catalog"
adapter.cursor.execute.assert_not_called()


def test_set_current_catalog_to_default_clears_explicit_target(
make_mocked_engine_adapter: t.Callable,
):
adapter = make_mocked_engine_adapter(
FabricEngineAdapter,
default_catalog="core",
database="core",
)

adapter.set_current_catalog("planning")
adapter.set_current_catalog("core")

assert adapter.get_current_catalog() is None
adapter.cursor.execute.assert_not_called()


def test_catalog_scoped_call_restores_to_neutral_without_close(
make_mocked_engine_adapter: t.Callable,
mocker: MockerFixture,
):
"""Decorator's restore-to-neutral must not close the existing connection."""
adapter = make_mocked_engine_adapter(
FabricEngineAdapter,
default_catalog="core",
database="core",
)
close_spy = mocker.spy(adapter._connection_pool, "close")
adapter.cursor.fetchone.return_value = (1,)

adapter.table_exists("planning.db.table")

# Decorator calls set_current_catalog("planning") then set_current_catalog(None).
# Only the first call (None→planning) should trigger a connection close.
assert close_spy.call_count == 1
assert adapter._connected_catalog == "planning"
assert adapter.get_current_catalog() is None


def test_repeated_same_catalog_reuses_connection(
make_mocked_engine_adapter: t.Callable,
mocker: MockerFixture,
):
"""Two consecutive operations on the same catalog share one connection."""
adapter = make_mocked_engine_adapter(
FabricEngineAdapter,
default_catalog="core",
database="core",
)
close_spy = mocker.spy(adapter._connection_pool, "close")
adapter.cursor.fetchone.return_value = (1,)

adapter.table_exists("planning.db.table")
adapter.table_exists("planning.db.table")

# Only the very first switch (None→planning) should close.
# The restore to neutral keeps the connection alive and the second
# planning operation reuses it without another close.
assert close_spy.call_count == 1
assert adapter._connected_catalog == "planning"


def test_switching_between_catalogs_closes_each_time(
make_mocked_engine_adapter: t.Callable,
mocker: MockerFixture,
):
"""Switching to a different catalog always triggers a connection close."""
adapter = make_mocked_engine_adapter(
FabricEngineAdapter,
default_catalog="core",
database="core",
)
close_spy = mocker.spy(adapter._connection_pool, "close")
adapter.cursor.fetchone.return_value = (1,)

adapter.table_exists("safran.db.table") # None→safran: 1 close
adapter.table_exists("planning.db.table") # safran→planning: 2nd close

assert close_spy.call_count == 2
assert adapter._connected_catalog == "planning"


def test_columns(adapter: FabricEngineAdapter):
adapter.cursor.fetchall.return_value = [
("decimal_ps", "decimal", None, 5, 4),
Expand Down