#!/usr/bin/python -u # # Python Bindings for LZMA # # Copyright (c) 2004-2010 by Joachim Bauch, mail@joachim-bauch.de # 7-Zip Copyright (C) 1999-2010 Igor Pavlov # LZMA SDK Copyright (C) 1999-2010 Igor Pavlov # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # # $Id$ # """Read from and write to 7zip format archives. """ import pylzma from struct import pack, unpack from zlib import crc32 import zlib import bz2 from cStringIO import StringIO try: from functools import reduce except ImportError: # reduce is available in functools starting with Python 2.6 pass try: import M2Crypto from M2Crypto import EVP except ImportError: # support for encrypted files is optional M2Crypto = None from hashlib import sha256 READ_BLOCKSIZE = 16384 MAGIC_7Z = '7z\xbc\xaf\x27\x1c' PROPERTY_END = '\x00' PROPERTY_HEADER = '\x01' PROPERTY_ARCHIVE_PROPERTIES = '\x02' PROPERTY_ADDITIONAL_STREAMS_INFO = '\x03' PROPERTY_MAIN_STREAMS_INFO = '\x04' PROPERTY_FILES_INFO = '\x05' PROPERTY_PACK_INFO = '\x06' PROPERTY_UNPACK_INFO = '\x07' PROPERTY_SUBSTREAMS_INFO = '\x08' PROPERTY_SIZE = '\x09' PROPERTY_CRC = '\x0a' PROPERTY_FOLDER = '\x0b' PROPERTY_CODERS_UNPACK_SIZE = '\x0c' PROPERTY_NUM_UNPACK_STREAM = '\x0d' PROPERTY_EMPTY_STREAM = '\x0e' PROPERTY_EMPTY_FILE = '\x0f' PROPERTY_ANTI = '\x10' PROPERTY_NAME = '\x11' PROPERTY_CREATION_TIME = '\x12' PROPERTY_LAST_ACCESS_TIME = '\x13' PROPERTY_LAST_WRITE_TIME = '\x14' PROPERTY_ATTRIBUTES = '\x15' PROPERTY_COMMENT = '\x16' PROPERTY_ENCODED_HEADER = '\x17' COMPRESSION_METHOD_COPY = '\x00' COMPRESSION_METHOD_LZMA = '\x03' COMPRESSION_METHOD_CRYPTO = '\x06' COMPRESSION_METHOD_MISC = '\x04' COMPRESSION_METHOD_MISC_ZIP = '\x04\x01' COMPRESSION_METHOD_MISC_BZIP = '\x04\x02' COMPRESSION_METHOD_7Z_AES256_SHA256 = '\x06\xf1\x07\x01' class ArchiveError(Exception): pass class FormatError(ArchiveError): pass class EncryptedArchiveError(ArchiveError): pass class UnsupportedCompressionMethodError(ArchiveError): pass class DecryptionError(ArchiveError): pass class NoPasswordGivenError(DecryptionError): pass class WrongPasswordError(DecryptionError): pass class Base(object): """ base class with support for various basic read/write functions """ def _readReal64Bit(self, file): res = file.read(8) a, b = unpack('>= 1 def _readBoolean(self, file, count, checkall=0): if checkall: alldefined = file.read(1) if alldefined != '\x00': return [True] * count result = [] b = 0 mask = 0 for i in xrange(count): if mask == 0: b = ord(file.read(1)) mask = 0x80 result.append(b & mask != 0) mask >>= 1 return result def checkcrc(self, crc, data): check = crc32(data) & 0xffffffffL return crc == check class PackInfo(Base): """ informations about packed streams """ def __init__(self, file): self.packpos = self._read64Bit(file) self.numstreams = self._read64Bit(file) id = file.read(1) if id == PROPERTY_SIZE: self.packsizes = [self._read64Bit(file) for x in xrange(self.numstreams)] id = file.read(1) if id == PROPERTY_CRC: self.crcs = [self._read64Bit(file) for x in xrange(self.numstreams)] id = file.read(1) if id != PROPERTY_END: raise FormatError, 'end id expected but %s found' % repr(id) class Folder(Base): """ a "Folder" represents a stream of compressed data """ def __init__(self, file): numcoders = self._read64Bit(file) self.coders = [] self.digestdefined = False totalin = 0 self.totalout = 0 for i in xrange(numcoders): while True: b = ord(file.read(1)) methodsize = b & 0xf issimple = b & 0x10 == 0 noattributes = b & 0x20 == 0 last_alternative = b & 0x80 == 0 c = {} c['method'] = file.read(methodsize) if not issimple: c['numinstreams'] = self._read64Bit(file) c['numoutstreams'] = self._read64Bit(file) else: c['numinstreams'] = 1 c['numoutstreams'] = 1 totalin += c['numinstreams'] self.totalout += c['numoutstreams'] if not noattributes: c['properties'] = file.read(self._read64Bit(file)) self.coders.append(c) if last_alternative: break numbindpairs = self.totalout - 1 self.bindpairs = [] for i in xrange(numbindpairs): self.bindpairs.append((self._read64Bit(file), self._read64Bit(file), )) numpackedstreams = totalin - numbindpairs self.packed_indexes = [] if numpackedstreams == 1: for i in xrange(totalin): if self.findInBindPair(i) < 0: self.packed_indexes.append(i) elif numpackedstreams > 1: for i in xrange(numpackedstreams): self.packed_indexes.append(self._read64Bit(file)) def getUnpackSize(self): if not self.unpacksizes: return 0 r = range(len(self.unpacksizes)) r.reverse() for i in r: if self.findOutBindPair(i): return self.unpacksizes[i] raise 'not found' def findInBindPair(self, index): for idx in xrange(len(self.bindpairs)): a, b = self.bindpairs[idx] if a == index: return idx return -1 def findOutBindPair(self, index): for idx in xrange(len(self.bindpairs)): a, b = self.bindpairs[idx] if b == index: return idx return -1 class Digests(Base): """ holds a list of checksums """ def __init__(self, file, count): self.defined = self._readBoolean(file, count, checkall=1) self.crcs = [unpack(' 255: raise FormatError, 'invalid type, must be below 256, is %d' % typ typ = chr(typ) if typ == PROPERTY_END: break size = self._read64Bit(file) buffer = StringIO(file.read(size)) if typ == PROPERTY_EMPTY_STREAM: isempty = self._readBoolean(buffer, self.numfiles) map(lambda x, y: x.update({'emptystream': y}), self.files, isempty) for x in isempty: if x: numemptystreams += 1 emptyfiles = [False] * numemptystreams antifiles = [False] * numemptystreams elif typ == PROPERTY_EMPTY_FILE: emptyfiles = self._readBoolean(buffer, numemptystreams) elif typ == PROPERTY_ANTI: antifiles = self._readBoolean(buffer, numemptystreams) elif typ == PROPERTY_NAME: external = buffer.read(1) if external != '\x00': self.dataindex = self._read64Bit(buffer) # XXX: evaluate external raise NotImplementedError for f in self.files: name = '' while True: ch = buffer.read(2) if ch == '\0\0': f['filename'] = unicode(name, 'utf-16') break name += ch elif typ == PROPERTY_CREATION_TIME: self._readTimes(buffer, self.files, 'creationtime') elif typ == PROPERTY_LAST_ACCESS_TIME: self._readTimes(buffer, self.files, 'lastaccesstime') elif typ == PROPERTY_LAST_WRITE_TIME: self._readTimes(buffer, self.files, 'lastwritetime') elif typ == PROPERTY_ATTRIBUTES: defined = self._readBoolean(buffer, self.numfiles, checkall=1) for i in xrange(self.numfiles): f = self.files[i] if defined[i]: f['attributes'] = unpack(' 0: data = self._file.read(READ_BLOCKSIZE) if checkremaining: tmp = decompressor.decompress(data, remaining) else: tmp = decompressor.decompress(data) out.write(tmp) remaining -= len(tmp) data = out.getvalue() else: if not input: input = self._file.read(total) if checkremaining: data = decompressor.decompress(input, self._start+self.size) else: data = decompressor.decompress(input) return data[self._start:self._start+self.size] def _read_lzma(self, coder, input): dec = pylzma.decompressobj(maxlength=self._start+self.size) try: return self._read_from_decompressor(coder, dec, input, checkremaining=True) except ValueError: if self._is_encrypted(): raise WrongPasswordError('invalid password') raise def _read_zip(self, coder, input): dec = zlib.decompressobj(-15) return self._read_from_decompressor(coder, dec, input, checkremaining=True) def _read_bzip(self, coder, input): dec = bz2.BZ2Decompressor() return self._read_from_decompressor(coder, dec, input) def _read_7z_aes256_sha256(self, coder, input): if not self._archive.password: raise NoPasswordGivenError() # TODO: this needs some sanity checks firstbyte = ord(coder['properties'][0]) numcyclespower = firstbyte & 0x3f if firstbyte & 0xc0 != 0: saltsize = (firstbyte >> 7) & 1 ivsize = (firstbyte >> 6) & 1 secondbyte = ord(coder['properties'][1]) saltsize += (secondbyte >> 4) ivsize += (secondbyte & 0x0f) assert len(coder['properties']) == 2+saltsize+ivsize salt = coder['properties'][2:2+saltsize] iv = coder['properties'][2+saltsize:2+saltsize+ivsize] assert len(salt) == saltsize assert len(iv) == ivsize assert numcyclespower <= 24 if ivsize < 16: iv += '\x00'*(16-ivsize) else: salt = iv = '' password = self._archive.password.encode('utf-16-le') key = pylzma.calculate_key(password, numcyclespower, salt=salt) cipher = pylzma.AESDecrypt(key, iv=iv) if not input: self._file.seek(self._src_start) uncompressed_size = self.uncompressed if uncompressed_size & 0x0f: # we need a multiple of 16 bytes uncompressed_size += 16 - (uncompressed_size & 0x0f) input = self._file.read(uncompressed_size) result = cipher.decrypt(input) return result def checkcrc(self): if self.digest is None: return True self.reset() data = self.read() return super(ArchiveFile, self).checkcrc(self.digest, data) class Archive7z(Base): """ the archive itself """ def __init__(self, file, password=None): self._file = file self.password = password self.header = file.read(len(MAGIC_7Z)) if self.header != MAGIC_7Z: raise FormatError, 'not a 7z file' self.version = unpack('BB', file.read(2)) self.startheadercrc = unpack('