mirror of
https://github.com/andrewferrier/email2pdf.git
synced 2025-03-18 05:52:59 +00:00
Attach and embed images (relates to issue #12).
This commit is contained in:
parent
49544a4191
commit
6e9bc762f1
3 changed files with 64 additions and 14 deletions
44
email2pdf
44
email2pdf
|
@ -5,8 +5,11 @@ from subprocess import Popen, PIPE
|
|||
from email.header import decode_header
|
||||
import argparse
|
||||
import email
|
||||
import magic
|
||||
import io
|
||||
import os
|
||||
import os.path
|
||||
import re
|
||||
import sys
|
||||
|
||||
|
||||
|
@ -66,9 +69,9 @@ def main():
|
|||
else:
|
||||
header_info = ""
|
||||
|
||||
part = find_part_depth_first(my_email, "text/html")
|
||||
part = find_part_by_content_type(my_email, "text/html")
|
||||
if part is None:
|
||||
part = find_part_depth_first(my_email, "text/plain")
|
||||
part = find_part_by_content_type(my_email, "text/plain")
|
||||
if part is None:
|
||||
raise ExitCodeException("Cannot find an appropriate payload in email.")
|
||||
else:
|
||||
|
@ -77,18 +80,25 @@ def main():
|
|||
else:
|
||||
payload = part.get_payload(decode=True)
|
||||
|
||||
def cid_replace(matchobj):
|
||||
imagePart = find_part_by_content_id(my_email, matchobj.group(1))
|
||||
assert(imagePart['Content-Transfer-Encoding'] == 'base64')
|
||||
imageBase64 = imagePart.get_payload(decode=False)
|
||||
imageDecoded = imagePart.get_payload(decode=True)
|
||||
m = magic.open(magic.MAGIC_MIME_TYPE)
|
||||
m.load()
|
||||
mimeType = m.buffer(imageDecoded)
|
||||
return "data:" + mimeType + ";base64," + imageBase64
|
||||
|
||||
payload = bytes(re.sub('cid:([\w_-]+)', cid_replace, str(payload, encoding='utf-8')), 'UTF-8')
|
||||
|
||||
payload = bytes(header_info, 'UTF-8') + payload
|
||||
|
||||
p = Popen(['wkhtmltopdf', '-q', '--load-error-handling', 'ignore', '--load-media-error-handling',
|
||||
'ignore', '-', output_file_name], stdin=PIPE, stdout=PIPE, stderr=PIPE)
|
||||
output, error = p.communicate(input=payload)
|
||||
if p.returncode > 0:
|
||||
if error == bytes('Exit with code 1 due to network error: ProtocolUnknownError\n', 'UTF-8'):
|
||||
# WARNING: we should handle this better - see
|
||||
# https://github.com/andrewferrier/email2pdf/issues/12
|
||||
pass
|
||||
else:
|
||||
raise ExitCodeException("wkhtmltopdf failed with exit code " + str(p.returncode))
|
||||
raise ExitCodeException("wkhtmltopdf failed with exit code " + str(p.returncode))
|
||||
|
||||
|
||||
def handle_pdf_parts(email, output_directory):
|
||||
|
@ -111,12 +121,14 @@ def handle_pdf_parts(email, output_directory):
|
|||
with open(fullFilename, 'wb') as output_file:
|
||||
output_file.write(payload)
|
||||
|
||||
|
||||
def extract_part_filename(part):
|
||||
filename = part.get_filename()
|
||||
if decode_header(filename)[0][1] is not None:
|
||||
filename = str(decode_header(filename)[0][0]).decode(decode_header(filename)[0][1])
|
||||
return filename
|
||||
|
||||
|
||||
def get_unique_version(filename):
|
||||
# From here: http://stackoverflow.com/q/183480/27641
|
||||
counter = 1
|
||||
|
@ -127,10 +139,10 @@ def get_unique_version(filename):
|
|||
return filename
|
||||
|
||||
|
||||
def find_part_depth_first(message, content_type):
|
||||
def find_part_by_content_type(message, content_type):
|
||||
if message.is_multipart():
|
||||
for part in message.get_payload():
|
||||
value = find_part_depth_first(part, content_type)
|
||||
value = find_part_by_content_type(part, content_type)
|
||||
if value is not None:
|
||||
return value
|
||||
elif message.get_content_type() == content_type:
|
||||
|
@ -139,6 +151,18 @@ def find_part_depth_first(message, content_type):
|
|||
return None
|
||||
|
||||
|
||||
def find_part_by_content_id(message, content_id):
|
||||
if message.is_multipart():
|
||||
for part in message.get_payload():
|
||||
value = find_part_by_content_id(part, content_id)
|
||||
if value is not None:
|
||||
return value
|
||||
elif message['Content-ID'] in (content_id, '<' + content_id + '>'):
|
||||
return message
|
||||
else:
|
||||
return None
|
||||
|
||||
|
||||
def find_all_parts(message, content_type):
|
||||
parts = []
|
||||
|
||||
|
|
|
@ -1,14 +1,15 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
from datetime import datetime
|
||||
from email import encoders
|
||||
from email.message import Message
|
||||
from email.mime.base import MIMEBase
|
||||
from email.mime.image import MIMEImage
|
||||
from email.mime.multipart import MIMEMultipart
|
||||
from email.mime.text import MIMEText
|
||||
from email.mime.base import MIMEBase
|
||||
from email import encoders
|
||||
from email.utils import formatdate
|
||||
from subprocess import Popen, PIPE, DEVNULL
|
||||
from reportlab.pdfgen import canvas
|
||||
from subprocess import Popen, PIPE, DEVNULL
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
@ -96,6 +97,12 @@ class BaseTestClasses:
|
|||
finally:
|
||||
os.unlink(file_name)
|
||||
|
||||
def attachImage(self, imageId):
|
||||
with open('jpeg444.jpg', 'rb') as image_file:
|
||||
image = MIMEImage(image_file.read())
|
||||
image.add_header('Content-ID', imageId)
|
||||
self.msg.attach(image)
|
||||
|
||||
def tearDown(self):
|
||||
time.sleep(DELAY)
|
||||
|
||||
|
@ -193,7 +200,26 @@ class TestMIME(BaseTestClasses.Email2PDFTestCase):
|
|||
|
||||
def test_embeddedImageEmail(self):
|
||||
self.addHeaders("From", "To", "Subject")
|
||||
self.attachHTML('<img src=cid:_1_C9C396E8C9C391380055638680257D67>')
|
||||
self.attachImage('myid')
|
||||
self.attachHTML('<img src=cid:myid>')
|
||||
self.assertEqual(self.invokeEmail2PDF(), 0)
|
||||
|
||||
def test_embeddedImageEmail2(self):
|
||||
self.addHeaders("From", "To", "Subject")
|
||||
self.attachImage('<my_id>')
|
||||
self.attachHTML('<img src=cid:my_id>')
|
||||
self.assertEqual(self.invokeEmail2PDF(), 0)
|
||||
|
||||
def test_embeddedImageEmail3(self):
|
||||
self.addHeaders("From", "To", "Subject")
|
||||
self.attachImage('myid')
|
||||
self.attachHTML('<p><img src="blah.jpg"><li></li><img src="cid:myid"></p>')
|
||||
self.assertEqual(self.invokeEmail2PDF(), 0)
|
||||
|
||||
def test_embeddedImageEmail4(self):
|
||||
self.addHeaders("From", "To", "Subject")
|
||||
self.attachImage('myid')
|
||||
self.attachHTML('<IMG SRC="cid:myid">')
|
||||
self.assertEqual(self.invokeEmail2PDF(), 0)
|
||||
|
||||
def test_somethingElseAsOctetStream(self):
|
||||
|
|
BIN
jpeg444.jpg
Normal file
BIN
jpeg444.jpg
Normal file
Binary file not shown.
After ![]() (image error) Size: 5.5 KiB |
Loading…
Reference in a new issue