#!/usr/local/bin/python #CHIPSEC: Platform Security Assessment Framework #Copyright (c) 2010-2014, Intel Corporation # #This program is free software; you can redistribute it and/or #modify it under the terms of the GNU General Public License #as published by the Free Software Foundation; Version 2. # #This program is distributed in the hope that it will be useful, #but WITHOUT ANY WARRANTY; without even the implied warranty of #MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #GNU General Public License for more details. # #You should have received a copy of the GNU General Public License #along with this program; if not, write to the Free Software #Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. # #Contact information: #chipsec@intel.com # # ------------------------------------------------------------------------------- # # CHIPSEC: Platform Hardware Security Assessment Framework # (c) 2010-2012 Intel Corporation # # ------------------------------------------------------------------------------- ## \addtogroup helpers #@{ # __chipsec/helper/linux/helper.py__ -- Linux helper #@} # __version__ = '1.0' import struct import sys import os import fcntl import platform import ctypes import fnmatch from chipsec.helper.oshelper import OsHelperError from chipsec.logger import logger from chipsec.hal.uefi_common import * import errno from ctypes import * _IOCTL_BASE = 0 def IOCTL_BASE(): return 0x0 def IOCTL_RDIO(): return _IOCTL_BASE + 0x1 def IOCTL_WRIO(): return _IOCTL_BASE + 0x2 def IOCTL_RDPCI(): return _IOCTL_BASE + 0x3 def IOCTL_WRPCI(): return _IOCTL_BASE + 0x4 def IOCTL_RDMSR(): return _IOCTL_BASE + 0x5 def IOCTL_WRMSR(): return _IOCTL_BASE + 0x6 def IOCTL_CPUID(): return _IOCTL_BASE + 0x7 def IOCTL_GET_CPU_DESCRIPTOR_TABLE(): return _IOCTL_BASE + 0x8 def IOCTL_HYPERCALL(): return _IOCTL_BASE + 0x9 def IOCTL_SWSMI(): return _IOCTL_BASE + 0xA def IOCTL_LOAD_UCODE_PATCH(): return _IOCTL_BASE + 0xB class LinuxHelper: def __init__(self): import platform self.os_system = platform.system() self.os_release = platform.release() self.os_version = platform.version() self.os_machine = platform.machine() self.os_uname = platform.uname() def __del__(self): try: destroy() except NameError: pass ############################################################################################### # Driver/service management functions ############################################################################################### def create( self ): self.init() if logger().VERBOSE: logger().log("[helper] Linux Helper created") def start( self ): if logger().VERBOSE: logger().log("[helper] Linux Helper started/loaded") def stop( self ): if logger().VERBOSE: logger().log("[helper] Linux Helper stopped/unloaded") def delete( self ): if logger().VERBOSE: logger().log("[helper] Linux Helper deleted") def destroy( self ): self.stop() self.delete() def init( self ): x64 = True if sys.maxsize > 2**32 else False global _DEV_FH _DEV_FH = None #already initialized? if(_DEV_FH != None): return logger().log("\n****** Chipsec Linux Kernel module is licensed under GPL 2.0\n") try: _DEV_FH = open("/dev/chipsec", "r+") except IOError as e: raise OsHelperError("Unable to open chipsec device. %s"%str(e),e.errno) except BaseException as be: raise OsHelperError("Unable to open chipsec device. %s"%str(be),errno.ENXIO) #decode the arg size global _PACK _PACK = 'Q' if x64 else 'I' global _IOCTL_BASE _IOCTL_BASE = fcntl.ioctl(_DEV_FH, IOCTL_BASE()) << 4 global CPU_MASK_LEN CPU_MASK_LEN = 8 if x64 else 4 def close(): global _DEV_FH close(_DEV_FH) _DEV_FH = None ############################################################################################### # Actual API functions to access HW resources ############################################################################################### def __mem_block(self, sz, newval = None): if(newval == None): return _DEV_FH.read(sz) else: _DEV_FH.write(newval) _DEV_FH.flush() return 1 def mem_read_block(self, addr, sz): if(addr != None): _DEV_FH.seek(addr) return self.__mem_block(sz) def mem_write_block(self, addr, sz, newval): if(addr != None): _DEV_FH.seek(addr) return self.__mem_block(sz, newval) def write_phys_mem(self, phys_address_hi, phys_address_lo, sz, newval): if(newval == None): return None return self.mem_write_block((phys_address_hi << 32) | phys_address_lo, sz, newval) def read_phys_mem(self, phys_address_hi, phys_address_lo, length): ret = self.mem_read_block((phys_address_hi << 32) | phys_address_lo, length) if(ret == None): return None return ret #DEPRECATED: Pass-through def read_pci( self, bus, device, function, address ): return self.read_pci_reg(bus, device, function, address) def read_pci_reg( self, bus, device, function, offset, size = 4 ): _PCI_DOM = 0 #Change PCI domain, if there is more than one. d = struct.pack("5"+_PACK, ((_PCI_DOM << 16) | bus), ((device << 16) | function), offset, size, 0) try: ret = fcntl.ioctl(_DEV_FH, IOCTL_RDPCI(), d) except IOError: print "IOError\n" return None x = struct.unpack("5"+_PACK, ret) return x[4] def write_pci_reg( self, bus, device, function, offset, value, size = 4 ): _PCI_DOM = 0 #Change PCI domain, if there is more than one. d = struct.pack("5"+_PACK, ((_PCI_DOM << 16) | bus), ((device << 16) | function), offset, size, value) try: ret = fcntl.ioctl(_DEV_FH, IOCTL_WRPCI(), d) except IOError: print "IOError\n" return None x = struct.unpack("5"+_PACK, ret) return x[4] def read_io_port(self, io_port, size): in_buf = struct.pack( "3"+_PACK, io_port, size, 0 ) out_buf = fcntl.ioctl( _DEV_FH, IOCTL_RDIO(), in_buf ) try: if 1 == size: value = struct.unpack_from( 'B', out_buf, 2) elif 2 == size: value = struct.unpack_from( 'H', out_buf, 2) else: value = struct.unpack_from( 'I', out_buf, 2) except: logger().error( "DeviceIoControl did not return value of proper size %x (value = '%s')" % (size, out_buf) ) return value[0] def write_io_port( self, io_port, value, size ): in_buf = struct.pack( 'HIB', io_port, value, size ) return fcntl.ioctl( _DEV_FH, IOCTL_WRIO(), in_buf) def read_msr(self, thread_id, msr_addr): self.set_affinity(thread_id) edx = eax = 0 in_buf = struct.pack( "4"+_PACK, thread_id, msr_addr, edx, eax) unbuf = struct.unpack("4"+_PACK, fcntl.ioctl( _DEV_FH, IOCTL_RDMSR(), in_buf )) return (unbuf[3], unbuf[2]) def write_msr(self, thread_id, msr_addr, eax, edx): self.set_affinity(thread_id) print "Writing msr 0x%x with eax = 0x%x, edx = 0x%x" % (msr_addr, eax, edx) in_buf = struct.pack( "4"+_PACK, thread_id, msr_addr, edx, eax ) fcntl.ioctl( _DEV_FH, IOCTL_WRMSR(), in_buf ) return def get_descriptor_table(self, cpu_thread_id, desc_table_code ): in_buf = struct.pack( "5"+_PACK, cpu_thread_id, desc_table_code, 0 , 0, 0) out_buf = fcntl.ioctl( _DEV_FH, IOCTL_GET_CPU_DESCRIPTOR_TABLE(), in_buf) (limit,base_hi,base_lo,pa_hi,pa_lo) = struct.unpack( "5"+_PACK, out_buf ) pa = (pa_hi << 32) + pa_lo base = (base_hi << 32) + base_lo return (limit,base,pa) def do_hypercall(self, vector, arg1, arg2, arg3, arg4, arg5, use_peach): in_buf = struct.pack( "7"+_PACK, vector, arg1, arg2, arg3, arg4, arg5, use_peach) out_buf = fcntl.ioctl( _DEV_FH, IOCTL_HYPERCALL(), in_buf) regs = struct.unpack( "7"+_PACK, out_buf ) return regs def cpuid(self, eax): in_buf = struct.pack( "4"+_PACK, eax, 0, 0, 0) out_buf = fcntl.ioctl( _DEV_FH, IOCTL_CPUID(), in_buf) return struct.unpack( "4"+_PACK, out_buf ) def get_affinity(self): CORES = ctypes.cdll.LoadLibrary('./chipsec/helper/linux/cores.so') CORES.sched_getaffinity.argtypes = [ctypes.c_int, ctypes.c_int, POINTER(ctypes.c_int)] CORES.sched_getaffinity.restype = ctypes.c_int pid = ctypes.c_int(0) leng = ctypes.c_int(CPU_MASK_LEN) cpu_mask = ctypes.c_int(0) if (CORES.sched_getaffinity(pid, leng, byref(cpu_mask)) == 0): return cpu_mask.value else: return None def set_affinity(self, thread_id): CORES = ctypes.cdll.LoadLibrary('./chipsec/helper/linux/cores.so') pid = ctypes.c_int(0) leng = ctypes.c_int(CPU_MASK_LEN) cpu_mask = ctypes.c_int(thread_id) ret = CORES.setaffinity(thread_id) if(ret == 0): return thread_id else: #CORES.geterror.restype = ctypes.c_int print "set_affinity error: %s" % os.strerror(ret) return None ############## # UEFI Variable API ############## def use_efivars(self): rel = platform.release() ind = rel.find('.') major = rel[:ind] minor = rel[ind+1:rel.find('.', ind+1)] return (int(major) >= 3) and (int(minor) >= 10) # # Legacy /efi/vars methods # def VARS_get_efivar_from_sys( self, filename ): off = 0 buf = list() hdr = 0 try: f =open('/sys/firmware/efi/vars/'+filename+'/data', 'r') data = f.read() f.close() f = open('/sys/firmware/efi/vars/'+filename+'/guid', 'r') guid = (f.read()).strip() f.close() f = open('/sys/firmware/efi/vars/'+filename+'/attributes', 'r') attrstring = f.read() attr = 0 if fnmatch.fnmatch(attrstring, '*NON_VOLATILE*'): attr |= EFI_VARIABLE_NON_VOLATILE if fnmatch.fnmatch(attrstring, '*BOOTSERVICE*'): attr |= EFI_VARIABLE_BOOTSERVICE_ACCESS if fnmatch.fnmatch(attrstring, '*RUNTIME*'): attr |= EFI_VARIABLE_RUNTIME_ACCESS if fnmatch.fnmatch(attrstring, '*ERROR*'): attr |= EFI_VARIABLE_HARDWARE_ERROR_RECORD if fnmatch.fnmatch(attrstring, 'EFI_VARIABLE_AUTHENTICATED_WRITE_ACCESS'): attr |= EFI_VARIABLE_AUTHENTICATED_WRITE_ACCESS if fnmatch.fnmatch(attrstring, '*TIME_BASED_AUTHENTICATED*'): attr |= EFI_VARIABLE_TIME_BASED_AUTHENTICATED_WRITE_ACCESS if fnmatch.fnmatch(attrstring, '*APPEND_WRITE*'): attr |= EFI_VARIABLE_APPEND_WRITE f.close() except Exception, err: logger().error('Failed to read files under /sys/firmware/efi/vars/'+filename) data = "" guid = 0 attr = 0 finally: return (off, buf, hdr, data, guid, attr) def VARS_list_EFI_variables ( self, infcls=2 ): varlist = os.listdir('/sys/firmware/efi/vars') variables = dict() for v in varlist: name = v[:-37] if name and name is not None: variables[name] = [] var = self.VARS_get_efivar_from_sys(v) # did we get something real back? (off, buf, hdr, data, guid, attr) = var if data != "" or guid != 0 or attr != 0: variables[name].append(var) return variables def VARS_get_EFI_variable( self, name, guid ): if not name: name = '*' if not guid: guid = '*' for var in os.listdir('/sys/firmware/efi/vars'): if fnmatch.fnmatch(var, '%s-%s' % (name,guid)): (off,buf,hdr,data,guid,attr) = self.VARS_get_efivar_from_sys(var) return data def VARS_set_EFI_variable(self, name, guid, value ): ret = True if not name: name = '*' if not guid: guid = '*' for var in os.listdir('/sys/firmware/efi/vars'): if fnmatch.fnmatch(var, '%s-%s' % (name,guid)): try: f = open('/sys/firmware/efi/vars/'+var+'/data', 'w') f.write(value) except Exception, err: logger().error('Failed to write EFI variable. %s' % err) ret = False finally: pass return ret # # New (kernel 3.10+) /efi/efivars methods # def EFIVARS_get_efivar_from_sys( self, filename ): guid = filename[filename.find('-')+1:] off = 0 buf = list() hdr = 0 try: f = open('/sys/firmware/efi/efivars/'+filename, 'r') data = f.read() attr = struct.unpack_from("