# Source code for qnd.generic

"""Generic file or file family open.

"""
from __future__ import absolute_import

from os.path import expanduser, expandvars, abspath, exists, commonprefix
from os.path import getsize
from itertools import count as itertools_count
import glob
import re
import sys

from numpy import int64, uint64  # Needed to work around Windows misfeatures.

# Python 2/3 compatibility: give both interpreters lazy `range`, a
# unicode-capable `chr`, and a `basestring` usable with isinstance.
PY2 = sys.version_info < (3,)
if not PY2:
    basestring = str
else:
    range = xrange
    chr = unichr

# A run of one or more adjacent glob wildcards: '*', '?', or a bracket
# expression such as '[a-z]'.
_glob_group = re.compile(r'(\*|\?|\[.+?\])+')
# A single wildcard that is a candidate decimal digit placeholder:
# '*', '?', or the literal text '[0-9]'.
_glob_digit = re.compile(r'(?:\*|\?|\[0-9\])')
# One member of a bracket set: a character other than '-' or '[',
# optionally followed by '-' and a range end character.
_glob_ranges = re.compile(r'[^-[](?:-[^-])?')
# A '%d' or '%0Nd' print format directive; group 1 captures N when present.
_digits_format = re.compile(r'%(?:0(\d+))?d')


def opener(filename, mode, **kwargs):
    """Generic file or file family opener.

    Parameters
    ----------
    filename : str or iterable of str
       Name of file to open.  See notes below for family conventions.
    mode : str
       One of 'r' (read-only), 'r+' (read-write, must exist),
       'a' (read-write, create if does not exist), 'w' (create, clobber
       if exists), 'w-' (create, fail if exists).
    **kwargs
       Other keywords, passed through to `MultiFile`, which consumes one
       item:
    nextaddr_mode : int
       Affects setting of nextaddr for families opened with 'a' or 'r+'
       mode.  0 (default) sets nextaddr to the end of the final existing
       file, 1 sets nextaddr to 0 (beginning of first file), and 2 sets
       nextaddr to the beginning of the next file after all existing
       files.

    Returns
    -------
    handle
       A file handle implementing the generic interface, consisting of::

          handle.callbacks(flusher, initializer)
          addr = handle.next_address()  # next unused address
          f = handle.seek(addr)  # return ordinary file handle at addr
          f = handle.open(n)  # open nth file, calling initializer(f)
          handle.flush()  # make file readable, calling flusher(f)
          # flush() restores next_address to value on entry
          handle.close()  # flush if necessary, then close

    nexisting : int
       Number of existing paths matching `filename`.

    Raises
    ------
    IOError
       If a sequence of filenames is empty or has a missing intermediate
       file, if 'w-' would clobber an existing file, if an 'r' or 'r+'
       mode matches no file, or if `mode` is not understood.

    Notes
    -----
    The `filename` may be an iterable, one string per file in order.  The
    sequence may extend beyond the files which actually exist for 'r+',
    'a', 'w', or 'w-' modes.

    Alternatively `filename` specifies a family if it contains shell
    globbing wildcard characters.  Existing matching files are sorted
    first by length, then alphabetically (ensuring that 'file100' comes
    after 'file99', for example).  If there is only a single wildcard
    group, it also serves to define a sequence of future family names
    beyond those currently existing for 'r+', 'a', 'w', or 'w-' modes.
    With more than one wildcard group, existing files can be opened but no
    future names can be generated.

    A '?' pattern is treated the same as a '[0-9]' pattern if all its
    matches are digits or if the pattern matches no existing files.
    Similarly, a '*' acts like the minimum number of all-digit matches, or
    three digits if there are no matches.

    A single filename may also contain a ``%d`` or ``%0nd`` print format
    directive, which will be converted to the corresponding number of
    ``[0-9]`` glob patterns.

    """
    isstr = isinstance(filename, basestring)
    if isstr:
        filename = abspath(expanduser(expandvars(filename)))
        match = _digits_format.search(filename)
        if match:  # %0nd --> [0-9][0-9]... n times
            n = int(match.group(1) or '1')  # %d --> [0-9] once
            filename = filename.replace(match.group(0), '[0-9]' * n)
        match = _glob_group.search(filename)
        if match:
            existing = glob.glob(filename)
        else:
            existing = [f for f in [filename] if exists(f)]
    else:
        match = None
        filename = [expanduser(expandvars(f)) for f in filename]
        existing = [f for f in filename if exists(f)]
        if len(existing) < len(filename):
            # The existing files must be a leading run of the sequence;
            # the first mismatch identifies a gap.
            for g, f in zip(existing, filename):
                if f == g:
                    continue
                raise IOError("intermediate file {} missing".format(f))
        if not filename:
            raise IOError("sequence of filenames is empty")
    mode = mode.lower()
    if existing:
        existing.sort(key=lambda path: (len(path), path))
        if mode.startswith('w-'):
            raise IOError("protecting existing file {}".format(existing[0]))
    elif mode.startswith('r'):
        raise IOError("no file matches {}".format(filename))
    # Normalize to the binary mode string handed to the builtin open().
    if mode.startswith('r'):
        mode = 'r+b' if mode.startswith('r+') else 'rb'
    elif mode.startswith('a'):
        mode = 'r+b' if existing else 'w+b'
    elif mode.startswith('w'):
        existing = []  # ignore any existing files
        mode = 'w+b'
    else:
        raise IOError("open mode {} not understood".format(mode))
    # Compute:
    # pattern = pattern containing {...} if either existing or future
    # existing = list of existing {...} items (int or str) for pattern
    # future = iterable yielding future {...} items (int or str) for pattern
    future = None
    if match:
        if '+' in mode:
            # prefix is the text before the first wildcard group, suffix
            # the text after the last one.
            prefix, suffix = filename[:match.start()], filename
            ngroups = 0
            while match:
                ngroups += 1
                suffix = suffix[match.end():]
                match = _glob_group.search(suffix)
            p, s = len(prefix), (-len(suffix) if suffix else None)
            existing = [f[p:s] for f in existing]
            fmt = '{}'
            if ngroups == 1:
                # With a single wildcard group, we may be able to predict
                # future names in the family.
                # We handle two cases:
                # 1. a sequence of *, ?, [0-9] we guess means decimal numbers
                # 2. a single range like [a-z] we take as sequence of chars
                pat = filename[p:s]
                chunks = _glob_digit.findall(pat)
                if ''.join(chunks) == pat:
                    nast = chunks.count('*')
                    ndig = len(chunks) - nast
                    fmt = '{' + ':0{}d'.format(3*nast + ndig) + '}'
                    future = itertools_count(0)
                else:
                    # _expand_ranges returns None when the group is not a
                    # simple enumerable set; then no names are predictable.
                    expanded = _expand_ranges(pat)
                    if expanded is not None:
                        future = iter(expanded)
                if existing:
                    if all(f.isdigit() for f in existing):
                        # existing matches are all decimal numbers
                        nums = list(map(int, existing))
                        fmt = '{' + ':0{}d'.format(len(existing[0])) + '}'
                        if all(f == fmt.format(n)
                               for f, n in zip(existing, nums)):
                            existing = nums
                            future = itertools_count(existing[-1] + 1)
                        else:
                            fmt, future = '{}', None
                    elif fmt != '{}':
                        # pattern looked numerical, but matched non-digits
                        fmt, future = '{}', None
                    elif (future is not None
                          and all(len(f) == 1 for f in existing)):
                        # existing matches all non-digit single characters;
                        # advance future past the last existing one
                        final = existing[-1]
                        for f in future:
                            if f == final:
                                break
            # With several wildcard groups fmt stays '{}' and future stays
            # None: existing members can be opened, none can be created.
            pattern = prefix + fmt + suffix
        else:
            # Read-only family: treat the glob matches as an explicit list.
            filename = existing
            isstr = False
    elif isstr:
        pattern = '{}'
        if not existing:
            future = iter([filename])
    if not isstr:
        # Factor the explicit list into common prefix + item + common suffix.
        prefix = commonprefix(filename)
        if len(filename) > 1:
            suffix = commonprefix([f[::-1] for f in filename])[::-1]
            if suffix == prefix:
                suffix = ''  # all filenames identical (not an error??)
        else:
            suffix = ''
        n = len(prefix)
        m = -len(suffix) if suffix else None
        existing = [f[n:m] for f in existing]
        future = iter([f[n:m] for f in filename[len(existing):]])
        pattern = prefix + '{}' + suffix
    return MultiFile(pattern, existing, future, mode, **kwargs), len(existing)
def _expand_ranges(pat):
    """Expand a single bracket glob such as ``[a-cx]`` into ``'abcx'``.

    Parameters
    ----------
    pat : str
       Text of one wildcard group.

    Returns
    -------
    str or None
       The characters of the set in pattern order with ranges expanded,
       or None if `pat` is not exactly one non-negated bracket expression
       made of single characters and ``x-y`` ranges, if a range ends at a
       code point of 256 or more, or if the expansion repeats a character.

    """
    if len(pat) < 3 or pat[0] != '[' or pat[-1] != ']':
        return None
    # Tokenize only the interior: _glob_ranges would otherwise match the
    # closing ']' as a set member and the round-trip check could never pass.
    inner = pat[1:-1]
    if inner[0] == '!':
        return None  # a negated set cannot be enumerated
    chunks = _glob_ranges.findall(inner)
    if ''.join(chunks) != inner:
        return None  # stray '-', '[' or a second bracket expression
    for i, c in enumerate(chunks):
        if len(c) != 3:
            continue  # single character, keep as is
        c2 = ord(c[2])
        if c2 >= 256:
            return None
        chunks[i] = ''.join(map(chr, range(ord(c[0]), c2+1)))
    chunks = ''.join(chunks)
    return chunks if len(chunks) == len(set(chunks)) else None


# MultiFile implements the basic file handle methods used by pdbf.
class MultiFile(object):
    """A binary file or family of binary files.

    Parameters
    ----------
    pattern : str
       Format string with one replacement field; formatting it with a
       family member item gives that member's filename.
    existing : list
       Items (int or str) for `pattern` naming the existing files.  If
       empty, the first item of `future` is created instead.
    future : iterator or None
       Yields items for `pattern` naming files that may be created.
    mode : str
       Mode string passed to the builtin ``open``.
    nextaddr_mode : int, optional keyword
       Only consulted when `mode` starts with 'r+'.  0 (default) starts
       `nextaddr` at the end of the last existing file, 1 at address 0,
       2 at the beginning of the file after the last existing file.

    Attributes
    ----------
    f : file
       Ordinary file handle of the currently open family member.
    nextaddr : int64
       Next unused multi-file address.
    state : list
       ``[mode, pattern, existing, current, future]``.

    """
    # Main purpose is to combine address spaces in a file family, so
    # "address" is (file_index | byte_address) packed into 64 bit int.
    # Default number of address bits is 42, with top 22 bits reserved
    # for file index.  This allows for 4 million 4 terabyte files, which
    # should be adequate for most purposes as of 2019.  You can always
    # override abits in an instance if necessary.
    # The address -1 is reserved to mean "no address", as in NULL pointer.
    # Force abits to be int64 to work around Windows numpy misfeature in
    # which 42 becomes int32.  Other operands are explicitly converted to
    # int64 as well wherever addresses are computed.
    abits = int64(42)

    def __init__(self, pattern, existing, future, mode, **kwargs):
        newfile = not existing
        nextaddr = int64(0)  # Definite type important on Windows.
        nextaddr_mode = (kwargs.pop('nextaddr_mode', 0)
                         if mode.startswith('r+') else 0)
        if newfile:
            try:
                existing = [next(future)]
            except (StopIteration, TypeError):
                # TypeError if future is None
                raise IOError("filename specified no filename?")
        else:
            if not isinstance(existing, list):
                existing = list(existing)
            if nextaddr_mode == 0:
                # In a or r+ mode, should begin with a guess at nextaddr,
                # which is the end of the last existing file.
                # This may be modified by the non-generic caller (see pdbf).
                i = len(existing) - 1
                nextaddr = int64(getsize(pattern.format(existing[i])))
                nextaddr |= int64(i) << self.abits
            elif nextaddr_mode == 2:
                # Address 0 of the first file which does not exist yet;
                # next_address() and flush() recognize this special value.
                nextaddr = int64(len(existing)) << self.abits
        current = 0
        self.f = open(pattern.format(existing[current]), mode)
        self.state = [mode, pattern, existing, current, future]
        # Ellipsis marks a brand new file whose initializer has not yet
        # run; callbacks() checks for it.
        self._callbacks = None, Ellipsis if newfile else None
        self.nextaddr = nextaddr

    def callbacks(self, flusher, initializer):
        """Set the flusher and initializer callback functions.

        `flusher(f)` is called by `flush` to write file metadata and
        `initializer(f)` is called by `open` on each file it opens.  If
        this family was created as a new file, `initializer` is called
        immediately on the current file.
        """
        newfile = self._callbacks[1] is Ellipsis  # set in __init__ only
        self._callbacks = flusher, initializer
        if newfile:
            initializer(self.f)

    def filename(self, n=None):
        """Return current or n-th existing filename in family."""
        _, pattern, existing, current, _ = self.state
        if n is None:
            n = current
        return pattern.format(existing[n])

    def filemode(self):
        """Return the mode string this family was opened with."""
        return self.state[0]

    def open(self, n):
        """Open n-th file of family, returning its ordinary file handle.

        `n` may be the index of an existing file, or the number of
        existing files in order to create the next family member.

        Raises
        ------
        IOError
           If `n` is beyond the next new file, or a new file is requested
           in a read-only family.
        StopIteration
           If a new file is requested but no future names remain.
        """
        mode, pattern, existing, current, future = self.state
        if n == current:
            return self.f
        writeable = mode.startswith('w') or '+' in mode
        isnew = n == len(existing)
        if n < len(existing):
            if writeable:
                mode = 'r+b'  # never truncate an existing member
            member = existing[n]
        elif not isnew:
            raise IOError("cannot open file {} in family of {}"
                          "".format(n, len(existing)))
        elif not writeable:
            raise IOError("cannot create new file in read-only family")
        else:
            # Do not catch StopIteration here.  Instead handle in caller
            # of next_address(newfile=1).
            if future is None:
                raise StopIteration
            member = next(future)
            if not mode.startswith('w'):
                mode = 'w+b'
        if isnew:
            self.flush()
        # Open the new file before closing the old one, so a failure
        # leaves the current file usable.
        f = open(pattern.format(member), mode)
        self.f.close()
        self.state[3] = n
        self.f = f
        # Starting nextaddr at 0 is correct only when isnew.
        # In other cases we leave it set to its most recent known value
        # in a different file.  This assures nextaddr is in last file.
        if isnew:
            existing.append(member)
            self.nextaddr = int64(n) << self.abits
        initializer = self._callbacks[1]
        if initializer not in (None, Ellipsis):
            # Note that initializer may call open recursively, but if
            # so caller must take care to ensure that such a recursive
            # call does not reach this point.
            initializer(f)
        return f

    def flush(self):
        """Flush metadata and ordinary file buffers.

        The flusher callback is only called when the current file is the
        last existing file; it is called with the file positioned at
        `nextaddr`, and the previous file position is restored afterward.
        """
        mode = self.state[0]
        if mode.startswith('r') and '+' not in mode:
            return  # quick no-op for read-only files
        existing, current = self.state[2:4]
        if current + 1 == len(existing):
            flusher = self._callbacks[0]
            if flusher is not None:
                i, nextaddr = self.split_address(self.nextaddr)
                if nextaddr == 0 and i == current + 1:
                    return  # handle special case nextaddr_mode==2
                if i != current:
                    raise AssertionError("(BUG) impossible current file value")
                f = self.f
                addr = f.tell()
                f.seek(nextaddr)
                flusher(f)
                f.seek(addr)
        self.f.flush()

    def close(self):
        """Flush and close the current file."""
        self.flush()
        self.f.close()

    def current_file(self):
        """Index of current file in family, argument to open method."""
        return self.state[3]

    def zero_address(self, n=None):
        """Return multifile address of first byte in current or n-th file."""
        return int64(self.state[3] if n is None else n) << self.abits

    # This is not used by pdbf module, but provide it anyway.
    def tell(self):
        """Return current multi-file address.

        Raises IOError if the position in the current file does not fit
        in `abits` bits.
        """
        current = int64(self.state[3])
        abits = self.abits
        one = int64(1)
        mask = (one << abits) - one
        addr = int64(self.f.tell())
        if addr & ~mask:
            raise IOError("file too large for {} bit address".format(abits))
        return (current << abits) | addr

    def seek(self, addr):
        """Seek to multi-file address, opening alternate file if needed.

        Returns the ordinary file handle positioned at `addr`.
        """
        i, addr = self.split_address(addr)
        f = self.open(i)
        f.seek(addr)
        return f

    def split_address(self, addr):
        """Return file index, address for a multifile address."""
        # There is a serious long standing bug in numpy type promotion rules
        # which prevents uint64 from being useful when combined with any other
        # integer type -- numpy will promote uint64 to float64 in a stupid
        # attempt to find a signed type that can hold the result.
        # This is completely different from C type promotion rules, in
        # which signed gets promoted to unsigned (also a bad idea).
        addr = int64(addr)  # even on 64 bit Windows, addr can be int32
        one = int64(1)
        abits = self.abits
        # Mask after the shift so sign extension cannot leak into the index.
        mask = (one << (int64(64) - abits)) - one
        i = (addr >> abits) & mask
        mask = (one << abits) - one
        return i, addr & mask

    def next_address(self, both=False, newfile=False):
        """Return next unused multi-file address.

        If `newfile` is true, or `nextaddr` is the first address of the
        file after the last existing one, the next family member is
        created first.  Returns None if that file cannot be created
        because no future names remain.  With `both` true, returns the
        pair (multi-file address, address within file), or (None, None).
        """
        # Use special value of nextaddr as implicit newfile flag.
        nfiles = len(self.state[2])
        if newfile or self.nextaddr == int64(nfiles) << self.abits:
            try:
                self.open(nfiles)
            except StopIteration:
                # Signal caller that we have run out of filenames.
                return (None, None) if both else None
        nextaddr = int64(self.nextaddr)
        if both:
            one = int64(1)
            mask = (one << self.abits) - one
            return nextaddr, nextaddr & mask
        return nextaddr

    def declared(self, addr, dtype, nitems):
        """Declare an array at addr, advancing nextaddr past it if needed.

        The array end is `addr` plus `nitems` bytes when `dtype` is None,
        otherwise `addr` plus `nitems * dtype.itemsize`.  It replaces
        `nextaddr` only if it is larger, compared as unsigned 64 bit
        values.
        """
        addr = int64((nitems if dtype is None else nitems * dtype.itemsize)
                     + addr)
        nextaddr = int64(self.nextaddr)
        if uint64(addr) > uint64(nextaddr):
            self.nextaddr = addr