[full persistence] upsert session (#8125)
GitOrigin-RevId: 8196f0fdb95275ac32696fd44ad112c95513030c
KamilPiechowiak authored and Manul from Pathway committed Jan 31, 2025
1 parent 36b9ec2 commit bfd3414
Showing 17 changed files with 268 additions and 500 deletions.
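This commit drops the dedicated UPSERT event type from the Python connector: a ConnectorSubject that declares SessionType.UPSERT now emits plain INSERT events, and the upsert session resolves them by the schema's primary key (a new value for an existing key replaces the previous row), which is what the tests below exercise. A minimal usage sketch, condensed from the tests added in this commit — the schema, the values, and the output path are illustrative, not part of the change itself:

import pathway as pw
from pathway.engine import SessionType


class InputSchema(pw.Schema):
    a: int = pw.column_definition(primary_key=True)
    b: int


class UpsertSubject(pw.io.python.ConnectorSubject):
    # UPSERT session: rows are keyed by the schema's primary key, and a later
    # value for an existing key replaces the earlier row instead of adding one.
    @property
    def _session_type(self) -> SessionType:
        return SessionType.UPSERT

    def run(self) -> None:
        self.next(a=1, b=2)
        self.next(a=1, b=4)  # upserts the row with key a=1


table = pw.io.python.read(UpsertSubject(), schema=InputSchema)
pw.io.csv.write(table, "out.csv")  # illustrative output path
pw.run()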
1 change: 0 additions & 1 deletion python/pathway/engine.pyi
@@ -805,7 +805,6 @@ class SnapshotAccess(Enum):
class PythonConnectorEventType(Enum):
INSERT: PythonConnectorEventType
DELETE: PythonConnectorEventType
UPSERT: PythonConnectorEventType
EXTERNAL_OFFSET: PythonConnectorEventType

class SessionType(Enum):
9 changes: 2 additions & 7 deletions python/pathway/io/python/__init__.py
@@ -232,12 +232,7 @@ def close(self) -> None:
self._send_special_message(FINISH_LITERAL)

def _send_special_message(self, msg: str) -> None:
event_type = (
PythonConnectorEventType.INSERT
if self._session_type == SessionType.NATIVE
else PythonConnectorEventType.UPSERT
)
self._buffer.put((event_type, None, {"_pw_special": msg}))
self._buffer.put((PythonConnectorEventType.INSERT, None, {"_pw_special": msg}))

def start(self) -> None:
"""Runs a separate thread with function feeding data into buffer.
@@ -297,7 +292,7 @@ def _add_inner(self, key: Pointer | None, values: dict[str, Any]) -> None:
raise ValueError(
f"Trying to modify a row in {type(self)} but deletions_enabled is set to False."
)
self._buffer.put((PythonConnectorEventType.UPSERT, key, values))
self._buffer.put((PythonConnectorEventType.INSERT, key, values))
else:
raise NotImplementedError(f"session type {self._session_type} not handled")

80 changes: 74 additions & 6 deletions python/pathway/tests/test_io.py
@@ -31,6 +31,7 @@
CsvLinesNumberChecker,
FileLinesNumberChecker,
T,
assert_sets_equality_from_path,
assert_table_equality,
assert_table_equality_wo_index,
needs_multiprocessing_fork,
@@ -2220,6 +2221,8 @@ class InputSchema(pw.Schema):


def test_python_connector_upsert_raw(tmp_path: pathlib.Path):
output_path = tmp_path / "output.csv"

class TestSubject(pw.io.python.ConnectorSubject):
@property
def _session_type(self) -> SessionType:
@@ -2228,20 +2231,47 @@ def _session_type(self) -> SessionType:
def run(self):
self._add(api.ref_scalar(0), b"one")
time.sleep(5e-2)
self._remove(api.ref_scalar(0), b"")
time.sleep(5e-2)
self._add(api.ref_scalar(0), b"two")
time.sleep(5e-2)
self._add(api.ref_scalar(0), b"three")
self.close()

table = pw.io.python.read(TestSubject(), format="raw", autocommit_duration_ms=10)
pw.io.csv.write(table, tmp_path / "output.csv")
pw.io.csv.write(table, output_path)
run()

result = pd.read_csv(tmp_path / "output.csv")
result = pd.read_csv(output_path)
assert len(result) == 5
assert_sets_equality_from_path(output_path, {"three,1"})


def test_python_connector_upsert_remove_raw(tmp_path: pathlib.Path):
output_path = tmp_path / "output.csv"

class TestSubject(pw.io.python.ConnectorSubject):
@property
def _session_type(self) -> SessionType:
return SessionType.UPSERT

def run(self):
self._add(api.ref_scalar(0), b"one")
time.sleep(5e-2)
self._remove(api.ref_scalar(0), b"")

table = pw.io.python.read(TestSubject(), format="raw", autocommit_duration_ms=10)
pw.io.csv.write(table, output_path)
run()

result = pd.read_csv(output_path)
assert len(result) == 2
assert_sets_equality_from_path(output_path, set())


def test_python_connector_removal_by_key(tmp_path: pathlib.Path):
output_path = tmp_path / "output.csv"

class TestSubject(pw.io.python.ConnectorSubject):
@property
def _session_type(self) -> SessionType:
@@ -2254,14 +2284,17 @@ def run(self):
self.close()

table = pw.io.python.read(TestSubject(), format="raw", autocommit_duration_ms=10)
pw.io.csv.write(table, tmp_path / "output.csv")
pw.io.csv.write(table, output_path)
run()

result = pd.read_csv(tmp_path / "output.csv")
result = pd.read_csv(output_path)
assert len(result) == 2
assert_sets_equality_from_path(output_path, set())


def test_python_connector_upsert_json(tmp_path: pathlib.Path):
output_path = tmp_path / "output.csv"

class TestSubject(pw.io.python.ConnectorSubject):
@property
def _session_type(self) -> SessionType:
@@ -2291,11 +2324,46 @@ class InputSchema(pw.Schema):
table = pw.io.python.read(
TestSubject(), format="json", schema=InputSchema, autocommit_duration_ms=10
)
pw.io.csv.write(table, tmp_path / "output.csv")
pw.io.csv.write(table, output_path)
run()

result = pd.read_csv(tmp_path / "output.csv")
result = pd.read_csv(output_path)
assert len(result) == 5
assert_sets_equality_from_path(output_path, {"three,3,1"})


def test_python_connector_upsert_remove_json(tmp_path: pathlib.Path):
output_path = tmp_path / "output.csv"

class TestSubject(pw.io.python.ConnectorSubject):
@property
def _session_type(self) -> SessionType:
return SessionType.UPSERT

def run(self):
self._add(
api.ref_scalar(0),
json.dumps({"word": "one", "digit": 1}).encode("utf-8"),
)
time.sleep(5e-2)
self._remove_inner(
api.ref_scalar(0),
{},
)

class InputSchema(pw.Schema):
word: str
digit: int

table = pw.io.python.read(
TestSubject(), format="json", schema=InputSchema, autocommit_duration_ms=10
)
pw.io.csv.write(table, output_path)
run()

result = pd.read_csv(output_path)
assert len(result) == 2
assert_sets_equality_from_path(output_path, set())


def test_python_connector_metadata():
84 changes: 53 additions & 31 deletions python/pathway/tests/test_persistence.py
@@ -7,16 +7,16 @@
import time
from typing import Callable

import pandas as pd
import pytest

import pathway as pw
from pathway.engine import SessionType
from pathway.internals import api
from pathway.internals.parse_graph import G
from pathway.tests.utils import (
CsvPathwayChecker,
LogicChecker,
consolidate,
assert_sets_equality_from_path,
needs_multiprocessing_fork,
run,
wait_result_with_checker,
@@ -274,18 +274,6 @@ def pw_identity_program():
assert actual_diffs == expected_diffs


def combine_columns(df: pd.DataFrame) -> pd.Series:
result = None
for column in df.columns:
if column == "time":
continue
if result is None:
result = df[column].astype(str)
else:
result += "," + df[column].astype(str)
return result


def get_one_table_runner(
tmp_path: pathlib.Path,
mode: api.PersistenceMode,
@@ -313,11 +301,7 @@ def run_computation(inputs, expected):
persistence_mode=mode,
)
)
try:
result = combine_columns(consolidate(pd.read_csv(output_path)))
except pd.errors.EmptyDataError:
result = pd.Series([])
assert set(result) == expected
assert_sets_equality_from_path(output_path, expected)

return run_computation, input_path

@@ -360,8 +344,7 @@ def run_computation(inputs_1, inputs_2, expected):
terminate_on_error=terminate_on_error,
# hack to allow changes from different files at different point in time
)
result = consolidate(pd.read_csv(output_path))
assert set(combine_columns(result)) == expected
assert_sets_equality_from_path(output_path, expected)

return run_computation, input_path_1, input_path_2

@@ -741,11 +724,7 @@ def setup(inputs: list[str]) -> None:

def get_checker(expected: set[str]) -> Callable:
def check() -> None:
try:
result = combine_columns(consolidate(pd.read_csv(output_path)))
except pd.errors.EmptyDataError:
result = pd.Series([])
assert set(result) == expected
assert_sets_equality_from_path(output_path, expected)

return LogicChecker(check)

@@ -832,11 +811,7 @@ def setup(inputs: list[str]) -> None:

def get_checker(expected: set[str]) -> Callable:
def check() -> None:
try:
result = combine_columns(consolidate(pd.read_csv(output_path)))
except pd.errors.EmptyDataError:
result = pd.Series([])
assert set(result) == expected
assert_sets_equality_from_path(output_path, expected)

return LogicChecker(check)

@@ -882,3 +857,50 @@ def check() -> None:
target=run,
kwargs={"persistence_config": persistence_config},
)


@pytest.mark.parametrize(
"mode", [api.PersistenceMode.PERSISTING, api.PersistenceMode.OPERATOR_PERSISTING]
)
def test_upsert_session_with_python_connector(tmp_path, mode):
output_path = tmp_path / "out.csv"
persistent_storage_path = tmp_path / "p"

class InputSchema(pw.Schema):
a: int = pw.column_definition(primary_key=True)
b: int

class InputSubject(pw.io.python.ConnectorSubject):
data: list[dict[str, int]]

def __init__(self, data: list[dict[str, int]]) -> None:
super().__init__()
self.data = data

def run(self) -> None:
for entry in self.data:
self.next(**entry)

@property
def _session_type(self) -> SessionType:
return SessionType.UPSERT

def run_computation(inputs: list[dict[str, int]], expected: set[str]):
G.clear()
res = pw.io.python.read(InputSubject(inputs), schema=InputSchema)
pw.io.csv.write(res, output_path)
run(
persistence_config=pw.persistence.Config(
pw.persistence.Backend.filesystem(persistent_storage_path),
persistence_mode=mode,
)
)
assert_sets_equality_from_path(output_path, expected)

run_computation([{"a": 1, "b": 2}, {"a": 2, "b": 3}], {"1,2,1", "2,3,1"})
run_computation(
[{"a": 1, "b": 4}, {"a": 3, "b": 10}], {"1,2,-1", "1,4,1", "3,10,1"}
)
run_computation([{"a": 3, "b": 9}], {"3,10,-1", "3,9,1"})
run_computation([{"a": 4, "b": 6}], {"4,6,1"})
run_computation([{"a": 1, "b": 0}], {"1,4,-1", "1,0,1"})
21 changes: 21 additions & 0 deletions python/pathway/tests/utils.py
@@ -823,3 +823,24 @@ def consolidate(df: pd.DataFrame) -> pd.DataFrame:
df.at[i, "diff"] = total[value]
total[value] = 0
return df[df["diff"] != 0].drop(columns=["_all_values"])


def combine_columns(df: pd.DataFrame) -> pd.Series:
result = None
for column in df.columns:
if column == "time":
continue
if result is None:
result = df[column].astype(str)
else:
result += "," + df[column].astype(str)
assert result is not None
return result


def assert_sets_equality_from_path(path: pathlib.Path, expected: set[str]) -> None:
try:
result = combine_columns(consolidate(pd.read_csv(path)))
except pd.errors.EmptyDataError:
result = pd.Series([])
assert set(result) == expected