nodejs fork 子进程创建任务以及简单的prometheus 监控

以下是一个简单的基于nodejs 的fork 子进程创建子任务,同时使用prometheus 暴露一些简单的metrics
使用express 框架

环境准备

  • 项目结构
├── Dockerfile
├── README.md
├── app.js
├── docker-compose.yaml
├── grafana
│ └── metrics.json
├── metrics.js
├── package.json
├── prometheus.yml
├── send_mail.js
├── taskstatus.js
├── utils.js
└── yarn.lock
  • 结构说明
    send_mail.js 为简单的fork 子进程的代码
    taskstatus.js 为存储子进程任务状态的
    utils.js 为一个简单的工具模块,主要对于已经完成的子任务的清理
    metrics.js 为prometheus metrics 定义
    app.js express rest 接口操作以及任务处理的核心代码
    docker-compose.yaml docker-compose 全家桶运行
    Dockerfile nodejs 应用容器化
    grafana/metrics.json 为应用grafana 对于 prometheus 的dashboard 配置
  • 代码说明
    app.js 核心处理
const { fork } = require(‘child_process‘)
const express = require("express")
const util = require("./utils")
const uuid = require("uuid/v4")
const { child_process_status_all, child_process_status_pending, child_process_status_ok } = require("./taskstatus")
const { child_process_status_all_counter, child_process_status_pending_gauge, child_process_status_ok_counter, initMetrics, initGcStats, process_ok_clean_timer__status, up } = require("./metrics")
const app = express();
const main_process_id = process.pid;
let interval = false;
?
/**
 * metrics route register  注册prometheus metrcis 路由
 */
app.get(‘/metrics‘, (req, res) => {
  initMetrics(req, res);
})
/**
 * disable process clean timer 禁用定时任务清理
 */
app.get("/disable_timer", (req, res) => {
  if (interval) {
    interval = false;
  }
  process_ok_clean_timer__status.set(0)
  res.send({ timer_statuss: false })
})
/**
 * enable process clean timer 启用定时任务清理子进程
 */
app.get("/enable_timer", (req, res) => {
  if (interval == false) {
    interval = true;
  }
  process_ok_clean_timer__status.set(1)
  res.send({ timer_statuss: true })
})
?
/**
 * for create process workers
 */
app.get(‘/endpoint‘, (req, res) => {
  // fork another process  子进程的创建以及消息的通信(状态以及部分prometheus metrics 维护)
  const myprocess = fork(‘./send_mail.js‘);
  child_process_status_pending[myprocess.pid] = {
    status: "pending"
  }
  child_process_status_all[myprocess.pid] = {
    status: "pending"
  }
  child_process_status_all_counter.inc(1)
  child_process_status_pending_gauge.inc(1)
  console.log(`fork process pid:${myprocess.pid}`)
  const mails = uuid();
  // send list of e-mails to forked process
  myprocess.send({ mails });
  // listen for messages from forked process
  myprocess.on(‘message‘, (message) => {
    console.log(`Number of mails sent ${message.counter}`);
    child_process_status_ok[myprocess.pid] = {
      status: "ok"
    }
    child_process_status_ok_counter.inc(1)
    child_process_status_pending_gauge.dec(1)
    child_process_status_all[myprocess.pid] = {
      status: "ok"
    }
    delete child_process_status_pending[myprocess.pid]
  });
  return res.json({ status: true, sent: true });
});
?
/**
 * call api for stop processed workers   删除任务完成的子进程
 */
app.get("/stop", (req, res) => {
  util.stopProcess(main_process_id, (err, data) => {
    if (err == null) {
      res.send({ timer_clean_status: "ok" })
    }
  })
})
?
// init gc metrics   gc metrcis 暴露
initGcStats()
// clean ok process timer 定时任务清理完成的进程
setInterval(function () {
  if (interval) {
    util.stopProcess(main_process_id, (err, data) => {
      if (err == null) {
        console.log({ timer_clean_status: "ok" })
      } else {
        process_ok_clean_timer__status.set(0)
      }
    })
  }
}, 10000)
// set metric status to up 
up.set(1)
app.listen(8080, "0.0.0.0", () => {
  console.log(`go to http://localhost:8080/ to generate traffic`)
}).on("error", () => {
  up.set(0)
})

utils.js 定时任务清理模块

const psTree = require("ps-tree")
const {spawn } = require(‘child_process‘)
const {child_process_status_ok} = require("./taskstatus")
const {process_ok_clean_timer__status} = require("./metrics")
/**
 * 
 * @param {mainProcessID} mainProcessID 
 * @param {cb} callback for check status
 */
function stopProcess(mainProcessID,cb){
    psTree(mainProcessID, function (err, children) {
        if (err){
          process_ok_clean_timer__status.set(0)
        }
        let pids = [];
        for (const key in child_process_status_ok) {
          if (child_process_status_ok.hasOwnProperty(key)) {
            pids.push(key)
            delete child_process_status_ok[key]
          }
        }
        let info = children.filter(item => item.COMM == "ps" || item.COMMAND == "ps").map(function (p) { return p.PID })
        pids.push(...info)
        console.log(`stop child process ids: ${JSON.stringify(pids)}`)
        spawn(‘kill‘, [‘-9‘].concat(pids));
        cb(null,"ok")
      })
}
module.exports = {
    stopProcess
};

metrics.js prometheus metrics 模块
主要是metrics 定义,以及一个初始化的方法

const Prometheus = require("prom-client")
const gcStats = require(‘prometheus-gc-stats‘)
module.exports = {
    child_process_status_all_counter:new Prometheus.Counter({
        name: ‘child_process_status_all_total‘,
        help: ‘all running process‘,
        labelNames: [‘process_all‘]
      }),
      child_process_status_pending_gauge:new Prometheus.Gauge({
        name: ‘current_child_process_status_pending‘,
        help: ‘current pending process‘,
        labelNames: [‘process_pending‘]
      }),
      child_process_status_ok_counter:new Prometheus.Counter({
        name: ‘child_process_status_ok_total‘,
        help: ‘all ok process‘,
        labelNames: [‘process_ok‘]
      }),
      process_ok_clean_timer__status:new Prometheus.Gauge({
        name: ‘process_ok_clean_timer_status‘,
        help: ‘process_ok_clean_timer_status‘,
        labelNames: [‘process_timer‘]
      }),
      up:new Prometheus.Gauge({
        name: ‘up‘,
        help: ‘metrics_status‘,
        labelNames: [‘metrics_status‘]
      }),
      initGcStats: function(){
        const startGcStats = gcStats(Prometheus.register)
        startGcStats()
      },
      initMetrics:function(req,res){
        res.set(‘Content-Type‘, Prometheus.register.contentType)
        res.end(Prometheus.register.metrics())
      }
}

taskstatus.js 状态存储(很简单,就是一个json 对象)

module.exports = {
    child_process_status_all:{},
    child_process_status_pending:{},
    child_process_status_ok:{}
}

send_mail.js 子进程任务处理

async function sendMultipleMails(mails) {
    let sendMails = 0;
    // logic for
    // sending multiple mails
    return sendMails;
 }
 // receive message from master process
 process.on(‘message‘, async (message) => {
   console.log("get messageId",message)
   const numberOfMailsSend = await sendMultipleMails(message.mailsid); 
   setTimeout(()=>{
    process.send({ counter: numberOfMailsSend });
   },Number.parseInt(Math.random()*10000))
   // send response to master process
 });

Dockerfile 为了方便使用docker 运行

FROM node:12.14.0-alpine3.9
WORKDIR /app
COPY app.js /app/app.js
COPY package.json /app/package.json
COPY yarn.lock /app/yarn.lock
COPY metrics.js /app/metrics.js
COPY utils.js /app/utils.js
COPY taskstatus.js /app/taskstatus.js
COPY send_mail.js /app/send_mail.js
#CMD [ "yarn","app"]
EXPOSE 8080
RUN yarn
ENTRYPOINT [ "yarn","app" ]

docker-compose 文件主要是包含prometheus 以及grafana还有nodejs 应用

version: "3"
services:
  app:
    build: ./
    ports: 
    - "8080:8080"
    image: dalongrong/node-process
  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"
  prometheus:
    image: prom/prometheus
    volumes:
      - "./prometheus.yml:/etc/prometheus/prometheus.yml"
    ports:
      - "9090:9090"

启动&&效果

  • 启动
docker-compose up -d
  • 简单压测
ab -n 200 -c 20 http://localhost:8080/endpoint
  • grafana 效果

说明

以上就是一个简单的基于fork 以及prometheus 的nodejs 子任务创建

参考资料

https://github.com/rongfengliang/node-process-fork-learning
https://nodejs.org/api/child_process.html#child_process_child_process_fork_modulepath_args_options

原文地址:https://www.cnblogs.com/rongfengliang/p/12173253.html

时间: 2024-10-14 20:02:43

nodejs fork 子进程创建任务以及简单的prometheus 监控的相关文章

通过fork函数创建进程的跟踪,分析linux内核进程的创建

作者:吴乐 山东师范大学 <Linux内核分析>MOOC课程http://mooc.study.163.com/course/USTC-1000029000 一.实验过程 1.打开gdb,设置断点 2.跟踪到do_fork处 3.跟踪到copy_process断点处. 4.跟踪到ret_from_fork子进程创建完成. 二.代码部分分析 Fork的系统调用代码在linux/arch/i386/kernel/process.c中:       asmlinkage int sys_fork(s

fork同时创建多个子进程的方法

Fork同时创建多个子进程方法 第一种方法:验证通过 特点:同时创建多个子进程,每个子进程可以执行不同的任务,程序 可读性较好,便于分析,易扩展为多个子进程 int main(void) { printf("before fork(), pid = %d\n", getpid()); pid_t p1 = fork(); if( p1 == 0 ) { printf("in child 1, pid = %d\n", getpid()); return 0; //若

利用fork循环创建进程

我们知道,fork可以创建子进程,那么如果循环调用fork,进程之间会有什么关系呢? 得到结果: 从结果上分析,一共有4个进程,6132,6133,6134,6135,它们的关系是怎样的呢? 按道理来讲,刚开始i=0,只有一个进程6132,fork后,产生子进程6133.6132执行完printf后,i++.在fork后,6132作为父进程,产生子进程6134.而6133作为i=0时的子进程,执行完printf后,i++,此时6133作为父进程,产生子进程6135. 结果却和所想有些出入,为什么

linux下c程序 daemon、fork与创建pthread的顺序问题

近期发如今写linux c服务程序的时候,daemon与线程创建之间出现故障. 发现程序在daemon之后,起的线程就全挂了. 查过一些文档之后,最终知道了why. daemon函数的操作事实上非常easy, 1.fork一个进程,2.处理3个标准输入输出流 做完之后,主进程就退出了,实际执行的是子进程. 子进程会被挂在init进程上,也就是pid为1的进程. 问题就发生在这个daemon的时间点. 假设在pthread初始化了线程之后,再进行daemon,子进程不会拥有之前的线程,相反之前的线

Linux系统开发 4 进程资源 环境 fork()子进程 wait() waitpid()僵尸 孤儿进程

<大纲> Linux系统编程 进程资源上限 进程环境变量 进程获取/修改环境变量 创建子进程fork() 最大进程数测试 程序的设置用户ID/组ID/黏住位 exec簇函数,执行程序覆盖堆栈 fork 与execl函数在一起 exec() 与主程序同一个PCB 僵尸进程 wait()回收僵尸进程 证明:父子进程同组pid waitpid() 非阻塞等待子线程发生变化 孤儿进程演示[父进程已经结束,子进程还在运行] 进程资源上限值 [email protected]:~$ cat /proc/s

linux进程编程:子进程创建及执行函数简介

子进程创建及执行函数有三个: (1)fork();(2)exec();(3)system();    下面分别做详细介绍.(1)fork()    函数定义:    pid_t fork();    函数说明:    linux下进程在内存中由三部分数据组成:代码段.数据段.堆栈段.在一个进程中,调用fork函数,可以创建.启动一个新进程.新进程与父进程共享代码段,复制父进程的数据段和堆栈段.创建成功后,fork()会向两个进程都有返回值.向父进程的返回值为子进程的进行号,向子进程的返回值为0.

localtime死锁——多线程下fork子进程

最近测试我们自己改进的redis,发现在做rdb时,子进程会一直hang住,gdb attach上,堆栈如下: (gdb) bt #0 0x0000003f6d4f805e in __lll_lock_wait_private () from /lib64/libc.so.6 #1 0x0000003f6d49dcad in _L_lock_2164 () from /lib64/libc.so.6 #2 0x0000003f6d49da67 in __tz_convert () from /l

Nodejs做web服务器的一个简单逻辑和实现

本文写了自己对"Nodejs做web服务器的一个简单逻辑和实现",如果不太合理,或者了解这部分的大神,欢迎拍死.哈哈,新手. 昨天开始学习Nodejs,w3cschool看了一遍.感觉其上的内容,一上来就开始介绍Nodejs的模块.看来后来,连如何"从零创建一个nodejs版的web服务器"都没能明白.可能是自己太过肤浅,或者,陷到了WAMP的思维里,不能自拔吧.后来在nodejs中文社区发了个帖子.感觉自己算是明白了点儿.写出来,希望和大家一起学习,同时也希望得到

Nodejs express中创建ejs项目,解决express下默认创建jade,无法创建ejs问题

最近在看<Node.js开发指南>,看到使用nodejs进行web开发的时候,准备创建ejs项目遇到问题了, 书上命令为: ? 1 express -t ejs microblog 可是执行后,仍旧创建的是jade项目. 原来,express3.x,express4.x中创建ejs命令更新为: express -e microblog //即ejs,-j(即jade)  当然,最直接的,你也可以修改package.json里的定义来实现安装ejs. PS:建立工程过程 1.必须得安装expre