提高CTP的jswig_JAVA接口回调线程处理效率

ctp回调线程要快速返回,每次从ctp进入java时必然要产生一个新的线程对象匹配,效率实在太低。

在java中产生一个线程,该线程调用native方法进入本地代码形成工作线程,该线程负责读取缓冲区数据并调用java接口处理。

在swig的Spi代码中生成一个缓冲区,每次有回调发生时,把所有数据复制到缓冲区形成一个数据包,然后唤醒工作线程。

在thostmduserapi_se_wrap.h头文件增加:

 1 #include <Windows.h>
 2 struct DataHead {
 3     short packLength;//报文长度,
 4     short dataLength;//报文内有效数据长度,
 5     int  eventID;//事件ID
 6 };
 7 class SwigDirector_CThostFtdcMdSpi : public CThostFtdcMdSpi, public Swig::Director {
 8
 9 public:
10     CRITICAL_SECTION  g_mutex;//临界区变量
11     HANDLE  g_cond;//唤醒信号
12     //缓冲区
13     /*有数据到达时,调用lockBuf(int length)请求一段缓冲区,然后填入消息报数据。
14     消息报由头部和数据构成。DataHead。
15     */
16     char *dataBuf;
17     int buf_begin, buf_end,buf_length;
18     char* lockBuf(int length);//请求加锁的缓冲区
19     void notifyBuf(char* pPtr, int plength);//设置数据长度并唤醒处理线程
20     ~SwigDirector_CThostFtdcMdSpi();

在thostmduserapi_se_wrap.h头文件增加native工作线程代码:

 1 #ifdef __cplusplus
 2 extern "C" {
 3 #endif
 4     SWIGEXPORT jlong JNICALL Java_............._handleEvent(JNIEnv *jenv, jobject  swigjobj, jlong cPtr)
 5     {
 6         /*检查消息缓存,无消息则进入等待
 7         //jlong cPtr指向内存中SwigDirector_CThostFtdcMdSpi对象
 8         //
 9         */
10         DataHead *pPtr = NULL;
11         SwigDirector_CThostFtdcMdSpi *clsPtr = (SwigDirector_CThostFtdcMdSpi *)cPtr;
12         while (true){
13             EnterCriticalSection(&(clsPtr->g_mutex));
14             int asize = 0;
15             while (true){
16                 pPtr = NULL;
17                 asize = clsPtr->buf_end - clsPtr->buf_begin;
18                 if (asize < 0){
19                     asize += clsPtr->buf_length;
20                 }
21                 if (asize <= 0){
22                     break;
23                 }
24                 pPtr = (DataHead*)(clsPtr->dataBuf + clsPtr->buf_begin);
25                 if (pPtr->dataLength == 0){
26                     //判断是否空包,跳过空包
27                     clsPtr->buf_begin += pPtr->packLength;
28                     if (clsPtr->buf_begin >= clsPtr->buf_length){
29                         clsPtr->buf_begin = 0;//有空间
30                     }
31                 }
32                 else{
33                     break;
34                 }
35             }
36             LeaveCriticalSection(&(clsPtr->g_mutex));
37             if (asize <= 0 || pPtr->dataLength <= 0){
38                 //进入休眠
39                 WaitForSingleObject(clsPtr->g_cond, INFINITE);
40                 continue;
41             }
42             //开始调用回调方法pPtr
43             int eID = pPtr->eventID;
44             char *dPtr = (char*)pPtr;
45             dPtr += sizeof(DataHead);
46             if (eID == 0){
47                 jenv->CallStaticVoidMethod(Swig::jclass_thostmduserapiJNI, Swig::director_method_ids[0], swigjobj);
48             }
49             else if (eID == 1){
50                 jint jnReason = (jint)(*(int*)(dPtr));
51                 jenv->CallStaticVoidMethod(Swig::jclass_thostmduserapiJNI, Swig::director_method_ids[1], swigjobj, jnReason);
52             }
53             else if (eID == 3){
54                 jlong jpRspUserLogin = 0;
55                 jlong jpRspInfo = 0;
56                 jint jnRequestID;
57                 jboolean jbIsLast;
58                 *((CThostFtdcRspUserLoginField **)&jpRspUserLogin) = (CThostFtdcRspUserLoginField *)dPtr;
59                 dPtr += sizeof(CThostFtdcRspUserLoginField);
60                 *((CThostFtdcRspInfoField **)&jpRspInfo) = (CThostFtdcRspInfoField *)dPtr;
61                 dPtr += sizeof(CThostFtdcRspInfoField);
62                 jnRequestID = (jint)(*(int*)(dPtr));
63                 dPtr += sizeof(int);
64                 jbIsLast = (jboolean)(*(bool*)(dPtr));
65                 jenv->CallStaticVoidMethod(Swig::jclass_thostmduserapiJNI, Swig::director_method_ids[3], swigjobj, jpRspUserLogin, jpRspInfo, jnRequestID, jbIsLast);
66             }
67             else if (eID == 11){
68                 jlong jpDepthMarketData = 0;
69                 *((CThostFtdcDepthMarketDataField **)&jpDepthMarketData) = (CThostFtdcDepthMarketDataField *)dPtr;
70                 jenv->CallStaticVoidMethod(Swig::jclass_thostmduserapiJNI, Swig::director_method_ids[11], swigjobj, jpDepthMarketData);
71             }
72             pPtr->dataLength = 0;//变成空包
73             jthrowable swigerror = jenv->ExceptionOccurred();
74             if (swigerror) {
75                 Swig::DirectorException::raise(jenv, swigerror);
76             }
77             continue;
78         }
79         return 0;
80     }

在thostmduserapi_se_wrap.h头文件修改添加申请缓冲区、唤醒工作线程代码。并修改回调代码:

  1 //初始化数据
  2
  3 SwigDirector_CThostFtdcMdSpi::SwigDirector_CThostFtdcMdSpi(JNIEnv *jenv) : CThostFtdcMdSpi(), Swig::Director(jenv) {
  4     this->buf_length = 4096 * 1024;
  5     this->dataBuf = new char[this->buf_length];
  6     this->buf_begin = 0;
  7     this->buf_end = 0;
  8     InitializeCriticalSection(&(this->g_mutex));  //初始化临界区
  9     //以non-signaled创建auto-reset模式的事件对象
 10     this->g_cond = CreateEvent(NULL, FALSE, FALSE, NULL);
 11 }
 12 //清场
 13 SwigDirector_CThostFtdcMdSpi::~SwigDirector_CThostFtdcMdSpi(){
 14     delete this->dataBuf;
 15     DeleteCriticalSection(&this->g_mutex);  //释放临界区
 16     CloseHandle(this->g_cond);  //销毁事件对象
 17 }
 18
 19 //获取数据包缓冲区,并设置相关指针
 20 char* SwigDirector_CThostFtdcMdSpi::lockBuf(int plength){
 21     int asize;
 22     DataHead *pPtr = NULL;
 23     EnterCriticalSection(&g_mutex);
 24     if (this->buf_end >= this->buf_begin){
 25         //end指针大于begin指针
 26         asize = this->buf_length - this->buf_end;
 27         if (asize > plength + sizeof(DataHead) + sizeof(DataHead)){
 28             //可以容纳
 29             pPtr = (DataHead*)(this->dataBuf + this->buf_end);
 30             pPtr->packLength = plength + sizeof(DataHead);
 31             pPtr->dataLength = -1;
 32             this->buf_end += pPtr->packLength;
 33         }
 34         else{
 35             if (this->buf_begin > 0){
 36                 //begin前有空间容纳end指针,可以分配或者填充空包
 37                 if (asize >= plength + sizeof(DataHead)){
 38                     //可以分配
 39                     pPtr = (DataHead*)(this->dataBuf + this->buf_end);
 40                     pPtr->packLength = asize;
 41                     pPtr->dataLength = -1;
 42                     this->buf_end = 0;
 43                 }
 44                 else{
 45                     //可以填充空包
 46                     pPtr = (DataHead*)(this->dataBuf + this->buf_end);
 47                     pPtr->packLength = asize;
 48                     pPtr->dataLength = 0;
 49                     this->buf_end = 0;
 50                     pPtr = NULL;
 51                 }
 52             }
 53             else{
 54                 pPtr = (DataHead*)(this->dataBuf + this->buf_end);
 55                 pPtr->packLength = 0;
 56             }
 57         }
 58     }
 59     asize = this->buf_begin - this->buf_end - 1;//end不能大于等于begin
 60     if (pPtr == NULL){//不用else是因为上面可能没有分配空间
 61         if (asize >= plength + sizeof(DataHead)){
 62             //可以容纳
 63             pPtr = (DataHead*)(this->dataBuf + this->buf_end);
 64             pPtr->packLength = plength + sizeof(DataHead);
 65             pPtr->dataLength = -1;
 66             this->buf_end += pPtr->packLength;
 67         }
 68         else{
 69             pPtr = (DataHead*)(this->dataBuf + this->buf_end);
 70             pPtr->packLength = 0;
 71         }
 72     }
 73     if (pPtr->packLength == 0){
 74         pPtr = NULL;
 75     }
 76     LeaveCriticalSection(&g_mutex);
 77     return (char*)pPtr;
 78 }
 79 //唤醒线程
 80 void SwigDirector_CThostFtdcMdSpi::notifyBuf(char* pPtr, int plength){
 81     EnterCriticalSection(&g_mutex);
 82     ((DataHead*)pPtr)->dataLength = plength;
 83     SetEvent(g_cond);//唤醒线程
 84     LeaveCriticalSection(&g_mutex);
 85 }
 86
 87 //修改后的回调代码例子
 88 void SwigDirector_CThostFtdcMdSpi::OnRspUserLogin(CThostFtdcRspUserLoginField *pRspUserLogin, CThostFtdcRspInfoField *pRspInfo, int nRequestID, bool bIsLast) {
 89     int len = sizeof(CThostFtdcRspUserLoginField) + sizeof(CThostFtdcRspInfoField) + sizeof(int) + sizeof(bool);
 90     char *pPtr = this->lockBuf(len);
 91     if (pPtr != NULL){
 92         char  *dPtr = pPtr;
 93         ((DataHead*)pPtr)->eventID = 3;
 94         dPtr += sizeof(DataHead);
 95         memcpy(dPtr, (char*)pRspUserLogin, sizeof(CThostFtdcRspUserLoginField));
 96         dPtr += sizeof(CThostFtdcRspUserLoginField);
 97         memcpy(dPtr, (char*)pRspInfo, sizeof(CThostFtdcRspInfoField));
 98         dPtr += sizeof(CThostFtdcRspInfoField);
 99         *(int*)(dPtr) = nRequestID;
100         dPtr += sizeof(int);
101         *(bool*)(dPtr) = bIsLast;
102         this->notifyBuf(pPtr,len);
103     }
104     return;
105 }

原文地址:https://www.cnblogs.com/bjguanmu/p/12582258.html

时间: 2024-10-21 19:05:55

提高CTP的jswig_JAVA接口回调线程处理效率的相关文章

笑谈接口回调

接口回调是个比较抽象但是很重要的知识,大多数初学者都会在刚接触它时感觉抓不住要领,但当我们实际掌握它后,会对它爱不释手.废话不多说,让我们开始吧. 我认识新事物的一般有这样的习惯,就是先从事物的名称入手.那么对于接口回调这个新事物,我们能从这个名字中获取多少信息呢?首先是接口,什么是接口呢?相信有一定编程基础的朋友都知道,就是一种规范,但是这种规范在没有实现之前,没有任何作用.就好比招聘广告一样,广告上列出的条件都是宽泛的,如我要招一名熟悉Java的程序员.这是一个规范,你想来应聘就必须满足熟悉

使用接口回调解析数据并下载网络图片

package DemoBook; import mode.CookBook; public class Demobook { // 向网络回去json数据,通过接口回调 // gson解析数据 // 根据pic图片地址,下载图片存入磁盘中 // gson解析数据 public static void main(String[] args) { String path = "http://www.qubaobei.com/ios/cf/dish_list.php?stage_id=1&l

简单使用rxjava,告别繁琐的接口回调(1)

接口回调虽然繁琐,但也是必须掌握的一个技巧,接口回调的用法很多,首先搞清楚什么时候需要用接口回调,怎么使用接口回调. public class InterQueryDataActivity extends AppCompatActivity { @Override protected void onCreate(@Nullable Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.l

回调函数、Java接口回调 总结

回调函数 谈到回调,我们得先从回调函数说起,什么叫回调函数呢? 回调函数是什么?    百度百科的解释:回调函数就是一个通过函数指针调用的函数.如果你把函数的指针(地址)作为参数传递给另一个函数,当这个指针被用为调用它所指向的函数时,我们就说这是回调函数.回调函数不是由该函数的实现方直接调用,而是在特定的事件或条件发生时由另外一方调用的,用于对该事件或条件进行响应.    接着,我们从下图简单说明一下回调函数.    已知图形上面三种模块,此时标号2能称为回调函数吗? 答案:不能,只有当标号2函

java 接口回调

学习自:http://blog.csdn.net/xiaanming/article/details/8703708/ http://hellosure.iteye.com/blog/1130176 内容and评论都很精彩,现在只是学习了概念,以后在用到还需要继续研究. A类实现一个接口,接口中有回调方法f,类B有方法fun(),类A对象a有类B对象的引用b,a调用b的方法fun(),在这个方法fun()中又调用了a实现的接口里的方法f. 在安卓中,线程run方法就是一个回调方法,start之后

Java总结(十)—实现Runnable接口创建线程,线程安全同步,死锁(哲学家进餐问题),读写锁

一.通过实现Runnable接口创建线程 定义实现Runnable接口的类 (1)Runnable接口中只有一个方法public void run();用来定义线程运行体: class MyRun implements Runnable(){ public void run(){ 线程执行的具体代码 } } (2)创建线程的实例的时候将这个类的实例作为参数传递到线程实例内部.然后再启动: Thread thread=new Thread(new MyRun()); Thread.start();

java接口回调的经典使用案例

java接口回调的经典使用案例 内容简介: 接口回调的理解及需要关注的问题 接口回调一般常规用法 接口回调简洁用法 接口回调的意义 接口回调简介: 简单的说接口回调就是:调用者A类访问了被调用者B类中的M方法.这个M方法在执行完毕后又调用了A类中的方法. 问题? B类中的M方法是如何访问A类中的方法的呢?弄清了这个问题,也就明白了接口回调. 下面我们根据一个场景描述去编写代码: 我们在调用者Caller类中使用被调用者Collee类中的下载文件方法DownLoadFile().下载完成后要通知C

谈谈-Android中的接口回调技术

Android中的接口回调技术有很多应用的场景,最常见的:Activity(人机交互的端口)的UI界面中定义了Button,点击该Button时,执行某个逻辑. 下面参见上述执行的模型,讲述James对Android接口回调技术的理解(结合前人的知识和自己的实践). 使用一个比喻很形象地说明:客户端有个疑问打电话请教服务端,但服务端无法现场给出解答,相互之间约定:服务端一旦有答案,使用电话的方式反馈给客户端. 以上有三个主体:客户端.服务端和接口(方式). 接口回调的原理框图说明: Demo界面

接口回调和向上转型

抄别人的,但为了自己掌握更加的踏实,所以复制到这里,拜谢原创!http://blog.csdn.net/u014025369/article/details/24707525 接口回调是指:可以把使用实现了某一接口的类创建的对象的引用赋给该接口声明的接口变量,那么该接口变量就可以调用被类实现的接口的方法.实际上,当接口变量调用被类实现的接口中的方法时,就是通知相应的对象调用接口的方法,这一过程称为对象功能的接口回调.看下面示例.interface People {     void people