Python操作hdfs

Python直接操作hdfs,包括追加数据文件到hdfs文件

#!coding:utf-8
import sys
from hdfs.client import Client

#设置utf-8模式
reload(sys)
sys.setdefaultencoding( "utf-8" )

#关于python操作hdfs的API可以查看官网:
#https://hdfscli.readthedocs.io/en/latest/api.html

#读取hdfs文件内容,将每行存入数组返回
def read_hdfs_file(client,filename):
    #with client.read(‘samples.csv‘, encoding=‘utf-8‘, delimiter=‘\n‘) as reader:
    #  for line in reader:
    #pass
    lines = []
    with client.read(filename, encoding=‘utf-8‘, delimiter=‘\n‘) as reader:
        for line in reader:
            #pass
            #print line.strip()
            lines.append(line.strip())
    return lines

#创建目录
def mkdirs(client,hdfs_path) :
    client.makedirs(hdfs_path)

#删除hdfs文件
def delete_hdfs_file(client,hdfs_path):
    client.delete(hdfs_path)

#上传文件到hdfs
def put_to_hdfs(client,local_path,hdfs_path):
    client.upload(hdfs_path, local_path,cleanup=True)

#从hdfs获取文件到本地
def get_from_hdfs(client,hdfs_path,local_path):
    download(hdfs_path, local_path, overwrite=False)

#追加数据到hdfs文件
def append_to_hdfs(client,hdfs_path,data):
    client.write(hdfs_path, data,overwrite=False,append=True)

#覆盖数据写到hdfs文件
def write_to_hdfs(client,hdfs_path,data):
    client.write(hdfs_path, data,overwrite=True,append=False)

#移动或者修改文件
def move_or_rename(client,hdfs_src_path, hdfs_dst_path):
    client.rename(hdfs_src_path, hdfs_dst_path)

#返回目录下的文件
def list(client,hdfs_path):
    return client.list(hdfs_path, status=False)

#client = Client(url, root=None, proxy=None, timeout=None, session=None)
#client = Client("http://hadoop:50070")

#move_or_rename(client,‘/input/2.csv‘, ‘/input/emp.csv‘)
#read_hdfs_file(client,‘/input/emp.csv‘)
#put_to_hdfs(client,‘/home/shutong/hdfs/1.csv‘,‘/input/‘)
#append_to_hdfs(client,‘/input/emp.csv‘,‘我爱你‘+‘\n‘)
#write_to_hdfs(client,‘/input/emp.csv‘,‘我爱你‘+‘\n‘)
#read_hdfs_file(client,‘/input/emp.csv‘)
#move_or_rename(client,‘/input/emp.csv‘, ‘/input/2.csv‘)
#mkdirs(client,‘/input/python‘)
#print list(client,‘/input/‘)
#chown(client,‘/input/1.csv‘, ‘root‘)
时间: 2024-10-03 19:58:49

Python操作hdfs的相关文章

Python操作Hdfs,获得hdfs文件名和文件的基本属性,包括修改时间,并转化为标准时间

使用anaconda安装python hdfs包 python-hdfs 2.1.0的包 from hdfs import *import time client = Client("http://192.168.56.101:50070")ll = client.list('/home/test', status=True)for i in ll: table_name = i[0]#表名 table_attr = i[1]#表的属性 #修改时间1528353247347,13位到毫

使用python操作hdfs,并grep想要的数据

代码如下: import subprocess for day in range(24, 30): for h in range(0, 24): filename = "tls-metadata-2018-10-%02d-%02d.txt" % (day, h) cmd = "hdfs dfs -text /data/2018/10/%02d/%02d/*.snappy" % (day, h) print(cmd) #cmd = "cat *.py&quo

hadoop 》》 django 简单操作hdfs 语句

>> from django.shortcuts import render # Create your views here. from hdfs.client import Client from django.views import View from hdfs.client import Client import os # # # 关于python操作hdfs的API可以查看官网: # # https://hdfscli.readthedocs.io/en/latest/api.h

使用Python访问HDFS

最近接触到大数据,对于Skpark和Hadoop的料及都停留在第一次听到这个名词时去搜一把看看大概介绍免得跟不上时代的层次. 在实际读了点别人的代码,又自己写了一些之后,虽然谈不上理解加深,至少对于大数据技术的整体布局有了更清晰的认识. HDFS主要用来存储文件系统,虽然Spark有自己的RDD,但是似乎并未被启用.我需要的数据,是通过Spark服务启动的计算程序,写入HDFS中的. #这结构怎么看都感觉有点怪. Spark支持Java.Scala和Python开发,对我来说是个好事.唯一的问题

Python操作数据库(mysql redis)

一.python操作mysql数据库: 数据库信息:(例如211.149.218.16   szz  123456) 操作mysql用pymysql模块 #操作其他数据库,就安装相应的模块 import  pymysql ip='211.149.218.16' port=3306 passwd='123456' user='root' db='szz' conn=pymysql.connect(host=ip,user=user,port=port,passwd=passwd,db=db,cha

python操作mysql ------- SqlAchemy正传

本篇对于Python操作MySQL主要使用两种方式: 原生模块 pymsql ORM框架 SQLAchemy pymsql pymsql是Python中操作MySQL的模块,其使用方法和MySQLdb几乎相同. 下载安装 pip3 install pymysql 使用操作 1.执行SQL #!/usr/bin/env python # -*- coding:utf-8 -*- import pymysql # 创建连接 conn = pymysql.connect(host='127.0.0.1

Python之路【第九篇】:Python操作 RabbitMQ、Redis、Memcache、SQLAlchemy

Python之路[第九篇]:Python操作 RabbitMQ.Redis.Memcache.SQLAlchemy Memcached Memcached 是一个高性能的分布式内存对象缓存系统,用于动态Web应用以减轻数据库负载.它通过在内存中缓存数据和对象来减少读取数据库的次数,从而提高动态.数据库驱动网站的速度.Memcached基于一个存储键/值对的hashmap.其守护进程(daemon )是用C写的,但是客户端可以用任何语言来编写,并通过memcached协议与守护进程通信. Memc

python操作mysql数据库

连接数据库 输入值 存入数据库 关闭 import string import mysql.connector conn=mysql.connector.connect(user='root',password='test',database='dalian',use_unicode=True) cursor=conn.cursor() a=raw_input('enter an id: ') b=raw_input('enter a name: ') while(a!='quit' or b!

使用python操作InfluxDB

环境: CentOS6.5_x64InfluxDB版本:1.1.0Python版本 : 2.6 准备工作 启动服务器 执行如下命令: service influxdb start 示例如下: [[email protected] ~]# service influxdb start Starting influxdb... influxdb process was started [ OK ] [[email protected] ~]# 安装influxdb-python github地址: