使用AKKA做分布式爬虫的思路

上周公司其他小组在讨论做分布式爬虫,我也思考了一下,提了一个方案,就是使用akka分布式rpc框架来做,自己写master和worker程序,client向master提交begin任务或者其它爬虫需求,master让worker去爬网页,worker都是kafka的同一个group然后从kafka里面拉取数据(URL),然后处理爬了的网页,解析内容,把爬下来的网页通过正则表达式匹配出嵌套的网页,然后请求actor判断是否爬过(防止生成有向图,让其变成树形结构)(这里应该是个单独的actor,这样多个请求过来不会出现线程同步问题),最后把没有爬的URL扔到Kafka,直到kafka的URL被拉去完

这里有个简单的图例:

代码上面没有写爬虫的东西,也没有写checkActor,只是简单的做了下模拟,就写了个简单的分布式事例作为参考

代码结构如下:

其中POM同这篇博客一样http://blog.csdn.net/qq_20641565/article/details/65488828

Master的代码:

package com.lijie.scala.service

import scala.collection.mutable.HashMap
import scala.concurrent.duration.DurationInt

import com.lijie.scala.bean.WorkBean
import com.lijie.scala.utils.ActorUtils

import akka.actor.Actor
import akka.actor.Props
import akka.actor.actorRef2Scala
import com.lijie.scala.bean.WorkBeanInfo
import com.lijie.scala.bean.WorkBeanInfo
import com.lijie.scala.caseclass.Submit
import com.lijie.scala.caseclass.SubmitAble
import com.lijie.scala.caseclass.Hearbeat
import com.lijie.scala.caseclass.RegisterSucess
import com.lijie.scala.caseclass.CheckConn
import com.lijie.scala.caseclass.Register
import com.lijie.scala.caseclass.SubmitCrawler
import com.lijie.scala.caseclass.BeginCrawler

class Master(val masterHost: String, val masterPort: Int, val masterActorSystem: String, val masterName: String) extends Actor {

  //保存work的Actor连接
  var workerConn = new HashMap[String, WorkBean]

  //保存客户端的连接
  var clientConn = new HashMap[String, WorkBean]

  //超时时间
  val OVERTIME = 20000

  override def preStart(): Unit = {

    //隐式转换
    import context.dispatcher

    //启动的时候定时检查worker是否挂了,如果挂了就从workerConn移除
    context.system.scheduler.schedule(0 millis, OVERTIME millis, self, CheckConn)
  }

  def receive: Actor.Receive = {

    //注册
    case Register(workerId, workerHost, workerPort, workerActorSystem, workerName) => {

      //打印worker上线消息
      println(workerId + "," + workerHost + "," + workerPort + "," + workerActorSystem + "," + workerName)

      //获取Master的代理对象
      var workerRef = context.actorSelection(s"akka.tcp://[email protected]$workerHost:$workerPort/user/$workerName")

      //保存连接
      workerConn += (workerId -> new WorkBean(workerRef, 0))

      //给worker返回应答注册成功
      sender ! RegisterSucess

    }

    //接受心跳
    case Hearbeat(workerId) => {
      if (workerConn.contains(workerId)) {

        //取出workerBean
        var workBean = workerConn.get(workerId).get

        //重新设置时间
        workBean.time = System.currentTimeMillis()

        //移除之前的值
        workerConn -= workerId

        //将新值放入conn
        workerConn += (workerId -> workBean)
      }
    }

    //定时检查
    case CheckConn => {

      //得到超时的值
      var over = workerConn.filter(System.currentTimeMillis() - _._2.time > OVERTIME)

      //得到超时的值
      for (key <- over.keySet) {

        //将超时的从链接中移除
        workerConn -= key
      }

      //测试输出还有多少个链接
      val alive = workerConn.size
      println(s"还有$alive 个worker活着")
    }

    case Submit(clientId, clientHost, clientPort, clientActorSystem, clientName) => {
      //打印client上线消息
      println(clientId + "," + clientHost + "," + clientPort + "," + clientActorSystem + "," + clientName)

      //获取Master的代理对象
      var clientRef = context.actorSelection(s"akka.tcp://[email protected]$clientHost:$clientPort/user/$clientName")

      //保存连接
      clientConn += (clientId -> new WorkBean(clientRef, 0))

      //给client返回可以提交申请
      sender ! SubmitAble
    }

    //收到爬虫任务分发给worker
    case SubmitCrawler(kafka, redis, other) => {

      //让所有worker开始爬虫任务
      for (workerBean <- workerConn.values) {

        //向所有存活的worker发送爬虫任务
        workerBean.worker ! BeginCrawler(kafka, redis, other)
      }
    }
  }

}

object Master {

  def main(args: Array[String]): Unit = {
    val argss = Array[String]("127.0.0.1", "8080", "masterSystem", "actorMaster")

    val host = argss(0)

    val port = argss(1).toInt

    val actorSystem = argss(2)

    val actorName = argss(3)

    //获取master的actorSystem
    val masterSystem = ActorUtils.getActorSystem(host, port, actorSystem)

    val master = masterSystem.actorOf(Props(new Master(host, port, actorSystem, actorName)), actorName)

    masterSystem.awaitTermination()
  }
}

Worker代码如下:

package com.lijie.scala.service

import akka.actor.Actor
import akka.actor.ActorSelection
import java.util.UUID
import scala.concurrent.duration._
import com.lijie.scala.caseclass.SendHearbeat
import com.lijie.scala.utils.ActorUtils
import akka.actor.Props
import com.lijie.scala.caseclass.BeginCrawler
import com.lijie.scala.caseclass.Hearbeat
import com.lijie.scala.caseclass.RegisterSucess
import com.lijie.scala.caseclass.Register

class Worker(val workerHost: String, val workerPort: Int, val workerActorSystem: String, val workerName: String, val masterHost: String, val masterPort: Int, val masterActorSystem: String, val masterName: String) extends Actor {

  //master的代理对象
  var master: ActorSelection = _

  //每个worker的id
  val workerId = UUID.randomUUID().toString()

  override def preStart(): Unit = {

    //获取Master的代理对象
    master = context.actorSelection(s"akka.tcp://[email protected]$masterHost:$masterPort/user/$masterName")

    //向master注册
    master ! Register(workerId, workerHost, workerPort, workerActorSystem, workerName)
  }

  def receive: Actor.Receive = {

    //收到注册成功的消息,定时发送心跳
    case RegisterSucess => {
      println("收到注册成功的消息,开始发送心跳")

      //隐式转换
      import context.dispatcher

      //创建定时器,并发送心跳
      context.system.scheduler.schedule(0 millis, 10000 millis, self, SendHearbeat)

    }

    //发送心跳
    case SendHearbeat => {
      println("向master发送心跳")

      //发送心跳
      master ! Hearbeat(workerId)
    }

    //开始爬虫
    case BeginCrawler(kafka, redis, other) => {

      println("开始执行爬虫任务...")
      println("kafka和redis以及其他消息内容:" + kafka + "," + redis + "," + other)
      println("初始化kafka连接和redis连接...")
      println("从队列里面取出url...")
      println("开始爬数据...")
      println("如果失败重试几次...")
      println("............")
      println("解析这个网页的内容,解析出里面的url...")
      //请求actionCheck
      println("请求actionCheck...")
      println("检查是否爬过...")
      println("把该刚爬了的url扔到redis")
      println("把该网页解析的没有爬过的url扔到队列...")
      println("继续从队列里面拿url直到队列里面url被爬完...")
    }

  }
}

object Worker {

  def main(args: Array[String]): Unit = {
    val argss = Array[String]("127.0.0.1", "8088", "workSystem", "actorWorker", "127.0.0.1", "8080", "masterSystem", "actorMaster")

    //worker
    val host = argss(0)

    val port = argss(1).toInt

    val actorSystem = argss(2)

    val actorName = argss(3)

    //master
    val hostM = argss(4)

    val portM = argss(5).toInt

    val actorSystemM = argss(6)

    val actorNameM = argss(7)

    //获取woker的actorSystem
    val workerSystem = ActorUtils.getActorSystem(host, port, actorSystem)

    val worker = workerSystem.actorOf(Props(new Worker(host, port, actorSystem, actorName, hostM, portM, actorSystemM, actorNameM)), actorName)

    workerSystem.awaitTermination()
  }
}

ActionUtils代码如下:

package com.lijie.scala.utils

import com.typesafe.config.ConfigFactory
import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.Actor

object ActorUtils {

  //获取actor工具类
  def getActorSystem(host: String, port: Int, actorSystem: String) = {
    val conf = s"""
      |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
      |akka.remote.netty.tcp.hostname = "$host"
      |akka.remote.netty.tcp.port = "$port"
      """.stripMargin

    val config = ConfigFactory.parseString(conf)

    //创建注册worker的的ActorSystem
    val actorSys = ActorSystem(actorSystem, config)

    //返回actorSystem
    actorSys
  }
}

WorkBean代码如下:

package com.lijie.scala.bean

import akka.actor.ActorSelection

//封装worker的引用和当前时间戳
class WorkBean(var worker: ActorSelection, var time: Long)

class WorkBeanInfo(val workerId: String, val workerHost: String, val workerPort: Int, val workerActorSystem: String, val workerName: String, var time: Long)

caseClass代码如下:

package com.lijie.scala.caseclass

//开始提交任务 client2client
case object BeginSubmit
// client2client-------------------------------

//client提交任务  client2master
case class Submit(val clientId: String, val clientHost: String, val clientPort: Int, val clientActorSystem: String, val clientName: String) extends Serializable

//提交爬虫任务
case class SubmitCrawler(val kafkaInfo: String, val redisInfo: String, val otherInfo: String)
// client2master-------------------------------

//可以提交任务 master2client
case object SubmitAble
// master2client-------------------------------

//检查哪些worker挂了 master2master
case object CheckConn

//返回注册成功 master2worker
case object RegisterSucess extends Serializable
// master2worker-------------------------------

//worker注册 worker2master
case class Register(val workerId: String, val workerHost: String, val workerPort: Int, val workerActorSystem: String, val workerName: String) extends Serializable

//发送心跳 worker2master
case class Hearbeat(workId: String) extends Serializable
// worker2master-------------------------------

//发送心跳 worker2worker
case object SendHearbeat

//爬虫 worker2worker
case class BeginCrawler(val kafkaInfo: String, val redisInfo: String, val otherInfo: String)
// worker2worker-------------------------------

最后先运行master,然后运行worker,我这里运行的两个worker,最后运行client 结果如下

Master:

Worker01:

Worker02:

Client:

时间: 2024-09-30 11:13:38

使用AKKA做分布式爬虫的思路的相关文章

分布式爬虫系统设计、实现与实战:爬取京东、苏宁易购全网手机商品数据+MySQL、HBase存储

[TOC] 1 概述 在不用爬虫框架的情况,经过多方学习,尝试实现了一个分布式爬虫系统,并且可以将数据保存到不同地方,类似MySQL.HBase等. 基于面向接口的编码思想来开发,因此这个系统具有一定的扩展性,有兴趣的朋友直接看一下代码,就能理解其设计思想,虽然代码目前来说很多地方还是比较紧耦合,但只要花些时间和精力,很多都是可抽取出来并且可配置化的. 因为时间的关系,我只写了京东和苏宁易购两个网站的爬虫,但是完全可以实现不同网站爬虫的随机调度,基于其代码结构,再写国美.天猫等的商品爬取,难度不

python3 分布式爬虫

背景 部门(东方IC.图虫)业务驱动,需要搜集大量图片资源,做数据分析,以及正版图片维权.前期主要用node做爬虫(业务比较简单,对node比较熟悉).随着业务需求的变化,大规模爬虫遇到各种问题.python爬虫具有先天优势,社区资源比较齐全,各种框架也完美支持.爬虫性能也得到极大提升.本次分享从基础知识入手,涉及python 的两大爬虫框架pyspider.scrapy,并基于scrapy.scrapy-redis 做了分布式爬虫的介绍(直接粘贴的ppt截图)会涉及 redis.mongodb

爬虫进阶之分布式爬虫编写

本篇文章将是『如何构建一个分布式爬虫』系列文章的最后一篇,拟从实战角度来介绍如何构建一个稳健的分布式微博爬虫.这里我没敢谈高效,抓过微博数据的同学应该都知道微博的反爬虫能力,也知道微博数据抓取的瓶颈在哪里.我在知乎上看过一些同学的说法,把微博的数据抓取难度简单化了,我只能说,那是你太naive,没深入了解和长期抓取而已. 本文将会以PC端微博进行讲解,因为移动端微博数据不如PC短全面,而且抓取和解析难度都会小一些.文章比较长,由于篇幅所限,文章并没有列出所有代码,只是讲了大致流程和思路. 要抓微

Python3分布式爬虫(scrap+redis)基础知识和实战详解

背景 随着业务需求的变化,大规模爬虫遇到各种问题.python爬虫具有先天优势,社区资源比较齐全,各种框架也完美支持.爬虫性能也得到极大提升.本次分享从基础知识入手,涉及python 的两大爬虫框架pyspider.scrapy,并基于scrapy.scrapy-redis 做了分布式爬虫的介绍(直接粘贴的ppt截图)会涉及 redis.mongodb等相关知识. 一.前沿 1.1 爬虫是什么? 网络爬虫(又被称为网页蜘蛛,网络机器人),是一种按照一定的规则,自动的抓取万维网信息的程序或者脚本.

【Python3爬虫】爬取美女图新姿势--Redis分布式爬虫初体验

一.写在前面 之前写的爬虫都是单机爬虫,还没有尝试过分布式爬虫,这次就是一个分布式爬虫的初体验.所谓分布式爬虫,就是要用多台电脑同时爬取数据,相比于单机爬虫,分布式爬虫的爬取速度更快,也能更好地应对IP的检测.本文介绍的是利用Redis数据库实现的分布式爬虫,Redis是一种常用的菲关系型数据库,常用数据类型包括String.Hash.Set.List和Sorted Set,重要的是Redis支持主从复制,主机能将数据同步到从机,也就能够实现读写分离.因此我们可以利用Redis的特性,借助req

【Python3爬虫】学习分布式爬虫第一步--Redis分布式爬虫初体验

一.写在前面 之前写的爬虫都是单机爬虫,还没有尝试过分布式爬虫,这次就是一个分布式爬虫的初体验.所谓分布式爬虫,就是要用多台电脑同时爬取数据,相比于单机爬虫,分布式爬虫的爬取速度更快,也能更好地应对IP的检测.本文介绍的是利用Redis数据库实现的分布式爬虫,Redis是一种常用的菲关系型数据库,常用数据类型包括String.Hash.Set.List和Sorted Set,重要的是Redis支持主从复制,主机能将数据同步到从机,也就能够实现读写分离.因此我们可以利用Redis的特性,借助req

第9章 scrapy-redis分布式爬虫

9-1 分布式爬虫要点 1.分布式的优点 充分利用多机器的宽带加速爬取 充分利用多机的IP加速爬取速度 问:为什么scrapy不支持分布式? 答:在scrapy中scheduler是运行在队列的,而队列是在单机内存中的,服务器上爬虫是无法利用内存的队列做任何处理,所以scrapy不支持分布式. 2.分布式需要解决的问题 requests队列集中管理 去重集中管理 所以要用redis来解决. 9-2~3 redis基础知识 Ⅰ.redis的安装(windows 64位) 1.百度:redis fo

基于Redis的三种分布式爬虫策略

前言: 爬虫是偏IO型的任务,分布式爬虫的实现难度比分布式计算和分布式存储简单得多. 个人以为分布式爬虫需要考虑的点主要有以下几个: 爬虫任务的统一调度 爬虫任务的统一去重 存储问题 速度问题 足够"健壮"的情况下实现起来越简单/方便越好 最好支持"断点续爬"功能 Python分布式爬虫比较常用的应该是scrapy框架加上Redis内存数据库,中间的调度任务等用scrapy-redis模块实现. 此处简单介绍一下基于Redis的三种分布式策略,其实它们之间还是很相似

第三百五十八节,Python分布式爬虫打造搜索引擎Scrapy精讲—将bloomfilter(布隆过滤器)集成到scrapy-redis中

第三百五十八节,Python分布式爬虫打造搜索引擎Scrapy精讲-将bloomfilter(布隆过滤器)集成到scrapy-redis中,判断URL是否重复 布隆过滤器(Bloom Filter)详解 基本概念 如果想判断一个元素是不是在一个集合里,一般想到的是将所有元素保存起来,然后通过比较确定.链表,树等等数据结构都是这种思路. 但是随着集合中元素的增加,我们需要的存储空间越来越大,检索速度也越来越慢.不过世界上还有一种叫作散列表(又叫哈希表,Hash table)的数据结构.它可以通过一