Powered By Blogger

Tuesday, August 3, 2010

Automatic recovery program for pending BPEL call back messages

BPEL engine maintains all async call back messages into database table called dlv_message. You can see such all messages in BPEL console call-back manual recovery area.The query being used by bpel console is joined on dlv_message and work_item tables.This query simply picks up all call back messages which are undelivered and have not been modified with in certain threshold time.

Call-back messages are processed in following steps
  • BPEL engine assigns the call-back message to delivery service
  • Delivery service saves the message into dlv_message table with state 'UNDELIVERED-0'
  • Delivery service schedules a dispatcher thread to process message asynchronously
  • Dispatcher thread enqueues message into JMS queue
  • Message is picked up by MDB
  • MDB delivers the message to actual BPEL process  waiting for call-back and changes state to 'HANDLED=2'
So given above steps, there is always possibility that message is available in dlv_message table but MDB is failed in delivering it to BPEL process which keeps message always in state= 0.

Following program can be tailored to suite one's own requirements to recover from such state-0 messages.

Note:- This program contains logic to recover from invocation and call-back messages. Please comment out appropriately.

package bpelrecovery;
import com.oracle.bpel.client.*;
import com.oracle.bpel.client.util.SQLDefs;
import com.oracle.bpel.client.util.WhereCondition;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;
import javax.naming.Context;

public class bpelrecovery {

public bpelrecovery() {

}

public static void main(String[] args) {

bpelrecovery recover = new bpelrecovery();
String rtc = "";
try{
rtc = recover.doRecover();
}
catch (Exception e){
e.printStackTrace();
}
}
rivate void recoverCallbackMessages(List messages)
throws Exception
{

String messageGuids[] = new String[messages.size()];
for(int i = 0; i < messages.size(); i++)
{
ICallbackMetaData callbackMetadata = (ICallbackMetaData)messages.get(i);
String messageGuid = callbackMetadata.getMessageGUID();

messageGuids[i] = messageGuid;
System.err.println((new StringBuilder()).append("recovering callback message =
").append(messageGuids[i]).append(" process
[").append(callbackMetadata.getProcessId()).append("(").append(callbackMetadata.getRevisionTag()).ap
pend(")] domain [").append(callbackMetadata.getDomainId()).append("]").toString());

}

Locator locator = getLocator();
IBPELDomainHandle domainHandle = locator.lookupDomain();
domainHandle.recoverCallbackMessages(messageGuids);

}

public String doRecover() throws Exception{
// Connect to domain "default"

try{

System.out.println("doRecover() instantiating locator...");

Locator locator = getLocator();

System.out.println("doRecover() instantiated locator for domain " +

locator.lookupDomain().getDomainId());

// look for Invoke messages in need of recovery

StringBuffer buf1 = new StringBuffer();

WhereCondition where = new WhereCondition(buf1.append(SQLDefs.IM_state).append( " = "

).append(IDeliveryConstants.STATE_UNRESOLVED ).toString() );

System.out.println("doRecover() instantiating IInvokeMetaData... with where = "+ where.getClause());

IInvokeMetaData imd1[] = locator.listInvokeMessages(where);

System.out.println("doRecover() instantiated IInvokeMetaData");

// iterate thru the list
List l1 = new ArrayList();

for (Object o:imd1){

l1.add(o);

}

// See how many INVOKES are in the recovery zone

System.out.println("doRecover() instantiated IInvokeMetaData size = " +l1.size());

// look for Callback messages in need of recovery

StringBuffer buf = new StringBuffer();

where = new WhereCondition(buf.append(SQLDefs.DM_state).append( " = "

).append(IDeliveryConstants.TYPE_callback_soap ).toString() );

System.out.println("doRecover() instantiating ICallbackMetaData... with where = "+

where.getClause());

ICallbackMetaData imd[] = locator.listCallbackMessages(where);

System.out.println("doRecover() instantiated ICallbackMetaData");
//

// recover

//

List l = new ArrayList();

for (Object o:imd){

l.add(o);

}

 recoverCallbackMessages(l);

}

catch (Exception e){

e.printStackTrace();

}
return "done";

}

public Locator getLocator(){

System.out.println("getLocator() start");

Locator locator = null;
// set JNDI properties for BPEL lookup

String jndiProviderUrl = "opmn:ormi://localhost:6003:oc4j_soa/orabpel";

String jndiFactory = "com.evermind.server.rmi.RMIInitialContextFactory";

String jndiUsername = "oc4jadmin";

String jndiPassword = "welcome1";
Hashtable jndi = new Hashtable();

jndi.put(Context.PROVIDER_URL, jndiProviderUrl);

jndi.put(Context.INITIAL_CONTEXT_FACTORY, jndiFactory);

jndi.put(Context.SECURITY_PRINCIPAL, jndiUsername);

jndi.put(Context.SECURITY_CREDENTIALS, jndiPassword);

jndi.put("dedicated.connection", "true");

try{

System.out.println("getLocator() instantiating locator...");

locator = new Locator("default", "welcome1", jndi);

System.out.println("getLocator() instantiated locator");

}

catch (Exception e){

System.out.println("getLocator() error");

e.printStackTrace();

}

return locator;

}
}

7 comments:

  1. Hi How can i see audit for the instances stuck in the recovery?

    ReplyDelete
  2. hi,

    How to callback pending instance.

    Example: Process A is calling Process B, both are Async processes only. While processing Process B instance due some error it got failed status is FAULTED, can not recover, Process A instance is waiting for response of B. A expecting Success/Failure response from B.

    In this case I want to send/correlate process A instance with Failure input. Is it possible ?
    Please explain me how to bypass these kind of problems?

    Thank you


    Regards
    Bhargavi Ch

    ReplyDelete
  3. Hi Bhargavi,

    You can achieve this using interactions tab of process A.

    Click on A instance then click on Interactions tab link, now you will see all partner invocations(completed and waiting for response) from process-A.
    Click on receive link waiting for response from process B and provide the response that A is expecting.
    Click on Post XML Message button

    Note: If it is simple data type value you can provide value directly otherwise you have to provide appropriate XML response wrapped up in SOAP.


    Thanks,
    Praveen

    ReplyDelete
  4. Hi,
    In order to customizing the workflow process using Oracle SOA11g (bpel ) we need an option of manual recovery in fault policy for manual intervention of any fault messages.

    We have created AsynchMasterProcess and AsynchDetailProcess (having human task) both are Asynchronous processes and for fault handling we have configured fault-binding.xml and fault-policies.xml files to catch remotefault and an option of human-intervention job for Retry action in fault policy. While any exceptions thrown by AsynchDetailProcess are caught in AsynchMasterProcess using call back operation and remote fault is not getting propagated which is defined in fault-policy/binding files and process is getting faulted (manual recovery job is not created).

    Generally how we can handle custom Faults in asynchronous calls to BPEL processes ?

    Thank you
    Ravigovindk

    ReplyDelete
  5. Hi Champs,

    Kindly, let me know how we can set a count for auto recovery of say 30 or 40 or 50 instances which are in running state in SOA BPEL


    Say, for example i dont want to put pressure on my server and dont want to recover all the running instances at a shot, and give a definite number.

    Let me know how to achieve it.This is urgent



    Arya. India. Software Engineer.




    ReplyDelete
    Replies
    1. Try setting maxMessageRaiseSize under soa-infra -> SOA Administration -> BPEL Properties -> More BPEL Configuration Properties -> RecoveryConfig

      Delete