Add prototype support for an ak.apply function #3963

Open
wants to merge 11 commits into
base: master

Conversation

jabraham17
Contributor

@jabraham17 jabraham17 commented Jan 8, 2025

Adds prototype support for calling an arbitrary Python function on a distributed Arkouda array.

For example:

import arkouda as ak
ak.connect()
arr = ak.array([1,2,3])
res = ak.apply(arr, lambda x: x+1)
# res is now [2, 3, 4]

This PR uses cloudpickle to pickle arbitrary Python code. The pickle is shipped to the server as base64, where it is unpickled and executed.
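A minimal sketch of that round trip, run entirely in one process. The helper names `encode_function` and `decode_and_apply` are hypothetical and are not part of this PR; the real server side lives in Chapel. The fallback to the standard-library `pickle` is only there so the sketch runs without cloudpickle installed, and it handles only importable functions (not lambdas).

```python
import base64
import math
import pickle

try:
    import cloudpickle  # third-party; serializes functions by value
    _dumps = cloudpickle.dumps
except ImportError:
    # Assumption for this sketch only: stdlib pickle works for importable
    # functions such as math.sqrt, but not for lambdas or closures.
    _dumps = pickle.dumps


def encode_function(func) -> str:
    """Pickle a callable and wrap the bytes in base64 text (hypothetical helper)."""
    return base64.b64encode(_dumps(func)).decode("ascii")


def decode_and_apply(payload: str, values):
    """Decode the base64 text, unpickle the callable, and map it over values."""
    func = pickle.loads(base64.b64decode(payload))
    return [func(v) for v in values]


payload = encode_function(math.sqrt)
result = decode_and_apply(payload, [1.0, 4.0, 9.0])
print(result)
```

Output produced by `cloudpickle.dumps` can be read back with the standard `pickle.loads`, which is why the decoding side above does not need to call into cloudpickle directly, although cloudpickle must still be importable there for by-value functions.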

To use this module, there are new requirements for Arkouda.

  • The arkouda_server must be built with the same version of Python that it will be run with.
  • The arkouda_server's Python version and the client's Python version must be the same major/minor version.
  • cloudpickle must be available to the arkouda_server, not just the client.
  • Any dependencies used within the body of the applied function must be available to the arkouda_server.
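One way to sanity-check the last two requirements is to ask the Python environment the server will use whether the needed modules can be found. The helper `missing_modules` below is hypothetical and not part of this PR; it would have to be run with the server's interpreter, not the client's, and it only handles top-level module names.

```python
import importlib.util


def missing_modules(names):
    """Return the top-level module names that cannot be found in this environment."""
    return [name for name in names if importlib.util.find_spec(name) is None]


# "json" ships with Python; the second name is deliberately bogus.
missing = missing_modules(["json", "module_that_does_not_exist_xyz"])
print(missing)
```

In practice the list would contain `cloudpickle` plus whatever the applied function imports.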

@jabraham17 jabraham17 marked this pull request as draft January 8, 2025 00:11
@jabraham17 jabraham17 marked this pull request as ready for review January 18, 2025 02:17
add test case

Signed-off-by: Jade Abraham <[email protected]>

add test to ini

Signed-off-by: Jade Abraham <[email protected]>

add server msg

Signed-off-by: Jade Abraham <[email protected]>

add client code

Signed-off-by: Jade Abraham <[email protected]>

add support for arbitrary python code

Signed-off-by: Jade Abraham <[email protected]>

add more tests

Signed-off-by: Jade Abraham <[email protected]>

add python interop flags

Signed-off-by: Jade Abraham <[email protected]>

add compat modules

Signed-off-by: Jade Abraham <[email protected]>

add cloudpickle

Signed-off-by: Jade Abraham <[email protected]>

use non-generic function

Signed-off-by: Jade Abraham <[email protected]>

add stubs

Signed-off-by: Jade Abraham <[email protected]>

add missing dependency

Signed-off-by: Jade Abraham <[email protected]>

remove unused import

Signed-off-by: Jade Abraham <[email protected]>

format code nicely

Signed-off-by: Jade Abraham <[email protected]>

fix test

Signed-off-by: Jade Abraham <[email protected]>

skip tests if not supported

Signed-off-by: Jade Abraham <[email protected]>

ignore mypy errors for cloudpickle

Signed-off-by: Jade Abraham <[email protected]>

add missing cloudpickle deps

Signed-off-by: Jade Abraham <[email protected]>
@jabraham17
Contributor Author

The CI failures appear unrelated to the contents of this PR

@jabraham17 jabraham17 requested a review from ajpotts January 24, 2025 16:19
Contributor

@e-kayrakli e-kayrakli left a comment

@jabraham17 -- thanks for this PR. I left some comments.
@ajpotts -- I see that Jade has some questions for you in the PR, could you take a look?

{
// TODO: this is a big hack, ideally we properly implement the base64 module
// instead of using Python as a polyfill.
// even better, we should have a way to just send bytes from the client to the server
Contributor

Can we not send a Python bytes object?

Contributor Author

AFAIK the server does not have general support for serializing/deserializing raw bytes. If it did, this code would look like

const pickleData = msgArgs["pickleData"].toScalar(bytes);
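For context on what the base64 detour costs compared with sending raw bytes, here is a small sketch of the size overhead. The function name `base64_length` is made up for illustration; the arithmetic is the standard padded-base64 rule of 4 output characters per 3 input bytes.

```python
import base64


def base64_length(n_bytes: int) -> int:
    """Length of the padded base64 encoding of n_bytes raw bytes."""
    return 4 * ((n_bytes + 2) // 3)


raw = bytes(range(256)) * 4  # 1024 bytes standing in for a pickle payload
encoded = base64.b64encode(raw)
overhead = len(encoded) / len(raw)
print(len(raw), len(encoded), round(overhead, 3))
```

The encoded payload is roughly a third larger than the raw bytes, on top of the encode and decode work on each side.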

@e-kayrakli
Contributor

Some libraries seem to not play well with being cloudpickled multiple times. For example, using numpy inside a function applied to a pdarray is fine. However, if running from a Jupyter notebook and the apply is run from a cell twice in a row, the server will segfault.

Do you have an idea how to mitigate this? It sounds like the second time you cloudpickle a numpy function, it is not playing well with the server, is that correct? Could the server cache functions? Arguably, that will not prevent the client from resending them, but they can be ignored. Or ideally, client can maintain a state, in which the fact that a given function is cached by the server can be stored.
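A rough sketch of what such a server-side function cache could look like, keyed by a digest of the pickled payload. The class `FunctionCache` is hypothetical and is not code from this PR; it uses the standard-library `pickle` so that it runs on its own, and it makes no claim about whether caching would address the crash itself.

```python
import hashlib
import pickle


class FunctionCache:
    """Hypothetical cache mapping a payload digest to the unpickled callable."""

    def __init__(self):
        self._funcs = {}
        self.loads = 0  # number of times a payload was actually unpickled

    def get(self, payload: bytes):
        key = hashlib.sha256(payload).hexdigest()
        if key not in self._funcs:
            # First time this payload is seen: unpickle it and remember it.
            self._funcs[key] = pickle.loads(payload)
            self.loads += 1
        return self._funcs[key]


cache = FunctionCache()
payload = pickle.dumps(abs)  # a builtin stands in for a user function
first = cache.get(payload)
second = cache.get(payload)  # resent payload: served from the cache
print(first(-3), cache.loads)
```

A client that tracked the same digests could skip resending a payload the server already holds, which is the second variant suggested above.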

Signed-off-by: Jade Abraham <[email protected]>
Signed-off-by: Jade Abraham <[email protected]>
@jabraham17
Contributor Author

Do you have an idea how to mitigate this? It sounds like the second time you cloudpickle a numpy function, it is not playing well with the server, is that correct? Could the server cache functions? Arguably, that will not prevent the client from resending them, but they can be ignored. Or ideally, client can maintain a state, in which the fact that a given function is cached by the server can be stored.

It's not just the use of the same function; it's about the use of a module in a function. This snippet replicates the issue:

import arkouda as ak
import numpy as np

ak.connect()

n = 100_000
arr = ak.randint(0, 10, n)

def foo(x):
    np
    return 2.0
res = ak.apply(arr, foo, "float64")
print(res)

def bar(x):
    np
    return 1.0
res = ak.apply(arr, bar, "float64")
print(res)

@jabraham17
Contributor Author

I have tracked down the crash to an issue with numpy and multiple interpreters: numpy/numpy#28271. This error should not crash the server (only throw an exception), so there is something on the Chapel Python interop side to fix. But that fix will only produce a nicer error message; it will not fix the problem. For that, we either need to reuse the interpreter for each locale or numpy needs to be fixed.

@jabraham17
Contributor Author

This error should not crash the server (only throw an exception), so there is something on the Chapel Python interop side to fix

This is a general Chapel bug; there is nothing that can be done within the Python module to fix it.

Signed-off-by: Jade Abraham <[email protected]>
Contributor

@ajpotts ajpotts left a comment

I love this so much!

Unfortunately I don't think we can merge it in until we can reduce the potential for server crashes.

>>> def times_pi(x):
...     return x*math.pi
...
>>> ak.apply(arr, times_pi, "float64")

Contributor

When I try this example, it works great the first time and then crashes the server on a second call.

In [7]: ak.connect()
connected to arkouda server tcp://*:5555

In [8]: import math

In [9]: arr = ak.randint(0, 10, 10_000)

In [10]: def times_pi(x):
    ...:             return x*math.pi
    ...: 

In [11]: ak.apply(arr, times_pi, "float64")
Out[11]: array([25.132741228718345 6.2831853071795862 21.991148575128552 ... 18.849555921538759 18.849555921538759 25.132741228718345])

In [12]: ak.apply(arr, times_pi, "float64")
^C---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
Cell In[12], line 1
----> 1 ak.apply(arr, times_pi, "float64")

File ~/anaconda3/envs/arkouda-dev/lib/python3.12/site-packages/typeguard/__init__.py:891, in typechecked.<locals>.wrapper(*args, **kwargs)

I know this may be the same error mentioned in other comments. I'm not sure it's a good idea to merge this in when it can crash the server on basic examples. In practice there may be multiple users sharing a server.

Contributor Author

If you were using numpy, I would call it the same error as mentioned above and in the docs. But the server should not crash on this example.

import arkouda as ak
ak.connect()

import math
arr = ak.randint(0, 10, 10_000)
def times_pi(x):
  return x*math.pi

ak.apply(arr, times_pi, "float64")
ak.apply(arr, times_pi, "float64")
for i in range(100):
  ak.apply(arr, times_pi, "float64")

This works fine for me. Can you provide more details so I can replicate this and fix it?

Contributor

log.txt

Here are the logs I am getting for this example. Here is my configuration:

In [5]: ak.get_config()
Out[5]: 
{'arkoudaVersion': 'v2025.01.13+9.ga60c66f2b',
 'chplVersion': '2.4.0 (351d4d59a5)',
 'ZMQVersion': '4.3.5',
 'HDF5Version': '1.14.4',
 'serverHostname': 'pop-os',
 'ServerPort': 5555,
 'numLocales': 1,
 'numPUs': 8,
 'maxTaskPar': 8,
 'physicalMemory': 67257028608,
 'distributionType': 'domain(1,int(64),one)',
 'LocaleConfigs': [{'id': 0,
   'name': 'pop-os',
   'numPUs': 8,
   'maxTaskPar': 8,
   'physicalMemory': 67257028608}],
 'authenticate': False,
 'logLevel': 'INFO',
 'logChannel': 'CONSOLE',
 'regexMaxCaptures': 20,
 'byteorder': 'little',
 'autoShutdown': False,
 'serverInfoNoSplash': False,
 'maxArrayDims': 1,
 'ARROW_VERSION': '19.0.0'}

Contributor

@e-kayrakli asked me to check whether the Python versions matched. I checked the version reported by the Python interpreter:

$ python3
Python 3.12.3 | packaged by conda-forge | (main, Apr 15 2024, 18:38:13) [GCC 12.3.0] on linux

and checked it matches the conda environment I compiled with:

$ conda env list | grep -v "^$\|#" |awk '{print $1;}'|xargs -I{} -d "\n" sh -c 'printf "Env: {}\t"; conda list -n {} |grep "^python\s";'
Env: base	
python                    3.9.20               he870216_1  
Env: arkouda-dev	python                    3.12.3          hab00c5b_0_cpython    conda-forge

They appear to match, but I still get the same segfault.

Contributor Author

I've made some improvements here. The server will now check for Python version mismatches.

Additionally, the crashes from using numpy are avoided by reusing the interpreters for each call to apply.

While doing this, I also found a bug in register_commands.py, which I am opening a separate PR for.

Signed-off-by: Jade Abraham <[email protected]>
Signed-off-by: Jade Abraham <[email protected]>
Signed-off-by: Jade Abraham <[email protected]>
Signed-off-by: Jade Abraham <[email protected]>
arkouda/apply.py Outdated
if not vers_supported:
    interp_version = f"{sys.version_info.major}.{sys.version_info.minor}"
    raise RuntimeError(
        f"The current interpreter version ({interp_version}) is not supported by the server"
Contributor

How difficult would it be to have the error recommend which version is supported?

Contributor Author

e6b9703 should do that
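For illustration, an error of that shape might look like the sketch below. This is not the code from that commit: the function `check_interpreter_version` and the way the server's version reaches the client are assumptions, and only the major/minor comparison comes from the PR description. The version tuples in the example are the two conda environments listed earlier in this thread.

```python
import sys


def check_interpreter_version(client_version, server_version):
    """Raise RuntimeError on a major/minor mismatch, naming the expected version."""
    client_mm = tuple(client_version[:2])
    server_mm = tuple(server_version[:2])
    if client_mm != server_mm:
        raise RuntimeError(
            f"The current interpreter version ({client_mm[0]}.{client_mm[1]}) "
            f"is not supported by the server; "
            f"use Python {server_mm[0]}.{server_mm[1]} instead"
        )


# Same interpreter on both sides: no error.
check_interpreter_version(sys.version_info, sys.version_info)

# Mismatch between a 3.9 client and a server built against 3.12.
try:
    check_interpreter_version((3, 9, 20), (3, 12, 3))
except RuntimeError as err:
    message = str(err)
    print(message)
```

Patch-level differences are ignored on purpose, matching the stated requirement that only the major/minor versions need to agree.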

3 participants