Reading files on a Hadoop 2.6 filesystem (HDFS) with Apache Spark 1.4

scala> val file = sc.textFile("hdfs://9.125.73.217:9000/user/hadoop/logs")

scala> val count = file.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)

scala> count.collect()

This walkthrough uses the classic WordCount example to verify that Spark can read from (and write to) the HDFS filesystem. The three commands above summarize the session; the actual run below reads an existing HDFS file, /hbase/hbase.version, rather than /user/hadoop/logs.
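
The same job can also be packaged as a standalone application and launched with spark-submit instead of being typed into the shell. The sketch below is not part of the original session; the object name and the assumption that spark-submit supplies the master URL are added only for illustration.

import org.apache.spark.{SparkConf, SparkContext}

// Minimal standalone WordCount (a sketch; object name and paths are illustrative).
object HdfsWordCount {
  def main(args: Array[String]): Unit = {
    // The master URL is expected to come from spark-submit (e.g. --master yarn-client).
    val sc = new SparkContext(new SparkConf().setAppName("HdfsWordCount"))
    val file = sc.textFile("hdfs://9.125.73.217:9000/user/hadoop/logs")
    val count = file.flatMap(line => line.split(" "))
                    .map(word => (word, 1))
                    .reduceByKey(_ + _)
    count.collect().foreach(println)
    sc.stop()
  }
}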

1. Start the Spark shell


/root/spark-1.4.0-bin-hadoop2.4/bin/spark-shell

log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/07/12 21:32:05 INFO SecurityManager: Changing view acls to: root
15/07/12 21:32:05 INFO SecurityManager: Changing modify acls to: root
15/07/12 21:32:05 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
15/07/12 21:32:05 INFO HttpServer: Starting HTTP Server
15/07/12 21:32:05 INFO Utils: Successfully started service 'HTTP class server' on port 50452.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.4.0
      /_/

Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.7.0_45)
Type in expressions to have them evaluated.
Type :help for more information.
15/07/12 21:32:09 INFO SparkContext: Running Spark version 1.4.0
15/07/12 21:32:10 INFO SecurityManager: Changing view acls to: root
15/07/12 21:32:10 INFO SecurityManager: Changing modify acls to: root
15/07/12 21:32:10 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
15/07/12 21:32:10 INFO Slf4jLogger: Slf4jLogger started
15/07/12 21:32:10 INFO Remoting: Starting remoting
15/07/12 21:32:10 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:35775]
15/07/12 21:32:10 INFO Utils: Successfully started service 'sparkDriver' on port 35775.
15/07/12 21:32:10 INFO SparkEnv: Registering MapOutputTracker
15/07/12 21:32:10 INFO SparkEnv: Registering BlockManagerMaster
15/07/12 21:32:10 INFO DiskBlockManager: Created local directory at /tmp/spark-6bd4dc00-8a04-4b62-8f16-76f4beeba918/blockmgr-b0db297e-f183-4ca5-8cb5-7ee943df509d
15/07/12 21:32:10 INFO MemoryStore: MemoryStore started with capacity 265.4 MB
15/07/12 21:32:10 INFO HttpFileServer: HTTP File server directory is /tmp/spark-6bd4dc00-8a04-4b62-8f16-76f4beeba918/httpd-b22e2de4-9618-4bba-b25a-a8c1fd28826d
15/07/12 21:32:10 INFO HttpServer: Starting HTTP Server
15/07/12 21:32:10 INFO Utils: Successfully started service 'HTTP file server' on port 55255.
15/07/12 21:32:10 INFO SparkEnv: Registering OutputCommitCoordinator
15/07/12 21:32:11 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/07/12 21:32:11 INFO SparkUI: Started SparkUI at http://9.125.73.217:4040
15/07/12 21:32:11 INFO Executor: Starting executor ID driver on host localhost
15/07/12 21:32:11 INFO Executor: Using REPL class URI: http://9.125.73.217:50452
15/07/12 21:32:11 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 60268.
15/07/12 21:32:11 INFO NettyBlockTransferService: Server created on 60268
15/07/12 21:32:11 INFO BlockManagerMaster: Trying to register BlockManager
15/07/12 21:32:11 INFO BlockManagerMasterEndpoint: Registering block manager localhost:60268 with 265.4 MB RAM, BlockManagerId(driver, localhost, 60268)
15/07/12 21:32:11 INFO BlockManagerMaster: Registered BlockManager
15/07/12 21:32:11 INFO SparkILoop: Created spark context..
Spark context available as sc.
15/07/12 21:32:12 INFO HiveContext: Initializing execution hive, version 0.13.1
15/07/12 21:32:12 INFO HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
15/07/12 21:32:12 INFO ObjectStore: ObjectStore, initialize called
15/07/12 21:32:13 INFO Persistence: Property datanucleus.cache.level2 unknown - will be ignored
15/07/12 21:32:13 INFO Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored
15/07/12 21:32:13 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
15/07/12 21:32:13 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
15/07/12 21:32:14 INFO ObjectStore: Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes="Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order"
15/07/12 21:32:15 INFO MetaStoreDirectSql: MySQL check failed, assuming we are not on mysql: Lexical error at line 1, column 5.  Encountered: "@" (64), after : "".
15/07/12 21:32:15 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
15/07/12 21:32:15 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
15/07/12 21:32:17 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
15/07/12 21:32:17 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
15/07/12 21:32:17 INFO ObjectStore: Initialized ObjectStore
15/07/12 21:32:17 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 0.13.1aa
15/07/12 21:32:17 INFO HiveMetaStore: Added admin role in metastore
15/07/12 21:32:17 INFO HiveMetaStore: Added public role in metastore
15/07/12 21:32:17 INFO HiveMetaStore: No user is added in admin role, since config is empty
15/07/12 21:32:18 INFO SessionState: No Tez session required at this point. hive.execution.engine=mr.
15/07/12 21:32:18 INFO SparkILoop: Created sql context (with Hive support)..
SQL context available as sqlContext.

scala>
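
Before reading anything, it can help to confirm which filesystem the shell resolves relative paths against. The check below is not part of the original session: sc.hadoopConfiguration reports fs.defaultFS, which points at HDFS only if a Hadoop core-site.xml is on the classpath and otherwise falls back to the local filesystem, which is why every path in this walkthrough uses a full hdfs:// URI.

scala> sc.version
scala> sc.hadoopConfiguration.get("fs.defaultFS")   // "file:///" unless core-site.xml is picked up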

 

2. Read a file from HDFS


scala> val file = sc.textFile("hdfs://9.125.73.217:9000/hbase/hbase.version")
15/07/12 21:34:50 INFO MemoryStore: ensureFreeSpace(80368) called with curMem=0, maxMem=278302556
15/07/12 21:34:50 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 78.5 KB, free 265.3 MB)
15/07/12 21:34:50 INFO MemoryStore: ensureFreeSpace(17237) called with curMem=80368, maxMem=278302556
15/07/12 21:34:50 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 16.8 KB, free 265.3 MB)
15/07/12 21:34:50 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:60268 (size: 16.8 KB, free: 265.4 MB)
15/07/12 21:34:50 INFO SparkContext: Created broadcast 0 from textFile at <console>:21
file: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21
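
Note that textFile is lazy: the broadcast messages above only register the RDD's metadata, and no data has been fetched from the NameNode yet. A quick action (not part of the original session) forces an actual read and surfaces any path or connectivity problems early:

scala> file.first()
scala> file.count()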

 

3. Count the words

scala> val count = file.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)
15/07/12 21:38:43 INFO FileInputFormat: Total input paths to process : 1
count: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[8] at reduceByKey at <console>:23

scala>

scala> count.collect()
15/07/12 21:39:25 INFO SparkContext: Starting job: collect at <console>:26
15/07/12 21:39:25 INFO DAGScheduler: Registering RDD 7 (map at <console>:23)
15/07/12 21:39:25 INFO DAGScheduler: Got job 0 (collect at <console>:26) with 3 output partitions (allowLocal=false)
15/07/12 21:39:25 INFO DAGScheduler: Final stage: ResultStage 1(collect at <console>:26)
15/07/12 21:39:25 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
15/07/12 21:39:25 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
15/07/12 21:39:25 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[7] at map at <console>:23), which has no missing parents
15/07/12 21:39:25 INFO MemoryStore: ensureFreeSpace(4128) called with curMem=297554, maxMem=278302556
15/07/12 21:39:25 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 4.0 KB, free 265.1 MB)
15/07/12 21:39:25 INFO MemoryStore: ensureFreeSpace(2305) called with curMem=301682, maxMem=278302556
15/07/12 21:39:25 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 2.3 KB, free 265.1 MB)
15/07/12 21:39:25 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:60268 (size: 2.3 KB, free: 265.4 MB)
15/07/12 21:39:25 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:874
15/07/12 21:39:25 INFO DAGScheduler: Submitting 3 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[7] at map at <console>:23)
15/07/12 21:39:25 INFO TaskSchedulerImpl: Adding task set 0.0 with 3 tasks
15/07/12 21:39:25 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, ANY, 1406 bytes)
15/07/12 21:39:25 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, ANY, 1406 bytes)
15/07/12 21:39:25 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
15/07/12 21:39:25 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
15/07/12 21:39:25 INFO HadoopRDD: Input split: hdfs://9.125.73.217:9000/hbase/hbase.version:0+3
15/07/12 21:39:25 INFO HadoopRDD: Input split: hdfs://9.125.73.217:9000/hbase/hbase.version:3+3
15/07/12 21:39:25 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
15/07/12 21:39:25 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
15/07/12 21:39:25 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
15/07/12 21:39:25 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
15/07/12 21:39:25 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
15/07/12 21:39:25 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2003 bytes result sent to driver
15/07/12 21:39:25 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 2003 bytes result sent to driver
15/07/12 21:39:25 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, localhost, ANY, 1406 bytes)
15/07/12 21:39:25 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
15/07/12 21:39:25 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 162 ms on localhost (1/3)
15/07/12 21:39:25 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 179 ms on localhost (2/3)
15/07/12 21:39:25 INFO HadoopRDD: Input split: hdfs://9.125.73.217:9000/hbase/hbase.version:6+1
15/07/12 21:39:25 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 2003 bytes result sent to driver
15/07/12 21:39:25 INFO DAGScheduler: ShuffleMapStage 0 (map at <console>:23) finished in 0.205 s
15/07/12 21:39:25 INFO DAGScheduler: looking for newly runnable stages
15/07/12 21:39:25 INFO DAGScheduler: running: Set()
15/07/12 21:39:25 INFO DAGScheduler: waiting: Set(ResultStage 1)
15/07/12 21:39:25 INFO DAGScheduler: failed: Set()
15/07/12 21:39:25 INFO DAGScheduler: Missing parents for ResultStage 1: List()
15/07/12 21:39:25 INFO DAGScheduler: Submitting ResultStage 1 (ShuffledRDD[8] at reduceByKey at <console>:23), which is now runnable
15/07/12 21:39:25 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 25 ms on localhost (3/3)
15/07/12 21:39:25 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
15/07/12 21:39:25 INFO MemoryStore: ensureFreeSpace(2288) called with curMem=303987, maxMem=278302556
15/07/12 21:39:25 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 2.2 KB, free 265.1 MB)
15/07/12 21:39:25 INFO MemoryStore: ensureFreeSpace(1377) called with curMem=306275, maxMem=278302556
15/07/12 21:39:25 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 1377.0 B, free 265.1 MB)
15/07/12 21:39:25 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on localhost:60268 (size: 1377.0 B, free: 265.4 MB)
15/07/12 21:39:25 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:874
15/07/12 21:39:25 INFO DAGScheduler: Submitting 3 missing tasks from ResultStage 1 (ShuffledRDD[8] at reduceByKey at <console>:23)
15/07/12 21:39:25 INFO TaskSchedulerImpl: Adding task set 1.0 with 3 tasks
15/07/12 21:39:25 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 3, localhost, PROCESS_LOCAL, 1165 bytes)
15/07/12 21:39:25 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 4, localhost, PROCESS_LOCAL, 1165 bytes)
15/07/12 21:39:25 INFO Executor: Running task 0.0 in stage 1.0 (TID 3)
15/07/12 21:39:25 INFO Executor: Running task 1.0 in stage 1.0 (TID 4)
15/07/12 21:39:25 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 3 blocks
15/07/12 21:39:25 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 3 blocks
15/07/12 21:39:25 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 7 ms
15/07/12 21:39:25 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 8 ms
15/07/12 21:39:25 INFO Executor: Finished task 1.0 in stage 1.0 (TID 4). 1031 bytes result sent to driver
15/07/12 21:39:25 INFO Executor: Finished task 0.0 in stage 1.0 (TID 3). 1029 bytes result sent to driver
15/07/12 21:39:25 INFO TaskSetManager: Starting task 2.0 in stage 1.0 (TID 5, localhost, PROCESS_LOCAL, 1165 bytes)
15/07/12 21:39:25 INFO Executor: Running task 2.0 in stage 1.0 (TID 5)
15/07/12 21:39:25 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 3) in 47 ms on localhost (1/3)
15/07/12 21:39:25 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 3 blocks
15/07/12 21:39:25 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
15/07/12 21:39:25 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 4) in 46 ms on localhost (2/3)
15/07/12 21:39:25 INFO Executor: Finished task 2.0 in stage 1.0 (TID 5). 882 bytes result sent to driver
15/07/12 21:39:25 INFO TaskSetManager: Finished task 2.0 in stage 1.0 (TID 5) in 6 ms on localhost (3/3)
15/07/12 21:39:25 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
15/07/12 21:39:25 INFO DAGScheduler: ResultStage 1 (collect at <console>:26) finished in 0.043 s
15/07/12 21:39:25 INFO DAGScheduler: Job 0 finished: collect at <console>:26, took 0.352074 s
res1: Array[(String, Int)] = Array((?8,1), (PBUF,1))

scala>
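
The collect output contains only two tokens because /hbase/hbase.version is HBase's small version marker file rather than a text log, but the counts still prove the end-to-end read path. To exercise the write path mentioned at the top as well, the result can be written back to HDFS and re-read; the output directory below is an assumption, and saveAsTextFile will fail if it already exists.

scala> count.saveAsTextFile("hdfs://9.125.73.217:9000/user/hadoop/wordcount-out")
scala> sc.textFile("hdfs://9.125.73.217:9000/user/hadoop/wordcount-out").collect()

Removing the output directory afterwards with hdfs dfs -rm -r keeps the test repeatable.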
