[关闭]
@zwenqiang 2015-06-03T10:16:02.000000Z 字数 3300 阅读 3989

SQLAlchemy(一)

数据库


使用create_engine连接数据库

  1. from sqlalchemy import create_engine
  2. conn = "mysql://root:root@localhost/dbname?charset=utf8"
  3. engine = create_engine(conn, echo=True) # 在console中输出SQL语句

create_engine方法返回的是一个Engine对象实例,Engine对象实例engine并没有立即连接数据库,只有在第一次有与数据库操作的任务时才会去连接数据库。

定义和创建表

  1. from sqlalchemy import Table, Column, Integer, String, MetaData, ForeignKey
  2. metadata = MetaData()

SQLalchemy 使用 ORM 来映射对象和表的关系。
表(Table)的集合以及与它们有关系的子类(child object) 称为数据库元数据(database metadata)

  1. users = Table('user', metadata,
  2. Column('id', Integer, primary_key=True),
  3. Column('name', String(32), nullable=False),
  4. Column('gender', String(1), nullable=False, server_default='M')
  5. )
  6. metadata.create_all(engine)

创建好表对象之后,我们来创建表。
使用metadatacreate_all方法来在数据库中创建表。create_all这个方法首先会在数据库中查找对应的表是否存在,存在则不创建,所以这个方法多次调用是安全的。

数据库操作

  1. # Insert 构造对应insert语句
  2. ins = users.insert()
  3. # <sqlalchemy.sql.dml.Insert object at 0x1079a6290>
  4. print str(ins)
  5. # INSERT INTO "user" (id, name, gender) VALUES (:id, :name, :gender)
  6. ins = users.insert().values(name='jack', gender='M')
  7. # 也可以这样

执行插入操作
我们创建的engine对象是一个能执行SQL语句的数据库仓库。我们使用connect来创建一个连接.

  1. conn = engine.connect()
  2. # 执行插入操作
  3. result = conn.execute(ins) # 执行SQL插入操作,返回一个ResultProxy object
  4. print result.rowcount # 显示插入条数
  5. print result.inserted_primary_key # 显示插入的id
  1. conn.execute(users.insert(), [
  2. {'name' : 'Zs', 'gender': 'F'},
  3. {'name' : 'Ls', 'gender': 'M'},
  4. {'name' : 'Ww', 'gender': 'F'}
  5. ])
  1. from sqlalchemy import select
  2. s = select([users]) # select参数是一个列表
  3. r = conn.execute(s) # 返回的是一个ResultProxy object
  4. all = conn.fetchall() # 返回一个含有所有RowProxy的列表
  5. print all[0].name, all[0].id
  6. print all[0]['name'], all[0]['id']
  7. print all[0][0], all[1][1] # RowProxy 不支持赋值操作,可以强制转换为dict

条件查询

  1. s = select(['user', 'address']).where(user.c.id == address.c.user_id)

连接词

  1. from sqlalchemy import and_, or_, not_
  2. c = and_(users.c.name.like('j%'), users.c.gender == 'M', not_(users.c.id == 1)
  3. s = select([users.c.id, users.c.name]).where(c)

使用原生SQL语句

  1. from sqlalchemy import text
  2. s = text"SELECT * FROM users WHERE users.gender = :gender and name = :name")
  3. result = conn.execute(s, gender='F', name='Z').fetchall()
  4. # 也可以在select()中使用text语句
  5. s = select([users.c.name]).where(text("users.id = 2"))
  6. result = conn.execute(s).fetchall()
  7. # 尽量少使用text

使用假名

  1. u1 = users.alias()
  2. u2 = users.alias()

使用连接

  1. print users.join(address) # 需要之前设置外键
  2. print users.join(address, users.c.id == address.c.user_id)
  3. s = select([users.c.name, address.c.email]).select_from(users.join(address, address.c.user_id == users.c.id)).where(address.c.email.like("%qq.com"))
  4. # 产生的SQL语句为:
  5. `SELECT "user".name, address.email \nFROM "user" JOIN address ON address.user_id = "user".id \nWHERE address.email LIKE :email_1`
  1. from sqlalchemy.sql import func
  2. s = select([func.max(users.c.id).label("max_id")]).where(user.c.name.like("Z%))
  1. s = select([users]).order_by(users.c.id)
  2. s = select([users]).order_by(users.c.id.desc())
  1. s = select([users]).having(func.length(users.c.name) > 4)
  2. s = select([users], func.sum(users.c.score).lable("score_all")).group_by(users.c.gender)

更新和删除

  1. up = users.update().where(users.c.id == 1).values(name="testname")
  2. up = users.update().where(users.c.id == 1).values(name="prefix_" + users.c.name)
  3. # 使用bindparam来操作更新数据
  4. from sqlalchemy import bindparam
  5. stmt = users.update().where(users.c.id == bindparam("user_id")).values(name = bindparam("new_name"))
  6. s = conn.execute(stmt, user_id=1, new_name="hello")
  7. 批量更新数据
  8. s = conn.execute(stmt, [
  9. {'user_id': 1, 'new_name': 'good'},
  10. {'user_id': 2, 'new_name': 'bad'}
  11. ])

删除数据

  1. d = users.delete()
  2. conn.execute(d) # 删除全部数据
  3. d = users.delete().where(users.c.gender == "M")
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注