diff --git a/sqlmesh/core/engine_adapter/fabric.py b/sqlmesh/core/engine_adapter/fabric.py index e1dffe88f4..77fc045068 100644 --- a/sqlmesh/core/engine_adapter/fabric.py +++ b/sqlmesh/core/engine_adapter/fabric.py @@ -52,6 +52,33 @@ def _target_catalog(self) -> t.Optional[str]: def _target_catalog(self, value: t.Optional[str]) -> None: self._connection_pool.set_attribute("target_catalog", value) + @property + def _connected_catalog(self) -> t.Optional[str]: + """Catalog the currently-open thread-local connection is actually using.""" + return self._connection_pool.get_attribute("connected_catalog") + + @_connected_catalog.setter + def _connected_catalog(self, value: t.Optional[str]) -> None: + self._connection_pool.set_attribute("connected_catalog", value) + + def _normalize_catalog(self, catalog_name: t.Optional[str]) -> t.Optional[str]: + if not catalog_name: + return None + + default_catalog = self._default_catalog or self._extra_config.get("database") + if default_catalog and catalog_name == default_catalog: + return None + + return catalog_name + + def _catalog_state_label(self, catalog_name: t.Optional[str]) -> str: + return ( + catalog_name + or self._default_catalog + or self._extra_config.get("database") + or "" + ) + @property def api_client(self) -> FabricHttpClient: # the requests Session is not guaranteed to be threadsafe @@ -95,20 +122,28 @@ def _create_catalog(self, catalog_name: exp.Identifier) -> None: def _drop_catalog(self, catalog_name: exp.Identifier) -> None: """Drop a catalog (warehouse) in Microsoft Fabric via REST API.""" warehouse_name = catalog_name.sql(dialect=self.dialect, identify=False) - current_catalog = self.get_current_catalog() logger.info(f"Deleting Fabric warehouse: {warehouse_name}") self.api_client.delete_warehouse(warehouse_name) - if warehouse_name == current_catalog: - # Somewhere around 2025-09-08, Fabric started validating the "Database=" connection argument and throwing 'Authentication failed' if the database doesnt exist - # In addition, set_current_catalog() is implemented using a threadlocal variable "target_catalog" - # So, when we drop a warehouse, and there are still threads with "target_catalog" set to reference it, any operations on those threads - # that use an either use an existing connection pointing to this warehouse or trigger a new connection - # will fail with an 'Authentication Failed' error unless we close all connections here, which also clears all the threadlocal data + # Close all connections if any thread may be using the dropped warehouse. + # We must check both the logical target and the physical connection catalog + # (falling back to the configured default when either is neutral) because + # Fabric validates the DATABASE= connection argument and raises + # 'Authentication Failed' when it points at a non-existent warehouse. + default_db = self._extra_config.get("database") + in_use = { + self.get_current_catalog() or default_db, + self._normalize_catalog(self._connected_catalog) or default_db, + } + if warehouse_name in in_use: self.close() - def set_current_catalog(self, catalog_name: str) -> None: + def get_current_catalog(self) -> t.Optional[str]: + """Return the explicit Fabric catalog target for the current thread.""" + return self._normalize_catalog(self._target_catalog) + + def set_current_catalog(self, catalog_name: t.Optional[str]) -> None: """ Set the current catalog for Microsoft Fabric connections. @@ -117,7 +152,8 @@ def set_current_catalog(self, catalog_name: str) -> None: recreate them with the new catalog in the connection configuration. Args: - catalog_name: The name of the catalog (warehouse) to switch to + catalog_name: The name of the catalog (warehouse) to switch to. + The configured default catalog is treated as the neutral state. Note: Fabric doesn't support catalog switching via USE statements because each @@ -127,33 +163,53 @@ def set_current_catalog(self, catalog_name: str) -> None: See: https://learn.microsoft.com/en-us/fabric/data-warehouse/sql-query-editor#limitations """ - current_catalog = self.get_current_catalog() + target_catalog = self._normalize_catalog(catalog_name) - # If already using the requested catalog, do nothing - if current_catalog and current_catalog == catalog_name: - logger.debug(f"Already using catalog '{catalog_name}', no action needed") + # No-op: the logical catalog state already matches. + if self.get_current_catalog() == target_catalog: + logger.debug("Already using requested Fabric catalog state, no action needed") return - logger.info(f"Switching from catalog '{current_catalog}' to '{catalog_name}'") - - # commit the transaction before closing the connection to help prevent errors like: - # > Snapshot isolation transaction failed in database because the object accessed by the statement has been modified by a - # > DDL statement in another concurrent transaction since the start of this transaction - # on subsequent queries in the new connection - self._connection_pool.commit() - - # note: we call close() on the connection pool instead of self.close() because self.close() calls close_all() - # on the connection pool but we just want to close the connection for this thread - self._connection_pool.close() - self._target_catalog = catalog_name # new connections will use this catalog - - catalog_after_switch = self.get_current_catalog() - - if catalog_after_switch != catalog_name: - # We need to raise an error if the catalog switch failed to prevent the operation that needed the catalog switch from being run against the wrong catalog - raise SQLMeshError( - f"Unable to switch catalog to {catalog_name}, catalog ended up as {catalog_after_switch}" + # Decide whether the open connection needs to be replaced. + # + # The set_catalog decorator restores the previous catalog (often None) + # after every catalog-scoped call. For Fabric, a connection close + + # reopen is expensive because each new connection goes through ODBC and + # the Fabric gateway. We therefore apply lazy connection management: + # + # * When restoring to neutral (target=None): just update _target_catalog. + # The existing connection stays alive and will be reused or replaced + # on the next real switch, avoiding a pointless bounce through the + # default catalog. + # + # * When switching to a non-neutral catalog: only close/reopen if the + # open connection is already on a different catalog. If a previous + # restore-to-neutral left the connection on the right catalog, we + # skip the close entirely. + connected_catalog = self._normalize_catalog(self._connected_catalog) + needs_reconnect = target_catalog is not None and connected_catalog != target_catalog + + if needs_reconnect: + logger.info( + "Switching connection from catalog '%s' to '%s'", + self._catalog_state_label(connected_catalog), + self._catalog_state_label(target_catalog), ) + # Commit before closing to avoid snapshot-isolation errors on + # subsequent queries in the new connection. + self._connection_pool.commit() + # note: close() on the pool (not self.close()) to only affect this + # thread's connection rather than all threads. + self._connection_pool.close() + self._connected_catalog = target_catalog + else: + logger.debug( + "Updating catalog target to '%s' (connection remains on '%s')", + self._catalog_state_label(target_catalog), + self._catalog_state_label(connected_catalog), + ) + + self._target_catalog = target_catalog def alter_table( self, alter_expressions: t.Union[t.List[exp.Alter], t.List[TableAlterOperation]] diff --git a/tests/core/engine_adapter/test_fabric.py b/tests/core/engine_adapter/test_fabric.py index a52218a097..b4aefde6ba 100644 --- a/tests/core/engine_adapter/test_fabric.py +++ b/tests/core/engine_adapter/test_fabric.py @@ -19,6 +19,131 @@ def adapter(make_mocked_engine_adapter: t.Callable) -> FabricEngineAdapter: return make_mocked_engine_adapter(FabricEngineAdapter) +def test_get_current_catalog_uses_only_explicit_target_catalog( + make_mocked_engine_adapter: t.Callable, +): + adapter = make_mocked_engine_adapter( + FabricEngineAdapter, + database="default_catalog", + ) + + assert adapter.get_current_catalog() is None + + adapter._target_catalog = "switched_catalog" + + assert adapter.get_current_catalog() == "switched_catalog" + + adapter._connection_pool.close() + + assert adapter._connection_pool.get_attribute("target_catalog") is None + assert adapter.get_current_catalog() is None + adapter.cursor.execute.assert_not_called() + + +def test_get_current_catalog_returns_none_without_target_or_database( + make_mocked_engine_adapter: t.Callable, +): + adapter = make_mocked_engine_adapter(FabricEngineAdapter) + + assert adapter.get_current_catalog() is None + adapter.cursor.execute.assert_not_called() + + +def test_set_current_catalog_does_not_query_database( + make_mocked_engine_adapter: t.Callable, +): + adapter = make_mocked_engine_adapter( + FabricEngineAdapter, + database="default_catalog", + ) + + adapter.set_current_catalog("new_catalog") + + assert adapter.get_current_catalog() == "new_catalog" + adapter.cursor.execute.assert_not_called() + + +def test_set_current_catalog_to_default_clears_explicit_target( + make_mocked_engine_adapter: t.Callable, +): + adapter = make_mocked_engine_adapter( + FabricEngineAdapter, + default_catalog="core", + database="core", + ) + + adapter.set_current_catalog("planning") + adapter.set_current_catalog("core") + + assert adapter.get_current_catalog() is None + adapter.cursor.execute.assert_not_called() + + +def test_catalog_scoped_call_restores_to_neutral_without_close( + make_mocked_engine_adapter: t.Callable, + mocker: MockerFixture, +): + """Decorator's restore-to-neutral must not close the existing connection.""" + adapter = make_mocked_engine_adapter( + FabricEngineAdapter, + default_catalog="core", + database="core", + ) + close_spy = mocker.spy(adapter._connection_pool, "close") + adapter.cursor.fetchone.return_value = (1,) + + adapter.table_exists("planning.db.table") + + # Decorator calls set_current_catalog("planning") then set_current_catalog(None). + # Only the first call (None→planning) should trigger a connection close. + assert close_spy.call_count == 1 + assert adapter._connected_catalog == "planning" + assert adapter.get_current_catalog() is None + + +def test_repeated_same_catalog_reuses_connection( + make_mocked_engine_adapter: t.Callable, + mocker: MockerFixture, +): + """Two consecutive operations on the same catalog share one connection.""" + adapter = make_mocked_engine_adapter( + FabricEngineAdapter, + default_catalog="core", + database="core", + ) + close_spy = mocker.spy(adapter._connection_pool, "close") + adapter.cursor.fetchone.return_value = (1,) + + adapter.table_exists("planning.db.table") + adapter.table_exists("planning.db.table") + + # Only the very first switch (None→planning) should close. + # The restore to neutral keeps the connection alive and the second + # planning operation reuses it without another close. + assert close_spy.call_count == 1 + assert adapter._connected_catalog == "planning" + + +def test_switching_between_catalogs_closes_each_time( + make_mocked_engine_adapter: t.Callable, + mocker: MockerFixture, +): + """Switching to a different catalog always triggers a connection close.""" + adapter = make_mocked_engine_adapter( + FabricEngineAdapter, + default_catalog="core", + database="core", + ) + close_spy = mocker.spy(adapter._connection_pool, "close") + adapter.cursor.fetchone.return_value = (1,) + + adapter.table_exists("safran.db.table") # None→safran: 1 close + adapter.table_exists("planning.db.table") # safran→planning: 2nd close + + assert close_spy.call_count == 2 + assert adapter._connected_catalog == "planning" + + def test_columns(adapter: FabricEngineAdapter): adapter.cursor.fetchall.return_value = [ ("decimal_ps", "decimal", None, 5, 4),