Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

python: support relative and absolute path-like targets in jobid URI resolver #6562

Merged
merged 5 commits into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 19 additions & 5 deletions doc/man1/flux-uri.rst
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,21 @@ URI SCHEMES

The following URI schemes are included by default:

jobid:ID[/ID...]
jobid:PATH
This scheme attempts to get the URI for a Flux instance running as a
job in the current enclosing instance. This is the assumed scheme if no
``scheme:`` is provided in *TARGET* passed to :program:`flux uri`, so the
``jobid:`` prefix is optional. A hierarchy of Flux jobids is supported,
so ``f1234/f3456`` will resolve the URI for job ``f3456`` running in
job ``f1234`` in the current instance. This scheme will raise an error
if the target job is not running.
``jobid:`` prefix is optional. *PATH* is a hierarchical path expression
that may contain an optional leading slash (``/``) (which references
the top-level, root instance explicitly), followed by zero or more job
IDs separated by slashes. The special IDs ``.`` and ``..`` indicate
the current instance (within the hierarchy) and its parent, respectively.
This allows resolution of a single job running in the current instance
via ``f1234``, explicitly within the root instance via ``/f2345``, or
a job running within another job via ``f3456/f789``. Purely relative
paths can also be used, such as ``..`` to get the URI of the parent of
the current instance, or ``../..`` to get the URI of the parent's parent.
Finally, a single slash (``/``) may be used to get the root instance URI.

The ``jobid`` scheme supports the optional query parameter ``?wait``, which
causes the resolver to wait until a URI has been posted to the job eventlog
Expand Down Expand Up @@ -150,6 +157,13 @@ Get the URI of a nested job:
the last component of the jobid "path" or hierarchy. This will resolve
each URI in turn as a local URI.

Get the URI of the root instance from within a job running at any depth:

::

$ flux uri /
local:///run/flux/local

Get the URI of a local flux-broker

::
Expand Down
100 changes: 68 additions & 32 deletions src/bindings/python/flux/uri/resolvers/jobid.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,13 @@
###############################################################

import os
from pathlib import PurePath
from pathlib import PurePosixPath

import flux
from flux.job import JobID, job_list_id
from flux.uri import JobURI, URIResolverPlugin, URIResolverURI


def filter_slash(iterable):
return list(filter(lambda x: "/" not in x, iterable))


def wait_for_uri(flux_handle, jobid):
"""Wait for memo event containing job uri, O/w finish event"""
for event in flux.job.event_watch(flux_handle, jobid):
Expand All @@ -30,41 +26,78 @@
return None


def resolve_parent(handle):
    """Return the URI one level up from ``handle``.

    The ``parent-uri`` attribute is returned when the ``instance-level``
    attribute is greater than zero; otherwise the ``local-uri`` attribute
    of ``handle`` itself is returned.
    """
    level = int(handle.attr_get("instance-level"))
    attr_name = "parent-uri" if level > 0 else "local-uri"
    return handle.attr_get(attr_name)

Check warning on line 33 in src/bindings/python/flux/uri/resolvers/jobid.py

View check run for this annotation

Codecov / codecov/patch

src/bindings/python/flux/uri/resolvers/jobid.py#L33

Added line #L33 was not covered by tests


def resolve_root(flux_handle):
    """Return the ``local-uri`` of the top-level (root) instance.

    Starting from ``flux_handle``, keep connecting to the URI returned by
    ``resolve_parent()`` until a handle whose ``instance-level`` is not
    greater than zero is reached, then return that handle's ``local-uri``.
    """
    current = flux_handle
    while True:
        if int(current.attr_get("instance-level")) <= 0:
            return current.attr_get("local-uri")
        # Not at the root yet: open a handle to the next instance up
        current = flux.Flux(resolve_parent(current))


def resolve_jobid(flux_handle, arg, wait):
    """Resolve the URI of the job identified by ``arg``.

    Args:
        flux_handle: handle of the instance in which the job is looked up.
        arg: jobid, in any form accepted by ``JobID``.
        wait: if true, obtain the URI via ``wait_for_uri()`` instead of
            listing the job and requiring it to be in the ``RUN`` state.

    Returns:
        The URI found for the job.

    Raises:
        ValueError: if ``arg`` is not a valid jobid, the job is not found,
            the job is not running (only checked when ``wait`` is false),
            or no URI is available for the job.
    """
    try:
        jobid = JobID(arg)
    except OSError as exc:
        # Chain the underlying error so the cause is kept in the traceback,
        # consistent with the FileNotFoundError handling below.
        raise ValueError(f"{arg} is not a valid jobid") from exc

    try:
        if wait:
            uri = wait_for_uri(flux_handle, jobid)
        else:
            # Fetch the jobinfo object for this job
            job = job_list_id(
                flux_handle, jobid, attrs=["state", "annotations"]
            ).get_jobinfo()
            if job.state != "RUN":
                raise ValueError(f"jobid {arg} is not running")
            uri = job.user.uri
    except FileNotFoundError as exc:
        raise ValueError(f"jobid {arg} not found") from exc

    # Either lookup path may yield no URI; treat None and "" alike
    if uri is None or str(uri) == "":
        raise ValueError(f"URI not found for job {arg}")
    return uri


class URIResolver(URIResolverPlugin):
"""A URI resolver that attempts to fetch the remote_uri for a job"""

def describe(self):
return "Get URI for a given Flux JOBID"

def _do_resolve(self, uri, flux_handle, force_local=False, wait=False):
def _do_resolve(
self, uri, flux_handle, force_local=False, wait=False, hostname=None
):
#
# Convert a possible hierarchy of jobids to a list, dropping any
# extraneous '/' (e.g. //id0/id1 -> [ "id0", "id1" ]
jobids = filter_slash(PurePath(uri.path).parts)
# Convert a possible hierarchy of jobids to a list
jobids = list(PurePosixPath(uri.path).parts)

# If path is empty, return current enclosing URI
if not jobids:
return flux_handle.attr_get("local-uri")

# Pop the first jobid off the list, this id should be local:
# Pop the first jobid off the list, if a jobid it should be local,
# otherwise "/" for the root URI or ".." for parent URI:
arg = jobids.pop(0)
try:
jobid = JobID(arg)
except OSError as exc:
raise ValueError(f"{arg} is not a valid jobid")

try:
if wait:
uri = wait_for_uri(flux_handle, jobid)
else:
# Fetch the jobinfo object for this job
job = job_list_id(
flux_handle, jobid, attrs=["state", "annotations"]
).get_jobinfo()
if job.state != "RUN":
raise ValueError(f"jobid {arg} is not running")
uri = job.user.uri
except FileNotFoundError as exc:
raise ValueError(f"jobid {arg} not found") from exc

if uri is None or str(uri) == "":
raise ValueError(f"URI not found for job {arg}")
if arg == "/":
uri = resolve_root(flux_handle)
elif arg == "..":
uri = resolve_parent(flux_handle)
# Relative paths always use a local:// uri. But, if a jobid was
# resolved earlier in the path, then use the hostname associated
# with that job.
if hostname:
uri = JobURI(uri, remote_hostname=hostname).remote

Check warning on line 97 in src/bindings/python/flux/uri/resolvers/jobid.py

View check run for this annotation

Codecov / codecov/patch

src/bindings/python/flux/uri/resolvers/jobid.py#L97

Added line #L97 was not covered by tests
else:
uri = resolve_jobid(flux_handle, arg, wait)
hostname = JobURI(uri).netloc

# If there are more jobids in the hierarchy to resolve, resolve
# them recursively
Expand All @@ -74,7 +107,10 @@
if force_local:
uri = JobURI(uri).local
return self._do_resolve(
resolver_uri, flux.Flux(uri), force_local=force_local
resolver_uri,
flux.Flux(uri),
force_local=force_local,
hostname=hostname,
)
return uri

Expand Down
14 changes: 11 additions & 3 deletions src/bindings/python/flux/uri/uri.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,26 +57,34 @@ class JobURI(URI):
remote: If local URI, returns a remote URI substituting current hostname.
If a remote URI, returns the URI.
local: If a remote URI, convert to a local URI. Otherwise return the URI.

Args:
uri (str): The URI string with which to initialize the JobURI instance.
remote_hostname (str): If ``uri`` is a local URI, use the provided
hostname instead of the current hostname when rendering the remote
URI.
"""

force_local = os.environ.get("FLUX_URI_RESOLVE_LOCAL", False)

def __init__(self, uri):
def __init__(self, uri, remote_hostname=None):
super().__init__(uri)
if self.scheme == "":
raise ValueError(f"JobURI '{uri}' does not have a valid scheme")
self.path = re.sub("/+", "/", self.path)
self.remote_uri = None
self.local_uri = None
self.remote_hostname = remote_hostname

@property
def remote(self):
if not self.remote_uri:
if self.scheme == "ssh":
self.remote_uri = self.uri
elif self.scheme == "local":
hostname = platform.uname()[1]
self.remote_uri = f"ssh://{hostname}{self.path}"
if not self.remote_hostname:
self.remote_hostname = platform.uname()[1]
self.remote_uri = f"ssh://{self.remote_hostname}{self.path}"
else:
raise ValueError(
f"Cannot convert JobURI with scheme {self.scheme} to remote"
Expand Down
14 changes: 14 additions & 0 deletions t/python/t0025-uri.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,20 @@ def test_parse_local(self):
self.assertEqual(uri.fragment, "")
self.assertEqual(uri.params, "")

def test_parse_local_with_remote_hostname(self):
    """A local JobURI renders its remote form using ``remote_hostname``."""
    hostname = "fakehost"
    uri = JobURI("local:///tmp/foo", remote_hostname=hostname)

    # String forms of the URI
    self.assertEqual(uri.uri, "local:///tmp/foo")
    self.assertEqual(str(uri), "local:///tmp/foo")
    self.assertEqual(uri.remote, f"ssh://{hostname}/tmp/foo")
    self.assertEqual(uri.local, "local:///tmp/foo")

    # Parsed components, checked in the same order as listed here
    expected_components = (
        ("scheme", "local"),
        ("netloc", ""),
        ("path", "/tmp/foo"),
        ("query", ""),
        ("fragment", ""),
        ("params", ""),
    )
    for attribute, expected in expected_components:
        self.assertEqual(getattr(uri, attribute), expected)

def test_parse_errors(self):
with self.assertRaises(ValueError):
JobURI("foo:///tmp/bar").remote
Expand Down
17 changes: 17 additions & 0 deletions t/t2802-uri-cmd.t
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,23 @@ test_expect_success 'flux uri resolves hierarchical jobids with ?local' '
test_debug "echo ${jobid}/${jobid2}?local is ${uri}"

'
test_expect_success 'flux uri works with relative paths' '
root_uri=$(FLUX_SSH=$testssh flux uri --local .) &&
job1_uri=$(FLUX_SSH=$testssh flux uri --local ${jobid}) &&
job2_uri=$(FLUX_SSH=$testssh flux uri --local ${jobid}/${jobid2}) &&
uri=$(FLUX_SSH=$testssh flux proxy $job2_uri flux uri /) &&
test_debug "echo flux uri / got ${uri} expected ${root_uri}" &&
test "$uri" = "$root_uri" &&
uri=$(FLUX_SSH=$testssh flux proxy $job2_uri flux uri ../..) &&
test_debug "echo flux uri ../.. got ${uri} expected ${root_uri}" &&
test "$uri" = "$root_uri" &&
uri=$(FLUX_SSH=$testssh flux proxy $job2_uri flux uri ..) &&
test_debug "echo flux uri .. got ${uri} expected ${job1_uri}" &&
test "$uri" = "$job1_uri" &&
uri=$(FLUX_SSH=$testssh flux proxy $job2_uri flux uri .) &&
test_debug "echo flux uri . got ${uri} expected ${job2_uri}" &&
test "$uri" = "$job2_uri"
'
test_expect_success 'flux uri --wait can resolve URI for pending job' '
uri=$(flux uri --wait $(flux batch -n1 --wrap hostname)) &&
flux job wait-event -vt 30 $(flux job last) clean &&
Expand Down
Loading