BPEL engine maintains all async call back messages into database table called dlv_message. You can see such all messages in BPEL console call-back manual recovery area.The query being used by bpel console is joined on dlv_message and work_item tables.This query simply picks up all call back messages which are undelivered and have not been modified with in certain threshold time.
Call-back messages are processed in following steps
- BPEL engine assigns the call-back message to delivery service
- Delivery service saves the message into dlv_message table with state 'UNDELIVERED-0'
- Delivery service schedules a dispatcher thread to process message asynchronously
- Dispatcher thread enqueues message into JMS queue
- Message is picked up by MDB
- MDB delivers the message to actual BPEL process waiting for call-back and changes state to 'HANDLED=2'
So given above steps, there is always possibility that message is available in dlv_message table but MDB is failed in delivering it to BPEL process which keeps message always in state= 0.
Following program can be tailored to suite one's own requirements to recover from such state-0 messages.
Note:- This program contains logic to recover from invocation and call-back messages. Please comment out appropriately.
package bpelrecovery;
import com.oracle.bpel.client.*;
import com.oracle.bpel.client.util.SQLDefs;
import com.oracle.bpel.client.util.WhereCondition;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;
import javax.naming.Context;
public class bpelrecovery {
public bpelrecovery() {
}
public static void main(String[] args) {
bpelrecovery recover = new bpelrecovery();
String rtc = "";
try{
rtc = recover.doRecover();
}
catch (Exception e){
e.printStackTrace();
}
}
rivate void recoverCallbackMessages(List messages)
throws Exception
{
String messageGuids[] = new String[messages.size()];
for(int i = 0; i < messages.size(); i++)
{
ICallbackMetaData callbackMetadata = (ICallbackMetaData)messages.get(i);
String messageGuid = callbackMetadata.getMessageGUID();
messageGuids[i] = messageGuid;
System.err.println((new StringBuilder()).append("recovering callback message =
").append(messageGuids[i]).append(" process
[").append(callbackMetadata.getProcessId()).append("(").append(callbackMetadata.getRevisionTag()).ap
pend(")] domain [").append(callbackMetadata.getDomainId()).append("]").toString());
}
Locator locator = getLocator();
IBPELDomainHandle domainHandle = locator.lookupDomain();
domainHandle.recoverCallbackMessages(messageGuids);
}
public String doRecover() throws Exception{
// Connect to domain "default"
try{
System.out.println("doRecover() instantiating locator...");
Locator locator = getLocator();
System.out.println("doRecover() instantiated locator for domain " +
locator.lookupDomain().getDomainId());
// look for Invoke messages in need of recovery
StringBuffer buf1 = new StringBuffer();
WhereCondition where = new WhereCondition(buf1.append(SQLDefs.IM_state).append( " = "
).append(IDeliveryConstants.STATE_UNRESOLVED ).toString() );
System.out.println("doRecover() instantiating IInvokeMetaData... with where = "+ where.getClause());
IInvokeMetaData imd1[] = locator.listInvokeMessages(where);
System.out.println("doRecover() instantiated IInvokeMetaData");
// iterate thru the list
List l1 = new ArrayList();
for (Object o:imd1){
l1.add(o);
}
// See how many INVOKES are in the recovery zone
System.out.println("doRecover() instantiated IInvokeMetaData size = " +l1.size());
// look for Callback messages in need of recovery
StringBuffer buf = new StringBuffer();
where = new WhereCondition(buf.append(SQLDefs.DM_state).append( " = "
).append(IDeliveryConstants.TYPE_callback_soap ).toString() );
System.out.println("doRecover() instantiating ICallbackMetaData... with where = "+
where.getClause());
ICallbackMetaData imd[] = locator.listCallbackMessages(where);
System.out.println("doRecover() instantiated ICallbackMetaData");
//
// recover
//
List l = new ArrayList();
for (Object o:imd){
l.add(o);
}
recoverCallbackMessages(l);
}
catch (Exception e){
e.printStackTrace();
}
return "done";
}
public Locator getLocator(){
System.out.println("getLocator() start");
Locator locator = null;
// set JNDI properties for BPEL lookup
String jndiProviderUrl = "opmn:ormi://localhost:6003:oc4j_soa/orabpel";
String jndiFactory = "com.evermind.server.rmi.RMIInitialContextFactory";
String jndiUsername = "oc4jadmin";
String jndiPassword = "welcome1";
Hashtable jndi = new Hashtable();
jndi.put(Context.PROVIDER_URL, jndiProviderUrl);
jndi.put(Context.INITIAL_CONTEXT_FACTORY, jndiFactory);
jndi.put(Context.SECURITY_PRINCIPAL, jndiUsername);
jndi.put(Context.SECURITY_CREDENTIALS, jndiPassword);
jndi.put("dedicated.connection", "true");
try{
System.out.println("getLocator() instantiating locator...");
locator = new Locator("default", "welcome1", jndi);
System.out.println("getLocator() instantiated locator");
}
catch (Exception e){
System.out.println("getLocator() error");
e.printStackTrace();
}
return locator;
}
}