Connecting to Cosmos

Data Science & AI Workbench enables you to connect to a Microsoft Azure Cosmos DB distributed database and access the data stored in it.

Before you can connect, install the library required to access the Cosmos database:

conda install -c https://conda.anaconda.org/conda-forge/ azure-cosmos -y
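Because the client API differs between major releases of the SDK, it can help to confirm which version the session actually resolved. The following is a minimal sketch that relies only on the Python standard library; `installed_version` is a hypothetical helper name, and `azure-cosmos` is assumed to be the distribution name registered by the command above.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_version(package_name):
    """Return the installed version string of a package, or None if it is absent."""
    try:
        return version(package_name)
    except PackageNotFoundError:
        return None


# "azure-cosmos" is assumed to be the distribution name installed by conda
cosmos_version = installed_version("azure-cosmos")
if cosmos_version is None:
    print("azure-cosmos is not installed in this environment")
else:
    print(f"azure-cosmos {cosmos_version} is installed")
```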

Tip

You can also download the Cosmos DB Python SDK and access relevant documentation from Microsoft.

When you use conda install to add a package during a session, the change applies to the current session only. If you want the change to persist for future project sessions and deployments, add the package to the project’s anaconda-project.yml file. For more information, see Project configurations.

You’ll also need to obtain the required connection values, such as the endpoint and primary key, from your Azure account.

See Secrets for information about adding credentials to the platform, to make them available in your projects. Any secrets you add will be available across all sessions and deployments associated with your user account.
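A secret is read from a file inside the session, so an incomplete or malformed secret tends to surface only later as a confusing connection error. The following is a minimal sketch of an early check, not a platform feature: `load_credentials` and `REQUIRED_KEYS` are hypothetical names, and the four key names are assumed to match the dictionary format used in this page's example.

```python
import json

# Key names assumed to match the secret format used in this page's example
REQUIRED_KEYS = ("ENDPOINT", "PRIMARYKEY", "DATABASE", "CONTAINER")


def load_credentials(path):
    """Read a JSON secret file and verify that every required key has a value."""
    with open(path) as f:
        credentials = json.load(f)
    if not isinstance(credentials, dict):
        raise ValueError("The secret must contain a JSON object")
    # Collect keys that are absent or empty so they can be reported together
    missing = [key for key in REQUIRED_KEYS if not credentials.get(key)]
    if missing:
        raise ValueError("Secret is missing values for: " + ", ".join(missing))
    return credentials
```

The function takes the path to the secret file as its only argument and raises a ValueError that names every missing key, which is usually easier to act on than a failure inside the client.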

After you’ve installed the library and gathered the connection information from your Azure account, you can use code such as the following to access the database from within a notebook session:

import json

from azure.cosmos import cosmos_client

"""
The secret is expected to contain a dictionary in the following format:
{
    "ENDPOINT": "<ENDPOINT>",
    "PRIMARYKEY": "<PRIMARYKEY>",
    "DATABASE": "<DATABASE>",
    "CONTAINER": "<CONTAINER>"
}

Replace the following placeholders with the appropriate values from your Azure account:
<ENDPOINT> - Get from Azure account
<PRIMARYKEY> - Get from Azure account
<DATABASE> - Database to connect to
<CONTAINER> - Container to use
"""
# Read the connection details from the secret file
with open('/var/run/secrets/user_credentials/<credential_key>') as f:
    credentials = json.load(f)

# Initialize the Cosmos client
client = cosmos_client.CosmosClient(
    url_connection=credentials.get('ENDPOINT'),
    auth={'masterKey': credentials.get('PRIMARYKEY')}
)

def find_element(element_id, element_type, parent_link=None):
    """Return the database or collection with the given id, or None if there is no single match."""
    find_entity_by_id_query = {
        "query": "SELECT * FROM r WHERE r.id=@id",
        "parameters": [
            {"name": "@id", "value": element_id}
        ]
    }
    temp_items = []
    if element_type == 'database':
        temp_items = list(client.QueryDatabases(find_entity_by_id_query))
    elif element_type == 'collection':
        temp_items = list(client.QueryContainers(parent_link, find_entity_by_id_query))

    # Accept only a single, unambiguous match
    return temp_items[0] if len(temp_items) == 1 else None

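database = find_element(credentials.get('DATABASE'), 'database')
if database is None:
    raise ValueError('Database not found; check the DATABASE value in the secret')
collection = find_element(credentials.get('CONTAINER'), 'collection', database.get('_self'))
if collection is None:
    raise ValueError('Container not found; check the CONTAINER value in the secret')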

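# Select every item in the container, allowing the query to span partitions
query = {'query': 'SELECT * FROM c'}
options = {'enableCrossPartitionQuery': True}

# Print each returned item as formatted JSON
results = client.QueryItems(collection.get('_self'), query, options)
for result in results:
    print(json.dumps(result, indent=4))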
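The method names used above (QueryDatabases, QueryContainers, QueryItems) appear to belong to the 3.x releases of azure-cosmos. If your environment resolves a 4.x release instead, a roughly equivalent flow might look like the following sketch. It assumes the 4.x client classes and the same four keys in the secret; `open_container` and `read_all_items` are hypothetical helper names, not part of the SDK.

```python
def open_container(credentials):
    """Return a container client, assuming the azure-cosmos 4.x API is installed."""
    # Imported inside the function so read_all_items can be used without the SDK
    from azure.cosmos import CosmosClient

    client = CosmosClient(credentials["ENDPOINT"], credential=credentials["PRIMARYKEY"])
    database = client.get_database_client(credentials["DATABASE"])
    return database.get_container_client(credentials["CONTAINER"])


def read_all_items(container, query="SELECT * FROM c"):
    """Run a query across all partitions and return the matching items as a list."""
    return list(container.query_items(query=query, enable_cross_partition_query=True))


# Example usage, with credentials loaded from the secret as shown earlier:
# container = open_container(credentials)
# for item in read_all_items(container):
#     print(item)
```

Collecting the results into a list is convenient for small containers; for large ones, iterating over `container.query_items(...)` directly avoids holding every item in memory.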