就我而言,大文件是 tar.gz,myBigFile.tar.gz
大小为 52GB,我将其分割为大小为 2GB 的块,因此我有 27 个部分文件。
这是我从头开始编写的代码:
import filecmp
import os
import shutil
from glob import glob
from time import sleep
# Size of each part file: 2_097_152_000 bytes (~1.95 GiB per chunk).
CHUNK_SIZE = 2097152000 # bytes
# Smaller chunk sizes kept for testing the split/join round trip on tiny files:
# CHUNK_SIZE = 1000000 # bytes
# CHUNK_SIZE = 2 # bytes
# Directory layout: the source file, its '<name>.parts' directory of chunks,
# and the reassembled output, respectively.
ORIGINAL_FILE_DIR = './data/original'
SPLITTED_FILE_DIR = './data/splitted'
JOINED_FILE_DIR = './data/joined'
def get_original_filepath(filename):
    """Return the path of the source file inside ORIGINAL_FILE_DIR.

    Bug fix: the path must actually include *filename*; previously the
    argument was ignored, so every call returned the same broken path.
    """
    return f'{ORIGINAL_FILE_DIR}/{filename}'
def get_splitted_filepath(filename, overwrite=False):
    """Return the directory that holds the part files for *filename*.

    When *overwrite* is True, any existing parts directory is removed
    together with its contents and recreated empty.  Errors are printed
    and swallowed (best-effort), matching the script's original style.
    """
    partspath = f'{SPLITTED_FILE_DIR}/{filename}.parts'
    if overwrite:
        try:
            # rmtree, not os.rmdir: the old parts directory is normally
            # non-empty, and rmdir would fail, leaving stale parts that
            # later get mixed into the re-joined file.
            shutil.rmtree(partspath)
        except OSError as e:
            print(e)
        try:
            os.mkdir(partspath)
        except OSError as e:
            print(e)
    return partspath
def get_joined_filepath(filename):
    """Return the path where the re-joined copy of *filename* is written.

    Bug fix: include *filename* in the path; the argument was ignored.
    """
    return f'{JOINED_FILE_DIR}/{filename}'
def get_part_extension(part, pad_num=8):
    """Return the '.part' suffix for a part index.

    *part* may be an int (zero-padded to *pad_num* digits so the parts
    sort lexicographically) or a str (e.g. '*' to build a glob pattern).

    Raises:
        TypeError: if *part* is neither int nor str.  (TypeError is a
        subclass of Exception, so existing broad handlers still work.)
    """
    if isinstance(part, int):
        return f'{part:0{pad_num}d}.part'
    if isinstance(part, str):
        return f'{part}.part'
    raise TypeError(f'Unknown type of <part>: {type(part)}')
def get_part_filename(filename, part, pad_num=8):
    """Return a part file name, e.g. 'myBigFile.tar.gz.00000003.part'.

    Bug fix: the name must start with *filename*; previously the
    argument was ignored, producing identical names for every file.
    """
    part_extension = get_part_extension(part, pad_num)
    return f'{filename}.{part_extension}'
def get_file_size(filepath):
    """Return the size of *filepath* in bytes."""
    return os.stat(filepath).st_size
def get_number_of_chunks(total_size, chunk_size):
    """Ceiling division: number of chunk_size pieces covering total_size bytes."""
    return -(-total_size // chunk_size)
def is_directory_empty(directory_path):
    """Return True if *directory_path* has no entries or does not exist.

    Bug fix: the bare ``except:`` swallowed every exception (including
    KeyboardInterrupt); only filesystem errors should mean "empty".
    """
    try:
        return not os.listdir(directory_path)
    except OSError:
        # A missing (or unreadable) directory is treated as empty so the
        # caller can report 'parts not found' uniformly.
        return True
def split_file(filename, chunk_size=CHUNK_SIZE):
    """Split the original file into numbered part files of *chunk_size* bytes.

    Parts are written into the (freshly recreated) parts directory, with
    zero-padded indices so that lexicographic order equals numeric order.

    Raises:
        FileNotFoundError: if the original file does not exist.
    """
    original_path = get_original_filepath(filename)
    # Bug fix: the old code printed an Exception and carried on, only to
    # crash later (or split nothing).  Fail fast with a clear error.
    if not os.path.isfile(original_path):
        raise FileNotFoundError('E: Original file not found!')
    splitted_path = get_splitted_filepath(filename, overwrite=True)
    # Hoisted: the original computed get_file_size() twice.
    total_size = get_file_size(original_path)
    number_of_chunks = get_number_of_chunks(total_size, chunk_size)
    # Pad width chosen so every part index fits, e.g. 27 parts -> 2 digits.
    pad_num = len(str(number_of_chunks))
    with open(original_path, 'rb') as readfile:
        for part in range(number_of_chunks):
            chunk = readfile.read(chunk_size)
            part_filename = get_part_filename(filename, part, pad_num)
            with open(f'{splitted_path}/{part_filename}', 'wb') as writefile:
                writefile.write(chunk)
def join_file(filename):
    """Reassemble *filename* from its part files into JOINED_FILE_DIR.

    Raises:
        FileNotFoundError: if the parts directory is empty or missing.
    """
    splitted_path = get_splitted_filepath(filename)
    joined_path = get_joined_filepath(filename)
    # Bug fix: previously this printed an Exception and continued,
    # silently producing an empty/garbage joined file.
    if is_directory_empty(splitted_path):
        raise FileNotFoundError('E: Splitted file not found!')
    part_filename = get_part_filename(filename, '*')  # '*' wildcard pattern
    # Bug fix: glob() returns files in arbitrary filesystem order; the
    # zero-padded indices make sorted() restore the correct part order.
    partfiles = sorted(
        os.path.normpath(fn) for fn in glob(f'{splitted_path}/{part_filename}')
    )
    # Bug fix: 'wb', not 'ab' — append mode duplicated data on re-runs.
    with open(joined_path, 'wb') as writefile:
        for partfile in partfiles:
            with open(partfile, 'rb') as readfile:
                # Streams in buffered chunks instead of loading an entire
                # multi-GB part into memory with read().
                shutil.copyfileobj(writefile.__class__ and readfile, writefile)
def compare_file(filename):
    """Compare the original file with the re-joined copy.

    Returns a human-readable verdict string.  Bug fixes: the result
    strings now include *filename*, and shallow=False forces a real
    byte-by-byte comparison — the default shallow mode may declare files
    identical from os.stat() signatures alone, which defeats the whole
    point of verifying a 52 GB join.
    """
    file1_path = get_original_filepath(filename)
    file2_path = get_joined_filepath(filename)
    identical = filecmp.cmp(file1_path, file2_path, shallow=False)
    return (f'{filename} is identical.'
            if identical else f'{filename} is not identical.')
if __name__ == '__main__':
    # Round trip: split the original, rejoin the parts, then verify that
    # the reassembled copy matches the source byte for byte.
    filename = 'myBigFile.tar.gz'
    split_file(filename)
    join_file(filename)
    print(compare_file(filename))
所以 splitted_path 看起来像这样(与代码中的 SPLITTED_FILE_DIR = './data/splitted' 一致,27 个部分编号为 00 到 26):
./data/splitted/myBigFile.tar.gz.parts/myBigFile.tar.gz.00.part
./data/splitted/myBigFile.tar.gz.parts/myBigFile.tar.gz.01.part
...
./data/splitted/myBigFile.tar.gz.parts/myBigFile.tar.gz.26.part
我知道我可以使用 Unix 实用程序,例如 tar、zip 或其他归档程序。
我也用较小的 CHUNK_SIZE 在小文件上测试过,拆分后再合并没有出现任何问题。