Reading and writing Cassandra data in Spark ---- learning the distributed computing framework Spark, part 6

Since the preprocessed data is all stored in Cassandra, doing the analysis with Spark means reading the data out of Cassandra and writing the analysis results back into it, so I had to look into how Spark reads and writes Cassandra.

Incidentally, that word is tiring to type. The question is phrased as "Spark", but it really comes down to whether your development language has a corresponding driver.

Since Cassandra is DataStax's flagship product, the company also provides a Spark driver (connector); see here.

I followed its demo and gave it a try in Scala.

1. The code

// CassandraTest.scala
import org.apache.spark.{Logging, SparkContext, SparkConf}
import com.datastax.spark.connector.cql.CassandraConnector

object CassandraTestApp {
  def main(args: Array[String]) {    // Spark and Cassandra addresses; everything runs locally here
      val SparkMasterHost = "127.0.0.1"
      val CassandraHost = "127.0.0.1"

      // Tell Spark the address of one Cassandra node:
      val conf = new SparkConf(true)
        .set("spark.cassandra.connection.host", CassandraHost)
        .set("spark.cleaner.ttl", "3600")
        .setMaster("local[12]")
        .setAppName("CassandraTestApp")

      // Connect to the Spark cluster:
      lazy val sc = new SparkContext(conf)
    // Set-up statements, run through a plain session when connecting
      CassandraConnector(conf).withSessionDo { session =>
        session.execute("CREATE KEYSPACE IF NOT EXISTS test WITH REPLICATION = {‘class‘: ‘SimpleStrategy‘, ‘replication_factor‘: 1 }")
        session.execute("CREATE TABLE IF NOT EXISTS test.key_value (key INT PRIMARY KEY, value VARCHAR)")
        session.execute("TRUNCATE test.key_value")
        session.execute("INSERT INTO test.key_value(key, value) VALUES (1, ‘first row‘)")
        session.execute("INSERT INTO test.key_value(key, value) VALUES (2, ‘second row‘)")
        session.execute("INSERT INTO test.key_value(key, value) VALUES (3, ‘third row‘)")
      }
    

      // Bring the connector's implicits (cassandraTable, saveToCassandra) into scope
      import com.datastax.spark.connector._

      // Read table test.key_value and print its contents:
      val rdd = sc.cassandraTable("test", "key_value").select("key", "value")
      rdd.collect().foreach(row => println(s"Existing Data: $row"))

      // Write two new rows to the test.key_value table:
      val col = sc.parallelize(Seq((4, "fourth row"), (5, "fifth row")))
      col.saveToCassandra("test", "key_value", SomeColumns("key", "value"))

      // Sanity-check the two rows we just wrote (collected from the source RDD):
      assert(col.collect().length == 2)

      col.collect().foreach(row => println(s"New Data: $row"))
      println(s"Work completed, stopping the Spark context.")
      sc.stop()
  }
}
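
The demo above reads rows back as generic CassandraRow objects and writes plain tuples. The connector can also map rows to and from case classes, which is convenient once the results you write back have more structure. Below is a minimal sketch under the same setup; it assumes the sc and the import com.datastax.spark.connector._ from the code above, while the KeyValue case class and the test.key_value_upper table are illustrative names of my own (that table would have to be created first with the same schema as key_value).

// A sketch of case-class mapping with the connector, not part of the demo above.
// Assumes `sc` and `import com.datastax.spark.connector._` are in scope, the
// test.key_value table exists, and a hypothetical table test.key_value_upper
// (key INT PRIMARY KEY, value VARCHAR) has been created. Define the case class
// at top level (outside main) so the connector can map rows onto it.
case class KeyValue(key: Int, value: String)

// Read test.key_value directly into case-class instances.
val typed = sc.cassandraTable[KeyValue]("test", "key_value")

// A trivial "analysis" step: upper-case every value.
val upper = typed.map(kv => KeyValue(kv.key, kv.value.toUpperCase))

// Write the result back; columns are matched to the case-class field names.
upper.saveToCassandra("test", "key_value_upper")

Compared with writing tuples plus SomeColumns, this keeps the column names and their types in one place.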

2. Directory layout

Since the build tool is sbt, the directory layout must follow sbt conventions. The essential parts are build.sbt and the src directory; the other directories are generated automatically.

~/scala_code/CassandraTest $ll
total 8
drwxr-xr-x   6 qpzhang  staff  204 11 26 12:14 ./
drwxr-xr-x  10 qpzhang  staff  340 11 25 17:30 ../
-rw-r--r--   1 qpzhang  staff  460 11 26 10:11 build.sbt
drwxr-xr-x   3 qpzhang  staff  102 11 25 17:42 project/
drwxr-xr-x   3 qpzhang  staff  102 11 25 17:32 src/
drwxr-xr-x   6 qpzhang  staff  204 11 25 17:55 target/
~/scala_code/CassandraTest $tree src/
src/
└── main
    └── scala
        └── CassandraTest.scala
~/scala_code/CassandraTest $cat build.sbt

name := "CassandraTest"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.2" % "provided"
libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "1.5.0-M2"

assemblyMergeStrategy in assembly := {
  case PathList(ps @ _*) if ps.last endsWith ".properties" => MergeStrategy.first
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}

Note that the sbt installed here is the latest version at the time, 0.13, with the assembly plugin added. (A gripe about sbt: it downloads piles and piles of jars; a proxy helps a lot, otherwise the wait is long.)

~/scala_code/CassandraTest $cat ~/.sbt/0.13/plugins/plugins.sbt
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.5.0")
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")

3. Compiling and packaging with sbt

Start the sbt shell in the directory that contains build.sbt.

Then run compile to build and assembly to package.

During this step I ran into the sbt-assembly deduplicate error; see here.
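
For reference, the assemblyMergeStrategy block in build.sbt above is what let assembly finish for me. A commonly seen, more aggressive variant is sketched below as an assumption rather than a recommendation: discarding all of META-INF can break jars that rely on service-loader files, so treat it only as a starting point.

// build.sbt -- a sketch of a typical deduplicate workaround for sbt-assembly 0.14.x
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard  // drop conflicting jar metadata
  case _                             => MergeStrategy.first    // otherwise keep the first copy
}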

> compile
[success] Total time: 0 s, completed 2015-11-26 10:11:50
>> assembly
[info] Including from cache: slf4j-api-1.7.5.jar
[info] Including from cache: metrics-core-3.0.2.jar
[info] Including from cache: netty-codec-4.0.27.Final.jar
[info] Including from cache: netty-handler-4.0.27.Final.jar
[info] Including from cache: netty-common-4.0.27.Final.jar
[info] Including from cache: joda-time-2.3.jar
[info] Including from cache: netty-buffer-4.0.27.Final.jar
[info] Including from cache: commons-lang3-3.3.2.jar
[info] Including from cache: jsr166e-1.1.0.jar
[info] Including from cache: cassandra-clientutil-2.1.5.jar
[info] Including from cache: joda-convert-1.2.jar
[info] Including from cache: netty-transport-4.0.27.Final.jar
[info] Including from cache: guava-16.0.1.jar
[info] Including from cache: spark-cassandra-connector_2.10-1.5.0-M2.jar
[info] Including from cache: cassandra-driver-core-2.2.0-rc3.jar
[info] Including from cache: scala-reflect-2.10.5.jar
[info] Including from cache: scala-library-2.10.5.jar
[info] Checking every *.class/*.jar file's SHA-1.
[info] Merging files...
[warn] Merging 'META-INF/INDEX.LIST' with strategy 'discard'
[warn] Merging 'META-INF/MANIFEST.MF' with strategy 'discard'
[warn] Merging 'META-INF/io.netty.versions.properties' with strategy 'first'
[warn] Merging 'META-INF/maven/com.codahale.metrics/metrics-core/pom.xml' with strategy 'discard'
[warn] Merging 'META-INF/maven/com.datastax.cassandra/cassandra-driver-core/pom.xml' with strategy 'discard'
[warn] Merging 'META-INF/maven/com.google.guava/guava/pom.xml' with strategy 'discard'
[warn] Merging 'META-INF/maven/com.twitter/jsr166e/pom.xml' with strategy 'discard'
[warn] Merging 'META-INF/maven/io.netty/netty-buffer/pom.xml' with strategy 'discard'
[warn] Merging 'META-INF/maven/io.netty/netty-codec/pom.xml' with strategy 'discard'
[warn] Merging 'META-INF/maven/io.netty/netty-common/pom.xml' with strategy 'discard'
[warn] Merging 'META-INF/maven/io.netty/netty-handler/pom.xml' with strategy 'discard'
[warn] Merging 'META-INF/maven/io.netty/netty-transport/pom.xml' with strategy 'discard'
[warn] Merging 'META-INF/maven/joda-time/joda-time/pom.xml' with strategy 'discard'
[warn] Merging 'META-INF/maven/org.apache.commons/commons-lang3/pom.xml' with strategy 'discard'
[warn] Merging 'META-INF/maven/org.joda/joda-convert/pom.xml' with strategy 'discard'
[warn] Merging 'META-INF/maven/org.slf4j/slf4j-api/pom.xml' with strategy 'discard'
[warn] Strategy 'discard' was applied to 15 files
[warn] Strategy 'first' was applied to a file
[info] SHA-1: d2cb403e090e6a3ae36b08c860b258c79120fc90
[info] Packaging /Users/qpzhang/scala_code/CassandraTest/target/scala-2.10/CassandraTest-assembly-1.0.jar ...
[info] Done packaging.
[success] Total time: 19 s, completed 2015-11-26 10:12:22

4. Submitting to Spark and the results

~/project/spark-1.5.2-bin-hadoop2.6 $./bin/spark-submit --class "CassandraTestApp" --master local[4] ~/scala_code/CassandraTest/target/scala-2.10/CassandraTest-assembly-1.0.jar
//...........................
15/11/26 11:40:23 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, NODE_LOCAL, 26660 bytes)
15/11/26 11:40:23 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
15/11/26 11:40:23 INFO Executor: Fetching http://10.60.215.42:57683/jars/CassandraTest-assembly-1.0.jar with timestamp 1448509221160
15/11/26 11:40:23 INFO CassandraConnector: Disconnected from Cassandra cluster: Test Cluster
15/11/26 11:40:23 INFO Utils: Fetching http://10.60.215.42:57683/jars/CassandraTest-assembly-1.0.jar to /private/var/folders/2l/195zcc1n0sn2wjfjwf9hl9d80000gn/T/spark-4030cadf-8489-4540-976e-e98eedf50412/userFiles-63085bda-aa04-4906-9621-c1cedd98c163/fetchFileTemp7487594894647111926.tmp
15/11/26 11:40:23 INFO Executor: Adding file:/private/var/folders/2l/195zcc1n0sn2wjfjwf9hl9d80000gn/T/spark-4030cadf-8489-4540-976e-e98eedf50412/userFiles-63085bda-aa04-4906-9621-c1cedd98c163/CassandraTest-assembly-1.0.jar to class loader
15/11/26 11:40:24 INFO Cluster: New Cassandra host localhost/127.0.0.1:9042 added
15/11/26 11:40:24 INFO CassandraConnector: Connected to Cassandra cluster: Test Cluster
15/11/26 11:40:25 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2676 bytes result sent to driver
15/11/26 11:40:25 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 2462 ms on localhost (1/1)
15/11/26 11:40:25 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
15/11/26 11:40:25 INFO DAGScheduler: ResultStage 0 (collect at CassandraTest.scala:32) finished in 2.481 s
15/11/26 11:40:25 INFO DAGScheduler: Job 0 finished: collect at CassandraTest.scala:32, took 2.940601 s
Existing Data: CassandraRow{key: 1, value: first row}
Existing Data: CassandraRow{key: 2, value: second row}
Existing Data: CassandraRow{key: 3, value: third row}
//....................
15/11/26 11:40:27 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool
15/11/26 11:40:27 INFO DAGScheduler: ResultStage 3 (collect at CassandraTest.scala:41) finished in 0.032 s
15/11/26 11:40:27 INFO DAGScheduler: Job 3 finished: collect at CassandraTest.scala:41, took 0.046502 s
New Data: (4,fourth row)
New Data: (5,fifth row)
Work completed, stopping the Spark context.

The data in Cassandra:

cqlsh:test> select * from key_value ;

 key | value
-----+------------
   5 |  fifth row
   1 |  first row
   2 | second row
   4 | fourth row
   3 |  third row

(5 rows)

Up to this point things went fairly smoothly; apart from the duplicate-file issue during assembly, everything was OK.
