Spark Cluster Setup Notes: Solving the 100-Million-Scale Customer Tagging Problem

   I have recently been working on a customer-tagging project: customers are automatically tagged based on their transaction behavior, and these tags then drive product recommendations. We currently have 500 million+ tag records covering roughly 100 million users, and the project requirement is to retrieve tags and audience information for arbitrary combinations of conditions.

Here is an example:

Set A: customers who bought "toothpaste", with a total transaction amount between 10 and 500 yuan, a transaction count of 5, and an average order value between 20 and 200 yuan.

Set B: customers who bought "toothbrush", with a total transaction amount between 5 and 50 yuan, a transaction count of 3, and an average order value between 10 and 30 yuan.

Required: <1> the number of customers in A ∩ B plus their detailed information, ideally returned in no more than 15 seconds.

If you try to answer this kind of question with MySQL, it is essentially impossible to compute at all, let alone within the required time.

1: Finding a Solution

If you want to solve this with the least amount of work, you can stand up a distributed Elasticsearch cluster. The fields involved in the queries (Nick, AvgPrice, TradeCount, TradeAmount) can be stored as keyword types to avoid the problem of fields being unqueryable without fielddata. ES can roughly solve the problem, but anyone familiar with it knows that every query has to be hand-built in JSON; you can use a few scripts, but the flexibility is far weaker than Spark, where it is very convenient to customize logic in a functional language like Scala. A second issue is that paginating over group-by buckets in ES is awkward and painful to implement. There are some sql-on-elasticsearch frameworks in the community, for example https://github.com/NLPchina/elasticsearch-sql, but they only support fairly simple SQL queries; keywords such as HAVING are not supported, so they are no match for Spark SQL. For these reasons I decided to give Spark a try.

2: Environment Setup

    A Spark cluster needs Hadoop + Spark + Java + Scala. Before you start, make sure the versions are compatible with each other, otherwise all sorts of bizarre errors will make your life miserable. See the official download page if you don't believe me: https://spark.apache.org/downloads.html .

The combination I used here is:

hadoop-2.7.6.tar.gz

jdk-8u144-linux-x64.tar.gz

scala-2.11.0.tgz

spark-2.2.1-bin-hadoop2.7.tgz

mysql-connector-java-5.1.46.jar

sqoop-1.4.7.bin__hadoop-2.6.0.tar.gz

Three virtual machines are used: one [namenode + resourcemanager + Spark master] plus two [datanode + nodemanager + Spark worker]:

192.168.2.227 hadoop-spark-master
192.168.2.119 hadoop-spark-salve1
192.168.2.232 hadoop-spark-salve2

1. First set up passwordless SSH login among the three machines.

[root@hadoop-spark-master ~]# ssh-keygen -t rsa -P ''
Generating public/private rsa key pair.
Enter file in which to save the key (/root/.ssh/id_rsa):
/root/.ssh/id_rsa already exists.
Overwrite (y/n)? y
Your identification has been saved in /root/.ssh/id_rsa.
Your public key has been saved in /root/.ssh/id_rsa.pub.
The key fingerprint is:
0f:4e:26:4a:ce:7d:08:b0:7e:13:82:c6:10:77:a2:5d root@hadoop-spark-master
The key's randomart image is:
+--[ RSA 2048]----+
|. o E            |
| = +             |
|o o              |
|o. o             |
|.oo + . S        |
|.. = = * o       |
|  . * o o .      |
|   . . .         |
|                 |
+-----------------+
[root@hadoop-spark-master ~]# ls /root/.ssh
authorized_keys  id_rsa  id_rsa.pub  known_hosts
[root@hadoop-spark-master ~]#

2. Then copy the public key file id_rsa.pub to the other two machines, so that hadoop-spark-master can log in to the two slaves without a password.

scp /root/.ssh/id_rsa.pub root@192.168.2.119:/root/.ssh/authorized_keys
scp /root/.ssh/id_rsa.pub root@192.168.2.232:/root/.ssh/authorized_keys
cat /root/.ssh/id_rsa.pub >> /root/.ssh/authorized_keys

3. Add the following host mappings on all three machines.

[root@hadoop-spark-master ~]# cat /etc/hosts
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6

192.168.2.227   hadoop-spark-master
192.168.2.119   hadoop-spark-salve1
192.168.2.232   hadoop-spark-salve2

4. After downloading the tar.gz packages listed above and unpacking them, add the following to /etc/profile, then copy the configuration to the two slave machines.

[root@hadoop-spark-master ~]# tail -10 /etc/profile
export JAVA_HOME=/usr/myapp/jdk8
export NODE_HOME=/usr/myapp/node
export SPARK_HOME=/usr/myapp/spark
export SCALA_HOME=/usr/myapp/scala
export HADOOP_HOME=/usr/myapp/hadoop
export HADOOP_CONF_DIR=/usr/myapp/hadoop/etc/hadoop
export LD_LIBRARY_PATH=/usr/myapp/hadoop/lib/native:$LD_LIBRARY_PATH
export SQOOP=/usr/myapp/sqoop
export NODE=/usr/myapp/node
export PATH=$NODE/bin:$SQOOP/bin:$SCALA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$SPARK_HOME/bin:$NODE_HOME/bin:$JAVA_HOME/bin:$PATH

5. Finally, fill in the handful of Hadoop configuration files.

(1) core-site.xml

[root@hadoop-spark-master hadoop]# cat core-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License. See accompanying LICENSE file.
-->

<!-- Put site-specific property overrides in this file. -->

<configuration>
   <property>
      <name>hadoop.tmp.dir</name>
      <value>/usr/myapp/hadoop/data</value>
      <description>A base for other temporary directories.</description>
   </property>
   <property>
     <name>fs.defaultFS</name>
     <value>hdfs://hadoop-spark-master:9000</value>
   </property>
</configuration>

(2) hdfs-site.xml: you can also use dfs.datanode.data.dir here to mount multiple disks for the DataNodes.

[root@hadoop-spark-master hadoop]# cat hdfs-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License. See accompanying LICENSE file.
-->

<!-- Put site-specific property overrides in this file. -->

<configuration>
  <property>
    <name>dfs.replication</name>
    <value>2</value>
  </property>
</configuration>

(3) mapred-site.xml: this hands MapReduce execution over to the YARN cluster.

[root@hadoop-spark-master hadoop]# cat mapred-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License. See accompanying LICENSE file.
-->

<!-- Put site-specific property overrides in this file. -->

<configuration>
  <property>
   <name>mapreduce.framework.name</name>
   <value>yarn</value>
  </property>
</configuration>

(4) yarn-site.xml: configure the ResourceManager addresses here so that the slaves can connect to it, otherwise your cluster will not run.

[root@hadoop-spark-master hadoop]# cat yarn-site.xml
<?xml version="1.0"?>
<!--
  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License. See accompanying LICENSE file.
-->
<configuration>

<!-- Site specific YARN configuration properties -->
 <property>
   <name>yarn.nodemanager.aux-services</name>
   <value>mapreduce_shuffle</value>
 </property>
 <property>
   <name>yarn.resourcemanager.address</name>
   <value>hadoop-spark-master:8032</value>
</property>
<property>
   <name>yarn.resourcemanager.scheduler.address</name>
   <value>hadoop-spark-master:8030</value>
</property>
<property>
   <name>yarn.resourcemanager.resource-tracker.address</name>
   <value>hadoop-spark-master:8031</value>
</property>
</configuration>

(5) Edit the slaves file, which lists the addresses of the slave nodes.

[root@hadoop-spark-master hadoop]# cat slaves
hadoop-spark-salve1
hadoop-spark-salve2

(6) Once all of this is configured, scp the entire hadoop directory to the two slave machines.

scp -r /usr/myapp/hadoop root@192.168.2.119:/usr/myapp/hadoop
scp -r /usr/myapp/hadoop root@192.168.2.232:/usr/myapp/hadoop

(7) Since HDFS is a distributed file system, format it before first use. My cluster already has a lot of data loaded, so I will not actually run the format here.

[root@hadoop-spark-master bin]# ./hdfs namenode -format
[root@hadoop-spark-master bin]# pwd
/usr/myapp/hadoop/bin

(8) Then start the cluster with start-dfs.sh and start-yarn.sh separately, or simply run start-all.sh, although the latter is an approach that is slated for deprecation.

[root@hadoop-spark-master sbin]# ls
distribute-exclude.sh  hdfs-config.sh           refresh-namenodes.sh  start-balancer.sh    start-yarn.cmd  stop-balancer.sh    stop-yarn.cmd
hadoop-daemon.sh       httpfs.sh                slaves.sh             start-dfs.cmd        start-yarn.sh   stop-dfs.cmd        stop-yarn.sh
hadoop-daemons.sh      kms.sh                   start-all.cmd         start-dfs.sh         stop-all.cmd    stop-dfs.sh         yarn-daemon.sh
hdfs-config.cmd        mr-jobhistory-daemon.sh  start-all.sh          start-secure-dns.sh  stop-all.sh     stop-secure-dns.sh  yarn-daemons.sh

(9) Remember: you only need to start DFS and YARN on the hadoop-spark-master node; there is no need to run anything on the other machines.

[root@hadoop-spark-master sbin]# ./start-dfs.sh
Starting namenodes on [hadoop-spark-master]
hadoop-spark-master: starting namenode, logging to /usr/myapp/hadoop/logs/hadoop-root-namenode-hadoop-spark-master.out
hadoop-spark-salve2: starting datanode, logging to /usr/myapp/hadoop/logs/hadoop-root-datanode-hadoop-spark-salve2.out
hadoop-spark-salve1: starting datanode, logging to /usr/myapp/hadoop/logs/hadoop-root-datanode-hadoop-spark-salve1.out
Starting secondary namenodes [0.0.0.0]
0.0.0.0: starting secondarynamenode, logging to /usr/myapp/hadoop/logs/hadoop-root-secondarynamenode-hadoop-spark-master.out
[root@hadoop-spark-master sbin]# ./start-yarn.sh
starting yarn daemons
starting resourcemanager, logging to /usr/myapp/hadoop/logs/yarn-root-resourcemanager-hadoop-spark-master.out
hadoop-spark-salve1: starting nodemanager, logging to /usr/myapp/hadoop/logs/yarn-root-nodemanager-hadoop-spark-salve1.out
hadoop-spark-salve2: starting nodemanager, logging to /usr/myapp/hadoop/logs/yarn-root-nodemanager-hadoop-spark-salve2.out
[root@hadoop-spark-master sbin]# jps
5671 NameNode
5975 SecondaryNameNode
6231 ResourceManager
6503 Jps

On the two slaves you can then see that the DataNodes are up.

[root@hadoop-spark-salve1 ~]# jps
5157 Jps
4728 DataNode
4938 NodeManager
[root@hadoop-spark-salve2 ~]# jps
4899 Jps
4458 DataNode
4669 NodeManager

That completes the Hadoop setup.

3: Spark Setup

  If you only need Spark's standalone mode, just edit the slaves file under conf and add the two worker nodes to it.

[root@hadoop-spark-master conf]# tail -5  slaves

# A Spark Worker will be started on each of the machines listed below
hadoop-spark-salve1
hadoop-spark-salve2

[root@hadoop-spark-master conf]# pwd
/usr/myapp/spark/conf

Then, as before, scp the whole conf directory over to the slaves, and run the start-all.sh script in the sbin directory.

[root@hadoop-spark-master sbin]# ./start-all.sh
starting org.apache.spark.deploy.master.Master, logging to /usr/myapp/spark/logs/spark-root-org.apache.spark.deploy.master.Master-1-hadoop-spark-master.out
hadoop-spark-salve1: starting org.apache.spark.deploy.worker.Worker, logging to /usr/myapp/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-hadoop-spark-salve1.out
hadoop-spark-salve2: starting org.apache.spark.deploy.worker.Worker, logging to /usr/myapp/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-hadoop-spark-salve2.out
[root@hadoop-spark-master sbin]# jps
6930 Master
7013 Jps
5671 NameNode
5975 SecondaryNameNode
6231 ResourceManager
[root@hadoop-spark-master sbin]#

You will then find an additional Worker process on the slave1 and slave2 nodes.

[root@hadoop-spark-salve1 ~]# jps
4728 DataNode
4938 NodeManager
5772 Jps
5646 Worker
[root@hadoop-spark-salve2 ~]# jps
5475 Jps
4458 DataNode
4669 NodeManager
5342 Worker

Now we can take a look at the results.

http://hadoop-spark-master:50070/dfshealth.html#tab-datanode  is the HDFS monitoring view, where you can clearly see the two DataNodes.

http://hadoop-spark-master:8088/cluster/nodes  is the YARN node monitoring page.

http://hadoop-spark-master:8080/  is the Spark compute cluster UI.

4: Importing Data with Sqoop

  With the infrastructure in place, we can now use Sqoop to import the MySQL data into Hadoop, stored in the Parquet columnar format. One thing to watch out for: you must drop the mysql-connector-java-5.1.46.jar driver into Sqoop's lib directory.

[root@hadoop-spark-master lib]# ls
ant-contrib-1.0b3.jar          commons-logging-1.1.1.jar      kite-data-mapreduce-1.1.0.jar        parquet-format-2.2.0-rc1.jar
ant-eclipse-1.0-jvm1.2.jar     hsqldb-1.8.0.10.jar            kite-hadoop-compatibility-1.1.0.jar  parquet-generator-1.6.0.jar
avro-1.8.1.jar                 jackson-annotations-2.3.1.jar  mysql-connector-java-5.1.46.jar      parquet-hadoop-1.6.0.jar
avro-mapred-1.8.1-hadoop2.jar  jackson-core-2.3.1.jar         opencsv-2.3.jar                      parquet-jackson-1.6.0.jar
commons-codec-1.4.jar          jackson-core-asl-1.9.13.jar    paranamer-2.7.jar                    slf4j-api-1.6.1.jar
commons-compress-1.8.1.jar     jackson-databind-2.3.1.jar     parquet-avro-1.6.0.jar               snappy-java-1.1.1.6.jar
commons-io-1.4.jar             jackson-mapper-asl-1.9.13.jar  parquet-column-1.6.0.jar             xz-1.5.jar
commons-jexl-2.1.1.jar         kite-data-core-1.1.0.jar       parquet-common-1.6.0.jar
commons-lang3-3.4.jar          kite-data-hive-1.1.0.jar       parquet-encoding-1.6.0.jar 

[root@hadoop-spark-master lib]# pwd
/usr/myapp/sqoop/lib

Now we can import the data. I am going to import the dsp_customertag table from the zuanzhan database, about 1.55 million rows (it is only a test environment), into the test directory in HDFS, stored as Parquet.

[root@hadoop-spark-master lib]# sqoop import --connect jdbc:mysql://192.168.2.166:3306/zuanzhan --username admin --password 123456 --table dsp_customertag --m 1 --target-dir test --as-parquetfile
Warning: /usr/myapp/sqoop/bin/../../hbase does not exist! HBase imports will fail.
Please set $HBASE_HOME to the root of your HBase installation.
Warning: /usr/myapp/sqoop/bin/../../hcatalog does not exist! HCatalog jobs will fail.
Please set $HCAT_HOME to the root of your HCatalog installation.
Warning: /usr/myapp/sqoop/bin/../../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
Warning: /usr/myapp/sqoop/bin/../../zookeeper does not exist! Accumulo imports will fail.
Please set $ZOOKEEPER_HOME to the root of your Zookeeper installation.
18/05/29 00:19:40 INFO sqoop.Sqoop: Running Sqoop version: 1.4.7
18/05/29 00:19:40 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
18/05/29 00:19:40 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
18/05/29 00:19:40 INFO tool.CodeGenTool: Beginning code generation
18/05/29 00:19:40 INFO tool.CodeGenTool: Will generate java class as codegen_dsp_customertag
18/05/29 00:19:41 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `dsp_customertag` AS t LIMIT 1
18/05/29 00:19:42 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `dsp_customertag` AS t LIMIT 1
18/05/29 00:19:42 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/myapp/hadoop
Note: /tmp/sqoop-root/compile/0020f679e735b365bf96dabecb1611de/codegen_dsp_customertag.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
18/05/29 00:19:48 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-root/compile/0020f679e735b365bf96dabecb1611de/codegen_dsp_customertag.jar
18/05/29 00:19:48 WARN manager.MySQLManager: It looks like you are importing from mysql.
18/05/29 00:19:48 WARN manager.MySQLManager: This transfer can be faster! Use the --direct
18/05/29 00:19:48 WARN manager.MySQLManager: option to exercise a MySQL-specific fast path.
18/05/29 00:19:48 INFO manager.MySQLManager: Setting zero DATETIME behavior to convertToNull (mysql)
18/05/29 00:19:48 WARN manager.CatalogQueryManager: The table dsp_customertag contains a multi-column primary key. Sqoop will default to the column CustomerTagId only for this job.
18/05/29 00:19:48 WARN manager.CatalogQueryManager: The table dsp_customertag contains a multi-column primary key. Sqoop will default to the column CustomerTagId only for this job.
18/05/29 00:19:48 INFO mapreduce.ImportJobBase: Beginning import of dsp_customertag
18/05/29 00:19:48 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
18/05/29 00:19:50 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `dsp_customertag` AS t LIMIT 1
18/05/29 00:19:50 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `dsp_customertag` AS t LIMIT 1
18/05/29 00:19:51 WARN spi.Registration: Not loading URI patterns in org.kitesdk.data.spi.hive.Loader
18/05/29 00:19:53 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
18/05/29 00:19:53 INFO client.RMProxy: Connecting to ResourceManager at hadoop-spark-master/192.168.2.227:8032
18/05/29 00:19:57 INFO db.DBInputFormat: Using read commited transaction isolation
18/05/29 00:19:57 INFO mapreduce.JobSubmitter: number of splits:1
18/05/29 00:19:57 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1527575811851_0001
18/05/29 00:19:57 INFO impl.YarnClientImpl: Submitted application application_1527575811851_0001
18/05/29 00:19:58 INFO mapreduce.Job: The url to track the job: http://hadoop-spark-master:8088/proxy/application_1527575811851_0001/
18/05/29 00:19:58 INFO mapreduce.Job: Running job: job_1527575811851_0001
18/05/29 00:20:07 INFO mapreduce.Job: Job job_1527575811851_0001 running in uber mode : false
18/05/29 00:20:07 INFO mapreduce.Job:  map 0% reduce 0%
18/05/29 00:20:26 INFO mapreduce.Job:  map 100% reduce 0%
18/05/29 00:20:26 INFO mapreduce.Job: Job job_1527575811851_0001 completed successfully
18/05/29 00:20:26 INFO mapreduce.Job: Counters: 30
    File System Counters
        FILE: Number of bytes read=0
        FILE: Number of bytes written=142261
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=8616
        HDFS: Number of bytes written=28954674
        HDFS: Number of read operations=50
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=10
    Job Counters
        Launched map tasks=1
        Other local map tasks=1
        Total time spent by all maps in occupied slots (ms)=16729
        Total time spent by all reduces in occupied slots (ms)=0
        Total time spent by all map tasks (ms)=16729
        Total vcore-milliseconds taken by all map tasks=16729
        Total megabyte-milliseconds taken by all map tasks=17130496
    Map-Reduce Framework
        Map input records=1556209
        Map output records=1556209
        Input split bytes=87
        Spilled Records=0
        Failed Shuffles=0
        Merged Map outputs=0
        GC time elapsed (ms)=1147
        CPU time spent (ms)=16710
        Physical memory (bytes) snapshot=283635712
        Virtual memory (bytes) snapshot=2148511744
        Total committed heap usage (bytes)=150994944
    File Input Format Counters
        Bytes Read=0
    File Output Format Counters
        Bytes Written=0
18/05/29 00:20:26 INFO mapreduce.ImportJobBase: Transferred 27.6133 MB in 32.896 seconds (859.5585 KB/sec)
18/05/29 00:20:26 INFO mapreduce.ImportJobBase: Retrieved 1556209 records.

Afterwards you can see the resulting parquet file in the HDFS UI.

5: Driving Spark with Python

  I previously drove Spark with Scala and packaged jobs with Maven, which was not very convenient; Python is much handier. First download the pyspark package, and make sure its version matches the Spark version.

You can install the pyspark 2.2.1 module directly on the master and on your development machine, so that jobs run on the master no longer need to be submitted to the cluster through the pyspark shell. I downloaded it from the Tsinghua University PyPI mirror, because pip install against the official index is painfully slow.

pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark==2.2.1
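
A quick way to confirm that the locally installed module matches the cluster version (a small sanity check of my own, not part of the original setup):

import pyspark

# The reported version should match the cluster's Spark version (2.2.1 here).
print(pyspark.__version__)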

Below is the app.py script, which uses the Spark SQL approach.

# coding=utf-8

import time
import sys
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

# Python 2 only: force utf-8 as the default encoding if needed.
# reload(sys)
# sys.setdefaultencoding('utf8')

# The parquet file produced by the Sqoop import above (the file name differs per import).
logFile = "hdfs://hadoop-spark-master:9000/user/root/test/fbd52109-d87a-4f8c-aa4b-26fcc95368eb.parquet"

sparkconf = SparkConf()

# Optional resource limits for the job.
# sparkconf.set("spark.cores.max", "2")
# sparkconf.set("spark.executor.memory", "512m")

# Connect to the standalone Spark master started earlier.
spark = SparkSession.builder.appName("mysimple").config(conf=sparkconf).master(
    "spark://hadoop-spark-master:7077").getOrCreate()

# Load the parquet file and expose it as a temporary SQL view.
df = spark.read.parquet(logFile)
df.createOrReplaceTempView("dsp_customertag")

starttime = time.time()

spark.sql("select TagName,TradeCount,TradeAmount from dsp_customertag").show()

endtime = time.time()

# Rough wall-clock timing of the query.
print("time:" + str(endtime - starttime))

spark.stop()

Then run app.py from the shell to see the query output and timing.
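
The SELECT in app.py is only a smoke test. As a rough sketch of the headline requirement from the introduction (customers in set A intersected with set B), the same SparkSession could run something like the following. The column names (Nick, TagName, TradeCount, TradeAmount, AvgPrice) follow the fields mentioned earlier in this post, but the exact schema, tag values and thresholds are assumptions, not the production query.

# Sketch: customers matching set A (toothpaste buyers) and set B (toothbrush buyers),
# assuming dsp_customertag holds one row per customer/tag combination.
set_a = spark.sql("""
    SELECT Nick FROM dsp_customertag
    WHERE TagName = '牙膏'            -- toothpaste
      AND TradeAmount BETWEEN 10 AND 500
      AND TradeCount >= 5
      AND AvgPrice BETWEEN 20 AND 200
""")

set_b = spark.sql("""
    SELECT Nick FROM dsp_customertag
    WHERE TagName = '牙刷'            -- toothbrush
      AND TradeAmount BETWEEN 5 AND 50
      AND TradeCount >= 3
      AND AvgPrice BETWEEN 10 AND 30
""")

both = set_a.intersect(set_b)   # customers present in both sets
print(both.count())             # size of A intersect B
both.show(20, truncate=False)   # sample of the matching customers

The same result could equally be expressed as a join, or with the DataFrame filter/groupBy API if the per-customer aggregates are not already materialized in the table.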

That is about it for this post. You can run much richer SQL; when the result set is very large you can also write it back to HDFS or MongoDB for client applications to consume. You may step on countless pitfalls during the setup, and if you cannot get past the Great Firewall, the international version of Bing is your friend for hunting down answers.
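
Building on the intersection sketch above, persisting such a result back to HDFS as Parquet (for a downstream service or a MongoDB loader to pick up) is a one-liner; the output path here is only an illustrative example.

# Persist the intersection result back to HDFS for downstream consumers,
# overwriting any previous run (the path is an arbitrary example).
both.write.mode("overwrite").parquet(
    "hdfs://hadoop-spark-master:9000/user/root/test/tag_intersection.parquet")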

Original article: https://www.cnblogs.com/huangxincheng/p/9105748.html
