Importing files from HDFS into MongoDB with Spark SQL

Purpose: use Spark SQL to import a file stored on HDFS into MongoDB.

Required jars: mongo-spark-connector_2.11-2.1.2.jar and mongo-java-driver-3.8.0.jar.
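If the job is built with sbt, the same dependencies can be declared roughly as below. This is only a sketch: the group/artifact coordinates follow the jar names above, the Spark version matches the spark-2.1.1 installation used later, and the exact Scala patch version (2.11.8 here) is an assumption to adjust for your cluster.

// build.sbt (sketch): coordinates inferred from the jar names above;
// the Scala patch version is an assumption, adjust to your environment.
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark"  %% "spark-sql"             % "2.1.1" % "provided",
  "org.mongodb.spark" %% "mongo-spark-connector" % "2.1.2",
  "org.mongodb"       %  "mongo-java-driver"     % "3.8.0"
)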

The Scala code is as follows:

import org.apache.spark.sql.Row
import org.apache.spark.sql.Dataset
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SparkSession
import com.mongodb.spark._
import org.bson.Document
import com.mongodb.spark.config._

object Exec {
  def main(args: Array[String]) {
    if (args.length < 6) {
      System.err.println("Usage: Exec <hdfsServer> <logPath> <fileName> <mongoHost> <mongoDB> <mongoCollection>")
      System.exit(1)
    }

    val hdfsServer      = args(0) // e.g. "hdfs://master"
    val logPath         = args(1) // e.g. "/user/hdfs/log/"
    val fileName        = args(2) // e.g. "2017-05-04.txt"
    val mongoHost       = args(3) // e.g. "10.15.22.22:23000"
    val mongoDB         = args(4) // target MongoDB database
    val mongoCollection = args(5) // target MongoDB collection

    try {
      val spark = SparkSession
        .builder()
        .master("local")
        .appName("SparkImportDataToMongo")
        .config("spark.debug.maxToStringFields", 500)
        .getOrCreate()
      import spark.implicits._

      // Read the JSON file from HDFS into a DataFrame
      val df = spark.read.json(hdfsServer + logPath + "/" + fileName)
      df.printSchema()

      // Append the DataFrame to the target MongoDB collection
      df.write
        .mode("append")
        .format("com.mongodb.spark.sql.DefaultSource")
        .option("spark.mongodb.output.uri",
          "mongodb://" + mongoHost + "/" + mongoDB + "." + mongoCollection)
        .save()
    } catch {
      case ex: Exception => println(ex.toString)
    }
  }
}
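To spot-check that the write actually reached MongoDB, the same connector can read the collection back into a DataFrame. The lines below are a sketch that could be appended inside the try block right after save(); they assume the spark session and the mongoHost/mongoDB/mongoCollection variables from the code above are in scope, and they mirror the write path by passing the input URI as an option.

// Sketch: read the collection back through the connector and count the documents.
// Assumes `spark`, `mongoHost`, `mongoDB` and `mongoCollection` from the code above.
val readBack = spark.read
  .format("com.mongodb.spark.sql.DefaultSource")
  .option("spark.mongodb.input.uri",
    "mongodb://" + mongoHost + "/" + mongoDB + "." + mongoCollection)
  .load()

println(s"Imported ${readBack.count()} documents")
readBack.show(5)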

Run the following command from the Spark installation directory:

./bin/spark-submit  --master spark://11.12.13.14:7077 --class Exec //bigdata/spark-2.1.1-bin-hadoop2.6/examples/ImportDataToMongo.jar hdfs://master /user/hdfs/log/ 2017-05-04.txt 10.15.22.22:27017 mydb data_default_test
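The command above assumes the two connector jars are already on the classpath of the driver and the executors (for example, copied into every node's jars directory). If they are not, they can be shipped with the job through spark-submit's --jars option; the jar paths below are illustrative:

./bin/spark-submit \
  --master spark://11.12.13.14:7077 \
  --class Exec \
  --jars /path/to/mongo-spark-connector_2.11-2.1.2.jar,/path/to/mongo-java-driver-3.8.0.jar \
  //bigdata/spark-2.1.1-bin-hadoop2.6/examples/ImportDataToMongo.jar \
  hdfs://master /user/hdfs/log/ 2017-05-04.txt 10.15.22.22:27017 mydb data_default_test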

Running it:

[root@... spark-2.1.1-bin-hadoop2.6]# ./bin/spark-submit --master spark://11.12.13.14:7077 --class Exec //bigdata/spark-2.1.1-bin-hadoop2.6/examples/ImportDataToMongo.jar hdfs://master /user/hdfs/log/ 2017-05-04.txt 10.15.22.22:27017 mydb data_default_test

18/07/20 23:41:13 INFO spark.SparkContext: Running Spark version 2.1.1

18/07/20 23:41:14 INFO spark.SecurityManager: Changing view acls to: root

18/07/20 23:41:14 INFO spark.SecurityManager: Changing modify acls to: root

18/07/20 23:41:14 INFO spark.SecurityManager: Changing view acls groups to:

18/07/20 23:41:14 INFO spark.SecurityManager: Changing modify acls groups to:

18/07/20 23:41:14 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()

18/07/20 23:41:14 INFO util.Utils: Successfully started service 'sparkDriver' on port 24073.

18/07/20 23:41:14 INFO spark.SparkEnv: Registering MapOutputTracker

18/07/20 23:41:14 INFO spark.SparkEnv: Registering BlockManagerMaster

18/07/20 23:41:14 INFO storage.BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information

18/07/20 23:41:14 INFO storage.BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up

18/07/20 23:41:14 INFO storage.DiskBlockManager: Created local directory at /tmp/blockmgr-9c42a710-559b-4c97-b92a-58208a77afeb

18/07/20 23:41:14 INFO memory.MemoryStore: MemoryStore started with capacity 366.3 MB

18/07/20 23:41:14 INFO spark.SparkEnv: Registering OutputCommitCoordinator

18/07/20 23:41:14 INFO util.log: Logging initialized @1777ms

18/07/20 23:41:14 INFO server.Server: jetty-9.2.z-SNAPSHOT

18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@...{/jobs,null,AVAILABLE,@Spark}

18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@...{/jobs/json,null,AVAILABLE,@Spark}

18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@...{/jobs/job,null,AVAILABLE,@Spark}

18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@...{/jobs/job/json,null,AVAILABLE,@Spark}

18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@...{/stages,null,AVAILABLE,@Spark}

18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@...{/stages/json,null,AVAILABLE,@Spark}

18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@...{/stages/stage,null,AVAILABLE,@Spark}

18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@...{/stages/stage/json,null,AVAILABLE,@Spark}

18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@...{/stages/pool,null,AVAILABLE,@Spark}

18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@...{/stages/pool/json,null,AVAILABLE,@Spark}

18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@...{/storage,null,AVAILABLE,@Spark}

18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@...{/storage/json,null,AVAILABLE,@Spark}

18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@...{/storage/rdd,null,AVAILABLE,@Spark}

18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@...{/storage/rdd/json,null,AVAILABLE,@Spark}

18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@...{/environment,null,AVAILABLE,@Spark}

18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@...{/environment/json,null,AVAILABLE,@Spark}

18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@...{/executors,null,AVAILABLE,@Spark}

18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@...{/executors/json,null,AVAILABLE,@Spark}

18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@...{/executors/threadDump,null,AVAILABLE,@Spark}

18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@...{/executors/threadDump/json,null,AVAILABLE,@Spark}

18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@...{/static,null,AVAILABLE,@Spark}

Original article: https://www.cnblogs.com/abcdwxc/p/9344637.html
