Skip to content

Commit

Permalink
feat(bq-change-tracker): materialized views (#2258)
Browse files Browse the repository at this point in the history
* chore(firestore-bigquery-change-tracker): fix tests

* refactor(firestore-bigquery-change-tracker): improve snapshot.ts readability

* feat(firestore-bigquery-change-tracker): add initial materialized views to change tracker

* fix(firestore-bigquery-change-tracker): address some issues with recreate logic

* test(firestore-bigquery-change-tracker): fix tests

* refactor(firestore-bigquery-change-tracker): update max_staleness and query

* fix(firestore-bigquery-change-tracker): remove debug logs
  • Loading branch information
cabljac authored Jan 31, 2025
1 parent 5153673 commit 99fed38
Show file tree
Hide file tree
Showing 16 changed files with 1,731 additions and 217 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ describe("e2e", () => {
datasetId,
tableId,
timePartitioning: "DAY",
timePartitioningField: "created",
timePartitioningField: "timestamp",
timePartitioningFieldType: "TIMESTAMP",
timePartitioningFirestoreField: "created",
}).record([event]);
Expand All @@ -141,7 +141,7 @@ describe("e2e", () => {
tableId
);

expect(changeLogRows[0].created.value).toBe(
expect(changeLogRows[0].timestamp.value).toBe(
BigQuery.timestamp(created.toDate()).value
);
});
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
import {
BigQuery,
Dataset,
TableMetadata,
Table,
} from "@google-cloud/bigquery";
import { firestore } from "firebase-admin";
import { RawChangelogViewSchema } from "../../../bigquery/schema";
import { initializeLatestMaterializedView } from "../../../bigquery/initializeLatestMaterializedView";
import {
changeTracker,
changeTrackerEvent,
} from "../../fixtures/changeTracker";
import { deleteTable } from "../../fixtures/clearTables";
import * as logs from "../../../logs";

jest.mock("../../../logs");
// jest.mock("sql-formatter");

describe("initializeLatestMaterializedView", () => {
const projectId = "dev-extensions-testing";
const bq = new BigQuery({ projectId });

let dataset: Dataset;
let table: Table;
let testConfig: {
datasetId: string;
tableId: string;
tableIdRaw: string;
viewIdRaw: string;
};

beforeEach(async () => {
const randomId = (Math.random() + 1).toString(36).substring(7);
testConfig = {
datasetId: `dataset_${randomId}`,
tableId: `table_${randomId}`,
tableIdRaw: `table_${randomId}_raw_changelog`,
viewIdRaw: `table_${randomId}_raw_latest`,
};
dataset = bq.dataset(testConfig.datasetId);
table = dataset.table(testConfig.tableIdRaw);

await dataset.create();

await table.create({ schema: RawChangelogViewSchema });
});

afterEach(async () => {
await deleteTable({ datasetId: testConfig.datasetId });
});

test("creates a new materialized view when view does not exist", async () => {
const view = dataset.table(testConfig.viewIdRaw);
const config = {
datasetId: testConfig.datasetId,
tableId: testConfig.tableId,
useMaterializedView: true,
useIncrementalMaterializedView: false,
maxStaleness: `INTERVAL "4:0:0" HOUR TO SECOND`,
refreshIntervalMinutes: 5,
clustering: null,
};

await initializeLatestMaterializedView({
bq,
changeTrackerConfig: config,
view,
viewExists: false,
rawChangeLogTableName: testConfig.tableIdRaw,
rawLatestViewName: testConfig.viewIdRaw,
schema: RawChangelogViewSchema,
});

const [metadata] = (await view.getMetadata()) as unknown as [TableMetadata];
expect(metadata.materializedView).toBeDefined();
expect(metadata.materializedView?.enableRefresh).toBe(true);
expect(
metadata.materializedView?.allowNonIncrementalDefinition
).toBeDefined();
});

test("does not recreate view if configuration matches", async () => {
const event = changeTrackerEvent({
data: { end_date: firestore.Timestamp.now() },
eventId: "testing2",
});

await changeTracker({
datasetId: testConfig.datasetId,
tableId: testConfig.tableId,
useMaterializedView: true,
useIncrementalMaterializedView: true,
}).record([event]);

const view = dataset.table(testConfig.viewIdRaw);
const config = {
datasetId: testConfig.datasetId,
tableId: testConfig.tableId,
useMaterializedView: true,
useIncrementalMaterializedView: true,
clustering: null,
};

const [initialMetadata] = (await view.getMetadata()) as unknown as [
TableMetadata
];

await initializeLatestMaterializedView({
bq,
changeTrackerConfig: config,
view,
viewExists: true,
rawChangeLogTableName: testConfig.tableIdRaw,
rawLatestViewName: testConfig.viewIdRaw,
schema: RawChangelogViewSchema,
});

const [finalMetadata] = (await view.getMetadata()) as unknown as [
TableMetadata
];
expect(finalMetadata).toEqual(initialMetadata);
});

test("recreates view when switching from incremental to non-incremental", async () => {
const event = changeTrackerEvent({
data: { end_date: firestore.Timestamp.now() },
eventId: "testing3",
});

await changeTracker({
datasetId: testConfig.datasetId,
tableId: testConfig.tableId,
useMaterializedView: true,
useIncrementalMaterializedView: true,
}).record([event]);

const view = dataset.table(testConfig.viewIdRaw);
const newConfig = {
datasetId: testConfig.datasetId,
tableId: testConfig.tableId,
useMaterializedView: true,
maxStaleness: `INTERVAL "4:0:0" HOUR TO SECOND`,
refreshIntervalMinutes: 5,
clustering: null,
};

const [initialMetadata] = (await view.getMetadata()) as unknown as [
TableMetadata
];
expect(
initialMetadata.materializedView?.allowNonIncrementalDefinition
).toBeUndefined();

await initializeLatestMaterializedView({
bq,
changeTrackerConfig: newConfig,
view,
viewExists: true,
rawChangeLogTableName: testConfig.tableIdRaw,
rawLatestViewName: testConfig.viewIdRaw,
schema: RawChangelogViewSchema,
});

const [finalMetadata] = (await view.getMetadata()) as unknown as [
TableMetadata
];
expect(
finalMetadata.materializedView?.allowNonIncrementalDefinition
).toBeDefined();
});

test("handles view creation errors", async () => {
const view = dataset.table(testConfig.viewIdRaw);
const invalidConfig = {
datasetId: testConfig.datasetId,
tableId: testConfig.tableId,
useMaterializedView: true,
maxStaleness: "invalid",
clustering: null,
};

await expect(
initializeLatestMaterializedView({
bq,
changeTrackerConfig: invalidConfig,
view,
viewExists: false,
rawChangeLogTableName: testConfig.tableIdRaw,
rawLatestViewName: testConfig.viewIdRaw,
schema: RawChangelogViewSchema,
})
).rejects.toThrow();

expect(logs.tableCreationError).toHaveBeenCalled();
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import { initializeLatestView } from "../../../bigquery/initializeLatestView";
import { initializeLatestMaterializedView } from "../../../bigquery/initializeLatestMaterializedView";
import { FirestoreBigQueryEventHistoryTrackerConfig } from "../../../bigquery";

jest.mock("../../../bigquery/initializeLatestMaterializedView");

describe("initializeLatestView", () => {
const mockView = {
id: "test_view",
getMetadata: jest.fn(),
setMetadata: jest.fn(),
create: jest.fn(),
};

const mockConfig: FirestoreBigQueryEventHistoryTrackerConfig = {
wildcardIds: true,
datasetId: "test_dataset",
useNewSnapshotQuerySyntax: true,
clustering: [],
tableId: "test_raw_table",
useMaterializedView: false,
};

beforeEach(() => {
jest.clearAllMocks();
});

describe("initializeLatestView", () => {
it("calls initializeLatestMaterializedView when useMaterializedView is true", async () => {
const mockOptions = {
bq: {} as any, // Mocked BigQuery instance
dataset: { id: "test_dataset" } as any, // Mocked Dataset instance
view: mockView as any, // Mocked Table instance
viewExists: false,
rawChangeLogTableName: "test_raw_table",
rawLatestViewName: "test_raw_view",
changeTrackerConfig: { ...mockConfig, useMaterializedView: true },
useMaterializedView: true,
useIncrementalMaterializedView: false,
};

await initializeLatestView(mockOptions);

expect(initializeLatestMaterializedView).toHaveBeenCalled();
});

it("does not call initializeLatestMaterializedView when useMaterializedView is false", async () => {
const mockOptions = {
bq: {} as any, // Mocked BigQuery instance
dataset: { id: "test_dataset" } as any, // Mocked Dataset instance
view: mockView as any, // Mocked Table instance
viewExists: false,
rawChangeLogTableName: "test_raw_table",
rawLatestViewName: "test_raw_view",
changeTrackerConfig: { ...mockConfig, useMaterializedView: false },
useMaterializedView: false,
useIncrementalMaterializedView: false,
};

await initializeLatestView(mockOptions);

expect(initializeLatestMaterializedView).not.toHaveBeenCalled();
});
});
});
Loading

0 comments on commit 99fed38

Please sign in to comment.