10min手写(五):面试题解析丨Python实现tail -f功能

作者:蜗牛 shengxinjing (woniuppp) · GitHub

写这篇文章的初衷是有人去面试遇到了这个笔试题,不知道怎么做,没有什么思路,就发到了Reboot 的交流群里,让大家一起讨论讨论。

关于这道题,简单说一下我的想法吧。当然,也有很好用的 pyinotify 模块专门监听文件变化,不过我更想介绍的,是解决的思路。毕竟作为面试官,还是想看到一下解决问题的思路,而且我觉得这一题的难点不在于监控文件增量,而在于怎么打印最后面10行。

希望大家在读这篇文章前,对 Python 基础、处理文件和常用模块有一个简单的了解,知道下面几个名词是什么:

open(‘a.txt‘)
file.seek
file.tell
time.sleep()

下面的思路仅限于我个人知识和见解,免不了有错误和考虑不周的地方。希望大家有更好的方法能够提出来,我会随时优化代码。下面就让天真烂漫的蜗牛教大家如何用 Python 来实现。

怎么用 Python 实现

其实思路也不算难啦

  • 打开这个文件,指针移到最后
  • 每隔一秒就尝试readline一下,有内容就打印出来,没内容就sleep


监听文件


思路如下:

  • 用open打开文件
  • 用seek文件指针,给大爷把跳到文件最后面
  • while True进行循环
    • 持续不停的readline,如果能读到内容,打印出来即可

代码呼之欲出

with open(‘test.txt‘) as f:
    f.seek(0,2)
    while True:
        last_pos = f.tell()
        line = f.readline()        
        if line:
           print line

代码说明:

  • seek第二个参数2,意思就是从文件结尾处开始seek,更标准的写法使用os模块下面的SEEK_END,可读性更好
  • 只写出了简单的逻辑,代码简单粗暴,如果这个题目是10分的话,最多也就拿4分吧,不能再多了

优化点

  • print有缺陷,每次都是新的一行,换成sys.stdout.write(line)更和谐
  • 文件名传参,不能写死
  • 直接打印可以当成默认行为,具体要做什么,可以写成函数处理,这样我们就可以把新行做其他处理,比如展示在浏览器里
  • 加上容错处理,比如文件不存在会报错
  • while True一直都文件,比较耗性能,每读一次,间隔一秒比较靠谱
    • 调用time.sleep(1)
  • 用类来组织代码


实例代码如下:

# coding=utf-8
import sys
import time

class Tail():
    def __init__(self,file_name,callback=sys.stdout.write):
        self.file_name = file_name
        self.callback = callback
    def follow(self):

        try:
            with open(self.file_name) as f:
                f.seek(0,2)            
                while True:
                    line = f.readline()                 
                    if line:
                        self.callback(line)
                    time.sleep(1)
        except Exception,e:
            print ‘打开文件失败,囧,看看文件是不是不存在,或者权限有问题‘
            print e

使用方法

# 使用默认的sys.stdout.write打印到屏幕
py_tail = Tail(‘test.txt‘)
py_tail.follow()

# 自己定义处理函数

def test_tail(line):
    print ‘xx‘+line+‘xx‘
    
py_tail1 = Tail(‘test.txt‘, test_tail)
py_tail1.follow()

咦 ,等等!tail -f 默认还会打印最后10行,这个好像才是这个题目的难点所在。众所周知,Python 里读文件指针,只能移动到固定位置,不能判断是哪一行。那就先实现简单的,然后逐渐加强吧。


默认打印最后10行

现在这个代码,大概能拿6分啦,我们还有一个功能没做,那就是打印最后n行,默认是10行,现在把这个功能加上,加一个函数就行啦。

文件小的时候

我们知道,readlines可以获取所有内容,并且分行,代码呼之欲出,获取list最后10行很简单有么有,切片妥妥的。

# 演示代码,说明核心逻辑,完整代码在下面
last_lines = f.readlines()[-10:]
for line in last_lines:
    self.callback(line)


时代码变成这样了:

import sys
import time

class Tail():
    def __init__(self,file_name,callback=sys.stdout.write):
        self.file_name = file_name
        self.callback = callback
    def follow(self,n=10):
        try:
            with open(self.file_name) as f:
                self._file = f
                self.showLastLine(n)
                self._file.seek(0,2)           
                while True:
                    line = self._file.readline()                    
                    if line:
                        self.callback(line)        
        except Exception,e:
            print ‘打开文件失败,囧,看看文件是不是不存在,或者权限有问题‘
            print e    
     def showLastLine(self, n):
        last_lines = self._file.readlines()[-10:]        
        for line in last_lines:
            self.callback(line)

更进一步:大的日志怎么办

但是如果文件特别大呢,特别是日志文件,很容易就是几个G。我们只需要最后几行,全部读出来内存受不了,所以我们要继续优化showLastLine 函数,我觉得这才是这题的难点所在。

大概的思路如下:

  • 我先估计日志一行大概是100个字符,注意,我只是估计一个大概,多了也没关系,我们只是需要一个初始值,后面会修正
  • 我要读十行,所以一开始就 seek到离结尾1000的位置seek(-1000,2),把最后1000个字符读出来,然后有一个判断
  • 如果这1000个字符长度大于文件长度,不用管了 属于级别1的情况,直接read然后split
  • 如果1000个字符里面的换行符大于10,说明1000个字符里,大于10行,那也好办,处理思路类似级别1,直接split取后面十个
  • 问题就在于 如果小于10怎么处理
    • 比如1000个里,只有四个换行符,说明一行大概是1000/4=250个字符,我们还缺6行,所以我们再读250*5=1500,一共有2500的大概比较靠谱,然后在对2500个字符进行相同的逻辑判断,直到读出的字符里,换行符大于10,读取结束

逻辑清晰以后,代码就呼之欲出啦

# coding=utf-8
import sysi
mport time

class Tail():
    def __init__(self,file_name,callback=sys.stdout.write):
        self.file_name = file_name
        self.callback = callback
    def follow(self,n=10):
        try:    
            with open(self.file_name) as f:
                self._file = f
                self._file.seek(0,2)
                self.file_length = self._file.tell()
                self.showLastLine(n)       
                while True:
                    line = self._file.readline()    
                    if line:
                        self.callback(line)
                    time.sleep(1)
        except Exception,e:
            print ‘打开文件失败,囧,看看文件是不是不存在,或者权限有问题‘
            print e
      def showLastLine(self, n):
        len_line = 100
        read_len = len_line*n
        while True:
            if read_len>self.file_length:
                self._file.seek(0)
                last_lines = self._file.read().split(‘\n‘)[-n:]                      break
            self._file.seek(-read_len, 2)
            last_words = self._file.read(read_len)
            count = last_words.count(‘\n‘)          
            if count>=n:
                last_lines = last_words.split(‘\n‘)[-n:]                             break
            else:            
                if count==0:
                    len_perline = read_len       
                else:
                    len_perline = read_len/count
                read_len = len_perline * n        
        for line in last_lines:
            self.callback(line+‘\n‘)
if __name__ == ‘__main__‘:
    py_tail = Tail(‘test.txt‘)
    py_tail.follow()

加上注释的版本:

# coding=utf-8
import sys
import time

class Tail():
    def __init__(self,file_name,callback=sys.stdout.write):
        self.file_name = file_name
        self.callback = callback
    def follow(self,n=10):
        try:      
            # 打开文件
            with open(self.file_name) as f:
                self._file = f
                self._file.seek(0,2)
                # 存储文件的字符长度
                self.file_length = self._file.tell()                
                # 打印最后10行
                self.showLastLine(n)                
                # 持续读文件 打印增量
                while True:
                    line = self._file.readline()                    
                    if line:
                        self.callback(line)
                    time.sleep(1)
        except Exception,e:            
            print ‘打开文件失败,囧,看看文件是不是不存在,或者权限有问题‘
            print e    
    def showLastLine(self, n):
        # 一行大概100个吧 这个数改成1或者1000都行
        len_line = 100
        # n默认是10,也可以follow的参数传进来
        read_len = len_line*n        
        # 用last_lines存储最后要处理的内容
        while True:            
            # 如果要读取的1000个字符,大于之前存储的文件长度
            # 读完文件,直接break
            if read_len>self.file_length:
                self._file.seek(0)
                last_lines = self._file.read().split(‘\n‘)[-n:]                      break
            # 先读1000个 然后判断1000个字符里换行符的数量
            self._file.seek(-read_len, 2)
            last_words = self._file.read(read_len)            
            # count是换行符的数量
            count = last_words.count(‘\n‘)            

            if count>=n:                
                # 换行符数量大于10 很好处理,直接读取
                last_lines = last_words.split(‘\n‘)[-n:]                             break
            # 换行符不够10个
            else:                
                # break
                #不够十行
                # 如果一个换行符也没有,那么我们就认为一行大概是100个
                if count==0:

                    len_perline = read_len               
                # 如果有4个换行符,我们认为每行大概有250个字符
                else:
                    len_perline = read_len/count                
                # 要读取的长度变为2500,继续重新判断
                read_len = len_perline * n        
        for line in last_lines:
            self.callback(line+‘\n‘)
if __name__ == ‘__main__‘:
    py_tail = Tail(‘test.txt‘)
    py_tail.follow(20)

效果如下,终于可以打印最后几行了,大家可以试一下,无论日志每行只有1个,还是每行有300个字符,都是可以打印最后10行的。

效果看这里:http://mmbiz.qpic.cn/mmbiz/Mzws9oBx0P7dId4jrnw0FqCyNNdClffCBTo9VFxt1DS1yTQQWn4Kw5oYzGRKE3ZrZ432kibOtibhUFfRYmJRIIqA/0?wx_fmt=gif&tp=webp&wxfrom=5&wx_lazy=1

能做到这个地步,一般的面试官就直接被你搞定了,这个代码大概8分吧,如果还想再进一步,还有一些可以优化的地方,代码放 github 上了,有兴趣的可以拿去研究。(Github 地址:pytail/tail.py at master · shengxinjing/pytail · GitHub

待优化:留给大家作为扩展

  • 如果你tail -f的过程中,日志被备份清空或者切割了,怎么处理

    • 其实很简单,你维护一个指针位置,如果下次循环发现文件指针位置变了,从最新的指针位置开始读就行
  • 具体每种错误的类型
    • 我这里只是简单的try 可以写个函数,把文件不存在,文件没权限这些报错信息分开
  • 性能小优化,比如我第一次读了1000,只有4行,我大概估计每行250个字符,总体需要2500行,下次没必要直接读2500个,而是读1500行和之前1000行拼起来即可
  • inotify(7) - Linux manual page

最后大杀器

如果写出来这个,基本面试官会直接……

import osd
ef tail(file_name):
    os.system(‘tail -f ‘+file_name)
    
tail(‘log.log‘)

打死你!!!

以上就是我对这个题的想法,实际开发中想监控文件变化,其实还是pyinotify 好用。

关于这道面试题,大家有什么好的想法或对代码有什么优化,都可以留言交流哦。

著作权归作者所有。
商业转载请联系作者获得授权,非商业转载请注明出处。
作者:shengxinjing (woniuppp) · GitHub
来源:Python 10min系列之面试题解析丨Python实现tail -f功能

==========================================

欢迎关注Reboot教育  运维自动化班(5月8日开班)

课程表:http://www.51reboot.com/course/devops/

上课形式:面授班 / 网络直播班

QQ:979950755

交流群:238757010

10min手写(五):面试题解析丨Python实现tail -f功能

时间: 2024-10-13 01:17:37

10min手写(五):面试题解析丨Python实现tail -f功能的相关文章

Python 10min 面试题解析丨Python实现多连接下载器

作者:蜗牛 shengxinjing (woniuppp) · GitHub 今天群里看到有人问关于 Python 多线程写文件的问题,联想到这是 Reboot 的架构师班的入学题.我想了一下,感觉坑和考察的点还挺多的,可以当成一个面试题来问,简单说一下我的想法和思路吧,涉及的代码和注释在 GitHub 上 (https://github.com/shengxinjing/my_blog/blob/master/downloader/downloader.py) 当年的网络蚂蚁"多点同时下载.并

深度学习---手写字体识别程序分析(python)

我想大部分程序员的第一个程序应该都是"hello world",在深度学习领域,这个"hello world"程序就是手写字体识别程序. 这次我们详细的分析下手写字体识别程序,从而可以对深度学习建立一个基本的概念. 1.初始化权重和偏置矩阵,构建神经网络的架构 import numpy as np class network(): def __init__(self, sizes): self.num_layers = len(sizes) self.sizes =

10min 手写一个内存监控系统

本文的目的在于,尽可能用简单的代码,让大家了解内存监控的原理,及思想.更容易去理解Nagios.Zabbix.Ganglia监控原理,文章最后还有视频教程链接哦,从零敲出来的全过程 思路分为下面几块: 1.获取内存监控信息 2.存储监控信息 3.数据展现 4.后续扩展 1.加主机名,monitor部署在多台机器,不直接插入数据库 2. 增加CPU,Disk监控 3. 通过HTTP请求的方式,启用一个单独的Flask专门存储monitor数据 第一步:获取内存信息 我们通过读取 /proc/mem

使用神经网络来识别手写数字【译(三)- 用Python代码实现

实现我们分类数字的网络 好,让我们使用随机梯度下降和 MNIST训练数据来写一个程序来学习怎样失败手写数字. 我们也难怪Python (2.7) 来实现.只有 74 行代码!我们需要的第一个东西是 MNIST数据.如果有 github 账号,你可以将这些代码库克隆下来, git clone https://github.com/mnielsen/neural-networks-and-deep-learning.git 或者你可以到这里 下载. Incidentally, 当我先前说到 MNIS

Atiit 如何手写词法解析器

1.1. 通过编程直接从正则->nfa->dfa->表驱动词法解析一条龙自动生成.那是用程序自动生成是需要这样的,自己手写完全不必要这么复杂1 1.2. 状态转移表.使用状态表比较简单,dfa比较麻烦.Dfa其实就是比较高级的状态表..1 1.3. 然后给了你代码框架(这里以nested case statement 为例):2 1.4. 源码实现2 1.1. 通过编程直接从正则->nfa->dfa->表驱动词法解析一条龙自动生成.那是用程序自动生成是需要这样的,自己手

BP神经网络识别手写数字项目解析及代码

这两天在学习人工神经网络,用传统神经网络结构做了一个识别手写数字的小项目作为练手.点滴收获与思考,想跟大家分享一下,欢迎指教,共同进步. 平常说的BP神经网络指传统的人工神经网络,相比于卷积神经网络(CNN)来说要简单些. 人工神经网络具有复杂模式和进行联想.推理记忆的功能, 它是解决某些传统方法所无法解决的问题的有力工具.目前, 它日益受到重视, 同时其他学科的发展, 为其提供了更大的机会.1986 年, Romelhart 和Mcclelland提出了误差反向传播算法(Error Back

22 道高频 JavaScript 手写面试题及答案

实现防抖函数(debounce) 防抖函数原理:在事件被触发n秒后再执行回调,如果在这n秒内又被触发,则重新计时. 那么与节流函数的区别直接看这个动画实现即可. 手写简化版: // 防抖函数 const debounce = (fn, delay) => { let timer = null; return (...args) => { clearTimeout(timer); timer = setTimeout(() => { fn.apply(this, args); }, del

使用Caffe进行手写数字识别执行流程解析

之前在 http://blog.csdn.net/fengbingchun/article/details/50987185 中仿照Caffe中的examples实现对手写数字进行识别,这里详细介绍下其执行流程并精简了实现代码,使用Caffe对MNIST数据集进行train的文章可以参考  http://blog.csdn.net/fengbingchun/article/details/68065338 : 1.   先注册所有层,执行layer_factory.hpp中类LayerRegis

tensorflow 基础学习五:MNIST手写数字识别

MNIST数据集介绍: from tensorflow.examples.tutorials.mnist import input_data # 载入MNIST数据集,如果指定地址下没有已经下载好的数据,tensorflow会自动下载数据 mnist=input_data.read_data_sets('.',one_hot=True) # 打印 Training data size:55000. print("Training data size: {}".format(mnist.