Introduction

In the first part of this article, we introduced and analyzed the problem of transmitting a large message to a WCF Receive Location. In particular, we discussed the difference between the Buffered and Streamed transfer modes provided by WCF and how to use the Streamed transfer mode to exchange large messages between WCF-enabled applications. In this second part, we’ll see how to take advantage of Transactional NTFS (TxF) and SQL Server FILESTREAM storage to transmit messages to a WCF Receive Location in a reliable, transactional manner. Then we’ll look in detail at the code of the pipeline component used on the WCF Receive Location to store the incoming message in a staging repository (a shared folder or a SQL Server database) and replace it with a placeholder document. Likewise, we’ll analyze the code of the pipeline component used on the Send Port to retrieve the original message from the staging repository using the information contained in the placeholder document. Finally, we’ll look at the implementation of the orchestration and helper class used to delete the original message from the staging repository once it has been successfully transmitted to the target system.

Transactions Support

In this section of the article, I am going to show how to exploit the capabilities provided by Transactional NTFS (TxF) and SQL Server FILESTREAM storage to transmit messages to a WCF Receive Location in a reliable, transactional manner.

Transactional NTFS

Transactional NTFS, also known as TxF, is a feature introduced in Windows Vista and Windows Server 2008 that provides the ability to perform transactional file operations that are fully Atomic, Consistent, Isolated, and Durable (ACID).
You’re probably asking yourself: if all these transactional capabilities have been built into NTFS, what is the performance overhead of using Transactional NTFS? If you aren’t using transactional file operations, there is no overhead, so nothing changes with respect to the past. If you are using transacted file operations, the overhead is minimal (in the 1 to 2 percent range). For an application that is not I/O-bound, this performance impact is so small that it probably would not even be noticeable. Transactional NTFS also improves application stability by reducing or eliminating the error-handling code that needs to be written to guarantee consistency within an application.
TxF uses the new Kernel Transaction Manager (KTM) features, and since KTM can work directly with the Microsoft® Distributed Transaction Coordinator (DTC), any technology that can work with DTC as a transaction coordinator can use transacted file operations within a single transaction. The KTM is used to implement Transactional NTFS (TxF) and Transactional Registry (TxR). TxF allows transacted file system operations within the NTFS file system, whereas TxR allows transacted registry operations. Therefore, KTM enables client and server applications to coordinate file system and registry operations with a distributed transaction that can potentially enlist other transacted operations such as SQL operations, transactional Web service calls via WS-AtomicTransaction, WCF services via the OleTransactions protocol, or even transacted MQSeries or MSMQ operations. Using the KTM features gives applications a high level of reliability and robustness and makes it possible to achieve data consistency across multiple, heterogeneous information repositories with a single two-phase commit transaction.

As shown in the following code snippet, in order to use TxF in managed code, the first thing you need to do is retrieve the IDtcTransaction COM interface from the current Transaction (this can be done using the TransactionInterop helper class). Once you have the IDtcTransaction, you can cast it to the IKernelTransaction COM interface. Then you call the GetHandle method on it to get a handle that you can pass to the new transacted Win32 APIs (for instance, the CreateFileTransacted function).

[DllImport("kernel32.dll", SetLastError = true)]
static extern SafeFileHandle CreateFileTransacted(…);
...
using (TransactionScope scope = new TransactionScope())
{
    // Retrieve Kernel level transaction handle
    IDtcTransaction dtcTransaction =  TransactionInterop.GetDtcTransaction(Transaction.Current);
    IKernelTransaction ktmInterface = (IKernelTransaction)dtcTransaction;
    IntPtr ktmTxHandle;
    ktmInterface.GetHandle(out ktmTxHandle);

    // Get transacted file handle
    SafeFileHandle fileHandle = CreateFileTransacted(path, 
                                                     internalAccess, 
                                                     internalShare, 
                                                     IntPtr.Zero,
                                                     internalMode, 
                                                     0, 
                                                     IntPtr.Zero, 
                                                     ktmTxHandle,
                                                     IntPtr.Zero, 
                                                     IntPtr.Zero);
    if (!fileHandle.IsInvalid)
    {
        using (FileStream fileStream = new FileStream(fileHandle, FileAccess.ReadWrite))
        {
            //Use file stream
            ...
        }
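    }
    // Vote to commit: without this call the transacted file operations
    // are rolled back when the scope is disposed
    scope.Complete();
}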

 

For more information on the Transactional NTFS and KTM, you can read the following topics:

  • “Enhance Your Apps With File System Transactions” article on MSDN Magazine.
  • “Transactional NTFS (TxF)” topic on MSDN.
  • “Develop More Reliable Applications” topic on MSDN.
  • “Kernel Transaction Manager” topic on MSDN.
  • “Transactional NTFS Managed Wrapper” samples and components on MSDN.

 

SQL File Stream

There has always been a lot of controversy about whether large documents, such as multimedia files, should be stored in a database such as SQL Server or in the file system. A database is a specialized data repository that provides built-in capabilities such as integrated backup and restore, point-in-time recovery, transactions, indexing, and so on, which come in handy to secure and manage sensitive data contained in large files. However, at least in the past, storing large blobs in a database was rather inefficient from a performance perspective and could result in a significant slowdown of the entire solution. That’s why many developers decide to store large files in the file system and their location in a database row, but this sacrifices transactional consistency, integrated query, and the other database features.

In SQL Server 2008, a new feature known as the FILESTREAM storage attribute allows you to define columns in SQL Server tables whose data is actually stored in the file system. You can write and read the data contained in a varbinary(max) FILESTREAM column with Transact-SQL operations or with streaming APIs from C++ or any .NET-compliant language, and retain all of the other database management features. FILESTREAM is not a new data type; it’s just a new storage mechanism. It’s available only with the varbinary(max) data type that was introduced in SQL Server 2005. Using FILESTREAM storage requires that the system administrator enable it at the operating system level using SQL Server Configuration Manager and that the database administrator enable it at the SQL Server instance level using sp_configure.

One of the benefits of using FILESTREAM storage instead of simply storing file names in SQL Server and files on the file system is that the data is transactionally consistent. In fact, when you insert a row in a table using a Transact-SQL command and then insert a document in a varbinary(max) FILESTREAM column using the streaming APIs within the same transaction, the entire operation either succeeds or fails as a whole. The SqlFileStream class ensures this by requiring you to have a valid, open transaction at all times when using the streaming APIs, as shown in the following code snippet:

...
using (SqlConnection sqlConnection = new SqlConnection(connectionString))
{
    sqlConnection.Open();
    SqlTransaction sqlTransaction = sqlConnection.BeginTransaction(IsolationLevel.ReadCommitted);
    using (SqlCommand sqlCommand = new SqlCommand("usp_InsertDocument", sqlConnection))
    {
        sqlCommand.Transaction = sqlTransaction;
        ...
        sqlCommand.CommandType = CommandType.StoredProcedure;
        sqlCommand.ExecuteNonQuery();
    }
    using (SqlCommand sqlCommand = new SqlCommand("usp_GetFileStreamPath", sqlConnection))
    {
        sqlCommand.Transaction = sqlTransaction;
        ...
        sqlCommand.CommandType = CommandType.StoredProcedure;
        sqlCommand.ExecuteNonQuery();
        if (sqlParameter.Value != null)
        {
            path = sqlParameter.Value as string;
        }
    }
    using (SqlCommand sqlCommand = new SqlCommand("SELECT GET_FILESTREAM_TRANSACTION_CONTEXT()", sqlConnection))
    {
        sqlCommand.Transaction = sqlTransaction;
        sqlCommand.CommandType = CommandType.Text;
        Object result = sqlCommand.ExecuteScalar();
        byte[] transactionContext = (byte[])result;
        byte[] buffer = new byte[bufferSize];
        int bytesRead = 0;
        using (SqlFileStream sqlFileStream = new SqlFileStream(path, transactionContext, FileAccess.ReadWrite))
        {
            while ((bytesRead = stream.Read(buffer, 0, bufferSize)) > 0)
            {
                sqlFileStream.Write(buffer, 0, bytesRead);
            }
        }
        sqlTransaction.Commit();
    }
}
...

 

This article is not meant to provide full coverage of the FILESTREAM storage attribute and of the techniques for writing and reading data to and from a varbinary(max) FILESTREAM column. However, for the sake of completeness, I have included below the T-SQL code used to create the staging database and the stored procedures used by the StageFileReceivePipeline and the RestoreFileSendPipeline, respectively, to write and read documents from the Documents table.

USE [master]
GO
-- StagingDb
CREATE DATABASE [StagingDb] ON  PRIMARY 
( NAME = N'StagingDb', 
  FILENAME = N'C:\Program Files\Microsoft SQL Server\MSSQL10.MSSQLSERVER\MSSQL\DATA\StagingDb.mdf' , 
  SIZE = 3072KB , MAXSIZE = UNLIMITED, FILEGROWTH = 1024KB ), 
 FILEGROUP [FILESTREAM] CONTAINS FILESTREAM  DEFAULT 
( NAME = N'FileStream', 
  FILENAME = N'C:\Projects\LargeMessageTransmission\FileStream' )
 LOG ON 
( NAME = N'StagingDb_log', 
  FILENAME = N'C:\Program Files\Microsoft SQL Server\MSSQL10.MSSQLSERVER\MSSQL\DATA\StagingDb_log.ldf' , 
  SIZE = 1024KB , MAXSIZE = 2048GB , FILEGROWTH = 10%)
GO

USE [StagingDb]
GO
-- Documents Table
CREATE TABLE [dbo].[Documents](
    [Id] [uniqueidentifier] ROWGUIDCOL  NOT NULL,
    [Name] [varchar](256) NOT NULL,
    [Sender] [varchar](256) NOT NULL,
    [Size] [bigint] NOT NULL,
    [Document] [varbinary](max) FILESTREAM  NULL,
    [DateCreated] [datetime] NULL DEFAULT (getdate()),
 CONSTRAINT [PK_Documents] PRIMARY KEY CLUSTERED 
(
    [Id] ASC
)WITH (PAD_INDEX  = OFF, 
       STATISTICS_NORECOMPUTE  = OFF, 
       IGNORE_DUP_KEY = OFF, 
       ALLOW_ROW_LOCKS  = ON, 
       ALLOW_PAGE_LOCKS  = ON) 
 ON [PRIMARY] FILESTREAM_ON [FILESTREAM],
UNIQUE NONCLUSTERED 
(
    [Id] ASC
)WITH (PAD_INDEX  = OFF, 
       STATISTICS_NORECOMPUTE  = OFF, 
       IGNORE_DUP_KEY = OFF, 
       ALLOW_ROW_LOCKS  = ON, 
       ALLOW_PAGE_LOCKS  = ON) 
 ON [PRIMARY]
) ON [PRIMARY] FILESTREAM_ON [FILESTREAM]
GO
-- usp_InsertDocument Stored Procedure
CREATE PROCEDURE usp_InsertDocument
    @id [uniqueidentifier],
    @name [varchar](256),
    @sender [varchar](256),
    @size [bigint]
AS
BEGIN
    INSERT INTO Documents 
    VALUES (@id, @name, @sender, @size, CAST ('' as varbinary(max)), GETDATE())
END
GO    
-- usp_DeleteDocument Stored Procedure
CREATE PROCEDURE usp_DeleteDocument
    @id [uniqueidentifier]
AS
BEGIN
    DELETE FROM Documents WHERE Id = @id
END
GO
-- usp_GetFileStreamPath Stored Procedure
CREATE PROCEDURE usp_GetFileStreamPath
    @id [uniqueidentifier],
    @path [varchar](256) OUTPUT
AS
BEGIN
    SELECT @path = Document.PathName(0)
    FROM Documents
    WHERE Id = @id
END
GO    

 

For more information on this topic, you can read the following articles:

  • “Programming with FileStreams in SQL Server 2008” article on MSDN Magazine.
  • “FILESTREAM Data in SQL Server 2008 (ADO.NET)” topic on MSDN.
  • “How to: Enable FILESTREAM” topic on MSDN.
  • “How to: Create a FILESTREAM-Enabled Database” topic on MSDN.
  • “How to: Create a Table for Storing FILESTREAM Data” topic on MSDN.
  • “Managing FILESTREAM Data by Using Transact-SQL” topic on MSDN.
  • “Managing FILESTREAM Data by Using Win32” topic on MSDN.

 

Leveraging Distributed Transactions

In the first part of the article, we introduced a solution that allows a client application to transmit a large message to a BizTalk application via WCF. In particular, the message is received by a WCF-Custom Receive Location and processed by a custom receive pipeline called StageFileReceivePipeline, which reads the message metadata from the custom headers using the InboundHeaders context property and then writes the original file to a staging repository. The repository can be a folder or a table in a custom SQL Server database that contains a varbinary(max) FILESTREAM column. The pipeline then promotes the AckRequired context property to enable delivery notifications on the Send Port and replaces the original file with a small placeholder document that contains the metadata and the actual location of the original message.
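To make the idea of the placeholder document more concrete, here is a minimal sketch of a small, XML-serializable record carrying the metadata and the location of the staged file. The class name PlaceholderSketch, the StagingTarget enum, and the exact set of properties are assumptions made for illustration only; they are loosely modeled on the header values (Id, Filename, Sender, Size) used by the pipeline code in this article and are not the actual schema of the sample.

```csharp
using System;
using System.IO;
using System.Xml.Serialization;

// Hypothetical: where the original message was staged.
public enum StagingTarget { Folder, SqlServer }

// Hypothetical placeholder record: a few hundred bytes that travel through
// the MessageBox instead of the (potentially huge) original message.
public class PlaceholderSketch
{
    public string Id { get; set; }
    public string Name { get; set; }
    public string Sender { get; set; }
    public long Size { get; set; }
    public StagingTarget Target { get; set; }
    // File path for a folder target, connection string for a database target.
    public string Path { get; set; }

    // Serialize the placeholder to an XML string.
    public string ToXml()
    {
        XmlSerializer serializer = new XmlSerializer(typeof(PlaceholderSketch));
        using (StringWriter writer = new StringWriter())
        {
            serializer.Serialize(writer, this);
            return writer.ToString();
        }
    }

    // Rebuild the placeholder from its XML representation.
    public static PlaceholderSketch FromXml(string xml)
    {
        XmlSerializer serializer = new XmlSerializer(typeof(PlaceholderSketch));
        using (StringReader reader = new StringReader(xml))
        {
            return (PlaceholderSketch)serializer.Deserialize(reader);
        }
    }
}
```

The point of the design is that only this small document is published and routed, while the component on the send side uses the Target and Path values to locate the original content.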

The flaw of this approach is that the incoming file may be successfully stored in the staging repository by the custom receive pipeline, whereas the Message Agent may fail to post the placeholder document to the BizTalkMsgBoxDb. In this case, the client application would receive an exception, so it could handle the error and possibly attempt to retransmit the message to BizTalk Server. Nevertheless, the document sent at the previous attempt would still be present in the staging folder, so it should be overwritten by the new attempt. Alternatively, unsuccessful transmissions may be detected and then resubmitted or eliminated, based on specific rules, by a dedicated service agent.

This is a perfect case where Transactional NTFS and the SqlFileStream class come in handy: they make it possible to store the incoming file in a staging folder or database, respectively, and to post the placeholder document to the BizTalkMsgBoxDb as a single unit of work using a DTC distributed transaction. This way, should the Message Agent fail to publish the placeholder document to the BizTalkMsgBoxDb, the two-phase commit transaction would fail and the original file wouldn’t be persisted to the staging repository. In addition, if the transaction is initiated by the client application and flowed to the WCF-Custom Receive Location using the WS-AtomicTransaction or OleTransactions protocol, the Message Agent and the receive pipeline can enlist in the same distributed transaction. In this case, the client ultimately controls the outcome of the transaction: if it decides to abort it, the transmitted message isn’t written to the staging repository and the placeholder document isn’t published to the BizTalkMsgBoxDb.
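The all-or-nothing behavior described above can be illustrated without BizTalk, TxF, or SQL Server. The following is a minimal sketch, assuming two hypothetical in-memory resource managers that stand in for the staging repository and the MessageBox. The names InMemoryResource and UnitOfWorkSketch are invented for this illustration; the only real API used is System.Transactions (TransactionScope, IEnlistmentNotification, and volatile enlistment). Each resource votes during the prepare phase, and a single negative vote rolls back both.

```csharp
using System;
using System.Transactions;

// Hypothetical in-memory resource manager used only for illustration.
// A written value becomes visible only if the transaction commits.
public sealed class InMemoryResource : IEnlistmentNotification
{
    private readonly bool failOnPrepare;
    private string pending;

    public string CommittedValue { get; private set; }

    public InMemoryResource(bool failOnPrepare)
    {
        this.failOnPrepare = failOnPrepare;
    }

    // Buffer the value and enlist in the ambient transaction.
    public void Write(string value)
    {
        pending = value;
        Transaction.Current.EnlistVolatile(this, EnlistmentOptions.None);
    }

    // Phase 1: vote to commit or force a rollback.
    public void Prepare(PreparingEnlistment preparingEnlistment)
    {
        if (failOnPrepare)
        {
            preparingEnlistment.ForceRollback();
        }
        else
        {
            preparingEnlistment.Prepared();
        }
    }

    // Phase 2: every participant voted to commit.
    public void Commit(Enlistment enlistment)
    {
        CommittedValue = pending;
        pending = null;
        enlistment.Done();
    }

    // Another participant (or the application) aborted the transaction.
    public void Rollback(Enlistment enlistment)
    {
        pending = null;
        enlistment.Done();
    }

    public void InDoubt(Enlistment enlistment)
    {
        enlistment.Done();
    }
}

public static class UnitOfWorkSketch
{
    // Returns true if both writes were committed, false if both were rolled back.
    public static bool StageAndPublish(InMemoryResource staging, InMemoryResource messageBox)
    {
        try
        {
            using (TransactionScope scope = new TransactionScope())
            {
                staging.Write("original file");
                messageBox.Write("placeholder document");
                scope.Complete();
            }
            return true;
        }
        catch (TransactionAbortedException)
        {
            return false;
        }
    }
}
```

When both resources vote to commit, StageAndPublish returns true and both values become visible. When the resource playing the MessageBox is created with failOnPrepare set to true, the call returns false and the staged value is never committed, which mirrors the case where the Message Agent fails to publish the placeholder document.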

In the first article of this series, we explained how the client application initiates and flows a distributed transaction to the underlying WCF-Custom Receive Location using the OleTransactions protocol. In particular, we noted that the UploadMessageTxn method used by the client application to transmit the message to BizTalk is decorated with the TransactionFlowAttribute, and that the TransactionFlowOption.Mandatory value indicates that the service endpoint requires the client application to start and flow a distributed transaction.

For more information on WCF Transaction Propagation, you can read the following article:

 

Transactional Case: TxF

The following picture shows the transactional case where the client application initiates and transmits a distributed transaction to the underlying WCF-Custom Receive Location and the receive pipeline processing the message is configured to stage the incoming file to a Folder using a transacted write operation.

  1. The user selects a file on the UI of the client application, selects from the corresponding drop-down list a service endpoint that corresponds to a transactional WCF-Custom Receive Location, and finally clicks the Submit button. At this point, the client application initiates a new transaction and invokes the underlying WCF-Custom Receive Location using the Streamed transfer mode (TransferMode = Streamed).
  2. The custom StageFileReceivePipeline reads the message metadata from the custom headers using the InboundHeaders context property. Then the StageFilePipelineComponent enlists in the transaction flowed in by the client application and writes the inbound file to the staging folder, whose path is defined in the pipeline configuration, using the following code:

private string WriteMessageToFolder(Stream stream, Headers headers, IPipelineContext context)
{
    string path = string.Empty;
    try
    {
        if (!string.IsNullOrEmpty(headers.Filename))
        {
            string extension = !string.IsNullOrEmpty(headers.Filename) ?
                               Path.GetExtension(headers.Filename) :
                               string.Empty;
            switch (fileName)
            {
                case FileNameType.Original:
                    path = Path.Combine(folderPath, headers.Filename);
                    break;
                case FileNameType.MessageId:
                    extension = Path.GetExtension(headers.Filename);
                    if (!string.IsNullOrEmpty(headers.Id))
                    {
                        path = Path.Combine(folderPath, headers.Id + extension);
                    }
                    else
                    {
                        path = Path.Combine(folderPath, Guid.NewGuid().ToString() + extension);
                    }
                    break;
            }
            ITransaction transaction = null;
            if (useAmbientTransaction)
            {
                // Retrieve the transaction flowed in by the client application
                IPipelineContextEx extendedContext = (IPipelineContextEx)context;
                transaction = extendedContext.GetTransaction() as ITransaction;
                if (transaction != null)
                {
                    try
                    {
                        // Trace the transaction identifier
                        XACTTRANSINFO info;
                        transaction.GetTransactionInfo(out info);
                        Guid transactionId = new Guid(info.uow.rgb);
                        TraceHelper.WriteLineIf(traceEnabled,
                                                string.Format(Resources.MessageFormat,
                                                              Resources.StageFilePipelineComponentName,
                                                              string.Format(Resources.TransactionIdFormat,
                                                                            transactionId.ToString())),
                                                false);
                    }
                    catch (Exception ex)
                    {
                        TraceHelper.WriteLineIf(traceEnabled,
                                                context,
                                                ex.Message,
                                                EventLogEntryType.Error);
                    }
                    // Write the file within the scope of the flowed transaction
                    IDtcTransaction dtcTransaction = (IDtcTransaction)transaction;
                    using (TransactionScope scope = new TransactionScope(TransactionInterop.GetTransactionFromDtcTransaction(dtcTransaction),
                                                                         TimeSpan.FromSeconds(transactionTimeout)))
                    {
                        WriteFile(stream, path, transaction, context);
                        scope.Complete();
                    }
                }
            }
            else
            {
                // Non-transactional write
                WriteFile(stream, path, transaction, context);
            }
        }
    }
    catch (Exception ex)
    {
        ExceptionHelper.HandleException(Resources.StageFilePipelineComponentName, ex);
        TraceHelper.WriteLineIf(traceEnabled, context, ex.Message, EventLogEntryType.Error);
        throw;
    }
    return path;
}

private void WriteFile(Stream stream, string path, ITransaction transaction, IPipelineContext context)
{
    byte[] buffer = new byte[bufferSize];
    int bytesRead = 0;
    using (FileStream fileStream = FileHelper.CreateFile(path, transaction, traceEnabled))
    {
        if (mapEnabled)
        {
            if (traceEnabled)
            {
                TraceHelper.WriteLine(context,
                                      string.Format(Resources.MessageFormat,
                                                    Resources.StageFilePipelineComponentName,
                                                    string.Format(Resources.StartMapFormat, mapType)),
                                      false);
            }
            // Transform the inbound stream while writing it to the staging file
            XslCompiledTransformHelper.Transform(stream,
                                                 fileStream,
                                                 mapType,
                                                 traceEnabled,
                                                 true,
                                                 mapBufferSize,
                                                 mapThresholdSize);
            if (traceEnabled)
            {
                TraceHelper.WriteLine(context,
                                      string.Format(Resources.MessageFormat,
                                                    Resources.StageFilePipelineComponentName,
                                                    Resources.EndMapFormat),
                                      false);
            }
        }
        else
        {
            // Copy the inbound stream to the staging file one buffer at a time
            while ((bytesRead = stream.Read(buffer, 0, bufferSize)) > 0)
            {
                fileStream.Write(buffer, 0, bytesRead);
            }
        }
    }
    TraceHelper.WriteLineIf(traceEnabled,
                            string.Format(Resources.OriginalMessageSuccessfullyCopiedTo,
                                          Resources.StageFilePipelineComponentName,
                                          path),
                            false);
}

 

In addition, the pipeline promotes the AckRequired context property to enable delivery notifications and replaces the original file with a small placeholder document that contains the metadata and the actual location of the original message within the staging repository. The content of the placeholder document is also written to a special context property called BatchInformation, which is used later on by the NotificationHandler orchestration to retrieve the location of the original document and delete it once it has been successfully processed or transmitted to the target system.
Note: as highlighted in the picture above, the client application, pipeline component and Message Agent enlist in the same transaction.
  3. The placeholder document is published to the BizTalkMsgBoxDb by the Message Agent.
  4. The inbound request is consumed by a FILE Send Port.
  5. The RestoreFileSendPipeline reads the content of the placeholder document and replaces it with the original message. In particular, the RestoreFilePipelineComponent opens a FileStream object on the original document in the staging folder and assigns it to the Data property of the main body part of the outbound message, so that the FILE Adapter can read the content of the original document:

 

public IBaseMessage Execute(IPipelineContext context, IBaseMessage message)
{
    try
    {
        ...
            Stream stream = ReadMessage(document, context);
            bodyPart.Data = stream;
        ...
    }
    catch (Exception ex)
    {
        ...
    }
    finally
    {
        ...
    }
    return message;
}

private Stream ReadMessage(Batch document, IPipelineContext context)
{
    if (document != null)
    {
        ...
                                false);
        if (document.Target == TargetType.Folder)
        {
            return ReadMessageFromFolder(document, context);
        }
        else
        {
            return ReadMessageFromSQLServer(document, context);
        }
    }
    return null;
}


private Stream ReadMessageFromFolder(Batch document, IPipelineContext context)
{
    try
    {
        if (!string.IsNullOrEmpty(document.Path))
        {
            return new FileStream(document.Path, FileMode.Open, FileAccess.Read, FileShare.None, bufferSize);
        }
    }
    catch (Exception ex)
    {
       ...
    }
    return null;
}

  6. The FILE Adapter uses the FileStream object opened by the RestoreFilePipelineComponent to read the content of the original document from the staging folder and write it to the target folder (Out).
  7. The BizTalk Messaging Runtime generates an ACK/NACK message depending on whether the transmission was successful or not.
  8. The ACK/NACK message is consumed by a special orchestration called NotificationHandler.
  9. The orchestration executes a different action depending on the type of the inbound control message:
    • ACK: the orchestration deletes the original file from the staging repository.
    • NACK: the orchestration invokes a BRE policy to decide how to proceed. Depending on the information contained in the policy, the orchestration can decide to terminate or resume the suspended service instance.
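
The branching performed on the control message can be summarized in plain C#. This is only a sketch of the decision logic under stated assumptions, not the NotificationHandler orchestration itself: the types NotificationKind, NackDecision, HandlerOutcome and NotificationSketch are hypothetical, the staging repository is assumed to be a folder, and the BRE policy is replaced by a simple delegate.

```csharp
using System;
using System.IO;

// Hypothetical types introduced for this illustration only.
public enum NotificationKind { Ack, Nack }
public enum NackDecision { Terminate, Resume }
public enum HandlerOutcome { StagedFileDeleted, InstanceTerminated, InstanceResumed }

public static class NotificationSketch
{
    // kind:           type of the control message received from the messaging runtime
    // stagedFilePath: location of the original file, as recorded in the placeholder
    // policy:         stand-in for the rules policy consulted when a NACK arrives
    public static HandlerOutcome Handle(NotificationKind kind,
                                        string stagedFilePath,
                                        Func<NackDecision> policy)
    {
        if (kind == NotificationKind.Ack)
        {
            // Transmission succeeded: the staged copy is no longer needed.
            if (File.Exists(stagedFilePath))
            {
                File.Delete(stagedFilePath);
            }
            return HandlerOutcome.StagedFileDeleted;
        }

        // Transmission failed: keep the staged file and let the policy decide.
        return policy() == NackDecision.Resume
            ? HandlerOutcome.InstanceResumed
            : HandlerOutcome.InstanceTerminated;
    }
}
```

Note that on a NACK the staged file is deliberately left in place, because a resumed service instance still needs it to retry the transmission.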

 

Transactional Case: SQL Server

The following picture shows the transactional case where the client application initiates and transmits a distributed transaction to the underlying WCF-Custom Receive Location and the receive pipeline processing the message is configured to stage the incoming file in the Documents table on the StagingDb using a SqlFileStream object.

  1. The user selects a file on the UI of the client application, selects from the corresponding drop-down list a service endpoint that corresponds to a transactional WCF-Custom Receive Location, and finally clicks the Submit button. At this point, the client application initiates a new transaction and invokes the underlying WCF-Custom Receive Location using the Streamed transfer mode (TransferMode = Streamed).
  2. The custom StageFileReceivePipeline reads the message metadata from the custom headers using the InboundHeaders context property. Then the StageFilePipelineComponent enlists in the transaction flowed in by the client application and writes the inbound file to the Documents table of the StagingDb using the following code:

 

private string WriteMessageToSQLServer(Stream stream, Headers headers, IPipelineContext context)
{
    try
    {
        
        using (SqlConnection sqlConnection = new SqlConnection(connectionString))
        {
            sqlConnection.Open();
            SqlTransaction sqlTransaction = null;
            if (useAmbientTransaction)
            {
                IPipelineContextEx extendedContext = (IPipelineContextEx)context;
                ITransaction transaction = extendedContext.GetTransaction() as ITransaction;
                if (transaction != null)
                {
                    sqlConnection.EnlistDistributedTransaction(transaction);
                    try
                    {
                        XACTTRANSINFO info;
                        transaction.GetTransactionInfo(out info);
                        Guid transactionId = new Guid(info.uow.rgb);
                        TraceHelper.WriteLineIf(traceEnabled,
                                                string.Format(Resources.MessageFormat,
                                                              Resources.StageFilePipelineComponentName,
                                                              string.Format(Resources.TransactionIdFormat, 
                                                                            transactionId.ToString())),
                                                false);
                    }
                    catch (Exception ex)
                    {
                        TraceHelper.WriteLineIf(traceEnabled,
                                                context,
                                                ex.Message,
                                                EventLogEntryType.Error);
                    }
                }
            }
            else
            {
                sqlTransaction = sqlConnection.BeginTransaction();
            }

            SqlParameter sqlParameter = null;
            Guid guid = new Guid(headers.Id);
            string path = null;

            using (SqlCommand sqlCommand = new SqlCommand("usp_InsertDocument", sqlConnection))
            {
                if (!useAmbientTransaction)
                {
                    sqlCommand.Transaction = sqlTransaction;
                }
                sqlParameter = new SqlParameter(Resources.IdParameter, SqlDbType.UniqueIdentifier);
                sqlParameter.Direction = ParameterDirection.Input;
                sqlParameter.Value = guid;
                sqlCommand.Parameters.Add(sqlParameter);

                sqlParameter = new SqlParameter(Resources.NameParameter, SqlDbType.VarChar, 256);
                sqlParameter.Direction = ParameterDirection.Input;
                sqlParameter.Value = headers.Filename;
                sqlCommand.Parameters.Add(sqlParameter);

                sqlParameter = new SqlParameter(Resources.SenderParameter, SqlDbType.VarChar, 256);
                sqlParameter.Direction = ParameterDirection.Input;
                sqlParameter.Value = headers.Sender;
                sqlCommand.Parameters.Add(sqlParameter);

                sqlParameter = new SqlParameter(Resources.SizeParameter, SqlDbType.BigInt);
                sqlParameter.Direction = ParameterDirection.Input;
                sqlParameter.Value = headers.Size;
                sqlCommand.Parameters.Add(sqlParameter);

                sqlCommand.CommandType = CommandType.StoredProcedure;
                sqlCommand.CommandTimeout = transactionTimeout;
                sqlCommand.ExecuteNonQuery();
            }

            using (SqlCommand sqlCommand = new SqlCommand("usp_GetFileStreamPath", sqlConnection))
            {
                if (!useAmbientTransaction)
                {
                    sqlCommand.Transaction = sqlTransaction;
                }
                sqlParameter = new SqlParameter(Resources.IdParameter, SqlDbType.UniqueIdentifier);
                sqlParameter.Direction = ParameterDirection.Input;
                sqlParameter.Value = guid;
                sqlCommand.Parameters.Add(sqlParameter);

                sqlParameter = new SqlParameter(Resources.PathParameter, SqlDbType.VarChar, 256);
                sqlParameter.Direction = ParameterDirection.Output;
                sqlCommand.Parameters.Add(sqlParameter);

                sqlCommand.CommandType = CommandType.StoredProcedure;
                sqlCommand.CommandTimeout = transactionTimeout;
                sqlCommand.ExecuteNonQuery();
                if (sqlParameter.Value != null)
                {
                    path = sqlParameter.Value as string;
                }
            }
            if (!string.IsNullOrEmpty(path))
            {
                using (SqlCommand sqlCommand = new SqlCommand("SELECT GET_FILESTREAM_TRANSACTION_CONTEXT()",
                                                              sqlConnection))
                {
                    if (!useAmbientTransaction)
                    {
                        sqlCommand.Transaction = sqlTransaction;
                    }
                    sqlCommand.CommandType = CommandType.Text;
                    sqlCommand.CommandTimeout = transactionTimeout;
                    Object result = sqlCommand.ExecuteScalar();
                    byte[] transactionContext = (byte[])result;

                    // The next step is to obtain a handle that
                    // can be passed to the Win32 FILE APIs.
                    byte[] buffer = new byte[bufferSize];
                    int bytesRead = 0;
                    using (SqlFileStream sqlFileStream = new SqlFileStream(path,
                                                                           transactionContext,
                                                                           FileAccess.ReadWrite))
                    {
                        if (mapEnabled)
                        {
                            if (traceEnabled)
                            {
                                TraceHelper.WriteLine(context,
                                                      string.Format(Resources.MessageFormat,
                                                                    Resources.StageFilePipelineComponentName,
                                                                    string.Format(Resources.StartMapFormat, mapType)),
                                                      false);
                            }
                            // Transform the inbound stream while writing it to the FILESTREAM column
                            XslCompiledTransformHelper.Transform(stream,
                                                                 sqlFileStream,
                                                                 mapType,
                                                                 traceEnabled,
                                                                 true,
                                                                 mapBufferSize,
                                                                 mapThresholdSize);
                            if (traceEnabled)
                            {
                                TraceHelper.WriteLine(context,
                                                      string.Format(Resources.MessageFormat,
                                                                    Resources.StageFilePipelineComponentName,
                                                                    Resources.EndMapFormat),
                                                      false);
                            }
                        }
                        else
                        {
                            // Copy the inbound stream to the FILESTREAM column one buffer at a time
                            while ((bytesRead = stream.Read(buffer, 0, bufferSize)) > 0)
                            {
                                sqlFileStream.Write(buffer, 0, bytesRead);
                            }
                        }
                    }
                    if (!useAmbientTransaction)
                    {
                        sqlTransaction.Commit();
                    }
                    TraceHelper.WriteLineIf(traceEnabled,
                                            string.Format(Resources.OriginalMessageSuccessfullyCopiedTo,
                                                          Resources.StageFilePipelineComponentName,
                                                          path),
                                            false);
                }
            }
        }
    }
    catch (Exception ex)
    {
        ExceptionHelper.HandleException(Resources.StageFilePipelineComponentName, ex);
        TraceHelper.WriteLineIf(traceEnabled, context, ex.Message, EventLogEntryType.Error);
        throw;
    }
    return connectionString;
}

In addition, the pipeline promotes the AckRequired context property to enable delivery notifications and replaces the original file with a small placeholder document that contains the metadata and the actual location of the original message within the staging repository. The content of the placeholder document is also written to a special context property called BatchInformation, which is used later on by the NotificationHandler orchestration to retrieve the location of the original document and delete it once it has been successfully processed or transmitted to the target system.
Note: as highlighted in the picture above, the client application, pipeline component and Message Agent enlist in the same transaction.
  3. The placeholder document is published to the BizTalkMsgBoxDb by the Message Agent.
  4. The inbound request is consumed by a FILE Send Port.
  5. The RestoreFileSendPipeline reads the content of the placeholder document and replaces it with the original message. In particular, the component opens a SqlFileStream object to read the content of the original file from the StagingDb and wraps this object within an instance of a custom stream class called WrapperStream. The pipeline component opens a connection to the StagingDb and creates a SQL transaction to initialize the SqlFileStream object. The content of the SqlFileStream is not meant to be read by the pipeline component but by the adapter used by the Send Port; therefore, the connection and the transaction need to remain open. The purpose of the WrapperStream object is to commit the transaction and close the connection when the adapter has finished transmitting the message. When this happens, the BizTalk Messaging Runtime invokes the Close method of the WrapperStream object, which contains the code for committing the transaction and closing the connection.
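
The idea behind such a wrapper can be shown in isolation. The following is a minimal sketch and not the WrapperStream class of the sample: the class name CleanupOnCloseStream and the onClose callback are assumptions introduced for illustration. The stream forwards read operations to the inner stream and runs the callback exactly once when it is closed or disposed, which is the point at which the real component would commit the transaction and close the connection.

```csharp
using System;
using System.IO;

// Hypothetical read-only decorator: defers cleanup until the consumer
// (for example, a send adapter) closes the stream.
public sealed class CleanupOnCloseStream : Stream
{
    private readonly Stream inner;
    private readonly Action onClose;
    private bool closed;

    public CleanupOnCloseStream(Stream inner, Action onClose)
    {
        if (inner == null) throw new ArgumentNullException("inner");
        if (onClose == null) throw new ArgumentNullException("onClose");
        this.inner = inner;
        this.onClose = onClose;
    }

    public override bool CanRead { get { return inner.CanRead; } }
    public override bool CanSeek { get { return inner.CanSeek; } }
    public override bool CanWrite { get { return false; } }
    public override long Length { get { return inner.Length; } }

    public override long Position
    {
        get { return inner.Position; }
        set { inner.Position = value; }
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        return inner.Read(buffer, offset, count);
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        return inner.Seek(offset, origin);
    }

    public override void Flush()
    {
        inner.Flush();
    }

    public override void SetLength(long value)
    {
        throw new NotSupportedException();
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        throw new NotSupportedException();
    }

    // Stream.Close() and Stream.Dispose() both call Dispose(true).
    protected override void Dispose(bool disposing)
    {
        if (disposing && !closed)
        {
            closed = true;
            try
            {
                inner.Dispose();
            }
            finally
            {
                // Runs once, after the consumer has finished reading.
                onClose();
            }
        }
        base.Dispose(disposing);
    }
}
```

In the scenario described above, the callback passed to the constructor would be something along the lines of a lambda that commits the SqlTransaction and closes the SqlConnection, so that both stay open for as long as the adapter is still reading.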

 

public IBaseMessage Execute(IPipelineContext context, IBaseMessage message)
{
    try
    {
        ...
            Stream stream = ReadMessage(document, context);
            bodyPart.Data = stream;
        ...
    }
    catch (Exception ex)
    {
        ...
    }
    finally
    {
        ...
    }
    return message;
}

private Stream ReadMessage(Batch document, IPipelineContext context)
{
    if (document != null)
    {
        ...
                                false);
        if (document.Target == TargetType.Folder)
        {
            return ReadMessageFromFolder(document, context);
        }
        else
        {
            return ReadMessageFromSQLServer(document, context);
        }
    }
    return null;
}


private Stream ReadMessageFromSQLServer(Batch document, IPipelineContext context)
{
    try
    {
        SqlConnection sqlConnection = new SqlConnection(document.Path);
        sqlConnection.Open();
        
        SqlTransaction sqlTransaction = sqlConnection.BeginTransaction();

        SqlParameter sqlParameter = null;
        Guid guid = new Guid(document.Id);
        string path = null;

        using (SqlCommand sqlCommand = new SqlCommand("usp_GetFileStreamPath", sqlConnection, sqlTransaction))
        {
            sqlParameter = new SqlParameter(Resources.IdParameter, SqlDbType.UniqueIdentifier);
            sqlParameter.Direction = ParameterDirection.Input;
            sqlParameter.Value = guid;
            sqlCommand.Parameters.Add(sqlParameter);

            sqlParameter = new SqlParameter(Resources.PathParameter, SqlDbType.VarChar, 256);
            sqlParameter.Direction = ParameterDirection.Output;
            sqlCommand.Parameters.Add(sqlParameter);

            sqlCommand.CommandType = CommandType.StoredProcedure;
            sqlCommand.CommandTimeout = transactionTimeout;
            sqlCommand.ExecuteNonQuery();
            if (sqlParameter.Value != null)
            {
                path = sqlParameter.Value as string;
            }
        }
        if (!string.IsNullOrEmpty(path))
        {
            using (SqlCommand sqlCommand = new SqlCommand("SELECT GET_FILESTREAM_TRANSACTION_CONTEXT()", 
                                                          sqlConnection, 
                                                          sqlTransaction))
            {
                sqlCommand.CommandType = CommandType.Text;
                sqlCommand.CommandTimeout = transactionTimeout;
                Object result = sqlCommand.ExecuteScalar();
                byte[] transactionContext = (byte[])result;
                return new WrapperStream(new SqlFileStream(path, transactionContext, FileAccess.Read),
                                         document,
                                         sqlConnection,
                                         sqlTransaction,
                                         traceEnabled);
            }
        }
    }
    catch (Exception ex)
    {
        ...
    }
    return null;
}
  1. The FILE Adapter uses the SqlFileStream object opened by the RestoreFilePipelineComponent to read the content of the original document from the staging database and write it to the target folder (Out).
  2. The BizTalk Messaging Runtime generates an ACK or NACK message, depending on whether the transmission was successful, and invokes the Close method on the WrapperStream object, which commits the transaction and closes the connection used by the SqlFileStream object.

 

namespace Microsoft.BizTalk.CAT.Samples.LargeMessageTransmission.PipelineComponents
{
    public class WrapperStream : Stream
    {
        ...
        private Stream stream;
        private Batch batch;
        private SqlConnection sqlConnection;
        private SqlTransaction sqlTransaction;
        private bool traceEnabled;
        private bool committed = false;
        ...
        public override void Close()
        {
            try
            {
                stream.Close();
                if (!committed)
                {
                    CommitTransaction();
                }
            }
            finally
            {
                base.Close();
            }
        }
        ...
        private void CommitTransaction()
        {
            try
            {
                TraceHelper.WriteLineIf(...);
                using (sqlConnection)
                {
                    using (sqlTransaction)
                    {
                        sqlTransaction.Commit();
                        committed = true;
                        TraceHelper.WriteLineIf(...);
                    }
                }
            }
            catch (Exception ex)
            {
                ExceptionHelper.HandleException(...);
                TraceHelper.WriteLineIf(...);
            }
            finally
            {
                TraceHelper.WriteLineIf(...);
            }
        }
        ...
    }
}
  1. The ACK/NACK message is consumed by a special orchestration called NotificationHandler.
  2. The orchestration executes a different action depending on the type of the inbound control message:
    • ACK: the orchestration deletes the original file from the staging repository.
    • NACK: the orchestration invokes a BRE policy to decide how to proceed. Depending on the information contained in the policy, the orchestration can decide to terminate or resume the suspended service instance.

 

Receive Location and Send Port Configuration

The first step in enabling transactional support is to adopt a transaction-aware WCF binding on both the client and the server side. In our case, both the client application and the WCF-Custom Receive Location are configured to use the CustomBinding. Therefore, to enable transactional support on both sides, it’s necessary to add the TransactionFlowBindingElement to the custom binding and specify a value for the TransactionProtocol property exposed by this binding element. In our case, we select OleTransactions because this protocol is noticeably faster than WS-AtomicTransaction. The latter, on the other hand, is fully supported by other technology vendors and can therefore be used to exchange messages in a transactional manner with applications and services running on non-Microsoft platforms. The following figure shows the composition of the CustomBinding used by one of the WCF-Custom Receive Locations I created to receive messages in a transactional manner:
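
In configuration terms, a binding of this kind might look roughly like the sketch below. Only the transactionFlow element and its OleTransactions protocol come from the description above; the binding name, the binary encoder, the TCP transport, and the maximum message size are assumptions chosen for illustration and may differ from the binding used in the sample.

```xml
<!-- Sketch only: binding name, encoder, transport, and size limit are assumptions. -->
<system.serviceModel>
  <bindings>
    <customBinding>
      <binding name="transactionalStreamedBinding">
        <!-- Flows the client transaction to the service using OleTransactions. -->
        <transactionFlow transactionProtocol="OleTransactions" />
        <binaryMessageEncoding />
        <!-- The transport element must be the last element in the stack. -->
        <tcpTransport transferMode="Streamed" maxReceivedMessageSize="2147483647" />
      </binding>
    </customBinding>
  </bindings>
</system.serviceModel>
```

The same binding elements, in the same order, would need to be configured on the client side so that both ends agree on the transaction protocol.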

The WCF-Custom Receive Location in question uses a custom Receive Pipeline called StageFileReceivePipeline to store the incoming file in a staging folder or SQL database and to replace it with a placeholder document. As shown in the following picture, the StageFileReceivePipeline is composed of two components: the StageFilePipelineComponent and the Xml Disassembler.

The Xml Disassembler pipeline component is used only to promote the MessageType property in the context of the placeholder document during the parsing phase. As the placeholder document is very small, the performance overhead introduced by the Xml Disassembler component is negligible. The StageFilePipelineComponent is responsible for reading the incoming file, storing it in the target staging repository, and replacing the original message with the placeholder document. If the incoming file is in XML format, the StageFilePipelineComponent can also apply a map, identified by its fully qualified name (FQDN), using the XslCompiledTransform class to speed up message transformation.

For more information on how to use the XslCompiledTransform class to boost message transformation, you can read the articles I wrote on this subject:

  • “How To Boost Message Transformations Using the XslCompiledTransform class” article on my blog.
  • “How To Boost Message Transformations Using the XslCompiledTransform class Extended” article on my blog.
  • “How to Boost Performance of the ESB Routing and Transform Services – Part 1” article on my blog.

The StageFilePipelineComponent exposes a wide range of properties that allow you to control its runtime behavior:

 

  • AckRequired: specifies whether the pipeline component promotes the BTS.AckRequired context property and sets its value to true.
  • BufferSize: indicates the size in bytes of the buffer used to write the original message to the staging folder or database.
  • ConnectionString: specifies the connection string of the target repository database, when the value of the TargetType property is equal to SQLServer.
  • FileName: specifies the name of the file when the value of the TargetType property is equal to Folder. This property can assume one of the following pre-defined values:
    • Original: in this case the pipeline component uses the original file name contained in the corresponding custom SOAP header.
    • MessageId: in this case the pipeline component uses the MessageId contained in the corresponding custom SOAP header, or a Guid generated on the fly if the header is missing or empty.
  • FolderPath: specifies the path of the target staging folder, when the value of the TargetType property is equal to Folder.
  • MapBufferSize: indicates the size in bytes of the buffer used by the VirtualStream object that will contain the transformed message.
  • MapEnabled: specifies whether custom message transformation is enabled.
  • MapThresholdSize: indicates the threshold size in bytes beyond which the transformed message is persisted to a temporary file to avoid excessive memory consumption.
  • MapType: indicates the fully qualified name (FQDN) of the transformation map to apply to the incoming file.
  • PostPlaceholderMessage: specifies whether an XML placeholder document, containing information on the staged file, is posted to the BizTalkMsgBoxDb.
  • TargetType: specifies the type of target repository. This property can assume one of the following pre-defined values:
    • Folder: in this case the path of the target staging folder is indicated by the FolderPath property.
    • SQLServer: in this case the connection string of the target staging database is indicated by the ConnectionString property.
  • TraceEnabled: specifies whether component tracing is enabled. You can use a tool like DebugView to monitor debug output messages generated by the component.
  • UseAmbientTransaction: specifies whether the pipeline component performs the staging operations using the transactional context used by the Message Agent to post the placeholder document to the BizTalkMsgBoxDb. As previously noted, the transaction in question can be initiated by the client application and flowed to BizTalk.

On the other hand, the RestoreFileSendPipeline is used on the Send Port to open a stream (FileStream or SqlFileStream) that reads the original file from the staging repository and to return the open stream to the Adapter. This pipeline consists only of the RestoreFilePipelineComponent, as shown in the picture below:

The RestoreFilePipelineComponent class exposes only two properties:

  • BufferSize: indicates the size in bytes of the buffer used to read the original message from the staging folder. This information is passed as an argument to the constructor of the FileStream object opened by the component. As previously noted, the pipeline component does not read through the content of the original message; it just returns the open stream to the Adapter used by the Send Port.
  • TraceEnabled: specifies whether component tracing is enabled. You can use a tool like DebugView to monitor debug output messages generated by the component.
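
As a minimal sketch of how a buffer size can be handed to the FileStream constructor while leaving the stream unread, consider the helper below. The StagedFileReader class, the OpenStagedFile method, and the FileShare and FileOptions values are hypothetical names and choices made for illustration; they are not taken from the sample.

```csharp
using System;
using System.IO;

public static class StagedFileReader
{
    // Hypothetical helper: opens the staged file for reading and returns the
    // stream without consuming it, so the caller (for example an adapter)
    // can read it later using the requested buffer size.
    public static Stream OpenStagedFile(string path, int bufferSize)
    {
        // FileShare.Read and FileOptions.SequentialScan are assumptions.
        return new FileStream(path,
                              FileMode.Open,
                              FileAccess.Read,
                              FileShare.Read,
                              bufferSize,
                              FileOptions.SequentialScan);
    }
}
```

The caller remains responsible for closing the returned stream once the content has been transmitted.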

 

ACK and NACK Messages

As previously noted, the Messaging Engine publishes an ACK notification when the Send Port successfully transmits a message over the wire and the AckRequired system property has been written to the context of the outgoing message with its value set to true. Conversely, when the Send Port fails to transmit the message and runs out of retries, the message is suspended and the Messaging Engine publishes a NACK control message to the BizTalkMsgBoxDb. As highlighted in the following picture, both ACK and NACK messages have the following system context properties promoted, which can therefore be used in filter expressions for routing:

The table below is an excerpt from the BizTalk Server documentation on MSDN and provides a description of the system properties contained in the context of an ACK or NACK message published by the Messaging Engine into the MessageBox database.

| Property | When and where it is promoted | Type | Description |
|---|---|---|---|
| BTS.AckFailureCategory | Promoted by the Messaging Engine before publishing an acknowledgement message into the MessageBox database. | xs:int | Identifies the ErrorCategory, which gives the place and reason for the suspension. |
| BTS.AckFailureCode | Written by the Messaging Engine before publishing an acknowledgement message into the MessageBox database. | xs:string | Identifies the ErrorCode, which gives the place and reason for the suspension. |
| BTS.AckID | Promoted by the Messaging Engine before publishing an acknowledgement message into the MessageBox database. | xs:string | Identifies the MessageID of the original message. |
| BTS.AckInboundTransportLocation | Promoted by the Messaging Engine before publishing an acknowledgement message into the MessageBox database. | xs:string | Identifies the InboundTransportLocation from the original message. |
| BTS.AckOutboundTransportLocation | Promoted by the Messaging Engine before publishing an acknowledgement message into the MessageBox database. | xs:string | Identifies the OutboundTransportLocation from the original message. |
| BTS.AckOwnerID | Promoted by the Messaging Engine before publishing an acknowledgement message into the MessageBox database. | xs:string | Identifies the instance ID from original message. |
| BTS.AckReceivePortID | Promoted by the Messaging Engine before publishing an acknowledgement message into the MessageBox database. | xs:string | Identifies the ReceivePortID from the original message. |
| BTS.AckReceivePortName | Promoted by the Messaging Engine for the acknowledgement message. | xs:string | Identifies the ReceivePortName from the original message. |
| BTS.AckSendPortID | Promoted by the Messaging Engine before publishing an acknowledgement message into the MessageBox database. | xs:string | Identifies the SendPortID from the original message. |
| BTS.AckSendPortName | Promoted by the Messaging Engine before publishing an acknowledgement message into the MessageBox database. | xs:string | Identifies the SendPortName from the original message. |
| BTS.AckType | Promoted by the Messaging Engine before publishing an acknowledgement message into the MessageBox database. | xs:string | Allows monitoring of acknowledgements and non-acknowledgements by an orchestration. The value will be ACK for an acknowledgment and NACK for a negative acknowledgment. |

 

In addition, all of the message context properties of the original message are demoted in the context of the ACK or NACK message. However, if they were previously promoted they will not be promoted on the ACK or NACK message; therefore they cannot be used for message routing. ACK messages do not have any message parts, whereas NACK messages have a message body part that contains a SOAP Fault like the one shown in the XML snippet below.

 

<SOAP:Envelope xmlns:SOAP="http://schemas.xmlsoap.org/soap/envelope/" 
               SOAP:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
  <SOAP:Body>
    <SOAP:Fault>
      <faultcode>Microsoft BizTalk Server Negative Acknowledgment </faultcode>
      <faultstring>An error occurred while processing the message, refer to the details section for more information</faultstring>
      <faultactor>C:\Projects\LargeMessageTransmission\Test\Out\%SourceFileName%</faultactor>
      <detail>
        <ns0:NACK Type="NACK" xmlns:ns0="http://schema.microsoft.com/BizTalk/2003/NACKMessage.xsd">
          <NAckID>{C9051EC7-D22F-4AEB-B7F7-801CA902F1A5}</NAckID>
          <ErrorCode>0xc0c01c10</ErrorCode>
          <ErrorCategory>0</ErrorCategory>
          <ErrorDescription>The FILE send adapter cannot open file C:\Projects\LargeMessageTransmission\Test\Out\CalculatorRequest.xml for writing. Details: The system cannot find the path specified.</ErrorDescription>
        </ns0:NACK>
      </detail>
    </SOAP:Fault>
  </SOAP:Body>
</SOAP:Envelope>

 

In particular, note that the reason for the error raised by the Send Adapter is contained in the ErrorDescription element within the SOAP detail section. The publication of ACK and NACK messages is somewhat special in that, if there are no active subscriptions for them, they are discarded. An ACK/NACK is published atomically with the corresponding message operation. For example, the suspension of a message and the publication of its NACK occur in the same transaction within the engine. Similarly, the publication of an ACK control message is performed in the same transaction as the deletion of the message from the application queue.
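
To illustrate how the error reason could be pulled out of a NACK body, here is a minimal sketch based on LINQ to XML. The NackReader class and the GetErrorDescription method are hypothetical names introduced for this example and are not part of the sample. The sketch assumes the NACK body is available as a string and that ErrorDescription carries no namespace, as in the snippet above.

```csharp
using System;
using System.Linq;
using System.Xml.Linq;

public static class NackReader
{
    // Hypothetical helper: returns the trimmed ErrorDescription text,
    // or null when the element is not present in the document.
    public static string GetErrorDescription(string nackXml)
    {
        XDocument document = XDocument.Parse(nackXml);

        // In the sample NACK the ErrorDescription element has no prefix and
        // no default namespace applies, so it is matched by local name only.
        XElement element = document.Descendants("ErrorDescription").FirstOrDefault();

        return element == null ? null : element.Value.Trim();
    }
}
```

The same approach could be used to read the ErrorCode or ErrorCategory elements, for example to feed them into the decision on whether to resume or terminate the suspended instance.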

For more information on ACK and NACK messages, you can read the following topics:

 

NotificationHandler Orchestration

In our solution, ACK and NACK notifications are handled by a special orchestration called NotificationHandler. This orchestration receives control messages through a Direct Bound Logical One-Way Receive Port whose Filter Expression is shown in the following picture.

Note that I could have used any combination of promoted properties on the ACK\NACK messages to narrow down the range of control messages processed by the NotificationHandler orchestration.

The following figure depicts the structure of the orchestration that executes a different action depending on the type of the inbound control message:

The GetPlaceholder Expression Shape invokes the GetBatch method exposed by the MessageHelper class, which retrieves the placeholder document from the BatchInformation context property. This document is deserialized into an instance of the Batch class, which the orchestration uses to retrieve the exact location of the original message within the staging repository. The following table shows the code of the GetBatch method:

public Batch GetBatch(XLANGMessage message)
{
    try
    {
        logHelper.WriteLine(...);
        if (message != null &&
            message.Count > 0)
        {
            string xml = message.GetPropertyValue(typeof(BatchInformation)) as string;
            if (!string.IsNullOrEmpty(xml))
            {
                return new Batch(xml, debug);
            }
        }
    }
    catch (Exception ex)
    {
        ExceptionHelper.HandleException(...);
        logHelper.WriteLine(...);
    }
    finally
    {
        logHelper.WriteLine(...);
        // Guard against a null message before releasing it.
        if (message != null)
        {
            message.Dispose();
        }
    }
    return null;
}

 

At this point, if the current notification is an ACK message, the orchestration deletes the original file from the staging repository. The following table presents the code used inside the Delete Message Expression Shape to invoke the DeleteMessage method exposed by the MessageHelper class.

logHelper.WriteLine("[NotificationHandler] ACK message received.");
if (batch != null)
{
    messageHelper.DeleteMessage(batch);
}
else
{
    logHelper.WriteLine("[NotificationHandler] The batch message is null.");
}

 

Likewise, the table below shows the code of the DeleteMessage method.

namespace Microsoft.BizTalk.CAT.Samples.LargeMessageTransmission.Helpers
{
    [Serializable]
    public class MessageHelper
    {
        ...
        public void DeleteMessage(Batch document)
        {
            try
            {
                logHelper.WriteLine(...);
                if (document != null)
                {
                    if (document.Target == TargetType.Folder)
                    {
                        if (File.Exists(document.Path))
                        {
                            File.Delete(document.Path);
                            logHelper.WriteLine(...);
                        }
                    }
                    else
                    {
                        using (SqlConnection sqlConnection = new SqlConnection(document.Path))
                        {
                            sqlConnection.Open();

                            using (SqlCommand sqlCommand = new SqlCommand("usp_DeleteDocument", 
                                                                          sqlConnection))
                            {
                                SqlParameter sqlParameter = new SqlParameter("@id", 
                                                                             SqlDbType.UniqueIdentifier);
                                sqlParameter.Direction = ParameterDirection.Input;
                                sqlParameter.Value = new Guid(document.Id);
                                sqlCommand.Parameters.Add(sqlParameter);
                                sqlCommand.CommandType = CommandType.StoredProcedure;
                                int affected = sqlCommand.ExecuteNonQuery();
                                if (affected > 0)
                                {
                                    logHelper.WriteLine(...);
                                }
                            }
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                ExceptionHelper.HandleException(...);
                logHelper.WriteLine(...);
            }
            finally
            {
                logHelper.WriteLine(...);
            }
        }
        ...
    }
}

 

Conversely, if the current notification is a NACK message, the orchestration invokes a BRE policy to decide how to proceed. Depending on the information contained in the policy, the orchestration invokes the TerminateInstance or the ResumeInstance method to terminate or resume the suspended service instance, respectively.

using Microsoft.BizTalk.Operations;
...
namespace Microsoft.BizTalk.CAT.Samples.LargeMessageTransmission.Helpers
{
    [Serializable]
    public class MessageHelper
    {
        ...
        private static BizTalkOperations operations = new BizTalkOperations();
        ...

        public void TerminateInstance(string instanceID, TimeSpan timeout)
        {
            try
            {
                logHelper.WriteLine(...);
                if (!string.IsNullOrEmpty(instanceID))
                {
                    logHelper.WriteLine(...);
                    Guid id = new Guid(instanceID);
                    MessageBoxServiceInstance instance = operations.GetServiceInstance(id);
                    if (instance != null)
                    {
                        operations.TerminateInstance(id);
                        logHelper.WriteLine(...);
                    }
                }
            }
            catch (Exception ex)
            {
                ExceptionHelper.HandleException(...);
                logHelper.WriteLine(...);
            }
            finally
            {
                logHelper.WriteLine(...);
            }
        }

        public void ResumeInstance(string instanceID, TimeSpan timeout)
        {
            try
            {
                logHelper.WriteLine(...);
                if (!string.IsNullOrEmpty(instanceID))
                {
                    logHelper.WriteLine(...);
                    Guid id = new Guid(instanceID);
                    MessageBoxServiceInstance instance = operations.GetServiceInstance(id);
                    if (instance != null &&
                        instance.InstanceStatus == InstanceStatus.Suspended)
                    {
                        operations.ResumeInstance(id);
                        logHelper.WriteLine(...);
                    }
                    // The following code accomplishes the same task 
                    // using the BizTalk WMI Provider instead of a
                    // BizTalkOperations object. 
                    /*
                    logHelper.WriteLine(...);
                    string wql = 
                   string.Format("SELECT * FROM MSBTS_ServiceInstance WHERE ServiceStatus = 4 AND InstanceID = \"{0}\"", 
                                 instanceID);
                    WqlObjectQuery query = new WqlObjectQuery(wql);
                    EnumerationOptions options = new EnumerationOptions();
                    options.Timeout = timeout;
                    using (ManagementObjectSearcher searcher = 
                      new ManagementObjectSearcher(new ManagementScope(Resources.BizTalkManagementPath), 
                                                   query, 
                                                   options))
                    {
                        using (ManagementObjectCollection collection = searcher.Get())
                        {
                            if (collection != null &&
                                collection.Count > 0)
                            {
                                foreach (ManagementObject instance in collection)
                                {
                                    instance.InvokeMethod(Resources.ResumeMethod, null);
                                    logHelper.WriteLine(...);
                                    instance.Dispose();
                                }
                            }
                        }
                    }
                    */
                }
            }
            catch (Exception ex)
            {
                ExceptionHelper.HandleException(...);
                logHelper.WriteLine(...);
            }
            finally
            {
                logHelper.WriteLine(...);
            }
        }
    }
}

 

Note: the MessageHelper class could eventually be turned into a static class, as it’s a stateless component. However, the NotificationHandler orchestration has no persistence points, so the MessageHelper object will never be serialized and persisted to the MessageBox along with the orchestration state.

Conclusions

This article along with the first part has demonstrated how to accomplish the following tasks:

  • Use the InboundHeaders WCF context property to access the SOAP headers of incoming WCF messages.
  • Use the Streamed transfer mode (TransferMode = Streamed).
  • Process a message in a streaming fashion within a pipeline component.
  • Use a client-side initiated transaction to publish a message to the MessageBox.
  • Stage the incoming document to a Folder or SQL Server.
  • Use Transactional NTFS to store a large message to the file system in a reliable manner.
  • Use the varbinary(max) FILESTREAM datatype to store a large message in a table in a SQL Server database.
  • Hook the distributed transaction used by the Message Agent within a pipeline component.
  • Use a custom stream on the send port to wrap the original stream and execute closing tasks after message transmission.
  • Exploit ACK\NACK messages in a messaging-only, orchestration-less application.

You can grab the source code here. Should you decide to test, use, or customize it, please drop me an email and let me know your feedback!