Skip to content

Commit

Permalink
JBPM-5228 - Allow 'fire and forget' feature in runtime Kie Server Rem…
Browse files Browse the repository at this point in the history
…ote APIs (kiegroup#554)
  • Loading branch information
mswiderski authored Aug 5, 2016
1 parent 1af2482 commit e2f529a
Show file tree
Hide file tree
Showing 30 changed files with 1,248 additions and 156 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,16 @@ public class JMSConstants {
public static final String TARGET_CAPABILITY_PROPERTY_NAME = "kie_target_capability";
public static final String USER_PROPERTY_NAME = "kie_user";
public static final String PASSWRD_PROPERTY_NAME = "kie_password";

public static final String INTERACTION_PATTERN_PROPERTY_NAME = "kie_interaction_pattern";

// from 1 to 99 means response should be sent from the server
public static final int UPPER_LIMIT_REPLY_INTERACTION_PATTERNS = 100;

public static final int REQUEST_REPLY_PATTERN = 1;
public static final int ASYNC_REPLY_PATTERN = 2;

// from 100 up means server should not send any response
public static final int FIRE_AND_FORGET_PATTERN = 101;

}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
@XmlAccessorType(XmlAccessType.NONE)
public class ServiceResponse<T> {
public static enum ResponseType {
SUCCESS, FAILURE;
SUCCESS, FAILURE, NO_RESPONSE;
}

@XmlAttribute
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.List;

import org.kie.server.api.model.instance.DocumentInstance;
import org.kie.server.client.jms.ResponseHandler;

public interface DocumentServicesClient {

Expand All @@ -32,4 +33,6 @@ public interface DocumentServicesClient {
void deleteDocument(String identifier);

List<DocumentInstance> listDocuments(Integer page, Integer pageSize);

void setResponseHandler(ResponseHandler responseHandler);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.kie.server.api.model.instance.JobRequestInstance;
import org.kie.server.api.model.instance.RequestInfoInstance;
import org.kie.server.api.model.instance.RequestInfoInstanceList;
import org.kie.server.client.jms.ResponseHandler;

public interface JobServicesClient {

Expand All @@ -38,4 +39,6 @@ public interface JobServicesClient {
List<RequestInfoInstance> getRequestsByCommand(String command, Integer page, Integer pageSize);

RequestInfoInstance getRequestById(Long requestId, boolean withErrors, boolean withData);

void setResponseHandler(ResponseHandler responseHandler);
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.kie.server.api.model.ReleaseId;
import org.kie.server.api.model.ServiceResponse;
import org.kie.server.api.model.ServiceResponsesList;
import org.kie.server.client.jms.ResponseHandler;

public interface KieServicesClient {

Expand Down Expand Up @@ -85,4 +86,6 @@ public interface KieServicesClient {
String getConversationId();

void completeConversation();

void setResponseHandler(ResponseHandler responseHandler);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import org.kie.server.api.marshalling.MarshallingFormat;
import org.kie.server.client.balancer.LoadBalancer;
import org.kie.server.client.jms.ResponseHandler;

import javax.jms.ConnectionFactory;
import javax.jms.Queue;
Expand Down Expand Up @@ -98,4 +99,12 @@ public static enum Transport {

LoadBalancer getLoadBalancer();

void setResponseHandler(ResponseHandler responseHandler);

ResponseHandler getResponseHandler();

void setJmsTransactional(boolean transacted);

boolean isJmsTransactional();

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.kie.server.api.model.definition.VariablesDefinition;
import org.kie.server.api.model.instance.ProcessInstance;
import org.kie.server.api.model.instance.WorkItemInstance;
import org.kie.server.client.jms.ResponseHandler;

public interface ProcessServicesClient {

Expand Down Expand Up @@ -91,4 +92,6 @@ public interface ProcessServicesClient {
WorkItemInstance getWorkItem(String containerId, Long processInstanceId, Long id);

List<WorkItemInstance> getWorkItemByProcessInstance(String containerId, Long processInstanceId);

void setResponseHandler(ResponseHandler responseHandler);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.kie.server.api.model.instance.NodeInstance;
import org.kie.server.api.model.instance.ProcessInstance;
import org.kie.server.api.model.instance.VariableInstance;
import org.kie.server.client.jms.ResponseHandler;

public interface QueryServicesClient {

Expand Down Expand Up @@ -130,4 +131,6 @@ public interface QueryServicesClient {
<T> List<T> query(String queryName, String mapper, QueryFilterSpec filterSpec, Integer page, Integer pageSize, Class<T> resultType);

<T> List<T> query(String queryName, String mapper, String builder, Map<String, Object> parameters, Integer page, Integer pageSize, Class<T> resultType);

void setResponseHandler(ResponseHandler responseHandler);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.kie.api.command.Command;
import org.kie.api.runtime.ExecutionResults;
import org.kie.server.api.model.ServiceResponse;
import org.kie.server.client.jms.ResponseHandler;

public interface RuleServicesClient {

Expand All @@ -36,5 +37,7 @@ public interface RuleServicesClient {
ServiceResponse<ExecutionResults> executeCommandsWithResults(String id, String payload);

ServiceResponse<ExecutionResults> executeCommandsWithResults(String id, Command<?> cmd);

void setResponseHandler(ResponseHandler responseHandler);
}

Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import org.kie.server.api.model.ServiceResponse;
import org.kie.server.api.model.instance.SolverInstance;
import org.kie.server.api.model.instance.SolverInstanceList;
import org.optaplanner.core.api.domain.solution.Solution;
import org.kie.server.client.jms.ResponseHandler;

public interface SolverServicesClient {

Expand All @@ -34,5 +34,7 @@ public interface SolverServicesClient {

ServiceResponse<Void> disposeSolver( String containerId, String solverId );

void setResponseHandler(ResponseHandler responseHandler);

}

Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

package org.kie.server.client;

import org.kie.server.client.jms.ResponseHandler;

public interface UIServicesClient {

/**
Expand Down Expand Up @@ -67,4 +69,11 @@ public interface UIServicesClient {
* @return svg (xml) representing process image annotated with active (in red) and completed (in grey) nodes
*/
String getProcessInstanceImage(String containerId, Long processInstanceId);

/**
* Override default response handler to change interaction pattern. Applies only to JMS
* based integration.
* @param responseHandler
*/
void setResponseHandler(ResponseHandler responseHandler);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.kie.server.api.model.instance.TaskEventInstance;
import org.kie.server.api.model.instance.TaskInstance;
import org.kie.server.api.model.instance.TaskSummary;
import org.kie.server.client.jms.ResponseHandler;

public interface UserTaskServicesClient {

Expand Down Expand Up @@ -151,4 +152,6 @@ public interface UserTaskServicesClient {
List<TaskSummary> findTasksByVariable(String userId, String variableName, List<String> status, Integer page, Integer pageSize, String sort, boolean sortOrder);

List<TaskSummary> findTasksByVariableAndValue(String userId, String variableName, String variableValue, List<String> status, Integer page, Integer pageSize, String sort, boolean sortOrder);

void setResponseHandler(ResponseHandler responseHandler);
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.kie.server.client.KieServicesException;
import org.kie.server.client.impl.KieServicesClientImpl;
import org.kie.server.client.balancer.LoadBalancer;
import org.kie.server.client.jms.ResponseHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -72,22 +73,23 @@ public abstract class AbstractKieServicesClientImpl {

protected KieServicesClientImpl owner;

public LoadBalancer getLoadBalancer() {
return loadBalancer;
}
// used by JMS to handle response via different interaction patterns
private ResponseHandler responseHandler;

public AbstractKieServicesClientImpl(KieServicesConfiguration config) {
this.config = config.clone();
this.loadBalancer = config.getLoadBalancer() == null ? LoadBalancer.getDefault(config.getServerUrl()) : config.getLoadBalancer();
this.classLoader = Thread.currentThread().getContextClassLoader() != null ? Thread.currentThread().getContextClassLoader() : CommandScript.class.getClassLoader();
this.marshaller = MarshallerFactory.getMarshaller(config.getExtraJaxbClasses(), config.getMarshallingFormat(), classLoader);
this.responseHandler = config.getResponseHandler();
}

public AbstractKieServicesClientImpl(KieServicesConfiguration config, ClassLoader classLoader) {
this.config = config.clone();
this.loadBalancer = config.getLoadBalancer() == null ? LoadBalancer.getDefault(config.getServerUrl()) : config.getLoadBalancer();
this.classLoader = classLoader;
this.marshaller = MarshallerFactory.getMarshaller( config.getExtraJaxbClasses(), config.getMarshallingFormat(), classLoader );
this.responseHandler = config.getResponseHandler();
}

/**
Expand Down Expand Up @@ -128,13 +130,29 @@ public void setOwner(KieServicesClientImpl owner) {
this.owner = owner;
}

public LoadBalancer getLoadBalancer() {
return loadBalancer;
}

public void setResponseHandler(ResponseHandler responseHandler) {
this.responseHandler = responseHandler;
}

protected void throwExceptionOnFailure(ServiceResponse<?> serviceResponse) {
if (serviceResponse != null && ServiceResponse.ResponseType.FAILURE.equals(serviceResponse.getType())){
throw new KieServicesException(serviceResponse.getMsg());
}
}

protected boolean shouldReturnWithNullResponse(ServiceResponse<?> serviceResponse) {
if (serviceResponse != null && ServiceResponse.ResponseType.NO_RESPONSE.equals(serviceResponse.getType())){
logger.debug("Returning null as the response type is NO_RESPONSE");
return true;
}

return false;
}

protected void sendTaskOperation(String containerId, Long taskId, String operation, String queryString) {
Map<String, Object> valuesMap = new HashMap<String, Object>();
valuesMap.put(CONTAINER_ID, containerId);
Expand Down Expand Up @@ -458,16 +476,15 @@ protected ServiceResponsesList executeJmsCommand( CommandScript command, String
try {
// setup
MessageProducer producer;
MessageConsumer consumer;

try {
if( config.getPassword() != null ) {
connection = factory.createConnection(config.getUserName(), config.getPassword());
} else {
connection = factory.createConnection();
}
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session = connection.createSession(config.isJmsTransactional(), Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(sendQueue);
consumer = session.createConsumer(responseQueue, selector);

connection.start();
} catch( JMSException jmse ) {
Expand All @@ -490,6 +507,7 @@ protected ServiceResponsesList executeJmsCommand( CommandScript command, String
textMsg.setJMSCorrelationID(corrId);
// 2. serialization info
textMsg.setIntProperty( JMSConstants.SERIALIZATION_FORMAT_PROPERTY_NAME, config.getMarshallingFormat().getId() );
textMsg.setIntProperty( JMSConstants.INTERACTION_PATTERN_PROPERTY_NAME, responseHandler.getInteractionPattern() );
if (classType != null) {
textMsg.setStringProperty(JMSConstants.CLASS_TYPE_PROPERTY_NAME, classType);
}
Expand All @@ -515,45 +533,11 @@ protected ServiceResponsesList executeJmsCommand( CommandScript command, String
}

// receive
Message response;
try {
response = consumer.receive( config.getTimeout() );
} catch( JMSException jmse ) {
logger.warn("JMS exception while waiting for response - {}", jmse.getMessage());
throw new KieServicesException("Unable to receive or retrieve the JMS response.", jmse);
}
cmdResponse = responseHandler.handleResponse(selector, connection, session, responseQueue, config, marshaller, owner);

if( response == null ) {
logger.warn("Response is empty");
// return actual instance to avoid null points on client side
List<ServiceResponse<? extends Object>> responses = new ArrayList<ServiceResponse<? extends Object>>();
responses.add(new ServiceResponse(ServiceResponse.ResponseType.FAILURE, "Response is empty"));
return new ServiceResponsesList(responses);
}
// extract response
assert response != null: "Response is empty.";
try {
owner.setConversationId(response.getStringProperty(JMSConstants.CONVERSATION_ID_PROPERTY_NAME));

String responseStr = ((TextMessage) response).getText();
logger.debug("Received response from server '{}'", responseStr);
cmdResponse = marshaller.unmarshall(responseStr, ServiceResponsesList.class);
return cmdResponse;
} catch( JMSException jmse ) {
throw new KieServicesException("Unable to extract " + ServiceResponsesList.class.getSimpleName()
+ " instance from JMS response.", jmse);
}
return cmdResponse;
} finally {
if( connection != null ) {
try {
connection.close();
if( session != null ) {
session.close();
}
} catch( JMSException jmse ) {
logger.warn("Unable to close connection or session!", jmse);
}
}
responseHandler.dispose(connection, session);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ public DocumentInstance getDocument(String identifier) {
ServiceResponse<DocumentInstance> response = (ServiceResponse<DocumentInstance>) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0);

throwExceptionOnFailure(response);
if (shouldReturnWithNullResponse(response)) {
return null;
}
result = response.getResult();
}

Expand All @@ -86,6 +89,9 @@ public String createDocument(DocumentInstance documentInstance) {
ServiceResponse<String> response = (ServiceResponse<String>) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0);

throwExceptionOnFailure(response);
if (shouldReturnWithNullResponse(response)) {
return null;
}
result = response.getResult();
}
if (result instanceof Wrapped) {
Expand Down Expand Up @@ -150,6 +156,9 @@ public List<DocumentInstance> listDocuments(Integer page, Integer pageSize) {
ServiceResponse<DocumentInstanceList> response = (ServiceResponse<DocumentInstanceList>) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0);

throwExceptionOnFailure(response);
if (shouldReturnWithNullResponse(response)) {
return null;
}
result = response.getResult();
}

Expand Down
Loading

0 comments on commit e2f529a

Please sign in to comment.