python基于mysql实现的简单队列以及跨进程锁

在我们做多进程应用开发的过程中,难免会遇到多个进程访问同一个资源(临界资源)的状况,必须通过加一个全局性的锁,来实现资源的同步访问(同一时间只能有一个进程访问资源)。

举个例子:

假设我们用mysql来实现一个任务队列,实现的过程如下:

1. 在Mysql中创建Job表,用于储存队列任务,如下:

create table jobs(

id auto_increment not null primary key,

message text not null,

job_status not null default 0

);

message 用来存储任务信息,job_status用来标识任务状态,假设只有两种状态,0:在队列中, 1:已出队列

2. 有一个生产者进程,往job表中放新的数据,进行排队

insert into jobs(message) values(‘msg1‘);

3.假设有多个消费者进程,从job表中取排队信息,要做的操作如下:

select * from jobs where job_status=0 order by id asc limit 1;

update jobs set job_status=1 where id = ?; -- id为刚刚取得的记录id

4. 如果没有跨进程的锁,两个消费者进程有可能同时取到重复的消息,导致一个消息被消费多次。这种情况是我们不希望看到的,于是,我们需要实现一个跨进程的锁。

这里我贴出非常好的一篇文章,大家可以参照一下:

https://blog.engineyard.com/2011/5-subtle-ways-youre-using-mysql-as-a-queue-and-why-itll-bite-you

=========================华丽的分割线=======================================

说道跨进程的锁实现,我们主要有几种实现方式:

1. 信号量

2. 文件锁fcntl

3. socket(端口号绑定)

4. signal

这几种方式各有利弊,总体来说前2种方式可能多一点,这里我就不详细说了,大家可以去查阅资料。

查资料的时候发现mysql中有锁的实现,适用于对于性能要求不是很高的应用场景,大并发的分布式访问可能会有瓶颈,链接如下:

http://dev.mysql.com/doc/refman/5.0/fr/miscellaneous-functions.html

我用python实现了一个demo,如下:

文件名:glock.py

#!/usr/bin/env python2.7
#
# -*- coding:utf-8 -*-
#
#   Author  :   yunjianfei
#   E-mail  :   [email protected]
#   Date    :   2014/02/25
#   Desc    :
#

import logging, time
import MySQLdb

class Glock:
    def __init__(self, db):
        self.db = db

    def _execute(self, sql):
        cursor = self.db.cursor()
        try:
            ret = None
            cursor.execute(sql)
            if cursor.rowcount != 1:
                logging.error("Multiple rows returned in mysql lock function.")
                ret = None
            else:
                ret = cursor.fetchone()
            cursor.close()
            return ret
        except Exception, ex:
            logging.error("Execute sql \"%s\" failed! Exception: %s", sql, str(ex))
            cursor.close()
            return None

    def lock(self, lockstr, timeout):
        sql = "SELECT GET_LOCK(‘%s‘, %s)" % (lockstr, timeout)
        ret = self._execute(sql)

        if ret[0] == 0:
            logging.debug("Another client has previously locked ‘%s‘.", lockstr)
            return False
        elif ret[0] == 1:
            logging.debug("The lock ‘%s‘ was obtained successfully.", lockstr)
            return True
        else:
            logging.error("Error occurred!")
            return None

    def unlock(self, lockstr):
        sql = "SELECT RELEASE_LOCK(‘%s‘)" % (lockstr)
        ret = self._execute(sql)
        if ret[0] == 0:
            logging.debug("The lock ‘%s‘ the lock is not released(the lock was not established by this thread).", lockstr)
            return False
        elif ret[0] == 1:
            logging.debug("The lock ‘%s‘ the lock was released.", lockstr)
            return True
        else:
            logging.error("The lock ‘%s‘ did not exist.", lockstr)
            return None

#Init logging
def init_logging():
    sh = logging.StreamHandler()
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    formatter = logging.Formatter(‘%(asctime)s -%(module)s:%(filename)s-L%(lineno)d-%(levelname)s: %(message)s‘)
    sh.setFormatter(formatter)
    logger.addHandler(sh)
    logging.info("Current log level is : %s",logging.getLevelName(logger.getEffectiveLevel()))

def main():
    init_logging()
    db = MySQLdb.connect(host=‘localhost‘, user=‘root‘, passwd=‘‘)
    lock_name = ‘queue‘

    l = Glock(db)

    ret = l.lock(lock_name, 10)
    if ret != True:
        logging.error("Can‘t get lock! exit!")
        quit()
    time.sleep(10)
    logging.info("You can do some synchronization work across processes!")
    ##TODO
    ## you can do something in here ##
    l.unlock(lock_name)

if __name__ == "__main__":
    main()

在main函数里, l.lock(lock_name, 10) 中,10是表示timeout的时间是10秒,如果10秒还获取不了锁,就返回,执行后面的操作。

在这个demo中,在标记TODO的地方,可以将消费者从job表中取消息的逻辑放在这里。即分割线以上的:

3.假设有多个消费者进程,从job表中取排队信息,要做的操作如下:

select * from jobs where job_status=0 order by id asc limit 1;

update jobs set job_status=1 where id = ?; -- id为刚刚取得的记录id

这样,就能保证多个进程访问临界资源时同步进行了,保证数据的一致性。

测试的时候,启动两个glock.py, 结果如下:

Java代码  

  1. [@tj-10-47 test]# ./glock.py
  2. 2014-03-14 17:08:40,277 -glock:glock.py-L70-INFO: Current log level is : DEBUG
  3. 2014-03-14 17:08:40,299 -glock:glock.py-L43-DEBUG: The lock ‘queue‘ was obtained successfully.
  4. 2014-03-14 17:08:50,299 -glock:glock.py-L81-INFO: You can do some synchronization work across processes!
  5. 2014-03-14 17:08:50,299 -glock:glock.py-L56-DEBUG: The lock ‘queue‘ the lock was released.

可以看到第一个glock.py是 17:08:50解锁的,下面的glock.py是在17:08:50获取锁的,可以证实这样是完全可行的。

[@tj-10-47 test]# ./glock.py

2014-03-14 17:08:46,873 -glock:glock.py-L70-INFO: Current log level is : DEBUG

2014-03-14 17:08:50,299 -glock:glock.py-L43-DEBUG: The lock ‘queue‘ was obtained successfully.

2014-03-14 17:09:00,299 -glock:glock.py-L81-INFO: You can do some synchronization work across processes!

2014-03-14 17:09:00,300 -glock:glock.py-L56-DEBUG: The lock ‘queue‘ the lock was released.

[@tj-10-47 test]#

python基于mysql实现的简单队列以及跨进程锁

时间: 2024-10-05 04:40:13

python基于mysql实现的简单队列以及跨进程锁的相关文章

基于ADB框架Robotium跨进程操作

转自:http://blog.csdn.net/qingchunjun/article/details/42580937 2015年2月3日更新: 有些朋友在用真机尝试本方法时,抛出了InputStream cannot be null的异常.该异常是由于adb运行在robotium框架中时,是完全运行在手机中的,此时它的权限受到android系统的限制.而原框架是用在PC端的,这才导致了该异常的出现.具体的原因分析可以见我的这篇文章:http://blog.csdn.net/qingchunj

进击的Python【第十二章】:mysql介绍与简单操作,sqlachemy介绍与简单应用

进击的Python[第十二章]:mysql介绍与简单操作,sqlachemy介绍与简单应用 一.数据库介绍 什么是数据库? 数据库(Database)是按照数据结构来组织.存储和管理数据的仓库,每个数据库都有一个或多个不同的API用于创建,访问,管理,搜索和复制所保存的数据.我们也可以将数据存储在文件中,但是在文件中读写数据速度相对较慢.所以,现在我们使用关系型数据库管理系统(RDBMS)来存储和管理的大数据量.所谓的关系型数据库,是建立在关系模型基础上的数据库,借助于集合代数等数学概念和方法来

在SAE搭建Python+Django+MySQL(基于Windows)

为了与时俱进,工作闲余开始研究Python,刚一接触就被Python这"优雅"的语法吸引住!后来接触到了Django,虽然还没有太深入的研究,但对这种新概念的WEB开发很感兴趣,决定尝试用它代替Java开发小的后台程序,边用边学吧! 注:本篇只介绍基础搭建过程,不负责讲解,因为刚接触Python我也不懂! 一.SAE使用的Python环境版本 Python2.7 Django1.27 / 1.4 / 1.5 本地版本要确定好,需要用到其他的预装模块也要确定好. (我一开始使用了高版本的

基于MySQL元数据的Hive的安装和简单测试

引言: Hive是一种强大的数据仓库查询语言,类似SQL,本文将介绍如何搭建Hive的开发测试环境. 1. 什么是Hive? hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供简单的sql查询功能,可以将sql语句转换为MapReduce任务进行运行. 其优点是学习成本低,可以通过类SQL语句快速实现简单的MapReduce统计,不必开发专门的MapReduce应用,十分适合数据仓库的统计分析. 2.  按照Hive的准备条件 2.1  Hadoop集

基于Servlet、JSP、JDBC、MySQL的一个简单的用户注册模块(附完整源码)

最近看老罗视频,做了一个简单的用户注册系统.用户通过网页(JSP)输入用户名.真名和密码,Servlet接收后通过JDBC将信息保存到MySQL中.虽然是个简单的不能再简单的东西,但麻雀虽小,五脏俱全,在此做一归纳和整理.下面先上源码: 一.index.jsp <%@ page language="java" import="java.util.*" pageEncoding="utf-8"%> <% String path =

python操作MySQL 模拟简单银行转账操作

一.基础知识 1.MySQL-python的安装 下载,然后 pip install 安装包 2.python编写通用数据库程序的API规范 (1).数据库连接对象 connection,建立python客户端与数据库的网络连接,创建方法为 MySQLdb.Connect(参数) 参数有六个:     host(MySQL服务器地址,一般本地为127.0.0.1) port(MySQL服务器端口号) user(用户名) passwd(密码) db(数据库名称) charset(连接编码) con

Python操作Mysql基础教程

Python操作Mysql 最近在学习python,这种脚本语言毫无疑问的会跟数据库产生关联,因此这里介绍一下如何使用python操作mysql数据库.我python也是零基础学起,所以本篇博客针对的是python初学者,大牛可以选择绕道. 另外,本篇博客基于的环境是Ubuntu13.10,使用的python版本是2.7.5. MYSQL数据库 MYSQL是一个全球领先的开源数据库管理系统.它是一个支持多用户.多线程的数据库管理系统,与Apache.PHP.Linux共同组成LAMP平台,在we

在python下比celery更加简单的异步任务队列RQ

前言: 这里介绍一个python下,比celery更加简单的异步工具,真的是很简单,当然他的功能没有celery多,复杂程度也没有celery大,文档貌似也没有celery多,但是为啥会介绍rq这个东西 因为他够简单. 当然他虽然简单,但是也是需要中间人的,也就是 Broker,这里只能是redis了. 他没有celery支持的那么多,比如 redis rabbitmq mongodb mysql之类的. 说回来,咱们用rq,就是看重他的简单. 如对celery有兴趣,可以看看我以前写过的博文.

Python 12 - Mysql &amp; ORM

本节内容 1.数据库介绍 2.mysql数据库安装使用 3.mysql数据库基础 4.mysql命令 5.事务 6.索引 7.Python操作mysql 8.ORM sqlalchemy了解 数据库介绍 什么是数据库? (介于本人还是属于熟悉数据库的,这一块就基本复制粘贴了) 数据库(Database)是按照数据结构来组织.存储和管理数据的仓库, 每个数据库都有一个或多个不同的API用于创建,访问,管理,搜索和复制所保存的数据. 我们也可以将数据存储在文件中,但是在文件中读写数据速度相对较慢.