Get progress back from shutil file copy thread
This might be a bit hacky but it works:
"""
Copying a file and checking its progress while it's copying.
"""
import os
import shutil
import threading
import time
des = r'<PATH/TO/SPURCE/FILE>'
src = r'<PATH/TO/DESTINATION/FILE>'
def checker(source_path, destination_path):
"""
Compare 2 files till they're the same and print the progress.
:type source_path: str
:param source_path: path to the source file
:type destination_path: str
:param destination_path: path to the destination file
"""
# Making sure the destination path exists
while not os.path.exists(destination_path):
print "not exists"
time.sleep(.01)
# Keep checking the file size till it's the same as source file
while os.path.getsize(source_path) != os.path.getsize(destination_path):
print "percentage", int((float(os.path.getsize(destination_path))/float(os.path.getsize(source_path))) * 100)
time.sleep(.01)
print "percentage", 100
def copying_file(source_path, destination_path):
"""
Copying a file
:type source_path: str
:param source_path: path to the file that needs to be copied
:type destination_path: str
:param destination_path: path to where the file is going to be copied
:rtype: bool
:return: True if the file copied successfully, False otherwise
"""
print "Copying...."
shutil.copyfile(source_path, destination_path)
if os.path.exists(destination_path):
print "Done...."
return True
print "Filed..."
return False
t = threading.Thread(name='copying', target=copying_file, args=(src, des))
# Start the copying on a separate thread
t.start()
# Checking the status of destination file on a separate thread
b = threading.Thread(name='checking', target=checker, args=(src, des))
b.start()
shutil.copy()
doesn't offer any options to track the progress, no. At most you could monitor the size of the destination file (using os.*
functions on the target filename).
The alternative would be to implement your own copy function. The implementation is really quite simple; shutil.copy()
is basically a shutil.copyfile()
plus shutil.copymode()
call; shutil.copyfile()
in turn delegates the real work to shutil.copyfileobj()
* (links to the Python 3.8.2 source code).
Implementing your own shutil.copyfileobj()
to include progress should be trivial; inject support for a callback function to report inform your program each time another block has copied:
import os
import shutil
def copyfileobj(fsrc, fdst, callback, length=0):
try:
# check for optimisation opportunity
if "b" in fsrc.mode and "b" in fdst.mode and fsrc.readinto:
return _copyfileobj_readinto(fsrc, fdst, callback, length)
except AttributeError:
# one or both file objects do not support a .mode or .readinto attribute
pass
if not length:
length = shutil.COPY_BUFSIZE
fsrc_read = fsrc.read
fdst_write = fdst.write
copied = 0
while True:
buf = fsrc_read(length)
if not buf:
break
fdst_write(buf)
copied += len(buf)
callback(copied)
# differs from shutil.COPY_BUFSIZE on platforms != Windows
READINTO_BUFSIZE = 1024 * 1024
def _copyfileobj_readinto(fsrc, fdst, callback, length=0):
"""readinto()/memoryview() based variant of copyfileobj().
*fsrc* must support readinto() method and both files must be
open in binary mode.
"""
fsrc_readinto = fsrc.readinto
fdst_write = fdst.write
if not length:
try:
file_size = os.stat(fsrc.fileno()).st_size
except OSError:
file_size = READINTO_BUFSIZE
length = min(file_size, READINTO_BUFSIZE)
copied = 0
with memoryview(bytearray(length)) as mv:
while True:
n = fsrc_readinto(mv)
if not n:
break
elif n < length:
with mv[:n] as smv:
fdst.write(smv)
else:
fdst_write(mv)
copied += n
callback(copied)
and then, in the callback, compare the copied
size with the file size.
Note that in the above implementation we look for the opportunity to use a different method for binary files, where you can use fileobj.readinto()
and a memoryview
object to avoid redundant data copying; see the original _copyfileobj_readinto()
implementation for comparison.
* footnote to … delegates the real work to shutil.copyfileobj()
: As of Python 3.8, on OS X and Linux the copyfile()
implementation delegates file copying to OS-specific, optimised system calls (to fcopyfile()
and sendfile()
, respectively) but these calls have no hooks whatsoever to track progress, and so if you need to track progress you'd want to disable these delegation paths anyway. On Windows the code uses the aforementioned _copyfileobj_readinto()
function.
I combined Martijn Pieters answer with some progress bar code from this answer with modifications to work in PyCharm from this answer which gives me the following. The function copy_with_progress
was my goal.
import os
import shutil
def progress_percentage(perc, width=None):
# This will only work for python 3.3+ due to use of
# os.get_terminal_size the print function etc.
FULL_BLOCK = '█'
# this is a gradient of incompleteness
INCOMPLETE_BLOCK_GRAD = ['░', '▒', '▓']
assert(isinstance(perc, float))
assert(0. <= perc <= 100.)
# if width unset use full terminal
if width is None:
width = os.get_terminal_size().columns
# progress bar is block_widget separator perc_widget : ####### 30%
max_perc_widget = '[100.00%]' # 100% is max
separator = ' '
blocks_widget_width = width - len(separator) - len(max_perc_widget)
assert(blocks_widget_width >= 10) # not very meaningful if not
perc_per_block = 100.0/blocks_widget_width
# epsilon is the sensitivity of rendering a gradient block
epsilon = 1e-6
# number of blocks that should be represented as complete
full_blocks = int((perc + epsilon)/perc_per_block)
# the rest are "incomplete"
empty_blocks = blocks_widget_width - full_blocks
# build blocks widget
blocks_widget = ([FULL_BLOCK] * full_blocks)
blocks_widget.extend([INCOMPLETE_BLOCK_GRAD[0]] * empty_blocks)
# marginal case - remainder due to how granular our blocks are
remainder = perc - full_blocks*perc_per_block
# epsilon needed for rounding errors (check would be != 0.)
# based on reminder modify first empty block shading
# depending on remainder
if remainder > epsilon:
grad_index = int((len(INCOMPLETE_BLOCK_GRAD) * remainder)/perc_per_block)
blocks_widget[full_blocks] = INCOMPLETE_BLOCK_GRAD[grad_index]
# build perc widget
str_perc = '%.2f' % perc
# -1 because the percentage sign is not included
perc_widget = '[%s%%]' % str_perc.ljust(len(max_perc_widget) - 3)
# form progressbar
progress_bar = '%s%s%s' % (''.join(blocks_widget), separator, perc_widget)
# return progressbar as string
return ''.join(progress_bar)
def copy_progress(copied, total):
print('\r' + progress_percentage(100*copied/total, width=30), end='')
def copyfile(src, dst, *, follow_symlinks=True):
"""Copy data from src to dst.
If follow_symlinks is not set and src is a symbolic link, a new
symlink will be created instead of copying the file it points to.
"""
if shutil._samefile(src, dst):
raise shutil.SameFileError("{!r} and {!r} are the same file".format(src, dst))
for fn in [src, dst]:
try:
st = os.stat(fn)
except OSError:
# File most likely does not exist
pass
else:
# XXX What about other special files? (sockets, devices...)
if shutil.stat.S_ISFIFO(st.st_mode):
raise shutil.SpecialFileError("`%s` is a named pipe" % fn)
if not follow_symlinks and os.path.islink(src):
os.symlink(os.readlink(src), dst)
else:
size = os.stat(src).st_size
with open(src, 'rb') as fsrc:
with open(dst, 'wb') as fdst:
copyfileobj(fsrc, fdst, callback=copy_progress, total=size)
return dst
def copyfileobj(fsrc, fdst, callback, total, length=16*1024):
copied = 0
while True:
buf = fsrc.read(length)
if not buf:
break
fdst.write(buf)
copied += len(buf)
callback(copied, total=total)
def copy_with_progress(src, dst, *, follow_symlinks=True):
if os.path.isdir(dst):
dst = os.path.join(dst, os.path.basename(src))
copyfile(src, dst, follow_symlinks=follow_symlinks)
shutil.copymode(src, dst)
return dst