Hi everyone, in this post I will explain how to call Pipelines inside
Orchestration
Syntax:
Links -
This post is very important to understand the implementation of
Composed message processing Enterprise Design Pattern, where we use all the
below explained concepts.
There are times, when we need to call custom or inbuilt pipelines inside the
orchestration in scenarios such as
1) We need to call an external assembly that returns a string and you
need to convert it into XML and create a message from it
2) To perform debatching inside the orchestration.
3) To perform batching of messages (assemble multiple
messages).
4) To call a custom pipline like that has the functionality of logging
of the messages created inside Orchestration,
BizTalk engine provides the XLANGPipelineManager for the execution of pipelines inside the
Orchestration.
This class is present inside the assembly Microsoft.XLANGs.Pipeline
This class is present inside the assembly Microsoft.XLANGs.Pipeline
To use this class we need to add a reference to Microsoft.XLANGs.Pipeline.dll in the Project
where we are calling the Send/Receive Pipeline.
ILDASM view of
Microsoft.XLANGs.Pipeline component
|
The XLANGPipelineManager class contains two static
many methods, they are -
1) ExecuteReceivePipeline
2) ExecuteSendPipeline
1) ExecuteReceivePipeline()
method – This method is used to execute the Receive pipeline.
This method takes two parameters and returns an object.
This method takes two parameters and returns an object.
Syntax:
public static ReceivePipelineOutputMessages ExecuteReceivePipeline(Type
receivePipelineType, XLANGMessage
inMsg)
This method consumes a single interchange and returns
a collection of zero or more messages (contained in an instance of the ReceivePipelineOutputMessages
class)
i) 1st Parameter is a System.Type,
which is the type of the pipeline to execute.
ii) 2nd parameter is a XLANGMessage,
which is the message to use as the input for the pipeline.
The Output Parameter, is an instance (object) of type ReceivePipelineOutputMessages class and
contains a collection of zero or more messages
The output is not serializable, and therefore the call to ExecuteReceivePipeline() method needs to be done inside an atomic scope with a variable for the
output declared locally (variable declaration should be present insdie the
Atomic scope).
Note - We get the following error if we declare the variable of type ReceivePipelineOutputMessages
class outside the atomic shape
“A variable of type ReceivePipelineOutputMessages can be declared only within an
atomic scope in an orchestration.”
This is because variables of this type are not
serializable and thus would not survive persistence of the orchestration, and
orchestrations are never persisted while executing within an atomic scope. This
means that a receive pipeline can be executed only within an atomic scope.
ReceivePipelineOutputMessages
class - The output of the ExecuteReceivePipeline method is an intance of this class.
This class inherits from IEnumerator and has a couple of methods
This class inherits from IEnumerator and has a couple of methods
a) MoveNext() - is the method
that will move the IEnumerator internal cursor to the next element in the enumeration.
b) GetCurrent() - is the
method to be used to get the current message from the enumeration.
Note - Call to the above
two methods (MoveNext/GetCurrent) must be done within a Message Assignment
shape bcoz we are constructing a new XLANG Message.
When calling PassThruReceive pipeline or custom pipeline component from
within an orchestration, you must declare the variable type for incoming message as System.Xml.XmlDocument
despite of the incoming message type is XML or not. Therefore, you may
encounter exception if you try to operate on it if the incoming message is a
non-XML message such as a flat file format message. This is because of that
orchestration engine intends to use System.Xml.XmlDocument for any type of
incoming message
Variable of type ReceivePipelineOutputMessages can be assigned as
shown below.
|
Expression shape code
|
pipeOut =
Microsoft.XLANGs.Pipeline.XLANGPipelineManager.ExecuteReceivePipeline(
typeof(TestArchivePipeline.Pipelines.ReceivePipeline),
InputMsg);
|
here,
pipeOut – Variable of type ReceivePipelineOutputMessages
class
TestArchivePipeline.Pipelines.ReceivePipeline – Name of the Pipeline (Fully Qualified
Name of the Pipeline)
InputMsg – Message passed as input to the pipeline
|
Message Assignment shape code
(Pipeline Output Message
Processing happens in the Messsage Assignment shape)
|
pipeOut.MoveNext();
msgNew = null;
pipeOut.GetCurrent(msgNew);
|
The above code assumes that only one output is created. If the receive
pipeline debatches msgInput into several message, you can use a Loop shape that
haspipeOut.MoveNext() as the condition for the loop and inside the loop extract
each message and do what you need to do with it.
Exception Handling – We need to handle the exceptions while calling the pipelines, This is
done using the Exception hanlder to the scope which is calling the pipeline.
The exception type should be of
Microsoft.XLANGs.Pipeline.XLANGPipelineManagerException
The class XLANGPipelineManagerException is present in the namespace
Microsoft.XLANGs.Pipeline
Note – Atomic scope do not have Exception Hanlder as they can have only
Compensation Block, Hence set the Transaction Type of the expression shape in
which the Pipeline call is made to None.
2) ExecuteSendPipeline()
method – This method is used to execute the Send pipeline.
This method takes three parameters and does not return anything.
This method takes three parameters and does not return anything.
Syntax:
public static void ExecuteSendPipeline(Type sendPipelineType,
SendPipelineInputMessages inMessages,
XLANGMessage outXLANGMsg )
i) 1 st parameter is a System.Type, which is the type of the
pipeline to execute
ii) 2 nd parameter is a SendPipelineInputMessages, which is an
enumeration of all the messages to send to the Assemble method one at the time.
(Collection of one or more messages)
iii) 3 rd parameter is an
XLANGMessage, which is the message that is returned by the send pipeline
The method does not have any output but It will change the third
parameter to contain the result after the execution of send pipeline
Message Assignment shape code
|
SendPipelineInputMsgs =
new Microsoft.XLANGs.Pipeline.SendPipelineInputMessages();
SendPipelineInputMsgs.Add(msg);
Microsoft.XLANGs.Pipeline.XLANGPipelineManager.ExecuteSendPipeline(
typeof(FineFoods.Orders.Processes.SendPipeline),
inputMsgs, msgOrderShipped);
|
here,
SendPipelineInputMsgs – Is an instance of the SendPipelineInputMessages class
Msg -
|
SendPipelineInputMessages
class - Represents the send pipeline input messages.
it has a couple of methods and the most relevant is –
a) Add() - Adds the specified XLANG message. Its an instance
method
Add takes a parameter of type - Microsoft.XLANGs.BaseType.XLANGMessage
which is the base class for any message.
Syntax:
public void Add(
XLANGMessage xmsg
)
Note - A call to a
receive pipeline would typically be done in an Expression shape within the
orchestration.
This is because we are
creating an object of type ReceivePipelineOutputMessages and not of
type XLANGMessage.
Note - A call to a send pipeline must be done in a Message Assignment shape within the orchestration
This is because the output of the ExecuteSendPipeline method is an XLangMessage
Note - A call to a send pipeline must be done in a Message Assignment shape within the orchestration
This is because the output of the ExecuteSendPipeline method is an XLangMessage
Links -
1 comment:
Good and Detailed article post.
Was very useful to understand the concepts.
Post a Comment