Skip to content

Commit

Permalink
feat: Support reading from S3 (#916)
Browse files Browse the repository at this point in the history
* Add uproot.source.s3.S3Source

* uproot.source.s3.S3Source: support S3_* environment variables (supported by ROOT) and other tools

* style: pre-commit fixes

* add tests/test_0916-read-from-s3.py [xfail]

* S3Source: fix parsing of object key from URI

urlparse includes leading slash in the path name, however key does not include
that. Apparently leading slash was ignored by presign request. However,
presigning is not really a thing without the credentials. In boto3 one needs to
explicitly provide `Config(signature_version=botocore.UNSIGNED)`, in minio-py,
they build an URL manually:

https://github.com/minio/minio-py/blob/4ee1892b2e238da4e3b1bd0cb43cb7e25015e574/minio/api.py#L2063-L2069

Having a key with a extra leading slash would result in incorrect URL being
built. That would result in 404 error.

* test_0916-read-from-s3.py: fix branch name

* update docs

* add :doc:

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Jim Pivarski <[email protected]>
  • Loading branch information
3 people authored Aug 10, 2023
1 parent abc3736 commit 130c55b
Show file tree
Hide file tree
Showing 12 changed files with 155 additions and 12 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ The following libraries are also useful in conjunction with Uproot, but are not

**For accessing remote files:**

* `xrootd`: if reading files with `root://` URLs.
* `minio`: if reading files with `s3://` URIs.
* `xrootd`: if reading files with `root://` URIs.
* HTTP/S access is built in (Python standard library).

**For distributed computing with [Dask](https://www.dask.org/):**
Expand Down
2 changes: 1 addition & 1 deletion docs-sphinx/basic.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ The :doc:`uproot.reading.open` function can be (and usually should be) used like
>>> with uproot.open("path/to/dataset.root") as file:
... do_something...
to automatically close the file after leaving the ``with`` block. The path-name argument can be a local file (as above), a URL ("``http://``" or "``https://``"), or XRootD ("``root://``") if you have the `Python interface to XRootD <https://anaconda.org/conda-forge/xrootd>`__ installed. It can also be a Python file-like object with ``read`` and ``seek`` methods, but such objects can't be read in parallel.
to automatically close the file after leaving the ``with`` block. The path-name argument can be a local file (as above), a URL ("``http://``" or "``https://``"), S3 ("``s3://``) or XRootD ("``root://``") if you have the `Python interface to XRootD <https://anaconda.org/conda-forge/xrootd>`__ installed. It can also be a Python file-like object with ``read`` and ``seek`` methods, but such objects can't be read in parallel.

The :doc:`uproot.reading.open` function has many options, including alternate handlers for each input type, ``num_workers`` to control parallel reading, and caches (``object_cache`` and ``array_cache``). The defaults attempt to optimize parallel processing, caching, and batching of remote requests, but better performance can often be obtained by tuning these parameters.

Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ dev = [
]
test = [
"lz4",
"minio",
"pytest>=6",
"pytest-timeout",
"pytest-rerunfailures",
Expand Down
1 change: 1 addition & 0 deletions src/uproot/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
from uproot.source.http import MultithreadedHTTPSource
from uproot.source.xrootd import XRootDSource
from uproot.source.xrootd import MultithreadedXRootDSource
from uproot.source.s3 import S3Source
from uproot.source.object import ObjectSource
from uproot.source.cursor import Cursor
from uproot.source.futures import TrivialExecutor
Expand Down
1 change: 1 addition & 0 deletions src/uproot/_dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ def dask(
* file_handler (:doc:`uproot.source.chunk.Source` class; :doc:`uproot.source.file.MemmapSource`)
* xrootd_handler (:doc:`uproot.source.chunk.Source` class; :doc:`uproot.source.xrootd.XRootDSource`)
* s3_handler (:doc:`uproot.source.chunk.Source` class; :doc:`uproot.source.s3.S3Source`)
* http_handler (:doc:`uproot.source.chunk.Source` class; :doc:`uproot.source.http.HTTPSource`)
* object_handler (:doc:`uproot.source.chunk.Source` class; :doc:`uproot.source.object.ObjectSource`)
* timeout (float for HTTP, int for XRootD; 30)
Expand Down
16 changes: 11 additions & 5 deletions src/uproot/_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,8 @@ def regularize_path(path):
_windows_absolute_path_pattern = re.compile(r"^[A-Za-z]:[\\/]")
_windows_absolute_path_pattern_slash = re.compile(r"^[\\/][A-Za-z]:[\\/]")
_might_be_port = re.compile(r"^[0-9].*")
_remote_schemes = ["ROOT", "S3", "HTTP", "HTTPS"]
_schemes = ["FILE", *_remote_schemes]


def file_object_path_split(path):
Expand All @@ -296,18 +298,14 @@ def file_object_path_split(path):
file_path = file_path.rstrip()
object_path = object_path.lstrip()

if file_path.upper() in ("FILE", "HTTP", "HTTPS", "ROOT"):
if file_path.upper() in _schemes:
return path, None
elif win and _windows_drive_letter_ending.match(file_path) is not None:
return path, None
else:
return file_path, object_path


_remote_schemes = ["ROOT", "HTTP", "HTTPS"]
_schemes = ["FILE", *_remote_schemes]


def file_path_to_source_class(file_path, options):
"""
Use a file path to get the :doc:`uproot.source.chunk.Source` class that would read it.
Expand Down Expand Up @@ -375,6 +373,14 @@ def file_path_to_source_class(file_path, options):
)
return out, file_path

elif parsed_url.scheme.upper() in {"S3"}:
out = options["s3_handler"]
if not (isinstance(out, type) and issubclass(out, uproot.source.chunk.Source)):
raise TypeError(
"'s3' is not a class object inheriting from Source: " + repr(out)
)
return out, file_path

elif parsed_url.scheme.upper() in {"HTTP", "HTTPS"}:
out = options["http_handler"]
if not (isinstance(out, type) and issubclass(out, uproot.source.chunk.Source)):
Expand Down
2 changes: 2 additions & 0 deletions src/uproot/behaviors/TBranch.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ def iterate(
* file_handler (:doc:`uproot.source.chunk.Source` class; :doc:`uproot.source.file.MemmapSource`)
* xrootd_handler (:doc:`uproot.source.chunk.Source` class; :doc:`uproot.source.xrootd.XRootDSource`)
* s3_handler (:doc:`uproot.source.chunk.Source` class; :doc:`uproot.source.s3.S3Source`)
* http_handler (:doc:`uproot.source.chunk.Source` class; :doc:`uproot.source.http.HTTPSource`)
* object_handler (:doc:`uproot.source.chunk.Source` class; :doc:`uproot.source.object.ObjectSource`)
* timeout (float for HTTP, int for XRootD; 30)
Expand Down Expand Up @@ -325,6 +326,7 @@ def concatenate(
* file_handler (:doc:`uproot.source.chunk.Source` class; :doc:`uproot.source.file.MemmapSource`)
* xrootd_handler (:doc:`uproot.source.chunk.Source` class; :doc:`uproot.source.xrootd.XRootDSource`)
* s3_handler (:doc:`uproot.source.chunk.Source` class; :doc:`uproot.source.s3.S3Source`)
* http_handler (:doc:`uproot.source.chunk.Source` class; :doc:`uproot.source.http.HTTPSource`)
* object_handler (:doc:`uproot.source.chunk.Source` class; :doc:`uproot.source.object.ObjectSource`)
* timeout (float for HTTP, int for XRootD; 30)
Expand Down
20 changes: 20 additions & 0 deletions src/uproot/extras.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,26 @@ def pandas():
return pandas


def Minio_client():
"""
Imports and returns ``minio.Minio``.
"""
try:
from minio import Minio
except ModuleNotFoundError as err:
raise ModuleNotFoundError(
"""install the 'minio' package with:
pip install minio
or
conda install minio"""
) from err
else:
return Minio


def XRootD_client():
"""
Imports and returns ``XRootD.client`` (after setting the
Expand Down
3 changes: 3 additions & 0 deletions src/uproot/reading.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def open(
* file_handler (:doc:`uproot.source.chunk.Source` class; :doc:`uproot.source.file.MemmapSource`)
* xrootd_handler (:doc:`uproot.source.chunk.Source` class; :doc:`uproot.source.xrootd.XRootDSource`)
* s3_handler (:doc:`uproot.source.chunk.Source` class; :doc:`uproot.source.s3.S3Source`)
* http_handler (:doc:`uproot.source.chunk.Source` class; :doc:`uproot.source.http.HTTPSource`)
* object_handler (:doc:`uproot.source.chunk.Source` class; :doc:`uproot.source.object.ObjectSource`)
* timeout (float for HTTP, int for XRootD; 30)
Expand Down Expand Up @@ -177,6 +178,7 @@ def __getitem__(self, where):
open.defaults = _OpenDefaults(
{
"file_handler": uproot.source.file.MemmapSource,
"s3_handler": uproot.source.s3.S3Source,
"http_handler": uproot.source.http.HTTPSource,
"object_handler": uproot.source.object.ObjectSource,
"timeout": 30,
Expand Down Expand Up @@ -532,6 +534,7 @@ class ReadOnlyFile(CommonFileMethods):
* file_handler (:doc:`uproot.source.chunk.Source` class; :doc:`uproot.source.file.MemmapSource`)
* xrootd_handler (:doc:`uproot.source.chunk.Source` class; :doc:`uproot.source.xrootd.XRootDSource`)
* s3_handler (:doc:`uproot.source.chunk.Source` class; :doc:`uproot.source.xrootd.S3Source`)
* http_handler (:doc:`uproot.source.chunk.Source` class; :doc:`uproot.source.http.HTTPSource`)
* object_handler (:doc:`uproot.source.chunk.Source` class; :doc:`uproot.source.object.ObjectSource`)
* timeout (float for HTTP, int for XRootD; 30)
Expand Down
10 changes: 5 additions & 5 deletions src/uproot/source/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
# BSD 3-Clause License; see https://github.com/scikit-hep/uproot5/blob/main/LICENSE

"""
This module defines the "physical layer" of file-reading, which interacts with local
filesystems or remote protocols like HTTP(S) and XRootD. The "physical layer"
is distinguished from "interpretation" in that the meaning of the bytes that
have been read are not relevant in this layer. The "interpretation layer"
interacts with the "physical layer" by requesting a
This module defines the "physical layer" of file-reading, which interacts with
local filesystems or remote protocols like HTTP(S), S3 and XRootD. The
"physical layer" is distinguished from "interpretation" in that the meaning of
the bytes that have been read are not relevant in this layer. The
"interpretation layer" interacts with the "physical layer" by requesting a
:doc:`uproot.source.chunk.Chunk` from a :doc:`uproot.source.chunk.Source` and
inspecting it with a :doc:`uproot.source.cursor.Cursor`.
Expand Down
85 changes: 85 additions & 0 deletions src/uproot/source/s3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# BSD 3-Clause License; see https://github.com/scikit-hep/uproot5/blob/main/LICENSE

"""
This module defines a physical layer for remote files, accessed via S3.
"""

import os
from urllib.parse import parse_qsl, urlparse

import uproot.extras
import uproot.source.http


class S3Source(uproot.source.http.HTTPSource):
"""
Args:
file_path (str): A URL of the file to open.
endpoint: S3 endpoint (defaults to AWS)
access_key: Access key of your S3 account
secret_key: Secret key of your S3 account
session_token: Session token of your S3 account
secure: Flag to enable use of TLS
http_client (urllib3.poolmanager.PoolManager): Instance of :doc:`urllib3.poolmanager.PoolManager`
credentials (minio.credentials.Provider): Instance of :doc:`minio.credentials.Provider`
options: See :doc:`uproot.source.http.HTTPSource.__init__`
"""

def __init__(
self,
file_path,
endpoint="s3.amazonaws.com",
access_key=None,
secret_key=None,
session_token=None,
secure=True,
region=None,
http_client=None,
credentials=None,
**options,
):
Minio = uproot.extras.Minio_client()

if access_key is None:
access_key = os.environ.get(
"S3_ACCESS_KEY", os.environ.get("AWS_ACCESS_KEY_ID", None)
)
if secret_key is None:
secret_key = os.environ.get(
"S3_SECRET_KEY", os.environ.get("AWS_SECRET_ACCESS_KEY", None)
)
if session_token is None:
session_token = os.environ.get(
"S3_SESSION_TOKEN", os.environ.get("AWS_SESSION_TOKEN", None)
)
if region is None:
region = os.environ.get("AWS_DEFAULT_REGION", None)

parsed_url = urlparse(file_path)

bucket_name = parsed_url.netloc
assert parsed_url.path[0] == "/"
object_name = parsed_url.path[1:]

parsed_query = dict(parse_qsl(parsed_url.query))
# There is no standard scheme for s3:// URI query parameters,
# but some are often introduced to support extra flexibility:
if "endpoint" in parsed_query:
endpoint = parsed_query["endpoint"]
if "region" in parsed_query:
region = parsed_query["region"]

client = Minio(
endpoint,
access_key=access_key,
secret_key=secret_key,
session_token=session_token,
secure=secure,
region=region,
http_client=http_client,
credentials=credentials,
)

url = client.get_presigned_url("GET", bucket_name, object_name)

super().__init__(url, **options)
23 changes: 23 additions & 0 deletions tests/test_0916-read-from-s3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# BSD 3-Clause License; see https://github.com/scikit-hep/uproot5/blob/main/LICENSE

import pytest

import uproot


@pytest.mark.network
def test_s3_fail():
with pytest.raises(Exception):
with uproot.source.http.S3Source(
"s3://pivarski-princeton/does-not-exist", timeout=0.1
) as source:
tobytes(source.chunk(0, 100).raw_data)


@pytest.mark.network
def test_read_s3():
with uproot.open(
"s3://pivarski-princeton/pythia_ppZee_run17emb.picoDst.root:PicoDst"
) as f:
data = f["Event/Event.mEventId"].array(library="np")
assert len(data) == 8004

0 comments on commit 130c55b

Please sign in to comment.