#!/usr/bin/env python3
#
# archive_invoice_files.py
#
# Copyright (C) 2019-2022 Franco Masotti (franco \D\o\T masotti {-A-T-} tutanota \D\o\T com)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# See more copyrights and licenses below.
r"""Download, verify, archive and print invoice files."""
import email
import imaplib
import pathlib
import shlex
import shutil
import subprocess
import sys
import tempfile
import traceback
from itertools import permutations
import cups
import dateutil.parser
import fattura_elettronica_reader
import fpyutils
import lxml.etree
import yaml
from weasyprint import CSS, HTML
class EmailError(Exception):
    r"""Signal that a mail server command did not complete successfully.

    It carries no extra state: it is raised without arguments when an IMAP
    response status differs from ``'OK'``.
    """
def _save_message_attachments(conn, m_id, config: dict) -> list:
    r"""Fetch one message and write its attachments to disk.

    :param conn: an IMAP connection with a mailbox already selected.
    :param m_id: the id of the message to fetch.
    :param config: the validated configuration structure.
    :returns: the paths of the files written for this message.
    :raises EmailError: if the fetch command does not answer ``OK``.
    """
    # Returned data are tuples of message part envelope and data.
    # data is 1-element list.
    # data[0][0] corresponds to the header,
    # while data[0][1] corresponds to the text.
    # See:
    # https://tools.ietf.org/html/rfc2060.html#page-41
    # in particular the RFC822 and BODY parameters.
    typ, data = conn.fetch(m_id, '(RFC822)')
    if typ != 'OK':
        raise EmailError

    # Load payload in the email data structure.
    msg = email.message_from_bytes(data[0][1])

    # Get the receiving date of the email.
    date = msg['Date']

    ignored = config['files']['ignore_attachments']
    saved = list()

    # Iterate through all the attachments of the email.
    for part in msg.walk():
        # Skip current element if necessary.
        if part.get_content_maintype() == 'multipart':
            print('iterating down the tree, skipping...')
            continue
        if part.get('Content-Disposition') is None:
            print('unkown content disposition, skipping...')
            continue

        # Get the filename and the content.
        filename = part.get_filename()
        payload = part.get_payload(decode=True)

        # The filename is chosen by the sender: keep only its last component
        # so that it can never point outside of the destination directory
        # (absolute paths, '../' sequences).
        safe_name = ''
        if filename is not None:
            safe_name = pathlib.PurePath(filename).name

        if (safe_name in ('', '.', '..') or not payload or filename in ignored
                or safe_name in ignored):
            print(
                'undefined filename or no attachments, marking as read anyway'
            )
            continue

        # Get the year and month in terms of local time when the email
        # was received and define a subpath of 'year/month'.
        dt = dateutil.parser.parse(date)
        date_part_path = dt.astimezone(
            dateutil.tz.tzlocal()).strftime('%Y/%m')

        # Create the final directory.
        dst_directory = pathlib.Path(
            config['files']['destination_base_directory'], date_part_path)
        dst_directory.mkdir(mode=0o700, parents=True, exist_ok=True)

        # Compute the file path based on the final directory and write the
        # attachment content to it.
        destination = str(pathlib.Path(dst_directory, safe_name))
        with open(destination, 'wb') as f:
            f.write(payload)
        saved.append(destination)

    return saved


def get_attachments(config: dict):
    r"""Download and save the attachments.

    Unread emails whose subject matches the configured filter are fetched
    and their attachments are written under
    ``destination_base_directory/<year>/<month>``.

    :param config: the configuration structure. It is validated first.
    :returns: a dict mapping a progressive email index (starting from 0) to
        the list of file paths saved for that email.
    :raises EmailError: if selecting the mailbox, searching or fetching does
        not answer ``OK``.
    """
    validate_config_struct(config)

    # Most of this function comes from
    # https://github.com/markuz/scripts/blob/master/getmail.py
    #
    # This file is part of my scripts project
    #
    # Copyright (c) 2011 Marco Antonio Islas Cruz
    #
    # This script is free software; you can redistribute it and/or modify
    # it under the terms of the GNU General Public License as published by
    # the Free Software Foundation; either version 2 of the License, or
    # (at your option) any later version.
    #
    # This script is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    # GNU General Public License for more details.
    #
    # You should have received a copy of the GNU General Public License
    # along with this program; if not, write to the Free Software
    # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
    #
    # @author Marco Antonio Islas Cruz
    # @copyright 2011 Marco Antonio Islas Cruz
    # @license http://www.gnu.org/licenses/gpl.txt

    # Group attachments by email so that they can be processed easily.
    saved_files = dict()

    # Leaving the 'with' block logs out even if an error is raised.
    with imaplib.IMAP4_SSL(host=config['certified_email']['host'],
                           port=config['certified_email']['port']) as conn:
        conn.login(user=config['certified_email']['username'],
                   password=config['certified_email']['password'])
        typ, _ = conn.select(mailbox=config['certified_email']['mailbox'])
        if typ != 'OK':
            raise EmailError

        try:
            # message_ids is a 1-element list holding the matching message
            # ids as a single space-separated bytes string.
            # Filter by subject and unread emails.
            # See:
            # https://tools.ietf.org/html/rfc2060.html
            # for all the commands and parameters.
            typ, message_ids = conn.search(
                None,
                '(SUBJECT "' + config['certified_email']['subject_filter'] +
                '")', '(UNSEEN)')
            if typ != 'OK':
                raise EmailError

            # Once the message is processed it will be set as SEEN (read).
            for i, m_id in enumerate(message_ids[0].split()):
                saved_files[i] = _save_message_attachments(conn, m_id, config)
        finally:
            # The mailbox is selected at this point: always release it.
            conn.close()

    return saved_files
def decode_invoice_file(metadata_file: str,
                        invoice_file: str,
                        extract_attachments: bool = False) -> dict:
    r"""Try to decode the invoice file.

    The reader pipeline is run repeatedly: every time a verification step
    fails, the failure is recorded in the returned status, that step is
    disabled and the pipeline is retried.

    :param metadata_file: path of the candidate metadata file. It must have
        the ``.xml`` extension (case insensitive) to be processed.
    :param invoice_file: path of the candidate invoice file.
    :param extract_attachments: forwarded to the reader pipeline.
    :returns: a status dict with the keys ``invoice_file``,
        ``valid_checksum``, ``valid_signature_and_signers_certificate``,
        ``valid_assets_checksum`` and ``file_type``. ``invoice_file`` is an
        empty string when this pair of files was not processed, so that the
        caller can try another pair.
    """
    source = 'invoice'
    file_type = 'p7m'
    data = {
        'patched': True,
        'configuration_file': '',
        'write_default_configuration_file': False,
        'extract_attachments': extract_attachments,
        'invoice_xslt_type': 'ordinaria',
        'no_invoice_xml_validation': False,
        'force_invoice_schema_file_download': False,
        'generate_html_output': True,
        'invoice_filename': invoice_file,
        'no_checksum_check': False,
        'force_invoice_xml_stylesheet_file_download': False,
        'ignore_attachment_extension_whitelist': False,
        'ignore_attachment_filetype_whitelist': False,
        'metadata_file': metadata_file,
        'ignore_signature_check': False,
        'ignore_signers_certificate_check': False,
        'force_trusted_list_file_download': False,
        'keep_original_file': True,
        'ignore_assets_checksum': False,
        'destination_directory': str(pathlib.Path(invoice_file).parents[0])
    }
    status = {
        'invoice_file': invoice_file,
        'valid_checksum': True,
        'valid_signature_and_signers_certificate': True,
        'valid_assets_checksum': True,
        'file_type': file_type,
    }

    # Most probably a metadata file or a non-signed invoice file.
    # Metadata file must have .xml as extension
    # Avoid case sensitivity problems.
    if str(pathlib.PurePath(metadata_file).suffix).lower() != '.xml':
        # Unprocessed.
        status['invoice_file'] = ''
        return status

    # In every handler below, a failure that shows up again after its
    # relaxation has already been applied is re-raised: the handler would
    # change nothing, so retrying would loop forever.
    while True:
        try:
            fattura_elettronica_reader.pipeline(source=source,
                                                file_type=file_type,
                                                data=data)
            break
        except fattura_elettronica_reader.exceptions.InvoiceFileChecksumFailed:
            if not status['valid_checksum']:
                raise
            # Ignore checksum at the next iteration but mark the checksum
            # as invalid.
            status['valid_checksum'] = False
            data['no_checksum_check'] = True
        except fattura_elettronica_reader.exceptions.P7MFileNotAuthentic:
            if not status['valid_signature_and_signers_certificate']:
                raise
            status['valid_signature_and_signers_certificate'] = False
            data['ignore_signature_check'] = True
            data['ignore_signers_certificate_check'] = True
        except fattura_elettronica_reader.exceptions.P7MFileDoesNotHaveACoherentCryptographicalSignature:
            if status['file_type'] != 'p7m':
                raise
            # Retry treating the invoice as a non-signed file.
            status['file_type'] = 'plain'
            file_type = 'plain'
        except lxml.etree.LxmlError:
            # The selected metadata file is the real invoice file.
            # Retry with the next loop from the caller function: the caller
            # only retries when the invoice file is reported as unprocessed.
            traceback.print_exc()
            status['invoice_file'] = ''
            break
        except fattura_elettronica_reader.exceptions.AssetsChecksumDoesNotMatch:
            if not status['valid_assets_checksum']:
                raise
            status['valid_assets_checksum'] = False
            data['ignore_assets_checksum'] = True
        except fattura_elettronica_reader.exceptions.CannotExtractOriginalP7MFile:
            # Fatal error.
            traceback.print_exc()
            sys.exit(1)

    return status
def validate_decoded_invoice_files_struct(struct: list):
    r"""Verify that every decoded invoice status has the expected shape.

    :param struct: an iterable of status dicts.
    :raises TypeError: if an element is not a dict or one of its values has
        the wrong type.
    :raises ValueError: if a required key is missing or ``file_type`` is
        neither ``'p7m'`` nor ``'plain'``.
    """
    # Required keys paired with the type each value must have, in the order
    # they are checked.
    schema = (
        ('invoice_file', str),
        ('valid_checksum', bool),
        ('valid_signature_and_signers_certificate', bool),
        ('valid_assets_checksum', bool),
        ('file_type', str),
    )
    allowed_file_types = ('p7m', 'plain')

    for entry in struct:
        if not isinstance(entry, dict):
            raise TypeError
        # All the keys must exist before any value is inspected.
        if any(key not in entry for key, _ in schema):
            raise ValueError
        if any(not isinstance(entry[key], kind) for key, kind in schema):
            raise TypeError
        if entry['file_type'] not in allowed_file_types:
            raise ValueError
def validate_config_struct(data: dict):
    r"""Verify that the configuration has all the expected keys and types.

    Checks are run level by level: the keys of one nesting level are verified
    before the values of that level are type checked, and before the next
    level is inspected.

    :param data: the configuration structure.
    :raises ValueError: if a required key is missing.
    :raises TypeError: if a value does not have the expected type.
    """

    def node_at(*path):
        # Walk down the nested structure following the given keys.
        node = data
        for key in path:
            node = node[key]
        return node

    def require(path, keys):
        # Every key must be present in the container found at path.
        container = node_at(*path)
        for key in keys:
            if key not in container:
                raise ValueError

    def expect(kind, *paths):
        # Every value found at the given paths must be an instance of kind.
        for path in paths:
            if not isinstance(node_at(*path), kind):
                raise TypeError

    status_sections = ('crypto', 'checksum', 'p7m', 'assets')

    # First level.
    require((), ('certified_email', 'files', 'print', 'invoice',
                 'status_page', 'notify'))

    # Second level: keys.
    require(('certified_email', ), ('host', 'port', 'username', 'password',
                                    'mailbox', 'subject_filter'))
    require(('files', ), ('destination_base_directory', 'ignore_attachments'))
    require(('print', ), ('printer', 'css_string'))
    require(('invoice', ), ('file', 'attachments'))
    require(('status_page', ), ('file', 'show', 'status'))
    require(('notify', ), ('gotify', ))

    # Second level: types.
    expect(str, ('certified_email', 'host'))
    expect(int, ('certified_email', 'port'))
    expect(str, ('certified_email', 'username'),
           ('certified_email', 'password'), ('certified_email', 'mailbox'),
           ('certified_email', 'subject_filter'),
           ('files', 'destination_base_directory'))
    expect(list, ('files', 'ignore_attachments'))
    expect(str, ('print', 'printer'), ('print', 'css_string'))

    # Third level: keys.
    require(('invoice', 'file'), ('print', ))
    require(('invoice', 'attachments'), ('extract', 'print'))
    require(('status_page', 'file'), ('store', 'print'))
    require(('status_page', 'show'), ('info', 'openssl_version'))
    require(('status_page', 'status'), status_sections)
    require(('notify', 'gotify'), ('enabled', 'url', 'token', 'message',
                                   'priority'))

    # Third level: types.
    for attachment_name in data['files']['ignore_attachments']:
        if not isinstance(attachment_name, str):
            raise TypeError
    expect(bool, ('invoice', 'file', 'print'),
           ('invoice', 'attachments', 'extract'),
           ('invoice', 'attachments', 'print'),
           ('status_page', 'file', 'store'), ('status_page', 'file', 'print'))

    # Fourth level: keys.
    require(('status_page', 'show', 'info'), ('enabled', 'url'))
    require(('status_page', 'show', 'openssl_version'), ('enabled', ))
    for section in status_sections:
        require(('status_page', 'status', section),
                ('enabled', 'message', 'valid_value', 'invalid_value'))

    # Fourth level: types.
    expect(bool, ('status_page', 'show', 'info', 'enabled'))
    expect(str, ('status_page', 'show', 'info', 'url'))
    expect(bool, ('status_page', 'show', 'openssl_version', 'enabled'))
    for section in status_sections:
        expect(bool, ('status_page', 'status', section, 'enabled'))
        expect(str, ('status_page', 'status', section, 'message'),
               ('status_page', 'status', section, 'valid_value'),
               ('status_page', 'status', section, 'invalid_value'))
def decode_invoice_files(file_group: dict,
                         extract_attachments: bool = False) -> list:
    r"""Decode multiple invoice files.

    For every group of files, all the ordered pairs
    ``(metadata_file, invoice_file)`` are tried until one of them is
    processed.

    :param file_group: a dict mapping an email index to the list of files
        saved for that email.
    :param extract_attachments: forwarded to ``decode_invoice_file``.
    :returns: the list of status dicts of the processed invoices. Groups
        with fewer than two files, or without a usable pair, add nothing.
    """
    invoice_files = list()
    for files in file_group.values():
        # Only ordered pairs are needed. Groups with fewer than two files
        # yield no pair at all instead of failing on a missing element.
        for metadata_file, invoice_file in permutations(files, 2):
            status = decode_invoice_file(metadata_file, invoice_file,
                                         extract_attachments)
            if status['invoice_file'] != '':
                # Ignore unprocessed files.
                invoice_files.append(status)
                # There is no need to try to invert the input files because
                # processing completed correctly.
                break

    return invoice_files
def print_file(printer, file, job_name, proprieties):
    r"""Submit a file to a CUPS printer.

    A new CUPS connection is opened for every call and all the arguments
    are handed over unchanged to its ``printFile`` method.

    :param printer: the printer name.
    :param file: the path of the file to print.
    :param job_name: the title of the print job.
    :param proprieties: a dict of print options.
    """
    cups.Connection().printFile(printer, file, job_name, proprieties)
def print_invoice(file: dict, data: dict):
    r"""Render the HTML version of an invoice to PDF and print it.

    The HTML file is expected at the invoice file path plus ``.html``.

    :param file: a decoded invoice status; only ``invoice_file`` is read.
    :param data: the configuration structure. It is validated first.
    """
    validate_config_struct(data)

    source_html = file['invoice_file'] + '.html'
    printer_name = data['print']['printer']

    # The PDF only needs to exist until it is handed over to the printer.
    with tempfile.NamedTemporaryFile() as pdf_file:
        stylesheet = CSS(string=data['print']['css_string'])
        document = HTML(source_html)
        document.write_pdf(pdf_file.name, stylesheets=[stylesheet])
        print_file(printer_name, pdf_file.name, 'invoice', {'media': 'a4'})
def _status_page_markup(file: dict, data: dict) -> str:
    r"""Build the HTML markup of the status page of an invoice.

    :param file: a decoded invoice status.
    :param data: the validated configuration structure.
    :returns: the HTML string, one paragraph per enabled entry.
    """
    from html import escape

    def paragraph(text: str) -> str:
        # NOTE(review): the markup originally surrounding each entry is not
        # recoverable from this file; one paragraph per entry is used so
        # that entries render on separate lines. Confirm the intended tags.
        return '<p>' + text + '</p>\n'

    show = data['status_page']['show']
    checks = data['status_page']['status']

    # The file name and the command output do not come from the
    # configuration: escape them before placing them into the markup.
    html_file = file['invoice_file'] + '.html'
    parts = [paragraph(escape(pathlib.Path(html_file).stem))]

    if show['info']['enabled']:
        parts.append(paragraph('generated by ' + show['info']['url']))

    if show['openssl_version']['enabled']:
        openssl_version = subprocess.run(
            shlex.split('openssl version'), capture_output=True,
            shell=False).stdout.decode('UTF-8').rstrip()
        parts.append(paragraph(escape(openssl_version)))

    # Outcome of each status section. They are evaluated lazily so that a
    # disabled section never reads the invoice status.
    outcomes = {
        'crypto': lambda: file['valid_signature_and_signers_certificate'],
        'checksum': lambda: file['valid_checksum'],
        'p7m': lambda: file['file_type'] == 'p7m',
        'assets': lambda: file['valid_assets_checksum'],
    }
    for section, is_valid in outcomes.items():
        settings = checks[section]
        if not settings['enabled']:
            continue
        value_key = 'valid_value' if is_valid() else 'invalid_value'
        parts.append(
            paragraph(settings['message'] + ' ' + settings[value_key]))

    return ''.join(parts)


def get_status_page(file: dict, data: dict):
    r"""Save and print the status page.

    The page is rendered to a PDF in a temporary directory. Depending on the
    configuration it is then printed and/or moved next to the invoice file,
    as the invoice file path plus ``_status_page.pdf``.

    :param file: a decoded invoice status.
    :param data: the configuration structure. It is validated first.
    """
    validate_config_struct(data)

    content = _status_page_markup(file, data)

    # Save and print.
    with tempfile.TemporaryDirectory() as tmpdirname:
        css = CSS(string=data['print']['css_string'])
        document = HTML(string=content)
        status_page_tmp_path = str(pathlib.Path(tmpdirname, 'status_page.pdf'))
        document.write_pdf(status_page_tmp_path, stylesheets=[css])
        if data['status_page']['file']['print']:
            print_file(data['print']['printer'], status_page_tmp_path,
                       'status_page', {'media': 'a4'})
        if data['status_page']['file']['store']:
            # The invoice file path already includes its directory: joining
            # it again with its parent would duplicate the directory part
            # when the path is relative.
            shutil.move(status_page_tmp_path,
                        file['invoice_file'] + '_status_page.pdf')
# Script entry point: load the configuration given as first argument, download
# and decode the invoices, then print them and notify according to the
# configuration.
if __name__ == '__main__':
    if len(sys.argv) < 2:
        sys.exit('usage: ' + sys.argv[0] + ' CONFIGURATION_FILE')
    configuration_file = sys.argv[1]

    # Close the configuration file as soon as it has been parsed.
    with open(configuration_file) as config_stream:
        config = yaml.load(config_stream, Loader=yaml.SafeLoader)
    validate_config_struct(config)

    # The notification settings read at the end of the run are checked here,
    # before any email is fetched, so that a missing key cannot interrupt
    # the run after the messages have been downloaded.
    notify = config['notify']
    if notify['gotify']['enabled'] and 'title' not in notify['gotify']:
        raise ValueError('missing notify.gotify.title')
    if 'email' not in notify or 'enabled' not in notify['email']:
        raise ValueError('missing notify.email.enabled')
    if notify['email']['enabled']:
        for key in ('smtp_server', 'port', 'sender', 'user', 'password',
                    'receiver', 'subject'):
            if key not in notify['email']:
                raise ValueError('missing notify.email.' + key)

    pathlib.Path(config['files']['destination_base_directory']).mkdir(
        mode=0o700, parents=True, exist_ok=True)
    file_group = get_attachments(config)
    decoded_invoice_files = decode_invoice_files(
        file_group, config['invoice']['attachments']['extract'])
    validate_decoded_invoice_files_struct(decoded_invoice_files)

    for f in decoded_invoice_files:
        if config['invoice']['file']['print']:
            print_invoice(f, config)
        get_status_page(f, config)

        # One notification is sent for every processed invoice.
        message = 'processed invoice = ' + pathlib.Path(f['invoice_file']).name
        if config['notify']['gotify']['enabled']:
            m = config['notify']['gotify']['message'] + '\n' + message
            fpyutils.notify.send_gotify_message(
                config['notify']['gotify']['url'],
                config['notify']['gotify']['token'], m,
                config['notify']['gotify']['title'],
                config['notify']['gotify']['priority'])
        if config['notify']['email']['enabled']:
            fpyutils.notify.send_email(
                message, config['notify']['email']['smtp_server'],
                config['notify']['email']['port'],
                config['notify']['email']['sender'],
                config['notify']['email']['user'],
                config['notify']['email']['password'],
                config['notify']['email']['receiver'],
                config['notify']['email']['subject'])