4+ ways to get data into CosmosDB

There are many ways to insert data in Cosmos DB and many use cases when you want to do that. In this article we’ll dive in the use case of bulk-updating a Cosmos DB collection based on a feed of data or some kind of recurrent data ingestion process into Cosmos DB. The
Sync jobs, daily jobs that need to populate data into a Cosmos DB collection…

I’ll evaluate the solutions based on a few factors: cost, speed, development time, complexity of solution.

Test bench

For the tests, our data will always come from a blob storage that contains a JSON document representing a collection of documents that need to be inserted in Cosmos DB. The documents are already in the exact shape / schema that Cosmos DB needs (i.e. will have an id field, will have the Partition Key property set, etc…)

I’m using two separate data sources to represent slightly different scenarios.

  • A smaller collection of just 5k items of bigger documents (~1kb). Each item has a unique partition key for maximum write performance.
  • A larger collection of about 100k items of smaller documents (0.5kb). Items are distributed more or less evenly across 5k partition keys.

The Cosmos DB collection is purposely scaled down to just 1000 RUs, manual scale.

Options

1. Logic App

Following with the serverless concepts, a popular option to run workflows is via Logic Apps. It’s a very good, no-code method to build your pipeline and you have endless options to trigger and mange the workload.

However, the cost model for Logic Apps makes it less suitable for massive, recurrent load jobs, as you are charged for every step of the workflow. If you’re inserting 5000 items in a Cosmos DB, you’ll be charged for 5000 connector executions and it can get pretty expensive at scale.

Logic App Flow

Pros:

  • Lots of options to trigger it, built-in connectors
  • No code to maintain
  • Easy deployment / automation (you can export template as ARM)

Cons:

  • Cannot be written generically - need the schema of each type of document in order to parse it from the original blob.
  • Does not scale very well since logic app won’t run inserts in parallel
  • You pay per connector, and each document is an considered a connector action
  • Some connector limitations

Benchmark Results:

  • 5k items from JSON blob - ~90s (~45 items / sec)

Conclusion:

Logic Apps are a good no-code solution, suitable for smaller jobs (hundreds of items at a time), especially if they are part of a larger or pre-existing workflow. The Logic App connector is not optimized for speed so this will likely be the slowest option. However when used for small scale inserts this likely won’t be a deal breaker.

2. Data Factory

Using a Data Factory is another no-code approach to get data into your Cosmos database. Behind the scenes it’s using the bulk executor library which makes it fast and efficient.

The pricing model for the Data Factory a combination of execution time and number of executions. However, the significant portion of cost will likely come from the time it takes to run the job and this makes it significantly cheaper than the Logic Apps, at scale.

Data Factory

Pros:

  • Can handle csv out of the box with no code conversion necessary and no noticeable performance penalty
  • Supports time based or event based triggers
  • Very fast since it’s using the bulk executor library for highly efficient inserts.

Cons:

  • Can be a pain to automate via Arm templates.
  • Cannot be generic, you have to configure the data mapping between source and destination.

Benchmark Results:

Manual 1000 RUs collection

  • 5k items from JSON blob: ~44s
  • 5k items from CSV blob: ~47s
  • 100k items from JSON blob: ~610s

Auto-scale 10000 RUs collection

  • 5k items from JSON blob: ~8s
  • 100k items from JSON blob: ~85s

Conclusion:

Data Factory is the other no-code approach to get data in. It offers excellent performance (best-in-class) however it loses the triggering flexibility that you have with Logic App. This solution is probably best for larger jobs (thousands, hundred of thousands of documents) that run in a more structured environment.

3. Azure Function (With Cosmos DB Binding)

When using the Automatic bindings, the Azure Functions is a low-code solution that provides a good balance between a bit of flexibility with not a lot of code.

1
2
3
4
5
6
7
8
9
10
11
[FunctionName("Load-QueueToCosmos")]
public static async Task LoadQueueMessageToCosomos(
[QueueTrigger("load-to-cosmos")] dynamic str,
[CosmosDB(
databaseName: "%CosmosDatabaseName%",
collectionName: "%CosmosCollectionName%",
ConnectionStringSetting = "CosmosConnectionString")] IAsyncCollector<dynamic> documents,
ILogger log)
{
await documents.AddAsync(str);
}

In the example above you can see we can “move” a queue message straight into Cosmos DB with just one line of code.
This isn’t going to be the most efficient way to “bulk” load data though, since every document will mean a function execution and a connection to Cosmos DB, however, for some low scale inserts, might be good enough.

A quick optimization of the same idea is to use the binding to add multiple documents in the same function execution context. This obviously requires a different input, such as a blob reference containing the documents, or any other way to read an array.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
[FunctionName("Load-BlobToCosmos")]
public static async Task<IActionResult> LoadBlobToCosmos(
[HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequest req,
IBinder inputBlob,
[CosmosDB(
databaseName: "%CosmosDatabaseName%",
collectionName: "%CosmosCollectionName%",
ConnectionStringSetting = "CosmosConnectionString")] IAsyncCollector<dynamic> documents,
ILogger log)
{
var items = await GetDocumentsFromBlob(inputBlob, GetInputPathFromRequest(req), log);

foreach (var item in items)
await documents.AddAsync(item);
await documents.FlushAsync();

return new OkObjectResult($"Inserted {items.Count()} items");
}

Pros:

  • Virtually no code
  • Free (assuming fewer than 1M calls a month)

Cons:

  • 5m timeout on Functions limits the number of items you can insert in one function call.
  • Not scalable beyond a few hundred items
  • Not very fast in bulk load scenarios

Benchmark Results:

  • 5k items from JSON blob: ~264s
  • 100k items from JSON blob: Did not run (not feasible with this solution)

4. Code

The most flexible option, is still via old-fashioned code. In this case you are able to fully customize your solution to fully extract the last bit of performance. You can deploy this as an API (but need to think about security considerations) or as a stand-alone application on a VM (if you already have something set up).

With this approach, you should fully take advantage of the “new” bulk-load capabilities of the Cosmos DB SDK since it’s heavily optimized for inserting a lot of documents in the shortest time possible.

However, this bulk load capability comes with a drawback. Without a properly provisioned collection (5000 RUs either manual or auto-scale) you might have to manually handle some “429 try again later” errors when doing a larger data set (like my 100k items), especially when running this in a high capacity source like an App Service in Azure.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public async Task LoadData(string fileName)
{
//Initialize container somewhere else
var cosmosClient= new CosmosClient(cosmosConnectionString, new CosmosClientOptions
{
AllowBulkExecution = true
});
var cosmosContainer = cosmosClient.GetContainer(databaseName, containerName);

//Utility method to read the file and get a collection of tuples with data / partitionKey
IEnumerable<(JObject Item, PartitionKey PartitionKey)> itemsToInsert = await GetJObjectsFromInputFile(fileName);

//Optional, if you don't need the insert to return the items back. Shaves a bit of time.
var requestOptions = new ItemRequestOptions { EnableContentResponseOnWrite = false };

var tasks = itemsToInsert.Select(_ =>
cosmosContainer.UpsertItemAsync(_.Item, _.PartitionKey, requestOptions)
.ContinueWith((res) =>
{
if (!res.IsCompletedSuccessfully)
//likely a 429 error. record the item, to insert after.
;
return res;
})).ToArray();
).ToArray();
await Task.WhenAll(tasks);

Console.WriteLine($"Inserted {itemsToInsert.Count()} items in {sw.ElapsedMilliseconds:0.##} ms");
}

Pros:

  • Most flexible in terms of options and performance
  • Highest performance (easily saturate your Cosmos DB provisioned throughput)

Cons:

  • Hard to deploy and trigger, you’ll likely need an additional piece of infrastructure to run this code (VM, App Service, Function, etc…)

Benchmark Results:

Running locally

  • 5k items from JSON blob: ~48s
  • 100k items from JSON blob: ~790s

Running in Azure App Service at manual scale 1000 RUs

  • 5k items from JSON blob: ~38s
  • 100k items from JSON blob: did not run was not able to consistently run this without errors on Azure, due to low RUs.

Running in Azure App Service (collection scaled to 10K RUs auto-scaled)

  • 5k items from JSON blob: ~5s
  • 100k items from JSON blob: ~80s

Conclusion:

For one-time massive load jobs, this is the best option as you can fine tune it to exact spec. Not the best if you need this to be a daily / recurrent job for smaller / medium scale inserts due to lack of flexibility on triggering the job.

Final Words

We explored 4 different ways (one with multiple deployment options) to get data into Cosmos DB which should cover a vast array of use-cases and situations.
Some final conclusions based on the tests and benchmarks are that the Data Factory is very well built and can handle lots of items, at any scale, even in low RUs scenarios. The SDK Bulk support is by far the fastest option, but it needs a properly scaled Cosmos Collection to avoid retries and more code to handle those exceptions. Logic Apps and Functions are interesting no-code solutions for very low scale operations, especially if you already have those services as part of your current workflow.

Share Comments