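For work, I recently looked into Cubert, a high-performance computation engine for complex big-data analytics that LinkedIn has open-sourced. In my own tests it was well suited to the Cube and Join computations that are common in reporting, and it often ran many times faster than Hive while using fewer resources.
Here is an introduction to the framework: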
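Cubert is written entirely in Java and provides its own scripting language. It is designed for the complex joins and aggregations that come up constantly in reporting. Cubert uses the MeshJoin algorithm to process large datasets over long time windows, which significantly improves CPU and memory utilization. CUBE is a new operator defined by Cubert that computes both additive and non-additive analytic dimensions. Non-additive dimensions, such as the number of distinct users within a time window, are compute-intensive, but CUBE speeds them up. CUBE can also compute exact percentile ranks (for example, medians), roll up inner dimensions on the fly, and compute multiple measures in a single job.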
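To make the additive vs. non-additive distinction concrete, here is a minimal Python sketch (not Cubert code; the event data and function names are hypothetical). Per-day event counts can simply be summed when rolling up to a week, but per-day distinct-user counts cannot: a user active on two days would be counted twice, so an exact rollup has to go back to the underlying user sets.

```python
from collections import defaultdict

# Hypothetical (day, user_id) events, for illustration only.
events = [
    ("mon", "alice"), ("mon", "bob"), ("mon", "alice"),
    ("tue", "alice"), ("tue", "carol"),
]

def per_day_counts(rows):
    """Additive measure: number of events per day."""
    counts = defaultdict(int)
    for day, _ in rows:
        counts[day] += 1
    return dict(counts)

def per_day_users(rows):
    """Input for a non-additive measure: the set of distinct users per day."""
    users = defaultdict(set)
    for day, user in rows:
        users[day].add(user)
    return dict(users)

counts = per_day_counts(events)
users = per_day_users(events)

# Rolling up an additive measure: summing the partial results is correct.
weekly_events = sum(counts.values())

# Rolling up a non-additive measure: summing per-day distinct counts
# double-counts users who were active on several days ...
naive_weekly_users = sum(len(s) for s in users.values())
# ... so the exact answer needs the union of the underlying sets.
exact_weekly_users = len(set().union(*users.values()))

print(weekly_events, naive_weekly_users, exact_weekly_users)  # 5 4 3
```

This is why distinct counts are the expensive case: the partial results that must be kept are sets, not single numbers.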
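Cubert is best suited to recurring reporting workflows, where it speeds things up with partial-result caching and incremental processing. Finally, a new sparse matrix multiplication algorithm can be used for analytics on large graphs.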
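The general idea behind partial-result caching and incremental processing can be shown with a small Python sketch. It is an illustration of the technique under my own assumptions, not Cubert's implementation: each day's partial aggregate is computed once and cached, so when a sliding report window moves forward only the newest day has to be aggregated.

```python
from collections import Counter

cache = {}      # hypothetical cache: day -> Counter of page views
computed = []   # records which days actually had to be aggregated

def daily_partial(day, raw_by_day):
    """Aggregate one day's raw rows once; later calls reuse the cached result."""
    if day not in cache:
        computed.append(day)
        cache[day] = Counter(raw_by_day[day])
    return cache[day]

def window_report(days, raw_by_day):
    """Merge the cached daily partials that fall inside the report window."""
    total = Counter()
    for day in days:
        total += daily_partial(day, raw_by_day)
    return total

raw = {"d1": ["home", "search"], "d2": ["home"], "d3": ["search", "search"]}
r1 = window_report(["d1", "d2"], raw)   # aggregates d1 and d2
r2 = window_report(["d2", "d3"], raw)   # reuses d2, aggregates only d3
print(computed)  # ['d1', 'd2', 'd3']
```

Merging cached partials like this only works for additive measures such as counts; for distinct counts the cached partials would have to be sets (or a sketch structure) rather than numbers.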
Project URL: https://github.com/linkedin/Cubert
1. Git Clone
First, fork the project to my GitHub account, then
clone it:
git clone git@github.com:OopsOutOfMemory/Cubert.git ./cubert
Configure the environment variables:
Note that CUBERT_HOME points to cubert/release, and the bin directory is also under release.
export HADOOP_HOME=/Users/shengli/cloudera/${CDH}/hadoop
export CUBERT_HOME=/Users/shengli/git_repos/cubert/release
2. Build and Package
Specify the Hadoop version:
vim ./gradle.properties
Set hadoopVersion=2.5.0
Build and package
Cubert is built with Gradle, so run ./gradlew to compile and package it:
shengli-mac$ ./gradlew
:genParser
:compileJava
warning: Supported source version 'RELEASE_6' from annotation processor 'org.antlr.v4.runtime.misc.NullUsageProcessor' less than -source '1.8'
Note: Some input files use or override a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
Note: Some input files use unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.
1 warning
:processResources UP-TO-DATE
:classes
:jar
:dist
BUILD SUCCESSFUL
The resulting Cubert jar:
shengli-mac$ ll release/lib/
total 12488
-rw-r--r-- 1 shengli staff 6392989 Jun 11 14:15 cubert-0.2.21.jar
Configure the PATH
Add $CUBERT_HOME/bin to PATH (for example, export PATH=$CUBERT_HOME/bin:$PATH), then run:
shengli-mac$ cubert -h
Using HADOOP_CLASSPATH=:/Users/shengli/git_repos/cubert/release/lib/*
usage: ScriptExecutor <cubert script file> [options]
-c,--compile stop after compilation
-d,--debug print debuging information
-D <property=value> use value for given property
-describe describe the schemas of output datasets
-f,--param_file <file> use given parameter file
-h,--help shows this message
-j,--json show the plan in JSON
-p,--parse stop after parsing
-P,--cache_path <lib path> classpath to be uploaded to distributed
cache
-parallel run independent jobs in parallel
-perf enable performance profiling
-s,--preprocess show the script after preprocessing
-x <job id/name> execute this job only
3. Cubert Example
Preprocessing:
shengli-mac$ cat release/examples/word
wordcount.cmr words.txt
shengli-mac$ cat release/examples/wordcount.cmr
PROGRAM "Word Count";
JOB "count words"
REDUCERS 5;
MAP {
data = LOAD "$CUBERT_HOME/examples/words.txt" USING TEXT("schema": "STRING word");
with_count = FROM data GENERATE word, 1L AS count;
}
SHUFFLE with_count PARTITIONED ON word AGGREGATES COUNT(word) AS count;
REDUCE {
counted = GROUP with_count BY word AGGREGATES SUM(count) AS count;
}
STORE counted INTO "output" USING TEXT();
END
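To make the data flow of this script concrete, here is a plain-Python sketch of what each stage computes. It is my reading of the script, not Cubert code: MAP emits (word, 1L); SHUFFLE partitions on word across the 5 reducers, and its AGGREGATES COUNT(word) clause pre-aggregates on the map side like a combiner (the job counters later show Combine input records=5334 and Combine output records=1421); REDUCE groups by word and sums the partial counts. zlib.crc32 is only a deterministic stand-in for CubertPartitioner, which is an assumption.

```python
import zlib
from collections import Counter, defaultdict

NUM_REDUCERS = 5  # matches "REDUCERS 5" in the script

def map_stage(lines):
    # data = LOAD ... ("STRING word"); with_count = FROM data GENERATE word, 1L AS count
    return [(line.strip(), 1) for line in lines if line.strip()]

def shuffle_stage(pairs):
    # AGGREGATES COUNT(word): map-side pre-aggregation, like a combiner.
    combined = Counter()
    for word, _ in pairs:
        combined[word] += 1
    # PARTITIONED ON word: crc32 is a stand-in for the real partitioner.
    partitions = defaultdict(list)
    for word, count in combined.items():
        partitions[zlib.crc32(word.encode()) % NUM_REDUCERS].append((word, count))
    return partitions

def reduce_stage(partition):
    # counted = GROUP with_count BY word AGGREGATES SUM(count) AS count
    sums = defaultdict(int)
    for word, count in partition:
        sums[word] += count
    return dict(sums)

words = ["apple", "banana", "apple", "cherry", "apple", "banana"]
result = {}
for part in shuffle_stage(map_stage(words)).values():
    result.update(reduce_stage(part))
print(sorted(result.items()))  # [('apple', 3), ('banana', 2), ('cherry', 1)]
```

Because every occurrence of a word is routed to the same partition, each reducer can finish its words independently and the per-reducer results never overlap.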
shengli-mac$ pwd
/Users/shengli/git_repos/cubert
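View the Cubert script after preprocessing (-s). Note that $CUBERT_HOME in the LOAD path has been expanded to its actual value: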
shengli-mac$ cubert release/examples/wordcount.cmr -s
Using HADOOP_CLASSPATH=:/Users/shengli/git_repos/cubert/release/lib/*
PROGRAM "Word Count";
JOB "count words"
REDUCERS 5;
MAP {
data = LOAD "/Users/shengli/git_repos/cubert/release/examples/words.txt" USING TEXT("schema": "STRING word");
with_count = FROM data GENERATE word, 1L AS count;
}
SHUFFLE with_count PARTITIONED ON word AGGREGATES COUNT(word) AS count;
REDUCE {
counted = GROUP with_count BY word AGGREGATES SUM(count) AS count;
}
STORE counted INTO "output" USING TEXT();
END
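Comparing the two listings, the only change made by -s is that $CUBERT_HOME has been replaced with the value of the environment variable. A minimal Python sketch of this kind of substitution, using string.Template from the standard library, follows. It models the observed behavior only and is not Cubert's actual preprocessor; the preprocess helper is hypothetical.

```python
from string import Template

def preprocess(script_text, params):
    """Replace $NAME / ${NAME} references with values from params.

    safe_substitute leaves unknown references untouched instead of raising.
    """
    return Template(script_text).safe_substitute(params)

line = 'data = LOAD "$CUBERT_HOME/examples/words.txt" USING TEXT("schema": "STRING word");'
print(preprocess(line, {"CUBERT_HOME": "/Users/shengli/git_repos/cubert/release"}))
```

In practice you would pass os.environ as the params mapping; the printed line matches the LOAD statement in the -s output above.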
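-p: parse the script (syntax check only)
To demonstrate a parse failure, I typed some junk text (ssssss) at the start of line 2 of the script. -p rejects it with a syntax error: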
shengli-mac$ cubert release/examples/wordcount.cmr -p
Using HADOOP_CLASSPATH=:/Users/shengli/git_repos/cubert/release/lib/*
line 2:0 extraneous input 'ssssss' expecting {<EOF>, CREATEDICTIONARY, FUNCTION, JOB, ONCOMPLETION, REGISTER, SET}
Cannot parse cubert script. Exiting.
Cannot compile cubert script. Exiting.
Exception in thread "main" java.text.ParseException
at com.linkedin.cubert.plan.physical.PhysicalParser.parsingTask(PhysicalParser.java:197)
at com.linkedin.cubert.plan.physical.PhysicalParser.parseInputStream(PhysicalParser.java:161)
at com.linkedin.cubert.plan.physical.PhysicalParser.parseProgram(PhysicalParser.java:156)
at com.linkedin.cubert.ScriptExecutor.compile(ScriptExecutor.java:304)
at com.linkedin.cubert.ScriptExecutor.main(ScriptExecutor.java:523)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.apache.hadoop.util.RunJar.main(RunJar.java:212)
-pj: parse and show the plan as JSON
shengli-mac$ cubert release/examples/wordcount.cmr -pj
Using HADOOP_CLASSPATH=:/Users/shengli/git_repos/cubert/release/lib/*
{
"program" : "Word Count",
"hadoopConf" : {
},
"libjars" : [ ],
"jobs" : [ {
"pigudfs" : {
},
"name" : "count words",
"mappers" : 0,
"reducers" : 5,
"map" : [ {
"input" : {
"line" : "data = LOAD \"/Users/shengli/git_repos/cubert/release/examples/words.txt\" USING TEXT(\"schema\": \"STRING word\");",
"name" : "data",
"type" : "TEXT",
"params" : {
"schema" : "STRING word"
},
"path" : [ "/Users/shengli/git_repos/cubert/release/examples/words.txt" ]
},
"operators" : [ {
"operator" : "GENERATE",
"input" : "data",
"output" : "with_count",
"outputTuple" : [ {
"col_name" : "word",
"expression" : {
"function" : "INPUT_PROJECTION",
"arguments" : [ "word" ]
}
}, {
"col_name" : "count",
"expression" : {
"function" : "CONSTANT",
"arguments" : [ 1, "long" ]
}
} ],
"line" : "with_count = FROM data GENERATE word, 1L AS count;"
} ]
} ],
"shuffle" : {
"line" : "SHUFFLE with_count PARTITIONED ON word AGGREGATES COUNT(word) AS count;",
"name" : "with_count",
"type" : "SHUFFLE",
"partitionKeys" : [ "word" ],
"pivotKeys" : [ "word" ],
"aggregates" : [ {
"type" : "COUNT",
"input" : [ "word" ],
"output" : "count"
} ]
},
"reduce" : [ {
"operator" : "GROUP_BY",
"input" : "with_count",
"output" : "counted",
"groupBy" : [ "word" ],
"aggregates" : [ {
"type" : "SUM",
"input" : [ "count" ],
"output" : "count"
} ],
"line" : "counted = GROUP with_count BY word AGGREGATES SUM(count) AS count;"
} ],
"cacheIndex" : [ ],
"output" : {
"name" : "counted",
"path" : "output",
"type" : "TEXT",
"line" : "STORE counted INTO \"output\" USING TEXT();",
"params" : {
"overwrite" : "false"
}
}
} ]
}
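Because the plan is plain JSON, it is easy to post-process. Here is a minimal Python sketch that summarizes each job's stages from a trimmed copy of the output above. The field names come from that output; the summarize helper and its output format are my own.

```python
import json

# Trimmed copy of the -pj output shown above.
plan_json = """
{
  "program": "Word Count",
  "jobs": [{
    "name": "count words",
    "reducers": 5,
    "map": [{"input": {"name": "data", "type": "TEXT"},
             "operators": [{"operator": "GENERATE", "input": "data", "output": "with_count"}]}],
    "shuffle": {"name": "with_count", "type": "SHUFFLE", "partitionKeys": ["word"]},
    "reduce": [{"operator": "GROUP_BY", "input": "with_count", "output": "counted"}],
    "output": {"name": "counted", "path": "output", "type": "TEXT"}
  }]
}
"""

def summarize(plan):
    """Return one human-readable line per stage of every job in the plan."""
    lines = []
    for job in plan["jobs"]:
        lines.append(f'job "{job["name"]}" ({job["reducers"]} reducers)')
        for block in job.get("map", []):
            lines.append(f'  load {block["input"]["name"]} [{block["input"]["type"]}]')
            for op in block.get("operators", []):
                lines.append(f'  map: {op["input"]} -{op["operator"]}-> {op["output"]}')
        shuffle = job.get("shuffle")
        if shuffle:
            lines.append(f'  shuffle {shuffle["name"]} on {shuffle["partitionKeys"]}')
        for op in job.get("reduce", []):
            lines.append(f'  reduce: {op["input"]} -{op["operator"]}-> {op["output"]}')
        out = job["output"]
        lines.append(f'  store {out["name"]} into {out["path"]} [{out["type"]}]')
    return lines

summary = summarize(json.loads(plan_json))
print("\n".join(summary))
```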
-c: compile the script
shengli-mac$ cubert release/examples/wordcount.cmr -c
Using HADOOP_CLASSPATH=:/Users/shengli/git_repos/cubert/release/lib/*
15/06/11 14:38:16 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[Dependency Analyzer] Program inputs: [/Users/shengli/git_repos/cubert/release/examples/words.txt]
Analyzing job [count words]...
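-cj: compile and show the plan as JSON. Compared with the -pj output, the compiled plan adds a schema to every stage, a dependsOn list for each job, and a top-level input section:
shengli-mac$ cubert release/examples/wordcount.cmr -cj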
Using HADOOP_CLASSPATH=:/Users/shengli/git_repos/cubert/release/lib/*
15/06/11 14:38:38 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[Dependency Analyzer] Program inputs: [/Users/shengli/git_repos/cubert/release/examples/words.txt]
Analyzing job [count words]...
{
"program" : "Word Count",
"hadoopConf" : {
},
"libjars" : [ ],
"jobs" : [ {
"pigudfs" : {
},
"name" : "count words",
"mappers" : 0,
"reducers" : 5,
"map" : [ {
"input" : {
"line" : "data = LOAD \"/Users/shengli/git_repos/cubert/release/examples/words.txt\" USING TEXT(\"schema\": \"STRING word\");",
"name" : "data",
"type" : "TEXT",
"params" : {
"schema" : "STRING word"
},
"path" : [ "/Users/shengli/git_repos/cubert/release/examples/words.txt" ],
"schema" : [ {
"name" : "word",
"type" : "STRING"
} ]
},
"operators" : [ {
"operator" : "GENERATE",
"input" : "data",
"output" : "with_count",
"outputTuple" : [ {
"col_name" : "word",
"expression" : {
"function" : "INPUT_PROJECTION",
"arguments" : [ "word" ]
}
}, {
"col_name" : "count",
"expression" : {
"function" : "CONSTANT",
"arguments" : [ 1, "long" ]
}
} ],
"line" : "with_count = FROM data GENERATE word, 1L AS count;",
"schema" : [ {
"name" : "word",
"type" : "STRING"
}, {
"name" : "count",
"type" : "LONG"
} ]
} ]
} ],
"shuffle" : {
"line" : "SHUFFLE with_count PARTITIONED ON word AGGREGATES COUNT(word) AS count;",
"name" : "with_count",
"type" : "SHUFFLE",
"partitionKeys" : [ "word" ],
"pivotKeys" : [ "word" ],
"aggregates" : [ {
"type" : "COUNT",
"input" : [ "word" ],
"output" : "count"
} ],
"schema" : [ {
"name" : "word",
"type" : "STRING"
}, {
"name" : "count",
"type" : "LONG"
} ]
},
"reduce" : [ {
"operator" : "GROUP_BY",
"input" : "with_count",
"output" : "counted",
"groupBy" : [ "word" ],
"aggregates" : [ {
"type" : "SUM",
"input" : [ "count" ],
"output" : "count"
} ],
"line" : "counted = GROUP with_count BY word AGGREGATES SUM(count) AS count;",
"schema" : [ {
"name" : "word",
"type" : "STRING"
}, {
"name" : "count",
"type" : "LONG"
} ]
} ],
"cacheIndex" : [ ],
"output" : {
"name" : "counted",
"path" : "output",
"type" : "TEXT",
"line" : "STORE counted INTO \"output\" USING TEXT();",
"params" : {
"overwrite" : "false"
},
"schema" : [ {
"name" : "word",
"type" : "STRING"
}, {
"name" : "count",
"type" : "LONG"
} ]
},
"dependsOn" : [ ]
} ],
"input" : {
"/Users/shengli/git_repos/cubert/release/examples/words.txt" : {
"type" : "TEXT",
"schema" : [ {
"name" : "word",
"type" : "STRING"
} ]
}
},
"profileMode" : false
}
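Since the compiled plan carries a schema for every stage, those schemas can be extracted directly. The following Python sketch works on a trimmed copy of the output above and prints each stage's schema in the same [TYPE name, ...] notation that -cd uses. The helper names are hypothetical.

```python
import json

# Trimmed copy of the -cj output shown above (schemas only).
compiled_json = """
{"jobs": [{
  "map": [{"input": {"name": "data", "schema": [{"name": "word", "type": "STRING"}]},
           "operators": [{"output": "with_count",
                          "schema": [{"name": "word", "type": "STRING"},
                                     {"name": "count", "type": "LONG"}]}]}],
  "reduce": [{"output": "counted",
              "schema": [{"name": "word", "type": "STRING"},
                         {"name": "count", "type": "LONG"}]}]
}]}
"""

def fmt(schema):
    """Format a schema list as [TYPE name, ...]."""
    return "[" + ", ".join(f'{c["type"]} {c["name"]}' for c in schema) + "]"

def stage_schemas(plan):
    """Collect (relation name, formatted schema) for inputs, map and reduce stages."""
    result = []
    for job in plan["jobs"]:
        for block in job.get("map", []):
            result.append((block["input"]["name"], fmt(block["input"]["schema"])))
            for op in block.get("operators", []):
                result.append((op["output"], fmt(op["schema"])))
        for op in job.get("reduce", []):
            result.append((op["output"], fmt(op["schema"])))
    return result

for name, schema in stage_schemas(json.loads(compiled_json)):
    print(f"{name}: {schema}")
```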
-cd: compile and print debug information:
shengli-mac$ cubert release/examples/wordcount.cmr -cd
Using HADOOP_CLASSPATH=:/Users/shengli/git_repos/cubert/release/lib/*
15/06/11 14:39:30 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[Dependency Analyzer] Program inputs: [/Users/shengli/git_repos/cubert/release/examples/words.txt]
Analyzing job [count words]...
---------------------------------------------
with_count = FROM data GENERATE word, 1L AS count;
Precondition for data
Schema: [STRING word]
Partition Keys: null
Sort Keys: null
Post Condition
Schema: [STRING word, LONG count]
Partition Keys: null
Sort Keys: null
---------------------------------------------
counted = GROUP with_count BY word AGGREGATES SUM(count) AS count;
Precondition for with_count
Schema: [STRING word, LONG count]
Partition Keys: [word]
Sort Keys: [word]
Post Condition
Schema: [STRING word, LONG count]
Partition Keys: [word]
Sort Keys: [word]
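The debug output shows that the GROUP BY requires its input to be partitioned and sorted on word. A sort-key precondition like this is what allows grouping to run in a single streaming pass that holds only one group in memory at a time. Here is a minimal Python sketch of such a sorted-input aggregation using itertools.groupby. It illustrates the technique and is not Cubert's operator code.

```python
from itertools import groupby
from operator import itemgetter

def sorted_group_sum(rows):
    """Sum `count` per `word`, assuming rows arrive sorted by word.

    Each group is emitted as soon as the key changes, so only the current
    group is held in memory.
    """
    for word, group in groupby(rows, key=itemgetter(0)):
        yield word, sum(count for _, count in group)

rows = [("apple", 2), ("apple", 1), ("banana", 2), ("cherry", 1)]
print(list(sorted_group_sum(rows)))  # [('apple', 3), ('banana', 2), ('cherry', 1)]

# Without the sort-key precondition the same pass gives a wrong answer:
unsorted = [("apple", 2), ("banana", 2), ("apple", 1)]
print(list(sorted_group_sum(unsorted)))  # [('apple', 2), ('banana', 2), ('apple', 1)]
```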
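Running (submitting) the script
e.g.: cubert script.cmr
Note:
If HADOOP_HOME and HADOOP_CONF_DIR are set, paths in the script are resolved against the cluster's default filesystem (HDFS at hdfs://localhost:8020 here), so input paths must be HDFS paths.
Running the unmodified example therefore fails, because words.txt exists only on the local disk: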
shengli-mac$ cubert release/examples/wordcount.cmr
Using HADOOP_CLASSPATH=:/Users/shengli/git_repos/cubert/release/lib/*
15/06/11 14:41:06 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[Dependency Analyzer] Program inputs: [/Users/shengli/git_repos/cubert/release/examples/words.txt]
Analyzing job [count words]...
15/06/11 14:41:07 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
Executing jobs serially
Executing job [count words]....
15/06/11 14:41:07 INFO Configuration.deprecation: mapreduce.partitioner.class is deprecated. Instead, use mapreduce.job.partitioner.class
Setting partitioner: com.linkedin.cubert.plan.physical.CubertPartitioner
15/06/11 14:41:07 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
15/06/11 14:41:07 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
15/06/11 14:41:08 INFO mapreduce.JobSubmitter: Cleaning up the staging area /tmp/hadoop-yarn/staging/shengli/.staging/job_1434002924656_0001
15/06/11 14:41:08 WARN security.UserGroupInformation: PriviledgedActionException as:shengli (auth:SIMPLE) cause:org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://localhost:8020/Users/shengli/git_repos/cubert/release/examples/words.txt
Exception in thread "main" org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://localhost:8020/Users/shengli/git_repos/cubert/release/examples/words.txt
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:321)
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:264)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat.listStatus(PigTextInputFormat.java:36)
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:385)
at com.linkedin.cubert.io.CubertInputFormat.getSplits(CubertInputFormat.java:112)
at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:589)
at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:606)
at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:490)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1295)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1292)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1642)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:1292)
at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1313)
at com.linkedin.cubert.plan.physical.JobExecutor.run(JobExecutor.java:151)
at com.linkedin.cubert.plan.physical.ExecutorService.executeJob(ExecutorService.java:253)
at com.linkedin.cubert.plan.physical.ExecutorService.executeJobId(ExecutorService.java:219)
at com.linkedin.cubert.plan.physical.ExecutorService.execute(ExecutorService.java:163)
at com.linkedin.cubert.ScriptExecutor.execute(ScriptExecutor.java:385)
at com.linkedin.cubert.ScriptExecutor.main(ScriptExecutor.java:575)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.apache.hadoop.util.RunJar.main(RunJar.java:212)
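The error occurs because a path without a scheme is qualified against the default filesystem (hdfs://localhost:8020 in the log above), so the local path /Users/shengli/... is looked up on HDFS. Below is a simplified Python model of that qualification. The qualify helper is hypothetical, and the relative-path rule, which resolves against HDFS's default home directory /user/<username>, is stated as an assumption.

```python
import posixpath
from urllib.parse import urlparse

def qualify(path, default_fs="hdfs://localhost:8020", working_dir="/user/shengli"):
    """Simplified model of how Hadoop qualifies a path (not Hadoop's code).

    - a path that already has a scheme (hdfs://, file://) is kept as-is;
    - a relative path is first resolved against the working directory;
    - the default filesystem is then prepended.
    """
    if urlparse(path).scheme:
        return path
    if not path.startswith("/"):
        path = posixpath.join(working_dir, path)
    return default_fs.rstrip("/") + path

print(qualify("/Users/shengli/git_repos/cubert/release/examples/words.txt"))
print(qualify("/cubert/words.txt"))
print(qualify("output"))
```

Under this model, the relative STORE path "output" would end up in /user/shengli/output on HDFS, and the input has to be copied into HDFS before the job can read it, which is the fix that follows.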
Copy words.txt to HDFS and check that it is there:
shengli-mac$ hdfs dfs -copyFromLocal $CUBERT_HOME/examples/words.txt /cubert/words.txt
15/06/11 14:44:41 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
shengli-mac$ hdfs dfs -ls /cubert
15/06/11 14:44:49 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 1 items
-rw-r--r-- 1 shengli supergroup 27294 2015-06-11 14:44 /cubert/words.txt
Then change the LOAD statement in wordcount.cmr to:
data = LOAD "/cubert/words.txt" USING TEXT("schema": "STRING word");
Run it again:
shengli-mac$ cubert release/examples/wordcount.cmr
Using HADOOP_CLASSPATH=:/Users/shengli/git_repos/cubert/release/lib/*
15/06/11 14:46:12 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[Dependency Analyzer] Program inputs: [/cubert/words.txt]
Analyzing job [count words]...
15/06/11 14:46:13 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
Executing jobs serially
Executing job [count words]....
15/06/11 14:46:13 INFO Configuration.deprecation: mapreduce.partitioner.class is deprecated. Instead, use mapreduce.job.partitioner.class
Setting partitioner: com.linkedin.cubert.plan.physical.CubertPartitioner
15/06/11 14:46:13 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
15/06/11 14:46:13 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
15/06/11 14:46:13 INFO input.FileInputFormat: Total input paths to process : 1
15/06/11 14:46:14 INFO util.MapRedUtil: Total input paths to process : 1
15/06/11 14:46:14 INFO mapreduce.JobSubmitter: number of splits:1
15/06/11 14:46:14 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1434002924656_0002
15/06/11 14:46:14 INFO impl.YarnClientImpl: Submitted application application_1434002924656_0002
15/06/11 14:46:14 INFO mapreduce.Job: The url to track the job: http://shengli-mac.local:8088/proxy/application_1434002924656_0002/
Job: [count words], More information at: http://shengli-mac.local:8088/proxy/application_1434002924656_0002/
50% complete
60% complete
70% complete
80% complete
90% complete
Finished job [count words]....
100% complete
15/06/11 14:46:44 INFO mapred.ClientServiceDelegate: Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
Statistics of individual Jobs
--------------------------------------------
Job Name minMapperTime maxMapperTime avgMapperTime medianMapperTime minReducerTime maxReducerTime avgReducerTime medianReducerTime
job_1434002924656_0002 3190 3190 3190 3190 3401 3914 3686 3849
Aggregated Hadoop Counters for the Cubert Job:
Duration: 30949 ms
Counters: 49
File System Counters
FILE: Number of bytes read=20667
FILE: Number of bytes written=692647
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=27394
HDFS: Number of bytes written=13129
HDFS: Number of read operations=18
HDFS: Number of large read operations=0
HDFS: Number of write operations=10
Job Counters
Launched map tasks=1
Launched reduce tasks=5
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=3190
Total time spent by all reduces in occupied slots (ms)=18430
Total time spent by all map tasks (ms)=3190
Total time spent by all reduce tasks (ms)=18430
Total vcore-seconds taken by all map tasks=3190
Total vcore-seconds taken by all reduce tasks=18430
Total megabyte-seconds taken by all map tasks=3266560
Total megabyte-seconds taken by all reduce tasks=18872320
Map-Reduce Framework
Map input records=5334
Map output records=5334
Map output bytes=53964
Map output materialized bytes=20667
Input split bytes=100
Combine input records=5334
Combine output records=1421
Reduce input groups=0
Reduce shuffle bytes=20667
Reduce input records=1421
Reduce output records=1421
Spilled Records=2842
Shuffled Maps =5
Failed Shuffles=0
Merged Map outputs=5
GC time elapsed (ms)=276
CPU time spent (ms)=0
Physical memory (bytes) snapshot=0
Virtual memory (bytes) snapshot=0
Total committed heap usage (bytes)=1235746816
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=0
File Output Format Counters
Bytes Written=13129
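Here are a few quick consistency checks on these counters, written out in Python with values copied from the output above. The SHUFFLE-side aggregation acted as a combiner and shrank 5334 map output records to 1421. With a single map task, 1421 is the number of distinct words, which matches Reduce output records. The "megabyte-seconds" counters are exactly the task milliseconds times 1024. Despite the label, this is consistent with MB × ms and suggests each container was allocated 1024 MB; that is an inference from the numbers, not from the cluster configuration.

```python
# Counter values copied from the job output above.
map_output_records = 5334
combine_output_records = 1421
reduce_output_records = 1421
map_ms, reduce_ms = 3190, 18430
map_mb_s, reduce_mb_s = 3266560, 18872320

# Map-side aggregation: fraction of records removed before the shuffle.
combine_reduction = 1 - combine_output_records / map_output_records
print(f"combiner removed {combine_reduction:.1%} of map output records")

# One map task, so combiner output = distinct words = final output rows.
assert combine_output_records == reduce_output_records

# "megabyte-seconds" divided by task milliseconds = memory per container (MB).
print(map_mb_s // map_ms, reduce_mb_s // reduce_ms)  # 1024 1024
```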
View the logs:
yarn logs -applicationId application_1434002924656_0002
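-x: run a single job
The script sets the job name, which in this example is "count words".
If the name passed to -x does not match any job, an exception is thrown. The error reports only [our], which suggests that the quoted multi-word argument was split on whitespace before it reached Cubert.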
shengli-mac$ cubert release/examples/wordcount.cmr -x "our first program"
Using HADOOP_CLASSPATH=:/Users/shengli/git_repos/cubert/release/lib/*
15/06/11 15:24:45 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[Dependency Analyzer] Program inputs: [/cubert/words.txt]
Analyzing job [count words]...
Exception in thread "main" java.lang.IllegalStateException: ERROR: There is no job that matches [our]
at com.linkedin.cubert.ScriptExecutor.getJobId(ScriptExecutor.java:604)
at com.linkedin.cubert.ScriptExecutor.main(ScriptExecutor.java:568)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.apache.hadoop.util.RunJar.main(RunJar.java:212)
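Beyond "<job id/name>" in the help text, the post does not document how -x resolves its argument. Below is a hypothetical Python sketch of that kind of lookup. It accepts a numeric index (assumed zero-based) or an exact job name and otherwise fails with a message modeled on the error above. It is not Cubert's ScriptExecutor.getJobId.

```python
def resolve_job(jobs, selector):
    """Return the index of the job selected by a numeric id or a job name.

    Hypothetical helper mirroring the "-x <job id/name>" option described
    by `cubert -h`; ids are assumed to be zero-based.
    """
    if selector.isdigit() and int(selector) < len(jobs):
        return int(selector)
    for index, name in enumerate(jobs):
        if name == selector:
            return index
    raise ValueError(f"ERROR: There is no job that matches [{selector}]")

jobs = ["count words"]
print(resolve_job(jobs, "count words"))  # 0
print(resolve_job(jobs, "0"))            # 0
try:
    resolve_job(jobs, "our first program")
except ValueError as err:
    print(err)  # ERROR: There is no job that matches [our first program]
```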
Original article; please credit the source when reposting: http://blog.csdn.net/oopsoom/article/details/46545973