使用Python获取文件的最后n行,类似于tail
我正在为Web应用程序编写日志文件查看器,为此,我想在日志文件的各行中进行分页。文件中的项目是基于行的,底部是最新的项目。
因此,我需要一个" tail()"方法,该方法可以从底部读取" n"行并支持偏移量。我想到的是这样的:
def tail(f, n, offset=0):
"""Reads a n lines from f with an offset of offset lines."""
avg_line_length = 74
to_read = n + offset
while 1:
try:
f.seek(-(avg_line_length * to_read), 2)
except IOError:
# woops. apparently file is smaller than what we want
# to step back, go to the beginning instead
f.seek(0)
pos = f.tell()
lines = f.read().splitlines()
if len(lines) >= to_read or pos == 0:
return lines[-to_read:offset and -offset or None]
avg_line_length *= 1.3
这是一个合理的方法吗?建议使用带偏移量尾部日志文件的推荐方式是什么?
解决方案
假设我们可以在Python 2上使用类似unix的系统:
import os
def tail(f, n, offset=0):
stdin,stdout = os.popen2("tail -n "+n+offset+" "+f)
stdin.close()
lines = stdout.readlines(); stdout.close()
return lines[:,-offset]
对于python 3,我们可以执行以下操作:
import subprocess
def tail(f, n, offset=0):
proc = subprocess.Popen(['tail', '-n', n + offset, f], stdout=subprocess.PIPE)
lines = proc.stdout.readlines()
return lines[:, -offset]
为了提高大文件的效率(在日志文件中通常需要使用tail的情况下很常见),通常希望避免读取整个文件(即使这样做时也没有立即将整个文件读入内存),但是需要以某种方式而不是字符来计算偏移量。一种可能是通过char用seek()char向后读取,但这很慢。相反,最好在较大的块中进行处理。
我有一个实用程序函数,我在前一段时间写过,可以向后读取文件,可以在这里使用。
import os, itertools
def rblocks(f, blocksize=4096):
"""Read file as series of blocks from end of file to start.
The data itself is in normal order, only the order of the blocks is reversed.
ie. "hello world" -> ["ld","wor", "lo ", "hel"]
Note that the file must be opened in binary mode.
"""
if 'b' not in f.mode.lower():
raise Exception("File must be opened using binary mode.")
size = os.stat(f.name).st_size
fullblocks, lastblock = divmod(size, blocksize)
# The first(end of file) block will be short, since this leaves
# the rest aligned on a blocksize boundary. This may be more
# efficient than having the last (first in file) block be short
f.seek(-lastblock,2)
yield f.read(lastblock)
for i in range(fullblocks-1,-1, -1):
f.seek(i * blocksize)
yield f.read(blocksize)
def tail(f, nlines):
buf = ''
result = []
for block in rblocks(f):
buf = block + buf
lines = buf.splitlines()
# Return all lines except the first (since may be partial)
if lines:
result.extend(lines[1:]) # First line may not be complete
if(len(result) >= nlines):
return result[-nlines:]
buf = lines[0]
return ([buf]+result)[-nlines:]
f=open('file_to_tail.txt','rb')
for line in tail(f, 20):
print line
[编辑]添加了更特定的版本(避免需要反向两次)
这可能比要快。不假设行长。一次返回一个文件块,直到找到正确数量的'\ n'字符为止。
def tail( f, lines=20 ):
total_lines_wanted = lines
BLOCK_SIZE = 1024
f.seek(0, 2)
block_end_byte = f.tell()
lines_to_go = total_lines_wanted
block_number = -1
blocks = [] # blocks of size BLOCK_SIZE, in reverse order starting
# from the end of the file
while lines_to_go > 0 and block_end_byte > 0:
if (block_end_byte - BLOCK_SIZE > 0):
# read the last block we haven't yet read
f.seek(block_number*BLOCK_SIZE, 2)
blocks.append(f.read(BLOCK_SIZE))
else:
# file too small, start from begining
f.seek(0,0)
# only read what was not read
blocks.append(f.read(block_end_byte))
lines_found = blocks[-1].count('\n')
lines_to_go -= lines_found
block_end_byte -= BLOCK_SIZE
block_number -= 1
all_read_text = ''.join(reversed(blocks))
return '\n'.join(all_read_text.splitlines()[-total_lines_wanted:])
我不喜欢关于行长的棘手假设,而实际上,我们永远都不知道那样的事情。
通常,这将在循环的第一次或者第二次通过中定位最后20行。如果74个字符实际上是准确的,则将块大小设置为2048,并且几乎立即尾随20行。
另外,我不会消耗大量的大脑卡路里来尝试与物理OS块进行精确对齐。使用这些高级I / O程序包,我怀疑我们会发现尝试在OS块边界上对齐会对性能产生任何影响。如果使用较低级别的I / O,则可能会看到加速。
再三考虑,这可能和这里的一切一样快。
def tail( f, window=20 ):
lines= ['']*window
count= 0
for l in f:
lines[count%window]= l
count += 1
print lines[count%window:], lines[:count%window]
这要简单得多。而且它似乎确实在以良好的速度前进。
如果可以读取整个文件,请使用双端队列。
from collections import deque deque(f, maxlen=n)
在2.6之前,双端队列没有maxlen选项,但是实现起来很容易。
import itertools
def maxque(items, size):
items = iter(items)
q = deque(itertools.islice(items, size))
for item in items:
del q[0]
q.append(item)
return q
如果需要从头开始读取文件,请使用疾驰(也就是指数)搜索。
def tail(f, n):
assert n >= 0
pos, lines = n+1, []
while len(lines) <= n:
try:
f.seek(-pos, 2)
except IOError:
f.seek(0)
break
finally:
lines = list(f)
pos *= 2
return lines[-n:]
我最终使用的代码。我认为这是迄今为止最好的:
def tail(f, n, offset=None):
"""Reads a n lines from f with an offset of offset lines. The return
value is a tuple in the form ``(lines, has_more)`` where `has_more` is
an indicator that is `True` if there are more lines in the file.
"""
avg_line_length = 74
to_read = n + (offset or 0)
while 1:
try:
f.seek(-(avg_line_length * to_read), 2)
except IOError:
# woops. apparently file is smaller than what we want
# to step back, go to the beginning instead
f.seek(0)
pos = f.tell()
lines = f.read().splitlines()
if len(lines) >= to_read or pos == 0:
return lines[-to_read:offset and -offset or None], \
len(lines) > to_read or pos > 0
avg_line_length *= 1.3
基于S.Lott的最高票选答案(08年9月25日在21:43),但针对小文件而固定。
def tail(the_file, lines_2find=20):
the_file.seek(0, 2) #go to end of file
bytes_in_file = the_file.tell()
lines_found, total_bytes_scanned = 0, 0
while lines_2find+1 > lines_found and bytes_in_file > total_bytes_scanned:
byte_block = min(1024, bytes_in_file-total_bytes_scanned)
the_file.seek(-(byte_block+total_bytes_scanned), 2)
total_bytes_scanned += byte_block
lines_found += the_file.read(1024).count('\n')
the_file.seek(-total_bytes_scanned, 2)
line_list = list(the_file.readlines())
return line_list[-lines_2find:]
#we read at least 21 line breaks from the bottom, block by block for speed
#21 to ensure we don't get a half line
希望这是有用的。
我们可以使用f.seek(0,2)转到文件末尾,然后用以下替换readline()逐行读取行:
def readline_backwards(self, f):
backline = ''
last = ''
while not last == '\n':
backline = last + backline
if f.tell() <= 0:
return backline
f.seek(-1, 1)
last = f.read(1)
f.seek(-1, 1)
backline = last
last = ''
while not last == '\n':
backline = last + backline
if f.tell() <= 0:
return backline
f.seek(-1, 1)
last = f.read(1)
f.seek(-1, 1)
f.seek(1, 1)
return backline

