多线程C调用python api的陷阱

众所周知,用脚本语言编写的服务(wsgi接口)都需要一个server容器,常见的如php的php-fpm, lightd等。python中一般是用的uwsgi,uwsgi是在wsgi的基础上的一种新的协议,可以用来部署python等脚本程序的运行。然而在不熟悉uwsgi的代码架构和c调用python的api情况下进行开发可能会遇到一些意想不到的问题。

我们先看一段代码,下面这段代码是用的Flask框架,每次请求的时候会把COUNT的值先减一再加一,最后再乘二。如果请求50次,其最终的结果应该是2的50次幂。

from flask import Flask, request

COUNT = 1 

app = Flask(__name__)

@app.route('/test_uwsgi')
def index():
    global COUNT
    COUNT=COUNT-1
    COUNT=COUNT+1
    COUNT=COUNT*2
    print COUNT
    return 'OK'
17179869184
34359738368
68719476736
137438953472
274877906944
549755813888
1099511627776
2199023255552
4398046511104
8796093022208
17592186044416
35184372088832
70368744177664
140737488355328
281474976710656
562949953421312
1125899906842624

这是直接执行50次index函数得到的最后几行的结果,可得结果为2的50次幂。

 536870912
 1073741824
2147483648
4294967296
8589934592
17179869184
34359738368
 68719476736
137438953472
274877906944
549755813888
1099511627776
 2199023255552
 4398046511104
8796093022208
17592186044416
 35184372088832
70368744177664
 140737488355328
281474976710656
5629499534213121125899906842624

这是通过ab测试,用多个并发访问/test_uwsgi接口50次得到的最后几行的结果。可以看出最终的结果肯定是个异常数字。为什么程序在uwsgi中运行时的运行结果会出现异常呢?

其实大家通过阅读这个简单的例子就可以发现,这种例子一般都是用来演示多线程共享数据同步问题的时候,如果不加锁会暴露问题的例子。下面的代码我们就在修改共享资源COUNT的时候加上互斥锁,看看有没有什么变化。

from flask import Flask, request
import threading

mutex = threading.Lock()
COUNT = 1 

app = Flask(__name__)

@app.route('/test_uwsgi')
def index():
    global COUNT
    global mutex
    mutex.acquire()
    COUNT=COUNT-1
    COUNT=COUNT+1
    COUNT=COUNT*2
    print COUNT
    mutex.release()
    return 'OK'

上面的代码也是放到uwsgi的容器里面运行,通过http接口多个并发访问50次,得到的结果是正确的。但是这是为什么呢?在我们原来的python代码中并没有写任何涉及多进程的操作,虽然uwsgi在配置文件中开启了多个线程可以并发的处理请求,但是按笔者原来的理解,不是应该每个线程执行自己独立的Python解释器吗?每个线程在运行python脚本的时候的数据不应该是隔离的吗?

为了弄明白上面的问题,我们不得不研究研究uwsgi及其server架构中的结构和设计。

UWSGI是在python中广泛使用的一个服务器应用容器,类似于php上常见的wsgi协议的服务器应用容器,如mod-php、php-fpm、lightd等。uwsgi协议是在原有的wsgi协议之上新增了一套uwsgi的协议。

通过研读uwsgi的源码(core/uwsgi.c core/loop.c core/init.c core/master_util.c core/util.c),可以知道uwsgi的server设计,采用的是UNX书中介绍归纳的服务器程序设计范式8,暨TCP预先创建线程服务器程序,每个线程各自accept。

int main(int argc, char *argv[], char *envp[]) {
	uwsgi_setup(argc, argv, envp);
	return uwsgi_run();
}

void uwsgi_setup(int argc, char *argv[], char *envp[]) {

	int i;

	struct utsname uuts;

	......

	...设置和初始化各种资源,这里就省略了,有兴趣的自己看看

	......

	//最主要的是这行
	uwsgi_start((void *) uwsgi.argv);
}

int uwsgi_start(void *v_argv) {

	......

	简化摘要一些主要的代码

	......

	... 题外话,这里是创建一个多线程的共享内存空间,后面uwsgi_setup_workers的时候会用到。
		因为uwsgi有一个master进程,可以监测各个子进程的状态,所以需要一块匿名共享内存
	// initialize sharedareas
	uwsgi_sharedareas_init();

	// setup queue
	if (uwsgi.queue_size > 0) {
		uwsgi_init_queue();
	}

	... 这里很重要,uwsgi.p是一个接口,uwsgi中部署的app在这里初始化(在uwsgi中,部署的APP需要所对应语言的插件,如python就用python插件)
		后面也会看到,实际上uwsgi所执行的python代码,其所有模块的import都在这里执行
	// initialize request plugin only if workers or master are available
	if (uwsgi.sockets || uwsgi.master_process || uwsgi.no_server || uwsgi.command_mode || uwsgi.loop) {
		for (i = 0; i < 256; i++) {
			if (uwsgi.p[i]->init) {
				uwsgi.p[i]->init();
			}
		}
	}

	// again check for workers/sockets...
	if (uwsgi.sockets || uwsgi.master_process || uwsgi.no_server || uwsgi.command_mode || uwsgi.loop) {
		for (i = 0; i < 256; i++) {
			if (uwsgi.p[i]->post_init) {
				uwsgi.p[i]->post_init();
			}
		}
	}

	。。。这里主要是设置各个worker的共享内存空间
	// initialize workers/master shared memory segments
	uwsgi_setup_workers();

	// here we spawn the workers...
	if (!uwsgi.status.is_cheap) {
		if (uwsgi.cheaper && uwsgi.cheaper_count) {
			int nproc = uwsgi.cheaper_initial;
			if (!nproc)
				nproc = uwsgi.cheaper_count;
			for (i = 1; i <= uwsgi.numproc; i++) {
				if (i <= nproc) {
					if (uwsgi_respawn_worker(i))
						break;
					uwsgi.respawn_delta = uwsgi_now();
				}
				else {
					uwsgi.workers[i].cheaped = 1;
				}
			}
		}
		else {
			for (i = 2 - uwsgi.master_process; i < uwsgi.numproc + 1; i++) {
				。。。这里就是根据我们设置的进程数,去fork子进程
				if (uwsgi_respawn_worker(i))
					break;
				uwsgi.respawn_delta = uwsgi_now();
			}
		}
	}

	// END OF INITIALIZATION
	return 0;

}

int uwsgi_respawn_worker(int wid) {

	。。。主要是这行代码,fork子进程,里面就不跟了
	pid_t pid = uwsgi_fork(uwsgi.workers[wid].name);

	if (pid == 0) {
		signal(SIGWINCH, worker_wakeup);
		signal(SIGTSTP, worker_wakeup);
		uwsgi.mywid = wid;
		uwsgi.mypid = getpid();
		// pid is updated by the master
		//uwsgi.workers[uwsgi.mywid].pid = uwsgi.mypid;
		// OVERENGINEERING (just to be safe)
		uwsgi.workers[uwsgi.mywid].id = uwsgi.mywid;
		/*
		   uwsgi.workers[uwsgi.mywid].harakiri = 0;
		   uwsgi.workers[uwsgi.mywid].user_harakiri = 0;
		   uwsgi.workers[uwsgi.mywid].rss_size = 0;
		   uwsgi.workers[uwsgi.mywid].vsz_size = 0;
		 */
		// do not reset worker counters on reload !!!
		//uwsgi.workers[uwsgi.mywid].requests = 0;
		// ...but maintain a delta counter (yes this is racy in multithread)
		//uwsgi.workers[uwsgi.mywid].delta_requests = 0;
		//uwsgi.workers[uwsgi.mywid].failed_requests = 0;
		//uwsgi.workers[uwsgi.mywid].respawn_count++;
		//uwsgi.workers[uwsgi.mywid].last_spawn = uwsgi.current_time;

	}
	else if (pid < 1) {
		uwsgi_error("fork()");
	}
	else {
		// the pid is set only in the master, as the worker should never use it
		uwsgi.workers[wid].pid = pid;

		if (respawns > 0) {
			uwsgi_log("Respawned uWSGI worker %d (new pid: %d)\n", wid, (int) pid);
		}
		else {
			uwsgi_log("spawned uWSGI worker %d (pid: %d, cores: %d)\n", wid, pid, uwsgi.cores);
		}
	}

	return 0;
}

int uwsgi_run() {

	。。。也是捡重要的摘抄一些
		如果pid是master,就执行master_loop
		如果pid是worker,就执行uwsgi_worker_run

	// !!! from now on, we could be in the master or in a worker !!!
	if (getpid() == masterpid && uwsgi.master_process == 1) {
		(void) master_loop(uwsgi.argv, uwsgi.environ);
	}

	//from now on the process is a real worker
	uwsgi_worker_run();
	// never here
	_exit(0);

}

void uwsgi_worker_run() {

	int i;

	if (uwsgi.lazy || uwsgi.lazy_apps) {
		uwsgi_init_all_apps();
	}

	uwsgi_ignition();

	// never here
	exit(0);

}

void uwsgi_ignition() {

	if (uwsgi.loop) {
		void (*u_loop) (void) = uwsgi_get_loop(uwsgi.loop);
		if (!u_loop) {
			uwsgi_log("unavailable loop engine !!!\n");
			exit(1);
		}
		if (uwsgi.mywid == 1) {
			uwsgi_log("*** running %s loop engine [addr:%p] ***\n", uwsgi.loop, u_loop);
		}
		u_loop();
		uwsgi_log("your loop engine died. R.I.P.\n");
	}
	else {
		。。。子进程的循环体,一般是用simple_loop
		if (uwsgi.async < 1) {
			simple_loop();
		}
		else {
			async_loop();
		}
	}

	// end of the process...
	end_me(0);
}

。。。一直到这里,在子进程的loop里面才开始创建接收处理request请求的线程
	线程的执行函数simple_loop_run也是一个循环,基本上都是常规步奏,accept,receive, response...,后面就不继续追下去了
	在reciev接到请求的数据后,会通过python_call的方法调用python脚本的wsgi函数,处理这个请求
void simple_loop() {
	uwsgi_loop_cores_run(simple_loop_run);
}

void uwsgi_loop_cores_run(void *(*func) (void *)) {
	int i;
	for (i = 1; i < uwsgi.threads; i++) {
		long j = i;
		pthread_create(&uwsgi.workers[uwsgi.mywid].cores[i].thread_id, &uwsgi.threads_attr, func, (void *) j);
	}
	long y = 0;
	func((void *) y);
}

简单来说,就是uwsgi中执行python脚本和直接运行python脚本是不同的。uwsgi执行python脚本是通过调用python c api的方法,首先通过调用api载入python脚本中的module,这时候,像最开始的实例代码一样module import中的相关代码会被执行,所有的全局变量在进程中被创建和初始化。然后uwsgi创建线程,开始处理请求调用python api(python_call),执行python脚本中处理请求的函数(wsgi接口),因为module
import在线程创建之前已经执行了,所以之前在进程中的共享数据在线程中是可以访问的。这里就是需要我们着重注意的,在访问这些线程间共享数据的时候需要加锁,或者在编写python脚本的时候尽量少用全局变量而多用单例模式,避免不必要的采坑。

其实上面所述只是对我遇到问题的一个简化,为了帮助大家弄明白uwsgi多线程执行python wsgi接口的相关问题。我所遇到的问题是在处理请求的函数中,调用了一个在全局中创建的gearman client,这个client库不是线程安全的,使用中也没有加锁。当请求的并发比较大的时候,gearman client这个库就会报出一些连接的异常。

后记:其实这个问题并不是十分的复杂,暴露的问题是对于uwsgi的代码结构,和python c api以及c调用python的方法和相关概念等不是非常的熟练,暴露了自己知识体系中的短板。由于那几天同时在开发几个需求,没有对问题进行详细的测试,没有仔细的分析和查找Trackback中的错误,反而是一直在怀疑被调用方的接口性能问题。在这个问题上,我的那个同事确实是棋高一招,分析出了问题,但是对于问题的原因也是一知半解,没有从原理上确实阐明原因,造成了我俩激烈的争执。其实也引出现在这个团队交流在交流上确实也存在一些问题,争论基本上靠喊,靠抢话,不是就是论事而是经常人身攻击。当有人的方案确实在理,确实证明是可用的时候,反对的人也宁死不妥协。。。这大概就是像新浪这样臃肿老旧缺少活力的大公司的通病吧。

时间: 2024-10-14 21:29:50

多线程C调用python api的陷阱的相关文章

在独立的python文件调用django api

在独立的python文件调用django api加入下面代码: 1 import os 2 import sys 3 root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) 4 if root not in sys.path: sys.path.append(root) 5 if os.environ.get('DJANGO_SETTINGS_MODULE') == None: os.environ.setdefaul

Python调用微博API

上头叫通过微博ID获取用户发布过的历史微博内容,于是研究了下新浪微博提供的API 1 首先在微博开放中心下"创建应用"创建一个应用,应用信息那些随便填,填写完毕后,不需要提交审核,需要的只是那个app-key和app-secret 2 在"微博开放平台"的"管理中心"找到刚才创建的应用,点开这个应用,点开左边"应用信息"栏,会看见"App key"和"App Secret"的字样,这两个

使用Python调用Flickr API抓取图片数据

Flickr是雅虎旗下的图片分享网站,上面有全世界网友分享的大量精彩图片,被认为是专业的图片网站.其API也很友好,可以实现多种功能.这里我使用了Python调用其API获得了大量的照片数据. 首先需要先去Flickr注册成为其开发者,创建应用,获得API_KEY和API_SECRET,其API网址在:https://www.flickr.com/services/api/ Flickr提供了多种开发工具进行使用.这里我们使用Python开发工具.官方推荐的开发工具是Beej’s Python

python 调用zabbix api实现查询主机信息,输出所有主机ip

之前发现搜索出来的主机调用zabbix api信息都不是那么明确,后来通过zabbix官方文档,查到想要的api信息,随后写一篇自己这次项目中用到的api. #!/usr/bin/env python #coding:utf8 import requests import json headers = {'Content-Type': 'application/json-rpc'} server_ip = '10.37.149.109' url = 'http://%s/zabbix/api_j

python调用zabbix api接口实时展示数据

近日公司准备自已做一个运维管理平台,其中的监控部分,打算调用zabbix api接口来进行展示. 经过思考之后,计划获取如下内容: 1.  获得认证密钥 2.  获取zabbix所有的主机组 3.  获取单个组下的所有主机 4.  获取某个主机下的所有监控项 5.  获取某个监控项的历史数据 6.  获取某个监控项的最新数据 计划最后展示框架如下内容(这只是值方面,其它的会再加): 主机组1 ----主机名1---监控项1----当前值 ---监控项2----当前值 ----主机名2----监控

(二)Python调用Zabbix api之从入门到放弃——登录并获取身份验证令牌

访问zabbix api的URL是: http://x.x.x.x/zabbix/api_jsonrpc.php x.x.x.x可能是你的IP或者域名 访问流程概览: 1.首先登录 2.认证成功后zabbix server返回一个token 3.带着这个token去访问各种数据,做各种操作 4.完毕! 一.用RESTClient进行登录 在json请求的正文中,具有以下属性: jsonrpc - API使用的JSON-RPC协议的版本; Zabbix API实现JSON-RPC版本2.0; me

Python调用ansible API系列(四)动态生成hosts文件

方法一:通过最原始的操作文件的方式 #!/usr/bin/env python # -*- coding: utf-8 -*- """ 通过操作文件形式动态生成ansible的hosts文件 """ import sys class Inventory: def __init__(self): # ansible的hosts文件路径 self._hostsfile = "./aaa" self._data = self._ge

python调用docker API(CentOS6.5)

一 环境背景 python-2.7.8 docker 版本 1.15 (*yum安装为1.14版本,需升级为1.15,详见后续步骤) 二 获取Docker容器指标[指标可行性分析见笔记:] CPU :usr 和 system Cpu time Memory IP 三 整体步骤 容器指标值获取 Docker Python API 环境搭建 获取指标可行性分析 四  具体实现 1    以下python脚本为获取指定容器ID的ip,cpu,及memory [*框图部分需引入python docker

Python调用Windows API函数编写录音机和音乐播放器

功能描述: 1)使用tkinter设计程序界面: 2)调用Windows API函数实现录音机和音乐播放器. . 参考代码: ? 运行界面: ? 原文地址:https://www.cnblogs.com/7758520lzy/p/12149931.html