通过 Python 操作 HBase API

# coding=utf-8
# Author: ruin
"""
Description: a small wrapper for operating HBase through its Thrift API.

"""
from thrift.transport import TSocket
from thrift.protocol import TBinaryProtocol
from thrift.transport import TTransport
from hbase import Hbase

import struct

# Encode an int the way this module stores ints in HBase cells (packed bytes)
def encode(n):
    """Return *n* packed with ``struct`` format ``"i"``.

    Format ``"i"`` is a C ``int`` in the machine's native byte order and
    size, so the bytes are only portable between hosts that share them.

    :param n: integer that fits in a C ``int``
    :return: the packed ``bytes``
    """
    packed = struct.pack("i", n)
    return packed

# Decode a cell value produced by ``encode`` (or a decimal digit string) to int
def decode(s):
    """Convert a stored cell value back to an ``int``.

    A value made only of digits is parsed as a decimal number; anything
    else is unpacked with ``struct`` format ``"i"``.

    NOTE: a packed int whose bytes all happen to be ASCII digits is
    indistinguishable from a digit string and is parsed as decimal.

    :param s: ``str`` or ``bytes`` cell value
    :return: the decoded integer
    :raises struct.error: if a non-digit value is not a valid packed int
    """
    if s.isdigit():
        return int(s)
    return struct.unpack('i', s)[0]
class HBaseApi(object):
    """Convenience wrapper around the HBase Thrift client for a single table.

    Creating an instance opens a buffered Thrift connection to ``host:port``
    and creates ``table`` (with the configured column families) when it is
    not already listed by the server. Every read/write helper works on the
    first configured column family (``info`` by default).
    """

    def __init__(self, table='fr_test_hbase:test_api', host='10.2.46.240', port=9090):
        """Connect to the Thrift server and make sure ``table`` exists.

        :param table: table name (``str`` or ``bytes``)
        :param host: Thrift server host
        :param port: Thrift server port
        """
        # The Thrift calls below are given the table name as bytes.
        self.table = self._to_bytes(table)
        self.host = host
        self.port = port
        # Connect to HBase Thrift server
        self.transport = TTransport.TBufferedTransport(TSocket.TSocket(host, port))
        self.protocol = TBinaryProtocol.TBinaryProtocolAccelerated(self.transport)

        # Create and open the client connection
        self.client = Hbase.Client(self.protocol)
        self.transport.open()
        # Set type and name of the column families, then create the table
        # if the server does not know it yet.
        self.set_column_families([bytes], ['info'])
        self._build_column_families()

    @staticmethod
    def _to_bytes(text):
        """Return ``text`` UTF-8 encoded; ``bytes`` and ``None`` pass through."""
        if text is None or isinstance(text, bytes):
            return text
        return text.encode('utf-8')

    @staticmethod
    def _cell_value(value):
        """Serialize a cell value to bytes.

        ``bytes`` are stored as-is, ``str`` is UTF-8 encoded and ``int`` is
        packed with the module-level ``encode`` helper.

        :raises TypeError: for any other value type
        """
        if isinstance(value, bytes):
            return value
        if isinstance(value, str):
            return value.encode('utf-8')
        if isinstance(value, int):
            return encode(value)
        raise TypeError('unsupported cell value type: %s' % type(value).__name__)

    def _column(self, qualifier):
        """Return the ``family:qualifier`` column name as bytes."""
        return (self.columnFamilies[0] + ':' + qualifier).encode('utf-8')

    def _mutation(self, qualifier, value):
        """Build the Thrift ``Mutation`` that writes ``value`` to ``qualifier``."""
        return Hbase.Mutation(column=self._column(qualifier),
                              value=self._cell_value(value))

    def set_column_families(self, type_list, col_list=None):
        """Record the column family types and names used by this instance.

        :param type_list: list of value types, one per column family
        :param col_list: list of column family names; defaults to ``['info']``
        """
        self.columnFamiliesType = type_list
        # A fresh list per call: a shared default list would leak changes
        # between instances.
        self.columnFamilies = ['info'] if col_list is None else col_list

    def _build_column_families(self):
        """Create ``self.table`` when the server does not list it yet."""
        tables = self.client.getTableNames()
        if self.table not in tables:
            self.__create_table(self.table)

    def __create_table(self, table):
        """Create ``table`` with one column family per ``self.columnFamilies``.

        :param table: table name (``str`` or ``bytes``), e.g. fr_test_hbase:fr_test
        """
        # Family names are sent as bytes, like the other names this class
        # passes to Thrift -- assumes the generated client expects bytes
        # here too; TODO confirm against the installed hbase bindings.
        descriptors = [
            Hbase.ColumnDescriptor(name=self._to_bytes(family))
            for family in self.columnFamilies
        ]
        # ``table`` is already bytes when called from
        # ``_build_column_families``; only encode when it is text.
        self.client.createTable(self._to_bytes(table), descriptors)

    def close(self):
        """Close the Thrift transport if it was created."""
        transport = getattr(self, 'transport', None)
        if transport is not None:
            transport.close()

    def __del__(self):
        # ``__init__`` may have failed before ``transport`` was assigned.
        self.close()

    def __del_table(self, table):
        """Delete ``table``; it is disabled first, then deleted."""
        table = self._to_bytes(table)
        self.client.disableTable(table)
        self.client.deleteTable(table)

    def getColumnDescriptors(self):
        """Return the server's column descriptors for ``self.table``."""
        return self.client.getColumnDescriptors(self.table)

    def put(self, rowKey, qualifier, value):
        """Write one cell into the first column family.

        :param rowKey: row key (``str`` or ``bytes``)
        :param qualifier: column qualifier inside the column family
        :param value: cell value (``str``, ``bytes`` or ``int``)
        :raises TypeError: if ``value`` has an unsupported type
        :description: HBaseApi(table).put('rowKey', 'column', 'value')
        """
        mutations = [self._mutation(qualifier, value)]
        self.client.mutateRow(self.table, self._to_bytes(rowKey), mutations, {})

    def puts(self, rowKeys, qualifier, values):
        """Write one cell per row, all under the same ``qualifier``.

        :param rowKeys: list of row keys (``str`` or ``bytes``), one per
            value; a single key is reused for every value
        :param qualifier: column qualifier inside the column family
        :param values: list of cell values (``str``, ``bytes`` or ``int``)
        :raises TypeError: if a value has an unsupported type

        Usage::

            HBaseApi('table').puts(rowKeys=['1', '2', '3'], qualifier='name',
                                   values=['a', 'b', 'c'])
        """
        if not isinstance(rowKeys, list):
            rowKeys = [rowKeys] * len(values)

        batch = [
            Hbase.BatchMutation(row=self._to_bytes(rowKeys[i]),
                                mutations=[self._mutation(qualifier, value)])
            for i, value in enumerate(values)
        ]
        self.client.mutateRows(self.table, batch, {})

    def getRow(self, row, qualifier='name'):
        """Fetch one cell of one row.

        :param row: row key (``str`` or ``bytes``)
        :param qualifier: column qualifier inside the column family
        :return: ``{row_key: value}`` with both decoded as UTF-8 text, or
            ``None`` when the row or the column does not exist
        """
        column = self._column(qualifier)
        results = self.client.getRow(self.table, self._to_bytes(row), {})
        for result in results:
            cell = result.columns.get(column)
            if cell is None:
                continue
            return {result.row.decode('utf-8'): cell.value.decode('utf-8')}
        return None

    def getRows(self, rows, qualifier='name'):
        """Fetch the same ``qualifier`` for several rows.

        :param rows: a list of row keys
        :param qualifier: column qualifier inside the column family
        :return: list with one ``getRow`` result per key, in order
            (``None`` for keys that were not found)
        """
        return [self.getRow(key, qualifier) for key in rows]

    def scanner(self, numRows=100, startRow=None, stopRow=None, qualifier='name'):
        """Scan the table and return one ``{row_key: value}`` dict per row.

        :param numRows: maximum number of rows requested from the scanner
        :param startRow: first row key of the scan (``str``/``bytes``) or ``None``
        :param stopRow: row key at which the scan stops (``str``/``bytes``) or ``None``
        :param qualifier: column qualifier to read; rows without it are skipped
        :return: list of ``{row_key: value}`` dicts, decoded as UTF-8 text
        """
        column = self._column(qualifier)
        scan = Hbase.TScan(self._to_bytes(startRow), self._to_bytes(stopRow))
        scannerId = self.client.scannerOpenWithScan(self.table, scan, {})
        try:
            rowList = self.client.scannerGetList(scannerId, numRows)
        finally:
            # Always release the scanner that was opened on the server.
            self.client.scannerClose(scannerId)

        ret = []
        for r in rowList:
            cell = r.columns.get(column)
            if cell is None:
                # This row has no value for the requested column.
                continue
            ret.append({r.row.decode('utf-8'): cell.value.decode('utf-8')})
        return ret

def demo():
    """Write nine sample rows to ``fr_test_hbase:test_log1`` and print a scan.

    Row keys are the strings ``'10001'`` .. ``'10009'``; each row gets the
    value ``'fr' + key`` under the ``name`` qualifier.
    """
    ha = HBaseApi('fr_test_hbase:test_log1')
    rowKeys = [str(key) for key in range(10001, 10010)]
    values = ['fr' + str(val) for val in range(10001, 10010)]
    ha.puts(rowKeys, 'name', values)
    print(ha.scanner())
    # Other calls that can be tried here:
    #   ha.put('0002', 'age', '23')
    #   print(ha.getRow('0001'))
    #   print(ha.getRows(rowKeys))


if __name__ == "__main__":
    demo()
时间: 2024-10-29 03:53:00

通过Python操作hbase api的相关文章

Python操作HBase之happybase

安装Thrift 安装Thrift的具体操作,请点击链接 pip install thrift 安装happybase pip install happybase 连接(happybase.Connection) happybase.Connection(host='localhost', port=9090, timeout=None, autoconnect=True, table_prefix=None, table_prefix_separator=b'_', compat='0.98'

python操作Hbase

本地操作 启动thrift服务:./bin/hbase-daemon.sh start thrift hbase模块产生: 下载thrift源码包:thrift-0.8.0.tar.gz 解压安装 ./configure make make install 在thrift-0.8.0目录中,lib/py/build/lib.linux-x86_64-2.6/目录下存在thrift的python模块,拷贝出来即可 生成hbase模块 下载源码包:hbase-0.98.24-src.tar.gz 解

Python 操作 HBase —— Thrift Thrift2 Happybase 安装使用

Python无法直接访问HBase,必须通过Thrift. HBase与Thrift之间又支持两种接口标准,两种标准互不兼容,功能也互不覆盖. Thrift连接HBase还有超时自动断开的大坑. 安装Thrift依赖(Server端) Centos: yum install automake libtool flex bison pkgconfig gcc-c++ boost-devel libevent-devel zlib-devel python-devel ruby-devel open

Python 操作 GA API 指南

因为需要写一个 Blog Feature 的缘故,所以接触了下 GA 的 Python API,发现 G 家的 API 不是那么直观,比较绕,但是,在使用过程中发现其实 G 家的 API 设计挺有意思的,可能有一些新的设计理念,值得思考学习一番.但是这不是这篇文章的重点,这篇文章还是介绍一下 GA 的 Python API V4 版本的使用,顺带在最后解答几个我在使用过程中遇到的问题. GA API 使用入门 目前 GA 的 API 是 V4 版本,据说 V3 版本还可以使用,但是我没有尝试,为

Python 操作 SonarQube API 获取检测结果并打印

1.需求:每次SonarQube检查完毕后,需要登陆才能看到结果无法通过Jenkins发布后直接看到bug 及漏洞数量. 2.demo:发布后,可以将该项目的检测结果简单打印出来显示,后面还可以集成钉钉发送到群里. 1 # -*- coding: UTF-8 -*- 2 import sys 3 reload(sys) 4 sys.setdefaultencoding('utf8') 5 6 ''' 7 @author:jmmei 8 @file: SonarQubeDingtalk.py 9 @t

Python 操作Redis

redis redis是一个key-value存储系统.和Memcached类似,它支持存储的value类型相对更多,包括string(字符串).list(链表).set(集合).zset(sorted set --有序集合)和hash(哈希类型).这些数据类型都支持push/pop.add/remove及取交集并集和差集及更丰富的操作,而且这些操作都是原子性的.在此基础上,redis支持各种不同方式的排序.与memcached一样,为了保证效率,数据都是缓存在内存中.区别的是redis会周期性

hbase之python利用thrift操作hbase数据和shell操作

前言: 以前都是用mongodb的,但是量大了,mongodb显得不那么靠谱,改成hbase撑起一个量级. HBase是Apache Hadoop的数据库,能够对大型数据提供随机.实时的读写访问.HBase的目标是存储并处理大型的数据.HBase是一个开源的,分布式的,多版本的,面向列的存储模型.它存储的是松散型数据. HBase提供了丰富的访问接口. HBase Shell Java client API Jython.Groovy DSL.Scala REST Thrift(Ruby.Pyt

HBase API 操作范例

package com.test.hbase.api; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.util.Bytes; import java.io.IOException; import java.util.ArrayList; imp

HBase 6、用Phoenix Java api操作HBase

开发环境准备:eclipse3.5.jdk1.7.window8.hadoop2.2.0.hbase0.98.0.2.phoenix4.3.0 1.从集群拷贝以下文件:core-site.xml.hbase-site.xml.hdfs-site.xml文件放到工程src下 2.把phoenix的phoenix-4.3.0-client.jar和phoenix-core-4.3.0.jar添加到工程classpath 3.配置集群中各节点的hosts文件,把客户端的hostname:IP添加进去