aboutsummaryrefslogtreecommitdiffstats
path: root/ZipFileTransformer/zip.py
diff options
context:
space:
mode:
Diffstat (limited to 'ZipFileTransformer/zip.py')
-rw-r--r--ZipFileTransformer/zip.py227
1 files changed, 227 insertions, 0 deletions
diff --git a/ZipFileTransformer/zip.py b/ZipFileTransformer/zip.py
new file mode 100644
index 0000000..91ff4c5
--- /dev/null
+++ b/ZipFileTransformer/zip.py
@@ -0,0 +1,227 @@
+# -*- coding: utf-8 -*-
+
+import logging
+import re
+
+
class Zip:
    """In-memory ZIP archive whose stored offsets can be shifted.

    The whole file is loaded into ``self.buffer`` (a ``bytearray``), the end
    of central directory (EOCD) record, the central directory entries and the
    local file entries are located, and ``add_data_to_file`` can then patch
    the offsets so that arbitrary data may be placed before and after the
    local file entries.

    The parser assumes that the local file entries are stored back to back,
    in central directory order, and that their local headers carry the real
    compressed size (i.e. no data descriptors and no ZIP64).
    """

    _LOCAL_FILE_SIGNATURE = b"PK\x03\x04"
    _CENTRAL_DIR_SIGNATURE = b"PK\x01\x02"
    _END_CENTRAL_DIR_SIGNATURE = b"PK\x05\x06"

    def __init__(self, filename):
        """Read *filename* and parse its ZIP structures.

        Raises:
            Exception: if the file does not start with a local file header,
                if no EOCD record is found, or if the EOCD does not point to
                a central directory entry.
            OSError: if the file cannot be read.
        """
        self.filename = filename
        self.buffer = bytearray()
        self.size = 0
        # Offset of the EOCD signature inside the buffer.
        self.end_central_dir = 0
        # Local header offset stored in the first central directory entry.
        self.first_local_file_header = 0
        # Offset just past each local file entry (header + name + extra +
        # compressed data), one value per entry.
        self.offset_local_file = []
        # Offset of each central directory entry.
        self.offset_central_directory = []
        # Offset just past the last local file entry.
        self.end_of_data = 0
        # Offset just past the first local file entry.
        self.end_of_first_local_file_header = 0

        self.read()
        self.check_header()
        self.call_all_parsers()
        self.check_central_directory()
        self.parse_central_directories()
        self.parse_local_file_headers()

    def _read_uint(self, offset, length):
        """Return the little-endian unsigned integer at buffer[offset:offset+length].

        A slice running past the end of the buffer is simply shorter, so a
        truncated field yields a partial value instead of raising.
        """
        return int.from_bytes(self.buffer[offset:offset + length], "little")

    def call_all_parsers(self):
        """Locate the EOCD record and parse every field it contains."""
        self.parse_offset_end_central_dir()
        self.parse_nb_of_disk()
        self.parse_start_disk()
        self.parse_nb_of_central_dir()
        self.parse_nb_total_of_central_dir()
        self.parse_size_central_dir()
        self.parse_central_dir_file_header()
        self.parse_comment_length()

    def read(self):
        """Load the whole file into ``self.buffer`` and record its size."""
        with open(self.filename, 'rb') as fd:
            self.buffer = bytearray(fd.read())
        self.size = len(self.buffer)
        logging.info("read " + str(self.size) + " bytes from Zip file")

    def check_header(self):
        """Raise ``Exception`` unless the buffer starts with a local file header."""
        if self.buffer[0:4] != self._LOCAL_FILE_SIGNATURE:
            raise Exception("Zip header not found")

    def parse_offset_end_central_dir(self):
        """Store the offset of the last EOCD signature in the buffer.

        Raises:
            Exception: if the signature does not occur in the buffer.
        """
        # The last occurrence is wanted because the EOCD sits at the end of
        # the archive; rfind avoids copying and reversing the whole buffer.
        position = self.buffer.rfind(self._END_CENTRAL_DIR_SIGNATURE)
        if position < 0:
            raise Exception("Unable to find end of central directory")
        self.end_central_dir = position
        logging.info("Offset end of central directory: " +
                     hex(self.end_central_dir))

    def parse_nb_of_disk(self):
        """Parse the 2-byte field at EOCD+4 (number of this disk)."""
        self.nb_of_disk = self._read_uint(self.end_central_dir + 4, 2)
        logging.debug("Nb of disk: " + str(self.nb_of_disk))

    def parse_start_disk(self):
        """Parse the 2-byte field at EOCD+6 (disk where the directory starts)."""
        self.start_disk = self._read_uint(self.end_central_dir + 6, 2)
        logging.debug("Start disk: " + str(self.start_disk))

    def parse_nb_of_central_dir(self):
        """Parse the 2-byte field at EOCD+8 (directory entries on this disk)."""
        self.nb_of_central_dir = self._read_uint(self.end_central_dir + 8, 2)
        logging.info("Nb of central directory record: " +
                     str(self.nb_of_central_dir))

    def parse_nb_total_of_central_dir(self):
        """Parse the 2-byte field at EOCD+10 (total directory entries)."""
        self.nb_total_of_central_dir = self._read_uint(
            self.end_central_dir + 10, 2)
        logging.info("Nb of total central directory record: " +
                     str(self.nb_total_of_central_dir))

    def parse_size_central_dir(self):
        """Parse the 4-byte field at EOCD+12 (size of the central directory)."""
        # The field is 4 bytes wide (EOCD+12 .. EOCD+16); reading only two
        # bytes would truncate the size of any directory of 64 KiB or more.
        self.size_central_dir = self._read_uint(self.end_central_dir + 12, 4)
        logging.info("Size of central directory: " +
                     str(self.size_central_dir))

    def parse_central_dir_file_header(self):
        """Parse the 4-byte field at EOCD+16 (offset of the central directory)."""
        self.central_dir_file_header = self._read_uint(
            self.end_central_dir + 16, 4)
        logging.info("Central directory file header: " +
                     hex(self.central_dir_file_header))

    def parse_comment_length(self):
        """Parse the 2-byte field at EOCD+20 (archive comment length)."""
        self.comment_length = self._read_uint(self.end_central_dir + 20, 2)
        logging.info("Comment length: " +
                     str(self.comment_length))

    def check_central_directory(self):
        """Raise ``Exception`` unless a central directory entry starts at the
        offset announced by the EOCD."""
        start = self.central_dir_file_header
        if self.buffer[start:start + 4] != self._CENTRAL_DIR_SIGNATURE:
            raise Exception("Unable to find central directory")
        logging.info("Found central directory")

    def parse_central_directories(self):
        """Walk the consecutive central directory entries.

        Each entry offset is appended to ``self.offset_central_directory``
        and the local header offset stored in the first entry is kept in
        ``self.first_local_file_header``. The walk stops at the first
        position that does not carry the central directory signature.

        Raises:
            Exception: if no entry starts at ``self.central_dir_file_header``.
        """
        signature = self._CENTRAL_DIR_SIGNATURE
        position = self.central_dir_file_header
        if self.buffer[position:position + 4] != signature:
            raise Exception("Unable to find first central directory")
        logging.info("Found first central directory")

        index = 0
        while self.buffer[position:position + 4] == signature:
            logging.info("Parse central directory n°" + str(index))
            logging.info("Offset: " + hex(position))
            self.offset_central_directory.append(position)

            filename_length = self._read_uint(position + 28, 2)
            logging.info("filename length:" + str(filename_length))
            extra_field_length = self._read_uint(position + 30, 2)
            logging.info("extra field length:" + str(extra_field_length))
            comment_length = self._read_uint(position + 32, 2)
            logging.info("comment length:" + str(comment_length))
            local_file_header = self._read_uint(position + 42, 4)
            if index == 0:
                self.first_local_file_header = local_file_header
            logging.info("local file header:" + hex(local_file_header))

            index += 1
            # 46 is the fixed part of an entry; the three variable-length
            # fields follow it.
            position += (filename_length + extra_field_length +
                         comment_length + 46)
            logging.debug("parse header at:" + hex(position))

    def parse_local_file_headers(self):
        """Walk ``self.nb_of_central_dir`` local file entries.

        Starting at ``self.first_local_file_header``, the length of each
        entry is computed from its local header (30 fixed bytes + file name
        + extra field + compressed data). The offset just past each entry is
        appended to ``self.offset_local_file``; ``self.end_of_data`` ends up
        just past the last entry and
        ``self.end_of_first_local_file_header`` just past the first one.
        """
        position = self.first_local_file_header
        for index in range(self.nb_of_central_dir):
            logging.info("Parse local file n°" + str(index))
            compressed_data_length = self._read_uint(position + 18, 4)
            logging.info("compressed data length:" +
                         str(compressed_data_length))
            filename_length = self._read_uint(position + 26, 2)
            logging.info("filename length:" + str(filename_length))
            extra_field_length = self._read_uint(position + 28, 2)
            logging.info("extra field length:" + str(extra_field_length))
            local_file_size = (compressed_data_length +
                               filename_length + extra_field_length + 30)
            logging.info("local file length:" + hex(local_file_size))

            position += local_file_size
            logging.debug("parse header at:" + hex(position))
            self.offset_local_file.append(position)
            self.end_of_data = position
            if index == 0:
                self.end_of_first_local_file_header = position

    def add_data_to_file(self, data_before_local, data_after_local,
                         write_buffer=False):
        """Patch the stored offsets to make room for extra data.

        Every central directory entry gets its local header offset increased
        by ``len(data_before_local)``, and the central directory offset in
        the EOCD is moved forward to account for both pieces of data. The
        patch is applied in place to ``self.buffer``.

        Args:
            data_before_local: bytes-like data meant to precede the archive.
            data_after_local: bytes-like data meant to sit between the local
                file entries and the central directory.
            write_buffer: when false only the offsets are patched and the
                caller is expected to assemble the final file. When true the
                buffer is rebuilt as ``data_before_local`` + local file
                entries + ``data_after_local`` + central directory and EOCD,
                and the offsets kept on the instance are re-based so that
                they still designate the same structures. Bytes located
                between ``end_of_data`` and the central directory are not
                carried over.

        Raises:
            OverflowError: if a shifted offset no longer fits in 4 bytes.
        """
        logging.info("Add data before local lenght:" +
                     str(len(data_before_local)))
        shift_before = len(data_before_local)
        shift_after = len(data_after_local)

        for entry_offset in self.offset_central_directory:
            logging.info("parse central directory at: " + hex(entry_offset))
            local_file_header = self._read_uint(entry_offset + 42, 4)
            logging.info("old local file header: " + hex(local_file_header))
            local_file_header = local_file_header + shift_before
            logging.info("new local file header: " + hex(local_file_header))
            logging.info("change value at:" + hex(entry_offset + 42))
            self.buffer[entry_offset + 42:entry_offset + 46] = (
                local_file_header.to_bytes(4, "little"))

        old_central_dir = self.central_dir_file_header
        logging.info("old central directory header: " +
                     hex(old_central_dir))
        if write_buffer:
            # Position where the directory really lands in the rebuilt
            # buffer; equal to the formula below whenever the directory
            # directly follows the local file entries.
            new_central_dir_file_header = (shift_before + self.end_of_data +
                                           shift_after)
        else:
            new_central_dir_file_header = (old_central_dir +
                                           shift_after +
                                           shift_before)
        logging.info("new central directory header: " +
                     hex(new_central_dir_file_header))
        self.buffer[self.end_central_dir + 16:
                    self.end_central_dir + 20] = (
            new_central_dir_file_header.to_bytes(4, "little"))

        if write_buffer:
            # Build into a bytearray so the buffer stays mutable whatever
            # bytes-like type the caller passed in.
            rebuilt = bytearray(data_before_local)
            rebuilt += self.buffer[:self.end_of_data]
            rebuilt += data_after_local
            rebuilt += self.buffer[old_central_dir:]
            self.buffer = rebuilt
            self.size = len(rebuilt)

            # Re-base the cached offsets on the new layout so the getters
            # and any further call keep working on the right bytes.
            central_shift = new_central_dir_file_header - old_central_dir
            self.first_local_file_header += shift_before
            self.end_of_first_local_file_header += shift_before
            self.end_of_data += shift_before
            self.offset_local_file = [
                offset + shift_before for offset in self.offset_local_file]
            self.offset_central_directory = [
                offset + central_shift
                for offset in self.offset_central_directory]
            self.end_central_dir += central_shift
            self.central_dir_file_header = new_central_dir_file_header

    def get_local_file_data(self):
        """Return a copy of the buffer up to ``self.end_of_data``."""
        return self.buffer[:self.end_of_data]

    def get_data_after_central_directory(self):
        """Return a copy of the buffer from the start of the central
        directory to the end (central directory and EOCD)."""
        return self.buffer[self.central_dir_file_header:]

    def get_first_part_length(self):
        """Return the length of ``get_local_file_data()``."""
        return len(self.get_local_file_data())

    def get_second_part_length(self):
        """Return the length of ``get_data_after_central_directory()``."""
        return len(self.get_data_after_central_directory())