serialize all Pathway types to Delta Lake (#7988)
Co-authored-by: Mateusz Lewandowski <[email protected]>
Co-authored-by: Kamil Piechowiak <[email protected]>
GitOrigin-RevId: 74fcfa5c09a09085a50cca169eaabfc67013690c
3 people authored and Manul from Pathway committed Jan 22, 2025
1 parent 12bccce commit 5f1d24d
Showing 13 changed files with 1,399 additions and 301 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -8,13 +8,15 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
### Added
- `pw.io.iceberg.read` method for reading Apache Iceberg tables into Pathway.
- methods `pw.io.postgres.write` and `pw.io.postgres.write_snapshot` now accept an additional argument `init_mode`, which allows initializing the table before writing.
- `pw.io.deltalake.read` now supports serialization and deserialization for all Pathway data types (see the sketch below).

### Changed
- **BREAKING**: `pw.io.deltalake.read` now requires explicit specification of primary key fields.
- **BREAKING**: `pw.indexing.build_sorted_index`, `pw.indexing.retrieve_prev_next_values`, `pw.indexing.sort_from_index` and `pw.indexing.SortedIndex` are removed. Sorting is now done with `pw.Table.sort`.
- **BREAKING**: `pw.xpacks.llm.question_answering.BaseRAGQuestionAnswerer` now returns a dictionary from the `pw_ai_answer` endpoint.
- `pw.xpacks.llm.question_answering.BaseRAGQuestionAnswerer` can now optionally return context documents from the `pw_ai_answer` endpoint.
- **BREAKING**: When using delay in temporal behavior, the current time is updated immediately, not in the next batch.
- **BREAKING**: The `Pointer` type is now serialized to Delta Tables as raw bytes.
- `pw.io.kafka.write` now allows specifying `key` and `headers` for JSON and CSV data formats.

### Fixed
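A minimal sketch of the headline change, adapted from the tests added in this commit (file paths and the `pw.run()` driver are illustrative):

```python
import pickle

import pathway as pw


class InputSchema(pw.Schema):
    # Primary key fields must now be declared explicitly (breaking change above).
    data: str = pw.column_definition(primary_key=True)
    fun: pw.PyObjectWrapper


# Write: any Pathway type, including pickled Python objects, can be stored.
table = pw.io.plaintext.read("./input.txt", mode="static")
table = table.select(data=pw.this.data, fun=pw.wrap_py_object(len, serializer=pickle))
pw.io.deltalake.write(table, "./my-delta-lake")
pw.run()

# Read back (in a fresh program, as the tests below do after clearing the graph):
# the schema with explicit primary keys restores the original types.
restored = pw.io.deltalake.read("./my-delta-lake", schema=InputSchema, mode="static")
```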
106 changes: 105 additions & 1 deletion integration_tests/iceberg/test_iceberg.py
@@ -1,9 +1,12 @@
import json
import os
import pickle
import threading
import time
import uuid

import pandas as pd
from dateutil import tz
from pyiceberg.catalog import load_catalog

import pathway as pw
@@ -23,7 +26,7 @@
{"user_id": 11, "name": "Steve"}
{"user_id": 12, "name": "Sarah"}"""

CATALOG_URI = "http://iceberg:8181"
CATALOG_URI = os.environ.get("ICEBERG_CATALOG_URI", "http://iceberg:8181")
INPUT_CONTENTS = {
1: INPUT_CONTENTS_1,
2: INPUT_CONTENTS_2,
@@ -205,3 +208,104 @@ class InputSchema(pw.Schema):
assert set(pandas_table["name"]) == all_names
assert set(pandas_table["diff"]) == {1}
assert len(set(pandas_table["time"])) == len(INPUT_CONTENTS)


def test_py_object_wrapper_in_iceberg(tmp_path):
input_path = tmp_path / "input.jsonl"
output_path = tmp_path / "output.jsonl"
iceberg_table_name = str(uuid.uuid4())
input_path.write_text("test")

table = pw.io.plaintext.read(input_path, mode="static")
table = table.select(
data=pw.this.data,
fun=pw.wrap_py_object(len, serializer=pickle), # type: ignore
)
pw.io.iceberg.write(
table,
catalog_uri=CATALOG_URI,
namespace=["my_database"],
table_name=iceberg_table_name,
)
run()
G.clear()

class InputSchema(pw.Schema):
data: str = pw.column_definition(primary_key=True)
fun: pw.PyObjectWrapper

@pw.udf
def use_python_object(a: pw.PyObjectWrapper, x: str) -> int:
return a.value(x)

table = pw.io.iceberg.read(
catalog_uri=CATALOG_URI,
namespace=["my_database"],
table_name=iceberg_table_name,
mode="static",
schema=InputSchema,
)
table = table.select(len=use_python_object(pw.this.fun, pw.this.data))
pw.io.jsonlines.write(table, output_path)
run()

with open(output_path, "r") as f:
data = json.load(f)
assert data["len"] == 4


def test_iceberg_different_types_serialization(tmp_path):
input_path = tmp_path / "input.jsonl"
iceberg_table_name = str(uuid.uuid4())
input_path.write_text("test")

column_values = {
"boolean": True,
"integer": 123,
"double": -5.6,
"string": "abcdef",
"binary_data": b"fedcba",
"datetime_naive": pw.DateTimeNaive(year=2025, month=1, day=17),
"datetime_utc_aware": pw.DateTimeUtc(year=2025, month=1, day=17, tz=tz.UTC),
"duration": pw.Duration(days=5),
"json_data": pw.Json.parse('{"a": 15, "b": "hello"}'),
}
table = pw.io.plaintext.read(input_path, mode="static")
table = table.select(
data=pw.this.data,
**column_values,
)
pw.io.iceberg.write(
table,
catalog_uri=CATALOG_URI,
namespace=["my_database"],
table_name=iceberg_table_name,
)
run()
G.clear()

class InputSchema(pw.Schema):
data: str = pw.column_definition(primary_key=True)
boolean: bool
integer: int
double: float
string: str
binary_data: bytes
datetime_naive: pw.DateTimeNaive
datetime_utc_aware: pw.DateTimeUtc
duration: pw.Duration
json_data: pw.Json

def on_change(key, row, time, is_addition):
for field, expected_value in column_values.items():
assert row[field] == expected_value

table = pw.io.iceberg.read(
catalog_uri=CATALOG_URI,
namespace=["my_database"],
table_name=iceberg_table_name,
mode="static",
schema=InputSchema,
)
pw.io.subscribe(table, on_change=on_change)
run()
15 changes: 12 additions & 3 deletions python/pathway/io/_utils.py
@@ -73,7 +73,11 @@


class RawDataSchema(pw.Schema):
data: Any
data: bytes


class PlaintextDataSchema(pw.Schema):
data: str


class MetadataSchema(Schema):
@@ -294,7 +298,12 @@ def construct_schema_and_data_format(
if param in kwargs and kwargs[param] is not None:
raise ValueError(f"Unexpected argument for plaintext format: {param}")

schema = RawDataSchema
parse_utf8 = format != "binary"
if parse_utf8:
schema = PlaintextDataSchema
else:
schema = RawDataSchema

if with_metadata:
schema |= MetadataSchema
schema, api_schema = read_schema(
@@ -308,7 +317,7 @@
return schema, api.DataFormat(
format_type=data_format_type,
**api_schema,
parse_utf8=(format != "binary"),
parse_utf8=parse_utf8,
key_generation_policy=(
api.KeyGenerationPolicy.ALWAYS_AUTOGENERATE
if autogenerate_key
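The practical effect of the schema split above: UTF-8-parsed formats now type the payload column as `str`, while the binary format keeps raw `bytes`. A minimal sketch (paths are illustrative):

```python
import pathway as pw

# Plaintext input is parsed as UTF-8, so `data` is typed `str` (PlaintextDataSchema).
text_table = pw.io.plaintext.read("./input.txt", mode="static")

# Binary input skips UTF-8 parsing, so `data` stays `bytes` (RawDataSchema).
blob_table = pw.io.fs.read("./blobs", format="binary", mode="static")
```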
6 changes: 5 additions & 1 deletion python/pathway/io/python/__init__.py
@@ -25,6 +25,7 @@
from pathway.internals.trace import trace_user_frame
from pathway.io._utils import (
MetadataSchema,
PlaintextDataSchema,
RawDataSchema,
assert_schema_or_value_columns_not_none,
get_data_format_type,
@@ -418,7 +419,10 @@ def read(
raise ValueError("raw format must not be used with primary_key property")
if value_columns:
raise ValueError("raw format must not be used with value_columns property")
schema = RawDataSchema
if format == "binary":
schema = RawDataSchema
else:
schema = PlaintextDataSchema
if subject._with_metadata is True:
schema |= MetadataSchema
assert_schema_or_value_columns_not_none(schema, value_columns, data_format_type)
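The same typing applies to custom Python connectors: `format="binary"` pairs with `RawDataSchema` (`bytes`), while `"raw"` input is now typed `str`. A sketch, assuming the `next_bytes`/`next_str` helpers of the documented `ConnectorSubject` interface:

```python
import pathway as pw


class BytesSubject(pw.io.python.ConnectorSubject):
    def run(self):
        self.next_bytes(b"\x00\x01payload")  # arrives as `data: bytes`


class TextSubject(pw.io.python.ConnectorSubject):
    def run(self):
        self.next_str("hello")  # arrives as `data: str`


binary_table = pw.io.python.read(BytesSubject(), format="binary")
text_table = pw.io.python.read(TextSubject(), format="raw")
```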
109 changes: 105 additions & 4 deletions python/pathway/tests/test_io.py
@@ -3,6 +3,7 @@
import json
import os
import pathlib
import pickle
import socket
import sqlite3
import sys
@@ -11,9 +12,11 @@
from typing import Any, Optional
from unittest import mock

import numpy as np
import pandas as pd
import pytest
import yaml
from dateutil import tz
from deltalake import DeltaTable, write_deltalake
from fs import open_fs

@@ -30,7 +33,6 @@
T,
assert_table_equality,
assert_table_equality_wo_index,
assert_table_equality_wo_index_types,
deprecated_call_here,
needs_multiprocessing_fork,
run,
@@ -214,7 +216,7 @@ def run(self):

table = pw.io.python.read(TestSubject(), format="raw")

assert_table_equality_wo_index_types(
assert_table_equality_wo_index(
table,
T(
"""
@@ -2358,7 +2360,7 @@ class OutputSchema(pw.Schema):
3 | baz | 1701283942
""",
).update_types(
data=Any,
data=str,
createdAt=Optional[int],
),
result,
@@ -3418,7 +3420,7 @@ class InputSchema(pw.Schema):
pw.io.deltalake.read(tmp_path / "lake", schema=InputSchema)


def test_iceberg_no_primary_key(tmp_path: pathlib.Path):
def test_iceberg_no_primary_key():
class InputSchema(pw.Schema):
k: int
v: str
@@ -3433,3 +3435,102 @@ class InputSchema(pw.Schema):
table_name="test",
schema=InputSchema,
)


def test_py_object_wrapper_in_deltalake(tmp_path: pathlib.Path):
input_path = tmp_path / "input.jsonl"
lake_path = tmp_path / "delta-lake"
output_path = tmp_path / "output.jsonl"
input_path.write_text("test")

table = pw.io.plaintext.read(input_path, mode="static")
table = table.select(
data=pw.this.data, fun=pw.wrap_py_object(len, serializer=pickle) # type: ignore
)
pw.io.deltalake.write(table, lake_path)
run_all()
G.clear()

class InputSchema(pw.Schema):
data: str = pw.column_definition(primary_key=True)
fun: pw.PyObjectWrapper

@pw.udf
def use_python_object(a: pw.PyObjectWrapper, x: str) -> int:
return a.value(x)

table = pw.io.deltalake.read(lake_path, schema=InputSchema, mode="static")
table = table.select(len=use_python_object(pw.this.fun, pw.this.data))
pw.io.jsonlines.write(table, output_path)
run_all()

with open(output_path, "r") as f:
data = json.load(f)
assert data["len"] == 4


def test_deltalake_different_types_serialization(tmp_path: pathlib.Path):
input_path = tmp_path / "input.jsonl"
lake_path = tmp_path / "delta-lake"
input_path.write_text("test")

column_values = {
"boolean": True,
"integer": 123,
"double": -5.6,
"string": "abcdef",
"binary_data": b"fedcba",
"datetime_naive": pw.DateTimeNaive(year=2025, month=1, day=17),
"datetime_utc_aware": pw.DateTimeUtc(year=2025, month=1, day=17, tz=tz.UTC),
"duration": pw.Duration(days=5),
"ints": np.array([9, 9, 9], dtype=int),
"floats": np.array([1.1, 2.2, 3.3], dtype=float),
"json_data": pw.Json.parse('{"a": 15, "b": "hello"}'),
"tuple_data": (b"world", True),
"list_data": ["lorem", None, "ipsum"],
"fun": pw.wrap_py_object(len, serializer=pickle),
}
table = pw.io.plaintext.read(input_path, mode="static")
table = table.select(
data=pw.this.data,
**column_values,
)
table = table.update_types(
ints=np.ndarray[None, int],
floats=np.ndarray[None, float],
tuple_data=tuple[bytes, bool],
list_data=list[str | None],
)
pw.io.deltalake.write(table, lake_path)
run_all()
G.clear()

class InputSchema(pw.Schema):
data: str = pw.column_definition(primary_key=True)
boolean: bool
integer: int
double: float
string: str
binary_data: bytes
datetime_naive: pw.DateTimeNaive
datetime_utc_aware: pw.DateTimeUtc
duration: pw.Duration
ints: np.ndarray[None, int]
floats: np.ndarray[None, float]
json_data: pw.Json
tuple_data: tuple[bytes, bool]
list_data: list[str | None]
fun: pw.PyObjectWrapper

def on_change(key, row, time, is_addition):
for field, expected_value in column_values.items():
if isinstance(expected_value, np.ndarray):
assert row[field].shape == expected_value.shape
assert (row[field] == expected_value).all()
else:
assert row[field] == expected_value

table = pw.io.deltalake.read(lake_path, schema=InputSchema, mode="static")
pw.io.subscribe(table, on_change=on_change)
run_all()