HBase(二): c#访问HBase之股票行情Demo

上一章完成了c#访问hbase的sdk封装,接下来以一个具体Demo对sdk进行测试验证。场景:每5秒抓取指定股票列表的实时价格波动行情,数据下载后,一方面实时刷新UI界面,另一方面将数据放入到在内存中模拟的MQ (实际生产情况,可用kafka等集群代替)->存入HBase数据库。提供按指定时间范围股票价格数据查询。

目录:

  • 示例说明
  • 示例效果图
  • rest server运行状态检查
  • 获取股票实时数据代码
  • 数据持续化至Hbase代码
  • 从HBase读取数据代码

示例说明:

  • 在Hbase 中创建两个表,分别为:
  1. StocksInfo (股票信息表,用来存储设置的股票代码、股票名称)
  2. StockRealInfo (股票实时行情数据,包含开盘价、当前价、最高价、最低价、五档竞买、卖单价和数量、成交单价、数量、涨跌幅等)
  • 每5秒钟抓取StocksInfo表中所有股票的数据,自动更新UI,持续化到HBase;支持增加、删除要监控的股票列表。
  • 提供按指定时间范围从hbase中查询历史数据

示例效果图:

  • 历史数据查询:

rest server运行状态检查:

  • 在 HDP2.4安装(五):集群及组件安装 章节,Hbase 主机安装在 hdp4 192.168.2.21 上,使用xshell 工具连接到hbase master(hdp4)
  • 查看8080端口是否正常,也可从 ambari UI 界面查看HBase状态,如图:

获取股票实时数据代码:

  •  好多的网站提供股票实时交易数据的下载,我选择的是从 hq.sina 下载,注意抓取数据的频度不要设置的太高,否则你的IP可能会被封掉,代码如下:

    public class SnatchFormSina
        {
            #region SnatchFormSina
    
            HttpClient client;
    
            private const string dataurl = "http://hq.sinajs.cn/list={0}";
    
            public SnatchFormSina()
            {
                this.client = new HttpClient();
            }
    
            /// <summary>
            ///
            /// </summary>
            public static SnatchFormSina Current
            {
                get {
                    return new SnatchFormSina();
                }
            }
    
            #endregion
    
            #region GetCurrentInfos
    
            /// <summary>
            ///
            /// </summary>
            /// <param name="stockIDs"></param>
            /// <returns></returns>
            public async Task<List<StockRealInfo>> GetCurrentInfosAsync(List<string> stockIDs)
            {
                List<StockRealInfo> list = new List<StockRealInfo>();
                string dataUrl = this.ParseStockIDs(stockIDs);
                dataUrl = dataUrl.Substring(0, dataUrl.Length - 1);
    
                string realInfo = await this.client.GetStringAsync(dataUrl);
                string[] infos = realInfo.Split(‘\n‘);
    
                StockRealInfo stockInfo;
                foreach (string info in infos)
                {
                    if (string.IsNullOrEmpty(info))
                        continue;
    
                    stockInfo = new StockRealInfo(info);
                    stockInfo.ID = SimulatorCache.StockAccount[stockInfo.Name];
                    SimulatorCache.StockInfos[stockInfo.ID] = stockInfo;
                    list.Add(stockInfo);
                }
    
                return list;
            }
    
            #endregion
    
            #region ParseStockIDs
    
            /// <summary>
            ///
            /// </summary>
            /// <param name="stockIDs"></param>
            /// <returns></returns>
            private string ParseStockIDs(List<string> stockIDs)
            {
                StringBuilder sb = new StringBuilder();
                foreach(string id in stockIDs)
                {
                    if (id.Substring(0, 2) == "60")//上海是600打头
                    {
                        sb.Append(string.Format("sh{0},", id));
                    }
                    else if (id.Substring(0, 2) == "51")//上海基金
                    {
                        sb.Append(string.Format("sh{0},", id));
                    }
                    else //if (stockIDs.Substring(0, 2) == "00")//深圳
                    {
                        sb.Append(string.Format("sz{0},", id));
                    }
                }
    
                sb[sb.Length - 1].ToString().Replace(",", "");
    
                return string.Format(dataurl, sb.ToString());
            }
    
            #endregion
    
            #region ValiateStockID
    
            /// <summary>
            ///
            /// </summary>
            /// <param name="stockIDs"></param>
            /// <returns></returns>
            public async Task<string> ValiateStockID(string stockID)
            {
                string name = string.Empty;
                string dataUrl = this.ParseStockIDs(new List<string> { stockID });
                dataUrl = dataUrl.Substring(0, dataUrl.Length - 1);
    
                string realInfo = await this.client.GetStringAsync(dataUrl);
                string[] infos = realInfo.Split(‘\n‘);
    
                StockRealInfo stockInfo;
                foreach (string info in infos)
                {
                    if (string.IsNullOrEmpty(info))
                        continue;
    
                    stockInfo = new StockRealInfo(info);
                    name = stockInfo.Name;
                }
    
                return name;
            }
    
            #endregion
        }

数据持续化到Hbase代码示例:

  • 代码中Utils.HBaseClient 是在一个工具类里面创建一个HBaseClient实例

    public class StockRealWriter
        {
            #region StockRealWriter
    
            Queue<StockRealInfo> queue = new Queue<StockRealInfo>();
    
            // use multithread write
            Thread writerThread;
            bool threadRunning = true;
    
            const string HBASESTOCKTBLNAME = "StockRealInfo";
    
            public StockRealWriter()
            {
                // Start a thread for writting to HBase
                Task task = new Task(WriterThreadFunction);
                task.Start();
            }
    
            ~StockRealWriter()
            {
                threadRunning = false;
            }
    
            #endregion
    
            #region WriterThreadFunction
    
            /// <summary>
            /// WriterThreadFunction
            /// </summary>
            public void WriterThreadFunction()
            {
                while (threadRunning)
                {
                    if (queue.Count > 0)
                    {
                        lock (queue)
                        {
                            CellSet set = new CellSet();
                            do
                            {
                                StockRealInfo stock = queue.Dequeue();
                                this.CreateStockByRealInfos(set, stock);
                            } while (queue.Count > 0);
    
                            Utils.HBaseClient.StoreCellsAsync(HBASESTOCKTBLNAME, set);
                        }
                    }
    
                    Thread.Sleep(5000);
                }
            }
    
            #endregion
    
            #region CreateStockByRealInfos
    
            /// <summary>
            ///
            /// </summary>
            /// <param name="set"></param>
            /// <param name="info"></param>
            private void CreateStockByRealInfos(CellSet set, StockRealInfo info)
            {
                string key = string.Format("{0}_{1}_{2}", info.ID, info.Date, info.Time);
                var row = new CellSet.Row { key = Encoding.UTF8.GetBytes(key) };
    
                var value = new Cell { column = Encoding.UTF8.GetBytes("d:ID"), data = Encoding.UTF8.GetBytes(info.ID) };
                row.values.Add(value);
    
                value = new Cell { column = Encoding.UTF8.GetBytes("d:Name"), data = Encoding.UTF8.GetBytes(info.Name) };
                row.values.Add(value);
    
                //今日开盘价
                value = new Cell { column = Encoding.UTF8.GetBytes("d:TodayOpen"), data = Encoding.UTF8.GetBytes(info.TodayOpen) };
                row.values.Add(value);
    
                //昨日收盘价
                value = new Cell { column = Encoding.UTF8.GetBytes("d:YesterdayClose"), data = Encoding.UTF8.GetBytes(info.YesterdayClose) };
                row.values.Add(value);
    
                //当前价格
                value = new Cell { column = Encoding.UTF8.GetBytes("d:Current"), data = Encoding.UTF8.GetBytes(info.Current) };
                row.values.Add(value);
    
                //今日最高价
                value = new Cell { column = Encoding.UTF8.GetBytes("d:High"), data = Encoding.UTF8.GetBytes(info.High) };
                row.values.Add(value);
    
                //今日最低价
                value = new Cell { column = Encoding.UTF8.GetBytes("d:Low"), data = Encoding.UTF8.GetBytes(info.Low) };
                row.values.Add(value);
    
                //竟买价 买1
                value = new Cell { column = Encoding.UTF8.GetBytes("d:Buy"), data = Encoding.UTF8.GetBytes(info.Buy) };
                row.values.Add(value);
    
                //竟卖价 卖1
                value = new Cell { column = Encoding.UTF8.GetBytes("d:Sell"), data = Encoding.UTF8.GetBytes(info.Sell) };
                row.values.Add(value);
    
                // 成交数 单位股数 通常除于100成为手
                value = new Cell { column = Encoding.UTF8.GetBytes("d:VolAmount"), data = Encoding.UTF8.GetBytes(info.VolAmount) };
                row.values.Add(value);
    
                //  成交多少钱,单位元
                value = new Cell { column = Encoding.UTF8.GetBytes("d:VolMoney"), data = Encoding.UTF8.GetBytes(info.VolMoney) };
                row.values.Add(value);
    
                //  日期
                value = new Cell { column = Encoding.UTF8.GetBytes("d:Date"), data = Encoding.UTF8.GetBytes(info.Date) };
                row.values.Add(value);
    
                //  时间
                value = new Cell { column = Encoding.UTF8.GetBytes("d:Time"), data = Encoding.UTF8.GetBytes(info.Time) };
                row.values.Add(value);
    
                //  差额
                value = new Cell { column = Encoding.UTF8.GetBytes("d:Diff"), data = Encoding.UTF8.GetBytes(info.Diff) };
                row.values.Add(value);
    
                //  百分比
                value = new Cell { column = Encoding.UTF8.GetBytes("d:DiffPrec"), data = Encoding.UTF8.GetBytes(info.DiffPrec) };
                row.values.Add(value);
    
                DataRow buyInfo;
                for(int i=0;i<5;i++)
                {
                    buyInfo = info.BuyList.Rows[i];
    
                    value = new Cell { column = Encoding.UTF8.GetBytes(string.Format("d:Price0{0}",i.ToString())), data = Encoding.UTF8.GetBytes(Convert.ToString(buyInfo["Price"])) };
                    row.values.Add(value);
    
                    value = new Cell { column = Encoding.UTF8.GetBytes(string.Format("d:Amount0{0}", i.ToString())), data = Encoding.UTF8.GetBytes(Convert.ToString(buyInfo["Amount"])) };
                    row.values.Add(value);
                }
    
                DataRow sellInfo;
                for (int i = 0; i < 5; i++)
                {
                    sellInfo = info.SellList.Rows[i];
    
                    value = new Cell { column = Encoding.UTF8.GetBytes(string.Format("d:Price1{0}", i.ToString())), data = Encoding.UTF8.GetBytes(Convert.ToString(sellInfo["Price"])) };
                    row.values.Add(value);
    
                    value = new Cell { column = Encoding.UTF8.GetBytes(string.Format("d:Amount1{0}", i.ToString())), data = Encoding.UTF8.GetBytes(Convert.ToString(sellInfo["Amount"])) };
                    row.values.Add(value);
                }
    
                set.rows.Add(row);
            }
    
            #endregion
    
            #region WriteStock
    
            /// <summary>
            ///
            /// </summary>
            /// <param name="stockInfo"></param>
            public void WriteStock(List<StockRealInfo> stockInfos)
            {
                lock (queue)
                {
                    foreach(var stockInfo in stockInfos)
                    {
                      queue.Enqueue(stockInfo);
                    }
                }
            }
    
            #endregion
        }

从HBase读取数据代码:

  • 代码中 Scanner 参数是指设置的查询范围 (设置StartRow、EndRow、Batch等参数)

    public class StockRealReader
        {
            #region StockRealReader
    
            const string HBASESTOCKTBLNAME = "StockRealInfo";
    
            public StockRealReader()
            {
    
            }
    
            #endregion
    
            #region QueryStockRealAsync
    
            public async Task<List<StockRealInfo>> QueryStockRealAsync(Scanner query)
            {
                List<StockRealInfo> list = new List<StockRealInfo>();
    
                ScannerInformation info = await Utils.HBaseClient.CreateScannerAsync(HBASESTOCKTBLNAME, query);
    
                CellSet next;
                while ((next = await Utils.HBaseClient.ScannerGetNextAsync(info)) != null)
                {
                    StockRealInfo realInfo;
                    foreach (CellSet.Row row in next.rows)
                    {
                        realInfo = new StockRealInfo();
    
                        //开盘价
                        var temp = row.values.Find(c => Encoding.UTF8.GetString(c.column) == "d:TodayOpen");
                        realInfo.TodayOpen = Encoding.UTF8.GetString(temp.data);
    
                        //昨日收盘价
                        temp = row.values.Find(c => Encoding.UTF8.GetString(c.column) == "d:YesterdayClose");
                        realInfo.YesterdayClose = Encoding.UTF8.GetString(temp.data);
    
                        //当前价格
                        temp = row.values.Find(c => Encoding.UTF8.GetString(c.column) == "d:Current");
                        realInfo.Current = Encoding.UTF8.GetString(temp.data);
    
                        //今日最高价
                        temp = row.values.Find(c => Encoding.UTF8.GetString(c.column) == "d:High");
                        realInfo.High = Encoding.UTF8.GetString(temp.data);
    
                        //今日最低价
                        temp = row.values.Find(c => Encoding.UTF8.GetString(c.column) == "d:Low");
                        realInfo.Low = Encoding.UTF8.GetString(temp.data);
    
                        //竟买价 买1
                        temp = row.values.Find(c => Encoding.UTF8.GetString(c.column) == "d:Buy");
                        realInfo.Buy = Encoding.UTF8.GetString(temp.data);
    
                        //竟卖价 卖1
                        temp = row.values.Find(c => Encoding.UTF8.GetString(c.column) == "d:Sell");
                        realInfo.Sell = Encoding.UTF8.GetString(temp.data);
    
                        //成交数 单位股数 通常除于100成为手
                        temp = row.values.Find(c => Encoding.UTF8.GetString(c.column) == "d:VolAmount");
                        realInfo.VolAmount = Encoding.UTF8.GetString(temp.data);
    
                        //成交多少钱,单位元
                        temp = row.values.Find(c => Encoding.UTF8.GetString(c.column) == "d:VolMoney");
                        realInfo.VolMoney = Encoding.UTF8.GetString(temp.data);
    
                        //日期
                        temp = row.values.Find(c => Encoding.UTF8.GetString(c.column) == "d:Date");
                        realInfo.Date = Encoding.UTF8.GetString(temp.data);
    
                        //时间
                        temp = row.values.Find(c => Encoding.UTF8.GetString(c.column) == "d:Time");
                        realInfo.Time = Encoding.UTF8.GetString(temp.data);
    
                        //差额
                        temp = row.values.Find(c => Encoding.UTF8.GetString(c.column) == "d:Diff");
                        realInfo.Diff = Encoding.UTF8.GetString(temp.data);
    
                        //百分比
                        temp = row.values.Find(c => Encoding.UTF8.GetString(c.column) == "d:DiffPrec");
                        realInfo.DiffPrec = Encoding.UTF8.GetString(temp.data);
    
                        list.Add(realInfo);
    
                    }
                }
    
                return list;
            }
    
            #endregion
        }

时间: 2024-10-06 03:23:03

HBase(二): c#访问HBase之股票行情Demo的相关文章

HBase二次开发之搭建HBase调试环境,如何远程debug HBase源代码

版本 HDP:3.0.1.0 HBase:2.0.0 一.前言 之前的文章也提到过,最近工作中需要对HBase进行二次开发(参照HBase的AES加密方法,为HBase增加SMS4数据加密类型).研究了两天,终于将开发流程想清楚并搭建好了debug环境,所以就迫不及待地想写篇文章分享给大家. 二.思路 首先看到这个需求,肯定是需要先实现HBase配置AES加密<HBase配置AES加密>,或者还可以再继续了解实现SMS4加密算法<Java版SMS4加密解密算法>.等到这些都完成之后

PHP通过thrift2访问HBASE

前一段时间需要在网页上显示HBASE查询的结果,考虑用PHP来实现,在网上搜了一下,普遍都是用thrift作为接口来实现的.? 参考博文:? http://www.cnblogs.com/scotoma/archive/2013/05/16/3081236.html 用上述网址里提供的PHP代码,可以访问公司里的一个HBASE集群,但是另一个集群怎么也访问不了,上网查了一下,发现thrift有两套HBASE的接口--thrift和thrift2,而且两套接口并不兼容. ? 用thrift2的接口

spring hadoop 访问hbase入门

1.  环境准备: Maven Eclipse Java Spring 版本 3..2.9 2. Maven  pom.xml配置 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 <!-- Spring hadoop  -->                    <dependency>             <groupId>org.apache.hbase</groupId>             <artifa

HBase 二次开发 java api和demo

1. 试用thrift python/java以及hbase client api,结论如下: 1.1 thrift的安装和发布繁琐,可能会遇到未知的错误,且hbase.thrift的版本在变化中.优点代码简单,需要打包的内容少. 1.2 hbase client api,需要的jar很多,发布版的容量也很大,打包后近百兆.优点是,明确,无歧义. 2. 推荐用hbase client api的方式搞定. 3. 以下均为技术细节. 4. 有一台机器/一个集群,在运行hadoop,也运行了基于这个h

HBase总结(二十)HBase常用shell命令详细说明

进入hbase shell console $HBASE_HOME/bin/hbase shell 如果有kerberos认证,需要事先使用相应的keytab进行一下认证(使用kinit命令),认证成功之后再使用hbase shell进入可以使用whoami命令可查看当前用户 hbase(main)> whoami 表的管理 1)查看有哪些表 hbase(main)> list 2)创建表 # 语法:create <table>, {NAME => <family>

HBase基本概念和hbase shell常用命令用法

1. 简介 HBase是一个分布式的.面向列的开源数据库,源于google的一篇论文<bigtable:一个结构化数据的分布式存储系统>.HBase是Google Bigtable的开源实现,它利用Hadoop HDFS作为其文件存储系统,利用Hadoop MapReduce来处理HBase中的海量数据,利用Zookeeper作为协同服务. 2. HBase的表结构 HBase以表的形式存储数据.表有行和列组成.列划分为若干个列族/列簇(column family). Row Key colu

【转载】HBase基本概念和hbase shell常用命令用法

1. 简介 HBase是一个分布式的.面向列的开源数据库,源于google的一篇论文<bigtable:一个结构化数据的分布式存储系统>.HBase是Google Bigtable的开源实现,它利用Hadoop HDFS作为其文件存储系统,利用Hadoop MapReduce来处理HBase中的海量数据,利用Zookeeper作为协同服务. 2. HBase的表结构 HBase以表的形式存储数据.表有行和列组成.列划分为若干个列族/列簇(column family). Row Key colu

HBase(一)——HBase介绍

HBase介绍 1.关系型数据库与非关系型数据库 (1)关系型数据库 ? 关系型数据库最典型的数据机构是表,由二维表及其之间的联系所组成的一个数据组织 ? 优点: ? 1.易于维护:都是使用表结构,格式一致 ? 2.使用方便:SQL语言通用,可用于复杂查询 ? 3.复杂操作:支持SQL,可用于一个表以及多个表之间非常复杂的查询 ? 缺点: ? 1.读写性能比较差,尤其是海量数据的高效率读写 ? 2.固定的表结构,灵活度稍欠 ? 3.高并发读写需求,传统关系型数据库,硬盘IO是一个很大的瓶颈 (2

Hbase学习笔记之一 | Hbase Shell命令篇

最近在XX项目的测试过程中,接触到一些HBase的东西,希望能站在测试的角度,把过程记录下来,期望对快速了解它有点帮助.作为一个初次接触它的人来说,需要迫切掌握其中基本的概念,这里就不赘述了. HBase Shell是HBase提供的便捷的访问方式,首先你需要搭建HBase的环境,可以参考 http://hbase.apache.org/book/quickstart.html 和http://hbase.apache.org/book/notsoquick.html. 1.进入Hbase Sh