如标题所述,本文将叙述在仅使用 SQLALchemy 这一第三方库的情况下开发 Flask 会遇到的一系列问题,以及解决方案。
说明一下,这里的仅使用原生 SQLALchemy 的意思是,不使用其他支撑 Flask 应用的操作数据模型相关的库,例如 Flask-SQLALchemy、SQLAlchemy-serializer 等。
基本用法
在使用 Flask-SQLALchemy 的时候,我们总是将初始化的工作交由其内部完成,而我们只需要做简单的配置即可。
但如果使用原生的 SQLALchemy 构建项目,则需要考虑更多的事。
我们可以使用如下代码生成 db_session
用于操作数据模型使用:
from sqlalchemy import create_engine
DATABASE_URI = 'sqlite:///your_database.db'
engine = create_engine(
DATABASE_URI,
echo=True, # 启用日志输出
pool_size=10, # 连接池中的初始连接数为 10
max_overflow=5, # 允许连接池中的最大连接数超出 5
pool_recycle=3600, # 连接在 1 小时后重新连接
pool_timeout=30 # 获取连接的超时时间为 30 秒
)
Session = sessionmaker(bind=engine)
db_session = Session()
上述代码通过创建数据库引擎后使用 sessionmaker()
方法来创建 db_session
,之后就可以拿着 db_session
来操作数据模型了,例如操作查询:db_session.query(User).filter(User.id == 1).first()
,查询 ID 为 1 的一个用户。
配置项
在创建数据库引擎时,这里列出了一些常用的配置项
- echo : 用于启用或禁用引擎的日志输出。当设置为 True 时,将输出执行的 SQL 语句和其他调试信息
- pool_size : 指定连接池中的初始连接数。默认值为 5
- max_overflow : 指定连接池中可以超出
pool_size
数量的最大连接数 - pool_recycle : 指定连接在多久之后重新连接。默认为 -1,表示永不重新连接
- pool_timeout : 指定获取连接的超时时间,如果在超时时间内无法获取连接,则引发异常默认值为 30 秒
- pool_pre_ping : 如果为真,则布尔值将启用连接池的预备 ping 功能
- pool_use_lifo : 检索连接时使用后进先出,而不是先进先出。使用 LIFO,服务器端超时方案可以减少非高峰时段使用的连接数
Session 维持
在使用原生 SQLALchemy 时,会遇到一些令人困惑的问题,例如 Session 的丢失,或者说失效。很多时候我们首先需要检查一下上文提到的一个配置项:pool_recycle
,由于它的默认值是 -1,表示永不重新连接,但是 MySQL 默认的连接保活时间是 8 小时,具体要查看 MySQL 的 interactive_timeout
,那么 pool_recycle=-1
的配置显然是不合理的,至少需要小于 MySQL 的 8 小时才行。
所以最好给 pool_recycle
设置一个合适的值,例如 3600,也就是 1 小时。
进一步地,为了更好更安全地维护 Session,可以使用 scoped_session()
代替 sessionmaker()
方法,sessionmaker()
是一个工厂方法,可以用于创建 Session,而 scoped_session()
是一种特殊的 sessionmaker()
,能在多线程环境下管理会话。
此时要做的修改如下:
db_session = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=engin))
再之,如果使用 Flask 在请求时需要一个新的数据库会话,则可以在 before_request
时,db_session.rollback()
,在 after_request
时,db_session.remove()
,以保证每次拿到的是新的 Session。
序列化
在编写 Flask API 时,我们总是需要将模型序列化为 JSON 并返回给客户端。
例如需要把一个 User 模型,转化为 JSON 对象。
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
username = Column(String(50), unique=True, nullable=False)
create_time = Column(DateTime, default=datetime.utcnow)
转为:
{
'id': 1,
'username': 'Jeffrey',
'create_time': '2023-01-01 00:00:00'
}
此时我们通常会定义一个 JSONEncoder
,来完成对不同字段的序列化。
class JSONEncoder(_JSONEncoder):
def default(self, o):
if isinstance(o, (int, list, set, tuple)):
return json.dumps(o, cls=JSONEncoder)
if isinstance(o, datetime):
return o.strftime('%Y-%m-%d %H:%M:%S')
if isinstance(o, date):
return o.strftime('%Y-%m-%d')
if isinstance(o, time):
return o.strftime('%H:%M:%S')
if isinstance(o, Enum):
return o.value
if isinstance(o, Decimal):
return o.__float__()
if hasattr(o, 'keys') and hasattr(o, '__getitem__'):
return dict(o)
return JSONEncoder.default(self, o)
但遗憾的是,在使用原生 SQLALchemy 开发时,用上述方法,会遇到很多麻烦的事情。这取决于 SQLALchemy 其实还包含着许多内置字段,而我们只需要模型表结构的字段而已。具体参照这个问题:https://stackoverflow.com/questions/5022066/how-to-serialize-sqlalchemy-result-to-json
难道我只是简单地想把字段序列化为 JSON 也需要引入第三方库了吗,其实不然。
实现上述功能,只需要使用到面向对象的三大特性之一:继承;以及 Python 的自省机制即可。
自省
关于 Python 的自省机制,我曾在 认清自我从 Python 的自省机制开始 这篇文章做过介绍,这里就不再展开。
继承
使用继承,是为了定义一个统一的模型基类,以及统一的序列化方法,这样就无需为不同的模型重复定义,实现了复用,具体的代码如下:
class BaseModel(Base):
_abstract_ = True
id = Column(Integer, primary_key=True, autoincrement=True, comment='主键标识')
create_time = Column(TIMESTAMP, server_default=func.now(), comment='创建时间')
update_time = Column(TIMESTANP, server_default=func.now(), onupdate=func.now(), comment='更新时间')
def serialize(self):
result = dict()
for c in inspect(self).mapper.column_attrs:
key = c.key
value = getattr(self, key)
# 枚塋类的特殊外理
if isinstance(value, enum.Enum):
result[key] = value.value
result['_' + key] = value.name
else:
result[key] = value
retrun result
最后,使用原生 SQLALchemy 支撑 Flask 会更方便这个问题,我认为还是见仁见智,如果是个小应用这么做其实也能达到很好的效果,并且如果项目是一个非 Flask 项目,那这样反而是个明智的选择,但如果是一个庞大的系统,那么不如把一些繁琐的事情交由其他插件完成吧。
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于