Spark读写Hbase中的数据

def main(args: Array[String])  {
    val sparkConf = new SparkConf().setMaster("local").setAppName("cocapp").set("spark.kryo.registrator", classOf[HBaseConfiguration].getName)
      .set("spark.executor.memory", "4g")
    val sc: SparkContext = new SparkContext(sparkConf)
    val sqlContext = new HiveContext(sc)
    val mySQLUrl = "jdbc:mysql://localhost:3306/yangsy?user=root&password=yangsiyi"
    val rows = sqlContext.jdbc(mySQLUrl, "person")
    val tableName = "spark"
    val columnFamily = "cf" //rows.first().getString(1)
    val configuration = HBaseConfiguration.create();
    configuration.set(TableInputFormat.INPUT_TABLE, "test");
    val admin = new HBaseAdmin(configuration)
    val hBaseRDD = sc.newAPIHadoopRDD(configuration, classOf[TableInputFormat],
        classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
    classOf[org.apache.hadoop.hbase.client.Result])
      hBaseRDD.count()

def toHbase(rows: DataFrame,tableName : String,columnFamily: String)   {
    val configuration = HBaseConfiguration.create();
    val admin = new HBaseAdmin(configuration)
    if (admin.tableExists(tableName)) {
      print("table Exists")
      admin.disableTable(tableName);
      admin.deleteTable(tableName);
    }
    configuration.addResource("hbase-site.xml")
    val tableDesc = new HTableDescriptor(tableName)
    tableDesc.addFamily(new HColumnDescriptor(columnFamily))
    admin.createTable(tableDesc)
    rows.foreachPartition { row =>
      val table = new HTable(configuration, tableName)

      row.foreach { a =>
        val put = new Put(Bytes.toBytes("row1"))
        put.add(Bytes.toBytes(columnFamily), Bytes.toBytes("coulumn1"), Bytes.toBytes(a.getString(0)))
        table.put(put)
        println("insert into success")
      }
    }

然而并没有什么乱用,发现一个问题,就是说,在RDD取值与写入HBASE的时候,引入外部变量无法序列化。。。。。。网上很多说法是说extends Serializable ,可是尝试无效。Count()是可以获取到,但是如果我要在configuration中set列,然后进行查询就会报错了。暂时各种办法尝试无果,还在想办法,也不明原因。

				
时间: 2024-10-01 07:34:19

Spark读写Hbase中的数据的相关文章

Spark读取Hbase中的数据_云帆大数据分享

Spark读取Hbase中的数据 大家可能都知道很熟悉Spark的两种常见的数据读取方式(存放到RDD中):(1).调用parallelize函数直接从集合中获取数据,并存入RDD中:Java版本如下: 1 JavaRDD<Integer> myRDD = sc.parallelize(Arrays.asList(1,2,3)); Scala版本如下: 1 val myRDD= sc.parallelize(List(1,2,3)) 这种方式很简单,很容易就可以将一个集合中的数据变成RDD的初

使用Hive或Impala执行SQL语句,对存储在HBase中的数据操作

使用Hive或Impala执行SQL语句,对存储在HBase中的数据操作 Hive Impala HBase HiveQL 大数据 使用Hive或Impala执行SQL语句,对存储在HBase中的数据操作 〇.摘要 一.基础环境 二.数据存储在HBase中,使用Hive执行SQL语句 Ⅰ.创建Hive外部表 Ⅱ.从HBase读 Ⅲ.向HBase写 三.数据存储在HBase中,使用Impala执行SQL语句 Ⅰ.从HBase读 Ⅱ.向HBase写 四.综上所述 〇.摘要 Hive是基于Hadoop

mysql通过sqoop导入到hbase中时数据量为1000w时出现Incorrect key file for table &#39;/tmp/#sql_458_0.MYI&#39;; try to repair it

问题:mysql通过sqoop导入到hbase中时数据量为1000w时出现Incorrect key file for table '/tmp/#sql_458_0.MYI'; try to repair it,数据量为100w等时没该问题 分析:出现该问题时因为mysql的临时目录(默认为/tmp)太小 解决方法:参考:http://blog.sina.com.cn/s/blog_4c197d420101bdn9.html mysql通过sqoop导入到hbase中时数据量为1000w时出现I

MapReduce 读取和操作HBase中的数据

MapReduce 中如何处理HBase中的数据?如何读取HBase数据给Map?如何将结果存储到HBase中? 2012-07-05 13:40 89人阅读 评论(0) 收藏 举报 MapReduce 中如何处理HBase中的数据?如何读取HBase数据给Map?如何将结果存储到HBase中? Mapper类:包括一个内部类(Context)和四个方法(setup,map,cleanup,run):          setup,cleanup用于管理Mapper生命周期中的资源.setup

mapreduce实现从hbase中统计数据,结果存入mysql中

最近开始学习使用mapreduce统计hbase中的数据,并将结果集存入mysql中,供前台查询使用. 使用hadoop版本为2.5.1,hbase版本为0.98.6.1 mapreduce程序分为三个部分:job.map函数.reduce函数 job类: 1 public class DayFaultStatisticsJob { 2 private static final Logger logger = LoggerFactory.getLogger(DayFaultStatisticsJ

用Spark查询HBase中的表数据

java代码如下: package db.query; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Res

hadoop之根据Rowkey从HBase中查询数据

1.Hbase 根据rowkey 查询 conf的配置信息如下: conf = new Configuration(); conf.set("hbase.zookeeper.quorum", "192.168.50.253:2181"); conf.set("hbase.rootdir", "hdfs://192.168.50.253:9000/hbase"); .csharpcode, .csharpcode pre { f

7.从Hbase中读取数据写入hdfs

/** public abstract classTableMapper<KEYOUT, VALUEOUT> extends Mapper<ImmutableBytesWritable,Result, KEYOUT, VALUEOUT> { }  *@author [email protected]  *  */ public class HbaseReader {          publicstatic String flow_fields_import = "fl

Sqoop_具体总结 使用Sqoop将HDFS/Hive/HBase与MySQL/Oracle中的数据相互导入、导出

一.使用Sqoop将MySQL中的数据导入到HDFS/Hive/HBase 二.使用Sqoop将HDFS/Hive/HBase中的数据导出到MySQL 2.3 HBase中的数据导出到mysql 眼下没有直接的命令将HBase中的数据导出到MySQL.但能够先将HBase中的数据导出到HDFS中.再将数据导出到MySQL. 三.使用Sqoop将Oracle中的数据导入到HDFS/Hive/HBase 以下仅仅给出将Oracle中的数据导入HBase,其它情况下的命令行选项与MySQL的操作相似