Skip to content

Commit

Permalink
AsyncTransport
Browse files Browse the repository at this point in the history
  • Loading branch information
khsrali committed Jan 16, 2025
1 parent c88fc05 commit 28d7971
Show file tree
Hide file tree
Showing 23 changed files with 2,946 additions and 488 deletions.
1 change: 1 addition & 0 deletions environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ dependencies:
- python~=3.9
- alembic~=1.2
- archive-path~=0.4.2
- asyncssh~=2.19.0
- circus~=0.18.0
- click-spinner~=0.1.8
- click~=8.1
Expand Down
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ classifiers = [
dependencies = [
'alembic~=1.2',
'archive-path~=0.4.2',
"asyncssh~=2.19.0",
'circus~=0.18.0',
'click-spinner~=0.1.8',
'click~=8.1',
Expand Down Expand Up @@ -175,6 +176,7 @@ requires-python = '>=3.9'
[project.entry-points.'aiida.transports']
'core.local' = 'aiida.transports.plugins.local:LocalTransport'
'core.ssh' = 'aiida.transports.plugins.ssh:SshTransport'
'core.ssh_async' = 'aiida.transports.plugins.ssh_async:AsyncSshTransport'
'core.ssh_auto' = 'aiida.transports.plugins.ssh_auto:SshAutoTransport'

[project.entry-points.'aiida.workflows']
Expand Down Expand Up @@ -308,6 +310,7 @@ module = 'tests.*'
ignore_missing_imports = true
module = [
'ase.*',
'asyncssh.*',
'bpython.*',
'bs4.*',
'CifFile.*',
Expand Down Expand Up @@ -388,6 +391,7 @@ testpaths = [
'tests'
]
timeout = 240
timeout_method = "thread"
xfail_strict = true

[tool.ruff]
Expand Down
64 changes: 32 additions & 32 deletions src/aiida/engine/daemon/execmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ async def upload_calculation(
if dry_run:
workdir = Path(folder.abspath)
else:
remote_user = transport.whoami()
remote_user = await transport.whoami_async()
remote_working_directory = computer.get_workdir().format(username=remote_user)
if not remote_working_directory.strip():
raise exceptions.ConfigurationError(
Expand All @@ -114,13 +114,13 @@ async def upload_calculation(
)

# If it already exists, no exception is raised
if not transport.path_exists(remote_working_directory):
if not await transport.path_exists_async(remote_working_directory):
logger.debug(
f'[submission of calculation {node.pk}] Path '
f'{remote_working_directory} does not exist, trying to create it'
)
try:
transport.makedirs(remote_working_directory)
await transport.makedirs_async(remote_working_directory)
except EnvironmentError as exc:
raise exceptions.ConfigurationError(
f'[submission of calculation {node.pk}] '
Expand All @@ -133,14 +133,14 @@ async def upload_calculation(
# and I do not have to know the logic, but I just need to
# read the absolute path from the calculation properties.
workdir = Path(remote_working_directory).joinpath(calc_info.uuid[:2], calc_info.uuid[2:4])
transport.makedirs(str(workdir), ignore_existing=True)
await transport.makedirs_async(workdir, ignore_existing=True)

try:
# The final directory may already exist, most likely because this function was already executed once, but
# failed and as a result was rescheduled by the engine. In this case it would be fine to delete the folder
# and create it from scratch, except that we cannot be sure that this the actual case. Therefore, to err on
# the safe side, we move the folder to the lost+found directory before recreating the folder from scratch
transport.mkdir(str(workdir.joinpath(calc_info.uuid[4:])))
await transport.mkdir_async(workdir.joinpath(calc_info.uuid[4:]))
except OSError:
# Move the existing directory to lost+found, log a warning and create a clean directory anyway
path_existing = os.path.join(str(workdir), calc_info.uuid[4:])
Expand All @@ -151,12 +151,12 @@ async def upload_calculation(
)

# Make sure the lost+found directory exists, then copy the existing folder there and delete the original
transport.mkdir(path_lost_found, ignore_existing=True)
transport.copytree(path_existing, path_target)
transport.rmtree(path_existing)
await transport.mkdir_async(path_lost_found, ignore_existing=True)
await transport.copytree_async(path_existing, path_target)
await transport.rmtree_async(path_existing)

# Now we can create a clean folder for this calculation
transport.mkdir(str(workdir.joinpath(calc_info.uuid[4:])))
await transport.mkdir_async(workdir.joinpath(calc_info.uuid[4:]))
finally:
workdir = workdir.joinpath(calc_info.uuid[4:])

Expand All @@ -171,11 +171,11 @@ async def upload_calculation(
# Note: this will possibly overwrite files
for root, dirnames, filenames in code.base.repository.walk():
# mkdir of root
transport.makedirs(str(workdir.joinpath(root)), ignore_existing=True)
await transport.makedirs_async(workdir.joinpath(root), ignore_existing=True)

# remotely mkdir first
for dirname in dirnames:
transport.makedirs(str(workdir.joinpath(root, dirname)), ignore_existing=True)
await transport.makedirs_async(workdir.joinpath(root, dirname), ignore_existing=True)

# Note, once #2579 is implemented, use the `node.open` method instead of the named temporary file in
# combination with the new `Transport.put_object_from_filelike`
Expand All @@ -185,11 +185,11 @@ async def upload_calculation(
content = code.base.repository.get_object_content(Path(root) / filename, mode='rb')
handle.write(content)
handle.flush()
transport.put(handle.name, str(workdir.joinpath(root, filename)))
await transport.put_async(handle.name, workdir.joinpath(root, filename))
if code.filepath_executable.is_absolute():
transport.chmod(str(code.filepath_executable), 0o755) # rwxr-xr-x
await transport.chmod_async(code.filepath_executable, 0o755) # rwxr-xr-x
else:
transport.chmod(str(workdir.joinpath(code.filepath_executable)), 0o755) # rwxr-xr-x
await transport.chmod_async(workdir.joinpath(code.filepath_executable), 0o755) # rwxr-xr-x

# local_copy_list is a list of tuples, each with (uuid, dest_path, rel_path)
# NOTE: validation of these lists are done inside calculation.presubmit()
Expand Down Expand Up @@ -288,7 +288,7 @@ async def _copy_remote_files(logger, node, computer, transport, remote_copy_list
f'remotely, directly on the machine {computer.label}'
)
try:
transport.copy(remote_abs_path, str(workdir.joinpath(dest_rel_path)))
await transport.copy_async(remote_abs_path, workdir.joinpath(dest_rel_path))
except FileNotFoundError:
logger.warning(
f'[submission of calculation {node.pk}] Unable to copy remote '
Expand All @@ -314,8 +314,8 @@ async def _copy_remote_files(logger, node, computer, transport, remote_copy_list
)
remote_dirname = Path(dest_rel_path).parent
try:
transport.makedirs(str(workdir.joinpath(remote_dirname)), ignore_existing=True)
transport.symlink(remote_abs_path, str(workdir.joinpath(dest_rel_path)))
await transport.makedirs_async(workdir.joinpath(remote_dirname), ignore_existing=True)
await transport.symlink_async(remote_abs_path, workdir.joinpath(dest_rel_path))
except OSError:
logger.warning(
f'[submission of calculation {node.pk}] Unable to create remote symlink '
Expand Down Expand Up @@ -356,14 +356,14 @@ async def _copy_local_files(logger, node, transport, inputs, local_copy_list, wo
# The logic below takes care of an edge case where the source is a file but the target is a directory. In
# this case, the v2.5.1 implementation would raise an `IsADirectoryError` exception, because it would try
# to open the directory in the sandbox folder as a file when writing the contents.
if file_type_source == FileType.FILE and target and transport.isdir(str(workdir.joinpath(target))):
if file_type_source == FileType.FILE and target and await transport.isdir_async(workdir.joinpath(target)):
raise IsADirectoryError

# In case the source filename is specified and it is a directory that already exists in the remote, we
# want to avoid nested directories in the target path to replicate the behavior of v2.5.1. This is done by
# setting the target filename to '.', which means the contents of the node will be copied in the top level
# of the temporary directory, whose contents are then copied into the target directory.
if filename and transport.isdir(str(workdir.joinpath(filename))):
if filename and await transport.isdir_async(workdir.joinpath(filename)):
filename_target = '.'

filepath_target = (dirpath / filename_target).resolve().absolute()
Expand All @@ -372,25 +372,25 @@ async def _copy_local_files(logger, node, transport, inputs, local_copy_list, wo
if file_type_source == FileType.DIRECTORY:
# If the source object is a directory, we copy its entire contents
data_node.base.repository.copy_tree(filepath_target, filename_source)
transport.put(
await transport.put_async(
f'{dirpath}/*',
str(workdir.joinpath(target)) if target else str(workdir.joinpath('.')),
workdir.joinpath(target) if target else workdir.joinpath('.'),
overwrite=True,
)
else:
# Otherwise, simply copy the file
with filepath_target.open('wb') as handle:
with data_node.base.repository.open(filename_source, 'rb') as source:
shutil.copyfileobj(source, handle)
transport.makedirs(str(workdir.joinpath(Path(target).parent)), ignore_existing=True)
transport.put(str(filepath_target), str(workdir.joinpath(target)))
await transport.makedirs_async(workdir.joinpath(Path(target).parent), ignore_existing=True)
await transport.put_async(filepath_target, workdir.joinpath(target))


async def _copy_sandbox_files(logger, node, transport, folder, workdir: Path):
"""Copy the contents of the sandbox folder to the working directory."""
for filename in folder.get_content_list():
logger.debug(f'[submission of calculation {node.pk}] copying file/folder {filename}...')
transport.put(folder.get_abs_path(filename), str(workdir.joinpath(filename)))
await transport.put_async(folder.get_abs_path(filename), workdir.joinpath(filename))


def submit_calculation(calculation: CalcJobNode, transport: Transport) -> str | ExitCode:
Expand Down Expand Up @@ -461,7 +461,7 @@ async def stash_calculation(calculation: CalcJobNode, transport: Transport) -> N
for source_filename in source_list:
if transport.has_magic(source_filename):
copy_instructions = []
for globbed_filename in transport.glob(str(source_basepath / source_filename)):
for globbed_filename in await transport.glob_async(source_basepath / source_filename):
target_filepath = target_basepath / Path(globbed_filename).relative_to(source_basepath)
copy_instructions.append((globbed_filename, target_filepath))
else:
Expand All @@ -470,10 +470,10 @@ async def stash_calculation(calculation: CalcJobNode, transport: Transport) -> N
for source_filepath, target_filepath in copy_instructions:
# If the source file is in a (nested) directory, create those directories first in the target directory
target_dirname = target_filepath.parent
transport.makedirs(str(target_dirname), ignore_existing=True)
await transport.makedirs_async(target_dirname, ignore_existing=True)

try:
transport.copy(str(source_filepath), str(target_filepath))
await transport.copy_async(source_filepath, target_filepath)
except (OSError, ValueError) as exception:
EXEC_LOGGER.warning(f'failed to stash {source_filepath} to {target_filepath}: {exception}')
else:
Expand Down Expand Up @@ -612,7 +612,7 @@ async def retrieve_files_from_list(
upto what level of the original remotepath nesting the files will be copied.
:param transport: the Transport instance.
:param folder: an absolute path to a folder that contains the files to copy.
:param folder: an absolute path to a folder that contains the files to retrieve.
:param retrieve_list: the list of files to retrieve.
"""
workdir = Path(calculation.get_remote_workdir())
Expand All @@ -621,7 +621,7 @@ async def retrieve_files_from_list(
tmp_rname, tmp_lname, depth = item
# if there are more than one file I do something differently
if transport.has_magic(tmp_rname):
remote_names = transport.glob(str(workdir.joinpath(tmp_rname)))
remote_names = await transport.glob_async(workdir.joinpath(tmp_rname))
local_names = []
for rem in remote_names:
# get the relative path so to make local_names relative
Expand All @@ -644,7 +644,7 @@ async def retrieve_files_from_list(
abs_item = item if item.startswith('/') else str(workdir.joinpath(item))

if transport.has_magic(abs_item):
remote_names = transport.glob(abs_item)
remote_names = await transport.glob_async(abs_item)
local_names = [os.path.split(rem)[1] for rem in remote_names]
else:
remote_names = [abs_item]
Expand All @@ -656,6 +656,6 @@ async def retrieve_files_from_list(
if rem.startswith('/'):
to_get = rem
else:
to_get = str(workdir.joinpath(rem))
to_get = workdir.joinpath(rem)

transport.get(to_get, os.path.join(folder, loc), ignore_nonexisting=True)
await transport.get_async(to_get, os.path.join(folder, loc), ignore_nonexisting=True)
4 changes: 2 additions & 2 deletions src/aiida/orm/computers.py
Original file line number Diff line number Diff line change
Expand Up @@ -626,12 +626,12 @@ def get_transport(self, user: Optional['User'] = None) -> 'Transport':
"""Return a Transport class, configured with all correct parameters.
The Transport is closed (meaning that if you want to run any operation with
it, you have to open it first (i.e., e.g. for a SSH transport, you have
to open a connection). To do this you can call ``transports.open()``, or simply
to open a connection). To do this you can call ``transport.open()``, or simply
run within a ``with`` statement::
transport = Computer.get_transport()
with transport:
print(transports.whoami())
print(transport.whoami())
:param user: if None, try to obtain a transport for the default user.
Otherwise, pass a valid User.
Expand Down
3 changes: 2 additions & 1 deletion src/aiida/orm/nodes/data/remote/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ def listdir_withattributes(self, path='.'):
"""Connects to the remote folder and lists the directory content.
:param relpath: If 'relpath' is specified, lists the content of the given subfolder.
:return: a list of dictionaries, where the documentation is in :py:class:Transport.listdir_withattributes.
:return: a list of dictionaries, where the documentation
is in :py:class:Transport.listdir_withattributes.
"""
authinfo = self.get_authinfo()

Expand Down
3 changes: 2 additions & 1 deletion src/aiida/orm/nodes/process/calculation/calcjob.py
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,8 @@ def get_authinfo(self) -> 'AuthInfo':
def get_transport(self) -> 'Transport':
"""Return the transport for this calculation.
:return: `Transport` configured with the `AuthInfo` associated to the computer of this node
:return: Transport configured
with the `AuthInfo` associated to the computer of this node
"""
return self.get_authinfo().get_transport()

Expand Down
8 changes: 4 additions & 4 deletions src/aiida/plugins/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,15 +410,15 @@ def StorageFactory(entry_point_name: str, load: bool = True) -> Union[EntryPoint


@overload
def TransportFactory(entry_point_name: str, load: Literal[True] = True) -> Type['Transport']: ...
def TransportFactory(entry_point_name: str, load: Literal[True] = True) -> Union[Type['Transport']]: ...


@overload
def TransportFactory(entry_point_name: str, load: Literal[False]) -> EntryPoint: ...


def TransportFactory(entry_point_name: str, load: bool = True) -> Union[EntryPoint, Type['Transport']]:
"""Return the `Transport` sub class registered under the given entry point.
"""Return the Transport sub class registered under the given entry point.
:param entry_point_name: the entry point name.
:param load: if True, load the matched entry point and return the loaded resource instead of the entry point itself.
Expand All @@ -430,12 +430,12 @@ def TransportFactory(entry_point_name: str, load: bool = True) -> Union[EntryPoi

entry_point_group = 'aiida.transports'
entry_point = BaseFactory(entry_point_group, entry_point_name, load=load)
valid_classes = (Transport,)
valid_classes = Transport

if not load:
return entry_point

if isclass(entry_point) and issubclass(entry_point, Transport):
if isclass(entry_point) and (issubclass(entry_point, Transport)):
return entry_point

raise_invalid_type_error(entry_point_name, entry_point_group, valid_classes)
Expand Down
2 changes: 1 addition & 1 deletion src/aiida/schedulers/plugins/direct.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ def _get_submit_command(self, submit_script):
directory.
IMPORTANT: submit_script should be already escaped.
"""
submit_command = f'bash {submit_script} > /dev/null 2>&1 & echo $!'
submit_command = f'(bash {submit_script} > /dev/null 2>&1 & echo $!) &'

self.logger.info(f'submitting with: {submit_command}')

Expand Down
2 changes: 2 additions & 0 deletions src/aiida/tools/pytest_fixtures/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
aiida_computer,
aiida_computer_local,
aiida_computer_ssh,
aiida_computer_ssh_async,
aiida_localhost,
ssh_key,
)
Expand All @@ -33,6 +34,7 @@
'aiida_computer',
'aiida_computer_local',
'aiida_computer_ssh',
'aiida_computer_ssh_async',
'aiida_config',
'aiida_config_factory',
'aiida_config_tmp',
Expand Down
32 changes: 32 additions & 0 deletions src/aiida/tools/pytest_fixtures/orm.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,38 @@ def factory(label: str | None = None, configure: bool = True) -> 'Computer':
return factory


@pytest.fixture
def aiida_computer_ssh_async(aiida_computer) -> t.Callable[[], 'Computer']:
    """Factory to return a :class:`aiida.orm.computers.Computer` instance with ``core.ssh_async`` transport.

    Usage::

        def test(aiida_computer_ssh_async):
            computer = aiida_computer_ssh_async(label='some-label', configure=True)
            assert computer.transport_type == 'core.ssh_async'
            assert computer.is_configured

    The factory has the following signature:

    :param label: The computer label. If not specified, a random UUID4 is used.
    :param configure: Boolean, if ``True``, ensures the computer is configured, otherwise the computer is returned
        as is. Note that if a computer with the given label already exists and it was configured before, the
        computer will not be "un-"configured. If an unconfigured computer is absolutely required, make sure to first
        delete the existing computer or specify another label.
    :return: A stored computer instance.
    """

    def factory(label: str | None = None, configure: bool = True) -> 'Computer':
        # Delegate creation to the generic ``aiida_computer`` factory, pinning the host to ``localhost`` and the
        # transport plugin to the asynchronous SSH entry point.
        computer = aiida_computer(label=label, hostname='localhost', transport_type='core.ssh_async')

        if configure:
            # Called without arguments, as in the original block.
            computer.configure()

        return computer

    return factory


@pytest.fixture
def aiida_localhost(aiida_computer_local) -> 'Computer':
"""Return a :class:`aiida.orm.computers.Computer` instance representing localhost with ``core.local`` transport.
Expand Down
3 changes: 3 additions & 0 deletions src/aiida/transports/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
from .transport import *

__all__ = (
'AsyncTransport',
'BlockingTransport',
'SshTransport',
'Transport',
'TransportPath',
'convert_to_bool',
'parse_sshconfig',
)
Expand Down
Loading

0 comments on commit 28d7971

Please sign in to comment.