Operating Spark 1.2.0 from Java



Although Scala is the recommended language for Spark, I wanted to try Java anyway.


 1 package org.admln.java7OperateSpark;
 2
 3 import java.util.Arrays;
 4 import java.util.List;
 5 import java.util.regex.Pattern;
 6
 7 import org.apache.spark.SparkConf;
 8 import org.apache.spark.api.java.JavaPairRDD;
 9 import org.apache.spark.api.java.JavaRDD;
10 import org.apache.spark.api.java.JavaSparkContext;
11 import org.apache.spark.api.java.function.FlatMapFunction;
12 import org.apache.spark.api.java.function.Function2;
13 import org.apache.spark.api.java.function.PairFunction;
14
15 import scala.Tuple2;
16
17 public class OperateSpark {
18     // delimiter used to split input lines into words
19     private static final Pattern SPACE = Pattern.compile(" ");
20
21     public static void main(String[] args) {
22         // initialize Spark
23         SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount").setMaster("spark://hadoop:7077");
24         JavaSparkContext ctx = new JavaSparkContext(sparkConf);
25
26         // textFile's optional second argument is the minimum number of partitions
27         JavaRDD<String> lines = ctx.textFile("hdfs://hadoop:8020/in/spark/javaOperateSpark/wordcount.txt");
28         JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String,String>() {
29             public Iterable<String> call(String s) {
30                 return Arrays.asList(SPACE.split(s));
31             }
32         });
33
34         // map each word to a (word, 1) pair
35         JavaPairRDD<String,Integer> ones = words.mapToPair(new PairFunction<String,String,Integer>() {
36             public Tuple2<String, Integer> call(String t) {
37                 return new Tuple2<String,Integer>(t,1);
38             }
39         });
40
41         JavaPairRDD<String,Integer> counts = ones.reduceByKey(new Function2<Integer,Integer,Integer>() {
42             public Integer call(Integer v1, Integer v2) {
43                 return v1 + v2;
44             }
45         });
46
47         List<Tuple2<String,Integer>> output = counts.collect();
48         for(Tuple2<?,?> tuple : output) {
49             System.out.println(tuple._1() + ":" +tuple._2());
50         }
51         counts.saveAsTextFile("hdfs://hadoop:8020/out/spark/javaOperateSpark2/");
52         ctx.stop();
53     }
54 }


An error occurred when I ran it.

In Eclipse:

Exception in thread "main" java.lang.NoSuchMethodError: com.google.common.hash.HashFunction.hashInt(I)Lcom/google/common/hash/HashCode;
    at org.apache.spark.util.collection.OpenHashSet.org$apache$spark$util$collection$OpenHashSet$$hashcode(OpenHashSet.scala:261)
    at org.apache.spark.util.collection.OpenHashSet$mcI$sp.getPos$mcI$sp(OpenHashSet.scala:165)
    at org.apache.spark.util.collection.OpenHashSet$mcI$sp.contains$mcI$sp(OpenHashSet.scala:102)
    at org.apache.spark.util.SizeEstimator$$anonfun$visitArray$2.apply$mcVI$sp(SizeEstimator.scala:214)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
    at org.apache.spark.util.SizeEstimator$.visitArray(SizeEstimator.scala:210)
    at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:169)
    at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:161)
    at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:155)
    at org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
    at org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
    at org.apache.spark.util.collection.SizeTrackingVector.$plus$eq(SizeTrackingVector.scala:31)
    at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:249)
    at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:136)
    at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:114)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:787)
    at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638)
    at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:992)
    at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:98)
    at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:84)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
    at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:945)
    at org.apache.spark.SparkContext.hadoopFile(SparkContext.scala:695)
    at org.apache.spark.SparkContext.textFile(SparkContext.scala:540)
    at org.apache.spark.api.java.JavaSparkContext.textFile(JavaSparkContext.scala:184)
    at org.admln.java7OperateSpark.OperateSpark.main(OperateSpark.java:27)

In the shell:

Exception in thread "main" java.lang.VerifyError: class org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$AddBlockRequestProto overrides final method getUnknownFields.()Lcom/google/protobuf/UnknownFieldSet;
        at java.lang.ClassLoader.defineClass1(Native Method)
        at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
        at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
        at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)

        ... ...

        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

As you can see, this is a protobuf version conflict between Spark and Hadoop.

Spark 1.2.0's default protobuf version does not match Hadoop 2.2.0, which uses protobuf 2.5.0.

So I modified Spark's pom.xml and rebuilt the deployment package (which took over an hour).
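
The edit itself is tiny. A minimal sketch, assuming the version is controlled by a protobuf.version property in Spark's top-level pom.xml (verify the property name in your own checkout):

    <!-- top-level pom.xml: align protobuf with Hadoop 2.2.0 -->
    <protobuf.version>2.5.0</protobuf.version>

After changing it, rebuild the assembly/deployment package as usual; that rebuild is what took the hour.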

After that, running from the shell succeeded, but Eclipse still reported the same error.

That is because the Spark dependency I pull in via Maven has a Guava version conflict: the Guava version Maven resolves by default is too old to provide the HashFunction.hashInt method Spark calls (the NoSuchMethodError above).

Adding an explicit Guava dependency fixes it:

    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>14.0.1</version>
    </dependency>


With that in place, submitting from Eclipse no longer throws the error, but the job loops forever without executing and reports that resources are insufficient:

WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

I then raised the core count to 2 and the memory to 1500 MB, but it still reported:

INFO SparkDeploySchedulerBackend: Granted executor ID app-20150111003236-0000/3 on hostPort hadoop:34766 with 2 cores, 512.0 MB RAM

In other words, the core count changed but the executor memory did not, and I don't know why. Also, the same program runs fine when submitted from the shell but reports insufficient memory when submitted externally from Eclipse.

Changing the driver memory did not help either.
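
For reference, this is roughly how I tried to raise the limits from the Eclipse side: the initialization from the code above, modified with standard Spark configuration keys. A sketch only; note that spark.driver.memory set this way generally has no effect, because the driver JVM (Eclipse itself) is already running before SparkConf is read:

    SparkConf sparkConf = new SparkConf()
        .setAppName("JavaWordCount")
        .setMaster("spark://hadoop:7077")
        // memory requested per executor -- the value that stayed at 512 MB for me
        .set("spark.executor.memory", "1500m")
        // cap on the total number of cores the application may use
        .set("spark.cores.max", "2");
    // setting spark.driver.memory here is too late for a driver launched from Eclipse;
    // it would have to be set before the JVM starts (e.g. via spark-submit --driver-memory)
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);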

I suspect two possible causes:

1. A Spark bug: the SPARK_DRIVER_MEMORY variable defaults to 512 MB, but changing it externally does not take effect;

2. The CentOS (cluster) resources and my local Windows machine's resources are getting mixed up, because I saw the error

ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 2

My local machine has 4 cores, while the virtual machine has only 2.



I don't know why there are no examples online of submitting from Eclipse. Either it simply isn't supported (the cluster and client resources get confused), or nobody has figured it out yet.


