Bulk insert entities into Cosmos DB using Python


💣💣 Note: the v4 Python SDK for Cosmos DB has a number of breaking changes to the API surface, which means this article is out-of-date. Stored procedure execution has moved from the CosmosClient to the Scripts module in the Python SDK. Please consider the documentation below to be of archival value only. 💣💣
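
If you land here on the v4 SDK, the equivalent call looks roughly like this. This is a sketch only, not from the original article: the document shape and partition key value are placeholders, and it assumes the same bulkImport stored procedure used below is already registered on the container.

v4-scripts-sketch.py
import os

from azure.cosmos import CosmosClient

client = CosmosClient(os.environ['COSMOS_HOST'], os.environ['MASTER_KEY'])
container = client.get_database_client(os.environ['DATABASE_ID']) \
                  .get_container_client(os.environ['COLLECTION_ID'])

new_docs = [{'JobID': 'example-job', 'Test': 1}]  # your batch of documents

# In v4, stored procedures hang off container.scripts rather than the client.
# The array-in-an-array trick described below still applies to params.
container.scripts.execute_stored_procedure(
    'bulkImport',
    params=[new_docs],
    partition_key='example-partition-key-value'
)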


I've been working a lot with Python over the last few months, and one piece of work I've spent considerable time on is interfacing Python solutions with Cosmos DB.

Microsoft has shipped a great Cosmos DB Python SDK that provides almost everything a developer could need to work with Cosmos DB. In one of my use cases I needed to batch insert multiple entities at once, something the Cosmos DB SDK doesn't support in any language. This meant I immediately looked at the Cosmos DB Bulk Executor library, only to find it supports just Java and .NET right now 🤨.

Luckily, having done a fair bit of Cosmos DB work over the last few years, I was aware this bulk insert pattern had been solved previously using Cosmos DB Stored Procedures, so I went and dug up the sample bulk import stored procedure from GitHub:

https://github.com/Azure/azure-cosmosdb-js-server/blob/master/samples/stored-procedures/BulkImport.js

Job done.

Well... not quite!

Thankfully I have a fairly good head of hair, because I tore a fair bit of it out getting it all to work.

The first challenge I hit was a general lack of complex "Calling Cosmos DB Stored Procedures from Python" samples. Yes, there are simple examples floating around, but *all* of them consist of calling a stored procedure with either no parameters or a single parameter. While I too was passing only one parameter, that parameter was an array rather than a single value.
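
My early attempts looked roughly like this (a sketch only; 'client' and 'sproc_link' are set up exactly as in the full script further down):

client.ExecuteStoredProcedure(sproc_link, new_docs)

Passing new_docs directly like that is what triggers the error below.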

No matter what I did, all I could get back from Cosmos DB was this response (pasted here in case others search for a fix online):

Exception: HTTPFailure: Status code: 400 Sub-status: 400
{
  "code":"BadRequest",
  "message":"Message:
  {
    "Errors": [
      "Encountered exception while executing Javascript. Exception = Error: The document body must be an object or a string representing a JSON-serialized object.
      Stack trace: Error: The document body must be an object or a string representing a JSON-serialized object. at createDocument (bulkImport.js:636:21)
      at tryCreate (bulkImport.js:32:9) at bulkImport (bulkImport.js:24:5) at __docDbMain (bulkImport.js:61:5) at Global code (bulkImport.js:1:2)"
      ]
  }
  ActivityId: 95af001e-XXXX-XXXX-XXXX-6aca4d356c10,
  Request URI: /apps/0ee1f647-XXXX-XXXX-XXXX-04d92606fd10/services/53e603e8-XXXX-XXXX-XXXX-399602af32ec/partitions/09ebc044-XXXX-XXXX-XXXX-729efb2fd8ae/replicas/131869045473958276p/,
  RequestStats: \r\nRequestStartTime: 2018-11-18T22:26:58.2676364Z,
  Number of regions attempted: 1\r\n,
  SDK: Microsoft.Azure.Documents.Common/2.1.0.0"
}

Clear as mud... quelle horreur! I even found myself here:

Stack Overflow screenshot 1

Thankfully, before I could post a question that would invariably be closed by some moderator with 30 million reputation and then down-voted into oblivion, I did a bit more digging and came across an answer to this question:

https://stackoverflow.com/questions/47748684/documentdb-bulkimport-stored-proc-getting-400-error-on-array-json-issue

This answer in particular:

Stack Overflow screenshot 2

https://stackoverflow.com/a/49450200/2899711

Thanks Aram... upvoted with a comment linking here 😊.

This led me to the solution shown below. The key element here is the line:

client.ExecuteStoredProcedure(sproc_link, [new_docs])

The 'new_docs' array is itself enclosed in another array! The second argument is treated as the list of positional parameters for the stored procedure, so the extra wrapping means bulkImport receives a single parameter whose value is the whole document array, rather than the documents being spread across separate parameters.

pass-array-sproc.py
import os
import uuid

# v3 Python SDK - note that v4 changed the API and stored procedures now live in the azure.cosmos.scripts module.
import azure.cosmos.cosmos_client as cosmos_client

COSMOS_HOST = os.environ['COSMOS_HOST']
MASTER_KEY = os.environ['MASTER_KEY']
DATABASE_ID = os.environ['DATABASE_ID']
COLLECTION_ID = os.environ['COLLECTION_ID']

database_link = 'dbs/' + DATABASE_ID
collection_link = database_link + '/colls/' + COLLECTION_ID
# Use the sample bulk SP from here: https://github.com/Azure/azure-cosmosdb-js-server/blob/master/samples/stored-procedures/BulkImport.js
sproc_link = collection_link + '/sprocs/bulkImport'


def create_cosmos_entity(jobid, test):
    return {
        'JobID': jobid,
        'Test': test
    }


def main():
    # Build a small batch of test documents.
    new_docs = [create_cosmos_entity(str(uuid.uuid4()), i) for i in range(30)]

    # Keep the batch modest: stored procedures run within bounded execution
    # limits, so very large arrays may need to be split across calls.
    if 0 < len(new_docs) < 100:
        client = cosmos_client.CosmosClient(COSMOS_HOST, {'masterKey': MASTER_KEY})
        # The key here is to include [] around 'new_docs', otherwise the call fails!
        # (On a partitioned collection you'd also pass the partition key via the
        # options argument, e.g. options={'partitionKey': <value>}.)
        client.ExecuteStoredProcedure(sproc_link, [new_docs])


if __name__ == '__main__':
    main()
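
One last note: the bulkImport stored procedure has to be registered on the collection before you can call it. If you haven't already added it via the Azure Portal, you can register it from Python too. Here's a minimal sketch using the same v3 SDK (the BulkImport.js path is an assumption - point it at wherever you saved the sample):

register-sproc.py
import os

import azure.cosmos.cosmos_client as cosmos_client

collection_link = 'dbs/' + os.environ['DATABASE_ID'] + '/colls/' + os.environ['COLLECTION_ID']

client = cosmos_client.CosmosClient(os.environ['COSMOS_HOST'], {'masterKey': os.environ['MASTER_KEY']})

# Read the sample stored procedure body and register it under the id
# 'bulkImport' so that sproc_link in the script above resolves to it.
with open('BulkImport.js') as f:
    sproc_body = f.read()

client.CreateStoredProcedure(collection_link, {
    'id': 'bulkImport',
    'body': sproc_body
})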

Happy days! 😎

P.S. - You can learn about working with Cosmos DB completely for free on Microsoft Learn. (The added bonus is no Azure subscription or entry of credit card details is required to get going!)