Skip to content

Commit

Permalink
Python: Adding Azure CosmosDB Mongo vCore as a datastore. (#2990)
Browse files Browse the repository at this point in the history
### Motivation and Context

I have added Azure CosmosDB MongoDB vCore as a data store. MongoDB vCore
now supports vector search on embeddings, and it could be used to
seamlessly integrate your AI-based applications with your data stored in
the Azure CosmosDB. More details about Mongo vCore can be found here:
https://learn.microsoft.com/en-us/azure/cosmos-db/mongodb/vcore/vector-search.

Issue #2375

<!-- Thank you for your contribution to the semantic-kernel repo!
Please help reviewers and future users, by providing the following
information:
-->
1. Why is this change required? **MongoDB vCore now supports vector
search on embeddings, and it could be used to seamlessly integrate your
AI-based applications with your data stored in the Azure CosmosDB.**
2. What problem does it solve? This adds a new memory store MongoDB
vCore(Azure CosmosDB)


### Description

<!-- Describe your changes, the overall approach, and the underlying
design.
These notes will help you understand how your code works. Thanks! -->

### Contribution Checklist

<!-- Before submitting this PR, please make sure: -->

- [ ] The code builds clean without any errors or warnings
- [ ] The PR follows the [SK Contribution
Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md)
and the [pre-submission formatting
script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts)
raises no violations
- [ ] All unit tests pass, and I have added new tests where possible
- [ ] I didn't break anyone 😄

---------

Co-authored-by: Abby Harrison <[email protected]>
Co-authored-by: Abby Harrison <[email protected]>
Co-authored-by: Mark Wallace <[email protected]>
  • Loading branch information
4 people authored Nov 14, 2023
1 parent 6f7cd6a commit dcad9f7
Show file tree
Hide file tree
Showing 10 changed files with 822 additions and 4 deletions.
4 changes: 4 additions & 0 deletions python/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,7 @@ WEAVIATE_API_KEY=""
GOOGLE_PALM_API_KEY=""
GOOGLE_SEARCH_ENGINE_ID=""
REDIS_CONNECTION_STRING=""
AZCOSMOS_API = "" // should be mongo-vcore for now, as CosmosDB only supports vector search in mongo-vcore for now.
AZCOSMOS_CONNSTR = ""
AZCOSMOS_DATABASE_NAME = ""
AZCOSMOS_CONTAINER_NAME = ""
2 changes: 2 additions & 0 deletions python/semantic_kernel/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
)
from semantic_kernel.utils.null_logger import NullLogger
from semantic_kernel.utils.settings import (
azure_cosmos_db_settings_from_dot_env,
azure_openai_settings_from_dot_env,
bing_search_settings_from_dot_env,
google_palm_settings_from_dot_env,
Expand All @@ -35,6 +36,7 @@
"bing_search_settings_from_dot_env",
"mongodb_atlas_settings_from_dot_env",
"google_palm_settings_from_dot_env",
"azure_cosmos_db_settings_from_dot_env",
"redis_settings_from_dot_env",
"PromptTemplateConfig",
"PromptTemplate",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Copyright (c) Microsoft. All rights reserved.

from semantic_kernel.connectors.memory.azure_cosmosdb.azure_cosmos_db_memory_store import (
AzureCosmosDBMemoryStore,
)

__all__ = ["AzureCosmosDBMemoryStore"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,265 @@
# Copyright (c) Microsoft. All rights reserved.

from typing import List, Tuple

from numpy import ndarray

from semantic_kernel.connectors.memory.azure_cosmosdb.azure_cosmos_db_store_api import (
AzureCosmosDBStoreApi,
)
from semantic_kernel.connectors.memory.azure_cosmosdb.cosmosdb_utils import (
get_mongodb_resources,
)
from semantic_kernel.connectors.memory.azure_cosmosdb.mongo_vcore_store_api import (
MongoStoreApi,
)
from semantic_kernel.memory.memory_record import MemoryRecord
from semantic_kernel.memory.memory_store_base import MemoryStoreBase
from semantic_kernel.utils.settings import azure_cosmos_db_settings_from_dot_env

# Load environment variables
(cosmos_api, cosmos_connstr) = azure_cosmos_db_settings_from_dot_env()


class AzureCosmosDBMemoryStore(MemoryStoreBase):
"""A memory store that uses AzureCosmosDB for MongoDB vCore, to perform vector similarity search on a fully
managed MongoDB compatible database service.
https://learn.microsoft.com/en-us/azure/cosmos-db/mongodb/vcore/vector-search"""

# Right now this only supports Mongo, but set up to support more later.
apiStore: AzureCosmosDBStoreApi = None
mongodb_client = None
database = None
index_name = None
vector_dimensions = None
num_lists = None
similarity = None
collection_name = None

def __init__(
self,
cosmosStore: AzureCosmosDBStoreApi,
database_name: str,
index_name: str,
vector_dimensions: int,
num_lists: int,
similarity: str,
):
if vector_dimensions <= 0:
raise ValueError("Vector dimensions must be a positive number.")
# if connection_string is None:
# raise ValueError("Connection String cannot be empty.")
if database_name is None:
raise ValueError("Database Name cannot be empty.")
if index_name is None:
raise ValueError("Index Name cannot be empty.")

self.cosmosStore = cosmosStore
self.index_name = index_name
self.num_lists = num_lists
self.similarity = similarity

@staticmethod
async def create(
database_name,
collection_name,
index_name,
vector_dimensions,
num_lists,
similarity,
) -> MemoryStoreBase:
"""Creates the underlying data store based on the API definition"""
# Right now this only supports Mongo, but set up to support more later.
apiStore: AzureCosmosDBStoreApi = None
if cosmos_api == "mongo-vcore":
mongodb_client, database = get_mongodb_resources(
cosmos_connstr, database_name
)
apiStore = MongoStoreApi(
collection_name,
index_name,
vector_dimensions,
num_lists,
similarity,
database,
)
else:
raise NotImplementedError

store = AzureCosmosDBMemoryStore(
apiStore,
database_name,
index_name,
vector_dimensions,
num_lists,
similarity,
)
await store.create_collection_async(collection_name)
return store

async def create_collection_async(self, collection_name: str) -> None:
"""Creates a new collection in the data store.
Arguments:
collection_name {str} -- The name associated with a collection of embeddings.
Returns:
None
"""
return await self.cosmosStore.create_collection(collection_name)

async def get_collections_async(self) -> List[str]:
"""Gets the list of collections.
Returns:
List[str] -- The list of collections.
"""
return await self.cosmosStore.get_collections_async()

async def delete_collection_async(self, collection_name: str) -> None:
"""Deletes a collection.
Arguments:
collection_name {str} -- The name of the collection to delete.
Returns:
None
"""
return await self.cosmosStore.delete_collection(str())

async def does_collection_exist_async(self, collection_name: str) -> bool:
"""Checks if a collection exists.
Arguments:
collection_name {str} -- The name of the collection to check.
Returns:
bool -- True if the collection exists; otherwise, False.
"""
return await self.cosmosStore.does_collection_exist(str())

async def upsert_async(self, collection_name: str, record: MemoryRecord) -> str:
"""Upsert a record.
Arguments:
collection_name {str} -- The name of the collection to upsert the record into.
record {MemoryRecord} -- The record to upsert.
Returns:
str -- The unique record id of the record.
"""
return await self.cosmosStore.upsert(str(), record)

async def upsert_batch_async(
self, collection_name: str, records: List[MemoryRecord]
) -> List[str]:
"""Upsert a batch of records.
Arguments:
collection_name {str} -- The name of the collection to upsert the records into.
records {List[MemoryRecord]} -- The records to upsert.
Returns:
List[str] -- The unique database keys of the records.
"""
return await self.cosmosStore.upsert_batch(str(), records)

async def get_async(
self, collection_name: str, key: str, with_embedding: bool
) -> MemoryRecord:
"""Gets a record.
Arguments:
collection_name {str} -- The name of the collection to get the record from.
key {str} -- The unique database key of the record.
with_embedding {bool} -- Whether to include the embedding in the result. (default: {False})
Returns:
MemoryRecord -- The record.
"""
return await self.cosmosStore.get(str(), key, with_embedding)

async def get_batch_async(
self, collection_name: str, keys: List[str], with_embeddings: bool
) -> List[MemoryRecord]:
"""Gets a batch of records.
Arguments:
collection_name {str} -- The name of the collection to get the records from.
keys {List[str]} -- The unique database keys of the records.
with_embeddings {bool} -- Whether to include the embeddings in the results. (default: {False})
Returns:
List[MemoryRecord] -- The records.
"""
return await self.cosmosStore.get_batch(str(), keys, with_embeddings)

async def remove_async(self, collection_name: str, key: str) -> None:
"""Removes a record.
Arguments:
collection_name {str} -- The name of the collection to remove the record from.
key {str} -- The unique database key of the record to remove.
Returns:
None
"""
return await self.cosmosStore.remove(str(), key)

async def remove_batch_async(self, collection_name: str, keys: List[str]) -> None:
"""Removes a batch of records.
Arguments:
collection_name {str} -- The name of the collection to remove the records from.
keys {List[str]} -- The unique database keys of the records to remove.
Returns:
None
"""
return await self.cosmosStore.remove_batch(str(), keys)

async def get_nearest_matches_async(
self,
collection_name: str,
embedding: ndarray,
limit: int,
min_relevance_score: float,
with_embeddings: bool,
) -> List[Tuple[MemoryRecord, float]]:
"""Gets the nearest matches to an embedding using vector configuration.
Parameters:
collection_name (str) -- The name of the collection to get the nearest matches from.
embedding (ndarray) -- The embedding to find the nearest matches to.
limit {int} -- The maximum number of matches to return.
min_relevance_score {float} -- The minimum relevance score of the matches. (default: {0.0})
with_embeddings {bool} -- Whether to include the embeddings in the results. (default: {False})
Returns:
List[Tuple[MemoryRecord, float]] -- The records and their relevance scores.
"""
return await self.cosmosStore.get_nearest_matches(
str(), embedding, limit, min_relevance_score, with_embeddings
)

async def get_nearest_match_async(
self,
collection_name: str,
embedding: ndarray,
min_relevance_score: float,
with_embedding: bool,
) -> Tuple[MemoryRecord, float]:
"""Gets the nearest match to an embedding using vector configuration parameters.
Arguments:
collection_name {str} -- The name of the collection to get the nearest match from.
embedding {ndarray} -- The embedding to find the nearest match to.
min_relevance_score {float} -- The minimum relevance score of the match. (default: {0.0})
with_embedding {bool} -- Whether to include the embedding in the result. (default: {False})
Returns:
Tuple[MemoryRecord, float] -- The record and the relevance score.
"""
return await self.cosmosStore.get_nearest_match(
str(), embedding, min_relevance_score, with_embedding
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# Copyright (c) Microsoft. All rights reserved.


from abc import ABC, abstractmethod
from typing import List, Tuple

from numpy import ndarray

from semantic_kernel.memory.memory_record import MemoryRecord


# Abstract class similar to the original data store that allows API level abstraction
class AzureCosmosDBStoreApi(ABC):
@abstractmethod
async def create_collection(self, collection_name: str) -> None:
raise NotImplementedError

@abstractmethod
async def get_collections(self) -> List[str]:
raise NotImplementedError

@abstractmethod
async def delete_collection(self, collection_name: str) -> None:
raise NotImplementedError

@abstractmethod
async def does_collection_exist(self, collection_name: str) -> bool:
raise NotImplementedError

@abstractmethod
async def upsert(self, collection_name: str, record: MemoryRecord) -> str:
raise NotImplementedError

@abstractmethod
async def upsert_batch(
self, collection_name: str, records: List[MemoryRecord]
) -> List[str]:
raise NotImplementedError

@abstractmethod
async def get(
self, collection_name: str, key: str, with_embedding: bool
) -> MemoryRecord:
raise NotImplementedError

@abstractmethod
async def get_batch(
self, collection_name: str, keys: List[str], with_embeddings: bool
) -> List[MemoryRecord]:
raise NotImplementedError

@abstractmethod
async def remove(self, collection_name: str, key: str) -> None:
raise NotImplementedError

@abstractmethod
async def remove_batch(self, collection_name: str, keys: List[str]) -> None:
raise NotImplementedError

@abstractmethod
async def get_nearest_matches(
self,
collection_name: str,
embedding: ndarray,
limit: int,
min_relevance_score: float,
with_embeddings: bool,
) -> List[Tuple[MemoryRecord, float]]:
raise NotImplementedError

@abstractmethod
async def get_nearest_match(
self,
collection_name: str,
embedding: ndarray,
min_relevance_score: float,
with_embedding: bool,
) -> Tuple[MemoryRecord, float]:
raise NotImplementedError
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Copyright (c) Microsoft. All rights reserved.

from pymongo import MongoClient


def get_mongodb_resources(connection_string: str, database_name: str):
try:
client = MongoClient(connection_string)
database = client[database_name]
except Exception as ex:
raise Exception(
f"Error while connecting to Azure Cosmos MongoDb vCore: {ex}"
) from ex
return client, database
Loading

0 comments on commit dcad9f7

Please sign in to comment.