python sqlalchemy执行原始sql语句示例

参考文档

官方文档首页: http://docs.sqlalchemy.org/en/latest/

连接数据库

这边使用sqlite的内存数据库,方便测试

import sqlalchemy

db_engine = sqlalchemy.create_engine('sqlite:///:memory:', echo=True)
db_conn = db_engine.connect()

使用create_engine可以建立一个连接池,调用connect可以从连接池中获取一个连接,调用db_conn.close()会把连接释放给连接池,并做相应的清理工作。

官方说明:http://docs.sqlalchemy.org/en/latest/core/connections.html#basic-usage

The connection is an instance of Connection, which is a proxy object for an actual DBAPI connection. The DBAPI connection is retrieved from the connection pool at the point at which Connection is created.
The returned result is an instance of ResultProxy, which references a DBAPI cursor and provides a largely compatible interface with that of the DBAPI cursor. The DBAPI cursor will be closed by the ResultProxy when all of its result rows (if any) are exhausted. A ResultProxy that returns no rows, such as that of an UPDATE statement (without any returned rows), releases cursor resources immediately upon construction.
When the close() method is called, the referenced DBAPI connection is released to the connection pool. From the perspective of the database itself, nothing is actually “closed”, assuming pooling is in use. The pooling mechanism issues a rollback() call on the DBAPI connection so that any transactional state or locks are removed, and the connection is ready for its next usage.

使用原始SQL

先进行建表操作

db_conn.execute(r'''
CREATE TABLE IF NOT EXISTS stocks (
        date text, 
        trans text, 
        symbol text, 
        qty real, 
        pricereal
    )
''')

插入单条数据

直接拼装一个完整的sql字符串,或使用下面防注入的参数绑定方式, 注意:会自动commit

db_conn.execute(r'''
INSERT INTO stocks VALUES (?, ?, ?, ?, ?)
''', ('2001-01-11', 'BUY', 'RHAT', 100, 35.14) )
db_conn.execute(r'''
INSERT INTO stocks VALUES (:1, :2, :3, :4, :5)
''', ('2001-01-11', 'BUY', 'RHAT', 100, 35.14) )

操作日志显示会自动commit

2017-08-27 15:05:26,834 INFO sqlalchemy.engine.base.Engine ()
2017-08-27 15:05:26,834 INFO sqlalchemy.engine.base.Engine COMMIT
2017-08-27 15:05:26,834 INFO sqlalchemy.engine.base.Engine 
INSERT INTO stocks VALUES (:1, :2, :3, :4, :5)

2017-08-27 15:05:26,834 INFO sqlalchemy.engine.base.Engine ('2001-01-11', 'BUY', 'RHAT', 100, 35.14)
2017-08-27 15:05:26,834 INFO sqlalchemy.engine.base.Engine COMMIT
2017-08-27 15:05:26,835 INFO sqlalchemy.engine.base.Engine 
INSERT INTO stocks VALUES (?, ?, ?, ?, ?)

2017-08-27 15:05:26,835 INFO sqlalchemy.engine.base.Engine ('2001-01-11', 'BUY', 'RHAT', 100, 35.14)
2017-08-27 15:05:26,835 INFO sqlalchemy.engine.base.Engine COMMIT
2017-08-27 15:05:26,835 INFO sqlalchemy.engine.base.Engine 
INSERT INTO stocks VALUES (:1, :2, :3, :4, :5)

插入多条数据

与插入单条数据类似, 注意:会自动commit

purchases = [('2006-03-28', 'BUY', 'IBM', 1000, 45.00),
             ('2006-04-05', 'BUY', 'MSFT', 1000, 72.00),
             ('2006-04-06', 'SELL', 'IBM', 500, 53.00),
            ]
db_conn.execute(r'''
INSERT INTO stocks VALUES (:1, :2, :3, :4, :5)
''', purchases)
db_conn.execute(r'''
INSERT INTO stocks VALUES (?, ?, ?, ?, ?)
''', purchases)

操作日志显示会自动commit

2017-08-27 15:05:26,835 INFO sqlalchemy.engine.base.Engine 
INSERT INTO stocks VALUES (:1, :2, :3, :4, :5)

2017-08-27 15:05:26,835 INFO sqlalchemy.engine.base.Engine [('2006-03-28', 'BUY', 'IBM', 1000, 45.0), ('2006-04-05', 'BUY', 'MSFT', 1000, 72.0), ('2006-04-06', 'SELL', 'IBM', 500, 53.0)]
2017-08-27 15:05:26,835 INFO sqlalchemy.engine.base.Engine COMMIT
2017-08-27 15:05:26,835 INFO sqlalchemy.engine.base.Engine 
INSERT INTO stocks VALUES (?, ?, ?, ?, ?)

2017-08-27 15:05:26,835 INFO sqlalchemy.engine.base.Engine [('2006-03-28', 'BUY', 'IBM', 1000, 45.0), ('2006-04-05', 'BUY', 'MSFT', 1000, 72.0), ('2006-04-06', 'SELL', 'IBM', 500, 53.0)]
2017-08-27 15:05:26,835 INFO sqlalchemy.engine.base.Engine COMMIT

使用事务处理

参考文档: http://docs.sqlalchemy.org/en/latest/core/connections.html#using-transactions

由于上面的插入操作会自动进行commit,sqlalchemy提供了Transactions来管理commit和rollback。

完整的测试代码如下

import sqlalchemy

db_engine = sqlalchemy.create_engine('sqlite:///:memory:', echo=True, pool_size=2)
db_conn = db_engine.connect()

db_conn.execute(r'''
CREATE TABLE IF NOT EXISTS stocks (date text, trans text, symbol text, qty real, price real)
''')

with db_conn.begin() as db_trans:
    #
    db_conn.execute(r'''
    INSERT INTO stocks VALUES (:1, :2, :3, :4, :5)
    ''', ('aa', 'BUY', 'RHAT', 100, 35.14) )

    db_conn.execute(r'''
    INSERT INTO stocks VALUES (?, ?, ?, ?, ?)
    ''', ('bb', 'BUY', 'RHAT', 100, 35.14) )

    #提前进行commit
    db_trans.commit()

    purchases = [('cc', 'BUY', 'IBM', 1000, 45.00),
                 ('dd', 'BUY', 'MSFT', 1000, 72.00),
                 ('ee', 'SELL', 'IBM', 500, 53.00),
                ]
    db_conn.execute(r'''
    INSERT INTO stocks VALUES (?,?,?,?,?)
    ''', purchases)


query_result = db_conn.execute(r'''
SELECT * FROM  stocks
''').fetchall()
print('query_result:', query_result)

完整的日志如下

2017-08-27 15:18:22,057 INFO sqlalchemy.engine.base.Engine SELECT CAST('test plain returns' AS VARCHAR(60)) AS anon_1
2017-08-27 15:18:22,057 INFO sqlalchemy.engine.base.Engine ()
2017-08-27 15:18:22,057 INFO sqlalchemy.engine.base.Engine SELECT CAST('test unicode returns' AS VARCHAR(60)) AS anon_1
2017-08-27 15:18:22,057 INFO sqlalchemy.engine.base.Engine ()
2017-08-27 15:18:22,058 INFO sqlalchemy.engine.base.Engine 
CREATE TABLE IF NOT EXISTS stocks (date text, trans text, symbol text, qty real, price real)

2017-08-27 15:18:22,058 INFO sqlalchemy.engine.base.Engine ()
2017-08-27 15:18:22,058 INFO sqlalchemy.engine.base.Engine COMMIT
2017-08-27 15:18:22,058 INFO sqlalchemy.engine.base.Engine BEGIN (implicit)
2017-08-27 15:18:22,058 INFO sqlalchemy.engine.base.Engine 
    INSERT INTO stocks VALUES (:1, :2, :3, :4, :5)

2017-08-27 15:18:22,058 INFO sqlalchemy.engine.base.Engine ('aa', 'BUY', 'RHAT', 100, 35.14)
2017-08-27 15:18:22,058 INFO sqlalchemy.engine.base.Engine 
    INSERT INTO stocks VALUES (?, ?, ?, ?, ?)

2017-08-27 15:18:22,059 INFO sqlalchemy.engine.base.Engine ('bb', 'BUY', 'RHAT', 100, 35.14)
2017-08-27 15:18:22,059 INFO sqlalchemy.engine.base.Engine COMMIT
2017-08-27 15:18:22,059 INFO sqlalchemy.engine.base.Engine 
    INSERT INTO stocks VALUES (?,?,?,?,?)

2017-08-27 15:18:22,059 INFO sqlalchemy.engine.base.Engine [('cc', 'BUY', 'IBM', 1000, 45.0), ('dd', 'BUY', 'MSFT', 1000, 72.0), ('ee', 'SELL', 'IBM', 500, 53.0)]
2017-08-27 15:18:22,059 INFO sqlalchemy.engine.base.Engine COMMIT
2017-08-27 15:18:22,059 INFO sqlalchemy.engine.base.Engine 
SELECT * FROM  stocks

2017-08-27 15:18:22,059 INFO sqlalchemy.engine.base.Engine ()
query_result: [('aa', 'BUY', 'RHAT', 100.0, 35.14), ('bb', 'BUY', 'RHAT', 100.0, 35.14), ('cc', 'BUY', 'IBM', 1000.0, 45.0), ('dd', 'BUY', 'MSFT', 1000.0, 72.0), ('ee', 'SELL', 'IBM', 500.0, 53.0)]

可以看到有三次commit操作,有三次commit操作, 一次是建表, 两次是插入,其中第二次是显示调用commit,第三次是with结束后自动调用。因此如果想避免频繁的commit,可以使用with来进行上下文管理。

测试异常

import sqlalchemy

db_engine = sqlalchemy.create_engine('sqlite:///:memory:', echo=True, pool_size=2)
db_conn = db_engine.connect()

try:
    with db_conn.begin() as db_trans:
        db_conn.execute(r'''
        CREATE TABLE IF NOT EXISTS stocks (date text, trans text, symbol text, qty real, price real)
        ''')

        db_conn.execute(r'''
        INSERT INTO stocks VALUES (:1, :2, :3, :4, :5)
        ''', ('aa', 'BUY', 'RHAT', 100, 35.14) )
        db_trans.commit()

        db_conn.execute(r'''
        INSERT INTO stocks VALUES (:1, :2, :3, :4, :5)
        ''', ('bb', 'BUY', 'RHAT', 100, 35.14) )

        db_conn.execute(r'''
        INSERT INTO stocks VALUES (:1, :2, :3, :4, :5)
        ''', ('cc', 'BUY', 'RHAT', 100) ) #这里故意少传一个参数来制造异常
except:
    print('exception!!!')


query_result = db_conn.execute(r'''
SELECT * FROM  stocks
''').fetchall()
print('query_result:', query_result)

日志

2017-08-27 15:40:53,151 INFO sqlalchemy.engine.base.Engine SELECT CAST('test plain returns' AS VARCHAR(60)) AS anon_1
2017-08-27 15:40:53,151 INFO sqlalchemy.engine.base.Engine ()
2017-08-27 15:40:53,152 INFO sqlalchemy.engine.base.Engine SELECT CAST('test unicode returns' AS VARCHAR(60)) AS anon_1
2017-08-27 15:40:53,152 INFO sqlalchemy.engine.base.Engine ()
2017-08-27 15:40:53,152 INFO sqlalchemy.engine.base.Engine BEGIN (implicit)
2017-08-27 15:40:53,152 INFO sqlalchemy.engine.base.Engine 
        CREATE TABLE IF NOT EXISTS stocks (date text, trans text, symbol text, qty real, price real)

2017-08-27 15:40:53,153 INFO sqlalchemy.engine.base.Engine ()
2017-08-27 15:40:53,153 INFO sqlalchemy.engine.base.Engine 
        INSERT INTO stocks VALUES (:1, :2, :3, :4, :5)

2017-08-27 15:40:53,153 INFO sqlalchemy.engine.base.Engine ('aa', 'BUY', 'RHAT', 100, 35.14)
2017-08-27 15:40:53,153 INFO sqlalchemy.engine.base.Engine COMMIT
2017-08-27 15:40:53,153 INFO sqlalchemy.engine.base.Engine 
        INSERT INTO stocks VALUES (:1, :2, :3, :4, :5)

2017-08-27 15:40:53,153 INFO sqlalchemy.engine.base.Engine ('bb', 'BUY', 'RHAT', 100, 35.14)
2017-08-27 15:40:53,153 INFO sqlalchemy.engine.base.Engine COMMIT
2017-08-27 15:40:53,153 INFO sqlalchemy.engine.base.Engine 
        INSERT INTO stocks VALUES (:1, :2, :3, :4, :5)

2017-08-27 15:40:53,153 INFO sqlalchemy.engine.base.Engine ('cc', 'BUY', 'RHAT', 100)
2017-08-27 15:40:53,154 INFO sqlalchemy.engine.base.Engine ROLLBACK
exception!!!
2017-08-27 15:40:53,154 INFO sqlalchemy.engine.base.Engine 
SELECT * FROM  stocks

2017-08-27 15:40:53,154 INFO sqlalchemy.engine.base.Engine ()
query_result: [('aa', 'BUY', 'RHAT', 100.0, 35.14), ('bb', 'BUY', 'RHAT', 100.0, 35.14)]
  • 出现异常时会自动rollback
  • 如果在with中提前调用commit,会导致以后的每个改动操作都会自动commit

python flask-sqlalchemy如何设置使自动建的mysql表字符集charset为utf8

问题

发现flask-sqlalchemy自动创建的mysql表为默认的latin1,如何不更改mysql服务器的默认字符集,直接在flask里配置自动建的mysql表字符集为utf8?

最佳答案

配置table_args就可以了,如:

class Foo(Base):
    __tablename__ = "foo"
    __table_args__ = {'mysql_collate': 'utf8_general_ci'}

    ...

    column = db.Column(db.String(500))

python3.6使用sqlalchemy读取mysql中的数据并进行多进程并发处理

1. 介绍 SQLALChemy

SQLALChemy 是一个 python 的 ORM(Object Relational Mapper) 框架,开发人员可以快速开发操作数据库的程序,
它提供完整的数据库访问层,提供高性能的数据库访问能力。
它支持 SQLite、MySQL、Postgres、Oracle 等常用的数据库访问

2. 安装 SQLAlChemy

pip install sqlalchemy

2.1 创建测试数据库

# 建立数据库
CREATE DATABASE `test` /*!40100 DEFAULT CHARACTER SET utf8mb4 */;

2.2 用 SQLALChemy 创建数据库表

2.2.1 程序关键点

  • 创建操作数据库的 engine,使用 pymysql 库访问 mysql 数据库
  • 创建操作数据库的 session,绑定到 engine 上
  • 从 Base 继承定义 User,Article 类,对应 mapping 到数据库的 member,article 表
  • 使用 session.create_all 创建数据库表结构
  • session.add_all 新增数据到数据库
  • session.commit 提交所有变更到数据库,此时可以再数据库中查询插入的数据
  • 查询数据使用 session.query 方法,也可以在后面连接使用 filter 进行条件过滤
#!/usr/bin/env python
# coding: utf-8


from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker


# 创建数据库 engine 使用 utf8 编码
eng = create_engine('mysql+pymysql://root:1@localhost:3306/test?charset=utf8')
Base = declarative_base()

# 创建 session 类,绑定engine
Session = sessionmaker(bind=eng)
session = Session()


class User(Base):
    '''
    用户类,对应数据库的 member 表
    '''
    __tablename__ = 'member'

    # 定义表字段
    mid = Column(Integer, primary_key=True)
    nickname = Column(String(50))
    email = Column(String(128))

    def __repl__(self):
        return '<User(name={}, email={}, nickname{}>'.format(mid,
                                                             email,
                                                             nickname)


class Article(Base):
    '''
    文章类,对应数据库中的 article 表
    '''
    __tablename__ = 'article'

    # 定义表字段
    arid = Column(Integer, primary_key=True)
    tags = Column(String(128))
    description = Column(String(256))
    title = Column(String(256))


def create_table():
    '''
    创建数据库表结构,导入初始数据
    '''
    # 创建表
    Base.metadata.create_all(eng)

    # 插入数据
    session.add_all([
        User(mid=1, nickname='测试数据 test hello', email='[email protected]'),
        User(mid=2, nickname='测试数据 china hello', email='[email protected]'),
        User(mid=3, nickname='测试数据 上海 hello', email='[email protected]'),
        User(mid=4, nickname='测试数据 北京 hello', email='[email protected]'),
        User(mid=5, nickname='测试数据 上海 hello', email='[email protected]'),
        User(mid=6, nickname='测试数据 山东 hello', email='[email protected]'),
        User(mid=7, nickname='测试数据 武夷山 hello', email='[email protected]'),
        User(mid=8, nickname='测试数据 黄山 hello', email='[email protected]'),

        Article(arid=1, tags='测试数据 test hello', title='销售额度', description='测试 test ok'),
        Article(arid=2, tags='测试数据 china hello', title='成功转型', description='测试 test ok'),
        Article(arid=3, tags='测试数据 上海 hello', title='蓝蓝的天上白云飘', description='测试 test ok'),
        Article(arid=4, tags='测试数据 背景 hello', title='在水一方', description='测试 test ok'),
        Article(arid=5, tags='测试数据 上海 hello', title='晴天,阴天,雨天,大风天', description='测试 test ok'),
        Article(arid=6, tags='测试数据 山东 hello', title='每年365天,每天24小时', description='测试 test ok'),
        Article(arid=7, tags='测试数据 武夷山 hello', title='高效工作的秘密', description='测试 test ok'),
        Article(arid=8, tags='测试数据 黄山 hello', title='战狼2', description='测试 test ok'),
    ]
    )

    # 提交到数据库
    session.commit()


def modify_data():
    '''
    测试修改数据
    '''

    # 查询用户表
    users = session.query(User).all()
    print(users)

    # 查询文章表
    # articles = session.query(Article).all()
    articles = session.query(Article).filter(Article.arid==2)
    print(articles)

    # 修改文章表
    articles[0].description = '程度,修改描述可以成功'
    print(session.dirty)

    # 提交到数据库
    session.commit()


if __name__ == '__main__':
    create_engine()
    # modify_data()

3. 多进程搜索程序

3.1 程序关键点

  • 创建 DbMgr 类,封装数据库访问操作
  • 用 sqlalchemy 从数据库获取 member,article 表中的数据
  • 使用 automap 自动反射出 member,article 表对应的类
  • 创建 Searcher 类,提供进程调用函数,用来查询符合条件的结果,并且提供进程执行完的回调展示方法
  • 创建 10 个进程的进程池
  • 循环获取用户输入,创建 searcher 对象,多进程并发执行过滤
  • 多进程调用使用 python multiprocessing
#!/usr/bin/env python
# coding: utf-8


import os


from sqlalchemy import create_engine, MetaData
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.automap import automap_base
from multiprocessing import Pool


class DbMgr():
    '''
    连接数据库,从数据库获取 用户表,文章表数据
    '''

    def __init__(self):
        eng = create_engine('mysql+pymysql://root:1@localhost/test?charset=utf8')

        # 使用 automap 自动反射出 member,article 表对应的类
        meta = MetaData()
        meta.reflect(eng, only=['member', 'article'])
        Base = automap_base(metadata=meta)
        Base.prepare()

        self._Member = Base.classes.member
        self._Article = Base.classes.article

        # 获取操作数据库的 session
        Session = sessionmaker(eng, autocommit=True)
        self._ses = Session()

    def get_data(self):
        '''
        查询用户表,文章表
        '''
        self._users = self._ses.query('"user"', self._Member.mid,
                                      self._Member.email,
                                      self._Member.nickname).all()

        self._articles = self._ses.query(self._Article).all()
        self._articles = [('ar', i.arid, i.title, i.tags, i.description)
                          for i in self._articles]

        return list(self._users) + list(self._articles)


class Searcher():
    '''
    进城处理函数,查找符合条件的结果,找到后返回结果
    '''
    def __init__(self, keyword):
        self._keyword = keyword

    def run(self, data):
        '''
        查找字符串
        '''
        try:
            if self._keyword in str(data):
                return 'ret: ' + str(os.getpid()) + '->' + str(data)
            else:
                return None
        except Exception as e:
            return e

    def callback(self, data):
        '''
        全部执行完后回调函数,展示结果
        '''
        try:
            for i in data:
                if i:
                    print('match: {}'.format(i))
        except Exception as e:
            print(e)


def main():
    # 从数据库读取数据
    mgr = DbMgr()

    # 创建过滤进程池
    pool = Pool(4)

    # 创建搜索器
    while True:
        keyword = input('n输入搜索词:  ')
        if keyword == 'q':
            break

        searcher = Searcher(keyword)

        # 从数据库获取数据
        data = mgr.get_data()
        res = pool.map_async(searcher.run,
                             data,
                             10,
                             callback=searcher.callback)

        # 等待所有进程执行完成
        res.wait()
        print('all done', res.successful())


if __name__ == '__main__':
    main()

3.2 程序运行

(py36env) servadmin@debian:~/test # python mysql2es.py

输入搜索词:  test
match: ret: 10013->('user', 1, '[email protected]', '测试数据 test hello')
match: ret: 10013->('user', 2, '[email protected]', '测试数据 china hello')
match: ret: 10013->('user', 3, '[email protected]', '测试数据 上海 hello')
match: ret: 10013->('user', 4, '[email protected]', '测试数据 背景 hello')
match: ret: 10013->('user', 5, '[email protected]', '测试数据 上海 hello')
match: ret: 10013->('user', 6, '[email protected]', '测试数据 山东 hello')
match: ret: 10013->('user', 7, '[email protected]', '测试数据 武夷山 hello')
match: ret: 10013->('user', 8, '[email protected]', '测试数据 黄山 hello')
match: ret: 10013->('ar', 1, '销售额度', '测试数据 test hello', '程度,修改描述可以成功')
match: ret: 10013->('ar', 3, '蓝蓝的天上白云飘', '测试数据 上海 hello', '测试 test ok')
match: ret: 10013->('ar', 4, '在水一方', '测试数据 背景 hello', '测 test ok')
match: ret: 10013->('ar', 5, '晴天,阴天,雨天,大风天', '测试数据 上海 hello', '测试 test ok')
match: ret: 10013->('ar', 6, '每年365天,每天24小时', '测试数据 山东 hello', '测试 test ok')
match: ret: 10013->('ar', 7, '高效工作的秘密', '测试数据 武夷山 hello', '测试 test ok')
match: ret: 10013->('ar', 8, '战狼2', '测试数据 黄山 hello', '测试 test ok')
all done True

输入搜索词:  转型
match: ret: 10013->('ar', 2, '成功转型', '测试数据 china hello', '程度,修改描述可以成功')
all done True

输入搜索词:  test
match: ret: 10013->('user', 1, '[email protected]', '测试数据 test hello')
match: ret: 10013->('user', 2, '[email protected]', '测试数据 china hello')
match: ret: 10013->('user', 3, '[email protected]', '测试数据 上海 hello')
match: ret: 10013->('user', 4, '[email protected]', '测试数据 背景 hello')
match: ret: 10013->('user', 5, '[email protected]', '测试数据 上海 hello')
match: ret: 10013->('user', 6, '[email protected]', '测试数据 山东 hello')
match: ret: 10013->('user', 7, '[email protected]', '测试数据 武夷山 hello')
match: ret: 10013->('user', 8, '[email protected]', '测试数据 黄山 hello')
match: ret: 10013->('ar', 1, '销售额度', '测试数据 test hello', '程度,修改描述可以成功')
match: ret: 10013->('ar', 3, '蓝蓝的天上白云飘', '测试数据 上海 hello', '测试 test ok')
match: ret: 10013->('ar', 4, '在水一方', '测试数据 背景 hello', '测试 test ok')
match: ret: 10013->('ar', 5, '晴天,阴天,雨天,大风天', '测试数据 上海 hello', '测试 test ok')
match: ret: 10013->('ar', 6, '每年365天,每天24小时', '测试数据 山东 hello', '测试 test ok')
match: ret: 10013->('ar', 7, '高效工作的秘密', '测试数据 武夷山 hello', '测试 test ok')
match: ret: 10013->('ar', 8, '战狼2', '测试数据 黄山 hello', '测试 test ok')
all done True

输入搜索词:

解决使用Flask-SQLAlchemy中出现的1366报错

最近在按照这本书学Flask,学到通过ORM方式操作数据库时遇到一个很奇怪的问题:

会报下面这个1366的错

...default.py:470: Warning: (1366, "Incorrect string value: 'xD6xD0xB9xFAxB1xEA...' for column 'VARIABLE_VALUE' at row 479")

未分类

奇怪的地方在于我表格里的数据全部都是英语

因为看到UTF编码,首先就想到是不是编码的问题,于是

  • 检查了自己的MySQL的配置
    没发现配置有问题,都是UTF-8编码

  • 网上搜了下相关资料
    发现除了MySQL中的编码配置之外,Python的编码也要是UTF-8,检查没发现问题 (如下图)
    另外就是在字符串前加上u,变成u’string’的形式,当然这招也没用

未分类

到此我比较郁闷,遂求助于程序员好友,他看/听完描述,马上就找到了最可疑之处 – MySQL驱动

的确,书中在创建数据库连接时,并没提到相关概念,但我之前再根据廖雪峰网站学MySQL操作时,是有这个步骤的

然后根据这个思路进行操作

  • 安装MySQL驱动(我升级过Python,所以要再装一遍)
    本想安装MySQL官方驱动mysql-connector-python的,然而官方目前只支持到3.4
    我又懒,所以就用了另一个驱动mysql-connector,也不知道是谁开发的……

  • 修改代码,把

app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:password@localhost/database'

改成

app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+mysqlconnector://root:password@localhost/database'

至此,上述1366报错信息消失!

我推测是因为SQLAlchemy使用了默认的数据库驱动(按官方文档,是mysql-python)有问题,才导致此问题。

还望看到此文章的大神能验证一下我的说法。

sqlalchemy增删改查及关系使用介绍

1. SQLAlchemy的作用

ORM对象关系映射技术

2. SQLAlchemy安装

pip install SQLAlchemy

查看SQLAlchemy版本

3. 声明模型Model

from sqlalchemy import Column,String,Integer
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class User(Base):
    __tablename__='user'
    id=Column(Integer,primary_key=True)
    name=Column(String(50))

基类Base,是一个model和数据库表管理类。
通过继承Base,可自动实现model和数据库表的关联。

4. 创建数据库表

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from user import User
if __name__ == "__main__":
    engine=create_engine("postgresql://postgres:sj1107@localhost:5432/sampledb")
    Session=sessionmaker(bind=engine)
    session=Session()
    User.metadata.create_all(engine)

engine:数据库连接引擎
Session:一个持久的数据库连接会话
User.metadata.create_all(engine):创建表

5. 增删改查

session是关系型数据库中的事务。

5.1 增加记录

user=User(name="shijingjing07")
session.add(user)
session.commit()

必须commit,才能真正写入数据库

5.2 删除记录

usr=session.query(User).first()
session.delete(usr)
session.commit()

5.3 更新记录

usr=session.query(User).first()
usr.name="icefinger"
session.add(usr)
session.commit()

5.4 查询记录

过滤器:

#==
usr=session.query(User).filter(User.name=="icefinger").first()
print(usr.id)
#!=
usr=session.query(User).filter(User.name!="icefinger").first()
print(usr.id)
#like
usr=session.query(User).filter(User.name.like("icefinger%")).first()
print(usr.id)
#in
usr=session.query(User).filter(User.name.in_(["icefinger","tomcat","james"])).first()
print(usr.id)
#not in
usr=session.query(User).filter(~User.name.in_(["icefinger","tomcat","james"])).first()
print(usr.id)
#is null
usr=session.query(User).filter(User.name==None).first()
print(usr.id)
usr=session.query(User).filter(User.name.is_(None)).first()
print(usr.id)
#and
from sqlalchemy import and_
usr=session.query(User).filter(and_(User.name=="icefinger",User.id=="2")).first()
print(usr.id)
#or
from sqlalchemy import or_
usr=session.query(User).filter(or_(User.name=="icefinger",User.id=="3")).first()
print(usr.id)

返回值:

#first,使用limit返回第一行
print("--first--")
usr=session.query(User).filter(or_(User.name=="icefinger",User.id=="3")).first()
print(usr.id)
#all,返回所有行
print("--all--")
usrlist=session.query(User).filter(or_(User.name=="icefinger",User.id=="3")).all()
for usr in usrlist:
    print(usr.id)
#one,返回行数只能是一条
print("--one--")
try:
    usr = session.query(User).filter(or_(User.name == "icefinger", User.id == "3")).one()
    print(usr)
except:
    print("must be one")
#one_on_none,返回行数只能是一条,或none
print("--one_or_none--")
usr = session.query(User).filter(and_(User.name == "icefinger", User.id == "2")).one_or_none()
print(usr)
#scalar,同one_on_none,返回行数只能是一条,或none
print("--scalar--")
usr = session.query(User).filter(or_(User.name == "icefinger", User.id == "2")).scalar()
print(usr)

运行结果:

未分类

统计个数:

print("--count1--")
count=session.query(User).count()
print(count)
print("--count2--")
count = session.query(func.count('*')).select_from(User).scalar()
print(count)

6. 关系-一对多

6.1 如下图所示,一个用户可能对应多个地址

from sqlalchemy import create_engine,and_,or_,func
from sqlalchemy import Table,Column,String,Integer,ForeignKey
from sqlalchemy.orm import relationship,sessionmaker
from sqlalchemy.ext.declarative import declarative_base
engine=create_engine("postgresql://postgres:sj1107@localhost:5432/sampledb")
Session=sessionmaker(bind=engine)
session = Session()
Base = declarative_base(bind=engine)
class User(Base):
    __tablename__='user'
    id=Column(Integer,primary_key=True)
    addresses=relationship('Address')
class Address(Base):
    __tablename__='address'
    id=Column(Integer,primary_key=True)
    user_id=Column(Integer,ForeignKey('user.id'))

if __name__ == "__main__":
    Base.metadata.create_all()
    u=User()
    session.add(u)
    session.commit()
    a1=Address(user_id=u.id)
    a2=Address(user_id=u.id)
    session.add(a1)
    session.add(a2)
    session.commit()
    print(u.addresses)

运行结果:

未分类

ForeignKey:外键,制定了user_id和User的关系
relationship:绑定了两个Model的联系,通过User直接得到所有的地址。

6.2 根据address获取user

address只能获得user_id,然后根据user_id获取user
能不能通过address直接获取user呢?在model里,添加relationship关系就可以了。

class User(Base):
    __tablename__='user'
    id=Column(Integer,primary_key=True)
    addresses=relationship('Address')
class Address(Base):
    __tablename__='address'
    id=Column(Integer,primary_key=True)
    user_id=Column(Integer,ForeignKey('user.id'))
    user=relationship('User')

运行结果:

未分类

6.3 上例中两个model中都添加relationship,看起来很繁琐,能不能只指定一个,另一个默认就可以访问呢?

backref参数就可以了。

class User(Base):
    __tablename__='user'
    id=Column(Integer,primary_key=True)
    addresses=relationship('Address',backref="user")
class Address(Base):
    __tablename__='address'
    id=Column(Integer,primary_key=True)
    user_id=Column(Integer,ForeignKey('user.id'))

运行结果:

未分类

7. 关系-多对多

user和address关系为多对多,即一个user对应多个address,一个address对应多个user
多对多需要中间表来关联

#定义中间表,关联多对多关系
user_address_table =Table(
    'user_address',Base.metadata,
    Column('user_id',Integer,ForeignKey('user.id')),
    Column('address_id',Integer,ForeignKey('address.id'))
)
class User(Base):
    __tablename__='user'
    id=Column(Integer,primary_key=True)
    addresses=relationship('Address',secondary=user_address_table)
class Address(Base):
    __tablename__='address'
    id=Column(Integer,primary_key=True)
    users=relationship('User',secondary=user_address_table)

if __name__ == "__main__":
    # Base.metadata.create_all()
    u1=User()
    u2=User()
    session.add(u1)
    session.add(u2)
    session.commit()
    a1=Address(users=[u1,u2])
    a2 = Address(users=[u1, u2])
    session.add(a1)
    session.add(a2)
    session.commit()
    print(u1.addresses)
    print(a1.users)
    session.delete(u1)
    print(a1.users)

运行结果:

未分类

python使用sqlalchemy连接mysql数据库

sqlalchemy是python当中比较出名的orm程序。

什么是orm?

orm英文全称object relational mapping,就是对象映射关系程序,简单来说我们类似python这种面向对象的程序来说一切皆对象,但是我们使用的数据库却都是关系型的,为了保证一致的使用习惯,通过orm将编程语言的对象模型和数据库的关系模型建立映射关系,这样我们在使用编程语言对数据库进行操作的时候可以直接使用编程语言的对象模型进行操作就可以了,而不用直接使用sql语言。

什么是sqlalchemy?

sqlalchemy是python的orm程序,在整个python界当中相当出名。

安装sqlalchemy

在使用sqlalchemy之前要先给python安装mysql驱动,由于我使用的是python3原来的mysqldb不可用,所以这里推荐使用pymysql。
我们通过pip进行安装,在windows下使用pip安装包的时候要记得使用管理员身份运行cmd不然有些操作是无法进行的。

pip install pymysql

安装完以后安装再安装sqlalchemy

pip install sqlalchemy

如何使用sqlalchemy连接mysql?

通过import导入必要的包

from sqlalchemy import create_engine,Table,Column,Integer,String,MetaData,ForeignKey

创建一个连接引擎

engine=create_engine("mysql+pymysql://root:a5230411@localhost:3306/test",echo=True)

我们将连接引擎放到engine里面方便后面使用。
create_engine(“数据库类型+数据库驱动://数据库用户名:数据库密码@IP地址:端口/数据库”,其他参数)
上文当中echo=True是开启调试,这样当我们执行文件的时候会提示相应的文字。

创建元数据

什么是元数据?元数据就是描述数据的数据,举个简单的例子,小明身高170cm,体重50kg,性别男。其中身高,体重,性别就是元数据。当我们创建好连接引擎以后可以通过这个引擎抓取元数据。

metadata=MetaData(engine)

通过MetaData()方法创建了metadata实例,在这个方法里面带上engine的目的是绑定要连接引擎,当我们对这个metadata实例进行操作的时候就会直接连接到数据库。

添加表结构

设定好连接引擎和元数据,让我们向mysql里面创建表结构来进行测试。

user=Table('user',metadata,
    Column('id',Integer,primary_key=True),
    Column('name',String(20)),
    Column('fullname',String(40)),
    )
address_table = Table('address', metadata,
    Column('id', Integer, primary_key=True),
    Column('user_id', None, ForeignKey('user.id')),
    Column('email', String(128), nullable=False)
    )

其中Table()方法用来创建表,第一个参数为表明,第二是存入元数据,后面的参数使用Column()方法将数据库当中每一个字段的数据参数设置好。

执行创建

metadata.create_all()

因为已将将表结构存到了metadata里面,然后让metadata执行create_all()方法,这样就向数据库里创建了user和address表。

完成代码

from sqlalchemy import create_engine,Table,Column,Integer,String,MetaData,ForeignKey
engine=create_engine("mysql+pymysql://root:a5230411@localhost:3306/test",echo=True)
metadata=MetaData(engine)

user=Table('user',metadata,
    Column('id',Integer,primary_key=True),
    Column('name',String(20)),
    Column('fullname',String(40)),
    )
address_table = Table('address', metadata,
    Column('id', Integer, primary_key=True),
    Column('user_id', None, ForeignKey('user.id')),
    Column('email', String(128), nullable=False)
    )

metadata.create_all()