【转】工作队列学习

首先要注意本文的两个概念:(1)使用内核提供的工作队列, (2)自己创建工作队列

http://blog.csdn.net/fontlose/article/details/8286445

工作队列是一种将工作推后执行的形式,交由一个内核线程去执行在进程上下文执行,其不能访问用户空间。最重要特点的就是工作队列允许重新调度甚至是睡眠。

工作队列子系统提供了一个默认的工作者线程来处理这些工作。默认的工作者线程叫做events/n,这里n是处理器的编号,每个处理器对应一个线程,也可以自己创建工作者线程。

工作的定义

typedef void
(*work_func_t)(struct work_struct *work);

定义中初始化处理函数

DECLARE_WORK(n,
f);

#define DECLARE_WORK(n,
f) struct work_struct n = __WORK_INITIALIZER(n, f)

#define
__WORK_INITIALIZER(n, f) {    
  \

.data = WORK_DATA_INIT(0),  
 \

.entry = { &(n).entry, &(n).entry }, \

.func = (f)    
  \

}

先定义中后初始化处理函数

struct
work_struct

INIT_WORK(struct
work_struct *work, func_t);

#define INIT_WORK(_work,
_func) \

do {      
     
     
     \

__INIT_WORK((_work), (_func), 0); \

} while (

在使用带delay的函数或宏时使用DECLARE_DELAYED_WORK定义和INIT_DELAYED_WORK初始化。

1.使用内核提供的共享列队

对工作进行调度,即把给定工作的处理函数提交给缺省的工作队列和工作者线程。

int
schedule_work(struct work_struct *work);

确保没有工作队列入口在系统中任何地方运行。

void
flush_scheduled_work(void);

延时执行一个任务

int
schedule_delayed_work(struct delayed_struct *work, unsigned long
delay);

从一个工作队列中去除入口;

int
cancel_delayed_work(struct delayed_struct *work);

测试例子

void
myfunc(struct work_struct*ws);

DECLARE_WORK(mywork,myfunc);  
     
     
   //定义

void myfunc(struct work_struct*ws)

{

printk(KERN_ALERT "myfunc
current->pid %d\n",current->pid);

ssleep(1);

printk(KERN_ALERT "myfunc
current->pid %d\n",current->pid);

ssleep(1);

printk(KERN_ALERT "myfunc
current->pid %d\n",current->pid);

ssleep(1);

}

在加载模块时调用

schedule_work(&mywork);

printk(KERN_ALERT "main current->pid
 %d\n" ,current->pid);

在卸载模块的时调用

cancel_delayed_work(&mywork);

测试结果

输出的pid

main current->pid  2883

myfunc current->pid 4

myfunc current->pid 4

myfunc current->pid 4

[[email protected] module]# ps

PID USER  
    VSZ STAT COMMAND

1 root
     2108 S
   init

2 root
     
  0 SW   [ksoftirqd/0]

3 root
     
  0 SW   [watchdog/0]

4 root
     
  0 SW<  [events/0]

myfunc运行在pid为4
的进程中,查看pid为4的进程为events/0,使用内核提供的共享列队,列队是保持顺序执行的,做完一个工作才做下一个,如果一个工作内有耗时大的处理如阻塞等待信号或锁,那么后面的工作都不会执行。如果你不喜欢排队或不好意思让别人等太久,那么可以创建自己的工作者线程,所有工作可以加入自己创建
的工作列队,列队中的工作运行在创建的工作者线程中。

2.使用自定义列队

创建工作列队使用3个宏 成功后返回workqueue_struct
*指针,并创建了工作者线程。三个宏主要区别在后面两个参数singlethread和freezeable,singlethread为0时会为每个
cpu上创建一个工作者线程,为1时只在当前运行的cpu上创建一个工作者线程。freezeable会影响内核线程结构体thread_info的
PF_NOFREEZE标记

if
(!cwq->freezeable)

current->flags |= PF_NOFREEZE;

set_user_nice(current,
-5);

在线程函数内设置了测试点如下

if (cwq->freezeable)

try_to_freeze();

如果设置了PF_NOFREEZE这个flag,那么系统挂起时候这个进程不会被挂起。

主要函数

#define create_workqueue(name) __create_workqueue((name), 0,
0)      
     
     
     
 //多处理器时会为每个cpu创建一个工作者线程

#define create_freezeable_workqueue(name)
__create_workqueue((name), 1, 1)  
    //只创建一个工作者线程,系统挂起是线程也挂起

#define create_singlethread_workqueue(name)
__create_workqueue((name), 1, 0)  
 //只创建一个工作者线程,系统挂起是线程线程不挂起

以上三个宏调用__create_workqueue函数定义

extern struct workqueue_struct *__create_workqueue(const char
*name,int singlethread, int freezeable);

释放创建的工作列队资源

void destroy_workqueue(struct workqueue_struct *wq)

延时调用指定工作列队的工作

queue_delayed_work(struct workqueue_struct *wq,struct
delay_struct *work, unsigned long delay)

取消指定工作列队的延时工作

cancel_delayed_work(struct delay_struct *work)

将工作加入工作列队进行调度

queue_work(struct workqueue_struct *wq, struct work_struct
*work)

等待列队中的任务全部执行完毕。

void flush_workqueue(struct workqueue_struct *wq);

主要测试代码

void myfunc(struct work_struct*ws);

struct workqueue_struct *wqueue;

DECLARE_WORK(mywork,myfunc);

void myfunc(struct work_struct*ws)

{

printk(KERN_ALERT "myfunc
1 current->pid %d\n",current->pid);

ssleep(1);

printk(KERN_ALERT "myfunc
2 current->pid %d\n",current->pid);

ssleep(1);

printk(KERN_ALERT "myfunc
3 current->pid %d\n",current->pid);

ssleep(1);

}

在模块加载是执行

wqueue=create_workqueue("myqueue");

queue_work(wqueue,&mywork);

printk(KERN_ALERT "main current->pid
 %d\n" ,current->pid);

测试结果

main current->pid  1010

myfunc 1 current->pid 1016

myfunc 2 current->pid 1016

myfunc 3 current->pid 1016

ps

....

1016 root    
    0 SW<
 [myqueue/0]

可见函数运行在pid为1016的进程中,ps查看进程名为myqueue/0.

实例说明:

1.使用自定义工作队列

//=========

#include>linux /kernel.h>

#include>linux/module.h>

#include>linux/proc_fs.h>

#include>linux/workqueue.h>

#include>linux/sched.h>

#include>linux/init.h>

#include>linux/interrupt.h>

#include>linux/delay.h>

struct workqueue_struct *test_wq;

struct delayed_work test_dwq;

void delay_func(struct work_struct *work);

void delay_func(struct work_struct *work)

{

int i;

printk(KERN_INFO "My
name is delay_func!\n");

for (i = 0; i < 3;
i++) {

printk(KERN_ERR "delay_fun:i=%d\n", i);

msleep(1000);

}

}

static int __init example_init(void)

{

int i;

int ret;

test_wq =
create_workqueue("test_wq");

if (!test_wq) {

printk(KERN_ERR "No memory for workqueue\n");

return 1;

}

printk(KERN_INFO "Create
Workqueue successful!\n");

INIT_DELAYED_WORK(&test_dwq, delay_func);

ret =
queue_delayed_work(test_wq, &test_dwq, 5000);

printk(KERN_INFO "first
ret=%d!\n", ret);

for (i = 0; i < 3;
i++) {

printk(KERN_INFO "Example:ret= %d,i=%d\n", ret,
i);

msleep(100);

}

ret =
queue_delayed_work(test_wq, &test_dwq, 0);

printk(KERN_INFO "second
ret=%d!\n", ret);

return 0;

}

static void __exit example_exit(void)

{

int ret;

ret =
cancel_delayed_work(&test_dwq);

flush_workqueue(test_wq);

destroy_workqueue(test_wq);

printk(KERN_INFO
"Goodbay! ret=%d\n", ret);

}

module_init(example_init);

module_exit(example_exit);

MODULE_LICENSE("GPL");

运行结果:

kernel: Create Workqueue successful!

kernel: first ret=1!

kernel: Example:ret= 1,i=0

kernel: Example:ret= 1,i=1

kernel: Example:ret= 1,i=2

kernel: second ret=0!

kernel: Goodbay! ret=1

kernel: Create Workqueue successful!

说明将任务添加到工作队列后,如果工作队列还在执行该任务,则queue_delayed_work()返回1,否则返回0,如上实例所述;

主线程delay_wq将任务添加到工作队列后,使得工作队列在延迟delay后执行函数delay_func(),而delay_wq线程继续执行;

2.使用内核工作队列实例

#include>linux/module.h>

#include>linux/init.h>

#include>linux/kernel.h>

#include>linux/net.h>

#include>net/sock.h>

#include>linux/in.h>

#include>linux/types.h>

#include>linux/kthread.h>

#include>linux/wait.h>

#include>linux/skbuff.h>

#include>linux/string.h>

#include>asm-generic/unaligned.h>

#include>linux/sysctl.h>

#include>linux/netfilter.h>

#include>linux/netfilter_ipv4.h>

#include>asm/checksum.h>

#include>linux/ip.h>

#include>linux/workqueue.h>

#define err(msg) printk(KERN_INFO "%s failed.\n", msg)

static void defense_work_handler(struct work_struct *work);

static DECLARE_DELAYED_WORK(defense_work, defense_work_handler);

static void defense_work_handler(struct work_struct *work)
{
    printk(KERN_INFO "defense_work_handler function.\n");
}

static int __init main_init(void)
{
    schedule_delayed_work(&defense_work, 3 * HZ);

return 0;
}

static void __exit main_exit(void)
{
    cancel_delayed_work_sync(&defense_work);
}

module_init(main_init);
module_exit(main_exit);
MODULE_LICENSE("GPL");

时间: 2024-08-18 02:43:03

【转】工作队列学习的相关文章

RabbitMQ (消息队列)专题学习03 Work Queues(工作队列)

一.概述 工作队列(Work queues) (使用Java客户端) 在前面的专题学习中,我们使用Java语言实现了一个简单的从名为"hello"的队列中发送和接收消息的程序,在这部内容中我们将创建一个工作队列被用来分配定时消息任务,而且通过多个接收者(工作者)实现. 工作队列(又名任务队列),主要的思想是为了避免立即做一个资源密集型的任务(多消息同时密集发送),不必等待它完成,当运行许多工作者的让任务都在她们之间共享. 它在web应用中是非常有用的,因为在很短的时间内http请求窗口

rabbitmq消息队列学习——&quot;工作队列&quot;

二."工作队列" 在第一节中我们发送接收消息直接从队列中进行.这节中我们会创建一个工作队列来分发处理多个工作者中的耗时性任务. 工作队列主要是为了避免进行一些必须同步等待的资源密集型的任务.实际上我们将这些任务时序话稍后分发完成.我们将某个任务封装成消息然后发送至队列,后台运行的工作进程将这些消息取出然后执行这些任务.当你运行多个工作进程的时候,这些任务也会在它们之间共享. 前期准备 上一节的练习中我们发送的是简单包含"Hello World!"的消息,这节我们还发

RabbitMQ学习(二)工作队列

1.工作队列(Work Queue)又叫任务队列(Task Queue)指将任务分发个多个消费者. 2.实际操作: 这里使用一个生产者产生多条数据提供给3个消费者 生产者代码: public class Producter { //队列名称 private final static String QUEUE_NAME = "Work_Queue"; public static void main(String[] args) throws IOException, TimeoutExc

我的RabbitMQ学习2(工作队列)

创建一个工作队列 1.建立一个生成者  //初始化一个连接 生产者 -> (消费者) var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { //对应的队列 channel.QueueDeclare(que

RabbitMQ学习第二记:工作队列的两种分发方式,轮询分发(Round-robin)和 公平分发(Fair dispatch)

1.什么是RabbitMQ工作队列 我们在应用程序使用消息系统时,一般情况下生产者往队列里插入数据时速度是比较快的,但是消费者消费数据往往涉及到一些业务逻辑处理导致速度跟不上生产者生产数据.因此如果一个生产者对应一个消费者的话,很容易导致很多消息堆积在队列里.这时,就得使用工作队列了.一个队列有多个消费者同时消费数据. 下图取自于官方网站(RabbitMQ)的工作队列的图例 P:消息的生产者 C1:消息的消费者1 C2:消息的消费者2 红色:队列 生产者将消息发送到队列,多个消费者同时从队列中获

rabbitmq学习(三) —— 工作队列

工作队列,又称任务队列,主要思想是避免立即执行资源密集型任务,并且必须等待完成.相反地,我们进行任务调度,我们将一个任务封装成一个消息,并将其发送到队列.工作进行在后台运行不断的从队列中取出任务然后执行.当你运行了多个工作进程时,这些任务队列中的任务将会被工作进程共享执行. 这个概念在 Web 应用程序中特别有用,在短时间 HTTP 请求内需要执行复杂的任务. 准备工作 现在,假装我们很忙,我们使用 Thread.sleep 来模拟耗时的任务. 发送端 public class NewTask

【译】RabbitMQ:工作队列(Work Queue)

在第一篇我们写了两个程序通过一个命名的队列分别发送和接收消息.在这一篇,我们将创建一个工作队列在多个工作线程间分发耗时的工作任务. 工作队列的核心思想是避免立刻处理资源密集型任务导致必须等待其执行完成.相反的,我们安排这些任务在稍晚的时间完成.我们将一个任务封装为一个消息并把它发送到队列中.一个后台的工作线程将从队列中取出任务并最终执行.当你运行多个工作线程,这些任务将在这些工作线程间共享. 这个概念对于在一个HTTP请求中处理复杂任务的Web应用尤其有用. 准备工作 在前一篇中,我们发送了一条

RabbitMQ学习(三).NET Client之Publish/Subscribe

3 Publish/Subscribe Sending messages to many consumers at once Python | Java | Ruby | PHP| C# 转载请注明出处:jiq?钦's technical Blog Publish/Subscribe (using the .NET Client) 前面的教程我们已经学习了如何创建工作队列,工作队列背后的假设是每一个任务都被准确地递送给一个worker进行处理.这里我们将介绍完全不同的模式,即一个消息可以递送给多

Kestrel Web 服务器学习笔记

前言: ASP.NET Core 已经不是啥新鲜的东西,很多新启的项目都会首选 Core 做开发: 而 Kestrel 可以说是微软推出的唯一真正实现跨平台的 Web 服务器了: Kestrel 利用一个名为 KestrelEngine 的网络引擎实现对请求的监听.接收和响应: Ketrel 之所以具有跨平台的特质,源于 KestrelEngine 是在一个名为 libuv 的跨平台网络库上开发的: Kestrel is a cross-platform web server for ASP.N