Spark下生成测试数据,并在Spark环境下使用BulkProcessor将测试数据入库到ES6.4.2

Spark下生成2000w测试数据(每条记录150列)

使用spark生成大量数据过程中遇到问题,如果sc.parallelize(fukeData, 64);的记录数特别大比如500w,1000w时,会特别慢,而且会抛出内存溢出over head错误。解决方案,一次生成的数据量不高于100w,多次调用,这样下来一共生成2000w耗时十几分钟。

如果环境允许你可以在本地生成测试数据,然后上传到hdfs供spark测试。

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Random;

public class FileGenerate {
    public static void main(String[] args) throws IOException {
        BufferedWriter writer = new BufferedWriter(new FileWriter("d://test.csv"));

        List<String> fukeData = new ArrayList<String>();
        for (int i = 1; i <= 20000000; i++) {
            fukeData.add(String.valueOf(i));
        }

        List<String> fields = new ArrayList<String>();

        fields.add("id");//
        fields.add("object_id"); //
        fields.add("scan_start_time"); //
        fields.add("scan_stop_time");//
        fields.add("insert_time");//
        fields.add("enodeb_id");
        for (int i = 0; i < 145; i++) {
            fields.add("mr_tadv_" + (i < 10 ? "0" + i : i));
        }
        writer.write(String.join(",", fields) + "\r\n");

        // 假设有1w个小区,数据一共200w条记录,那么每个小区有200条记录。
        Random random = new Random();
        for (String id : fukeData) {
            List<String> rowItems = new ArrayList<String>();
            // id
            int intId = Integer.valueOf(id);
            rowItems.add(id);
            if (intId % 100000 == 0) {
                System.out.println(intId);
                writer.flush();
            }
            // object_id
            String objectId = String.valueOf((intId % 10000) + 10000 * 256 + 1);
            rowItems.add(objectId);

            int hour = random.nextInt(5) + 2;
            int minute = random.nextInt(59) + 1;
            int second_start = random.nextInt(30) + 1;
            int second_stop = second_start + 15;
            String scan_start_time = "2018-10-29 1" + hour + ":" + minute + ":" + second_start;
            String scan_stop_time = "2018-10-29 1" + hour + ":" + minute + ":" + second_stop;
            // scan_start_time
            rowItems.add(scan_start_time);
            // scan_stop_time
            rowItems.add(scan_stop_time);

            // insert_time
            rowItems.add(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));

            // enodeb_id
            rowItems.add(String.valueOf((int) Integer.valueOf(objectId) / 256));

            for (int i = 0; i < 145; i++) {
                rowItems.add(String.valueOf(random.nextInt(100)));
            }

            writer.write(String.join(",", rowItems) + "\r\n");
        }

        writer.flush();
        writer.close();
    }
}

如下代码是spark2.2.0环境下生成2000w测试数据代码:

public class ESWriterTest extends Driver implements Serializable {
    private static final long serialVersionUID = 1L;
    private ExpressionEncoder<Row> encoder = null;
    private StructType type = null;    private String hdfdFilePath = "/user/my/streaming/test_es/*";
public ESWriterTest() {
    }

    @Override
    public void run() {
        initSchema();
        generateTestData();    

        sparkSession.stop();
    }

    private void initSchema() {
        type = DataTypes.createStructType(Arrays.asList(//
                DataTypes.createStructField("id", DataTypes.StringType, true), //
                DataTypes.createStructField("object_id", DataTypes.StringType, true), //
                DataTypes.createStructField("scan_start_time", DataTypes.StringType, true), //
                DataTypes.createStructField("scan_stop_time", DataTypes.StringType, true), //
                DataTypes.createStructField("insert_time", DataTypes.StringType, true), //
                DataTypes.createStructField("enodeb_id", DataTypes.StringType, true)));
        for (int i = 0; i < 145; i++) {
            type = type.add("mr_tadv_" + (i < 10 ? "0" + i : i), DataTypes.StringType);
        }
        encoder = RowEncoder.apply(type);
    }

    private void generateTestData() {
        generateData("/user/my/streaming/test_es/1/");
        generateData("/user/my/streaming/test_es/2/");
        generateData("/user/my/streaming/test_es/3/");
        generateData("/user/my/streaming/test_es/4/");
        generateData("/user/my/streaming/test_es/5/");
        generateData("/user/my/streaming/test_es/6/");
        generateData("/user/my/streaming/test_es/7/");
        generateData("/user/my/streaming/test_es/8/");
        generateData("/user/my/streaming/test_es/9/");
        generateData("/user/my/streaming/test_es/10/");
        generateData("/user/my/streaming/test_es/11/");
        generateData("/user/my/streaming/test_es/12/");
        generateData("/user/my/streaming/test_es/13/");
        generateData("/user/my/streaming/test_es/14/");
        generateData("/user/my/streaming/test_es/15/");
        generateData("/user/my/streaming/test_es/16/");
        generateData("/user/my/streaming/test_es/17/");
        generateData("/user/my/streaming/test_es/18/");
        generateData("/user/my/streaming/test_es/19/");
        generateData("/user/my/streaming/test_es/20/");

        // 支持的文件格式有:text、csv、json、parquet。
        StructType structType = DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("value", DataTypes.StringType, true)));
        Dataset<Row> rows = sparkSession.read().format("text").schema(structType).load(hdfdFilePath);
        rows.printSchema();
        rows.show(10);
    }

    private void generateData(String hdfsDataFilePath) {
        List<Row> fukeData = new ArrayList<Row>();
        for (int i = 1; i <= 1000000; i++) {
            fukeData.add(RowFactory.create(String.valueOf(i)));
        }

        StructType structType = DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("id", DataTypes.StringType, false)));
        JavaSparkContext sc = new JavaSparkContext(sparkSession.sparkContext());
        JavaRDD<Row> javaRDD = sc.parallelize(fukeData, 64);
        Dataset<Row> fukeDataset = sparkSession.createDataFrame(javaRDD, structType);

        Random random = new Random();
        // 假设有1w个小区,数据一共200w条记录,那么每个小区有200条记录。
        Dataset<Row> rows = fukeDataset.mapPartitions(new MapPartitionsFunction<Row, Row>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Iterator<Row> call(Iterator<Row> idItems) throws Exception {
                List<Row> newRows = new ArrayList<Row>();
                while (idItems.hasNext()) {
                    String id = idItems.next().getString(0);
                    List<Object> rowItems = new ArrayList<Object>();
                    // id
                    int intId = Integer.valueOf(id);
                    rowItems.add(id);

                    // object_id
                    String objectId = String.valueOf((intId % 10000) + 10000 * 256 + 1);
                    rowItems.add(objectId);

                    int hour = random.nextInt(5) + 2;
                    int minute = random.nextInt(59) + 1;
                    int second_start = random.nextInt(30) + 1;
                    int second_stop = second_start + 15;
                    String scan_start_time = "2018-10-29 1" + hour + ":" + minute + ":" + second_start;
                    String scan_stop_time = "2018-10-29 1" + hour + ":" + minute + ":" + second_stop;
                    // scan_start_time
                    rowItems.add(scan_start_time);
                    // scan_stop_time
                    rowItems.add(scan_stop_time);

                    // insert_time
                    rowItems.add(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));

                    // enodeb_id
                    rowItems.add(String.valueOf((int) Integer.valueOf(objectId) / 256));

                    for (int i = 0; i < 145; i++) {
                        rowItems.add(String.valueOf(random.nextInt(100)));
                    }

                    newRows.add(RowFactory.create(rowItems.toArray()));
                }

                return newRows.iterator();
            }
        }, encoder);

        rows.toJavaRDD().repartition(20).saveAsTextFile(hdfsDataFilePath);
    }
}

Spark环境下使用BulkProcessor将测试数据入库到ES6.4.2

下边是Spark2.2.0环境下,使用BulkProcessor方式插入2000w条记录到ES6.4.2下的测试代码,测试代码调测过程中发现问题:不能再ForeachPartitionFunction的call函数中调用client.close(),和bulkProcessor.close();函数,否则会抛出异常:原因这个client可能是多个executor共用。

    private ExpressionEncoder<Row> encoder = null;
    private StructType type = null;
    private String hdfdFilePath = "/user/my/streaming/test_es/*";

    public static void main(String[] args) {
        initSchema();

        StructType structType = DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("value", DataTypes.StringType, true)));
        Dataset<Row> lines = sparkSession.read().format("text").schema(structType).load(hdfdFilePath);

        Dataset<Row> rows = lines.map(new MapFunction<Row, Row>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Row call(Row value) throws Exception {
                List<Object> itemsList = new ArrayList<Object>();
                String line = value.getAs("value");
                String[] fields = line.split(",");

                for (String filed : fields) {
                    itemsList.add(filed);
                }

                return RowFactory.create(itemsList.toArray());
            }
        }, encoder);

        rows.show(10);

        rows.toJSON().foreachPartition(new EsForeachPartitionFunction());

        sparkSession.stop();
    }

    private void initSchema() {
        type = DataTypes.createStructType(Arrays.asList(//
                DataTypes.createStructField("id", DataTypes.StringType, true), //
                DataTypes.createStructField("object_id", DataTypes.StringType, true), //
                DataTypes.createStructField("scan_start_time", DataTypes.StringType, true), //
                DataTypes.createStructField("scan_stop_time", DataTypes.StringType, true), //
                DataTypes.createStructField("insert_time", DataTypes.StringType, true), //
                DataTypes.createStructField("enodeb_id", DataTypes.StringType, true)));
        for (int i = 0; i < 145; i++) {
            type = type.add("mr_tadv_" + (i < 10 ? "0" + i : i), DataTypes.StringType);
        }
        encoder = RowEncoder.apply(type);
    }

EsForeachPartitionFunction.java

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.spark.api.java.function.ForeachPartitionFunction;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

public class EsForeachPartitionFunction implements ForeachPartitionFunction<String> {
    private static final long serialVersionUID = 1L;

    @Override
    public void call(Iterator<String> rows) throws Exception {
        TransportClient client = null;
        BulkProcessor bulkProcessor = null;
        try {
            client = getClient();
            bulkProcessor = getBulkProcessor(client);
        } catch (Exception ex) {
            System.out.println(ex.getMessage() + "\r\n" + ex.getStackTrace());
        }
        Map<String, Object> mapType = new HashMap<String, Object>();

        while (rows.hasNext()) {
            @SuppressWarnings("unchecked")
            Map<String, Object> map = new com.google.gson.Gson().fromJson(rows.next(), mapType.getClass());
            bulkProcessor.add(new IndexRequest("twitter", "tweet").source(map));
        }

        try {
            // Flush any remaining requests
            bulkProcessor.flush();
            System.out.println("--------------------------------bulkProcessor.flush(); over...------------------------");

        } catch (Exception ex) {
            System.out.println("" + ex.getMessage() + "\r\n" + ex.getStackTrace());
        }

        try {
            // Or close the bulkProcessor if you don‘t need it anymore
            bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
            System.out.println("--------------------------------bulkProcessor.awaitClose(10, TimeUnit.MINUTES); over...------------------------");
        } catch (Exception ex) {
            System.out.println("" + ex.getMessage() + "\r\n" + ex.getStackTrace());
        }
    }

    private BulkProcessor getBulkProcessor(TransportClient client) {
        BulkProcessor bulkProcessor = BulkProcessor//
                .builder(client, new BulkProcessor.Listener() {
                    @Override
                    public void afterBulk(long arg0, BulkRequest arg1, BulkResponse arg2) {
                        // TODO Auto-generated method stub
                        System.out.println("结束afterBulk(long arg0, BulkRequest arg1, BulkResponse arg2)。。。。");
                    }

                    @Override
                    public void afterBulk(long arg0, BulkRequest arg1, Throwable arg2) {
                        // TODO Auto-generated method stub
                        System.out.println("结束afterBulk(long arg0, BulkRequest arg1, Throwable arg2)。。。。");
                        System.out.println(arg1.numberOfActions() + " data bulk failed,reason :" + arg2);
                    }

                    @Override
                    public void beforeBulk(long arg0, BulkRequest arg1) {
                        // TODO Auto-generated method stub
                        System.out.println("开始。。。。");
                    }
                }) //
                .setBulkActions(10000)//
                .setBulkSize(new ByteSizeValue(64, ByteSizeUnit.MB))//
                .setFlushInterval(TimeValue.timeValueSeconds(5))//
                .setConcurrentRequests(1)//
                .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))//
                .build();
        return bulkProcessor;
    }

    private TransportClient getClient() {
        Settings settings = Settings.builder()//
                .put("cluster.name", "es") //
                .put("client.transport.sniff", true)//
                .build();

        PreBuiltTransportClient preBuiltTransportClient = new PreBuiltTransportClient(settings);

        TransportClient client = preBuiltTransportClient;
        // 10.205.201.97,10.205.201.98,10.205.201.96,10.205.201.95
        try {
            client.addTransportAddress(new TransportAddress(InetAddress.getByName("10.205.201.97"), 9300));
            client.addTransportAddress(new TransportAddress(InetAddress.getByName("10.205.201.98"), 9300));
            client.addTransportAddress(new TransportAddress(InetAddress.getByName("10.205.201.96"), 9300));
            client.addTransportAddress(new TransportAddress(InetAddress.getByName("10.205.201.95"), 9300));
        } catch (UnknownHostException e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }

        return client;
    }
}

依赖pom.xml

        <!--Spark -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-spark-20_2.11</artifactId>
            <version>6.4.2</version>
        </dependency>

        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>6.4.2</version>
        </dependency>

测试速度有点低3500条记录/s

关于ES+SPARK如何优化的文章请参考:

Elasticsearch进阶(一)写入性能基准测试写入性能优化(56小时到5小时),chunk_size探讨

ElasticSearch写入性能优化

Elasticsearch写入性能优化

elasticsearch写入优化记录,从3000到8000/s

Spark2.x写入Elasticsearch的性能测试

原文地址:https://www.cnblogs.com/yy3b2007com/p/9885040.html

时间: 2024-10-09 22:42:44

Spark下生成测试数据,并在Spark环境下使用BulkProcessor将测试数据入库到ES6.4.2的相关文章

Spark Streaming性能优化: 如何在生产环境下应对流数据峰值巨变

1.为什么引入Backpressure 默认情况下,Spark Streaming通过Receiver以生产者生产数据的速率接收数据,计算过程中会出现batch processing time > batch interval的情况,其中batch processing time 为实际计算一个批次花费时间, batch interval为Streaming应用设置的批处理间隔.这意味着Spark Streaming的数据接收速率高于Spark从队列中移除数据的速率,也就是数据处理能力低,在设置

生产环境下was不允许重启,怎么办?

前段时间上线,遇到一个jndi的故障问题,怎么个问题呢?就是原在测试环境下没有问题,而在生产环境下无法连接生产数据库,当时找到问题所在,就是ibm工具自动生成一个在测试环境下连接的jndi的资源文件resources.xml,当时删除了,重启了server,无效.后来我考虑到这肯定是was缓存造成,因此想象缓存造成的原因,最后在测试环境下重启了was,问题解决了,但后来说生产环境是不可能重启was的,因此暂时困老了本人,后来所谓的领导说,他去找总架构师看有没有办法解决,可是时间不等人,过了2天依

Win环境下的文件读写

在win环境下,有许多方法可以对文件进行读写操作,如MFC 中的CFile类,及一些开源的项目如QT中的QFile.开源的好得是可以多平台,而MFC只是微软自家的东西,对于想写跨平台的人,最好不用MFC. 最近在写开发时,突然碰到了一个问题,也是与读写文件有关,不过用的是C的方法,而不是C++,问题的表现是用C 中的Open创建的文件都是只读的,平常很少用这个方法所在网上找了下,才发现这个函数还有一个权限参数,默认是只读.现将C方式下的两种文件操作归纳下 open比起fopen是更低级别的IO操

Hadoop 2.2.0 常见问题之:Ubuntu 64环境下“Unable to load native-hadoop library for your platform”问题”

问题 最近在学习Hadoop(2.2.0),打算写一个MapReduce的小程序在Ubuntu 64位的环境下测试一把,一切环境配置完毕后,执行的过程中,控制台输出下面的内容: WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 原因 在网上搜索了一番,得出如下结论: "The reason

集群环境下定时任务调度问题与方案探讨

摘要 问题:从单机扩展到集群 方案一:不做改造,直接扩展 方案二:多处调度.一处执行 方案三:一处调度.一处执行 方案四:一处调度.多处执行 方案五:多处调度.多处执行 摘要 从改造工作量.可用性.负载均衡.资源利用等方面,简单介绍了几种集群环境下定时任务调度的方案. 问题:从单机扩展到集群 单机环境的定时任务很简单.无论是用比较原始的Timer,还是用自成体系的quartz.spring-scheduler,都可以轻松写意的实现功能. 但是,当应用水平扩展到集群环境下时, 定时任务会出现重复调

window7下使用vagrant打造lamp开发环境(一)

前言: 公司电脑只有win7 + (xampp || wamp)开发,处于个人爱好,想学习下在最舒服的开发环境下开发,也不想安装双系统,想用Vmware电脑却配置不高,vagrant+virtual box给我们提供了很好的出路,占用的系统资源非常少,开发起来非常舒服,下面给大家分享我的安装过程,Linux我采用了目前最无解的CentOs(linux儿子.免费) 一.准备工作: 1:下载Vagrant:https://www.vagrantup.com/downloads.html 2:下载Vi

NLPIR分词工具的使用(java环境下)

一.NLPIR是什么? NLPIR(汉语分词系统)由中科大张华平博士团队开发,主要功能包括:中文分词,词性标注,命名实体识别,用户词典功能,详情见官网:http://ictclas.nlpir.org/. 二.java环境下的使用: 主要参考了如下资料:http://www.360doc.com/content/14/0926/15/19424404_412519063.shtml 下面是个人的使用方法,仅供参考 1.下载NLPIR工具包,链接如下:http://ictclas.nlpir.or

Python----Windows环境下安装Flask

Flask是Python中web开发的一个轻框架,掌握起来比较简单,想体验一下,先从安装Flask开始. 我是在Widows环境下安装的Flask,在Linux环境下也一样,无非就是安装几个依赖和开发的虚拟环境. 1.安装Python环境,并设置环境变量.我安装的是Python2.7,在E盘根目录下. E:\Python27 E:\Python27\Scripts 2.安装虚拟环境virtualenv并创建自己的虚拟环境myvir,实现环境隔离.可以用pip.easy_install等安装,我是

Linux环境下进行web增量部署

协同开发时,需要打"补丁包",其实"补丁包"就是"增量部署"-----在原先功能的基础上对项目模块进行升级. 1.linux环境下为Java project项目打补丁 将编译好的*.class文件直接放到Linux原先运行环境(目录)下进行处理. 2.Linux环境下对Java web project项目打补丁 首先建立与运行环境下面一致的目录(如下示例中的WEB-INFO---->classes---->com),将编译好的*.cl

cocos2d-x js在web和jsb环境下的兼容性差异

最近一个项目,web下测试正常,在jsb环境下bug多多,记录解决方案以备后查 一.cc.PhysicsSprite创建的物理精灵在添加到layer后,移动layer,在jsb环境下贴图不随着layer移动而移动 解决方案:将创建PhysicsSprite的精灵贴图整合进一张大图中,使用SpriteBatchNode加载,将创建的sprite添加进SpriteBatchNode中 this.spriteSheet = new cc.SpriteBatchNode(res.sltq0_png);