@zwenqiang
2015-06-03T10:16:02.000000Z
字数 3300
阅读 4780
数据库
create_engine连接数据库
from sqlalchemy import create_engineconn = "mysql://root:root@localhost/dbname?charset=utf8"engine = create_engine(conn, echo=True) # 在console中输出SQL语句
create_engine方法返回的是一个Engine对象实例,Engine对象实例engine并没有立即连接数据库,只有在第一次有与数据库操作的任务时才会去连接数据库。
from sqlalchemy import Table, Column, Integer, String, MetaData, ForeignKeymetadata = MetaData()
SQLalchemy 使用 ORM 来映射对象和表的关系。
表(Table)的集合以及与它们有关系的子类(child object) 称为数据库元数据(database metadata)
users = Table('user', metadata,Column('id', Integer, primary_key=True),Column('name', String(32), nullable=False),Column('gender', String(1), nullable=False, server_default='M'))metadata.create_all(engine)
创建好表对象之后,我们来创建表。
使用metadata的create_all方法来在数据库中创建表。create_all这个方法首先会在数据库中查找对应的表是否存在,存在则不创建,所以这个方法多次调用是安全的。
# Insert 构造对应insert语句ins = users.insert()# <sqlalchemy.sql.dml.Insert object at 0x1079a6290>print str(ins)# INSERT INTO "user" (id, name, gender) VALUES (:id, :name, :gender)ins = users.insert().values(name='jack', gender='M')# 也可以这样
执行插入操作
我们创建的engine对象是一个能执行SQL语句的数据库仓库。我们使用connect来创建一个连接.
conn = engine.connect()# 执行插入操作result = conn.execute(ins) # 执行SQL插入操作,返回一个ResultProxy objectprint result.rowcount # 显示插入条数print result.inserted_primary_key # 显示插入的id
conn.execute(users.insert(), [{'name' : 'Zs', 'gender': 'F'},{'name' : 'Ls', 'gender': 'M'},{'name' : 'Ww', 'gender': 'F'}])
select的SQL语句的方法是select()
from sqlalchemy import selects = select([users]) # select参数是一个列表r = conn.execute(s) # 返回的是一个ResultProxy objectall = conn.fetchall() # 返回一个含有所有RowProxy的列表print all[0].name, all[0].idprint all[0]['name'], all[0]['id']print all[0][0], all[1][1] # RowProxy 不支持赋值操作,可以强制转换为dict
条件查询
s = select(['user', 'address']).where(user.c.id == address.c.user_id)
连接词
from sqlalchemy import and_, or_, not_c = and_(users.c.name.like('j%'), users.c.gender == 'M', not_(users.c.id == 1)s = select([users.c.id, users.c.name]).where(c)
使用原生SQL语句
from sqlalchemy import texts = text("SELECT * FROM users WHERE users.gender = :gender and name = :name")result = conn.execute(s, gender='F', name='Z').fetchall()# 也可以在select()中使用text语句s = select([users.c.name]).where(text("users.id = 2"))result = conn.execute(s).fetchall()# 尽量少使用text
使用假名
u1 = users.alias()u2 = users.alias()
使用连接
print users.join(address) # 需要之前设置外键print users.join(address, users.c.id == address.c.user_id)s = select([users.c.name, address.c.email]).select_from(users.join(address, address.c.user_id == users.c.id)).where(address.c.email.like("%qq.com"))# 产生的SQL语句为:`SELECT "user".name, address.email \nFROM "user" JOIN address ON address.user_id = "user".id \nWHERE address.email LIKE :email_1`
function函数
from sqlalchemy.sql import funcs = select([func.max(users.c.id).label("max_id")]).where(user.c.name.like("Z%))
s = select([users]).order_by(users.c.id)s = select([users]).order_by(users.c.id.desc())
s = select([users]).having(func.length(users.c.name) > 4)s = select([users], func.sum(users.c.score).lable("score_all")).group_by(users.c.gender)
up = users.update().where(users.c.id == 1).values(name="testname")up = users.update().where(users.c.id == 1).values(name="prefix_" + users.c.name)# 使用bindparam来操作更新数据from sqlalchemy import bindparamstmt = users.update().where(users.c.id == bindparam("user_id")).values(name = bindparam("new_name"))s = conn.execute(stmt, user_id=1, new_name="hello")批量更新数据s = conn.execute(stmt, [{'user_id': 1, 'new_name': 'good'},{'user_id': 2, 'new_name': 'bad'}])
删除数据
d = users.delete()conn.execute(d) # 删除全部数据d = users.delete().where(users.c.gender == "M")