Separate size and capacity of LGDO ArrayLike objects #109

Open · wants to merge 31 commits into base: main

Commits (31)
545d6dc  Array manages capacity separately from size (Oct 11, 2024)
980ad6a  VectorOfVectors dtype is a property (Oct 11, 2024)
d0d8ce2  Raise error on insert if i>len (Oct 13, 2024)
075a4f1  Add get/set_capacity to VoV and change modifiers to take advantage of… (Oct 13, 2024)
1e1fdda  Modify core.read and store.read to resize array when filling in place… (Oct 13, 2024)
23a03a6  Changed table to handle capacity and resizing similar to array (Oct 13, 2024)
32ceef9  Fixed test (Oct 13, 2024)
8d7c1eb  Added abstract base class for LGDO collections (Oct 13, 2024)
8a5dcb2  style: pre-commit fixes (pre-commit-ci[bot], Oct 13, 2024)
8dd3d75  Appease the pre-commit bot (Oct 13, 2024)
5a2e402  Fixed tutorial (Oct 14, 2024)
0a24cf9  Fixed docstring error (Oct 14, 2024)
0fb6adf  Added tests for capacity and fixed bugs (Oct 14, 2024)
03f5ce7  style: pre-commit fixes (pre-commit-ci[bot], Oct 14, 2024)
a1cf2b2  Appease pre-commit bot (Oct 14, 2024)
0a0bffb  Improve test coverage (Oct 14, 2024)
c7c0c28  Merge branch 'main' of https://github.com/legend-exp/legend-pydataobj (Nov 5, 2024)
ae4979f  style: pre-commit fixes (pre-commit-ci[bot], Nov 5, 2024)
8f98559  Merge branch 'main' of https://github.com/legend-exp/legend-pydataobj (Nov 25, 2024)
7afab72  Fixed test (Nov 25, 2024)
74cb281  When filling VoV from AoesA, if length of V is longer than A use fill… (Nov 27, 2024)
51137a7  Do not return current_i_entry when iterating (Dec 20, 2024)
f06ae09  Merge branch 'main' of https://github.com/legend-exp/legend-pydataobj (Dec 20, 2024)
2e0f597  Fixed tests (Dec 20, 2024)
10193a9  style: pre-commit fixes (pre-commit-ci[bot], Dec 20, 2024)
0546407  Merge branch 'main' of https://github.com/legend-exp/legend-pydataobj (Jan 14, 2025)
847c19c  Fixed broken test (Jan 14, 2025)
f118350  Merge branch 'main' of https://github.com/iguinn/legend-pydataobj (Jan 14, 2025)
dc88aa1  Merge branch 'main' of https://github.com/legend-exp/legend-pydataobj (iguinn, Jan 21, 2025)
582960c  Fixed tutorial notebook (iguinn, Jan 24, 2025)
d808cd8  Merge branch 'main' of https://github.com/legend-exp/legend-pydataobj (iguinn, Jan 24, 2025)
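In short: this PR gives array-like LGDOs a C++-vector-style split between logical size and allocated capacity, so that resizing within capacity does not reallocate and in-place reads can shrink a buffer without touching its allocation. A minimal sketch of the intended semantics, assuming the get_capacity/set_capacity names mentioned in the commit messages (the exact signatures may differ from the final API):

    import numpy as np

    from lgdo import Array

    a = Array(np.zeros(1000))
    a.resize(10)                     # logical size shrinks to 10 entries...
    assert len(a) == 10
    assert a.get_capacity() >= 1000  # ...but the buffer keeps its allocation (assumed method name)

    a.resize(800)        # growing back within capacity avoids reallocation
    a.set_capacity(800)  # explicitly trim the allocation (assumed method name)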
6 changes: 3 additions & 3 deletions docs/source/notebooks/LH5Files.ipynb
@@ -191,8 +191,8 @@
 "source": [
 "from lgdo.lh5 import LH5Iterator\n",
 "\n",
-"for lh5_obj, entry, n_rows in LH5Iterator(lh5_file, \"geds/raw/energy\", buffer_len=20):\n",
-"    print(f\"entry {entry}, energy = {lh5_obj} ({n_rows} rows)\")"
+"for lh5_obj in LH5Iterator(lh5_file, \"geds/raw/energy\", buffer_len=20):\n",
+"    print(f\"energy = {lh5_obj} ({len(lh5_obj)} rows)\")"
 ]
 },
 {
@@ -215,7 +215,7 @@
 "store = LH5Store(\n",
 "    keep_open=True\n",
 ")  # with keep_open=True, files are kept open inside the store\n",
-"store.read(\"geds/raw\", lh5_file)  # returns a tuple: (obj, n_rows_read)"
+"store.read(\"geds/raw\", lh5_file)"
 ]
 },
6 changes: 3 additions & 3 deletions src/lgdo/cli.py
@@ -228,10 +228,10 @@ def lh5concat(args=None):
                 continue

             # read as little as possible
-            obj, _ = store.read(current, h5f0, n_rows=1)
+            obj = store.read(current, h5f0, n_rows=1)
             if isinstance(obj, (Table, Array, VectorOfVectors)):
                 # read all!
-                obj, _ = store.read(current, h5f0)
+                obj = store.read(current, h5f0)
                 lgdos[current] = obj
             elif isinstance(obj, Struct):
                 # structs might be used in a "group-like" fashion (i.e. they might only
@@ -309,7 +309,7 @@ def _inplace_table_filter(name, table, obj_list):
     log.info(msg)

     for name in lgdos:
-        obj, _ = store.read(name, file)
+        obj = store.read(name, file)
         # need to remove nested LGDOs from obj too before appending
         if isinstance(obj, Table):
             _inplace_table_filter(name, obj, obj_list)
4 changes: 1 addition & 3 deletions src/lgdo/lh5/_serializers/read/composite.py
@@ -353,15 +353,13 @@ def _h5_read_table(
         table = Table(col_dict=col_dict, attrs=attrs)

         # set (write) loc to end of tree
-        table.loc = n_rows_read
+        table.resize(do_warn=True)
         return table, n_rows_read

     # We have read all fields into the object buffer. Run
     # checks: All columns should be the same size. So update
     # table's size as necessary, warn if any mismatches are found
     obj_buf.resize(do_warn=True)
-    # set (write) loc to end of tree
-    obj_buf.loc = obj_buf_start + n_rows_read

     # check attributes
     utils.check_obj_buf_attrs(obj_buf.attrs, attrs, fname, oname)
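For context: Table.resize(do_warn=True), used above, is what performs the column-size check described in the comment, replacing the old write-location (loc) bookkeeping. It brings the table and its columns to a common size, warning when they disagree. A rough illustration of that behavior; the column names here are invented:

    import numpy as np

    from lgdo import Array, Table

    # two columns that accidentally ended up with different lengths
    tbl = Table(col_dict={"energy": Array(np.zeros(8)), "baseline": Array(np.zeros(10))})

    # reconcile the table size with its columns, warning about the mismatch
    tbl.resize(do_warn=True)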
51 changes: 21 additions & 30 deletions src/lgdo/lh5/core.py
@@ -4,6 +4,7 @@
 import inspect
 import sys
 from collections.abc import Mapping, Sequence
+from contextlib import suppress
 from typing import Any

 import h5py
@@ -92,8 +93,7 @@ def read(
         will be set to ``True``, while the rest will default to ``False``.
     obj_buf
         Read directly into memory provided in `obj_buf`. Note: the buffer
-        will be expanded to accommodate the data requested. To maintain the
-        buffer length, send in ``n_rows = len(obj_buf)``.
+        will be resized to accommodate the data retrieved.
     obj_buf_start
         Start location in ``obj_buf`` for read. For concatenating data to
         array-like objects.
@@ -106,25 +106,21 @@

     Returns
     -------
-    (object, n_rows_read)
-        `object` is the read-out object `n_rows_read` is the number of rows
-        successfully read out. Essential for arrays when the amount of data
-        is smaller than the object buffer. For scalars and structs
-        `n_rows_read` will be ``1``. For tables it is redundant with
-        ``table.loc``. If `obj_buf` is ``None``, only `object` is returned.
+    object
+        the read-out object
     """
     if isinstance(lh5_file, h5py.File):
         lh5_obj = lh5_file[name]
     elif isinstance(lh5_file, str):
         lh5_file = h5py.File(lh5_file, mode="r", locking=locking)
         lh5_obj = lh5_file[name]
     else:
-        lh5_files = list(lh5_file)
-
-        n_rows_read = 0
-        obj_buf_is_new = False
+        if obj_buf is not None:
+            obj_buf.resize(obj_buf_start)
+        else:
+            obj_buf_start = 0

-        for i, h5f in enumerate(lh5_files):
+        for i, h5f in enumerate(lh5_file):
             if (
                 isinstance(idx, (list, tuple))
                 and len(idx) > 0
@@ -146,33 +142,26 @@
                 idx = np.array(idx[0])[n_rows_to_read_i:] - n_rows_i
             else:
                 idx_i = None
-            n_rows_i = n_rows - n_rows_read

-            obj_ret = read(
+            obj_buf_start_i = len(obj_buf) if obj_buf else 0
+            n_rows_i = n_rows - (obj_buf_start_i - obj_buf_start)
+
+            obj_buf = read(
                 name,
                 h5f,
-                start_row,
+                start_row if i == 0 else 0,
                 n_rows_i,
                 idx_i,
                 use_h5idx,
                 field_mask,
                 obj_buf,
-                obj_buf_start,
+                obj_buf_start_i,
                 decompress,
             )
-            if isinstance(obj_ret, tuple):
-                obj_buf, n_rows_read_i = obj_ret
-                obj_buf_is_new = True
-            else:
-                obj_buf = obj_ret
-                n_rows_read_i = len(obj_buf)
-
-            n_rows_read += n_rows_read_i
-            if n_rows_read >= n_rows or obj_buf is None:
-                return obj_buf, n_rows_read
-            start_row = 0
-            obj_buf_start += n_rows_read_i
-        return obj_buf if obj_buf_is_new else (obj_buf, n_rows_read)
+            if obj_buf is None or (len(obj_buf) - obj_buf_start) >= n_rows:
+                return obj_buf
+
+        return obj_buf

     if isinstance(idx, (list, tuple)) and len(idx) > 0 and not np.isscalar(idx[0]):
         idx = idx[0]
@@ -192,8 +181,10 @@
         obj_buf_start=obj_buf_start,
         decompress=decompress,
     )
+    with suppress(AttributeError):
+        obj.resize(obj_buf_start + n_rows_read)

-    return obj if obj_buf is None else (obj, n_rows_read)
+    return obj


 def write(
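The user-visible effect of the core.py changes: read now always returns the LGDO itself rather than an (obj, n_rows_read) tuple, and the row count is simply the length of the returned object, since buffers are resized to what was actually read. A sketch of both calling patterns under the new convention (file and group names are placeholders):

    from lgdo import lh5

    # plain read: no tuple to unpack anymore
    energies = lh5.read("geds/raw/energy", "data.lh5")
    print(len(energies))

    # in-place read: the buffer is resized to the rows actually read, so
    # len(buf) replaces the old n_rows_read return value; passing
    # obj_buf_start=len(buf) appends a second file onto the same buffer
    buf = lh5.read("geds/raw/energy", "data.lh5", n_rows=1000)
    buf = lh5.read("geds/raw/energy", "other.lh5", obj_buf=buf, obj_buf_start=len(buf))
    print(len(buf))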
49 changes: 24 additions & 25 deletions src/lgdo/lh5/iterator.py
@@ -24,7 +24,8 @@ class LH5Iterator(typing.Iterator):

     This can be used as an iterator:

-    >>> for lh5_obj, i_entry, n_rows in LH5Iterator(...):
+
+    >>> for lh5_obj in LH5Iterator(...):
     >>>     # do the thing!

     This is intended for if you are reading a large quantity of data. This
@@ -42,16 +43,18 @@ class LH5Iterator(typing.Iterator):
     In addition to accessing requested data via ``lh5_obj``, several
     properties exist to tell you where that data came from:

+    - lh5_it.current_i_entry: get the index within the entry list of the
+      first entry that is currently read
     - lh5_it.current_local_entries: get the entry numbers relative to the
       file the data came from
     - lh5_it.current_global_entries: get the entry number relative to the
       full dataset
     - lh5_it.current_files: get the file name corresponding to each entry
     - lh5_it.current_groups: get the group name corresponding to each entry

-    This class can also be used either for random access:
+    This class can also be used for random access:

-    >>> lh5_obj, n_rows = lh5_it.read(i_entry)
+    >>> lh5_obj = lh5_it.read(i_entry)

     to read the block of entries starting at i_entry. In case of multiple files
     or the use of an event selection, i_entry refers to a global event index
@@ -183,7 +186,6 @@ def __init__(
             msg = f"can't open any files from {lh5_files}"
             raise RuntimeError(msg)

-        self.n_rows = 0
         self.current_i_entry = 0
         self.next_i_entry = 0

@@ -317,11 +319,10 @@ def get_global_entrylist(self) -> np.ndarray:
         )
         return self.global_entry_list

-    def read(self, i_entry: int) -> tuple[LGDO, int]:
-        """Read the next local chunk of events, starting at i_entry. Return the
-        LH5 buffer and number of rows read."""
-        self.n_rows = 0
+    def read(self, i_entry: int) -> LGDO:
+        "Read the next local chunk of events, starting at i_entry."
         i_file = np.searchsorted(self.entry_map, i_entry, "right")
+        self.lh5_buffer.resize(0)

         # if file hasn't been opened yet, search through files
         # sequentially until we find the right one
@@ -332,10 +333,10 @@ def read(self, i_entry: int) -> tuple[LGDO, int]:
             i_file += 1

         if i_file == len(self.lh5_files):
-            return (self.lh5_buffer, self.n_rows)
+            return self.lh5_buffer
         local_i_entry = i_entry - self._get_file_cumentries(i_file - 1)

-        while self.n_rows < self.buffer_len and i_file < len(self.file_map):
+        while len(self.lh5_buffer) < self.buffer_len and i_file < len(self.file_map):
             # Loop through files
             local_idx = self.get_file_entrylist(i_file)
             if local_idx is not None and len(local_idx) == 0:
@@ -344,18 +345,17 @@ def read(self, i_entry: int) -> tuple[LGDO, int]:
                 continue

             i_local = local_i_entry if local_idx is None else local_idx[local_i_entry]
-            self.lh5_buffer, n_rows = self.lh5_st.read(
+            self.lh5_buffer = self.lh5_st.read(
                 self.groups[i_file],
                 self.lh5_files[i_file],
                 start_row=i_local,
-                n_rows=self.buffer_len - self.n_rows,
+                n_rows=self.buffer_len - len(self.lh5_buffer),
                 idx=local_idx,
                 field_mask=self.field_mask,
                 obj_buf=self.lh5_buffer,
-                obj_buf_start=self.n_rows,
+                obj_buf_start=len(self.lh5_buffer),
             )

-            self.n_rows += n_rows
             i_file += 1
             local_i_entry = 0

@@ -364,7 +364,7 @@ def read(self, i_entry: int) -> tuple[LGDO, int]:
         if self.friend is not None:
             self.friend.read(i_entry)

-        return (self.lh5_buffer, self.n_rows)
+        return self.lh5_buffer

     def reset_field_mask(self, mask):
         """Replaces the field mask of this iterator and any friends with mask"""
@@ -375,7 +375,7 @@ def reset_field_mask(self, mask):
     @property
     def current_local_entries(self) -> NDArray[int]:
         """Return list of local file entries in buffer"""
-        cur_entries = np.zeros(self.n_rows, dtype="int32")
+        cur_entries = np.zeros(len(self.lh5_buffer), dtype="int32")
         i_file = np.searchsorted(self.entry_map, self.current_i_entry, "right")
         file_start = self._get_file_cumentries(i_file - 1)
         i_local = self.current_i_entry - file_start
@@ -402,7 +402,7 @@ def current_local_entries(self) -> NDArray[int]:
     @property
     def current_global_entries(self) -> NDArray[int]:
         """Return list of global file entries in buffer"""
-        cur_entries = np.zeros(self.n_rows, dtype="int32")
+        cur_entries = np.zeros(len(self.lh5_buffer), dtype="int32")
         i_file = np.searchsorted(self.entry_map, self.current_i_entry, "right")
         file_start = self._get_file_cumentries(i_file - 1)
         i_local = self.current_i_entry - file_start
@@ -433,7 +433,7 @@ def current_global_entries(self) -> NDArray[int]:
     @property
     def current_files(self) -> NDArray[str]:
         """Return list of file names for entries in buffer"""
-        cur_files = np.zeros(self.n_rows, dtype=object)
+        cur_files = np.zeros(len(self.lh5_buffer), dtype=object)
         i_file = np.searchsorted(self.entry_map, self.current_i_entry, "right")
         file_start = self._get_file_cumentries(i_file - 1)
         i_local = self.current_i_entry - file_start
@@ -455,7 +455,7 @@ def current_files(self) -> NDArray[str]:
     @property
     def current_groups(self) -> NDArray[str]:
         """Return list of group names for entries in buffer"""
-        cur_groups = np.zeros(self.n_rows, dtype=object)
+        cur_groups = np.zeros(len(self.lh5_buffer), dtype=object)
         i_file = np.searchsorted(self.entry_map, self.current_i_entry, "right")
         file_start = self._get_file_cumentries(i_file - 1)
         i_local = self.current_i_entry - file_start
@@ -489,10 +489,9 @@ def __iter__(self) -> typing.Iterator:
         return self

     def __next__(self) -> tuple[LGDO, int, int]:
-        """Read next buffer_len entries and return lh5_table, iterator entry
-        and n_rows read."""
-        buf, n_rows = self.read(self.next_i_entry)
-        self.next_i_entry = self.current_i_entry + n_rows
-        if n_rows == 0:
+        """Read next buffer_len entries and return lh5_table and iterator entry."""
+        buf = self.read(self.next_i_entry)
+        if len(buf) == 0:
             raise StopIteration
-        return (buf, self.current_i_entry, n_rows)
+        self.next_i_entry = self.current_i_entry + len(buf)
+        return buf
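Taken together, LH5Iterator now follows the standard Python iterator protocol: __next__ yields only the filled buffer, and the bookkeeping that used to be returned alongside it lives in properties. A sketch of the migration (file and group names are placeholders):

    from lgdo.lh5 import LH5Iterator

    it = LH5Iterator("data.lh5", "geds/raw/energy", buffer_len=1024)

    # old protocol: for lh5_obj, i_entry, n_rows in it: ...
    # new protocol: only the buffer is yielded; use the iterator's properties
    for lh5_obj in it:
        # i_entry -> it.current_i_entry, n_rows -> len(lh5_obj)
        print(it.current_i_entry, len(lh5_obj))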