@nemos
        
        2017-05-05T15:04:03.000000Z
        字数 4835
        阅读 1318
    py
python的orm框架
这里以mysql为例子
import sqlalchemyimport sqlalchemy.ormimport sqlalchemy.ext.declarativeengine = sqlalchemy.create_engine("mysql+pymysql://username:password@hostname/dbname?charset=utf8", encoding="utf8", echo=True)# 直接使用sql的方式with engine.connect() as conn:result = conn.execute("select * from tablename limit 10;")for item in result:print(item)# 使用事务可以进行批量提交和回滚trans = conn.begin()try:conn.execute("insert into tablename(id, url, title) values(%s, %s, %s)", [(4, "url4", "title4"), (5, "url5", "title5")])trans.commit()except Exception as excep:trans.rollback()raisetrans.close()# 首先需要生成一个BaseModel类,作为所有模型类的基类BaseModel = sqlalchemy.ext.declarative.declarative_base()# 定义modelclass User(BaseModel):__tablename__ = "Users" # 表名__table_args__ = {"mysql_engine": "InnoDB", # 表的引擎"mysql_charset": "utf8", # 表的编码格式}# 表结构id = sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True, autoincrement=True)name = sqlalchemy.Column("name", sqlalchemy.String(50), nullable=False)age = sqlalchemy.Column("age", sqlalchemy.Integer, default=0)# 创建所有表,如果表已经存在,则不会创建BaseModel.metadata.create_all(engine)# 利用Session对象连接数据库DBSessinon = sqlalchemy.orm.sessionmaker(bind=engine) # 创建会话类session = DBSessinon() # 创建会话对象try:# 插入数据,这里的一个实例只插入一次,第二次插入不生效session.add(Role(id=1, name="student"))session.add(Role(id=2, name="teacher"))# 不commit() 不会应用到数据库中session.commit()except Exception as excep:session.rollback()raise# 使用完记得关闭session.close()
# 清空数据,不需要commit操作session.query(User).filter(User.id != -1).delete()session.query(Role).filter(Role.id != -1).delete()# 删除数据的另外一种形式:session.delete()# 删除所有表BaseModel.metadata.drop_all(engine)# 修改数据user.name = "Allen"session.merge(user) #使用merge方法,如果存在则修改,如果不存在则插入session.query(User).filter(User.id == user.id).update({User.name: "Allen"}) #使用update方法session.query(User).filter(User.id == user.id).update({User.age: User.age + 1}) #使用update方法,自增操作# 查询数据roles = session.query(Role) #返回全部结果for role in roles:print("Role:", role.id, role.name)# 其他获取数据的方式print("get(id):", session.query(User).get(1)) #返回结果集中id为1的项print("get[1:3]:", session.query(User)[1:3]) #返回结果集中的第2-3项
各种查询的构造方式
# 其他高级查询,这里以Users表为例# 条件查询 数据选取操作users = session.query(User).filter(User.id > 6) # 条件查询users = session.query(User).filter(User.id > 6).all() # 返回查询的全部数据user = session.query(User).filter(User.id > 6).first() # 返回查询数据的第一项users = session.query(User).filter(User.id > 6).limit(10) # 返回最多10条数据users = session.query(User).filter(User.id > 6).offset(2) # 从第3条数据开始返回# 条件查询,数据逻辑操作users = session.query(User).filter(User.id > 6, User.name == "Kobe") # and操作users = session.query(User).filter(User.id > 6).filter(User.name == "Kobe") # and操作users = session.query(User).filter(sqlalchemy.or_(User.id > 6, User.name == "Kobe")) # or操作users = session.query(User).filter(User.id.in_((1, 2))) # in操作users = session.query(User).filter(sqlalchemy.not_(User.name)) # not操作user_count = session.query(User.id).count() # 统计全部user的数量user_count = session.query(sqlalchemy.func.count(User.id)).scalar() # scalar操作返回第一行数据的第一个字段session.query(sqlalchemy.func.count("*")).select_from(User).scalar() # scalar操作返回第一行数据的第一个字段session.query(sqlalchemy.func.count(1)).select_from(User).scalar() # scalar操作返回第一行数据的第一个字段session.query(sqlalchemy.func.count(User.id)).filter(User.id > 0).scalar() # filter() 中包含 User,因此不需要指定表session.query(sqlalchemy.func.sum(User.age)).scalar() # 求和运算,运用scalar函数session.query(sqlalchemy.func.avg(User.age)).scalar() # 求均值运算,运用scalar函数session.query(sqlalchemy.func.md5(User.name)).filter(User.id == 1).scalar() # 运用md5函数users = session.query(sqlalchemy.distinct(User.name)) # 去重查询,根据name进行去重users = session.query(User).order_by(User.name) # 排序查询,正序查询users = session.query(User).order_by(User.name.desc()) # 排序查询,倒序查询users = session.query(User).order_by(sqlalchemy.desc(User.name)) # 排序查询,倒序查询的另外一种形式users = session.query(User.id, User.name) # 只查询部分属性users = session.query(User.name.label("user_name")) # 结果集的列取别名for user in users:print("label test:", user.user_name) # 这里使用别名users = session.query(sqlalchemy.func.count(User.name).label("count"), User.age).group_by(User.age) # 分组查询for user in users:print("age:{0}, count:{1}".format(user.age, user.count))# 多表查询result = session.query(User, Role).filter(User.role_id == Role.id)for user, role in result:print("user %s's role is %s" % (user.name, role.name))users = session.query(User).join(Role, User.role_id == Role.id)for user in users:print("user join, name:", user.name)# 关联属性的用法roles = session.query(Role)for role in roles:print("role:%s users:" % role.name)for user in role.users:print("\t%s" % user.name)users = session.query(User)for user in users:print("user %s's role is %s" % (user.name, user.role.name))
import sqlalchemy.pool as poolimport pymysqlconnpool = pool.QueuePool(lambda: pymysql.connect(**cacheconf.DBCONF),pool_size=5,max_overflow=10)conn = mypool.connect()cursor = conn.cursor()cursor.execute("select foo")conn.close()
每次使用向一个类申请即可
from sqlalchemy.orm import scoped_sessionfrom sqlalchemy.orm import sessionmakersession_factory = sessionmaker(bind=some_engine)Sessiondb = scoped_session(session_factory)## 申请使用 ##session = Sessiondb()session.add()session.commit()