SQLAlchemy數據庫連接的正確關閉方法及問題排查
在使用python的SQLAlchemy庫進行數據庫操作時,確保數據庫連接的正確關閉至關重要,以避免資源泄漏和性能問題。本文將分析一個常見的SQLAlchemy連接關閉問題,并提供解決方案。
以下代碼片段展示了一個可能存在連接關閉問題的示例:
from sqlalchemy import create_engine, url, delete, update, select, exists from sqlalchemy.orm import Sessionmaker, scoped_session from core.database.base import base # 假設這是你的數據庫基類 from lib.type import type # 假設這是你的類型定義 from typing import Any from flask import g, current_app import importlib import re class Database: # 類名改為首字母大寫,符合Python規范 env = None def set(self, key: str, value: Any): """ 設置屬性值,根據環境變量設置到g.application或g.platform """ if self.env == "application": g.application = self.container._replace(**{key: value}) elif self.env == 'platform': g.platform = self.container._replace(**{key: value}) @Property def container(self): """ 返回g.application或g.platform容器 """ if self.env == "application": if "application" not in g: g.application = type.application(None, None, None) return g.application elif self.env == 'platform': if "platform" not in g: g.platform = type.platform(None, None) return g.platform @property def database_conf(self): """ 獲取數據庫配置 """ return base.setting(current_app.config["database"]) @property def __database_core(self): """ 創建數據庫會話,并緩存到實例屬性 """ if not hasattr(self, '_database_core'): self._database_core = self.__create_session(**self.database_conf) return self._database_core @property def __create_engine(self): """ 獲取數據庫引擎,并緩存到實例屬性 """ return self.__database_core.engine @property def __create_database(self): """ 獲取數據庫會話,并緩存到實例屬性 """ return self.__database_core.session def __create_session(self, **config): """ 創建數據庫會話 """ engine = self.create_engine(**config) session = scoped_session(sessionmaker(bind=engine, autoflush=True)) return type.database(engine=engine, session=session()) @classmethod def create_engine(cls, **kwargs): """ 創建數據庫引擎 """ return create_engine(url.create("mysql+pymysql", **kwargs), echo=True, isolation_level="autocommit") @staticmethod def create_all(models: list, engine=None): """ 創建所有模型的表 """ tables = [Database.get_model(model).__table__ for model in models] base.metadata.create_all(bind=engine, tables=tables) def create_table(self, tables: list): """ 創建指定模型的表 """ Database.create_all(models=tables, engine=self.__create_engine) @staticmethod def get_model(model: str): """ 獲取模型對象 """ module = importlib.import_module(f"model.{model.split('_')[0]}.{model}") class_name = ''.join(re.findall(r"[a-za-z]+", model.split(".")[-1].title())) return getattr(module, class_name)() @property def database(self): """ 獲取數據庫會話 """ return self.__create_database def table_data_query_all(self, model: Any, condition: list = None, order: list = None, limit: int = 500, fields: list = None) -> list[dict]: """ 查詢所有數據 """ query = select(model) if fields: query = query.with_only_columns(*fields) if condition: query = query.filter(*condition) if order: query = query.order_by(*order) results = [row.dict() for row in self.database.execute(query.limit(limit)).scalars()] return results def table_data_query_one(self, model: Any, condition: list = None) -> dict: """ 查詢單條數據 """ result = self.database.execute(select(model).filter(*condition).limit(1)).scalar_one_or_none() return None if result is None else result.dict() def table_data_query_exists(self, condition: list) -> bool: """ 查詢數據是否存在 """ return self.database.query(exists().where(*condition)).scalar() def table_data_insert_all(self, models: list) -> None: """ 批量插入數據 """ with self.database as db: db.add_all(models) db.commit() def table_data_insert_one(self, model, data: bool = False) -> int | dict: """ 插入單條數據 """ with self.database as db: db.add(model) db.commit() return model.dict() if data else model.id def table_data_update(self, model: Any, condition: list, data: dict) -> None: """ 更新數據 """ with self.database as db: db.execute(update(model).where(*condition).values(**data)) db.commit() # 需要顯式提交 def table_data_delete(self, model: Any, condition: list) -> None: """ 刪除數據 """ with self.database as db: db.execute(delete(model).where(*condition)) db.commit() # 需要顯式提交 def close(self): """ 關閉數據庫連接 """ if hasattr(self, '_database_core'): self._database_core.session.close() self._database_core.engine.dispose() del self._database_core def __del__(self): """ 析構函數,確保連接關閉 """ self.close()
改進說明:
- 類名規范: 將 database 改為 Database,符合Python命名規范。
- 屬性緩存: 使用 @property 和實例屬性緩存 _database_core,避免重復創建會話。
- 顯式提交: 在table_data_update 和 table_data_delete 中添加了 db.commit(),確保事務提交。
- 資源釋放: close() 方法中顯式調用 session.close() 和 engine.dispose() 來釋放資源。del self._database_core 刪除緩存的會話對象。
- 異常處理: 可以考慮添加 try…except 塊來處理潛在的異常,例如數據庫連接錯誤。
- scoped_session 的使用: scoped_session 在 Flask 應用中通常配合 g 對象使用,確保每個請求使用獨立的會話,并在請求結束時自動關閉。 但代碼中沒有體現Flask請求上下文管理,因此dispose()是必要的。如果使用Flask的上下文管理,dispose()可能不是必需的,但session.close()仍然是必要的。
解決方法:
主要問題在于 scoped_session 的使用和資源釋放的時機。scoped_session 本身并不保證連接的自動關閉,它只是管理會話的范圍。 self.database.get_bind().dispose() 在某些情況下可能無效,因為它可能無法正確地關閉底層的數據庫連接。
因此,需要在合適的地方調用 close() 方法,或者在類的析構函數 __del__ 中調用 close() 方法,確保連接被正確關閉。 但是,依賴 __del__ 并非最佳實踐,因為 Python 的垃圾回收機制不可預測。 推薦在使用完 Database 實例后,顯式調用 instance.close()。
最佳實踐:
- 使用上下文管理器 (with 語句) 來管理數據庫會話:這可以確保會話在代碼塊執行完畢后自動關閉。
- 在 Flask 應用中,利用 Flask-SQLAlchemy 等擴展庫,可以更方便地管理數據庫連接和會話。 這些庫通常會自動處理連接的關閉和釋放。
通過以上改進,可以有效地解決 SQLAlchemy 數據庫連接無法正確關閉的問題,并提高代碼的健壯性和可維護性。 記住,顯式地關閉連接是最佳實踐,避免依賴垃圾回收機制。
? 版權聲明
文章版權歸作者所有,未經允許請勿轉載。
THE END