[关闭]
@StrGlee 2016-10-19T14:13:17.000000Z 字数 3104 阅读 1469

flask-sqlalchemy

flask sqlalchemy

add

  1. user_dict = {
  2. 'username': 'strglee',
  3. 'sex': 'man',
  4. 'birthday': '1993',
  5. }
  6. user = UserModel(**user_dict)
  7. db.session.add(user)
  8. db.session.commit()

update

  1. user = UserModel.query.filter(UserModel.phonenum == '123456').one()
  2. if user:
  3. user.username = 'strglee'
  4. db.session.commit()
  5. UserModel.query.filter(UserModel.phonenum == '123456').update({'username': 'strglee'})
  6. session.query(JobModel).filter(JobModel.id.in_([1, 3, 5, 7, 9])).update(dict(local_job_id = 0,local_company_id=0),synchronize_session=False)

delete

  1. UserModel.query.filter(UserModel.phonenum == '123456').delete()

select

  1. UserModel.query.filter(UserModel.username=='strglee').one()
  2. UserModel.query.filter_by(username='strglee').one()
  3. UserModel.query.filter_by(username='strglee').first()
  4. UserModel.query.filter_by(username='strglee').first_or_404()

join 查询

  1. UserModel.query.join(AddressModel,AddressModel.user_id == UserModel.id).\
  2. add_columns(
  3. UserModel.username,
  4. AddressModel.city,
  5. AddressModel.address,
  6. UserModel.sex).\
  7. limit(10).all()

paginate

  1. paginate = UserModel.query.order_by(SeoModel.dt_update.desc()).paginate(page, 15, False)
  2. query4.filter(not_(User.name == None)).all() # not
  3. query4.filter(User.name != None).all()

func

  1. from sqlalchemy import func
  2. rows = session.query(Person).count()
  3. c.count = Session.query(func.count(Person.id)).scalar()
  4. c.avg = Session.query(func.avg(Person.id).label('average')).scalar()
  5. c.sum = Session.query(func.sum(Person.id).label('average')).scalar()
  6. c.max = Session.query(func.max(Person.id).label('average')).scalar()

group by

  1. c.coutg = Session.query(func.count(Person.id).label('count'), Person.name ).group_by(Person.name).all()

destinct

  1. from sqlalchemy import distinct
  2. # count distinct "name" values
  3. session.query(func.count(distinct(User.name)))

sqlalchemy

  1. #!/usr/bin/env python
  2. #-*- coding: utf-8 -*-
  3. from sqlalchemy import create_engine, Column, Integer, String, func
  4. from sqlalchemy.ext.declarative import declarative_base
  5. from sqlalchemy.orm import sessionmaker
  6. Base = declarative_base()
  7. class StudentInfo(Base):
  8. __tablename__ = 'stuinfo'
  9. id = Column(Integer, primary_key=True)
  10. name = Column(String)
  11. cls = Column(String)
  12. cert_count = Column(Integer)
  13. def __init__(self, name, cls, cert_count):
  14. self.name = name
  15. self.cls = cls
  16. self.cert_count = cert_count
  17. engine = create_engine('sqlite:///:memory:', echo=True)
  18. Base.metadata.create_all(engine)
  19. Session = sessionmaker(engine)
  20. session = Session()
  21. data = [
  22. [u'学生1', u'A2', 1],
  23. [u'学生1', u'A1', 4],
  24. [u'学生1', u'A2', 4],
  25. [u'学生1', u'A1', 1],
  26. [u'学生1', u'A1', 5],
  27. [u'学生1', u'A2', 1]]
  28. reocords = [StudentInfo(*record) for record in data]
  29. session.add_all(reocords)
  30. session.commit()

sqlalchemy way

  1. rs = session.query(StudentInfo.cls, func.sum(StudentInfo.cert_count)) \
  2. .group_by(StudentInfo.cls).all()
  3. for row in rs:
  4. print row[0], row[1]
  5. """ Output
  6. A1 10
  7. A2 6
  8. """

sql way

  1. sql = 'select cls, sum(cert_count) from stuinfo group by cls'
  2. rs = engine.execute(sql)
  3. for row in rs:
  4. print row[0], row[1]
  5. """ Output
  6. A1 10
  7. A2 6
  8. """

group by and order by

  1. jobs = session.query(JobModel.salary_start,func.count(JobModel.id).label('cnt')).filter(JobModel.id.between(1,10000)).group_by(JobModel.salary_start).order_by('cnt desc').all()
  2. jobs = session.query(JobModel.salary_start,func.count(JobModel.id).label('cnt')).filter(JobModel.id.between(1,10000)).group_by(JobModel.salary_start).order_by(desc('cnt')).all()
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注