ORM之SQLAlchemy
SQLAlchemy,ORM框架
- 类 --> 表
- 对象 --> 一行数据
- 类的字段 --> 数据库表的一列
作用:帮助我们使用类和对象快速实现数据库操作
数据库连接
from sqlalchemy import create_engine engine=create_engine('mysql+pymysql://username:password@hostname:port/dbname', echo=True) #echo=True 打印sql语句信息 # '数据库类型+数据库驱动名称://用户名:口令@机器地址:端口号/数据库名' # 常用的 engine = create_engine('sqlite:///:memory:', echo=True) # sqlite内存 engine = create_engine('sqlite:///./cnblogblog.db',echo=True) # sqlite文件 engine = create_engine("mysql+pymysql://username:password@hostname:port/dbname",echo=True) # mysql+pymysql engine = create_engine('mssql+pymssql://username:password@hostname:port/dbname',echo=True) # mssql+pymssql engine = create_engine('postgresql://scott:tiger@hostname:5432/dbname') # postgresql示例 engine = create_engine('oracle://scott:tiger@hostname:1521/sidname') # oracle engine = create_engine('oracle+cx_oracle://scott:tiger@tnsname') #pdb就可以用tns连接
from sqlalchemy import create_engine, Column, Integer, String from sqlalchemy.orm import sessionmaker from sqlalchemy.ext.declarative import declarative_base engine = create_engine('oracle://spark:a@orclpdb',echo=True) #echo要求打印sql语句等调试信息 session_maker = sessionmaker(bind=engine) session = session_maker() Base = declarative_base() #对应一张表 class Student(Base): __tablename__ = 'STUDENT' id = Column('STUID', Integer, primary_key=True) name = Column('STUNAME', String(32), nullable=False) age = Column('STUAGE', Integer) def __repr__(self): return 'demo' % (self.id, self.name, self.age) Base.metadata.create_all(engine) #若存在STUDENT表则不做,不存在则创建。 queryObject = session.query(Student).order_by(Student.id.desc()) for ins in queryObject: print(ins.id, ins.name, ins.age) ''' 4 hey 24 3 lwtxxs 27 2 gyb 89 1 ns 23 '''
单表操作
from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column from sqlalchemy import Integer,String,Text,Date,DateTime from sqlalchemy import create_engine Base = declarative_base() class Users(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True) # 主键,默认自增 name = Column(String(32), index=True, nullable=False) # 设置索引, 非空 depart_id = Column(Integer) def create_all(): engine = create_engine( "mysql+pymysql://root:0000@127.0.0.1:3306/sqlalchemy?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) Base.metadata.create_all(engine) def drop_all(): engine = create_engine( "mysql+pymysql://root:0000@127.0.0.1:3306/sqlalchemy?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) Base.metadata.drop_all(engine) if __name__ == '__main__': # drop_all() create_all()表
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from models import Users engine = create_engine( "mysql+pymysql://root:0000@127.0.0.1:3306/sqlalchemy?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) SessionFactory = sessionmaker(bind=engine) # 根据Users类对users表进行增删改查 session = SessionFactory() # 1. 增加 # obj = Users(name='zhangyafeo') # session.add(obj) # session.commit() # session.add_all([ # Users(name='姚明'), # Users(name='科比') # ]) # session.commit() # 2. 查 # result = session.query(Users).all() # for row in result: # print(row.id,row.name) # result = session.query(Users).filter(Users.id >= 2) # for row in result: # print(row.id,row.name) # result = session.query(Users).filter(Users.id >= 2).first() # print(result) # 3.删 # session.query(Users).filter(Users.id >= 2).delete() # session.commit() # 4.改 # session.query(Users).filter(Users.id == 1).update({Users.depart_id: 1}) # session.query(Users).filter(Users.id == 2).update({'name':'姚明'}) # session.query(Users).filter(Users.id == 3).update({'name':Users.name+"NB"},synchronize_session=False) # 字符串的增加 # session.commit() session.close()行操作示例
# 添加 session.add(对象) session.add_all([ 对象1, 对象2 ]) session.commit() # 查询 session.query(Users).all() session.query(Users).filter(Users.id>4) # 删除 session.query(Users).filter(Users.id>4).delete() # 修改 session.query(Users).filter(Users.id>4).update({Users.age:19})基本的增删改查
# 1. 指定列 # select id,name as cname from users; # result = session.query(Users.id,Users.name.label('cname')).all() # for item in result: # print(item[0],item.id,item.cname) # 查看sql语句 # query = session.query(Users.id,Users.name.label('cname')) # print(query) # 2. 默认条件and # session.query(Users).filter(Users.id > 1, Users.name == 'eric').all() # 3. between # session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all() # 4. in # session.query(Users).filter(Users.id.in_([1,3,4])).all() # session.query(Users).filter(~Users.id.in_([1,3,4])).all() # 5. 子查询 # session.query(Users).filter(Users.id.in_(session.query(Users.id).filter(Users.name=='eric'))).all() # 6. and 和 or # from sqlalchemy import and_, or_ # session.query(Users).filter(Users.id > 3, Users.name == 'eric').all() # session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all() # session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all() # session.query(Users).filter( # or_( # Users.id < 2, # and_(Users.name == 'eric', Users.id > 3), # Users.extra != "" # )).all() # 7. filter_by # session.query(Users).filter_by(name='alex').all() # 8. 通配符 # ret = session.query(Users).filter(Users.name.like('e%')).all() # ret = session.query(Users).filter(~Users.name.like('e%')).all() # 9. 切片 # result = session.query(Users)[1:2] # 10.排序 # ret = session.query(Users).order_by(Users.name.desc()).all() # ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all() # 11. group by from sqlalchemy.sql import func # ret = session.query( # Users.depart_id, # func.count(Users.id), # ).group_by(Users.depart_id).all() # for item in ret: # print(item) # # from sqlalchemy.sql import func # # ret = session.query( # Users.depart_id, # func.count(Users.id), # ).group_by(Users.depart_id).having(func.count(Users.id) >= 2).all() # for item in ret: # print(item) # 12.union 和 union all """ select id,name from users UNION select id,name from users; """ # q1 = session.query(Users.name).filter(Users.id > 2) # q2 = session.query(Favor.caption).filter(Favor.nid < 2) # ret = q1.union(q2).all() # # q1 = session.query(Users.name).filter(Users.id > 2) # q2 = session.query(Favor.caption).filter(Favor.nid < 2) # ret = q1.union_all(q2).all()常用操作
limit分页
1.用offset()设置索引偏移量,limit()限制取出量 db.session.query(User.name).filter(User.email.like('%'+email+'%')).limit(page_size).offset((page_index-1)*page_size) #filter语句后面可以跟order_by语句 2.用slice(偏移量,取出量)函数 db.session.query(User.name).filter(User.email.like('%'+email+'%')).slice((page_index - 1) * page_size, page_index * page_size) #filter语句后面可以跟order_by语句 注释:此方法和第一种相同的效果。 因为:由一下内部方法可知,slice()函数第一个属性就是offset()函数值,第二个属性就是limit()函数值 3.用paginate(偏移量,取出量)函数,用于BaseQuery user_obj=User.query.filter(User.email.like('%'+email+'%')).paginate(int(page_index), int(page_size),False) #遍历时要加上items object_list =user_obj.items 4.filter中使用limit db.session.query(User.name).filter(User.email.like('%'+email+'%') and limit (page_index - 1) * page_size, page_size) #此处不能再跟order_by语句,否则报错4种方式
一对多操作
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from models import Users, Depart engine = create_engine( "mysql+pymysql://root:0000@127.0.0.1:3306/sqlalchemy?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) SessionFactory = sessionmaker(bind=engine) session = SessionFactory() ############################一对多操作#################################### # 1.查询所有用户 ret = session.query(Users).all() for row in ret: print(row.id, row.name, row.depart_id) # 2.查询所有用户及所属部门名称 # ret1 = session.query(Users, Depart).join(Depart).all() ret1 = session.query(Users.id, Users.name, Depart.title).join(Depart,Users.depart_id == Depart.id).all() for row in ret1: # print(row) # print(row[0].id, row[0].name, row[0].depart_id, row[1].title) print(row.id, row.name, row.title) # 3.relation字段:查询所有用户+所属部门名称 ret2 = session.query(Users).all() for row in ret2: print(row.id, row.name, row.depart_id, row.dp.title) # 4.relation:查询所有技术部的成员 ret3 = session.query(Depart).filter(Depart.title == '技术部').first() for row in ret3.pers: print(row.id, row.name, ret3.title) # 5.创建一个名称叫 IT部门,并在该部门中添加一个员工:浩南 # 方式一 # d1 = Depart(title='IT') # session.add(d1) # session.commit() # print(d1.title, d1.id) # u = Users(name='浩南',depart_id=d1.id) # session.add(u) # session.commit() # 方式二 # u = Users(name='山鸡',dp=Depart(title='IT')) # session.add(u) # session.commit() # # 6.创建一个名称叫王者荣耀的部门,并在该部门中加入詹姆斯,科比,梅西 # d2 = Depart(title='王者荣耀') # d2.pers = [Users(name='詹姆斯'), Users(name='科比'), Users(name='梅西')] # session.add(d2) # session.commit() for row in session.query(Users).all(): print(row.id, row.name, row.dp.title) for row in session.query(Depart).all(): print(row.title) for ret2 in row.pers: print('\t',ret2.name) session.close()FK操作
多对多操作
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from models import Users, Depart, Student, Course, Student2Course engine = create_engine( "mysql+pymysql://root:0000@127.0.0.1:3306/sqlalchemy?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) SessionFactory = sessionmaker(bind=engine) session = SessionFactory() ############################多对多操作#################################### # 1.录入数据, 添加两个学生和班级 # session.add_all([ # Student(name='大宝贝'), # Student(name='小哥哥'), # Course(title='生物'), # Course(title='体育'), # ]) # # session.commit() # 2.为学生选课 # session.add_all([ # Student2Course(student_id=1, course_id=1), # Student2Course(student_id=1, course_id=2), # Student2Course(student_id=2, course_id=1), # ]) # session.commit() # 3.三张表关联 ret = session.query(Student2Course.id, Student.name, Course.title).join(Student, Student2Course.student_id==Student.id).join(Course, Student2Course.course_id==Course.id, isouter=True).order_by(Student2Course.id.asc()) for row in ret: print(row) # 4.大宝贝选的课 # ret = session.query(Student2Course.id, Student.name, Course.title).join(Student, Student2Course.student_id==Student.id).join(Course, Student2Course.course_id==Course.id).filter(Student.name=='大宝贝').all() # print(ret) # # obj = session.query(Student).filter(Student.name=='大宝贝').first() # for row in obj.course_list: # print(row.title) # 5.选了生物的所有人 # objs = session.query(Course).filter(Course.title=='生物') # print(objs) # obj = session.query(Course).filter(Course.title=='生物').first() # for row in obj.student_list: # print(row.name) # 6.创建一个课程,创建2学生,两个学生选新创建的课程。 # obj = Course(title='英语') # obj.student_list = [Student(name='浩哥'),Student(name='伙计')] # session.add(obj) # session.commit() # students = session.query(Student).all() # for row in students: # print(row.id, row.name) # # courses = session.query(Course).all() # for row in courses: # print(row.id, row.title) # # stu_cous = session.query(Student2Course).all() # for row in stu_cous: # print(row.id, row.student_id, row.course_id) session.close()多对多操作
多线程连接
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from models import Student,Course,Student2Course engine = create_engine( "mysql+pymysql://root:123456@127.0.0.1:3306/s9day120?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) SessionFactory = sessionmaker(bind=engine) def task(): # 去连接池中获取一个连接 session = SessionFactory() ret = session.query(Student).all() # 将连接交还给连接池 session.close() from threading import Thread for i in range(20): t = Thread(target=task) t.start()方式一
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from sqlalchemy.orm import scoped_session from models import Student,Course,Student2Course engine = create_engine( "mysql+pymysql://root:123456@127.0.0.1:3306/s9day120?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) SessionFactory = sessionmaker(bind=engine) session = scoped_session(SessionFactory) def task(): ret = session.query(Student).all() # 将连接交还给连接池 session.remove() from threading import Thread for i in range(20): t = Thread(target=task) t.start()方式二(推荐,基于Threading.local实现)
执行原生sql
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from sqlalchemy.orm import scoped_session from models import Student,Course,Student2Course engine = create_engine( "mysql+pymysql://root:123456@127.0.0.1:3306/s9day120?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) SessionFactory = sessionmaker(bind=engine) session = scoped_session(SessionFactory) def task(): """""" # 方式一: """ # 查询 # cursor = session.execute('select * from users') # result = cursor.fetchall() # 添加 cursor = session.execute('INSERT INTO users(name) VALUES(:value)', params={"value": 'wupeiqi'}) session.commit() print(cursor.lastrowid) """ # 方式二: """ # conn = engine.raw_connection() # cursor = conn.cursor() # cursor.execute( # "select * from t1" # ) # result = cursor.fetchall() # cursor.close() # conn.close() """ # 将连接交还给连接池 session.remove() from threading import Thread for i in range(20): t = Thread(target=task) t.start()执行原生sql