by community-syndication | Sep 30, 2010 | BizTalk Community Blogs via Syndication
The following post is intended to walk you through the implementation of a loosely coupled communication pattern between processes and role instances running on Windows Azure platform. The scenario discussed and implemented in this paper is based off a real-world customer project and built upon a powerful combination of Windows Azure AppFabric Service Bus, Observer design pattern and Parallel LINQ (PLINQ).
Background
In the world of highly distributed applications, information exchange often occurs between a variety of tiers, layers and processes that need to communicate with each other either across disparate computing environments or within a proximity of given local boundaries. Over the years, the communication paradigms and messaging patterns have been able to support the many different scenarios for inter- and intra-process data exchange, some of which were durable and successful whilst the others have suffered from a limited existence.
The design patterns for high-scale cloud computing may result in breaking up of what could have been monolithic in the past and could communicate through direct method invocation into more granular, isolated entities that need to run in different processes, on different machines, in different geographical locations. Along those lines, the Windows Azure platform brings in its own unique and interesting challenges for creating an efficient implementation of the cross-process communication concepts in the Cloud to be able to connect web and worker roles together in a seamless and secure fashion.
In this paper, we take a look at how the Windows Azure platform AppFabric Service Bus helps address data exchange requirements between loosely coupled cloud application services running on Windows Azure. We explore how multicast datagram distribution and one-way messaging enable one to easily build a fully functional publish/subscribe model for Azure worker roles to be able to exchange notification events with peers regardless of their actual location.
For sake of simplicity, the messaging concept discussed in this paper will be further referenced to as “inter-role communication” although one should treat this as a communication between Azure role instances given the nature of the definition of a “role” in the Windows Azure platform’s vocabulary.
The Challenge
The inter-role communication between Windows Azure role instances is not a brand new challenge. It has been on the agenda of many Azure solution architects for some time until the support for internal endpoints has been released. An internal endpoint in the Windows Azure roles is essentially the internal IP address automatically assigned to a role instance by the Windows Azure fabric. This IP address along with a dynamically allocated port creates an endpoint that is only accessible from within a hosting datacenter with some further visibility restrictions. Once registered in the service configuration, the internal endpoint can be used for spinning off a WCF service host in order to make a communication contract accessible by the other role instances.
Consider the following example where an instance of the Process Controller worker role needs to exchange units of work and other processing instructions with other roles. In this example, the internal endpoints are being utilized. The Process Controller worker role needs to know the internal endpoints of each individual role instance which the process controller wants to communicate with.
Figure 1: Roles exchanging resource-intensive workload processing instructions via internal endpoints.
The internal endpoints provide a simplistic mechanism for getting one role instance to communicate to the others in 1:1 or 1:many fashion. This leads to a fair question: where is the true challenge? Simply put, there aren’t any. However, there are constraints and special considerations that may de-value the internal endpoints and shift focus over to a more flexible alternative. Below are the most significant “limitations” of the internal endpoints:
-
Internal endpoints must be defined ahead of time – these are registered in the service definition and locked down at design time;
-
The discoverability of internal endpoints is limited to a given deployment – the role environment doesn’t have explicit knowledge of all other internal endpoints exposed by other Azure hosted services;
-
Internal endpoints are not reachable across hosted service deployments – this could render itself as a limiting factor when developing a cloud application that needs to exchange data with other cloud services deployed in a separate hosted service environment even if it’s affinitized to the same datacenter;
-
Internal endpoints are only visible within the same datacenter environment – a complex cloud solution that takes advantage of a true geo-distributed deployment model cannot rely on internal endpoints for cross-datacenter communication;
-
The event relay via internal endpoints cannot scale as the number of participants grows – the internal endpoints are only useful when the number of participating role instances is limited and with underlying messaging pattern still being a point-to-point connection, the role instances cannot take advantage of the multicast messaging via internal endpoints.
In summary, once a complexity threshold is reached, one may need to re-think whether or not the internal endpoints may be a durable choice for inter-role communication and what other alternatives may be available. At this point, we organically approached the section that is intended to drill down into an alternative solution that scales, is sufficiently elastic and compensates for the constraints highlighted above.
The Solution
Among other powerful communication patterns supported by the AppFabric Service Bus, the one-way messaging between publishers and subscribers presents a special interest for inter-role communication. It supports multicast datagrams which provide a mechanism for sending a single message to a group of listeners that joined the same multicast group. Simply put, this communication pattern enables communicating a notification to a number of subscribers interested in receiving these notifications.
The support for “publish/subscribe” model in the AppFabric Service Bus promotes the relevant capabilities into a natural choice for loosely coupled communication between Azure roles. The role instances authenticate and register themselves on a Service Bus service namespace and choose the appropriate path from the global hierarchical naming system on which the subscriber would be listening to incoming events. The Service Bus creates all underlying transport plumbing and multicasts published events to the active listener registrations on a given rendezvous service endpoint (also known as topic).
Figure 2: Roles exchanging resource-intensive workload processing instructions via Service Bus using one-way multicast (publish/subscribe).
The one-way multicast eventing is available through the NetEventRelayBinding WCF extension provided by the Azure Service Bus. When compared against internal endpoints, the inter-role communication that uses Service Bus’ one-way multicast benefits from the following:
-
Subscribers can dynamically register themselves at runtime – there is no need to pre-configure the Azure roles to be able to send or receive messages (with an exception of access credentials that are required for connectivity to a Service Bus namespace);
-
Discoverability is no longer an issue as all subscribers that listen on the same rendezvous endpoint (topic) will equally be able to receive messages, irrespectively whether or not these subscribers belong the same hosted service deployment;
-
Inter-role communication across datacenters is no longer a limiting factor – Service Bus makes it possible to connect cloud applications regardless of the logical or physical boundaries surrounding these applications;
-
Multicast is a highly scalable messaging paradigm, although at the time of writing this paper, the number of subscribers for one-way multicast is constrained to fit departmental scenarios with fewer than 20 concurrent listeners (this limitation is subject to change).
Now that all the key benefits are evaluated, let’s jump to the technical implementation and see how a combination of the Service Bus one-way multicast messaging and some value-add capabilities found in the .NET Framework 4.0 help create a powerful communication layer for inter-role data exchange.
Note that the sample implementation below has been intentionally simplified to fit the scope of a demo scenario for this paper.
Defining Application Events
To follow along, download the full sample code from the MSDN Code Gallery.
In the first step, we define a data contract for application-specific events that will be exchanged between Azure roles. The instances of these application events carry the essential informational payload and are purely used as event data transfer objects. A worker role instance that originates these events will be treated as a publisher whereas other role instances participating in the data exchange will act as subscribers.
Each application event must be decorated with a DataContract attribute with a DataMember attribute applied to all fields and properties carrying event data. This is to make the event’s data serializable. In the example below, we defined 2 sample events:
/// Represents an inter-role notification requesting a graceful recycle of all instances of a particular role.
[DataContract(Namespace = WellKnownNamespace.DataContracts.Infrastructure)]
public class RoleGracefulRecycleEvent
{
/// The name of the role the instances of which are being asked to recycle themselves gracefully.
[DataMember]
public string RoleName { get; private set; }
/// Creates a new instance of the RoleGracefulRecycle event for the specified role.
public RoleGracefulRecycleEvent(string roleName)
{
RoleName = roleName;
}
}
/// Represents an inter-role notification requesting a refresh of the specified configuration section.
[DataContract(Namespace = WellKnownNamespace.DataContracts.Infrastructure)]
public class RoleConfigurationSectionRefreshEvent
{
/// The name of the configuration section to be unloaded and re-loaded from the configuration store.
[DataMember]
public string SectionName { get; private set; }
/// Creates a new instance of the ConfigurationSectionRefresh event for the specified configuration section.
public RoleConfigurationSectionRefreshEvent(string sectionName)
{
SectionName = sectionName;
}
}
Recommendations
The one-way multicast mode is more constrained than the other bindings since it requires message duplication and volatile storage on the cloud side. In the current release of the AppFabric Service Bus, messages sent through the one-way multicast binding are constrained to the maximum size of 60KB in wire format. With its default settings, the NetEventRelayBinding supports SOAP 1.2 messaging over TCP using .NET Framing and .NET binary serialization. Consequently, do not attempt to publish events the serialized image of which exceeds the above limitation, otherwise a fault will be encountered.
|
Now that application-specific events are defined, the next step is to define a generic event that would encapsulate both the event payload and originator’s identity into a single transmittable entity.
Defining Inter-Role Communication Event
In this step, we implement a generic event object comprised of the sender’s identity (sender instance ID) and application event payload defined earlier.
/// Represents a generic event for inter-role communication carrying the sender's identity and useful event payload.
[DataContract(Namespace = WellKnownNamespace.DataContracts.Infrastructure)]
[KnownType(typeof(RoleConfigurationSectionRefreshEvent))]
[KnownType(typeof(RoleGracefulRecycleEvent))]
public class InterRoleCommunicationEvent
{
/// The role instance ID of the sender.
[DataMember]
public string SenderInstanceId { get; private set; }
/// The event payload, namely a DataContract-serializable object that carries the information about the event.
[DataMember]
public object Payload { get; private set; }
/// Creates a new instance of the inter-role communication event originated from the specified role instance ID
/// and specified payload.
public InterRoleCommunicationEvent(string senderInstanceId, object payload)
{
SenderInstanceId = senderInstanceId;
Payload = payload;
}
}
Note that the Payload member is not strongly typed as it is being used as a versatile event data storage slot. As the result, the data contract serializer needs to be explicitly made aware of possible payload types when serializing or deserializing instances of the inter-role communication event. This is achieved through annotating the InterRoleCommunicationEvent class with one or more KnownType attributes.
Recommendations
The requirement for explicitly registering all supported application event types using KnownType attribute may be seen as an undesirable limitation. Should a more flexible design be required, consider using a generic event that can represent different types of application events through the payload (e.g. event context or data properties) as opposed to using a dedicated .NET type. Another alternative would be a polymorphic event object model in which all application events are derived from a base class which in turn is Message Contract-serializable to bypass the need for explicit registration of known types.
|
At this point, the inter-role communication event object model is defined. The next step is to design a communication service contract.
Defining Communication Contract
The inter-role communication pattern discussed in this paper implies that Azure role instances communicate with each other via the one-way multicast provided by the Azure Service Bus SDK’s WCF binding called netEventRelayBinding. This requires a WCF service contract to be defined. The service contract that we need to model for inter-role communication will serve both the publisher and subscriber through the following composition:
/// Defines a contract for inter-role communication using push-based notifications.
[ServiceContract(Name = "IInterRoleCommunicationService", Namespace = WellKnownNamespace.ServiceContracts.Infrastructure)]
public interface IInterRoleCommunicationServiceContract : IObservable<InterRoleCommunicationEvent>
{
/// <summary>
/// Multicasts the specified inter-role communication event to one or more subscribers.
/// </summary>
/// <param name="e">The inter-role communication event to be delivered to the subscribers.</param>
[OperationContract(IsOneWay = true)]
void Publish(InterRoleCommunicationEvent e);
}
The above service contract carries 2 fundamentally important design considerations. First, it defines a single one-way WCF service operation that is intended to receive events from the remote publishers (specifically, Azure role instances). Secondly, it enables subscribers to register themselves and be notified when an inter-role communication event arrives. The latter is delivered through the implementation of the Observer design pattern by leveraging the IObservable<T> interface available in the .NET Framework 4.0.
The IObservable<T> interface provides a foundation for push-based notifications enabling subscribers to enroll themselves into receiving strongly-typed events of type T. The end implementations of this interface act as providers, essentially pushing notifications to all active subscribers, also known as observers.
In the context of the inter-role communication service contract, we composed the contract definition by inheriting from the IObservable<T> interface. As a result, the communication service contract is also responsible for implementing the Subscribe clause making it a true publish/subscribe agreement.
public interface IObservable<InterRoleCommunicationEvent>
{
/// <summary>
/// Tells the provider that an observer wants to receive notifications.
/// </summary>
IDisposable Subscribe(IObserver<InterRoleCommunicationEvent> observer);
}
Note that the Subscribe method doesn’t have to be defined as a WCF operation as it will not be invoked by remote clients. Consequently, this method is not annotated with an OperationContract attribute.
Recommendations
The Observer design pattern provides a powerful mechanism for building loosely coupled messaging scenarios based on push-based notifications. When used correctly and wisely, this paradigm would be a natural choice for consolidating multiple patterns such as event-based asynchronous messaging, .NET events and delegates into a common unified model. Consider adopting the Observer pattern wherever appropriate.
|
Once the communication contract is defined, we can proceed with its underlying implementation.
Implementing Communication Contract
The inter-role communication contract requires a fairly simple implementation that maintains an internal list of active subscribers (observers) and is responsible for storing references to all registered observers and ensuring that they are valid before it relays notifications to them. The contract implementation does not make any assumptions about the number of observers or the order in which notifications will be sent out.
/// Implements the contract for inter-role communication using push-based notifications.
[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single, ConcurrencyMode = ConcurrencyMode.Multiple)]
public sealed class InterRoleCommunicationService : IInterRoleCommunicationServiceContract
{
private readonly IList<IObserver<InterRoleCommunicationEvent>> subscribers = new List<IObserver<InterRoleCommunicationEvent>>();
#region IInterRoleCommunicationServiceContract implementation
/// All methods defined in the IInterRoleCommunicationServiceContract interface are implemented here.
#endregion
#region IObservable implementation
/// All methods defined in the IObservable interface are implemented here.
#endregion
}
Recommendations
Consider using a thread-safe collection whenever multiple threads will be registering and unregistering subscribers concurrently. The System.Collections.Concurrent namespace in the .NET Framework 4.0 provides several thread-safe collection classes that can be used in place of standard collections and lists which are not guaranteed to be thread safe.
|
First, we implement the Publish method prescribed by the IInterRoleCommunicationServiceContract interface:
/// Multicasts the specified inter-role communication event to one or more subscribers.
public void Publish(InterRoleCommunicationEvent e)
{
// Validate input parameters.
Guard.ArgumentNotNull(e, "e");
// Log an entry point.
var callToken = TraceManager.ServiceComponent.TraceIn(e.SenderInstanceId);
try
{
// Query all registered subscribers using query parallelization provided by PLINQ.
var subscribers = from s in this.subscribers.AsParallel().AsUnordered() select s;
// Invokes the specified action for each subscriber.
subscribers.ForAll((subscriber) =>
{
try
{
// Notify the subscriber.
subscriber.OnNext(e);
}
catch (Exception ex)
{
// Do not allow any subscriber to generate a fault and affect other subscribers.
try
{
// Notifies the subscriber that the event provider has experienced an error condition.
subscriber.OnError(ex);
}
catch (Exception fault)
{
// Log an error.
TraceManager.ServiceComponent.TraceError(fault);
}
}
});
}
finally
{
// Log an exit point.
TraceManager.ServiceComponent.TraceOut(callToken);
}
}
When traversing the list of subscribers, the publisher takes advantage of the Parallel Language Integrated Query (PLINQ) and, specifically, its AsParallel operation. The primary purpose of PLINQ is to speed up the execution of LINQ queries by executing the query delegates in parallel on multi-core computers. PLINQ attempts to make full use of all system processors. It achieves this by partitioning the data source into segments, and then executing the query on each segment on separate worker threads in parallel on multiple processors. In many cases, parallel execution means that the queries complete significantly faster, most importantly when those queries are complex and time-consuming.
Therefore, PLINQ presents itself as a perfect choice for sending notifications concurrently to multiple subscribers. The subscribers can then take any actions without blocking or waiting on each other. Through parallel execution, PLINQ-enabled notifications result in significant performance improvements over traditional, serialized approach to pushing data to subscribers.
Recommendations
By using the AsParallel operator, you incur the overhead costs of partitioning the source collection and synchronizing the worker threads. The benefits of parallelization are further limited by the number of processors on the hosting machine. There are virtually no performance improvements to be gained by running multiple compute-bound threads on just one processor. In addition, since the query parallelization is fully driven by PLINQ internally, it is important to keep in mind that individual iterations in a loop may not be executed in parallel.
|
The next section drills down into the implementation of the Observer design pattern as it relates to the inter-role communication.
Implementing Observer Pattern
The implementation of the Observer pattern consists of a class that sends notifications (the provider) and a class that receives them (the observer). The providers should implement the IObservable<T> interface whereas the observers must comply with the IObserver<T> interface specification.
The IObservable<T> interface prescribes the providers to expose a single Subscribe method which gets called in order to register an observer to receive notifications. This method returns an IDisposable object that can be used to unsubscribe the observer from receiving notifications.
/// <summary>
/// Registers the specified subscriber to receive inter-role communication events.
/// </summary>
/// <param name="subscriber">The object that is to receive notifications.</param>
/// <returns>The observer's interface that enables a subscription to be cancelled by the subscriber.</returns>
public IDisposable Subscribe(IObserver<InterRoleCommunicationEvent> subscriber)
{
// Before registering a subscriber, verify whether or not it has already been registered.
// This is to prevent duplicate notifications.
if (!this.subscribers.Contains(subscriber))
{
this.subscribers.Add(subscriber);
}
// Return a special object that provides the ability to cancel this subscription.
return new UnsubscribeCallback(subscribers, subscriber);
}
The IObserver<T> interface defines the following 3 methods that each observer must implement:
- The OnNext method, which gets called by the provider to notify the observer about a new event;
- The OnCompleted method, which is called by the provider to indicate that it has finished sending notifications to observers;
- The OnError method, which is called by the provider to indicate that data is unavailable, inaccessible, or corrupted, or that the provider has experienced some other error condition.
Below is a sample implementation of an observer that receives and handles the two inter-role communication events defined in previous sections.
/// Implements a subscriber for inter-role communication events.
public class InterRoleEventSubscriberExtension : IObserver<InterRoleCommunicationEvent>
{
/// Receives a notification when a new inter-role communication event occurs.
public void OnNext(InterRoleCommunicationEvent e)
{
if (e.Payload != null)
{
if (e.Payload is RoleConfigurationSectionRefreshEvent)
{
HandleRoleConfigurationSectionRefreshEvent(e.Payload as RoleConfigurationSectionRefreshEvent);
return;
}
if (e.Payload is RoleGracefulRecycleEvent)
{
HandleRoleGracefulRecycleEvent(e.Payload as RoleGracefulRecycleEvent);
return;
}
}
}
/// Gets notified that the provider has successfully finished sending notifications to all observers.
public void OnCompleted()
{
// Doesn't have to do anything upon completion.
}
/// Gets notified that the provider has experienced an error condition.
public void OnError(Exception error)
{
// Report on the error through common logging/tracing infrastructure.
TraceManager.WorkerRoleComponent.TraceError(error);
}
private void HandleRoleConfigurationSectionRefreshEvent(RoleConfigurationSectionRefreshEvent e)
{
// TODO: Place the event handler here.
}
private void HandleRoleGracefulRecycleEvent(RoleGracefulRecycleEvent e)
{
// TODO: Place the event handler here.
}
}
Note that we handle all 2 events from within a single observer. This is purely for the purposes of simplifying the demo scenario. In the real implementations, a separate observer (subscriber) for each individual event would render itself as a more flexible design.
Implementing Inter-Role Communication Extension
In this step, we implement a component which provides an extra level of abstraction for the publish/subscribe mechanism in the inter-role communication. This component would act as an extension to the worker and web roles and will enable the Azure roles to publish and subscribe to notification events without having to know the specifics behind the underlying pub/sub infrastructure.
The extension component provides the following key capabilities:
-
Provide and manage the lifecycle of a WCF service host servicing the inter-role communication contract;
-
Reliably publish notification events via the inter-role communication service by handling and recovering from transient conditions (e.g. temporary service unavailability);
-
Provide the entry point for subscribers to register themselves to receive notification events.
When implementing the extension component, we reuse the same service contract defined for the inter-role communication in the earlier section. In addition, we implement the IDisposable interface so that the WCF client (acting publisher) and WCF service host (acting subscriber) can be reliably disposed.
// Defines a contract supported by a role extension which abstracts the pub/sub layer
// from end consumers of the inter-role communication service.
public interface IInterRoleCommunicationExtension : IInterRoleCommunicationServiceContract, IDisposable
{
}
// Implements a role extension which provides access to the inter-role communication service
// and enables registering subscriptions for inter-role communication events.
public class InterRoleCommunicationExtension : IInterRoleCommunicationExtension
{
private ReliableServiceBusClient<IInterRoleCommunicationServiceChannel> publisher;
private ReliableServiceBusHost<IInterRoleCommunicationServiceContract> subscriber;
public InterRoleCommunicationExtension()
{
// Open a WCF service host servicing the IInterRoleCommunicationService contract.
Subscriber.Open();
}
/// Publishes the specified inter-role communication event into publish/subscribe messaging infrastructure.
public void Publish(InterRoleCommunicationEvent e)
{
// Wrap a call to the Publish into a retry policy-aware scope. This is because NetEventRelayBinding may return
// a fault if there are no subscribers listening on the Service Bus URI. This behavior may be changed in the
// future releases of the Windows Azure AppFabric.
Publisher.RetryPolicy.ExecuteAction(() =>
{
Publisher.Client.Publish(e);
});
}
/// Registers the specified subscriber to receive inter-role communication events.
public IDisposable Subscribe(IObserver<InterRoleCommunicationEvent> subscriber)
{
return Subscriber.ServiceInstance.Subscribe(subscriber);
}
}
Note the two fields and their counterpart properties called Publisher and Subscriber of type ReliableServiceBusClient<T> and ReliableServiceBusHost<T> respectively. These are intended to support reliable communication via AppFabric Service Bus. This capability deserve a separate discussion.
Implementing Reliable WCF Client & WCF Service Host for Service Bus Communication
The inter-role communication via the Service Bus implies that data exchange always takes place across the wire. Although Azure role instances may be running on the same physical node and theoretically benefit from being a single-hop away from each other and not touching the network wire at all, when the Service Bus comes into play, things do look differently.
When communicating via the Service Bus, both the sender and receiver will be talking to a NLB-aware relay node that sits somewhere within the hosting datacenter. This means that a communication between Azure roles using Service Bus will always happen through the wire. As in most networking scenarios, the reliability of connections is subject to quality of networks, intermittent faults in the LAN infrastructure and other transient conditions.
In order to mitigate the adverse effects of network-level transient faults, the implementations of a WCF client and WCF service host that depend on the Service Bus to participate in inter-role communication should be built in such a way that would enable detecting and recovering (if at all possible) from a transient error. This requirement has led to introducing 2 new custom components to support the reliable communication.
The ReliableServiceBusClient<T> and ReliableServiceBusHost<T> components are designed to ensure that all major operations against WCF infrastructure such as starting up a service host or creating and opening a client communication channel respect potential transient conditions in a highly multi-tenant hosting environment such as Windows Azure.
The core capabilities provided by these components include:
-
Support for retry policy-aware Open operation against service hosts and client channels helps ensure that a WCF service host will be reliably opened or a WCF client will always be using a valid communication channel unless an unrecoverable persistent fault is encountered;
-
Support for automatic recovery of failed WCF service hosts attempts to recover a service host whenever it enters a faulted state;
-
Support for correct disposal pattern of the channel and host resources adopts the
relevant best practices and ensures that WCF objects will be cleaned up safely and accurately.
Both components share the common design principle that is comprised of the ICommunicationObject interface, the familiar IDisposable pattern and makes use of a retry policy as depicted in the following class diagram:
The inter-role communication discussed in this paper takes advantage of the above components in the following ways:
-
ReliableServiceBusHost<T> is being utilized to ensure that whenever the event subscriber attempts to register itself on the Service Bus, this registration would always be successful unless there is a non-transient condition that stops the Service Bus listener from being accepted by the frontend nodes (for instance, when a wrong access credentials are provided);
-
ReliableServiceBusHost<T> simplifies access to the singleton object instance of type InterRoleCommunicationService in order to enroll subscribers into receiving inter-role communication events;
-
ReliableServiceBusClient<T> is being used to guarantee that every possible attempt would be made when publishing an event to the Service Bus until all retry cycles are exhausted or non-recoverable condition is encountered.
Please refer to the accompanying sample code for more details on how the above components bring together some of the capabilities discussed earlier in this section.
Recommendations The implementation of ReliableServiceBusClient<T> and ReliableServiceBusHost<T> is not strictly and solely limited to the communication with the Service Bus. These components could equally provide the same capabilities when used together with regular WCF services and clients. Consider taking advantage of the patterns and best practices adopted in the above components for general WCF usage scenarios.
|
At this point, the implementation of the inter-role communication is almost complete. We are finishing off by enabling an Azure role for inter-process communication through a few extra steps.
Plugging Inter-Role Communication Extension Into Azure Role
In this last step, we bring together all the key infrastructure components that are intended to provide and support the inter-role communication. These components help abstract the mechanics of the underlying publish/subscribe architecture and greatly simplify their integration with the Azure roles.
The remaining actions can be summarized as follows:
-
Upon role initialization by the Windows Azure service runtime, the inter-role communication extension is instantiated. This results in creating and opening a WCF service host which registers itself on the Service Bus and starts listening to incoming events;
-
After the inter-role communication extension is successfully initialized, a subscription for inter-role notifications is activated through the invocation of the Subscribe method;
-
When the role instance is being recycled by the Windows Azure service runtime, the subscription gets cancelled and resources allocated by the inter-role communication extension get disposed.
public class SampleWorkerRole : RoleEntryPoint
{
private volatile IInterRoleCommunicationExtension interRoleCommunicator;
private volatile IObserver<InterRoleCommunicationEvent> interRoleEventSubscriber;
private volatile IDisposable interRoleEventSubscription;
/// Called by Windows Azure service runtime to initialize the role instance.
/// Return true if initialization succeeds, otherwise returns false.
public override bool OnStart()
{
// ...There is usually some code here...
// Use the Task Parallel Library (TPL) to provide non-blocking initialization of the
// inter-role communication infrastructure.
Task.Factory.StartNew(() =>
{
try
{
this.interRoleCommunicator = new InterRoleCommunicationExtension();
this.interRoleEventSubscriber = new InterRoleEventSubscriberExtension();
// Register the subscriber for receiving inter-role communication events.
this.interRoleEventSubscription = this.interRoleCommunicator.Subscribe(this.interRoleEventSubscriber);
}
catch (Exception ex)
{
// Report on the error through common logging/tracing infrastructure.
TraceManager.WorkerRoleComponent.TraceError(ex);
// Request that the current role instance is to be stopped and restarted.
RoleEnvironment.RequestRecycle();
}
});
// ...There is usually some more code here...
return true;
}
/// Called by Windows Azure when the role instance is to be stopped.
public override sealed void OnStop()
{
// ...There is usually some code here...
if (this.interRoleEventSubscription != null)
{
// Cancel the subscription to stop receiving inter-role communication events.
this.interRoleEventSubscription.Dispose();
}
if (this.interRoleCommunicator != null)
{
// Recycle the resources allocated by inter-role communication infrastructure.
this.interRoleCommunicator.Dispose();
}
// ...There is usually some more code here...
}
/// Called by Windows Azure after the role instance has been initialized.
/// This method serves as the main thread of execution for your role.
public override sealed void Run()
{
// ...There is usually some code here...
}
}
Note that the startup time of the inter-role communication extension is subject to the WCF infrastructure completing all the necessary work before a WCF service host spins itself off. This operation might take some time. In order to speed up the Azure role initialization, the process of instantiating the inter-role communication extension is “offloaded” to a task that runs on a separate thread. This capability is implemented by taking advantage of the Task Parallel Library (TPL) available in the .NET Framework 4.0. The TPL simplifies many aspects of multithreading programming and makes it easier to build applications that need to support parallelism and concurrency.
Recommendations The Task Parallel Library includes many features that enable the .NET application to scale better. This includes work partitioning, support for on-demand cancellation, better state management, support for parent/child task relationships and more. The TPL scales the degree of concurrency dynamically to make the most efficiently use of all available processors. This makes it easier and more efficient to execute very fine-grained parallel workloads on modern hardware. Starting with the .NET Framework 4.0, the TPL is recommended as a preferred way to write multithreaded code.
|
Once the inter-role communication extension is plugged into a role implementation and initialized, the activated subscription will immediately be ready to receive notifications. Any role instances hosting the inter-role communication extension are now able to seamlessly participate in data exchange. It’s time to conclude all the key learnings acquired to this point.
Conclusion
The goal of this paper was to explore how a combination of Windows Azure AppFabric Service Bus and some value-add capabilities found in the .NET Framework 4.0 helps address the requirement for enabling Windows Azure roles to communicate with each other in a bi-directional, scalable, secure and reliable fashion regardless of their location in the Cloud.
We learned that the inter-role communication via internal endpoints may possess some constraints and render certain communication scenarios virtually impossible. We looked at how one-way multicast provided by the Service Bus helps support a more flexible messaging patterns and provide connectivity between cloud-based applications and services across any network topologies, either from within or across datacenter environments.
When drilling down into the implementation of the inter-role communication via Service Bus, we mostly focused on multicast eventing scenario, namely, when Azure roles exchange notifications using one-to-many composition. With a fairly straightforward enhancement, this communication model can be enriched with the support for a true one-to-one interaction leveraging the same underlying publish/subscribe infrastructure.
The accompanying sample code is available for download from the MSDN Code Gallery. All source code files are governed by the Microsoft Public License (Ms-PL) as explained in the corresponding legal notices.
Additional Resources/References
For more information on the related subject or resources providing alternative views on the topic discussed in this post, please visit the following links:
by community-syndication | Sep 30, 2010 | BizTalk Community Blogs via Syndication
[Source: http://geekswithblogs.net/EltonStoneman]
I’ve had a need on a couple of projects for a monitoring system to record progress of a long-running transaction. Enter Heartbeat – a library which does just that. It’s a project of mine on github: Heartbeat progress monitor.
The typical use-case is for monitoring a lengthy batch load, where Heartbeat will record progress at set intervals. The intervals can be time-based, or progress-based or both – so you can have an overnight process which writes a status update every 15 minutes, and for every 10,000th item it processes.
Heartbeat is threadsafe, so it’s ideal for asynchronous processing which is actioned by spawning worker threads or using Task parallelism in .NET 4.0.
Usage
Usage is straightforward:
1. Initialise Heartbeat
Create a Heartbeat instance, passing in the object to be monitored, and the interval values for each type of pulse, and then start the timer:
long countInterval = 7000;
double timerInterval = 300; //0.3 seconds
var heartbeat = new Heartbeat(this, countInterval, timerInterval);
heartbeat.Start();
2. Integrate Heartbeat with the working process
If you only want timed pulses, you don’t need to do anything else – the Heartbeat instance will pulse at the set interval and write a log to the database. If you want additional pulses when your count interval is reached, then you need to increment the count when each item is processed:
//do work…
heartbeat.IncrementCount();
You can also subscribe to the OnPulse event, and on each pulse you can stop the log being written, or add your own custom message to the log:
heartbeat.OnPulse += new Heartbeat.OnPulseEventHanlder(RunTimer_OnPulse);
heartbeat.Start(“RunTimer started, timerInterval: {0}, runTime: {1}”.FormatWith(timerInterval, runTime));
void RunTimer_OnPulse(PulseEventSource source, ref bool writeLog, ref string logText)
{
writeLog = true;
logText = “RunTimer_OnPulse, source: {0}, text: {1}”
.FormatWith(source, RandomValueGenerator.GetRandomString());
}
3. Tell Heartbeat when you’re finished
Call SetComplete or SetFailed to log that the work is finished:
var heartbeat = new hb.Heartbeat(this, countInterval, 0);
heartbeat.Start(“RunCount_NoHandler, countInterval: {0}, countTo: {1}”.FormatWith(countInterval, countTo));
try
{
for (int i = 0; i < countTo; i++)
{
heartbeat.IncrementCount();
}
var zero = 0;
var dbz = 1 / zero;
heartbeat.SetComplete(“RunCount_NoHandler finished”);
}
catch (Exception ex)
{
heartbeat.SetFailed(“RunCount_NoHandler failed, message: {0}”.FormatWith(ex.FullMessage()));
}
If you don’t make either call, then Heartbeat will write an UNKNOWN status log when the Heartbeat object is disposed.
Configuration
Rather than specify the pulse intervals for each Heartbeat instance you use, you can set default values at application level in your config file:
<!– set heartbeat defaults to pulse every 10 minutes & every 3,000 increments –>
<sixeyed.heartbeatenabled=“true“
defaultPulseTimerInterval=“600000“
defaultPulseCountInterval=“3000“ />
The config section is optional, but if you don’t have any config and don’t set at least one of the pulse intervlas, your Heartbeat will never fire.
The Heartbeat Database
Heartbeat pulses are written to a simple database, consisting of a log table and two reference data tables:
To create the database (with C:\ as the default file location), run CREATE-Heartbeat.sql in the Database folder. In a working system I’d expect the Heartbeat tables to be added to an existing system database, so the script is just to get you started.
Each instance of a job has a unique ID, and the log records the full CLR assembly name of the object being monitored, the pulse intervals being used, the log time and status. The log also records the number of counts for the type of pulse, and the pulse statistics (elapsed milliseconds for timed pulses, number of items for count pulses). In raw form, the results of a time- and count- heartbeat session look like this:
HeartbeatLogIdHeartbeatInstanceIdComponentTypeNameStatusCodePulseTimerIntervalPulseCountIntervalLogDateLogTextTimerPulseNumberCountPulseNumberTimerMillisecondsCountNumber
376BF341091-D0EB-498F-803A-B623EAD5BF16Sixeyed.Heartbeat.Tests.HeartbeatTest, Sixeyed.Heartbeat.Tests, Version=1.0.0.0, Culture=neutral, PublicKeyToken=nullSTART 80010002010-09-30 20:03:37.990RunCountAndTimer_NoHandler, countInterval: 1000, countTo: 5692, timerInterval: 800, runTime: 45320000
377BF341091-D0EB-498F-803A-B623EAD5BF16Sixeyed.Heartbeat.Tests.HeartbeatTest, Sixeyed.Heartbeat.Tests, Version=1.0.0.0, Culture=neutral, PublicKeyToken=nullWORKING 800NULL2010-09-30 20:03:38.8031NULL811.2014NULL
378BF341091-D0EB-498F-803A-B623EAD5BF16Sixeyed.Heartbeat.Tests.HeartbeatTest, Sixeyed.Heartbeat.Tests, Version=1.0.0.0, Culture=neutral, PublicKeyToken=nullWORKING 800NULL2010-09-30 20:03:39.6132NULL1622.4029NULL
411BF341091-D0EB-498F-803A-B623EAD5BF16Sixeyed.Heartbeat.Tests.HeartbeatTest, Sixeyed.Heartbeat.Tests, Version=1.0.0.0, Culture=neutral, PublicKeyToken=nullSUCCEED 80010002010-09-30 20:04:01.610RunCountAndTimer_NoHandler finished29523618.44155692
Storing the results in SQL Server provides the opportunity for simple SSRS dashboards, facilitated by the reference tables which give friendly names for statuses and allow you to store friendly names for applications (in HeartbeatApplications – xxx can be aliased to “User Load” for querying).
Implementation Notes
Heartbeat and Task.Factory.Start go very nicely together, but you need to be careful with how state is passed to the tasks. As Heartbeat implements IDisposable, if your worker class is also IDisposable then the Heartbeat instance could go out of scope and be disposed before your tasks are run. The safest pattern is to pass the task method a copy of the Heartbeat instance and any other variables it needs:
Also note the check to see if this is the final task – if so, the Heartbeat is set to complete. The task which is started last may not be the final task to complete, so the end time is not guaranteed to be accurate, but for processes running for several hours, the discrepancy is likely to be minimal.
Still to come
- Fancy SSRS reports showing a breakdown of jobs by completion status (failed, succeeded), average duration, historical run stats etc.
- Options for using Heartbeat in distributed systems like NServiceBus and BizTalk.
by community-syndication | Sep 29, 2010 | BizTalk Community Blogs via Syndication
Recently David K contacted me through my blog and shared with me a solution that he came up with for a particular problem that he was able to solve using IWorkflowInstanceExtension and said it would be great if I could do an endpoint.tv episode about it. David already solved his problem but he wanted to spare you the pain in case you ever have to do this.
Download WF4 Activity Callbacks and Events Example from MSDN Code Gallery
The Problem
Let’s imagine that you have a class library that exposes a type called Gizmo. This Gizmo type exposes an event that is fired from time to time for some reason (maybe it detects some input from a hardware device) and you want to create an activity that will wait for the Gizmo.Fire event and return the level that was fired.
public class Gizmo
{
public event EventHandler<GizmoEventArgs> Fire;
public void FireGizmo(int level)
{
if (Fire != null)
{
Fire(this, new GizmoEventArgs {Level = level});
}
}
}
Activities cannot just sit around waiting for an event and blocking the workflow thread so we need a way to add an event handler that can handle the event and resume the workflow once it is fired. Also we need to be sure that this works correctly even if the workflow host has a persistence enabled. Because there is no way for an activity to re-establish connections with event handlers when being loaded from persistence we have to avoid being persisted while we have a connection to the Gizmo.Fire event.
The Solution
The solution is to pair the Activity with an Extension. It works like this.
- Create an activity called WaitForGizmo and an extension called WaitForGizmoExtension which implements IWorkflowInstanceExtension
- When the Activity is executed it will enter a no-persist zone and let the extension know when it needs to handle the event and set a bookmark
- The WaitForGizmoExtension will handle the event and resume the bookmark
- When the bookmark is resumed the activity will obtain the out argument values and exit the no-persist scope
This is the right way for dealing with CLR events and callbacks.
The WaitForGizmo activity
public sealed class WaitForGizmo : NativeActivity<int>
{
internal const string BookmarkName = "WaitingForGizmoToBeFired";
private readonly Variable<NoPersistHandle> _noPersistHandle = new Variable<NoPersistHandle>();
private BookmarkCallback _gizmoBookmarkCallback;
public InArgument<Gizmo> TheGizmo { get; set; }
public BookmarkCallback GizmoBookmarkCallback
{
get
{
return _gizmoBookmarkCallback ??
(_gizmoBookmarkCallback = new BookmarkCallback(OnGizmoCallback));
}
}
protected override bool CanInduceIdle
{
get { return true; }
}
protected override void CacheMetadata(NativeActivityMetadata metadata)
{
// Tell the runtime that we need this extension
metadata.RequireExtension(typeof (WaitForGizmoExtension));
// Provide a Func<T> to create the extension if it does not already exist
metadata.AddDefaultExtensionProvider(() => new WaitForGizmoExtension());
metadata.AddArgument(new RuntimeArgument("TheGizmo", typeof (Gizmo), ArgumentDirection.In, true));
metadata.AddArgument(new RuntimeArgument("Result", typeof (int), ArgumentDirection.Out, false));
metadata.AddImplementationVariable(_noPersistHandle);
}
protected override void Execute(NativeActivityContext context)
{
// Enter a no persist zone to pin this activity to memory since we are setting up a delegate to receive a callback
var handle = _noPersistHandle.Get(context);
handle.Enter(context);
// Get (which may create) the extension
var gizmoExtension = context.GetExtension<WaitForGizmoExtension>();
// Add the callback
gizmoExtension.AddGizmoCallback(TheGizmo.Get(context));
// Set a bookmark - the extension will resume when the Gizmo is fired
context.CreateBookmark(BookmarkName, GizmoBookmarkCallback);
}
internal void OnGizmoCallback(NativeActivityContext context, Bookmark bookmark, Object value)
{
// Store the result
Result.Set(context, (int) value);
// Exit the no persist zone
var handle = _noPersistHandle.Get(context);
handle.Exit(context);
}
}
The WaitForGizmoExtension
internal class WaitForGizmoExtension : IWorkflowInstanceExtension
{
private static bool _addedCallback;
private WorkflowInstanceProxy _instance;
#region IWorkflowInstanceExtension Members
public IEnumerable<object> GetAdditionalExtensions()
{
return null;
}
public void SetInstance(WorkflowInstanceProxy instance)
{
_instance = instance;
}
#endregion
internal void AddGizmoCallback(Gizmo gizmo)
{
if (!_addedCallback)
{
_addedCallback = true;
gizmo.Fire += OnGizmoFired;
}
}
internal void OnGizmoFired(object sender, GizmoEventArgs args)
{
// Gizmo was fired, resume the bookmark
_instance.BeginResumeBookmark(
new Bookmark(WaitForGizmo.BookmarkName),
args.Level,
(asr) => _instance.EndResumeBookmark(asr),
null);
}
}
Handling Timeouts
What if Gizmo does not fire? You might want to have a solution where if Gizmo doesn’t fire within some timeout period you do something else. The best way to deal with this is at the workflow level by using a Pick activity with a branch that contains a delay. This ensures that either a Gizmo.Fire event is detected or the Timeout occurs