library(RJDBC)

# Build the JDBC classpath: all Hive/Hadoop jars plus the Hadoop config dir.
# NOTE: `recursive = TRUE` must be an argument to list.files(), not an element
# of c() — the original appended a literal "TRUE" entry to the classpath.
cp <- c(
  list.files("/usr/lib/hive/lib", pattern = "[.]jar", full.names = TRUE, recursive = TRUE),
  list.files("/usr/lib/hadoop", pattern = "[.]jar", full.names = TRUE, recursive = TRUE),
  list.files("/etc/hadoop/conf", full.names = TRUE, recursive = TRUE)
)

# Instantiate the HiveServer2 JDBC driver with the collected classpath.
drv <- JDBC(driverClass = "org.apache.hive.jdbc.HiveDriver", classPath = cp)
# Open a Kerberos-authenticated HiveServer2 connection.
# Replace user/password with real credentials before running.
hiveconnection <- dbConnect(
  drv,
  "jdbc:hive2://hadoop-jy-backupserver:10000/default;principal=hive/[email protected]",
  user = "*******",
  password = "*******"
)

# Count the rows of the BAIDU_INDEX table; the query is returned as a
# data.frame. This kicks off a MapReduce job on the cluster.
count <- dbGetQuery(hiveconnection, "SELECT count(*) FROM cpr.baidu_index")
# Non-query statements (e.g. CREATE TABLE) must go through dbSendUpdate,
# which executes any statement that returns no result set.
sqlCreateTbl <- "CREATE TABLE IF NOT EXISTS cpr.person_correlation_graph_temp(rownames STRING,ibao_person_id_x STRING, ibao_person_id_y STRING,
cor_index DOUBLE,start_date STRING,end_date STRING)"
result <- dbSendUpdate(hiveconnection, sqlCreateTbl)
# Write sample rows into the Hive table.
# NOTE: the original used typographic quotes (‘…‘), which are a syntax
# error in R — replaced with standard ASCII quotes.
dfToLoad <- data.frame(
  rownames = c("1", "2"),
  ibao_person_id_x = c("盗墓笔记", "盗墓笔记"),
  ibao_person_id_y = c("李易峰", "杨洋"),
  cor_index = c(0.8900, 0.5100)
)
dbWriteTable(hiveconnection, "cpr.person_correlation_graph", dfToLoad, overwrite = TRUE)
# Other DBI helpers that run Hive SQL under the hood:
# list tables matching a SQL LIKE pattern, then read a whole table.
dbListTables(hiveconnection, "%qiyu%")
# BUG FIX: the original referenced the undefined `hiveConn`; the connection
# object created above is named `hiveconnection`.
df <- dbReadTable(hiveconnection, "iris")