A Spark vs. Hadoop TeraGen/TeraSort Comparison Experiment (with Source Code)

Ever since Hadoop appeared, MapReduce had long been the record holder in sorting benchmarks, but that monopoly was recently broken by Spark and its in-memory computing. In this year's Daytona GraySort category of the Sort Benchmark, completed by Databricks together with AWS, Spark beat Hadoop MapReduce convincingly: "1/10 the compute resources, 1/3 the elapsed time". That is an interesting comparison, so I ran a scaled-down version of a similar experiment on a small cluster.

1. The Hadoop and Spark cluster environments are identical:

- Hadoop 2.2.0

- Spark 1.0

- 5-node cluster:

node1: NameNode, ResourceManager

node2 - node5: NodeManager

- Hardware:

8-core CPU, 32 GB memory, 400 GB disk

2. Size of the data to sort: 100 GB

3. Hadoop sort:

3.1 TeraGen:

A total of 120 mappers were launched across the 4 slave nodes:

hadoop jar ${HADOOP_EXAMPLE_JAR_PATH} teragen -Dmapred.map.tasks=120 ${TERAGEN_ROW} ${TERAGEN_OUTPUT}

3.2 TeraSort:

A total of 32 reducers were launched across the 4 slave nodes:

hadoop jar ${HADOOP_EXAMPLE_JAR_PATH} terasort -Dmapred.reduce.tasks=32 ${TERAGEN_OUTPUT} ${TERASORT_OUTPUT}
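
Both commands rely on a few shell variables. For a 100 GB data set, TERAGEN_ROW works out to 1,000,000,000 rows, because every TeraSort record is exactly 100 bytes (10^9 rows x 100 bytes = 100 GB). A minimal sketch of how these variables might be set is shown below; the jar path and HDFS directories are hypothetical placeholders, not the exact paths used in the experiment:

HADOOP_EXAMPLE_JAR_PATH=$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.2.0.jar
TERAGEN_ROW=1000000000                      # 10^9 rows x 100 bytes/row = 100 GB
TERAGEN_OUTPUT=/benchmark/teragen-100g      # HDFS dir written by teragen, read by terasort
TERASORT_OUTPUT=/benchmark/terasort-100g    # HDFS dir for the sorted output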

3.3 Total time to generate the 100 GB of test data and complete the sort:

Total: 6723 seconds

4. Spark sort:

4.1 Source code:

4.1.1 Source:

https://github.com/apache/spark/pull/1242

https://github.com/rxin/spark/tree/adcae69145905162fa3b6932f70be2c932f95f87/examples/src/main/scala/org/apache/spark/examples/terasort

4.1.2 To make the code easier to read, the source (with minor modifications) is also attached at the end of this article.
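
4.1.3 The appendix reproduces only the data generator (GenSort) and its two helper classes; the sort driver itself is not included. For orientation, a minimal sketch of what such a driver could look like follows. This is not the code from the pull request: the class name SimpleTeraSort, the argument handling, and the String-keyed sortByKey are simplifications assumed here for readability.

package scala.spark.examples.terasort

import java.nio.charset.StandardCharsets
import java.util.Arrays

import org.apache.hadoop.io.{ BytesWritable, NullWritable }
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.SparkContext._

object SimpleTeraSort {
  def main(args: Array[String]) {
    if (args.length < 2) {
      println("usage: SimpleTeraSort [input-path] [output-path]")
      System.exit(1)
    }
    val Array(input, output) = args.take(2)
    val sc = new SparkContext(new SparkConf().setAppName("SimpleTeraSort"))

    // Each generated record is 100 bytes; the first 10 bytes are the sort key.
    // Hadoop reuses the Writable objects, so copy the bytes out first. Decoding
    // the key as ISO-8859-1 maps every byte to exactly one char, so the natural
    // String ordering matches unsigned lexicographic byte order.
    val records = sc.sequenceFile(input, classOf[NullWritable], classOf[BytesWritable]).map {
      case (_, row) =>
        val bytes = Arrays.copyOf(row.getBytes, row.getLength)
        (new String(bytes, 0, 10, StandardCharsets.ISO_8859_1), bytes)
    }

    records.sortByKey()
      .map { case (_, row) => (NullWritable.get(), new BytesWritable(row)) }
      .saveAsSequenceFile(output)

    sc.stop()
  }
}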

4.2 Total time to generate the test data and complete the sort (if the output is written as text files, the sorted result totals 309.2 GB on disk; a sample submission command is sketched after the experiment list below):

- Experiment 1:

* Submission parameters: num-executors: 4, executor-memory: 8g, executor-cores: 4

* Output format: Sequence File

* Output size on disk: 20.8 GB

* Total time: 2203 seconds

- Experiment 2:

* Submission parameters: num-executors: 4, executor-memory: 16g, executor-cores: 6

* Output format: Text File

* Output size on disk: 309.2 GB

* Total time: 9849 seconds

- Experiment 3:

* Submission parameters: num-executors: 4, executor-memory: 16g, executor-cores: 6

* Output format: Sequence File

* Output size on disk: 20.8 GB

* Total time: 2212 seconds

- Experiment 4:

* Submission parameters: num-executors: 8, executor-memory: 7g, executor-cores: 3

* Output format: Sequence File

* Output size on disk: 20.8 GB

* Total time: 1213 seconds

- Experiment 5:

* Submission parameters: num-executors: 28, executor-memory: 2g, executor-cores: 1

* Output format: Sequence File

* Output size on disk: 20.8 GB

* Total time: 483 seconds

- Experiment 6:

* Submission parameters: num-executors: 56, executor-memory: 1g, executor-cores: 1

* Output format: Sequence File

* Output size on disk: 20.8 GB

* Total time: 434 seconds
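
The submission parameters above map directly onto spark-submit flags. As a reference point, here is a hedged sketch of how the data-generation step of Experiment 6 might be launched on YARN; the application jar name and HDFS output path are hypothetical placeholders, and the GenSort arguments are chosen only so that num-parts x records-per-part = 10^9 records (100 GB at 100 bytes per record):

spark-submit --master yarn-cluster \
    --class scala.spark.examples.terasort.GenSort \
    --num-executors 56 --executor-memory 1g --executor-cores 1 \
    spark-terasort-examples.jar \
    1000 1000000 hdfs:///benchmark/spark-gensort-100g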

5. Summary:

5.1 Hadoop vs. Spark:

There is certainly still room for tuning these runs, but Spark is clearly faster than Hadoop MapReduce. The result is hardly surprising: it is a victory of memory over disk.

5.2 Comparison across the Spark experiments:

- Writing the output as a sequence file is much faster than writing it as a text file: the compressed sequence file is far smaller, which removes a large amount of disk I/O and therefore shortens the execution time considerably.

- If a single executor's work does not actually need a large amount of memory, it pays to lower the memory allotted per executor and increase the number of concurrent executors instead (provided the job parallelizes well).

- Once each worker's memory and CPU are fully utilized and the number of concurrent executors is reasonable, splitting the work across even more executors does not improve efficiency further.

Appendix: Spark sort source code

a. GenSort.scala

package scala.spark.examples.terasort

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import org.apache.hadoop.io.{ BytesWritable, NullWritable }
import org.apache.hadoop.io.compress.BZip2Codec
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.SparkContext._
object GenSort {
  def main(args: Array[String]) {
    if (args.length < 3) {
      println("usage:")
      println("MASTER=[spark-master] bin/run-example org.apache.spark.examples.terasort.GenSort " +
        " [num-parts] [records-per-part] [output-path]")
      System.exit(0)
    }
    val master = sys.env.getOrElse("MASTER", "local")
    val parts = args(0).toInt
    val recordsPerPartition = args(1).toInt
    val numRecords = parts.toLong * recordsPerPartition.toLong
    val output = args(2)
    println(s"Generating $numRecords records on $parts partitions")
    println(s"Output path: $output")
//    val sc = new SparkContext(master, "GenSort")
    val conf = new SparkConf().setAppName("GenSort")
    val sc = new SparkContext(conf)
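    // Each of the `parts` partitions generates its own block of `recordsPerPartition`
    // 100-byte records, skipping the 128-bit LCG ahead to the partition's first record
    // number so that the partitions produce disjoint, reproducible data.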
    val dataset = sc.parallelize(1 to parts, parts).mapPartitionsWithIndex {
      case (index, _) =>
        val one = new Unsigned16(1)
        val firstRecordNumber = new Unsigned16(index.toLong * recordsPerPartition)
        val recordsToGenerate = new Unsigned16(recordsPerPartition)
        val recordNumber = new Unsigned16(firstRecordNumber)
        val lastRecordNumber = new Unsigned16(firstRecordNumber)
        lastRecordNumber.add(recordsToGenerate)
        val rand = Random16.skipAhead(firstRecordNumber)
        val row: Array[Byte] = new Array[Byte](100)
        Iterator.tabulate(recordsPerPartition) { offset =>
          Random16.nextRand(rand)
          generateRecord(row, rand, recordNumber)
          recordNumber.add(one)
          row
        }
    }
    // Save the generated records as a text file. Each (NullWritable, BytesWritable) pair is
    // written via toString, so every 100-byte record expands to roughly 300 characters of hex,
    // which matches the ~309 GB text output reported in the experiments above.
    dataset.map(row => (NullWritable.get(), new BytesWritable(row))).saveAsTextFile(output)

    // Alternatively, save the generated records as a BZip2-compressed sequence file.
//    dataset.map(row => (NullWritable.get(), new BytesWritable(row)))
//      .saveAsSequenceFile(output, Some(classOf[BZip2Codec]))
  }
  /**
   * Generate a binary record suitable for all sort benchmarks except PennySort.
   *
   * @param recBuf record to return
   */
  def generateRecord(recBuf: Array[Byte], rand: Unsigned16, recordNumber: Unsigned16): Unit = {
    // Generate the 10-byte key using the high 10 bytes of the 128-bit random number
    var i = 0
    while (i < 10) {
      recBuf(i) = rand.getByte(i)
      i += 1
    }
    // Add 2 bytes of "break"
    recBuf(10) = 0x00.toByte
    recBuf(11) = 0x11.toByte
    // Convert the 128-bit record number to 32 bits of ascii hexadecimal
    // as the next 32 bytes of the record.
    i = 0
    while (i < 32) {
      recBuf(12 + i) = recordNumber.getHexDigit(i).toByte
      i += 1
    }
    // Add 4 bytes of "break" data
    recBuf(44) = 0x88.toByte
    recBuf(45) = 0x99.toByte
    recBuf(46) = 0xAA.toByte
    recBuf(47) = 0xBB.toByte
    // Add 48 bytes of filler based on low 48 bits of random number
    i = 0
    while (i < 12) {
      val v = rand.getHexDigit(20 + i).toByte
      recBuf(48 + i * 4) = v
      recBuf(49 + i * 4) = v
      recBuf(50 + i * 4) = v
      recBuf(51 + i * 4) = v
      i += 1
    }
    // Add 4 bytes of "break" data
    recBuf(96) = 0xCC.toByte
    recBuf(97) = 0xDD.toByte
    recBuf(98) = 0xEE.toByte
    recBuf(99) = 0xFF.toByte
  }
}

b. Random16.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package scala.spark.examples.terasort;

/**
 * This file is copied from Hadoop package org.apache.hadoop.examples.terasort.
 */
/**
 * This class implements a 128-bit linear congruential generator. Specifically,
 * if X0 is the most recently issued 128-bit random number (or a seed of 0 if no
 * random number has already been generated, the next number to be generated,
 * X1, is equal to: X1 = (a * X0 + c) mod 2**128 where a is
 * 47026247687942121848144207491837523525 or 0x2360ed051fc65da44385df649fccf645
 * and c is 98910279301475397889117759788405497857 or
 * 0x4a696d47726179524950202020202001 The coefficient "a" is suggested by:
 * Pierre L'Ecuyer, "Tables of linear congruential generators of different sizes
 * and good lattice structure", Mathematics of Computation, 68 pp. 249 - 260
 * (1999)
 * http://www.ams.org/mcom/1999-68-225/S0025-5718-99-00996-5/S0025-5718-99
 * -00996-5.pdf The constant "c" meets the simple suggestion by the same
 * reference that it be odd.
 *
 * There is also a facility for quickly advancing the state of the generator by
 * a fixed number of steps - this facilitates parallel generation.
 *
 * This is based on 1.0 of rand16.c from Chris Nyberg
 * <[email protected]>.
 */
class Random16 {
	/**
	 * The "Gen" array contain powers of 2 of the linear congruential generator.
	 * The index 0 struct contain the "a" coefficient and "c" constant for the
	 * generator. That is, the generator is: f(x) = (Gen[0].a * x + Gen[0].c)
	 * mod 2**128
	 *
	 * All structs after the first contain an "a" and "c" that comprise the
	 * square of the previous function.
	 *
	 * f**2(x) = (Gen[1].a * x + Gen[1].c) mod 2**128 f**4(x) = (Gen[2].a * x +
	 * Gen[2].c) mod 2**128 f**8(x) = (Gen[3].a * x + Gen[3].c) mod 2**128 ...
	 */
	private static class RandomConstant {
		final Unsigned16 a;
		final Unsigned16 c;

		public RandomConstant(String left, String right) {
			a = new Unsigned16(left);
			c = new Unsigned16(right);
		}
	}

	private static final RandomConstant[] genArray = new RandomConstant[] {
			/* [ 0] */new RandomConstant("2360ed051fc65da44385df649fccf645",
					"4a696d47726179524950202020202001"),
			/* [ 1] */new RandomConstant("17bce35bdf69743c529ed9eb20e0ae99",
					"95e0e48262b3edfe04479485c755b646"),
			/* [ 2] */new RandomConstant("f4dd417327db7a9bd194dfbe42d45771",
					"882a02c315362b60765f100068b33a1c"),
			/* [ 3] */new RandomConstant("6347af777a7898f6d1a2d6f33505ffe1",
					"5efc4abfaca23e8ca8edb1f2dfbf6478"),
			/* [ 4] */new RandomConstant("b6a4239f3b315f84f6ef6d3d288c03c1",
					"f25bd15439d16af594c1b1bafa6239f0"),
			/* [ 5] */new RandomConstant("2c82901ad1cb0cd182b631ba6b261781",
					"89ca67c29c9397d59c612596145db7e0"),
			/* [ 6] */new RandomConstant("dab03f988288676ee49e66c4d2746f01",
					"8b6ae036713bd578a8093c8eae5c7fc0"),
			/* [ 7] */new RandomConstant("602167331d86cf5684fe009a6d09de01",
					"98a2542fd23d0dbdff3b886cdb1d3f80"),
			/* [ 8] */new RandomConstant("61ecb5c24d95b058f04c80a23697bc01",
					"954db923fdb7933e947cd1edcecb7f00"),
			/* [ 9] */new RandomConstant("4a5c31e0654c28aa60474e83bf3f7801",
					"00be4a36657c98cd204e8c8af7dafe00"),
			/* [ 10] */new RandomConstant("ae4f079d54fbece1478331d3c6bef001",
					"991965329dccb28d581199ab18c5fc00"),
			/* [ 11] */new RandomConstant("101b8cb830c7cb927ff1ed50ae7de001",
					"e1a8705b63ad5b8cd6c3d268d5cbf800"),
			/* [ 12] */new RandomConstant("f54a27fc056b00e7563f3505e0fbc001",
					"2b657bbfd6ed9d632079e70c3c97f000"),
			/* [ 13] */new RandomConstant("df8a6fc1a833d201f98d719dd1f78001",
					"59b60ee4c52fa49e9fe90682bd2fe000"),
			/* [ 14] */new RandomConstant("5480a5015f101a4ea7e3f183e3ef0001",
					"cc099c88030679464fe86aae8a5fc000"),
			/* [ 15] */new RandomConstant("a498509e76e5d7925f539c28c7de0001",
					"06b9abff9f9f33dd30362c0154bf8000"),
			/* [ 16] */new RandomConstant("0798a3d8b10dc72e60121cd58fbc0001",
					"e296707121688d5a0260b293a97f0000"),
			/* [ 17] */new RandomConstant("1647d1e78ec02e665fafcbbb1f780001",
					"189ffc4701ff23cb8f8acf6b52fe0000"),
			/* [ 18] */new RandomConstant("a7c982285e72bf8c0c8ddfb63ef00001",
					"5141110ab208fb9d61fb47e6a5fc0000"),
			/* [ 19] */new RandomConstant("3eb78ee8fb8c56dbc5d4e06c7de00001",
					"3c97caa62540f2948d8d340d4bf80000"),
			/* [ 20] */new RandomConstant("72d03b6f4681f2f9fe8e44d8fbc00001",
					"1b25cb9cfe5a0c963174f91a97f00000"),
			/* [ 21] */new RandomConstant("ea85f81e4f502c9bc8ae99b1f7800001",
					"0c644570b4a487103c5436352fe00000"),
			/* [ 22] */new RandomConstant("629c320db08b00c6bfa57363ef000001",
					"3d0589c28869472bde517c6a5fc00000"),
			/* [ 23] */new RandomConstant("c5c4b9ce268d074a386be6c7de000001",
					"bc95e5ab36477e65534738d4bf800000"),
			/* [ 24] */new RandomConstant("f30bbbbed1596187555bcd8fbc000001",
					"ddb02ff72a031c01011f71a97f000000"),
			/* [ 25] */new RandomConstant("4a1000fb26c9eeda3cc79b1f78000001",
					"2561426086d9acdb6c82e352fe000000"),
			/* [ 26] */new RandomConstant("89fb5307f6bf8ce2c1cf363ef0000001",
					"64a788e3c118ed1c8215c6a5fc000000"),
			/* [ 27] */new RandomConstant("830b7b3358a5d67ea49e6c7de0000001",
					"e65ea321908627cfa86b8d4bf8000000"),
			/* [ 28] */new RandomConstant("fd8a51da91a69fe1cd3cd8fbc0000001",
					"53d27225604d85f9e1d71a97f0000000"),
			/* [ 29] */new RandomConstant("901a48b642b90b55aa79b1f780000001",
					"ca5ec7a3ed1fe55e07ae352fe0000000"),
			/* [ 30] */new RandomConstant("118cdefdf32144f394f363ef00000001",
					"4daebb2e085330651f5c6a5fc0000000"),
			/* [ 31] */new RandomConstant("0a88c0a91cff430829e6c7de00000001",
					"9d6f1a00a8f3f76e7eb8d4bf80000000"),
			/* [ 32] */new RandomConstant("433bef4314f16a9453cd8fbc00000001",
					"158c62f2b31e496dfd71a97f00000000"),
			/* [ 33] */new RandomConstant("c294b02995ae6738a79b1f7800000001",
					"290e84a2eb15fd1ffae352fe00000000"),
			/* [ 34] */new RandomConstant("913575e0da8b16b14f363ef000000001",
					"e3dc1bfbe991a34ff5c6a5fc00000000"),
			/* [ 35] */new RandomConstant("2f61b9f871cf4e629e6c7de000000001",
					"ddf540d020b9eadfeb8d4bf800000000"),
			/* [ 36] */new RandomConstant("78d26ccbd68320c53cd8fbc000000001",
					"8ee4950177ce66bfd71a97f000000000"),
			/* [ 37] */new RandomConstant("8b7ebd037898518a79b1f78000000001",
					"39e0f787c907117fae352fe000000000"),
			/* [ 38] */new RandomConstant("0b5507b61f78e314f363ef0000000001",
					"659d2522f7b732ff5c6a5fc000000000"),
			/* [ 39] */new RandomConstant("4f884628f812c629e6c7de0000000001",
					"9e8722938612a5feb8d4bf8000000000"),
			/* [ 40] */new RandomConstant("be896744d4a98c53cd8fbc0000000001",
					"e941a65d66b64bfd71a97f0000000000"),
			/* [ 41] */new RandomConstant("daf63a553b6318a79b1f780000000001",
					"7b50d19437b097fae352fe0000000000"),
			/* [ 42] */new RandomConstant("2d7a23d8bf06314f363ef00000000001",
					"59d7b68e18712ff5c6a5fc0000000000"),
			/* [ 43] */new RandomConstant("392b046a9f0c629e6c7de00000000001",
					"4087bab2d5225feb8d4bf80000000000"),
			/* [ 44] */new RandomConstant("eb30fbb9c218c53cd8fbc00000000001",
					"b470abc03b44bfd71a97f00000000000"),
			/* [ 45] */new RandomConstant("b9cdc30594318a79b1f7800000000001",
					"366630eaba897fae352fe00000000000"),
			/* [ 46] */new RandomConstant("014ab453686314f363ef000000000001",
					"a2dfc77e8512ff5c6a5fc00000000000"),
			/* [ 47] */new RandomConstant("395221c7d0c629e6c7de000000000001",
					"1e0d25a14a25feb8d4bf800000000000"),
			/* [ 48] */new RandomConstant("4d972813a18c53cd8fbc000000000001",
					"9d50a5d3944bfd71a97f000000000000"),
			/* [ 49] */new RandomConstant("06f9e2374318a79b1f78000000000001",
					"bf7ab5eb2897fae352fe000000000000"),
			/* [ 50] */new RandomConstant("bd220cae86314f363ef0000000000001",
					"925b14e6512ff5c6a5fc000000000000"),
			/* [ 51] */new RandomConstant("36fd3a5d0c629e6c7de0000000000001",
					"724cce0ca25feb8d4bf8000000000000"),
			/* [ 52] */new RandomConstant("60def8ba18c53cd8fbc0000000000001",
					"1af42d1944bfd71a97f0000000000000"),
			/* [ 53] */new RandomConstant("8d500174318a79b1f780000000000001",
					"0f529e32897fae352fe0000000000000"),
			/* [ 54] */new RandomConstant("48e842e86314f363ef00000000000001",
					"844e4c6512ff5c6a5fc0000000000000"),
			/* [ 55] */new RandomConstant("4af185d0c629e6c7de00000000000001",
					"9f40d8ca25feb8d4bf80000000000000"),
			/* [ 56] */new RandomConstant("7a670ba18c53cd8fbc00000000000001",
					"9912b1944bfd71a97f00000000000000"),
			/* [ 57] */new RandomConstant("86de174318a79b1f7800000000000001",
					"9c69632897fae352fe00000000000000"),
			/* [ 58] */new RandomConstant("55fc2e86314f363ef000000000000001",
					"e1e2c6512ff5c6a5fc00000000000000"),
			/* [ 59] */new RandomConstant("ccf85d0c629e6c7de000000000000001",
					"68058ca25feb8d4bf800000000000000"),
			/* [ 60] */new RandomConstant("1df0ba18c53cd8fbc000000000000001",
					"610b1944bfd71a97f000000000000000"),
			/* [ 61] */new RandomConstant("4be174318a79b1f78000000000000001",
					"061632897fae352fe000000000000000"),
			/* [ 62] */new RandomConstant("d7c2e86314f363ef0000000000000001",
					"1c2c6512ff5c6a5fc000000000000000"),
			/* [ 63] */new RandomConstant("af85d0c629e6c7de0000000000000001",
					"7858ca25feb8d4bf8000000000000000"),
			/* [ 64] */new RandomConstant("5f0ba18c53cd8fbc0000000000000001",
					"f0b1944bfd71a97f0000000000000000"),
			/* [ 65] */new RandomConstant("be174318a79b1f780000000000000001",
					"e1632897fae352fe0000000000000000"),
			/* [ 66] */new RandomConstant("7c2e86314f363ef00000000000000001",
					"c2c6512ff5c6a5fc0000000000000000"),
			/* [ 67] */new RandomConstant("f85d0c629e6c7de00000000000000001",
					"858ca25feb8d4bf80000000000000000"),
			/* [ 68] */new RandomConstant("f0ba18c53cd8fbc00000000000000001",
					"0b1944bfd71a97f00000000000000000"),
			/* [ 69] */new RandomConstant("e174318a79b1f7800000000000000001",
					"1632897fae352fe00000000000000000"),
			/* [ 70] */new RandomConstant("c2e86314f363ef000000000000000001",
					"2c6512ff5c6a5fc00000000000000000"),
			/* [ 71] */new RandomConstant("85d0c629e6c7de000000000000000001",
					"58ca25feb8d4bf800000000000000000"),
			/* [ 72] */new RandomConstant("0ba18c53cd8fbc000000000000000001",
					"b1944bfd71a97f000000000000000000"),
			/* [ 73] */new RandomConstant("174318a79b1f78000000000000000001",
					"632897fae352fe000000000000000000"),
			/* [ 74] */new RandomConstant("2e86314f363ef0000000000000000001",
					"c6512ff5c6a5fc000000000000000000"),
			/* [ 75] */new RandomConstant("5d0c629e6c7de0000000000000000001",
					"8ca25feb8d4bf8000000000000000000"),
			/* [ 76] */new RandomConstant("ba18c53cd8fbc0000000000000000001",
					"1944bfd71a97f0000000000000000000"),
			/* [ 77] */new RandomConstant("74318a79b1f780000000000000000001",
					"32897fae352fe0000000000000000000"),
			/* [ 78] */new RandomConstant("e86314f363ef00000000000000000001",
					"6512ff5c6a5fc0000000000000000000"),
			/* [ 79] */new RandomConstant("d0c629e6c7de00000000000000000001",
					"ca25feb8d4bf80000000000000000000"),
			/* [ 80] */new RandomConstant("a18c53cd8fbc00000000000000000001",
					"944bfd71a97f00000000000000000000"),
			/* [ 81] */new RandomConstant("4318a79b1f7800000000000000000001",
					"2897fae352fe00000000000000000000"),
			/* [ 82] */new RandomConstant("86314f363ef000000000000000000001",
					"512ff5c6a5fc00000000000000000000"),
			/* [ 83] */new RandomConstant("0c629e6c7de000000000000000000001",
					"a25feb8d4bf800000000000000000000"),
			/* [ 84] */new RandomConstant("18c53cd8fbc000000000000000000001",
					"44bfd71a97f000000000000000000000"),
			/* [ 85] */new RandomConstant("318a79b1f78000000000000000000001",
					"897fae352fe000000000000000000000"),
			/* [ 86] */new RandomConstant("6314f363ef0000000000000000000001",
					"12ff5c6a5fc000000000000000000000"),
			/* [ 87] */new RandomConstant("c629e6c7de0000000000000000000001",
					"25feb8d4bf8000000000000000000000"),
			/* [ 88] */new RandomConstant("8c53cd8fbc0000000000000000000001",
					"4bfd71a97f0000000000000000000000"),
			/* [ 89] */new RandomConstant("18a79b1f780000000000000000000001",
					"97fae352fe0000000000000000000000"),
			/* [ 90] */new RandomConstant("314f363ef00000000000000000000001",
					"2ff5c6a5fc0000000000000000000000"),
			/* [ 91] */new RandomConstant("629e6c7de00000000000000000000001",
					"5feb8d4bf80000000000000000000000"),
			/* [ 92] */new RandomConstant("c53cd8fbc00000000000000000000001",
					"bfd71a97f00000000000000000000000"),
			/* [ 93] */new RandomConstant("8a79b1f7800000000000000000000001",
					"7fae352fe00000000000000000000000"),
			/* [ 94] */new RandomConstant("14f363ef000000000000000000000001",
					"ff5c6a5fc00000000000000000000000"),
			/* [ 95] */new RandomConstant("29e6c7de000000000000000000000001",
					"feb8d4bf800000000000000000000000"),
			/* [ 96] */new RandomConstant("53cd8fbc000000000000000000000001",
					"fd71a97f000000000000000000000000"),
			/* [ 97] */new RandomConstant("a79b1f78000000000000000000000001",
					"fae352fe000000000000000000000000"),
			/* [ 98] */new RandomConstant("4f363ef0000000000000000000000001",
					"f5c6a5fc000000000000000000000000"),
			/* [ 99] */new RandomConstant("9e6c7de0000000000000000000000001",
					"eb8d4bf8000000000000000000000000"),
			/* [100] */new RandomConstant("3cd8fbc0000000000000000000000001",
					"d71a97f0000000000000000000000000"),
			/* [101] */new RandomConstant("79b1f780000000000000000000000001",
					"ae352fe0000000000000000000000000"),
			/* [102] */new RandomConstant("f363ef00000000000000000000000001",
					"5c6a5fc0000000000000000000000000"),
			/* [103] */new RandomConstant("e6c7de00000000000000000000000001",
					"b8d4bf80000000000000000000000000"),
			/* [104] */new RandomConstant("cd8fbc00000000000000000000000001",
					"71a97f00000000000000000000000000"),
			/* [105] */new RandomConstant("9b1f7800000000000000000000000001",
					"e352fe00000000000000000000000000"),
			/* [106] */new RandomConstant("363ef000000000000000000000000001",
					"c6a5fc00000000000000000000000000"),
			/* [107] */new RandomConstant("6c7de000000000000000000000000001",
					"8d4bf800000000000000000000000000"),
			/* [108] */new RandomConstant("d8fbc000000000000000000000000001",
					"1a97f000000000000000000000000000"),
			/* [109] */new RandomConstant("b1f78000000000000000000000000001",
					"352fe000000000000000000000000000"),
			/* [110] */new RandomConstant("63ef0000000000000000000000000001",
					"6a5fc000000000000000000000000000"),
			/* [111] */new RandomConstant("c7de0000000000000000000000000001",
					"d4bf8000000000000000000000000000"),
			/* [112] */new RandomConstant("8fbc0000000000000000000000000001",
					"a97f0000000000000000000000000000"),
			/* [113] */new RandomConstant("1f780000000000000000000000000001",
					"52fe0000000000000000000000000000"),
			/* [114] */new RandomConstant("3ef00000000000000000000000000001",
					"a5fc0000000000000000000000000000"),
			/* [115] */new RandomConstant("7de00000000000000000000000000001",
					"4bf80000000000000000000000000000"),
			/* [116] */new RandomConstant("fbc00000000000000000000000000001",
					"97f00000000000000000000000000000"),
			/* [117] */new RandomConstant("f7800000000000000000000000000001",
					"2fe00000000000000000000000000000"),
			/* [118] */new RandomConstant("ef000000000000000000000000000001",
					"5fc00000000000000000000000000000"),
			/* [119] */new RandomConstant("de000000000000000000000000000001",
					"bf800000000000000000000000000000"),
			/* [120] */new RandomConstant("bc000000000000000000000000000001",
					"7f000000000000000000000000000000"),
			/* [121] */new RandomConstant("78000000000000000000000000000001",
					"fe000000000000000000000000000000"),
			/* [122] */new RandomConstant("f0000000000000000000000000000001",
					"fc000000000000000000000000000000"),
			/* [123] */new RandomConstant("e0000000000000000000000000000001",
					"f8000000000000000000000000000000"),
			/* [124] */new RandomConstant("c0000000000000000000000000000001",
					"f0000000000000000000000000000000"),
			/* [125] */new RandomConstant("80000000000000000000000000000001",
					"e0000000000000000000000000000000"),
			/* [126] */new RandomConstant("00000000000000000000000000000001",
					"c0000000000000000000000000000000"),
			/* [127] */new RandomConstant("00000000000000000000000000000001",
					"80000000000000000000000000000000") };

	/**
	 * generate the random number that is "advance" steps from an initial random
	 * number of 0. This is done by starting with 0, and then advancing the by
	 * the appropriate powers of 2 of the linear congruential generator.
	 */
	public static Unsigned16 skipAhead(Unsigned16 advance) {
		Unsigned16 result = new Unsigned16();
		long bit_map;
		bit_map = advance.getLow8();
		for (int i = 0; bit_map != 0 && i < 64; i++) {
			if ((bit_map & (1L << i)) != 0) {
				/*
				 * advance random number by f**(2**i) (x)
				 */
				result.multiply(genArray[i].a);
				result.add(genArray[i].c);
				bit_map &= ~(1L << i);
			}
		}
		bit_map = advance.getHigh8();
		for (int i = 0; bit_map != 0 && i < 64; i++) {
			if ((bit_map & (1L << i)) != 0) {
				/*
				 * advance random number by f**(2**(i + 64)) (x)
				 */
				result.multiply(genArray[i + 64].a);
				result.add(genArray[i + 64].c);
				bit_map &= ~(1L << i);
			}
		}
		return result;
	}

	/**
	 * Generate the next 16 byte random number.
	 */
	public static void nextRand(Unsigned16 rand) {
		/*
		 * advance the random number forward once using the linear congruential
		 * generator, and then return the new random number
		 */
		rand.multiply(genArray[0].a);
		rand.add(genArray[0].c);
	}
}

c. Unsigned16.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package scala.spark.examples.terasort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

/**
 * This file is copied from Hadoop package org.apache.hadoop.examples.terasort.
 */
/**
 * An unsigned 16 byte integer class that supports addition, multiplication, and
 * left shifts.
 */
class Unsigned16 implements Writable {
	private long hi8;
	private long lo8;

	public Unsigned16() {
		hi8 = 0;
		lo8 = 0;
	}

	public Unsigned16(long l) {
		hi8 = 0;
		lo8 = l;
	}

	public Unsigned16(Unsigned16 other) {
		hi8 = other.hi8;
		lo8 = other.lo8;
	}

	@Override
	public boolean equals(Object o) {
		if (o instanceof Unsigned16) {
			Unsigned16 other = (Unsigned16) o;
			return other.hi8 == hi8 && other.lo8 == lo8;
		}
		return false;
	}

	@Override
	public int hashCode() {
		return (int) lo8;
	}

	/**
	 * Parse a hex string
	 *
	 * @param s
	 *            the hex string
	 */
	public Unsigned16(String s) throws NumberFormatException {
		set(s);
	}

	/**
	 * Set the number from a hex string
	 *
	 * @param s
	 *            the number in hexadecimal
	 * @throws NumberFormatException
	 *             if the number is invalid
	 */
	public void set(String s) throws NumberFormatException {
		hi8 = 0;
		lo8 = 0;
		final long lastDigit = 0xfl << 60;
		for (int i = 0; i < s.length(); ++i) {
			int digit = getHexDigit(s.charAt(i));
			if ((lastDigit & hi8) != 0) {
				throw new NumberFormatException(s + " overflowed 16 bytes");
			}
			hi8 <<= 4;
			hi8 |= (lo8 & lastDigit) >>> 60;
			lo8 <<= 4;
			lo8 |= digit;
		}
	}

	/**
	 * Set the number to a given long.
	 *
	 * @param l
	 *            the new value, which is treated as an unsigned number
	 */
	public void set(long l) {
		lo8 = l;
		hi8 = 0;
	}

	/**
	 * Map a hexadecimal character into a digit.
	 *
	 * @param ch
	 *            the character
	 * @return the digit from 0 to 15
	 * @throws NumberFormatException
	 */
	private static int getHexDigit(char ch) throws NumberFormatException {
		if (ch >= '0' && ch <= '9') {
			return ch - '0';
		}
		if (ch >= 'a' && ch <= 'f') {
			return ch - 'a' + 10;
		}
		if (ch >= 'A' && ch <= 'F') {
			return ch - 'A' + 10;
		}
		throw new NumberFormatException(ch + " is not a valid hex digit");
	}

	private static final Unsigned16 TEN = new Unsigned16(10);

	public static Unsigned16 fromDecimal(String s) throws NumberFormatException {
		Unsigned16 result = new Unsigned16();
		Unsigned16 tmp = new Unsigned16();
		for (int i = 0; i < s.length(); i++) {
			char ch = s.charAt(i);
			if (ch < '0' || ch > '9') {
				throw new NumberFormatException(ch
						+ " not a valid decimal digit");
			}
			int digit = ch - '0';
			result.multiply(TEN);
			tmp.set(digit);
			result.add(tmp);
		}
		return result;
	}

	/**
	 * Return the number as a hex string.
	 */
	public String toString() {
		if (hi8 == 0) {
			return Long.toHexString(lo8);
		} else {
			StringBuilder result = new StringBuilder();
			result.append(Long.toHexString(hi8));
			String loString = Long.toHexString(lo8);
			for (int i = loString.length(); i < 16; ++i) {
				result.append('0');
			}
			result.append(loString);
			return result.toString();
		}
	}

	/**
	 * Get a given byte from the number.
	 *
	 * @param b
	 *            the byte to get with 0 meaning the most significant byte
	 * @return the byte or 0 if b is outside of 0..15
	 */
	public byte getByte(int b) {
		if (b >= 0 && b < 16) {
			if (b < 8) {
				return (byte) (hi8 >> (56 - 8 * b));
			} else {
				return (byte) (lo8 >> (120 - 8 * b));
			}
		}
		return 0;
	}

	/**
	 * Get the hexadecimal digit at the given position.
	 *
	 * @param p
	 *            the digit position to get with 0 meaning the most significant
	 * @return the character or '0' if p is outside of 0..31
	 */
	public char getHexDigit(int p) {
		byte digit = getByte(p / 2);
		if (p % 2 == 0) {
			digit >>>= 4;
		}
		digit &= 0xf;
		if (digit < 10) {
			return (char) ('0' + digit);
		} else {
			return (char) ('A' + digit - 10);
		}
	}

	/**
	 * Get the high 8 bytes as a long.
	 */
	public long getHigh8() {
		return hi8;
	}

	/**
	 * Get the low 8 bytes as a long.
	 */
	public long getLow8() {
		return lo8;
	}

	/**
	 * Multiple the current number by a 16 byte unsigned integer. Overflow is
	 * not detected and the result is the low 16 bytes of the result. The
	 * numbers are divided into 32 and 31 bit chunks so that the product of two
	 * chucks fits in the unsigned 63 bits of a long.
	 *
	 * @param b
	 *            the other number
	 */
	void multiply(Unsigned16 b) {
		// divide the left into 4 32 bit chunks
		long[] left = new long[4];
		left[0] = lo8 & 0xffffffffl;
		left[1] = lo8 >>> 32;
		left[2] = hi8 & 0xffffffffl;
		left[3] = hi8 >>> 32;
		// divide the right into 5 31 bit chunks
		long[] right = new long[5];
		right[0] = b.lo8 & 0x7fffffffl;
		right[1] = (b.lo8 >>> 31) & 0x7fffffffl;
		right[2] = (b.lo8 >>> 62) + ((b.hi8 & 0x1fffffffl) << 2);
		right[3] = (b.hi8 >>> 29) & 0x7fffffffl;
		right[4] = (b.hi8 >>> 60);
		// clear the cur value
		set(0);
		Unsigned16 tmp = new Unsigned16();
		for (int l = 0; l < 4; ++l) {
			for (int r = 0; r < 5; ++r) {
				long prod = left[l] * right[r];
				if (prod != 0) {
					int off = l * 32 + r * 31;
					tmp.set(prod);
					tmp.shiftLeft(off);
					add(tmp);
				}
			}
		}
	}

	/**
	 * Add the given number into the current number.
	 *
	 * @param b
	 *            the other number
	 */
	public void add(Unsigned16 b) {
		long sumHi;
		long sumLo;
		long reshibit, hibit0, hibit1;
		sumHi = hi8 + b.hi8;
		hibit0 = (lo8 & 0x8000000000000000L);
		hibit1 = (b.lo8 & 0x8000000000000000L);
		sumLo = lo8 + b.lo8;
		reshibit = (sumLo & 0x8000000000000000L);
		if ((hibit0 & hibit1) != 0 | ((hibit0 ^ hibit1) != 0 && reshibit == 0))
			sumHi++; /* add carry bit */
		hi8 = sumHi;
		lo8 = sumLo;
	}

	/**
	 * Shift the number a given number of bit positions. The number is the low
	 * order bits of the result.
	 *
	 * @param bits
	 *            the bit positions to shift by
	 */
	public void shiftLeft(int bits) {
		if (bits != 0) {
			if (bits < 64) {
				hi8 <<= bits;
				hi8 |= (lo8 >>> (64 - bits));
				lo8 <<= bits;
			} else if (bits < 128) {
				hi8 = lo8 << (bits - 64);
				lo8 = 0;
			} else {
				hi8 = 0;
				lo8 = 0;
			}
		}
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		hi8 = in.readLong();
		lo8 = in.readLong();
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeLong(hi8);
		out.writeLong(lo8);
	}
}