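Spark 1.4 local-mode programming exercises (2)

Spark programming exercises

Disclaimer: the code below is for learning and reference only; do not use it for commercial purposes.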

  • Wordcount
  • UserMining
  • TweetMining
  • HashtagMining
  • InvertedIndex

Code and test data download

To get the data and the test code, click [ here ]

WordCount code



import java.io.Serializable;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

/*
 *  Step 1, the mapper:
 *  - attach the value 1 to every word, giving a JavaPairRDD<String, Integer>
 *    of (word, 1) pairs with the word as the key.
 *
 *  Step 2, the reducer:
 *  - sum the values per key to get the count of each word.
 */
public class Ex0Wordcount implements Serializable {
    public static String pathToFile = "data/wordcount.txt";
    public static SparkConf conf = null;
    public static JavaSparkContext sc = null;

    static {

        conf = new SparkConf().setAppName("Wordcount")
                .set("spark.driver.allowMultipleContexts", "true");
                //.setMaster("spark://master:7077");
        conf.set("spark.executor.memory", "1000m");
        // Local mode; "*" means use as many worker threads as there are cores.
        conf.setMaster("local[*]");

        sc = new JavaSparkContext(conf);
        sc.addJar("/home/hadoop/tools/jars/1.jar");
    }

    public static void main(String[] args) {
        Ex0Wordcount wc = new Ex0Wordcount();
        wc.filterOnWordcount();

    }

    public JavaRDD<String> loadData() {

        JavaRDD<String> words = sc.textFile(pathToFile).flatMap(
                new FlatMapFunction<String, String>() {
                    public Iterable<String> call(String line) throws Exception {
                        return Arrays.asList(line.split(" "));
                    }
                });

        return words;

    }

    /**
     * Count how many times each word appears.
     */
    public JavaPairRDD<String, Integer> wordcount() {
        JavaRDD<String> words = loadData();

        // code here
        JavaPairRDD<String, Integer> couples = words
                .mapToPair(new PairFunction<String, String, Integer>() {

                    @Override
                    public Tuple2<String, Integer> call(String s)
                            throws Exception {
                        return new Tuple2<String, Integer>(s, 1);
                    }
                });

        // code here
        JavaPairRDD<String, Integer> result = couples
                .reduceByKey(new Function2<Integer, Integer, Integer>() {

                    @Override
                    public Integer call(Integer i0, Integer i1)
                            throws Exception {
                        return i0 + i1;
                    }
                });

        return result;
    }

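    /**
     * Keep only the words that appear strictly more than 4 times.
     */
    public JavaPairRDD<String, Integer> filterOnWordcount() {
        JavaPairRDD<String, Integer> wordcounts = wordcount();

        // Filter with Spark instead of returning null; the fully qualified
        // Function name avoids needing an extra import.
        JavaPairRDD<String, Integer> filtered = wordcounts.filter(
                new org.apache.spark.api.java.function.Function<Tuple2<String, Integer>, Boolean>() {
                    @Override
                    public Boolean call(Tuple2<String, Integer> tuple)
                            throws Exception {
                        return tuple._2() > 4;
                    }
                });

        // Print the remaining (word, count) pairs on the driver.
        for (Tuple2<String, Integer> tuple : filtered.collect()) {
            System.out.println(tuple._1() + "==" + tuple._2());
        }
        return filtered;
    }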

}
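
To check the expected result on a few lines without starting Spark, the same steps (split into words, count 1 per word, sum per key, then filter) can be sketched with plain Java collections. This is only an illustration: the class WordCountSketch and its method names are made up for this example and are not part of the exercise code.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper class, not part of the exercise: the word count logic without Spark.
class WordCountSketch {

    /** Same result as flatMap + mapToPair + reduceByKey: split lines, add 1 per word. */
    static Map<String, Integer> wordcount(List<String> lines) {
        Map<String, Integer> counts = new LinkedHashMap<String, Integer>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                Integer old = counts.get(word);
                counts.put(word, old == null ? 1 : old + 1);
            }
        }
        return counts;
    }

    /** Same result as the filter step: keep words seen strictly more than minCount times. */
    static Map<String, Integer> keepMoreThan(Map<String, Integer> counts, int minCount) {
        Map<String, Integer> kept = new LinkedHashMap<String, Integer>();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            if (e.getValue() > minCount) {
                kept.put(e.getKey(), e.getValue());
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a b a", "a a c", "a b");
        System.out.println(keepMoreThan(wordcount(lines), 4)); // prints {a=5}
    }
}
```

Comparing this output with what filterOnWordcount prints for the same small input is a quick sanity check before running on data/wordcount.txt.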

UserMining code



import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;
import utils.Parse;
import utils.Tweet;

/**
 * The Java Spark API documentation:
 * http://spark.apache.org/docs/latest/api/java/index.html
 *
 * We use a dataset of 8198 tweet records. The data format is as follows:
 *
 * {"id":"572692378957430785", "user":"Srkian_nishu :)", "text":
 * "@always_nidhi @YouTube no i dnt understand bt i loved of this mve is rocking"
 * , "place":"Orissa", "country":"India"}
 *
 * Goal: find all tweet ids of each user (one user may have several tweets;
 * for example, the tweet ids of Srkian_nishu are [572692378957430785, ...]).
 */
public class Ex1UserMining implements Serializable{

    private static String pathToFile = "data/reduced-tweets.json";

    public static void main(String[] args) {
        Ex1UserMining userMining=new Ex1UserMining();
        //userMining.tweetsByUser();
        userMining.filterOnTweetUser();
        System.exit(0);
    }

    public JavaRDD<Tweet> loadData() {
        // Create spark configuration and spark context
        SparkConf conf = new SparkConf().setAppName("User mining")
                .set("spark.driver.allowMultipleContexts", "true")
                .setMaster("local[*]");

        JavaSparkContext sc = new JavaSparkContext(conf);

        // Load the data and parse it into a Tweet.
        // Look at the Tweet Object in the TweetUtils class.
        JavaRDD<Tweet> tweets = sc.textFile(pathToFile).map(
                new Function<String, Tweet>() {
                    public Tweet call(String line) throws Exception {
                        // TODO Auto-generated method stub
                        return Parse.parseJsonToTweet(line);
                    }

                });

        return tweets;
    }

    /**
     * For each user return all his tweets
     */
    public JavaPairRDD<String, Iterable<Tweet>> tweetsByUser() {
        JavaRDD<Tweet> tweets = loadData();

        // TODO write code here
        // Hint: the Spark API provides a groupBy method
        JavaPairRDD<String, Iterable<Tweet>> tweetsByUser = tweets.groupBy(new Function<Tweet, String>() {
            @Override
            public String call(Tweet tweet) throws Exception {

                return tweet.getUser();
            }
        });

        return tweetsByUser;
    }

    /**
     * Compute the number of tweets by user
     */
    public JavaPairRDD<String, Integer> tweetByUserNumber() {
        JavaRDD<Tweet> tweets = loadData();

        // TODO write code here
        // Hint: think about what you did in the wordcount example
        JavaPairRDD<String, Integer> count = tweets.mapToPair(new PairFunction<Tweet, String    ,Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tweet tweet) throws Exception {
                return  new Tuple2<String, Integer>(tweet.getUser(), 1);
            }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {

            @Override
            public Integer call(Integer a, Integer b) throws Exception {
                return a+b;
            }
        });

        return count;
    }

    public void filterOnTweetUser() {
        JavaPairRDD<String, Iterable<Tweet>> grouped = tweetsByUser();

        // Bring the grouped tweets back to the driver and print the ids per user.
        List<Tuple2<String, Iterable<Tweet>>> output = grouped.collect();
        for (Tuple2<String, Iterable<Tweet>> tuple : output) {
            List<Long> ids = new ArrayList<Long>();
            for (Tweet tweet : tuple._2()) {
                ids.add(tweet.getId());
            }
            System.out.println(tuple._1() + "==" + ShowData(ids));
        }
    }

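    public String ShowData(List<Long> list) {
        // Join the ids with commas, without a trailing comma before "]".
        StringBuilder sb = new StringBuilder("[");
        for (int i = 0; i < list.size(); i++) {
            if (i > 0) {
                sb.append(",");
            }
            sb.append(list.get(i));
        }
        return sb.append("]").toString();
    }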

    public JavaPairRDD<String, Integer> filterOnTweetcount() {
        JavaPairRDD<String, Integer> tweetcounts = tweetByUserNumber();

        // Print every (user, tweet count) pair on the driver.
        for (Tuple2<String, Integer> tuple : tweetcounts.collect()) {
            System.out.println(tuple._1() + "==" + tuple._2());
        }
        // No filtering is applied here, so return the counts instead of null.
        return tweetcounts;
    }

}
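
The grouping done by tweetsByUser and the counting done by tweetByUserNumber can be tried on a handful of records without Spark. The sketch below is an illustration under stated assumptions: MiniTweet is a hypothetical stand-in for utils.Tweet (whose source is not shown here) that keeps only an id and a user, and UserMiningSketch is a made-up class name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper class, not part of the exercise: group tweet ids by user without Spark.
class UserMiningSketch {

    /** Hypothetical stand-in for utils.Tweet: only the two fields used here. */
    static class MiniTweet {
        final long id;
        final String user;

        MiniTweet(long id, String user) {
            this.id = id;
            this.user = user;
        }
    }

    /** Same idea as groupBy on the user: collect the tweet ids of every user. */
    static Map<String, List<Long>> idsByUser(List<MiniTweet> tweets) {
        Map<String, List<Long>> grouped = new LinkedHashMap<String, List<Long>>();
        for (MiniTweet tweet : tweets) {
            List<Long> ids = grouped.get(tweet.user);
            if (ids == null) {
                ids = new ArrayList<Long>();
                grouped.put(tweet.user, ids);
            }
            ids.add(tweet.id);
        }
        return grouped;
    }

    /** Number of tweets per user, derived here from the size of each group. */
    static Map<String, Integer> countByUser(List<MiniTweet> tweets) {
        Map<String, Integer> counts = new LinkedHashMap<String, Integer>();
        for (Map.Entry<String, List<Long>> e : idsByUser(tweets).entrySet()) {
            counts.put(e.getKey(), e.getValue().size());
        }
        return counts;
    }

    public static void main(String[] args) {
        List<MiniTweet> tweets = Arrays.asList(
                new MiniTweet(1L, "alice"),
                new MiniTweet(2L, "bob"),
                new MiniTweet(3L, "alice"));
        System.out.println(idsByUser(tweets));   // prints {alice=[1, 3], bob=[2]}
        System.out.println(countByUser(tweets)); // prints {alice=2, bob=1}
    }
}
```

In the Spark version the count is computed with mapToPair and reduceByKey rather than from group sizes, which avoids building the full list of tweets per user when only the number is needed.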

TweetMining code



import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;
import utils.Parse;
import utils.Tweet;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

/**
 * The Java Spark API documentation:
 * http://spark.apache.org/docs/latest/api/java/index.html
 *
 * We use a dataset of 8198 tweet records. The data format is as follows:
 *
 * {"id":"572692378957430785", "user":"Srkian_nishu :)", "text":
 * "@always_nidhi @YouTube no i dnt understand bt i loved of this mve is rocking"
 * , "place":"Orissa", "country":"India"}
 *
 * Goals: 1. Find all the persons mentioned with "@".
 *        2. Count how many times each person is mentioned and find the
 *           10 most mentioned persons.
 *
 * Use the Ex2TweetMiningTest to implement the code.
 */
public class Ex2TweetMining implements Serializable {

    /**
     *
     */

    private static String pathToFile = "data/reduced-tweets.json";
    private static String saveAsTextFile = "out1/out1.txt";

    /**
     * Load the data from the json file and return an RDD of Tweet
     */
    public JavaRDD<Tweet> loadData() {
        // create spark configuration and spark context
        SparkConf conf = new SparkConf().setAppName("Tweet mining").setMaster(
                "local[*]");
        // .setMaster("spark://master:7077");
        conf.set("spark.driver.allowMultipleContexts", "true");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // sc.addJar("/home/sun/jars/tutorial-all.jar");

        // load the data and create an RDD of Tweet
        // JavaRDD<Tweet> tweets =
        // sc.textFile("hdfs://master:9000/sparkdata/reduced-tweets.json")
        JavaRDD<Tweet> tweets = sc.textFile(pathToFile).map(
                new Function<String, Tweet>() {
                    public Tweet call(String line) throws Exception {
                        // TODO Auto-generated method stub
                        return Parse.parseJsonToTweet(line);
                    }

                });
        return tweets;
    }

    /**
     * Find all the persons mentioned on tweets (case sensitive)
     */
    public JavaRDD<String> mentionOnTweet() {
        JavaRDD<Tweet> tweets = loadData();

        // You want to return an RDD with the mentions
        // Hint: think about separating the word in the text field and then find
        // the mentions
        // TODO write code here
        JavaRDD<String> mentions = tweets
                .flatMap(new FlatMapFunction<Tweet, String>() {
                    public Iterable<String> call(Tweet t) throws Exception {
                        String text = t.getText();
                        Set<String> set = new HashSet<String>();
                        String[] words = text.split(" ");
                        for (String word : words) {
                            // A bare "@" is not a mention, so require a name after it.
                            if (word.startsWith("@") && word.length() > 1) {
                                set.add(word);
                            }
                        }
                        return set;
                    }

                });

        return mentions;

    }

    /**
     * Count how many times each person is mentioned
     */
    public JavaPairRDD<String, Integer> countMentions() {
        JavaRDD<String> mentions = mentionOnTweet();

        // Hint: think about what you did in the wordcount example
        // TODO write code here
        JavaPairRDD<String, Integer> mentionCount = mentions.mapToPair(
                new PairFunction<String, String, Integer>() {
                    public Tuple2<String, Integer> call(String t)
                            throws Exception {
                        return new Tuple2<String, Integer>(t, 1);
                    }
                }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                // TODO Auto-generated method stub
                return v1 + v2;
            }
        });
        // mentionCount.saveAsTextFile("hdfs://master:9000/sparkdata/tweets-m4");
        // mentionCount.saveAsTextFile(saveAsTextFile);
        return mentionCount;
    }

    /**
     * Find the 10 most mentioned persons by descending order
     */
    public List<Tuple2<Integer, String>> top10mentions() {
        JavaPairRDD<String, Integer> counts = countMentions();

        // Hint: take a look at the sorting and take methods
        // TODO write code here
        List<Tuple2<Integer, String>> mostMentioned = (List<Tuple2<Integer, String>>) counts
                .mapToPair(
                        new PairFunction<Tuple2<String, Integer>, Integer, String>() {

                            @Override
                            public Tuple2<Integer, String> call(
                                    Tuple2<String, Integer> tuple2)
                                    throws Exception {

                                return new Tuple2<Integer, String>(tuple2._2(),
                                        tuple2._1());
                            }
                        }).sortByKey(false).take(10);

        return mostMentioned;
    }

    public void filterOnTweetTop10Mentions() {
        List<Tuple2<Integer, String>> output = top10mentions();
        Iterator<Tuple2<Integer, String>> it2 = output.iterator();
        // List<Long> list=new ArrayList<Long>();
        while (it2.hasNext()) {
            Tuple2<Integer, String> tuple2 = (Tuple2<Integer, String>) it2
                    .next();
            System.out.println(tuple2._1() + "==" + tuple2._2());

        }
        // System.out.println(tuple2._1()+"=="+ShowData(list));
        // list.clear();
    }

    // return filtered;

    public static void main(String[] args) {
        Ex2TweetMining ex2TweetMining = new Ex2TweetMining();
        ex2TweetMining.filterOnTweetTop10Mentions();

        /*
         * JavaPairRDD<String, Integer> res = ex2TweetMining.countMentions();
         * System.out.println(res.take(1));
         */
    }
}
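
The top-10 step swaps each (name, count) pair to (count, name), sorts by key in descending order and takes the first 10. A Spark-free sketch of that selection is shown below. The class name TopMentionsSketch and the sample counts are made up for this example, and breaking ties by name is a choice of the sketch; the exercise code does not define an order among equal counts.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper class, not part of the exercise: pick the n largest counts without Spark.
class TopMentionsSketch {

    /** Return the n entries with the highest counts, highest first. */
    static List<Map.Entry<String, Integer>> topN(Map<String, Integer> counts, int n) {
        List<Map.Entry<String, Integer>> entries =
                new ArrayList<Map.Entry<String, Integer>>(counts.entrySet());
        Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() {
            @Override
            public int compare(Map.Entry<String, Integer> a, Map.Entry<String, Integer> b) {
                int byCount = b.getValue().compareTo(a.getValue()); // descending count
                // Tie-break by name so the result is deterministic (sketch-only choice).
                return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey());
            }
        });
        // Copy the sublist so the result does not depend on the sorted working list.
        return new ArrayList<Map.Entry<String, Integer>>(
                entries.subList(0, Math.min(n, entries.size())));
    }

    public static void main(String[] args) {
        // Made-up sample counts, for illustration only.
        Map<String, Integer> counts = new HashMap<String, Integer>();
        counts.put("@YouTube", 7);
        counts.put("@always_nidhi", 3);
        counts.put("@someone", 3);
        for (Map.Entry<String, Integer> e : topN(counts, 2)) {
            System.out.println(e.getValue() + "==" + e.getKey());
        }
        // prints 7==@YouTube, then 3==@always_nidhi
    }
}
```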

HashtagMining code



import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;
import utils.Parse;
import utils.Tweet;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/**
 *  The Java Spark API documentation: http://spark.apache.org/docs/latest/api/java/index.html
 *
 * We use a dataset of 8198 tweet records. The data format is as follows:
 *
 * {"id":"572692378957430785", "user":"Srkian_nishu :)", "text":
 * "@always_nidhi @YouTube no i dnt understand bt i loved of this mve is rocking"
 * , "place":"Orissa", "country":"India"}
 *
 * Goals: 1. Find all the hashtags (words starting with "#") in the tweets.
 *        2. Count how many times each hashtag is mentioned and find the
 *           10 most popular ones.
 */
public class Ex3HashtagMining implements Serializable{

  private static String pathToFile = "data/reduced-tweets.json";

  public static void main(String[] args) {
    Ex3HashtagMining ex3HashtagMining = new Ex3HashtagMining();
    ex3HashtagMining.filterOnTweetTop10HashtagMining();
  }

  /**
   *  Load the data from the json file and return an RDD of Tweet
   */
  public JavaRDD<Tweet> loadData() {
    // create spark configuration and spark context
    SparkConf conf = new SparkConf()
        .setAppName("Hashtag mining")
        .set("spark.driver.allowMultipleContexts", "true")
        .setMaster("local[*]");

    JavaSparkContext sc = new JavaSparkContext(conf);

    JavaRDD<Tweet> tweets = sc.textFile(pathToFile).map(new Function<String, Tweet>() {
        public Tweet call(String line) throws Exception {
            // TODO Auto-generated method stub
            return Parse.parseJsonToTweet(line);
        }

    });

    return tweets;
  }

  /**
   *  Find all the hashtags mentioned on tweets
   */
  public JavaRDD<String> hashtagMentionedOnTweet() {
    JavaRDD<Tweet> tweets = loadData();

    // You want to return an RDD with the mentions
    // Hint: think about separating the word in the text field and then find the mentions
    // TODO write code here
    JavaRDD<String> mentions = tweets.flatMap(new FlatMapFunction<Tweet, String>() {
        @Override
        public Iterable<String> call(Tweet tweet) throws Exception {
            return Arrays.asList(tweet.getText().split(" "));
        }
    }).filter(new Function<String, Boolean>() {

        @Override
        public Boolean call(String string) throws Exception {
            return string.startsWith("#")&&string.length()>1;
        }
    });

    return mentions;
  }

  /**
   *  Count how many times each hashtag is mentioned
   */
  public JavaPairRDD<String,Integer> countMentions() {
    JavaRDD<String> mentions = hashtagMentionedOnTweet();

    // Hint: think about what you did in the wordcount example
    // TODO write code here
    JavaPairRDD<String, Integer> counts = mentions.mapToPair(new PairFunction<String, String, Integer>() {

        @Override
        public Tuple2<String, Integer> call(String s) throws Exception {
            return new Tuple2<String, Integer>(s, 1);
        }
    }).reduceByKey(new Function2<Integer, Integer, Integer>() {

        @Override
        public Integer call(Integer a, Integer b) throws Exception {
            return a+b;
        }
    });

    return counts;
  }

  /**
   *  Find the 10 most popular Hashtags by descending order
   */
  public List<Tuple2<Integer, String>> top10HashTags() {
    JavaPairRDD<String, Integer> counts = countMentions();

    // Hint: take a look at the sorting and take methods
    // TODO write code here
    List<Tuple2<Integer, String>> top10 = (List<Tuple2<Integer, String>>) counts.mapToPair(new PairFunction<Tuple2<String,Integer>, Integer, String>() {

        @Override
        public Tuple2<Integer, String> call(Tuple2<String, Integer> tuple2)
                throws Exception {
            return new Tuple2<Integer, String>(tuple2._2(), tuple2._1());
        }

    }).sortByKey(false).take(10);

    return top10;
  }

  public void filterOnTweetTop10HashtagMining() {
        List<Tuple2<Integer, String>> output = top10HashTags();
        Iterator<Tuple2<Integer, String>> it2 = output.iterator();
        // List<Long> list=new ArrayList<Long>();
        while (it2.hasNext()) {
            Tuple2<Integer, String> tuple2 = (Tuple2<Integer, String>) it2
                    .next();
            System.out.println(tuple2._1() + "==" + tuple2._2());

        }
        // System.out.println(tuple2._1()+"=="+ShowData(list));
        // list.clear();
    }

}
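
Because the tweet text is split on single spaces and filtered with startsWith("#"), it helps to see exactly which tokens count as hashtags. The sketch below applies the same predicate to one sample string; HashtagTokenSketch and the sample text are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper class, not part of the exercise: which tokens pass the hashtag filter.
class HashtagTokenSketch {

    /** Same predicate as the filter step: starts with '#' and has at least one more character. */
    static boolean isHashtag(String word) {
        return word.startsWith("#") && word.length() > 1;
    }

    /** Split on single spaces, as the exercise does, and keep the hashtag tokens in order. */
    static List<String> hashtags(String text) {
        List<String> found = new ArrayList<String>();
        for (String word : text.split(" ")) {
            if (isHashtag(word)) {
                found.add(word);
            }
        }
        return found;
    }

    public static void main(String[] args) {
        // A lone "#" is dropped; "#spark," keeps its comma and "#Spark" keeps its case.
        System.out.println(hashtags("# #spark #spark, #Spark rocks"));
        // prints [#spark, #spark,, #Spark]
    }
}
```

As the output shows, "#spark", "#spark," and "#Spark" are counted as three different hashtags under this tokenization; normalizing case or stripping punctuation would be a separate design decision that the exercise does not make.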

InvertedIndex code



import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFlatMapFunction;

import scala.Tuple2;
import utils.Parse;
import utils.Tweet;

/**
 * Goal: build an inverted index of the hashtags.
 *
 * Note: for example, the hashtag #spark appears in tweet1, tweet3 and tweet39.
 * The index should then return (#spark, List(tweet1, tweet3, tweet39)).
 */
public class Ex4InvertedIndex implements Serializable{

  private static String pathToFile = "data/reduced-tweets.json";

  public static void main(String[] args) {
    ShowData();
  }

  /**
   *  Load the data from the json file and return an RDD of Tweet
   */
  public static JavaRDD<Tweet> loadData() {
    // create spark configuration and spark context
    SparkConf conf = new SparkConf()
        .setAppName("Inverted index")
        .set("spark.driver.allowMultipleContexts", "true")
        .setMaster("local[*]");

    JavaSparkContext sc = new JavaSparkContext(conf);

    JavaRDD<Tweet> tweets = sc.textFile(pathToFile).map(new Function<String, Tweet>() {
        public Tweet call(String line) throws Exception {
            // TODO Auto-generated method stub
            return Parse.parseJsonToTweet(line);
        }

    });

    return tweets;
  }

  public  static void ShowData(){
      Map<String, Iterable<Tweet>> output=invertedIndex();

      Iterator<Entry<String, Iterable<Tweet>>> it2 = output.entrySet().iterator();

        List<Long> list=new ArrayList<Long>();
        while (it2.hasNext()) {
            Entry<String, Iterable<Tweet>> tuple2 =  it2.next();
            Iterator<Tweet> iterator= tuple2.getValue().iterator();
            while (iterator.hasNext()) {
                Tweet tweet = (Tweet) iterator.next();
                list.add(tweet.getId());
            //  System.out.println(tuple2._1()+"=="+tweet.getId());

            }
            System.out.println(tuple2.getKey()+ "==" +PrintData(list));
            list.clear();

        }

  }

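  public static String PrintData(List<Long> list) {
        // Join the ids with commas, without a trailing comma before "]".
        StringBuilder sb = new StringBuilder("[");
        for (int i = 0; i < list.size(); i++) {
            if (i > 0) {
                sb.append(",");
            }
            sb.append(list.get(i));
        }
        return sb.append("]").toString();
  }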

  public  static void ShowData1(){
      Map<String, Iterable<Tweet>> output=invertedIndex();

      Iterator<Entry<String, Iterable<Tweet>>> it2 = output.entrySet().iterator();

        // List<Long> list=new ArrayList<Long>();
        while (it2.hasNext()) {
            Entry<String, Iterable<Tweet>> tuple2 =  it2.next();

            System.out.println(tuple2.getKey()+ "==" + tuple2.getValue());

        }

  }

  public static Map<String, Iterable<Tweet>> invertedIndex() {
    JavaRDD<Tweet> tweets = loadData();

    // for each tweet, extract all the hashtag and then create couples (hashtag,tweet)
    // Hint: see the flatMapToPair method
    // TODO write code here
    JavaPairRDD<String, Tweet> pairs = tweets.flatMapToPair(new PairFlatMapFunction<Tweet, String, Tweet>() {

        @Override
        public Iterable<Tuple2<String, Tweet>> call(Tweet tweet)
                throws Exception {
            List<Tuple2<String, Tweet>> results = new ArrayList<Tuple2<String, Tweet>>();
            // Emit one (hashtag, tweet) pair for every hashtag found in the text.
            for (String word : tweet.getText().split(" ")) {
                if (word.startsWith("#") && word.length() > 1) {
                    results.add(new Tuple2<String, Tweet>(word, tweet));
                }
            }
            return results;
        }
    });

    // We want to group the tweets by hashtag
    // TODO write code here
    JavaPairRDD<String, Iterable<Tweet>> tweetsByHashtag = pairs.groupByKey();

    // Then return the inverted index (= a map structure)
    // TODO write code here
    Map<String, Iterable<Tweet>> map =tweetsByHashtag.collectAsMap();

    return map;
  }

}
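
The shape of the result, a map from each hashtag to the tweets that contain it, can be checked on a few strings without Spark. In the sketch below the tweets are plain texts and the tweet at position i gets the id i + 1; both the class name InvertedIndexSketch and this id scheme are assumptions of the example, not part of the exercise.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper class, not part of the exercise: hashtag -> tweet ids without Spark.
class InvertedIndexSketch {

    /**
     * Build hashtag -> ids of the tweets containing it.
     * texts.get(i) is treated as the text of the tweet with id i + 1 (sketch-only assumption).
     */
    static Map<String, List<Long>> invertedIndex(List<String> texts) {
        Map<String, List<Long>> index = new LinkedHashMap<String, List<Long>>();
        for (int i = 0; i < texts.size(); i++) {
            long tweetId = i + 1;
            for (String word : texts.get(i).split(" ")) {
                // Same hashtag test as the exercise: '#' followed by at least one character.
                if (word.startsWith("#") && word.length() > 1) {
                    List<Long> ids = index.get(word);
                    if (ids == null) {
                        ids = new ArrayList<Long>();
                        index.put(word, ids);
                    }
                    ids.add(tweetId);
                }
            }
        }
        return index;
    }

    public static void main(String[] args) {
        List<String> texts = Arrays.asList("learning #spark", "#java and #spark", "no tags");
        System.out.println(invertedIndex(texts)); // prints {#spark=[1, 2], #java=[2]}
    }
}
```

Like the flatMapToPair step, the sketch emits one entry per hashtag occurrence, so a tweet that repeats a hashtag is listed once per repetition.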

Time: 2024-08-13 12:43:37
