[关闭]
@nemos 2017-05-05T15:04:03.000000Z 字数 4835 阅读 1275

SQLAlchemy

py


python的orm框架

快速使用

各种连接的声明方式

这里以mysql为例子

  1. import sqlalchemy
  2. import sqlalchemy.orm
  3. import sqlalchemy.ext.declarative
  4. engine = sqlalchemy.create_engine("mysql+pymysql://username:password@hostname/dbname?charset=utf8", encoding="utf8", echo=True)
  5. # 直接使用sql的方式
  6. with engine.connect() as conn:
  7. result = conn.execute("select * from tablename limit 10;")
  8. for item in result:
  9. print(item)
  10. # 使用事务可以进行批量提交和回滚
  11. trans = conn.begin()
  12. try:
  13. conn.execute("insert into tablename(id, url, title) values(%s, %s, %s)", [(4, "url4", "title4"), (5, "url5", "title5")])
  14. trans.commit()
  15. except Exception as excep:
  16. trans.rollback()
  17. raise
  18. trans.close()
  19. # 首先需要生成一个BaseModel类,作为所有模型类的基类
  20. BaseModel = sqlalchemy.ext.declarative.declarative_base()
  21. # 定义model
  22. class User(BaseModel):
  23. __tablename__ = "Users" # 表名
  24. __table_args__ = {
  25. "mysql_engine": "InnoDB", # 表的引擎
  26. "mysql_charset": "utf8", # 表的编码格式
  27. }
  28. # 表结构
  29. id = sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True, autoincrement=True)
  30. name = sqlalchemy.Column("name", sqlalchemy.String(50), nullable=False)
  31. age = sqlalchemy.Column("age", sqlalchemy.Integer, default=0)
  32. # 创建所有表,如果表已经存在,则不会创建
  33. BaseModel.metadata.create_all(engine)
  34. # 利用Session对象连接数据库
  35. DBSessinon = sqlalchemy.orm.sessionmaker(bind=engine) # 创建会话类
  36. session = DBSessinon() # 创建会话对象
  37. try:
  38. # 插入数据,这里的一个实例只插入一次,第二次插入不生效
  39. session.add(Role(id=1, name="student"))
  40. session.add(Role(id=2, name="teacher"))
  41. # 不commit() 不会应用到数据库中
  42. session.commit()
  43. except Exception as excep:
  44. session.rollback()
  45. raise
  46. # 使用完记得关闭
  47. session.close()
  1. # 清空数据,不需要commit操作
  2. session.query(User).filter(User.id != -1).delete()
  3. session.query(Role).filter(Role.id != -1).delete()
  4. # 删除数据的另外一种形式:session.delete()
  5. # 删除所有表
  6. BaseModel.metadata.drop_all(engine)
  7. # 修改数据
  8. user.name = "Allen"
  9. session.merge(user) #使用merge方法,如果存在则修改,如果不存在则插入
  10. session.query(User).filter(User.id == user.id).update({User.name: "Allen"}) #使用update方法
  11. session.query(User).filter(User.id == user.id).update({User.age: User.age + 1}) #使用update方法,自增操作
  12. # 查询数据
  13. roles = session.query(Role) #返回全部结果
  14. for role in roles:
  15. print("Role:", role.id, role.name)
  16. # 其他获取数据的方式
  17. print("get(id):", session.query(User).get(1)) #返回结果集中id为1的项
  18. print("get[1:3]:", session.query(User)[1:3]) #返回结果集中的第2-3项

详细说明

各种查询的构造方式

  1. # 其他高级查询,这里以Users表为例
  2. # 条件查询 数据选取操作
  3. users = session.query(User).filter(User.id > 6) # 条件查询
  4. users = session.query(User).filter(User.id > 6).all() # 返回查询的全部数据
  5. user = session.query(User).filter(User.id > 6).first() # 返回查询数据的第一项
  6. users = session.query(User).filter(User.id > 6).limit(10) # 返回最多10条数据
  7. users = session.query(User).filter(User.id > 6).offset(2) # 从第3条数据开始返回
  8. # 条件查询,数据逻辑操作
  9. users = session.query(User).filter(User.id > 6, User.name == "Kobe") # and操作
  10. users = session.query(User).filter(User.id > 6).filter(User.name == "Kobe") # and操作
  11. users = session.query(User).filter(sqlalchemy.or_(User.id > 6, User.name == "Kobe")) # or操作
  12. users = session.query(User).filter(User.id.in_((1, 2))) # in操作
  13. users = session.query(User).filter(sqlalchemy.not_(User.name)) # not操作
  14. user_count = session.query(User.id).count() # 统计全部user的数量
  15. user_count = session.query(sqlalchemy.func.count(User.id)).scalar() # scalar操作返回第一行数据的第一个字段
  16. session.query(sqlalchemy.func.count("*")).select_from(User).scalar() # scalar操作返回第一行数据的第一个字段
  17. session.query(sqlalchemy.func.count(1)).select_from(User).scalar() # scalar操作返回第一行数据的第一个字段
  18. session.query(sqlalchemy.func.count(User.id)).filter(User.id > 0).scalar() # filter() 中包含 User,因此不需要指定表
  19. session.query(sqlalchemy.func.sum(User.age)).scalar() # 求和运算,运用scalar函数
  20. session.query(sqlalchemy.func.avg(User.age)).scalar() # 求均值运算,运用scalar函数
  21. session.query(sqlalchemy.func.md5(User.name)).filter(User.id == 1).scalar() # 运用md5函数
  22. users = session.query(sqlalchemy.distinct(User.name)) # 去重查询,根据name进行去重
  23. users = session.query(User).order_by(User.name) # 排序查询,正序查询
  24. users = session.query(User).order_by(User.name.desc()) # 排序查询,倒序查询
  25. users = session.query(User).order_by(sqlalchemy.desc(User.name)) # 排序查询,倒序查询的另外一种形式
  26. users = session.query(User.id, User.name) # 只查询部分属性
  27. users = session.query(User.name.label("user_name")) # 结果集的列取别名
  28. for user in users:
  29. print("label test:", user.user_name) # 这里使用别名
  30. users = session.query(sqlalchemy.func.count(User.name).label("count"), User.age).group_by(User.age) # 分组查询
  31. for user in users:
  32. print("age:{0}, count:{1}".format(user.age, user.count))
  33. # 多表查询
  34. result = session.query(User, Role).filter(User.role_id == Role.id)
  35. for user, role in result:
  36. print("user %s's role is %s" % (user.name, role.name))
  37. users = session.query(User).join(Role, User.role_id == Role.id)
  38. for user in users:
  39. print("user join, name:", user.name)
  40. # 关联属性的用法
  41. roles = session.query(Role)
  42. for role in roles:
  43. print("role:%s users:" % role.name)
  44. for user in role.users:
  45. print("\t%s" % user.name)
  46. users = session.query(User)
  47. for user in users:
  48. print("user %s's role is %s" % (user.name, user.role.name))

连接池

文档

  1. import sqlalchemy.pool as pool
  2. import pymysql
  3. connpool = pool.QueuePool(
  4. lambda: pymysql.connect(**cacheconf.DBCONF),
  5. pool_size=5,
  6. max_overflow=10
  7. )
  8. conn = mypool.connect()
  9. cursor = conn.cursor()
  10. cursor.execute("select foo")
  11. conn.close()

线程安全

每次使用向一个类申请即可

  1. from sqlalchemy.orm import scoped_session
  2. from sqlalchemy.orm import sessionmaker
  3. session_factory = sessionmaker(bind=some_engine)
  4. Sessiondb = scoped_session(session_factory)
  5. ## 申请使用 ##
  6. session = Sessiondb()
  7. session.add()
  8. session.commit()

参考

文档
代码来源


添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注