1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
|
# vim: set sw=4 sts=4 et :
# Copyright: 2008-2009 Gentoo Foundation
# Author(s): Nirbheek Chauhan <nirbheek.chauhan@gmail.com>
# License: GPL-3
#
import subprocess, os
from .. import config
class Crypto(object):
"""
Data Encrypter/Decrypter
"""
def __init__(self, gpghome=config.GPGHOME):
"""
@param gpghome: Home directory for GPG
@type gpghome: string
"""
self.gpghome = gpghome
self.gpgcmd = 'gpg -a --keyid-format long --trust-model always '
self.gpgcmd += '--homedir="%s" ' % self.gpghome
def _validate_gpghome(self):
try:
gpghome = open(self.gpghome+'/secring.gpg')
except IOError, OSError:
raise Exception('"%s": Unable to use GPG homedir' % self.gpghome)
finally:
gpghome.close()
def _get_fp_from_keyid(self, keyid):
gpg_args = '--with-colons --fingerprint --list-keys "%s"' % keyid
process = subprocess.Popen(self.gpgcmd+gpg_args, shell=True,
stdout=subprocess.PIPE)
output = process.stdout.readlines()
process.wait()
for line in output:
# Fingerprint line
if line.startswith('fpr'):
# Fingerprint
return line.split(':')[-2]
def export_pubkey(self, file, which):
self._validate_gpghome()
gpg_args = '--export "%s" > "%s"' % (which, file)
print self.gpgcmd+gpg_args
subprocess.check_call(self.gpgcmd+gpg_args, shell=True)
def import_pubkey(self, pubkey):
self._validate_gpghome()
gpg_args = '--import <<<"%s"' % pubkey
subprocess.check_call(self.gpgcmd+gpg_args, shell=True)
def init_gpghome(self, name='Test AutotuA Slave', email='test_slave@test.org',
expire='5y', length='4096'):
"""
Initialize a GnuPG home by generating keys
"""
params = (('Key-Type', 'DSA'),
('Key-Length', '1024'),
('Subkey-Type', 'ELG-E'),
('Subkey-Length', length),
('Name-Real', name),
('Name-Email', email),
('Expire-Date', expire),)
# Batch mode
gpg_args = '--batch --gen-key <<<"'
gpg_args += '%echo Generating keys.. [Worship the gods of randomness :p]'
for param in params:
gpg_args += '\n%s: %s' % param
gpg_args += '"'
subprocess.check_call(self.gpgcmd+gpg_args, shell=True)
print 'Done.'
def encrypt(self, data, recipient='Autotua Master'):
"""
@param data: Data to be encrypted
@type data: string
@param recipient: Recipient for the data
@type recipient: string
returns: encrypted_data
"""
self._validate_gpghome()
gpg_args = '--encrypt --sign --recipient "%s" <<<"%s"' % (recipient, data)
process = subprocess.Popen(self.gpgcmd+gpg_args, shell=True,
stdout=subprocess.PIPE)
data = process.stdout.read()
process.wait()
if process.returncode < 0:
raise Exception('Unable to encrypt, something went wrong :(')
return data
def decrypt(self, data):
"""
@param data: Data to be encrypted
@type data: string
returns: (decrypted_data, sender)
"""
self._validate_gpghome()
gpg_args = '--decrypt <<<"%s"' % data
process = subprocess.Popen(self.gpgcmd+gpg_args, shell=True,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
ddata = process.stdout.read()[:-1] # Extra \n at the end :-/
# Get the output to stderr
gpg_out = process.stderr.readlines()
process.wait()
if process.returncode < 0 or not ddata:
raise Exception('Unable to decrypt, something went wrong')
# Get the line with the DSA long key ID
for line in gpg_out:
if line.find('using DSA key') != -1:
# Get the long key ID
gpg_out = line.split()[-1]
break
# Get the fingerprint
sender = self._get_fp_from_keyid(gpg_out)
return (ddata, sender)
|