import os.path
import struct
from namedstruct import Struct


class FatError(Exception):
    """Raised when the image is not FAT32 or a path cannot be resolved."""


class Attrib:
    """Values of the attribute byte stored in a directory entry."""
    EMPTY = 0x00
    READONLY = 0x01
    HIDDEN = 0x02
    SYSTEM = 0x04
    VOLUME = 0x08
    SUBDIR = 0x10
    ARCHIVE = 0x20
    DEVICE = 0x40
    # READONLY | HIDDEN | SYSTEM | VOLUME: used here to recognise the
    # entries that carry a piece of a long file name.
    LONGNAME = 0x0F


# NOTE(review): 'b' and 'i' are *signed* struct codes (assuming namedstruct
# hands them to the struct module unchanged), so values >= 0x80 / >= 2**31
# unpack as negative numbers. Left as-is because changing the codes would
# change the values callers observe -- confirm before switching to 'B'/'I'.
BootSector = Struct('BootSector',
    (None, '=3x'),  # jump to boot code
    ('oem_name', '8s'),
    ('sector_size', 'H'),
    ('sectors_per_cluster', 'b'),
    ('reserved_sectors', 'H'),
    ('number_of_fats', 'b'),
    ('max_roots', 'H'),
    ('total_sectors_16', 'H'),
    ('media_descriptor', 'b'),
    ('sectors_per_fat_16', 'H'),
    ('sectors_per_track', 'H'),
    ('heads', 'H'),
    ('hidden_sectors', 'i'),
    ('total_sectors_32', 'i'),
    ('sectors_per_fat_32', 'i'),
    ('flags', 'H'),
    ('version', 'H'),
    ('root_start', 'i'),
    ('fs_info_sector', 'H'),
    ('copy_of_boot_sector', 'H'),
    (None, '12x'),  # reserved
    ('phy_drive_number', 'b'),
    (None, 'x'),  # reserved
    ('ext_boot_sig', 'b'),
    ('ID', 'i'),
    ('volume_label', '11s'),
    ('FAT_type', '8s'),
    (None, '420x'),  # O.S. boot code
    ('boot_sector_signature', 'H'),  # always 0x55 0xAA
    expected_size = 512)

DirEntry = Struct('DirEntry',
    ('short_name', '=8s'),
    ('short_ext', '3s'),
    ('attrib', 'b'),
    (None, 'x'),
    ('create_time_fine', 'b'),
    ('create_time', 'H'),
    ('create_date', 'H'),
    ('last_access_date', 'H'),
    ('first_cluster_high', 'H'),
    ('last_modified_time', 'H'),
    ('last_modified_date', 'H'),
    ('first_cluster_low', 'H'),
    ('file_size', 'i'),
    expected_size = 32)

LongNameEntry = Struct('LongNameEntry',
    ('seq_num', '=b'),
    ('name0', '10s'),
    ('attrib', 'b'),
    (None, 'x'),
    ('checksum', 'b'),
    ('name1', '12s'),
    ('first_cluster_low', 'H'),
    ('name2', '4s'),
    expected_size = 32)


class FAT(object):
    """Minimal FAT32 reader/patcher working on a seekable binary file.

    ``f`` must support ``seek``, ``tell``, ``read`` and (for the methods
    that modify directory entries) ``write``.
    """

    # Only the low 28 bits of a FAT32 table entry hold the cluster number.
    _CLUSTER_MASK = 0x0FFFFFFF

    def __init__(self, f):
        """Read the boot sector from ``f`` and cache the volume geometry.

        Raises FatError if the boot sector does not advertise FAT32.
        """
        self.f = f
        self.bs = bs = self.read_boot_sector()
        # First sector of the data area: it follows the reserved sectors
        # and all the copies of the FAT.
        self.cluster_begin_lba = (bs.reserved_sectors +
                                  (bs.number_of_fats * bs.sectors_per_fat_32))
        self.cluster_size = bs.sectors_per_cluster * bs.sector_size

    def read_boot_sector(self):
        """Unpack the first 512 bytes of the image as a BootSector.

        Raises FatError if the ``FAT_type`` field is not ``FAT32``.
        """
        self.f.seek(0)
        bs = BootSector.unpack(self.f.read(512))
        # bytes literal: equal to the plain str on Python 2, and still
        # comparable with the unpacked field on Python 3.
        if bs.FAT_type.strip() != b'FAT32':
            raise FatError('The disk seems not to be formatted as FAT32')
        return bs

    def read_fat(self, i):
        """Return the raw 32-bit value of entry ``i`` of the first FAT.

        The value is returned unmasked; see ``_CLUSTER_MASK``.
        """
        bs = self.bs
        fat_pos = bs.reserved_sectors * bs.sector_size
        item_pos = fat_pos + (i * 4)
        self.f.seek(item_pos)
        item = self.f.read(4)
        # On-disk FAT entries are little-endian regardless of the host.
        return struct.unpack('<I', item)[0]

    def is_last_cluster(self, cluster):
        """Tell whether a FAT value is an end-of-chain marker."""
        cluster &= self._CLUSTER_MASK
        return 0x0FFFFFF8 <= cluster <= 0x0FFFFFFF

    def get_cluster_chain(self, start_cluster):
        """Return the list of clusters reachable from ``start_cluster``.

        Raises FatError if the chain loops back on itself instead of
        reaching an end-of-chain marker.
        """
        chain = []
        seen = set()
        cluster = start_cluster
        while True:
            if cluster in seen:
                raise FatError('Loop detected in the cluster chain '
                               'at cluster %d' % cluster)
            seen.add(cluster)
            chain.append(cluster)
            value = self.read_fat(cluster)
            if self.is_last_cluster(value):
                break
            # Drop the reserved top bits before following the link.
            cluster = value & self._CLUSTER_MASK
        return chain

    def read_file(self, entry):
        """Return the content of the file described by ``entry`` as bytes."""
        # file_size is unpacked as a signed int; reinterpret it as unsigned
        # so that very large files do not produce a negative slice bound.
        size = entry.file_size & 0xFFFFFFFF
        if size == 0:
            # Nothing to read, so do not walk a chain at all.
            return b''
        first_cluster = self.get_first_cluster(entry)
        chain = self.get_cluster_chain(first_cluster)
        content = b''.join(self.read_cluster(cluster) for cluster in chain)
        return content[:size]

    def lba_addr(self, cluster):
        """Return the sector number at which ``cluster`` starts."""
        # Cluster numbering starts at 2.
        return (self.cluster_begin_lba +
                (cluster - 2) * self.bs.sectors_per_cluster)

    def pos(self, cluster):
        """Return the byte offset in the image at which ``cluster`` starts."""
        return self.lba_addr(cluster) * self.bs.sector_size

    def decode_longname(self, longname):
        """Decode the concatenated name fields of long-name entries.

        ``longname`` is a byte string of UTF-16 (little-endian) code units,
        optionally ended by a NUL unit followed by padding. Returns the
        decoded text, or ``''`` for an empty input.
        """
        if not longname:
            return ''
        # Look for the terminator on 2-byte boundaries only: two adjacent
        # zero bytes that straddle two code units are not a terminator.
        for i in range(0, len(longname) - 1, 2):
            if longname[i:i + 2] == b'\x00\x00':
                longname = longname[:i]
                break
        return longname.decode('utf-16-le')

    def read_cluster(self, cluster):
        """Return the raw bytes of ``cluster``."""
        self.f.seek(self.pos(cluster))
        return self.f.read(self.cluster_size)

    def read_entries_from_cluster(self, cluster):
        """Parse ``cluster`` as a sequence of 32-byte directory entries.

        Returns a list of ``(pos, entry)`` pairs where ``pos`` is the byte
        offset of the entry in the image and ``entry`` is a LongNameEntry,
        a DirEntry, or ``None`` for an empty slot.
        """
        num_entries = self.cluster_size // DirEntry.size
        self.f.seek(self.pos(cluster))
        entries = []
        for _ in range(num_entries):
            pos = self.f.tell()
            raw = self.f.read(DirEntry.size)
            if len(raw) < DirEntry.size:
                # Truncated image: there is nothing more to parse.
                break
            # The attribute byte sits at offset 11 in both layouts.
            # A one-byte slice keeps ord() valid on Python 2 and 3.
            attrib = ord(raw[11:12])
            # NOTE(review): a slot is considered empty when its attribute
            # byte is 0, so an entry with a name but no attribute bits set
            # is skipped as well -- confirm this is intended.
            if attrib == Attrib.EMPTY:
                entries.append((pos, None))
            elif attrib == Attrib.LONGNAME:
                entries.append((pos, LongNameEntry.unpack(raw)))
            else:
                entries.append((pos, DirEntry.unpack(raw)))
        return entries

    def read_directory(self, start_cluster):
        """Return the files of the directory starting at ``start_cluster``.

        The result has the format described in
        ``dir_entries_to_list_of_files``.
        """
        entries = []
        for cluster in self.get_cluster_chain(start_cluster):
            entries += self.read_entries_from_cluster(cluster)
        return self.dir_entries_to_list_of_files(entries)

    def dir_entries_to_list_of_files(self, entries):
        """Group raw directory entries into ``(longname, pos, entry)``.

        ``entries`` is a list of ``(pos, entry)`` pairs as returned by
        ``read_entries_from_cluster``. For every DirEntry which is not
        marked as deleted, the result contains its decoded long name
        (``''`` if no long-name entries preceded it), the position of the
        DirEntry and the DirEntry itself.
        """
        files = []
        longname = b''
        for pos, entry in entries:
            if entry is None:  # empty
                continue
            elif entry.attrib == Attrib.LONGNAME:
                # Each new piece is prepended to what was collected so far.
                longname = (entry.name0 + entry.name1 + entry.name2 +
                            longname)
            elif entry.short_name[:1] == b'\xE5':
                # Deleted entry: forget the name pieces collected for it.
                longname = b''
                continue
            else:
                files.append((self.decode_longname(longname), pos, entry))
                longname = b''
        return files

    def read_root(self):
        """Return the files of the root directory."""
        return self.read_directory(self.bs.root_start)

    def get_first_cluster(self, entry):
        """Combine the two 16-bit halves of the first cluster of ``entry``."""
        low = entry.first_cluster_low
        high = entry.first_cluster_high
        return high << 16 | low

    def lookup(self, fullpath):
        """Find ``fullpath`` and return ``(longname, pos, entry)``.

        ``fullpath`` must be absolute and '/'-separated; names are compared
        case-insensitively against the long names of the entries.

        Raises FatError if the path is empty or not absolute, if one of its
        parent components is not a directory, or if it cannot be found.
        """
        if not fullpath or fullpath[0] != '/':
            raise FatError('%s is not an absolute path' % fullpath)
        fullpath = fullpath.lower()
        path, filename = os.path.split(fullpath)
        # os.path.split() leaves a parent made only of slashes untouched
        # ('/' or '//'); treating all of them as the root guarantees that
        # the recursion terminates.
        if not path.strip('/'):
            # base case
            files = self.read_root()
        else:
            # recursive case
            _, _, direntry = self.lookup(path)
            if not direntry.attrib & Attrib.SUBDIR:
                raise FatError('%s is not a directory' % path)
            files = self.read_directory(self.get_first_cluster(direntry))
        for filename2, pos, entry in files:
            if filename == filename2.lower():
                # found!
                return filename2, pos, entry
        raise FatError('Cannot find %s' % fullpath)

    def write_entry(self, pos, entry):
        """Pack ``entry`` (a DirEntry record) and write it at offset ``pos``."""
        assert isinstance(entry, DirEntry.T)
        s = DirEntry.pack(entry)
        self.f.seek(pos)
        self.f.write(s)

    def hard_link(self, srcpath, dstpath):
        """Make the existing, empty file ``dstpath`` an alias of ``srcpath``.

        WARNING: this operation is not officially supported by FAT
        filesystems.

        The directory entry of ``dstpath`` is overwritten with a copy of
        the entry of ``srcpath`` which keeps the short name and extension
        of ``dstpath`` and has the READONLY attribute set.

        Raises FatError if either path cannot be found or if ``dstpath``
        is not empty.
        """
        _, _, src = self.lookup(srcpath)
        _, pos, dst = self.lookup(dstpath)
        if dst.file_size != 0:
            raise FatError("Destination file %s must be empty" % dstpath)
        new_dst = src._replace(short_name = dst.short_name,
                               short_ext = dst.short_ext,
                               attrib = src.attrib | Attrib.READONLY)
        self.write_entry(pos, new_dst)

    def unlink(self, path):
        """Mark the directory entry of ``path`` as deleted.

        Only the first byte of the short name is rewritten (to 0xE5); no
        other part of the image is touched by this method.
        """
        _, pos, entry = self.lookup(path)
        short_name = b'\xE5' + entry.short_name[1:]
        newentry = entry._replace(short_name = short_name)
        self.write_entry(pos, newentry)