Saturday, October 20, 2012

BizTalk 2010 R2 CTP: Azure Service Bus Integration–Part 5 Sending messages to Service Bus Queues using Sessions

 

What are Service Bus Sessions?

Service Bus Sessions are a rather broad subject, as there are a few different scenarios in which they can be used.  At its simplest, I consider a Service Bus Session to be a way to relate messages together.  More specifically, here are a few ways in which they can be used:

  • To address the maximum message size constraint.  Service Bus Queues can support messages that have a size of 256 KB or smaller.  Using Sessions allows us to break a larger message down into smaller messages and then send them over the wire.  A consumer, or receiver, can then receive all of these message “chunks” and aggregate them together.
  • To support receiving a related set of messages in First In First Out (FIFO) fashion
  • Allows for affinity between a consumer and a Service Bus Queue in competing consumer scenarios.  Imagine having 3 consuming clients all trying to receive messages from the same Service Bus Queue.  Under normal circumstances you cannot be assured that one receiver will receive all messages within a message set.  One can expect the messages to be distributed amongst the clients as each consumer “competes” for the right to process a particular message.  In this scenario, once a receiver has started to process a message within a session, that consumer will process all messages within that session barring some sort of application crash.
  • In some scenarios, using a Service Bus Session allows for routing of messages.  Within a receiver, you can specify an explicit Session that you are interested in.  So in some ways a Session can be used almost like a filter.  I am not suggesting that this approach be used instead of Service Bus Topics/Subscriptions, but there may be a specific business requirement to do this.
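The first scenario above (chunking) can be sketched without touching the Service Bus at all.  The listing below is only a minimal illustration in plain C#: Chunk and SessionChunker are hypothetical names that stand in for a BrokeredMessage and your own sender/receiver code, and the chunk size is a parameter because the real 256 KB limit also has to leave room for message headers and properties.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for a brokered message: only the fields this sketch needs.
public sealed class Chunk
{
    public string SessionId { get; set; }
    public int Sequence { get; set; }
    public byte[] Body { get; set; }
}

public static class SessionChunker
{
    // Split a payload into chunks of at most maxChunkBytes, all tagged with one session id.
    public static List<Chunk> Split(byte[] payload, string sessionId, int maxChunkBytes)
    {
        if (maxChunkBytes <= 0) throw new ArgumentOutOfRangeException("maxChunkBytes");

        var chunks = new List<Chunk>();
        for (int offset = 0, seq = 0; offset < payload.Length; offset += maxChunkBytes, seq++)
        {
            int length = Math.Min(maxChunkBytes, payload.Length - offset);
            var body = new byte[length];
            Array.Copy(payload, offset, body, 0, length);
            chunks.Add(new Chunk { SessionId = sessionId, Sequence = seq, Body = body });
        }
        return chunks;
    }

    // Reassemble the chunks that belong to one session, in sequence order.
    public static byte[] Join(IEnumerable<Chunk> chunks, string sessionId)
    {
        return chunks.Where(c => c.SessionId == sessionId)
                     .OrderBy(c => c.Sequence)
                     .SelectMany(c => c.Body)
                     .ToArray();
    }
}
```

In a real sender each Chunk would become one BrokeredMessage with its SessionId set, and the receiver would accept that Session and call something like Join once the last chunk has arrived.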

Why are Service Bus Sessions important in BizTalk processing?

BizTalk deals with a variety of different messaging scenarios in many different industry verticals.  Supporting Service Bus Sessions is just another tool in the BizTalk toolbox for supporting new requirements.  A scenario that I came up with is dispatching messages.  For instance, if we wanted to load up a field worker with all of his orders, wouldn’t it be nice to have all of his orders sent to him as a batch, as opposed to him receiving some of his orders only to receive more orders later on?  He may have already driven by one of his customers because the messages that he received were out of order, or because other field workers were also receiving their orders, which delayed him in receiving all of his.

image

Putting it together – Modifying Service Bus Queue

A pre-requisite for this type of messaging scenario to work is configuring our Service Bus Queue to support Sessions.  This can be enabled in a couple different ways:

  • When creating a queue from within the Azure Portal, we can check the Enable sessions checkbox.

image

  • When using the QueueDescription class we can set the RequiresSession property to true.

NamespaceManager namespaceClient = new NamespaceManager(serviceUri, credentials);

// Create the queue only if it does not already exist
if (!namespaceClient.QueueExists(Sender.QueueName))
{
    QueueDescription queueDescription = new QueueDescription(Sender.QueueName)
    {
        // Mark the queue as session-aware
        RequiresSession = true
    };
    namespaceClient.CreateQueue(queueDescription);
}

BizTalk Configuration

In order to keep the solution very simple, we will create:

  • Two Receive Ports
  • A Receive Location for each Receive Port.  The purpose of these Receive Locations is to simply inject messages into BizTalk so that we can set the SessionID property on the Send Ports.
  • 2 Send Ports
    • One for Mike
    • One for Joe
  • Each Send Port will have a filter for the corresponding Receive Port.  Within each Send Port we will configure a SessionID .

image

The other Send Port will use the same URI, however it will have a different SessionID value which will be Joe.

image

 

Service Bus Queue Client

The code below will make a connection to our session-ful Service Bus Queue and retrieve all messages that have a SessionId of Mike.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
using System.Runtime.Serialization;
using BrokeredMessageToBizTalk;

namespace RetrieveServiceBusSession
{
    class Receiver
    {

        const string QueueName = "<your_sessionful_queue>";
        static string ServiceNamespace = "<your_namespace>";
        static string IssuerName = "<your_issuerName>";
        static string IssuerKey = "<your_IssuerKey>";
        static void Main(string[] args)
        {
            TokenProvider credentials = TokenProvider.CreateSharedSecretTokenProvider(Receiver.IssuerName, Receiver.IssuerKey);
            Uri serviceUri = ServiceBusEnvironment.CreateServiceUri("sb", Receiver.ServiceNamespace, string.Empty);

            MessagingFactory factory = null;

            factory = MessagingFactory.Create(serviceUri, credentials);

            QueueClient sessionQueueClient = factory.CreateQueueClient(Receiver.QueueName);

            // Accept the Session whose SessionId has a value of "Mike"
            MessageSession sessionReceiver = sessionQueueClient.AcceptMessageSession("Mike", TimeSpan.FromSeconds(60));
            BrokeredMessage receivedMessage;

            // Keep receiving until no message arrives on this Session for 60 seconds
            while ((receivedMessage = sessionReceiver.Receive(TimeSpan.FromSeconds(60))) != null)
            {
                var data = receivedMessage.GetBody<PowerOut>(new DataContractSerializer(typeof(PowerOut)));
                Console.WriteLine(String.Format("Customer Name: {0}", data.CustomerName));
                Console.WriteLine("SessionID: {0}", receivedMessage.SessionId);
                // Remove the message from the Queue
                receivedMessage.Complete();
            }
          
            Console.WriteLine("All received on this session...press enter to exit");
            Console.Read();
        }
    }
}

 

The code itself is very similar to that of some of my previous blog posts on Service Bus integration.  The main difference is instantiating a MessageSession object.

MessageSession sessionReceiver = sessionQueueClient.AcceptMessageSession("Mike", TimeSpan.FromSeconds(60));

Within this line of code we are indicating that we want to receive messages that belong to the Mike Session.  We can also provide a TimeSpan as an argument, which sets how long the call waits for a Session to become available before it times out.  Setting this value is more useful when we are looking for any available Session, since the receiver may start before any Session has messages waiting.

Testing

I have two sets of messages here.  Two of the messages will be routed through the Joe Send Port and subsequently the SessionID for these two messages will be set to Joe.  The other two messages will be routed through the Mike Send Port and subsequently will have their SessionID property set to Mike.

image

As mentioned previously, both Send Ports are configured to send to the same Service Bus Queue.  When we do run our client application, the expectation is that messages belonging to the Mike Session will be retrieved.  The Joe messages will remain in the Queue until another receiver pulls them down or the Time To Live (TTL) threshold has been exceeded.

When we start our Consumer application we do discover that the “Mike” messages are processed.

image

So what happened to the other messages?

The “Joe” messages are still in our Queue.  If we navigate to our Windows Azure Portal, we will discover that our Queue Length is 2.

image

So how do we get these messages out?

We have a couple of options.  We can create another MessageSession instance and retrieve all messages belonging to the Joe Session, or we can leave the Session unspecified, in which case our client will look for the next available Session, which in this case will be the Joe Session.

Let’s go with the second option and retrieve the next available session.  In order to do so we need to change the following line of code from

  MessageSession sessionReceiver = sessionQueueClient.AcceptMessageSession("Mike", TimeSpan.FromSeconds(60));

to

  MessageSession sessionReceiver = sessionQueueClient.AcceptMessageSession(TimeSpan.FromSeconds(60));

We essentially are no longer specifying a specific Session that we are interested in and are now interested in any Session.

I will now process another 4 files; 2 will belong to the Joe Session and 2 will belong to the Mike Session.  What we expect to happen is that all 4 Joe messages (the 2 left over from the first run plus the 2 new ones) will be processed, since Joe is the next available Session.

image

So this proves that we have In Order Delivery occurring at the Session level.  Initially our Mike Session was processed which left our Joe messages outstanding.  We then loaded 4 more messages to the Queue and since the Joe messages were first in, they were processed first.  Our remaining 2 messages that now belong to Mike can be retrieved by starting up our Client Application once again.
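To make the sequence of events above easier to follow, here is a small in-memory model of the test.  It is only a sketch: SessionQueueModel is a hypothetical class, not part of the Service Bus API, and it assumes that the "next available" Session is the one that owns the oldest pending message.  That assumption matches what I observed in this test, but I am not presenting it as documented broker behaviour.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Toy model of a session-aware queue (hypothetical, for illustration only).
public sealed class SessionQueueModel
{
    // Pending messages in arrival order: Key = session id, Value = body.
    private readonly List<KeyValuePair<string, string>> pending =
        new List<KeyValuePair<string, string>>();

    public int Count { get { return pending.Count; } }

    public void Send(string sessionId, string body)
    {
        pending.Add(new KeyValuePair<string, string>(sessionId, body));
    }

    // Drain one named session, preserving arrival order within that session.
    public List<string> ReceiveSession(string sessionId)
    {
        var bodies = pending.Where(m => m.Key == sessionId).Select(m => m.Value).ToList();
        pending.RemoveAll(m => m.Key == sessionId);
        return bodies;
    }

    // Assumption: the next available session owns the oldest pending message.
    public string NextSessionId()
    {
        return pending.Count == 0 ? null : pending[0].Key;
    }
}
```

Replaying the test against this model: after sending two Mike and two Joe messages, ReceiveSession("Mike") returns the two Mike messages and leaves a Count of 2.  After sending two more Joe and two more Mike messages, NextSessionId() returns Joe, and draining that Session returns all four Joe messages in the order they were sent.  The remaining two Mike messages are picked up on the following run.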

image

Note:

Within my BizTalk Send Ports I statically configured my SessionID.  This isn’t very practical in the “Real World” but it was easy to demonstrate for the purpose of this blog post.  Much like other BizTalk context properties the SessionID property is available and can be set dynamically within an Orchestration Message Assignment shape or a Pipeline Component.

image

Conclusion

Overall I found this functionality pretty neat.  I do think that it is another capability that we can leverage to support more granular control over message processing.  I do like the opportunity to group messages together and treat them as a batch.  This also works when dealing with message size limitations, as we can stitch a bunch of smaller messages together that collectively make up a large message.

Tuesday, October 2, 2012

BizTalk 2010 R2 CTP: Azure Service Bus Integration–Part 4 Sending Messages to Service Bus Topics

 

Back again with Part 4 in this series.  This time around we are going to use BizTalk to send a message to an Azure Service Bus Topic using the new SB-Messaging Adapter.

What is the difference between a Service Bus Queue and Topic?

Conceptually they are very similar in the sense that they both provide a durable message store within Azure where Publishers can publish messages and Consumers can consume messages.  Queues store messages in a First In First Out (FIFO) manner and provide a competing consumer experience.  What this means is that if we have two queue clients that are polling the same queue then only one client will receive a copy of any given message.

Topics provide some additional features that really support a Publish/Subscribe (Pub/Sub) architecture.  Topics allow for multiple clients to subscribe to the same Topic through subscriptions.  Subscriptions also support SQL92 expressions and allow a consumer to filter messages out based upon BrokeredMessage Properties.
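The difference is easier to see in a small model.  The sketch below is plain C# and does not use the Service Bus API: TopicModel and AddressEquals are hypothetical names, and the predicate simply stands in for a SqlFilter such as "Address = 'Calgary'".  Every Subscription whose filter matches gets its own copy of a published message, whereas a Queue would hand each message to only one of its competing consumers.

```csharp
using System;
using System.Collections.Generic;

// Toy model of a topic with filtered subscriptions (hypothetical, for illustration only).
public sealed class TopicModel
{
    private readonly Dictionary<string, Func<IDictionary<string, object>, bool>> filters =
        new Dictionary<string, Func<IDictionary<string, object>, bool>>();
    private readonly Dictionary<string, List<string>> inboxes =
        new Dictionary<string, List<string>>();

    public void AddSubscription(string name, Func<IDictionary<string, object>, bool> filter)
    {
        filters[name] = filter;
        inboxes[name] = new List<string>();
    }

    // Copy the message into every subscription whose filter matches its properties.
    public void Publish(string body, IDictionary<string, object> properties)
    {
        foreach (var pair in filters)
        {
            if (pair.Value(properties)) inboxes[pair.Key].Add(body);
        }
    }

    public IList<string> Inbox(string name) { return inboxes[name]; }

    // Stand-in for a filter of the form "Address = '<city>'"; a missing property never matches.
    public static Func<IDictionary<string, object>, bool> AddressEquals(string city)
    {
        return props =>
        {
            object value;
            return props.TryGetValue("Address", out value) && city.Equals(value);
        };
    }
}
```

With a "Calgary" and an "Edmonton" Subscription registered through AddressEquals, publishing one message per city leaves exactly one message in each inbox.  Adding a third Subscription with a filter that always returns true would receive both messages without taking anything away from the other two.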

Scenario

In Part 3 I  discussed how our Work Order Management system can notify our Major Account System when an Estimated Time of Restore is available.  This allows Major Account Representatives the ability to reach out to customers to share the good/bad news about when their power will be restored.

We are going to build upon this scenario, but instead of sending all messages to a Queue we are going to send them to an Azure Service Bus Topic.  Due to the (fictional) growth of our company there are now two distinct groups responsible for Major Accounts: one for the city of Edmonton and another for the city of Calgary.  (Both of these cities exist within Alberta, Canada.)  Each Subscription client will now subscribe to events for its city.  BizTalk will, however, just send messages to one Topic and let the Service Bus work out which message needs to be delivered to each client.

Modifying Client Application(s)

Calgary Application

  • Once again we are going to leverage the work that we have done in previous posts (in this case Part 3).  The first thing we will do is rename our previous C# Console project from BrokeredMessageFromBizTalk to BrokeredMessageFromBizTalkCalgary.

image

  • Next we will modify the namespace for the Program.cs file so that it reflects the name of the project (BrokeredMessageFromBizTalkCalgary).  Note we will leave the namespace of our EstimatedTimeToRestore.cs as is.
  • Below is the entire code listing for our Calgary Client.  Within this code we will use our connectivity parameters to establish a connection.  Once we have a connection we will check whether the Calgary Subscription currently exists.  If it does not, we will create it with a SqlFilter attached.  This particular filter is interested in messages where the Address Brokered Message Property equals “Calgary”.

using System;
using System.IO;
using System.Runtime.Serialization;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
using BrokeredMessageFromBizTalk;

 

namespace BrokeredMessageFromBizTalkCalgary
{
    class Receiver
    {
        const string TopicName = "<your_topic>";
        static string ServiceNamespace = "<your_namespace>";
        static string IssuerName = "<your_owner>";
        static string IssuerKey = "<your_key>";
        static string connectionString = String.Format("Endpoint=sb://{0}.servicebus.windows.net/;SharedSecretIssuer={1};SharedSecretValue={2}",
             ServiceNamespace, IssuerName, IssuerKey); 

        static void Main(string[] args)
        {
           
            TokenProvider tokenProvider = TokenProvider.CreateSharedSecretTokenProvider(
                Receiver.IssuerName, Receiver.IssuerKey);
            Uri uri = ServiceBusEnvironment.CreateServiceUri("sb", Receiver.ServiceNamespace, string.Empty);
            MessagingFactory messagingFactory = MessagingFactory.Create(uri, tokenProvider);

 

            // Create a "Calgary" SqlFilter
            SqlFilter calgaryFilter = new SqlFilter("Address = 'Calgary'");

            //Check to see if Calgary Subscription exists, if it does not then create it
            NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(Receiver.connectionString);
            if (!namespaceManager.SubscriptionExists(Receiver.TopicName, "Calgary"))
            {

                //Create Calgary Subscription with our calgaryFilter
                namespaceManager.CreateSubscription(Receiver.TopicName, "Calgary", calgaryFilter);

            }
            //Create Subscription Client using Peek/Lock Mode
            SubscriptionClient sc = messagingFactory.CreateSubscriptionClient(Receiver.TopicName , "Calgary", ReceiveMode.PeekLock);
            BrokeredMessage bm;
            while ((bm = sc.Receive(new TimeSpan(hours: 0, minutes: 0, seconds: 20))) != null)
            {
                var data = bm.GetBody<EstimatedTimeToRestore>(new DataContractSerializer(typeof(EstimatedTimeToRestore)));
                Console.WriteLine(String.Format("An estimated time of restore {0} has been received for {1}", data.RestoreTime, data.CustomerName));
                Console.WriteLine("Brokered Message Property Address has a value of {0}", bm.Properties["Address"]);
                //Remove message from Topic
                bm.Complete();
            }

        }
    }
}

Edmonton Application

  • Within our existing Visual Studio solution we are going to add another C# Console Application called BrokeredMessageFromBizTalkEdmonton.

image

  • Within this application we will add a project reference to our “Calgary” project.  We need to do this so that we have access to the EstimatedTimeToRestore class.  Because that class still lives in the original namespace, we also include the following statement:
    • using BrokeredMessageFromBizTalk;
  • Otherwise we can simply copy and paste the “Calgary” Program.cs code and adapt it for our Edmonton scenario.  The main areas that we need to focus on are creating a unique Edmonton Subscription and ensuring that our Edmonton filter matches Edmonton messages and not Calgary messages.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.IO;
using System.Runtime.Serialization;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
using BrokeredMessageFromBizTalk;

 

namespace BrokeredMessageFromBizTalkEdmonton
{
    class Receiver
    {
        const string TopicName = "<your_topic>";
        static string ServiceNamespace = "<your_namespace>";
        static string IssuerName = "<your_owner>";
        static string IssuerKey = "<your_key>";
        static string connectionString = String.Format("Endpoint=sb://{0}.servicebus.windows.net/;SharedSecretIssuer={1};SharedSecretValue={2}",
             ServiceNamespace, IssuerName, IssuerKey);

        static void Main(string[] args)
        {

            TokenProvider tokenProvider = TokenProvider.CreateSharedSecretTokenProvider(
                Receiver.IssuerName, Receiver.IssuerKey);
            Uri uri = ServiceBusEnvironment.CreateServiceUri("sb", Receiver.ServiceNamespace, string.Empty);
            MessagingFactory messagingFactory = MessagingFactory.Create(uri, tokenProvider);

 

            // Create a "Edmonton" filtered subscription
            SqlFilter edmontonFilter = new SqlFilter("Address = 'Edmonton'");

            //Create NamespaceManager object and see if Edmonton Subscription exists for our Topic
            NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(Receiver.connectionString);
            if (!namespaceManager.SubscriptionExists(Receiver.TopicName, "Edmonton"))
            {
                //If the Subscription does not exist, create it
                namespaceManager.CreateSubscription(Receiver.TopicName, "Edmonton", edmontonFilter);

            }

            //Create Subscription client and use Peek/Lock mechanism for delivery
            SubscriptionClient sc = messagingFactory.CreateSubscriptionClient(Receiver.TopicName, "Edmonton", ReceiveMode.PeekLock);
            BrokeredMessage bm;
            while ((bm = sc.Receive(new TimeSpan(hours: 0, minutes: 0, seconds: 20))) != null)
            {
                var data = bm.GetBody<EstimatedTimeToRestore>(new DataContractSerializer(typeof(EstimatedTimeToRestore)));
                Console.WriteLine(String.Format("An estimated time of restore {0} has been received for {1}", data.RestoreTime, data.CustomerName));
                Console.WriteLine("Brokered Message Property Address has a value of {0}", bm.Properties["Address"]);
                //remove message from Topic
                bm.Complete();
            }

        }
    }
}

Creating Topic

Much like we did in the previous blog post where we created a Service Bus Queue, we can also create a Service Bus Topic from the portal at http://windowsazure.com.

  • To create a new Topic simply click on the New Topic button.

image

  • Provide a Topic Name.  For the purpose of this post I am using PowerRestoreTopic.  We can then leave the default settings as is.

image

Note: We can also create Topics via code by using the NamespaceManager class much like we did for Subscriptions.  I decided to use the portal just to demonstrate that we have a few options when creating Service Bus artifacts.
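For completeness, a code-based version might look something like the sketch below.  It uses the same NamespaceManager class as the Subscription code earlier in this post.  The TopicSetup class and EnsureTopic method are names I made up for illustration, the connection string is assumed to be built the same way as in the client listings, and the code needs the Service Bus client assembly and a real namespace in order to run.

```csharp
using System;
using Microsoft.ServiceBus;

static class TopicSetup
{
    // Hypothetical helper: creates the Topic only when it does not exist yet.
    public static void EnsureTopic(string connectionString, string topicName)
    {
        NamespaceManager namespaceManager =
            NamespaceManager.CreateFromConnectionString(connectionString);

        if (!namespaceManager.TopicExists(topicName))
        {
            // Creates the Topic with default settings, as we did in the portal
            namespaceManager.CreateTopic(topicName);
        }
    }
}
```

A call such as TopicSetup.EnsureTopic(connectionString, "PowerRestoreTopic") could then be placed ahead of the Subscription check in either client.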

Modifying BizTalk Solution

Once again we are going to keep the BizTalk Solution changes to a minimum and will continue to use a Messaging Only scenario.

  • The first change that we are going to make is to our BrokeredMessagePropertySchema, by adding our Address field.  Recall from our C# projects that we created SqlFilters based upon the Address property being equal to either Calgary or Edmonton (depending upon the application).  By adding this property to our PropertySchema we can now promote this value in our XSD.

image

  • Next we will modify the ETRfile.xsd and promote the Address field.  We will then map this field to the property we just created in our PropertySchema.

image

  • We can now deploy our BizTalk Application and bounce any related Host Instances.
  • While inside the BizTalk Administration Console, we need to modify our SB-Messaging Send Port called SendEstimatedTimeToRestoreToAzure.  Previously we were using this Send Port to send messages to a Service Bus Queue.  We will now be sending messages to a Topic which uses the same convention within the URI.  Instead of including our Queue Name in the URI we need to specify the Topic Name.  In our scenario I am going to specify PowerRestoreTopic.

image

  • Our Authentication tab remains unchanged. We will continue to use the same Issuer Name and Issuer Key as in our previous example.

image

  • Finally, we need to click on the Properties tab so that we can enable our custom Brokered Message Properties. To enable these properties we need to provide the namespace for our Property Schema that we previously modified.  As you may recall, we added a property called Address that we will use to route messages within our Azure Topic Subscription.

image

Testing our Solution

  • We are going to leverage the Receive Location that we created in Part 3 of this blog series.  Our solution is expecting messages that conform to the ETRfile.xsd schema.  Below, I have included two sample files.  One contains an Address that belongs in Edmonton and the other one has an Address that belongs in Calgary.  We will drop both of these files within the same receive location and they will be sent to the same PowerRestoreTopic.

image

  • Once the messages have been sent we will launch our two client applications (Calgary and Edmonton).  The expectation is that the Calgary application will retrieve the Calgary Message and the Edmonton application will retrieve the Edmonton Message.
  • As expected, our Edmonton client retrieved the Edmonton message and the Calgary client retrieved the Calgary message.

image

Conclusion

In this post we used existing BizTalk skills to send typed messages to an Azure Service Bus Topic.  We promoted a property within BizTalk that was converted to a Brokered Message Property that can be used within Subscription Filters in order to route messages between different consumers.

Once again the experience is very seamless. I really do like the .Net API for Service Bus.  I think it is intuitive and is very logical.  As simple as it is to use this API it is hard to imagine anything being much simpler. But it has been done. The BizTalk team has made interfacing with the Service Bus even easier than using the API.  From a BizTalk perspective the Service Bus is just another endpoint and requires some additional configuration.  I do find that this aligns with some of the early goals of BizTalk.