perl 多线程,实时监控线程数,支持max thread

#!/usr/bin/perl -w
#Description:rerun eod job group by system
#Auther:Suzm
#Date  :2015-06-23

use DBI;
use Thread;
use strict;

push( @INC, $ENV{TPMS_EOD_PERL_LIB} );
require public_pg;

my %dbc_info;
my $maxnum = 3;
my %threads;
my $tx_date;
my $path_log = $ENV{TPMS_EOD_LOGPATH};

my $system = get_config(‘TPMS_SYSTEM_ID‘);
unless ( defined( $system ) ) {
	$system = "error";
}

#my $system = undef;

my $log_name =
  uc( CITIC::getscript_name($0) . "_" .$system);
my $log_file     = CITIC::create_logfile( $log_name, $path_log );
my $TPMS_EOD_SID = $ENV{TPMS_EOD_SID};
my $my_sysenv    = uc( $ENV{MY_SYSENV} );

select $log_file;
$| = 1;

#get eod job info ,return hash
sub getEodJob {
	my ( $sysid, $dbh ) = @_;
	my %jobinfo;

	#my $str = "select * from tpms_eod.T_TPMS_EODTABLE where system=‘${sysid}‘";
	my $str = <<EOF;
select a.* from tpms_eod.t_tpms_eodtable a
inner join tpms_eod.Tpms_Rollback_Bizdate b
on a.TX_DATE != b.CURR_BIZ_DATE
or (a.TX_DATE = b.CURR_BIZ_DATE and a.FLAG=1)
where a.SYSTEM=‘${sysid}‘
EOF
	my $sth;
	eval {
		$sth = $dbh->prepare($str);
		$sth->execute();
	};
	if ([email protected]) {
		CITIC::showtime();
		print "An error occurred ([email protected])\n";
		CITIC::showtime();
		print $dbh->errstr . "\n";
		$dbh->disconnect();
		return %jobinfo;
	}
	while ( my @row = $sth->fetchrow_array() ) {
		$jobinfo{ $row[1] }{"SYSTEM"}     = $row[0];
		$jobinfo{ $row[1] }{"JOB_SCRIPT"} = $row[2];
		$jobinfo{ $row[1] }{"ARGV"}       = $row[3];
		$jobinfo{ $row[1] }{"START_TIME"} = $row[4];
		$jobinfo{ $row[1] }{"END_TIME"}   = $row[5];
		$jobinfo{ $row[1] }{"TX_DATE"}    = $row[6];
		$jobinfo{ $row[1] }{"FLAG"}       = $row[7];

	}
	return %jobinfo;
}

#获取配置参数
sub get_config{
	my($name) = @_;
	my $str;
	open(CONFIG,"$ENV{TPMS_EOD_ETC}/etc.profile");
	while(<CONFIG>){
		my($key,$value)=split(‘=‘,$_);
		if(uc($key) eq uc($name)){
			$str = CITIC::strimstr($value);
			chomp $str;
		}else{
			next;
		}
	}
	close(CONFIG);
	return uc($str);
}

#cmd
sub cmd {
	my $ret;
	my ( $script, $argv, $tablename, $dbh ) = @_;
	my $cmd = " perl ${script} $argv";
	my $rc = open( INFO, "$cmd 2>&1|" );
	unless ($rc) {
		CITIC::showtime();
		print "Can‘t invoke the Command!\n";
		return 1;
	}
	while (<INFO>) {
		print $_;

		#print STDOUT $_;
		if (eof) {
			unless (/\b(complete|Succeeds)/) {
				CITIC::showtime();
				print "*" x 6 . "$tablename作业执行失败" . "*" x 6 . "\n";
				$ret = 1;
			}
			else {
				CITIC::showtime();
				print "*" x 6 . "$tablename作业执行成功" . "*" x 6 . "\n";
				$ret = 0;
			}
		}
	}
	close(INFO);
	return $ret;
}

#update job info
sub upJobInfo {
	my ( $dbh, $tablename, $flag, $final ) = @_;
	my $upstr;
	my $timestamp = showtime();

	if ( $flag == 1 ) {
		$upstr =
"START_TIME=to_timestamp(‘$timestamp‘,‘YYYY-MM-DD hh24:mi:ss:ff‘),TX_DATE=to_date(‘$tx_date‘,‘YYYY-MM-DD‘)";
	}
	else {
		$upstr =
"END_TIME=to_timestamp(‘$timestamp‘,‘YYYY-MM-DD hh24:mi:ss:ff‘),FLAG=$final";
	}

	my $str =
"update tpms_eod.T_TPMS_EODTABLE set $upstr where TABLE_NAME=‘$tablename‘ and SYSTEM=‘$system‘";
	my $sth;
	eval {
		$sth = $dbh->prepare($str);
		$sth->execute();

	};
	if ([email protected]) {
		CITIC::showtime();
		print "An error occurred ([email protected])\n";
		CITIC::showtime();
		print $dbh->errstr . "\n";
		$dbh->disconnect();
		return 1;
	}
	return 0;
}

#get tx_date
sub getTxdate {
	my ($dbh) = @_;
	my $txdate;
	my $str =
	  "select to_char(CURR_BIZ_DATE,‘YYYY-MM-DD‘) from tpms_eod.Tpms_Rollback_Bizdate";
	my $sth;
	eval {
		$sth = $dbh->prepare($str);
		$sth->execute();
	};
	if ([email protected]) {
		CITIC::showtime();
		print "An error occurred ([email protected])\n";
		CITIC::showtime();
		print $dbh->errstr . "\n";
		$dbh->disconnect();
		return 1;
	}
	my $table = $sth->fetchall_arrayref();
	$txdate = $table->[0][0];

	#$dbh->disconnect();
	return $txdate;

}

#get timestamp
sub showtime {
	my ( $sec, $min, $hour, $day, $mon, $year ) = localtime( time() );
	my $current = "";
	$sec  = sprintf( "%02d", $sec );
	$min  = sprintf( "%02d", $min );
	$hour = sprintf( "%02d", $hour );
	$day  = sprintf( "%02d", $day );
	$mon  = sprintf( "%02d", $mon + 1 );
	$year += 1900;
	$current = " $year-$mon-$day" . " $hour" . ":$min" . ":$sec";
	return ${current};
}

#execute eod job
sub executeJob {
	my ( $dbh, %hash ) = @_;
	my $i;
	my $num = scalar( keys %hash );
	for my $jobname ( keys %hash ) {
		CITIC::showtime();
		print "*" x 6 . "正在执行$jobname作业" . "*" x 6 . "\n";
		upJobInfo( $dbh, $jobname, 1 );
		$i++;
		$num--;
		eval {
			$threads{$jobname} = Thread->new(
				\&cmd,
				$hash{$jobname}{‘JOB_SCRIPT‘},
				$hash{$jobname}{‘ARGV‘}, $jobname
			);

			#->join();
		};
		if ([email protected]) {
			CITIC::showtime();
			print "An error occurred ([email protected])\n";
			return 1;
		}
		if ( $i > $maxnum - 1 ) {
			print "进程达到最大数" . Thread->list() . "!!!\n";
			while (1) {

				for my $th ( Thread->list() ) {
					my $tid;
					if ( $th->done ) {
						eval { $tid = $th->join() };
						foreach my $tb ( keys %threads ) {
							if ( $threads{$tb}->tid() == $th->tid() ) {
								if ($tid) {
									upJobInfo( $dbh, $tb, 2, 1 );
								}
								else {
									upJobInfo( $dbh, $tb, 2, 0 );
								}
							}
						}
						$i--;
					}
					else {
						next;
					}
				}
				if ( Thread->list() < $maxnum ) {
					print "进程小于3,开始新的进程\n";
					last;
				}
				else {
					print "sleep 10s,waiting for the job finish\n";
					sleep 10;
				}
			}
		}
		if ( $num == 0 ) {
			print "#" x 20
			  . "所有作业调度完成,等待进程结束!!!"
			  . "#" x 20 . "\n";
			for my $th1 ( Thread->list() ) {
				my $tid1;
				eval { $tid1 = $th1->join() };
				foreach my $tb1 ( keys %threads ) {
					if ( $threads{$tb1}->tid() == $th1->tid() ) {
						if ($tid1) {
							upJobInfo( $dbh, $tb1, 2, 1 );
						}
						else {
							upJobInfo( $dbh, $tb1, 2, 0 );
						}
					}
				}
			}
		}
	}
}

#程序入口
sub main {

	#my ($system)[email protected]_;
	my $ret = 0;
	my $dbh;
	%dbc_info = CITIC::get_dbc_info($TPMS_EOD_SID);

	unless (%dbc_info) {
		CITIC::showtime();
		print "Failed to get database information!\n";
	}
	else {
		$dbh = CITIC::connect_db(
			$dbc_info{"ip"},   $dbc_info{"port"}, $dbc_info{"sid"},
			$dbc_info{"user"}, $dbc_info{"pwd"}
		);
	}

	unless ($dbh) {
		$ret = 1;
	}
	else {
		my %jobinfo = getEodJob( $system, $dbh );
		if (%jobinfo) {
			if ( getTxdate($dbh) eq 1 ) {
				$ret = 1;
			}
			else {
				$tx_date = getTxdate($dbh);
				executeJob( $dbh, %jobinfo );
			}

		}
		else {
			CITIC::showtime();
			print "没有要执行的JOB,请确认后再执行!\n";
			$ret = 1;
		}

		#	$ret = excute_sql( $dbh, @sql_queue );

	}

	return $ret;
}

open( STDERR, ">&STDOUT" );
#if ( $#ARGV < 0 ) {
#	print "Please input parameters,for example:\n1.system id :CTS\n";
#	exit(1);
#}

#$system = uc( $ARGV[0] );
my $ret = main();
if ( $ret == 0 ) {
	print STDOUT "complete\n";
}
else {
	print STDOUT "fail\n";

#	CITIC::send_mail(
#		"[$my_sysenv] $log_name JOB FAILED",
#"Hi all,\n      Job $log_name failed. Please see attached log file for details.\nthanks!"
#	);
}
exit($ret);

END {
	CITIC::showtime();
	print "return code is $ret\n";
	CITIC::close_logfile($log_file);
}
时间: 2024-09-30 15:57:37

perl 多线程,实时监控线程数,支持max thread的相关文章

通过micrometer实时监控线程池的各项指标

通过micrometer实时监控线程池的各项指标 前提 最近的一个项目中涉及到文件上传和下载,使用到JUC的线程池ThreadPoolExecutor,在生产环境中出现了某些时刻线程池满负载运作,由于使用了CallerRunsPolicy拒绝策略,导致满负载情况下,应用接口调用无法响应,处于假死状态.考虑到之前用micrometer + prometheus + grafana搭建过监控体系,于是考虑使用micrometer做一次主动的线程池度量数据采集,最终可以相对实时地展示在grafana的

[转载]C# 多线程、控制线程数提高循环输出效率

C#多线程及控制线程数量,对for循环输出效率. 虽然输出不规律,但是效率明显提高. 思路: 如果要删除1000条数据,只使用for循环,则一个接着一个输出.所以,把1000条数据分成seed段,每段10条数据. int seed = Convert.ToInt32(createCount.Value) % 10 == 0 ? Convert.ToInt32(createCount.Value) / 10 : Convert.ToInt32(createCount.Value) / 10 + 1

C# 多线程、控制线程数提高循环输出效率

原文:C# 多线程.控制线程数提高循环输出效率 C#多线程及控制线程数量,对for循环输出效率. 虽然输出不规律,但是效率明显提高. 思路: 如果要删除1000条数据,只使用for循环,则一个接着一个输出.所以,把1000条数据分成seed段,每段10条数据. int seed = Convert.ToInt32(createCount.Value) % 10 == 0 ? Convert.ToInt32(createCount.Value) / 10 : Convert.ToInt32(cre

多线程-网络请求线程数设置

自己闲暇时,也曾尝试实现过简单的线程池,对高并发有过预生产的压测和调优,然而一直我都是作为服务提供方在做多线程. 最近,接过一个服务消费方的代码,看过代码,实现上并没有看出问题,每秒也有过千的请求并发效果,但是领导要求完成每秒两万. 凭借我一直的实践和阅读,理论上 线程数 = 2*CPU数时,可以达到一个最优的性能表现. 当下机器配置 12台 8C 机器,理论每台 16线程相对不错了. 按服务方说明,http请求,响应延时 10ms 计算理论值 200ms(tcp握手+延时) ,每秒每个线程可以

Java多线程-新特性-线程池

Sun在Java5中,对Java线程的类库做了大量的扩展,其中线程池就是Java5的新特征之一,除了线程池之外,还有很多多线程相关的内容,为多线程的编程带来了极大便利.为了编写高效稳定可靠的多线程程序,线程部分的新增内容显得尤为重要. 有关Java5线程新特征的内容全部在java.util.concurrent下面,里面包含数目众多的接口和类,熟悉这部分API特征是一项艰难的学习过程.目前有关这方面的资料和书籍都少之又少,大部分介绍线程方面书籍还停留在java5之前的知识层面上. 在Java5之

Perl多线程(1):解释器线程的特性

本文关于Perl线程的内容初始主要来自于<Pro Perl>的第21章,未来可能会逐渐添加.完善更多内容,当然也可能分离一部分内容单独成文. 线程简介 线程(thread)是轻量级进程,和进程一样,都能独立.并行运行,也由父线程创建,并由父线程所拥有,线程也有线程ID作为线程的唯一标识符,也需要等待线程执行完毕后收集它们的退出状态(比如使用join收尸),就像waitpid对待子进程一样. 线程运行在进程内部,每个进程都至少有一个线程,即main线程,它在进程创建之后就存在.线程非常轻量级,一

轻松学会多线程(三)——如何确定线程数

一般情况下,在互联网编程中,我们会使用多线程来抢夺网络资源.那么,线程数量我们如何来确定呢? 我们都知道,线程数量和CPU核数有关.所以有人建议说:线程数为核数的两倍最好. 其实只要这些线程不频繁切换.竞争资源的话.想要最优性能,还是根据具体项目慢慢调试. CPU切不切换我们没法控制,只能提高线程优先级以获取更多的CPU时间. CPU除了处理Java还需要处理N多系统和其他线程,一般而言,多线程编程中,在确保内存不溢出的情况下提升线程数是可以提高CPU中签率的,也就是能提高你的程序处理数据的速度

Qt学习 之 多线程程序设计(QT通过三种形式提供了对线程的支持)

QT通过三种形式提供了对线程的支持.它们分别是, 一.平台无关的线程类 二.线程安全的事件投递 三.跨线程的信号-槽连接. 这使得开发轻巧的多线程Qt程序更为容易,并能充分利用多处理器机器的优势.多线程编程也是一个有用的模式,它用于解决执行较长时间的操作而不至于用户界面失去响应.在Qt的早期版本中,在构建库时有不选择线程支持的选项,从4.0开始,线程总是有效的. 线程类 Qt 包含下面一些线程相关的类: QThread 提供了开始一个新线程的方法 QThreadStorage 提供逐线程数据存储

c# 指定线程数的多线程操作

多线程操作一直用ThreadPool.QueueUserWorkItem比较多,今天想到用这个方式实现,控制并发线程的数量. 主要思路是: 1.声明开启的线程数 int threadCount = 2; 2.创建一个泛型集合List<TaskInfo>  workingList = new List<TaskInfo>(); 需要开启多少个线程,就从Queue里取几个taskInfo,添加到workingList,同时,把TaskInfo.IsBusy置为true. 3.在子线程中