為什么Sqlalchemy數據庫連接無法正確關閉?如何解決這個問題?

為什么Sqlalchemy數據庫連接無法正確關閉?如何解決這個問題?

SQLAlchemy數據庫連接的正確關閉方法及問題排查

在使用python的SQLAlchemy庫進行數據庫操作時,確保數據庫連接的正確關閉至關重要,以避免資源泄漏和性能問題。本文將分析一個常見的SQLAlchemy連接關閉問題,并提供解決方案。

以下代碼片段展示了一個可能存在連接關閉問題的示例:

from sqlalchemy import create_engine, url, delete, update, select, exists from sqlalchemy.orm import Sessionmaker, scoped_session from core.database.base import base  # 假設這是你的數據庫基類 from lib.type import type  # 假設這是你的類型定義 from typing import Any from flask import g, current_app  import importlib import re   class Database:  # 類名改為首字母大寫,符合Python規范      env = None      def set(self, key: str, value: Any):         """         設置屬性值,根據環境變量設置到g.application或g.platform         """         if self.env == "application":             g.application = self.container._replace(**{key: value})         elif self.env == 'platform':             g.platform = self.container._replace(**{key: value})      @Property     def container(self):         """         返回g.application或g.platform容器         """         if self.env == "application":             if "application" not in g:                 g.application = type.application(None, None, None)             return g.application         elif self.env == 'platform':             if "platform" not in g:                 g.platform = type.platform(None, None)             return g.platform      @property     def database_conf(self):         """         獲取數據庫配置         """         return base.setting(current_app.config["database"])      @property     def __database_core(self):         """         創建數據庫會話,并緩存到實例屬性         """         if not hasattr(self, '_database_core'):             self._database_core = self.__create_session(**self.database_conf)         return self._database_core      @property     def __create_engine(self):         """         獲取數據庫引擎,并緩存到實例屬性         """         return self.__database_core.engine      @property     def __create_database(self):         """         獲取數據庫會話,并緩存到實例屬性         """         return self.__database_core.session      def __create_session(self, **config):         """         創建數據庫會話         """         engine = self.create_engine(**config)         session = scoped_session(sessionmaker(bind=engine, autoflush=True))         return type.database(engine=engine, session=session())      @classmethod     def create_engine(cls, **kwargs):         """         創建數據庫引擎         """         return create_engine(url.create("mysql+pymysql", **kwargs), echo=True, isolation_level="autocommit")      @staticmethod     def create_all(models: list, engine=None):         """         創建所有模型的表         """         tables = [Database.get_model(model).__table__ for model in models]         base.metadata.create_all(bind=engine, tables=tables)      def create_table(self, tables: list):         """         創建指定模型的表         """         Database.create_all(models=tables, engine=self.__create_engine)      @staticmethod     def get_model(model: str):         """         獲取模型對象         """         module = importlib.import_module(f"model.{model.split('_')[0]}.{model}")         class_name = ''.join(re.findall(r"[a-za-z]+", model.split(".")[-1].title()))         return getattr(module, class_name)()      @property     def database(self):         """         獲取數據庫會話         """         return self.__create_database      def table_data_query_all(self, model: Any, condition: list = None, order: list = None, limit: int = 500,                              fields: list = None) -> list[dict]:         """         查詢所有數據         """         query = select(model)         if fields:             query = query.with_only_columns(*fields)         if condition:             query = query.filter(*condition)         if order:             query = query.order_by(*order)         results = [row.dict() for row in self.database.execute(query.limit(limit)).scalars()]         return results      def table_data_query_one(self, model: Any, condition: list = None) -> dict:         """         查詢單條數據         """         result = self.database.execute(select(model).filter(*condition).limit(1)).scalar_one_or_none()         return None if result is None else result.dict()      def table_data_query_exists(self, condition: list) -> bool:         """         查詢數據是否存在         """         return self.database.query(exists().where(*condition)).scalar()      def table_data_insert_all(self, models: list) -> None:         """         批量插入數據         """         with self.database as db:             db.add_all(models)             db.commit()      def table_data_insert_one(self, model, data: bool = False) -> int | dict:         """         插入單條數據         """         with self.database as db:             db.add(model)             db.commit()             return model.dict() if data else model.id      def table_data_update(self, model: Any, condition: list, data: dict) -> None:         """         更新數據         """         with self.database as db:             db.execute(update(model).where(*condition).values(**data))             db.commit() # 需要顯式提交      def table_data_delete(self, model: Any, condition: list) -> None:         """         刪除數據         """         with self.database as db:             db.execute(delete(model).where(*condition))             db.commit() # 需要顯式提交      def close(self):         """         關閉數據庫連接         """         if hasattr(self, '_database_core'):             self._database_core.session.close()             self._database_core.engine.dispose()             del self._database_core      def __del__(self):         """         析構函數,確保連接關閉         """         self.close()

改進說明:

  1. 類名規范: 將 database 改為 Database,符合Python命名規范。
  2. 屬性緩存: 使用 @property 和實例屬性緩存 _database_core,避免重復創建會話。
  3. 顯式提交: 在table_data_update 和 table_data_delete 中添加了 db.commit(),確保事務提交。
  4. 資源釋放: close() 方法中顯式調用 session.close() 和 engine.dispose() 來釋放資源。del self._database_core 刪除緩存的會話對象。
  5. 異常處理: 可以考慮添加 try…except 塊來處理潛在的異常,例如數據庫連接錯誤。
  6. scoped_session 的使用: scoped_session 在 Flask 應用中通常配合 g 對象使用,確保每個請求使用獨立的會話,并在請求結束時自動關閉。 但代碼中沒有體現Flask請求上下文管理,因此dispose()是必要的。如果使用Flask的上下文管理,dispose()可能不是必需的,但session.close()仍然是必要的。

解決方法

主要問題在于 scoped_session 的使用和資源釋放的時機。scoped_session 本身并不保證連接的自動關閉,它只是管理會話的范圍。 self.database.get_bind().dispose() 在某些情況下可能無效,因為它可能無法正確地關閉底層的數據庫連接。

因此,需要在合適的地方調用 close() 方法,或者在類的析構函數 __del__ 中調用 close() 方法,確保連接被正確關閉。 但是,依賴 __del__ 并非最佳實踐,因為 Python 的垃圾回收機制不可預測。 推薦在使用完 Database 實例后,顯式調用 instance.close()。

最佳實踐:

  • 使用上下文管理器 (with 語句) 來管理數據庫會話:這可以確保會話在代碼塊執行完畢后自動關閉。
  • 在 Flask 應用中,利用 Flask-SQLAlchemy 等擴展庫,可以更方便地管理數據庫連接和會話。 這些庫通常會自動處理連接的關閉和釋放。

通過以上改進,可以有效地解決 SQLAlchemy 數據庫連接無法正確關閉的問題,并提高代碼的健壯性和可維護性。 記住,顯式地關閉連接是最佳實踐,避免依賴垃圾回收機制。

? 版權聲明
THE END
喜歡就支持一下吧
點贊7 分享