diff --git a/server/client/index.ts b/server/client/index.ts index de0e0daaa..8ab7d12f6 100644 --- a/server/client/index.ts +++ b/server/client/index.ts @@ -40,6 +40,7 @@ interface IGetClientProps { */ client: ILegacyCustomClusterClient; onExtendClient?: (client: OpenSearchDashboardsClient) => Record | undefined; + getDataSourceId?: (context: RequestHandlerContext, request: OpenSearchDashboardsRequest) => string | undefined; pluginId: string; logger: Logger; } @@ -70,7 +71,7 @@ export const getClientSupportMDS = (props: IGetClientProps) => { */ props.client.asScoped = function (request: OpenSearchDashboardsRequest): ILegacyScopedClusterClient { const context = contextMap[request.id]; - const dataSourceId = "3e2ff6a0-de64-11ed-b697-57f5dd34beb6"; + const dataSourceId = props.getDataSourceId?.(context, request); /** * If no dataSourceId provided @@ -100,11 +101,24 @@ export const getClientSupportMDS = (props: IGetClientProps) => { props.logger.debug(`Call api using the data source: ${dataSourceId}`); try { const dataSourceClient = await context.dataSource.opensearch.getClient(dataSourceId); + + /** + * extend client if needed + **/ Object.assign(dataSourceClient, { ...props.onExtendClient?.(dataSourceClient) }); + + /** + * Call the endpoint by providing client + * The logic is much the same as what callAPI does in Dashboards + */ const clientPath = endpoint.split("."); const api: any = get(dataSourceClient, clientPath); let apiContext = clientPath.length === 1 ? dataSourceClient : get(dataSourceClient, clientPath.slice(0, -1)); const request = api.call(apiContext, clientParams); + + /** + * In case the request is aborted + */ if (options?.signal) { options.signal.addEventListener("abort", () => { request.abort(); diff --git a/server/clusters/extend_client.ts b/server/clusters/extend_client.ts new file mode 100644 index 000000000..569b3ba02 --- /dev/null +++ b/server/clusters/extend_client.ts @@ -0,0 +1,486 @@ +// @ts-ignore +import { factory } from "elasticsearch/src/lib/client_action"; +import { API } from "../utils/constants"; +export function extendClient(props: { ism: any; ca: typeof factory }) { + const { ism, ca } = props; + ism.getPolicy = ca({ + url: { + fmt: `${API.POLICY_BASE}/<%=policyId%>`, + req: { + policyId: { + type: "string", + required: true, + }, + }, + }, + method: "GET", + }); + + ism.getPolicies = ca({ + url: { + fmt: `${API.POLICY_BASE}`, + }, + method: "GET", + }); + + ism.createPolicy = ca({ + url: { + fmt: `${API.POLICY_BASE}/<%=policyId%>?refresh=wait_for`, + req: { + policyId: { + type: "string", + required: true, + }, + }, + }, + needBody: true, + method: "PUT", + }); + + ism.deletePolicy = ca({ + url: { + fmt: `${API.POLICY_BASE}/<%=policyId%>?refresh=wait_for`, + req: { + policyId: { + type: "string", + required: true, + }, + }, + }, + method: "DELETE", + }); + + ism.putPolicy = ca({ + url: { + fmt: `${API.POLICY_BASE}/<%=policyId%>?if_seq_no=<%=ifSeqNo%>&if_primary_term=<%=ifPrimaryTerm%>&refresh=wait_for`, + req: { + policyId: { + type: "string", + required: true, + }, + ifSeqNo: { + type: "string", + required: true, + }, + ifPrimaryTerm: { + type: "string", + required: true, + }, + }, + }, + needBody: true, + method: "PUT", + }); + + ism.explain = ca({ + url: { + fmt: `${API.EXPLAIN_BASE}/<%=index%>`, + req: { + index: { + type: "string", + required: true, + }, + }, + }, + method: "GET", + }); + + ism.explainAll = ca({ + url: { + fmt: `${API.EXPLAIN_BASE}`, + }, + method: "GET", + }); + + ism.retry = ca({ + url: { + fmt: `${API.RETRY_BASE}/<%=index%>`, + req: { + index: { + type: "string", + required: true, + }, + }, + }, + needBody: false, + method: "POST", + }); + + ism.add = ca({ + url: { + fmt: `${API.ADD_POLICY_BASE}/<%=index%>`, + req: { + index: { + type: "string", + required: true, + }, + }, + }, + needBody: true, + method: "POST", + }); + + ism.remove = ca({ + url: { + fmt: `${API.REMOVE_POLICY_BASE}/<%=index%>`, + req: { + index: { + type: "string", + required: true, + }, + }, + }, + needBody: false, + method: "POST", + }); + + ism.change = ca({ + url: { + fmt: `${API.CHANGE_POLICY_BASE}/<%=index%>`, + req: { + index: { + type: "string", + required: true, + }, + }, + }, + needBody: true, + method: "POST", + }); + + // TODO add new APIs as they are being implemented: status, stop, start + + ism.getRollup = ca({ + url: { + fmt: `${API.ROLLUP_JOBS_BASE}/<%=rollupId%>`, + req: { + rollupId: { + type: "string", + required: true, + }, + }, + }, + method: "GET", + }); + + ism.getRollups = ca({ + url: { + fmt: `${API.ROLLUP_JOBS_BASE}`, + }, + method: "GET", + }); + + ism.createRollup = ca({ + url: { + fmt: `${API.ROLLUP_JOBS_BASE}/<%=rollupId%>?refresh=wait_for`, + req: { + rollupId: { + type: "string", + required: true, + }, + }, + }, + needBody: true, + method: "PUT", + }); + + ism.deleteRollup = ca({ + url: { + fmt: `${API.ROLLUP_JOBS_BASE}/<%=rollupId%>?refresh=wait_for`, + req: { + rollupId: { + type: "string", + required: true, + }, + }, + }, + method: "DELETE", + }); + + ism.putRollup = ca({ + url: { + fmt: `${API.ROLLUP_JOBS_BASE}/<%=rollupId%>`, + req: { + rollupId: { + type: "string", + required: true, + }, + }, + }, + method: "PUT", + }); + + ism.startRollup = ca({ + url: { + fmt: `${API.ROLLUP_JOBS_BASE}/<%=rollupId%>/_start`, + req: { + rollupId: { + type: "string", + required: true, + }, + }, + }, + method: "POST", + }); + + ism.stopRollup = ca({ + url: { + fmt: `${API.ROLLUP_JOBS_BASE}/<%=rollupId%>/_stop`, + req: { + rollupId: { + type: "string", + required: true, + }, + }, + }, + method: "POST", + }); + + ism.explainRollup = ca({ + url: { + fmt: `${API.ROLLUP_JOBS_BASE}/<%=rollupId%>/_explain`, + req: { + rollupId: { + type: "string", + required: true, + }, + }, + }, + method: "GET", + }); + + ism.getTransform = ca({ + url: { + fmt: `${API.TRANSFORM_BASE}/<%=transformId%>`, + req: { + transformId: { + type: "string", + required: true, + }, + }, + }, + method: "GET", + }); + + ism.getTransforms = ca({ + url: { + fmt: `${API.TRANSFORM_BASE}/`, + }, + method: "GET", + }); + + ism.explainTransform = ca({ + url: { + fmt: `${API.TRANSFORM_BASE}/<%=transformId%>/_explain`, + req: { + transformId: { + type: "string", + required: true, + }, + }, + }, + method: "GET", + }); + + ism.startTransform = ca({ + url: { + fmt: `${API.TRANSFORM_BASE}/<%=transformId%>/_start`, + req: { + transformId: { + type: "string", + required: true, + }, + }, + }, + method: "POST", + }); + + ism.stopTransform = ca({ + url: { + fmt: `${API.TRANSFORM_BASE}/<%=transformId%>/_stop`, + req: { + transformId: { + type: "string", + required: true, + }, + }, + }, + method: "POST", + }); + + ism.deleteTransform = ca({ + url: { + fmt: `${API.TRANSFORM_BASE}/<%=transformId%>`, + req: { + transformId: { + type: "string", + required: true, + }, + }, + }, + method: "DELETE", + }); + + ism.createTransform = ca({ + url: { + fmt: `${API.TRANSFORM_BASE}/<%=transformId%>?refresh=wait_for`, + req: { + transformId: { + type: "string", + required: true, + }, + }, + }, + needBody: true, + method: "PUT", + }); + + ism.putTransform = ca({ + url: { + fmt: `${API.TRANSFORM_BASE}/<%=transformId%>`, + req: { + transformId: { + type: "string", + required: true, + }, + }, + }, + method: "PUT", + }); + + ism.previewTransform = ca({ + url: { + fmt: `${API.TRANSFORM_BASE}/_preview`, + }, + needBody: true, + method: "POST", + }); + + ism.getChannels = ca({ + url: { + fmt: `${API.CHANNELS_BASE}`, + }, + method: "GET", + }); + + ism.getChannel = ca({ + url: { + fmt: `${API.NOTIFICATION_CONFIGS_BASE}/<%=id%>`, + req: { + id: { + type: "string", + required: true, + }, + }, + }, + method: "GET", + }); + + ism.getSMPolicy = ca({ + url: { + fmt: `${API.SM_POLICY_BASE}/<%=id%>`, + req: { + id: { + type: "string", + required: true, + }, + }, + }, + method: "GET", + }); + + ism.getSMPolicies = ca({ + url: { + fmt: `${API.SM_POLICY_BASE}`, + }, + method: "GET", + }); + + ism.createSMPolicy = ca({ + url: { + fmt: `${API.SM_POLICY_BASE}/<%=policyId%>?refresh=wait_for`, + req: { + policyId: { + type: "string", + required: true, + }, + }, + }, + needBody: true, + method: "POST", + }); + + ism.updateSMPolicy = ca({ + url: { + fmt: `${API.SM_POLICY_BASE}/<%=policyId%>?if_seq_no=<%=ifSeqNo%>&if_primary_term=<%=ifPrimaryTerm%>&refresh=wait_for`, + req: { + policyId: { + type: "string", + required: true, + }, + ifSeqNo: { + type: "string", + required: true, + }, + ifPrimaryTerm: { + type: "string", + required: true, + }, + }, + }, + needBody: true, + method: "PUT", + }); + + ism.deleteSMPolicy = ca({ + url: { + fmt: `${API.SM_POLICY_BASE}/<%=policyId%>?refresh=wait_for`, + req: { + policyId: { + type: "string", + required: true, + }, + }, + }, + method: "DELETE", + }); + + ism.explainSnapshotPolicy = ca({ + url: { + fmt: `${API.SM_POLICY_BASE}/<%=id%>/_explain`, + req: { + id: { + type: "string", + required: true, + }, + }, + }, + method: "GET", + }); + + ism.startSnapshotPolicy = ca({ + url: { + fmt: `${API.SM_POLICY_BASE}/<%=id%>/_start`, + req: { + id: { + type: "string", + required: true, + }, + }, + }, + method: "POST", + }); + + ism.stopSnapshotPolicy = ca({ + url: { + fmt: `${API.SM_POLICY_BASE}/<%=id%>/_stop`, + req: { + id: { + type: "string", + required: true, + }, + }, + }, + method: "POST", + }); +} diff --git a/server/clusters/ism/ismPlugin.ts b/server/clusters/ism/ismPlugin.ts index 231c7b5bd..ab05ac5dd 100644 --- a/server/clusters/ism/ismPlugin.ts +++ b/server/clusters/ism/ismPlugin.ts @@ -2,8 +2,7 @@ * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 */ - -import { API } from "../../utils/constants"; +import { extendClient } from "../extend_client"; /* TODO: migrate to types @@ -15,485 +14,8 @@ export default function ismPlugin(Client: any, config: any, components: any) { Client.prototype.ism = components.clientAction.namespaceFactory(); const ism = Client.prototype.ism.prototype; - - ism.getPolicy = ca({ - url: { - fmt: `${API.POLICY_BASE}/<%=policyId%>`, - req: { - policyId: { - type: "string", - required: true, - }, - }, - }, - method: "GET", - }); - - ism.getPolicies = ca({ - url: { - fmt: `${API.POLICY_BASE}`, - }, - method: "GET", - }); - - ism.createPolicy = ca({ - url: { - fmt: `${API.POLICY_BASE}/<%=policyId%>?refresh=wait_for`, - req: { - policyId: { - type: "string", - required: true, - }, - }, - }, - needBody: true, - method: "PUT", - }); - - ism.deletePolicy = ca({ - url: { - fmt: `${API.POLICY_BASE}/<%=policyId%>?refresh=wait_for`, - req: { - policyId: { - type: "string", - required: true, - }, - }, - }, - method: "DELETE", - }); - - ism.putPolicy = ca({ - url: { - fmt: `${API.POLICY_BASE}/<%=policyId%>?if_seq_no=<%=ifSeqNo%>&if_primary_term=<%=ifPrimaryTerm%>&refresh=wait_for`, - req: { - policyId: { - type: "string", - required: true, - }, - ifSeqNo: { - type: "string", - required: true, - }, - ifPrimaryTerm: { - type: "string", - required: true, - }, - }, - }, - needBody: true, - method: "PUT", - }); - - ism.explain = ca({ - url: { - fmt: `${API.EXPLAIN_BASE}/<%=index%>`, - req: { - index: { - type: "string", - required: true, - }, - }, - }, - method: "GET", - }); - - ism.explainAll = ca({ - url: { - fmt: `${API.EXPLAIN_BASE}`, - }, - method: "GET", - }); - - ism.retry = ca({ - url: { - fmt: `${API.RETRY_BASE}/<%=index%>`, - req: { - index: { - type: "string", - required: true, - }, - }, - }, - needBody: false, - method: "POST", - }); - - ism.add = ca({ - url: { - fmt: `${API.ADD_POLICY_BASE}/<%=index%>`, - req: { - index: { - type: "string", - required: true, - }, - }, - }, - needBody: true, - method: "POST", - }); - - ism.remove = ca({ - url: { - fmt: `${API.REMOVE_POLICY_BASE}/<%=index%>`, - req: { - index: { - type: "string", - required: true, - }, - }, - }, - needBody: false, - method: "POST", - }); - - ism.change = ca({ - url: { - fmt: `${API.CHANGE_POLICY_BASE}/<%=index%>`, - req: { - index: { - type: "string", - required: true, - }, - }, - }, - needBody: true, - method: "POST", - }); - - // TODO add new APIs as they are being implemented: status, stop, start - - ism.getRollup = ca({ - url: { - fmt: `${API.ROLLUP_JOBS_BASE}/<%=rollupId%>`, - req: { - rollupId: { - type: "string", - required: true, - }, - }, - }, - method: "GET", - }); - - ism.getRollups = ca({ - url: { - fmt: `${API.ROLLUP_JOBS_BASE}`, - }, - method: "GET", - }); - - ism.createRollup = ca({ - url: { - fmt: `${API.ROLLUP_JOBS_BASE}/<%=rollupId%>?refresh=wait_for`, - req: { - rollupId: { - type: "string", - required: true, - }, - }, - }, - needBody: true, - method: "PUT", - }); - - ism.deleteRollup = ca({ - url: { - fmt: `${API.ROLLUP_JOBS_BASE}/<%=rollupId%>?refresh=wait_for`, - req: { - rollupId: { - type: "string", - required: true, - }, - }, - }, - method: "DELETE", - }); - - ism.putRollup = ca({ - url: { - fmt: `${API.ROLLUP_JOBS_BASE}/<%=rollupId%>`, - req: { - rollupId: { - type: "string", - required: true, - }, - }, - }, - method: "PUT", - }); - - ism.startRollup = ca({ - url: { - fmt: `${API.ROLLUP_JOBS_BASE}/<%=rollupId%>/_start`, - req: { - rollupId: { - type: "string", - required: true, - }, - }, - }, - method: "POST", - }); - - ism.stopRollup = ca({ - url: { - fmt: `${API.ROLLUP_JOBS_BASE}/<%=rollupId%>/_stop`, - req: { - rollupId: { - type: "string", - required: true, - }, - }, - }, - method: "POST", - }); - - ism.explainRollup = ca({ - url: { - fmt: `${API.ROLLUP_JOBS_BASE}/<%=rollupId%>/_explain`, - req: { - rollupId: { - type: "string", - required: true, - }, - }, - }, - method: "GET", - }); - - ism.getTransform = ca({ - url: { - fmt: `${API.TRANSFORM_BASE}/<%=transformId%>`, - req: { - transformId: { - type: "string", - required: true, - }, - }, - }, - method: "GET", - }); - - ism.getTransforms = ca({ - url: { - fmt: `${API.TRANSFORM_BASE}/`, - }, - method: "GET", - }); - - ism.explainTransform = ca({ - url: { - fmt: `${API.TRANSFORM_BASE}/<%=transformId%>/_explain`, - req: { - transformId: { - type: "string", - required: true, - }, - }, - }, - method: "GET", - }); - - ism.startTransform = ca({ - url: { - fmt: `${API.TRANSFORM_BASE}/<%=transformId%>/_start`, - req: { - transformId: { - type: "string", - required: true, - }, - }, - }, - method: "POST", - }); - - ism.stopTransform = ca({ - url: { - fmt: `${API.TRANSFORM_BASE}/<%=transformId%>/_stop`, - req: { - transformId: { - type: "string", - required: true, - }, - }, - }, - method: "POST", - }); - - ism.deleteTransform = ca({ - url: { - fmt: `${API.TRANSFORM_BASE}/<%=transformId%>`, - req: { - transformId: { - type: "string", - required: true, - }, - }, - }, - method: "DELETE", - }); - - ism.createTransform = ca({ - url: { - fmt: `${API.TRANSFORM_BASE}/<%=transformId%>?refresh=wait_for`, - req: { - transformId: { - type: "string", - required: true, - }, - }, - }, - needBody: true, - method: "PUT", - }); - - ism.putTransform = ca({ - url: { - fmt: `${API.TRANSFORM_BASE}/<%=transformId%>`, - req: { - transformId: { - type: "string", - required: true, - }, - }, - }, - method: "PUT", - }); - - ism.previewTransform = ca({ - url: { - fmt: `${API.TRANSFORM_BASE}/_preview`, - }, - needBody: true, - method: "POST", - }); - - ism.getChannels = ca({ - url: { - fmt: `${API.CHANNELS_BASE}`, - }, - method: "GET", - }); - - ism.getChannel = ca({ - url: { - fmt: `${API.NOTIFICATION_CONFIGS_BASE}/<%=id%>`, - req: { - id: { - type: "string", - required: true, - }, - }, - }, - method: "GET", - }); - - ism.getSMPolicy = ca({ - url: { - fmt: `${API.SM_POLICY_BASE}/<%=id%>`, - req: { - id: { - type: "string", - required: true, - }, - }, - }, - method: "GET", - }); - - ism.getSMPolicies = ca({ - url: { - fmt: `${API.SM_POLICY_BASE}`, - }, - method: "GET", - }); - - ism.createSMPolicy = ca({ - url: { - fmt: `${API.SM_POLICY_BASE}/<%=policyId%>?refresh=wait_for`, - req: { - policyId: { - type: "string", - required: true, - }, - }, - }, - needBody: true, - method: "POST", - }); - - ism.updateSMPolicy = ca({ - url: { - fmt: `${API.SM_POLICY_BASE}/<%=policyId%>?if_seq_no=<%=ifSeqNo%>&if_primary_term=<%=ifPrimaryTerm%>&refresh=wait_for`, - req: { - policyId: { - type: "string", - required: true, - }, - ifSeqNo: { - type: "string", - required: true, - }, - ifPrimaryTerm: { - type: "string", - required: true, - }, - }, - }, - needBody: true, - method: "PUT", - }); - - ism.deleteSMPolicy = ca({ - url: { - fmt: `${API.SM_POLICY_BASE}/<%=policyId%>?refresh=wait_for`, - req: { - policyId: { - type: "string", - required: true, - }, - }, - }, - method: "DELETE", - }); - - ism.explainSnapshotPolicy = ca({ - url: { - fmt: `${API.SM_POLICY_BASE}/<%=id%>/_explain`, - req: { - id: { - type: "string", - required: true, - }, - }, - }, - method: "GET", - }); - - ism.startSnapshotPolicy = ca({ - url: { - fmt: `${API.SM_POLICY_BASE}/<%=id%>/_start`, - req: { - id: { - type: "string", - required: true, - }, - }, - }, - method: "POST", - }); - - ism.stopSnapshotPolicy = ca({ - url: { - fmt: `${API.SM_POLICY_BASE}/<%=id%>/_stop`, - req: { - id: { - type: "string", - required: true, - }, - }, - }, - method: "POST", + extendClient({ + ism, + ca, }); } diff --git a/server/plugin.ts b/server/plugin.ts index e3cfc77b7..bc0177920 100644 --- a/server/plugin.ts +++ b/server/plugin.ts @@ -33,7 +33,9 @@ import dataStreams from "./routes/dataStreams"; import { NodeServices } from "./models/interfaces"; import { getClientSupportMDS } from "./client"; import { OpenSearchDashboardsClient } from "@opensearch-project/opensearch/api/opensearch_dashboards"; -import { Client } from "elasticsearch"; +import { extendClient } from "./clusters/extend_client"; +// @ts-ignore +import { factory } from "elasticsearch/src/lib/client_action"; export class IndexPatternManagementPlugin implements Plugin { private readonly logger: Logger; @@ -50,13 +52,24 @@ export class IndexPatternManagementPlugin implements Plugin factory(...args).bind(finalClient), + }); + return { - ism: ((legacyClient as unknown) as { client: Client & { ism: any } }).client.ism, + ism, }; }, pluginId: "opensearch_index_management_dashboards",