Source code for swmmio.utils.text
# UTILITY FUNCTIONS AIMED AT I/O OPERATIONS WITH TEXT FILES
# STANDARD READING AND WRITING OF TEXT FILES (E.G. .INP AND .RPT)
import os
import re
from io import StringIO
from collections import OrderedDict, deque
from swmmio.defs import INFILTRATION_COLS, INP_SECTION_TAGS, SWMM5_VERSION
from swmmio.utils.functions import format_inp_section_header
from swmmio.defs.sectionheaders import normalize_inp_config
[docs]
def inline_comments_in_inp(filepath, overwrite=False):
"""
with an existing INP file, shift any comments that have been placed above the
element (behavoir from saving in GUI) and place them to the right, inline with
the element. To improve readability
"""
newfilename = os.path.splitext(os.path.basename(filepath))[0] + '_unGUI.inp'
newfilepath = os.path.join(os.path.dirname(filepath), newfilename)
allheaders = get_inp_sections_details(filepath)
with open(filepath) as oldf:
with open(newfilepath, 'w') as new:
# to hold list of comments (handles multiline comments)
comment_concat = []
current_section = list(allheaders.keys())[0]
for line in oldf:
# determine what section we are in by noting when we pass double brackets
if '[' and ']' in line:
current_section = line.strip()
if len(line.strip()) > 1:
if line.strip()[0] == ';' and ''.join(line.strip()[:2]) != ';;':
# this is a comment bc first char is ; and the
# second char is not (which would resemble the header section)
words = line.split()
hdrs = allheaders[current_section]['columns']
perc_match_to_header = float(len([x for x in words if x in hdrs])) / float(len(hdrs))
if perc_match_to_header <= 0.75:
comment_concat.append(line.strip())
else:
# this row has data, tack any comment to the line end
comment_string = ''
if len(comment_concat) > 0:
comment_string = r' '.join(comment_concat)
newlinestring = line.strip() + comment_string + '\n'
new.write(newlinestring)
comment_concat = []
else:
# write the short line
new.write(line)
# rename files and remove old if we should overwrite
if overwrite:
os.remove(filepath)
os.rename(newfilepath, filepath)
[docs]
def extract_section_of_file(file_path, start_strings, end_strings, comment=';', **kwargs):
"""
Extract a portion of a file found between one or more start strings and the first encountered end string.
Parameters
----------
file_path : str
Path to the source file.
start_strings : str or list of str
String or list of strings from which to start extracting.
end_strings : str or list of str
String or list of strings at which to stop extracting.
comment : str, optional
Comment string used to ignore parts of the source file. Defaults to ';'.
**kwargs
Other keyword arguments.
Returns
-------
str
String extracted from source file.
Examples
--------
>>> from swmmio.tests.data import MODEL_FULL_FEATURES_XY
>>> s = extract_section_of_file(MODEL_FULL_FEATURES_XY, '[EVAPORATI', '[', comment=None)
>>> print(s.strip())
[EVAPORATION]
;;Data Source Parameters
;;-------------- ----------------
CONSTANT 0.0
DRY_ONLY NO
"""
if isinstance(end_strings, str):
end_strings = [end_strings]
if isinstance(start_strings, str):
start_strings = [start_strings]
starts_ix = 0
starts_len = len(start_strings)
start_found = False
out_string = ''
with open(file_path, 'r') as f:
for line in f:
# the current string we are searching for
if starts_ix < starts_len:
search_str = start_strings[starts_ix]
if start_found and any(es.upper() in line.upper() for es in end_strings):
# if we found the start and the line contains any of
# the end strings, break out
break
elif not start_found and search_str.upper() in line.upper():
# increment the index of the start strings that have been found
starts_ix += 1
if starts_ix == starts_len:
# we found each of the start strings
start_found = True
if start_found:
if comment is not None:
# ignore anything after a comment
if comment in line:
s = line.split(comment)[0] + '\n'
out_string += s
else:
out_string += line
else:
out_string += line
return out_string
[docs]
def get_rpt_value(file_path, value_type):
"""
scan rpt file and find the line starting with value_type and return the last numeric value
"""
with open(file_path,"r") as fi:
for ln in fi:
if ln.strip().startswith(value_type):
#print(ln)
return ln.split()[-1]
return None
[docs]
def get_rpt_metadata(file_path):
"""
Scan rpt file and extract meta data
:param file_path: path to rpt file
:return: dict of metadata
"""
with open(file_path) as f:
for line in f:
if "Starting Date" in line:
simulation_start = line.split(".. ")[1].replace("\n", "")
if "Ending Date" in line:
simulation_end = line.split(".. ")[1].replace("\n", "")
if "STORM WATER MANAGEMENT MODEL - VERSION" in line:
version = re.search(r"\d+.\d+.\d+", line)
if version is not None:
version = version.group(0).split('.')
swmm_version = {
'major': int(version[0]),
'minor': int(version[1]),
'patch': int(version[2])
}
if "Report Time Step ........." in line:
time_step_min = int(line.split(":")[1].replace("\n", ""))
break
# grab the date of analysis from end of file
with open(file_path) as f:
f.seek(os.path.getsize(file_path) - 500) # jump to 500 bytes before the end of file
for line in f:
if "Analysis begun on" in line:
analysis_date = line.split("Analysis begun on: ")[1].replace("\n", "")
meta_data = dict(
swmm_version=swmm_version,
simulation_start=simulation_start,
simulation_end=simulation_end,
time_step_min=time_step_min,
analysis_date=analysis_date,
)
return meta_data
[docs]
def find_byte_range_of_section(path, start_string):
'''
returns the start and end "byte" location of substrings in a text file
'''
with open(path) as f:
start = None
end = None
l = 0 # line bytes index
for line in f:
if start and line.strip() == "" and (l - start) > 100:
# LOGIC: if start exists (was found) and the current line
# length is 3 or less (length of /n ) and we're more than
# 100 bytes from the start location then we are at the first
# "blank" line after our start section (aka the end of the
# section)
end = l
break
if (start_string in line) and (not start):
start = l
# increment length (bytes?) of current position
l += len(line) + len("\n")
return [start, end]
[docs]
def get_inp_sections_details(inp_path, include_brackets=False):
"""
Creates a dictionary with all the headers found in an INP file
(which varies based on what the user has defined in a given model)
and updates them based on the definitions in inp_header_dict
to ensure the list is comprehensive.
Parameters
----------
inp_path : str
Path to the INP file.
include_brackets : bool, optional
Whether to parse sections including the brackets ([]). Default is False.
Returns
-------
OrderedDict
An ordered dictionary with section headers as keys and their
corresponding details.
Examples
--------
>>> from swmmio.tests.data import MODEL_FULL_FEATURES_XY
>>> headers = get_inp_sections_details(MODEL_FULL_FEATURES_XY)
>>> [header for header, cols in headers.items()][:4]
['TITLE', 'OPTIONS', 'EVAPORATION', 'RAINGAGES']
>>> headers['SUBCATCHMENTS']['columns']
['Name', 'Raingage', 'Outlet', 'Area', 'PercImperv', 'Width', 'PercSlope', 'CurbLength', 'SnowPack']
"""
from swmmio.defs import INP_OBJECTS
import pandas as pd
found_sects = OrderedDict()
with open(inp_path) as f:
txt = f.read()
section_dict = {
key: txt.find("[{}]".format(key)) for key in INP_OBJECTS.keys()
if txt.find("[{}]".format(key)) >= 0
}
section_dict = sorted(section_dict, key=section_dict.get)
bracketed_words = re.findall(r"\[([A-Za-z0-9_]+)\]", txt)
for sect in bracketed_words:
sect_id = f'[{sect.upper()}]' if include_brackets else sect.upper()
if sect not in section_dict:
found_sects[sect_id] = OrderedDict(columns=['blob'])
else:
found_sects[sect_id] = INP_OBJECTS[sect]
# make necessary adjustments to columns that change based on options
ops_cols = INP_OBJECTS['OPTIONS']['columns']
ops_string = extract_section_of_file(inp_path, '[OPTIONS]', INP_SECTION_TAGS, )
options = pd.read_csv(StringIO(ops_string), header=None,
sep='\s+', skiprows=[0], index_col=0,
names=ops_cols)
if 'INFILTRATION' in found_sects:
# select the correct infiltration column names
# fall back to HORTON if invalid/unset infil type
infil_type = options['Value'].get('INFILTRATION', None)
if pd.isna(infil_type):
infil_type = 'HORTON'
infil_cols = INFILTRATION_COLS[infil_type.upper()]
inf_id = 'INFILTRATION'
if include_brackets:
inf_id = '[{}]'.format('INFILTRATION')
# overwrite the dynamic sections with proper header cols
found_sects[inf_id]['columns'] = list(infil_cols)
return found_sects
[docs]
def get_rpt_sections_details(rpt_path: str):
"""
Extracts and returns the details of sections from a SWMM report file.
Parameters
----------
rpt_path : str
The file path to the SWMM report file.
Returns
-------
OrderedDict
An ordered dictionary where the keys are section headers found in the report file
and the values are the corresponding details as defined in the RPT_OBJECTS.
"""
from swmmio.defs import RPT_OBJECTS
found_sects = OrderedDict()
rpt_headers = RPT_OBJECTS.copy()
# get rpt file metadata
meta_data = get_rpt_metadata(rpt_path)
swmm_version = meta_data['swmm_version']
# make necessary adjustments to columns that change based on swmm version
for version in SWMM5_VERSION:
version_value = float(version)
rpt_version = float(f"{swmm_version['minor']}.{swmm_version['patch']}")
if rpt_version >= version_value:
update_rpt = normalize_inp_config(SWMM5_VERSION[version]['rpt_sections'])
rpt_headers.update(update_rpt)
with open(rpt_path) as f:
buff3line = deque()
for line in f:
# maintains a 3 line buffer and looks for instances where
# a top and bottom line have '*****' and records the middle line
# typical of section headers in RPT files
buff3line.append(line)
if len(buff3line) > 3:
buff3line.popleft()
# search for section header between two rows of *'s
if ('***********' in buff3line[0] and
'***********' in buff3line[2] and
len(buff3line[1].strip()) > 0):
header = buff3line[1].strip()
if header in rpt_headers:
found_sects[header] = rpt_headers[header]
else:
# unrecognized section
found_sects[header] = OrderedDict(columns=['blob'])
return found_sects