对象关系映射(ORM)可以建立类和数据库表直接的映射,一个类实例可以映射到这张表中的一个行。通过这样的模型,直接在编程语言层面对对象的操作就可以映射到对数据库管理系统(RDBMS)的操作。从开发的角度看,ORM免去了编写各种tables和CRUD SQL语句的麻烦。

ORM的全称为Object Relational Mapping,对象关系映射。本文讲述Python下的ORM。使用的模块是sqlalchemy,它兼容的数据库包括:MySQLPostgresqlSQLite。本文以MySQL为例。文章包括内容如下:

  1. ORM相关背景
  2. ORM的使用
  3. 结合Flask的开发
  4. 一个简单的例子

使用ORM的好处

在一些使用关系型数据库的项目中,通常面临如下的问题:

  1. 业务越多,SQL语句就越多,但很多是冗余的表达,管理不方便
  2. 通过字符操做拼接的SQL容易出错,分散开发人员的精力
  3. 需要把精力去考虑SQL的安全性问题,例如SQL注入等
  4. 如果数据库产品有变更或数据库模型改变,SQL语句可能失效

使用ORM的好处:

  1. 直接通过语言层面操作数据库,简单易用
  2. 容易应对复杂的查询。使用ORM的查询语法可读性更强
  3. ORM兼容各类关系型数据库
  4. 性能损耗少

建立连接

使用sqlalchemy建立连接到数据库是通过create_engine()函数。要注意,它的惰性连接,也就是在第一个执行数据库操作时才建立实际的连接。

1
2
from sqlalchemy import create_engine
engine = create_engine('mysql+pymysql://root:root@localhost:3306/flask?charset=utf8', echo=True, encoding='utf-8')

create_engine()的第一个参数是数据库的URL,它的格式如下:

dialect+driver://username:password@host:port/database?charset=utf8

driver是对应的数据库的驱动,例如在Python3中,MySQL的常用驱动是pymysql。echo设置用于调试或日志,它会输出详细的过程,包括table的创建,对象的添加等。charset可选,用于指定使用的字符集合,注意这里没有一横杠:charset=utf8。如果要指定字符集合也可以通过encoding变量。

通用格式如下:

databasetype+driver://user:password@host:port/databasename

其他数据库的格式如下:

  • sqlite:// 直接指定内存存储
  • sqlite:///database.db
  • postgresql+psycopg2://user:password@host:port/database

建立连接后可以直接在连接上使用SQL。

1
2
3
>>> result = engine.execute('select now();')
>>> print(result.fetchone())
(datetime.datetime(2018, 1, 24, 3, 13, 58),)

或者调用engine.connect()生成一个实现DBAPI的连接对象,而且它支持上下文。

1
2
3
4
5
6
7
from sqlalchemy import create_engine
engine = create_engine('mysql+pymysql://root:root@localhost:3306/flask', echo=True)

with engine.connect() as conn:
result = conn.execute('select * from files')
for row in result:
print(row)

上面这两种用法和使用普通的数据库客户端没有差别。但它有一个好处是为不同的关系数据库的客户端驱动提供了一层兼容抽象。就像虚拟文件系统为不同的文件系统提供已经接口通用的抽象。如果换成其他数据库系统,只需要修改数据库URL中的dialectdriver即可。

创建映射

使用ORM时,我们首先需要建立一个继承自Base的类。这个类会映射到数据库的table上。__tablename__指定定义的表的名字。具体一个例子如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String
from sqlalchemy import create_engine

engine = create_engine('mysql+pymysql://root:root@localhost:3306/flask', echo=True)

Base = declarative_base()

class Files(Base):
__tablename__ = 'Files'

id = Column(Integer, primary_key=True)
name = Column(String(255))
ctime = Column(Integer)
size = Column(Integer)

def __repr__(self):
return '<File(%s)>' % self.id

Base.metadata.drop_all(bind=engine)
Base.metadata.create_all(bind=engine)

这段代码终止执行如下的SQL常见Schema:

1
2
3
4
5
6
7
8
9
10
CREATE TABLE `files` (
`id` INT(11) NOT NULL AUTO_INCREMENT,
`name` VARCHAR(255) NULL DEFAULT NULL,
`ctime` INT(11) NULL DEFAULT NULL,
`size` INT(11) NULL DEFAULT NULL,
PRIMARY KEY (`id`)
)
COLLATE='utf8_general_ci'
ENGINE=InnoDB
;

我们看到了SQL定义的字段和Files的对应关系。

创建会话

通过sessionmaker创建会话,会话提供事务控制的相关操作,例如创建对象、删除对象、commitrollback。接着上面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import os
from sqlalchemy.orm import sessionmaker

Session = sessionmaker()
Session.configure(bind=engine)
session = Session() # 需要先前创建的engine

def walk(path):
for driver, _, files in os.walk(path):
for file in files:
yield os.path.join(driver, file)

for file in walk('~'):
stat = os.stat(file)
session.add(Files(name=file, ctime=stat.st_ctime, size=stat.st_size))
session.commit()

这个实例遍历$HOME目录,然后保存相关的元数据。如果我们需要rollback,直接调用session.rollback()。这里例子要注意路径的深度,因为我们创建的字段name只有255个字符限制。

创建关系

对于InnoDB存储引擎来说,可以通过外键实现多个表的关联,以满足特定的数据范式设计。这样的关联能保证数据库操作时数据的一致性。相关的数据库理论这里不多说了。接下来举一个简单的例子。

简单例子

先看代码。两张表,用来存储文件元数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import warnings

from sqlalchemy import create_engine, Column, Integer, String, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship

Base = declarative_base()

class File(Base):
__tablename__ = 'file'
id = Column(Integer, primary_key=True)
filename = Column(String(100), nullable=False)

def __repr__(self):
return '<File(id=%s, name=%s)>' % (self.id, self.filename)

class Stat(Base):
__tablename__ = 'stat'
id = Column(Integer, primary_key=True)
path = Column(String(1000), nullable=False)
ctime = Column(Integer, nullable=False)
atime = Column(Integer, nullable=False)
mtime = Column(Integer, nullable=False)
size = Column(Integer, nullable=False)
file_id = Column(Integer, ForeignKey('file.id'))

file = relationship("File", back_populates="stats")

def __repr__(self):
return '<Stat(path=%s)>' % self.path

# 这个需要放在Stat之后定义
File.stats = relationship('Stat', order_by=Stat.id, back_populates='file')
warnings.filterwarnings('ignore')
url = 'mysql+pymysql://root:root@localhost:3306/flask'

engine = create_engine(url, encoding='utf-8')
Base.metadata.drop_all(bind=engine)
Base.metadata.create_all(bind=engine)

session = sessionmaker(bind=engine)()

代码中定义两张表,File和Stat通过外键建立关系。建立外键关系的关键是使用ForeignKey实现对字段的关联。

实质上,这两张表创建的SQL语句如下。MySQL5.6默认使用InnoD存储引擎。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
CREATE TABLE `file` (
`id` INT(11) NOT NULL AUTO_INCREMENT,
`filename` VARCHAR(100) NOT NULL,
PRIMARY KEY (`id`)
)
COLLATE='utf8_general_ci'
ENGINE=InnoDB
;

CREATE TABLE `stat` (
`id` INT(11) NOT NULL AUTO_INCREMENT,
`path` VARCHAR(1000) NOT NULL,
`ctime` INT(11) NOT NULL,
`atime` INT(11) NOT NULL,
`mtime` INT(11) NOT NULL,
`size` INT(11) NOT NULL,
`file_id` INT(11) NULL DEFAULT NULL,
PRIMARY KEY (`id`),
INDEX `file_id` (`file_id`),
CONSTRAINT `stat_ibfk_1` FOREIGN KEY (`file_id`) REFERENCES `file` (`id`)
)
COLLATE='utf8_general_ci'
ENGINE=InnoDB
;

写一段简单的测试代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def test():
import os
all_files = []
for driver, _, files in os.walk('F:\\game'):
for file in files:
path = os.path.join(driver, file)
file = File(filename=file)
stat = os.stat(path)
file.stats = [Stat(path=path,
ctime=stat.st_ctime,
atime=stat.st_atime,
mtime=stat.st_mtime,
size=stat.st_size,
file_id=file.id)]
all_files.append(file)
session.add_all(all_files)
session.commit()

使用多变的一个问题是性能,由于涉及多表操作,需要注意。接下来尝试其他的关系模型。

一对多模型、多对多模型都是可以通过relationship建立起来。

CRUD

像SQL提供的CRUD操作一样,ORM也提供先向对象的CRUD操作。为了让CRUD操作得以实现,ORM需要建立于数据库管理系统的session。由session代理用户提供的操作,如果希望操作生效写入数据库,执行session.commit(),否则执行session.rollback()。接下来我们分别举例。

CRUD操作的公共代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine, Integer, String, Sequence, Column
from sqlalchemy import and_, or_, text

engine = create_engine('mysql+pymysql://root:root@localhost:3306/flask', encoding='utf-8')
Base = declarative_base()

class Files(Base):
__tablename__ = 'files'
id = Column(Integer, Sequence('files_id'), primary_key=True, autoincrement=True)
filename = Column(String(255))

Base.metadata.create_all(bind=engine)

无论是什么操作,都需要建立在session上。

insert

添加一个项目

1
2
3
session = sessionmaker(bind=engine)()
session.add(User(filename="allenwind.sql")
session.commit()

添加多个项目

1
2
3
4
session = sessionmaker(bind=engine)()
files = [Files(filename=name) for name in ("allen", "wind", "allenwind")]
session.add_all(files)
session.commit(()

select

在查询中,SQLandor分别由and_, or_替代。

查询表中所有项目

1
2
3
4
session = sessionmaker(bind=engine)
results = session.query(Files).all()
for r in results:
print(r)

条件查询举例

1
2
3
4
5
6
session = sessionmaker(bind=engine)
results = session.query(Files).filter(Files.id.in_([5, 10]))

results = session.query(Files).filter(or_(Files.id==1, Files.id==2))

results = session.query(Files).filter(and_(Files.id >= 5, Files.id <= 10))

指定字段的过滤

1
2
3
4
5
6
session = sessionmaker(bind=engine)
results = session.query(Files).filter_by(filename='wind')

# 如果只想返回第一个项目
result = session.query(Files).filter_by(filename='wind').first()
print(result)

使用like操作

1
2
session = sessionmaker(bind=engine)
results = session.query(Files).filter(Files.filename.like('allen%'))

delete

1
2
3
4
session = sessionmaker(bind=engine)
result = session.query(Files).filter_by(filename='wind').first()
session.delete(result)
session.commit()

一个例子—记录慢查询

我们已经知道了ORM的简便,现在使用ORM来实现一个简单的项目。

在Flask的基础上,结合SQLAlchemy可以定位业务查询中的“查询缓慢”情况。具体的实现是借用Flask的配置文件字段SQLALCHEMY_RECODE_QUERIESDATABASE_QUERY_TIMEOUT。通过这样的配置它能将慢查询和上下文相关的信息记录到日志中。当然,这样做只是一种规范

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import logging

from flask import Flask
from logging.handlers import RotatingFileHandler
from flask_sqlalchemy import get_debug_queries

app = Flask(__name__)
app.config['DATABASE_QUERY_TIMEOUT'] = 0.001
app.config['SQLALCHEMY_RECODE_QUERIES'] = True

# + 数据库相关的业务代码

formatter = logging.Formatter("[%(asctime)s] {%(pathname)s:%(lineno)d} %(levelname)s - %(message)s")
handler = RotatingFileHandler('slow.log', maxBytes=1<<20, backupCount=20)
handler.setLevel(logging.WARN)
handler.setFormatter(formatter)
app.logger.addHandler(handler)

@app.after_request
def log_slow_query(response):
for query in get_debug_queries():
if query.duration >= app.config['DATABASE_QUERY_TIMEOUT']:
app.logger.warn(query.context, query.statement, query.parameters, query.duration)
return response

if __name__ == '__main__':
app.run()

为了演示方便,把DATABASE_QUERY_TIMEOUT值设置更大点。

更多关于SQLAlchemy在Flask中的应用的内容,参考文章SQLAlchemy在Flask中的应用

尽管ORM很方便,在开发是可以代替纯SQL,使用ORM甚至可以不需要掌握数据库相关的知识,但如果熟练SQL和关系型数据库,在优化和调试时有很大帮助。

转载请包括本文地址:https://allenwind.github.io/blog/4666
更多文章请参考:https://allenwind.github.io/blog/archives/