path: root/PdfFileTransformer/pdf.py
author    ben 2018-09-18 10:52:38 +0200
committer ben 2018-09-18 10:52:38 +0200
commit    f57654b84b4cf0ffa1287034fc9f66ba200bb259 (patch)
tree      5ffb371ce5b5008052e425955f45c8b808ba7fa0 /PdfFileTransformer/pdf.py
First public commit
Diffstat (limited to 'PdfFileTransformer/pdf.py')
-rw-r--r-- PdfFileTransformer/pdf.py | 352
1 file changed, 352 insertions, 0 deletions
diff --git a/PdfFileTransformer/pdf.py b/PdfFileTransformer/pdf.py
new file mode 100644
index 0000000..c93fb61
--- /dev/null
+++ b/PdfFileTransformer/pdf.py
@@ -0,0 +1,352 @@
+# -*- coding: utf-8 -*-
+
+import logging
+import re
+import tempfile
+from .PyPDF2 import PdfFileWriter, PdfFileReader
+
+
+class Pdf:
+
+ def __init__(self, filename):
+ self.filename = filename
+ self.buffer = bytearray()
+ self.objects = [] # [(7,0,b"data"), (8,0,b"data2"), ..]
+ self.trailer = {} # {Root: (7, 0), Info: (5, 0)}
+        self.translation_table = {}  # {(6, 0): 7, (5, 0): 8, ...}
+ self.original_xref_offset = 0
+ self.original_first_obj_offset = 0
+ self.file_offset = 0
+
+ self.clean_and_read_pdf()
+ self.check_pdf_header()
+ self.parse_xref_offset()
+ self.parse_xref_table()
+ self.parse_objects()
+ self.parse_trailer()
+
+ def clean_and_read_pdf(self):
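+        '''
+        Rewrite the input PDF through the bundled PyPDF2 writer to
+        normalise it, then load the normalised file into self.buffer.
+        '''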
+ f_input = open(self.filename, "rb")
+ pdf_header = f_input.read(8)
+ f_input.seek(0)
+ filename_output = tempfile.mktemp()
+ logging.info("Use " + filename_output + " for normalisation output")
+        f_output = open(filename_output, "wb")
+ writer = PdfFileWriter()
+ reader = PdfFileReader(f_input)
+ info = reader.getDocumentInfo()
+ if info.producer is not None:
+ writer.addMetadata({u'/Producer': info.producer})
+ else:
+ writer.addMetadata({u'/Producer': u'TruePolyglot'})
+ if info.creator is not None:
+ writer.addMetadata({u'/Creator': info.creator})
+ else:
+ writer.addMetadata({u'/Creator': u'TruePolyglot'})
+ writer.appendPagesFromReader(reader)
+ writer.setHeader(pdf_header)
+        writer.write(f_output)
+ f_input.close()
+        f_output.close()
+ f_norm = open(filename_output, "rb")
+ self.buffer = bytearray(f_norm.read())
+ self.size = len(self.buffer)
+ f_norm.close()
+
+ def check_pdf_header(self):
+ if self.buffer[0:5] == b"%PDF-":
+ pdf_version = self.buffer[5:8].decode("utf-8")
+ logging.info("PDF Header found: " + pdf_version)
+ else:
+ raise Exception("PDF Header not found")
+
+ def parse_xref_offset(self):
+ r = re.compile(b'startxref\n([0-9]+)')
+ m = r.search(self.buffer)
+ if m is None:
+ raise Exception('Unable to find xref offset')
+ self.original_xref_offset = int(m.group(1))
+ logging.info("Xref offset found at: " + hex(self.original_xref_offset))
+
+ def parse_xref_table(self):
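+        '''
+        Read the cross-reference table located at original_xref_offset
+        and log its entries.
+        '''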
+ xref_table = []
+ r = re.compile(b'xref\n([0-9]+) ([0-9]+)')
+ offset = self.original_xref_offset
+ s = r.search(self.buffer[offset:offset + 32])
+ nb_xtable_object = int(s.group(2))
+ logging.info("Nb objects in Xref table: " + str(nb_xtable_object))
+ xref_header_size = s.end()
+        r = re.compile(b'([0-9]+) ([0-9]+) ([fn])')
+ x = 0
+ for i in range(nb_xtable_object):
+ s = r.search(
+ self.buffer[self.original_xref_offset + xref_header_size + x:])
+ if s is not None:
+ x = x + s.end()
+ xref_table.append((int(s.group(1)),
+ int(s.group(2)),
+ s.group(3)))
+ logging.debug("Xref table:")
+ for i in xref_table:
+ logging.debug(str(i[0]) + " " +
+ str(i[1]) + " " +
+ i[2].decode("utf-8"))
+
+ def parse_objects(self):
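+        '''
+        Scan the buffer for "N G obj ... endobj" blocks and store each
+        one as a (number, generation, body) tuple in self.objects.
+        '''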
+ r_begin = re.compile(b'([0-9]+) ([0-9]+) obj\n')
+ r_end = re.compile(b'\nendobj\n')
+
+ offset_buffer = 0
+ obj = ()
+ while offset_buffer < self.size:
+ m_begin = r_begin.match(
+ self.buffer[offset_buffer:offset_buffer + 32])
+ obj_nb_index = 0
+ obj_nb_offset = 0
+ obj_offset_start = 0
+ obj_offset_end = 0
+ if m_begin is not None:
+ if self.original_first_obj_offset == 0:
+ self.original_first_obj_offset = (offset_buffer +
+ m_begin.start())
+ obj_nb_index = int(m_begin.group(1))
+ obj_nb_offset = int(m_begin.group(2))
+ obj_data_start = m_begin.end()
+ obj_offset_start = offset_buffer + m_begin.start()
+ while offset_buffer < self.size:
+ m_end = r_end.match(
+ self.buffer[offset_buffer:offset_buffer + 8])
+ if m_end is not None:
+ obj_offset_end = offset_buffer + m_end.end() - 2
+ break
+ else:
+ offset_buffer = offset_buffer + 1
+ else:
+ offset_buffer = offset_buffer + 1
+
+ if (obj_offset_start != 0 and
+ obj_offset_end != 0):
+ a = obj_offset_start + obj_data_start
+ b = obj_offset_end - 6
+ obj = (obj_nb_index, obj_nb_offset,
+ self.buffer[a:b])
+                logging.debug("Object: (" + str(obj_nb_index) +
+                              ", " + str(obj_nb_offset) +
+                              ", " + hex(obj_offset_start) +
+                              ", " + hex(obj_offset_end) + ")")
+ self.objects.append(obj)
+
+ def parse_trailer(self):
+ r_begin = re.compile(b'trailer\n')
+ s_begin = r_begin.search(self.buffer[self.original_xref_offset:])
+ start = self.original_xref_offset + s_begin.start()
+        logging.info("Trailer found at: " + hex(start))
+
+ r_root = re.compile(b'/Root ([0-9]+) ([0-9]+) R')
+ s_root = r_root.search(self.buffer[self.original_xref_offset:])
+ if s_root is None:
+ raise Exception('Root not found')
+ else:
+ self.trailer["Root"] = (int(s_root.group(1)), int(s_root.group(2)))
+
+ r_info = re.compile(b'/Info ([0-9]+) ([0-9]+) R')
+ s_info = r_info.search(self.buffer[self.original_xref_offset:])
+ if s_info is not None:
+ self.trailer["Info"] = (int(s_info.group(1)), int(s_info.group(2)))
+
+ def get_file_header(self):
+ return self.buffer[:self.original_first_obj_offset]
+
+ def get_xref_table(self):
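+        '''
+        Rebuild the xref section from self.objects, starting with the
+        mandatory free entry for object 0.
+        '''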
+ offset_xref = 0
+ buf = (b'xref\n' +
+ str(offset_xref).encode('utf-8') + b' ' +
+ str(len(self.objects) + 1).encode('utf-8') + b'\n' +
+ str(0).zfill(10).encode('utf-8') + b' ' +
+ str(65535).zfill(5).encode('utf-8') + b' f \n')
+
+ for i in range(len(self.objects)):
+ obj_start = self.get_object_offset(i)
+ logging.info("Obj %d at %d" % (self.objects[i][0], obj_start))
+ buf = (buf +
+ (str(obj_start).zfill(10)).encode('utf-8') + b' ' +
+ str(0).zfill(5).encode('utf-8') + b' ' +
+ b'n' + b' \n')
+ return buf
+
+ def get_trailer(self):
+ trailer_data = (b"trailer\n<<\n/Size " +
+ str(len(self.objects) + 1).encode("utf-8") +
+ b"\n/Root " +
+ str(self.trailer["Root"][0]).encode("utf-8") +
+ b" " +
+ str(self.trailer["Root"][1]).encode("utf-8") +
+ b" R\n")
+ if "Info" in self.trailer:
+ trailer_data = (trailer_data +
+ b"/Info " +
+ str(self.trailer["Info"][0]).encode("utf-8") +
+ b" " +
+ str(self.trailer["Info"][1]).encode("utf-8") +
+ b" R\n")
+ trailer_data = trailer_data + b">>"
+ return trailer_data
+
+ def get_xref_offset(self):
+ return self.get_end_of_last_object() + 1
+
+ def get_eof(self):
+ s = (b'startxref\n' +
+ str(self.get_xref_offset()).encode("utf-8") +
+ b'\n%%EOF\n')
+ return s
+
+ def build_object(self, obj):
+ buf = (str(obj[0]).encode("utf-8") +
+ b' ' +
+ str(obj[1]).encode("utf-8") +
+ b' obj\n' +
+ obj[2] +
+ b'\nendobj')
+ return buf
+
+ def get_build_buffer(self):
+ b_buffer = bytearray()
+ b_buffer = b_buffer + self.get_file_header()
+ for obj in self.objects:
+ b_buffer = b_buffer + self.build_object(obj) + b'\n'
+ b_buffer = b_buffer + self.get_xref_table()
+ b_buffer = b_buffer + self.get_trailer() + b'\n'
+ b_buffer = b_buffer + self.get_eof()
+ return b_buffer
+
+ def get_obj(self, nb):
+ for obj in self.objects:
+ if obj[0] == nb:
+ return obj
+
+ def get_end_of_last_object(self):
+ offset = self.get_last_object_offset()
+ offset = offset + len(self.build_object(self.objects[-1]))
+ return offset
+
+ def generate_stream_obj_data(self, data):
+ buf = (b'<<\n/Filter /FlateDecode\n/Length ' +
+ str(len(data)).encode("utf-8") +
+ b'\n>>\nstream\n' +
+ data +
+ b'\nendstream')
+ return buf
+
+ def insert_new_obj_stream_at(self, position, stream_data):
+        '''
+        Insert a new stream object wrapping stream_data at the given
+        position, then renumber objects and fix the trailer references.
+        '''
+ logging.info("Insert obj at %d" % position)
+ obj_nb = position
+ obj_off = 0
+ data = self.generate_stream_obj_data(stream_data)
+ obj = (obj_nb, obj_off, data)
+
+ obj_data = self.build_object(obj)
+ full_obj_size = len(obj_data)
+ logging.info("New object full size is: " + str(full_obj_size))
+
+ obj = (obj_nb, obj_off, data)
+ self.objects.insert(position, obj)
+
+ self.reorder_objects()
+ self.fix_trailer_ref()
+
+ def get_first_stream_offset(self):
+ offset = self.file_offset + len(self.get_file_header())
+ r = re.compile(b'stream\n')
+ m = r.search(self.objects[0][2])
+ offset = offset + len(b"1 0 obj\n") + m.end()
+ return offset
+
+ def get_last_stream_offset(self):
+ offset = self.file_offset + self.get_last_object_offset()
+ r = re.compile(b'stream\n')
+ m = r.search(self.build_object(self.objects[-1]))
+ return offset + m.end()
+
+ def get_object_offset(self, index):
+ offset = self.file_offset + len(self.get_file_header())
+ for obj in self.objects[:index]:
+ offset = offset + len(self.build_object(obj)) + 1
+ return offset
+
+ def get_last_object_offset(self):
+ offset = self.get_object_offset(len(self.objects) - 1)
+ return offset
+
+ def insert_new_obj_stream_at_start(self, data):
+ return self.insert_new_obj_stream_at(0, data)
+
+ def insert_new_obj_stream_at_end(self, data):
+ return self.insert_new_obj_stream_at(len(self.objects) + 1,
+ data)
+
+ def generate_translation_table(self):
+ for i in range(len(self.objects)):
+ self.translation_table[(self.objects[i][0],
+ self.objects[i][1])] = i + 1
+ logging.info(self.translation_table)
+
+ def replace_ref(self, ibuffer):
+ '''
+        Example:
+ in: AZERTY 6 0 R -- BGT 88 0 R HYT
+ out: AZERTY 77 0 R -- BGT 9 0 R HYT
+ '''
+ index = 0
+ obuffer = bytearray()
+ while True:
+ r = re.compile(b'([0-9]+) ([0-9]+) R')
+ s = r.search(ibuffer[index:])
+ if s is None:
+ obuffer = obuffer + ibuffer[index:]
+ break
+ o_old = int(s.group(1))
+ p_old = int(s.group(2))
+ o_new = self.translation_table[(o_old, p_old)]
+ p_new = p_old
+
+ newref = (str(o_new).encode("utf-8") +
+ b" " +
+ str(p_new).encode("utf-8") +
+ b" R")
+
+ nbuffer = ibuffer[index:index + s.start()] + newref
+ obuffer = obuffer + nbuffer
+ index = index + s.end()
+ return obuffer
+
+ def reorder_objects(self):
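+        '''
+        Renumber all objects sequentially and rewrite the indirect
+        references in their bodies using the translation table.
+        '''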
+ self.generate_translation_table()
+ offset_obj = len(self.get_file_header())
+ for i in range(len(self.objects)):
+ buf = self.objects[i][2]
+ new_buf = self.replace_ref(buf)
+ obj_nb = self.objects[i][0]
+ new_obj_nb = self.translation_table[(obj_nb, 0)]
+ new_obj_start = offset_obj
+ size_obj = len(self.build_object((new_obj_nb,
+ 0,
+ new_buf)))
+ new_obj_end = new_obj_start + size_obj
+
+ offset_obj = new_obj_end + 1
+ obj = (new_obj_nb,
+ 0,
+ new_buf)
+ self.objects[i] = obj
+
+ def fix_trailer_ref(self):
+ new_obj_nb = self.translation_table[self.trailer["Root"]]
+ self.trailer["Root"] = (new_obj_nb, 0)
+
+ if "Info" in self.trailer:
+ new_obj_nb = self.translation_table[self.trailer["Info"]]
+ self.trailer["Info"] = (new_obj_nb, 0)
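
A minimal usage sketch of the Pdf class introduced by this commit (not part of
the commit itself). The file names and payload bytes are placeholders, and it
assumes the PdfFileTransformer package is importable as laid out above:

    from PdfFileTransformer.pdf import Pdf

    # Parse and normalise the input document.
    pdf = Pdf("input.pdf")

    # Wrap arbitrary bytes in a new stream object placed first in the file.
    payload = b"\x00" * 64
    pdf.insert_new_obj_stream_at_start(payload)

    # Offset at which the payload bytes will sit in the rebuilt file.
    print(hex(pdf.get_first_stream_offset()))

    # Serialise the rebuilt PDF (objects, xref table, trailer, %%EOF).
    with open("output.pdf", "wb") as f:
        f.write(pdf.get_build_buffer())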