# coding: utf-8
"""
defines:
- mydict = load_obj_from_hdf5(hdf5_filename, log=None, debug=False)
- mydict = load_obj_from_hdf5_file(mydict, h5_file, log=None, debug=False)
- export_obj_to_hdf5(hdf5_filename, mydict)
- export_obj_to_hdf5_file(hdf5_file, mydict)
Supports:
- integers, floats, None, strings, unicode, lists, tuple,
- numpy arrays (including NaN),
- objects (including custom objects),
- scikit-learn StandardScaler
- minimal dependencies (e.g., no scikit-learn)
Limitations:
- Dictionary keys must be strings/unicode
- May run into problems if you have two classes with the same name
that live in different locations. There is some support for this,
but hopefully you aren't using it.
"""
#from types import MethodType, FunctionType
from typing import List, Optional, Any
import h5py
from h5py._hl.dataset import Dataset
import numpy as np
from cpylog import get_logger2
from pyNastran.utils import object_attributes, check_path
from pyNastran.utils.numpy_utils import integer_types, float_types
#integer_types = (int, np.int32, np.int64)
#float_types = (float, np.float32, np.float64)
#---------------------------------------------------------------------------------------------
[docs]
def export_obj_to_hdf5(hdf5_filename, obj, user_custom_types=None, log=None, debug=False):
    """
    Exports an object to an HDF5 file.

    Parameters
    ----------
    hdf5_filename : str
        the path of the HDF5 file; it is opened with mode 'w'
    obj : dict
        the object to export; forwarded to ``export_obj_to_hdf5_file``
    user_custom_types : list[str]; default=None
        forwarded to ``export_obj_to_hdf5_file``
    log : logger; default=None
        forwarded to ``get_logger2``
    debug : bool; default=False
        forwarded to ``get_logger2``

    """
    logger = get_logger2(log=log, debug=debug, encoding='utf-8')
    with h5py.File(hdf5_filename, 'w') as h5_out:
        start_msg = 'starting export_op2_to_hdf5_file of %r' % hdf5_filename
        logger.info(start_msg)
        export_obj_to_hdf5_file(
            h5_out, obj, log=logger, user_custom_types=user_custom_types)
[docs]
def export_obj_to_hdf5_file(hdf5_file, obj, user_custom_types=None, log=None, debug=False):
    """
    Exports an object to an already open HDF5 file object.

    Parameters
    ----------
    hdf5_file : h5py file/group
        the location that ``obj`` is written into
    obj : dict
        the object to export; it is written with
        ``HDF5Exporter._create_dict_group``
    user_custom_types : list[str]; default=None
        forwarded to ``HDF5Exporter``
    log : logger; default=None
        forwarded to ``HDF5Exporter``
    debug : bool; default=False
        forwarded to ``HDF5Exporter``

    """
    writer = HDF5Exporter(
        hdf5_file, log=log, debug=debug, user_custom_types=user_custom_types)
    # the exporter replaces a None user_custom_types with an empty list
    custom_types = writer.user_custom_types
    writer._create_dict_group(hdf5_file, obj, custom_types, nlevels=0)
[docs]
class HDF5Exporter:
    """
    Recursively writes Python objects into an HDF5 file/group.

    - dictionaries are written as groups with ``attrs['type'] = 'dict'``
    - None is written as an empty group with ``attrs['type'] = 'None'``
    - ints/floats/strings/numpy arrays are written as datasets
    - lists/tuples/sets are written by ``add_list_tuple``
    - other objects are written as groups whose ``type`` attribute is the
      class name

    """
    def __init__(self, hdf5_file, user_custom_types=None, log=None, debug=False):
        """
        Parameters
        ----------
        hdf5_file : h5py file/group
            stored as ``self.hdf5_file``
        user_custom_types : list[str]; default=None -> []
            additional class names that may be exported attribute by attribute
        log : logger; default=None
            forwarded to ``get_logger2``
        debug : bool; default=False
            forwarded to ``get_logger2``

        """
        log = get_logger2(log=log, debug=debug, encoding='utf-8')
        if user_custom_types is None:
            user_custom_types = []
        self.user_custom_types = user_custom_types
        self.hdf5_file = hdf5_file
        self.log = log

    def _create_dict_group(self, hdf5_file, mydict, user_custom_types, nlevels):
        """
        Writes every key/value pair of ``mydict`` into ``hdf5_file``.

        When at least one key is a Python ``int``, a ``dict_keys`` group is
        created that records the key type and every key is written as
        ``str(key)``.  Otherwise, the keys are used as is.
        """
        ukey_types = {key.__class__.__name__ for key in mydict.keys()}
        if 'int' in ukey_types:
            sub_group = hdf5_file.create_group('dict_keys')
            # NOTE: 'int' is in ukey_types, so the set is never empty here.
            # If the keys have mixed types, the stored key_type is whichever
            # name the set yields first.
            sub_group.attrs['key_type'] = list(ukey_types)[0]
            for key, value in sorted(mydict.items()):
                self._add_dataset(hdf5_file, str(key), value, user_custom_types, nlevels+1)
        else:
            for key, value in sorted(mydict.items()):
                self._add_dataset(hdf5_file, key, value, user_custom_types, nlevels+1)

    def _add_attrs(self, sub_group, obj, attrs, user_custom_types, nlevels):
        """
        Writes the attributes of ``obj`` that are named in ``attrs`` into
        ``sub_group``.  Attributes that do not exist on ``obj`` are skipped.
        """
        missing = object()
        for attr in attrs:
            # a single getattr, so a property is only evaluated once
            value = getattr(obj, attr, missing)
            if value is missing:
                # an attribute is not required, but may exist
                continue
            # library code shouldn't write to stdout; use the logger instead
            self.log.debug('%sattr %s %s' % ((nlevels+1)*' ', attr, type(value)))
            self._add_dataset(sub_group, attr, value, user_custom_types, nlevels+1)

    def _add_object_group(self, hdf5_file, key, value, attrs, user_custom_types, nlevels):
        """
        Creates the group ``key``, tags it with the class name of ``value``
        and writes the attributes named in ``attrs`` into it.
        """
        sub_group = hdf5_file.create_group(key)
        sub_group.attrs['type'] = value.__class__.__name__
        self._add_attrs(sub_group, value, attrs, user_custom_types, nlevels+1)

    def _add_dataset(self, hdf5_file, key, value, user_custom_types, nlevels):
        """
        Writes a single key/value pair into ``hdf5_file``.

        Raises
        ------
        TypeError
            if ``key`` is an int/float (and ``value`` is not None) or the
            type of ``value`` is not supported
        NotImplementedError
            if ``value`` is bytes

        """
        if value is None:
            # Nones can't be stored, so we create a custom type
            sub_group = hdf5_file.create_group(key)
            sub_group.attrs['type'] = 'None'
            return

        if isinstance(key, (integer_types, float_types)):
            raise TypeError('key=%r; key must be a string, not %s\nvalue:\n%r' % (
                key, type(key), value))

        # list(...) so that a tuple of user types is also accepted
        custom_types_list = list(user_custom_types) + [
            'BDF', 'OP2', 'OP2Geom', 'StandardScaler', 'lil_matrix']
        class_name = value.__class__.__name__

        if isinstance(value, dict):
            try:
                sub_group = hdf5_file.create_group(key)
            except Exception:
                self.log.error('key = %s; type=%s' % (key, type(key)))
                raise
            sub_group.attrs['type'] = 'dict'
            self._create_dict_group(sub_group, value, user_custom_types, nlevels+1)
        elif isinstance(value, (integer_types, float_types, str, np.ndarray)):
            try:
                hdf5_file.create_dataset(key, data=value)
            except (TypeError, RuntimeError):
                self.log.error('key=%r value=%s type=%s' % (key, str(value), type(value)))
                raise
        elif isinstance(value, bytes):
            # https://docs.h5py.org/en/stable/strings.html
            # this is incomplete; we need to flag it as binary
            raise NotImplementedError(f'bytes is not supported (key={key}; value={value})')
        elif isinstance(value, tuple):
            add_list_tuple(hdf5_file, key, value, 'tuple', self.log)
        elif isinstance(value, list):
            add_list_tuple(hdf5_file, key, value, 'list', self.log)
        elif isinstance(value, set):
            add_list_tuple(hdf5_file, key, value, 'set', self.log)
        elif hasattr(value, 'export_hdf5_file'):
            # the object knows how to write itself
            sub_group = hdf5_file.create_group(key)
            sub_group.attrs['type'] = class_name
            value.export_hdf5_file(sub_group, exporter=self)
        elif hasattr(value, 'get_h5attrs'):
            h5attrs = value.get_h5attrs()
            self._add_object_group(hdf5_file, key, value, h5attrs, user_custom_types, nlevels)
        elif hasattr(value, 'object_attributes'):
            h5attrs = value.object_attributes(mode='both')
            self._add_object_group(hdf5_file, key, value, h5attrs, user_custom_types, nlevels)
        elif class_name == 'lil_matrix':
            h5attrs = ['dtype', 'shape', 'ndim', 'nnz']  # 'data', 'rows'
            self._add_object_group(hdf5_file, key, value, h5attrs, user_custom_types, nlevels)
        elif class_name == 'dtype':
            h5attrs = ['dtype']
            self._add_object_group(hdf5_file, key, value, h5attrs, user_custom_types, nlevels)
        elif class_name in custom_types_list:
            attrs = object_attributes(value, mode='both', keys_to_skip=None)
            self._add_object_group(hdf5_file, key, value, attrs, user_custom_types, nlevels)
        else:
            self.log.error(f'value = {value}')
            msg = (
                'key=%r Type=%r is not in custom_types=%s and does not have:\n'
                ' - export_hdf5_file(h5_file)\n'
                ' - object_attributes()\n'
                ' - get_h5attrs(self)' % (
                    key, class_name, custom_types_list))
            raise TypeError(msg)
[docs]
def add_list_tuple(hdf5_file, key, value, Type: str, log):
    """
    Writes a list/tuple/set as the group ``key`` with ``attrs['type'] = Type``.

    The whole container is first written as a single ``value`` dataset.  If
    that raises a TypeError (e.g., it contains unicode), every item is
    written as its own dataset that is named by its index ('0', '1', ...).

    Parameters
    ----------
    hdf5_file : h5py file/group
        the parent of the new group
    key : str
        the name of the new group
    value : list/tuple/set
        the data to write
    Type : str
        the value of the group's ``type`` attribute
    log : logger
        used to report an item that can't be written

    """
    def _write_items(group):
        """fallback: one dataset per item"""
        for index, item in enumerate(value):
            try:
                group.create_dataset(str(index), data=item)
            except TypeError:
                log.error('key=%r Type=%r' % (key, Type))
                print('value = %s' % str(value))
                print('value[%i] = %s' % (index, str(item)))
                raise

    try:
        group = hdf5_file.create_group(key)
    except ValueError:  # pragma: no cover
        print('key=%s is duplicated' % key)
        print('value = ', value)
        raise

    group.attrs['type'] = Type
    try:
        group.create_dataset('value', data=value)
    except TypeError:
        # contains unicode
        group.attrs['type'] = Type
        _write_items(group)
[docs]
def load_obj_from_hdf5(hdf5_filename: str, custom_types_dict=None, log=None, debug=False):
    """
    Loads an HDF5 file into a dictionary.

    Parameters
    ----------
    hdf5_filename : str
        the h5 filename to load; checked with ``check_path`` and opened
        with mode 'r'
    custom_types_dict : dict[key] : function()
        the custom mapper
    log : logger; default=None
        forwarded to ``get_logger2``
    debug : bool; default=False
        forwarded to ``get_logger2``

    Returns
    -------
    model : dict
        the dictionary that was filled by ``load_obj_from_hdf5_file``

    """
    check_path(hdf5_filename, 'hdf5_filename')
    logger = get_logger2(log=log, debug=debug, encoding='utf-8')
    logger.info('hdf5_filename = %r' % hdf5_filename)

    loaded = {}
    with h5py.File(hdf5_filename, 'r') as h5_in:
        load_obj_from_hdf5_file(
            loaded, h5_in, log=logger,
            custom_types_dict=custom_types_dict, debug=debug)
    return loaded
[docs]
def load_obj_from_hdf5_file(model, h5_file, log=None, custom_types_dict=None, debug=False):
    """
    Loads an open h5 file object into a dictionary.

    Parameters
    ----------
    model : dict
        the dictionary to fill; it is modified in place
    h5_file : h5py file/group
        the location to read from
    log : logger; default=None
        forwarded to ``HDF5Importer``
    custom_types_dict : dict; default=None
        forwarded to ``HDF5Importer``
    debug : bool; default=False
        forwarded to ``HDF5Importer``

    """
    reader = HDF5Importer(
        h5_file, log=log, debug=debug, custom_types_dict=custom_types_dict)
    reader.load(model, h5_file)
[docs]
class HDF5Importer:
    """
    Recursively reads an HDF5 file/group into Python objects.

    Groups are interpreted based on their ``type`` attribute ('dict', 'None',
    'list', 'tuple', 'set' or a key of ``custom_types_dict``).  Entries without
    attributes are read with ``cast``.
    """
    def __init__(self, h5_file, custom_types_dict=None, log=None, debug=False, encoding='utf8'):
        """
        Parameters
        ----------
        h5_file : h5py file/group
            stored as ``self.h5_file``
        custom_types_dict : dict; default=None -> {}
            maps the ``type`` attribute of a group to the class to create
        log : logger; default=None
            forwarded to ``get_logger2``
        debug : bool; default=False
            forwarded to ``get_logger2``
        encoding : str; default='utf8'
            the encoding that is used to decode bytes values

        """
        self.log = get_logger2(log=log, debug=debug, encoding='utf-8')
        if custom_types_dict is None:
            custom_types_dict = {}
        self.custom_types_dict = custom_types_dict
        self.h5_file = h5_file
        # previously the argument was ignored and 'utf8' was always used
        self.encoding = encoding

    def load(self, model, h5_file, self_obj=None):
        """Loads every key of ``h5_file`` into the ``model`` dictionary."""
        for key in h5_file.keys():
            self._load_value(model, h5_file, key, self.custom_types_dict, self_obj,
                             nlevels=1, encoding=self.encoding)

    def _load_value(self, model, h5_file, key, custom_types_dict, self_obj, nlevels, encoding='utf8'):
        """
        Loads ``h5_file[key]`` and stores the result as ``model[key]``.

        Raises
        ------
        NotImplementedError
            if the entry has attributes other than type/key_type/function
        TypeError
            if the ``type`` attribute is not supported

        """
        value = h5_file.get(key)
        if not hasattr(value, 'attrs'):
            model[key] = cast(h5_file, key, value, nlevels)
            return

        # group
        keys = list(value.attrs.keys())
        if not keys:
            # there are no attributes, so it's a plain value
            value2 = cast(h5_file, key, value, nlevels)
            if isinstance(value2, bytes):
                value2 = value2.decode(encoding)
            model[key] = value2
            return

        # keys exist
        type_name = None
        if 'type' in keys:
            type_name = value.attrs['type']
            keys.remove('type')

        if 'key_type' in keys:
            # the key type is not restored; the entry is skipped
            self.log.warning('not handling key_type for %s' % value)
            return

        function = None
        if 'function' in keys:
            function = value.attrs['function']
            keys.remove('function')

        if keys:
            raise NotImplementedError(keys)

        if function is not None:
            # the loader is a method of self_obj that is looked up by name
            model[key] = getattr(self_obj, function)(self, model, h5_file, key, value)
            return

        if type_name == 'dict':
            self._load_dict(model, h5_file, key, value, custom_types_dict, self_obj,
                            nlevels+1, print_dict=False)
        elif type_name == 'None':
            model[key] = None
        elif type_name == 'list':
            model[key] = self._load_mixed_tuple_list(
                h5_file, key, value, custom_types_dict, self_obj, nlevels+1)
        elif type_name == 'tuple':
            _list = self._load_mixed_tuple_list(
                h5_file, key, value, custom_types_dict, self_obj, nlevels+1)
            model[key] = tuple(_list)
        elif type_name == 'set':
            _list = self._load_mixed_tuple_list(
                h5_file, key, value, custom_types_dict, self_obj, nlevels+1)
            model[key] = set(_list)
        elif type_name in custom_types_dict:
            try:
                obj = self._load_custom_type(h5_file, type_name, key, value, custom_types_dict,
                                             self_obj, nlevels)
            except Exception:
                msg = ('Cannot load custom type: %s. Try setting:\n'
                       ' - load_hdf5_file\n'
                       ' - function\n' % (type_name))
                self.log.error(msg)
                raise
            model[key] = obj
        else:
            self.log.error('%s%s %s %s' % ((nlevels)*' ', key, value, keys))
            custom_type_keys = sorted(custom_types_dict.keys())
            raise TypeError('Type=%r is not in custom_types_dict=%s' % (
                type_name, custom_type_keys))

    def _load_mixed_tuple_list(self, h5_file, key, value, custom_types_dict, self_obj, nlevels):
        """
        Lists/tuples are stored as lists if the data doesn't contain unicode.
        Otherwise, they're stored like dictionaries, with string indices that are
        integer values.
        """
        if '0' in value.keys():
            return self._load_unicode_list(
                h5_file, key, value, custom_types_dict, self_obj, nlevels)

        # the data is the single dataset named 'value'
        temp_dict = {}
        self._load_value(temp_dict, value, 'value', custom_types_dict, self_obj,
                         nlevels+2, encoding=self.encoding)
        return temp_dict['value']

    def _load_unicode_list(self, h5_file, key, value, custom_types_dict, self_obj, nlevels):
        """
        We have a dictionary like:
        data = {
            '0' : value0,
            '1' : value1,
            '2' : value2,
        }
        We do this because we need to worry about unicode
        """
        temp_dict = {}
        self._load_dict(temp_dict, h5_file, key, value, custom_types_dict, self_obj,
                        nlevels, print_dict=False)
        mydict = temp_dict[key]

        # the keys are the string form of the list index
        mylist = [None] * len(mydict)
        for int_key, valuei in mydict.items():
            mylist[int(int_key)] = valuei
        return mylist

    def _load_custom_type(self, h5_file, Type, key, value, custom_types, self_obj, nlevels):
        """
        The following custom methods can/should be defined in a class:
        - _init_from_empty()
        - _init_from_self(parent)
        - load_hdf5_file(group)
        - get_custom_types()
        """
        class_instance = custom_types[Type]
        if hasattr(class_instance, '_init_from_empty'):
            obj = class_instance._init_from_empty()
        elif hasattr(class_instance, '_init_from_self'):
            obj = class_instance._init_from_self(self_obj)
        else:
            try:
                obj = class_instance()
            except Exception:
                self.log.error('%s cannot load with 0 arguments' % Type)
                raise

        if hasattr(obj, 'load_hdf5_file'):
            # the object knows how to read itself
            obj.load_hdf5_file(value)
            return obj

        local_custom_types = custom_types
        if hasattr(obj, 'get_custom_types'):
            local_custom_types = obj.get_custom_types()

        # library code shouldn't write to stdout; use the logger instead
        self.log.debug('%s %s' % (key, value))
        temp_dict = {}
        self._load_dict(temp_dict, h5_file, key, value, local_custom_types,
                        obj, nlevels+1, print_dict=False)
        for keyi, valuei in sorted(temp_dict[key].items()):
            try:
                setattr(obj, keyi, valuei)
            except AttributeError:
                self.log.error('cant set %r as %s; is this a property?' % (keyi, valuei))
                continue
        return obj

    def _load_dict(self, model, h5_file, key, value, custom_types, self_obj, nlevels, print_dict=True):
        """
        Loads every child of the group ``value`` into a new dictionary that
        is stored as ``model[key]``.
        """
        new_dict = {}
        for keyi in sorted(value.keys()):
            self._load_value(new_dict, value, keyi, custom_types, self_obj,
                             nlevels+2, encoding=self.encoding)
        if print_dict:
            print('%s%s' % (nlevels*' ', str(new_dict)))
        model[key] = new_dict
[docs]
def cast(h5_file: Dataset, key: str, value, nlevels: int):
    """
    Casts ``h5_file[key]`` to a Python/numpy value using ``_cast``.

    ``value`` and ``nlevels`` are not used.  If an AttributeError is raised,
    the key is printed and the error is re-raised.
    """
    try:
        return _cast(h5_file.get(key))
    except AttributeError:
        print(key)
        raise
def _cast(h5_result_attr) -> Optional[Any]:
"""converts the h5py type back into the actual type"""
if h5_result_attr is None:
return None
if len(h5_result_attr.shape) == 0:
# np.int32/np.float32/np.str_
# calling tolist() doesn't make it a list; it makes it an int/float/str
out = np.array(h5_result_attr).tolist()
#raise NotImplementedError(h5_result_attr.dtype)
else:
out = np.array(h5_result_attr)
if out.dtype.name == 'object':
out = out.tolist()
#assert not isinstance(out, bytes), f'out={out!r}'
#if isinstance(out, bytes):
#out = out.decode(encoding)
return out
def _cast_array(h5_result_attr) -> Optional[Any]:
"""converts the h5py type back into the actual type"""
if h5_result_attr is None:
return None
if len(h5_result_attr.shape) == 0:
# np.int32/np.float32/np.str_
# calling tolist() doesn't make it a list; it makes it an int/float/str
out = np.array(h5_result_attr).tolist()
raise RuntimeError(out)
#raise NotImplementedError(h5_result_attr.dtype)
else:
out = np.array(h5_result_attr)
#assert not isinstance(out, bytes), f'out={out!r}'
#if isinstance(out, bytes):
#out = out.decode(encoding)
return out
[docs]
def cast_strings(group, encoding: str) -> list[str]:
    """Casts ``group`` with ``_cast`` and decodes each entry with ``encoding``."""
    return [raw.decode(encoding) for raw in _cast(group)]
[docs]
def cast_string(h5_result_attr, encoding: str) -> Optional[str]:
    """
    Converts a 0-dimensional h5py string value back into a str.

    Parameters
    ----------
    h5_result_attr : h5py dataset / numpy array / None
        the value to convert; must have ``shape`` and ``dtype`` attributes
    encoding : str
        the encoding that is used if the value is stored as bytes

    Returns
    -------
    out_str : str or None
        None if ``h5_result_attr`` is None

    Raises
    ------
    NotImplementedError
        if the value is not 0-dimensional or is neither str nor bytes

    """
    if h5_result_attr is None:
        return None

    if len(h5_result_attr.shape) != 0:  # pragma: no cover
        out = np.array(h5_result_attr)
        # the message previously used an undefined name, which hid the
        # intended error behind a NameError
        raise NotImplementedError(f'dtype={h5_result_attr.dtype}; out_bytes={out!r}')

    # calling tolist() on a 0-dimensional array returns a Python scalar
    out = np.array(h5_result_attr).tolist()
    if isinstance(out, str):
        return out
    if isinstance(out, bytes):
        return out.decode(encoding)
    raise NotImplementedError(f'dtype={h5_result_attr.dtype}; out={out!r}')  # pragma: no cover