Writing glog text logs on a separate thread

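glog itself is very efficient. Still, Google's engineers surely know that logging at large scale through glog slows down the business threads, because nobody can afford disk I/O on a loaded disk in the request path. LevelDB, for example, does not sync writes to disk by default (WriteOptions::sync defaults to false). Google's "troika" (GFS, MapReduce, Bigtable) are all distributed systems as well. I read those papers a while ago, and they truly led their era.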

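Someone raised asynchronous writing in glog's issue tracker, but the discussion was vague. Version 0.3.3 already exposes an interface for it. The interface is not very friendly, but it is entirely sufficient for writing disk logs asynchronously.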

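Today I spent some time working through the pitfalls, and it now basically works. Once it is stable, I will benchmark this version against glog, g2log, and muduo logging.

Muduo plays a trick with its buffers: it keeps two buffer pointers for double buffering, which is reportedly very efficient. However, muduo is Linux-only. Extracting its logging module on its own would not be hard either. Chen Shuo wrote his own wrappers for mutex, thread, condition variable and so on, but on gcc 4.8, clang 3.3, and VS2010 those wrappers are no longer really necessary. Even the thread-safe one-time initialization on Linux (pthread_once) that people used to praise is now covered by C++11's std::call_once.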
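To make the double-buffering idea concrete, here is a minimal sketch. It is not muduo's actual code, and the class name DoubleBufferLog and its methods are hypothetical. Producers append to a front buffer under a mutex. The writer thread swaps that buffer with an empty back buffer, so the slow write happens without holding the lock.

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <string>
#include <thread>
#include <utility>

// Hypothetical double-buffered sink: producers append to front_,
// the writer thread swaps front_ with its own back buffer and writes it unlocked.
class DoubleBufferLog {
 public:
  explicit DoubleBufferLog(std::function<void(const std::string&)> sink)
      : sink_(std::move(sink)), writer_(&DoubleBufferLog::Run, this) {}

  ~DoubleBufferLog() {
    {
      std::lock_guard<std::mutex> lock(mu_);
      stop_ = true;
    }
    cv_.notify_one();
    writer_.join();  // Run() drains front_ before exiting
  }

  // Called from any thread; only an in-memory append happens under the lock.
  void Append(const std::string& line) {
    {
      std::lock_guard<std::mutex> lock(mu_);
      front_ += line;
    }
    cv_.notify_one();
  }

 private:
  void Run() {
    std::string back;  // owned by the writer thread only
    for (;;) {
      {
        std::unique_lock<std::mutex> lock(mu_);
        cv_.wait(lock, [this] { return stop_ || !front_.empty(); });
        if (front_.empty() && stop_) return;
        front_.swap(back);  // O(1); producers continue on the (empty) old back buffer
      }
      sink_(back);   // slow I/O happens outside the lock
      back.clear();  // keeps capacity, so the memory is reused after the next swap
    }
  }

  std::function<void(const std::string&)> sink_;
  std::mutex mu_;
  std::condition_variable cv_;
  std::string front_;
  bool stop_ = false;
  std::thread writer_;  // declared last so it starts after the other members exist
};
```

With this design, producers never wait on the disk. They only wait for the brief swap.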

The interface glog lets clients customize is:

class GOOGLE_GLOG_DLL_DECL Logger {
 public:
  virtual ~Logger();

  // Writes "message[0,message_len-1]" corresponding to an event that
  // occurred at "timestamp".  If "force_flush" is true, the log file
  // is flushed immediately.
  //
  // The input message has already been formatted as deemed
  // appropriate by the higher level logging facility.  For example,
  // textual log messages already contain timestamps, and the
  // file:linenumber header.
  virtual void Write(bool force_flush,
                     time_t timestamp,
                     const char* message,
                     int message_len) = 0;

  // Flush any buffered messages
  virtual void Flush() = 0;

  // Get the current LOG file size.
  // The returned value is approximate since some
  // logged data may not have been flushed to disk yet.
  virtual uint32 LogSize() = 0;

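  // The three Set* methods below are my additions; they are not part of
  // stock glog 0.3.3's base::Logger.
  virtual void SetBasename(const char* basename) = 0;
  virtual void SetExtension(const char* ext) = 0;
  virtual void SetSymlinkBasename(const char* symlink_basename) = 0;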

};

I added a few methods of my own to it for later convenience.

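The Active Object pattern solves this neatly. It is essentially the familiar producer-consumer model. When a LogMessage is destroyed, glog flushes it to its destinations, which calls the logger's Write method. That is our chance to take over. We package the data and post it to a background worker thread. The worker takes it off the queue and does the actual disk write.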
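A portable sketch of the same Active Object idea follows, assuming only standard C++11 (no Microsoft PPL). Messages are std::function<void()> jobs in a std::queue guarded by a mutex and a condition variable, so the worker sleeps instead of polling. Shutdown is itself a queued job, so everything sent earlier is processed first. This follows the Herb Sutter / KjellKod design credited in the code below, but the class name SimpleActive is hypothetical.

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical minimal Active Object: one worker thread runs queued jobs in FIFO order.
class SimpleActive {
 public:
  SimpleActive() : worker_(&SimpleActive::Run, this) {}

  ~SimpleActive() {
    Send([this] { done_ = true; });  // quit token: runs after all earlier jobs
    worker_.join();
  }

  // Producer side: callable from any thread; only enqueues, never runs the job.
  void Send(std::function<void()> job) {
    {
      std::lock_guard<std::mutex> lock(mu_);
      jobs_.push(std::move(job));
    }
    cv_.notify_one();
  }

 private:
  void Run() {
    while (!done_) {  // done_ is only read and written on the worker thread
      std::function<void()> job;
      {
        std::unique_lock<std::mutex> lock(mu_);
        cv_.wait(lock, [this] { return !jobs_.empty(); });  // sleep, don't spin
        job = std::move(jobs_.front());
        jobs_.pop();
      }
      job();  // run outside the lock so producers are never blocked by slow I/O
    }
  }

  bool done_ = false;
  std::mutex mu_;
  std::condition_variable cv_;
  std::queue<std::function<void()>> jobs_;
  std::thread worker_;  // last member: starts after everything else is constructed
};
```

Sending the quit signal as an ordinary message avoids a separate shared flag and guarantees that pending log entries are written before the thread exits.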

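I wrapped a simple Active class. Each log entry is serialized into a Buffer, which is what gets passed between threads. The Active object's callback must be set explicitly; here that happens through the createActive factory. For the inter-thread queue I used Concurrency::concurrent_queue from Microsoft's PPL (<concurrent_queue.h>, MSVC-only, not part of standard C++11), so there was no need to reinvent the wheel: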

/** ==========================================================================
* 2010 by KjellKod.cc. This is PUBLIC DOMAIN to use at your own risk and comes
* with no warranties. This code is yours to share, use and modify with no
* strings attached and no restrictions or obligations.
* ============================================================================
*
* Example of a Active Object, using C++11 std::thread mechanisms to make it
* safe for thread communication.
*
* This was originally published at http://sites.google.com/site/kjellhedstrom2/active-object-with-cpp0x
* and inspired from Herb Sutter's C++11 Active Object
* http://herbsutter.com/2010/07/12/effective-concurrency-prefer-using-active-objects-instead-of-naked-threads
*
* The code below uses JustSoftware Solutions Inc std::thread implementation
* http://www.justsoftwaresolutions.co.uk
*
* Last update 2012-10-10, by Kjell Hedstrom,
* e-mail: hedstrom at kjellkod dot cc
* linkedin: http://linkedin.com/se/kjellkod */

#ifndef ACTIVE_H_
#define ACTIVE_H_

#include <atomic>
#include <thread>
#include <functional>
#include <condition_variable>
#include <mutex>
#include <memory>
#include <concurrent_queue.h>  // Microsoft PPL: Concurrency::concurrent_queue (MSVC only)

// One serialized log entry; owned by whoever currently holds the pointer.
struct  Buffer
{
	Buffer():m_Len(0), m_pMsg(NULL){}
	explicit Buffer(int size):m_Len(size)
		, m_pMsg(new char[size])
	{
	}
	~Buffer()
	{
		delete []m_pMsg;  // deleting NULL is a no-op
	}
	int m_Len;
	char* m_pMsg;

private:
	// Non-copyable: a copy would share m_pMsg and delete it twice.
	Buffer(const Buffer&);
	Buffer& operator=(const Buffer&);
};

typedef std::function<void(Buffer*)> Callback;

class Active {
private:
  Active(const Active&); // c++11 feature not yet in vs2010 = delete;
  Active& operator=(const Active&); // c++11 feature not yet in vs2010 = delete;
  Active();                         // Construction ONLY through factory createActive();
  void doDone(){done_ = true;}
  void run();
  void setCallBack(Callback aCallBack);

  Concurrency::concurrent_queue<Buffer*> mq_;
  std::thread thd_;
  std::atomic<bool> done_;  // set by ~Active; run() drains the queue, then exits
  Callback callBack_;

public:
  virtual ~Active();
  void send(Buffer* apBuffer);
  static std::unique_ptr<Active> createActive(Callback aCallBack); // Factory: safe construction & thread start
};

#endif
/** ==========================================================================
* 2010 by KjellKod.cc. This is PUBLIC DOMAIN to use at your own risk and comes
* with no warranties. This code is yours to share, use and modify with no
* strings attached and no restrictions or obligations.
* ============================================================================
*
* Example of a Active Object, using C++11 std::thread mechanisms to make it
* safe for thread communication.
*
* This was originally published at http://sites.google.com/site/kjellhedstrom2/active-object-with-cpp0x
* and inspired from Herb Sutter's C++11 Active Object
* http://herbsutter.com/2010/07/12/effective-concurrency-prefer-using-active-objects-instead-of-naked-threads
*
* The code below uses JustSoftware Solutions Inc std::thread implementation
* http://www.justsoftwaresolutions.co.uk
*
* Last update 2012-10-10, by Kjell Hedstrom,
* e-mail: hedstrom at kjellkod dot cc
* linkedin: http://linkedin.com/se/kjellkod */

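#include "active.h"
#include <cassert>
#include <chrono>

Active::Active(): done_(false){}

Active::~Active() {
  doDone();            // ask run() to stop once the queue is empty
  if (thd_.joinable())
    thd_.join();
}

// Add asynchronously a work-message to queue
void Active::send( Buffer* apBuffer )
{
	if (NULL != apBuffer)
	{
		mq_.push(apBuffer);
	}
}

void Active::run() {
  for (;;) {
    // Read the flag BEFORE popping: if it was already set and the queue is
    // still empty afterwards, every message sent before ~Active was handled.
    const bool stopping = done_;
    Buffer* pBuffer = NULL;
    if (mq_.try_pop(pBuffer))
    {
      if (NULL != pBuffer)
      {
        callBack_(pBuffer);
        delete pBuffer;
      }
      continue;
    }
    if (stopping)
      break;
    // concurrent_queue has no blocking pop; back off briefly instead of spinning at 100% CPU.
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
  }
}

// Factory: safe construction of object before thread start
std::unique_ptr<Active> Active::createActive(Callback aCallBack){
  std::unique_ptr<Active> aPtr(new Active());
  aPtr->callBack_ = aCallBack;  // must be set before the thread can call it
  aPtr->thd_ = std::thread(&Active::run, aPtr.get());
  return aPtr;
}

void Active::setCallBack( Callback aCallBack )
{
	callBack_ = aCallBack;
}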

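The key part is ThreadLog, which implements the Logger interface. Write only packs its arguments into a Buffer and hands it to the Active object. The real file-writing logic lives in WriteInteral, which runs on the worker thread through CallBack. The Set* functions are called internally by my modified glog.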

#pragma once
#include <glog/logging.h>
#include <mutex>
#include "active.h"

using namespace std;

namespace google
{

class ThreadLog : public google::base::Logger
{
public:
	ThreadLog();
	~ThreadLog();
	virtual void Write(bool force_flush,
		time_t timestamp,
		const char* message,
		int message_len) ;
	virtual void Flush();
	virtual uint32 LogSize();

	// Configuration options
	void SetBasename(const char* basename);
	void SetExtension(const char* ext);
	void SetSymlinkBasename(const char* symlink_basename);
	void CallBack(Buffer* pBuffer);

private:
	static const uint32 kRolloverAttemptFrequency = 0x20;
	mutex lock_;
	bool base_filename_selected_;
	string base_filename_;
	string symlink_basename_;
	string filename_extension_;     // option users can specify (eg to add port#)
	FILE* file_;
	LogSeverity severity_;
	uint32 bytes_since_flush_;
	uint32 file_length_;
	unsigned int rollover_attempt_;
	int64 next_flush_time_;         // cycle count at which to flush log
	string hostname;
	bool stopWriting;
	std::unique_ptr<Active> m_pActive;
	bool CreateLogfile(const string& time_pid_string);
	void FlushUnlocked();
	void WriteInteral(bool force_flush, time_t timestamp, const char* message, int message_len);
};

}

#include "ThreadLog.h"
#include "port.h"
#include <fcntl.h>
#include <iomanip>
#include <sstream>
#include <cstring>
#include <cerrno>
#include "utilities.h"
#include <functional>

namespace google
{
	static int GetSize(bool& force_flush, time_t& timestamp, const char* message, int& message_len)
	{
		return sizeof(force_flush)+sizeof(timestamp)+sizeof(message_len)+message_len;
	}

	void ThreadLog::Write( bool force_flush, time_t timestamp, const char* message, int message_len )
	{
		Buffer* pBuffer = new Buffer(GetSize(force_flush, timestamp, message, message_len));
		char* curData = pBuffer->m_pMsg;
		memcpy(curData, &force_flush, sizeof(force_flush));
		curData += sizeof(force_flush);

		memcpy(curData, &timestamp, sizeof(timestamp));
		curData += sizeof(timestamp);

		memcpy(curData, &message_len, sizeof(message_len));
		curData += sizeof(message_len);

		memcpy(curData, message, message_len);
		curData += message_len;

		m_pActive->send(pBuffer);
	}

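	void ThreadLog::Flush()
	{
		// Flushes what the worker thread has written so far; entries still
		// waiting in the queue are not included.
		std::lock_guard<std::mutex> lock(lock_);
		FlushUnlocked();
	}

	google::uint32 ThreadLog::LogSize()
	{
		// Approximate, as documented for Logger::LogSize().
		std::lock_guard<std::mutex> lock(lock_);
		return file_length_;
	}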

	void ThreadLog::SetBasename( const char* basename )
	{
		std::lock_guard<std::mutex> lock(lock_);
		base_filename_selected_ = true;
		if (base_filename_ != basename)
		{
			if (file_ != NULL)
			{
				fclose(file_);
				file_ = NULL;
				rollover_attempt_ = kRolloverAttemptFrequency-1;
			}
			base_filename_ = basename;
		}
	}

	void ThreadLog::SetExtension( const char* ext )
	{
		std::lock_guard<std::mutex> lock(lock_);
		if (filename_extension_ != ext)
		{
			// Get rid of old log file since we are changing names
			if (file_ != NULL)
			{
				fclose(file_);
				file_ = NULL;
				rollover_attempt_ = kRolloverAttemptFrequency-1;
			}
			filename_extension_ = ext;
		}
	}

	void ThreadLog::SetSymlinkBasename( const char* symlink_basename )
	{
		std::lock_guard<std::mutex> lock(lock_);
		symlink_basename_ = symlink_basename;
	}

	bool ThreadLog::CreateLogfile( const string& time_pid_string )
	{
		string string_filename = base_filename_+filename_extension_+
			time_pid_string;
		const char* filename = string_filename.c_str();
		int fd = open(filename, O_WRONLY | O_CREAT | O_EXCL, 0664);
		if (fd == -1) return false;
#ifdef HAVE_FCNTL
		// Mark the file close-on-exec. We don't really care if this fails
		fcntl(fd, F_SETFD, FD_CLOEXEC);
#endif

		file_ = fdopen(fd, "a");  // Make a FILE*.
		if (file_ == NULL) {  // Man, we're screwed!
			close(fd);
			unlink(filename);  // Erase the half-baked evidence: an unusable log file
			return false;
		}

		if (!symlink_basename_.empty()) {
			// take directory from filename
			const char* slash = strrchr(filename, '/');
			const string linkname =
				symlink_basename_ + '.' + LogSeverityNames[severity_];
			string linkpath;
			if ( slash ) linkpath = string(filename, slash-filename+1);  // get dirname
			linkpath += linkname;
			unlink(linkpath.c_str());                    // delete old one if it exists

			// We must have unistd.h.
#ifdef HAVE_UNISTD_H
			// Make the symlink be relative (in the same dir) so that if the
			// entire log directory gets relocated the link is still valid.
			const char *linkdest = slash ? (slash + 1) : filename;
			if (symlink(linkdest, linkpath.c_str()) != 0) {
				// silently ignore failures
			}

			// Make an additional link to the log file in a place specified by
			// FLAGS_log_link, if indicated
			if (!FLAGS_log_link.empty()) {
				linkpath = FLAGS_log_link + "/" + linkname;
				unlink(linkpath.c_str());                  // delete old one if it exists
				if (symlink(filename, linkpath.c_str()) != 0) {
					// silently ignore failures
				}
			}
#endif
		}

		return true;  // Everything worked
	}

	void ThreadLog::FlushUnlocked()
	{
		if (file_ != NULL)
		{
			fflush(file_);
			bytes_since_flush_ = 0;
		}

		const int64 next = (FLAGS_logbufsecs * static_cast<int64>(1000000));  // in usec
		next_flush_time_ = CycleClock_Now() + UsecToCycles(next);
	}

	ThreadLog::ThreadLog(): base_filename_selected_(false)
		, file_(NULL)
		, severity_(GLOG_INFO)   // used in log file and symlink names
		, bytes_since_flush_(0)
		, file_length_(0)
		, rollover_attempt_(0)
		, next_flush_time_(0)
		, stopWriting(false)
		, m_pActive(Active::createActive(std::bind(&ThreadLog::CallBack, this, std::placeholders::_1)))
	{
	}

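	ThreadLog::~ThreadLog()
	{
		// Stop the worker first: ~Active drains the queue and joins the thread,
		// so no CallBack can touch file_ after this line.
		m_pActive.reset();
		if (file_ != NULL)
		{
			fclose(file_);
			file_ = NULL;
		}
	}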

	void ThreadLog::WriteInteral( bool force_flush, time_t timestamp, const char* message, int message_len )
	{
		if (base_filename_selected_ && base_filename_.empty())
		{
			return;
		}

		if (static_cast<int>(file_length_ >> 20) >= MaxLogSize())
		{
			if (file_ != NULL)
				fclose(file_);
			file_ = NULL;
			file_length_ = bytes_since_flush_ = 0;
			rollover_attempt_ = kRolloverAttemptFrequency-1;
		}

		if (file_ == NULL)
		{
			//if (++rollover_attempt_ != kRolloverAttemptFrequency)
			//	return;
			//rollover_attempt_ = 0;

			struct ::tm tm_time;
			localtime_r(&timestamp, &tm_time);
			ostringstream time_pid_stream;
			time_pid_stream.fill('0');
			time_pid_stream << 1900+tm_time.tm_year
				<< setw(2) << 1+tm_time.tm_mon
				<< setw(2) << tm_time.tm_mday
				<< '-'
				<< setw(2) << tm_time.tm_hour
				<< setw(2) << tm_time.tm_min
				<< setw(2) << tm_time.tm_sec
				<< '.'
				<< GetCurrentThreadId();
			const string& time_pid_string = time_pid_stream.str();

			if (base_filename_selected_)
			{
				if (!CreateLogfile(time_pid_string))
				{
					perror("Could not create log file");
					fprintf(stderr, "COULD NOT CREATE LOGFILE '%s'!\n", time_pid_string.c_str());
					return;
				}
			}
			else
			{
				string stripped_filename(glog_internal_namespace_::ProgramInvocationShortName());
				GetHostName(&hostname);
				string uidname = MyUserName();
				if (uidname.empty())
					uidname = "invalid-user";

				stripped_filename = stripped_filename+'.'+hostname+'.'+uidname+".log."+LogSeverityNames[severity_]+'.';
				const vector<string> & log_dirs = GetLoggingDirectories();

				bool success = false;
				for (vector<string>::const_iterator dir = log_dirs.begin();dir != log_dirs.end(); ++dir)
				{
						base_filename_ = *dir + "/" + stripped_filename;
						if ( CreateLogfile(time_pid_string) )
						{
							success = true;
							break;
						}
				}

				if ( success == false )
				{
					perror("Could not create logging file");
					fprintf(stderr, "COULD NOT CREATE A LOGGINGFILE %s!",
						time_pid_string.c_str());
					return;
				}
			}

			ostringstream file_header_stream;
			file_header_stream.fill('0');
			file_header_stream << "Log file created at: "
				<< 1900+tm_time.tm_year << '/'
				<< setw(2) << 1+tm_time.tm_mon << '/'
				<< setw(2) << tm_time.tm_mday
				<< ' '
				<< setw(2) << tm_time.tm_hour << ':'
				<< setw(2) << tm_time.tm_min << ':'
				<< setw(2) << tm_time.tm_sec << '\n'
				<< "Running on machine: "
				<< hostname << '\n'
				<< "Log line format: [IWEF]mmdd hh:mm:ss.uuuuuu "
				<< "threadid file:line] msg" << '\n';
			const string& file_header_string = file_header_stream.str();

			const int header_len = file_header_string.size();
			fwrite(file_header_string.data(), 1, header_len, file_);
			file_length_ += header_len;
			bytes_since_flush_ += header_len;
		}

		if ( !stopWriting )
		{
			errno = 0;
			fwrite(message, 1, message_len, file_);
			if ( FLAGS_stop_logging_if_full_disk && errno == ENOSPC )
			{  // disk full, stop writing to disk
				stopWriting = true;  // until the disk is
				return;
			}
			else
			{
				file_length_ += message_len;
				bytes_since_flush_ += message_len;
			}
		}
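		else
		{
			// Writing was stopped because the disk was full. Once the flush
			// interval has passed, re-enable writing to check for free space.
			if ( CycleClock_Now() >= next_flush_time_ )
				stopWriting = false;
			return;  // nothing was written, so there is nothing to flush
		}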

		if ( force_flush || (bytes_since_flush_ >= 1000000) || (CycleClock_Now() >= next_flush_time_) ) {
				FlushUnlocked();
#ifdef OS_LINUX
				if (FLAGS_drop_log_memory) {
					if (file_length_ >= logging::kPageSize) {
						// don't evict the most recent page
						uint32 len = file_length_ & ~(logging::kPageSize - 1);
						posix_fadvise(fileno(file_), 0, len, POSIX_FADV_DONTNEED);
					}
				}
#endif
		}
	}

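	void ThreadLog::CallBack( Buffer* pBuffer )
	{
		// Runs on the Active worker thread. Hold lock_ so that the Set*
		// functions (called from the logging thread) cannot change file_
		// or the file names while we write.
		std::lock_guard<std::mutex> lock(lock_);

		// Decode with memcpy: timestamp sits at offset sizeof(bool), so
		// dereferencing a cast pointer would be an unaligned read.
		const char* curData = pBuffer->m_pMsg;
		bool force_flush = false;
		memcpy(&force_flush, curData, sizeof(force_flush));
		curData += sizeof(force_flush);
		time_t timestamp = 0;
		memcpy(&timestamp, curData, sizeof(timestamp));
		curData += sizeof(timestamp);
		int message_len = 0;
		memcpy(&message_len, curData, sizeof(message_len));
		curData += sizeof(message_len);
		WriteInteral(force_flush, timestamp, curData, message_len);
	}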

}
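Write and CallBack pass one log entry across threads as a flat byte array laid out as [force_flush][timestamp][message_len][message bytes]. The round trip can be sketched in isolation as follows. PackEntry, UnpackEntry and the Entry struct are hypothetical names. Copying fields with std::memcpy, rather than casting a char* to time_t*, avoids unaligned reads, since timestamp starts at offset sizeof(bool).

```cpp
#include <cstring>
#include <ctime>
#include <string>
#include <vector>

// Hypothetical stand-in for the fields ThreadLog::Write receives.
struct Entry {
  bool force_flush;
  std::time_t timestamp;
  std::string message;
};

// Layout: [bool force_flush][time_t timestamp][int message_len][message bytes]
std::vector<char> PackEntry(bool force_flush, std::time_t timestamp,
                            const char* message, int message_len) {
  std::vector<char> buf(sizeof(force_flush) + sizeof(timestamp) +
                        sizeof(message_len) + message_len);
  char* p = buf.data();
  std::memcpy(p, &force_flush, sizeof(force_flush)); p += sizeof(force_flush);
  std::memcpy(p, &timestamp, sizeof(timestamp));     p += sizeof(timestamp);
  std::memcpy(p, &message_len, sizeof(message_len)); p += sizeof(message_len);
  std::memcpy(p, message, message_len);
  return buf;
}

Entry UnpackEntry(const std::vector<char>& buf) {
  Entry e;
  int message_len = 0;
  const char* p = buf.data();
  std::memcpy(&e.force_flush, p, sizeof(e.force_flush)); p += sizeof(e.force_flush);
  std::memcpy(&e.timestamp, p, sizeof(e.timestamp));     p += sizeof(e.timestamp);
  std::memcpy(&message_len, p, sizeof(message_len));     p += sizeof(message_len);
  e.message.assign(p, message_len);  // message bytes are not NUL-terminated
  return e;
}
```

Both sides must agree on field order and sizes. This is safe here because producer and consumer run in the same process.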

With that in place, main can look like this, plugging our own ThreadLog class into glog:

#define GLOG_NO_ABBREVIATED_SEVERITIES
#include <windows.h>
#include <glog/logging.h>
#include "ThreadLog.h"

using namespace google;
int main(int argc, char* argv[]) {
	google::InitGoogleLogging("test/testsss");
	google::base::Logger* mylogger = new google::ThreadLog;
	SetLogger(google::GLOG_INFO, mylogger);

	google::SetLogDestination(google::GLOG_INFO, "../Debug/logtestInfo");
	//google::SetLogDestination(google::GLOG_ERROR, "../Debug/logtestDebug");

	int num_cookies = 0;

	google::SetStderrLogging(google::GLOG_INFO);
	//google::SetStderrLogging(google::GLOG_ERROR);
	//google::LogToStderr();
	for (int i = 0; i < 1000; ++i){
		LOG(INFO) << "how are " << i << " cookies";
	}

	google::ShutdownGoogleLogging();
}

Of course, this code will not compile against stock glog as-is, because I modified glog's internal source code.

The GitHub project is here: https://github.com/boyxiaolong/Proejcts/tree/master/asyn_glog-0.3.3

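Testing still turns up problems. The output is occasionally garbled, and the per-message dynamic allocation of Buffer still needs optimizing.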
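One possible direction for the Buffer allocation cost is sketched below under stated assumptions. The idea is to keep a free list of fixed-capacity buffers, so the logging thread reuses memory the worker has released instead of calling new char[] for every message. BufferPool and PooledBuffer are hypothetical and not part of the project. Messages longer than the capacity would still need a fallback allocation.

```cpp
#include <cstddef>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical reusable buffer with a fixed capacity and a used length.
struct PooledBuffer {
  explicit PooledBuffer(std::size_t capacity) : data(capacity), len(0) {}
  std::vector<char> data;
  std::size_t len;
};

// Hypothetical thread-safe free list: Acquire() on the logging thread,
// Release() on the writer thread after the entry has been written.
class BufferPool {
 public:
  explicit BufferPool(std::size_t capacity) : capacity_(capacity) {}

  std::unique_ptr<PooledBuffer> Acquire() {
    std::lock_guard<std::mutex> lock(mu_);
    if (free_.empty())
      return std::unique_ptr<PooledBuffer>(new PooledBuffer(capacity_));
    std::unique_ptr<PooledBuffer> b = std::move(free_.back());
    free_.pop_back();
    b->len = 0;  // reset the used length; the memory itself is reused
    return b;
  }

  void Release(std::unique_ptr<PooledBuffer> b) {
    std::lock_guard<std::mutex> lock(mu_);
    free_.push_back(std::move(b));
  }

  std::size_t FreeCount() {
    std::lock_guard<std::mutex> lock(mu_);
    return free_.size();
  }

 private:
  std::size_t capacity_;
  std::mutex mu_;
  std::vector<std::unique_ptr<PooledBuffer>> free_;
};
```

In steady state, the pool holds about as many buffers as are in flight in the queue, so allocations stop after warm-up.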

But those are matters for later.


Time: 2024-10-25 02:13:04
