[关闭]
@zhengyuhong 2015-06-05T10:07:15.000000Z 字数 5999 阅读 2702

Linux下 SQLAlchemy 连接 MySQL

db MySQL SQLAlchemy


依赖项

  SQLAlchemy的依赖项:MySQLdb(又叫mysql-python)
  如果安装过程中出现如下错误

  1.  raise EnvironmentError("%s not found" % (mysql_config.path,))
  1. #原因是没有安装:libmysqlclient-dev
  2. sudo apt-get install libmysqlclient-dev
  3. sudo updatedb
  4. locate mysql_config

连接MySQL数据库

  1. from sqlalchemy import create_engine
  2. engine = create_engine('mysql://user:passwd@host/db')
  3. #passwd是数据库db的授权口令,非用户密码

  create_engine使用MySQLdb(mysql-python)作为连接MySQL的默认API,可以指定其他,如MySQL-connector-python(MySQL官方API),OurSQL, 显式指定方式如下:

  1. # default
  2. engine = create_engine('mysql://user:passwd@host/db')
  3. # mysql-python
  4. engine = create_engine('mysql+mysqldb://user:passwd@host/db')
  5. # MySQL-connector-python
  6. engine = create_engine('mysql+mysqlconnector://user:passwd@host/db')
  7. # OurSQL
  8. engine = create_engine('mysql+oursql://user:passwd@host/db')

使用SQL语句直接操作数据库

  连接上数据库之后,可以使用连接引擎直接用SQL语句操作数据库:

  1. #假如在MySQL中的zhengyuhongDB数据库已有表student,列为id,name,score
  2. cmd = 'SELECT * from student where id = 0'
  3. ret = e.execute(cmd)
  4. for r in ret:
  5. print r ## 输出id为0的行

  如果需要执行数据库事务时,如下:

  1. conn = e.connect()
  2. conn.begin()
  3. # 执行数据库事务
  4. try:
  5. conn.commit() #
  6. except:
  7. conn.rollback()

使用python对象管理数据库

Table

  1. from sqlalchemy import Table, MetaData,Column,Integer,String, text
  2. meta = MetaData(bind=e, reflect=True)
  3. #meta是连接的数据库的元信息,print一下就可以看到
  4. print meta.tables #可以查看到连接的数据库的表以及表的列属性
  5. #使用已有的表
  6. student = meta.tables['student'] #或者
  7. student = Table('student', metadata, autoload=True)
  8. #创建新表
  9. teacher = Table('teacher',meta,Column('teacher_id',Integer,primary_key=True),Column('name',String),Column('salary',Integer))

Table.select

  1. print student.columns ##输出表的列名
  2. s = student.select(whereclause=None, **params)
  3. s = student.select(student.id == 0)

Table.delete

  1. s = stduent.delete(whereclause=None, **kwargs)

Table.insert

  1. s = student.insert(values=None, inline=False, **kwargs)
  2. s = student.insert(('3','Kim','99'))
  3. s = student.insert(('3','Kim'))
  4. s = student.insert().values(id='3', score='96')

Table.update

  1. s = student.update(whereclause=None, values=None, inline=False, **kwargs)
  2. s = student.update().where(student.c.id==7).values(name='Seven')

Table.join

  1. s = student.join(right, onclause=None, isouter=False)

having

  1. s = student.select().having(text('student.c.id > 2')) ##仅仅举例having用法,在SQL在应该与group by 结合,having用在组,where用在行
  2. #均是返回一个SQL语句类,括号里面就是语句的参数条件,再执行s.execute(),每一条语句执行一次,再如下就是一个查询例子

execute example

  1. for r in student.select(student.id == 0).execute():
  2. print r

select

  1. from sqlalchemy import select,insert,update,delete,and_,or_,not_,in_
  2. select(columns=None, whereclause=None, from_obj=None, distinct=False, having=None, correlate=True, prefixes=None, suffixes=None, **kwargs)
  3. s = select([student]) ##全部选定
  4. s = select([student.c.id, student.c.name],()) ##部分选定
  5. s = select([student,teacher]) ##两个表
  6. s = select([student.c.id,teacher.c.name]) ## 两个表部分选定,多个表同理
  7. s = select([product_a.c.price,product_b.c.price])
  8. s = select([tableOne, tableTwo], tableOne.c.id==tableTwo.c.user_id)
  9. s = select([users_table], users_table.c.id > 3)
  10. s = select(and_(users_table.c.name=="Martha", users_table.c.age < 25))
  11. s = select([student]).order_by(student.c.id)

insert

  1. insert(table, values=None, inline=False, bind=None, prefixes=None, returning=None, return_defaults=False, **dialect_kw)

update

  1. update(table, whereclause=None, values=None, inline=False, bind=None, prefixes=None, returning=None, return_defaults=False, **dialect_kw)
  2. s = update(student, student.c.name=='Jim').values(name='Jimmy')

delete

  1. delete(table, whereclause=None, bind=None, returning=None, prefixes=None, **dialect_kw)
  2. s = delete(student, student.c.name=='Jimmy')

join

  1. join(left, right, onclause=None, isouter=False)
  2. j = join(user_table, address_table,
  3. user_table.c.id == address_table.c.user_id)

  数据库最重要就是这四个语句,毕竟在编程层次多数都是对单数据库调用,关于触发器,外键设定等等这些应该是DBA在建表时做好的,编程阶段尽量时应用数据库数据,而非过多涉及数据库细节。上面的方法我个人比较喜欢使用Table类的方法,更具有面向对象,而后部分的更倾向于面向过程。关于Table类可以参考官方Table文档

使用 ORM

sessionmaker

  1. from sqlalchemy.orm import sessionmaker,mapper
  2. DBSession = sessionmaker(autocommit=False, autoflush=False,bind=engine))
  3. session = DBSession()

mapper

  1. class student_m(object):
  2. def __init__(self, id,name,score):
  3. self.id = id
  4. self.name = name
  5. self.score = score
  6. def __repr__(self):
  7. return '%s(%r,%r,%r)' % (self.__class__.__name__,self.id,self.name,self.score)
  8. ##还可以定义其他便于编程的方法
  9. mapper(student_m, meta.tables['student']) # 将table映射到object

session.add

  1. kim = student_mapper('8','kim','93')
  2. session.add(kim)

session.flush

  1. session.flush() ## Flush all the object changes to the database

session.rollback

  1. session.rollback()回滚到上一次的commit之后的状态

session.commit()

  1. session.commit()

原子事务的使用

  1. session.begin() # begin(subtransactions=False, nested=False)
  2. #work work work
  3. session.commit() # Flush pending changes and commit the current transaction.

session.execute(s)

  1. session.execute(s)
  2. 这里可以结合上面的Table类面向对象、面向过程方法

session.query

  1. session.query()
  2. q = session.query(student_m,teacher_m) #student_m teacher_m 是两个mapper 到Table的对象
  3. q = session.query(student_m.name,student_m.name) # 类似上面的select,只是不需要加入[]

Query.all

  1. q.all() #上面只是产生检索语句,还没真正执行,all返回检索全部结果每一行以__repr__中的定义格式返回

Query.count

  1. q.count() # 返回结果行数

Query.filter(*criterion)

  criterion 就是python真值表达式如

  1. student_m.id != '1' and student_m.name != 'Jim'
  1. q = session.query(student_m)
  2. ret = q.filter(student_m.id == '1' and ).all()

Query.delete(synchronize_session='evaluate')

  1. q.delete() #将检索出来的结果删除
  2. session.query(student_m).filter(student_m.id == '1').delete()
  3. #在commit之后生效

Query.update(values, synchronize_session='evaluate', update_args=None)

  1. session.query(student_m).filter(student_m.name== 'Jim').update({'score': '100'}, synchronize_session='evaluate')
  2. ##values就是一个字典,very pythonic

Query.select_from(*from_obj)

  1. q = session.query(Address_m).select_from(User_m).join(User_m.addresses).filter(User_m.name == 'ed')

Query.union(*q)

  1. q1 = session.query(school_a_student_m).filter(school_a_student_m.score > 90)
  2. q2 = session.query(school_b_student_m).filter(school_b_student_m.score > 90)
  3. q3 = q1.union(q2)
  4. # union对两张表的操作是合并数据条数,等于是纵向的,就是两个表的列属性必须相同。上面就是合并两校学生分数大于90分的数据

Query.order_by(*criterion)

Query.group_by(*criterion)

Query.having(criterion)

  1. q = session.query(User_m.id).\
  2. join(User_m.addresses).\
  3. group_by(User_m.id).\
  4. having(func.count(Address_m.id) > 2)

Query.limit(limit)

  限制返回个数

  1. ret = session.query(school_a_student_m).order_by(student_m.score).limit(5).all() ##默认升序
  2. ret = session.query(school_a_student_m).order_by(student_m.score.desc()).limit(5).all()

  简单说,session.flush之后你才能在这个session中看到效果,而session.commit之后你才能从其它session中看到效果。ps前面的execute会自动commit
  flush与commit区别
  SQLAlchemy tutorial: how to start
  官方文档更靠谱

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注