Source code for qharv.reel.ascii_out

# Author: Yubo "Paul" Yang
# Email: yubo.paul.yang@gmail.com
# Routines to parse ASCII output. Mostly built around mmap's API.
#  The central object is mmap.mmap, which is usually named "mm".
import functools
from mmap import mmap

def read(fname):
    """Open a file and return a memory map spanning its whole content.

    The file is opened in 'r+' mode, so it must already exist and be
    writable. The file object is closed before returning; only the map
    is handed back to the caller.

    Args:
      fname (str): filename
    Return:
      mmap.mmap: memory map to file
    """
    with open(fname, 'r+') as handle:
        # a length of 0 asks mmap to cover the entire file
        return mmap(handle.fileno(), 0)
def stay(read_func, *args, **kwargs):
    """Decorator: run a read function, then return to the starting position.

    The wrapped function records mm.tell() before calling read_func and
    seeks back to that position afterwards. The position is restored even
    when read_func raises, so a failed parse does not leave the map
    pointing somewhere unexpected.

    Args:
      read_func (Callable): read function, which takes mmap as first input
      *args, **kwargs: accepted for backward compatibility and ignored;
        only the arguments given to the returned wrapper are forwarded
    Return:
      Callable: same read, but with no net change to mm.tell()
    """
    # wraps keeps the name and docstring of the decorated reader
    @functools.wraps(read_func)
    def wrapper(mm, *args, **kwargs):
        idx = mm.tell()
        try:
            return read_func(mm, *args, **kwargs)
        finally:
            # restore the position on success and on error alike
            mm.seek(idx)
    return wrapper
@stay
def get_key_value_pairs(mm, sep='='):
    """Read all key-value pairs written as "key <sep> value".

    Every line containing sep (located with all_lines_with_tag, which
    searches from the current position of mm) is split on sep. The first
    piece is the key and the second piece is the value, both stripped of
    surrounding whitespace. If a key appears more than once, the last
    occurrence wins.

    Args:
      mm (mmap.mmap): memory map
      sep (str, optional): separator between key and value, default '='
    Return:
      dict: string->string key-value pairs
    """
    sep_bytes = sep.encode()
    entry = {}
    for idx in all_lines_with_tag(mm, sep):
        # Start of the line holding the separator: one past the previous
        # newline. rfind gives -1 when the separator is on the very first
        # line, which maps to offset 0 instead of an invalid seek(-1).
        ibegin = mm.rfind(b'\n', 0, idx) + 1
        mm.seek(ibegin)
        tokens = mm.readline().split(sep_bytes)
        name = tokens[0].strip()  # strip whitespace
        val = tokens[1].strip()
        entry[name.decode()] = val.decode()
    return entry
@stay
def name_sep_val(mm, name, sep='=', dtype=float, pos=1):
    """Read one value from a line such as "name = value".

    The first occurrence of name at or after the current position of mm
    is located, and the text from there to the end of that line is split
    on sep. Piece number pos is selected and its first whitespace-
    delimited word is converted with dtype.

    e.g.
      name_sep_val(mm, 'a') on 'a = 2.4' gives 2.4
      name_sep_val(mm, 'volume') on 'volume = 100.0 bohr^3' gives 100.0
      name_sep_val(mm, 'key', sep=':', dtype=str) on 'key:val' gives 'val'
      name_sep_val(mm, 'natom', dtype=int) on 'natom = 8' gives 8

    Args:
      mm (mmap.mmap): memory map
      name (str): name of variable; used to find value line
      sep (str, optional): separator, default '='
      dtype (type, optional): variable data type, default float
      pos (int, optional): index of the piece (after splitting on sep)
        that holds the value, default 1, i.e. right after the first sep
    Return:
      dtype: value of requested variable
    Raises:
      RuntimeError: if name is not found
    """
    loc = mm.find(name.encode())
    if loc == -1:
        raise RuntimeError(name+' not found')
    mm.seek(loc)
    pieces = mm.readline().decode().split(sep)
    # assume the text immediately next to the separator is the desired value
    first_word = pieces[pos].split()[0]
    return dtype(first_word)
@stay
def all_lines_with_tag(mm, tag, nline_max=1024*1024):
    """Return the memory indices of tag, at most one per line.

    The search is conducted starting from the current location of mm.
    After each hit the rest of that line is skipped, so a line that
    contains tag several times contributes a single index.

    Args:
      mm (mmap.mmap): memory map to file
      tag (str): tag to identify lines to look for
      nline_max (int, optional): maximum number of lines to collect,
        default is 2^20
    Return:
      list: a list of memory locations of all found tags
    Raises:
      RuntimeError: if more than nline_max lines contain tag
    """
    pattern = tag.encode()
    all_idx = []
    idx = mm.find(pattern)
    while idx != -1:
        # guard: only complain when there really is one match too many
        if len(all_idx) >= nline_max:
            raise RuntimeError('may need to increase nline_max')
        all_idx.append(idx)
        mm.seek(idx)
        mm.readline()  # continue the search on the next line
        idx = mm.find(pattern)
    return all_idx
@stay
def all_lines_at_idx(mm, idx_list):
    """Return the full line surrounding each given memory location.

    Follow up on all_lines_with_tag, e.g.
      all_lines_at_idx(mm, all_lines_with_tag(mm, 'Atom'))
    reads
      '''
      Atom 0 0 0 0
      Atom 1 1 1 1
      Atom 2 2 2 2
      '''

    Args:
      mm (mmap.mmap): memory map to file
      idx_list (list): a list of memory locations (int)
    Return:
      list: one entry per index, each being the whole line containing
        that index exactly as returned by mm.readline() (bytes for an
        mmap.mmap, trailing newline included)
    """
    lines = []
    for idx in idx_list:
        # Row back to the beginning of the line: one past the previous
        # newline. rfind needs bytes and an explicit [0, idx) window.
        # It returns -1 when idx is on the first line, giving offset 0.
        ibegin = mm.rfind(b'\n', 0, idx) + 1
        mm.seek(ibegin)
        # read desired line
        lines.append(mm.readline())
    return lines
@stay
def locate_block(mm, header, trailer, force_head=False, force_tail=False,
                 skip_header=True, skip_trailer=None):
    """Find the memory locations bounding a block of text.

    The block lies in between header and trailer; header and trailer are
    not included by default. The header is searched for starting at the
    current location of mm. e.g. see block_text

    Args:
      mm (mmap.mmap): memory map to text file
      header (str): string indicating the beginning of block
      trailer (str): string indicating the end of block
      force_head (bool, optional): if header is not found, start the
        block at the beginning of the file instead of raising,
        default is False
      force_tail (bool, optional): if trailer is not found, end the
        block at the end of the file instead of raising,
        default is False
      skip_header (bool, optional): begin after the line holding the
        header, default is True. When the header is missing and
        force_head is set, the first line of the file is skipped.
      skip_trailer (bool, optional): end right before the trailer
        instead of after the trailer line, default is True, unless
        force_tail, then default to False
    Return:
      tuple: (begin_idx, end_idx), memory span of text block
    Raises:
      RuntimeError: if header (or trailer) is missing and force_head
        (or force_tail) is not set
    """
    if skip_trailer is None:
        skip_trailer = not force_tail

    begin_idx = mm.find(header.encode())
    if begin_idx == -1:
        if not force_head:
            raise RuntimeError('failed to find "%s"' % header)
        begin_idx = 0
    if skip_header:
        mm.seek(begin_idx)
        mm.readline()
        begin_idx = mm.tell()

    # Never match a trailer that sits ahead of the block start, which
    # could happen when the header line itself is kept.
    search_from = max(begin_idx, mm.tell())
    end_idx = mm.find(trailer.encode(), search_from)
    if end_idx == -1:
        if not force_tail:
            raise RuntimeError('failed to find "%s"' % trailer)
        # no trailer: the block runs to the end of the file
        mm.seek(0, 2)
        end_idx = mm.tell()
    elif not skip_trailer:
        # end_idx is an absolute offset, so seek from the file start
        mm.seek(end_idx)
        mm.readline()  # include the whole trailer line
        end_idx = mm.tell()
    return begin_idx, end_idx
def block_text(mm, header, trailer, **kwargs):
    """Return the text found in between header and trailer.

    Header and trailer are not included by default. e.g. given text in mm
      '''
      begin important data
      1 2 3
      4 5 6
      7 8 9
      end important data
      '''
    block_text(mm, 'begin', 'end') returns
      '''1 2 3
      4 5 6
      7 8 9
      '''

    Args:
      mm (mmap.mmap): memory map to text file
      header (str): string indicating the beginning of block
      trailer (str): string indicating the end of block
      **kwargs: forwarded unchanged to locate_block
    Return:
      str: decoded content of the block
    """
    start, stop = locate_block(mm, header, trailer, **kwargs)
    raw = mm[start:stop]
    return raw.decode()
def lr_mark(line, lmark, rmark):
    """Read the segment of line enclosed between lmark and rmark.

    The first lmark in line opens the segment and the first rmark after
    it closes the segment. Markers may be longer than one character and
    may be identical (e.g. a pair of quotes).
    e.g. extract the contents in parentheses

    Args:
      line (str): text line
      lmark (str): left marker, e.g. '('
      rmark (str): right marker, e.g. ')'
    Return:
      str: text in between left and right markers
    Raises:
      AssertionError: if either marker is missing. The exception type is
        kept from the original assert statements, but it is raised
        explicitly so the check survives "python -O".
    """
    lidx = line.find(lmark)
    if lidx == -1:
        raise AssertionError('left marker %r not found' % lmark)
    # content starts after the whole left marker, not just one character
    start = lidx + len(lmark)
    # look for the right marker only after the left one
    ridx = line.find(rmark, start)
    if ridx == -1:
        raise AssertionError('right marker %r not found' % rmark)
    return line[start:ridx]
def name_val_table(text, dtype=float):
    """Parse a whitespace-separated table into a name -> value map.

    Designed to parse optVariables text block e.g.
      '''uu_0 1.0770e+00 1 1 ON 0
      uu_1 6.7940e-01 1 1 ON 1
      uu_2 4.3156e-01 1 1 ON 2
      ud_0 1.6913e+00 1 1 ON 5
      ud_1 1.0443e+00 1 1 ON 6
      ud_2 6.1912e-01 1 1 ON 7
      '''
    Only the first two columns are parsed. Blank lines are ignored, and
    the last row is read whether or not the text ends with a newline.

    Args:
      text (str): text block such as given in the example
      dtype (type): data type of value, default is float
    Return:
      dict: variable name -> value map
    """
    var_dict = {}
    for line in text.split('\n'):
        tokens = line.split()
        if not tokens:
            # blank line, including the empty piece after a final newline
            continue
        var_dict[tokens[0]] = dtype(tokens[1])
    return var_dict
def change_line(text, t0, t1):
    """Replace every line of text that contains t0 with t1.

    The whole line is replaced, not just the matching part. Leading and
    trailing newlines of the result are stripped.

    Args:
      text (str): multi-line text
      t0 (str): substring identifying the lines to replace
      t1 (str): replacement line
    Return:
      str: edited text
    """
    swapped = [t1 if t0 in row else row for row in text.split('\n')]
    return '\n'.join(swapped).strip('\n')