@skyway
2016-08-25T09:30:59.000000Z
字数 31639
阅读 1551
title: 【读书】Flask Web Development
date: 2016-02-11 15:54:37
categories: 编程
Python
flask
之前把flask的网络教程看了一遍,但是那个教程相对于《Flask Web Development》属于入门级,像应用结构和数据库方面都相对简单,其中有些东西也没讲的太清楚,于是再来刷一遍,希望会有点收获!
#linux
(venv) $ export MAIL_USERNAME=<Gmail username>
(venv) $ export MAIL_PASSWORD=<Gmail password>
#windows
(venv) $ set MAIL_USERNAME=<Gmail username>
(venv) $ set MAIL_PASSWORD=<Gmail password>
from threading import Thread
def send_async_email(app, msg):
with app.app_context():
mail.send(msg)
def send_email(to, subject, template, **kwargs):
msg = Message(app.config['FLASKY_MAIL_SUBJECT_PREFIX'] + subject, sender=app.config['FLASKY_MAIL_SENDER'], recipients=[to])
msg.body = render_template(template + '.txt', **kwargs)
msg.html = render_template(template + '.html', **kwargs)
thr = Thread(target=send_async_email, args=[app, msg])
thr.start()
return thr
在不同线程中执行mail.send()函数时,程序上下文要使用app.app_context() 人工创建。
程序要发送大量电子邮件时,使用专门发送电子邮件的作业要比给每封邮件都新建一个线程更合适。例如,我们可以把执行send_async_email()函数的操作发给Celery(http://www.celeryproject.org/)任务队列。
|-flasky
|-app/
|-templates/
|-static/
|-main/
|-__init__.py
|-errors.py
|-forms.py
|-views.py
|-__init__.py
|-email.py
|-models.py
|-migrations/
|-tests/
|-__init__.py
|-test*.py
|-venv/
|-requirements.txt
|-config.py
|-manage.py
* 配置选项
# config.py
import os
basedir = os.path.abspath(os.path.dirname(__file__))
class Config:
SECRET_KEY = os.environ.get('SECRET_KEY') or 'hard to guess string'
SQLALCHEMY_COMMIT_ON_TEARDOWN = True
FLASKY_MAIL_SUBJECT_PREFIX = '[Flasky]'
FLASKY_MAIL_SENDER = 'Flasky Admin <flasky@example.com>'
FLASKY_ADMIN = os.environ.get('FLASKY_ADMIN')
@staticmethod
def init_app(app):
pass
class DevelopmentConfig(Config):
DEBUG = True
MAIL_SERVER = 'smtp.googlemail.com'
MAIL_PORT = 587
MAIL_USE_TLS = True
MAIL_USERNAME = os.environ.get('MAIL_USERNAME')
MAIL_PASSWORD = os.environ.get('MAIL_PASSWORD')
SQLALCHEMY_DATABASE_URI = os.environ.get('DEV_DATABASE_URL') or \
'sqlite:///' + os.path.join(basedir, 'data-dev.sqlite')
class TestingConfig(Config):
TESTING = True
SQLALCHEMY_DATABASE_URI = os.environ.get('TEST_DATABASE_URL') or \
'sqlite:///' + os.path.join(basedir, 'data-test.sqlite')
class ProductionConfig(Config):
SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL') or \
'sqlite:///' + os.path.join(basedir, 'data.sqlite')
config = {
'development': DevelopmentConfig,
'testing': TestingConfig,
'production': ProductionConfig,
'default': DevelopmentConfig
}
# app/__init__.py
from flask import Flask, render_template
from flask.ext.bootstrap import Bootstrap
from flask.ext.mail import Mail
from flask.ext.moment import Moment
from flask.ext.sqlalchemy import SQLAlchemy
from config import config
bootstrap = Bootstrap()
mail = Mail()
moment = Moment()
db = SQLAlchemy()
def create_app(config_name):
app = Flask(__name__)
app.config.from_object(config[config_name])
config[config_name].init_app(app)
bootstrap.init_app(app)
mail.init_app(app)
moment.init_app(app)
db.init_app(app)
# 附加路由和自定义的错误页面
return app
#app/main/__init__.py:创建蓝本
from flask import Blueprint
main = Blueprint('main', __name__)
from . import views, errors
#app/_init_.py:注册蓝本
def create_app(config_name):
# ...
from .main import main as main_blueprint
app.register_blueprint(main_blueprint)
return app
# app/main/errors.py:蓝本中的错误处理程序
from flask import render_template
from . import main
@main.app_errorhandler(404)
def page_not_found(e):
return render_template('404.html'), 404
@main.app_errorhandler(500)
def internal_server_error(e):
return render_template('500.html'), 500
如果使用errorhandler修饰器,那么只有蓝本中的错误才能触发处理程序。要想注册程序全局的错误处理程序,必须使用app_errorhandler。(url_for()中需使用:命名空间+视图函数,如main.index)
os.getenv()是os.environ.get()缩写
# 生成
(venv) $ pip freeze >requirements.txt
# 重新创建
(venv) $ pip install -r requirements.txt
# tests/test_basics.py:单元测试
import unittest
from flask import current_app
from app import create_app, db
class BasicsTestCase(unittest.TestCase):
def setUp(self):
self.app = create_app('testing')
self.app_context = self.app.app_context()
self.app_context.push()
db.create_all()
def tearDown(self):
db.session.remove()
db.drop_all()
self.app_context.pop()
def test_app_exists(self):
self.assertFalse(current_app is None)
def test_app_is_testing(self):
self.assertTrue(current_app.config['TESTING'])
# manage.py:启动单元测试的命令
@manager.command
def test():
"""Run the unit tests."""
import unittest
tests = unittest.TestLoader().discover('tests')
unittest.TextTestRunner(verbosity=2).run(tests)
密码安全
计算加盐,密码散列值的正确方法,https://crackstation.net/hashing-security.htm
认证用户
&
操作就很容易判断是否具有某个权限。
# app/models.py:权限常量
class Permission:
FOLLOW = 0x01
COMMENT = 0x02
WRITE_ARTICLES = 0x04
MODERATE_COMMENTS = 0x08
ADMINISTER = 0x80
# app/decorators.py:检查用户权限的自定义修饰器
from functools import wraps
from flask import abort
from flask.ext.login import current_user
def permission_required(permission):
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not current_user.can(permission):
abort(403)
return f(*args, **kwargs)
return decorated_function
return decorator
def admin_required(f):
return permission_required(Permission.ADMINISTER)(f)
# app/main/__init__.py:把Permission 类加入模板上下文
@main.app_context_processor
def inject_permissions():
return dict(Permission=Permission)
class User(UserMixin, db.Model):
# ...
def ping(self):
self.last_seen = datetime.utcnow()
db.session.add(self)
@auth.before_app_request
def before_request():
if current_user.is_authenticated():
current_user.ping()
if not current_user.confirmed and request.endpoint[:5] != 'auth.':
return redirect(url_for('auth.unconfirmed'))
资料编辑
区分普通用户和管理员,可在类中初始化选项内容,但是太固定,要是能不必使用Bootstrap,form能转化到自定义样式会好很多,也就是Bootstrap的form渲染模板可自定义。
用户头像
默认随机分配头像:第三方服务:http://www.gravatar.com/
avatar/d4c74594d841139328695756648b6bd6
@main.route('/', methods=['GET', 'POST'])
def index():
form = PostForm()
if current_user.can(Permission.WRITE_ARTICLES) and form.validate_on_submit():
post = Post(body=form.body.data,
author=current_user._get_current_object())
db.session.add(post)
return redirect(url_for('.index'))
posts = Post.query.order_by(Post.timestamp.desc()).all()
return render_template('index.html', form=form, posts=posts)
新文章对象的author属性值为表达式current_user._get_current_object()。变量current_user由Flask-Login提供,和所有上下文变量一样,也是通过线程内的代理对象实现。这个对象的表现类似用户对象,但实际上却是一个轻度包装,包含真正的用户对象。数据库需要真正的用户对象,因此要调用_get_current_object() 方法。
资料页显示文章
分页显示
创建虚拟数据forgerypy
class User(UserMixin, db.Model):
# ...
@staticmethod
def generate_fake(count=100):
from sqlalchemy.exc import IntegrityError
from random import seed
import forgery_py
seed()
for i in range(count):
u = User(email=forgery_py.internet.email_address(),
username=forgery_py.internet.user_name(True),
password=forgery_py.lorem_ipsum.word(),
confirmed=True,
name=forgery_py.name.full_name(),
location=forgery_py.address.city(),
about_me=forgery_py.lorem_ipsum.sentence(),
member_since=forgery_py.date.date(True))
db.session.add(u)
try:
db.session.commit()
except IntegrityError:
db.session.rollback()
对于commit我觉得在数据量小的时候,没必要add每一个都commit(当然可以保证失败之后任然继续commit),因为数据库commit还是很耗时间的,所以add完后一次性commit会快蛮多。
class User(UserMixin, db.Model):
# ...
@staticmethod
def generate_fake(count=100):
# ...
db.session.add(u)
try:
db.session.commit()
except IntegrityError:
db.session.rollback()
# app/__init__.py:初始化Flask-PageDown
from flask.ext.pagedown import PageDown
# ...
pagedown = PageDown()
# ...
def create_app(config_name):
# ...
pagedown.init_app(app)
# ...
# app/main/forms.py:启用Markdown 的文章表单
from flask.ext.pagedown.fields import PageDownField
class PostForm(Form):
body = PageDownField("What's on your mind?", validators=[Required()])
submit = SubmitField('Submit')
# app/index.html:Flask-PageDown 模板声明
{% block scripts %}
{{ super() }}
{{ pagedown.include_pagedown() }}
{% endblock %}
服务端转换markdown文本为HTML,使用Bleach清理确保安全;SQLAlchemy可监听修改事件
app/models.py:在Post 模型中处理Markdown 文本
from markdown import markdown
import bleach
class Post(db.Model):
# ...
body_html = db.Column(db.Text)
# ...
@staticmethod
def on_changed_body(target, value, oldvalue, initiator):
allowed_tags = ['a', 'abbr', 'acronym', 'b', 'blockquote', 'code', 'em', 'i', 'li', 'ol', 'pre', 'strong', 'ul', 'h1', 'h2', 'h3', 'p']
target.body_html = bleach.linkify(bleach.clean(markdown(value, output_format='html'), tags=allowed_tags, strip=True))
db.event.listen(Post.body, 'set', Post.on_changed_body)
疑惑:“缓存”到Post模型的字段?(加载时是否算change,flask提示找不到event,待解决)
测试发现初次加载不会调用事件,修改才会调用。也就是如果采用markdown,第一次加载的是非渲染的?只有编辑过才会显示正常,这不科学,有待自看listen事件执行过程
文章固定链接
文章编辑
registrations = db.Table('registrations',
db.Column('student_id', db.Integer, db.ForeignKey('students.id')),
db.Column('class_id', db.Integer, db.ForeignKey('classes.id'))
)
class Student(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String)
classes = db.relationship('Class', secondary=registrations, backref=db.backref('students', lazy='dynamic'), lazy='dynamic')
class Class(db.Model):
id = db.Column(db.Integer, primary_key = True)
name = db.Column(db.String)
secondary参数设置为关联表(registrations),backref参数用于处理关系的另一侧(students),关联表registrations是简单表,并不是模型
s.classes.append(c)
db.session.add(s)
s.classes.remove(c)
s.classes.all()
c.students.all()
自引用关系
高级多对多关系
# app/models/user.py:关注关联表的模型实现
class Follow(db.Model):
__tablename__ = 'follows'
follower_id = db.Column(db.Integer, db.ForeignKey('users.id'),
primary_key=True)
followed_id = db.Column(db.Integer, db.ForeignKey('users.id'),
primary_key=True)
timestamp = db.Column(db.DateTime, default=datetime.utcnow)
SQLAlchemy 不能直接使用这个关联表,因为如果这么做程序就无法访问其中的自定义字段。相反地,要把这个多对多关系的左右两侧拆分成两个基本的一对多关系,而且要定义成标准的关系。
class User(UserMixin, db.Model):
# ...
followed = db.relationship('Follow',foreign_keys=[Follow.follower_id],backref=db.backref('follower',lazy='joined'),lazy='dynamic',cascade='all, delete-orphan')
followers = db.relationship('Follow',foreign_keys=[Follow.followed_id],backref=db.backref('followed',lazy='joined'),lazy='dynamic',cascade='all, delete-orphan')
foreign_keys是为了消除外键之间的歧义,db.backref()参数不是指定这两个关系之间的引用关系,而是回引Follow模型。回引中的lazy参数指定为joined。lazy模式可以实现立即从联结查询中加载相关对象。
例如,如果某个用户关注了100个用户,调用user.followed.all()后会返回一个列表,其中包含100 个Follow 实例,每一个实例的follower和followed 回引属性都指向相应的用户。设定为lazy='joined'模式,就可在一次数据库查询中完成这些操作。如果把lazy设为默认值select,那么首次访问follower 和followed 属性时才会加载对应的用户,而且每个属性都需要一个单独的查询,这就意味着获取全部被关注用户时需要增加100次额外的数据库查询。
cascade 参数配置在父对象上执行的操作对相关对象的影响。比如,层叠选项可设定为:将用户添加到数据库会话后,要自动把所有关系的对象都添加到会话中。层叠选项的默认值能满足大多数情况的需求,但对这个多对多关系来说却不合用。删除对象时,默认的层叠行为是把对象联接的所有相关对象的外键设为空值。但在关联表中,删除记录后正确的行为应该是把指向该记录的实体也删除,因为这样能有效销毁联接。这就是层叠选项值delete-orphan 的作用。
cascade 参数的值是一组由逗号分隔的层叠选项,这看起来可能让人有点困惑,但all 表示除了delete-orphan之外的所有层叠选项。设为all,delete-orphan 的意思是启用所有默认层叠选项,而且还要删除孤儿记录。
# app/models.py:关注关系的辅助方法
class User(db.Model):
# ...
def follow(self, user):
if not self.is_following(user):
f = Follow(follower=self, followed=user)
db.session.add(f)
def unfollow(self, user):
f = self.followed.filter_by(followed_id=user.id).first()
if f:
db.session.delete(f)
def is_following(self, user):
return self.followed.filter_by(followed_id=user.id).first() is not None
def is_followed_by(self, user):
return self.followers.filter_by(follower_id=user.id).first() is not None
return db.session.query(Post).select_from(Follow).filter_by(follower_id=self.id).join(Post, Follow.followed_id == Post.author_id)
# 优化
return Post.query.join(Follow,Follow.followed_id==Post.author_id).filter(Follow.follower_id==self.id)
先执行联结操作再过滤看起来工作量会更大一些,但实际上这两种查询是等效的。SQLAlchemy首先收集所有的过滤器,然后再以最高效的方式生成查询。这两种查询生成的原生SQL指令是一样的。
# app/models.py:获取所关注用户的文章
class User(db.Model):
# ...
@property
def followed_posts(self):
return Post.query.join(Follow, Follow.followed_id == Post.author_id).filter(Follow.follower_id == self.id)
followed_posts方法被定义为property,调用时无需()
#app/main/views.py:查询所有文章还是所关注用户的文章
@main.route('/all')
@login_required
def show_all():
resp = make_response(redirect(url_for('.index')))
resp.set_cookie('show_followed', '', max_age=30*24*60*60)
return resp
@main.route('/followed')
@login_required
def show_followed():
resp = make_response(redirect(url_for('.index')))
resp.set_cookie('show_followed', '1', max_age=30*24*60*60)
return resp
cookie 只能在响应对象中设置,因此这两个路由不能依赖Flask,要使用make_response()方法创建响应对象。
# app/models.py:Comment 模型
class Comment(db.Model):
__tablename__ = 'comments'
id = db.Column(db.Integer, primary_key=True)
body = db.Column(db.Text)
body_html = db.Column(db.Text)
timestamp = db.Column(db.DateTime, index=True, default=datetime.utcnow)
disabled = db.Column(db.Boolean)
author_id = db.Column(db.Integer, db.ForeignKey('users.id'))
post_id = db.Column(db.Integer, db.ForeignKey('posts.id'))
@staticmethod
def on_changed_body(target, value, oldvalue, initiator):
allowed_tags = ['a', 'abbr', 'acronym', 'b', 'code', 'em', 'i',
'strong']
target.body_html = bleach.linkify(bleach.clean(
markdown(value, output_format='html'),
tags=allowed_tags, strip=True))
db.event.listen(Comment.body, 'set', Comment.on_changed_body)
# app/models/user.py:users 和posts 表与comments 表之间的一对多关系
class User(db.Model):
# ...
comments = db.relationship('Comment', backref='author', lazy='dynamic')
class Post(db.Model):
# ...
comments = db.relationship('Comment', backref='post', lazy='dynamic')
# app/templates/base.html:在导航条中加入管理评论链接
...
{% if current_user.can(Permission.MODERATE_COMMENTS) %}
<li><a href="{{ url_for('main.moderate') }}">Moderate Comments</a></li>
{% endif %}
...
# app/main/views.py:管理评论的路由
@main.route('/moderate')
@login_required
@permission_required(Permission.MODERATE_COMMENTS)
def moderate():
page = request.args.get('page', 1, type=int)
pagination = Comment.query.order_by(Comment.timestamp.desc()).paginate(page, per_page=current_app.config['FLASKY_COMMENTS_PER_PAGE'],error_out=False)
comments = pagination.items
return render_template('moderate.html',comments=comments,pagination=pagination, page=page)
# app/templates/moderate.html:评论管理页面的模板
{% extends "base.html" %}
{% import "_macros.html" as macros %}
{% block title %}Flasky - Comment Moderation{% endblock %}
{% block page_content %}
<div class="page-header">
<h1>Comment Moderation</h1>
</div>
{% set moderate = True %}
{% include '_comments.html' %}
{% if pagination %}
<div class="pagination">
{{ macros.pagination_widget(pagination, '.moderate') }}
</div>
{% endif %}
{% endblock %}
# app/templates/_comments.html:渲染评论的正文
...
<div class="comment-body">
{% if comment.disabled %}
<p></p><i>This comment has been disabled by a moderator.</i></p>
{% endif %}
{% if moderate or not comment.disabled %}
{% if comment.body_html %}
{{ comment.body_html | safe }}
{% else %}
{{ comment.body }}
{% endif %}
{% endif %}
</div>
{% if moderate %}
<br>
{% if comment.disabled %}
<a class="btn btn-default btn-xs" href="{{ url_for('.moderate_enable',
id=comment.id, page=page) }}">Enable</a>
{% else %}
<a class="btn btn-danger btn-xs" href="{{ url_for('.moderate_disable',
id=comment.id, page=page) }}">Disable</a>
{% endif %}
{% endif %}
...
# app/main/views.py:评论管理路由
@main.route('/moderate/enable/<int:id>')
@login_required
@permission_required(Permission.MODERATE_COMMENTS)
def moderate_enable(id):
comment = Comment.query.get_or_404(id)
comment.disabled = False
db.session.add(comment)
db.session.commit()
return redirect(url_for('.moderate',page=request.args.get('page', 1, type=int)))
@main.route('/moderate/disable/<int:id>')
@login_required
@permission_required(Permission.MODERATE_COMMENTS)
def moderate_disable(id):
comment = Comment.query.get_or_404(id)
comment.disabled = True
db.session.add(comment)
db.session.commit()
return redirect(url_for('.moderate',page=request.args.get('page', 1, type=int)))
修改后需要提交数据
有时候确实需要提供接口供其他应用程序使用,之前写过,估计不规范或者不安全。
客户端−服务器
客户端和服务器之间必须有明确的界线。
无状态
客户端发出的请求中必须包含所有必要的信息。服务器不能在两次请求之间保存客户端的任何状态。
缓存
服务器发出的响应可以标记为可缓存或不可缓存,这样出于优化目的,客户端(或客户端和服务器之间的中间服务)可以使用缓存。
接口统一
客户端访问服务器资源时使用的协议必须一致,定义良好,且已经标准化。REST Web服务最常使用的统一接口是HTTP 协议。
系统分层
在客户端和服务器之间可以按需插入代理服务器、缓存或网关,以提高性能、稳定性和伸缩性。
按需代码
客户端可以选择从服务器上下载代码,在客户端的环境中执行。
/api/posts/12345
表示)/api/posts/
,评论集合的URL可以是/api/comments/
)Flask请求:/api/posts/1234
可重定向到路由/api/posts/1234/
,反之不行
表14-1 REST架构API中使用的HTTP请求方法
请求方法 | 目标 | 说明 | HTTP状态码 |
---|---|---|---|
GET | 单个资源的URL | 获取目标资源 | 200 |
GET | 资源集合的URL | 获取资源的集合(如果服务器实现了分页,就是一页中的资源) | 200 |
POST | 资源集合的URL | 创建新资源,并将其加入目标集合。服务器为新资源指派URL,并在响应的Location 首部中返回 | 201 |
PUT | 单个资源的URL | 修改一个现有资源。如果客户端能为资源指派URL,还可用来创建新资源 | 200 |
DELETE | 单个资源的URL | 删除一个资源 | 200 |
DELETE | 资源集合的URL | 删除目标集合中的所有资源 | 200 |
一篇博客文章对应的资源可以使用如下的JSON 表示:
{
"url": "http://www.example.com/api/posts/12345",
"title": "Writing RESTful APIs in Python",
"author": "http://www.example.com/api/users/2",
"body": "... text of the article here ...",
"comments": "http://www.example.com/api/posts/12345/comments"
}
在设计良好的REST API中,客户端只需知道几个顶级资源的URL,其他资源的URL 则从响应中包含的链接上发掘。
/api/v1.0/posts/
//API 蓝本的结构
|-flasky
|-app/
|-api_1_0
|-__init__.py
|-users.py
|-posts.py
|-comments.py
|-authentication.py
|-errors.py
|-decorators.py
# app/api_1_0/__init__.py:API 蓝本的构造文件
from flask import Blueprint
api = Blueprint('api', __name__)
from . import authentication, posts, users, comments, errors
# app/_init_.py:注册API 蓝本
def create_app(config_name):
# ...
from .api_1_0 import api as api_1_0_blueprint
app.register_blueprint(api_1_0_blueprint, url_prefix='/api/v1.0')
# ...
表14-2 API返回的常见HTTP状态码
HTTP状态码 | 名 称 | 说 明 |
---|---|---|
200 | OK(成功) | 请求成功完成 |
201 | Created(已创建) | 请求成功完成并创建了一个新资源 |
400 | Bad request(坏请求) | 请求不可用或不一致 |
401 | Unauthorized(未授权) | 请求未包含认证信息 |
403 | Forbidden(禁止) | 请求中发送的认证密令无权访问目标 |
404 | Notfound(未找到) | URL 对应的资源不存在 |
405 | Method not allowed(不允许使用的方法) | 指定资源不支持请求使用的方法 |
500 | Internal server error(内部服务器错误) | 处理请求的过程中发生意外错误 |
让404/500/403错误兼容API请求
# app/main/errors.py:使用HTTP 内容协商处理错误
@main.app_errorhandler(404)
def page_not_found(e):
if request.accept_mimetypes.accept_json and not request.accept_mimetypes.accept_html:
response = jsonify({'error': 'not found'})
response.status_code = 404
return response
return render_template('404.html'), 404
API错误处理
# app/api_1_0/errors.py:API 蓝本中403 状态码的错误处理程序
def forbidden(message):
response = jsonify({'error': 'forbidden', 'message': message})
response.status_code = 403
return response
# app/api_1_0/authentication.py:初始化Flask-HTTPAuth
from flask.ext.httpauth import HTTPBasicAuth
auth = HTTPBasicAuth()
@auth.verify_password
def verify_password(email, password):
if email == '':
g.current_user = AnonymousUser()
return True
user = User.query.filter_by(email = email).first()
if not user:
return False
g.current_user = user
return user.verify_password(password)
# app/api_1_0/authentication.py:Flask-HTTPAuth 错误处理程序
@auth.error_handler
def auth_error():
return unauthorized('Invalid credentials')
# app/api_1_0/authentication.py:在before_request处理程序中进行认证
from .errors import forbidden_error
@api.before_request
@auth.login_required
def before_request():
if not g.current_user.is_anonymous and not g.current_user.confirmed:
return forbidden('Unconfirmed account')
# app/models.py:支持基于令牌的认证
class User(db.Model):
# ...
def generate_auth_token(self, expiration):
s = Serializer(current_app.config['SECRET_KEY'], expires_in=expiration)
return s.dumps({'id': self.id})
@staticmethod
def verify_auth_token(token):
s = Serializer(current_app.config['SECRET_KEY'])
try:
data = s.loads(token)
except:
return None
return User.query.get(data['id'])
# app/api_1_0/authentication.py:支持令牌的改进验证回调
@auth.verify_password
def verify_password(email_or_token, password):
if email_or_token == '':
g.current_user = AnonymousUser()
return True
if password == '':
g.current_user = User.verify_auth_token(email_or_token)
g.token_used = True
return g.current_user is not None
user = User.query.filter_by(email=email_or_token).first()
if not user:
return False
g.current_user = user
g.token_used = False
return user.verify_password(password)
# app/api_1_0/authentication.py:生成认证令牌
@api.route('/token')
def get_token():
if g.current_user.is_anonymous() or g.token_used:
return unauthorized('Invalid credentials')
return jsonify({'token': g.current_user.generate_auth_token(expiration=3600), 'expiration': 3600})
既然是无状态,那通过令牌的方式实现认证不就不破坏了无状态原则?
# app/models.py:把文章转换成JSON 格式的序列化字典
class Post(db.Model):
# ...
def to_json(self):
json_post = {
'url': url_for('api.get_post', id=self.id, _external=True),
'body': self.body,
'body_html': self.body_html,
'timestamp': self.timestamp,
'author': url_for('api.get_user', id=self.author_id, _external=True),
'comments': url_for('api.get_post_comments', id=self.id, _external=True)
'comment_count': self.comments.count()
}
return json_post
# app/models.py:把用户转换成JSON 格式的序列化字典
class User(UserMixin, db.Model):
# ...
def to_json(self):
json_user = {
'url': url_for('api.get_post', id=self.id, _external=True),
'username': self.username,
'member_since': self.member_since,
'last_seen': self.last_seen,
'posts': url_for('api.get_user_posts', id=self.id, _external=True),
'followed_posts':url_for('api.get_user_followed_posts',id=self.id, _external=True),
'post_count': self.posts.count()
}
return json_user
所有url_for()方法都指定了参数_external=True,这么做是为了生成完整的URL,而不是生成传统Web程序中经常使用的相对URL。
# app/api_1_0/posts.py:文章资源GET 请求的处理程序
@api.route('/posts/')
@auth.login_required
def get_posts():
posts = Post.query.all()
return jsonify({ 'posts': [post.to_json() for post in posts] })
@api.route('/posts/<int:id>')
@auth.login_required
def get_post(id):
post = Post.query.get_or_404(id)
return jsonify(post.to_json())
# app/api_1_0/posts.py:分页文章资源
@api.route('/posts/')
def get_posts():
page = request.args.get('page', 1, type=int)
pagination = Post.query.paginate(page,per_page=current_app.config['FLASKY_POSTS_PER_PAGE'],error_out=False)
posts = pagination.items
prev = None
if pagination.has_prev:
prev = url_for('api.get_posts', page=page-1, _external=True)
next = None
if pagination.has_next:
next = url_for('api.get_posts', page=page+1, _external=True)
return jsonify({
'posts': [post.to_json() for post in posts],
'prev': prev,
'next': next,
'count': pagination.total
})
(venv) $ pip install httpie
// 认证请求
(venv) $ http --json --auth <email>:<password> GET \
> http://127.0.0.1:5000/api/v1.0/posts
HTTP/1.0 200 OK
Content-Length: 7018
Content-Type: application/json
Date: Sun, 22 Dec 2013 08:11:24 GMT
Server: Werkzeug/0.9.4 Python/2.7.3
{
"posts": [
...
],
"prev": null
"next": "http://127.0.0.1:5000/api/v1.0/posts/?page=2",
"count": 150
}
// 匿名请求
(venv) $ http --json --auth : GET http://127.0.0.1:5000/api/v1.0/posts/
// 添加文章
(venv) $ http --auth <email>:<password> --json POST \
> http://127.0.0.1:5000/api/v1.0/posts/ \
> "body=I'm adding a post from the *command line*."
HTTP/1.0 201 CREATED
Content-Length: 360
Content-Type: application/json
Date: Sun, 22 Dec 2013 08:30:27 GMT
Location: http://127.0.0.1:5000/api/v1.0/posts/111
Server: Werkzeug/0.9.4 Python/2.7.3
{
"author": "http://127.0.0.1:5000/api/v1.0/users/1",
"body": "I'm adding a post from the *command line*.",
"body_html": "<p>I'm adding a post from the <em>command line</em>.</p>",
"comments": "http://127.0.0.1:5000/api/v1.0/posts/111/comments",
"comment_count": 0,
"timestamp": "Sun, 22 Dec 2013 08:30:27 GMT",
"url": "http://127.0.0.1:5000/api/v1.0/posts/111"
}
// 令牌认证
(venv) $ http --auth <email>:<password> --json GET \
> http://127.0.0.1:5000/api/v1.0/token
HTTP/1.0 200 OK
Content-Length: 162
Content-Type: application/json
Date: Sat, 04 Jan 2014 08:38:47 GMT
Server: Werkzeug/0.9.4 Python/3.3.3
{
"expiration": 3600,
"token": "eyJpYXQiOjEzODg4MjQ3MjcsImV4cCI6MTM4ODgyODMyNywiYWxnIjoiSFMy..."
}
// 令牌请求
(venv) $ http --json --auth eyJpYXQ...: GET http://127.0.0.1:5000/api/v1.0/posts/
(venv) $ pip install coverage
# manage.py:覆盖检测
#!/usr/bin/env python
import os
COV = None
if os.environ.get('FLASK_COVERAGE'):
import coverage
COV = coverage.coverage(branch=True, include='app/*')
COV.start()
# ...
@manager.command
def test(coverage=False):
"""Run the unit tests."""
if coverage and not os.environ.get('FLASK_COVERAGE'):
import sys
os.environ['FLASK_COVERAGE'] = '1'
os.execvp(sys.executable, [sys.executable] + sys.argv)
import unittest
tests = unittest.TestLoader().discover('tests')
unittest.TextTestRunner(verbosity=2).run(tests)
if COV:
COV.stop()
COV.save()
print('Coverage Summary:')
COV.report()
basedir = os.path.abspath(os.path.dirname(__file__))
covdir = os.path.join(basedir, 'tmp/coverage')
COV.html_report(directory=covdir)
print('HTML version: file://%s/index.html' % covdir)
COV.erase()
# ...
class FlaskClientTestCase(unittest.TestCase):
def setUp(self):
self.app = create_app('testing')
self.app_context = self.app.app_context()
self.app_context.push()
db.create_all()
Role.insert_roles()
self.client = self.app.test_client(use_cookies=True)
def tearDown(self):
db.session.remove()
db.drop_all()
self.app_context.pop()
def test_home_page(self):
response = self.client.get(url_for('main.index'))
self.assertTrue(b'Stranger' in response.data)
def test_register_and_login(self):
# register a new account
response = self.client.post(url_for('auth.register'), data={
'email': 'john@example.com',
'username': 'john',
'password': 'cat',
'password2': 'cat'
})
self.assertTrue(response.status_code == 302)
# login with the new account
response = self.client.post(url_for('auth.login'), data={
'email': 'john@example.com',
'password': 'cat'
}, follow_redirects=True)
self.assertTrue(re.search(b'Hello,\s+john!', response.data))
self.assertTrue(
b'You have not confirmed your account yet' in response.data)
# send a confirmation token
user = User.query.filter_by(email='john@example.com').first()
token = user.generate_confirmation_token()
response = self.client.get(url_for('auth.confirm', token=token),
follow_redirects=True)
self.assertTrue(
b'You have confirmed your account' in response.data)
# log out
response = self.client.get(url_for('auth.logout'), follow_redirects=True)
self.assertTrue(b'You have been logged out' in response.data)
测试会检查响应的状态码是否为302,这个代码表示重定向。调用post()方法时指定了参数follow_redirects=True,让测试客户端和浏览器一样,自动向重定向的URL发起GET请求。指定这个参数后,返回的不是302状态码,而是请求重定向的URL返回的响应。
测试web服务
# tests/test_api.py:使用Flask 测试客户端测试REST API
class APITestCase(unittest.TestCase):
# ...
def get_api_headers(self, username, password):
return {
'Authorization':
'Basic ' + b64encode((username + ':' + password).encode('utf-8')).decode('utf-8'),
'Accept': 'application/json',
'Content-Type': 'application/json'
}
def test_no_auth(self):
response = self.client.get(url_for('api.get_posts'),
content_type='application/json')
self.assertTrue(response.status_code == 401)
def test_posts(self):
# 添加一个用户
r = Role.query.filter_by(name='User').first()
self.assertIsNotNone(r)
u = User(email='john@example.com',password='cat',confirmed=True,role=r)
db.session.add(u)
db.session.commit()
# 写一篇文章
response = self.client.post(url_for('api.new_post'), headers=self.get_auth_header('john@example.com', 'cat'), data=json.dumps({'body': 'body of the *blog* post'}))
self.assertTrue(response.status_code == 201)
url = response.headers.get('Location')
self.assertIsNotNone(url)
# 获取刚发布的文章
response = self.client.get(url, headers=self.get_auth_header('john@example.com', 'cat'))
self.assertTrue(response.status_code == 200)
json_response = json.loads(response.data.decode('utf-8'))
self.assertTrue(json_response['url'] == url)
self.assertTrue(json_response['body'] == 'body of the *blog* post')
self.assertTrue(json_response['body_html'] == '<p>body of the <em>blog</em> post</p>')
优雅的停止服务端
# app/main/views.py:关闭服务器的路由
@main.route('/shutdown')
def server_shutdown():
if not current_app.testing:
abort(404)
shutdown = request.environ.get('werkzeug.server.shutdown')
if not shutdown:
abort(500)
shutdown()
return 'Shutting down...'
class SeleniumTestCase(unittest.TestCase):
# ...
@classmethod
def setUpClass(cls):
# start Firefox
try:
cls.client = webdriver.Firefox()
except:
pass
# skip these tests if the browser could not be started
if cls.client:
# create the application
cls.app = create_app('testing')
cls.app_context = cls.app.app_context()
cls.app_context.push()
# suppress logging to keep unittest output clean
import logging
logger = logging.getLogger('werkzeug')
logger.setLevel("ERROR")
# create the database and populate with some fake data
db.create_all()
Role.insert_roles()
User.generate_fake(10)
Post.generate_fake(10)
# add an administrator user
admin_role = Role.query.filter_by(permissions=0xff).first()
admin = User(email='john@example.com',
username='john', password='cat',
role=admin_role, confirmed=True)
db.session.add(admin)
db.session.commit()
# start the Flask server in a thread
threading.Thread(target=cls.app.run).start()
# give the server a second to ensure it is up
time.sleep(1)
@classmethod
def tearDownClass(cls):
if cls.client:
# stop the flask server and the browser
cls.client.get('http://localhost:5000/shutdown')
cls.client.close()
# destroy database
db.drop_all()
db.session.remove()
# remove application context
cls.app_context.pop()
def setUp(self):
if not self.client:
self.skipTest('Web browser not available')
def tearDown(self):
pass
def test_admin_home_page(self):
# navigate to home page
self.client.get('http://localhost:5000/')
self.assertTrue(re.search('Hello,\s+Stranger!',
self.client.page_source))
# navigate to login page
self.client.find_element_by_link_text('Log In').click()
self.assertTrue('<h1>Login</h1>' in self.client.page_source)
# login
self.client.find_element_by_name('email').\
send_keys('john@example.com')
self.client.find_element_by_name('password').send_keys('cat')
self.client.find_element_by_name('submit').click()
self.assertTrue(re.search('Hello,\s+john!', self.client.page_source))
# navigate to the user's profile page
self.client.find_element_by_link_text('Profile').click()
self.assertTrue('<h1>john</h1>' in self.client.page_source)
# app/main/views.py:报告缓慢的数据库查询
from flask.ext.sqlalchemy import get_debug_queries
@main.after_app_request
def after_request(response):
for query in get_debug_queries():
if query.duration >= current_app.config['FLASKY_SLOW_DB_QUERY_TIME']:
current_app.logger.warning('Slow query: %s\nParameters: %s\nDuration: %fs\nContext: %s\n' % (query.statement, query.parameters, query.duration, query.context))
return response
表16-1 Flask-SQLAlchemy记录的查询信息
名 称 | 说 明 |
---|---|
statement | SQL 语句 |
parameters | SQL 语句使用的参数 |
start_time | 执行查询时的时间 |
end_time | 返回查询结果时的时间 |
duration | 查询持续的时间,单位为秒 |
context | 表示查询在源码中所处位置的字符串 |
# config.py:启用缓慢查询记录功能的配置
class Config:
# ...
SQLALCHEMY_RECORD_QUERIES = True
FLASKY_DB_QUERY_TIMEOUT = 0.5
# ...
# manage.py:在请求分析器的监视下运行程序
@manager.command
def profile(length=25, profile_dir=None):
"""Start the application under the code profiler."""
from werkzeug.contrib.profiler import ProfilerMiddleware
app.wsgi_app = ProfilerMiddleware(app.wsgi_app, restrictions=[length], profile_dir=profile_dir)
app.run()
# manage.py:部署命令
@manager.command
def deploy():
"""Run deployment tasks."""
from flask.ext.migrate import upgrade
from app.models import Role, User
# 把数据库迁移到最新修订版本
upgrade()
# 创建用户角色
Role.insert_roles()
# 让所有用户都关注自己
User.add_self_follows()
config.py:程序出错时发送电子邮件
class ProductionConfig(Config):
# ...
@classmethod
def init_app(cls, app):
Config.init_app(app)
# 把错误通过电子邮件发送给管理员
import logging
from logging.handlers import SMTPHandler
credentials = None
secure = None
if getattr(cls, 'MAIL_USERNAME', None) is not None:
credentials = (cls.MAIL_USERNAME, cls.MAIL_PASSWORD)
if getattr(cls, 'MAIL_USE_TLS', None):
secure = ()
mail_handler = SMTPHandler(mailhost=(cls.MAIL_SERVER, cls.MAIL_PORT), fromaddr=cls.FLASKY_MAIL_SENDER, toaddrs=[cls.FLASKY_ADMIN], subject=cls.FLASKY_MAIL_SUBJECT_PREFIX + ' Application Error', credentials=credentials, secure=secure)
mail_handler.setLevel(logging.ERROR)
app.logger.addHandler(mail_handler)
使用集成开发环境
PyCharm, PyDev, Python Tools For Visual Studio
配置Flask 程序在调试器中启动时,记得为runserver命令加入--passthrougherrors --no-reload选项。第一个选项禁用Flask对错误的缓存,这样处理请求过程中抛出的异常才会传到调试器中。第二个选项禁用重载模块,而这个模块会搅乱某些调试器。
Flask扩展