基于Spark和Tensorflow构建DCN模型进行CTR预测

实验介绍

数据采用Criteo Display Ads。这个数据一共11G,有13个integer features,26个categorical features。

Spark

由于数据比较大,且只在一个txt文件,处理前用split -l 400000 train.txt对数据进行切分。

连续型数据利用log进行变换,因为从实时训练的角度上来判断,一般的标准化方式,如Z-Score和最大最小标准化中用到的值都跟某一批数据的整体统计结果有关,换一批数据后标准化就程度就不一样了。

而对于离散型分类数据,一般企业应该都会有类别表而不需要自己从数据中获取(这样能节省计算时间,而且流处理下只能针对特定批量或者时间段出现的数据进行数字编码,所以对超出该批量和时间的新类别就无法进行编码了)。虽然如此,如果在离线情况且真的需要自己从数据中提取类别并进行编码,比如现在这种情况,最直接的方法是使用ML模块的StringIndexer。这个工具方面使用,但是对于数据类别过多或者需要进行编码的列数量较多时容易出现OOM。通过StringIndexer的源码可以知道,它的实现是先利用rdd的countByValue得出特定列的统计map,然后出现频率最大的编码为0,第二的为1,如此类推。另外,它会copy这个map,而StringIndexer本身并没有提供删除这个map的方法,所以如果出现上述数据类别过多或者需要进行编码的列数量较多便会积累大量的map。而刚好这份数据有26种类别数据,且某些类别的种类居然能有三百多万种,所以只能另辟蹊径。下面的方法效仿StringIndexer的部分实现来达到目的,而且运行效率比之前有了很大的提升。当然,由于某些类别出现的频率很低,也可以采取一些cutoff措施,比如该用countByValue,只保留前n个类别,或者保留频率在某个数值以上的类别。

下面实现考虑cutoff,出现次数少于万分之一的类别统一归类为UNK。

val spark = SparkSession
   .builder()
   .master("local[*]")
    // 这里的driver.memory和memory.fraction只做展示,实际使用中要在driver启动前设置才有效。即如果在idea中想增大driver的大小,这需要在VM option中设置堆大小。另外,local模式下设置提高driver大小即可,因为executor也是在同一个JVM进程中。
   .config("spark.driver.memory", 5G)
   .config("spark.sql.shuffle.partitions", 12)
   .config("spark.default.parallelism", 12)
   .config("spark.memory.fraction", 0.75)
   .getOrCreate()

import org.apache.spark.sql.functions._
val path = ""
// 数据源是txt文件,但可以通过csv来推断格式
val df = spark.read
  .option("header", false)
  .option("delimiter", "\t")
  .option("inferSchema", true)
  .format("csv")
  .load(path + "..")
// 如果内存够大,先把它全部加载到内存,减少IO
df.persist(StorageLevel.MEMORY_AND_DISK_SER)
val dataSize = df.count()
val cutoff = dataSize * 0.0001

val numCols = (1 until 14).map(i => s"_c$i").toArray

var df1 = df
numCols.foreach(column => {
  df1 = df1.withColumn(column, when(col(column).isNull, 0).otherwise(log(col(column) + 10)))
})

val catCols = (14 until 40).map(i => s"_c$i")
var df2 = df1

// 所有cat列统一编码
// 使用java的map,通常java的集合比scala的更效率,而且java的hashmap能够初始化大小
val inderMap: util.HashMap[String, util.HashMap[String, Int]] = new util.HashMap(catCols.length)
var i = 0
for (column <- catCols) {
  val uniqueElem = df2.select(column)
    .groupBy(column)
    .agg(count(column))
    .filter(col(s"count($column)") >= cutoff)
    .select(column)
    .map(_.getAs[String](0))
    .collect()

  val len = uniqueElem.length
  var index = 0
  val freqMap = new util.HashMap[String, Int](len)

  while (index < len){
    freqMap.put(uniqueElem(index), i)
    index += 1
    i += 1
  }
  freqMap.put("UNK", i)
  i += 1
  inderMap.put(column, freqMap)
}

val bcMap = spark.sparkContext.broadcast(inderMap)

for (column <- catCols) {
  val Indexer = udf { elem: String =>
    val curMap = bcMap.value.get(column)
    if (elem == null || !curMap.containsKey(elem)) curMap.get("UNK")
    else curMap.get(elem)
  }
  df2 = df2.withColumn(column + "_e", Indexer(col(column))).drop(column)
}
// 如需要划分训练集和测试集
val Array(train, test) = df2.randomSplit(Array(0.9, 0.1))

// parquet输出
df2.write
   .mode("overwrite")
   .save(path + "res/")
// txt输出
df2.map(x => x.mkString(","))
  .write
  .mode("overwrite")
  .format("text")
  .save(path + "res/txt")
// 后面tensorflow需要
print("The total dimension of all categorical features is " + i) // 14670

Tensorflow

下面代码大致介绍深度学习的整体流程,生产环境的代码需要做一定的修改,可以参考“https://github.com/yangxudong/deeplearning/tree/master/DCN”和“https://github.com/lambdaji/tf_repos/tree/master/deep_ctr”两个GitHub的实现

大致流程:定义数据输入函数input_fn,然后开始规划模型和它的训练和测试operation,最后是执行阶段的代码。

input function

def input_fn(filenames, batch_size=32):

    def _parse_line(line):
        fields = tf.decode_csv(line,FIELD_DEFAULTS)
        label = fields[0]
        num_features = fields[1:14]
        cat_features = fields[14:]

        return num_features, cat_features, label

    num_features, cat_features, label = tf.data.TextLineDataset(filenames)    .repeat(2)    .prefetch(1024)    .batch(batch_size)    .map(_parse_line, num_parallel_calls=2)    .make_one_shot_iterator()    .get_next()

    return num_features, cat_features, label

数据和模型的变量

# 构建一些input需要用到的参数。
NUM_COLUMNS = ["c%d" % i for i in range(14)]
CAT_COLUMNS = ["c%d" % i for i in range(14,40)]

FIELD_DEFAULTS = []
for i in range(14):
    FIELD_DEFAULTS.append([0.0])
for i in range(14,40):
    FIELD_DEFAULTS.append([0]) 

filenames = []
for i in range(24):
    ...

# 本地调试
num_col = 13
cat_col = 26
cat_size = 14670
embedding_size = 12
cross_layers = 3
deep_layers = [200,100,33]
label_size = 1
learning_rate = 0.0005

DCN模型

# DCN模型的构建,这里利用LOW Level API,实际上按照custom Estimator的流程会更好。
with tf.name_scope("DCN_model"):
    he_init = tf.variance_scaling_initializer()
    with tf.name_scope("Embedding_layer"):
        x1, x2, label = input_fn(filenames,32)
        Embed_W = tf.get_variable(name=‘embed_w‘, shape=[cat_size, embedding_size],
                                  initializer=he_init)  # TC * E
        embeddings = tf.nn.embedding_lookup(Embed_W, x2) # ? * C * E
        oned_embed = tf.reshape(embeddings, shape=[-1, cat_col * embedding_size])  # ? * (C * E)
        embed_layer_res = tf.concat([x1, oned_embed], 1)  # ? * (N + C * E)

    with tf.name_scope("Cross_Network"):
        x0 = embed_layer_res
        cross_x = embed_layer_res
        for level in range(cross_layers):
            Cross_W = tf.get_variable(name=‘cross_w%s‘ % level, shape=[num_col + cat_col * embedding_size, 1],
                                      initializer=he_init)  # (N + C * E) * 1
            Cross_B = tf.get_variable(name=‘cross_b%s‘ % level, shape=[1,num_col + cat_col * embedding_size],
                                      initializer=he_init)  # (N + C * E) * 1
            xtw = tf.matmul(cross_x, Cross_W)  # ? * 1
            cross_x = x0 * xtw + cross_x + Cross_B  # (N + C * E) * 1

    with tf.name_scope("Deep_Network"):
        deep_x = embed_layer_res
        for neurons in deep_layers:
            deep_x = tf.layers.dense(inputs=deep_x, units=neurons, name=‘deep_%s‘ % neurons,
                                     activation=tf.nn.selu, kernel_initializer=he_init)

    with tf.variable_scope("Output-layer"):
        x_stack = tf.concat([cross_x, deep_x], 1)  # ? * ((N + C * E) + deep_layers[-1])
        logits = tf.layers.dense(inputs=x_stack, units=label_size, name="outputs")
        z = tf.reshape(logits, shape=[-1])
        pred = tf.sigmoid(z)

训练和评估指标

with tf.name_scope("loss"):
    xentropy = tf.nn.sigmoid_cross_entropy_with_logits(labels=label, logits=z)
    loss = tf.reduce_mean(xentropy, name="loss")
    loss_summary = tf.summary.scalar(‘log_loss‘, loss)

with tf.name_scope("train"):
    optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate, beta1=0.9, beta2=0.999, epsilon=1e-8)
    training_op = optimizer.minimize(loss)

with tf.name_scope("eval"):
    acc, upacc = tf.metrics.accuracy(label, tf.math.round(pred))
    auc, upauc = tf.metrics.auc(label, pred)
    acc_summary = tf.summary.scalar(‘accuracy‘, upacc)
    auc_summary = tf.summary.scalar(‘auc‘, upauc)

TensorBroad相关设置,optional

from datetime import datetime

def log_dir(prefix=""):
    now = datetime.utcnow().strftime("%Y%m%d%H%M%S")
    root_logdir = "tf_logs"
    if prefix:
        prefix += "-"
    name = prefix + "run-" + now
    return "{}/{}/".format(root_logdir, name)

logdir = log_dir("my_dcn")
file_writer = tf.summary.FileWriter(logdir, tf.get_default_graph())

执行阶段

# 包含checkpoint、early stop。同样,这里利用LOW Level API,实际上按照custom Estimator的流程会更好。
n_epochs = 2
data_size = 12000000
batch_size = 64
n_batches = int(np.ceil(data_size / batch_size))

checkpoint_path = ".../model/my_dcn_model.ckpt"
checkpoint_epoch_path = checkpoint_path + ".epoch"
final_model_path = "./my_deep_mnist_model"

best_auc = np.infty
epochs_without_progress = 0
max_epochs_without_progress = 20

saver = tf.train.Saver()
gb_init = tf.global_variables_initializer()
lc_init = tf.local_variables_initializer()
with tf.Session() as sess:
    if os.path.isfile(checkpoint_epoch_path):
        with open(checkpoint_epoch_path, "rb") as f:
            start_epoch = int(f.read())
        print("Training was interrupted. Continuing at epoch", start_epoch)
        saver.restore(sess, checkpoint_path)
    else:
        start_epoch = 0
        sess.run([gb_init,lc_init])

    for epoch in range(start_epoch, n_epochs):
        for batch_index in range(n_batches):
            # 每2000批数据测试一遍
            if batch_index % 2000 != 0:
                sess.run(training_op)
            else:
                loss_tr, loss_summary_str, up1, up2, acc_summary_str, auc_summary_str = sess.run([loss, loss_summary, upacc, upauc, acc_summary, auc_summary])
                print("Epoch:", epoch, ",Batch_index:", batch_index,
                      "\tLoss: {:.5f}".format(loss_tr),
                      "\tACC: ", up1,
                      "\tAUC", up2)
                file_writer.add_summary(acc_summary_str, batch_index)
                file_writer.add_summary(auc_summary_str, batch_index)
                file_writer.add_summary(loss_summary_str, batch_index)
                if batch_index % 5000 == 0:
                    saver.save(sess, checkpoint_path)
                    with open(checkpoint_epoch_path, "wb") as f:
                        f.write(b"%d" % (epoch + 1))
                    if up2 < best_auc:
                        saver.save(sess, final_model_path)
                        best_auc = up2
                    else:
                        epochs_without_progress += 1
                        if epochs_without_progress > max_epochs_without_progress:
                            print("Early stopping")
                            break

参考:

玩转企业级Deep&Cross Network模型你只差一步

原文地址:https://www.cnblogs.com/code2one/p/10343134.html

时间: 2024-11-09 00:31:29

基于Spark和Tensorflow构建DCN模型进行CTR预测的相关文章

Spark学习笔记——构建分类模型

Spark中常见的三种分类模型:线性模型.决策树和朴素贝叶斯模型. 线性模型,简单而且相对容易扩展到非常大的数据集:线性模型又可以分成:1.逻辑回归:2.线性支持向量机 决策树是一个强大的非线性技术,训练过程计算量大并且较难扩展(幸运的是,MLlib会替我们考虑扩展性的问题),但是在很多情况下性能很好: 朴素贝叶斯模型简单.易训练,并且具有高效和并行的优点(实际中,模型训练只需要遍历所有数据集一次).当采用合适的特征工程,这些模型在很多应用中都能达到不错的性能.而且,朴素贝叶斯模型可以作为一个很

基于Spark的异构分布式深度学习平台

导读:本文介绍百度基于Spark的异构分布式深度学习系统,把Spark与深度学习平台PADDLE结合起来解决PADDLE与业务逻辑间的数据通路问题,在此基础上使用GPU与FPGA异构计算提升每台机器的数据处理能力,使用YARN对异构资源做分配,支持Multi-Tenancy,让资源的使用更有效. 深层神经网络技术最近几年取得了巨大的突破,特别在语音和图像识别应用上有质的飞跃,已经被验证能够使用到许多业务上.如何大规模分布式地执行深度学习程序,使其更好地支持不同的业务线成为当务之急.在过去两年,百

基于Spark构建开放式的云计算平台第一阶段课程

在2014年6月30日到7月2日举行的Spark Summit是整个云计算大数据领域的Big Event,在会议上DataBricks公司提出了构建开放的Cloud平台,而且宣布该平台完全基于Spark,该平台功能类似于EC2,但比EC2更快.更灵活.更易用. 构建一个开发的云服务平台,需要存储技术.计算平台.消息驱动框架和开发API架构设计等,所以我们把课程主要分为两个阶段:1,Spark技术实战:2,构建开发云平他的消息驱动框架和开放API设计实现: 本课程是是整个系列课程的第一阶段课程,采

基于Spark ALS构建商品推荐引擎

基于Spark ALS构建商品推荐引擎 一般来讲,推荐引擎试图对用户与某类物品之间的联系建模,其想法是预测人们可能喜好的物品并通过探索物品之间的联系来辅助这个过程,让用户能更快速.更准确的获得所需要的信息,提升用户的体验.参与度以及物品对用户的吸引力. 在开始之前,先了解一下推荐模型的分类: 1.基于内容的过滤:利用物品的内容或是属性信息以及某些相似度定义,求出与该物品类似的物品 2.协同过滤:利用大量已有的用户偏好来估计用户对其未接触过的物品的喜好程度 3.矩阵分解(包括显示矩阵分解.隐式矩阵

【Spark】RDD机制实现模型

RDD渊源 弹性分布式数据集(RDD),它是MapReduce模型一种简单的扩展和延伸,RDD为了实现迭代.交互性和流查询等功能,需要保证RDD具备在并行计算阶段之间能够高效地数据共享的功能特性.RDD运用高效的数据共享概念和类似于MapReduce的操作方式,使得所有的计算工作可以有效地执行,并可以在当前特定的系统中获得关键性的优化. RDD是一种有容错机制的特殊集合,可以分布在集群的节点上,以函数式编操作集合的方式,进行各种并行操作.可以将RDD理解为一个具有容错机制的特殊集合,它提供了一种

走在大数据的边缘 基于Spark的机器学习-智能客户系统项目实战(项目实战)

38套大数据,云计算,架构,数据分析师,Hadoop,Spark,Storm,Kafka,人工智能,机器学习,深度学习,项目实战视频教程 视频课程包含: 38套大数据和人工智能精品高级课包含:大数据,云计算,架构,数据挖掘实战,实时推荐系统实战,电视收视率项目实战,实时流统计项目实战,离线电商分析项目实战,Spark大型项目实战用户分析,智能客户系统项目实战,Linux基础,Hadoop,Spark,Storm,Docker,Mapreduce,Kafka,Flume,OpenStack,Hiv

基于Spark MLlib平台的协同过滤算法---电影推荐系统

基于Spark MLlib平台的协同过滤算法---电影推荐系统 又好一阵子没有写文章了,阿弥陀佛...最近项目中要做理财推荐,所以,回过头来回顾一下协同过滤算法在推荐系统中的应用. 说到推荐系统,大家可能立马会想到协同过滤算法.本文基于Spark MLlib平台实现一个向用户推荐电影的简单应用.其中,主要包括三部分内容: 协同过滤算法概述 基于模型的协同过滤应用---电影推荐 实时推荐架构分析     一.协同过滤算法概述 本人对算法的研究,目前还不是很深入,这里简单的介绍下其工作原理. 通常,

大数据实时处理-基于Spark的大数据实时处理及应用技术培训

随着互联网.移动互联网和物联网的发展,我们已经切实地迎来了一个大数据 的时代.大数据是指无法在一定时间内用常规软件工具对其内容进行抓取.管理和处理的数据集合,对大数据的分析已经成为一个非常重要且紧迫的需求.目前对大数据的分析工具,首选的是Hadoop/Yarn平台,但目前对大数据的实时分析工具,业界公认最佳为Spark.Spark是基于内存计算的大数据并行计算框架,Spark目前是Apache软件基金会旗下,顶级的开源项目,Spark提出的DAG作为MapReduce的替代方案,兼容HDFS.H

基于隐马尔可夫模型的有监督词性标注

代码下载:基于隐马尔可夫模型的有监督词性标注 词性标注(Part-of-Speech tagging 或 POS tagging)是指对于句子中的每个词都指派一个合适的词性,也就是要确定每个词是名词.动词.形容词或其他词性的过程,又称词类标注或者简称标注.词性标注是自然语言处理中的一项基础任务,在语音识别.信息检索及自然语言处理的许多领域都发挥着重要的作用. 词性标注本质上是一个分类问题,对于句子中的每一个单词W,找到一个合适的词类类别T,也就是词性标记,不过词性标注考虑的是整体标记的好坏,既整