Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add use of zstd compression on compute services #336

Merged
merged 23 commits into from
Jan 24, 2025
Merged
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
67f7964
Add zstd compression to set_task_result in compute service
ianmkenney Nov 25, 2024
41ad174
Use request and manually process payload
ianmkenney Dec 2, 2024
54cd184
Add compression module and send send bytes from API path as latin-1
ianmkenney Dec 2, 2024
f2da1fc
Use compression module in objectstore
ianmkenney Dec 11, 2024
18131f1
Attempt to decompress object store objects in a try block
ianmkenney Dec 13, 2024
8ee5349
Use fixed filename for compressed object
ianmkenney Dec 18, 2024
b32f62d
Implement test_set_task_result_legacy
ianmkenney Dec 26, 2024
6f3ce3c
Separate executing tasks and pushing results in TestClient
ianmkenney Dec 30, 2024
63f9982
Clear leftover state before testing legacy PDR pull
ianmkenney Dec 31, 2024
6985ca7
Parameterize test_get_transformation_and_network_results
ianmkenney Dec 31, 2024
32bef06
Merge branch 'main' into feature/220-zstd-compression-compute-services
ianmkenney Dec 31, 2024
cda3541
Small docstring adjustment
dotsdl Jan 23, 2025
664cce4
Merge branch 'main' into feature/220-zstd-compression-compute-services
dotsdl Jan 23, 2025
0a15138
Merge fixes
dotsdl Jan 23, 2025
8b65c72
Removed need to decompress protocoldagresult in S3ObjectStore.push_pr…
dotsdl Jan 24, 2025
7d32286
Merge branch 'main' into feature/220-zstd-compression-compute-services
dotsdl Jan 24, 2025
1187666
Some clarity edits
dotsdl Jan 24, 2025
7d3c86c
Black!
dotsdl Jan 24, 2025
e149d09
CI fixes, other edits from review
dotsdl Jan 24, 2025
8473729
Black!
dotsdl Jan 24, 2025
d61d133
Assign value to protocoldagresult
ianmkenney Jan 24, 2025
443e193
Simplification
dotsdl Jan 24, 2025
321f5d7
Revert "simplification"
dotsdl Jan 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Attempt to decompress object store objects in a try block
If a decompression error is raised, assume that the original data was
never compressed.
  • Loading branch information
ianmkenney committed Dec 13, 2024
commit 18131f16d29f0d8c6ae371b57e2e61152df020f9
21 changes: 13 additions & 8 deletions alchemiscale/compute/client.py
Original file line number Diff line number Diff line change
@@ -115,20 +115,25 @@ def get_task_transformation(self, task: ScopedKey) -> ScopedKey:

def retrieve_task_transformation(
self, task: ScopedKey
) -> Tuple[Transformation, Optional[ProtocolDAGResult]]:
) -> tuple[Transformation, ProtocolDAGResult | None]:
transformation, protocoldagresult = self._get_resource(
f"/tasks/{task}/transformation/gufe"
)

if protocoldagresult is not None:
protocoldagresult = decompress_gufe_zstd(
protocoldagresult.encode("latin-1")
)

return (
json_to_gufe(transformation),
protocoldagresult,
)
protocoldagresult_bytes = protocoldagresult.encode("latin-1")

try:
# Attempt to decompress the ProtocolDAGResult object
protocoldagresult = decompress_gufe_zstd(
protocoldagresult_bytes
)
except zstd.ZstdError:
# If decompression fails, assume it's a UTF-8 encoded JSON string
protocoldagresult = json_to_gufe(protocoldagresult_bytes.decode('utf-8'))

return json_to_gufe(transformation), protocoldagresult

def set_task_result(
self,
13 changes: 11 additions & 2 deletions alchemiscale/interface/client.py
Original file line number Diff line number Diff line change
@@ -15,11 +15,13 @@
from gufe import AlchemicalNetwork, Transformation, ChemicalSystem
from gufe.tokenization import GufeTokenizable, JSON_HANDLER, KeyedChain
from gufe.protocols import ProtocolResult, ProtocolDAGResult
import zstandard as zstd


from ..base.client import (
AlchemiscaleBaseClient,
AlchemiscaleBaseClientError,
json_to_gufe,
use_session,
)
from ..compression import decompress_gufe_zstd
@@ -1353,11 +1355,18 @@ def get_tasks_priority(
async def _async_get_protocoldagresult(
self, protocoldagresultref, transformation, route, compress
):
pdr_compressed_latin1 = await self._get_resource_async(
pdr_latin1_decoded = await self._get_resource_async(
f"/transformations/{transformation}/{route}/{protocoldagresultref}",
compress=compress,
)
pdr = decompress_gufe_zstd(pdr_compressed_latin1[0].encode("latin-1"))

try:
# Attempt to decompress the ProtocolDAGResult object
pdr_bytes = pdr_latin1_decoded[0].encode("latin-1")
pdr = decompress_gufe_zstd(pdr_bytes)
except zstd.ZstdError:
# If decompress fails, assume it's a UTF-8 encoded JSON string
pdr = json_to_gufe(pdr_bytes.decode('utf-8'))

return pdr

4 changes: 2 additions & 2 deletions alchemiscale/storage/objectstore.py
Original file line number Diff line number Diff line change
@@ -228,7 +228,7 @@ def push_protocoldagresult(
transformation.gufe_key,
route,
pdr.key,
"obj.json",
"obj",
)

response = self._store_bytes(location, protocoldagresult)
@@ -289,7 +289,7 @@ def pull_protocoldagresult(
transformation.gufe_key,
route,
protocoldagresult.gufe_key,
"obj.json",
"obj",
)

## TODO: want organization alongside `obj.json` of `ProtocolUnit` gufe_keys
64 changes: 63 additions & 1 deletion alchemiscale/tests/integration/interface/client/test_client.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import pytest
from time import sleep
import os
from pathlib import Path
from itertools import chain
import json

from gufe import AlchemicalNetwork
from gufe.tokenization import TOKENIZABLE_REGISTRY, GufeKey
from gufe.tokenization import TOKENIZABLE_REGISTRY, GufeKey, JSON_HANDLER
from gufe.protocols.protocoldag import execute_DAG
import networkx as nx

@@ -1860,6 +1862,66 @@ def _execute_tasks(tasks, n4js, s3os_server):

return protocoldagresults

def test_get_transformation_and_network_results_json(
self,
scope_test,
n4js_preloaded,
s3os_server,
user_client: client.AlchemiscaleClient,
network_tyk2,
tmpdir,
):
n4js = n4js_preloaded

# select the transformation we want to compute
an = network_tyk2
transformation = list(t for t in an.edges if "_solvent" in t.name)[0]

network_sk = user_client.get_scoped_key(an, scope_test)
transformation_sk = user_client.get_scoped_key(transformation, scope_test)

# user client : create three independent tasks for the transformation
user_client.create_tasks(transformation_sk, count=3)

# user client : action the tasks for execution
all_tasks = user_client.get_transformation_tasks(transformation_sk)
actioned_tasks = user_client.action_tasks(all_tasks, network_sk)

# execute the actioned tasks and push results directly using statestore and object store
with tmpdir.as_cwd():
protocoldagresults = self._execute_tasks(actioned_tasks, n4js, s3os_server)
# overwrite what's in the object store
for protocoldagresult in protocoldagresults:
pdr_jb = json.dumps(
protocoldagresult.to_dict(), cls=JSON_HANDLER.encoder
).encode("utf-8")

location = os.path.join(
"protocoldagresult",
*transformation_sk.scope.to_tuple(),
transformation_sk.gufe_key,
"results",
protocoldagresult.key,
"obj",
)

s3os_server._store_bytes(location, pdr_jb)

# clear local gufe registry of pdr objects
# not critical, but ensures we see the objects that are deserialized
# instead of our instances already in memory post-pull
for pdr in protocoldagresults:
TOKENIZABLE_REGISTRY.pop(pdr.key, None)

# get back protocoldagresults instead
protocoldagresults_r = user_client.get_transformation_results(
transformation_sk, return_protocoldagresults=True
)

assert set(protocoldagresults_r) == set(protocoldagresults)

pass

def test_get_transformation_and_network_results(
self,
scope_test,