Java Weibo Crawler: One Million Records a Day

I'd never written a crawler before and was thrown in at the deep end. The company had some old code lying around, so I used it as a reference; after three weeks of reading and writing I had overhauled the whole thing and reached a crawl volume of one million records per day from a single program.

The stack: Spring Boot + JDK 1.8 + MySQL + Redis.

There are three main parts: keyword crawling, a Redis queue, and the multithreaded crawler itself.

Part 1: crawling keywords

I crawl data by typing keywords into Weibo's search box, so how hot and how fresh the keywords are matters a great deal.

Every 40 seconds I scrape the real-time trending terms from the hot-search boards of Baidu, Sogou, and Weibo.

Step 1: find websites with high-quality trending terms.

# Baidu hot-search URLs
baidu.hotnews = http://top.baidu.com/buzz?b=1&fr=topnews
baidu.topcategory = http://top.baidu.com/buzz?b=2&c=12&fr=topcategory_c12
baidu.oneday.hotbuzz = http://top.baidu.com/buzz?b=341&fr=topbuzz_b1
baidu.oneday.lifehot = http://top.baidu.com/buzz?b=342&c=513&fr=topbuzz_b344_c513

# Weibo hot-search URLs
weibo.realtimehot = https://s.weibo.com/top/summary?cate=realtimehot
weibo.realtime = https://weibo.com/a/hot/realtime

# Sogou hot-search URLs
sogou.hotTop1 = http://top.sogou.com/hot/shishi_1.html
sogou.hotTop2 = http://top.sogou.com/hot/shishi_2.html
sogou.hotTop3 = http://top.sogou.com/hot/shishi_3.html

# 360 hot-search URLs
360.hotlist.star = https://trends.so.com/top/list?cate1=%E4%BA%BA%E7%89%A9&cate2=%E6%98%8E%E6%98%9F&page=1&size=100
360.hotlist.netstar = https://trends.so.com/top/list?cate1=%E4%BA%BA%E7%89%A9&cate2=%E7%BD%91%E7%BA%A2&page=1&size=100
360.hotlist.famous = https://trends.so.com/top/list?cate1=%E4%BA%BA%E7%89%A9&cate2=%E5%90%8D%E5%AE%B6&page=1&size=100
360.hotlist.website = https://trends.so.com/top/list?cate1=%E7%BD%91%E7%AB%99&cate2=&page=1&size=100
360.hotlist.ip = https://trends.so.com/top/list?cate1=IP&cate2=&page=1&size=100
360.hotlist.ai = https://trends.so.com/top/list?cate1=%E6%99%BA%E8%83%BD%E7%BB%88%E7%AB%AF&cate2=%E6%89%8B%E6%9C%BA&page=10&size=100
360.hotlist.car = https://trends.so.com/top/list?cate1=%E6%B1%BD%E8%BD%A6&cate2=&page=11&size=100
360.hotlist.live = https://trends.so.com/top/list?cate1=%E7%9B%B4%E6%92%AD&cate2=%E4%B8%BB%E6%92%AD&page=8&size=80
360.hotlist.livesite = https://trends.so.com/top/list?cate1=%E7%9B%B4%E6%92%AD&cate2=%E7%9B%B4%E6%92%AD%E5%B9%B3%E5%8F%B0&page=6&size=60
360.hotlist.drink = https://trends.so.com/top/list?cate1=%E9%85%92%E7%B1%BB&cate2=&page=1&size=40
360.hotlist.carton = https://trends.so.com/top/list?cate1=%E5%A8%B1%E4%B9%90&cate2=%E5%8A%A8%E6%BC%AB&page=1&size=100
360.hotlist.sports = https://trends.so.com/top/list?cate1=%E5%A8%B1%E4%B9%90&cate2=%E4%BD%93%E8%82%B2&page=1&size=100
360.hotlist.music = https://trends.so.com/top/list?cate1=%E5%A8%B1%E4%B9%90&cate2=%E9%9F%B3%E4%B9%90&page=1&size=100
360.hotlist.movie = https://trends.so.com/top/list?cate1=%E5%A8%B1%E4%B9%90&cate2=%E7%94%B5%E5%BD%B1&page=8&size=100
360.hotlist.tv = https://trends.so.com/top/list?cate1=%E9%85%92%E7%B1%BB&cate2=&page=6&size=100
360.hotlist.fun = https://trends.so.com/top/list?cate1=%E5%A8%B1%E4%B9%90&cate2=%E7%94%B5%E8%A7%86%E5%89%A7&page=6&size=100
360.hotlist.novel = https://trends.so.com/top/list?cate1=%E5%A8%B1%E4%B9%90&cate2=%E5%B0%8F%E8%AF%B4&page=1&size=100
360.hotlist.game = https://trends.so.com/top/list?cate1=%E5%A8%B1%E4%B9%90&cate2=%E6%B8%B8%E6%88%8F&page=6&size=100
360.hotlist.cosmetics = https://trends.so.com/top/list?cate1=%E5%8C%96%E5%A6%86%E5%93%81&cate2=&page=4&size=40
360.hotlist.luxury = https://trends.so.com/top/list?cate1=%E5%A5%A2%E4%BE%88%E5%93%81&cate2=&page=3&size=30

(These are the pages I crawl for keywords; the quality of these trending terms is very high.)

Step 2: crawl the trending terms.

Take the Weibo real-time hot-search board as an example.

        String str = "https://s.weibo.com/top/summary?cate=realtimehot"; // page URL
        HotListSearch hotListSearch = new HotListSearch(); // the hot-list crawler
        List<Keywords> keywords = hotListSearch.queryWeibo(str); // crawl the Weibo board
        int i = 1;
        for (Keywords key : keywords) { // the results come back wrapped as Keywords objects
            System.out.println("No." + i + "===========" + key.toString());
            i++;
        }
HotListSearch.class
public class HotListSearch {

    private HttpProxy proxy;

    public HotListSearch() {
        this(null);
    }

    public HotListSearch(HttpProxy proxy) {
        this.proxy = proxy;
    }

    /*
     * Crawl the Weibo hot-search board
     */
    public List<Keywords> queryWeibo(String url) {
        Connect connect = new Connect();
        String html = connect.get(url, proxy);
        String str = "div[class=data] tbody tr"; // CSS selector for jsoup
        List<Keywords> keywords = parseWeibo(html, str); // parse the HTML into the list we need
        return keywords;
    }

    /*
     * Parse the HTML into a list of keywords
     */
    private List<Keywords> parseWeibo(String html, String str) {
        if (html == null || html.isEmpty())
            return null;

        Document doc = Jsoup.parse(html); // parse the HTML into a jsoup Document
        Elements list = doc.select(str); // select the rows matching the CSS selector
        if (list == null || list.isEmpty())
            return null;

        List<Keywords> keywords = new ArrayList<>();
        for (int i = 0, len = list.size(); i < len; i++) {
            try {
                HotSearchElementParser parser = new HotSearchElementParser(); // turns one element into a Java object
                Keywords key = parser.parseSearchWeibo(list.get(i)); // element -> Keywords
                if (key != null) keywords.add(key);
            } catch (Exception e) {
                // skip rows that fail to parse
            }
        }

        return keywords;
    }
}
HotSearchElementParser.class
public class HotSearchElementParser {
    public Keywords parseSearchWeibo(Element item) throws ParseException {
        // the original calls a parseSearch() helper that isn't shown; a bare instance serves the same purpose in this excerpt
        Keywords keywords = new Keywords();
        String querystr = item.select("td[class=td-02] a").text(); // extract the trending term
        if (querystr == null || querystr.isEmpty()) {
            return null;
        }
        keywords.setQuerystr(querystr);
        return keywords;
    }
}
Keywords.class

/**
 * A keyword to download
 *
 */
public class Keywords implements Serializable {

    private static final long serialVersionUID = 1L;

    private int id;
    private String querystr;
    private String region; // keywords region
    private String nodup; // keywords nodup

    private int status; // status: 1 = downloading, 2 = download suspended

    private long next; // time of the next load
    private String growth; // download counts of the last 5 runs
    private long lastDownloadTime; // time of the last download

    private int total; // total downloads
    private int amount; // amount of downloads

    private String updateDate;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getQuerystr() {
        return querystr;
    }

    public void setQuerystr(String querystr) {
        this.querystr = querystr;
    }

    public String getRegion() {
        return region;
    }

    public void setRegion(String region) {
        this.region = region;
    }

    public String getNodup() {
        return nodup;
    }

    public void setNodup(String nodup) {
        this.nodup = nodup;
    }

    public int getStatus() {
        return status;
    }

    public void setStatus(int status) {
        this.status = status;
    }

    public long getNext() {
        return next;
    }

    public void setNext(long next) {
        this.next = next;
    }

    public String getGrowth() {
        return growth;
    }

    public void setGrowth(String growth) {
        this.growth = growth;
    }

    public long getLastDownloadTime() {
        return lastDownloadTime;
    }

    public void setLastDownloadTime(long lastDownloadTime) {
        this.lastDownloadTime = lastDownloadTime;
    }

    public int getTotal() {
        return total;
    }

    public void setTotal(int total) {
        this.total = total;
    }

    public int getAmount() {
        return amount;
    }

    public void setAmount(int amount) {
        this.amount = amount;
    }

    public String getUpdateDate() {
        return updateDate;
    }

    public void setUpdateDate(String updateDate) {
        this.updateDate = updateDate;
    }

    @Override
    public String toString() {
        return "Keywords{" +
                "id=" + id +
                ", querystr='" + querystr + '\'' +
                ", region='" + region + '\'' +
                ", nodup='" + nodup + '\'' +
                ", status=" + status +
                ", next=" + next +
                ", growth='" + growth + '\'' +
                ", lastDownloadTime=" + lastDownloadTime +
                ", total=" + total +
                ", amount=" + amount +
                ", updateDate=" + updateDate +
                '}';
    }
}

Connect.class

package com.cnxunao.common.utils;

import com.cnxunao.weibospider.entities.HttpProxy;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.config.RequestConfig.Builder;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.net.URL;
import java.net.URLConnection;
import java.nio.charset.StandardCharsets;
import java.util.Random;

public class Connect {

    private static Logger logger = LoggerFactory.getLogger(Connect.class);

    public String get(String url) {
        return get(url, null);
    }

    public String get(String url, HttpProxy proxy) {
        try (CloseableHttpClient httpclient = HttpClients.custom().setUserAgent(this.userAgent).build()) {
            HttpGet request = new HttpGet(url.trim());
            HttpContext context = createContext(proxy);
            try (CloseableHttpResponse response = httpclient.execute(request, context)) {
                return EntityUtils.toString(response.getEntity(), charset);
            }
        } catch (Exception e) {
            e.printStackTrace();
            throw new IllegalArgumentException("timeout"); // "timeout" is a sentinel message checked by callers
        }
    }
    public String getKeyword(String targetUrl, HttpProxy proxy) {
        String proxyHost = proxy.getHost();
        int proxyPort = proxy.getPort();
        try {
            InetSocketAddress addr = new InetSocketAddress(proxyHost, proxyPort);
            Proxy socksProxy = new Proxy(Proxy.Type.SOCKS, addr); // SOCKS proxy for the keyword pages
            URL url = new URL(targetUrl);
            URLConnection conn = url.openConnection(socksProxy);
            try (InputStream in = conn.getInputStream()) {
                return IO2String(in);
            }
        } catch (Exception e) {
            e.printStackTrace();
            throw new IllegalArgumentException("timeout");
        }
    }

    public String get(String url, HttpProxy proxy, int reconnectionTimes) {
        if (reconnectionTimes < 2)
            return get(url, proxy);

        if (reconnectionTimes > 5)
            throw new IllegalArgumentException("Too many reconnection attempts");

        String html = null;
        for (int i = 0; i < reconnectionTimes; i++) {
            try {
                html = get(url, proxy);
                break;
            } catch (Exception e) {
                logger.error("reconnection: {}", url);

                try {
                    Thread.sleep(1_500L);
                } catch (InterruptedException e1) {
                    Thread.currentThread().interrupt(); // restore the interrupt flag
                }
            }
        }

        if (html == null)
            throw new IllegalArgumentException("timeout");

        return html;
    }

    private HttpContext createContext(HttpProxy proxy) {
        HttpClientContext context = HttpClientContext.create();

        Builder builder = RequestConfig.custom().setConnectTimeout(timeout).setSocketTimeout(timeout);
        if (proxy != null && StringUtils.isNotEmpty(proxy.getHost())) {
            builder.setProxy(new HttpHost(proxy.getHost(), proxy.getPort()));

            if (StringUtils.isNotEmpty(proxy.getUsername()) && StringUtils.isNotEmpty(proxy.getPassword())) {
                CredentialsProvider credsProvider = new BasicCredentialsProvider();
                credsProvider.setCredentials(new AuthScope(proxy.getHost(), proxy.getPort()),
                        new UsernamePasswordCredentials(proxy.getUsername(), proxy.getPassword()));
                context.setCredentialsProvider(credsProvider);
            }
        }
        RequestConfig config = builder.build();
        context.setRequestConfig(config);
        return context;
    }
    private static Random random = new Random();
    private String userAgent = userAgents[random.nextInt(userAgents.length)]; // pick a random User-Agent from the pool

    public void setUserAgent(String userAgent) {
        this.userAgent = userAgent;
    }

    private String charset = "UTF-8";

    public void setCharset(String charset) {
        this.charset = charset;
    }

    private int timeout = 15_000;

    public void setTimeout(int timeout) {
        this.timeout = timeout;
    }

    public static String IO2String(InputStream inStream) throws IOException {
        ByteArrayOutputStream result = new ByteArrayOutputStream();
        byte[] buffer = new byte[1024];
        int len;
        while ((len = inStream.read(buffer)) != -1) {
            result.write(buffer, 0, len);
        }
        String str = result.toString(StandardCharsets.UTF_8.name());
        return str;
    }
    // User-Agent pool
    private static String[] userAgents = {
            "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/44.0.2403.89 Safari/537.36",
            "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.1 (KHTML, like Gecko) Chrome/14.0.835.163 Safari/535.1",
            "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:6.0) Gecko/20100101 Firefox/6.0",
            "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50",
            "Opera/9.80 (Windows NT 6.1; U; zh-cn) Presto/2.9.168 Version/11.50",
            "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Win64; x64; Trident/5.0; .NET CLR 2.0.50727; SLCC2; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; InfoPath.3; .NET4.0C; Tablet PC 2.0; .NET4.0E)",
            "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; InfoPath.3)",
            "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; GTB7.0)",
            "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1)",
            "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1)",
            "Mozilla/5.0 (Windows; U; Windows NT 6.1; ) AppleWebKit/534.12 (KHTML, like Gecko) Maxthon/3.0 Safari/534.12",
            "Mozilla/5.0 (Windows; U; Windows NT 6.1; en-US) AppleWebKit/534.3 (KHTML, like Gecko) Chrome/6.0.472.33 Safari/534.3 SE 2.X MetaSr 1.0",
            "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/535.1 (KHTML, like Gecko) Chrome/13.0.782.41 Safari/535.1 QQBrowser/6.9.11079.201",
            "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0)"
    };

}

HttpResponse.class

package com.cnxunao.weibospider.utils;
import java.util.Vector;
public class HttpResponse {
    String urlString;
    int defaultPort;
    String file;
    String host;
    String path;
    int port;
    String protocol;
    String query;
    String ref;
    String userInfo;
    String contentEncoding;
    int contentLength;
    String content;
    String contentType;
    int code;
    String message;
    String method;

    int connectTimeout;

    int readTimeout;

    Vector<String> contentCollection;

    public String getContent() {
        return content;
    }

    public String getContentType() {
        return contentType;
    }

    public int getCode() {
        return code;
    }

    public String getMessage() {
        return message;
    }

    public Vector<String> getContentCollection() {
        return contentCollection;
    }

    public String getContentEncoding() {
        return contentEncoding;
    }

    public String getMethod() {
        return method;
    }

    public int getConnectTimeout() {
        return connectTimeout;
    }

    public int getReadTimeout() {
        return readTimeout;
    }

    public String getUrlString() {
        return urlString;
    }

    public int getDefaultPort() {
        return defaultPort;
    }

    public String getFile() {
        return file;
    }

    public String getHost() {
        return host;
    }

    public String getPath() {
        return path;
    }

    public int getPort() {
        return port;
    }

    public String getProtocol() {
        return protocol;
    }

    public String getQuery() {
        return query;
    }

    public String getRef() {
        return ref;
    }

    public String getUserInfo() {
        return userInfo;
    }
}

Once that tests out, use @Scheduled to write a thread that periodically pushes the crawled keywords into a Redis queue.

WeiboHotThread.class
/*
 * Crawls the Weibo real-time hot-search board
 */
@Component
@EnableScheduling
public class WeiboHotThread {

    protected Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    RedisTempService redisService;

    @Autowired
    private HotListSearch hotListSearch;

    @Scheduled(initialDelay = 80_000, fixedRate = 120_000)
    public void run() {
        System.out.println("Starting the Weibo hot-list task");
        if (redisService.count("KeywordsQueue") <= 600) {
            List<Keywords> list = hotListSearch.queryWeibo("https://s.weibo.com/top/summary?cate=realtimehot");
            Keywords[] array = list.toArray(new Keywords[0]);
            redisService.lpush("KeywordsQueue", array); // push onto the Redis queue
            logger.info("Successfully downloaded keywords, added to redis: " + array.length);
        }
    }
}
RedisTempService.class (I won't walk through the Redis operations in detail here; one method is attached as an example)
    // push elements onto the queue
    public void lpush(String key, Serializable... keywords) {
        redisTemplate.opsForList().leftPushAll(key, keywords);
    }
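The scheduled task above also calls count, and the downloader in step 3 calls rpop; neither is shown in the post. Here is a minimal sketch of what the full service might look like on top of Spring's RedisTemplate; the method bodies are my assumptions, only the signatures come from the calls made elsewhere in the post:

@Service
public class RedisTempService {

    @Autowired
    private RedisTemplate<String, Serializable> redisTemplate;

    // push elements onto the queue
    public void lpush(String key, Serializable... keywords) {
        redisTemplate.opsForList().leftPushAll(key, keywords);
    }

    // current length of the queue
    public long count(String key) {
        Long size = redisTemplate.opsForList().size(key);
        return size == null ? 0L : size;
    }

    // pop up to num elements from the tail of the queue
    public List<Serializable> rpop(String key, int num) {
        List<Serializable> result = new ArrayList<>();
        for (int i = 0; i < num; i++) {
            Serializable value = redisTemplate.opsForList().rightPop(key);
            if (value == null)
                break; // queue drained
            result.add(value);
        }
        return result;
    }
}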

Step 3: crawl Weibo posts by keyword.

The overall idea: scheduled threads fetch proxy servers and keywords, turn each keyword into a search URL, request that URL through a proxy, parse the response into Java objects and write them out as XML; a separate scheduled thread then periodically packs the XML files into zip archives, after which the archives are yours to do with as you please.

Part of the code is pasted below for reference.

AbstractDownload.class
public abstract class AbstractDownload<T> {

    protected Logger logger = LoggerFactory.getLogger(getClass());
  
    protected void exec(boolean multi, int multinum, int multiple, ThreadPoolExecutor executor) {
        if (multi)
            multi(multinum, multiple, executor);
        else
            single();
    }

    private void multi(int multinum, int multiple, ThreadPoolExecutor executor) {
        if (multinum == 1) {
            single();
            return;
        }
        List<HttpProxy> proxys = getValidProxy(multinum);
        List<T> entities = getValidEntity(proxys.size() * multiple);

        int total = entities.size();
        int len = total / multiple + (total % multiple == 0 ? 0 : 1);
        // one chunk of entities per proxy, downloaded in parallel on the pool
        CompletableFuture<?>[] cfs = IntStream.range(0, len).mapToObj(i -> {
            HttpProxy proxy = proxys.get(i);
            CopyOnWriteArrayList<T> list = new CopyOnWriteArrayList<>(
                    entities.subList(i * multiple, i == len - 1 ? total : (i + 1) * multiple));
            return CompletableFuture.runAsync(() -> download(proxy, list), executor);
        }).toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(cfs).join();
    }

    private void single() {
        HttpProxy proxy = getValidProxy(1).get(0);
        T entity = getValidEntity(1).get(0);

        download(proxy, entity);
    }

    private void download(HttpProxy proxy, CopyOnWriteArrayList<T> entities) {
        int i = 0;
        while (i < entities.size()) {
            try {
                download(proxy, entities.get(i));
                entities.remove(i); // on success drop the entity; the next one slides into index i
            } catch (Exception e) {
                logger.error(e.getMessage());
                i++; // keep the failed entity and move on
            } finally {
                // the proxy is rotated after the last download, so no pause is needed then
                if (i < entities.size()) {
                    try {
                        Thread.sleep(getPauseTime());
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
    }
    @Autowired
    RedisTempService redisService;

    public abstract void download(HttpProxy proxy, T entity);

    // validate the entity before downloading
    protected abstract void validate(T entity);

    // run the query
    protected abstract List<Weibo> query(HttpProxy proxy, T entity);

    // after a download completes, update the time of the next download
    protected abstract void updateEntity(T entity, List<Weibo> weibos);

    // save the download log
    protected abstract void saveDownloadLog(T entity, HttpProxy proxy, long consumeTime, List<Weibo> weibos);

    /*
     * Write downloaded weibos to a temporary XML file
     */
    protected void storeWeibos(List<Weibo> weibos) {
        if (weibos == null || weibos.isEmpty())
            return;

        try {
            WeiboUtils.writeToTempXml(weibos);
        } catch (IOException e) {
            logger.error("write temp xml error.", e);
        }
    }

    protected abstract List<HttpProxy> getValidProxy(int size);

    protected abstract List<T> getValidEntity(int size);

    // pause between two consecutive downloads
    protected int getPauseTime() {
        return 1000 * RandomUtils.nextInt(3, 5);
    }

    protected static class DefaultThreadFactory implements ThreadFactory {

        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory(String namePrefix) {
            this.namePrefix = namePrefix;
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }

    }

}
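AbstractDownload is a textbook template method: exec picks single- or multi-proxy mode, multi slices the entity list into one chunk per proxy and fans the chunks out on the thread pool via CompletableFuture, and a subclass only has to say where proxies and entities come from and what downloading one entity means. That is what lets the same skeleton drive the keyword downloads here and, presumably, other entity types elsewhere in the project.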
DownloadKeywordThread.class
@Component
@EnableAsync
@EnableScheduling
public class DownloadKeywordThread extends AbstractDownload<Keywords> {

    @Value("${download.keyword.use}")
    private boolean use;

    @Value("${download.keyword.multi}")
    private boolean multi;

    @Value("${download.keyword.multinum}")
    private int multinum;

    @Value("${download.keyword.multiple}")
    private int multiple;

    @Autowired
    HttpProxyService proxyService;

    private ThreadPoolExecutor executor;

    public DownloadKeywordThread() {
        int nThreads = Runtime.getRuntime().availableProcessors()*3;
        executor = new ThreadPoolExecutor(nThreads, nThreads, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(100),
                new DefaultThreadFactory("download.keyword-"));
    }

    @Async
    @Scheduled(initialDelay = 10_000, fixedRate = 1_000)
    public void run() {
        System.out.println("Starting the keyword download task");

        if (use) {
            try {
                exec(multi, multinum, multiple, executor);
            } catch (Exception e) {
                logger.info(e.getMessage());
            }
        }
    }

    @Override
    protected void validate(Keywords entity) {
        if (StringUtils.isEmpty(entity.getQuerystr())) {
            entity.setStatus(Constants.STATUS_SUSPEND);
            kwService.saveOrUpdate(entity);

            throw new IllegalArgumentException("Keywords not null");
        }
    }

    @Override
    protected List<Weibo> query(HttpProxy proxy, Keywords kw) {
        List<Weibo> weibos = null;
        for (int i = 0; i < 3; i++) {
            try {
                KeywordsSearch download = new KeywordsSearch(proxy);

                weibos = download.query(kw);

                proxy.setSuccess(proxy.getSuccess() + 1);
                logger.info("Successful download, weibos: {}, keywords: {}, proxy: {}", weibos.size(),
                        kw.getQuerystr(), proxy.getHost());
                break;
            } catch (NullPointerException e1) {
                // the dynamic proxy has been throttled by Weibo
                logger.error("proxyIp {} is limited by weibo", proxy.getHost());
                proxy.setFailure(proxy.getFailure() + 1);
                break;
            } catch (Exception e) {
                // failed to connect to the dynamic proxy
                if ("timeout".equals(e.getMessage())) {
                    logger.error("can not connect to proxyIp: {}", proxy.getHost());
                    proxy.setFailure(proxy.getFailure() + 1);
                    break;
                }
                // Weibo returned no results for this keyword
                if ("noresult".equals(e.getMessage())) {
                    logger.error("Keywords {} not found relevant results", kw.getQuerystr());
                    break;
                }
                // the proxy hit a captcha that needs manual input
                if ("verification".equals(e.getMessage())) {
                    proxy.setFailure(proxy.getFailure() + 1);
                    proxy.setStatus(Constants.STATUS_SUSPEND);
                    logger.error("Proxy {}:{} requires verification code", proxy.getHost(), proxy.getPort());
                    break;
                }
            } finally {
                queryFinally(proxy);
            }
        }
        return weibos;
    }

    @Autowired
    DownloadLogService logService;

    @Override
    protected void saveDownloadLog(Keywords entity, HttpProxy proxy, long consumeTime, List<Weibo> weibos) {
        logService.storeLog(entity.getQuerystr(), proxy, Constants.TYPE_KEYWORDS, consumeTime, weibos);
    }

    /*
     * Valid proxies
     */
    @Override
    protected List<HttpProxy> getValidProxy(int size) {
        List<HttpProxy> list = StaticService.getVailid().stream()
                // unused for at least the last 6 seconds
                .filter(proxy -> proxy.getLastUseTime() + 6_000  < System.currentTimeMillis())
                .collect(Collectors.toList());
        if (CollectionUtils.isEmpty(list))
            throw new IllegalArgumentException("not found valid proxy");
        return list;
    }

    @Autowired
    KeywordsService kwService;
    @Autowired
    RedisTempService redisService;
    /*
     * Keywords; size = proxy.size * 10
     */
    @Override
    protected List<Keywords> getValidEntity(int size) {
        List<Serializable> list = (List<Serializable>) redisService.rpop("KeywordsQueue", size);
        if (CollectionUtils.isEmpty(list))
            throw new IllegalArgumentException("not found valid keywords");

        // the queue stores serialized objects; round-trip them through JSON to get Keywords back
        JSONArray jsonArray = JSONArray.fromObject(list);
        List<Keywords> arrayList = JSONArray.toList(jsonArray, Keywords.class);

        return arrayList;
    }

    @Override
    protected void updateEntity(Keywords entity, List<Weibo> weibos)  {
        kwService.updateAfterDownload(entity, weibos);
    }

    private void queryFinally(HttpProxy proxy){
        if(proxy.getFailure()<=3 && proxy.getLiveTime()>(System.currentTimeMillis()/1000)){
            proxy.setStatus(1);
            StaticService.update(proxy);
            proxyService.saveOrUpdate(proxy);
        }else {
            proxyService.deleteByHostAndPort(proxy.getHost(),proxy.getPort());
            StaticService.del(proxy);
        }
    }

    @Override
    public void download(HttpProxy proxy, Keywords entity){
        try {
            long consumeTime = System.currentTimeMillis();
            List<Weibo> weibos = query(proxy, entity);
            storeWeibos(weibos);
            if (entity != null && !"hot".equalsIgnoreCase(entity.getRegion())) {
                updateEntity(entity, weibos);
            }
            consumeTime = System.currentTimeMillis() - consumeTime;
            saveDownloadLog(entity, proxy, consumeTime, weibos);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
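One piece the post never shows is KeywordsSearch, used in query above. Judging from the calls, it turns a keyword into a Weibo search URL, fetches the page through the proxy, and parses the results into Weibo objects. A rough sketch under those assumptions; the URL pattern, the marker strings, and the selector are illustrative guesses, not the author's actual implementation:

public class KeywordsSearch {

    private final HttpProxy proxy;

    public KeywordsSearch(HttpProxy proxy) {
        this.proxy = proxy;
    }

    public List<Weibo> query(Keywords kw) throws Exception {
        // build the search URL from the keyword
        String q = URLEncoder.encode(kw.getQuerystr(), "UTF-8");
        String url = "https://s.weibo.com/weibo?q=" + q;

        Connect connect = new Connect();
        String html = connect.get(url, proxy); // throws "timeout" if the proxy cannot connect

        // the sentinel messages that DownloadKeywordThread.query checks for
        if (html.contains("未找到相关结果"))
            throw new IllegalStateException("noresult");
        if (html.contains("验证码"))
            throw new IllegalStateException("verification");

        // parse the result cards into Weibo objects (field mapping omitted)
        List<Weibo> weibos = new ArrayList<>();
        Document doc = Jsoup.parse(html);
        for (Element card : doc.select("div[action-type=feed_list_item]")) {
            Weibo weibo = new Weibo();
            // ... fill in the fields from the card's sub-elements ...
            weibos.add(weibo);
        }
        return weibos;
    }
}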
Storage.class (packs the XML files into zip archives)
@Component
public class Storage {

    private static Logger logger = LoggerFactory.getLogger(Storage.class);

    private BloomFilter<String> filter;

    public Storage() {
        int expectedInsertions = Integer.MAX_VALUE >> 4;
        filter = BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8), expectedInsertions);
    }

    @Scheduled(initialDelay = 10_000,fixedRate = 540_000)
    public void run() {
        logger.info("storage thread running.");

        try {
            JSONArray jArray = readTempXml();
            if (jArray == null || jArray.isEmpty())
                return;

            writeToZip(jArray);
        } catch (Exception e) {
            logger.error(e.getMessage());
        }
    }

    private void writeToZip(JSONArray jArray) {
        // the file name to save under
        String filename = getFilename(jArray);

        try (ZipOutputStream output = new ZipOutputStream(new FileOutputStream(filename))) {
            int total = jArray.size(), xmlsize = 100;
            for (int i = 0, len = total / xmlsize + (total % xmlsize == 0 ? 0 : 1); i < len; i++) {
                int fromIndex = i * xmlsize, toIndex = i == len - 1 ? total : (i + 1) * xmlsize;

                JSONArray list = JSONArray.fromObject(jArray.subList(fromIndex, toIndex));

                ZipEntry entry = new ZipEntry((i + 1) + ".xml");
                output.putNextEntry(entry);

                XmlWriter writer = new XmlWriter();
                writer.write(list, output);
            }
        } catch (Exception e) {
            logger.error("write to zip: {}", e.getMessage());
        }

        logger.info("{}\t{}", jArray.size(), filename);

        WeiboUtils.total += jArray.size();
        logger.info("Total downloaded: {}", WeiboUtils.total);
    }

    private String getFilename(JSONArray jArray) {
        File directory = new File(
                Constants.STORE_BASE + File.separator + DateFormatUtils.format(new Date(), "yyyyMMdd"));
        if (!directory.exists())
            directory.mkdirs();

        int index;
        Collection<File> c = FileUtils.listFiles(directory, new String[] { "zip" }, true);
        if (!c.isEmpty()) {
            index = c.stream().mapToInt(file -> {
                String filename = StringUtils.substringBefore(file.getName(), "_");

                return NumberUtils.toInt(filename);
            }).max().getAsInt() + 1;
        } else {
            index = 1;
        }

        return directory.getPath() + File.separator + index + "_" + jArray.size() + ".zip";
    }

    private JSONArray readTempXml() {
        File directory = new File(Constants.STORE_TEMP);
        if (!directory.isDirectory()) {
            logger.error("{} is not a directory", directory.getPath());
            return null;
        }

        Collection<File> c = FileUtils.listFiles(directory, new String[] { "xml" }, true);
        if (c.isEmpty()) {
            logger.info("XML file not found");
            return null;
        }

        JSONArray jArray = new JSONArray();
        for (File file : c) {
            try {
                XmlReader reader = new XmlReader();
                JSONArray subArray = reader.read(file.getAbsolutePath());
                logger.info("read temp xml: " + file.getAbsolutePath());
                for (int i = 0, len = subArray.size(); i < len; i++) {
                    JSONObject jObject = subArray.getJSONObject(i);

                    try {
                        String ur = jObject.getString("ur");
                        String md5Hex = DigestUtils.md5DigestAsHex(ur.getBytes());
                        // dedupe by URL hash: keep only entries the Bloom filter has not seen
                        if (!filter.mightContain(md5Hex)) {
                            jArray.add(jObject);
                            filter.put(md5Hex);
                        }
                    } catch (Exception e) {
                        // entries without a "ur" field are skipped
                    }
                }
            } catch (Exception e) {
                logger.error("read xml: {}", e.getMessage());
            } finally {
                file.delete();
            }
        }

        return jArray;
    }

}
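A note on the Bloom filter's sizing: with expectedInsertions set to Integer.MAX_VALUE >> 4 (about 134 million) and Guava's default 3% false-positive rate, the filter needs roughly 7.3 bits per expected entry, on the order of 120 MB of heap held for the lifetime of the process; worth keeping in mind when sizing the JVM for a crawler that runs for days at a time.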
XmlReader.class
public class XmlReader {

    public XmlReader() {

    }

    public JSONArray read(String filename) throws IOException, ParserConfigurationException, SAXException {
        try (InputStream input = new FileInputStream(filename)) {
            return read(input);
        }
    }

    public JSONArray read(InputStream input) throws ParserConfigurationException, SAXException, IOException {
        Document document = buildDocument(input);
        // the list of article nodes
        NodeList nodes = document.getElementsByTagName("article");

        JSONArray jArray = new JSONArray();
        for (int i = 0, len = nodes.getLength(); i < len; i++) {
            // the node's child elements
            NodeList cNodes = nodes.item(i).getChildNodes();
            if (cNodes.getLength() == 0)
                continue;

            JSONObject jObject = new JSONObject();
            for (int j = 0; j < cNodes.getLength(); j++) {
                Node cNode = cNodes.item(j);
                if (StringUtils.isNotBlank(cNode.getTextContent()))
                    // child node name and value
                    jObject.put(cNode.getNodeName().toLowerCase(), cNode.getTextContent());
            }

            if (jObject.size() > 0)
                jArray.add(jObject);
        }
        return jArray;
    }

    private Document buildDocument(InputStream in) throws ParserConfigurationException, SAXException, IOException {
        DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
        DocumentBuilder builder = factory.newDocumentBuilder();
        return builder.parse(in);
    }

}
XmlWriter.class
public class XmlWriter {

    public void write(JSONArray jArray, OutputStream output)
            throws IOException {
        String xmlContent;
        try {
            xmlContent = toXmlstr(jArray);
        } catch (TransformerException | ParserConfigurationException e) {
            throw new IOException(e);
        }
        IOUtils.write(xmlContent, output, "UTF-8");
    }

    private String toXmlstr(JSONArray jArray) throws IOException, TransformerException, ParserConfigurationException {
        TransformerFactory factory = TransformerFactory.newInstance();
        factory.setAttribute("indent-number", 4); // indent width

        Transformer transformer = factory.newTransformer();
        transformer.setOutputProperty(OutputKeys.INDENT, "yes"); // enable indented output

        StringWriter writer = new StringWriter();

        Source source = new DOMSource(buildDocument(jArray));
        transformer.transform(source, new StreamResult(writer));

        return writer.toString();
    }

    private Document buildDocument(JSONArray jArray) throws ParserConfigurationException {
        DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
        DocumentBuilder builder = factory.newDocumentBuilder();
        Document document = builder.newDocument();

        // parent
        Element root = document.createElement("articles");
        document.appendChild(root);

        for (int i = 0, len = jArray.size(); i < len; i++) {
            JSONObject jObject = jArray.getJSONObject(i);

            // children
            Element item = document.createElement("article");
            root.appendChild(item);

            for (Object key : jObject.keySet()) {
                String field = (String) key, value = jObject.getString(field);
                if (value == null || value.isEmpty())
                    continue;

                // attribute
                Element attr = document.createElement(field);
                attr.setTextContent(value);
                item.appendChild(attr);
            }
        }
        return document;
    }

}

Use whatever structure you like for the crawler itself; the main thing I want to cover is how a single program reaches a million records a day:

1. Weibo's anti-crawling measures.

  What I use: 1) dynamic proxy servers: I bought an IP pool of 2,500 IPs per day (I use Kuaidaili, 快代理);

  2) a User-Agent pool, which I covered in an earlier post;

  3) a crawl rate of about one request per second holds up fine (a back-of-envelope throughput check follows this list).

2. Keyword quality.

  Method: scrape the Weibo, Baidu, Sogou, and 360 hot-search boards.

3. Program stability and endurance.

Multithreading + the Spring framework + scheduled restarts of the program.
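The throughput check promised above, with one assumption flagged: a Weibo search-result page carries roughly 20 posts (this varies). One million posts per day works out to 1,000,000 / 86,400 ≈ 12 posts per second, so about one successful page fetch every 1 to 2 seconds across all worker threads is enough; with a 3 to 5 second pause per proxy between requests, a few dozen proxies working in parallel comfortably clears that bar.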

I'm only just learning crawlers myself and there is plenty I could do better; feedback is welcome.

Crawlers are great, but don't overindulge.

This is original content; please message me before reposting.

Original article: https://www.cnblogs.com/Post-90sDachenchen/p/11214101.html
