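Overview:
A project release change requires the source package, which has to be checked for missing key code. One step of that check is to enter a given directory and extract every .ZIP archive in it.
Problems:
1. Parallel processing;
2. Chinese file names come out garbled after zipfile extract;
Solutions: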
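- Parallel processing: adapted from the approach I used earlier for port scanning.
- Garbled names after zipfile extraction: following in the footsteps of others, I read the Python zipfile source. In the open function, when zinfo.flag_bits does not mark the file name as UTF-8, the name is decoded as cp437 by default: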
def open(self, name, mode="r", pwd=None, *, force_zip64=False):
    ...
    if zinfo.flag_bits & 0x800:
        # UTF-8 filename
        fname_str = fname.decode("utf-8")
    else:
        fname_str = fname.decode("cp437")
    ...
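To see which entries of an archive carry the UTF-8 flag that the branch above tests, the flag bits can be read from each ZipInfo. This is a minimal sketch: the helper names `utf8_flags` and `build_demo_archive` and the sample file names are made up for illustration, and the demo archive is built in memory by Python's own zipfile, which sets the flag whenever a name is not pure ASCII.

```python
import io
import zipfile

UTF8_FLAG = 0x800  # general purpose bit 11: file name is stored as UTF-8


def utf8_flags(data: bytes) -> dict:
    """Map each member name to True if its UTF-8 name flag is set."""
    with zipfile.ZipFile(io.BytesIO(data)) as zf:
        return {info.filename: bool(info.flag_bits & UTF8_FLAG)
                for info in zf.infolist()}


def build_demo_archive() -> bytes:
    """Build a small in-memory archive with one ASCII and one Chinese name."""
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w") as zf:
        zf.writestr("plain.txt", "ascii name")
        zf.writestr("中文.txt", "non-ascii name")
    return buf.getvalue()


if __name__ == "__main__":
    for name, flagged in utf8_flags(build_demo_archive()).items():
        print(name, "UTF-8 flag set" if flagged else "no UTF-8 flag")
```

Archives produced by tools that store names in a local code page such as GBK without setting the flag are the ones that end up on the cp437 path.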
The ZIP File Format Specification describes this:
APPENDIX D - Language Encoding (EFS)
D.1 The ZIP format has historically supported only the original IBM PC character encoding set, commonly referred to as IBM Code Page 437. This limits storing file name characters to only those within the original MS-DOS range of values and does not properly support file names in other character encodings, or languages. To address this limitation, this specification will support the following change.
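The garbling is reversible because Python's cp437 codec maps every byte value to a character, so re-encoding the garbled name as cp437 gives back the original bytes, which can then be decoded with the encoding the archive was really written in. A small sketch of that round trip follows; the helper name `repair_name` is hypothetical, and GBK is an assumption about where the archive came from.

```python
def repair_name(name: str, flag_bits: int, encoding: str = "gbk") -> str:
    """Return the intended file name for a zip entry.

    Names flagged as UTF-8 (bit 0x800) were already decoded correctly.
    Anything else was decoded as cp437, so recover the raw bytes and
    decode them with the assumed legacy encoding instead.
    """
    if flag_bits & 0x800:
        return name
    return name.encode("cp437").decode(encoding)


if __name__ == "__main__":
    original = "源码.txt"
    # Simulate what zipfile does with GBK bytes that carry no UTF-8 flag.
    garbled = original.encode("gbk").decode("cp437")
    print(repr(garbled))
    print(repair_name(garbled, flag_bits=0))
```

If the archive was written in some other code page, the decode step may raise UnicodeDecodeError or produce different wrong characters, so the encoding has to match the tool that created the archive.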
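Code:
import os
import threading
import time
import zipfile
from queue import Queue

print_lock = threading.Lock()
q = Queue()  # archive names waiting to be extracted


def unzip(file):
    """Extract one .zip archive into the current directory, fixing GBK names."""
    _, ext = os.path.splitext(file)
    if ext.lower() != ".zip":
        return
    try:
        with zipfile.ZipFile(file, 'r') as f:
            for info in f.infolist():
                if not info.flag_bits & 0x800:
                    # No UTF-8 flag: zipfile decoded the raw name as cp437.
                    # Undo that and decode as GBK (assumes the archive was
                    # created on a Chinese-locale system).
                    info.filename = info.filename.encode('cp437').decode('gbk')
                f.extract(info)
        with print_lock:
            print(file, 'unzip ok')
    except Exception as exc:
        with print_lock:
            print(file, 'unzip error:', exc)


def threader():
    """Worker loop: take archive names off the queue and extract them."""
    while True:
        worker = q.get()
        try:
            unzip(worker)
        finally:
            q.task_done()


def create_thread(threadnums):
    """Start threadnums daemon worker threads."""
    for _ in range(threadnums):
        t = threading.Thread(target=threader, daemon=True)
        t.start()


if __name__ == "__main__":
    startTime = time.time()
    # The archives live in the "源码" (source code) subdirectory.
    path = os.path.join(os.getcwd(), '源码')
    os.chdir(path)
    file_list = os.listdir(path)
    print(file_list)
    create_thread(100)
    for unzipfile in file_list:
        q.put(unzipfile)
    q.join()
    print('Time taken:', time.time() - startTime)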
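As an alternative to hand-rolled worker threads and a queue, the same fan-out can be sketched with concurrent.futures.ThreadPoolExecutor from the standard library. This is not the script above but a swapped-in technique; the function names `extract_one` and `extract_all` and the default of 8 workers are assumptions for illustration, and the file name repair is left out to keep the sketch short.

```python
import zipfile
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path


def extract_one(archive: Path, dest: Path) -> bool:
    """Extract one archive into dest; return True on success."""
    try:
        with zipfile.ZipFile(archive) as zf:
            zf.extractall(dest)
        return True
    except (zipfile.BadZipFile, OSError):
        return False


def extract_all(folder: Path, max_workers: int = 8) -> dict:
    """Extract every .zip in folder concurrently.

    Returns a mapping of archive file name to a success flag.
    """
    archives = sorted(p for p in folder.iterdir()
                      if p.is_file() and p.suffix.lower() == ".zip")
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map yields results in the same order as the input archives.
        results = pool.map(lambda p: extract_one(p, folder), archives)
        return {p.name: ok for p, ok in zip(archives, results)}


if __name__ == "__main__":
    for name, ok in extract_all(Path.cwd()).items():
        print(name, "unzip ok" if ok else "unzip error")
```

The executor joins its workers when the with block exits, so no daemon threads or explicit q.join() call are needed, and the pool size bounds how many archives are open at once.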