功能:通过spark sql 将hdfs 中文件导入到mongdo
所需jar包有:mongo-spark-connector_2.11-2.1.2.jar、mongo-java-driver-3.8.0.jar
scala代码如下:
import org.apache.spark.sql.Rowimport org.apache.spark.sql.Datasetimport org.apache.spark.SparkContextimport org.apache.spark.sql.SQLContextimport org.apache.hadoop.conf.Configurationimport org.apache.spark.sql.SparkSessionimport com.mongodb.spark._import org.bson.Documentimport com.mongodb.spark.config._ object Exec { def main(args: Array[String]) { if (args.length < 6) { System.err.println("Usage: Exec <hdfsServer> <logPath> <fileName> <mongoHost> <mongoDB> <mongoCollection>") System.exit(1) } val hdfsServer = args(0) // "hdfs://master" val logPath = args(1) // "/user/hdfs/log/" val fileName = args(2) // 2017-05-04.txt val mongoHost = args(3) // "10.15.22.22:23000" val mongoDB = args(4) // "mongo db" val mongoCollection = args(5) //"mongo collection" try { import org.apache.spark.sql.SparkSession val spark = SparkSession .builder() .master("local") .appName("SparkImportDataToMongo") .config("spark.debug.maxToStringFields", 500).getOrCreate() import spark.implicits._ val df = spark.read.json(hdfsServer + logPath + "/" + fileName) df.printSchema() df.write.mode("append").format("com.mongodb.spark.sql.DefaultSource").option("spark.mongodb.output.uri", "mongodb://" + mongoHost + "/" + mongoDB + "." + mongoCollection).save() } catch { case ex: Exception => { printf(ex.toString()) } } }}
在spark 运行目录执行如下命令:
./bin/spark-submit --master spark://11.12.13.14:7077 --class Exec //bigdata/spark-2.1.1-bin-hadoop2.6/examples/ImportDataToMongo.jar hdfs://master /user/hdfs/log/ 2017-05-04.txt 10.15.22.22:27017 mydb data_default_test
运行:
[[email protected] spark-2.1.1-bin-hadoop2.6]# ./bin/spark-submit --master spark://11.12.13.14:7077 --class Exec //bigdata/spark-2.1.1-bin-hadoop2.6/examples/ImportDataToMongo.jar hdfs://master /user/hdfs/log/ 2017-05-04.txt 10.15.22.22:27017 mydb data_default_test
18/07/20 23:41:13 INFO spark.SparkContext: Running Spark version 2.1.1
18/07/20 23:41:14 INFO spark.SecurityManager: Changing view acls to: root
18/07/20 23:41:14 INFO spark.SecurityManager: Changing modify acls to: root
18/07/20 23:41:14 INFO spark.SecurityManager: Changing view acls groups to:
18/07/20 23:41:14 INFO spark.SecurityManager: Changing modify acls groups to:
18/07/20 23:41:14 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
18/07/20 23:41:14 INFO util.Utils: Successfully started service ‘sparkDriver‘ on port 24073.
18/07/20 23:41:14 INFO spark.SparkEnv: Registering MapOutputTracker
18/07/20 23:41:14 INFO spark.SparkEnv: Registering BlockManagerMaster
18/07/20 23:41:14 INFO storage.BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
18/07/20 23:41:14 INFO storage.BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
18/07/20 23:41:14 INFO storage.DiskBlockManager: Created local directory at /tmp/blockmgr-9c42a710-559b-4c97-b92a-58208a77afeb
18/07/20 23:41:14 INFO memory.MemoryStore: MemoryStore started with capacity 366.3 MB
18/07/20 23:41:14 INFO spark.SparkEnv: Registering OutputCommitCoordinator
18/07/20 23:41:14 INFO util.log: Logging initialized @1777ms
18/07/20 23:41:14 INFO server.Server: jetty-9.2.z-SNAPSHOT
18/07/20 23:41:14 INFO handler.ContextHandler: Started [email protected]{/jobs,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started [email protected]{/jobs/json,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started [email protected]{/jobs/job,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started [email protected]{/jobs/job/json,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started [email protected]{/stages,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started [email protected]{/stages/json,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started [email protected]{/stages/stage,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started [email protected]{/stages/stage/json,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started [email protected]{/stages/pool,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started [email protected]{/stages/pool/json,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started [email protected]{/storage,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started [email protected]{/storage/json,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started [email protected]{/storage/rdd,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started [email protected]{/storage/rdd/json,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started [email protected]{/environment,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started [email protected]{/environment/json,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started [email protected]{/executors,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started [email protected]{/executors/json,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started [email protected]{/executors/threadDump,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started [email protected]{/executors/threadDump/json,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started [email protected]{/static,null,AVAILABLE,@Spark}
原文地址:https://www.cnblogs.com/abcdwxc/p/9344637.html