spark cassandra connector 使用

1、cassandra 准备

启动cqlsh,

CQLSH_HOST=172.16.163.131 bin/cqlsh
cqlsh>CREATE KEYSPACE productlogs WITH REPLICATION = { ‘class‘ : ‘org.apache.cassandra.locator.SimpleStrategy‘, ‘replication_factor‘: ‘2‘ } 

cqlsh>CREATE TABLE productlogs.logs (
    ids uuid,
    app_name text,
    app_version text,
    city text,
    client_time timestamp,
    country text,
    created_at timestamp,
    cs_count int,
    device_id text,
    id int,
    modle_name text,
    province text,
    remote_ip text,
    updated_at timestamp,
    PRIMARY KEY (ids)
)

2、spark cassandra conector jar包

新建空项目,使用sbt,引入connector,打包为spark-cassandra-connector-full.jar

这步的意义在于:官方的connector包没有将依赖打进去,所以,直接使用官方包的时候,需要自己将依赖找出来。不同版本依赖的包及版本也不相同,简单起见,直接打一个full包

3、启动spark-shell

/opt/db/spark-1.5.2-bin-hadoop2.6/bin/spark-shell --master spark://u1:7077  --jars ~/spark-cassandra-connector-full.jar

以下为sparkshell 命令

4、准备数据源:

//可能大多数文档都先stop掉当前sc,再重启一个,其实根本没必要,直接在原有sc上添加cassandra的参数就好
scala>sc.getConf.set("spark.cassandra.connection.host", "172.16.163.131")
//读取HDFS上的数据源
scala>val df = sc.textFile("/data/logs")
//引入需要的命令空间
scala>import org.apache.spark.sql._
scala>import org.apache.spark.sql.types._
scala>import com.datastax.spark.connector._
scala>import java.util.UUID
//定义shcmea
scala>val schema = StructType(
  StructField("ids", StringType, true) ::
    StructField("id", IntegerType, true) ::
    StructField("app_name", StringType, true) ::
    StructField("app_version", StringType, true) ::
    StructField("client_time", TimestampType, true) ::
    StructField("device_id", StringType, true) ::
    StructField("modle_name", StringType, true) ::
    StructField("cs_count", IntegerType, true) ::
    StructField("created_at", TimestampType, true) ::
    StructField("updated_at", TimestampType, true) ::
    StructField("remote_ip", StringType, true) ::
    StructField("country", StringType, true) ::
    StructField("province", StringType, true) ::
   StructField("city", StringType, true) :: Nil)
//指定数据源的schema
scala>val rowRDD = df.map(_.split("\t")).map(p => Row(UUID.randomUUID().toString(), p(0).toInt, p(1), p(2), java.sql.Timestamp.valueOf(p(3)), p(4), p(5), p(6).toInt, java.sql.Timestamp.valueOf(p(7)), java.sql.Timestamp.valueOf(p(8)), p(9), p(10), p(11), p(12)))
scala>val df= sqlContext.createDataFrame(rowRDD, schema)
scala>df.registerTempTable("logs")
//看下结果
scala>sqlContext.sql("select * from logs limit 1").show

5、将数据存入cassandra

scala>import org.apache.spark.sql.cassandra._
scala>df.write.format("org.apache.spark.sql.cassandra").options(Map("table" -> "logs", "keyspace" -> "productlogs")).save()

6、取出刚存的数据:

scala>import org.apache.spark.sql.cassandra._
scala>val cdf = sqlContext.read.
  format("org.apache.spark.sql.cassandra").
  options(Map("table" -> "logs", "keyspace" -> "productlogs")).
  load().registerTempTable("logs")
scala>sqlContext.sql("select * from logs_jsut_save limit 1").show

  

时间: 2024-10-05 06:44:36

spark cassandra connector 使用的相关文章

Apache Spark技术实战之3 -- Spark Cassandra Connector的安装和使用

欢迎转载,转载请注明出处,徽沪一郎. 概要 前提 假设当前已经安装好如下软件 jdk sbt git scala 安装cassandra 以archlinux为例,使用如下指令来安装cassandra yaourt -S cassandra 启动cassandra cassandra -f 创建keyspace和table, 运行/usr/bin/cqlsh进入cql console,然后执行下述语句创建keyspace和table CREATE KEYSPACE test WITH repli

Cassandra非关系型数据库

由于在工作中用到了Cassandra非关系型数据库,因此总结一下常用操作.Cassandra使用Java语言编写的,因此在安装之前,首先需要安装JDK.自己使用的版本为apache-cassandra-2.1.11-bin.tar.gz,在Ubuntu 12.04上进行安装.由于目前仅仅在单机上面实验,因此集群部分以后整理. 1. Cassandra安装和配置 (1)~/.bashrc环境变量配置 export CASSANDRA_HOME=/usr/local/cassandra/apache

Apache Spark技术实战之6 --Standalone部署模式下的临时文件清理

问题导读 1.在Standalone部署模式下,Spark运行过程中会创建哪些临时性目录及文件? 2.在Standalone部署模式下分为几种模式? 3.在client模式和cluster模式下有什么不同? 概要 在Standalone部署模式下,Spark运行过程中会创建哪些临时性目录及文件,这些临时目录和文件又是在什么时候被清理,本文将就这些问题做深入细致的解答. 从资源使用的方面来看,一个进程运行期间会利用到这四个方面的资源,分别是CPU,内存,磁盘和网络.进程退出之后,CPU,内存和网络

Apache Spark技术实战之4 -- 利用Spark将json文件导入Cassandra

欢迎转载,转载请注明出处. 概要 本文简要介绍如何使用spark-cassandra-connector将json文件导入到cassandra数据库,这是一个使用spark的综合性示例. 前提条件 假设已经阅读技术实战之3,并安装了如下软件 jdk scala sbt cassandra spark-cassandra-connector 实验目的 将存在于json文件中的数据导入到cassandra数据库,目前由cassandra提供的官方工具是json2sstable,由于对cassandr

如何在spark中读写cassandra数据 ---- 分布式计算框架spark学习之六

由于预处理的数据都存储在cassandra里面,所以想要用spark进行数据分析的话,需要读取cassandra数据,并把分析结果也一并存回到cassandra:因此需要研究一下spark如何读写cassandra. 话说这个单词敲起来好累,说是spark,其实就是看你开发语言是否有对应的driver了. 因为cassandra是datastax主打的,所以该公司也提供了spark的对应的driver了,见这里. 我就参考它的demo,使用scala语言来测试一把. 1.执行代码 //Cassa

Cassandra联手Spark 大数据分析将迎来哪些改变?

2014Spark峰会在美国旧金山举行,与会数据库平台供应商DataStax宣布,与Spark供应商Databricks合作,在它的旗舰产 品 DataStax Enterprise 4.5 (DSE)中,将Cassandra NoSQL数据库与Apache Spark开源引擎相结合,为用户提供基于内存处理的实时分析. Databricks是一家由Apache Spark创始人成立的公司.谈到这次合作,DataStax副总裁John Glendenning表示:“将Spark与Cassandra

Apache Spark技术实战之7 -- CassandraRDD高并发数据读取实现剖析

未经本人同意,严禁转载,徽沪一郎. 概要 本文就 spark-cassandra-connector 的一些实现细节进行探讨,主要集中于如何快速将大量的数据从cassandra 中读取到本地内存或磁盘. 数据分区 存储在 Cassandra 中数据的一般都会比较多,记录数在千万级别或上亿级别是常见的事.如何将这些表中的内容快速加载到本地内存就是一个非常现实的问题.解决这一挑战的思路从大的方面来说是比较简单的,那就是将整张表中的内容分成不同的区域,然后分区加载,不同的分区可以在不同的线程或进程中加

Spark 入门

原文链接 什么是Spark Apache Spark是一个围绕速度.易用性和复杂分析构建的大数据处理框架.最初在2009年由加州大学伯克利分校的AMPLab开发,并于2010年成为Apache的开源项目之一. 与Hadoop和Storm等其他大数据和MapReduce技术相比,Spark有如下优势. 首先,Spark为我们提供了一个全面.统一的框架用于管理各种有着不同性质(文本数据.图表数据等)的数据集和数据源(批量数据或实时的流数据)的大数据处理的需求. Spark可以将Hadoop集群中的应

spark基本概念及入门

spark spark背景 什么是spark Spark是一种快速.通用.可扩展的大数据分析引擎,2009年诞生于加州大学伯克利分校AMPLab,2010年开源,2013年6月成为Apache孵化项目,2014年2月成为Apache顶级项目.目前,Spark生态系统已经发展成为一个包含多个子项目的集合,其中包含SparkSQL.Spark Streaming.GraphX.MLlib等子项目,Spark是基于内存计算的大数据并行计算框架.Spark基于内存计算,提高了在大数据环境下数据处理的实时