[关闭]
@skyway 2016-08-25T09:30:59.000000Z 字数 31639 阅读 1551

【读书】Flask Web Development

title: 【读书】Flask Web Development
date: 2016-02-11 15:54:37
categories: 编程
Python flask


写在前面

之前把flask的网络教程看了一遍,但是那个教程相对于《Flask Web Development》属于入门级,像应用结构和数据库方面都相对简单,其中有些东西也没讲的太清楚,于是再来刷一遍,希望会有点收获!

第一章 安装

第二章 程序的基本结构

第三章 模板

第四章 表单

第五章 数据库

第六章 电子邮件

  1. #linux
  2. (venv) $ export MAIL_USERNAME=<Gmail username>
  3. (venv) $ export MAIL_PASSWORD=<Gmail password>
  4. #windows
  5. (venv) $ set MAIL_USERNAME=<Gmail username>
  6. (venv) $ set MAIL_PASSWORD=<Gmail password>
  1. from threading import Thread
  2. def send_async_email(app, msg):
  3. with app.app_context():
  4. mail.send(msg)
  5. def send_email(to, subject, template, **kwargs):
  6. msg = Message(app.config['FLASKY_MAIL_SUBJECT_PREFIX'] + subject, sender=app.config['FLASKY_MAIL_SENDER'], recipients=[to])
  7. msg.body = render_template(template + '.txt', **kwargs)
  8. msg.html = render_template(template + '.html', **kwargs)
  9. thr = Thread(target=send_async_email, args=[app, msg])
  10. thr.start()
  11. return thr

在不同线程中执行mail.send()函数时,程序上下文要使用app.app_context() 人工创建。

程序要发送大量电子邮件时,使用专门发送电子邮件的作业要比给每封邮件都新建一个线程更合适。例如,我们可以把执行send_async_email()函数的操作发给Celeryhttp://www.celeryproject.org/)任务队列。

第七章 大型程序结构

  1. |-flasky
  2. |-app/
  3. |-templates/
  4. |-static/
  5. |-main/
  6. |-__init__.py
  7. |-errors.py
  8. |-forms.py
  9. |-views.py
  10. |-__init__.py
  11. |-email.py
  12. |-models.py
  13. |-migrations/
  14. |-tests/
  15. |-__init__.py
  16. |-test*.py
  17. |-venv/
  18. |-requirements.txt
  19. |-config.py
  20. |-manage.py

* 配置选项

  1. # config.py
  2. import os
  3. basedir = os.path.abspath(os.path.dirname(__file__))
  4. class Config:
  5. SECRET_KEY = os.environ.get('SECRET_KEY') or 'hard to guess string'
  6. SQLALCHEMY_COMMIT_ON_TEARDOWN = True
  7. FLASKY_MAIL_SUBJECT_PREFIX = '[Flasky]'
  8. FLASKY_MAIL_SENDER = 'Flasky Admin <flasky@example.com>'
  9. FLASKY_ADMIN = os.environ.get('FLASKY_ADMIN')
  10. @staticmethod
  11. def init_app(app):
  12. pass
  13. class DevelopmentConfig(Config):
  14. DEBUG = True
  15. MAIL_SERVER = 'smtp.googlemail.com'
  16. MAIL_PORT = 587
  17. MAIL_USE_TLS = True
  18. MAIL_USERNAME = os.environ.get('MAIL_USERNAME')
  19. MAIL_PASSWORD = os.environ.get('MAIL_PASSWORD')
  20. SQLALCHEMY_DATABASE_URI = os.environ.get('DEV_DATABASE_URL') or \
  21. 'sqlite:///' + os.path.join(basedir, 'data-dev.sqlite')
  22. class TestingConfig(Config):
  23. TESTING = True
  24. SQLALCHEMY_DATABASE_URI = os.environ.get('TEST_DATABASE_URL') or \
  25. 'sqlite:///' + os.path.join(basedir, 'data-test.sqlite')
  26. class ProductionConfig(Config):
  27. SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL') or \
  28. 'sqlite:///' + os.path.join(basedir, 'data.sqlite')
  29. config = {
  30. 'development': DevelopmentConfig,
  31. 'testing': TestingConfig,
  32. 'production': ProductionConfig,
  33. 'default': DevelopmentConfig
  34. }
  1. # app/__init__.py
  2. from flask import Flask, render_template
  3. from flask.ext.bootstrap import Bootstrap
  4. from flask.ext.mail import Mail
  5. from flask.ext.moment import Moment
  6. from flask.ext.sqlalchemy import SQLAlchemy
  7. from config import config
  8. bootstrap = Bootstrap()
  9. mail = Mail()
  10. moment = Moment()
  11. db = SQLAlchemy()
  12. def create_app(config_name):
  13. app = Flask(__name__)
  14. app.config.from_object(config[config_name])
  15. config[config_name].init_app(app)
  16. bootstrap.init_app(app)
  17. mail.init_app(app)
  18. moment.init_app(app)
  19. db.init_app(app)
  20. # 附加路由和自定义的错误页面
  21. return app
  1. #app/main/__init__.py:创建蓝本
  2. from flask import Blueprint
  3. main = Blueprint('main', __name__)
  4. from . import views, errors
  5. #app/_init_.py:注册蓝本
  6. def create_app(config_name):
  7. # ...
  8. from .main import main as main_blueprint
  9. app.register_blueprint(main_blueprint)
  10. return app
  11. # app/main/errors.py:蓝本中的错误处理程序
  12. from flask import render_template
  13. from . import main
  14. @main.app_errorhandler(404)
  15. def page_not_found(e):
  16. return render_template('404.html'), 404
  17. @main.app_errorhandler(500)
  18. def internal_server_error(e):
  19. return render_template('500.html'), 500

如果使用errorhandler修饰器,那么只有蓝本中的错误才能触发处理程序。要想注册程序全局的错误处理程序,必须使用app_errorhandler。(url_for()中需使用:命名空间+视图函数,如main.index)
os.getenv()是os.environ.get()缩写

  1. # 生成
  2. (venv) $ pip freeze >requirements.txt
  3. # 重新创建
  4. (venv) $ pip install -r requirements.txt
  1. # tests/test_basics.py:单元测试
  2. import unittest
  3. from flask import current_app
  4. from app import create_app, db
  5. class BasicsTestCase(unittest.TestCase):
  6. def setUp(self):
  7. self.app = create_app('testing')
  8. self.app_context = self.app.app_context()
  9. self.app_context.push()
  10. db.create_all()
  11. def tearDown(self):
  12. db.session.remove()
  13. db.drop_all()
  14. self.app_context.pop()
  15. def test_app_exists(self):
  16. self.assertFalse(current_app is None)
  17. def test_app_is_testing(self):
  18. self.assertTrue(current_app.config['TESTING'])
  19. # manage.py:启动单元测试的命令
  20. @manager.command
  21. def test():
  22. """Run the unit tests."""
  23. import unittest
  24. tests = unittest.TestLoader().discover('tests')
  25. unittest.TextTestRunner(verbosity=2).run(tests)

第八章 用户认证

第九章 用户角色

此处输入图片的描述

  1. # app/models.py:权限常量
  2. class Permission:
  3. FOLLOW = 0x01
  4. COMMENT = 0x02
  5. WRITE_ARTICLES = 0x04
  6. MODERATE_COMMENTS = 0x08
  7. ADMINISTER = 0x80

此处输入图片的描述

  1. # app/decorators.py:检查用户权限的自定义修饰器
  2. from functools import wraps
  3. from flask import abort
  4. from flask.ext.login import current_user
  5. def permission_required(permission):
  6. def decorator(f):
  7. @wraps(f)
  8. def decorated_function(*args, **kwargs):
  9. if not current_user.can(permission):
  10. abort(403)
  11. return f(*args, **kwargs)
  12. return decorated_function
  13. return decorator
  14. def admin_required(f):
  15. return permission_required(Permission.ADMINISTER)(f)
  1. # app/main/__init__.py:把Permission 类加入模板上下文
  2. @main.app_context_processor
  3. def inject_permissions():
  4. return dict(Permission=Permission)

第十章 用户资料

  1. class User(UserMixin, db.Model):
  2. # ...
  3. def ping(self):
  4. self.last_seen = datetime.utcnow()
  5. db.session.add(self)
  6. @auth.before_app_request
  7. def before_request():
  8. if current_user.is_authenticated():
  9. current_user.ping()
  10. if not current_user.confirmed and request.endpoint[:5] != 'auth.':
  11. return redirect(url_for('auth.unconfirmed'))

第十一章 博客文章

  1. @main.route('/', methods=['GET', 'POST'])
  2. def index():
  3. form = PostForm()
  4. if current_user.can(Permission.WRITE_ARTICLES) and form.validate_on_submit():
  5. post = Post(body=form.body.data,
  6. author=current_user._get_current_object())
  7. db.session.add(post)
  8. return redirect(url_for('.index'))
  9. posts = Post.query.order_by(Post.timestamp.desc()).all()
  10. return render_template('index.html', form=form, posts=posts)

新文章对象的author属性值为表达式current_user._get_current_object()。变量current_user由Flask-Login提供,和所有上下文变量一样,也是通过线程内的代理对象实现。这个对象的表现类似用户对象,但实际上却是一个轻度包装,包含真正的用户对象。数据库需要真正的用户对象,因此要调用_get_current_object() 方法。

  1. class User(UserMixin, db.Model):
  2. # ...
  3. @staticmethod
  4. def generate_fake(count=100):
  5. from sqlalchemy.exc import IntegrityError
  6. from random import seed
  7. import forgery_py
  8. seed()
  9. for i in range(count):
  10. u = User(email=forgery_py.internet.email_address(),
  11. username=forgery_py.internet.user_name(True),
  12. password=forgery_py.lorem_ipsum.word(),
  13. confirmed=True,
  14. name=forgery_py.name.full_name(),
  15. location=forgery_py.address.city(),
  16. about_me=forgery_py.lorem_ipsum.sentence(),
  17. member_since=forgery_py.date.date(True))
  18. db.session.add(u)
  19. try:
  20. db.session.commit()
  21. except IntegrityError:
  22. db.session.rollback()

对于commit我觉得在数据量小的时候,没必要add每一个都commit(当然可以保证失败之后任然继续commit),因为数据库commit还是很耗时间的,所以add完后一次性commit会快蛮多。

  1. class User(UserMixin, db.Model):
  2. # ...
  3. @staticmethod
  4. def generate_fake(count=100):
  5. # ...
  6. db.session.add(u)
  7. try:
  8. db.session.commit()
  9. except IntegrityError:
  10. db.session.rollback()
  1. # app/__init__.py:初始化Flask-PageDown
  2. from flask.ext.pagedown import PageDown
  3. # ...
  4. pagedown = PageDown()
  5. # ...
  6. def create_app(config_name):
  7. # ...
  8. pagedown.init_app(app)
  9. # ...
  10. # app/main/forms.py:启用Markdown 的文章表单
  11. from flask.ext.pagedown.fields import PageDownField
  12. class PostForm(Form):
  13. body = PageDownField("What's on your mind?", validators=[Required()])
  14. submit = SubmitField('Submit')
  15. # app/index.html:Flask-PageDown 模板声明
  16. {% block scripts %}
  17. {{ super() }}
  18. {{ pagedown.include_pagedown() }}
  19. {% endblock %}

服务端转换markdown文本为HTML,使用Bleach清理确保安全;SQLAlchemy可监听修改事件

  1. app/models.py:在Post 模型中处理Markdown 文本
  2. from markdown import markdown
  3. import bleach
  4. class Post(db.Model):
  5. # ...
  6. body_html = db.Column(db.Text)
  7. # ...
  8. @staticmethod
  9. def on_changed_body(target, value, oldvalue, initiator):
  10. allowed_tags = ['a', 'abbr', 'acronym', 'b', 'blockquote', 'code', 'em', 'i', 'li', 'ol', 'pre', 'strong', 'ul', 'h1', 'h2', 'h3', 'p']
  11. target.body_html = bleach.linkify(bleach.clean(markdown(value, output_format='html'), tags=allowed_tags, strip=True))
  12. db.event.listen(Post.body, 'set', Post.on_changed_body)

疑惑:“缓存”到Post模型的字段?(加载时是否算change,flask提示找不到event,待解决)
测试发现初次加载不会调用事件,修改才会调用。也就是如果采用markdown,第一次加载的是非渲染的?只有编辑过才会显示正常,这不科学,有待自看listen事件执行过程

第十二章 关注与粉丝

  1. registrations = db.Table('registrations',
  2. db.Column('student_id', db.Integer, db.ForeignKey('students.id')),
  3. db.Column('class_id', db.Integer, db.ForeignKey('classes.id'))
  4. )
  5. class Student(db.Model):
  6. id = db.Column(db.Integer, primary_key=True)
  7. name = db.Column(db.String)
  8. classes = db.relationship('Class', secondary=registrations, backref=db.backref('students', lazy='dynamic'), lazy='dynamic')
  9. class Class(db.Model):
  10. id = db.Column(db.Integer, primary_key = True)
  11. name = db.Column(db.String)

secondary参数设置为关联表(registrations),backref参数用于处理关系的另一侧(students),关联表registrations是简单表,并不是模型

  1. s.classes.append(c)
  2. db.session.add(s)
  3. s.classes.remove(c)
  4. s.classes.all()
  5. c.students.all()

自引用关系

高级多对多关系

  1. # app/models/user.py:关注关联表的模型实现
  2. class Follow(db.Model):
  3. __tablename__ = 'follows'
  4. follower_id = db.Column(db.Integer, db.ForeignKey('users.id'),
  5. primary_key=True)
  6. followed_id = db.Column(db.Integer, db.ForeignKey('users.id'),
  7. primary_key=True)
  8. timestamp = db.Column(db.DateTime, default=datetime.utcnow)

SQLAlchemy 不能直接使用这个关联表,因为如果这么做程序就无法访问其中的自定义字段。相反地,要把这个多对多关系的左右两侧拆分成两个基本的一对多关系,而且要定义成标准的关系。

  1. class User(UserMixin, db.Model):
  2. # ...
  3. followed = db.relationship('Follow',foreign_keys=[Follow.follower_id],backref=db.backref('follower',lazy='joined'),lazy='dynamic',cascade='all, delete-orphan')
  4. followers = db.relationship('Follow',foreign_keys=[Follow.followed_id],backref=db.backref('followed',lazy='joined'),lazy='dynamic',cascade='all, delete-orphan')

foreign_keys是为了消除外键之间的歧义,db.backref()参数不是指定这两个关系之间的引用关系,而是回引Follow模型。回引中的lazy参数指定为joined。lazy模式可以实现立即从联结查询中加载相关对象。

例如,如果某个用户关注了100个用户,调用user.followed.all()后会返回一个列表,其中包含100 个Follow 实例,每一个实例的follower和followed 回引属性都指向相应的用户。设定为lazy='joined'模式,就可在一次数据库查询中完成这些操作。如果把lazy设为默认值select,那么首次访问follower 和followed 属性时才会加载对应的用户,而且每个属性都需要一个单独的查询,这就意味着获取全部被关注用户时需要增加100次额外的数据库查询。

cascade 参数配置在父对象上执行的操作对相关对象的影响。比如,层叠选项可设定为:将用户添加到数据库会话后,要自动把所有关系的对象都添加到会话中。层叠选项的默认值能满足大多数情况的需求,但对这个多对多关系来说却不合用。删除对象时,默认的层叠行为是把对象联接的所有相关对象的外键设为空值。但在关联表中,删除记录后正确的行为应该是把指向该记录的实体也删除,因为这样能有效销毁联接。这就是层叠选项值delete-orphan 的作用。

cascade 参数的值是一组由逗号分隔的层叠选项,这看起来可能让人有点困惑,但all 表示除了delete-orphan之外的所有层叠选项。设为all,delete-orphan 的意思是启用所有默认层叠选项,而且还要删除孤儿记录。

  1. # app/models.py:关注关系的辅助方法
  2. class User(db.Model):
  3. # ...
  4. def follow(self, user):
  5. if not self.is_following(user):
  6. f = Follow(follower=self, followed=user)
  7. db.session.add(f)
  8. def unfollow(self, user):
  9. f = self.followed.filter_by(followed_id=user.id).first()
  10. if f:
  11. db.session.delete(f)
  12. def is_following(self, user):
  13. return self.followed.filter_by(followed_id=user.id).first() is not None
  14. def is_followed_by(self, user):
  15. return self.followers.filter_by(follower_id=user.id).first() is not None
  1. return db.session.query(Post).select_from(Follow).filter_by(follower_id=self.id).join(Post, Follow.followed_id == Post.author_id)
  1. db.session.query(Post) 指明这个查询要返回Post 对象;
  2. select_from(Follow) 的意思是这个查询从Follow 模型开始;
  3. filter_by(follower_id=self.id) 使用关注用户过滤follows 表;
  4. join(Post, Follow.followed_id == Post.author_id) 联结filter_by() 得到的结果和Post 对象。
  1. # 优化
  2. return Post.query.join(Follow,Follow.followed_id==Post.author_id).filter(Follow.follower_id==self.id)

先执行联结操作再过滤看起来工作量会更大一些,但实际上这两种查询是等效的。SQLAlchemy首先收集所有的过滤器,然后再以最高效的方式生成查询。这两种查询生成的原生SQL指令是一样的。

  1. # app/models.py:获取所关注用户的文章
  2. class User(db.Model):
  3. # ...
  4. @property
  5. def followed_posts(self):
  6. return Post.query.join(Follow, Follow.followed_id == Post.author_id).filter(Follow.follower_id == self.id)

followed_posts方法被定义为property,调用时无需()

  1. #app/main/views.py:查询所有文章还是所关注用户的文章
  2. @main.route('/all')
  3. @login_required
  4. def show_all():
  5. resp = make_response(redirect(url_for('.index')))
  6. resp.set_cookie('show_followed', '', max_age=30*24*60*60)
  7. return resp
  8. @main.route('/followed')
  9. @login_required
  10. def show_followed():
  11. resp = make_response(redirect(url_for('.index')))
  12. resp.set_cookie('show_followed', '1', max_age=30*24*60*60)
  13. return resp

cookie 只能在响应对象中设置,因此这两个路由不能依赖Flask,要使用make_response()方法创建响应对象。

第十三章 用户评论

  1. # app/models.py:Comment 模型
  2. class Comment(db.Model):
  3. __tablename__ = 'comments'
  4. id = db.Column(db.Integer, primary_key=True)
  5. body = db.Column(db.Text)
  6. body_html = db.Column(db.Text)
  7. timestamp = db.Column(db.DateTime, index=True, default=datetime.utcnow)
  8. disabled = db.Column(db.Boolean)
  9. author_id = db.Column(db.Integer, db.ForeignKey('users.id'))
  10. post_id = db.Column(db.Integer, db.ForeignKey('posts.id'))
  11. @staticmethod
  12. def on_changed_body(target, value, oldvalue, initiator):
  13. allowed_tags = ['a', 'abbr', 'acronym', 'b', 'code', 'em', 'i',
  14. 'strong']
  15. target.body_html = bleach.linkify(bleach.clean(
  16. markdown(value, output_format='html'),
  17. tags=allowed_tags, strip=True))
  18. db.event.listen(Comment.body, 'set', Comment.on_changed_body)
  1. # app/models/user.py:users 和posts 表与comments 表之间的一对多关系
  2. class User(db.Model):
  3. # ...
  4. comments = db.relationship('Comment', backref='author', lazy='dynamic')
  5. class Post(db.Model):
  6. # ...
  7. comments = db.relationship('Comment', backref='post', lazy='dynamic')
  1. # app/templates/base.html:在导航条中加入管理评论链接
  2. ...
  3. {% if current_user.can(Permission.MODERATE_COMMENTS) %}
  4. <li><a href="{{ url_for('main.moderate') }}">Moderate Comments</a></li>
  5. {% endif %}
  6. ...
  1. # app/main/views.py:管理评论的路由
  2. @main.route('/moderate')
  3. @login_required
  4. @permission_required(Permission.MODERATE_COMMENTS)
  5. def moderate():
  6. page = request.args.get('page', 1, type=int)
  7. pagination = Comment.query.order_by(Comment.timestamp.desc()).paginate(page, per_page=current_app.config['FLASKY_COMMENTS_PER_PAGE'],error_out=False)
  8. comments = pagination.items
  9. return render_template('moderate.html',comments=comments,pagination=pagination, page=page)
  1. # app/templates/moderate.html:评论管理页面的模板
  2. {% extends "base.html" %}
  3. {% import "_macros.html" as macros %}
  4. {% block title %}Flasky - Comment Moderation{% endblock %}
  5. {% block page_content %}
  6. <div class="page-header">
  7. <h1>Comment Moderation</h1>
  8. </div>
  9. {% set moderate = True %}
  10. {% include '_comments.html' %}
  11. {% if pagination %}
  12. <div class="pagination">
  13. {{ macros.pagination_widget(pagination, '.moderate') }}
  14. </div>
  15. {% endif %}
  16. {% endblock %}
  1. # app/templates/_comments.html:渲染评论的正文
  2. ...
  3. <div class="comment-body">
  4. {% if comment.disabled %}
  5. <p></p><i>This comment has been disabled by a moderator.</i></p>
  6. {% endif %}
  7. {% if moderate or not comment.disabled %}
  8. {% if comment.body_html %}
  9. {{ comment.body_html | safe }}
  10. {% else %}
  11. {{ comment.body }}
  12. {% endif %}
  13. {% endif %}
  14. </div>
  15. {% if moderate %}
  16. <br>
  17. {% if comment.disabled %}
  18. <a class="btn btn-default btn-xs" href="{{ url_for('.moderate_enable',
  19. id=comment.id, page=page) }}">Enable</a>
  20. {% else %}
  21. <a class="btn btn-danger btn-xs" href="{{ url_for('.moderate_disable',
  22. id=comment.id, page=page) }}">Disable</a>
  23. {% endif %}
  24. {% endif %}
  25. ...
  1. # app/main/views.py:评论管理路由
  2. @main.route('/moderate/enable/<int:id>')
  3. @login_required
  4. @permission_required(Permission.MODERATE_COMMENTS)
  5. def moderate_enable(id):
  6. comment = Comment.query.get_or_404(id)
  7. comment.disabled = False
  8. db.session.add(comment)
  9. db.session.commit()
  10. return redirect(url_for('.moderate',page=request.args.get('page', 1, type=int)))
  11. @main.route('/moderate/disable/<int:id>')
  12. @login_required
  13. @permission_required(Permission.MODERATE_COMMENTS)
  14. def moderate_disable(id):
  15. comment = Comment.query.get_or_404(id)
  16. comment.disabled = True
  17. db.session.add(comment)
  18. db.session.commit()
  19. return redirect(url_for('.moderate',page=request.args.get('page', 1, type=int)))

修改后需要提交数据

第十四章 应用编程接口

有时候确实需要提供接口供其他应用程序使用,之前写过,估计不规范或者不安全。

REST简介

客户端−服务器
客户端和服务器之间必须有明确的界线。
无状态
客户端发出的请求中必须包含所有必要的信息。服务器不能在两次请求之间保存客户端的任何状态。
缓存
服务器发出的响应可以标记为可缓存或不可缓存,这样出于优化目的,客户端(或客户端和服务器之间的中间服务)可以使用缓存。
接口统一
客户端访问服务器资源时使用的协议必须一致,定义良好,且已经标准化。REST Web服务最常使用的统一接口是HTTP 协议。
系统分层
在客户端和服务器之间可以按需插入代理服务器、缓存或网关,以提高性能、稳定性和伸缩性。
按需代码
客户端可以选择从服务器上下载代码,在客户端的环境中执行。

Flask请求:/api/posts/1234可重定向到路由/api/posts/1234/,反之不行

表14-1 REST架构API中使用的HTTP请求方法

请求方法 目标 说明 HTTP状态码
GET 单个资源的URL 获取目标资源 200
GET 资源集合的URL 获取资源的集合(如果服务器实现了分页,就是一页中的资源) 200
POST 资源集合的URL 创建新资源,并将其加入目标集合。服务器为新资源指派URL,并在响应的Location 首部中返回 201
PUT 单个资源的URL 修改一个现有资源。如果客户端能为资源指派URL,还可用来创建新资源 200
DELETE 单个资源的URL 删除一个资源 200
DELETE 资源集合的URL 删除目标集合中的所有资源 200

一篇博客文章对应的资源可以使用如下的JSON 表示:

  1. {
  2. "url": "http://www.example.com/api/posts/12345",
  3. "title": "Writing RESTful APIs in Python",
  4. "author": "http://www.example.com/api/users/2",
  5. "body": "... text of the article here ...",
  6. "comments": "http://www.example.com/api/posts/12345/comments"
  7. }

在设计良好的REST API中,客户端只需知道几个顶级资源的URL,其他资源的URL 则从响应中包含的链接上发掘。

使用Flask提供REST Web服务

  1. //API 蓝本的结构
  2. |-flasky
  3. |-app/
  4. |-api_1_0
  5. |-__init__.py
  6. |-users.py
  7. |-posts.py
  8. |-comments.py
  9. |-authentication.py
  10. |-errors.py
  11. |-decorators.py
  1. # app/api_1_0/__init__.py:API 蓝本的构造文件
  2. from flask import Blueprint
  3. api = Blueprint('api', __name__)
  4. from . import authentication, posts, users, comments, errors
  1. # app/_init_.py:注册API 蓝本
  2. def create_app(config_name):
  3. # ...
  4. from .api_1_0 import api as api_1_0_blueprint
  5. app.register_blueprint(api_1_0_blueprint, url_prefix='/api/v1.0')
  6. # ...

表14-2 API返回的常见HTTP状态码

HTTP状态码 名  称 说  明
200 OK(成功) 请求成功完成
201 Created(已创建) 请求成功完成并创建了一个新资源
400 Bad request(坏请求) 请求不可用或不一致
401 Unauthorized(未授权) 请求未包含认证信息
403 Forbidden(禁止) 请求中发送的认证密令无权访问目标
404 Notfound(未找到) URL 对应的资源不存在
405 Method not allowed(不允许使用的方法) 指定资源不支持请求使用的方法
500 Internal server error(内部服务器错误) 处理请求的过程中发生意外错误

让404/500/403错误兼容API请求

  1. # app/main/errors.py:使用HTTP 内容协商处理错误
  2. @main.app_errorhandler(404)
  3. def page_not_found(e):
  4. if request.accept_mimetypes.accept_json and not request.accept_mimetypes.accept_html:
  5. response = jsonify({'error': 'not found'})
  6. response.status_code = 404
  7. return response
  8. return render_template('404.html'), 404

API错误处理

  1. # app/api_1_0/errors.py:API 蓝本中403 状态码的错误处理程序
  2. def forbidden(message):
  3. response = jsonify({'error': 'forbidden', 'message': message})
  4. response.status_code = 403
  5. return response
  1. # app/api_1_0/authentication.py:初始化Flask-HTTPAuth
  2. from flask.ext.httpauth import HTTPBasicAuth
  3. auth = HTTPBasicAuth()
  4. @auth.verify_password
  5. def verify_password(email, password):
  6. if email == '':
  7. g.current_user = AnonymousUser()
  8. return True
  9. user = User.query.filter_by(email = email).first()
  10. if not user:
  11. return False
  12. g.current_user = user
  13. return user.verify_password(password)
  14. # app/api_1_0/authentication.py:Flask-HTTPAuth 错误处理程序
  15. @auth.error_handler
  16. def auth_error():
  17. return unauthorized('Invalid credentials')
  18. # app/api_1_0/authentication.py:在before_request处理程序中进行认证
  19. from .errors import forbidden_error
  20. @api.before_request
  21. @auth.login_required
  22. def before_request():
  23. if not g.current_user.is_anonymous and not g.current_user.confirmed:
  24. return forbidden('Unconfirmed account')
  1. # app/models.py:支持基于令牌的认证
  2. class User(db.Model):
  3. # ...
  4. def generate_auth_token(self, expiration):
  5. s = Serializer(current_app.config['SECRET_KEY'], expires_in=expiration)
  6. return s.dumps({'id': self.id})
  7. @staticmethod
  8. def verify_auth_token(token):
  9. s = Serializer(current_app.config['SECRET_KEY'])
  10. try:
  11. data = s.loads(token)
  12. except:
  13. return None
  14. return User.query.get(data['id'])
  15. # app/api_1_0/authentication.py:支持令牌的改进验证回调
  16. @auth.verify_password
  17. def verify_password(email_or_token, password):
  18. if email_or_token == '':
  19. g.current_user = AnonymousUser()
  20. return True
  21. if password == '':
  22. g.current_user = User.verify_auth_token(email_or_token)
  23. g.token_used = True
  24. return g.current_user is not None
  25. user = User.query.filter_by(email=email_or_token).first()
  26. if not user:
  27. return False
  28. g.current_user = user
  29. g.token_used = False
  30. return user.verify_password(password)
  31. # app/api_1_0/authentication.py:生成认证令牌
  32. @api.route('/token')
  33. def get_token():
  34. if g.current_user.is_anonymous() or g.token_used:
  35. return unauthorized('Invalid credentials')
  36. return jsonify({'token': g.current_user.generate_auth_token(expiration=3600), 'expiration': 3600})

既然是无状态,那通过令牌的方式实现认证不就不破坏了无状态原则?

  1. # app/models.py:把文章转换成JSON 格式的序列化字典
  2. class Post(db.Model):
  3. # ...
  4. def to_json(self):
  5. json_post = {
  6. 'url': url_for('api.get_post', id=self.id, _external=True),
  7. 'body': self.body,
  8. 'body_html': self.body_html,
  9. 'timestamp': self.timestamp,
  10. 'author': url_for('api.get_user', id=self.author_id, _external=True),
  11. 'comments': url_for('api.get_post_comments', id=self.id, _external=True)
  12. 'comment_count': self.comments.count()
  13. }
  14. return json_post
  15. # app/models.py:把用户转换成JSON 格式的序列化字典
  16. class User(UserMixin, db.Model):
  17. # ...
  18. def to_json(self):
  19. json_user = {
  20. 'url': url_for('api.get_post', id=self.id, _external=True),
  21. 'username': self.username,
  22. 'member_since': self.member_since,
  23. 'last_seen': self.last_seen,
  24. 'posts': url_for('api.get_user_posts', id=self.id, _external=True),
  25. 'followed_posts':url_for('api.get_user_followed_posts',id=self.id, _external=True),
  26. 'post_count': self.posts.count()
  27. }
  28. return json_user

所有url_for()方法都指定了参数_external=True,这么做是为了生成完整的URL,而不是生成传统Web程序中经常使用的相对URL。

  1. # app/api_1_0/posts.py:文章资源GET 请求的处理程序
  2. @api.route('/posts/')
  3. @auth.login_required
  4. def get_posts():
  5. posts = Post.query.all()
  6. return jsonify({ 'posts': [post.to_json() for post in posts] })
  7. @api.route('/posts/<int:id>')
  8. @auth.login_required
  9. def get_post(id):
  10. post = Post.query.get_or_404(id)
  11. return jsonify(post.to_json())
  1. # app/api_1_0/posts.py:分页文章资源
  2. @api.route('/posts/')
  3. def get_posts():
  4. page = request.args.get('page', 1, type=int)
  5. pagination = Post.query.paginate(page,per_page=current_app.config['FLASKY_POSTS_PER_PAGE'],error_out=False)
  6. posts = pagination.items
  7. prev = None
  8. if pagination.has_prev:
  9. prev = url_for('api.get_posts', page=page-1, _external=True)
  10. next = None
  11. if pagination.has_next:
  12. next = url_for('api.get_posts', page=page+1, _external=True)
  13. return jsonify({
  14. 'posts': [post.to_json() for post in posts],
  15. 'prev': prev,
  16. 'next': next,
  17. 'count': pagination.total
  18. })
  1. (venv) $ pip install httpie
  2. // 认证请求
  3. (venv) $ http --json --auth <email>:<password> GET \
  4. > http://127.0.0.1:5000/api/v1.0/posts
  5. HTTP/1.0 200 OK
  6. Content-Length: 7018
  7. Content-Type: application/json
  8. Date: Sun, 22 Dec 2013 08:11:24 GMT
  9. Server: Werkzeug/0.9.4 Python/2.7.3
  10. {
  11. "posts": [
  12. ...
  13. ],
  14. "prev": null
  15. "next": "http://127.0.0.1:5000/api/v1.0/posts/?page=2",
  16. "count": 150
  17. }
  18. // 匿名请求
  19. (venv) $ http --json --auth : GET http://127.0.0.1:5000/api/v1.0/posts/
  20. // 添加文章
  21. (venv) $ http --auth <email>:<password> --json POST \
  22. > http://127.0.0.1:5000/api/v1.0/posts/ \
  23. > "body=I'm adding a post from the *command line*."
  24. HTTP/1.0 201 CREATED
  25. Content-Length: 360
  26. Content-Type: application/json
  27. Date: Sun, 22 Dec 2013 08:30:27 GMT
  28. Location: http://127.0.0.1:5000/api/v1.0/posts/111
  29. Server: Werkzeug/0.9.4 Python/2.7.3
  30. {
  31. "author": "http://127.0.0.1:5000/api/v1.0/users/1",
  32. "body": "I'm adding a post from the *command line*.",
  33. "body_html": "<p>I'm adding a post from the <em>command line</em>.</p>",
  34. "comments": "http://127.0.0.1:5000/api/v1.0/posts/111/comments",
  35. "comment_count": 0,
  36. "timestamp": "Sun, 22 Dec 2013 08:30:27 GMT",
  37. "url": "http://127.0.0.1:5000/api/v1.0/posts/111"
  38. }
  39. // 令牌认证
  40. (venv) $ http --auth <email>:<password> --json GET \
  41. > http://127.0.0.1:5000/api/v1.0/token
  42. HTTP/1.0 200 OK
  43. Content-Length: 162
  44. Content-Type: application/json
  45. Date: Sat, 04 Jan 2014 08:38:47 GMT
  46. Server: Werkzeug/0.9.4 Python/3.3.3
  47. {
  48. "expiration": 3600,
  49. "token": "eyJpYXQiOjEzODg4MjQ3MjcsImV4cCI6MTM4ODgyODMyNywiYWxnIjoiSFMy..."
  50. }
  51. // 令牌请求
  52. (venv) $ http --json --auth eyJpYXQ...: GET http://127.0.0.1:5000/api/v1.0/posts/

第十五章 测试

  1. # manage.py:覆盖检测
  2. #!/usr/bin/env python
  3. import os
  4. COV = None
  5. if os.environ.get('FLASK_COVERAGE'):
  6. import coverage
  7. COV = coverage.coverage(branch=True, include='app/*')
  8. COV.start()
  9. # ...
  10. @manager.command
  11. def test(coverage=False):
  12. """Run the unit tests."""
  13. if coverage and not os.environ.get('FLASK_COVERAGE'):
  14. import sys
  15. os.environ['FLASK_COVERAGE'] = '1'
  16. os.execvp(sys.executable, [sys.executable] + sys.argv)
  17. import unittest
  18. tests = unittest.TestLoader().discover('tests')
  19. unittest.TextTestRunner(verbosity=2).run(tests)
  20. if COV:
  21. COV.stop()
  22. COV.save()
  23. print('Coverage Summary:')
  24. COV.report()
  25. basedir = os.path.abspath(os.path.dirname(__file__))
  26. covdir = os.path.join(basedir, 'tmp/coverage')
  27. COV.html_report(directory=covdir)
  28. print('HTML version: file://%s/index.html' % covdir)
  29. COV.erase()
  30. # ...
  1. class FlaskClientTestCase(unittest.TestCase):
  2. def setUp(self):
  3. self.app = create_app('testing')
  4. self.app_context = self.app.app_context()
  5. self.app_context.push()
  6. db.create_all()
  7. Role.insert_roles()
  8. self.client = self.app.test_client(use_cookies=True)
  9. def tearDown(self):
  10. db.session.remove()
  11. db.drop_all()
  12. self.app_context.pop()
  13. def test_home_page(self):
  14. response = self.client.get(url_for('main.index'))
  15. self.assertTrue(b'Stranger' in response.data)
  16. def test_register_and_login(self):
  17. # register a new account
  18. response = self.client.post(url_for('auth.register'), data={
  19. 'email': 'john@example.com',
  20. 'username': 'john',
  21. 'password': 'cat',
  22. 'password2': 'cat'
  23. })
  24. self.assertTrue(response.status_code == 302)
  25. # login with the new account
  26. response = self.client.post(url_for('auth.login'), data={
  27. 'email': 'john@example.com',
  28. 'password': 'cat'
  29. }, follow_redirects=True)
  30. self.assertTrue(re.search(b'Hello,\s+john!', response.data))
  31. self.assertTrue(
  32. b'You have not confirmed your account yet' in response.data)
  33. # send a confirmation token
  34. user = User.query.filter_by(email='john@example.com').first()
  35. token = user.generate_confirmation_token()
  36. response = self.client.get(url_for('auth.confirm', token=token),
  37. follow_redirects=True)
  38. self.assertTrue(
  39. b'You have confirmed your account' in response.data)
  40. # log out
  41. response = self.client.get(url_for('auth.logout'), follow_redirects=True)
  42. self.assertTrue(b'You have been logged out' in response.data)

测试会检查响应的状态码是否为302,这个代码表示重定向。调用post()方法时指定了参数follow_redirects=True,让测试客户端和浏览器一样,自动向重定向的URL发起GET请求。指定这个参数后,返回的不是302状态码,而是请求重定向的URL返回的响应。

测试web服务

  1. # tests/test_api.py:使用Flask 测试客户端测试REST API
  2. class APITestCase(unittest.TestCase):
  3. # ...
  4. def get_api_headers(self, username, password):
  5. return {
  6. 'Authorization':
  7. 'Basic ' + b64encode((username + ':' + password).encode('utf-8')).decode('utf-8'),
  8. 'Accept': 'application/json',
  9. 'Content-Type': 'application/json'
  10. }
  11. def test_no_auth(self):
  12. response = self.client.get(url_for('api.get_posts'),
  13. content_type='application/json')
  14. self.assertTrue(response.status_code == 401)
  15. def test_posts(self):
  16. # 添加一个用户
  17. r = Role.query.filter_by(name='User').first()
  18. self.assertIsNotNone(r)
  19. u = User(email='john@example.com',password='cat',confirmed=True,role=r)
  20. db.session.add(u)
  21. db.session.commit()
  22. # 写一篇文章
  23. response = self.client.post(url_for('api.new_post'), headers=self.get_auth_header('john@example.com', 'cat'), data=json.dumps({'body': 'body of the *blog* post'}))
  24. self.assertTrue(response.status_code == 201)
  25. url = response.headers.get('Location')
  26. self.assertIsNotNone(url)
  27. # 获取刚发布的文章
  28. response = self.client.get(url, headers=self.get_auth_header('john@example.com', 'cat'))
  29. self.assertTrue(response.status_code == 200)
  30. json_response = json.loads(response.data.decode('utf-8'))
  31. self.assertTrue(json_response['url'] == url)
  32. self.assertTrue(json_response['body'] == 'body of the *blog* post')
  33. self.assertTrue(json_response['body_html'] == '<p>body of the <em>blog</em> post</p>')

优雅的停止服务端

  1. # app/main/views.py:关闭服务器的路由
  2. @main.route('/shutdown')
  3. def server_shutdown():
  4. if not current_app.testing:
  5. abort(404)
  6. shutdown = request.environ.get('werkzeug.server.shutdown')
  7. if not shutdown:
  8. abort(500)
  9. shutdown()
  10. return 'Shutting down...'
  1. class SeleniumTestCase(unittest.TestCase):
  2. # ...
  3. @classmethod
  4. def setUpClass(cls):
  5. # start Firefox
  6. try:
  7. cls.client = webdriver.Firefox()
  8. except:
  9. pass
  10. # skip these tests if the browser could not be started
  11. if cls.client:
  12. # create the application
  13. cls.app = create_app('testing')
  14. cls.app_context = cls.app.app_context()
  15. cls.app_context.push()
  16. # suppress logging to keep unittest output clean
  17. import logging
  18. logger = logging.getLogger('werkzeug')
  19. logger.setLevel("ERROR")
  20. # create the database and populate with some fake data
  21. db.create_all()
  22. Role.insert_roles()
  23. User.generate_fake(10)
  24. Post.generate_fake(10)
  25. # add an administrator user
  26. admin_role = Role.query.filter_by(permissions=0xff).first()
  27. admin = User(email='john@example.com',
  28. username='john', password='cat',
  29. role=admin_role, confirmed=True)
  30. db.session.add(admin)
  31. db.session.commit()
  32. # start the Flask server in a thread
  33. threading.Thread(target=cls.app.run).start()
  34. # give the server a second to ensure it is up
  35. time.sleep(1)
  36. @classmethod
  37. def tearDownClass(cls):
  38. if cls.client:
  39. # stop the flask server and the browser
  40. cls.client.get('http://localhost:5000/shutdown')
  41. cls.client.close()
  42. # destroy database
  43. db.drop_all()
  44. db.session.remove()
  45. # remove application context
  46. cls.app_context.pop()
  47. def setUp(self):
  48. if not self.client:
  49. self.skipTest('Web browser not available')
  50. def tearDown(self):
  51. pass
  52. def test_admin_home_page(self):
  53. # navigate to home page
  54. self.client.get('http://localhost:5000/')
  55. self.assertTrue(re.search('Hello,\s+Stranger!',
  56. self.client.page_source))
  57. # navigate to login page
  58. self.client.find_element_by_link_text('Log In').click()
  59. self.assertTrue('<h1>Login</h1>' in self.client.page_source)
  60. # login
  61. self.client.find_element_by_name('email').\
  62. send_keys('john@example.com')
  63. self.client.find_element_by_name('password').send_keys('cat')
  64. self.client.find_element_by_name('submit').click()
  65. self.assertTrue(re.search('Hello,\s+john!', self.client.page_source))
  66. # navigate to the user's profile page
  67. self.client.find_element_by_link_text('Profile').click()
  68. self.assertTrue('<h1>john</h1>' in self.client.page_source)

第十六章 性能

  1. # app/main/views.py:报告缓慢的数据库查询
  2. from flask.ext.sqlalchemy import get_debug_queries
  3. @main.after_app_request
  4. def after_request(response):
  5. for query in get_debug_queries():
  6. if query.duration >= current_app.config['FLASKY_SLOW_DB_QUERY_TIME']:
  7. current_app.logger.warning('Slow query: %s\nParameters: %s\nDuration: %fs\nContext: %s\n' % (query.statement, query.parameters, query.duration, query.context))
  8. return response

表16-1 Flask-SQLAlchemy记录的查询信息

名  称 说  明
statement SQL 语句
parameters SQL 语句使用的参数
start_time 执行查询时的时间
end_time 返回查询结果时的时间
duration 查询持续的时间,单位为秒
context 表示查询在源码中所处位置的字符串
  1. # config.py:启用缓慢查询记录功能的配置
  2. class Config:
  3. # ...
  4. SQLALCHEMY_RECORD_QUERIES = True
  5. FLASKY_DB_QUERY_TIMEOUT = 0.5
  6. # ...
  1. # manage.py:在请求分析器的监视下运行程序
  2. @manager.command
  3. def profile(length=25, profile_dir=None):
  4. """Start the application under the code profiler."""
  5. from werkzeug.contrib.profiler import ProfilerMiddleware
  6. app.wsgi_app = ProfilerMiddleware(app.wsgi_app, restrictions=[length], profile_dir=profile_dir)
  7. app.run()

第十七章 部署

  1. # manage.py:部署命令
  2. @manager.command
  3. def deploy():
  4. """Run deployment tasks."""
  5. from flask.ext.migrate import upgrade
  6. from app.models import Role, User
  7. # 把数据库迁移到最新修订版本
  8. upgrade()
  9. # 创建用户角色
  10. Role.insert_roles()
  11. # 让所有用户都关注自己
  12. User.add_self_follows()
  1. config.py:程序出错时发送电子邮件
  2. class ProductionConfig(Config):
  3. # ...
  4. @classmethod
  5. def init_app(cls, app):
  6. Config.init_app(app)
  7. # 把错误通过电子邮件发送给管理员
  8. import logging
  9. from logging.handlers import SMTPHandler
  10. credentials = None
  11. secure = None
  12. if getattr(cls, 'MAIL_USERNAME', None) is not None:
  13. credentials = (cls.MAIL_USERNAME, cls.MAIL_PASSWORD)
  14. if getattr(cls, 'MAIL_USE_TLS', None):
  15. secure = ()
  16. mail_handler = SMTPHandler(mailhost=(cls.MAIL_SERVER, cls.MAIL_PORT), fromaddr=cls.FLASKY_MAIL_SENDER, toaddrs=[cls.FLASKY_ADMIN], subject=cls.FLASKY_MAIL_SUBJECT_PREFIX + ' Application Error', credentials=credentials, secure=secure)
  17. mail_handler.setLevel(logging.ERROR)
  18. app.logger.addHandler(mail_handler)

第十八章 其他资源

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注