email2pdf/tests/BaseTestClasses.py

373 lines
13 KiB
Python
Raw Permalink Normal View History

2014-12-17 21:12:33 +00:00
from PyPDF2 import PdfFileReader
from datetime import datetime
from datetime import timedelta
from email import encoders
from email.header import Header
2014-12-17 21:12:33 +00:00
from email.mime.base import MIMEBase
from email.mime.image import MIMEImage
from email.mime.multipart import MIMEMultipart
2014-12-17 21:12:33 +00:00
from email.mime.text import MIMEText
from email.utils import formatdate
2014-12-20 20:51:48 +00:00
from pdfminer.converter import TextConverter
from pdfminer.layout import LAParams
from pdfminer.pdfinterp import PDFResourceManager, process_pdf
2015-01-13 14:42:14 +00:00
from pdfminer.pdftypes import PSException
2014-12-17 21:12:33 +00:00
from reportlab.pdfgen import canvas
from subprocess import Popen, PIPE
import io
import imghdr
import logging
import inspect
2014-12-17 21:12:33 +00:00
import os
import os.path
2014-12-17 21:12:33 +00:00
import requests
import shutil
import sys
import tempfile
import unittest
2014-12-18 21:26:31 +00:00
2014-12-17 22:06:45 +00:00
class Email2PDFTestCase(unittest.TestCase):
    """Common base class carrying shared constants and helpers for email2pdf tests."""

    # Class-wide state, filled in lazily:
    #   isOnline       - set to True/False by _check_online()
    #   examineDir     - set to a fresh temp directory by _check_examine_dir()
    #   time_invoked / time_completed - set around each invoke*() call
    isOnline = None
    examineDir = None
    time_invoked = None
    time_completed = None

    # Remote image URLs used as fixtures.
    NONEXIST_IMG = 'http://www.andrewferrier.com/nonexist.jpg'
    NONEXIST_IMG_BLACKLIST = 'http://www.emltrk.com/nonexist.jpg'
    EXIST_IMG = 'https://raw.githubusercontent.com/andrewferrier/email2pdf/master/tests/basi2c16.png'
    EXIST_IMG_UPPERCASE = 'https://raw.githubusercontent.com/andrewferrier/email2pdf/master/tests/UPPERCASE.png'

    # Script under test, resolved against the working directory at import time.
    COMMAND = os.path.normpath(os.path.join(os.getcwd(), 'email2pdf'))

    # Default header values used by addHeaders().
    DEFAULT_FROM = "from@example.org"
    DEFAULT_TO = "to@example.org"
    DEFAULT_SUBJECT = "Subject of the email"

    # Local image fixtures. The paths are relative and their sizes are read
    # while the class body executes, so the files must be reachable from the
    # current working directory when this module is imported.
    JPG_FILENAME = 'tests/jpeg444.jpg'
    PNG_FILENAME = 'tests/basi2c16.png'
    JPG_SIZE = os.path.getsize(JPG_FILENAME)
    PNG_SIZE = os.path.getsize(PNG_FILENAME)
2014-12-17 22:06:45 +00:00
def setUp(self):
    """Create a scratch directory for this test and prime the shared class state."""
    scratch_dir = tempfile.mkdtemp(dir='/tmp')
    self.workingDir = scratch_dir

    # Both checks store their result on the class and do nothing once it is set.
    self._check_online()
    self._check_examine_dir()
2015-07-01 17:32:56 -05:00
def getTimeStamp(self, my_time):
    """Format ``my_time`` as ``YYYY-MM-DDTHH-MM-SS`` (second resolution)."""
    stamp_format = "%Y-%m-%dT%H-%M-%S"
    return my_time.strftime(stamp_format)
2014-12-17 21:12:33 +00:00
2014-12-17 22:06:45 +00:00
def existsByTime(self, path=None):
    """Return True when getTimedFilename(path) finds a file, otherwise False."""
    return bool(self.getTimedFilename(path))
def getTimedFilename(self, path=None):
    """Locate a ``<timestamp>.pdf`` file created during the last invocation.

    Every whole second between ``self.time_invoked`` and
    ``self.time_completed`` is tried in order; the first candidate that exists
    on disk is returned.

    :param path: directory to search; defaults to ``self.workingDir``.
    :returns: the full path of the first match, or None if there is none.
    """
    directory = self.workingDir if path is None else path
    candidates = (
        os.path.join(directory, self.getTimeStamp(moment) + ".pdf")
        for moment in self._timerange(self.time_invoked, self.time_completed)
    )
    return next((name for name in candidates if os.path.exists(name)), None)
2014-12-17 21:12:33 +00:00
def addHeaders(self, frm=DEFAULT_FROM, to=DEFAULT_TO, subject=DEFAULT_SUBJECT, subject_encoding=None):
    """Set Subject, From, To and Date headers on ``self.msg``.

    :param frm: value for the From header; skipped when falsy.
    :param to: value for the To header; skipped when falsy.
    :param subject: subject text; skipped when falsy. Must be ``bytes`` when
        ``subject_encoding`` is given and ``str`` otherwise.
    :param subject_encoding: charset passed to ``email.header.Header`` to wrap
        a bytes subject.

    The Date header is always set, using ``formatdate()``.
    """
    message = self.msg

    if subject:
        if subject_encoding:
            assert isinstance(subject, bytes)
            subject_value = Header(subject, subject_encoding)
        else:
            assert isinstance(subject, str)
            subject_value = subject
        message['Subject'] = subject_value

    for header_name, header_value in (('From', frm), ('To', to)):
        if header_value:
            message[header_name] = header_value

    message['Date'] = formatdate()
2014-12-17 21:12:33 +00:00
2015-07-01 06:24:45 -05:00
def invokeAsSubprocess(self, inputFile=False, outputDirectory=None, outputFile=None, extraParams=None,
                       expectOutput=False, okToExist=False):
    """Run ``Email2PDFTestCase.COMMAND`` as a child process on ``self.msg``.

    :param inputFile: when true, the serialized message is written to a
        temporary file passed via ``-i``; otherwise it is piped to stdin.
    :param outputDirectory: if given, passed via ``-d``; if None the child is
        started with ``self.workingDir`` as its working directory instead.
    :param outputFile: if given, passed via ``-o``.
    :param extraParams: extra command-line arguments appended verbatim.
    :param expectOutput: whether stdout is asserted non-empty (True) or
        empty (False).
    :param okToExist: when false and ``outputFile`` is given, asserts that the
        file does not exist before the run.
    :returns: ``(returncode, stdout_text, stderr_text)``.

    ``self.time_invoked`` and ``self.time_completed`` are set around the run.
    """
    bytes_message = self.msg.as_bytes()
    options = [Email2PDFTestCase.COMMAND]

    input_file_handle = None
    try:
        if inputFile:
            input_file_handle = tempfile.NamedTemporaryFile()
            options.extend(['-i', input_file_handle.name])
            input_file_handle.write(bytes_message)
            # Flush so the child process reads the complete message from disk.
            input_file_handle.flush()
            my_stdin = None
            my_input = None
        else:
            my_stdin = PIPE
            my_input = bytes_message

        if outputDirectory:
            options.extend(['-d', outputDirectory])

        if outputFile:
            options.extend(['-o', outputFile])
            if not okToExist:
                assert not os.path.exists(outputFile)

        if extraParams is None:
            extraParams = []
        options.extend(extraParams)

        self.time_invoked = datetime.now()
        my_cwd = self.workingDir if outputDirectory is None else None

        email2pdf_process = Popen(options, stdin=my_stdin, stdout=PIPE, stderr=PIPE, cwd=my_cwd)

        # communicate() waits for the process to exit, so no extra wait() is needed.
        output, error = email2pdf_process.communicate(my_input)
        self.time_completed = datetime.now()

        output = str(output, "utf-8")
        error = str(error, "utf-8")

        if expectOutput:
            self.assertNotEqual("", output)
        else:
            self.assertEqual("", output)

        return (email2pdf_process.returncode, output, error)
    finally:
        # Close (and so delete) the temporary input file even when an
        # assertion or the process launch above raises.
        if input_file_handle is not None:
            input_file_handle.close()
2014-12-17 21:12:33 +00:00
2015-07-01 06:24:45 -05:00
def invokeDirectly(self, outputDirectory=None, outputFile=None, extraParams=None, completeMessage=None, okToExist=False):
    """Load the email2pdf script in-process and call its ``main()``.

    :param outputDirectory: passed via ``-d``; ``self.workingDir`` is used
        when it is falsy.
    :param outputFile: if given, passed via ``-o``.
    :param extraParams: extra command-line arguments appended verbatim.
    :param completeMessage: raw message text to feed in (encoded as UTF-8);
        when falsy, ``self.msg`` is serialized instead.
    :param okToExist: when false and ``outputFile`` is given, asserts that the
        file does not exist before the run.
    :returns: everything logged to the ``'email2pdf'`` logger during the call.

    ``self.time_invoked`` and ``self.time_completed`` are set around the call.
    """
    script_path = self._get_original_script_path()
    email2pdf_module = self._get_email2pdf_object(script_path)

    if completeMessage:
        payload = bytes(completeMessage, 'utf-8')
    else:
        payload = self.msg.as_bytes()

    with tempfile.NamedTemporaryFile() as message_file:
        argv = [script_path, '-i', message_file.name]
        message_file.write(payload)
        message_file.flush()

        argv += ['-d', outputDirectory or self.workingDir]

        if outputFile:
            argv += ['-o', outputFile]
            if not okToExist:
                assert not os.path.exists(outputFile)

        argv.extend([] if extraParams is None else extraParams)

        # Capture log output in memory through a dedicated handler.
        captured = io.StringIO()
        capture_handler = logging.StreamHandler(captured)
        script_logger = logging.getLogger('email2pdf')
        script_logger.propagate = False
        script_logger.setLevel(logging.DEBUG)
        script_logger.addHandler(capture_handler)

        self.time_invoked = datetime.now()
        try:
            email2pdf_module.main(argv, None, capture_handler)
        finally:
            self.time_completed = datetime.now()
            script_logger.removeHandler(capture_handler)
            capture_handler.close()

        return captured.getvalue()
2014-12-17 22:06:45 +00:00
def setPlainContent(self, content, charset='UTF-8'):
    """Make ``self.msg`` a text/plain message carrying ``content``.

    :param content: payload to set on the message.
    :param charset: charset applied with ``set_charset``.
    :raises Exception: if ``self.msg`` is a ``MIMEMultipart``.
    """
    if isinstance(self.msg, MIMEMultipart):
        raise Exception("Cannot call setPlainContent() on a MIME-based message.")

    message = self.msg
    message.set_default_type("text/plain")
    message.set_payload(content)
    message.set_charset(charset)
2014-12-17 21:12:33 +00:00
def attachHTML(self, content, charset=None):
    """Attach ``content`` to ``self.msg`` as a text/html part.

    :param content: HTML text for the new part.
    :param charset: charset passed to ``MIMEText``; omitted entirely when falsy.
    :raises Exception: if ``self.msg`` is not a ``MIMEMultipart``.
    """
    if not isinstance(self.msg, MIMEMultipart):
        raise Exception("Cannot call attachHTML() on a non-MIME-based message.")

    # According to the docs
    # (https://docs.python.org/3.3/library/email.mime.html), passing
    # charset=None explicitly is different from not passing it. So the
    # argument is only supplied when a charset was actually requested.
    mime_args = (content, 'html', charset) if charset else (content, 'html')
    self.msg.attach(MIMEText(*mime_args))
2014-12-17 21:12:33 +00:00
def attachText(self, content, charset=None):
    """Attach ``content`` to ``self.msg`` as a text/plain part.

    :param content: plain text for the new part.
    :param charset: charset passed to ``MIMEText``; omitted entirely when falsy.
    :raises Exception: if ``self.msg`` is not a ``MIMEMultipart``.
    """
    if not isinstance(self.msg, MIMEMultipart):
        # The guard fires for NON-multipart messages, so the message says so
        # (the previous wording claimed the opposite).
        raise Exception("Cannot call attachText() on a non-MIME-based message.")

    if charset:
        self.msg.attach(MIMEText(content, 'plain', charset))
    else:
        self.msg.attach(MIMEText(content, 'plain'))
2014-12-17 22:06:45 +00:00
def attachPDF(self, string, filePrefix="email2pdf_unittest_file",
              extension="pdf", mainContentType="application", subContentType="pdf", no_filename=False):
    """Generate a one-line PDF and attach it to ``self.msg``.

    :param string: text drawn into the generated PDF.
    :param filePrefix: prefix of the temporary file name.
    :param extension: extension of the temporary file name (without the dot).
    :param mainContentType: MIME main type of the attachment.
    :param subContentType: MIME subtype of the attachment.
    :param no_filename: when true, no file name is passed on to
        ``attachAttachment``.
    :returns: the base name of the temporary file that was used.

    The temporary file is always removed before returning.
    """
    file_descriptor, file_name = tempfile.mkstemp(prefix=filePrefix, suffix="." + extension)

    try:
        # mkstemp() returns an already-open OS-level descriptor. Only the path
        # is needed here, so close it straight away rather than leak it.
        os.close(file_descriptor)

        pdf_canvas = canvas.Canvas(file_name)
        pdf_canvas.drawString(0, 500, string)
        pdf_canvas.save()

        with open(file_name, "rb") as open_handle:
            self.attachAttachment(mainContentType, subContentType, open_handle.read(),
                                  None if no_filename else file_name)

        return os.path.basename(file_name)
    finally:
        os.unlink(file_name)
def attachImage(self, content_id=None, jpeg=True, content_type=None, inline=False, force_filename=False, extension=None):
    """Attach one of the fixture images to ``self.msg``.

    :param content_id: if given, added as the part's Content-ID header.
    :param jpeg: use ``self.JPG_FILENAME`` when true, else ``self.PNG_FILENAME``.
    :param content_type: if given, replaces the part's Content-Type header.
    :param inline: use an ``inline`` Content-Disposition instead of ``attachment``.
    :param force_filename: for inline parts, still include a file name.
    :param extension: overrides the default ``jpg``/``png`` file-name suffix.
    :returns: the file name put in Content-Disposition, or None for inline
        parts without a forced file name.
    """
    if jpeg:
        source_path, default_suffix = self.JPG_FILENAME, 'jpg'
    else:
        source_path, default_suffix = self.PNG_FILENAME, 'png'
    suffix = extension if extension else default_suffix

    # Only the generated name is wanted; the temporary file itself is discarded.
    with tempfile.NamedTemporaryFile(prefix="email2pdf_unittest_image", suffix="." + suffix) as placeholder:
        attachment_name = os.path.basename(placeholder.name)

    with open(source_path, 'rb') as image_file:
        image = MIMEImage(image_file.read())

    if content_id:
        image.add_header('Content-ID', content_id)
    if content_type:
        self._replace_header(image, 'Content-Type', content_type)

    if not inline:
        disposition = 'attachment; filename="%s"' % attachment_name
    elif force_filename:
        disposition = 'inline; filename="%s"' % attachment_name
    else:
        disposition = 'inline'
    self._replace_header(image, 'Content-Disposition', disposition)

    self.msg.attach(image)

    if inline and not force_filename:
        return None
    return attachment_name
def attachAttachment(self, mainContentType, subContentType, data, file_name):
    """Attach ``data`` to ``self.msg`` as a base64-encoded MIME part.

    :param mainContentType: MIME main type of the part.
    :param subContentType: MIME subtype of the part.
    :param data: payload of the part.
    :param file_name: when truthy, its base name is advertised in an
        ``attachment`` Content-Disposition; otherwise the part is ``inline``.
    """
    part = MIMEBase(mainContentType, subContentType)
    part.set_payload(data)
    encoders.encode_base64(part)

    if file_name:
        disposition = 'attachment; filename="%s"' % os.path.basename(file_name)
    else:
        disposition = 'inline'
    part.add_header('Content-Disposition', disposition)

    self.msg.attach(part)
def assertIsJPG(self, filename):
    """Assert that ``imghdr`` identifies ``filename`` as a JPEG image."""
    detected_type = imghdr.what(filename)
    self.assertEqual(detected_type, 'jpeg')
2015-07-01 17:32:56 -05:00
def getMetadataField(self, pdf_filename, field_name):
    """Read one document-information field from a PDF.

    :param pdf_filename: path of the PDF to open.
    :param field_name: field name without the leading ``/``.
    :returns: the stored value, or None when the field is absent.
    """
    with open(pdf_filename, 'rb') as file_input:
        document_info = PdfFileReader(file_input).getDocumentInfo()
        key = '/' + field_name
        if key not in document_info.keys():
            return None
        return document_info[key]
2014-12-20 20:51:48 +00:00
def getPDFText(self, filename):
    """Extract the text of a PDF using pdfminer.

    :param filename: path of the PDF to read.
    :returns: the extracted text, or None if pdfminer raises ``PSException``.
    """
    try:
        with io.StringIO() as text_buffer, open(filename, 'rb') as pdf_stream:
            resource_manager = PDFResourceManager()
            converter = TextConverter(resource_manager, text_buffer, laparams=LAParams())
            process_pdf(resource_manager, converter, pdf_stream, set(), maxpages=0, password="",
                        caching=True, check_extractable=True)
            converter.close()
            return text_buffer.getvalue()
    except PSException:
        return None
2014-12-20 20:51:48 +00:00
2014-12-17 22:06:45 +00:00
def touch(self, fname):
    """Create ``fname`` as an empty file, truncating it if it already exists."""
    with open(fname, 'w'):
        pass
def find_mount_point(self, path):
    """Return the mount point of the filesystem containing ``path``.

    The path is made absolute first. Without that, a relative path would be
    reduced by ``dirname`` to ``''``, which is never a mount point, and the
    loop would never terminate.

    :param path: any path, absolute or relative; it does not need to exist.
    :returns: the closest ancestor (or ``path`` itself) for which
        ``os.path.ismount`` is true.
    """
    path = os.path.abspath(path)
    while not os.path.ismount(path):
        path = os.path.dirname(path)
    return path
2015-07-01 06:24:45 -05:00
def _timerange(self, start_time, end_time):
start_time = start_time.replace(microsecond=0)
end_time = end_time.replace(microsecond=0)
for step in range(int((end_time - start_time).seconds) + 1):
yield start_time + timedelta(0, step)
2015-07-01 17:32:56 -05:00
def _replace_header(self, mime_base, header, value):
mime_base.__delitem__(header)
mime_base.add_header(header, value)
2014-12-17 22:06:45 +00:00
2015-07-01 17:32:56 -05:00
@classmethod
def _get_original_script_path(cls):
2015-07-01 06:24:45 -05:00
module_path = inspect.getfile(inspect.currentframe())
module_path = os.path.join(os.path.dirname(os.path.dirname(module_path)), 'email2pdf')
return module_path
2015-07-01 17:32:56 -05:00
@classmethod
def _get_email2pdf_object(cls, module_path):
import importlib.machinery
loader = importlib.machinery.SourceFileLoader('email2pdf', module_path)
return loader.load_module()
2015-07-01 06:24:45 -05:00
@classmethod
def _check_examine_dir(cls):
    """Create the shared output-examination directory once per test run.

    Does nothing when ``Email2PDFTestCase.examineDir`` is already set;
    otherwise creates a temporary directory under ``/tmp``, stores its path
    on the class and prints it.
    """
    owner = Email2PDFTestCase
    if owner.examineDir is not None:
        return

    owner.examineDir = '/tmp'
    owner.examineDir = tempfile.mkdtemp(dir=owner.examineDir)
    print("Output examination directory: " + owner.examineDir)
@classmethod
def _check_online(cls):
    """Probe network access once and cache the result on the class.

    Does nothing when ``Email2PDFTestCase.isOnline`` is already set. Otherwise
    fetches ``EXIST_IMG``; success sets ``isOnline`` to True, and any
    exception (including an HTTP error status or a timeout) sets it to False.
    Progress is printed to stdout.
    """
    if Email2PDFTestCase.isOnline is not None:
        return

    print("Checking if online... ", end="")
    sys.stdout.flush()
    try:
        # Without a timeout, requests can block indefinitely on an
        # unresponsive network and hang the whole suite.
        request = requests.get(Email2PDFTestCase.EXIST_IMG, headers={'Connection': 'close'}, timeout=10)
        request.raise_for_status()
        Email2PDFTestCase.isOnline = True
        print("Yes.")
    except Exception as exception:
        # Deliberately broad: any failure here simply means "treat as offline".
        Email2PDFTestCase.isOnline = False
        print("No (" + str(exception) + ")")
2014-12-17 22:06:45 +00:00
def tearDown(self):
    """Remove the per-test working directory and everything inside it."""
    scratch_dir = self.workingDir
    shutil.rmtree(scratch_dir)