从零系列--node爬虫利用进程池写数据

1、主进程

const http = require(‘http‘);
const fs = require(‘fs‘);
const cheerio = require(‘cheerio‘);
const request = require(‘request‘);
const makePool = require(‘./pooler‘)
const runJob = makePool(‘./worker‘)
var i = 0;
var url = "http://xxx.com/articles/";
//初始url
let g = ‘‘;
function fetchPage(x) {     //封装了一层函数
  console.log(x)
  if(!x || x==‘‘){
    g.next()
    return
  }
    startRequest(x);
}

function startRequest(x) {
     //采用http模块向服务器发起一次get请求
    return http.get(x, function (res) {
        var html = ‘‘;        //用来存储请求网页的整个html内容
        var titles = [];
        res.setEncoding(‘utf-8‘); //防止中文乱码
     //监听data事件,每次取一块数据
        res.on(‘data‘, function (chunk) {
            html += chunk;
        });
     //监听end事件,如果整个网页内容的html都获取完毕,就执行回调函数
        res.on(‘end‘, function () {
          var $ = cheerio.load(html); //采用cheerio模块解析html

          var time = new Date();
          var p =  $(‘.content p‘)
          p.each((index,item)=>{
                if($(item).find(‘strong‘).length) {
                  var fex_item = {
                    //获取文章的标题
                      title: $(item).find(‘strong‘).text().trim(),
                  //获取文章发布的时间
                      time: time,
                  //获取当前文章的url
                      link: $($(item).children(‘a‘).get(0)).attr(‘href‘),
                      des:$(item).children().remove()&&$(item).text(),
                  //i是用来判断获取了多少篇文章
                      i: index+1     

                  };
                  runJob(fex_item,(err,data)=>{
                    if(err) console.error(‘get link error‘)
                    console.log(‘get link ok‘)
                  })
                }

          })
          g.next()
        })         

    }).on(‘error‘, function (err) {
        console.log(err);
        g.next()
    });

}
function* gen(urls){
  let len = urls.length;
  for(var i=0;i<len;i++){
    yield fetchPage(urls[i])
  }
}

function getUrl(x){
    //采用http模块向服务器发起一次get请求
    http.get(x, function (res) {
      var html = ‘‘;        //用来存储请求网页的整个html内容
      var titles = [];
      res.setEncoding(‘utf-8‘); //防止中文乱码
   //监听data事件,每次取一块数据
      res.on(‘data‘, function (chunk) {
          html += chunk;
      });
   //监听end事件,如果整个网页内容的html都获取完毕,就执行回调函数
      res.on(‘end‘, function () {
        var $ = cheerio.load(html); //采用cheerio模块解析html

        var time = new Date();
        var lists =  $(‘.articles .post-list li‘)
        var urls = [];
        lists.each(function(index,item){
          if($(item).find(‘a‘).length) {
              var url = ‘http://xxxx.com‘+$($(item).children(‘a‘).get(0)).attr(‘href‘);
              if(url)
              urls.push(url);      //主程序开始运行
          }
       })
        g = gen(urls)
        g.next()
      })         

  }).on(‘error‘, function (err) {
      console.log(err);
  });
}

getUrl(url)

2、创建进程池

const cp = require(‘child_process‘)
const cpus = require(‘os‘).cpus().length;

module.exports =  function pooler(workModule){
  let awaiting = [],readyPool = [],poolSize = 0;
  return function doWork(job,cb){
    if(!readyPool.length&&poolSize>cpus)
      return awaiting.push([doWork,job,cb])

    let child = readyPool.length ? readyPool.shift():(poolSize++,cp.fork(workModule))
    let cbTriggered = false;
    child.removeAllListeners()
    .once(‘error‘,function(err){
      if(!cbTriggered){
        cb(err)
        cbTriggered = true
      }
      child.kill()
    })
    .once(‘eixt‘,function(){
      if(!cbTriggered)
      cb(new Error(‘childe exited with code:‘+code))
      poolSize--;
      let childIdx = readyPool.indexOf(child)
      if(childIdx > -1)readyPool.splice(childIdx,1)
    })
    .once(‘message‘,function(msg){
      cb(null,msg)
      cbTriggered = true
      readyPool.push(child)
      if(awaiting.length)setImmediate.apply(null,awaiting.shift())
    })
    .send(job)
  }
}

3、工作进程接受消息并处理内容

const fs = require(‘fs‘)
process.on(‘message‘,function(job){
  let _job = job
  let x = ‘TITLE:‘+_job.title+‘\n‘ + ‘LINK:‘+_job.link + ‘\n DES:‘+_job.des+‘\n SAVE-TIME:‘+_job.time

  fs.writeFile(‘../xx/data/‘ + _job.title + ‘.txt‘, x, ‘utf-8‘, function (err) {
      if (err) {
          console.log(err);
      }
  });
  process.send(‘finish‘)
})

原文地址:https://www.cnblogs.com/sjptech/p/9581595.html

时间: 2024-10-21 19:32:47

从零系列--node爬虫利用进程池写数据的相关文章

Python3 从零单排28_线程队列&amp;进程池&amp;线程池

1.线程队列 线程队列有三种:先进先出,后进先出,按优先级进出,具体如下: 1 import queue 2 3 # 先进先出 4 q = queue.Queue(3) 5 6 q.put(1) 7 q.put(2) 8 q.put(3) 9 # q.put(4) # 再放阻塞,等待队列消费 10 # q.put(4,block = False) # 不阻塞,强制放数据,如果满的情况下直接报错 等价与 q.put_nowait(4) 11 # q.put(4,block = True) # 阻塞

Node爬虫——利用superagent模拟登陆

一.概述 最近学习了node,试着写了个爬虫,这是模拟登陆的一部分. 1.需要的工具 2.superagent用法的简述 3.抓包分析 4.最小示例 二.需要的工具 nodejs,superagent,wireshark. nodejs没什么可介绍的. superagent是nodejs众多插件之一,用npm命令安装.是一个超轻的ajax api,有着可读性强,高度灵活,学习曲线低的优点. wireshark是一个抓包工具,很强大.之后我们需要用它来分析post请求与cookie. 三.supe

python基础-UDP、进程、进程池、paramike模块

1 基于UDP套接字1.1 介绍 udp是无连接的,是数据报协议,先启动哪段都不会报错 udp服务端 import socket sk = socket() #创建一个服务器的套接字 sk.bind() #绑定服务器套接字 while True: #服务器无限循环 cs = sk.recvfrom()/sk.sendto() # 对话(接收与发送) sk.close() # 关闭服务器套接字 udp客户端 import socket client = socket() # 创建客户套接字 whi

进程数据共享-进程池

数据共享 Manager  内部管理了很多数据类型,并不是所有的数据类型都是用来做数据分享,只是顺便包含了能够处理数据共享问题的数据类型 list dict 列表/字典  自带的方法基本都是数据安全的,但是对其中的元素进行+= -= *=  /=   都是数据不安全的 from multiprocessing import Manager,Process,Lock def func(dic,lock): with lock: dic['count'] -= 1 if __name__ == '_

进程池(Pool)

进程池用于进程维护, 当使用时,将会去进程池取数据 from multiprocessing import Pool, Processimport os, time def f(i): time.sleep(2) print('in process', os.getpid()) #os.getpid()获得进程序列号 return i+100 def Bar(arg): print('exec done--', arg, os.getpid()) if __name__ == '__main__

45_并发编程-进程池

一.为什么引入进程池 在程序实际处理问题过程中,忙时会有成千上万的任务需要被执行,闲时可能只有零星任务.那么在成千上万个任务需要被执行的时候,我们就需要去创建成千上万个进程么?首先,创建进程需要消耗时间,销毁进程(空间,变量,文件信息等等的内容)也需要消耗时间.第二即便开启了成千上万的进程,操作系统也不能让他们同时执行,维护一个很大的进程列表的同时,调度的时候,还需要进行切换并且记录每个进程的执行节点,也就是记录上下文(各种变量等等乱七八糟的东西,虽然你看不到,但是操作系统都要做),这样反而会影

进程丶数据共享丶锁丶进程池丶模块(爬虫)

一丶进程 1.什么是进程 进程是计算机中的程序关于某数据集合上的一次运行活动,是系统进行进行资源分配和调度的基本单位,是操作系统结构的基础.在早期面向进程设计的计算机结构中,进程是程序的基本执行实体;在当代面向线程设计的计算机结构中,进程是线程的容器.程序时指令丶数据及其组织形式的描述,进程是程序的实体. 狭义定义:进程是正在运行的程序的实例. 广义定义:进程是一个具有一定独立功能的程序关于某个数据集合的一次运行活动 .它是操作系统动态执行的基本单元,在传统的操作系统中,进程既是基本的分配单元,

Python 3 进程池与回调函数

Python 3 进程池与回调函数 一.进程池 在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间.多进程是实现并发的手段之一,需要注意的问题是: 很明显需要并发执行的任务通常要远大于核数 一个操作系统不可能无限开启进程,通常有几个核就开几个进程 进程开启过多,效率反而会下降(开启进程是需要占用系统资源的,而且开启多余核数目的进程也无法做到并行) 例如当被操作对象数目不大时,可以直接利用multiprocessing中的Proces

Python开发基础--- 进程间通信、进程池、协程

进程间通信 进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的. 进程队列queue 不同于线程queue,进程queue的生成是用multiprocessing模块生成的. 在生成子进程的时候,会将代码拷贝到子进程中执行一遍,及子进程拥有和主进程内容一样的不同的名称空间. 示例1: 1 import multiprocessing 2 def foo(): 3 q.put([11,'hello',True]