tieba_spider.py:
# -*- coding: utf-8 -*-
import scrapy
import json
from tieba.items import ThreadItem, PostItem, CommentItem
import helper
import time
class TiebaSpider(scrapy.Spider):
    """Spider that walks a Tieba forum: thread list -> posts -> comments.

    ``parse`` yields a ``ThreadItem`` per listed thread plus a request for
    the thread's first page, ``parse_post`` yields a ``PostItem`` per
    non-advertisement floor, and ``parse_comment`` yields a ``CommentItem``
    per entry of the ``totalComment`` JSON payload.
    """

    name = "tieba"
    cur_page = 1  # modified by pipelines (open_spider)
    end_page = 9999  # forum pages numbered above this are not requested
    # Optional callable(id, title, author, reply_num, good); a falsy result
    # drops the thread together with its follow-up requests.
    filter = None
    # When true, "see_lz=1" is appended to thread and comment URLs.
    see_lz = False

    def parse(self, response):
        """Parse one forum listing page.

        Yields a ``ThreadItem`` and a ``scrapy.Request`` (handled by
        ``parse_post``) for every thread that passes ``self.filter``, then a
        request for the next listing page while ``cur_page <= end_page``.
        """
        for sel in response.xpath('//li[contains(@class, "j_thread_list")]'):
            data = json.loads(sel.xpath('@data-field').extract_first())
            item = ThreadItem()
            item['id'] = data['id']
            item['author'] = data['author_name']
            item['reply_num'] = data['reply_num']
            # Any falsy "is_good" value is normalised to False.
            item['good'] = data['is_good'] or False
            item['title'] = sel.xpath(
                './/div[contains(@class, "threadlist_title")]/a/text()'
            ).extract_first()
            if self.filter and not self.filter(
                    item["id"], item["title"], item['author'],
                    item['reply_num'], item['good']):
                # Threads rejected by the filter are dropped entirely:
                # neither the item nor the request for its posts is yielded.
                continue
            yield item
            meta = {'thread_id': data['id'], 'page': 1}
            url = 'http://tieba.baidu.com/p/%d' % data['id']
            if self.see_lz:
                url += '?see_lz=1'
            yield scrapy.Request(url, callback=self.parse_post, meta=meta)

        next_page = response.xpath('//a[@class="next pagination-item "]/@href')
        self.cur_page += 1
        if next_page and self.cur_page <= self.end_page:
            # urljoin (as parse_post already does) also resolves relative or
            # scheme-less hrefs. dont_filter=True matches what the previously
            # used make_requests_from_url passed to its Request.
            yield scrapy.Request(
                response.urljoin(next_page.extract_first()),
                callback=self.parse,
                dont_filter=True
            )

    def parse_post(self, response):
        """Parse one page of a thread.

        Yields a ``PostItem`` per non-advertisement floor, one request for
        the page's comments when any floor reports ``comment_num > 0``, and
        a request for the next page of the thread when a link exists.
        """
        meta = response.meta
        has_comment = False
        for floor in response.xpath("//div[contains(@class, 'l_post')]"):
            if helper.is_ad(floor):
                continue
            data = json.loads(floor.xpath("@data-field").extract_first())
            content_info = data['content']
            item = PostItem()
            item['id'] = content_info['post_id']
            item['author'] = data['author']['user_name']
            item['comment_num'] = content_info['comment_num']
            if item['comment_num'] > 0:
                has_comment = True
            # Older threads carry no content in data-field, so the post body
            # is always taken from the rendered HTML.
            content = floor.xpath(
                ".//div[contains(@class,'j_d_post_content')]"
            ).extract_first()
            item['content'] = helper.parse_content(content, True)
            # Older threads carry no thread_id in data-field either.
            item['thread_id'] = meta['thread_id']
            item['floor'] = content_info['post_no']
            # Only older threads carry "date" in data-field. The membership
            # test must use the key that is read below; it previously tested
            # for "time" and then indexed "date".
            if 'date' in content_info:
                item['time'] = content_info['date']
            else:
                item['time'] = floor.xpath(
                    ".//span[@class='tail-info']"
                ).re_first(r'[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}')
            yield item

        if has_comment:
            url = "http://tieba.baidu.com/p/totalComment?tid=%d&fid=1&pn=%d" % (
                meta['thread_id'], meta['page'])
            if self.see_lz:
                url += '&see_lz=1'
            yield scrapy.Request(url, callback=self.parse_comment, meta=meta)

        next_page = response.xpath(
            u".//ul[@class='l_posts_num']//a[text()='下一页']/@href")
        if next_page:
            meta['page'] += 1
            url = response.urljoin(next_page.extract_first())
            yield scrapy.Request(url, callback=self.parse_post, meta=meta)

    def parse_comment(self, response):
        """Parse the ``totalComment`` JSON payload of one thread page.

        Yields one ``CommentItem`` per entry of every ``comment_info`` list
        found under ``data.comment_list``.
        """
        comment_list = json.loads(response.body)['data']['comment_list']
        if not comment_list:
            # NOTE(review): presumably the endpoint can answer with an empty
            # list or null instead of a mapping when nothing matches; there
            # is nothing to yield then, and .values() would raise.
            return
        for value in comment_list.values():
            for comment in value['comment_info']:
                item = CommentItem()
                item['id'] = comment['comment_id']
                item['author'] = comment['username']
                item['post_id'] = comment['post_id']
                item['content'] = helper.parse_content(comment['content'], False)
                # now_time is fed to time.localtime, i.e. it is treated as an
                # epoch timestamp and rendered in the local time zone.
                item['time'] = time.strftime(
                    "%Y-%m-%d %H:%M:%S", time.localtime(comment['now_time']))
                yield item
helper.py:
# -*- coding: utf-8 -*-
import re
import urllib2
from bs4 import BeautifulSoup
import emotion
def is_ad(s):
    """Tell whether a floor selector is an advertisement.

    An advertisement floor contains a ``<span>`` whose text includes the
    characters "广告" (advertisement).

    :param s: selector-like object exposing ``xpath(query)``.
    :returns: the result of the XPath query; it is truthy when at least one
        matching span exists, so it can be used directly in a condition.
    """
    return s.xpath(u".//span[contains(text(), '广告')]")
def parse_content(content, is_post):
    """Convert the HTML of a post or comment into plain text.

    :param content: HTML fragment; ``None``, empty or whitespace-only input
        yields ``None``.
    :param is_post: true when ``content`` is a post, whose markup is wrapped
        in an outer ``<div>`` that has to be unwrapped first.
    :returns: the text produced by joining the converted child nodes and
        passing the result through ``strip_blank``, or ``None`` for blank
        input.
    """
    if not content or not content.strip():
        return None
    # Very old threads contain stray "\r" characters.
    content = content.replace('\r', '\n')
    root = BeautifulSoup(content, 'lxml')
    if is_post:
        # Posts are wrapped in an outer div. Fall back to the whole document
        # when it is missing instead of failing on None.children.
        outer = root.div
        if outer is not None:
            root = outer

    # Handlers are tried in order; the first one that does not answer False
    # decides the text for the node.
    handlers = (is_str, is_br, is_img, is_video, other_case)
    parts = []
    for node in root.children:
        for handler in handlers:
            try:
                text = handler(node)
            except Exception:
                # A handler that cannot cope with this node type is simply
                # skipped (deliberate best effort), but KeyboardInterrupt and
                # SystemExit are no longer swallowed.
                continue
            if text is not False:
                parts.append(text)
                break
        # A node that no handler could convert contributes nothing, rather
        # than leaving a non-string object for join() to choke on.
    return strip_blank(''.join(parts))
def strip_blank(s):
    """Tidy whitespace in extracted text (a matter of personal taste).

    Applied in order: a line holding only spaces/tabs collapses to a single
    newline, runs of spaces collapse to one space, three or more consecutive
    newlines collapse to two, and the result is stripped at both ends.

    :param s: text to clean.
    :returns: the cleaned text.
    """
    s = re.sub(r'\n[ \t]+\n', '\n', s)
    s = re.sub(r' +', ' ', s)  # drop redundant spaces
    s = re.sub(r'\n\n\n+', '\n\n', s)  # drop excessive blank lines
    return s.strip()
def is_str(s):
    """Handle a plain text node.

    :param s: a parsed node; anything with a truthy ``name`` is treated as
        an element and rejected.
    :returns: ``False`` for elements, otherwise the node converted to
        ``unicode`` (NavigableString objects need the explicit conversion).
    """
    if s.name:
        return False
    return unicode(s)
def is_br(s):
    """Handle a ``<br>`` element.

    :param s: a parsed node exposing ``name``.
    :returns: a newline for ``<br>``, ``False`` for any other node.
    """
    if s.name == 'br':
        return '\n'
    return False
def is_img(s):
    """Handle an ``<img>`` element (covers some emoticons).

    :param s: a parsed node exposing ``name`` and ``get``.
    :returns: ``False`` for non-image nodes; otherwise whatever
        ``emotion.get_text`` returns for the stringified ``src`` attribute.
    """
    if s.name != 'img':
        return False
    src = unicode(s.get('src'))
    return emotion.get_text(src)
def is_video(s):
    """Handle an element whose ``class`` attribute mentions "video".

    :param s: a parsed element exposing ``get`` and ``find``.
    :returns: ``False`` when the class attribute does not contain "video" or
        when no ``<a href>`` is found inside the element; otherwise the
        result of ``getJumpUrl`` for that href, padded with a space on each
        side.
    """
    # The attribute is stringified before the substring test, so it works
    # whether the parser hands back a string, a list of classes or None.
    classes = unicode(s.get('class'))
    if 'video' not in classes:
        return False
    anchor = s.find('a')
    href = anchor.get('href') if anchor is not None else None
    if not href:
        # No link to resolve: decline explicitly so the caller can try its
        # next handler, instead of failing with AttributeError on None.
        return False
    return ' ' + getJumpUrl(href) + ' '
# BeautifulSoup's built-in get_text does the heavy lifting here:
# bold text, coloured text and the like are all handled by this one call.
def other_case(s):
    """Fallback handler: return the text content of any other element.

    :param s: a parsed element exposing ``get_text()``.
    :returns: the value of ``s.get_text()``.
    """
    return s.get_text()