Skip to content
Commits on Source (690)
*.pyc
*~
SWIG/_m2crypto.py
SWIG/_m2crypto_wrap.c
build/
/dist
/M2Crypto/__m2crypto.so
/SWIG/_m2crypto.py
/M2Crypto.egg-info
/EGG-INFO
tests/randpool.dat
tests/sig.p7
tests/sig.p7s
tests/tmpcert.der
M2Crypto/_m2crypto.py
This diff is collapsed.
====================
Installing M2Crypto
====================
:Maintainer: Heikki Toivonen
:Web-Site: http://chandlerproject.org/Projects/MeTooCrypto
.. contents::
Pre-requisites
----------------
The following software packages are pre-requisites:
- **Python 2.6 or newer**
- **OpenSSL 0.9.7 or newer**
- **SWIG 1.3.28 or newer**
Note about OpenSSL versions early in the 0.9.7 series
-----------------------------------------------------
Early OpenSSL 0.9.7 versions require the __i386__ symbol to be defined.
Uncomment this line in setup.py:
#'-D__i386__', # Uncomment for early OpenSSL 0.9.7 versions
if you get this compile-time error:
This openssl-devel package does not work your architecture?
Note about Fedora Core -based Distributions
-----------------------------------------------------
Fedora Core (and RedHat, CentOS etc.) have made changes to OpenSSL
configuration compared to many other Linux distributions. If you can not
build M2Crypto normally, try the fedora_setup.sh script included with
M2Crypto sources.
Installing on Unix-like systems, including Cygwin
-------------------------------------------------
::
$ tar zxf m2crypto-<version>.tar.gz
$ cd m2crypto-<version>
$ python setup.py build
$ python setup.py install
If you have installed setuptools you can also optionally run tests like this:
$ python setup.py test
This assumes OpenSSL is installed in /usr. You can provide an alternate
OpenSSL prefix location with --openssl option to build_ext command. Other
commands accept standard options if you need them.
Some distributions, like Fedora Core, package OpenSSL headers in a different
location from OpenSSL itself. In that case you need to tell build_ext the
additional include location with -I option.
Differences when installing on Windows
--------------------------------------
Before building from source, you need to install OpenSSL's include files,
import libraries and DLLs. By default setup.py assumes that OpenSSL include
files are in ``c:\pkg\openssl\include``, and the import libraries
in ``c:\pkg\openssl\lib``. As with other platforms, you can specify a different
OpenSSL location with --openssl option to build_ext command.
Using OpenSSL 0.9.8 on Windows requires Python be built with applink.c
(add an include statement in python.c). This is not a requirement for
Linux or MacOSX. (applink.c is provided by OpenSSL.)
MSVC++
~~~~~~~~
setup.py is already configured to work with MSVC++ by default.
With MSVC++, the OpenSSL DLLs, as built, are named ``libeay32.dll``
and ``ssleay32.dll``. Install these somewhere on your PATH; for example
in ``c:\bin``, together with ``openssl.exe``.
For MSVC++, the import libraries, as built by OpenSSL, are named
``libeay32.lib`` and ``ssleay32.lib``.
MINGW
~~~~~~~
.. NOTE::
The following instructions for building M2Crypto with MINGW are from
M2Crypto 0.12. These instructions should continue to work for this release,
although I have not tested them.
Read Sebastien Sauvage's webpage:
http://sebsauvage.net/python/mingw.html
For mingw32, the OpenSSL import libraries are named ``libeay32.a`` and
``libssl32.a``. You may need to edit setup.py file for these.
You'll also need to create ``libpython2[123].a``, depending on your version
of Python.
OpenSSL DLLs for mingw32 are named ``libeay32.dll`` and ``libssl32.dll``.
Install these somewhere on your PATH; for example in
``c:\bin``, together with ``openssl.exe``.
Build M2Crypto:
python setup.py build -cmingw32
python setup.py install
BC++
~~~~~~
.. NOTE::
The following instructions for building M2Crypto with MSVC++ 6.0 and
BC++ 5.5 free compiler suite are from M2Crypto 0.10. These instructions
should continue to work for this release, although I have not tested
them.
For BC++ these files are created from the MSVC++-built ones using the
tool ``coff2omf.exe``. I call them ``libeay32_bc.lib`` and
``ssleay32_bc.lib``, respectively. You will need to edit setup.py file
for these.
You'll also need Python's import library, e.g., ``python22.lib``, to
be the BC++-compatible version; i.e., create ``python22_bc.lib`` from
``python22.lib``, save a copy of ``python22.lib`` (as ``python22_vc.lib``,
say), then rename ``python22_bc.lib`` to ``python22.lib``.
Now you are ready to build M2Crypto. Do one of the following::
python setup.py build
python setup.py build -cbcpp
Then,
::
python setup.py install
MacOSX
------
Follow the standard instructions to build and install M2Crypto.
However, should you encounter difficulties, you may want to consider
the following possibilities.
- Distutils from Python 2.5 provides support for universal builds (ppc
and i386) and Distutils requires a recent version of Xcode. See
http://developer.apple.com/tools/download/
- OpenSSL 0.9.7l gets installed in /usr with Apple's Security
Update 2006-007. If you need features in OpenSSL 0.9.8, you
should consider installing 0.9.8 in /usr/local. The commands
are:
OpenSSL:
./config shared --prefix=/usr/local
make
make test
sudo make install [or... install_sw]
M2Crypto:
python setup.py build build_ext --openssl=/usr/local
sudo python setup.py install build_ext --openssl=/usr/local
To make Universal builds, you will need to uncomment a line in setup.py:
extra_link_args = ['-Wl,-search_paths_first'],
If that does not work, here is what Marc Hedlund was able to get working:
First, download OpenSSL 0.9.8d and unpack it. Edit the OpenSSL Makefiles
per PROBLEMS. Then:
./config no-shared no-asm --prefix=/usr/local
make
make test
sudo make install
make clean
./Configure no-shared no-asm --prefix=/usr/local darwin-ppc-cc
make build_libs "CC=cc -arch ppc"
lipo -info lib*
mkdir -p build/ppc
mv lib* build/ppc
make clean
./Configure no-shared no-asm --prefix=/usr/local darwin-i386-cc
make build_libs "CC=cc -arch i386"
lipo -info lib*
mkdir -p build/i386
mv lib* build/i386/
/bin/ls -1 build/i386/ > libnames.tmp
mkdir universal
Create a script in the OpenSSL directory called 'make_universal', with these
contents:
#!/bin/sh
for lib in `cat libnames.tmp`; do
lipo -create build/*/$lib -output universal/$lib
done
exit 0
Then:
sh make_universal
lipo -info universal/lib*
sudo cp universal/lib* /usr/local/lib
lipo -info /usr/local/lib/lib{crypto,ssl}*
cd ../m2crypto-0.17
Then edit the m2crypto setup.py and uncomment the extra_link_args line at
the end.
python setup.py build build_ext --openssl=/usr/local
sudo python setup.py install build_ext --openssl=/usr/local
Copyright (c) 1999-2004 Ng Pheng Siong. All rights reserved.
Portions copyright (c) 2004-2006 Open Source Applications Foundation.
All rights reserved.
Portions copyright (c) 2005-2006 Vrije Universiteit Amsterdam.
All rights reserved.
Copyright (c) 2008-2010 Heikki Toivonen. All rights reserved.
Permission to use, copy, modify, and distribute this software and its
documentation for any purpose and without fee is hereby granted,
provided that the above copyright notice appear in all copies and that
both that copyright notice and this permission notice appear in
supporting documentation.
THE AUTHOR PROVIDES THIS SOFTWARE ``AS IS'' AND ANY EXPRESSED OR
IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
M2Crypto wrapper for OpenSSL ASN1 API.
Copyright (c) 1999-2004 Ng Pheng Siong. All rights reserved.
Portions created by Open Source Applications Foundation (OSAF) are
Copyright (C) 2005 OSAF. All Rights Reserved.
"""
import time, datetime
import BIO
import m2
MBSTRING_FLAG = 0x1000
MBSTRING_ASC = MBSTRING_FLAG | 1
MBSTRING_BMP = MBSTRING_FLAG | 2
class ASN1_Integer:
m2_asn1_integer_free = m2.asn1_integer_free
def __init__(self, asn1int, _pyfree=0):
self.asn1int = asn1int
self._pyfree = _pyfree
def __cmp__(self, other):
return m2.asn1_integer_cmp(self.asn1int, other.asn1int)
def __del__(self):
if self._pyfree:
self.m2_asn1_integer_free(self.asn1int)
class ASN1_String:
m2_asn1_string_free = m2.asn1_string_free
def __init__(self, asn1str, _pyfree=0):
self.asn1str = asn1str
self._pyfree = _pyfree
def __str__(self):
buf = BIO.MemoryBuffer()
m2.asn1_string_print( buf.bio_ptr(), self.asn1str )
return buf.read_all()
def __del__(self):
if getattr(self, '_pyfree', 0):
self.m2_asn1_string_free(self.asn1str)
def _ptr(self):
return self.asn1str
def as_text(self, flags=0):
buf = BIO.MemoryBuffer()
m2.asn1_string_print_ex( buf.bio_ptr(), self.asn1str, flags)
return buf.read_all()
class ASN1_Object:
m2_asn1_object_free = m2.asn1_object_free
def __init__(self, asn1obj, _pyfree=0):
self.asn1obj = asn1obj
self._pyfree = _pyfree
def __del__(self):
if self._pyfree:
self.m2_asn1_object_free(self.asn1obj)
def _ptr(self):
return self.asn1obj
class _UTC(datetime.tzinfo):
def tzname(self, dt):
return "UTC"
def dst(self, dt):
return datetime.timedelta(0)
def utcoffset(self, dt):
return datetime.timedelta(0)
def __repr__(self):
return "<Timezone: %s>" % self.tzname(None)
UTC = _UTC()
class LocalTimezone(datetime.tzinfo):
""" Localtimezone from datetime manual """
def __init__(self):
self._stdoffset = datetime.timedelta(seconds = -time.timezone)
if time.daylight:
self._dstoffset = datetime.timedelta(seconds = -time.altzone)
else:
self._dstoffset = self._stdoffset
self._dstdiff = self._dstoffset - self._stdoffset
def utcoffset(self, dt):
if self._isdst(dt):
return self._dstoffset
else:
return self._stdoffset
def dst(self, dt):
if self._isdst(dt):
return self._dstdiff
else:
return datetime.timedelta(0)
def tzname(self, dt):
return time.tzname[self._isdst(dt)]
def _isdst(self, dt):
tt = (dt.year, dt.month, dt.day,
dt.hour, dt.minute, dt.second,
dt.weekday(), 0, -1)
stamp = time.mktime(tt)
tt = time.localtime(stamp)
return tt.tm_isdst > 0
class ASN1_UTCTIME:
_ssl_months = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug",
"Sep", "Oct", "Nov", "Dec"]
m2_asn1_utctime_free = m2.asn1_utctime_free
def __init__(self, asn1_utctime=None, _pyfree=0):
if asn1_utctime is not None:
assert m2.asn1_utctime_type_check(asn1_utctime), "'asn1_utctime' type error'"
self.asn1_utctime = asn1_utctime
self._pyfree = _pyfree
else:
self.asn1_utctime = m2.asn1_utctime_new ()
self._pyfree = 1
def __del__(self):
if getattr(self, '_pyfree', 0):
self.m2_asn1_utctime_free(self.asn1_utctime)
def __str__(self):
assert m2.asn1_utctime_type_check(self.asn1_utctime), "'asn1_utctime' type error'"
buf = BIO.MemoryBuffer()
m2.asn1_utctime_print( buf.bio_ptr(), self.asn1_utctime )
return buf.read_all()
def _ptr(self):
assert m2.asn1_utctime_type_check(self.asn1_utctime), "'asn1_utctime' type error'"
return self.asn1_utctime
def set_string (self, string):
"""
Set time from UTC string.
"""
assert m2.asn1_utctime_type_check(self.asn1_utctime), "'asn1_utctime' type error'"
return m2.asn1_utctime_set_string( self.asn1_utctime, string )
def set_time (self, time):
"""
Set time from seconds since epoch (long).
"""
assert m2.asn1_utctime_type_check(self.asn1_utctime), "'asn1_utctime' type error'"
return m2.asn1_utctime_set( self.asn1_utctime, time )
def get_datetime(self):
date = str(self)
timezone = None
if ' ' not in date:
raise ValueError("Invalid date: %s" % date)
month, rest = date.split(' ', 1)
if month not in self._ssl_months:
raise ValueError("Invalid date %s: Invalid month: %s" % (date, month))
if rest.endswith(' GMT'):
timezone = UTC
rest = rest[:-4]
tm = list(time.strptime(rest, "%d %H:%M:%S %Y"))[:6]
tm[1] = self._ssl_months.index(month) + 1
tm.append(0)
tm.append(timezone)
return datetime.datetime(*tm)
def set_datetime(self, date):
local = LocalTimezone()
if date.tzinfo is None:
date = date.replace(tzinfo=local)
date = date.astimezone(local)
return self.set_time(int(time.mktime(date.timetuple())))
"""Secure Authenticator Cookies
Copyright (c) 1999-2002 Ng Pheng Siong. All rights reserved."""
# M2Crypto
import Rand, m2
# Python. Cookie is bundled with Python 2.x.
import Cookie, binascii, re, time
_MIX_FORMAT = 'exp=%s&data=%s&digest='
_MIX_RE = re.compile('exp=(\d+\.\d+)&data=(.+)&digest=(\S*)')
def mix(expiry, data, format=_MIX_FORMAT):
return format % (repr(expiry), data)
def unmix(dough, regex=_MIX_RE):
mo = regex.match(dough)
if mo:
return float(mo.group(1)), mo.group(2)
else:
return None
def unmix3(dough, regex=_MIX_RE):
mo = regex.match(dough)
if mo:
return float(mo.group(1)), mo.group(2), mo.group(3)
else:
return None
_TOKEN = '_M2AUTH_'
class AuthCookieJar:
_keylen = 20
def __init__(self):
self._key = Rand.rand_bytes(self._keylen)
def _hmac(self, key, data):
return binascii.b2a_base64(m2.hmac(key, data, m2.sha1()))[:-1]
def makeCookie(self, expiry, data):
dough = mix(expiry, data)
return AuthCookie(expiry, data, dough, self._hmac(self._key, dough))
def isGoodCookie(self, cookie):
assert isinstance(cookie, AuthCookie)
if cookie.isExpired():
return 0
c = self.makeCookie(cookie._expiry, cookie._data)
return (c._expiry == cookie._expiry) \
and (c._data == cookie._data) \
and (c._mac == cookie._mac) \
and (c.output() == cookie.output())
def isGoodCookieString(self, cookie_str):
c = Cookie.SmartCookie()
c.load(cookie_str)
if not c.has_key(_TOKEN):
return 0
undough = unmix3(c[_TOKEN].value)
if undough is None:
return 0
exp, data, mac = undough
c2 = self.makeCookie(exp, data)
return (not c2.isExpired()) and (c2._mac == mac)
class AuthCookie:
def __init__(self, expiry, data, dough, mac):
self._expiry = expiry
self._data = data
self._mac = mac
self._cookie = Cookie.SmartCookie()
self._cookie[_TOKEN] = '%s%s' % (dough, mac)
self._name = '%s%s' % (dough, mac) # XXX WebKit only.
def expiry(self):
"""Return the cookie's expiry time."""
return self._expiry
def data(self):
"""Return the data portion of the cookie."""
return self._data
def mac(self):
"""Return the cookie's MAC."""
return self._mac
def output(self):
"""Return the cookie's output in "Set-Cookie" format."""
return self._cookie.output()
def value(self):
"""Return the cookie's output minus the "Set-Cookie: " portion.
"""
return self._cookie[_TOKEN].value
def isExpired(self):
"""Return 1 if the cookie has expired, 0 otherwise."""
return (time.time() > self._expiry)
# XXX Following methods are for WebKit only. These should be pushed
# to WKAuthCookie.
def name(self):
return self._name
def headerValue(self):
return self.value()
"""M2Crypto wrapper for OpenSSL BIO API.
Copyright (c) 1999-2004 Ng Pheng Siong. All rights reserved."""
import m2
# Deprecated
from m2 import bio_do_handshake as bio_do_ssl_handshake
from cStringIO import StringIO
class BIOError(Exception): pass
m2.bio_init(BIOError)
class BIO:
"""Abstract object interface to the BIO API."""
m2_bio_free = m2.bio_free
def __init__(self, bio=None, _pyfree=0, _close_cb=None):
self.bio = bio
self._pyfree = _pyfree
self._close_cb = _close_cb
self.closed = 0
self.write_closed = 0
def __del__(self):
if self._pyfree:
self.m2_bio_free(self.bio)
def _ptr(self):
return self.bio
# Deprecated.
bio_ptr = _ptr
def fileno(self):
return m2.bio_get_fd(self.bio)
def readable(self):
return not self.closed
def read(self, size=None):
if not self.readable():
raise IOError, 'cannot read'
if size is None:
buf = StringIO()
while 1:
data = m2.bio_read(self.bio, 4096)
if not data: break
buf.write(data)
return buf.getvalue()
elif size == 0:
return ''
elif size < 0:
raise ValueError, 'read count is negative'
else:
return m2.bio_read(self.bio, size)
def readline(self, size=4096):
if not self.readable():
raise IOError, 'cannot read'
buf = m2.bio_gets(self.bio, size)
return buf
def readlines(self, sizehint='ignored'):
if not self.readable():
raise IOError, 'cannot read'
lines=[]
while 1:
buf=m2.bio_gets(self.bio, 4096)
if buf is None:
break
lines.append(buf)
return lines
def writeable(self):
return (not self.closed) and (not self.write_closed)
def write(self, data):
if not self.writeable():
raise IOError, 'cannot write'
return m2.bio_write(self.bio, data)
def write_close(self):
self.write_closed = 1
def flush(self):
m2.bio_flush(self.bio)
def reset(self):
"""
Sets the bio to its initial state
"""
return m2.bio_reset(self.bio)
def close(self):
self.closed = 1
if self._close_cb:
self._close_cb()
def should_retry(self):
"""
Can the call be attempted again, or was there an error
ie do_handshake
"""
return m2.bio_should_retry(self.bio)
def should_read(self):
"""
Returns whether the cause of the condition is the bio
should read more data
"""
return m2.bio_should_read(self.bio)
def should_write(self):
"""
Returns whether the cause of the condition is the bio
should write more data
"""
return m2.bio_should_write(self.bio)
class MemoryBuffer(BIO):
"""
Object interface to BIO_s_mem.
Empirical testing suggests that this class performs less well than cStringIO,
because cStringIO is implemented in C, whereas this class is implemented in
Python. Thus, the recommended practice is to use cStringIO for regular work and
convert said cStringIO object to a MemoryBuffer object only when necessary.
"""
def __init__(self, data=None):
BIO.__init__(self)
self.bio = m2.bio_new(m2.bio_s_mem())
self._pyfree = 1
if data is not None:
m2.bio_write(self.bio, data)
def __len__(self):
return m2.bio_ctrl_pending(self.bio)
def read(self, size=0):
if not self.readable():
raise IOError, 'cannot read'
if size:
return m2.bio_read(self.bio, size)
else:
return m2.bio_read(self.bio, m2.bio_ctrl_pending(self.bio))
# Backwards-compatibility.
getvalue = read_all = read
def write_close(self):
self.write_closed = 1
m2.bio_set_mem_eof_return(self.bio, 0)
close = write_close
class File(BIO):
"""
Object interface to BIO_s_fp.
This class interfaces Python to OpenSSL functions that expect BIO *. For
general file manipulation in Python, use Python's builtin file object.
"""
def __init__(self, pyfile, close_pyfile=1):
BIO.__init__(self, _pyfree=1)
self.pyfile = pyfile
self.close_pyfile = close_pyfile
self.bio = m2.bio_new_fp(pyfile, 0)
def close(self):
self.closed = 1
if self.close_pyfile:
self.pyfile.close()
def openfile(filename, mode='rb'):
return File(open(filename, mode))
class IOBuffer(BIO):
"""
Object interface to BIO_f_buffer.
Its principal function is to be BIO_push()'ed on top of a BIO_f_ssl, so
that makefile() of said underlying SSL socket works.
"""
m2_bio_pop = m2.bio_pop
m2_bio_free = m2.bio_free
def __init__(self, under_bio, mode='rwb', _pyfree=1):
BIO.__init__(self, _pyfree=_pyfree)
self.io = m2.bio_new(m2.bio_f_buffer())
self.bio = m2.bio_push(self.io, under_bio._ptr())
# This reference keeps the underlying BIO alive while we're not closed.
self._under_bio = under_bio
if 'w' in mode:
self.write_closed = 0
else:
self.write_closed = 1
def __del__(self):
if getattr(self, '_pyfree', 0):
self.m2_bio_pop(self.bio)
self.m2_bio_free(self.io)
def close(self):
BIO.close(self)
class CipherStream(BIO):
"""
Object interface to BIO_f_cipher.
"""
SALT_LEN = m2.PKCS5_SALT_LEN
m2_bio_pop = m2.bio_pop
m2_bio_free = m2.bio_free
def __init__(self, obio):
BIO.__init__(self, _pyfree=1)
self.obio = obio
self.bio = m2.bio_new(m2.bio_f_cipher())
self.closed = 0
def __del__(self):
if not getattr(self, 'closed', 1):
self.close()
def close(self):
self.m2_bio_pop(self.bio)
self.m2_bio_free(self.bio)
self.closed = 1
def write_close(self):
self.obio.write_close()
def set_cipher(self, algo, key, iv, op):
cipher = getattr(m2, algo, None)
if cipher is None:
raise ValueError, ('unknown cipher', algo)
m2.bio_set_cipher(self.bio, cipher(), key, iv, op)
m2.bio_push(self.bio, self.obio._ptr())
class SSLBio(BIO):
"""
Object interface to BIO_f_ssl
"""
def __init__(self, _pyfree=1):
BIO.__init__(self, _pyfree)
self.bio = m2.bio_new(m2.bio_f_ssl())
self.closed = 0
def set_ssl(self, conn, close_flag=m2.bio_noclose):
"""
Sets the bio to the SSL pointer which is
contained in the connection object.
"""
self._pyfree = 0
m2.bio_set_ssl(self.bio, conn.ssl, close_flag)
if close_flag == m2.bio_noclose:
conn.set_ssl_close_flag(m2.bio_close)
def do_handshake(self):
"""
Do the handshake.
Return 1 if the handshake completes
Return 0 or a negative number if there is a problem
"""
return m2.bio_do_handshake(self.bio)
"""
M2Crypto wrapper for OpenSSL BN (BIGNUM) API.
Copyright (c) 2005 Open Source Applications Foundation. All rights reserved.
"""
import m2
def rand(bits, top=-1, bottom=0):
"""
Generate cryptographically strong random number.
@param bits: Length of random number in bits.
@param top: If -1, the most significant bit can be 0. If 0, the most
significant bit is 1, and if 1, the two most significant
bits will be 1.
@param bottom: If bottom is true, the number will be odd.
"""
return m2.bn_rand(bits, top, bottom)
def rand_range(range):
"""
Generate a random number in a range.
@param range: Upper limit for range.
@return: A random number in the range [0, range)
"""
return m2.bn_rand_range(range)
def randfname(length):
"""
Return a random filename, which is simply a string where all
the characters are from the set [a-zA-Z0-9].
@param length: Length of filename to return.
@type length: int
@return: random filename string
"""
letters = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234567890'
lettersLen = len(letters)
fname = []
for x in range(length):
fname += [letters[m2.bn_rand_range(lettersLen)]]
return ''.join(fname)
"""M2Crypto wrapper for OpenSSL DH API.
Copyright (c) 1999-2003 Ng Pheng Siong. All rights reserved."""
from util import genparam_callback
import BIO, Err, m2
class DHError(Exception): pass
m2.dh_init(DHError)
class DH:
"""
Object interface to the Diffie-Hellman key exchange
protocol.
"""
m2_dh_free = m2.dh_free
def __init__(self, dh, _pyfree=0):
assert m2.dh_type_check(dh)
self.dh = dh
self._pyfree = _pyfree
def __del__(self):
if getattr(self, '_pyfree', 0):
self.m2_dh_free(self.dh)
def __len__(self):
assert m2.dh_type_check(self.dh), "'dh' type error"
return m2.dh_size(self.dh)
def __getattr__(self, name):
if name in ('p', 'g', 'pub', 'priv'):
method = getattr(m2, 'dh_get_%s' % (name,))
assert m2.dh_type_check(self.dh), "'dh' type error"
return method(self.dh)
else:
raise AttributeError
def __setattr__(self, name, value):
if name in ('p', 'g'):
raise DHError, 'set (p, g) via set_params()'
elif name in ('pub','priv'):
raise DHError, 'generate (pub, priv) via gen_key()'
else:
self.__dict__[name] = value
def _ptr(self):
return self.dh
def check_params(self):
assert m2.dh_type_check(self.dh), "'dh' type error"
return m2.dh_check(self.dh)
def gen_key(self):
assert m2.dh_type_check(self.dh), "'dh' type error"
m2.dh_generate_key(self.dh)
def compute_key(self, pubkey):
assert m2.dh_type_check(self.dh), "'dh' type error"
return m2.dh_compute_key(self.dh, pubkey)
def print_params(self, bio):
assert m2.dh_type_check(self.dh), "'dh' type error"
return m2.dhparams_print(bio._ptr(), self.dh)
def gen_params(plen, g, callback=genparam_callback):
return DH(m2.dh_generate_parameters(plen, g, callback), 1)
def load_params(file):
bio = BIO.openfile(file)
return load_params_bio(bio)
def load_params_bio(bio):
return DH(m2.dh_read_parameters(bio._ptr()), 1)
def set_params(p, g):
dh = m2.dh_new()
m2.dh_set_p(dh, p)
m2.dh_set_g(dh, g)
return DH(dh, 1)
#def free_params(cptr):
# m2.dh_free(cptr)
DH_GENERATOR_2 = m2.DH_GENERATOR_2
DH_GENERATOR_5 = m2.DH_GENERATOR_5
"""
M2Crypto wrapper for OpenSSL DSA API.
Copyright (c) 1999-2003 Ng Pheng Siong. All rights reserved.
Portions created by Open Source Applications Foundation (OSAF) are
Copyright (C) 2004 OSAF. All Rights Reserved.
"""
import sys
import util, BIO, m2
class DSAError(Exception): pass
m2.dsa_init(DSAError)
class DSA:
"""
This class is a context supporting DSA key and parameter
values, signing and verifying.
Simple example::
from M2Crypto import EVP, DSA, util
message = 'Kilroy was here!'
md = EVP.MessageDigest('sha1')
md.update(message)
digest = md.final()
dsa = DSA.gen_params(1024)
dsa.gen_key()
r, s = dsa.sign(digest)
good = dsa.verify(digest, r, s)
if good:
print ' ** success **'
else:
print ' ** verification failed **'
"""
m2_dsa_free = m2.dsa_free
def __init__(self, dsa, _pyfree=0):
"""
Use one of the factory functions to create an instance.
"""
assert m2.dsa_type_check(dsa), "'dsa' type error"
self.dsa = dsa
self._pyfree = _pyfree
def __del__(self):
if getattr(self, '_pyfree', 0):
self.m2_dsa_free(self.dsa)
def __len__(self):
"""
Return the key length.
@rtype: int
@return: the DSA key length in bits
"""
assert m2.dsa_type_check(self.dsa), "'dsa' type error"
return m2.dsa_keylen(self.dsa)
def __getattr__(self, name):
"""
Return specified DSA parameters and key values.
@type name: str
@param name: name of variable to be returned. Must be
one of 'p', 'q', 'g', 'pub', 'priv'.
@rtype: str
@return: value of specified variable (a "byte string")
"""
if name in ['p', 'q', 'g', 'pub', 'priv']:
method = getattr(m2, 'dsa_get_%s' % (name,))
assert m2.dsa_type_check(self.dsa), "'dsa' type error"
return method(self.dsa)
else:
raise AttributeError
def __setattr__(self, name, value):
if name in ['p', 'q', 'g']:
raise DSAError('set (p, q, g) via set_params()')
elif name in ['pub','priv']:
raise DSAError('generate (pub, priv) via gen_key()')
else:
self.__dict__[name] = value
def set_params(self, p, q, g):
"""
Set new parameters.
@warning: This does not change the private key, so it may be
unsafe to use this method. It is better to use
gen_params function to create a new DSA object.
"""
m2.dsa_set_p(self.dsa, p)
m2.dsa_set_q(self.dsa, q)
m2.dsa_set_g(self.dsa, g)
def gen_key(self):
"""
Generate a key pair.
"""
assert m2.dsa_type_check(self.dsa), "'dsa' type error"
m2.dsa_gen_key(self.dsa)
def save_params(self, filename):
"""
Save the DSA parameters to a file.
@type filename: str
@param filename: Save the DSA parameters to this file.
@return: 1 (true) if successful
"""
bio = BIO.openfile(filename, 'wb')
ret = m2.dsa_write_params_bio(self.dsa, bio._ptr())
bio.close()
return ret
def save_params_bio(self, bio):
"""
Save DSA parameters to a BIO object.
@type bio: M2Crypto.BIO object
@param bio: Save DSA parameters to this object.
@return: 1 (true) if successful
"""
return m2.dsa_write_params_bio(self.dsa, bio._ptr())
def save_key(self, filename, cipher='aes_128_cbc',
callback=util.passphrase_callback):
"""
Save the DSA key pair to a file.
@type filename: str
@param filename: Save the DSA key pair to this file.
@type cipher: str
@param cipher: name of symmetric key algorithm and mode
to encrypt the private key.
@return: 1 (true) if successful
"""
bio = BIO.openfile(filename, 'wb')
ret = self.save_key_bio(bio, cipher, callback)
bio.close()
return ret
def save_key_bio(self, bio, cipher='aes_128_cbc',
callback=util.passphrase_callback):
"""
Save DSA key pair to a BIO object.
@type bio: M2Crypto.BIO object
@param bio: Save DSA parameters to this object.
@type cipher: str
@param cipher: name of symmetric key algorithm and mode
to encrypt the private key.
@return: 1 (true) if successful
"""
if cipher is None:
return m2.dsa_write_key_bio_no_cipher(self.dsa,
bio._ptr(), callback)
else:
ciph = getattr(m2, cipher, None)
if ciph is None:
raise DSAError('no such cipher: %s' % cipher)
else:
ciph = ciph()
return m2.dsa_write_key_bio(self.dsa, bio._ptr(), ciph, callback)
def save_pub_key(self, filename):
"""
Save the DSA public key (with parameters) to a file.
@type filename: str
@param filename: Save DSA public key (with parameters)
to this file.
@return: 1 (true) if successful
"""
bio = BIO.openfile(filename, 'wb')
ret = self.save_pub_key_bio(bio)
bio.close()
return ret
def save_pub_key_bio(self, bio):
"""
Save DSA public key (with parameters) to a BIO object.
@type bio: M2Crypto.BIO object
@param bio: Save DSA public key (with parameters)
to this object.
@return: 1 (true) if successful
"""
return m2.dsa_write_pub_key_bio(self.dsa, bio._ptr())
def sign(self, digest):
"""
Sign the digest.
@type digest: str
@param digest: SHA-1 hash of message (same as output
from MessageDigest, a "byte string")
@rtype: tuple
@return: DSA signature, a tuple of two values, r and s,
both "byte strings".
"""
assert self.check_key(), 'key is not initialised'
return m2.dsa_sign(self.dsa, digest)
def verify(self, digest, r, s):
"""
Verify a newly calculated digest against the signature
values r and s.
@type digest: str
@param digest: SHA-1 hash of message (same as output
from MessageDigest, a "byte string")
@type r: str
@param r: r value of the signature, a "byte string"
@type s: str
@param s: s value of the signature, a "byte string"
@rtype: int
@return: 1 (true) if verify succeeded, 0 if failed
"""
assert self.check_key(), 'key is not initialised'
return m2.dsa_verify(self.dsa, digest, r, s)
def sign_asn1(self, digest):
assert self.check_key(), 'key is not initialised'
return m2.dsa_sign_asn1(self.dsa, digest)
def verify_asn1(self, digest, blob):
assert self.check_key(), 'key is not initialised'
return m2.dsa_verify_asn1(self.dsa, digest, blob)
def check_key(self):
"""
Check to be sure the DSA object has a valid private key.
@rtype: int
@return: 1 (true) if a valid private key
"""
assert m2.dsa_type_check(self.dsa), "'dsa' type error"
return m2.dsa_check_key(self.dsa)
class DSA_pub(DSA):
"""
This class is a DSA context that only supports a public key
and verification. It does NOT support a private key or
signing.
"""
def sign(self, *argv):
raise DSAError('DSA_pub object has no private key')
sign_asn1 = sign
def check_key(self):
return m2.dsa_check_pub_key(self.dsa)
save_key = DSA.save_pub_key
save_key_bio = DSA.save_pub_key_bio
#---------------------------------------------------------------
# factories and other functions
def gen_params(bits, callback=util.genparam_callback):
"""
Factory function that generates DSA parameters and
instantiates a DSA object from the output.
@type bits: int
@param bits: The length of the prime to be generated. If
'bits' < 512, it is set to 512.
@type callback: function
@param callback: A Python callback object that will be
invoked during parameter generation; it usual
purpose is to provide visual feedback.
@rtype: DSA
@return: instance of DSA.
"""
dsa = m2.dsa_generate_parameters(bits, callback)
if dsa is None:
raise DSAError('problem generating DSA parameters')
return DSA(dsa, 1)
def set_params(p, q, g):
"""
Factory function that instantiates a DSA object with DSA
parameters.
@type p: str
@param p: value of p, a "byte string"
@type q: str
@param q: value of q, a "byte string"
@type g: str
@param g: value of g, a "byte string"
@rtype: DSA
@return: instance of DSA.
"""
dsa = m2.dsa_new()
m2.dsa_set_p(dsa, p)
m2.dsa_set_q(dsa, q)
m2.dsa_set_g(dsa, g)
return DSA(dsa, 1)
def load_params(file, callback=util.passphrase_callback):
"""
Factory function that instantiates a DSA object with DSA
parameters from a file.
@type file: str
@param file: Names the file (a path) that contains the PEM
representation of the DSA parameters.
@type callback: A Python callable
@param callback: A Python callback object that will be
invoked if the DSA parameters file is
passphrase-protected.
@rtype: DSA
@return: instance of DSA.
"""
bio = BIO.openfile(file)
ret = load_params_bio(bio, callback)
bio.close()
return ret
def load_params_bio(bio, callback=util.passphrase_callback):
"""
Factory function that instantiates a DSA object with DSA
parameters from a M2Crypto.BIO object.
@type bio: M2Crypto.BIO object
@param bio: Contains the PEM representation of the DSA
parameters.
@type callback: A Python callable
@param callback: A Python callback object that will be
invoked if the DSA parameters file is
passphrase-protected.
@rtype: DSA
@return: instance of DSA.
"""
dsa = m2.dsa_read_params(bio._ptr(), callback)
if dsa is None:
raise DSAError('problem loading DSA parameters')
return DSA(dsa, 1)
def load_key(file, callback=util.passphrase_callback):
"""
Factory function that instantiates a DSA object from a
PEM encoded DSA key pair.
@type file: str
@param file: Names the file (a path) that contains the PEM
representation of the DSA key pair.
@type callback: A Python callable
@param callback: A Python callback object that will be
invoked if the DSA key pair is
passphrase-protected.
@rtype: DSA
@return: instance of DSA.
"""
bio = BIO.openfile(file)
ret = load_key_bio(bio, callback)
bio.close()
return ret
def load_key_bio(bio, callback=util.passphrase_callback):
"""
Factory function that instantiates a DSA object from a
PEM encoded DSA key pair.
@type bio: M2Crypto.BIO object
@param bio: Contains the PEM representation of the DSA
key pair.
@type callback: A Python callable
@param callback: A Python callback object that will be
invoked if the DSA key pair is
passphrase-protected.
@rtype: DSA
@return: instance of DSA.
"""
dsa = m2.dsa_read_key(bio._ptr(), callback)
if not dsa:
raise DSAError('problem loading DSA key pair')
return DSA(dsa, 1)
def load_pub_key(file, callback=util.passphrase_callback):
"""
Factory function that instantiates a DSA_pub object using
a DSA public key contained in PEM file. The PEM file
must contain the parameters in addition to the public key.
@type file: str
@param file: Names the file (a path) that contains the PEM
representation of the DSA public key.
@type callback: A Python callable
@param callback: A Python callback object that will be
invoked should the DSA public key be
passphrase-protected.
@rtype: DSA_pub
@return: instance of DSA_pub.
"""
bio = BIO.openfile(file)
ret = load_pub_key_bio(bio, callback)
bio.close()
return ret
def load_pub_key_bio(bio, callback=util.passphrase_callback):
"""
Factory function that instantiates a DSA_pub object using
a DSA public key contained in PEM format. The PEM
must contain the parameters in addition to the public key.
@type bio: M2Crypto.BIO object
@param bio: Contains the PEM representation of the DSA
public key (with params).
@type callback: A Python callable
@param callback: A Python callback object that will be
invoked should the DSA public key be
passphrase-protected.
@rtype: DSA_pub
@return: instance of DSA_pub.
"""
dsapub = m2.dsa_read_pub_key(bio._ptr(), callback)
if not dsapub:
raise DSAError('problem loading DSA public key')
return DSA_pub(dsapub, 1)
"""
M2Crypto wrapper for OpenSSL ECDH/ECDSA API.
@requires: OpenSSL 0.9.8 or newer
Copyright (c) 1999-2003 Ng Pheng Siong. All rights reserved.
Portions copyright (c) 2005-2006 Vrije Universiteit Amsterdam.
All rights reserved."""
import util, BIO, m2
class ECError(Exception): pass
m2.ec_init(ECError)
# Curve identifier constants
NID_secp112r1 = m2.NID_secp112r1
NID_secp112r2 = m2.NID_secp112r2
NID_secp128r1 = m2.NID_secp128r1
NID_secp128r2 = m2.NID_secp128r2
NID_secp160k1 = m2.NID_secp160k1
NID_secp160r1 = m2.NID_secp160r1
NID_secp160r2 = m2.NID_secp160r2
NID_secp192k1 = m2.NID_secp192k1
NID_secp224k1 = m2.NID_secp224k1
NID_secp224r1 = m2.NID_secp224r1
NID_secp256k1 = m2.NID_secp256k1
NID_secp384r1 = m2.NID_secp384r1
NID_secp521r1 = m2.NID_secp521r1
NID_sect113r1 = m2.NID_sect113r1
NID_sect113r2 = m2.NID_sect113r2
NID_sect131r1 = m2.NID_sect131r1
NID_sect131r2 = m2.NID_sect131r2
NID_sect163k1 = m2.NID_sect163k1
NID_sect163r1 = m2.NID_sect163r1
NID_sect163r2 = m2.NID_sect163r2
NID_sect193r1 = m2.NID_sect193r1
NID_sect193r2 = m2.NID_sect193r2
NID_sect233k1 = m2.NID_sect233k1 # default for secg.org TLS test server
NID_sect233r1 = m2.NID_sect233r1
NID_sect239k1 = m2.NID_sect239k1
NID_sect283k1 = m2.NID_sect283k1
NID_sect283r1 = m2.NID_sect283r1
NID_sect409k1 = m2.NID_sect409k1
NID_sect409r1 = m2.NID_sect409r1
NID_sect571k1 = m2.NID_sect571k1
NID_sect571r1 = m2.NID_sect571r1
NID_X9_62_prime192v1 = m2.NID_X9_62_prime192v1
NID_X9_62_prime192v2 = m2.NID_X9_62_prime192v2
NID_X9_62_prime192v3 = m2.NID_X9_62_prime192v3
NID_X9_62_prime239v1 = m2.NID_X9_62_prime239v1
NID_X9_62_prime239v2 = m2.NID_X9_62_prime239v2
NID_X9_62_prime239v3 = m2.NID_X9_62_prime239v3
NID_X9_62_prime256v1 = m2.NID_X9_62_prime256v1
NID_X9_62_c2pnb163v1 = m2.NID_X9_62_c2pnb163v1
NID_X9_62_c2pnb163v2 = m2.NID_X9_62_c2pnb163v2
NID_X9_62_c2pnb163v3 = m2.NID_X9_62_c2pnb163v3
NID_X9_62_c2pnb176v1 = m2.NID_X9_62_c2pnb176v1
NID_X9_62_c2tnb191v1 = m2.NID_X9_62_c2tnb191v1
NID_X9_62_c2tnb191v2 = m2.NID_X9_62_c2tnb191v2
NID_X9_62_c2tnb191v3 = m2.NID_X9_62_c2tnb191v3
NID_X9_62_c2pnb208w1 = m2.NID_X9_62_c2pnb208w1
NID_X9_62_c2tnb239v1 = m2.NID_X9_62_c2tnb239v1
NID_X9_62_c2tnb239v2 = m2.NID_X9_62_c2tnb239v2
NID_X9_62_c2tnb239v3 = m2.NID_X9_62_c2tnb239v3
NID_X9_62_c2pnb272w1 = m2.NID_X9_62_c2pnb272w1
NID_X9_62_c2pnb304w1 = m2.NID_X9_62_c2pnb304w1
NID_X9_62_c2tnb359v1 = m2.NID_X9_62_c2tnb359v1
NID_X9_62_c2pnb368w1 = m2.NID_X9_62_c2pnb368w1
NID_X9_62_c2tnb431r1 = m2.NID_X9_62_c2tnb431r1
NID_wap_wsg_idm_ecid_wtls1 = m2.NID_wap_wsg_idm_ecid_wtls1
NID_wap_wsg_idm_ecid_wtls3 = m2.NID_wap_wsg_idm_ecid_wtls3
NID_wap_wsg_idm_ecid_wtls4 = m2.NID_wap_wsg_idm_ecid_wtls4
NID_wap_wsg_idm_ecid_wtls5 = m2.NID_wap_wsg_idm_ecid_wtls5
NID_wap_wsg_idm_ecid_wtls6 = m2.NID_wap_wsg_idm_ecid_wtls6
NID_wap_wsg_idm_ecid_wtls7 = m2.NID_wap_wsg_idm_ecid_wtls7
NID_wap_wsg_idm_ecid_wtls8 = m2.NID_wap_wsg_idm_ecid_wtls8
NID_wap_wsg_idm_ecid_wtls9 = m2.NID_wap_wsg_idm_ecid_wtls9
NID_wap_wsg_idm_ecid_wtls10 = m2.NID_wap_wsg_idm_ecid_wtls10
NID_wap_wsg_idm_ecid_wtls11 = m2.NID_wap_wsg_idm_ecid_wtls11
NID_wap_wsg_idm_ecid_wtls12 = m2.NID_wap_wsg_idm_ecid_wtls12
# The following two curves, according to OpenSSL, have a
# "Questionable extension field!" and are not supported by
# the OpenSSL inverse function. ECError: no inverse.
# As such they cannot be used for signing. They might,
# however, be usable for encryption but that has not
# been tested. Until thir usefulness can be established,
# they are not supported at this time.
# NID_ipsec3 = m2.NID_ipsec3
# NID_ipsec4 = m2.NID_ipsec4
class EC:
"""
Object interface to a EC key pair.
"""
m2_ec_key_free = m2.ec_key_free
def __init__(self, ec, _pyfree=0):
assert m2.ec_key_type_check(ec), "'ec' type error"
self.ec = ec
self._pyfree = _pyfree
def __del__(self):
if getattr(self, '_pyfree', 0):
self.m2_ec_key_free(self.ec)
def __len__(self):
assert m2.ec_key_type_check(self.ec), "'ec' type error"
return m2.ec_key_keylen(self.ec)
def gen_key(self):
"""
Generates the key pair from its parameters. Use::
keypair = EC.gen_params(curve)
keypair.gen_key()
to create an EC key pair.
"""
assert m2.ec_key_type_check(self.ec), "'ec' type error"
m2.ec_key_gen_key(self.ec)
def pub(self):
# Don't let python free
return EC_pub(self.ec, 0)
def sign_dsa(self, digest):
"""
Sign the given digest using ECDSA. Returns a tuple (r,s), the two
ECDSA signature parameters.
"""
assert self._check_key_type(), "'ec' type error"
return m2.ecdsa_sign(self.ec, digest)
def verify_dsa(self, digest, r, s):
"""
Verify the given digest using ECDSA. r and s are the ECDSA
signature parameters.
"""
assert self._check_key_type(), "'ec' type error"
return m2.ecdsa_verify(self.ec, digest, r, s)
def sign_dsa_asn1(self, digest):
assert self._check_key_type(), "'ec' type error"
return m2.ecdsa_sign_asn1(self.ec, digest)
def verify_dsa_asn1(self, digest, blob):
assert self._check_key_type(), "'ec' type error"
return m2.ecdsa_verify_asn1(self.ec, digest, blob)
def compute_dh_key(self,pub_key):
"""
Compute the ECDH shared key of this key pair and the given public
key object. They must both use the same curve. Returns the
shared key in binary as a buffer object. No Key Derivation Function is
applied.
"""
assert self.check_key(), 'key is not initialised'
return m2.ecdh_compute_key(self.ec, pub_key.ec)
def save_key_bio(self, bio, cipher='aes_128_cbc', callback=util.passphrase_callback):
"""
Save the key pair to an M2Crypto.BIO.BIO object in PEM format.
@type bio: M2Crypto.BIO.BIO
@param bio: M2Crypto.BIO.BIO object to save key to.
@type cipher: string
@param cipher: Symmetric cipher to protect the key. The default
cipher is 'aes_128_cbc'. If cipher is None, then the key is saved
in the clear.
@type callback: Python callable
@param callback: A Python callable object that is invoked
to acquire a passphrase with which to protect the key.
The default is util.passphrase_callback.
"""
if cipher is None:
return m2.ec_key_write_bio_no_cipher(self.ec, bio._ptr(), callback)
else:
ciph = getattr(m2, cipher, None)
if ciph is None:
raise ValueError('not such cipher %s' % cipher)
return m2.ec_key_write_bio(self.ec, bio._ptr(), ciph(), callback)
def save_key(self, file, cipher='aes_128_cbc', callback=util.passphrase_callback):
"""
Save the key pair to a file in PEM format.
@type file: string
@param file: Name of file to save key to.
@type cipher: string
@param cipher: Symmetric cipher to protect the key. The default
cipher is 'aes_128_cbc'. If cipher is None, then the key is saved
in the clear.
@type callback: Python callable
@param callback: A Python callable object that is invoked
to acquire a passphrase with which to protect the key.
The default is util.passphrase_callback.
"""
bio = BIO.openfile(file, 'wb')
return self.save_key_bio(bio, cipher, callback)
def save_pub_key_bio(self, bio):
"""
Save the public key to an M2Crypto.BIO.BIO object in PEM format.
@type bio: M2Crypto.BIO.BIO
@param bio: M2Crypto.BIO.BIO object to save key to.
"""
return m2.ec_key_write_pubkey(self.ec, bio._ptr())
def save_pub_key(self, file):
"""
Save the public key to a file in PEM format.
@type file: string
@param file: Name of file to save key to.
"""
bio = BIO.openfile(file, 'wb')
return m2.ec_key_write_pubkey(self.ec, bio._ptr())
def _check_key_type(self):
return m2.ec_key_type_check(self.ec)
def check_key(self):
assert m2.ec_key_type_check(self.ec), "'ec' type error"
return m2.ec_key_check_key(self.ec)
class EC_pub(EC):
"""
Object interface to an EC public key.
((don't like this implementation inheritance))
"""
def __init__(self,ec,_pyfree=0):
EC.__init__(self,ec,_pyfree)
self.der = None
def get_der(self):
"""
Returns the public key in DER format as a buffer object.
"""
assert self.check_key(), 'key is not initialised'
if self.der is None:
self.der = m2.ec_key_get_public_der(self.ec)
return self.der
save_key = EC.save_pub_key
save_key_bio = EC.save_pub_key_bio
def gen_params(curve):
"""
Factory function that generates EC parameters and
instantiates a EC object from the output.
@param curve: This is the OpenSSL nid of the curve to use.
"""
return EC(m2.ec_key_new_by_curve_name(curve), 1)
def load_key(file, callback=util.passphrase_callback):
"""
Factory function that instantiates a EC object.
@param file: Names the file that contains the PEM representation
of the EC key pair.
@param callback: Python callback object that will be invoked
if the EC key pair is passphrase-protected.
"""
bio = BIO.openfile(file)
return load_key_bio(bio, callback)
def load_key_bio(bio, callback=util.passphrase_callback):
"""
Factory function that instantiates a EC object.
@param bio: M2Crypto.BIO object that contains the PEM
representation of the EC key pair.
@param callback: Python callback object that will be invoked
if the EC key pair is passphrase-protected.
"""
return EC(m2.ec_key_read_bio(bio._ptr(), callback), 1)
def load_pub_key(file):
"""
Load an EC public key from file.
@type file: string
@param file: Name of file containing EC public key in PEM format.
@rtype: M2Crypto.EC.EC_pub
@return: M2Crypto.EC.EC_pub object.
"""
bio = BIO.openfile(file)
return load_pub_key_bio(bio)
def load_pub_key_bio(bio):
"""
Load an EC public key from an M2Crypto.BIO.BIO object.
@type bio: M2Crypto.BIO.BIO
@param bio: M2Crypto.BIO.BIO object containing EC public key in PEM
format.
@rtype: M2Crypto.EC.EC_pub
@return: M2Crypto.EC.EC_pub object.
"""
ec = m2.ec_key_read_pubkey(bio._ptr())
if ec is None:
ec_error()
return EC_pub(ec, 1)
def ec_error():
raise ECError, m2.err_reason_error_string(m2.err_get_error())
def pub_key_from_der(der):
"""
Create EC_pub from DER.
"""
return EC_pub(m2.ec_key_from_pubkey_der(der), 1)
"""M2Crypto wrapper for OpenSSL EVP API.
Copyright (c) 1999-2004 Ng Pheng Siong. All rights reserved.
Portions Copyright (c) 2004-2007 Open Source Applications Foundation.
Author: Heikki Toivonen
"""
from M2Crypto import Err, util, BIO, RSA
import m2
class EVPError(Exception): pass
m2.evp_init(EVPError)
def pbkdf2(password, salt, iter, keylen):
"""
Derive a key from password using PBKDF2 algorithm specified in RFC 2898.
@param password: Derive the key from this password.
@type password: str
@param salt: Salt.
@type salt: str
@param iter: Number of iterations to perform.
@type iter: int
@param keylen: Length of key to produce.
@type keylen: int
@return: Key.
@rtype: str
"""
return m2.pkcs5_pbkdf2_hmac_sha1(password, salt, iter, keylen)
class MessageDigest:
"""
Message Digest
"""
m2_md_ctx_free = m2.md_ctx_free
def __init__(self, algo):
md = getattr(m2, algo, None)
if md is None:
raise ValueError, ('unknown algorithm', algo)
self.md=md()
self.ctx=m2.md_ctx_new()
m2.digest_init(self.ctx, self.md)
def __del__(self):
if getattr(self, 'ctx', None):
self.m2_md_ctx_free(self.ctx)
def update(self, data):
"""
Add data to be digested.
@return: -1 for Python error, 1 for success, 0 for OpenSSL failure.
"""
return m2.digest_update(self.ctx, data)
def final(self):
return m2.digest_final(self.ctx)
# Deprecated.
digest = final
class HMAC:
m2_hmac_ctx_free = m2.hmac_ctx_free
def __init__(self, key, algo='sha1'):
md = getattr(m2, algo, None)
if md is None:
raise ValueError, ('unknown algorithm', algo)
self.md=md()
self.ctx=m2.hmac_ctx_new()
m2.hmac_init(self.ctx, key, self.md)
def __del__(self):
if getattr(self, 'ctx', None):
self.m2_hmac_ctx_free(self.ctx)
def reset(self, key):
m2.hmac_init(self.ctx, key, self.md)
def update(self, data):
m2.hmac_update(self.ctx, data)
def final(self):
return m2.hmac_final(self.ctx)
digest=final
def hmac(key, data, algo='sha1'):
md = getattr(m2, algo, None)
if md is None:
raise ValueError, ('unknown algorithm', algo)
return m2.hmac(key, data, md())
class Cipher:
m2_cipher_ctx_free = m2.cipher_ctx_free
def __init__(self, alg, key, iv, op, key_as_bytes=0, d='md5', salt='12345678', i=1, padding=1):
cipher = getattr(m2, alg, None)
if cipher is None:
raise ValueError, ('unknown cipher', alg)
self.cipher=cipher()
if key_as_bytes:
kmd = getattr(m2, d, None)
if kmd is None:
raise ValueError, ('unknown message digest', d)
key = m2.bytes_to_key(self.cipher, kmd(), key, salt, iv, i)
self.ctx=m2.cipher_ctx_new()
m2.cipher_init(self.ctx, self.cipher, key, iv, op)
self.set_padding(padding)
del key
def __del__(self):
if getattr(self, 'ctx', None):
self.m2_cipher_ctx_free(self.ctx)
def update(self, data):
return m2.cipher_update(self.ctx, data)
def final(self):
return m2.cipher_final(self.ctx)
def set_padding(self, padding=1):
return m2.cipher_set_padding(self.ctx, padding)
class PKey:
"""
Public Key
"""
m2_pkey_free = m2.pkey_free
m2_md_ctx_free = m2.md_ctx_free
def __init__(self, pkey=None, _pyfree=0, md='sha1'):
if pkey is not None:
self.pkey = pkey
self._pyfree = _pyfree
else:
self.pkey = m2.pkey_new()
self._pyfree = 1
self._set_context(md)
def __del__(self):
if getattr(self, '_pyfree', 0):
self.m2_pkey_free(self.pkey)
if getattr(self, 'ctx', None):
self.m2_md_ctx_free(self.ctx)
def _ptr(self):
return self.pkey
def _set_context(self, md):
mda = getattr(m2, md, None)
if mda is None:
raise ValueError, ('unknown message digest', md)
self.md = mda()
self.ctx = m2.md_ctx_new()
def reset_context(self, md='sha1'):
"""
Reset internal message digest context.
@type md: string
@param md: The message digest algorithm.
"""
self._set_context(md)
def sign_init(self):
"""
Initialise signing operation with self.
"""
m2.sign_init(self.ctx, self.md)
def sign_update(self, data):
"""
Feed data to signing operation.
@type data: string
@param data: Data to be signed.
"""
m2.sign_update(self.ctx, data)
def sign_final(self):
"""
Return signature.
@rtype: string
@return: The signature.
"""
return m2.sign_final(self.ctx, self.pkey)
# Deprecated
update = sign_update
final = sign_final
def verify_init(self):
"""
Initialise signature verification operation with self.
"""
m2.verify_init(self.ctx, self.md)
def verify_update(self, data):
"""
Feed data to verification operation.
@type data: string
@param data: Data to be verified.
@return: -1 on Python error, 1 for success, 0 for OpenSSL error
"""
return m2.verify_update(self.ctx, data)
def verify_final(self, sign):
"""
Return result of verification.
@param sign: Signature to use for verification
@rtype: int
@return: Result of verification: 1 for success, 0 for failure, -1 on
other error.
"""
return m2.verify_final(self.ctx, sign, self.pkey)
def assign_rsa(self, rsa, capture=1):
"""
Assign the RSA key pair to self.
@type rsa: M2Crypto.RSA.RSA
@param rsa: M2Crypto.RSA.RSA object to be assigned to self.
@type capture: boolean
@param capture: If true (default), this PKey object will own the RSA
object, meaning that once the PKey object gets
deleted it is no longer safe to use the RSA object.
@rtype: int
@return: Return 1 for success and 0 for failure.
"""
if capture:
ret = m2.pkey_assign_rsa(self.pkey, rsa.rsa)
if ret:
rsa._pyfree = 0
else:
ret = m2.pkey_set1_rsa(self.pkey, rsa.rsa)
return ret
def get_rsa(self):
"""
Return the underlying RSA key if that is what the EVP
instance is holding.
"""
rsa_ptr = m2.pkey_get1_rsa(self.pkey)
if rsa_ptr is None:
raise ValueError("PKey instance is not holding a RSA key")
rsa = RSA.RSA_pub(rsa_ptr, 1)
return rsa
def save_key(self, file, cipher='aes_128_cbc', callback=util.passphrase_callback):
"""
Save the key pair to a file in PEM format.
@type file: string
@param file: Name of file to save key to.
@type cipher: string
@param cipher: Symmetric cipher to protect the key. The default
cipher is 'aes_128_cbc'. If cipher is None, then the key is saved
in the clear.
@type callback: Python callable
@param callback: A Python callable object that is invoked
to acquire a passphrase with which to protect the key.
The default is util.passphrase_callback.
"""
bio = BIO.openfile(file, 'wb')
return self.save_key_bio(bio, cipher, callback)
def save_key_bio(self, bio, cipher='aes_128_cbc', callback=util.passphrase_callback):
"""
Save the key pair to the M2Crypto.BIO object 'bio' in PEM format.
@type bio: M2Crypto.BIO
@param bio: M2Crypto.BIO object to save key to.
@type cipher: string
@param cipher: Symmetric cipher to protect the key. The default
cipher is 'aes_128_cbc'. If cipher is None, then the key is saved
in the clear.
@type callback: Python callable
@param callback: A Python callable object that is invoked
to acquire a passphrase with which to protect the key.
The default is util.passphrase_callback.
"""
if cipher is None:
return m2.pkey_write_pem_no_cipher(self.pkey, bio._ptr(), callback)
else:
proto = getattr(m2, cipher, None)
if proto is None:
raise ValueError, 'no such cipher %s' % cipher
return m2.pkey_write_pem(self.pkey, bio._ptr(), proto(), callback)
def as_pem(self, cipher='aes_128_cbc', callback=util.passphrase_callback):
"""
Return key in PEM format in a string.
@type cipher: string
@param cipher: Symmetric cipher to protect the key. The default
cipher is 'aes_128_cbc'. If cipher is None, then the key is saved
in the clear.
@type callback: Python callable
@param callback: A Python callable object that is invoked
to acquire a passphrase with which to protect the key.
The default is util.passphrase_callback.
"""
bio = BIO.MemoryBuffer()
self.save_key_bio(bio, cipher, callback)
return bio.read_all()
def as_der(self):
"""
Return key in DER format in a string
"""
buf = m2.pkey_as_der(self.pkey)
bio = BIO.MemoryBuffer(buf)
return bio.read_all()
def size(self):
"""
Return the size of the key in bytes.
"""
return m2.pkey_size(self.pkey)
def get_modulus(self):
"""
Return the modulus in hex format.
"""
return m2.pkey_get_modulus(self.pkey)
def load_key(file, callback=util.passphrase_callback):
"""
Load an M2Crypto.EVP.PKey from file.
@type file: string
@param file: Name of file containing the key in PEM format.
@type callback: Python callable
@param callback: A Python callable object that is invoked
to acquire a passphrase with which to protect the key.
@rtype: M2Crypto.EVP.PKey
@return: M2Crypto.EVP.PKey object.
"""
bio = m2.bio_new_file(file, 'r')
if bio is None:
raise BIO.BIOError(Err.get_error())
cptr = m2.pkey_read_pem(bio, callback)
m2.bio_free(bio)
if cptr is None:
raise EVPError(Err.get_error())
return PKey(cptr, 1)
def load_key_bio(bio, callback=util.passphrase_callback):
"""
Load an M2Crypto.EVP.PKey from an M2Crypto.BIO object.
@type bio: M2Crypto.BIO
@param bio: M2Crypto.BIO object containing the key in PEM format.
@type callback: Python callable
@param callback: A Python callable object that is invoked
to acquire a passphrase with which to protect the key.
@rtype: M2Crypto.EVP.PKey
@return: M2Crypto.EVP.PKey object.
"""
cptr = m2.pkey_read_pem(bio._ptr(), callback)
if cptr is None:
raise EVPError(Err.get_error())
return PKey(cptr, 1)
def load_key_string(string, callback=util.passphrase_callback):
"""
Load an M2Crypto.EVP.PKey from a string.
@type string: string
@param string: String containing the key in PEM format.
@type callback: Python callable
@param callback: A Python callable object that is invoked
to acquire a passphrase with which to protect the key.
@rtype: M2Crypto.EVP.PKey
@return: M2Crypto.EVP.PKey object.
"""
bio = BIO.MemoryBuffer(string)
return load_key_bio( bio, callback)
# vim: sts=4 sw=4 et
"""
M2Crypto wrapper for OpenSSL ENGINE API.
Pavel Shramov
IMEC MSU
"""
from M2Crypto import m2, EVP, X509, Err
class EngineError(Exception): pass
m2.engine_init_error(EngineError)
class Engine:
"""Wrapper for ENGINE object."""
m2_engine_free = m2.engine_free
def __init__(self, id = None, _ptr = None, _pyfree = 1):
"""Create new Engine from ENGINE pointer or obtain by id"""
if not _ptr and not id:
raise ValueError("No engine id specified")
self._ptr = _ptr
if not self._ptr:
self._ptr = m2.engine_by_id(id)
if not self._ptr:
raise ValueError("Unknown engine: %s" % id)
self._pyfree = _pyfree
def __del__(self):
if getattr(self, '_pyfree', 0):
self.m2_engine_free(self._ptr)
def init(self):
"""Obtain a functional reference to the engine.
@return: 0 on error, non-zero on success."""
return m2.engine_init(self._ptr)
def finish(self):
"""Release a functional and structural reference to the engine."""
return m2.engine_finish(self._ptr)
def ctrl_cmd_string(self, cmd, arg, optional = 0):
"""Call ENGINE_ctrl_cmd_string"""
if not m2.engine_ctrl_cmd_string(self._ptr, cmd, arg, optional):
raise EngineError(Err.get_error())
def get_name(self):
"""Return engine name"""
return m2.engine_get_name(self._ptr)
def get_id(self):
"""Return engine id"""
return m2.engine_get_id(self._ptr)
def set_default(self, methods = m2.ENGINE_METHOD_ALL):
"""Use this engine as default for methods specified in argument
Possible values are bitwise OR of m2.ENGINE_METHOD_*"""
return m2.engine_set_default(self._ptr, methods)
def _engine_load_key(self, func, name, pin = None):
"""Helper function for loading keys"""
ui = m2.ui_openssl()
cbd = m2.engine_pkcs11_data_new(pin)
try:
kptr = func(self._ptr, name, ui, cbd)
if not kptr:
raise EngineError(Err.get_error())
key = EVP.PKey(kptr, _pyfree = 1)
finally:
m2.engine_pkcs11_data_free(cbd)
return key
def load_private_key(self, name, pin = None):
"""Load private key with engine methods (e.g from smartcard).
If pin is not set it will be asked
"""
return self._engine_load_key(m2.engine_load_private_key, name, pin)
def load_public_key(self, name, pin = None):
"""Load public key with engine methods (e.g from smartcard)."""
return self._engine_load_key(m2.engine_load_public_key, name, pin)
def load_certificate(self, name):
"""Load certificate from engine (e.g from smartcard).
NOTE: This function may be not implemented by engine!"""
cptr = m2.engine_load_certificate(self._ptr, name)
if not cptr:
raise EngineError("Certificate or card not found")
return X509.X509(cptr, _pyfree = 1)
def load_dynamic_engine(id, sopath):
"""Load and return dymanic engine from sopath and assign id to it"""
m2.engine_load_dynamic()
e = Engine('dynamic')
e.ctrl_cmd_string("SO_PATH", sopath)
e.ctrl_cmd_string("ID", id)
e.ctrl_cmd_string("LIST_ADD", "1")
e.ctrl_cmd_string("LOAD", None)
return e
def load_dynamic():
"""Load dynamic engine"""
m2.engine_load_dynamic()
def load_openssl():
"""Load openssl engine"""
m2.engine_load_openssl()
def cleanup():
"""If you load any engines, you need to clean up after your application
is finished with the engines."""
m2.engine_cleanup()
"""M2Crypto wrapper for OpenSSL Error API.
Copyright (c) 1999-2003 Ng Pheng Siong. All rights reserved."""
import BIO
import m2
def get_error():
err=BIO.MemoryBuffer()
m2.err_print_errors(err.bio_ptr())
return err.getvalue()
def get_error_code():
return m2.err_get_error()
def peek_error_code():
return m2.err_peek_error()
def get_error_lib(err):
return m2.err_lib_error_string(err)
def get_error_func(err):
return m2.err_func_error_string(err)
def get_error_reason(err):
return m2.err_reason_error_string(err)
def get_x509_verify_error(err):
return m2.x509_get_verify_error(err)
class SSLError(Exception):
def __init__(self, err, client_addr):
self.err = err
self.client_addr = client_addr
def __str__(self):
if (isinstance(self.client_addr, unicode)):
s = self.client_addr.encode('utf8')
else:
s = self.client_addr
return "%s: %s: %s" % \
(m2.err_func_error_string(self.err), \
s, \
m2.err_reason_error_string(self.err))
class M2CryptoError(Exception):
pass
"""M2Crypto PGP2.
Copyright (c) 1999-2003 Ng Pheng Siong. All rights reserved."""
from constants import *
from packet import *
import RSA
class PublicKey:
def __init__(self, pubkey_pkt):
import warnings
warnings.warn('Deprecated. No maintainer for PGP. If you use this, please inform M2Crypto maintainer.', DeprecationWarning)
self._pubkey_pkt = pubkey_pkt
self._pubkey = RSA.new_pub_key((pubkey_pkt._e, pubkey_pkt._n))
self._userid = {}
self._signature = {}
def keyid(self):
return self._pubkey.n[-8:]
def add_userid(self, u_pkt):
assert isinstance(u_pkt, userid_packet)
self._userid[u_pkt.userid()] = u_pkt
def remove_userid(self, userid):
del self._userid[userid]
def add_signature(self, userid, s_pkt):
assert isinstance(s_pkt, signature_packet)
assert self._userid.has_key(userid)
if self._signature.has_key(userid):
self._signature.append(s_pkt)
else:
self._signature = [s_pkt]
def __getitem__(self, id):
return self._userid[id]
def __setitem__(self, *args):
raise NotImplementedError
def __delitem__(self, id):
del self._userid[id]
if self._signature[id]:
del self._signature[id]
def write(self, stream):
pass
def encrypt(self, ptxt):
# XXX Munge ptxt into pgp format.
return self._pubkey.public_encrypt(ptxt, RSA.pkcs1_padding)
def decrypt(self, ctxt):
# XXX Munge ctxt into pgp format.
return self._pubkey.public_encrypt(ctxt, RSA.pkcs1_padding)
"""M2Crypto PGP2.
Copyright (c) 1999-2003 Ng Pheng Siong. All rights reserved."""
from constants import *
from packet import *
from PublicKey import *
class PublicKeyRing:
def __init__(self, keyring):
import warnings
warnings.warn('Deprecated. No maintainer for PGP. If you use this, please inform M2Crypto maintainer.', DeprecationWarning)
self._keyring = keyring
self._userid = {}
self._keyid = {}
self._spurious = []
self._pubkey = []
def load(self):
curr_pub = None
curr_index = -1
ps = packet_stream(self._keyring)
while 1:
pkt = ps.read()
if pkt is None:
break
elif isinstance(pkt, public_key_packet):
curr_index = curr_index + 1
curr_pub = PublicKey(pkt)
self._pubkey.append(curr_pub)
#self._keyid[curr_pub.keyid()] = (curr_pub, curr_index)
elif isinstance(pkt, userid_packet):
if curr_pub is None:
self._spurious.append(pkt)
else:
curr_pub.add_userid(pkt)
self._userid[pkt.userid()] = (curr_pub, curr_index)
elif isinstance(pkt, signature_packet):
if curr_pub is None:
self._spurious.append(pkt)
else:
curr_pub.add_signature(pkt)
else:
self._spurious.append(pkt)
ps.close()
def __getitem__(self, id):
return self._userid[id][0]
def __setitem__(self, *args):
raise NotImplementedError
def __delitem__(self, id):
pkt, idx = self._userid[id]
del self._pubkey[idx]
del self._userid[idx]
pkt, idx = self._keyid[id]
del self._keyid[idx]
def spurious(self):
return tuple(self._spurious)
def save(self, keyring):
for p in self._pubkey:
pp = p.pack()
keyring.write(pp)
def load_pubring(filename='pubring.pgp'):
pkr = PublicKeyRing(open(filename, 'rb'))
pkr.load()
return pkr
"""M2Crypto PGP2 RSA.
Copyright (c) 1999-2003 Ng Pheng Siong. All rights reserved."""
import sys
from M2Crypto import m2, RSA
_RSA = RSA
del RSA
class RSA(_RSA.RSA):
pass
class RSA_pub(_RSA.RSA_pub):
pass
def new_pub_key((e, n)):
"""
Factory function that instantiates an RSA_pub object from a (e, n) tuple.
'e' is the RSA public exponent; it is a string in OpenSSL's binary format,
i.e., a number of bytes in big-endian.
'n' is the RSA composite of primes; it is a string in OpenSSL's binary format,
i.e., a number of bytes in big-endian.
"""
import warnings
warnings.warn('Deprecated. No maintainer for PGP. If you use this, please inform M2Crypto maintainer.', DeprecationWarning)
rsa = m2.rsa_new()
m2.rsa_set_e_bin(rsa, e)
m2.rsa_set_n_bin(rsa, n)
return RSA_pub(rsa, 1)
"""M2Crypto PGP2.
Copyright (c) 1999-2003 Ng Pheng Siong. All rights reserved."""
from constants import *
from packet import public_key_packet, trust_packet, userid_packet,\
comment_packet, signature_packet, private_key_packet, cke_packet,\
pke_packet, literal_packet, packet_stream
from PublicKey import *
from PublicKeyRing import *
"""M2Crypto PGP2.
Copyright (c) 1999-2003 Ng Pheng Siong. All rights reserved."""
CTB_TAG = 128
CTB_PKE = 1
CTB_SIGNATURE = 2
CTB_MESSAGE_DIGETS = 3
CTB_PRIVATE_KEY = 5
CTB_PUBLIC_KEY = 6
CTB_COMPRESSED_DATA = 8
CTB_CKE = 9
CTB_LITERAL_DATA = 11
CTB_TRUST = 12
CTB_USERID = 13
CTB_COMMENT = 14
"""M2Crypto PGP2.
This module implements PGP packets per RFC1991 and various source distributions.
Each packet type is represented by a class; packet classes derive from
the abstract 'packet' class.
The 'message digest' packet type, mentioned but not documented in RFC1991,
is not implemented.
Copyright (c) 1999-2003 Ng Pheng Siong. All rights reserved."""
# XXX Work-in-progress.
# Be liberal in what you accept.
# Be conservative in what you send.
# Be lazy in what you eval.
import struct, time
try:
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO
from M2Crypto import EVP, RSA
from M2Crypto.util import octx_to_num
from constants import *
_OK_VERSION = ('\002', '\003')
_OK_VALIDITY = ('\000',)
_OK_PKC = ('\001',)
class packet:
def __init__(self, ctb, body=None):
import warnings
warnings.warn('Deprecated. No maintainer for PGP. If you use this, please inform M2Crypto maintainer.', DeprecationWarning)
self.ctb = ctb
if body is not None:
self.body = StringIO(body)
else:
self.body = None
def validate(self):
return 1
def pack(self):
raise NotImplementedError, '%s.pack(): abstract method' % (self.__class__,)
def version(self):
if hasattr(self, '_version'):
return ord(self._version)
else:
return None
def timestamp(self):
if hasattr(self, '_timestamp'):
return struct.unpack('>L', self._timestamp)[0]
else:
return None
def validity(self):
if hasattr(self, '_validity'):
return struct.unpack('>H', self._validity)[0]
else:
return None
def pkc(self):
if hasattr(self, '_pkc'):
return self._pkc
else:
return None
def _llf(self, lenf):
if lenf < 256:
return (0, chr(lenf))
elif lenf < 65536:
return (1, struct.pack('>H', lenf))
else:
assert lenf < 2L**32
return (2, struct.pack('>L', lenf))
def _ctb(self, llf):
ctbv = _FACTORY[self.__class__]
return chr((1 << 7) | (ctbv << 2) | llf)
class public_key_packet(packet):
def __init__(self, ctb, body=None):
packet.__init__(self, ctb, body)
if self.body is not None:
self._version = self.body.read(1)
self._timestamp = self.body.read(4)
self._validity = self.body.read(2)
self._pkc = self.body.read(1)
self._nlen = self.body.read(2)
nlen = (struct.unpack('>H', self._nlen)[0] + 7) / 8
self._n = self.body.read(nlen)
self._elen = self.body.read(2)
elen = (struct.unpack('>H', self._elen)[0] + 7) / 8
self._e = self.body.read(elen)
def pack(self):
if self.body is None:
self.body = StringIO()
self.body.write(self._version)
self.body.write(self._timestamp)
self.body.write(self._validity)
self.body.write(self._pkc)
self.body.write(self._nlen)
self.body.write(self._n)
self.body.write(self._elen)
self.body.write(self._e)
self.body = self.body.getvalue()
llf, lenf = self._llf(len(self.body))
ctb = self._ctb(llf)
return '%s%s%s' % (ctb, lenf, self.body)
def pubkey(self):
return self._pubkey.pub()
class trust_packet(packet):
# This implementation neither interprets nor emits trust packets.
def __init__(self, ctb, body=None):
packet.__init__(self, ctb, body)
if body is not None:
self.trust = self.body.read(1)
class userid_packet(packet):
def __init__(self, ctb, body=None):
packet.__init__(self, ctb, body)
if body is not None:
self._userid = body
def pack(self):
if self.body is None:
self.body = StringIO()
self.body.write(chr(len(self._userid)))
self.body.write(self._userid)
self.body = self.body.getvalue()
return self.ctb + self.body
def userid(self):
return self._userid
class comment_packet(packet):
def __init__(self, ctb, body=None):
packet.__init__(self, ctb, body)
if body is not None:
self.comment = self.body.getvalue()
def pack(self):
if self.body is None:
self.body = StringIO()
self.body.write(chr(len(self.comment)))
self.body.write(self.comment)
self.body = self.body.getvalue()
return self.ctb + self.body
class signature_packet(packet):
def __init__(self, ctb, body=None):
packet.__init__(self, ctb, body)
if body is not None:
self._version = self.body.read(1)
self._len_md_stuff = self.body.read(1)
self._classification = self.body.read(1)
self._timestamp = self.body.read(4)
self._keyid = self.body.read(8)
self._pkc = self.body.read(1)
self._md_algo = self.body.read(1)
self._md_chksum = self.body.read(2)
self._sig = self.body.read()
def pack(self):
if self.body is None:
self.body = StringIO()
self.body.write(self._version)
self.body.write(self._len_md_stuff)
self.body.write(self._classification)
self.body.write(self._timestamp)
self.body.write(self._keyid)
self.body.write(self._pkc)
self.body.write(self._md_algo)
self.body.write(self._md_chksum)
self.body.write(self._sig)
self.body = self.body.getvalue()
llf, lenf = self._llf(len(body))
self.ctb = self.ctb | llf
return '%s%s%s' % (self.ctb, lenf, self.body)
def validate(self):
if self._version not in _OK_VERSION:
return None
if self._len_md_stuff != '\005':
return None
class private_key_packet(packet):
def __init__(self, ctb, body=None):
packet.__init__(self, ctb, body)
if body is not None:
self._version = self.body.read(1)
self._timestamp = self.body.read(4)
self._validity = self.body.read(2)
self._pkc = self.body.read(1)
self._nlen = self.body.read(2)
nlen = (struct.unpack('>H', self._nlen)[0] + 7) / 8
self._n = self.body.read(nlen)
self._elen = self.body.read(2)
elen = (struct.unpack('>H', self._elen)[0] + 7) / 8
self._e = self.body.read(elen)
self._cipher = self.body.read(1)
if self._cipher == '\001':
self._iv = self.body.read(8)
else:
self._iv = None
for param in ['d', 'p', 'q', 'u']:
_plen = self.body.read(2)
setattr(self, '_'+param+'len', _plen)
plen = (struct.unpack('>H', _plen)[0] + 7) / 8
setattr(self, '_'+param, self.body.read(plen))
self._cksum = self.body.read(2)
def is_encrypted(self):
return ord(self._cipher)
class cke_packet(packet):
def __init__(self, ctb, body=None):
packet.__init__(self, ctb, body)
if body is not None:
self._iv = self.body.read(8)
self._cksum = self.body.read(2)
self._ctxt = self.body.read()
class pke_packet(packet):
def __init__(self, ctb, body=None):
packet.__init__(self, ctb, body)
if body is not None:
self._version = self.body.read(1)
self._keyid = self.body.read(8)
self._pkc = ord(self.body.read(1))
deklen = (struct.unpack('>H', self.body.read(2))[0] + 7 ) / 8
self._dek = octx_to_num(self.body.read(deklen))
class literal_packet(packet):
def __init__(self, ctb, body=None):
packet.__init__(self, ctb, body)
if body is not None:
self.fmode = self.body.read(1)
fnlen = self.body.read(1)
self.fname = self.body.read(fnlen)
self.ftime = self.body.read(4)
#self.data = self.body.read()
class compressed_packet(packet):
def __init__(self, ctb, stream):
packet.__init__(self, ctb, '')
if body is not None:
self.algo = stream.read(1)
# This reads the entire stream into memory.
self.data = stream.read()
def validate(self):
return (self.algo == '\001')
def uncompress(self):
import zlib
decomp = zlib.decompressobj(-13) # RFC 2440, pg 61.
# This doubles the memory usage.
stream = StringIO(decomp.decompress(self.data))
return stream
_FACTORY = {
1 : pke_packet,
2 : signature_packet,
#3 : message_digest_packet, # XXX not implemented
5 : private_key_packet,
6 : public_key_packet,
#8 : compressed_packet, # special case
9 : cke_packet,
11 : literal_packet,
12 : trust_packet,
13 : userid_packet,
14 : comment_packet,
pke_packet : 1,
signature_packet : 2,
#3 : message_digest_packet,
private_key_packet : 5,
public_key_packet : 6,
#8 : compressed_packet,
cke_packet : 9,
literal_packet : 11,
trust_packet : 12,
userid_packet : 13,
comment_packet : 14
}
class packet_stream:
def __init__(self, input):
self.stream = input
self.under_current = None
self._count = 0
def close(self):
self.stream.close()
if self.under_current is not None:
self.under_current.close()
def read(self, keep_trying=0):
while 1:
ctb0 = self.stream.read(1)
if not ctb0:
return None
ctb = ord(ctb0)
if is_ctb(ctb):
break
elif keep_trying:
continue
else:
raise XXXError
ctbt = (ctb & 0x3c) >> 2
if ctbt == CTB_COMPRESSED_DATA:
self.under_current = self.stream
cp = compressed_packet(ctb0, self.stream)
self.stream = cp.uncompress()
return self.read()
# Decode the length of following data. See RFC for details.
llf = ctb & 3
if llf == 0:
lenf = ord(self.stream.read(1))
elif llf == 1:
lenf = struct.unpack('>H', self.stream.read(2))[0]
elif llf == 2:
lenf = struct.unpack('>L', self.stream.read(4))[0]
else: # llf == 3
raise XXXError, 'impossible case'
body = self.stream.read(lenf)
if not body or (len(body) != lenf):
raise XXXError, 'corrupted packet'
self._count = self.stream.tell()
try:
return _FACTORY[ctbt](ctb0, body)
except KeyError:
return packet(ctb0, body)
def count(self):
return self._count
def is_ctb(ctb):
return ctb & 0xc0
def make_ctb(value, llf):
return chr((1 << 7) | (value << 2) | llf)