TransactionComplete EventHandler

Sep 17, 2012 at 2:44 PM

We have a unique problem in that we are using NHibernate as the data persistence layer using the Shuttle ESB to deliver messages.

We implement the Shuttle.ESB.Core.IMessageHandler. In the ProcessMessage method we do some gets and inserts into the database between an

ISession.BeginTransaction 

....

transaction.Commit()

However, because all the messages are getting handled by one Thread, the NH Session never gets closed, so the transactions take longer and longer over time. I have profiled this and indeed, the first message to be handled takes approx 15ms, and linearly new messages take longer over time eventually going over a minute a message.

I tried to call ISession.Close() after the transaction.Commit() but I suspect that it messes with the transaction management internal to the Shuttle ESB, as the message then never leaves the journal queue.

It would be helpful to extend the IMessageHandler with another method that gets called after the transaction has been completed with possibly a parameter for Transaction Success/Failure.

Eg: public void TransactionComplete(bool success);

I could then implement the required session.Close() outside the scope of the transaction.

Thanks

Stefan

 

Coordinator
Sep 17, 2012 at 3:24 PM
Edited Sep 17, 2012 at 3:28 PM

Hi Stefan,

 

I am no expert on NHibernate but I have run into it using Shuttle previously.  If memory serves it has something to do with when the session is created and destroyed.  I wrote a Shuttle module to handle it (may be missing some namespaces but hopefully this gets you going):

 

using System;
using NHibernate;
using Shuttle.ESB.Core;
using Shuttle.Core.Infrastructure; // NHibernate Session Scope public class SessionScope : IDisposable { private bool complete; private readonly Guid id = Guid.NewGuid(); private readonly ISession session; private readonly ITransaction transaction; public SessionScope() { session = NHSessionManager.Instance.GetSession(); transaction = session.BeginTransaction(); } public void Complete() { complete = true; transaction.Commit(); } public void Dispose() { if (!session.IsOpen) { return; } if (!complete && transaction.IsActive && !transaction.WasRolledBack && !transaction.WasCommitted) { transaction.Rollback(); } transaction.Dispose(); NHSessionManager.Instance.CloseSession(); session.Dispose(); } } // Factory public interface ISessionScopeFactory { SessionScope Create(); } public class SessionScopeFactory : ISessionScopeFactory { public SessionScope Create() { return new SessionScope(); } } // Pipeline Events public class OnStartSessionScope : PipelineEvent { } public class OnCompleteSessionScope : PipelineEvent { } public class OnDisposeSessionScope : PipelineEvent { } // Pipeline Event Extensions public static class PipelineEventExtensions { public static SessionScope GetSessionScope(this PipelineEvent pipelineEvent) { return pipelineEvent.Pipeline.State.Get<SessionScope>("SessionScope"); } public static void SetSessionScope(this PipelineEvent pipelineEvent, SessionScope scope) { pipelineEvent.Pipeline.State.Replace("SessionScope", scope); } } // Shuttle Module public class SessionScopeModule : IModule { private readonly string inboxMessagePipelineTypeName = typeof(InboxMessagePipeline).FullName; public void Initialize(IServiceBus bus) { bus.Events.PipelineCreated += PipelineCreated; } private void PipelineCreated(object sender, PipelineEventArgs e) { var pipelineTypeName = e.Pipeline.GetType().FullName; if (!pipelineTypeName.Equals(inboxMessagePipelineTypeName, StringComparison.InvariantCultureIgnoreCase)) { return; } var stage = e.Pipeline.GetStage("Handle"); stage.AfterEvent<OnStartTransactionScope>().Register<OnStartSessionScope>(); stage.BeforeEvent<OnCompleteTransactionScope>().Register<OnCompleteSessionScope>(); stage.AfterEvent<OnDisposeTransactionScope>().Register<OnDisposeSessionScope>(); e.Pipeline.RegisterObserver(new SessionScopeObserver(new SessionScopeFactory())); } } // Observer public class SessionScopeObserver : IPipelineObserver<OnStartSessionScope>, IPipelineObserver<OnCompleteSessionScope>, IPipelineObserver<OnDisposeSessionScope> { private readonly ISessionScopeFactory sessionScopeFactory; public SessionScopeObserver(ISessionScopeFactory sessionScopeFactory) { this.sessionScopeFactory = sessionScopeFactory; } public void Execute(OnCompleteSessionScope pipelineEvent) { var scope = pipelineEvent.GetSessionScope(); if (scope == null) { return; } if (pipelineEvent.Pipeline.Exception == null || pipelineEvent.GetTransactionComplete()) { scope.Complete(); } } public void Execute(OnDisposeSessionScope pipelineEvent) { var scope = pipelineEvent.GetSessionScope(); if (scope == null) { return; } scope.Dispose(); pipelineEvent.SetSessionScope(null); } public void Execute(OnStartSessionScope pipelineEvent) { var scope = pipelineEvent.GetSessionScope(); if (scope != null) { throw new ApplicationException( (string.Format("Call to '{0}.{1}' already contains an active TransactionScope.", GetType().FullName, MethodBase.GetCurrentMethod().Name))); } pipelineEvent.SetSessionScope(sessionScopeFactory.Create()); } } // And to use the session scope in an endpoint just add the module to the service bus, e.g. bus = ServiceBus .With() .DefaultMessageSerializer() .MessageHandlerFactory(new CastleMessageHandlerFactory(container)) .DefaultMessageRouteProvider() .DefaultForwardingRouteProvider() .DefaultPipelineFactory() .DefaultTransactionScopeFactory() .SubscriptionManager(SubscriptionManager.Default()) .AddModule(new ActiveTimeRangeModule()) .AddModule(new SessionScopeModule()) // <-- HERE IS OUR NEW MODULE .Start();

 

I hope it makes sense.  Using the module architecture you can add quite a bit of fancy footwork but it takes some getting used to.

Read more about the pipeline processing.

Sep 17, 2012 at 5:56 PM

Thanks Eben,

Not too sure how to implement yet, but will look at it when I’m a little less tired J

Thanks again

Stefan

From: jabberwocky [email removed]
Sent: 17 September 2012 04:25 PM
To: Stefan Jaworski
Subject: Re: TransactionComplete EventHandler [shuttle:395804]

From: jabberwocky

Hi Stefan,

I am no expert on NHibernate but I have run into it using Shuttle previously. If memory serves it has something to do with when the session is created and destroyed. I wrote a Shuttle module to handle it:

using System;
using NHibernate;
using Shuttle.Core.Infrastructure;
 
// NHibernate Session Scope
public class SessionScope : IDisposable
{
       private bool complete;
       private readonly Guid id = Guid.NewGuid();
       private readonly ISession session;
       private readonly ITransaction transaction;
 
       public SessionScope()
       {
              session = NHSessionManager.Instance.GetSession();
              transaction = session.BeginTransaction();
       }
 
       public void Complete()
       {
              complete = true;
 
              transaction.Commit();
       }
 
       public void Dispose()
       {
              if (!session.IsOpen)
              {
                    return;
              }
 
              if (!complete && transaction.IsActive && !transaction.WasRolledBack && !transaction.WasCommitted)
              {
                    transaction.Rollback();
              }
 
              transaction.Dispose();
 
              NHSessionManager.Instance.CloseSession();
              session.Dispose();
       }
}
 
// Factory
public interface ISessionScopeFactory
{
       SessionScope Create();
}
 
public class SessionScopeFactory : ISessionScopeFactory
{
       public SessionScope Create()
       {
              return new SessionScope();
       }
}
 
// Pipeline Events
public class OnStartSessionScope : PipelineEvent
{
}
 
public class OnCompleteSessionScope : PipelineEvent
{
}
 
public class OnDisposeSessionScope : PipelineEvent
{
}
 
// Pipeline Event Extensions
public static class PipelineEventExtensions
{
       public static SessionScope GetSessionScope(this PipelineEvent pipelineEvent)
       {
              return pipelineEvent.Pipeline.State.Get<SessionScope>("SessionScope");
       }
 
       public static void SetSessionScope(this PipelineEvent pipelineEvent, SessionScope scope)
       {
              pipelineEvent.Pipeline.State.Replace("SessionScope", scope);
       }
}
 
// Shuttle Module
public class SessionScopeModule : IModule
{
       private readonly string inboxMessagePipelineTypeName = typeof(InboxMessagePipeline).FullName;
 
       public void Initialize(IServiceBus bus)
       {
              bus.Events.PipelineCreated += PipelineCreated;
       }
 
       private void PipelineCreated(object sender, PipelineEventArgs e)
       {
              var pipelineTypeName = e.Pipeline.GetType().FullName;
 
              if (!pipelineTypeName.Equals(inboxMessagePipelineTypeName, StringComparison.InvariantCultureIgnoreCase))
              {
                    return;
              }
 
              var stage = e.Pipeline.GetStage("Handle");
 
              stage.AfterEvent<OnStartTransactionScope>().Register<OnStartSessionScope>();
              stage.BeforeEvent<OnCompleteTransactionScope>().Register<OnCompleteSessionScope>();
              stage.AfterEvent<OnDisposeTransactionScope>().Register<OnDisposeSessionScope>(); 
 
              e.Pipeline.RegisterObserver(new SessionScopeObserver(new SessionScopeFactory()));
       }
}
 
// Observer
public class SessionScopeObserver :
       IPipelineObserver<OnStartSessionScope>,
       IPipelineObserver<OnCompleteSessionScope>,
       IPipelineObserver<OnDisposeSessionScope>
{
       private readonly ISessionScopeFactory sessionScopeFactory;
 
       public SessionScopeObserver(ISessionScopeFactory sessionScopeFactory)
       {
              this.sessionScopeFactory = sessionScopeFactory;
       }
 
       public void Execute(OnCompleteSessionScope pipelineEvent)
       {
              var scope = pipelineEvent.GetSessionScope();
 
              if (scope == null)
              {
                    return;
              }
 
              if (pipelineEvent.Pipeline.Exception == null || pipelineEvent.GetTransactionComplete())
              {
                    scope.Complete();
              }
       }
 
       public void Execute(OnDisposeSessionScope pipelineEvent)
       {
              var scope = pipelineEvent.GetSessionScope();
 
              if (scope == null)
              {
                    return;
              }
 
              scope.Dispose();
 
              pipelineEvent.SetSessionScope(null);
       }
 
       public void Execute(OnStartSessionScope pipelineEvent)
       {
              var scope = pipelineEvent.GetSessionScope();
 
              if (scope != null)
              {
                    throw new ApplicationException(
                           (string.Format("Call to '{0}.{1}' already contains an active TransactionScope.", GetType().FullName, MethodBase.GetCurrentMethod().Name)));
              }
 
              pipelineEvent.SetSessionScope(sessionScopeFactory.Create());
       }
}
 
// And to use the session scope in an endpoint just add the module to the service bus, e.g.
                    bus = ServiceBus
                           .With()
                           .DefaultMessageSerializer()
                           .MessageHandlerFactory(new CastleMessageHandlerFactory(container))
                           .DefaultMessageRouteProvider()
                           .DefaultForwardingRouteProvider()
                           .DefaultPipelineFactory()
                           .DefaultTransactionScopeFactory()
                           .SubscriptionManager(SubscriptionManager.Default())
                           .AddModule(new ActiveTimeRangeModule())
                           .AddModule(new SessionScopeModule())     // <-- HERE IS OUR NEW MODULE
                           .Start();
 

I hope it makes sense. Using the module architecture you can add quite a bit of fancy footwork but it takes some getting used to.

Read more about the pipeline processing.



Stefan Jaworski
Principal Consultant


Mobile: +27 84 273 2221
Tel: +27 11 658 8500
Fax: +27 11 658 1415

www.ovationsgroup.com

Facebook | Twitter | Blog | LinkedIn

Confidentiality note and disclaimer
Time to evaluate your email security provider? Watch the video and take advantage of Mimecast's first ever limited promotion.