Source code for ewokscore.persistence.nexus
from pathlib import Path
from typing import Any
from typing import List
from typing import Mapping
from typing import Optional
from typing import Union
import h5py
from silx.io import h5py_utils
from silx.io.dictdump import dicttonx
from silx.io.dictdump import nxtodict
from silx.utils.proxy import docstring
from .._serialization import common
from .._serialization.common import types
from . import atomic
from .file import FileProxy
# @h5py_utils.retry(retry_period=1)
[docs]
def h5_item_exists(path, item):
try:
with h5py_utils.File(path) as f:
return item in f
except Exception:
return False
[docs]
class NexusProxy(FileProxy):
SCHEME = "nexus"
EXTENSIONS = [".nx", "nxs", ".h5", ".hdf5", ".nexus"]
ALLOW_PATH_IN_FILE = True
[docs]
@docstring(FileProxy)
def exists(self) -> bool:
if not super().exists():
return False
return h5_item_exists(self.path, self.path_in_file)
def _dump(
self,
path: Path,
data: Any,
update_mode: str = "add",
serializer: Optional[
Union[types.GraphSerializer, str]
] = types.GraphSerializer.hdf5_pickle,
**_,
) -> None:
prefix = ""
if isinstance(data, Mapping):
h5group = self.path_in_file
else:
h5group = self.path_in_file_parent
h5name = self.path_in_file_name
if h5name:
data = {h5name: data}
prefix = h5name
elif isinstance(data, Mapping):
pass
else:
raise TypeError("'data' must be a dictionary")
def _insert_serialize_info(
data: dict, key: str, serialize_info: types.SerializeInfo
) -> dict:
if serializer is not None:
for k, v in serialize_info.serialize().items():
data[f"{prefix}@{key}{k}"] = v
return data
data = common.pre_serialize(
data, serializer=serializer, insert_serialize_info=_insert_serialize_info
)
with atomic.atomic_write_hdf5(path, h5group) as (h5file, h5group):
dicttonx(
treedict=data,
h5file=h5file,
h5path=h5group,
update_mode=update_mode,
add_nx_class=True,
)
self._set_nx_classes(h5file, self.path_in_file_parts())
@staticmethod
def _set_nx_classes(h5file, parts: List[str]):
h5file.attrs["NX_class"] = "NXroot"
h5group = h5file
for i, name in enumerate(parts):
h5group = h5group[name]
if not isinstance(h5group, h5py.Group):
break
if i == 0:
NX_class = "NXentry"
elif i == 1:
NX_class = "NXprocess"
else:
NX_class = "NXcollection"
h5group.attrs["NX_class"] = NX_class
def _load(self, path: Path, **kw) -> Any:
h5group = self.path_in_file_parent
h5name = self.path_in_file_name
adict = nxtodict(h5file=str(path), path=h5group, **kw)
def _pop_serialize_info(data: dict, key: str) -> Optional[types.SerializeInfo]:
parent = data
if h5name and isinstance(data[h5name], dict):
parent = data[h5name]
prefix = f"@{key}"
nprefix = len(prefix)
keys = [k for k in parent if k.startswith(prefix)]
if not keys:
return None
serialize_info = {k[nprefix:]: parent.pop(k, None) for k in keys}
return types.SerializeInfo.deserialize(serialize_info)
adict = common.post_deserialize(adict, pop_serialize_info=_pop_serialize_info)
if h5name:
return adict[h5name]
return adict