How to call Receive or Send Pipelines inside Orchestration in Microsoft BizTalk Server, Using ExecuteReceivePipeline and ExecuteSendPipeline methods of XLANGPipelineManager class for implementing Composed Message Processing Enterprise Integration patterns

Hi everyone, in this post I will explain how to call Pipelines inside Orchestration

This post is very important to understand the implementation of Composed message processing Enterprise Design Pattern, where we use all the below explained concepts.

There are times, when we need to call custom or inbuilt pipelines inside the orchestration in scenarios such as
1) We need to call an external assembly that returns a string and you need to convert it into XML and create a message from it
2) To perform debatching inside the orchestration.
3) To perform batching of messages (assemble multiple messages).
4) To call a custom pipline like that has the functionality of logging of the messages created inside Orchestration,

BizTalk engine provides the XLANGPipelineManager for the execution of pipelines inside the Orchestration.
This class is present inside the assembly Microsoft.XLANGs.Pipeline

To use this class we need to add a reference to Microsoft.XLANGs.Pipeline.dll in the Project where we are calling the Send/Receive Pipeline.



ILDASM view of Microsoft.XLANGs.Pipeline component



The XLANGPipelineManager class contains two static many methods, they are -
1) ExecuteReceivePipeline
2) ExecuteSendPipeline

1) ExecuteReceivePipeline() methodThis method is used to execute the Receive pipeline.
 This method takes two parameters and returns an object.

Syntax:
public static ReceivePipelineOutputMessages ExecuteReceivePipeline(Type  receivePipelineType, XLANGMessage  inMsg)

This method consumes a single interchange and returns a collection of zero or more messages (contained in an instance of the ReceivePipelineOutputMessages class)

i) 1st Parameter is a System.Type, which is the type of the pipeline to execute.
ii) 2nd parameter is a XLANGMessage, which is the message to use as the input for the pipeline.

The Output Parameter, is an instance (object) of type ReceivePipelineOutputMessages class and contains a collection of zero or more messages
The output is not serializable, and therefore the call to ExecuteReceivePipeline() method needs to be done inside an atomic scope with a variable for the output declared locally (variable declaration should be present insdie the Atomic scope).

Note - We get the following error if we declare the variable of type ReceivePipelineOutputMessages class outside the atomic shape
“A variable of type ReceivePipelineOutputMessages can be declared only within an atomic scope in an orchestration.”
This is because variables of this type are not serializable and thus would not survive persistence of the orchestration, and orchestrations are never persisted while executing within an atomic scope. This means that a receive pipeline can be executed only within an atomic scope.

ReceivePipelineOutputMessages class - The output of the ExecuteReceivePipeline method is an intance of this class. 
This class inherits from IEnumerator and has a couple of methods

a) MoveNext() - is the method that will move the IEnumerator internal cursor to the next element in the enumeration.
b) GetCurrent() - is the method to be used to get the current message from the enumeration.

Note - Call to the above two methods (MoveNext/GetCurrent) must be done within a Message Assignment shape bcoz we are constructing a new XLANG Message.

When calling PassThruReceive pipeline or custom pipeline component from within an orchestration, you must declare the variable type for incoming message as System.Xml.XmlDocument despite of the incoming message type is XML or not. Therefore, you may encounter exception if you try to operate on it if the incoming message is a non-XML message such as a flat file format message. This is because of that orchestration engine intends to use System.Xml.XmlDocument for any type of incoming message

Variable of type ReceivePipelineOutputMessages can be assigned as shown below.


Expression shape code

pipeOut = Microsoft.XLANGs.Pipeline.XLANGPipelineManager.ExecuteReceivePipeline(
typeof(TestArchivePipeline.Pipelines.ReceivePipeline), InputMsg);

here,
pipeOut – Variable of type ReceivePipelineOutputMessages class
TestArchivePipeline.Pipelines.ReceivePipeline – Name of the Pipeline (Fully Qualified Name of the Pipeline)
InputMsg – Message passed as input to the pipeline
Message Assignment shape code
(Pipeline Output Message Processing happens in the Messsage Assignment shape)
pipeOut.MoveNext();

msgNew = null;

pipeOut.GetCurrent(msgNew);

The above code assumes that only one output is created. If the receive pipeline debatches msgInput into several message, you can use a Loop shape that haspipeOut.MoveNext() as the condition for the loop and inside the loop extract each message and do what you need to do with it.

Exception Handling – We need to handle the exceptions while calling the pipelines, This is done using the Exception hanlder to the scope which is calling the pipeline.
The exception type should be of Microsoft.XLANGs.Pipeline.XLANGPipelineManagerException
The class XLANGPipelineManagerException is present in the namespace Microsoft.XLANGs.Pipeline

Note – Atomic scope do not have Exception Hanlder as they can have only Compensation Block, Hence set the Transaction Type of the expression shape in which the Pipeline call is made to None.


2) ExecuteSendPipeline() methodThis method is used to execute the Send pipeline.
This method takes three parameters and does not return anything.

Syntax:
public static void  ExecuteSendPipeline(Type sendPipelineType, SendPipelineInputMessages inMessages,  XLANGMessage outXLANGMsg )

i) 1 st parameter is a System.Type, which is the type of the pipeline to execute
ii) 2 nd parameter is a SendPipelineInputMessages, which is an enumeration of all the messages to send to the Assemble method one at the time. (Collection of one or more messages)
iii) 3 rd parameter is an XLANGMessage, which is the message that is returned by the send pipeline
The method does not have any output but It will change the third parameter to contain the result after the execution of send pipeline

Message Assignment shape code
SendPipelineInputMsgs = new Microsoft.XLANGs.Pipeline.SendPipelineInputMessages();

SendPipelineInputMsgs.Add(msg);

Microsoft.XLANGs.Pipeline.XLANGPipelineManager.ExecuteSendPipeline(
typeof(FineFoods.Orders.Processes.SendPipeline), inputMsgs, msgOrderShipped);
here,
SendPipelineInputMsgs – Is an instance of the SendPipelineInputMessages class
Msg -

SendPipelineInputMessages class - Represents the send pipeline input messages.
it has a couple of methods and the most relevant is –

a) Add() - Adds the specified XLANG message. Its an instance method
Add takes a parameter of type - Microsoft.XLANGs.BaseType.XLANGMessage which is the base class for any message.

Syntax:
public void Add(
                XLANGMessage xmsg
)


Note - A call to a receive pipeline would typically be done in an Expression shape within the orchestration.
This is because we are creating an object of type ReceivePipelineOutputMessages and not of type XLANGMessage.

Note - A call to a send pipeline must be done in a Message Assignment shape within the orchestration 
This is because the output of the ExecuteSendPipeline method is an XLangMessage


Links - 
 

1 comment:

Anonymous said...

Good and Detailed article post.
Was very useful to understand the concepts.