推荐引擎数据导入模块的实现

毕设做到后半部分,需要实现将用户在一段时间(比如1天)内产生的新数据导入HDFS的功能,这样数据仓库中的数据才能和数据库中的数据同步

在新建了一个PyDev项目后,需要如下操作(拣最主要的写):

模块的环境变量:

# -*- coding:UTF-8 -*-#!/usr/bin/python
# FileName:pro_env.py

#***************************************************
# 项目的路径
PROJECT_DIR = "/usr/local/EclipseProjects/MyBI"
# 项目配置文件的路径
PROJECT_CONF_DIR = PROJECT_DIR + "/conf/"
# 项目第三方库的路径
PROJECT_LIB_DIR = PROJECT_DIR + "/lib"
# 项目临时文件的路径
PROJECT_TMP_DIR = PROJECT_DIR + "/temp"
#***************************************************

# Hadoop的安装路径
HADOOP_HOME = "/usr/local/hadoop/"
# Hadoop的命令路径
HADOOP_PATH = HADOOP_HOME + "bin/"
# HIVE的安装路径
HIVE_HOME = "/opt/hive-0.9.0/"
# HIVE的命令路径
HIVE_PATH = HIVE_HOME + "bin/"
# Sqoop的安装路径
SQOOP_HOME = "/opt/Sqoop/"
# Sqoop的命令路径
SQOOP_PATH = SQOOP_HOME + "bin/"
#***************************************************

# Java的安装路径
Java_HOME = "/usr/lib/jvm/jdk1.7.0_75"

配置文件:

导入模块的配置文件主要的目的是告诉Sqoop,导入哪些表,怎么导入,我暂时需要一张表,新建一个XML文件Import.xml,type="add"表示增量导入

<?xml version="1.0" encoding="UTF-8"?>
<root>
    <task type="add">
        <table>ModifyRecords</table>
    </task>
</root>

需要对每张表进行更细一步的配置,新建ModifyRecords.xml

<?xml version="1.0" encoding="UTF-8"?>
<root>
    <sqoop-shell type="import">
        <param key="connect">jdbc:mysql://localhost:3306/Recommend</param>
        <param key="username">${username}</param>
        <param key="password">${password}</param>
        <param key="target-dir">/user/hadoop/Recommend/$dt</param>
        <param key="query">‘select userID,movieID,pref from Recommend.ModifyRecords where modifyDate$flag"\$CONDITIONS" and $CONDITIONS‘</param>
        <param key="m">1</param>
        <param key="fields-terminated-by">‘,‘</param>
    </sqoop-shell>
</root>

剩下的工作就是解析配置文件:

# -*- coding:UTF-8 -*-
#!/usr/bin/python
# FileName:import.py
from com.utls.pro_env import PROJECT_CONF_DIR
import time
from com.utls.sqoop import SqoopUtil
import xml.etree.ElementTree as ET

# 其中dt为昨天的日期,将由调度模块传入
def resolve_conf(dt):

    # 获得配置文件名
    conf_file = PROJECT_CONF_DIR + "Import.xml"

    # 解析配置文件
    xml_tree = ET.parse(conf_file)
    # 获得task元素
    tasks = xml_tree.findall(‘./task‘)

    for task in tasks:
        # 获得导入类型,增量导入或者全量导入
        import_type = task.attrib["type"]

        # 获得表名集合
        tables = task.findall(‘./table‘)

        # 用来保存待执行的Sqoop命令的集合
        cmds = []

        # 迭代表名集合,解析表配置文件
        for i in range(len(tables)):
            # 表名
            table_name = tables[i].text
            # 表配置文件名
            table_conf_file = PROJECT_CONF_DIR + table_name + ".xml"

            # 解析表配置文件
            xmlTree = ET.parse(table_conf_file)

            # 获取sqoop-shell节点
            sqoopNodes = xmlTree.findall("./sqoop-shell")

            # 获取sqoop-shell节点
            sqoop_cmd_type = sqoopNodes[0].attrib["type"]
            # 获取
            praNodes = sqoopNodes[0].findall("./param")

            # 用来保存param信息的字典
            cmap = {}

            for i in range(len(praNodes)):
                # 获得key属性的值
                key = praNodes[i].attrib["key"]
                # 获得param标签中间的值
                value = praNodes[i].text
                # 保存到字典中
                cmap[key] = value

            # 首先组装成sqoop命令头
            command = "sqoop " + sqoop_cmd_type

            # 如果为全量导入
            if(import_type == "all"):
                # query的查询条件为<dt
                import_condition = dt
                flag = "<"
            # 如果为增量导入
            elif (import_type == "add"):
                # query的查询条件为=dt
                import_condition = dt
                flag = "="
            else:
                raise Exception

            # #迭代字典将param的信息拼装成字符串
            for key in cmap.keys():

                value = cmap[key]

                # 如果不是键值对形式的命令选项
                if(value == None or value == "" or value == " "):
                    value = ""

                # 将query的CONDITIONS替换为查询条件
                if(key == "query"):
                    value = value.replace("\$CONDITIONS", import_condition)
                    value = value.replace("$flag", flag)

                # 将导入分区替换为传入的时间
                if(key == "target-dir"):
                    value = value.replace("$dt", dt)

                # 拼装为命令
                if key == "fields-terminated-by":
                    command += " --" + key + " " + value
                else:
                    command += " --" + key + " " + value + "\\" + "\n"

            # 将命令加入至待执行的命令集合
            cmds.append(command)

    return cmds

# Python模块的入口:main函数
if __name__ == ‘__main__‘:

    # 调度模块将昨天的时间传入
    dt = time.strftime("%Y-%m-%d", time.localtime(time.time()))
    # 解析配置文件,获得sqoop命令集合
    cmds = resolve_conf(dt)

    # 迭代集合,执行命令
    for i in range(len(cmds)):
        cmd = cmds[i]

        # 执行导入过程
        SqoopUtil.execute_shell(cmd)

拼装出来的命令如下:

sqoop import --username xxxx --target-dir /user/hadoop/Recommend/2015-04-26 --m 1 --connect jdbc:mysql://localhost:3306/Recommend --query ‘select userID,movieID,pref from Recommend.ModifyRecords where modifyDate="2015-04-26" and $CONDITIONS‘ --password xxxx --fields-terminated-by ‘,‘

最后新建一个模块(不过当然写在import.py的main函数之前...),编写一个类,为该类编写一个函数,目的是用Python调用Sqoop命令:

#!/usr/bin/python
# FileName sqoop.py
# -*- coding:UTF-8 -*-
import os
class SqoopUtil(object):
    ‘‘‘
    sqoop operation
    ‘‘‘
    def __init__(self):
        pass

    @staticmethod
    def execute_shell(shell):
        print shell
        os.system(shell)
        
时间: 2024-10-10 10:26:08

推荐引擎数据导入模块的实现的相关文章

XAF Excel数据导入模块使用说明与源码

我实现了XAF项目中Excel数据的导入,使用Devexpress 新出的spreadsheet控件,可能也不新了吧:D 好,先看一下效果图:下图是Web版本的. 下面是win版,目前只支持Ribbon UI,下个版本支持其他界面的: 功能说明: 支持从Excel任意版本导入数据,可以使用 打开文件功能选择现有的文件,没有模板时,请来到上图界面中,另存为Excel到本地,往模板上填加数据. 导入时使用了显示名称进行匹配字段,所以字段部分不要修改. 导入时会使用你在写好的验证规则. 支持Win+W

蚂蚁金服智能推荐引擎解决方案与实践

摘要:以"数字金融新原力(The New Force of Digital Finance)"为主题,蚂蚁金服ATEC城市峰会于2019年1月4日上海如期举办.金融智能专场分论坛上,蚂蚁金服人工智能部高级技术专家王志勇做了主题为<蚂蚁金服智能推荐引擎>的精彩分享. 演讲中,王志勇代表蚂蚁金服首次向公众介绍了蚂蚁金服智能推荐引擎,分享了蚂蚁金服利用人工智能和大数据能力在推荐引擎上沉淀的大量经验,并介绍了结合蚂蚁自身优势打造的.能够灵活适配各种业务场景的智能推荐引擎解决方案(A

Mahout对于GroupLens数据定制的推荐引擎

/* * 这段程序是使用GroupLens定制的DataModel数据模型 * 因为这里的数据是以逗号隔开的. * 这里我把数据量加大,变成了20M的数据 * 这里使用的数据模型是对于GroupLens定制的GroupLensDataModel * */ package byuser; import java.io.File; import java.io.IOException; import java.util.List; import org.apache.mahout.cf.taste.

MySQL存储引擎 SQL数据导入/导出 操作表记录 查询及匹配条件

MySQL存储引擎的配置 SQL数据导入/导出 操作表记录 查询及匹配条件 1 MySQL存储引擎的配置1.1 问题 本案例要求MySQL数据存储引擎的使用,完成以下任务操作: 可用的存储引擎类型 查看默认存储类型 更改表的存储引擎 1.2 步骤 实现此案例需要按照如下步骤进行. 步骤一:查看存储引擎信息 登入MySQL服务器,查看当前支持哪些存储引擎. 使用mysql命令连接,以root用户登入: [[email protected] ~]# mysql -u root –p Enter pa

旅游公司用友财务软件接口实现大批量数据导入财务模块实现报表统计

一. 总体概况:.............................................................................................................. 2 二. 需求:....................................................................................................................... 2 三

三十一.MySQL存储引擎 、 数据导入导出 管理表记录 匹配条件

1.MySQL存储引擎的配置 查看服务支持的存储引擎 查看默认存储类型 更改表的存储引擎 设置数据库服务默认使用的存储引擎 1.1 查看存储引擎信息 mysql> SHOW ENGINES\G 1.2 查看默认存储类型 mysql> SHOW VARIABLES LIKE 'default_storage_engine'; +------------------------+--------+ | Variable_name          | Value  | +-------------

25_MySQL存储引擎 、 数据导入导出 管理表记录 匹配条件

版本:5.7.28服务器:mysql 192.168.4.20 1.MySQL存储引擎的配置查看服务支持的存储引擎查看默认存储类型更改表的存储引擎设置数据库服务默认使用的存储引擎 查看存储引擎信息mysql> SHOW ENGINES;+--------------------+---------+-------------------| Engine             | Support |+--------------------+---------+-----------------

python数据分析入门——数据导入数据预处理基本操作

数据导入到python环境:http://pandas.pydata.org/pandas-docs/stable/io.html(英文版) IO Tools (Text, CSV, HDF5, ...)? The pandas I/O API is a set of top level reader functions accessed like pd.read_csv() that generally return a pandasobject. read_csv read_excel re

[Asp.net]常见数据导入Excel,Excel数据导入数据库解决方案,总有一款适合你!

引言 项目中常用到将数据导入Excel,将Excel中的数据导入数据库的功能,曾经也查找过相关的内容,将曾经用过的方案总结一下. 方案一 NPOI NPOI 是 POI 项目的 .NET 版本.POI是一个开源的Java读写Excel.WORD等微软OLE2组件文档的项目.使用 NPOI 你就可以在没有安装 Office 或者相应环境的机器上对 WORD/EXCEL 文档进行读写.NPOI是构建在POI 3.x版本之上的,它可以在没有安装Office的情况下对Word/Excel文档进行读写操作