diff --git a/firestore-bigquery-export/firestore-bigquery-change-tracker/src/__tests__/bigquery/e2e.test.ts b/firestore-bigquery-export/firestore-bigquery-change-tracker/src/__tests__/bigquery/e2e.test.ts index d068d7350..a46552d19 100644 --- a/firestore-bigquery-export/firestore-bigquery-change-tracker/src/__tests__/bigquery/e2e.test.ts +++ b/firestore-bigquery-export/firestore-bigquery-change-tracker/src/__tests__/bigquery/e2e.test.ts @@ -126,7 +126,7 @@ describe("e2e", () => { datasetId, tableId, timePartitioning: "DAY", - timePartitioningField: "created", + timePartitioningField: "timestamp", timePartitioningFieldType: "TIMESTAMP", timePartitioningFirestoreField: "created", }).record([event]); @@ -141,7 +141,7 @@ describe("e2e", () => { tableId ); - expect(changeLogRows[0].created.value).toBe( + expect(changeLogRows[0].timestamp.value).toBe( BigQuery.timestamp(created.toDate()).value ); }); diff --git a/firestore-bigquery-export/firestore-bigquery-change-tracker/src/__tests__/bigquery/materializedViews/initializeLatestMaterializedView.test.ts b/firestore-bigquery-export/firestore-bigquery-change-tracker/src/__tests__/bigquery/materializedViews/initializeLatestMaterializedView.test.ts new file mode 100644 index 000000000..a905e4bf5 --- /dev/null +++ b/firestore-bigquery-export/firestore-bigquery-change-tracker/src/__tests__/bigquery/materializedViews/initializeLatestMaterializedView.test.ts @@ -0,0 +1,197 @@ +import { + BigQuery, + Dataset, + TableMetadata, + Table, +} from "@google-cloud/bigquery"; +import { firestore } from "firebase-admin"; +import { RawChangelogViewSchema } from "../../../bigquery/schema"; +import { initializeLatestMaterializedView } from "../../../bigquery/initializeLatestMaterializedView"; +import { + changeTracker, + changeTrackerEvent, +} from "../../fixtures/changeTracker"; +import { deleteTable } from "../../fixtures/clearTables"; +import * as logs from "../../../logs"; + +jest.mock("../../../logs"); +// jest.mock("sql-formatter"); + +describe("initializeLatestMaterializedView", () => { + const projectId = "dev-extensions-testing"; + const bq = new BigQuery({ projectId }); + + let dataset: Dataset; + let table: Table; + let testConfig: { + datasetId: string; + tableId: string; + tableIdRaw: string; + viewIdRaw: string; + }; + + beforeEach(async () => { + const randomId = (Math.random() + 1).toString(36).substring(7); + testConfig = { + datasetId: `dataset_${randomId}`, + tableId: `table_${randomId}`, + tableIdRaw: `table_${randomId}_raw_changelog`, + viewIdRaw: `table_${randomId}_raw_latest`, + }; + dataset = bq.dataset(testConfig.datasetId); + table = dataset.table(testConfig.tableIdRaw); + + await dataset.create(); + + await table.create({ schema: RawChangelogViewSchema }); + }); + + afterEach(async () => { + await deleteTable({ datasetId: testConfig.datasetId }); + }); + + test("creates a new materialized view when view does not exist", async () => { + const view = dataset.table(testConfig.viewIdRaw); + const config = { + datasetId: testConfig.datasetId, + tableId: testConfig.tableId, + useMaterializedView: true, + useIncrementalMaterializedView: false, + maxStaleness: `INTERVAL "4:0:0" HOUR TO SECOND`, + refreshIntervalMinutes: 5, + clustering: null, + }; + + await initializeLatestMaterializedView({ + bq, + changeTrackerConfig: config, + view, + viewExists: false, + rawChangeLogTableName: testConfig.tableIdRaw, + rawLatestViewName: testConfig.viewIdRaw, + schema: RawChangelogViewSchema, + }); + + const [metadata] = (await view.getMetadata()) as unknown as [TableMetadata]; + expect(metadata.materializedView).toBeDefined(); + expect(metadata.materializedView?.enableRefresh).toBe(true); + expect( + metadata.materializedView?.allowNonIncrementalDefinition + ).toBeDefined(); + }); + + test("does not recreate view if configuration matches", async () => { + const event = changeTrackerEvent({ + data: { end_date: firestore.Timestamp.now() }, + eventId: "testing2", + }); + + await changeTracker({ + datasetId: testConfig.datasetId, + tableId: testConfig.tableId, + useMaterializedView: true, + useIncrementalMaterializedView: true, + }).record([event]); + + const view = dataset.table(testConfig.viewIdRaw); + const config = { + datasetId: testConfig.datasetId, + tableId: testConfig.tableId, + useMaterializedView: true, + useIncrementalMaterializedView: true, + clustering: null, + }; + + const [initialMetadata] = (await view.getMetadata()) as unknown as [ + TableMetadata + ]; + + await initializeLatestMaterializedView({ + bq, + changeTrackerConfig: config, + view, + viewExists: true, + rawChangeLogTableName: testConfig.tableIdRaw, + rawLatestViewName: testConfig.viewIdRaw, + schema: RawChangelogViewSchema, + }); + + const [finalMetadata] = (await view.getMetadata()) as unknown as [ + TableMetadata + ]; + expect(finalMetadata).toEqual(initialMetadata); + }); + + test("recreates view when switching from incremental to non-incremental", async () => { + const event = changeTrackerEvent({ + data: { end_date: firestore.Timestamp.now() }, + eventId: "testing3", + }); + + await changeTracker({ + datasetId: testConfig.datasetId, + tableId: testConfig.tableId, + useMaterializedView: true, + useIncrementalMaterializedView: true, + }).record([event]); + + const view = dataset.table(testConfig.viewIdRaw); + const newConfig = { + datasetId: testConfig.datasetId, + tableId: testConfig.tableId, + useMaterializedView: true, + maxStaleness: `INTERVAL "4:0:0" HOUR TO SECOND`, + refreshIntervalMinutes: 5, + clustering: null, + }; + + const [initialMetadata] = (await view.getMetadata()) as unknown as [ + TableMetadata + ]; + expect( + initialMetadata.materializedView?.allowNonIncrementalDefinition + ).toBeUndefined(); + + await initializeLatestMaterializedView({ + bq, + changeTrackerConfig: newConfig, + view, + viewExists: true, + rawChangeLogTableName: testConfig.tableIdRaw, + rawLatestViewName: testConfig.viewIdRaw, + schema: RawChangelogViewSchema, + }); + + const [finalMetadata] = (await view.getMetadata()) as unknown as [ + TableMetadata + ]; + expect( + finalMetadata.materializedView?.allowNonIncrementalDefinition + ).toBeDefined(); + }); + + test("handles view creation errors", async () => { + const view = dataset.table(testConfig.viewIdRaw); + const invalidConfig = { + datasetId: testConfig.datasetId, + tableId: testConfig.tableId, + useMaterializedView: true, + maxStaleness: "invalid", + clustering: null, + }; + + await expect( + initializeLatestMaterializedView({ + bq, + changeTrackerConfig: invalidConfig, + view, + viewExists: false, + rawChangeLogTableName: testConfig.tableIdRaw, + rawLatestViewName: testConfig.viewIdRaw, + schema: RawChangelogViewSchema, + }) + ).rejects.toThrow(); + + expect(logs.tableCreationError).toHaveBeenCalled(); + }); +}); diff --git a/firestore-bigquery-export/firestore-bigquery-change-tracker/src/__tests__/bigquery/materializedViews/initializeLatestView.test.ts b/firestore-bigquery-export/firestore-bigquery-change-tracker/src/__tests__/bigquery/materializedViews/initializeLatestView.test.ts new file mode 100644 index 000000000..8421674f2 --- /dev/null +++ b/firestore-bigquery-export/firestore-bigquery-change-tracker/src/__tests__/bigquery/materializedViews/initializeLatestView.test.ts @@ -0,0 +1,65 @@ +import { initializeLatestView } from "../../../bigquery/initializeLatestView"; +import { initializeLatestMaterializedView } from "../../../bigquery/initializeLatestMaterializedView"; +import { FirestoreBigQueryEventHistoryTrackerConfig } from "../../../bigquery"; + +jest.mock("../../../bigquery/initializeLatestMaterializedView"); + +describe("initializeLatestView", () => { + const mockView = { + id: "test_view", + getMetadata: jest.fn(), + setMetadata: jest.fn(), + create: jest.fn(), + }; + + const mockConfig: FirestoreBigQueryEventHistoryTrackerConfig = { + wildcardIds: true, + datasetId: "test_dataset", + useNewSnapshotQuerySyntax: true, + clustering: [], + tableId: "test_raw_table", + useMaterializedView: false, + }; + + beforeEach(() => { + jest.clearAllMocks(); + }); + + describe("initializeLatestView", () => { + it("calls initializeLatestMaterializedView when useMaterializedView is true", async () => { + const mockOptions = { + bq: {} as any, // Mocked BigQuery instance + dataset: { id: "test_dataset" } as any, // Mocked Dataset instance + view: mockView as any, // Mocked Table instance + viewExists: false, + rawChangeLogTableName: "test_raw_table", + rawLatestViewName: "test_raw_view", + changeTrackerConfig: { ...mockConfig, useMaterializedView: true }, + useMaterializedView: true, + useIncrementalMaterializedView: false, + }; + + await initializeLatestView(mockOptions); + + expect(initializeLatestMaterializedView).toHaveBeenCalled(); + }); + + it("does not call initializeLatestMaterializedView when useMaterializedView is false", async () => { + const mockOptions = { + bq: {} as any, // Mocked BigQuery instance + dataset: { id: "test_dataset" } as any, // Mocked Dataset instance + view: mockView as any, // Mocked Table instance + viewExists: false, + rawChangeLogTableName: "test_raw_table", + rawLatestViewName: "test_raw_view", + changeTrackerConfig: { ...mockConfig, useMaterializedView: false }, + useMaterializedView: false, + useIncrementalMaterializedView: false, + }; + + await initializeLatestView(mockOptions); + + expect(initializeLatestMaterializedView).not.toHaveBeenCalled(); + }); + }); +}); diff --git a/firestore-bigquery-export/firestore-bigquery-change-tracker/src/__tests__/bigquery/materializedViews/integration.test.ts b/firestore-bigquery-export/firestore-bigquery-change-tracker/src/__tests__/bigquery/materializedViews/integration.test.ts new file mode 100644 index 000000000..1289b0977 --- /dev/null +++ b/firestore-bigquery-export/firestore-bigquery-change-tracker/src/__tests__/bigquery/materializedViews/integration.test.ts @@ -0,0 +1,503 @@ +import { + BigQuery, + Dataset, + Table, + TableMetadata, +} from "@google-cloud/bigquery"; +const { logger } = require("firebase-functions"); + +import { + RawChangelogSchema, + RawChangelogViewSchema, +} from "../../../bigquery/schema"; + +import { ChangeType, FirestoreDocumentChangeEvent } from "../../.."; +import { latestConsistentSnapshotView } from "../../../bigquery/snapshot"; +import { deleteTable } from "../../fixtures/clearTables"; +import { + changeTracker, + changeTrackerEvent, +} from "../../fixtures/changeTracker"; +import { getBigQueryTableData } from "../../fixtures/queries"; +import { firestore } from "firebase-admin"; + +process.env.PROJECT_ID = "dev-extensions-testing"; + +// export const changeTrackerEvent = ({ +// timestamp = "2022-02-13T10:17:43.505Z", +// operation = ChangeType.CREATE, +// documentName = "testing", +// eventId = "testing", +// documentId = "testing", +// pathParams = { documentId: "12345" }, +// data = { end_date: firestore.Timestamp.now() }, +// oldData = null, +// useNewSnapshotQuerySyntax = false, +// }: any): FirestoreDocumentChangeEvent => { +// return { +// timestamp, +// operation, +// documentName, +// eventId, +// documentId, +// data, +// oldData, +// pathParams, +// useNewSnapshotQuerySyntax, +// }; +// }; + +const bq: BigQuery = new BigQuery({ projectId: process.env.PROJECT_ID }); +const event: FirestoreDocumentChangeEvent = changeTrackerEvent({}); +const event2: FirestoreDocumentChangeEvent = changeTrackerEvent({ + data: { end_date: firestore.Timestamp.now() }, + eventId: "testing2", +}); +let randomID: string; +let datasetId: string; +let tableId: string; +let tableId_raw: string; +let viewId_raw: string; +let dataset: Dataset; +let table: Table; +let view: Table; + +describe("integration", () => { + describe("materialized views", () => { + beforeEach(async () => { + randomID = (Math.random() + 1).toString(36).substring(7); + datasetId = `dataset_${randomID}`; + tableId = `table_${randomID}`; + tableId_raw = `${tableId}_raw_changelog`; + dataset = bq.dataset(datasetId); + viewId_raw = `${tableId}_raw_latest`; + }); + + afterEach(async () => { + await deleteTable({ + datasetId, + }); + }); + test("successfully creates a dataset and table and view (incremental)", async () => { + await changeTracker({ + datasetId, + tableId, + useMaterializedView: true, + useIncrementalMaterializedView: true, + }).record([event]); + + const [m] = await dataset.table(viewId_raw).getMetadata(); + + const metadata = m as TableMetadata; + expect(metadata).toBeDefined(); + + expect(metadata.materializedView).toBeDefined(); + expect(metadata.materializedView.query).toBeDefined(); + expect(metadata.materializedView.enableRefresh).toBe(true); + expect( + metadata.materializedView.allowNonIncrementalDefinition + ).not.toBeDefined(); + }); + + test("successfully creates a dataset and table and view (non-incremental)", async () => { + await changeTracker({ + datasetId, + tableId, + useMaterializedView: true, + maxStaleness: `INTERVAL "4:0:0" HOUR TO SECOND`, + refreshIntervalMinutes: 5, + }).record([event]); + + const [m] = await dataset.table(viewId_raw).getMetadata(); + + const metadata = m as TableMetadata; + expect(metadata).toBeDefined(); + + expect(metadata.materializedView).toBeDefined(); + expect(metadata.materializedView.query).toBeDefined(); + expect(metadata.materializedView.enableRefresh).toBe(true); + expect(metadata.materializedView.allowNonIncrementalDefinition).toBe( + true + ); + }); + test("does not recreate a view when it already exists (incremental)", async () => { + await changeTracker({ + datasetId, + tableId, + useMaterializedView: true, + useIncrementalMaterializedView: true, + }).record([event]); + + const [m] = await dataset.table(viewId_raw).getMetadata(); + + const metadata = m as TableMetadata; + expect(metadata).toBeDefined(); + + expect(metadata.materializedView).toBeDefined(); + expect(metadata.materializedView.query).toBeDefined(); + expect(metadata.materializedView.enableRefresh).toBe(true); + expect( + metadata.materializedView.allowNonIncrementalDefinition + ).not.toBeDefined(); + + await new Promise((resolve) => setTimeout(resolve, 2000)); + + await changeTracker({ + datasetId, + tableId, + useMaterializedView: true, + useIncrementalMaterializedView: true, + }).record([event2]); + + await new Promise((resolve) => setTimeout(resolve, 2000)); + + const [m2] = await dataset.table(viewId_raw).getMetadata(); + + const metadata2 = m2 as TableMetadata; + expect(metadata2).toBeDefined(); + expect( + metadata2.materializedView.allowNonIncrementalDefinition + ).not.toBeDefined(); + }); + + test("successfully recreates a view when it already exists (non incremental -> incremental)", async () => { + await changeTracker({ + datasetId, + tableId, + useMaterializedView: true, + maxStaleness: `INTERVAL "4:0:0" HOUR TO SECOND`, + refreshIntervalMinutes: 5, + }).record([event]); + + const [m] = await dataset.table(viewId_raw).getMetadata(); + + const metadata = m as TableMetadata; + expect(metadata).toBeDefined(); + + expect(metadata.materializedView).toBeDefined(); + expect(metadata.materializedView.query).toBeDefined(); + expect(metadata.materializedView.enableRefresh).toBe(true); + expect(metadata.materializedView.allowNonIncrementalDefinition).toBe( + true + ); + + await new Promise((resolve) => setTimeout(resolve, 2000)); + + await changeTracker({ + datasetId, + tableId, + useMaterializedView: true, + useIncrementalMaterializedView: true, + }).record([event2]); + + await new Promise((resolve) => setTimeout(resolve, 2000)); + + const [m2] = await dataset.table(viewId_raw).getMetadata(); + + const metadata2 = m2 as TableMetadata; + expect(metadata2).toBeDefined(); + expect( + metadata2.materializedView.allowNonIncrementalDefinition + ).not.toBeDefined(); + }); + + test("successfully updates incremental materialized view with new events", async () => { + // Initial event recording + await changeTracker({ + datasetId, + tableId, + useMaterializedView: true, + useIncrementalMaterializedView: true, + }).record([event]); + + // Wait for BigQuery to process + await new Promise((resolve) => setTimeout(resolve, 2000)); + + // Create additional test events with different data + const event3 = changeTrackerEvent({ + timestamp: new Date().toISOString(), + operation: "CREATE", + documentName: "testCollection/doc3", + eventId: "testing3", + documentId: "doc3", + pathParams: { documentId: "doc3" }, + data: { end_date: firestore.Timestamp.now(), status: "completed" }, + oldData: null, + }); + + const event4 = changeTrackerEvent({ + timestamp: new Date().toISOString(), + operation: "CREATE", + documentName: "testCollection/doc4", + eventId: "testing4", + documentId: "doc4", + pathParams: { documentId: "doc4" }, + data: { end_date: firestore.Timestamp.now(), status: "pending" }, + oldData: null, + }); + + // Record additional events + await changeTracker({ + datasetId, + tableId, + useMaterializedView: true, + useIncrementalMaterializedView: true, + }).record([event3, event4]); + + // Wait for BigQuery to process + await new Promise((resolve) => setTimeout(resolve, 2000)); + + // // Get the materialized view data + const [changeLogRows, latestRows] = await getBigQueryTableData( + "dev-extensions-testing", + datasetId, + tableId + ); + + const query = `SELECT * FROM \`${process.env.PROJECT_ID}.${datasetId}.${tableId}_raw_latest\` LIMIT 10`; + + const [viewData] = await bq.query(query); + + expect(viewData.length).toBe(3); // Should contain all three events + + expect(viewData[0].document_id).toBe("testing"); // First event + expect(viewData[1].document_id).toBe("doc3"); // Third event + expect(viewData[2].document_id).toBe("doc4"); // Fourth event + + expect(JSON.parse(viewData[1].data).status).toBe("completed"); + expect(JSON.parse(viewData[2].data).status).toBe("pending"); + }); + }); +}); + +describe("materialized views operations", () => { + beforeEach(async () => { + randomID = (Math.random() + 1).toString(36).substring(7); + datasetId = `dataset_${randomID}`; + tableId = `table_${randomID}`; + tableId_raw = `${tableId}_raw_changelog`; + dataset = bq.dataset(datasetId); + viewId_raw = `${tableId}_raw_latest`; + }); + + afterEach(async () => { + await deleteTable({ + datasetId, + }); + }); + + const testCases = [ + { + name: "handles document updates correctly", + events: [ + // Initial create + changeTrackerEvent({ + timestamp: new Date().toISOString(), + operation: ChangeType.CREATE, + documentName: "testCollection/update1", + eventId: "update-test-1", + documentId: "update1", + pathParams: { documentId: "update1" }, + data: { status: "draft", title: "Original Title" }, + oldData: null, + }), + // Update the document + changeTrackerEvent({ + timestamp: new Date(Date.now() + 1000).toISOString(), // Ensure later timestamp + operation: ChangeType.UPDATE, + documentName: "testCollection/update1", + eventId: "update-test-2", + documentId: "update1", + pathParams: { documentId: "update1" }, + data: { status: "published", title: "Updated Title" }, + oldData: { status: "draft", title: "Original Title" }, + }), + ], + assertions: async (viewData: any[]) => { + expect(viewData.length).toBe(1); // Should only have one record for the document + const doc = viewData[0]; + const data = JSON.parse(doc.data); + expect(doc.document_id).toBe("update1"); + expect(data.status).toBe("published"); + expect(data.title).toBe("Updated Title"); + }, + }, + { + name: "handles document deletions correctly", + events: [ + // Create first document + changeTrackerEvent({ + timestamp: new Date().toISOString(), + operation: ChangeType.CREATE, + documentName: "testCollection/delete1", + eventId: "delete-test-1", + documentId: "delete1", + pathParams: { documentId: "delete1" }, + data: { status: "active" }, + oldData: null, + }), + // Create second document + changeTrackerEvent({ + timestamp: new Date().toISOString(), + operation: ChangeType.CREATE, + documentName: "testCollection/delete2", + eventId: "delete-test-2", + documentId: "delete2", + pathParams: { documentId: "delete2" }, + data: { status: "active" }, + oldData: null, + }), + // Delete first document + changeTrackerEvent({ + timestamp: new Date(Date.now() + 1000).toISOString(), + operation: ChangeType.DELETE, + documentName: "testCollection/delete1", + eventId: "delete-test-3", + documentId: "delete1", + pathParams: { documentId: "delete1" }, + data: null, + oldData: { status: "active" }, + }), + ], + assertions: async (viewData: any[]) => { + expect(viewData.length).toBe(2); // incremental materialized view cant remove deletes + const doc1 = viewData.find((doc) => doc.document_id === "delete1"); + const doc2 = viewData.find((doc) => doc.document_id === "delete2"); + expect(doc1).toBeDefined(); + expect(doc2).toBeDefined(); + + console.log(JSON.stringify(doc1, null, 2)); + + const operation1 = doc1.operation; + expect(operation1).toBe("DELETE"); + + const operation2 = doc2.operation; + expect(operation2).toBe("CREATE"); + }, + }, + { + name: "handles multiple updates to same document correctly", + events: [ + // Initial create + changeTrackerEvent({ + timestamp: new Date().toISOString(), + operation: ChangeType.CREATE, + documentName: "testCollection/multiUpdate", + eventId: "multi-update-1", + documentId: "multiUpdate", + pathParams: { documentId: "multiUpdate" }, + data: { status: "draft", version: 1 }, + oldData: null, + }), + // First update + changeTrackerEvent({ + timestamp: new Date(Date.now() + 1000).toISOString(), + operation: ChangeType.UPDATE, + documentName: "testCollection/multiUpdate", + eventId: "multi-update-2", + documentId: "multiUpdate", + pathParams: { documentId: "multiUpdate" }, + data: { status: "in_review", version: 2 }, + oldData: { status: "draft", version: 1 }, + }), + // Second update + changeTrackerEvent({ + timestamp: new Date(Date.now() + 2000).toISOString(), + operation: ChangeType.UPDATE, + documentName: "testCollection/multiUpdate", + eventId: "multi-update-3", + documentId: "multiUpdate", + pathParams: { documentId: "multiUpdate" }, + data: { status: "published", version: 3 }, + oldData: { status: "in_review", version: 2 }, + }), + ], + assertions: async (viewData: any[]) => { + expect(viewData.length).toBe(1); + const doc = viewData[0]; + const data = JSON.parse(doc.data); + expect(doc.document_id).toBe("multiUpdate"); + expect(data.status).toBe("published"); + expect(data.version).toBe(3); + }, + }, + { + name: "handles create-delete-create sequence correctly", + events: [ + // Initial create + changeTrackerEvent({ + timestamp: new Date().toISOString(), + operation: "CREATE", + documentName: "testCollection/recreate", + eventId: "recreate-1", + documentId: "recreate", + pathParams: { documentId: "recreate" }, + data: { status: "first_version" }, + oldData: null, + }), + // Delete + changeTrackerEvent({ + timestamp: new Date(Date.now() + 1000).toISOString(), + operation: ChangeType.DELETE, + documentName: "testCollection/recreate", + eventId: "recreate-2", + documentId: "recreate", + pathParams: { documentId: "recreate" }, + data: null, + oldData: { status: "first_version" }, + }), + // Recreate + changeTrackerEvent({ + timestamp: new Date(Date.now() + 2000).toISOString(), + operation: ChangeType.CREATE, + documentName: "testCollection/recreate", + eventId: "recreate-3", + documentId: "recreate", + pathParams: { documentId: "recreate" }, + data: { status: "second_version" }, + oldData: null, + }), + ], + assertions: async (viewData: any[]) => { + expect(viewData.length).toBe(1); + const doc = viewData[0]; + const data = JSON.parse(doc.data); + expect(doc.document_id).toBe("recreate"); + expect(data.status).toBe("second_version"); + }, + }, + ]; + + testCases.forEach(({ name, events, assertions }) => { + test(name, async () => { + // Set up incremental materialized view + await changeTracker({ + datasetId, + tableId, + useMaterializedView: true, + useIncrementalMaterializedView: true, + }).record([events[0]]); + + // Wait for initial setup + await new Promise((resolve) => setTimeout(resolve, 2000)); + + // Record remaining events + for (let i = 1; i < events.length; i++) { + await changeTracker({ + datasetId, + tableId, + useMaterializedView: true, + useIncrementalMaterializedView: true, + }).record([events[i]]); + await new Promise((resolve) => setTimeout(resolve, 2000)); + } + + // Query view data + const query = `SELECT * FROM \`${process.env.PROJECT_ID}.${datasetId}.${tableId}_raw_latest\` ORDER BY document_id`; + const [viewData] = await bq.query(query); + + // Run test-specific assertions + await assertions(viewData); + }); + }); +}); diff --git a/firestore-bigquery-export/firestore-bigquery-change-tracker/src/__tests__/bigquery/materializedViews/shouldRecreateMaterializedView.test.ts b/firestore-bigquery-export/firestore-bigquery-change-tracker/src/__tests__/bigquery/materializedViews/shouldRecreateMaterializedView.test.ts new file mode 100644 index 000000000..ec99a23c7 --- /dev/null +++ b/firestore-bigquery-export/firestore-bigquery-change-tracker/src/__tests__/bigquery/materializedViews/shouldRecreateMaterializedView.test.ts @@ -0,0 +1,255 @@ +import { BigQuery, Dataset, TableMetadata } from "@google-cloud/bigquery"; +import { firestore } from "firebase-admin"; +import { RawChangelogViewSchema } from "../../../bigquery/schema"; +import { + buildMaterializedViewQuery, + buildNonIncrementalMaterializedViewQuery, +} from "../../../bigquery/snapshot"; +import { shouldRecreateMaterializedView } from "../../../bigquery/initializeLatestMaterializedView"; +import { + changeTracker, + changeTrackerEvent, +} from "../../fixtures/changeTracker"; +import { deleteTable } from "../../fixtures/clearTables"; + +describe("Materialized View Recreation", () => { + const projectId = "dev-extensions-testing"; + const bq = new BigQuery({ projectId }); + + let dataset: Dataset; + let testConfig: { + datasetId: string; + tableId: string; + tableIdRaw: string; + viewIdRaw: string; + }; + + beforeEach(() => { + const randomId = (Math.random() + 1).toString(36).substring(7); + testConfig = { + datasetId: `dataset_${randomId}`, + tableId: `table_${randomId}`, + tableIdRaw: `table_${randomId}_raw_changelog`, + viewIdRaw: `table_${randomId}_raw_latest`, + }; + dataset = bq.dataset(testConfig.datasetId); + }); + + afterEach(async () => { + await deleteTable({ datasetId: testConfig.datasetId }); + }); + + test("should not recreate incremental materialized view when unchanged", async () => { + // Create initial event + const event = changeTrackerEvent({ + data: { end_date: firestore.Timestamp.now() }, + eventId: "testing2", + }); + + // Setup materialized view + await changeTracker({ + datasetId: testConfig.datasetId, + tableId: testConfig.tableId, + useMaterializedView: true, + useIncrementalMaterializedView: true, + }).record([event]); + + // Verify view metadata + const [metadata] = (await dataset + .table(testConfig.viewIdRaw) + .getMetadata()) as unknown as [TableMetadata]; + + expect(metadata.materializedView).toBeDefined(); + expect(metadata.materializedView?.enableRefresh).toBe(true); + expect( + metadata.materializedView?.allowNonIncrementalDefinition + ).toBeUndefined(); + + // Check if view needs recreation + const view = dataset.table(testConfig.viewIdRaw); + const config = { + datasetId: testConfig.datasetId, + tableId: testConfig.tableId, + useMaterializedView: true, + useIncrementalMaterializedView: true, + clustering: null, + }; + + const { source } = buildMaterializedViewQuery({ + schema: RawChangelogViewSchema, + projectId, + datasetId: config.datasetId, + tableName: testConfig.tableIdRaw, + rawLatestViewName: testConfig.viewIdRaw, + }); + + const shouldRecreate = await shouldRecreateMaterializedView( + view, + config, + source + ); + expect(shouldRecreate).toBe(false); + }); + + test("should not recreate non-incremental materialized view when unchanged", async () => { + // Create initial event + const event = changeTrackerEvent({ + data: { end_date: firestore.Timestamp.now() }, + eventId: "testing2", + }); + + // Setup materialized view + await changeTracker({ + datasetId: testConfig.datasetId, + tableId: testConfig.tableId, + useMaterializedView: true, + maxStaleness: `INTERVAL "4:0:0" HOUR TO SECOND`, + refreshIntervalMinutes: 5, + clustering: null, + }).record([event]); + + // // Verify view metadata + const [metadata] = (await dataset + .table(testConfig.viewIdRaw) + .getMetadata()) as unknown as [TableMetadata]; + + expect(metadata.materializedView).toBeDefined(); + expect(metadata.materializedView?.enableRefresh).toBe(true); + expect( + metadata.materializedView?.allowNonIncrementalDefinition + ).toBeDefined(); + + // Check if view needs recreation + const view = dataset.table(testConfig.viewIdRaw); + const config = { + datasetId: testConfig.datasetId, + tableId: testConfig.tableId, + useMaterializedView: true, + maxStaleness: `INTERVAL "4:0:0" HOUR TO SECOND`, + refreshIntervalMinutes: 5, + clustering: null, + }; + + const { source } = buildNonIncrementalMaterializedViewQuery({ + schema: RawChangelogViewSchema, + projectId, + datasetId: config.datasetId, + tableName: testConfig.tableIdRaw, + rawLatestViewName: testConfig.viewIdRaw, + }); + + const shouldRecreate = await shouldRecreateMaterializedView( + view, + config, + source + ); + expect(shouldRecreate).toBe(false); + }); + + test("should recreate materialized view when inc -> non-inc ", async () => { + // Create initial event + const event = changeTrackerEvent({ + data: { end_date: firestore.Timestamp.now() }, + eventId: "testing2", + }); + + // Setup materialized view + await changeTracker({ + datasetId: testConfig.datasetId, + tableId: testConfig.tableId, + useMaterializedView: true, + useIncrementalMaterializedView: true, + }).record([event]); + + // Verify view metadata + const [metadata] = (await dataset + .table(testConfig.viewIdRaw) + .getMetadata()) as unknown as [TableMetadata]; + + expect(metadata.materializedView).toBeDefined(); + expect(metadata.materializedView?.enableRefresh).toBe(true); + expect( + metadata.materializedView?.allowNonIncrementalDefinition + ).toBeUndefined(); + + // Check if view needs recreation + const view = dataset.table(testConfig.viewIdRaw); + const config = { + datasetId: testConfig.datasetId, + tableId: testConfig.tableId, + useMaterializedView: true, + maxStaleness: `INTERVAL "4:0:0" HOUR TO SECOND`, + refreshIntervalMinutes: 5, + clustering: null, + }; + + const { source } = buildNonIncrementalMaterializedViewQuery({ + schema: RawChangelogViewSchema, + projectId, + datasetId: config.datasetId, + tableName: testConfig.tableIdRaw, + rawLatestViewName: testConfig.viewIdRaw, + }); + + const shouldRecreate = await shouldRecreateMaterializedView( + view, + config, + source + ); + expect(shouldRecreate).toBe(true); + }); + + test("should recreate materialized view when non-inc -> inc ", async () => { + // Create initial event + const event = changeTrackerEvent({ + data: { end_date: firestore.Timestamp.now() }, + eventId: "testing2", + }); + + // Setup materialized view + await changeTracker({ + datasetId: testConfig.datasetId, + tableId: testConfig.tableId, + useMaterializedView: true, + maxStaleness: `INTERVAL "4:0:0" HOUR TO SECOND`, + refreshIntervalMinutes: 5, + clustering: null, + }).record([event]); + + // Verify view metadata + const [metadata] = (await dataset + .table(testConfig.viewIdRaw) + .getMetadata()) as unknown as [TableMetadata]; + + expect(metadata.materializedView).toBeDefined(); + expect(metadata.materializedView?.enableRefresh).toBe(true); + expect( + metadata.materializedView?.allowNonIncrementalDefinition + ).toBeDefined(); + + // Check if view needs recreation + const view = dataset.table(testConfig.viewIdRaw); + const config = { + datasetId: testConfig.datasetId, + tableId: testConfig.tableId, + useMaterializedView: true, + useIncrementalMaterializedView: true, + clustering: null, + }; + + const { source } = buildNonIncrementalMaterializedViewQuery({ + schema: RawChangelogViewSchema, + projectId, + datasetId: config.datasetId, + tableName: testConfig.tableIdRaw, + rawLatestViewName: testConfig.viewIdRaw, + }); + + const shouldRecreate = await shouldRecreateMaterializedView( + view, + config, + source + ); + expect(shouldRecreate).toBe(true); + }); +}); diff --git a/firestore-bigquery-export/firestore-bigquery-change-tracker/src/__tests__/bigquery/msql/incremental/standard.sql b/firestore-bigquery-export/firestore-bigquery-change-tracker/src/__tests__/bigquery/msql/incremental/standard.sql new file mode 100644 index 000000000..1d392e094 --- /dev/null +++ b/firestore-bigquery-export/firestore-bigquery-change-tracker/src/__tests__/bigquery/msql/incremental/standard.sql @@ -0,0 +1,18 @@ +CREATE MATERIALIZED VIEW `test.test_dataset.materialized_view_test` + AS ( + WITH latests AS ( + SELECT + document_name, + MAX_BY(document_id, timestamp) AS document_id, + MAX(timestamp) AS timestamp, + MAX_BY(event_id, timestamp) AS event_id, + MAX_BY(operation, timestamp) AS operation, + MAX_BY(data, timestamp) AS data, + MAX_BY(old_data, timestamp) AS old_data, + MAX_BY(extra_field, timestamp) AS extra_field + FROM `test.test_dataset.test_table` + GROUP BY document_name + ) + SELECT * + FROM latests + ) \ No newline at end of file diff --git a/firestore-bigquery-export/firestore-bigquery-change-tracker/src/__tests__/bigquery/msql/nonIncremental/standard.sql b/firestore-bigquery-export/firestore-bigquery-change-tracker/src/__tests__/bigquery/msql/nonIncremental/standard.sql new file mode 100644 index 000000000..05122ca61 --- /dev/null +++ b/firestore-bigquery-export/firestore-bigquery-change-tracker/src/__tests__/bigquery/msql/nonIncremental/standard.sql @@ -0,0 +1,25 @@ +CREATE MATERIALIZED VIEW `test.test_dataset.materialized_view_test` + OPTIONS ( + allow_non_incremental_definition = true, + enable_refresh = true, + refresh_interval_minutes = 60, + max_staleness = INTERVAL "4:0:0" HOUR TO SECOND + ) + AS ( + WITH latests AS ( + SELECT + document_name, + MAX_BY(document_id, timestamp) AS document_id, + MAX(timestamp) AS timestamp, + MAX_BY(event_id, timestamp) AS event_id, + MAX_BY(operation, timestamp) AS operation, + MAX_BY(data, timestamp) AS data, + MAX_BY(old_data, timestamp) AS old_data, + MAX_BY(extra_field, timestamp) AS extra_field + FROM `test.test_dataset.test_table` + GROUP BY document_name + ) + SELECT * + FROM latests + WHERE operation != "DELETE" + ) \ No newline at end of file diff --git a/firestore-bigquery-export/firestore-bigquery-change-tracker/src/__tests__/bigquery/snapshot.test.ts b/firestore-bigquery-export/firestore-bigquery-change-tracker/src/__tests__/bigquery/snapshot.test.ts index 67cfd1b1d..c0132a3f5 100644 --- a/firestore-bigquery-export/firestore-bigquery-change-tracker/src/__tests__/bigquery/snapshot.test.ts +++ b/firestore-bigquery-export/firestore-bigquery-change-tracker/src/__tests__/bigquery/snapshot.test.ts @@ -19,13 +19,22 @@ import * as fs from "fs"; import * as sqlFormatter from "sql-formatter"; import * as util from "util"; import * as bigquery from "@google-cloud/bigquery"; - -import { buildLatestSnapshotViewQuery } from "../../bigquery/snapshot"; -import { FirestoreBigQueryEventHistoryTracker } from "../../bigquery"; +import * as path from "path"; +import { + buildNonIncrementalMaterializedViewQuery, + buildLatestSnapshotViewQuery, + buildMaterializedViewQuery, +} from "../../bigquery/snapshot"; +import { + FirestoreBigQueryEventHistoryTracker, + RawChangelogViewSchema, +} from "../../bigquery"; const fixturesDir = __dirname + "/../fixtures"; const sqlDir = fixturesDir + "/sql"; +const msqlDir = path.join(__dirname, "/msql"); + const testProjectId = "test"; const testDataset = "test_dataset"; const testTable = "test_table"; @@ -63,53 +72,55 @@ describe("FirestoreBigQueryEventHistoryTracker functionality", () => { }); }); -describe("latest snapshot view sql generation", () => { - it("should generate the expected sql", async () => { - const expectedQuery = await readFormattedSQL( - `${sqlDir}/latestConsistentSnapshot.sql` - ); - const query = buildLatestSnapshotViewQuery({ - datasetId: testDataset, - tableName: testTable, - timestampColumnName: "timestamp", - groupByColumns: ["timestamp", "event_id", "operation", "data"], - useLegacyQuery: false, - }); - expect(query).to.equal(expectedQuery); - }); - it("should generate correct sql with no groupBy columns", async () => { - const expectedQuery = await readFormattedSQL( - `${sqlDir}/latestConsistentSnapshotNoGroupBy.sql` - ); - const query = buildLatestSnapshotViewQuery({ - datasetId: testDataset, - tableName: testTable, - timestampColumnName: "timestamp", - groupByColumns: [], - useLegacyQuery: false, - }); - expect(query).to.equal(expectedQuery); - }); - it("should throw an error for empty group by columns", async () => { - expect( - buildLatestSnapshotViewQuery.bind(null, { +describe("materialized view sql generation", () => { + const testSchema = { + fields: [ + { name: "document_name" }, + { name: "document_id" }, + { name: "timestamp" }, + { name: "event_id" }, + { name: "operation" }, + { name: "data" }, + { name: "old_data" }, + { name: "extra_field" }, + ], + }; + + describe("incremental materialized view", () => { + it("should generate correct SQL", async () => { + const expectedQuery = await readFormattedSQL( + path.join(msqlDir, "incremental/standard.sql") + ); + + const { query } = buildMaterializedViewQuery({ + projectId: testProjectId, datasetId: testDataset, tableName: testTable, - timestampColumnName: "timestamp", - groupByColumns: [""], - useLegacyQuery: false, - }) - ).to.throw(); + rawLatestViewName: "materialized_view_test", + schema: testSchema, + }); + + expect(query).to.equal(sqlFormatter.format(expectedQuery)); + }); }); - it("should throw an error for empty timestamp field", async () => { - expect( - buildLatestSnapshotViewQuery.bind(null, { + + describe("non-incremental materialized view", () => { + it("should generate correct SQL", async () => { + const expectedQuery = await readFormattedSQL( + path.join(msqlDir, "nonIncremental/standard.sql") + ); + + const { query } = buildNonIncrementalMaterializedViewQuery({ + projectId: testProjectId, datasetId: testDataset, tableName: testTable, - timestampColumnName: "", - groupByColumns: [], - useLegacyQuery: false, - }) - ).to.throw(); + rawLatestViewName: "materialized_view_test", + schema: testSchema, + refreshIntervalMinutes: 60, + maxStaleness: `INTERVAL "4:0:0" HOUR TO SECOND`, + }); + + expect(query).to.equal(sqlFormatter.format(expectedQuery)); + }); }); }); diff --git a/firestore-bigquery-export/firestore-bigquery-change-tracker/src/__tests__/fixtures/changeTracker.ts b/firestore-bigquery-export/firestore-bigquery-change-tracker/src/__tests__/fixtures/changeTracker.ts index edabc401e..3bcc9b060 100644 --- a/firestore-bigquery-export/firestore-bigquery-change-tracker/src/__tests__/fixtures/changeTracker.ts +++ b/firestore-bigquery-export/firestore-bigquery-change-tracker/src/__tests__/fixtures/changeTracker.ts @@ -18,6 +18,10 @@ export const changeTracker = ({ clustering = null, bqProjectId = "dev-extensions-testing", useNewSnapshotQuerySyntax = false, + useMaterializedView = false, + useIncrementalMaterializedView = false, + maxStaleness = undefined, + refreshIntervalMinutes = undefined, }): FirestoreBigQueryEventHistoryTracker => { return new FirestoreBigQueryEventHistoryTracker({ datasetId, @@ -32,6 +36,10 @@ export const changeTracker = ({ clustering, bqProjectId, useNewSnapshotQuerySyntax, + useMaterializedView, + useIncrementalMaterializedView, + maxStaleness, + refreshIntervalMinutes, }); }; diff --git a/firestore-bigquery-export/firestore-bigquery-change-tracker/src/__tests__/fixtures/sql/latestConsistentSnapshot.sql b/firestore-bigquery-export/firestore-bigquery-change-tracker/src/__tests__/fixtures/sql/latestConsistentSnapshot.sql index 7612efdb1..3c1c023ab 100644 --- a/firestore-bigquery-export/firestore-bigquery-change-tracker/src/__tests__/fixtures/sql/latestConsistentSnapshot.sql +++ b/firestore-bigquery-export/firestore-bigquery-change-tracker/src/__tests__/fixtures/sql/latestConsistentSnapshot.sql @@ -6,7 +6,7 @@ -- document_id: The document id as defined in the Firestore database WITH latest AS ( SELECT - max(timestamp) as latest_timestamp, + MAX(timestamp) AS latest_timestamp, document_name FROM `test.test_dataset.test_table` @@ -16,22 +16,18 @@ SELECT t.document_name, document_id, - timestamp as timestamp, - ANY_VALUE(event_id) as event_id, - operation as operation, - ANY_VALUE(data) as data + timestamp AS timestamp, + ANY_VALUE(event_id) AS event_id, + operation AS operation, + ANY_VALUE(data) AS data FROM `test.test_dataset.test_table` AS t JOIN latest ON ( t.document_name = latest.document_name - AND ( - IFNULL(t.timestamp, timestamp("1970-01-01 00:00:00+00")) - ) = ( - IFNULL( + AND IFNULL(t.timestamp, TIMESTAMP("1970-01-01 00:00:00+00")) = IFNULL( latest.latest_timestamp, - timestamp("1970-01-01 00:00:00+00") + TIMESTAMP("1970-01-01 00:00:00+00") ) - ) ) WHERE operation != "DELETE" diff --git a/firestore-bigquery-export/firestore-bigquery-change-tracker/src/__tests__/fixtures/sql/latestConsistentSnapshotNoGroupBy.sql b/firestore-bigquery-export/firestore-bigquery-change-tracker/src/__tests__/fixtures/sql/latestConsistentSnapshotNoGroupBy.sql index 8a8538818..5f319d842 100644 --- a/firestore-bigquery-export/firestore-bigquery-change-tracker/src/__tests__/fixtures/sql/latestConsistentSnapshotNoGroupBy.sql +++ b/firestore-bigquery-export/firestore-bigquery-change-tracker/src/__tests__/fixtures/sql/latestConsistentSnapshotNoGroupBy.sql @@ -6,7 +6,7 @@ -- document_id: The document id as defined in the Firestore database WITH latest AS ( SELECT - max(timestamp) as latest_timestamp, + MAX(timestamp) AS latest_timestamp, document_name FROM `test.test_dataset.test_table` @@ -20,10 +20,10 @@ FROM `test.test_dataset.test_table` AS t JOIN latest ON ( t.document_name = latest.document_name - AND (IFNULL(t.timestamp, timestamp("1970-01-01 00:00:00+00"))) = (IFNULL(latest.latest_timestamp, timestamp("1970-01-01 00:00:00+00"))) + AND IFNULL(t.timestamp, TIMESTAMP("1970-01-01 00:00:00+00")) = IFNULL(latest.latest_timestamp, TIMESTAMP("1970-01-01 00:00:00+00")) ) WHERE operation != "DELETE" GROUP BY document_name, - document_id \ No newline at end of file + document_id diff --git a/firestore-bigquery-export/firestore-bigquery-change-tracker/src/bigquery/index.ts b/firestore-bigquery-export/firestore-bigquery-change-tracker/src/bigquery/index.ts index cc2c8d772..5b84be835 100644 --- a/firestore-bigquery-export/firestore-bigquery-change-tracker/src/bigquery/index.ts +++ b/firestore-bigquery-export/firestore-bigquery-change-tracker/src/bigquery/index.ts @@ -41,8 +41,9 @@ import { import { Partitioning } from "./partitioning"; import { Clustering } from "./clustering"; -import { tableRequiresUpdate, viewRequiresUpdate } from "./checkUpdates"; +import { tableRequiresUpdate } from "./checkUpdates"; import { parseErrorMessage, waitForInitialization } from "./utils"; +import { initializeLatestView } from "./initializeLatestView"; export { RawChangelogSchema, RawChangelogViewSchema } from "./schema"; @@ -63,6 +64,10 @@ export interface FirestoreBigQueryEventHistoryTrackerConfig { useNewSnapshotQuerySyntax?: boolean; skipInit?: boolean; kmsKeyName?: string | undefined; + useMaterializedView?: boolean; + useIncrementalMaterializedView?: boolean; + maxStaleness?: string; + refreshIntervalMinutes?: number; } /** @@ -207,7 +212,18 @@ export class FirestoreBigQueryEventHistoryTracker private async _waitForInitialization() { const dataset = this.bigqueryDataset(); const changelogName = this.rawChangeLogTableName(); - return waitForInitialization({ dataset, changelogName }); + + let materializedViewName; + + if (this.config.useMaterializedView) { + materializedViewName = this.rawLatestView(); + } + + return waitForInitialization({ + dataset, + changelogName, + materializedViewName, + }); } /** @@ -281,7 +297,7 @@ export class FirestoreBigQueryEventHistoryTracker } try { - await this.initializeLatestView(); + await this._initializeLatestView(); } catch (error) { const message = parseErrorMessage(error, "initializing latest view"); throw new Error(`Error initializing latest view: ${message}`); @@ -424,85 +440,20 @@ export class FirestoreBigQueryEventHistoryTracker * Creates the latest snapshot view, which returns only latest operations * of all existing documents over the raw change log table. */ - private async initializeLatestView() { + private async _initializeLatestView() { const dataset = this.bigqueryDataset(); const view = dataset.table(this.rawLatestView()); const [viewExists] = await view.exists(); - const schema = RawChangelogViewSchema; - - if (viewExists) { - logs.bigQueryViewAlreadyExists(view.id, dataset.id); - const [metadata] = await view.getMetadata(); - // TODO: just casting this for now, needs properly fixing - const fields = (metadata.schema ? metadata.schema.fields : []) as { - name: string; - }[]; - if (this.config.wildcardIds) { - schema.fields.push(documentPathParams); - } - - const columnNames = fields.map((field) => field.name); - const documentIdColExists = columnNames.includes("document_id"); - const pathParamsColExists = columnNames.includes("path_params"); - const oldDataColExists = columnNames.includes("old_data"); - - /** If new view or opt-in to new query syntax **/ - const updateView = viewRequiresUpdate({ - metadata, - config: this.config, - documentIdColExists, - pathParamsColExists, - oldDataColExists, - }); - - if (updateView) { - metadata.view = latestConsistentSnapshotView({ - datasetId: this.config.datasetId, - tableName: this.rawChangeLogTableName(), - schema, - useLegacyQuery: !this.config.useNewSnapshotQuerySyntax, - }); - - if (!documentIdColExists) { - logs.addNewColumn(this.rawLatestView(), documentIdField.name); - } - await view.setMetadata(metadata); - logs.updatingMetadata(this.rawLatestView(), { - config: this.config, - documentIdColExists, - pathParamsColExists, - oldDataColExists, - }); - } - } else { - const schema = { fields: [...RawChangelogViewSchema.fields] }; - - if (this.config.wildcardIds) { - schema.fields.push(documentPathParams); - } - const latestSnapshot = latestConsistentSnapshotView({ - datasetId: this.config.datasetId, - tableName: this.rawChangeLogTableName(), - schema, - bqProjectId: this.bq.projectId, - useLegacyQuery: !this.config.useNewSnapshotQuerySyntax, - }); - logs.bigQueryViewCreating(this.rawLatestView(), latestSnapshot.query); - const options: TableMetadata = { - friendlyName: this.rawLatestView(), - view: latestSnapshot, - }; - - try { - await view.create(options); - await view.setMetadata({ schema: RawChangelogViewSchema }); - logs.bigQueryViewCreated(this.rawLatestView()); - } catch (ex) { - logs.tableCreationError(this.rawLatestView(), ex.message); - } - } - return view; + return await initializeLatestView({ + bq: this.bq, + changeTrackerConfig: this.config, + dataset, + view, + viewExists, + rawChangeLogTableName: this.rawChangeLogTableName(), + rawLatestViewName: this.rawLatestView(), + }); } bigqueryDataset() { diff --git a/firestore-bigquery-export/firestore-bigquery-change-tracker/src/bigquery/initializeLatestMaterializedView.ts b/firestore-bigquery-export/firestore-bigquery-change-tracker/src/bigquery/initializeLatestMaterializedView.ts new file mode 100644 index 000000000..0cff4d3f7 --- /dev/null +++ b/firestore-bigquery-export/firestore-bigquery-change-tracker/src/bigquery/initializeLatestMaterializedView.ts @@ -0,0 +1,119 @@ +import { BigQuery, Table, TableMetadata } from "@google-cloud/bigquery"; +import { FirestoreBigQueryEventHistoryTrackerConfig } from "."; +import * as logs from "../logs"; +import { + buildMaterializedViewQuery, + buildNonIncrementalMaterializedViewQuery, +} from "./snapshot"; +import { logger } from "firebase-functions"; +import * as sqlFormatter from "sql-formatter"; + +interface InitializeLatestMaterializedViewOptions { + bq: BigQuery; + changeTrackerConfig: FirestoreBigQueryEventHistoryTrackerConfig; + view: Table; + viewExists: boolean; + rawChangeLogTableName: string; + rawLatestViewName: string; + schema?: any; +} + +export async function shouldRecreateMaterializedView( + view: Table, + config: FirestoreBigQueryEventHistoryTrackerConfig, + source: string +): Promise { + const [viewMetadata] = await view.getMetadata(); + + const isIncremental = !(viewMetadata as TableMetadata).materializedView + ?.allowNonIncrementalDefinition; + + const incrementalMatch = + isIncremental === !!config.useIncrementalMaterializedView; + + const viewQuery = + (viewMetadata as TableMetadata).materializedView?.query || ""; + + const queryMatch = + sqlFormatter.format(viewQuery) === sqlFormatter.format(source); + + return !queryMatch || !incrementalMatch; +} + +/** + * Creates the latest materialized view. + */ +export async function initializeLatestMaterializedView({ + bq, + changeTrackerConfig: config, + view, + viewExists, + rawChangeLogTableName, + rawLatestViewName, + schema, +}: InitializeLatestMaterializedViewOptions): Promise { + try { + const { query, source } = config.useIncrementalMaterializedView + ? buildMaterializedViewQuery({ + projectId: bq.projectId, + datasetId: config.datasetId, + tableName: rawChangeLogTableName, + rawLatestViewName, + schema, + }) + : buildNonIncrementalMaterializedViewQuery({ + projectId: bq.projectId, + datasetId: config.datasetId, + tableName: rawChangeLogTableName, + maxStaleness: config.maxStaleness, + refreshIntervalMinutes: config.refreshIntervalMinutes, + rawLatestViewName, + enableRefresh: true, + schema, + }); + + const desiredQuery = sqlFormatter.format(query); + + if (viewExists) { + const shouldRecreate = await shouldRecreateMaterializedView( + view, + config, + source + ); + + if (!shouldRecreate) { + logger.warn( + `Materialized view requested, but a view with matching configuration exists. Skipping creation.` + ); + return view; + } + + logger.warn( + `Configuration mismatch detected for ${rawLatestViewName} ` + + `Recreating view...` + ); + + await view.delete(); + + return await initializeLatestMaterializedView({ + bq, + changeTrackerConfig: config, + view, + viewExists: false, + rawChangeLogTableName, + rawLatestViewName, + schema, + }); + } + + logs.bigQueryViewCreating(rawLatestViewName, desiredQuery); + await bq.query(desiredQuery); + + logs.bigQueryViewCreated(rawLatestViewName); + } catch (error) { + logs.tableCreationError(rawLatestViewName, error.message); + throw error; + } + + return view; +} diff --git a/firestore-bigquery-export/firestore-bigquery-change-tracker/src/bigquery/initializeLatestView.ts b/firestore-bigquery-export/firestore-bigquery-change-tracker/src/bigquery/initializeLatestView.ts new file mode 100644 index 000000000..8e017397b --- /dev/null +++ b/firestore-bigquery-export/firestore-bigquery-change-tracker/src/bigquery/initializeLatestView.ts @@ -0,0 +1,134 @@ +import { + BigQuery, + Dataset, + Table, + TableMetadata, +} from "@google-cloud/bigquery"; +import { + documentIdField, + documentPathParams, + RawChangelogViewSchema, +} from "./schema"; +import { FirestoreBigQueryEventHistoryTrackerConfig } from "."; +import * as logs from "../logs"; +import { latestConsistentSnapshotView } from "./snapshot"; +import { viewRequiresUpdate } from "./checkUpdates"; +import { initializeLatestMaterializedView } from "./initializeLatestMaterializedView"; + +interface InitializeLatestViewOptions { + bq: BigQuery; + changeTrackerConfig: FirestoreBigQueryEventHistoryTrackerConfig; + dataset: Dataset; + view: Table; + viewExists: boolean; + rawChangeLogTableName: string; + rawLatestViewName: string; + useMaterializedView?: boolean; + useIncrementalMaterializedView?: boolean; + useLegacyQuery?: boolean; + refreshIntervalMinutes?: number; + maxStaleness?: string; +} +/** + * Creates the latest snapshot view or materialized view. + */ +export async function initializeLatestView({ + changeTrackerConfig: config, + dataset, + view, + viewExists, + rawChangeLogTableName, + rawLatestViewName, + bq, +}: InitializeLatestViewOptions): Promise
{ + if (config.useMaterializedView) { + const schema = { fields: [...RawChangelogViewSchema.fields] }; + + if (config.wildcardIds) { + schema.fields.push(documentPathParams); + } + return initializeLatestMaterializedView({ + bq, + changeTrackerConfig: config, + view, + viewExists, + rawChangeLogTableName, + rawLatestViewName, + schema, + }); + } + + const schema = RawChangelogViewSchema; + + if (viewExists) { + logs.bigQueryViewAlreadyExists(view.id, dataset.id); + const [metadata] = await view.getMetadata(); + const fields = (metadata.schema ? metadata.schema.fields : []) as { + name: string; + }[]; + if (config.wildcardIds) { + schema.fields.push(documentPathParams); + } + + const columnNames = fields.map((field) => field.name); + const documentIdColExists = columnNames.includes("document_id"); + const pathParamsColExists = columnNames.includes("path_params"); + const oldDataColExists = columnNames.includes("old_data"); + + const updateView = viewRequiresUpdate({ + metadata, + config, + documentIdColExists, + pathParamsColExists, + oldDataColExists, + }); + + if (updateView) { + metadata.view = latestConsistentSnapshotView({ + datasetId: config.datasetId, + tableName: rawChangeLogTableName, + schema, + useLegacyQuery: !config.useNewSnapshotQuerySyntax, + }); + + if (!documentIdColExists) { + logs.addNewColumn(rawLatestViewName, documentIdField.name); + } + + await view.setMetadata(metadata); + logs.updatingMetadata(rawLatestViewName, { + config, + documentIdColExists, + pathParamsColExists, + oldDataColExists, + }); + } + } else { + const schema = { fields: [...RawChangelogViewSchema.fields] }; + + if (config.wildcardIds) { + schema.fields.push(documentPathParams); + } + const latestSnapshot = latestConsistentSnapshotView({ + datasetId: config.datasetId, + tableName: rawChangeLogTableName, + schema, + bqProjectId: bq.projectId, + useLegacyQuery: !config.useNewSnapshotQuerySyntax, + }); + logs.bigQueryViewCreating(rawLatestViewName, latestSnapshot.query); + const options: TableMetadata = { + friendlyName: rawLatestViewName, + view: latestSnapshot, + }; + + try { + await view.create(options); + await view.setMetadata({ schema: RawChangelogViewSchema }); + logs.bigQueryViewCreated(rawLatestViewName); + } catch (error) { + logs.tableCreationError(rawLatestViewName, error.message); + } + } + return view; +} diff --git a/firestore-bigquery-export/firestore-bigquery-change-tracker/src/bigquery/snapshot.ts b/firestore-bigquery-export/firestore-bigquery-change-tracker/src/bigquery/snapshot.ts index 7c42f38d1..01d2bee62 100644 --- a/firestore-bigquery-export/firestore-bigquery-change-tracker/src/bigquery/snapshot.ts +++ b/firestore-bigquery-export/firestore-bigquery-change-tracker/src/bigquery/snapshot.ts @@ -15,11 +15,11 @@ */ import * as sqlFormatter from "sql-formatter"; - import { timestampField } from "./schema"; const excludeFields: string[] = ["document_name", "document_id"]; - +const nonGroupFields = ["event_id", "data", "old_data"]; +import { TableMetadata } from "@google-cloud/bigquery"; interface LatestConsistentSnapshotViewOptions { datasetId: string; tableName: string; @@ -39,16 +39,14 @@ export const latestConsistentSnapshotView = ({ datasetId, tableName, timestampColumnName: timestampField.name, - groupByColumns: schema["fields"] - .map((field) => field.name) - .filter((name) => excludeFields.indexOf(name) === -1), + groupByColumns: extractGroupByColumns(schema), bqProjectId, useLegacyQuery, }), useLegacySql: false, }); -interface buildLatestSnapshotViewQueryOptions { +interface BuildLatestSnapshotViewQueryOptions { datasetId: string; tableName: string; timestampColumnName: string; @@ -64,90 +62,315 @@ export function buildLatestSnapshotViewQuery({ groupByColumns, bqProjectId, useLegacyQuery = true, -}: buildLatestSnapshotViewQueryOptions): string { - if (datasetId === "" || tableName === "" || timestampColumnName === "") { - throw Error(`Missing some query parameters!`); +}: BuildLatestSnapshotViewQueryOptions): string { + validateInputs({ datasetId, tableName, timestampColumnName, groupByColumns }); + + const projectId = bqProjectId || process.env.PROJECT_ID; + + return useLegacyQuery + ? buildLegacyQuery( + projectId, + datasetId, + tableName, + timestampColumnName, + groupByColumns + ) + : buildStandardQuery( + projectId, + datasetId, + tableName, + timestampColumnName, + groupByColumns + ); +} + +function extractGroupByColumns(schema: any): string[] { + return schema["fields"] + .map((field: { name: string }) => field.name) + .filter((name: string) => !excludeFields.includes(name)); +} + +function validateInputs({ + datasetId, + tableName, + timestampColumnName, + groupByColumns, +}: { + datasetId: string; + tableName: string; + timestampColumnName: string; + groupByColumns: string[]; +}) { + if (!datasetId || !tableName || !timestampColumnName) { + throw new Error("Missing required query parameters!"); } - for (let columnName of groupByColumns) { - if (columnName === "") { - throw Error(`Found empty group by column!`); - } + if (groupByColumns.some((columnName) => !columnName)) { + throw new Error("Group by columns must not contain empty values!"); } +} - const legacyQuery = - sqlFormatter.format(` -- Retrieves the latest document change events for all live documents. - -- timestamp: The Firestore timestamp at which the event took place. - -- operation: One of INSERT, UPDATE, DELETE, IMPORT. - -- event_id: The id of the event that triggered the cloud function mirrored the event. - -- data: A raw JSON payload of the current state of the document. - -- document_id: The document id as defined in the Firestore database - SELECT - document_name, - document_id${groupByColumns.length > 0 ? `,` : ``} - ${groupByColumns.join(",")} - FROM ( +function buildLegacyQuery( + projectId: string, + datasetId: string, + tableName: string, + timestampColumnName: string, + groupByColumns: string[] +): string { + return sqlFormatter.format(` + -- Retrieves the latest document change events for all live documents. + -- timestamp: The Firestore timestamp at which the event took place. + -- operation: One of INSERT, UPDATE, DELETE, IMPORT. + -- event_id: The id of the event that triggered the cloud function mirrored the event. + -- data: A raw JSON payload of the current state of the document. + -- document_id: The document id as defined in the Firestore database SELECT document_name, - document_id, - ${groupByColumns - .map( - (columnName) => `FIRST_VALUE(${columnName}) - OVER(PARTITION BY document_name ORDER BY ${timestampColumnName} DESC) - AS ${columnName}` - ) - .join(",")}${groupByColumns.length > 0 ? `,` : ``} - FIRST_VALUE(operation) - OVER(PARTITION BY document_name ORDER BY ${timestampColumnName} DESC) = "DELETE" - AS is_deleted - FROM \`${bqProjectId || process.env.PROJECT_ID}.${datasetId}.${tableName}\` - ORDER BY document_name, ${timestampColumnName} DESC - ) - WHERE NOT is_deleted - GROUP BY document_name, document_id${ - groupByColumns.length > 0 ? `, ` : `` - }${groupByColumns.join(",")}`); - - const nonGroupFields = ["event_id", "data", "old_data"]; - const joinFields = ["document_name"]; - - const addSelectField = (field) => { - if (joinFields.includes(field)) return `t.${field}`; - - return nonGroupFields.includes(field) - ? `ANY_VALUE(${field}) as ${field}` - : `${field} as ${field}`; - }; - - const filterGroupField = (field) => { - return nonGroupFields.includes(field); - }; - - const query = - sqlFormatter.format(` -- Retrieves the latest document change events for all live documents. + document_id${groupByColumns.length > 0 ? `,` : ``} + ${groupByColumns.join(",")} + FROM ( + SELECT + document_name, + document_id, + ${groupByColumns + .map( + (columnName) => `FIRST_VALUE(${columnName}) OVER ( + PARTITION BY document_name + ORDER BY ${timestampColumnName} DESC + ) AS ${columnName}` + ) + .join(",")}${groupByColumns.length > 0 ? "," : ""} + FIRST_VALUE(operation) OVER ( + PARTITION BY document_name + ORDER BY ${timestampColumnName} DESC + ) = "DELETE" AS is_deleted + FROM \`${projectId}.${datasetId}.${tableName}\` + ORDER BY document_name, ${timestampColumnName} DESC + ) + WHERE NOT is_deleted + GROUP BY document_name, document_id${ + groupByColumns.length > 0 ? ", " : "" + }${groupByColumns.join(",")}`); +} + +function buildStandardQuery( + projectId: string, + datasetId: string, + tableName: string, + timestampColumnName: string, + groupByColumns: string[] +): string { + return sqlFormatter.format(` + -- Retrieves the latest document change events for all live documents. -- timestamp: The Firestore timestamp at which the event took place. -- operation: One of INSERT, UPDATE, DELETE, IMPORT. -- event_id: The id of the event that triggered the cloud function mirrored the event. -- data: A raw JSON payload of the current state of the document. -- document_id: The document id as defined in the Firestore database WITH latest AS ( - SELECT max(${timestampColumnName}) as latest_timestamp, document_name - FROM \`${ - bqProjectId || process.env.PROJECT_ID - }.${datasetId}.${tableName}\` + SELECT MAX(${timestampColumnName}) AS latest_timestamp, document_name + FROM \`${projectId}.${datasetId}.${tableName}\` GROUP BY document_name ) SELECT - t.document_name, - document_id${groupByColumns.length > 0 ? `,` : ``} - ${groupByColumns.map((f) => addSelectField(f)).join(",")} - FROM \`${ - bqProjectId || process.env.PROJECT_ID - }.${datasetId}.${tableName}\` AS t - JOIN latest ON (t.document_name = latest.document_name AND (IFNULL(t.${timestampColumnName}, timestamp("1970-01-01 00:00:00+00"))) = (IFNULL(latest.latest_timestamp, timestamp("1970-01-01 00:00:00+00")))) + t.document_name, + document_id${groupByColumns.length > 0 ? "," : ""} + ${groupByColumns + .map((field) => + nonGroupFields.includes(field) + ? `ANY_VALUE(${field}) AS ${field}` + : `${field} AS ${field}` + ) + .join(",")} + FROM \`${projectId}.${datasetId}.${tableName}\` AS t + JOIN latest ON ( + t.document_name = latest.document_name AND + IFNULL(t.${timestampColumnName}, TIMESTAMP("1970-01-01 00:00:00+00")) = + IFNULL(latest.latest_timestamp, TIMESTAMP("1970-01-01 00:00:00+00")) + ) WHERE operation != "DELETE" GROUP BY document_name, document_id${ - groupByColumns.length > 0 ? `, ` : `` - }${groupByColumns.filter((c) => !filterGroupField(c)).join(",")}`); + groupByColumns.length > 0 ? ", " : "" + }${groupByColumns + .filter((field) => !nonGroupFields.includes(field)) + .join(",")}`); +} +interface MaterializedViewOptions { + projectId: string; + datasetId: string; + tableName: string; + rawLatestViewName: string; + schema: any; + refreshIntervalMinutes?: number; + maxStaleness?: string; +} + +// type ITimePartitioning = { +// /** +// * Optional. Number of milliseconds for which to keep the storage for a partition. A wrapper is used here because 0 is an invalid value. +// */ +// expirationMs?: string; +// /** +// * Optional. If not set, the table is partitioned by pseudo column '_PARTITIONTIME'; if set, the table is partitioned by this field. The field must be a top-level TIMESTAMP or DATE field. Its mode must be NULLABLE or REQUIRED. A wrapper is used here because an empty string is an invalid value. +// */ +// field?: string; +// /** +// * If set to true, queries over this table require a partition filter that can be used for partition elimination to be specified. This field is deprecated; please set the field with the same name on the table itself instead. This field needs a wrapper because we want to output the default value, false, if the user explicitly set it. +// */ +// requirePartitionFilter?: boolean; +// /** +// * Required. The supported types are DAY, HOUR, MONTH, and YEAR, which will generate one partition per day, hour, month, and year, respectively. +// */ +// type?: string; +// }; + +interface NonIncrementalMaterializedViewOptions + extends MaterializedViewOptions { + enableRefresh?: boolean; +} + +// Helper function to extract fields from schema +function extractFieldsFromSchema(schema: any): string[] { + if (!schema || !schema.fields) { + throw new Error("Invalid schema: must contain fields array"); + } + return schema.fields.map((field: { name: string }) => field.name); +} + +export function buildMaterializedViewQuery({ + projectId, + datasetId, + tableName, + rawLatestViewName, + schema, + refreshIntervalMinutes, + maxStaleness, +}: NonIncrementalMaterializedViewOptions): { + target: string; + source: string; + query: string; +} { + // Build the options string + const options = []; + + if (refreshIntervalMinutes !== undefined) { + options.push(`refresh_interval_minutes = ${refreshIntervalMinutes}`); + } + + if (maxStaleness) { + options.push(`max_staleness = ${maxStaleness}`); + } + + const optionsString = + options.length > 0 + ? `OPTIONS ( + ${options.join(",\n ")} + )` + : ""; + + // Extract fields from schema + const fields = extractFieldsFromSchema(schema); + + // Build the aggregated fields for the CTE + const aggregatedFields = fields + .map((fieldName) => { + if (fieldName === "document_name") { + return " document_name"; + } + if (fieldName === "timestamp") { + return " MAX(timestamp) AS timestamp"; + } + return ` MAX_BY(${fieldName}, timestamp) AS ${fieldName}`; + }) + .join(",\n "); + + const target = `CREATE MATERIALIZED VIEW \`${projectId}.${datasetId}.${rawLatestViewName}\` ${optionsString}`; + + const source = ` + WITH latests AS ( + SELECT + ${aggregatedFields} + FROM \`${projectId}.${datasetId}.${tableName}\` + GROUP BY document_name + ) + SELECT * + FROM latests + `; + + // Combine all parts with options before AS + const fullQuery = sqlFormatter.format(`${target} AS (${source})`); + + return { target, source, query: fullQuery }; +} + +export function buildNonIncrementalMaterializedViewQuery({ + projectId, + datasetId, + tableName, + rawLatestViewName, + schema, + refreshIntervalMinutes, + maxStaleness, + enableRefresh = true, +}: NonIncrementalMaterializedViewOptions): { + target: string; + source: string; + query: string; +} { + // Build the options string + const options = []; + options.push("allow_non_incremental_definition = true"); + + if (enableRefresh !== undefined) { + options.push(`enable_refresh = ${enableRefresh}`); + } + + if (refreshIntervalMinutes !== undefined) { + options.push(`refresh_interval_minutes = ${refreshIntervalMinutes}`); + } + + if (maxStaleness) { + options.push(`max_staleness = ${maxStaleness}`); + } + + const optionsString = + options.length > 0 + ? `OPTIONS ( + ${options.join(",\n ")} + )` + : ""; + + // Extract fields from schema + const fields = extractFieldsFromSchema(schema); + + // Build the aggregated fields for the CTE + const aggregatedFields = fields + .map((fieldName) => { + if (fieldName === "document_name") { + return " document_name"; + } + if (fieldName === "timestamp") { + return " MAX(timestamp) AS timestamp"; + } + return ` MAX_BY(${fieldName}, timestamp) AS ${fieldName}`; + }) + .join(",\n "); + + const target = `CREATE MATERIALIZED VIEW \`${projectId}.${datasetId}.${rawLatestViewName}\` ${optionsString}`; + + const source = ` + WITH latests AS ( + SELECT + ${aggregatedFields} + FROM \`${projectId}.${datasetId}.${tableName}\` + GROUP BY document_name + ) + SELECT * + FROM latests + WHERE operation != "DELETE" + `; + + // Combine all parts with options before AS + const fullQuery = sqlFormatter.format(`${target} AS (${source})`); - return useLegacyQuery ? legacyQuery : query; + return { target, source, query: fullQuery }; } diff --git a/firestore-bigquery-export/firestore-bigquery-change-tracker/src/bigquery/utils.ts b/firestore-bigquery-export/firestore-bigquery-change-tracker/src/bigquery/utils.ts index e930c43f2..0a67d70db 100644 --- a/firestore-bigquery-export/firestore-bigquery-change-tracker/src/bigquery/utils.ts +++ b/firestore-bigquery-export/firestore-bigquery-change-tracker/src/bigquery/utils.ts @@ -4,6 +4,7 @@ import * as logs from "../logs"; interface WaitForInitializationParams { dataset: Dataset; changelogName: string; + materializedViewName?: string; } /** @@ -14,7 +15,7 @@ interface WaitForInitializationParams { * @throws {Error} Throws an error if the dataset or table cannot be verified to exist after multiple attempts or if an unexpected error occurs. */ export async function waitForInitialization( - { dataset, changelogName }: WaitForInitializationParams, + { dataset, changelogName, materializedViewName }: WaitForInitializationParams, maxAttempts = 12 ): Promise
{ return new Promise((resolve, reject) => { @@ -25,7 +26,15 @@ export async function waitForInitialization( const table = dataset.table(changelogName); const [tableExists] = await table.exists(); - if (datasetExists && tableExists) { + let waitingForMaterializedView = false; + + if (materializedViewName) { + const materializedView = dataset.table(materializedViewName); + const [materializedViewExists] = await materializedView.exists(); + waitingForMaterializedView = !materializedViewExists; + } + + if (datasetExists && tableExists && !waitingForMaterializedView) { clearInterval(handle); resolve(table); } else {