PostgreSQL异步客户端(并模拟redis 数据结构)

以前为了不在游戏逻辑(对象属性)变更时修改数据库,就弄了个varchar字段来表示json,由服务器逻辑(读取到内存)去操作它。

但这对运维相当不友好,也不能做一些此Json数据里查询。

所以后面就用了下ssdb,然而就在前几天才了解到postgresql支持json了(其实早在两年前就行了吧···)

就这点差不多就可以算当作mongodb用了,不过还是不支持redis的高级数据结构。

于是我就想模拟(实现)下redis(的数据结构)。

就抽空看了下它的c api库:libpq,发现其请求-等待模型,在网络延迟高的时候,特别影响qps。所以我就写了一个异步客户端,并简易模拟了redis的kv,hash。

开8个链接到pg server,其速度比1个链接快5倍。 在我的测试中,每秒打到30k QPS

(目前不支持list,以及后期还要通过储存过程对现在的hash实现进行改造优化)

#include <string>
#include <list>
#include <iostream>
#include <unordered_map>
#include <memory>
#include <queue>
#include <assert.h>
#include <functional>
#include <sstream>
#include <chrono>

#include "fdset.h"

#include "libpq-events.h"
#include "libpq-fe.h"
#include "libpq/libpq-fs.h"

using namespace std;

class AsyncPGClient
{
public:
    /*TODO::传递错误信息*/
    typedef std::function<void(const PGresult*)> RESULT_CALLBACK;
    typedef std::function<void(bool value)> BOOL_RESULT_CALLBACK;
    typedef std::function<void(const string& value)> STRING_RESULT_CALLBACK;
    typedef std::function<void(const std::unordered_map<string, string>& value)> STRINGMAP_RESULT_CALLBACK;

    AsyncPGClient() : mKVTableName("kv_data"), mHashTableName("hashmap_data")
    {
        mfdset = ox_fdset_new();
    }

    ~AsyncPGClient()
    {
        for (auto& kv : mConnections)
        {
            PQfinish((*kv.second).pgconn);
        }

        ox_fdset_delete(mfdset);
        mfdset = nullptr;
    }

    void    get(const string& key, const STRING_RESULT_CALLBACK& callback = nullptr)
    {
        mStringStream << "SELECT key, value FROM public." << mKVTableName << " where key = ‘" << key << "‘;";

        postQuery(mStringStream.str(), [callback](const PGresult* result){
            if (callback != nullptr && result != nullptr)
            {
                if (PQntuples(result) == 1 && PQnfields(result) == 2)
                {
                    callback(PQgetvalue(result, 0, 1));
                }
            }
        });
    }

    void    set(const string& key, const string& v, const BOOL_RESULT_CALLBACK& callback = nullptr)
    {
        mStringStream << "INSERT INTO public." << mKVTableName << "(key, value) VALUES(‘" << key << "‘, ‘" << v << "‘) ON CONFLICT(key) DO UPDATE SET value = EXCLUDED.value;";

        postQuery(mStringStream.str(), [callback](const PGresult* result){
            if (callback != nullptr)
            {
                if (PQresultStatus(result) == PGRES_COMMAND_OK)
                {
                    callback(true);
                }
                else
                {
                    cout << PQresultErrorMessage(result);
                    callback(false);
                }
            }
        });
    }

    void    hget(const string& hashname, const string& key, const STRING_RESULT_CALLBACK& callback = nullptr)
    {
        hmget(hashname, { key }, [callback](const std::unordered_map<string, string>& value){
            if (callback != nullptr && !value.empty())
            {
                callback((*value.begin()).second);
            }
        });
    }

    void    hmget(const string& hashname, const std::vector<string>& keys, const STRINGMAP_RESULT_CALLBACK& callback = nullptr)
    {
        mStringStream << "SELECT key, value FROM public." << mHashTableName << " where ";
        auto it = keys.begin();
        do
        {
            mStringStream << "key=‘" << (*it) << "‘";

            ++it;
        } while (it != keys.end() && &(mStringStream << " or ") != nullptr);
        mStringStream << ";";

        postQuery(mStringStream.str(), [callback](const PGresult* result){
            if (callback != nullptr)
            {
                std::unordered_map<string, string> ret;
                if (PQresultStatus(result) == PGRES_TUPLES_OK)
                {
                    int num = PQntuples(result);
                    int fileds = PQnfields(result);
                    if (fileds == 2)
                    {
                        for (int i = 0; i < num; i++)
                        {
                            ret[PQgetvalue(result, i, 0)] = PQgetvalue(result, i, 1);
                        }
                    }
                }

                callback(ret);
            }
        });
    }

    void    hset(const string& hashname, const string& key, const string& value, const BOOL_RESULT_CALLBACK& callback = nullptr)
    {
        mStringStream << "INSERT INTO public." << mHashTableName << "(hashname, key, value) VALUES(‘" << hashname << "‘, ‘" << key << "‘, ‘" << value
            << "‘) ON CONFLICT (hashname, key) DO UPDATE SET value = EXCLUDED.value;";

        postQuery(mStringStream.str(), [callback](const PGresult* result){
            if (callback != nullptr)
            {
                callback(PQresultStatus(result) == PGRES_COMMAND_OK);
            }
        });
    }

    void  hgetall(const string& hashname, const STRINGMAP_RESULT_CALLBACK& callback = nullptr)
    {
        mStringStream << "SELECT key, value FROM public." << mHashTableName << " where hashname = ‘" << hashname << "‘;";
        postQuery(mStringStream.str(), [callback](const PGresult* result){
            if (callback != nullptr)
            {
                std::unordered_map<string, string> ret;
                if (PQresultStatus(result) == PGRES_TUPLES_OK)
                {
                    int num = PQntuples(result);
                    int fileds = PQnfields(result);
                    if (fileds == 2)
                    {
                        for (int i = 0; i < num; i++)
                        {
                            ret[PQgetvalue(result, i, 0)] = PQgetvalue(result, i, 1);
                        }
                    }
                }

                callback(ret);
            }
        });
    }

    void    postQuery(const string&& query, const RESULT_CALLBACK& callback = nullptr)
    {
        mPendingQuery.push({ std::move(query), callback});
        mStringStream.str(std::string());
        mStringStream.clear();
    }

    void    postQuery(const string& query, const RESULT_CALLBACK& callback = nullptr)
    {
        mPendingQuery.push({ query, callback });
        mStringStream.str(std::string());
        mStringStream.clear();
    }

public:
    void    poll(int millSecond)
    {
        ox_fdset_poll(mfdset, millSecond);

        std::vector<int> closeFds;

        for (auto& it : mConnections)
        {
            auto fd = it.first;
            auto connection = it.second;
            auto pgconn = connection->pgconn;

            if (ox_fdset_check(mfdset, fd, ReadCheck))
            {
                if (PQconsumeInput(pgconn) > 0 && PQisBusy(pgconn) == 0)
                {
                    bool successGetResult = false;

                    while (true)
                    {
                        auto result = PQgetResult(pgconn);
                        if (result != nullptr)
                        {
                            successGetResult = true;
                            if (connection->callback != nullptr)
                            {
                                connection->callback(result);
                                connection->callback = nullptr;
                            }
                            PQclear(result);
                        }
                        else
                        {
                            break;
                        }
                    }

                    if (successGetResult)
                    {
                        mIdleConnections.push_back(connection);
                    }
                }

                if (PQstatus(pgconn) == CONNECTION_BAD)
                {
                    closeFds.push_back(fd);
                }
            }

            if (ox_fdset_check(mfdset, fd, WriteCheck))
            {
                if (PQflush(pgconn) == 0)
                {
                    //移除可写检测
                    ox_fdset_del(mfdset, fd, WriteCheck);
                }
            }
        }

        for (auto& v : closeFds)
        {
            removeConnection(v);
        }
    }

    void    trySendPendingQuery()
    {
        while (!mPendingQuery.empty() && !mIdleConnections.empty())
        {
            auto& query = mPendingQuery.front();
            auto& connection = mIdleConnections.front();

            if (PQsendQuery(connection->pgconn, query.request.c_str()) == 0)
            {
                cout << PQerrorMessage(connection->pgconn) << endl;
                if (query.callback != nullptr)
                {
                    query.callback(nullptr);
                }
            }
            else
            {
                ox_fdset_add(mfdset, PQsocket(connection->pgconn), WriteCheck);
                connection->callback = query.callback;
            }

            mPendingQuery.pop();
            mIdleConnections.pop_front();
        }
    }

    size_t  pendingQueryNum() const
    {
        return mPendingQuery.size();
    }

    size_t  getWorkingQuery() const
    {
        return mConnections.size() - mIdleConnections.size();
    }

    void    createConnection(  const char *pghost, const char *pgport,
                        const char *pgoptions, const char *pgtty,
                        const char *dbName, const char *login, const char *pwd,
                        int num)
    {
        for (int i = 0; i < num; i++)
        {
            auto pgconn = PQsetdbLogin(pghost, pgport, pgoptions, pgtty, dbName, login, pwd);
            if (PQstatus(pgconn) == CONNECTION_OK)
            {
                auto connection = std::make_shared<Connection>(pgconn, nullptr);
                mConnections[PQsocket(pgconn)] = connection;
                PQsetnonblocking(pgconn, 1);
                ox_fdset_add(mfdset, PQsocket(pgconn), ReadCheck);
                mIdleConnections.push_back(connection);
            }
            else
            {
                cout << PQerrorMessage(pgconn);
                PQfinish(pgconn);
                pgconn = nullptr;
            }
        }

        if (!mConnections.empty())
        {
            sCreateTable((*mConnections.begin()).second->pgconn, mKVTableName, mHashTableName);
        }
    }

private:
    void    removeConnection(int fd)
    {
        auto it = mConnections.find(fd);
        if (it != mConnections.end())
        {
            auto connection = (*it).second;
            for (auto it = mIdleConnections.begin(); it != mIdleConnections.end(); ++it)
            {
                if ((*it)->pgconn == connection->pgconn)
                {
                    mIdleConnections.erase(it);
                    break;
                }
            }

            ox_fdset_del(mfdset, fd, ReadCheck | WriteCheck);
            PQfinish(connection->pgconn);
            mConnections.erase(fd);
        }
    }

private:
    static  void    sCreateTable(PGconn* conn, const string& kvTableName, const string& hashTableName)
    {
        {
            string query = "CREATE TABLE public.";
            query += kvTableName;
            query += "(key character varying NOT NULL, value json, CONSTRAINT key PRIMARY KEY(key))";
            PGresult* exeResult = PQexec(conn, query.c_str());
            auto status = PQresultStatus(exeResult);
            auto errorStr = PQresultErrorMessage(exeResult);
            PQclear(exeResult);
        }

        {
            string query = "CREATE TABLE public.";
            query += hashTableName;
            query += "(hashname character varying, key character varying, value json, "
                    "CONSTRAINT hk PRIMARY KEY (hashname, key))";
            PGresult* exeResult = PQexec(conn, query.c_str());
            auto status = PQresultStatus(exeResult);
            auto errorStr = PQresultErrorMessage(exeResult);
            PQclear(exeResult);
        }
    }

private:
    struct QueryAndCallback
    {
        std::string request;
        RESULT_CALLBACK  callback;
    };

    struct Connection
    {
        PGconn* pgconn;
        RESULT_CALLBACK callback;

        Connection(PGconn* p, RESULT_CALLBACK c)
        {
            pgconn = p;
            callback = c;
        }
    };

    const string                                    mKVTableName;
    const string                                    mHashTableName;

    stringstream                                    mStringStream;
    fdset_s*                                        mfdset;

    std::unordered_map<int, shared_ptr<Connection>> mConnections;
    std::list<shared_ptr<Connection>>               mIdleConnections;

    std::queue<QueryAndCallback>                    mPendingQuery;

    /*TODO::监听wakeup支持*/
    /*TODO::考虑固定分配connection给某业务*/

    /*TODO::编写储存过程,替换现有的hashtable模拟方式,如循环使用jsonb_set以及 select value->k1, value->k2 from ...*/
    /*TODO::编写储存过程,实现list*/
};

int main()
{
    using std::chrono::system_clock;

    AsyncPGClient asyncClient;
    asyncClient.createConnection("192.168.12.1", "5432", nullptr, nullptr, "postgres", "postgres", "19870323", 8);
    system_clock::time_point startTime = system_clock::now();

    auto nowTime = time(NULL);

    for (int i = 0; i < 100000; i++)
    {
        if(false)
        {
            string test = "INSERT INTO public.kv_data(key, value) VALUES (‘";
            test += std::to_string(nowTime*1000+i);
            test += "‘, ‘{\"hp\":100000}‘) ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value;";

            asyncClient.postQuery(test);
        }
        else
        {
            asyncClient.postQuery("select * from public.kv_data where key=‘dd‘;");
        }
    }

    asyncClient.postQuery("INSERT INTO public.kv_data(key, value) VALUES (‘dodo5‘, ‘{\"hp\":100000}‘) "
        " ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value", [](const PGresult* result){
        cout << "fuck" << endl;
    });

    asyncClient.get("dd", [](const string& value){
        cout << "get dd : " << value << endl;
    });

    asyncClient.set("dd", "{\"hp\":456}", [](bool isOK){
        cout << "set dd : " << isOK << endl;
    });

    asyncClient.hget("heros:dodo", "hp", [](const string& value){
        cout << "hget heros:dodo:" << value << endl;
    });

    asyncClient.hset("heros:dodo", "hp", "{\"hp\":1}", [](bool isOK){
        cout << "hset heros:dodo:" << isOK << endl;
    });

    asyncClient.hmget("heros:dodo", { "hp", "money" }, [](const unordered_map<string, string>& kvs){
        cout << "hmget:" << endl;
        for (auto& kv : kvs)
        {
            cout << kv.first << " : " << kv.second << endl;
        }
    });

    asyncClient.hgetall("heros:dodo", [](const unordered_map<string, string>& kvs){
        cout << "hgetall:" << endl;
        for (auto& kv : kvs)
        {
            cout << kv.first << " : " << kv.second << endl;
        }
    });

    while (true)
    {
        asyncClient.poll(1);
        asyncClient.trySendPendingQuery();
        if (asyncClient.pendingQueryNum() == 0 && asyncClient.getWorkingQuery() == 0)
        {
            break;
        }
    }

    auto elapsed = system_clock::now() - startTime;
    cout << "cost :" << chrono::duration<double>(elapsed).count() << "s" << endl;
    cout << "enter any key exit" << endl;
    cin.get();
    return 0;
}

代码地址:https://github.com/IronsDu/accumulation-dev/blob/master/examples/Pgedis.cpp

时间: 2024-12-09 11:31:50

PostgreSQL异步客户端(并模拟redis 数据结构)的相关文章

REdis数据结构服务器

Rdis和JQuery一样是纯粹为应用而产生的,这里记录的是在CentOS 5.7上学习入门文章: 1.Redis简介  Redis是一个key-value存储系统.和Memcached类似,但是解决了断电后数据完全丢失的情况,而且她支持更多无化的value类型,除了和string外,还支持lists(链表).sets(集合)和zsets(有序集合)几种数据类型.这些数据类型都支持push/pop.add/remove及取交集并集和差集及更丰富的操作,而且这些操作都是原子性的. 2.Redis的

Redis学习系列六ZSet(有序列表)及Redis数据结构的过期

一.简介 ZSet可以说是Redis中最有趣的数据结构了,因为他兼具了Hash集合和Set的双重特性,也是用的最多的,保证了value值的唯一性的同时,,同时又保证了高性能,最主要的是还可以给每个Value设置Source(权重),那么我们就可以通过权重进行排序,这在业务上是非常常见的,比如很多地方需要,比如我们需要对所有用户的数学成绩进行排序.对英语等等地例子比比皆是,那么通过ZSet,你将会得到一个响应速度非常快的过程.下面会介绍. ZSet的内部原理是通过跳跃列表来实现的,这里还是不想说太

redis数据结构、持久化、缓存淘汰策略

Redis 单线程高性能,它所有的数据都在内存中,所有的运算都是内存级别的运算,而且单线程避免了多线程的切换性能损耗问题.redis利用epoll来实现IO多路复用,将连接信息和事件放到队列中,依次放到文件事件分派器,事件分派器将事件分发给事件处理器. 1.Redis数据结构及简单操作指令 String.list.set.hash.zset(有序set) 总体来说redis都是通过Key-Value的形式来存储数据的.只是不用数据类型Value的形式不同. String:最简单数据结构,比如我们

Redis 数据结构使用场景

Redis 数据结构使用场景 redis共有5种数据结构,每种的使用场景都是什么? 一.redis 数据结构使用场景 原来看过 redisbook 这本书,对 redis 的基本功能都已经熟悉了,从上周开始看 redis 的源码.目前目标是吃透 redis 的数据结构.我们都知道,在 redis 中一共有5种数据结构,那每种数据结构的使用场景都是什么呢? String——字符串 Hash——字典 List——列表 Set——集合 Sorted Set——有序集合 下面我们就来简单说明一下它们各自

Netty模拟redis服务器

Redis的客户端与服务端采用叫做 RESP(Redis Serialization Protocol)的网络通信协议交换数据,QKxue.NET客户端和服务器通过 TCP 连接来进行数据交互, 服务器默认的端口号为 6379 .客户端和服务器发送的命令或数据一律以 \r\n (CRLF)结尾. RESP支持五种数据类型: 状态回复(status reply):以“+”开头,表示正确的状态信息,”+”后就是具体信息?,比如: redis 127.0.0.1:6379> set ss sdfOK其

Redis 数据结构与内存管理策略(下)

Redis 数据结构与内存管理策略(下) Redis 数据类型特点与使用场景 String.List.Hash.Set.Zset 案例:沪江团购系统大促 hot-top 接口 cache 设计 Redis 内存数据结构与编码 OBJECT encoding key.DEBUG OBJECT key 简单动态字符串(simple dynamic string) 链表(linked list) 字典(dict) 跳表(skip list) 整数集合(int set) 压缩表(zip list) Re

redis安装-模拟redis集群

在虚拟机上模拟redis集群,由于redis的投票机制,一个集群至少需要3个redis节点,如果每个节点设置一主一备,一共需要六台虚拟机来搭建集群,此处,在一台虚拟机上使用6个redis实例来模拟搭建一个伪分布式的redis集群. 1.安装ruby 搭建redis集群需要ruby脚本,需要安装ruby的环境 (1)yum install ruby (2) yum install rubygems 2. 创建一个集群的目录 3. 复制编译安装好的redis节点 进入redis01目录,删除快照文件

java 16 - 5 LinkedList模拟栈数据结构的集合

请用LinkedList模拟栈数据结构的集合,并测试 题目的意思是: 你自己的定义一个集合类,在这个集合类内部可以使用LinkedList模拟. 1 package cn_LinkedList; 2 3 import java.util.LinkedList; 4 5 6 7 8 public class MyStack { 9 10 //定义一个LinkedList类的成员变量 11 private LinkedList list = null; 12 13 /** 14 * 构造方法 15

采用LinkedList来模拟栈数据结构的集合--先进后出

三.用LinkedList来模拟栈数据结构的集合 /* * 自定义一个数据结构为LinkedList的集合类*/public class MyCollection_LinkedList { public LinkedList linkedList;            public MyCollection_LinkedList() {             //在构造方法里初始化             linkedList= new LinkedList();             }