diff --git a/python/pathway/engine.pyi b/python/pathway/engine.pyi
index 6864b5fd..a990bab9 100644
--- a/python/pathway/engine.pyi
+++ b/python/pathway/engine.pyi
@@ -805,7 +805,6 @@ class SnapshotAccess(Enum):
 class PythonConnectorEventType(Enum):
     INSERT: PythonConnectorEventType
     DELETE: PythonConnectorEventType
-    UPSERT: PythonConnectorEventType
     EXTERNAL_OFFSET: PythonConnectorEventType
 
 class SessionType(Enum):
diff --git a/python/pathway/io/python/__init__.py b/python/pathway/io/python/__init__.py
index d9f3de8f..6d0963b3 100644
--- a/python/pathway/io/python/__init__.py
+++ b/python/pathway/io/python/__init__.py
@@ -232,12 +232,7 @@ def close(self) -> None:
         self._send_special_message(FINISH_LITERAL)
 
     def _send_special_message(self, msg: str) -> None:
-        event_type = (
-            PythonConnectorEventType.INSERT
-            if self._session_type == SessionType.NATIVE
-            else PythonConnectorEventType.UPSERT
-        )
-        self._buffer.put((event_type, None, {"_pw_special": msg}))
+        self._buffer.put((PythonConnectorEventType.INSERT, None, {"_pw_special": msg}))
 
     def start(self) -> None:
         """Runs a separate thread with function feeding data into buffer.
@@ -297,7 +292,7 @@ def _add_inner(self, key: Pointer | None, values: dict[str, Any]) -> None:
                 raise ValueError(
                     f"Trying to modify a row in {type(self)} but deletions_enabled is set to False."
                 )
-            self._buffer.put((PythonConnectorEventType.UPSERT, key, values))
+            self._buffer.put((PythonConnectorEventType.INSERT, key, values))
         else:
             raise NotImplementedError(f"session type {self._session_type} not handled")
diff --git a/python/pathway/tests/test_io.py b/python/pathway/tests/test_io.py
index 5a263a7f..cacda522 100644
--- a/python/pathway/tests/test_io.py
+++ b/python/pathway/tests/test_io.py
@@ -31,6 +31,7 @@
     CsvLinesNumberChecker,
     FileLinesNumberChecker,
     T,
+    assert_sets_equality_from_path,
     assert_table_equality,
     assert_table_equality_wo_index,
     needs_multiprocessing_fork,
@@ -2220,6 +2221,8 @@ class InputSchema(pw.Schema):
 
 
 def test_python_connector_upsert_raw(tmp_path: pathlib.Path):
+    output_path = tmp_path / "output.csv"
+
     class TestSubject(pw.io.python.ConnectorSubject):
         @property
         def _session_type(self) -> SessionType:
@@ -2228,20 +2231,47 @@ def _session_type(self) -> SessionType:
         def run(self):
             self._add(api.ref_scalar(0), b"one")
             time.sleep(5e-2)
+            self._remove(api.ref_scalar(0), b"")
+            time.sleep(5e-2)
             self._add(api.ref_scalar(0), b"two")
             time.sleep(5e-2)
             self._add(api.ref_scalar(0), b"three")
             self.close()
 
     table = pw.io.python.read(TestSubject(), format="raw", autocommit_duration_ms=10)
-    pw.io.csv.write(table, tmp_path / "output.csv")
+    pw.io.csv.write(table, output_path)
     run()
 
-    result = pd.read_csv(tmp_path / "output.csv")
+    result = pd.read_csv(output_path)
     assert len(result) == 5
+    assert_sets_equality_from_path(output_path, {"three,1"})
+
+
+def test_python_connector_upsert_remove_raw(tmp_path: pathlib.Path):
+    output_path = tmp_path / "output.csv"
+
+    class TestSubject(pw.io.python.ConnectorSubject):
+        @property
+        def _session_type(self) -> SessionType:
+            return SessionType.UPSERT
+
+        def run(self):
+            self._add(api.ref_scalar(0), b"one")
+            time.sleep(5e-2)
+            self._remove(api.ref_scalar(0), b"")
+
+    table = pw.io.python.read(TestSubject(), format="raw", autocommit_duration_ms=10)
+    pw.io.csv.write(table, output_path)
+    run()
+
+    result = pd.read_csv(output_path)
+    assert len(result) == 2
+    assert_sets_equality_from_path(output_path, set())
 
 
 def test_python_connector_removal_by_key(tmp_path: pathlib.Path):
+    output_path = tmp_path / "output.csv"
+
     class TestSubject(pw.io.python.ConnectorSubject):
         @property
         def _session_type(self) -> SessionType:
@@ -2254,14 +2284,17 @@ def run(self):
             self.close()
 
     table = pw.io.python.read(TestSubject(), format="raw", autocommit_duration_ms=10)
-    pw.io.csv.write(table, tmp_path / "output.csv")
+    pw.io.csv.write(table, output_path)
     run()
 
-    result = pd.read_csv(tmp_path / "output.csv")
+    result = pd.read_csv(output_path)
     assert len(result) == 2
+    assert_sets_equality_from_path(output_path, set())
 
 
 def test_python_connector_upsert_json(tmp_path: pathlib.Path):
+    output_path = tmp_path / "output.csv"
+
     class TestSubject(pw.io.python.ConnectorSubject):
         @property
         def _session_type(self) -> SessionType:
@@ -2291,11 +2324,46 @@ class InputSchema(pw.Schema):
     table = pw.io.python.read(
         TestSubject(), format="json", schema=InputSchema, autocommit_duration_ms=10
     )
-    pw.io.csv.write(table, tmp_path / "output.csv")
+    pw.io.csv.write(table, output_path)
     run()
 
-    result = pd.read_csv(tmp_path / "output.csv")
+    result = pd.read_csv(output_path)
     assert len(result) == 5
+    assert_sets_equality_from_path(output_path, {"three,3,1"})
+
+
+def test_python_connector_upsert_remove_json(tmp_path: pathlib.Path):
+    output_path = tmp_path / "output.csv"
+
+    class TestSubject(pw.io.python.ConnectorSubject):
+        @property
+        def _session_type(self) -> SessionType:
+            return SessionType.UPSERT
+
+        def run(self):
+            self._add(
+                api.ref_scalar(0),
+                json.dumps({"word": "one", "digit": 1}).encode("utf-8"),
+            )
+            time.sleep(5e-2)
+            self._remove_inner(
+                api.ref_scalar(0),
+                {},
+            )
+
+    class InputSchema(pw.Schema):
+        word: str
+        digit: int
+
+    table = pw.io.python.read(
+        TestSubject(), format="json", schema=InputSchema, autocommit_duration_ms=10
+    )
+    pw.io.csv.write(table, output_path)
+    run()
+
+    result = pd.read_csv(output_path)
+    assert len(result) == 2
+    assert_sets_equality_from_path(output_path, set())
 
 
 def test_python_connector_metadata():
diff --git a/python/pathway/tests/test_persistence.py b/python/pathway/tests/test_persistence.py
index 906e9cba..9bca76fd 100644
--- a/python/pathway/tests/test_persistence.py
+++ b/python/pathway/tests/test_persistence.py
@@ -7,16 +7,16 @@
 import time
 from typing import Callable
 
-import pandas as pd
 import pytest
 
 import pathway as pw
+from pathway.engine import SessionType
 from pathway.internals import api
 from pathway.internals.parse_graph import G
 from pathway.tests.utils import (
     CsvPathwayChecker,
     LogicChecker,
-    consolidate,
+    assert_sets_equality_from_path,
     needs_multiprocessing_fork,
     run,
     wait_result_with_checker,
@@ -274,18 +274,6 @@ def pw_identity_program():
     assert actual_diffs == expected_diffs
 
 
-def combine_columns(df: pd.DataFrame) -> pd.Series:
-    result = None
-    for column in df.columns:
-        if column == "time":
-            continue
-        if result is None:
-            result = df[column].astype(str)
-        else:
-            result += "," + df[column].astype(str)
-    return result
-
-
 def get_one_table_runner(
     tmp_path: pathlib.Path,
     mode: api.PersistenceMode,
@@ -313,11 +301,7 @@ def run_computation(inputs, expected):
                 persistence_mode=mode,
             )
         )
-        try:
-            result = combine_columns(consolidate(pd.read_csv(output_path)))
-        except pd.errors.EmptyDataError:
-            result = pd.Series([])
-        assert set(result) == expected
+        assert_sets_equality_from_path(output_path, expected)
 
     return run_computation, input_path
 
@@ -360,8 +344,7 @@ def run_computation(inputs_1, inputs_2, expected):
             terminate_on_error=terminate_on_error,
             # hack to allow changes from different files at different point in time
         )
-        result = consolidate(pd.read_csv(output_path))
-        assert set(combine_columns(result)) == expected
+        assert_sets_equality_from_path(output_path, expected)
 
     return run_computation, input_path_1, input_path_2
 
@@ -741,11 +724,7 @@ def setup(inputs: list[str]) -> None:
 
     def get_checker(expected: set[str]) -> Callable:
         def check() -> None:
-            try:
-                result = combine_columns(consolidate(pd.read_csv(output_path)))
-            except pd.errors.EmptyDataError:
-                result = pd.Series([])
-            assert set(result) == expected
+            assert_sets_equality_from_path(output_path, expected)
 
         return LogicChecker(check)
 
@@ -832,11 +811,7 @@ def setup(inputs: list[str]) -> None:
 
     def get_checker(expected: set[str]) -> Callable:
         def check() -> None:
-            try:
-                result = combine_columns(consolidate(pd.read_csv(output_path)))
-            except pd.errors.EmptyDataError:
-                result = pd.Series([])
-            assert set(result) == expected
+            assert_sets_equality_from_path(output_path, expected)
 
         return LogicChecker(check)
 
@@ -882,3 +857,50 @@ def check() -> None:
         target=run,
         kwargs={"persistence_config": persistence_config},
     )
+
+
+@pytest.mark.parametrize(
+    "mode", [api.PersistenceMode.PERSISTING, api.PersistenceMode.OPERATOR_PERSISTING]
+)
+def test_upsert_session_with_python_connector(tmp_path, mode):
+    output_path = tmp_path / "out.csv"
+    persistent_storage_path = tmp_path / "p"
+
+    class InputSchema(pw.Schema):
+        a: int = pw.column_definition(primary_key=True)
+        b: int
+
+    class InputSubject(pw.io.python.ConnectorSubject):
+        data: list[dict[str, int]]
+
+        def __init__(self, data: list[dict[str, int]]) -> None:
+            super().__init__()
+            self.data = data
+
+        def run(self) -> None:
+            for entry in self.data:
+                self.next(**entry)
+
+        @property
+        def _session_type(self) -> SessionType:
+            return SessionType.UPSERT
+
+    def run_computation(inputs: list[dict[str, int]], expected: set[str]):
+        G.clear()
+        res = pw.io.python.read(InputSubject(inputs), schema=InputSchema)
+        pw.io.csv.write(res, output_path)
+        run(
+            persistence_config=pw.persistence.Config(
+                pw.persistence.Backend.filesystem(persistent_storage_path),
+                persistence_mode=mode,
+            )
+        )
+        assert_sets_equality_from_path(output_path, expected)
+
+    run_computation([{"a": 1, "b": 2}, {"a": 2, "b": 3}], {"1,2,1", "2,3,1"})
+    run_computation(
+        [{"a": 1, "b": 4}, {"a": 3, "b": 10}], {"1,2,-1", "1,4,1", "3,10,1"}
+    )
+    run_computation([{"a": 3, "b": 9}], {"3,10,-1", "3,9,1"})
+    run_computation([{"a": 4, "b": 6}], {"4,6,1"})
+    run_computation([{"a": 1, "b": 0}], {"1,4,-1", "1,0,1"})
diff --git a/python/pathway/tests/utils.py b/python/pathway/tests/utils.py
index 2e99fca8..f92b9920 100644
--- a/python/pathway/tests/utils.py
+++ b/python/pathway/tests/utils.py
@@ -823,3 +823,24 @@ def consolidate(df: pd.DataFrame) -> pd.DataFrame:
             df.at[i, "diff"] = total[value]
             total[value] = 0
     return df[df["diff"] != 0].drop(columns=["_all_values"])
+
+
+def combine_columns(df: pd.DataFrame) -> pd.Series:
+    result = None
+    for column in df.columns:
+        if column == "time":
+            continue
+        if result is None:
+            result = df[column].astype(str)
+        else:
+            result += "," + df[column].astype(str)
+    assert result is not None
+    return result
+
+
+def assert_sets_equality_from_path(path: pathlib.Path, expected: set[str]) -> None:
+    try:
+        result = combine_columns(consolidate(pd.read_csv(path)))
+    except pd.errors.EmptyDataError:
+        result = pd.Series([])
+    assert set(result) == expected
diff --git a/src/connectors/adaptors.rs b/src/connectors/adaptors.rs
deleted file mode 100644
index 4cd94f64..00000000
--- a/src/connectors/adaptors.rs
+++ /dev/null
@@ -1,176 +0,0 @@
-// Copyright © 2024 Pathway
-
-use differential_dataflow::input::InputSession;
-use differential_dataflow::lattice::Lattice;
-use differential_dataflow::operators::arrange::upsert::arrange_from_upsert;
-use differential_dataflow::trace::implementations::ord::OrdValBatch;
-use differential_dataflow::trace::implementations::spine_fueled::Spine;
-use differential_dataflow::Collection;
-use timely::dataflow::operators::input::Handle;
-use timely::dataflow::operators::Input as TimelyInput;
-use timely::order::TotalOrder;
-use timely::progress::Timestamp as TimelyTimestamp;
-
-use crate::engine::dataflow::maybe_total::MaybeTotalScope;
-use crate::engine::{Key, Value};
-
-use std::rc::Rc;
-
-pub type GenericValues<S> = Collection<S, (Key, Value)>;
-pub type ValuesSessionAdaptor<Timestamp> = Box<dyn InputAdaptor<Timestamp>>;
-
-#[derive(Clone, Copy, Debug)]
-pub enum SessionType {
-    Native,
-    Upsert,
-}
-
-impl SessionType {
-    pub fn new_collection<
-        Timestamp: TimelyTimestamp + Lattice + TotalOrder,
-        S: MaybeTotalScope<MaybeTotalTimestamp = Timestamp>,
-    >(
-        &self,
-        scope: &mut S,
-    ) -> (ValuesSessionAdaptor<Timestamp>, GenericValues<S>) {
-        match &self {
-            SessionType::Native => {
-                let mut input_session = InputSession::new();
-                let collection = input_session.to_collection(scope);
-                (Box::new(input_session), collection)
-            }
-            SessionType::Upsert => {
-                let mut upsert_session = UpsertSession::new();
-                let collection = upsert_session.to_collection(scope);
-                (Box::new(upsert_session), collection)
-            }
-        }
-    }
-}
-
-pub trait InputAdaptor<Timestamp> {
-    fn new() -> Self
-    where
-        Self: Sized;
-
-    fn insert(&mut self, key: Key, value: Value);
-    fn remove(&mut self, key: Key, value: Value);
-    fn upsert(&mut self, key: Key, value: Option<Value>);
-
-    fn advance_to(&mut self, time: Timestamp);
-    fn time(&self) -> &Timestamp;
-
-    fn flush(&mut self);
-}
-
-#[derive(Default)]
-pub struct UpsertSession<Timestamp: TimelyTimestamp + Lattice + TotalOrder> {
-    time: Timestamp,
-    buffer: Vec<(Key, Option<Value>, Timestamp)>,
-    handle: Handle<Timestamp, (Key, Option<Value>, Timestamp)>,
-}
-
-impl<Timestamp: TimelyTimestamp + Lattice + TotalOrder> UpsertSession<Timestamp> {
-    pub fn to_collection<S: MaybeTotalScope<MaybeTotalTimestamp = Timestamp>>(
-        &mut self,
-        scope: &mut S,
-    ) -> Collection<S, (Key, Value)> {
-        // We require that any given key is provided only to a single worker.
-        arrange_from_upsert::<S, Spine<Rc<OrdValBatch<Key, Value, S::Timestamp, isize>>>>(
-            &scope.input_from(&mut self.handle),
-            "UpsertSession",
-        )
-        .as_collection(|k, v| (*k, v.clone()))
-    }
-}
-
-impl<Timestamp: TimelyTimestamp + Lattice + TotalOrder> InputAdaptor<Timestamp>
-    for UpsertSession<Timestamp>
-{
-    /// The implementation below mostly reuses differetial dataflow's `InputSession` internals.
-    ///
-    /// The main difference is the interface of the `to_collection` method and more task-based
-    /// insert and remove methods.
-
-    fn new() -> Self {
-        let handle: Handle<Timestamp, _> = Handle::new();
-        UpsertSession {
-            time: handle.time().clone(),
-            buffer: Vec::new(),
-            handle,
-        }
-    }
-
-    fn flush(&mut self) {
-        self.handle.send_batch(&mut self.buffer);
-        if self.handle.epoch().less_than(&self.time) {
-            self.handle.advance_to(self.time.clone());
-        }
-    }
-
-    fn advance_to(&mut self, time: Timestamp) {
-        assert!(self.handle.epoch().less_equal(&time));
-        assert!(self.time.less_equal(&time));
-        self.time = time;
-    }
-
-    fn insert(&mut self, _key: Key, _value: Value) {
-        unimplemented!("this type of InputAdaptor doesn't support inserts")
-    }
-
-    fn remove(&mut self, _key: Key, _value: Value) {
-        unimplemented!("this type of InputAdaptor doesn't support removals")
-    }
-
-    fn upsert(&mut self, key: Key, value: Option<Value>) {
-        if self.buffer.len() == self.buffer.capacity() {
-            if !self.buffer.is_empty() {
-                self.handle.send_batch(&mut self.buffer);
-            }
-            self.buffer.reserve(1024);
-        }
-        self.buffer.push((key, value, self.time.clone()));
-    }
-
-    fn time(&self) -> &Timestamp {
-        &self.time
-    }
-}
-
-impl<Timestamp: TimelyTimestamp + Lattice + TotalOrder> Drop for UpsertSession<Timestamp> {
-    fn drop(&mut self) {
-        self.flush();
-    }
-}
-
-impl<Timestamp: TimelyTimestamp + Lattice + TotalOrder> InputAdaptor<Timestamp>
-    for InputSession<Timestamp, (Key, Value), isize>
-{
-    fn new() -> Self {
-        Self::new()
-    }
-
-    fn insert(&mut self, key: Key, value: Value) {
-        self.insert((key, value));
-    }
-
-    fn remove(&mut self, key: Key, value: Value) {
-        self.remove((key, value));
-    }
-
-    fn upsert(&mut self, _key: Key, _value: Option<Value>) {
-        unimplemented!("this type of InputAdaptor doesn't support upserts")
-    }
-
-    fn flush(&mut self) {
-        self.flush();
-    }
-
-    fn advance_to(&mut self, time: Timestamp) {
-        self.advance_to(time);
-    }
-
-    fn time(&self) -> &Timestamp {
-        self.time()
-    }
-}
diff --git a/src/connectors/data_format.rs b/src/connectors/data_format.rs
index 1d26bec2..47679f85 100644
--- a/src/connectors/data_format.rs
+++ b/src/connectors/data_format.rs
@@ -52,10 +52,6 @@ pub type ErrorRemovalLogic = Box<dyn Fn(ValueFieldsWithErrors) -> DynResult<Vec<Value>>>;
 pub enum ParsedEventWithErrors {
     AdvanceTime,
     Insert((KeyFieldsWithErrors, ValueFieldsWithErrors)),
-    // None as Vec of values means that the record is removed
-    Upsert((KeyFieldsWithErrors, Option<ValueFieldsWithErrors>)),
-
-    // If None, finding the key for the provided values becomes responsibility of the connector
     Delete((KeyFieldsWithErrors, ValueFieldsWithErrors)),
 }
@@ -67,21 +63,12 @@ impl ParsedEventWithErrors {
         key: KeyFieldsWithErrors,
         values: ValueFieldsWithErrors,
     ) -> Self {
-        match session_type {
-            SessionType::Native => {
-                match data_event_type {
-                    DataEventType::Insert => ParsedEventWithErrors::Insert((key, values)),
-                    DataEventType::Delete => ParsedEventWithErrors::Delete((key, values)),
-                    DataEventType::Upsert => panic!("incorrect Reader-Parser configuration: unexpected Upsert event in Native session"),
-                }
-            }
-            SessionType::Upsert => {
-                match data_event_type {
-                    DataEventType::Insert => panic!("incorrect Reader-Parser configuration: unexpected Insert event in Upsert session"),
-                    DataEventType::Delete => ParsedEventWithErrors::Upsert((key, None)),
-                    DataEventType::Upsert => ParsedEventWithErrors::Upsert((key, Some(values))),
-                }
-            }
+        match data_event_type {
+            DataEventType::Insert => ParsedEventWithErrors::Insert((key, values)),
+            DataEventType::Delete => match session_type {
+                SessionType::Native => ParsedEventWithErrors::Delete((key, values)),
+                SessionType::Upsert => ParsedEventWithErrors::Delete((key, vec![])),
+            },
         }
     }
     pub fn remove_errors(self, logic: &ErrorRemovalLogic) -> DynResult<ParsedEvent> {
@@ -90,9 +77,6 @@ impl ParsedEventWithErrors {
             Self::Insert((key, values)) => key
                 .transpose()
                 .and_then(|key| Ok(ParsedEvent::Insert((key, logic(values)?)))),
-            Self::Upsert((key, values)) => key
-                .transpose()
-                .and_then(|key| Ok(ParsedEvent::Upsert((key, values.map(logic).transpose()?)))),
             Self::Delete((key, values)) => key
                 .transpose()
                 .and_then(|key| Ok(ParsedEvent::Delete((key, logic(values)?)))),
@@ -104,10 +88,6 @@ impl ParsedEventWithErrors {
 pub enum ParsedEvent {
     AdvanceTime,
     Insert((Option<Vec<Value>>, Vec<Value>)),
-
-    // None as Vec of values means that the record is removed
-    Upsert((Option<Vec<Value>>, Option<Vec<Value>>)),
-
     // If None, finding the key for the provided values becomes responsibility of the connector
     Delete((Option<Vec<Value>>, Vec<Value>)),
 }
@@ -119,9 +99,9 @@ impl ParsedEvent {
         offset: Option<&Offset>,
     ) -> Option<Key> {
         match self {
-            ParsedEvent::Insert((raw_key, _))
-            | ParsedEvent::Upsert((raw_key, _))
-            | ParsedEvent::Delete((raw_key, _)) => Some(values_to_key(raw_key.as_ref(), offset)),
+            ParsedEvent::Insert((raw_key, _)) | ParsedEvent::Delete((raw_key, _)) => {
+                Some(values_to_key(raw_key.as_ref(), offset))
+            }
             ParsedEvent::AdvanceTime => None,
         }
     }
@@ -129,7 +109,6 @@ impl ParsedEvent {
     pub fn snapshot_event(&self, key: Key) -> Option<SnapshotEvent> {
         match self {
             ParsedEvent::Insert((_, values)) => Some(SnapshotEvent::Insert(key, values.clone())),
-            ParsedEvent::Upsert((_, values)) => Some(SnapshotEvent::Upsert(key, values.clone())),
             ParsedEvent::Delete((_, values)) => Some(SnapshotEvent::Delete(key, values.clone())),
             ParsedEvent::AdvanceTime => None,
         }
@@ -761,11 +740,8 @@ impl DsvParser {
             };
             let parsed_tokens =
                 self.values_by_indices(tokens, &self.value_column_indices, &self.header);
-            let parsed_entry = match event {
-                DataEventType::Insert => ParsedEventWithErrors::Insert((key, parsed_tokens)),
-                DataEventType::Delete => ParsedEventWithErrors::Delete((key, parsed_tokens)),
-                DataEventType::Upsert => unreachable!("readers can't send upserts to DsvParser"),
-            };
+            let parsed_entry =
+                ParsedEventWithErrors::new(self.session_type(), event, key, parsed_tokens);
             Ok(vec![parsed_entry])
         } else {
             Err(ParseError::UnexpectedNumberOfCsvTokens(tokens.len()).into())
@@ -898,24 +874,7 @@ impl Parser for IdentityParser {
                 };
                 values.push(to_insert);
             }
-            match self.session_type {
-                SessionType::Native => {
-                    match event {
-                        DataEventType::Insert => ParsedEventWithErrors::Insert((key, values)),
-                        DataEventType::Delete => ParsedEventWithErrors::Delete((key, values)),
-                        DataEventType::Upsert => {
-                            panic!("incorrect Reader-Parser configuration: unexpected Upsert event in Native session")
-                        }
-                    }
-                }
-                SessionType::Upsert => {
-                    match event {
-                        DataEventType::Insert => panic!("incorrect Reader-Parser configuration: unexpected Insert event in Upsert session"),
-                        DataEventType::Delete => ParsedEventWithErrors::Upsert((key, None)),
-                        DataEventType::Upsert => ParsedEventWithErrors::Upsert((key, Some(values))),
-                    }
-                }
-            }
+            ParsedEventWithErrors::new(self.session_type(), event, key, values)
         };
 
         Ok(vec![event])
@@ -1277,22 +1236,16 @@ impl DebeziumMessageParser {
             &Value::None,
         );
 
-        match event {
-            DataEventType::Insert => Ok(ParsedEventWithErrors::Insert((key, parsed_values))),
-            DataEventType::Delete => Ok(ParsedEventWithErrors::Delete((key, parsed_values))),
-            DataEventType::Upsert => Ok(ParsedEventWithErrors::Upsert((key, Some(parsed_values)))),
-        }
+        Ok(ParsedEventWithErrors::new(
+            self.session_type(),
+            event,
+            key,
+            parsed_values,
+        ))
     }
 
     fn parse_read_or_create(&mut self, key: &JsonValue, value: &JsonValue) -> ParseResult {
-        let event = match self.db_type {
-            DebeziumDBType::Postgres => {
-                self.parse_event(key, &value["after"], DataEventType::Insert)?
-            }
-            DebeziumDBType::MongoDB => {
-                self.parse_event(key, &value["after"], DataEventType::Upsert)?
-            }
-        };
+        let event = self.parse_event(key, &value["after"], DataEventType::Insert)?;
         Ok(vec![event])
     }
@@ -1314,7 +1267,8 @@ impl DebeziumMessageParser {
                     .into_iter()
                     .collect()
                 });
-                ParsedEventWithErrors::Upsert((key, None))
+                //deletion in upsert session - no data needed
+                ParsedEventWithErrors::Delete((key, vec![]))
             }
         };
         Ok(vec![event])
@@ -1329,7 +1283,7 @@ impl DebeziumMessageParser {
                 Ok(vec![event_before, event_after])
             }
             DebeziumDBType::MongoDB => {
-                let event_after = self.parse_event(key, &value["after"], DataEventType::Upsert)?;
+                let event_after = self.parse_event(key, &value["after"], DataEventType::Insert)?;
                 Ok(vec![event_after])
             }
         }
diff --git a/src/connectors/data_storage.rs b/src/connectors/data_storage.rs
index 9ea0a849..106b0929 100644
--- a/src/connectors/data_storage.rs
+++ b/src/connectors/data_storage.rs
@@ -88,14 +88,12 @@ pub use super::data_lake::LakeWriter;
 pub enum DataEventType {
     Insert,
     Delete,
-    Upsert,
 }
 
 #[derive(Clone, Debug, Eq, PartialEq, Copy)]
 pub enum PythonConnectorEventType {
     Insert,
     Delete,
-    Upsert,
     ExternalOffset,
 }
@@ -948,7 +946,6 @@ impl Reader for PythonReader {
             let event = match py_event {
                 PythonConnectorEventType::Insert => DataEventType::Insert,
                 PythonConnectorEventType::Delete => DataEventType::Delete,
-                PythonConnectorEventType::Upsert => DataEventType::Upsert,
                 PythonConnectorEventType::ExternalOffset => {
                     let py_external_offset =
                         objects.get(PW_OFFSET_FIELD_NAME).unwrap_or_else(|| {
diff --git a/src/connectors/mod.rs b/src/connectors/mod.rs
index 40186634..7253b4d0 100644
--- a/src/connectors/mod.rs
+++ b/src/connectors/mod.rs
@@ -3,6 +3,7 @@
 use differential_dataflow::input::InputSession;
 use itertools::Itertools;
 use log::{error, info, warn};
+use scopeguard::guard;
 use std::cell::RefCell;
 use std::env;
 use std::ops::ControlFlow;
@@ -12,11 +13,8 @@
 use std::sync::{Arc, Mutex};
 use std::thread;
 use std::thread::Thread;
 use std::time::{Duration, SystemTime};
-
-use scopeguard::guard;
 use timely::dataflow::operators::probe::Handle;
-pub mod adaptors;
 pub mod data_format;
 pub mod data_lake;
 pub mod data_storage;
@@ -34,7 +32,6 @@ use crate::engine::report_error::{
 };
 use crate::engine::{DataError, Key, Value};
 
-use crate::connectors::adaptors::InputAdaptor;
 use crate::engine::Error as EngineError;
 use crate::engine::Timestamp;
 use crate::persistence::config::ReadersQueryPurpose;
@@ -48,10 +45,16 @@
 use data_storage::{
     DataEventType, ReadError, ReadResult, Reader, ReaderBuilder, ReaderContext, WriteError,
     Writer,
 };
-pub use adaptors::SessionType;
 pub use data_storage::StorageType;
 pub use offset::{Offset, OffsetKey, OffsetValue};
 
+type ValuesSession = InputSession<Timestamp, (Key, Value), isize>;
+#[derive(Clone, Copy, Debug)]
+pub enum SessionType {
+    Native,
+    Upsert,
+}
+
 /*
     Below is the custom reader stuff.
     In most cases, the input can be separated into raw data reads and parsing.
@@ -205,7 +208,7 @@ impl Connector {
         }
     }
 
-    fn advance_time(&mut self, input_session: &mut dyn InputAdaptor<Timestamp>) -> Timestamp {
+    fn advance_time(&mut self, input_session: &mut ValuesSession) -> Timestamp {
         let new_timestamp = Timestamp::new_from_current_time();
         let timestamp_updated = self.current_timestamp <= new_timestamp;
         if timestamp_updated {
@@ -245,9 +248,7 @@ impl Connector {
                     info!("Reached the end of the snapshot. Exiting the rewind after {entries_read} entries");
                     break;
                 }
-                SnapshotEvent::Insert(_, _)
-                | SnapshotEvent::Delete(_, _)
-                | SnapshotEvent::Upsert(_, _) => {
+                SnapshotEvent::Insert(_, _) | SnapshotEvent::Delete(_, _) => {
                     entries_read += 1;
                     let send_res = sender.send(Entry::Snapshot(entry_read));
                     if let Err(e) = send_res {
@@ -430,7 +431,7 @@ impl Connector {
         mut self,
         reader: Box<dyn ReaderBuilder>,
         mut parser: Box<dyn Parser>,
-        mut input_session: Box<dyn InputAdaptor<Timestamp>>,
+        mut input_session: ValuesSession,
         mut values_to_key: impl FnMut(Option<&Vec<Value>>, Option<&Offset>) -> Key + 'static,
         probe: Handle<Timestamp>,
         persistent_storage: Option<Arc<Mutex<WorkerPersistentStorage>>>,
@@ -518,10 +519,11 @@ impl Connector {
                 self.on_parsed_data(
                     parsed_entries,
                     None, // no key generation for time advancement
-                    input_session.as_mut(),
+                    &mut input_session,
                     &mut values_to_key,
                     &mut snapshot_writer,
                     &mut Some(&mut *connector_monitor.borrow_mut()),
+                    parser.session_type(),
                 );
             }
@@ -563,7 +565,7 @@ impl Connector {
                     entry,
                     &mut backfilling_finished,
                     &mut parser,
-                    input_session.as_mut(),
+                    &mut input_session,
                     &mut values_to_key,
                     &mut snapshot_writer,
                     &mut Some(&mut *connector_monitor.borrow_mut()),
@@ -591,13 +593,14 @@ impl Connector {
         entry: Entry,
         backfilling_finished: &mut bool,
         parser: &mut Box<dyn Parser>,
-        input_session: &mut dyn InputAdaptor<Timestamp>,
+        input_session: &mut ValuesSession,
         values_to_key: impl FnMut(Option<&Vec<Value>>, Option<&Offset>) -> Key,
         snapshot_writer: &mut Option<SharedSnapshotWriter>,
         connector_monitor: &mut Option<&mut ConnectorMonitor>,
         commit_allowed: &mut bool,
     ) {
         let has_persistent_storage = snapshot_writer.is_some();
+        let session_type = parser.session_type();
 
         match entry {
             Entry::Realtime(read_result) => match read_result {
@@ -615,6 +618,7 @@
                         values_to_key,
                         snapshot_writer,
                         connector_monitor,
+                        session_type,
                     );
                 }
             }
@@ -642,6 +646,7 @@
                         values_to_key,
                         snapshot_writer,
                         connector_monitor,
+                        session_type,
                     );
 
                     let (offset_key, offset_value) = offset;
@@ -664,6 +669,7 @@
                         values_to_key,
                         snapshot_writer,
                         connector_monitor,
+                        session_type,
                     );
                 }
             Entry::Snapshot(snapshot) => {
@@ -675,9 +681,6 @@
                     SnapshotEvent::Delete(key, value) => {
                         Self::on_remove(key, value, input_session);
                     }
-                    SnapshotEvent::Upsert(key, value) => {
-                        Self::on_upsert(key, value, input_session);
-                    }
                     SnapshotEvent::AdvanceTime(_, _) | SnapshotEvent::Finished => {
                         unreachable!()
                     }
@@ -686,33 +689,6 @@
         }
     }
 
-    /*
-        The implementation for non-str pulls.
-    */
-    pub fn run_with_custom_reader(
-        &mut self,
-        custom_reader: &mut dyn CustomReader,
-        input_session: &mut dyn InputAdaptor<Timestamp>,
-        mut values_to_key: impl FnMut(Option<&Vec<Value>>, Option<&Offset>) -> Key,
-        snapshot_writer: &mut Option<SharedSnapshotWriter>,
-    ) {
-        loop {
-            match custom_reader.acquire_custom_data() {
-                (Ok(entries), maybe_offset) => self.on_parsed_data(
-                    entries,
-                    maybe_offset.as_ref(),
-                    input_session,
-                    &mut values_to_key,
-                    snapshot_writer,
-                    &mut None,
-                ),
-                (Err(e), _) => {
-                    error!("Read data parsed unsuccessfully. {e}");
-                }
-            };
-        }
-    }
-
     /*
         The implementation for push model of data acquisition:
         the source of the data notifies us, when the new data arrive.
     */
@@ -725,7 +701,7 @@ impl Connector {
         raw_read_data: &ReaderContext,
         offset: Option<&Offset>,
         parser: &mut dyn Parser,
-        input_session: &mut dyn InputAdaptor<Timestamp>,
+        input_session: &mut ValuesSession,
         values_to_key: impl FnMut(Option<&Vec<Value>>, Option<&Offset>) -> Key,
         snapshot_writer: &mut Option<SharedSnapshotWriter>,
     ) {
@@ -737,6 +713,7 @@ impl Connector {
                 values_to_key,
                 snapshot_writer,
                 &mut None,
+                parser.session_type(),
             ),
             Err(e) => {
                 error!("Read data parsed unsuccessfully. {e}");
             }
         }
     }
@@ -744,20 +721,12 @@ impl Connector {
-    fn on_insert(key: Key, values: Vec<Value>, input_session: &mut dyn InputAdaptor<Timestamp>) {
-        input_session.insert(key, Value::Tuple(values.into()));
+    fn on_insert(key: Key, values: Vec<Value>, input_session: &mut ValuesSession) {
+        input_session.insert((key, Value::Tuple(values.into())));
     }
 
-    fn on_upsert(
-        key: Key,
-        values: Option<Vec<Value>>,
-        input_session: &mut dyn InputAdaptor<Timestamp>,
-    ) {
-        input_session.upsert(key, values.map(|v| Value::Tuple(v.into())));
-    }
-
-    fn on_remove(key: Key, values: Vec<Value>, input_session: &mut dyn InputAdaptor<Timestamp>) {
-        input_session.remove(key, Value::Tuple(values.into()));
+    fn on_remove(key: Key, values: Vec<Value>, input_session: &mut ValuesSession) {
+        input_session.remove((key, Value::Tuple(values.into())));
     }
 
     #[allow(clippy::too_many_arguments)]
@@ -765,10 +734,11 @@ impl Connector {
         &mut self,
         parsed_entries: Vec<ParsedEvent>,
         offset: Option<&Offset>,
-        input_session: &mut dyn InputAdaptor<Timestamp>,
+        input_session: &mut ValuesSession,
         mut values_to_key: impl FnMut(Option<&Vec<Value>>, Option<&Offset>) -> Key,
         snapshot_writer: &mut Option<SharedSnapshotWriter>,
         connector_monitor: &mut Option<&mut ConnectorMonitor>,
+        session_type: SessionType,
     ) {
         let error_logger = self.error_logger.clone();
         let error_handling_logic: data_format::ErrorRemovalLogic = if self.skip_all_errors {
@@ -797,7 +767,7 @@ impl Connector {
             };
             let key = entry.key(&mut values_to_key, offset);
             if let Some(key) = key {
-                // true for Insert, Remove, Upsert
+                // true for Insert, Delete
                 if let Some(ref mut connector_monitor) = connector_monitor {
                     connector_monitor.increment();
                 }
@@ -821,11 +791,10 @@ impl Connector {
                     }
                     Self::on_insert(key.expect("No key"), values, input_session);
                 }
-                ParsedEvent::Upsert((_, values)) => {
-                    Self::on_upsert(key.expect("No key"), values, input_session);
-                }
                 ParsedEvent::Delete((_, values)) => {
-                    if values.len() != self.num_columns {
+                    if matches!(session_type, SessionType::Native)
+                        && values.len() != self.num_columns
+                    {
                         error!("There are {} tokens in the entry, but the expected number of tokens was {}", values.len(), self.num_columns);
                         continue;
                     }
@@ -894,9 +863,7 @@ pub fn read_persisted_state(
             Ok(Entry::Snapshot(entry)) => match entry {
                 SnapshotEvent::Insert(key, values) => input_session.insert((key, values)),
                 SnapshotEvent::Delete(key, values) => input_session.remove((key, values)),
-                SnapshotEvent::Upsert(_, _)
-                | SnapshotEvent::AdvanceTime(_, _)
-                | SnapshotEvent::Finished => unreachable!(),
+                SnapshotEvent::AdvanceTime(_, _) | SnapshotEvent::Finished => unreachable!(),
             },
             Ok(Entry::Realtime(_)) => unreachable!(),
             Ok(Entry::RewindFinishSentinel(_)) | Err(_) => return ControlFlow::Break(()),
diff --git a/src/engine/dataflow.rs b/src/engine/dataflow.rs
index 41ddc8ea..c814c26f 100644
--- a/src/engine/dataflow.rs
+++ b/src/engine/dataflow.rs
@@ -12,11 +12,10 @@ pub mod persist;
 pub mod shard;
 mod variable;
 
-use crate::connectors::adaptors::{GenericValues, ValuesSessionAdaptor};
 use crate::connectors::data_format::{Formatter, Parser};
 use crate::connectors::data_storage::{ReaderBuilder, Writer};
 use crate::connectors::monitoring::{ConnectorMonitor, ConnectorStats, OutputConnectorStats};
-use crate::connectors::{Connector, PersistenceMode, SnapshotAccess};
+use crate::connectors::{Connector, PersistenceMode, SessionType, SnapshotAccess};
 use crate::engine::dataflow::operators::external_index::UseExternalIndexAsOfNow;
 use crate::engine::dataflow::operators::gradual_broadcast::GradualBroadcast;
 use crate::engine::dataflow::operators::time_column::{
@@ -54,10 +53,13 @@ use differential_dataflow::collection::concatenate;
 use differential_dataflow::difference::Semigroup;
 use differential_dataflow::input::InputSession;
 use differential_dataflow::lattice::Lattice;
+use differential_dataflow::operators::arrange::upsert::arrange_from_upsert;
 use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
 use differential_dataflow::operators::reduce::{Reduce, ReduceCore};
 use differential_dataflow::operators::JoinCore;
+use differential_dataflow::trace::implementations::ord::OrdValBatch;
 use differential_dataflow::trace::implementations::ord::{OrdKeySpine, OrdValSpine};
+use differential_dataflow::trace::implementations::spine_fueled::Spine;
 use differential_dataflow::{AsCollection as _, Data};
 use differential_dataflow::{Collection, ExchangeData};
 use futures::future::BoxFuture;
@@ -3613,6 +3615,53 @@ impl<S: MaybeTotalScope> DataflowGraphInner<S>
             .alloc(Table::from_collection(values).with_properties(table_properties)))
     }
 
+    fn new_upsert_collection(
+        &mut self,
+        collection: &Collection<S, (Key, Value)>,
+    ) -> Result<Collection<S, (Key, Value)>> {
+        collection.maybe_persist_with_logic(
+            self,
+            "upsert_collection",
+            |collection| {
+                let upsert_stream = collection.inner.map(|((key, value), time, diff)| {
+                    // same behavior for new and persisted variants
+                    let value = match value {
+                        OldOrNew::Old(value) | OldOrNew::New(value) => value,
+                    };
+                    let value_for_upsert = if diff == 1 {
+                        Some(value)
+                    } else {
+                        assert_eq!(diff, -1);
+                        None
+                    };
+                    (key, value_for_upsert, time)
+                });
+                arrange_from_upsert::<S, Spine<Rc<OrdValBatch<Key, Value, S::Timestamp, isize>>>>(
+                    &upsert_stream,
+                    "UpsertSession",
+                )
+                .as_collection(|k, v| (*k, v.clone()))
+            },
+            |d| d,
+        )
+    }
+
+    fn new_collection(
+        &mut self,
+        session_type: SessionType,
+    ) -> Result<(
+        InputSession<S::Timestamp, (Key, Value), isize>,
+        Collection<S, (Key, Value)>,
+    )> {
+        let mut input_session = InputSession::new();
+        let collection = input_session.to_collection(&mut self.scope);
+        let collection = match session_type {
+            SessionType::Native => collection,
+            SessionType::Upsert => self.new_upsert_collection(&collection)?,
+        };
+        Ok((input_session, collection))
+    }
+
     fn connector_table(
         &mut self,
         reader: Box<dyn ReaderBuilder>,
@@ -3640,8 +3689,7 @@ impl<S: MaybeTotalScope> DataflowGraphInner<S>
             .clone()
             .map(IntoPersistentId::into_persistent_id);
 
-        let (input_session, table_values): (ValuesSessionAdaptor<S::Timestamp>, GenericValues<S>) =
-            parser.session_type().new_collection(&mut self.scope);
+        let (input_session, table_values) = self.new_collection(parser.session_type())?;
 
         let table_values = table_values.reshard();
         table_values.probe_with(&mut self.input_probe);
diff --git a/src/persistence/input_snapshot.rs b/src/persistence/input_snapshot.rs
index 03d5a5a5..74b24165 100644
--- a/src/persistence/input_snapshot.rs
+++ b/src/persistence/input_snapshot.rs
@@ -32,7 +32,6 @@ fn get_chunk_ids_with_backend(backend: &dyn PersistenceBackend) -> Result<Vec<u64>> {
 pub enum SnapshotEvent {
     Insert(Key, Vec<Value>),
     Delete(Key, Vec<Value>),
-    Upsert(Key, Option<Vec<Value>>),
     AdvanceTime(Timestamp, OffsetAntichain),
     Finished,
 }
diff --git a/src/python_api.rs b/src/python_api.rs
index f541c301..a5011aea 100644
--- a/src/python_api.rs
+++ b/src/python_api.rs
@@ -1730,8 +1730,6 @@ impl PyPythonConnectorEventType {
     #[classattr]
     pub const DELETE: PythonConnectorEventType = PythonConnectorEventType::Delete;
     #[classattr]
-    pub const UPSERT: PythonConnectorEventType = PythonConnectorEventType::Upsert;
-    #[classattr]
     pub const EXTERNAL_OFFSET: PythonConnectorEventType =
         PythonConnectorEventType::ExternalOffset;
 }
diff --git a/tests/integration/helpers.rs b/tests/integration/helpers.rs
index f9c6c2b6..7ede812f 100644
--- a/tests/integration/helpers.rs
+++ b/tests/integration/helpers.rs
@@ -119,9 +119,6 @@ pub fn full_cycle_read(
                     let key = Key::random();
                     SnapshotEvent::Delete(key, values.clone())
                 }
-                ParsedEvent::Upsert((_, _)) => {
-                    todo!("upsert aren't supported in this test")
-                }
                 ParsedEvent::AdvanceTime => {
                     SnapshotEvent::AdvanceTime(Timestamp(1), frontier.clone())
                 }
@@ -260,8 +257,7 @@ impl ErrorPlacement {
             .into_iter()
             .map(|result| match result {
                 ParsedEventWithErrors::Insert((key, _))
-                | ParsedEventWithErrors::Delete((key, _))
-                | ParsedEventWithErrors::Upsert((key, _)) => key
+                | ParsedEventWithErrors::Delete((key, _)) => key
                     .expect("key has to be Some to contain error")
                     .expect_err("error should be in the key but it is not present there"),
                 ParsedEventWithErrors::AdvanceTime => {
@@ -274,8 +270,7 @@ impl ErrorPlacement {
             .into_iter()
             .map(|result| match result {
                 ParsedEventWithErrors::Insert((_, values))
-                | ParsedEventWithErrors::Delete((_, values))
-                | ParsedEventWithErrors::Upsert((_, Some(values))) => {
+                | ParsedEventWithErrors::Delete((_, values)) => {
                     values.into_iter().nth(*i).unwrap().expect_err(
                         format!(
                             "error should be in the values[{}] but it is not present there",
diff --git a/tests/integration/main.rs b/tests/integration/main.rs
index bf695e86..f3b70371 100644
--- a/tests/integration/main.rs
+++ b/tests/integration/main.rs
@@ -32,5 +32,4 @@ mod test_stream_snapshot;
 mod test_time;
 mod test_time_column;
 mod test_types;
-mod test_upsert_session;
 mod test_value_to_sql;
diff --git a/tests/integration/test_debezium.rs b/tests/integration/test_debezium.rs
index ba2aff78..a8556240 100644
--- a/tests/integration/test_debezium.rs
+++ b/tests/integration/test_debezium.rs
@@ -189,31 +189,13 @@ fn test_debezium_mongodb_format() -> eyre::Result<()> {
     let changelog = read_data_from_reader(Box::new(reader), Box::new(parser))?;
 
     let expected_values = vec![
-        ParsedEvent::Upsert((
-            Some(vec![Value::from("1001")]),
-            Some(vec![Value::from("Sally")]),
-        )),
-        ParsedEvent::Upsert((
-            Some(vec![Value::from("1002")]),
-            Some(vec![Value::from("George")]),
-        )),
-        ParsedEvent::Upsert((
-            Some(vec![Value::from("1003")]),
-            Some(vec![Value::from("Edward")]),
-        )),
-        ParsedEvent::Upsert((
-            Some(vec![Value::from("1004")]),
-            Some(vec![Value::from("Anne")]),
-        )),
-        ParsedEvent::Upsert((
-            Some(vec![Value::from("1005")]),
-            Some(vec![Value::from("Bob")]),
-        )),
-        ParsedEvent::Upsert((
-            Some(vec![Value::from("1003")]),
-            Some(vec![Value::from("Sergey")]),
-        )),
-        ParsedEvent::Upsert((Some(vec![Value::from("1004")]), None)),
+        ParsedEvent::Insert((Some(vec![Value::from("1001")]), vec![Value::from("Sally")])),
+        ParsedEvent::Insert((Some(vec![Value::from("1002")]), vec![Value::from("George")])),
+        ParsedEvent::Insert((Some(vec![Value::from("1003")]), vec![Value::from("Edward")])),
+        ParsedEvent::Insert((Some(vec![Value::from("1004")]), vec![Value::from("Anne")])),
+        ParsedEvent::Insert((Some(vec![Value::from("1005")]), vec![Value::from("Bob")])),
+        ParsedEvent::Insert((Some(vec![Value::from("1003")]), vec![Value::from("Sergey")])),
+        ParsedEvent::Delete((Some(vec![Value::from("1004")]), vec![])),
     ];
     assert_eq!(changelog, expected_values);
diff --git a/tests/integration/test_parser.rs b/tests/integration/test_parser.rs
index 328a451e..dcb01973 100644
--- a/tests/integration/test_parser.rs
+++ b/tests/integration/test_parser.rs
@@ -139,7 +139,7 @@ fn test_transparent_parser_upsert() -> eyre::Result<()> {
         TransparentParser::new(None, value_field_names, schema.into(), SessionType::Upsert)?;
     let contexts = vec![
         ReaderContext::from_diff(
-            DataEventType::Upsert,
+            DataEventType::Insert,
             None,
             HashMap::from([
                 ("a".to_owned(), Ok(Value::Int(3))),
@@ -158,8 +158,8 @@ fn test_transparent_parser_upsert() -> eyre::Result<()> {
         ),
     ];
     let expected = vec![
-        ParsedEvent::Upsert((None, Some(vec![Value::from(3), Value::from("abc")]))),
-        ParsedEvent::Upsert((None, None)),
+        ParsedEvent::Insert((None, vec![Value::from(3), Value::from("abc")])),
+        ParsedEvent::Delete((None, vec![])),
     ];
     for (context_i, expected_i) in contexts.into_iter().zip_eq(expected) {
        assert_eq!(
diff --git a/tests/integration/test_upsert_session.rs b/tests/integration/test_upsert_session.rs
deleted file mode 100644
index 4a71a62b..00000000
--- a/tests/integration/test_upsert_session.rs
+++ /dev/null
@@ -1,100 +0,0 @@
-// Copyright © 2024 Pathway
-
-use super::helpers::get_entries_in_receiver;
-
-use std::sync::{mpsc, Arc, Mutex};
-
-use timely::dataflow::operators::Inspect;
-
-use pathway_engine::connectors::adaptors::{InputAdaptor, UpsertSession};
-use pathway_engine::engine::dataflow::operators::output::ConsolidateForOutput;
-use pathway_engine::engine::{Key, Value};
-
-#[test]
-fn test_upsert_session_replacement() {
-    let k1 = Key::random();
-    let k2 = Key::random();
-
-    let (sender, receiver) = mpsc::channel();
-    let sender = Arc::new(Mutex::new(sender));
-    timely::execute_from_args(std::env::args(), move |worker| {
-        let mut input = UpsertSession::new();
-        worker.dataflow(
-            |scope: &mut timely::dataflow::scopes::Child<
-                timely::worker::Worker<timely::communication::allocator::Generic>,
-                u64,
-            >| {
-                let sender = sender.lock().unwrap().clone();
-                let table = input.to_collection(scope);
-                table.consolidate_for_output(true).inspect(move |batch| {
-                    for (data, diff) in &batch.data {
-                        sender
-                            .send((data.clone(), batch.time, *diff))
-                            .expect("inspected entry sending failed");
-                    }
-                });
-            },
-        );
-        input.upsert(k1, Some(Value::from("one")));
-        input.advance_to(123);
-        input.upsert(k2, Some(Value::from("two")));
-        input.advance_to(246);
-        input.upsert(k1, Some(Value::from("three")));
-        input.advance_to(369);
-    })
-    .expect("Computation terminated abnormally");
-
-    assert_eq!(
-        get_entries_in_receiver(receiver),
-        vec![
-            ((k1, Value::from("one")), 0, 1),
-            ((k2, Value::from("two")), 123, 1),
-            ((k1, Value::from("one")), 246, -1),
-            ((k1, Value::from("three")), 246, 1),
-        ]
-    );
-}
-
-#[test]
-fn test_removal_by_key() {
-    let k1 = Key::random();
-    let k2 = Key::random();
-
-    let (sender, receiver) = mpsc::channel();
-    let sender = Arc::new(Mutex::new(sender));
-    timely::execute_from_args(std::env::args(), move |worker| {
-        let mut input = UpsertSession::new();
-        worker.dataflow(
-            |scope: &mut timely::dataflow::scopes::Child<
-                timely::worker::Worker<timely::communication::allocator::Generic>,
-                u64,
-            >| {
-                let sender = sender.lock().unwrap().clone();
-                let table = input.to_collection(scope);
-                table.consolidate_for_output(true).inspect(move |batch| {
-                    for (data, diff) in &batch.data {
-                        sender
-                            .send((data.clone(), batch.time, *diff))
-                            .expect("inspected entry sending failed");
-                    }
-                });
-            },
-        );
-        input.upsert(k1, Some(Value::from("one")));
-        input.advance_to(123);
-        input.upsert(k2, Some(Value::from("two")));
-        input.advance_to(246);
-        input.upsert(k1, None);
-        input.advance_to(369);
-    })
-    .expect("Computation terminated abnormally");
-
-    assert_eq!(
-        get_entries_in_receiver(receiver),
-        vec![
-            ((k1, Value::from("one")), 0, 1),
-            ((k2, Value::from("two")), 123, 1),
-            ((k1, Value::from("one")), 246, -1),
-        ]
-    );
-}
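---

Reviewer notes

The test_io.py changes above pin down the user-facing contract: a Python connector subject that reports `SessionType.UPSERT` keeps at most one value per key, and `_remove` deletes by key while ignoring the payload. A minimal runnable sketch of that contract, assuming the same internal APIs the tests use (`pw.io.python.ConnectorSubject`, `_session_type`, `api.ref_scalar`); the subject name and output path are illustrative:

```python
import time

import pathway as pw
from pathway.engine import SessionType
from pathway.internals import api


class LatestPerKeySubject(pw.io.python.ConnectorSubject):
    # Hypothetical subject for illustration; mirrors the TestSubject
    # classes in test_io.py.
    @property
    def _session_type(self) -> SessionType:
        return SessionType.UPSERT

    def run(self):
        # Two writes to the same key: the second replaces the first,
        # producing a (-1, +1) pair of diffs downstream.
        self._add(api.ref_scalar(0), b"one")
        time.sleep(5e-2)
        self._add(api.ref_scalar(0), b"two")
        # Keyed deletion: the payload is ignored in an upsert session.
        self._remove(api.ref_scalar(0), b"")
        self.close()


table = pw.io.python.read(
    LatestPerKeySubject(), format="raw", autocommit_duration_ms=10
)
pw.io.csv.write(table, "output.csv")
pw.run()
```

Run end to end, the changelog for key 0 nets out to nothing: "one" is inserted, retracted on the overwrite by "two", and "two" is retracted by the keyed removal.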
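On the engine side, the deleted `UpsertSession` and the new `new_upsert_collection` express the same idea: `arrange_from_upsert` turns a stream of keyed upserts into an insert/delete changelog, retracting a key's previous value with diff -1 before asserting the new one with diff +1 — the shape asserted in `test_upsert_session_with_python_connector` (`{"1,2,-1", "1,4,1", ...}`). A rough single-threaded Python model of that translation (hypothetical helper, not an engine API), where `None` encodes a keyed deletion and upserts arrive in timestamp order:

```python
from typing import Hashable, Optional


def upsert_changelog(
    upserts: list[tuple[Hashable, Optional[str]]],
) -> list[tuple[Hashable, str, int]]:
    """Model of arrange_from_upsert: emit (key, value, diff) updates so
    that, per key, only the latest non-deleted value remains asserted."""
    state: dict[Hashable, str] = {}
    changelog = []
    for key, value in upserts:
        if key in state:
            changelog.append((key, state.pop(key), -1))  # retract old value
        if value is not None:
            state[key] = value
            changelog.append((key, value, +1))  # assert new value
    return changelog


# Mirrors the expectations of the deleted test_upsert_session_replacement:
# the second upsert to k1 yields a retraction followed by an insertion.
assert upsert_changelog([(1, "one"), (2, "two"), (1, "three")]) == [
    (1, "one", +1),
    (2, "two", +1),
    (1, "one", -1),
    (1, "three", +1),
]
```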
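One convention worth keeping in mind while reviewing `ParsedEventWithErrors::new`: with the `Upsert` variants gone, a deletion in an upsert session is encoded as `Delete((key, vec![]))` — key only, empty payload — which is why `on_parsed_data` now applies the `num_columns` length check only to `Native` sessions. The dispatch, restated in Python with stand-in enums purely for illustration:

```python
from enum import Enum, auto


class SessionType(Enum):
    NATIVE = auto()
    UPSERT = auto()


class DataEventType(Enum):
    INSERT = auto()
    DELETE = auto()


def parsed_event(session_type, event, key, values):
    # Inserts pass through unchanged; in an upsert session a delete is
    # keyed, so the payload is dropped rather than length-checked.
    if event is DataEventType.INSERT:
        return ("insert", key, values)
    if session_type is SessionType.NATIVE:
        return ("delete", key, values)
    return ("delete", key, [])  # upsert session: delete by key only
```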