Getting the last n lines of a file with Python, similar to tail


I am writing a log file viewer for a web application, and for it I want to paginate through the lines of a log file. The items in the file are line based, with the newest item at the bottom.

So I need a tail() method that can read n lines from the bottom and supports an offset. What I came up with looks like this:

def tail(f, n, offset=0):
    """Reads a n lines from f with an offset of offset lines."""
    avg_line_length = 74
    to_read = n + offset
    while 1:
        try:
            f.seek(-(avg_line_length * to_read), 2)
        except IOError:
            # woops.  apparently file is smaller than what we want
            # to step back, go to the beginning instead
            f.seek(0)
        pos = f.tell()
        lines = f.read().splitlines()
        if len(lines) >= to_read or pos == 0:
            return lines[-to_read:offset and -offset or None]
        avg_line_length *= 1.3

Is this a reasonable approach? What is the recommended way to tail a log file with an offset?

Solution

Assuming a Unix-like system, on Python 2 you can do:

import os
def tail(f, n, offset=0):
    stdin, stdout = os.popen2("tail -n " + str(n + offset) + " " + f)
    stdin.close()
    lines = stdout.readlines()
    stdout.close()
    # Drop the trailing offset lines so only the requested window remains
    return lines[:len(lines) - offset]

On Python 3, you can do the following:

import subprocess
def tail(f, n, offset=0):
    proc = subprocess.Popen(['tail', '-n', str(n + offset), f],
                            stdout=subprocess.PIPE)
    lines = proc.stdout.readlines()
    # Drop the trailing offset lines so only the requested window remains
    return lines[:len(lines) - offset]
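
For example, the last ten lines of a hypothetical log file could be fetched like this (the path is just an illustration; readlines() returns bytes here because stdout is a binary pipe):

last_ten = tail('/var/log/syslog', 10)   # hypothetical path
for line in last_ten:
    print(line.decode().rstrip())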

For efficiency with larger files (common when you want to tail a log file), you generally want to avoid reading the whole file (even if you never hold the entire file in memory at once). However, you then need to work out the offset in lines rather than in characters somehow. One possibility is reading backwards character by character with seek(), but that is very slow. Instead, it is better to process the file in larger blocks.

I have a utility function I wrote a while ago that reads a file backwards and can be used here.

import os, itertools

def rblocks(f, blocksize=4096):
    """Read file as series of blocks from end of file to start.

    The data itself is in normal order, only the order of the blocks is reversed.
    ie. "hello world" -> ["ld","wor", "lo ", "hel"]
    Note that the file must be opened in binary mode.
    """
    if 'b' not in f.mode.lower():
        raise Exception("File must be opened using binary mode.")
    size = os.stat(f.name).st_size
    fullblocks, lastblock = divmod(size, blocksize)

    # The first (end of file) block will be short, since this leaves
    # the rest aligned on a blocksize boundary.  This may be more 
    # efficient than having the last (first in file) block be short
    f.seek(-lastblock,2)
    yield f.read(lastblock)

    for i in range(fullblocks-1,-1, -1):
        f.seek(i * blocksize)
        yield f.read(blocksize)

def tail(f, nlines):
    buf = ''
    result = []
    for block in rblocks(f):
        buf = block + buf
        lines = buf.splitlines()

        # Keep all lines except the first, since it may be partial;
        # earlier blocks are prepended so result stays in file order
        if lines:
            result = lines[1:] + result
            if len(result) >= nlines:
                return result[-nlines:]

            buf = lines[0]

    return ([buf] + result)[-nlines:]

f=open('file_to_tail.txt','rb')
for line in tail(f, 20):
    print line

[Edit] Added a more specific version (avoids the need to reverse twice).

This may be quicker than yours. It makes no assumptions about line length, and backs through the file one block at a time until it has found the right number of '\n' characters.

def tail(f, lines=20):
    total_lines_wanted = lines

    BLOCK_SIZE = 1024
    f.seek(0, 2)
    block_end_byte = f.tell()
    lines_to_go = total_lines_wanted
    block_number = -1
    blocks = [] # blocks of size BLOCK_SIZE, in reverse order starting
                # from the end of the file
    while lines_to_go > 0 and block_end_byte > 0:
        if (block_end_byte - BLOCK_SIZE > 0):
            # read the last block we haven't yet read
            f.seek(block_number*BLOCK_SIZE, 2)
            blocks.append(f.read(BLOCK_SIZE))
        else:
            # file too small, start from beginning
            f.seek(0,0)
            # only read what was not read
            blocks.append(f.read(block_end_byte))
        lines_found = blocks[-1].count('\n')
        lines_to_go -= lines_found
        block_end_byte -= BLOCK_SIZE
        block_number -= 1
    all_read_text = ''.join(reversed(blocks))
    return '\n'.join(all_read_text.splitlines()[-total_lines_wanted:])
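
A minimal usage sketch, assuming a hypothetical file named app.log; this version counts '\n' in str blocks, so it is written against Python 2 text files (for Python 3 you would open in binary mode and count b'\n'):

f = open('app.log')    # 'app.log' is a made-up name
print(tail(f, 20))     # the function returns the last 20 lines joined by '\n'
f.close()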

I don't like the tricky assumption about line length when, as a practical matter, you can never know such a thing.

Generally, this will locate the last 20 lines on the first or second pass through the loop. If your 74-character estimate were actually accurate, you would make the block size 2048 and would tail 20 lines almost immediately.

Also, I wouldn't burn a lot of brain calories trying to finesse alignment with physical OS blocks. Using these high-level I/O packages, I doubt you'll see any performance consequence from trying to align on OS block boundaries. If you use lower-level I/O, you might see a speedup.

On second thought, this is probably as fast as anything else here:

def tail(f, window=20):
    lines = [''] * window
    count = 0
    for l in f:
        lines[count % window] = l
        count += 1
    print lines[count % window:], lines[:count % window]

It's much simpler, and it seems to run at a good clip.
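
If you would rather get the lines back in order than print the two slices, a return-based variant of the same ring-buffer idea might look like this (a sketch, not part of the original answer):

def tail(f, window=20):
    # Fixed-size ring buffer holding the most recent `window` lines
    lines = [None] * window
    count = 0
    for line in f:
        lines[count % window] = line
        count += 1
    if count < window:
        return lines[:count]
    # Rotate so the oldest retained line comes first
    start = count % window
    return lines[start:] + lines[:start]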

If reading the whole file is acceptable, use a deque.

from collections import deque
deque(f, maxlen=n)
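
Wrapped into a function, that one-liner reads as (a minimal sketch):

from collections import deque

def tail(f, n):
    # The deque keeps only the last n lines as it consumes the file iterator
    return list(deque(f, maxlen=n))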

Prior to Python 2.6, deques didn't have a maxlen option, but it's easy enough to implement one.

import itertools
from collections import deque
def maxque(items, size):
    items = iter(items)
    q = deque(itertools.islice(items, size))
    for item in items:
        del q[0]
        q.append(item)
    return q
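
It can be driven the same way; for instance (a sketch, the file name is made up):

with open('app.log') as f:            # 'app.log' is a hypothetical name
    last_lines = list(maxque(f, 20))  # last 20 lines of the file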

If reading the file from the end is a requirement, use a galloping (a.k.a. exponential) search.

def tail(f, n):
    assert n >= 0
    pos, lines = n+1, []
    while len(lines) <= n:
        try:
            f.seek(-pos, 2)
        except IOError:
            f.seek(0)
            break
        finally:
            lines = list(f)
        pos *= 2
    return lines[-n:]
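
A usage sketch; on Python 3 the file has to be opened in binary mode, because text-mode files do not allow seeking relative to the end (the file name below is made up):

with open('app.log', 'rb') as f:      # binary mode so f.seek(-pos, 2) works
    for line in tail(f, 20):
        print(line.decode().rstrip())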

The code I ended up using. I think this is the best so far:

def tail(f, n, offset=None):
    """Reads a n lines from f with an offset of offset lines.  The return
    value is a tuple in the form ``(lines, has_more)`` where `has_more` is
    an indicator that is `True` if there are more lines in the file.
    """
    avg_line_length = 74
    to_read = n + (offset or 0)

    while 1:
        try:
            f.seek(-(avg_line_length * to_read), 2)
        except IOError:
            # woops.  apparently file is smaller than what we want
            # to step back, go to the beginning instead
            f.seek(0)
        pos = f.tell()
        lines = f.read().splitlines()
        if len(lines) >= to_read or pos == 0:
            return lines[-to_read:offset and -offset or None], \
                   len(lines) > to_read or pos > 0
        avg_line_length *= 1.3
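
For the paging use case from the question, each further page is fetched by bumping the offset; a sketch (the page size and file name are made up, and on Python 3 the file would need to be opened in binary mode for the relative seek from the end to succeed):

page = 25
f = open('app.log', 'rb')                      # hypothetical log file
newest, has_more = tail(f, page)               # the most recent 25 lines
older, has_more = tail(f, page, offset=page)   # the 25 lines before those
f.close()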

Based on S.Lott's top-voted answer (Sep 25 '08 at 21:43), but fixed for small files.

def tail(the_file, lines_2find=20):
    the_file.seek(0, 2)                         # go to end of file
    bytes_in_file = the_file.tell()
    lines_found, total_bytes_scanned = 0, 0
    # Read at least lines_2find + 1 line breaks from the bottom, block by
    # block for speed; the extra one ensures we don't keep a half line.
    while lines_2find + 1 > lines_found and bytes_in_file > total_bytes_scanned:
        byte_block = min(1024, bytes_in_file - total_bytes_scanned)
        the_file.seek(-(byte_block + total_bytes_scanned), 2)
        total_bytes_scanned += byte_block
        lines_found += the_file.read(byte_block).count('\n')
    the_file.seek(-total_bytes_scanned, 2)
    line_list = list(the_file.readlines())
    return line_list[-lines_2find:]

Hope this is useful.

You can go to the end of the file with f.seek(0, 2) and then read lines backwards one by one with the following replacement for readline():

def readline_backwards(f):
    # Assumes the file position is where backward reading should start,
    # e.g. at the end after f.seek(0, 2); returns the preceding line.
    backline = ''
    last = ''
    while not last == '\n':
        backline = last + backline
        if f.tell() <= 0:
            return backline
        f.seek(-1, 1)
        last = f.read(1)
        f.seek(-1, 1)
    backline = last
    last = ''
    while not last == '\n':
        backline = last + backline
        if f.tell() <= 0:
            return backline
        f.seek(-1, 1)
        last = f.read(1)
        f.seek(-1, 1)
    f.seek(1, 1)
    return backline
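
Stacked up, the helper can collect the last n lines; a sketch that starts at the end of the file (not part of the original answer):

def tail(f, n):
    f.seek(0, 2)                     # position at the end of the file
    lines = []
    while len(lines) < n and f.tell() > 0:
        lines.append(readline_backwards(f))
    return list(reversed(lines))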