ORM之SQLAlchemy


SQLAlchemy,ORM框架

  • 类 --> 表
  • 对象 --> 一行数据
  • 类的字段 --> 数据库表的一列

作用:帮助我们使用类和对象快速实现数据库操作

数据库连接

from sqlalchemy import create_engine
engine=create_engine('mysql+pymysql://username:password@hostname:port/dbname', echo=True) #echo=True 打印sql语句信息

# '数据库类型+数据库驱动名称://用户名:口令@机器地址:端口号/数据库名'
# 常用的
engine = create_engine('sqlite:///:memory:', echo=True)   # sqlite内存
engine = create_engine('sqlite:///./cnblogblog.db',echo=True) # sqlite文件
engine = create_engine("mysql+pymysql://username:password@hostname:port/dbname",echo=True) # mysql+pymysql
engine = create_engine('mssql+pymssql://username:password@hostname:port/dbname',echo=True) # mssql+pymssql
engine = create_engine('postgresql://scott:tiger@hostname:5432/dbname') # postgresql示例
engine = create_engine('oracle://scott:tiger@hostname:1521/sidname') # oracle
engine = create_engine('oracle+cx_oracle://scott:tiger@tnsname') #pdb就可以用tns连接
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base 
engine = create_engine('oracle://spark:a@orclpdb',echo=True) #echo要求打印sql语句等调试信息
session_maker = sessionmaker(bind=engine)
session = session_maker()
Base = declarative_base()
#对应一张表
class Student(Base): 
  __tablename__ = 'STUDENT'
  id = Column('STUID', Integer, primary_key=True)
  name = Column('STUNAME', String(32), nullable=False)
  age = Column('STUAGE', Integer)
  def __repr__(self):
    return '' % (self.id, self.name, self.age)
Base.metadata.create_all(engine) #若存在STUDENT表则不做,不存在则创建。
queryObject = session.query(Student).order_by(Student.id.desc())
for ins in queryObject:
  print(ins.id, ins.name, ins.age)
'''
4 hey 24
3 lwtxxs 27
2 gyb 89
1 ns 23
'''
demo

单表操作

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column
from sqlalchemy import Integer,String,Text,Date,DateTime
from sqlalchemy import create_engine


Base = declarative_base()


class Users(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)      # 主键,默认自增
    name = Column(String(32), index=True, nullable=False)   # 设置索引, 非空
    depart_id = Column(Integer)


def create_all():
    engine = create_engine(
        "mysql+pymysql://root:0000@127.0.0.1:3306/sqlalchemy?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )

    Base.metadata.create_all(engine)


def drop_all():
    engine = create_engine(
        "mysql+pymysql://root:0000@127.0.0.1:3306/sqlalchemy?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
    Base.metadata.drop_all(engine)


if __name__ == '__main__':
    # drop_all()
    create_all()
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Users

engine = create_engine(
        "mysql+pymysql://root:0000@127.0.0.1:3306/sqlalchemy?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
SessionFactory = sessionmaker(bind=engine)

# 根据Users类对users表进行增删改查
session = SessionFactory()

# 1. 增加
# obj = Users(name='zhangyafeo')
# session.add(obj)
# session.commit()

# session.add_all([
#         Users(name='姚明'),
#         Users(name='科比')
# ])
# session.commit()

# 2. 查
# result = session.query(Users).all()
# for row in result:
#         print(row.id,row.name)

# result = session.query(Users).filter(Users.id >= 2)
# for row in result:
#         print(row.id,row.name)

# result = session.query(Users).filter(Users.id >= 2).first()
# print(result)

# 3.删
# session.query(Users).filter(Users.id >= 2).delete()
# session.commit()

# 4.改
# session.query(Users).filter(Users.id == 1).update({Users.depart_id: 1})
# session.query(Users).filter(Users.id == 2).update({'name':'姚明'})
# session.query(Users).filter(Users.id == 3).update({'name':Users.name+"NB"},synchronize_session=False)  # 字符串的增加
# session.commit()



session.close()
行操作示例
# 添加
session.add(对象)
session.add_all([
    对象1,
    对象2
])
session.commit()

# 查询
session.query(Users).all()
session.query(Users).filter(Users.id>4)

# 删除
session.query(Users).filter(Users.id>4).delete()

# 修改
session.query(Users).filter(Users.id>4).update({Users.age:19})
基本的增删改查
# 1. 指定列
# select id,name as cname from users;
# result = session.query(Users.id,Users.name.label('cname')).all()
# for item in result:
#         print(item[0],item.id,item.cname)
# 查看sql语句
# query = session.query(Users.id,Users.name.label('cname'))
# print(query)
# 2. 默认条件and
# session.query(Users).filter(Users.id > 1, Users.name == 'eric').all()
# 3. between
# session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all()
# 4. in
# session.query(Users).filter(Users.id.in_([1,3,4])).all()
# session.query(Users).filter(~Users.id.in_([1,3,4])).all()
# 5. 子查询
# session.query(Users).filter(Users.id.in_(session.query(Users.id).filter(Users.name=='eric'))).all()
# 6. and 和 or
# from sqlalchemy import and_, or_
# session.query(Users).filter(Users.id > 3, Users.name == 'eric').all()
# session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all()
# session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all()
# session.query(Users).filter(
#     or_(
#         Users.id < 2,
#         and_(Users.name == 'eric', Users.id > 3),
#         Users.extra != ""
#     )).all()

# 7. filter_by
# session.query(Users).filter_by(name='alex').all()

# 8. 通配符
# ret = session.query(Users).filter(Users.name.like('e%')).all()
# ret = session.query(Users).filter(~Users.name.like('e%')).all()

# 9. 切片
# result = session.query(Users)[1:2]

# 10.排序
# ret = session.query(Users).order_by(Users.name.desc()).all()
# ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()

# 11. group by
from sqlalchemy.sql import func

# ret = session.query(
#         Users.depart_id,
#         func.count(Users.id),
# ).group_by(Users.depart_id).all()
# for item in ret:
#         print(item)
#
# from sqlalchemy.sql import func
#
# ret = session.query(
#         Users.depart_id,
#         func.count(Users.id),
# ).group_by(Users.depart_id).having(func.count(Users.id) >= 2).all()
# for item in ret:
#         print(item)

# 12.union 和 union all
"""
select id,name from users
UNION
select id,name from users;
"""
# q1 = session.query(Users.name).filter(Users.id > 2)
# q2 = session.query(Favor.caption).filter(Favor.nid < 2)
# ret = q1.union(q2).all()
#
# q1 = session.query(Users.name).filter(Users.id > 2)
# q2 = session.query(Favor.caption).filter(Favor.nid < 2)
# ret = q1.union_all(q2).all()
常用操作

limit分页

1.用offset()设置索引偏移量,limit()限制取出量
db.session.query(User.name).filter(User.email.like('%'+email+'%')).limit(page_size).offset((page_index-1)*page_size)
#filter语句后面可以跟order_by语句

2.用slice(偏移量,取出量)函数
db.session.query(User.name).filter(User.email.like('%'+email+'%')).slice((page_index - 1) * page_size, page_index * page_size)
#filter语句后面可以跟order_by语句
注释:此方法和第一种相同的效果。

因为:由一下内部方法可知,slice()函数第一个属性就是offset()函数值,第二个属性就是limit()函数值

3.用paginate(偏移量,取出量)函数,用于BaseQuery

user_obj=User.query.filter(User.email.like('%'+email+'%')).paginate(int(page_index), int(page_size),False)
#遍历时要加上items 
object_list =user_obj.items
 

4.filter中使用limit

db.session.query(User.name).filter(User.email.like('%'+email+'%') and limit (page_index - 1) * page_size, page_size)
#此处不能再跟order_by语句,否则报错
4种方式

一对多操作

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Users, Depart

engine = create_engine(
        "mysql+pymysql://root:0000@127.0.0.1:3306/sqlalchemy?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
SessionFactory = sessionmaker(bind=engine)

session = SessionFactory()

############################一对多操作####################################

# 1.查询所有用户
ret = session.query(Users).all()
for row in ret:
    print(row.id, row.name, row.depart_id)

# 2.查询所有用户及所属部门名称
# ret1 = session.query(Users, Depart).join(Depart).all()
ret1 = session.query(Users.id, Users.name, Depart.title).join(Depart,Users.depart_id == Depart.id).all()
for row in ret1:
    # print(row)
    # print(row[0].id, row[0].name, row[0].depart_id, row[1].title)
    print(row.id, row.name, row.title)

# 3.relation字段:查询所有用户+所属部门名称
ret2 = session.query(Users).all()
for row in ret2:
    print(row.id, row.name, row.depart_id, row.dp.title)

# 4.relation:查询所有技术部的成员
ret3 = session.query(Depart).filter(Depart.title == '技术部').first()
for row in ret3.pers:
    print(row.id, row.name, ret3.title)

# 5.创建一个名称叫 IT部门,并在该部门中添加一个员工:浩南
# 方式一
# d1 = Depart(title='IT')
# session.add(d1)
# session.commit()
# print(d1.title, d1.id)
# u = Users(name='浩南',depart_id=d1.id)
# session.add(u)
# session.commit()

# 方式二
# u = Users(name='山鸡',dp=Depart(title='IT'))
# session.add(u)
# session.commit()
#


# 6.创建一个名称叫王者荣耀的部门,并在该部门中加入詹姆斯,科比,梅西
# d2 = Depart(title='王者荣耀')
# d2.pers = [Users(name='詹姆斯'), Users(name='科比'), Users(name='梅西')]
# session.add(d2)
# session.commit()

for row in session.query(Users).all():
    print(row.id, row.name, row.dp.title)

for row in session.query(Depart).all():
    print(row.title)
    for ret2 in row.pers:
        print('\t',ret2.name)

session.close()
FK操作

  多对多操作

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Users, Depart, Student, Course, Student2Course

engine = create_engine(
        "mysql+pymysql://root:0000@127.0.0.1:3306/sqlalchemy?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
SessionFactory = sessionmaker(bind=engine)

session = SessionFactory()

############################多对多操作####################################

# 1.录入数据, 添加两个学生和班级
# session.add_all([
#     Student(name='大宝贝'),
#     Student(name='小哥哥'),
#     Course(title='生物'),
#     Course(title='体育'),
# ])
#
# session.commit()

# 2.为学生选课
# session.add_all([
#     Student2Course(student_id=1, course_id=1),
#     Student2Course(student_id=1, course_id=2),
#     Student2Course(student_id=2, course_id=1),
# ])
# session.commit()

# 3.三张表关联
ret = session.query(Student2Course.id, Student.name, Course.title).join(Student, Student2Course.student_id==Student.id).join(Course, Student2Course.course_id==Course.id, isouter=True).order_by(Student2Course.id.asc())
for row in ret:
    print(row)

# 4.大宝贝选的课
# ret = session.query(Student2Course.id, Student.name, Course.title).join(Student, Student2Course.student_id==Student.id).join(Course, Student2Course.course_id==Course.id).filter(Student.name=='大宝贝').all()
# print(ret)
#
# obj = session.query(Student).filter(Student.name=='大宝贝').first()
# for row in obj.course_list:
#     print(row.title)

# 5.选了生物的所有人
# objs = session.query(Course).filter(Course.title=='生物')
# print(objs)
# obj = session.query(Course).filter(Course.title=='生物').first()
# for row in obj.student_list:
#     print(row.name)

# 6.创建一个课程,创建2学生,两个学生选新创建的课程。
# obj = Course(title='英语')
# obj.student_list = [Student(name='浩哥'),Student(name='伙计')]
# session.add(obj)
# session.commit()


# students = session.query(Student).all()
# for row in students:
#     print(row.id, row.name)
#
# courses = session.query(Course).all()
# for row in courses:
#     print(row.id, row.title)
#
# stu_cous = session.query(Student2Course).all()
# for row in stu_cous:
#     print(row.id, row.student_id, row.course_id)

session.close()
多对多操作

  多线程连接

  from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Student,Course,Student2Course

engine = create_engine(
        "mysql+pymysql://root:123456@127.0.0.1:3306/s9day120?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
SessionFactory = sessionmaker(bind=engine)

def task():
    # 去连接池中获取一个连接
    session = SessionFactory()

    ret = session.query(Student).all()

    # 将连接交还给连接池
    session.close()


from threading import Thread

for i in range(20):
    t = Thread(target=task)
    t.start()
方式一
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session
from models import Student,Course,Student2Course

engine = create_engine(
        "mysql+pymysql://root:123456@127.0.0.1:3306/s9day120?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
SessionFactory = sessionmaker(bind=engine)
session = scoped_session(SessionFactory)


def task():
    ret = session.query(Student).all()
    # 将连接交还给连接池
    session.remove()


from threading import Thread

for i in range(20):
    t = Thread(target=task)
    t.start()
方式二(推荐,基于Threading.local实现)

  执行原生sql

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session
from models import Student,Course,Student2Course

engine = create_engine(
    "mysql+pymysql://root:123456@127.0.0.1:3306/s9day120?charset=utf8",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
SessionFactory = sessionmaker(bind=engine)
session = scoped_session(SessionFactory)


def task():
""""""
# 方式一:
"""
# 查询
# cursor = session.execute('select * from users')
# result = cursor.fetchall()

# 添加
cursor = session.execute('INSERT INTO users(name) VALUES(:value)', params={"value": 'wupeiqi'})
session.commit()
print(cursor.lastrowid)
"""
# 方式二:
"""
# conn = engine.raw_connection()
# cursor = conn.cursor()
# cursor.execute(
#     "select * from t1"
# )
# result = cursor.fetchall()
# cursor.close()
# conn.close()
"""

# 将连接交还给连接池
session.remove()


from threading import Thread

for i in range(20):
    t = Thread(target=task)
    t.start()
执行原生sql