import gpg import hkp4py import os import sys import asyncio def do_hkp_stuff(server, fingerprint, log): # This isn't perfect, it's a loose search # for example searching for 7C79FCBB8372E5DE5B17E09A90D4B9641E092971 on keyserver.ubuntu.com returns the wrong key. But it works 99% of the time. try: pubkey = hkp4py.KeyServer(f'hkp://{server}').search(f'0x{fingerprint}') log.write(f'{server}: {pubkey}\n') if pubkey != None: import_result = gpg.Context().key_import(pubkey[0].key.encode()) log.write(f'Attempted to import from {server}: {import_result}\n') else: log.write(f'{server}: None\n') except Exception as e: log.write(f'{server}: [ERROR] {e}\n\n') def get_first_real_result(results): for s in results: s = list(s) for task in s: if task != None: return task.result() return None XDG_DATA_HOME = os.getenv('XDG_DATA_HOME') HOME = os.path.expanduser('~') if XDG_DATA_HOME == None: XDG_DATA_HOME = f'{HOME}/.local/share' filename = sys.argv[-1] email = ''.join(open(filename, 'rt').readlines()) if 'gpg-email-helper' not in os.listdir(f'{XDG_DATA_HOME}'): os.mkdir(f'{XDG_DATA_HOME}/gpg-email-helper') async def main(log): log.write(f'\n\nRunning on {filename}\n') if ( email.rfind('-----BEGIN PGP PUBLIC KEY BLOCK-----') != -1 and email.rfind('-----END PGP PUBLIC KEY BLOCK-----') != -1 ): public_key = email[ email.rfind('-----BEGIN PGP PUBLIC KEY BLOCK-----') : email.rfind( '-----END PGP PUBLIC KEY BLOCK-----' ) + 35 ] # quoted-printable attachments have the equals sign escaped as =3D apparently public_key = public_key.replace('=3D', '=') log.write( f'Attempted to import key from email: {gpg.Context(armor=True).key_import(public_key.encode())}\n' ) elif ( email.rfind('-----BEGIN PGP SIGNATURE-----') != -1 and email.rfind('-----END PGP SIGNATURE-----') != -1 ) or (email.rfind('-----BEGIN PGP MESSAGE-----') != -1 and email.rfind('-----END PGP MESSAGE-----') != -1): signature = email[ email.rfind('-----BEGIN PGP SIGNATURE-----') : email.rfind( '-----END PGP SIGNATURE-----' ) + 27 ] if signature == '': signature = email[ email.rfind('-----BEGIN PGP MESSAGE-----') : email.rfind( '-----END PGP MESSAGE-----' ) + 25 ] signature.replace('=3D', '=') try: # Needed to skip geting the key if it already exists. gpg.Context().verify(signature.encode()) except gpg.errors.BadSignatures as e: if 'No public key' in str(e): fingerprint = str(e)[: str(e).find(':')] log.write( f'{fingerprint} not found, gonna try to find it on a keyserver...\n' ) keyservers = [ 'keyserver.ubuntu.com', 'pgpkeys.eu', 'pgp.cyberbits.eu', 'pgp.flexyz.com', 'sks.pgpkeys.eu', 'fi.pgpkeys.eu', 'keys.nicemail.eu', 'pubkeys.intevation.de', 'pgp.surf.nl', 'sks.pod03.togsvcs.com', 'sks.ewr1.newconews.org', 'sks.pod02.fleetstreetops.com', 'sks.pod01.fleetstreetops.com', 'pgp.net.nz', 'zuul.rediris.es', 'pgp.gnd.pw', 'openpgp.circl.lu', 'keywin.trifence.ch', 'data.pgp.gnd.pw', 'sks.infcs.de', 'keyserver.escomposlinux.org', 'keyserver.cert.or.id', 'keyserver1.computer42.org', 'keyserver2.computer42.org', 'keyserver.spline.inf.fu-berlin.de', 'key-server.org', 'pgp.id', 'sks.ygrek.org', #'pgp.mit.edu' # still in operation but very unreliable ] # multithreads it # TODO: make it stop after a thread gets the key tasks = [ asyncio.create_task( asyncio.to_thread(do_hkp_stuff, server, fingerprint, log) ) for server in keyservers ] tasks = set(tasks) await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) except Exception as e: log.write(f'[ERROR] {e}\n\n') with open(f'{XDG_DATA_HOME}/gpg-email-helper/log', 'a') as log: asyncio.run(main(log))