英文博客地址:http://blog.miguelgrinberg.com/post/the-flask-mega-tutorial-part-viii-followers-contacts-and-friends
中文翻译地址:http://www.pythondoc.com/flask-mega-tutorial/followers.html
开源中国社区:http://www.oschina.net/translate/the-flask-mega-tutorial-part-viii-followers-contacts-and-friends
一、“关注”功能
1.某一个用户想要知道都有谁关注了TA
2.某一个用户想要知道TA关注了谁
3.用户点击任何用户的信息页上一个 “关注” 的链接就开始关注这个用户。点击 “取消关注” 链接将会停止关注这个用户。
4.对于一个给定的用户,我们能够容易地查询数据库获取用户的被关注者的所有 blog
二、数据库内表示关注者和被关注者
对此我们需要一种多对多的关系,因为一个用户可以关注多个其他的用户,同样一个用户可以被其他多个用户关注。
多对多关系的图:
followers 表示我们的关联表。外键都是来自于用户表中,因为我们是用户连接到用户。在这个表中的每一个记录都是表示关注的用户以及被关注的用户的连接。
三、数据模型
根据多对多关系图,首先开始添加 followers 表(文件 app/models.py):
followers = db.Table(‘followers‘, db.Column(‘follower_id‘, db.Integer, db.ForeignKey(‘user.id‘)), db.Column(‘followed_id‘, db.Integer, db.ForeignKey(‘user.id‘)) )
这是对上面图表上的关系表的直接翻译。注意我们并没有像对 users 和 posts 一样把它声明为一个模式。因为这是一个辅助表,我们使用 flask-sqlalchemy 中的低级的 APIs 来创建,没有使用关联模式。
接着我们在 users 表中定义一个多对多的关系:
class User(db.Model): id = db.Column(db.Integer, primary_key=True) nickname = db.Column(db.String(64), index=True, unique=True) email = db.Column(db.String(120), index=True, unique=True) posts = db.relationship(‘Post‘, backref=‘author‘, lazy=‘dynamic‘) about_me = db.Column(db.String(140)) last_seen = db.Column(db.DateTime) followed = db.relationship(‘User‘, secondary=followers, primaryjoin=(followers.c.follower_id == id), secondaryjoin=(followers.c.followed_id == id), backref=db.backref(‘followers‘, lazy=‘dynamic‘), lazy=‘dynamic‘)
像我们在前面章节设置一对多关系一样,我们使用了 db.relationship 函数来定义关系。我们连接 User 实例到其它 User 实例,在这种关系下连接的一对用户,左边的用户关注着右边的用户。
因为我们定义左边的用户为 followed,当我们从左边用户查询这种关系的时候,我们将会得到被关注用户的列表。
db.relationship() 中的所有参数:
- ‘User’是这种关系类型的右侧实体(左侧实体是父类)。因为定义一个自我指向的关系,我们在两边使用同样的类。
- secondary 指明了用于这种关系的辅助表。
- primaryjoin 表明通过association表映射到左侧实体(粉丝)的条件。注意因为 followers 表不是一个模式,获得字段名的语法有些怪异。
- secondaryjoin 表示辅助表中连接右边实体(被关注的用户)的条件。
- backref 定义这种关系将如何从右边实体进行访问。当我们做出一个名为 followed 的查询的时候,将会返回所有跟左边实体联系的右边的用户。当我们做出一个名为 followers 的查询的时候,将会返回一个所有跟右边联系的左边的用户。lazy 指明了查询的模式。dynamic 模式表示直到有特定的请求才会运行查询,这是对性能有很好的考虑。
- lazy 是与 backref 中的同样名称的参数作用是类似的,但是这个是应用于常规查询。
由于我们升级了数据库,所以现在我们需要产生一个新的迁移:
./db_migrate.py
备注:需要把 followers 放到 User 的前面(反正我增加到最后是不行的),否则会报错(name ‘followers‘ is not defined):
成功后是:
四、添加和移除 ‘关注者’
为了增强可重用性,我们将在User模型里实现follow和unfollow方法而不是在view里直接实现它。
这种方法我们能在真实的应用中使用(通过view方法调用它)而且对我们的单元测试有好处。
原则上,从视图函数中移除应用程序的逻辑到数据模型中是一种好的方式。你们必须要保证视图函数尽可能简单,因为它能难被自动化测试。
下面是添加了添加和移除 ‘关注者’ 功能的 User 模型(文件 app/models.py):
class User(db.Model): #... def follow(self, user): if not self.is_following(user): self.followed.append(user) return self def unfollow(self, user): if self.is_following(user): self.followed.remove(user) return self def is_following(self, user): return self.followed.filter(followers.c.followed_id == user.id).count() > 0
上面这些方法是很简单了,多亏了 sqlalchemy 在底层做了很多的工作。我们只是从 followed 关系中添加或者移除了表项,sqlalchemy 为我们管理辅助表。
follow和unfollow方法要在执行成功时返回一个user对象,失败的时候返回None,user对象会被添加到数据库会话,并提交。
is_following方法只有一行代码却做了很多事情。我们递交获取followed关系的请求,它返回所有以我们的user为follower的(follower, followed)元组, 以followed user为条件进行过滤。上述方法是可行的,因为followed关系有一个懒惰模式的动态属性,我们得到的是请求执行之前请求对象而不是请求的结果。
过滤操作返回的是更改后、依然没有被执行的请求。当我们在这条请求上调用count()方法时,请求才会被执行并返回查询到的记录数目。若返回的记录数目是1,这俩用户之间已经存在联系;相反,则没有联系。
五、测试
让我们编写单元测试框架来检验目前我们已经写好的代码(文件 tests.py):
class TestCase(unittest.TestCase): #... def test_follow(self): u1 = User(nickname=‘john‘, email=‘john@example.com‘) u2 = User(nickname=‘susan‘, email=‘susan@example.com‘) db.session.add(u1) db.session.add(u2) db.session.commit() assert u1.unfollow(u2) is None u = u1.follow(u2) db.session.add(u) db.session.commit() assert u1.follow(u2) is None assert u1.is_following(u2) assert u1.followed.count() == 1 assert u1.followed.first().nickname == ‘susan‘ assert u2.followers.count() == 1 assert u2.followers.first().nickname == ‘john‘ u = u1.unfollow(u2) assert u is not None db.session.add(u) db.session.commit() assert not u1.is_following(u2) assert u1.followed.count() == 0 assert u2.followers.count() == 0
执行测试:
./tests.py
六、数据库查询
我们目前的数据库模型支持在开始罗列的大多数需求,缺少的部分也是最难的部分。索引页需要显示被当前登录用户follow的人所发布的所有帖子,因此我们需要这样一个请求能返回这些帖子。
数据库有索引,因此允许以一种高效地方式去查询以及排序。
所以我们真正想要的是要拿出一个单一的数据库查询,表示我们想要得到什么样的信息,然后我们让数据库弄清楚什么是最有效的方式来为我们获取数据。
下面这种查询可以实现上述的要求,这个单行的代码又被我们添加到 User 模型(文件 app/models.py):
class User(db.Model): #... def followed_posts(self): return Post.query.join(followers, (followers.c.followed_id == Post.user_id)).filter(followers.c.follower_id == self.id).order_by(Post.timestamp.desc())
上面这个查询分为三部分:
1. join。
Post.query.join(followers, (followers.c.followed_id == Post.user_id))
学过数据库的都能理解,join操作实现的是创建一个临时表,数据根据指定条件从Post和followers表合并得到。在这个例子中,我们使用followers的follower_id字段和Post的user_id字段做匹配。
2.过滤。
filter(followers.c.follower_id == self.id)
就是按要求返数据。连接操作给我们被某人关注的用户的 blog 的列表,但是没有指出谁是关注者。我们仅仅对这个列表的子集感兴趣,我们只需要被某一特定用户关注的用户的 blog 列表。
3.排序。
order_by(Post.timestamp.desc())
在这里,我们要说的结果应该按照 timestamp 字段按降序排列,这样的第一个结果将是最近的 blog。
这里还有一个小问题。当用户阅读他们关注者的 blog 的时候,他们可能也想看到自己的 blog。因此最好把用户自己的 blog 也包含进查询结果中。
其实这不需要做任何改变。我们只需要把自己添加为自己的关注者。
七、测试查询
为我们查询写些单元测试(文件 tests.py):
#... from datetime import datetime, timedelta from app.models import User, Post #... class TestCase(unittest.TestCase): #... def test_follow_posts(self): # make four users u1 = User(nickname=‘john‘, email=‘john@example.com‘) u2 = User(nickname=‘susan‘, email=‘susan@example.com‘) u3 = User(nickname=‘mary‘, email=‘mary@example.com‘) u4 = User(nickname=‘david‘, email=‘david@example.com‘) db.session.add(u1) db.session.add(u2) db.session.add(u3) db.session.add(u4) # make four posts utcnow = datetime.utcnow() p1 = Post(body="post from john", author=u1, timestamp=utcnow + timedelta(seconds=1)) p2 = Post(body="post from susan", author=u2, timestamp=utcnow + timedelta(seconds=2)) p3 = Post(body="post from mary", author=u3, timestamp=utcnow + timedelta(seconds=3)) p4 = Post(body="post from david", author=u4, timestamp=utcnow + timedelta(seconds=4)) db.session.add(p1) db.session.add(p2) db.session.add(p3) db.session.add(p4) db.session.commit() # setup the followers u1.follow(u1) # john follows himself u1.follow(u2) # john follows susan u1.follow(u4) # john follows david u2.follow(u2) # susan follows herself u2.follow(u3) # susan follows mary u3.follow(u3) # mary follows herself u3.follow(u4) # mary follows david u4.follow(u4) # david follows himself db.session.add(u1) db.session.add(u2) db.session.add(u3) db.session.add(u4) db.session.commit() # check the followed posts of each user f1 = u1.followed_posts().all() f2 = u2.followed_posts().all() f3 = u3.followed_posts().all() f4 = u4.followed_posts().all() assert len(f1) == 3 assert len(f2) == 2 assert len(f3) == 2 assert len(f4) == 1 assert f1 == [p4, p2, p1] assert f2 == [p3, p2] assert f3 == [p4, p3] assert f4 == [p4]
这个测试有一大部分是配置代码,实际上测试非常短小。我们先检查下每个用户关注帖子的返回数量和预期的是否一致,接着检查下返回的帖子是否正确,帖子的顺序是否和预期一样(注意我们按照时间顺序排列帖子,保证我们总是使用相同的排序方式)。
注意followed_posts()方法的用法,这个方法返回一个查询对象,而不是结果集,这就如同lazy=‘‘dynamic‘的关系。通常来说返回一个查询对象而不是结果集是一个很好的点子,因为这给调用者在执行查询前提供了添加更多查询条件的机会。
在查询对象中有几个方法触发查询的执行。我们看到的count()运行查询时返回的是一个结果集的数量(不关注实际结果的内容);我们也经常使用first()来返回结果集的第一条记录,在这个测试中我们使用了all()方法来获取所有结果集的一个数组。
八、在应用程序中应用
上面那些配置好了,但还没有应用到我们的应用程序中。下面继续。
1.做自己的关注者
在 after_login 中处理 OpenID 的时候就设置自己成为自己的关注者(文件 app/views.py):
@oid.after_login def after_login(resp): if resp.email is None or resp.email == "": flash(‘Invalid login. Please try again.‘) return redirect(url_for(‘login‘)) user = User.query.filter_by(email=resp.email).first() if user is None: nickname = resp.nickname if nickname is None or nickname == "": nickname = resp.email.split(‘@‘)[0] nickname = User.make_unique_nickname(nickname) user = User(nickname=nickname, email=resp.email) db.session.add(user) db.session.commit() # make the user follow him/herself db.session.add(user.follow(user)) db.session.commit() remember_me = False if ‘remember_me‘ in session: remember_me = session[‘remember_me‘] session.pop(‘remember_me‘, None) login_user(user, remember=remember_me) return redirect(request.args.get(‘next‘) or url_for(‘index‘))
2.关注以及取消关注的链接
定义关注以及取消关注用户的视图函数(文件 app/views.py):
@app.route(‘/follow/<nickname>‘) @login_required def follow(nickname): user = User.query.filter_by(nickname=nickname).first() if user is None: flash(‘User %s not found.‘ % nickname) return redirect(url_for(‘index‘)) if user == g.user: flash(‘You can\‘t follow yourself!‘) return redirect(url_for(‘user‘, nickname=nickname)) u = g.user.follow(user) if u is None: flash(‘Cannot follow ‘ + nickname + ‘.‘) return redirect(url_for(‘user‘, nickname=nickname)) db.session.add(u) db.session.commit() flash(‘You are now following ‘ + nickname + ‘!‘) return redirect(url_for(‘user‘, nickname=nickname)) @app.route(‘/unfollow/<nickname>‘) @login_required def unfollow(nickname): user = User.query.filter_by(nickname=nickname).first() if user is None: flash(‘User %s not found.‘ % nickname) return redirect(url_for(‘index‘)) if user == g.user: flash(‘You can\‘t unfollow yourself!‘) return redirect(url_for(‘user‘, nickname=nickname)) u = g.user.unfollow(user) if u is None: flash(‘Cannot unfollow ‘ + nickname + ‘.‘) return redirect(url_for(‘user‘, nickname=nickname)) db.session.add(u) db.session.commit() flash(‘You have stopped following ‘ + nickname + ‘.‘) return redirect(url_for(‘user‘, nickname=nickname))
注意的是检查周围的错误,为了防止期望之外的错误,试着给用户提供信息并且重定向到合适的位置当错误发生的时候。
最后需要修改下模版(文件 app/templates/user.html):
<!-- extend base layout --> {% extends "base.html" %} {% block content %} <table> <tr valign="top"> <td><img src="{{ user.avatar(128) }}"></td> <td> <h1>User: {{ user.nickname }}</h1> {% if user.about_me %}<p>{{ user.about_me }}</p>{% endif %} {% if user.last_seen %}<p><i>Last seen on: {{ user.last_seen }}</i></p>{% endif %} <p>{{ user.followers.count() }} followers | {% if user.id == g.user.id %} <a href="{{ url_for(‘edit‘) }}">Edit your profile</a> {% elif not g.user.is_following(user) %} <a href="{{ url_for(‘follow‘, nickname=user.nickname) }}">Follow</a> {% else %} <a href="{{ url_for(‘unfollow‘, nickname=user.nickname) }}">Unfollow</a> {% endif %} </p> </td> </tr> </table> <hr> {% for post in posts %} {% include ‘post.html‘ %} {% endfor %} {% endblock %}