Real-time analytics with Azure Cosmos DB and Spark

In this screencast, you will learn how to use Azure Cosmos DB and HDInsight Spark together to make sense of your data, both in batch and in real time.

Cosmos DB and Key Vault

You can access Cosmos DB with an access key and the URI of the account. However, you should never keep the key and URI in application code, whether it is a desktop application or a web application.

Azure Key Vault solves this problem for your application. The solution is simple and can be summarized in the following steps:

  • Create a Key Vault.
  • Store the Cosmos DB access keys in the Key Vault.
  • Create an application.
  • Register the application with Azure Active Directory.
  • Give the application permission to read the Key Vault.

Your application code will look as follows:


// Body of the async Index action in HomeController.cs.
// Namespaces: Microsoft.Azure.KeyVault and Microsoft.Azure.Services.AppAuthentication.
AzureServiceTokenProvider azureServiceTokenProvider = new AzureServiceTokenProvider();
try
{
    // The token provider authenticates as the app's managed identity on Azure,
    // or as the signed-in developer when running locally.
    var keyVaultClient = new KeyVaultClient(
        new KeyVaultClient.AuthenticationCallback(azureServiceTokenProvider.KeyVaultTokenCallback));

    // Read the secret by its identifier.
    var secret = await keyVaultClient
        .GetSecretAsync("https://rafatkeyvalut.vault.azure.net/secrets/testsecret/2c7b64d0c34546a5902095efcc44af10")
        .ConfigureAwait(false);
    ViewBag.Secret = $"Secret: {secret.Value}";
}
catch (Exception exp)
{
    ViewBag.Error = $"Something went wrong: {exp.Message}";
}

ViewBag.Principal = azureServiceTokenProvider.PrincipalUsed != null
    ? $"Principal Used: {azureServiceTokenProvider.PrincipalUsed}"
    : string.Empty;
return View();

Now, let’s go through all the steps.

Create a secret in Key Vault
[Screenshot: KV1]

Get the secret identifier. My secret is the string “xxxxx”, but you can keep the Cosmos DB access key here instead.

[Screenshot: KV2]

Here is the identifier of the secret:

https://rafatkeyvalut.vault.azure.net/secrets/testsecret/2c7b64d0c34546a5902095efcc44af10

You need this identifier in your application, but you don’t have to treat it as a secret. Even if someone gets the identifier, they cannot read the secret unless their identity has been granted access to the Key Vault.
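To make the structure of the identifier concrete, here is a minimal sketch that splits it into the vault URL, the secret name, and the version. The SecretIdentifier class is a hypothetical helper written for this illustration; it is not part of the Key Vault SDK. It assumes the usual layout, https://<vault>/secrets/<name>/<version>, where the version segment may be left out to refer to the latest version.

```csharp
using System;

// Hypothetical helper (not part of the Key Vault SDK): splits a secret
// identifier into vault URL, secret name, and optional version.
public sealed class SecretIdentifier
{
    public string VaultUrl { get; }
    public string Name { get; }
    public string Version { get; }   // null when the identifier has no version segment

    private SecretIdentifier(string vaultUrl, string name, string version)
    {
        VaultUrl = vaultUrl;
        Name = name;
        Version = version;
    }

    public static SecretIdentifier Parse(string identifier)
    {
        var uri = new Uri(identifier);

        // AbsolutePath looks like "/secrets/<name>" or "/secrets/<name>/<version>".
        string[] parts = uri.AbsolutePath.Split(new[] { '/' }, StringSplitOptions.RemoveEmptyEntries);
        if (parts.Length < 2 || parts.Length > 3 || parts[0] != "secrets")
        {
            throw new FormatException("Not a Key Vault secret identifier: " + identifier);
        }

        string vaultUrl = uri.GetLeftPart(UriPartial.Authority);
        string version = parts.Length == 3 ? parts[2] : null;
        return new SecretIdentifier(vaultUrl, parts[1], version);
    }
}
```

For the identifier above, Parse gives the vault URL https://rafatkeyvalut.vault.azure.net, the name testsecret, and the version 2c7b64d0c34546a5902095efcc44af10. None of these parts grants access on its own; the caller still has to authenticate.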

Create a web application, or download the code from here. It is a very simple MVC application; write the code in the home controller as shown above. The only interesting file is HomeController.cs. Everything else is boilerplate code.

Once the application is created, deploy it to Azure (right-click the project and choose Publish).

Once the application is deployed, go to the Azure portal, choose the App Service, and turn on Managed Service Identity for this application.

[Screenshot: KV3]

If you run the application now, you will see the following error, because you have not yet given this application any permission in Key Vault.

[Screenshot: KV4]

Now go to Key Vault and add the application using an access policy.

[Screenshot: KV5]

After adding the application, choose the permissions you want to give it, as follows:

[Screenshot: KV6]

Now, if you run the application, you will see that it can read the secret from Key Vault.

[Screenshot: KV7]

Similarly, you can add a user to access the Key Vault.

To run the application from Visual Studio, add yourself to the Key Vault by clicking “Access Policies” and give yourself the permissions the application needs. When the application runs from your desktop, it uses your identity.

Learn more: https://docs.microsoft.com/en-us/azure/active-directory/msi-overview

Reading Cosmos DB Change Feed

To track changes in Cosmos DB, you can read its change feed. Change feed support in Azure Cosmos DB enables you to build efficient and scalable solutions.

The Azure Cosmos DB change feed provides a sorted list of documents within an Azure Cosmos DB collection, in the order in which they were modified. You can use this feed to listen for modifications to data within the collection and perform an action in response. The change feed is available for each partition key range within the document collection, so it can be distributed across one or more consumers for parallel processing. Once you have the changed document, the sky is the limit: you can send it to Azure Notification Hubs or trigger any other process.

You can read the change feed in three different ways:

  1. Using the Cosmos DB client library.
  2. Using the Change Feed Processor SDK.
  3. Using serverless Azure Functions.

In this article we will discuss the first two options; a subsequent blog post will address the last option, serverless Azure Functions.

I am keeping this article short and to the point. I will quickly show you the code snippets you need to get started on reading the change feed. At the end of the article, you will find a link to the full working code.

Azure Cosmos DB SDK
Download CosmosDB SDK 

This SDK gives you all the power to read the change feed, but with that power comes a lot of responsibility. If you want to manage checkpoints, deal with document sequence numbers, and have granular control over partition keys, then this may be the right approach.

So let’s get started. Read the database name, collection name, and so on from App.config. You will get this information from the Azure portal.

DocumentClient client;
string DatabaseName = ConfigurationManager.AppSettings["database"];
string CollectionName = ConfigurationManager.AppSettings["collection"];
string endpointUrl = ConfigurationManager.AppSettings["endpoint"];
string authorizationKey = ConfigurationManager.AppSettings["authKey"];

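Create the client as follows:

using (client = new DocumentClient(
    new Uri(endpointUrl),
    authorizationKey,
    new ConnectionPolicy { ConnectionMode = ConnectionMode.Direct, ConnectionProtocol = Protocol.Tcp }))
{
    // The remaining snippets run inside this block.
}

Then get the partition key ranges:

Uri collectionUri = UriFactory.CreateDocumentCollectionUri(DatabaseName, CollectionName);
List<PartitionKeyRange> partitionKeyRanges = new List<PartitionKeyRange>();
string pkRangesResponseContinuation = null;
do // read pages until no continuation token is returned
{
    FeedResponse<PartitionKeyRange> pkRangesResponse = await client.ReadPartitionKeyRangeFeedAsync(
        collectionUri, new FeedOptions { RequestContinuation = pkRangesResponseContinuation });
    partitionKeyRanges.AddRange(pkRangesResponse);
    pkRangesResponseContinuation = pkRangesResponse.ResponseContinuation;
}
while (pkRangesResponseContinuation != null);

Then call ExecuteNextAsync for every partition key range: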


// checkpoints is a Dictionary<string, string> of continuation tokens,
// keyed by partition key range ID.
foreach (PartitionKeyRange pkRange in partitionKeyRanges)
{
    string continuation = null;
    checkpoints.TryGetValue(pkRange.Id, out continuation);

    IDocumentQuery<Document> query = client.CreateDocumentChangeFeedQuery(
        collectionUri,
        new ChangeFeedOptions
        {
            PartitionKeyRangeId = pkRange.Id,
            StartFromBeginning = true,
            RequestContinuation = continuation,
            MaxItemCount = -1,
            // Only return changes made since StartTime (here, the last 30 seconds).
            StartTime = DateTime.Now - TimeSpan.FromSeconds(30)
        });

    while (query.HasMoreResults)
    {
        // Await the call instead of blocking on .Result.
        FeedResponse<dynamic> readChangesResponse = await query.ExecuteNextAsync<dynamic>();

        foreach (dynamic changedDocument in readChangesResponse)
        {
            Console.WriteLine("document: {0}", changedDocument);
        }

        // Remember where to resume for this partition key range.
        checkpoints[pkRange.Id] = readChangesResponse.ResponseContinuation;
    }
}

If you have multiple readers, you can use ChangeFeedOptions to distribute the read load across different threads or different clients. That is it: with these few lines of code you can start reading the change feed. Get the code from here.

In the last line, ResponseContinuation holds the last logical sequence number (LSN) of the documents read, which is used the next time to read only documents changed after that sequence number. With the StartTime property of ChangeFeedOptions you can widen your net. If your ResponseContinuation is null but your StartTime goes back in time, you will get all the documents changed since StartTime. If your ResponseContinuation has a value, the system returns all the documents changed since that LSN.
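The checkpoints in the loop above live only in memory, so a restarted reader would lose its place. The following is a minimal sketch of one way to keep them between runs. CheckpointStore is a hypothetical helper written for this illustration and is not part of the Cosmos DB SDK; the file format (one "rangeId<TAB>token" line per partition key range) is an assumption. DescribeStart restates the precedence described above in code. Placing StartFromBeginning last is my reading of the SDK documentation, not something stated in this article.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical helper, not part of the Cosmos DB SDK: keeps one continuation
// token per partition key range and saves them to a text file between runs.
public sealed class CheckpointStore
{
    private readonly Dictionary<string, string> tokens = new Dictionary<string, string>();

    // Returns the saved continuation for a range, or null if none was saved.
    public string Get(string pkRangeId)
    {
        string token;
        return tokens.TryGetValue(pkRangeId, out token) ? token : null;
    }

    public void Set(string pkRangeId, string continuation)
    {
        tokens[pkRangeId] = continuation;
    }

    // Writes one "rangeId<TAB>token" line per partition key range.
    public void Save(string path)
    {
        var lines = new List<string>();
        foreach (KeyValuePair<string, string> pair in tokens)
        {
            lines.Add(pair.Key + "\t" + pair.Value);
        }
        File.WriteAllLines(path, lines);
    }

    // Loads a store written by Save; a missing file gives an empty store.
    public static CheckpointStore Load(string path)
    {
        var store = new CheckpointStore();
        if (!File.Exists(path)) return store;
        foreach (string line in File.ReadAllLines(path))
        {
            int tab = line.IndexOf('\t');
            if (tab > 0) store.Set(line.Substring(0, tab), line.Substring(tab + 1));
        }
        return store;
    }

    // Precedence sketch: a saved continuation wins, then StartTime,
    // then the StartFromBeginning flag (assumed to come last).
    public static string DescribeStart(string continuation, DateTime? startTime, bool startFromBeginning)
    {
        if (!string.IsNullOrEmpty(continuation)) return "after continuation " + continuation;
        if (startTime.HasValue) return "changes since " + startTime.Value.ToString("o");
        return startFromBeginning ? "from the beginning" : "new changes only";
    }
}
```

In the reading loop, you would call Load once at startup, pass Get(pkRange.Id) as RequestContinuation, call Set after each page, and call Save before exiting.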

Side note: the ETag on FeedResponse is different from the _etag you see on the document. _etag is an internal identifier used for concurrency control; it tells you the version of the document. ETag is used for sequencing the feed.

So, as you can see, your checkpoint dictionary just keeps the LSN for each partition. If you don’t want to deal with partitions, checkpoints, LSNs, StartTime, and so on, the simpler option is to use the Change Feed Processor library.

Using Change Feed Processor Library

The Azure Cosmos DB Change Feed Processor library helps you distribute event processing across multiple consumers. It simplifies reading changes across partitions and across multiple threads working in parallel.

The main benefit of the Change Feed Processor library is that you don’t have to manage each partition, continuation token, and so on, and you don’t have to poll each collection manually.

The library manages reading changes across partitions automatically, using a lease mechanism. As you can see in the image below, if I start two clients that use the library, they divide the work between themselves. You can keep adding clients, and they keep dividing the work among themselves.

[Image: ChangeFeed2.PNG]

I started the left client first, and it started monitoring all the partitions. Then I started the second client, and the first one let go of some of its leases to the second. As you can see, this is a nice way to distribute the work between different machines and clients.
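The effect of the lease mechanism can be illustrated with a small simulation. This is only a sketch of the outcome, not the library's algorithm: the real library coordinates hosts through documents in a lease collection, while the hypothetical LeaseSimulation class below simply assigns partition key range IDs to host names round-robin.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical simulation; the real library balances leases through a lease
// collection, not through a function like this.
public static class LeaseSimulation
{
    // Assigns partition key range IDs to hosts round-robin, so that no host
    // owns more than one lease above any other host.
    public static Dictionary<string, List<string>> Distribute(
        IList<string> partitionKeyRangeIds, IList<string> hostNames)
    {
        if (hostNames.Count == 0)
        {
            throw new ArgumentException("At least one host is required.", nameof(hostNames));
        }

        var owned = new Dictionary<string, List<string>>();
        foreach (string host in hostNames)
        {
            owned[host] = new List<string>();
        }

        for (int i = 0; i < partitionKeyRangeIds.Count; i++)
        {
            string host = hostNames[i % hostNames.Count];
            owned[host].Add(partitionKeyRangeIds[i]);
        }
        return owned;
    }
}
```

With five partition key ranges and a single host named "left", that host owns all five leases. Adding a second host named "right" leaves "left" with ranges 0, 2, and 4 and gives ranges 1 and 3 to "right", which mirrors what the two clients in the image do.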

To implement the Change Feed Processor library, you have to do the following:

  1. Implement a DocumentFeedObserver class, which implements IChangeFeedObserver.
  2. Implement a DocumentFeedObserverFactory class, which implements IChangeFeedObserverFactory.
  3. In the CreateObserver method of DocumentFeedObserverFactory, instantiate the DocumentFeedObserver you made in step 1 and return it.
  4. Instantiate DocumentFeedObserverFactory.
  5. Instantiate a ChangeFeedEventHost:
    ChangeFeedEventHost host = new ChangeFeedEventHost(
                     hostName,
                     documentCollectionLocation,
                     leaseCollectionLocation,
                     feedOptions,
                     feedHostOptions);

    Register the DocumentFeedObserverFactory with the host.

That’s it. After these few steps you will start seeing documents arrive in the ProcessChangesAsync method of DocumentFeedObserver.

Here is the code for step 3:

public IChangeFeedObserver CreateObserver()
{
    DocumentFeedObserver newObserver = new DocumentFeedObserver(this.client, this.collectionInfo);
    return newObserver;
}

Here is the code for steps 4 and 5:


ChangeFeedOptions feedOptions = new ChangeFeedOptions();
feedOptions.StartFromBeginning = true;

ChangeFeedHostOptions feedHostOptions = new ChangeFeedHostOptions();

// Customize the lease renewal interval to 15 seconds.
feedHostOptions.LeaseRenewInterval = TimeSpan.FromSeconds(15);

using (DocumentClient destClient = new DocumentClient(destCollInfo.Uri, destCollInfo.MasterKey))
{
    DocumentFeedObserverFactory docObserverFactory = new DocumentFeedObserverFactory(destClient, destCollInfo);
    ChangeFeedEventHost host = new ChangeFeedEventHost(
        hostName, documentCollectionLocation, leaseCollectionLocation, feedOptions, feedHostOptions);

    await host.RegisterObserverFactoryAsync(docObserverFactory);

    // Keep the host running until Enter is pressed; without a wait here,
    // the observers would be unregistered immediately.
    Console.WriteLine("Running... Press enter to stop.");
    Console.ReadLine();

    await host.UnregisterObserversAsync();
}

You will find the complete code here, which shows steps 1 and 2 and all the other steps.

The best option for reading the change feed of your collection is to use serverless Azure Functions. Azure Functions and Cosmos DB now have native integration.

How focused are you?

Your focus is the most precious thing you have. Being able to focus on one task at a time is the most important skill you can have.

However, today we have more distractions than ever before. I wrote an application to measure how much time I spend focused on one task. Here is a screenshot of my activity; it tells me how much my mind is thrashing as I jump from one application to another.

My aim is to see big blocks of color in this application, which tell me that I was focused on one thing. Too many thin lines of different colors mean I am doing too much time-slicing and I am distracted.

[Image: flower]

Every 10 seconds, this application logs my activity on my machine, and it shows an activity graph with a different line color per activity. It is a very simple application; it just logs my activity on my PC. The code is on GitHub, and you can download the application here.
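To show how such a log can be turned into numbers, here is a minimal sketch. It is not the code of the application above; FocusStats is a hypothetical class, and it assumes the log is simply a list of foreground application names, one entry per sample. It counts how often the application changed between samples and measures the longest uninterrupted block.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch, not the author's application: summarizes a list of
// foreground-application names sampled at a fixed interval.
public static class FocusStats
{
    // Number of times the sampled application changed from one sample to the next.
    public static int CountSwitches(IList<string> samples)
    {
        int switches = 0;
        for (int i = 1; i < samples.Count; i++)
        {
            if (samples[i] != samples[i - 1]) switches++;
        }
        return switches;
    }

    // Length of the longest run of identical samples, converted to time.
    public static TimeSpan LongestFocusBlock(IList<string> samples, TimeSpan sampleInterval)
    {
        int longest = 0;
        int current = 0;
        for (int i = 0; i < samples.Count; i++)
        {
            current = (i > 0 && samples[i] == samples[i - 1]) ? current + 1 : 1;
            if (current > longest) longest = current;
        }
        return TimeSpan.FromTicks(sampleInterval.Ticks * longest);
    }
}
```

A low switch count and a long focus block correspond to the big blocks of color in the graph; many switches correspond to the thin lines.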

Here is a list of things you can do to be less distracted:

  • Turn off all notifications on your PC and phone.
  • Absolutely turn off email. It is a productivity killer. I check my mail every few hours.
  • Don’t check Twitter, Facebook, and other social sites too often.
  • One of the most annoying distractions is opening a web page to do something and having it show you today’s news by default. You end up reading the news, and 15 minutes later you realize that you opened the page to do something important. Turn off the news on the default new-tab page.
  • I know Slack, Teams, Skype, and Communicator are all important parts of your work, but I say you should turn them off too to avoid being distracted.

Webpack tutorial for beginners

Learn the basics of webpack. In this screencast we start with plain vanilla JavaScript and then convert the code to different module formats: first AMD, and then ES6 modules. At every step of the way we will see how webpack makes our life easy and how seamlessly it works with different module systems.

We will also learn that webpack is good not only for module bundling but can also be extended to handle CSS, Sass, linting, and more.

[Image: webpack]