How to copy a file in Python with a progress bar?
Note: this page is adapted from a popular Stack Overflow question and is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must keep the same license and attribute it to the original authors on Stack Overflow.
Original question: http://stackoverflow.com/questions/274493/
Asked by dbr
When copying large files using shutil.copy(), you get no indication of how the operation is progressing.
I have put together something that works: it uses a simple ProgressBar class (which simply returns an ASCII progress bar as a string) and a loop of open().read() and .write() to do the actual copying. It displays the progress bar using sys.stdout.write("\r%s\r" % (the_progress_bar)), which is a little hackish, but it works.
You can see the code (in context) on GitHub.
Is there any built-in module that will do this better? Are there any improvements that can be made to this code?
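The approach described in the question (a read/write loop that redraws an ASCII bar with a carriage return) can be sketched roughly as follows. This is not the asker's actual code, which is only linked; the names render_bar and copy_with_bar and the bar layout are assumptions made for illustration.

```python
import os
import sys


def render_bar(done, total, width=40):
    """Return a simple ASCII progress bar as a string (hypothetical helper)."""
    fraction = 1.0 if total == 0 else float(done) / total
    filled = int(round(width * fraction))
    return "[%s%s] %3d%%" % ("#" * filled, "-" * (width - filled), int(fraction * 100))


def copy_with_bar(src, dest, block_size=16384, out=sys.stdout):
    """Copy src to dest block by block, redrawing the bar after each block."""
    total = os.path.getsize(src)
    done = 0
    with open(src, "rb") as fin, open(dest, "wb") as fout:
        while True:
            block = fin.read(block_size)
            if not block:
                break
            fout.write(block)
            done += len(block)
            # "\r" moves the cursor back to the start of the line,
            # so each new bar overwrites the previous one
            out.write("\r%s" % render_bar(done, total))
            out.flush()
    out.write("\n")
    return done
```

Passing the output stream as a parameter keeps the sketch easy to test; a real script would normally leave it at sys.stdout.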
Answered by Greg Hewgill
Two things:
- I would make the default block size a lot larger than 512. I would start with 16384 and perhaps more.
- For modularity, it might be better to have the copy_with_prog function not output the progress bar itself, but call a callback function so the caller can decide how to display the progress.
Perhaps something like this:
def copy_with_prog(src, dest, callback = None):
    while True:
        # copy loop stuff
        if callback:
            callback(pos, total)

prog = ProgressBar(...)
copy_with_prog(src, dest, lambda pos, total: prog.update(pos, total))
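A runnable version of the outline above might look like the following sketch. The block_size parameter (defaulting to the 16384 suggested in the answer), the return value, and the print_percent callback are assumptions added for illustration; the answer itself leaves the copy loop unspecified.

```python
import os


def copy_with_prog(src, dest, callback=None, block_size=16384):
    """Copy src to dest, calling callback(pos, total) after each block."""
    total = os.path.getsize(src)
    pos = 0
    with open(src, "rb") as fin, open(dest, "wb") as fout:
        while True:
            block = fin.read(block_size)
            if not block:
                break
            fout.write(block)
            pos += len(block)
            if callback:
                # the caller decides how (or whether) to display progress
                callback(pos, total)
    return pos


def print_percent(pos, total):
    """Hypothetical display callback: rewrite a percentage on one line."""
    percent = 100 if total == 0 else pos * 100 // total
    print("\r%3d%%" % percent, end="", flush=True)
```

A call such as copy_with_prog(src, dest, print_percent) prints a percentage, while a GUI could pass a callback that updates a gauge instead, without the copy function changing.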
Answered by Jim Carroll
Overkill? Perhaps. But on almost any system (Linux, Mac, and, with a quick wxWidgets install, Windows) you can have the real deal, with pause and cancel buttons in a GUI setup. Macs ship with wxWidgets these days, and it's a common package on Linux.
Copying a single file is very quick (the dialog will finish immediately and look broken), so you might consider creating a fileSet job that ticks along once per file instead of once per block. Enjoy!
-Jim Carroll
"""
Threaded Jobs.
Any class that does a long running process can inherit
from ThreadedJob. This enables running as a background
thread, progress notification, pause and cancel. The
time remaining is also calculated by the ThreadedJob class.
"""
import wx.lib.newevent
import thread
import exceptions
import time
(RunEvent, EVT_RUN) = wx.lib.newevent.NewEvent()
(CancelEvent, EVT_CANCEL) = wx.lib.newevent.NewEvent()
(DoneEvent, EVT_DONE) = wx.lib.newevent.NewEvent()
(ProgressStartEvent, EVT_PROGRESS_START) = wx.lib.newevent.NewEvent()
(ProgressEvent, EVT_PROGRESS) = wx.lib.newevent.NewEvent()
class InterruptedException(exceptions.Exception):
def __init__(self, args = None):
self.args = args
#
#
class ThreadedJob:
def __init__(self):
# tell them ten seconds at first
self.secondsRemaining = 10.0
self.lastTick = 0
# not running yet
self.isPaused = False
self.isRunning = False
self.keepGoing = True
def Start(self):
self.keepGoing = self.isRunning = True
thread.start_new_thread(self.Run, ())
self.isPaused = False
#
def Stop(self):
self.keepGoing = False
#
def WaitUntilStopped(self):
while self.isRunning:
time.sleep(0.1)
wx.SafeYield()
#
#
def IsRunning(self):
return self.isRunning
#
def Run(self):
# this is overridden by the
# concrete ThreadedJob
print "Run was not overloaded"
self.JobFinished()
pass
#
def Pause(self):
self.isPaused = True
pass
#
def Continue(self):
self.isPaused = False
pass
#
def PossibleStoppingPoint(self):
if not self.keepGoing:
raise InterruptedException("process interrupted.")
wx.SafeYield()
# allow cancel while paused
while self.isPaused:
if not self.keepGoing:
raise InterruptedException("process interrupted.")
# don't hog the CPU
time.sleep(0.1)
#
#
def SetProgressMessageWindow(self, win):
self.win = win
#
def JobBeginning(self, totalTicks):
self.lastIterationTime = time.time()
self.totalTicks = totalTicks
if hasattr(self, "win") and self.win:
wx.PostEvent(self.win, ProgressStartEvent(total=totalTicks))
#
#
def JobProgress(self, currentTick):
dt = time.time() - self.lastIterationTime
self.lastIterationTime = time.time()
dtick = currentTick - self.lastTick
self.lastTick = currentTick
alpha = 0.92
if currentTick > 1:
self.secondsPerTick = dt * (1.0 - alpha) + (self.secondsPerTick * alpha)
else:
self.secondsPerTick = dt
#
if dtick > 0:
self.secondsPerTick /= dtick
self.secondsRemaining = self.secondsPerTick * (self.totalTicks - 1 - currentTick) + 1
if hasattr(self, "win") and self.win:
wx.PostEvent(self.win, ProgressEvent(count=currentTick))
#
#
def SecondsRemaining(self):
return self.secondsRemaining
#
def TimeRemaining(self):
if 1: #self.secondsRemaining > 3:
minutes = self.secondsRemaining // 60
seconds = int(self.secondsRemaining % 60.0)
return "%i:%02i" % (minutes, seconds)
else:
return "a few"
#
def JobFinished(self):
if hasattr(self, "win") and self.win:
wx.PostEvent(self.win, DoneEvent())
#
# flag we're done before we post the all done message
self.isRunning = False
#
#


class EggTimerJob(ThreadedJob):
    """ A sample Job that demonstrates the mechanisms and features of the Threaded Job"""
    def __init__(self, duration):
        self.duration = duration
        ThreadedJob.__init__(self)

    def Run(self):
        """ This can either be run directly for synchronous use of the job,
        or started as a thread when ThreadedJob.Start() is called.
        It is responsible for calling JobBeginning, JobProgress, and JobFinished.
        And as often as possible, calling PossibleStoppingPoint() which will
        sleep if the user pauses, and raise an exception if the user cancels.
        """
        self.time0 = time.time()
        self.JobBeginning(self.duration)
        try:
            for count in range(0, self.duration):
                time.sleep(1.0)
                self.JobProgress(count)
                self.PossibleStoppingPoint()
        except InterruptedException:
            # clean up if user stops the Job early
            print("canceled prematurely!")
        # always signal the end of the job
        self.JobFinished()

    def __str__(self):
        """ The job progress dialog expects the job to describe its current state."""
        response = []
        if self.isPaused:
            response.append("Paused Counting")
        elif not self.isRunning:
            response.append("Will Count the seconds")
        else:
            response.append("Counting")
        return " ".join(response)


class FileCopyJob(ThreadedJob):
    """ A common file copy Job. """
    def __init__(self, orig_filename, copy_filename, block_size=32*1024):
        self.src = orig_filename
        self.dest = copy_filename
        self.block_size = block_size
        ThreadedJob.__init__(self)

    def Run(self):
        """ This can either be run directly for synchronous use of the job,
        or started as a thread when ThreadedJob.Start() is called.
        It is responsible for calling JobBeginning, JobProgress, and JobFinished.
        And as often as possible, calling PossibleStoppingPoint() which will
        sleep if the user pauses, and raise an exception if the user cancels.
        """
        import os
        self.time0 = time.time()
        try:
            source = open(self.src, 'rb')
            # how many blocks? round up so a partial final block is counted
            st_size = os.stat(self.src).st_size
            num_blocks = (st_size + self.block_size - 1) // self.block_size
            current_block = 0
            self.JobBeginning(num_blocks)
            dest = open(self.dest, 'wb')
            while True:
                copy_buffer = source.read(self.block_size)
                if not copy_buffer:
                    break
                dest.write(copy_buffer)
                current_block += 1
                self.JobProgress(current_block)
                self.PossibleStoppingPoint()
            source.close()
            dest.close()
        except InterruptedException:
            # clean up if user stops the Job early
            source.close()
            dest.close()
            # unlink / delete the file that is partially copied
            os.unlink(self.dest)
            print("canceled, dest deleted!")
        # always signal the end of the job
        self.JobFinished()

    def __str__(self):
        """ The job progress dialog expects the job to describe its current state."""
        response = []
        if self.isPaused:
            response.append("Paused Copy")
        elif not self.isRunning:
            response.append("Will Copy a file")
        else:
            response.append("Copying")
        return " ".join(response)


class JobProgress(wx.Dialog):
    """ This dialog shows the progress of any ThreadedJob.
    It can be shown Modally if the main application needs to suspend
    operation, or it can be shown Modelessly for background progress
    reporting.

        app = wx.App(False)
        job = EggTimerJob(duration = 10)
        dlg = JobProgress(None, job)
        job.SetProgressMessageWindow(dlg)
        job.Start()
        dlg.ShowModal()
    """
    def __init__(self, parent, job):
        self.job = job
        wx.Dialog.__init__(self, parent, -1, "Progress", size=(350,200))
        # vertical box sizer
        sizeAll = wx.BoxSizer(wx.VERTICAL)
        # Job status text
        self.JobStatusText = wx.StaticText(self, -1, "Starting...")
        sizeAll.Add(self.JobStatusText, 0, wx.EXPAND|wx.ALL, 8)
        # wx.Gauge
        self.ProgressBar = wx.Gauge(self, -1, 10, wx.DefaultPosition, (250, 15))
        sizeAll.Add(self.ProgressBar, 0, wx.EXPAND|wx.ALL, 8)
        # horiz box sizer, and spacer to right-justify
        sizeRemaining = wx.BoxSizer(wx.HORIZONTAL)
        sizeRemaining.Add((2,2), 1, wx.EXPAND)
        # time remaining read-only edit
        # putting wide default text gets a reasonable initial layout.
        self.remainingText = wx.StaticText(self, -1, "???:??")
        sizeRemaining.Add(self.remainingText, 0, wx.LEFT|wx.RIGHT|wx.ALIGN_CENTER_VERTICAL, 8)
        # static text: remaining
        self.remainingLabel = wx.StaticText(self, -1, "remaining")
        sizeRemaining.Add(self.remainingLabel, 0, wx.ALL|wx.ALIGN_CENTER_VERTICAL, 8)
        # add that row to the mix
        sizeAll.Add(sizeRemaining, 1, wx.EXPAND)
        # horiz box sizer & spacer
        sizeButtons = wx.BoxSizer(wx.HORIZONTAL)
        sizeButtons.Add((2,2), 1, wx.EXPAND|wx.ADJUST_MINSIZE)
        # Pause Button
        self.PauseButton = wx.Button(self, -1, "Pause")
        sizeButtons.Add(self.PauseButton, 0, wx.ALL, 4)
        self.Bind(wx.EVT_BUTTON, self.OnPauseButton, self.PauseButton)
        # Cancel button
        self.CancelButton = wx.Button(self, wx.ID_CANCEL, "Cancel")
        sizeButtons.Add(self.CancelButton, 0, wx.ALL, 4)
        self.Bind(wx.EVT_BUTTON, self.OnCancel, self.CancelButton)
        # Add all the buttons on the bottom row to the dialog
        sizeAll.Add(sizeButtons, 0, wx.EXPAND|wx.ALL, 4)
        self.SetSizer(sizeAll)
        #sizeAll.Fit(self)
        sizeAll.SetSizeHints(self)
        # jobs tell us how they are doing
        self.Bind(EVT_PROGRESS_START, self.OnProgressStart)
        self.Bind(EVT_PROGRESS, self.OnProgress)
        self.Bind(EVT_DONE, self.OnDone)
        self.Layout()

    def OnPauseButton(self, event):
        if self.job.isPaused:
            self.job.Continue()
            self.PauseButton.SetLabel("Pause")
            self.Layout()
        else:
            self.job.Pause()
            self.PauseButton.SetLabel("Resume")
            self.Layout()

    def OnCancel(self, event):
        self.job.Stop()

    def OnProgressStart(self, event):
        self.ProgressBar.SetRange(event.total)
        self.statusUpdateTime = time.time()

    def OnProgress(self, event):
        # update the progress bar
        self.ProgressBar.SetValue(event.count)
        self.remainingText.SetLabel(self.job.TimeRemaining())
        # update the text a max of 20 times a second
        if time.time() - self.statusUpdateTime > 0.05:
            self.JobStatusText.SetLabel(str(self.job))
            self.statusUpdateTime = time.time()
            self.Layout()

    # when a job is done
    def OnDone(self, event):
        self.ProgressBar.SetValue(0)
        self.JobStatusText.SetLabel("Finished")
        self.Destroy()
if __name__ == "__main__":
app = wx.PySimpleApp()
#job = EggTimerJob(duration = 10)
job = FileCopyJob("VeryBigFile.mp4", "/tmp/test_junk.mp4", 1024*1024*10)
dlg = JobProgress(None, job)
job.SetProgressMessageWindow(dlg)
job.Start()
dlg.ShowModal()
#
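Jim's suggestion of ticking once per file rather than once per block can be sketched without any GUI code. This is only an illustration, not part of his answer: copy_file_set, on_file_done and should_stop are hypothetical names. In the wx code above, the same loop would sit inside a ThreadedJob.Run() method, with JobBeginning(len(pairs)), JobProgress(copied) and PossibleStoppingPoint() taking the place of the callbacks.

```python
import os
import shutil


def copy_file_set(pairs, on_file_done=None, should_stop=None):
    """Copy each (src, dest) pair, ticking once per finished file.

    on_file_done(copied, total, src) is called after every file;
    should_stop() is polled before every file so a caller can cancel.
    Returns the number of files actually copied.
    """
    total = len(pairs)
    copied = 0
    for src, dest in pairs:
        if should_stop is not None and should_stop():
            break
        shutil.copy2(src, dest)
        copied += 1
        if on_file_done:
            on_file_done(copied, total, src)
    return copied
```

With many small files this gives a bar that advances steadily, at the cost of no feedback while a single very large file is being copied.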
Answered by frmdstryr
If you want to use the Windows copy dialog with progress, you can use these:
Answered by Gabriel Coutinho De Miranda
I made this shutil.copy() with a progress bar in a simple way, using only built-in modules. If you are using UTF-8 encoding, you can get a progress bar like the second example in the GIF image (image: progress bar examples).
Read the comments to change the style and colors. The first and last examples don't need UTF-8. You can use CPprogress(SOURCE, DESTINATION) just where you had shutil.copy(src, dst):
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
CPprogress(SOURCE, DESTINATION)

I made this to give shutil.copy() [or shutil.copy2() in this case] a progress bar.
You can use CPprogress(SOURCE, DESTINATION) just like shutil.copy(src, dst). SOURCE must be a file path and DESTINATION a file or folder path.
It will give you a progress bar for each file copied. Just copy this code above the place where you want to use CPprogress(SOURCE, DESTINATION) in your code.

You can easily change the look of the progress bar:
- To keep the style and just change the colors, replace the color values of progressCOLOR and finalCOLOR (the orange codes are at the end of those lines).
- To use a solid block progress bar, # -*- coding: utf-8 -*- is required. Otherwise, you will get an encoding error. Some basic terminals, like xterm, may not show the progress bar because of the UTF-8 characters.
  To use this style, remove the #STYLE# comment markers in the ###COLORS### lines (BlueCOLOR and endBLOCK).
  In def getPERCECENTprogress(), remove the #STYLE# comment marker AND COMMENT OUT THE PREVIOUS line. Do the same in def CPprogress().
  If you don't want the UTF-8 encoding, delete the four lines beginning with #STYLE#.

NOTE: If you want to copy lots of small files, the copy of each file is so fast
that all you will see is a lot of lines scrolling in your terminal window, with not enough time for a 'progress'.
In that case, I use an overall progress that shows only one progress bar for the complete job.
'''
import os
import shutil
import sys
import threading
import time
######## COLORS ######
progressCOLOR = '\033[38;5;33;48;5;236m' #\033[38;5;33;48;5;236m# copy inside '' for colored progressbar| orange:#\033[38;5;208;48;5;235m
finalCOLOR = '\033[38;5;33;48;5;33m' #\033[38;5;33;48;5;33m# copy inside '' for colored progressbar| orange:#\033[38;5;208;48;5;208m
#STYLE#BlueCOLOR = '\033[38;5;33m'#\033[38;5;33m# copy inside '' for colored progressbar Orange#'\033[38;5;208m'# # BG progress# #STYLE#
#STYLE#endBLOCK = '' # ▌ copy OR '' for none # BG progress# #STYLE# requires utf8 coding header
########
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
CEND = '\033[0m'

def getPERCECENTprogress(source_path, destination_path):
    # runs in a background thread: polls the destination size until it matches the source
    time.sleep(.24)
    if os.path.exists(destination_path):
        while os.path.getsize(source_path) != os.path.getsize(destination_path):
            sys.stdout.write('\r')
            percentagem = int((float(os.path.getsize(destination_path))/float(os.path.getsize(source_path))) * 100)
            steps = int(percentagem/5)
            copiado = int(os.path.getsize(destination_path)/1000000)# decimal MB (1,000,000 bytes) rather than 1,048,576, to match the Thunar file manager report (Linux - Xfce)
            sizzz = int(os.path.getsize(source_path)/1000000)
            sys.stdout.write((" {:d} / {:d} Mb ".format(copiado, sizzz)) + (BOLD + progressCOLOR + "{:20s}".format('|'*steps) + CEND) + (" {:d}% ".format(percentagem))) # BG progress
            #STYLE#sys.stdout.write((" {:d} / {:d} Mb ".format(copiado, sizzz)) + (BOLD + BlueCOLOR + "?" + "{:s}".format('█'*steps) + CEND) + ("{:s}".format(' '*(20-steps))+ BOLD + BlueCOLOR + endBLOCK+ CEND) +(" {:d}% ".format(percentagem))) #STYLE# # BG progress# closer to GUI but less compatible (no block bar with xterm) # requires utf8 coding header
            sys.stdout.flush()
            time.sleep(.01)

def CPprogress(SOURCE, DESTINATION):
    if os.path.isdir(DESTINATION):
        dst_file = os.path.join(DESTINATION, os.path.basename(SOURCE))
    else:
        dst_file = DESTINATION
    print(" ")
    print(BOLD + UNDERLINE + "FROM:" + CEND + "  " + SOURCE)
    print(BOLD + UNDERLINE + "TO:" + CEND + "  " + dst_file)
    print(" ")
    threading.Thread(name='progresso', target=getPERCECENTprogress, args=(SOURCE, dst_file)).start()
    shutil.copy2(SOURCE, DESTINATION)
    time.sleep(.02)
    sys.stdout.write('\r')
    sys.stdout.write((" {:d} / {:d} Mb ".format((int(os.path.getsize(dst_file)/1000000)), (int(os.path.getsize(SOURCE)/1000000)))) + (BOLD + finalCOLOR + "{:20s}".format('|'*20) + CEND) + (" {:d}% ".format(100))) # BG progress 100%
    #STYLE#sys.stdout.write((" {:d} / {:d} Mb ".format((int(os.path.getsize(dst_file)/1000000)), (int(os.path.getsize(SOURCE)/1000000)))) + (BOLD + BlueCOLOR + "?" + "{:s}{:s}".format(('█'*20), endBLOCK) + CEND) + (" {:d}% ".format(100))) #STYLE# # BG progress 100%# closer to GUI but less compatible (no block bar with xterm) # requires utf8 coding header
    sys.stdout.flush()
    print(" ")
    print(" ")

'''
#Ex. Copy all files from root of the source dir to destination dir

folderA = '/path/to/SOURCE' # SOURCE
folderB = '/path/to/DESTINATION' # DESTINATION
for FILE in os.listdir(folderA):
    if not os.path.isdir(os.path.join(folderA, FILE)):
        if os.path.exists(os.path.join(folderB, FILE)): continue # as we are using shutil.copy2() that overwrites destination, this skips existing files
        CPprogress(os.path.join(folderA, FILE), folderB) # use the command as if it was shutil.copy2() but with progress

75 / 150 Mb |||||||||| | 50%
'''
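The technique in this answer is to let shutil.copy2() do the copying while a second thread polls the size of the destination file. A compact Python 3 sketch of the same idea follows. It is not the author's code: copy_with_polling and _watch_progress are hypothetical names, and it swaps the size comparison used above for a threading.Event, so the watcher always stops even if the copy fails.

```python
import os
import shutil
import sys
import threading


def _watch_progress(src, dst, done, out, interval):
    """Poll the size of dst and redraw a 20-step bar until done is set."""
    total = os.path.getsize(src)
    while not done.is_set():
        copied = os.path.getsize(dst) if os.path.exists(dst) else 0
        percent = 100 if total == 0 else min(100, copied * 100 // total)
        out.write("\r%-20s %3d%%" % ("|" * (percent // 5), percent))
        out.flush()
        done.wait(interval)  # sleeps, but wakes early when the copy ends


def copy_with_polling(src, dst, out=sys.stdout, interval=0.05):
    """Copy src to dst with shutil.copy2 while a watcher thread shows progress."""
    if os.path.isdir(dst):
        dst = os.path.join(dst, os.path.basename(src))
    done = threading.Event()
    watcher = threading.Thread(
        target=_watch_progress, args=(src, dst, done, out, interval))
    watcher.start()
    try:
        shutil.copy2(src, dst)
    finally:
        done.set()
        watcher.join()
    out.write("\r%-20s 100%%\n" % ("|" * 20))
    out.flush()
    return dst
```

Polling reports only what has reached the destination file so far, so the bar can lag slightly behind the real copy; the benefit is that the copy itself stays a single shutil.copy2() call.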
Answered by Gabriel Coutinho De Miranda
If you want an overall progress, you can use something like this (made for another script). Note that in this case, the 'threading.Thread' that calls the progress bar is placed outside the 'for' loop. Also, the measurements need to be taken in a different way. This is the third example (non-UTF-8) from the GIF image in the previous answer. It adds a 'ToGo' count of the files still to be copied:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
Ex.
CopyProgress('/path/to/SOURCE', '/path/to/DESTINATION')

I think this 'copy with overall progress' is very 'plastic' and can be easily adapted.
By default, it will RECURSIVELY copy the CONTENT of 'path/to/SOURCE' to 'path/to/DESTINATION/', keeping the directory tree.

Paying attention to the comments, there are 4 main options that can be changed immediately:
1 - The LOOK of the progress bar: see COLORS and the PAIR of STYLE lines in 'def getPERCECENTprogress' (inside and after the 'while' loop);
2 - The DESTINATION path: to get 'path/to/DESTINATION/SOURCE_NAME' as target, comment out the 2nd 'DST =' definition at the top of the 'def CopyProgress(SOURCE, DESTINATION)' function;
3 - If you don't want to RECURSIVELY copy from sub-directories, but just the files in the root source directory to the root of the destination, you can use os.listdir() instead of os.walk(). Read the comments inside the 'def CopyProgress(SOURCE, DESTINATION)' function to disable RECURSION. Be aware that the RECURSION changes (4x2) must be made in both os.walk() loops;
4 - Handling destination files: if the destination filename may already exist, by default the file is skipped and the loop jumps to the next one. shutil.copy2() on its own, by contrast, overwrites the destination file if it exists. Alternatively, you can handle files that exist by overwriting or renaming (according to the current date and time). To do that, read the comments after 'if os.path.exists(dstFILE): continue' both in the count bytes loop and in the main loop. Be aware that the changes must match in both loops (as described in the comments) or the progress function will not work properly.
'''
import os
import shutil
import sys
import threading
import time
progressCOLOR = '\033[38;5;33;48;5;236m' #BLUEgreyBG
finalCOLOR = '\033[48;5;33m' #BLUEBG
# check the color codes below and paste above
###### COLORS #######
# WHITEblueBG = '\033[38;5;15;48;5;33m'
# BLUE = '\033[38;5;33m'
# BLUEBG = '\033[48;5;33m'
# ORANGEBG = '\033[48;5;208m'
# BLUEgreyBG = '\033[38;5;33;48;5;236m'
# ORANGEgreyBG = '\033[38;5;208;48;5;236m' # = '\033[38;5;FOREGROUND;48;5;BACKGROUNDm' # see 'https://i.stack.imgur.com/KTSQa.png' for 256 color codes
# INVERT = '\033[7m'
###### COLORS #######
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
CEND = '\033[0m'
FilesLeft = 0

def FullFolderSize(path):
    TotalSize = 0
    if os.path.exists(path):# to be safely used # if FALSE returns 0
        for root, dirs, files in os.walk(path):
            for file in files:
                TotalSize += os.path.getsize(os.path.join(root, file))
    return TotalSize

def getPERCECENTprogress(source_path, destination_path, bytes_to_copy):
    # runs in a background thread: polls how much the destination folder has grown
    dstINIsize = FullFolderSize(destination_path)
    time.sleep(.25)
    print(" ")
    print(BOLD + UNDERLINE + "FROM:" + CEND + "  " + source_path)
    print(BOLD + UNDERLINE + "TO:" + CEND + "  " + destination_path)
    print(" ")
    if os.path.exists(destination_path):
        while bytes_to_copy != (FullFolderSize(destination_path)-dstINIsize):
            sys.stdout.write('\r')
            percentagem = int((float((FullFolderSize(destination_path)-dstINIsize))/float(bytes_to_copy)) * 100)
            steps = int(percentagem/5)
            copiado = '{:,}'.format(int((FullFolderSize(destination_path)-dstINIsize)/1000000))# decimal MB (1,000,000 bytes) rather than 1,048,576, which is closer to the file manager report
            sizzz = '{:,}'.format(int(bytes_to_copy/1000000))
            sys.stdout.write((" {:s} / {:s} Mb ".format(copiado, sizzz)) + (BOLD + progressCOLOR + "{:20s}".format('|'*steps) + CEND) + (" {:d}% ".format(percentagem)) + (" {:d} ToGo ".format(FilesLeft))) # STYLE 1 progress default #
            #BOLD# sys.stdout.write(BOLD + (" {:s} / {:s} Mb ".format(copiado, sizzz)) + (progressCOLOR + "{:20s}".format('|'*steps) + CEND) + BOLD + (" {:d}% ".format(percentagem)) + (" {:d} ToGo ".format(FilesLeft))+ CEND) # STYLE 2 progress BOLD #
            #classic B/W# sys.stdout.write(BOLD + (" {:s} / {:s} Mb ".format(copiado, sizzz)) + ("|{:20s}|".format('|'*steps)) + (" {:d}% ".format(percentagem)) + (" {:d} ToGo ".format(FilesLeft))+ CEND) # STYLE 3 progress classic B/W #
            sys.stdout.flush()
            time.sleep(.01)
        sys.stdout.write('\r')
        time.sleep(.05)
        sys.stdout.write((" {:s} / {:s} Mb ".format('{:,}'.format(int((FullFolderSize(destination_path)-dstINIsize)/1000000)), '{:,}'.format(int(bytes_to_copy/1000000)))) + (BOLD + finalCOLOR + "{:20s}".format(' '*20) + CEND) + (" {:d}% ".format( 100)) + (" {:s} ".format(' ')) + "\n") # STYLE 1 progress default #
        #BOLD# sys.stdout.write(BOLD + (" {:s} / {:s} Mb ".format('{:,}'.format(int((FullFolderSize(destination_path)-dstINIsize)/1000000)), '{:,}'.format(int(bytes_to_copy/1000000)))) + (finalCOLOR + "{:20s}".format(' '*20) + CEND) + BOLD + (" {:d}% ".format( 100)) + (" {:s} ".format(' ')) + "\n" + CEND ) # STYLE 2 progress BOLD #
        #classic B/W# sys.stdout.write(BOLD + (" {:s} / {:s} Mb ".format('{:,}'.format(int((FullFolderSize(destination_path)-dstINIsize)/1000000)), '{:,}'.format(int(bytes_to_copy/1000000)))) + ("|{:20s}|".format('|'*20)) + (" {:d}% ".format( 100)) + (" {:s} ".format(' ')) + "\n" + CEND ) # STYLE 3 progress classic B/W #
        sys.stdout.flush()
        print(" ")
        print(" ")

def CopyProgress(SOURCE, DESTINATION):
    global FilesLeft
    DST = os.path.join(DESTINATION, os.path.basename(SOURCE))
    # <- the previous will copy the Source folder inside of the Destination folder. Result Target: path/to/Destination/SOURCE_NAME
    # -> the next line (DST = DESTINATION) copies the CONTENT of Source to the Destination instead. Result Target: path/to/Destination
    DST = DESTINATION # comment this out to use path/to/Destination/SOURCE_NAME as the target
    if DST.startswith(SOURCE):
        print(" ")
        print(BOLD + UNDERLINE + 'Source folder can\'t be changed.' + CEND)
        print('Please check your target path...')
        print(" ")
        print(BOLD + ' CANCELED' + CEND)
        print(" ")
        sys.exit()
    # count bytes to copy
    Bytes2copy = 0
    for root, dirs, files in os.walk(SOURCE): # USE for filename in os.listdir(SOURCE): # if you don't want RECURSION #
        dstDIR = root.replace(SOURCE, DST, 1) # USE dstDIR = DST # if you don't want RECURSION #
        for filename in files: # USE if not os.path.isdir(os.path.join(SOURCE, filename)): # if you don't want RECURSION #
            dstFILE = os.path.join(dstDIR, filename)
            if os.path.exists(dstFILE): continue # must match the main loop (after "threading.Thread")
            # To overwrite, delete dstFILE first here so the progress works properly: ex. change continue to os.unlink(dstFILE)
            # To rename new files by adding date and time, instead of deleting and overwriting,
            # comment out 'if os.path.exists(dstFILE): continue'
            Bytes2copy += os.path.getsize(os.path.join(root, filename)) # USE os.path.getsize(os.path.join(SOURCE, filename)) # if you don't want RECURSION #
            FilesLeft += 1
    # <- count bytes to copy
    # Thread that displays the progress
    threading.Thread(name='progresso', target=getPERCECENTprogress, args=(SOURCE, DST, Bytes2copy)).start()
    # main loop
    for root, dirs, files in os.walk(SOURCE): # USE for filename in os.listdir(SOURCE): # if you don't want RECURSION #
        dstDIR = root.replace(SOURCE, DST, 1) # USE dstDIR = DST # if you don't want RECURSION #
        if not os.path.exists(dstDIR):
            os.makedirs(dstDIR)
        for filename in files: # USE if not os.path.isdir(os.path.join(SOURCE, filename)): # if you don't want RECURSION #
            srcFILE = os.path.join(root, filename) # USE os.path.join(SOURCE, filename) # if you don't want RECURSION #
            dstFILE = os.path.join(dstDIR, filename)
            if os.path.exists(dstFILE): continue # MUST MATCH THE PREVIOUS count bytes loop
            # <- <- this jumps to the next file without copying this file, if destination file exists.
            # Comment it out to copy with rename or overwrite dstFILE
            #
            # RENAME part below
            head, tail = os.path.splitext(filename)
            count = -1
            year = int(time.strftime("%Y"))
            month = int(time.strftime("%m"))
            day = int(time.strftime("%d"))
            hour = int(time.strftime("%H"))
            minute = int(time.strftime("%M"))
            while os.path.exists(dstFILE):
                count += 1
                if count == 0:
                    dstFILE = os.path.join(dstDIR, '{:s}[{:d}.{:d}.{:d}]{:d}-{:d}{:s}'.format(head, year, month, day, hour, minute, tail))
                else:
                    dstFILE = os.path.join(dstDIR, '{:s}[{:d}.{:d}.{:d}]{:d}-{:d}[{:d}]{:s}'.format(head, year, month, day, hour, minute, count, tail))
            # END of RENAME part
            shutil.copy2(srcFILE, dstFILE)
            FilesLeft -= 1

'''
Ex.
CopyProgress('/path/to/SOURCE', '/path/to/DESTINATION')
'''
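The overall-progress idea above (count the bytes to copy first, then report one combined figure while copying) can also be sketched in a shorter Python 3 form. This is an illustration under stated assumptions rather than the author's code: copy_tree_with_progress is a hypothetical name, it replaces the polling thread with a callback invoked after each file, it skips files that already exist at the destination (the answer's default), and it only creates directories that receive at least one file.

```python
import os
import shutil


def copy_tree_with_progress(source, destination, callback=None):
    """Recursively copy the content of source into destination.

    callback(bytes_done, bytes_total, files_left) is called after each file.
    Returns the number of bytes copied.
    """
    # first pass: collect the files that still need copying
    pending = []
    for root, dirs, files in os.walk(source):
        rel = os.path.relpath(root, source)
        dst_dir = os.path.normpath(os.path.join(destination, rel))
        for name in files:
            dst_file = os.path.join(dst_dir, name)
            if not os.path.exists(dst_file):  # skip files that already exist
                pending.append((os.path.join(root, name), dst_file))

    bytes_total = sum(os.path.getsize(src) for src, _ in pending)
    bytes_done = 0
    files_left = len(pending)

    # second pass: copy, reporting the overall totals after each file
    for src, dst in pending:
        os.makedirs(os.path.dirname(dst), exist_ok=True)
        shutil.copy2(src, dst)
        bytes_done += os.path.getsize(src)
        files_left -= 1
        if callback:
            callback(bytes_done, bytes_total, files_left)
    return bytes_done
```

Building the list of pending files once keeps the byte count and the copy loop in agreement, which is the requirement the answer states for its two matching os.walk() loops.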