From e2f529a9655553540edc45b230f0b142c6ccaf87 Mon Sep 17 00:00:00 2001 From: Maciej Swiderski Date: Fri, 5 Aug 2016 11:10:44 +0200 Subject: [PATCH] JBPM-5228 - Allow 'fire and forget' feature in runtime Kie Server Remote APIs (#554) --- .../org/kie/server/api/jms/JMSConstants.java | 12 + .../kie/server/api/model/ServiceResponse.java | 2 +- .../server/client/DocumentServicesClient.java | 3 + .../kie/server/client/JobServicesClient.java | 3 + .../kie/server/client/KieServicesClient.java | 3 + .../client/KieServicesConfiguration.java | 9 + .../server/client/ProcessServicesClient.java | 3 + .../server/client/QueryServicesClient.java | 3 + .../kie/server/client/RuleServicesClient.java | 3 + .../server/client/SolverServicesClient.java | 4 +- .../kie/server/client/UIServicesClient.java | 9 + .../server/client/UserTaskServicesClient.java | 3 + .../impl/AbstractKieServicesClientImpl.java | 70 ++--- .../impl/DocumentServicesClientImpl.java | 9 + .../client/impl/JobServicesClientImpl.java | 15 ++ .../impl/KieServicesConfigurationImpl.java | 30 ++- .../impl/ProcessServicesClientImpl.java | 51 ++++ .../client/impl/QueryServicesClientImpl.java | 112 ++++++-- .../client/impl/RuleServicesClientImpl.java | 8 +- .../client/impl/SolverServicesClientImpl.java | 19 +- .../client/impl/UIServicesClientImpl.java | 18 ++ .../impl/UserTaskServicesClientImpl.java | 101 +++++-- .../client/jms/AsyncResponseHandler.java | 162 +++++++++++ .../client/jms/BlockingResponseCallback.java | 110 ++++++++ .../jms/FireAndForgetResponseHandler.java | 68 +++++ .../jms/RequestReplyResponseHandler.java | 97 +++++++ .../server/client/jms/ResponseCallback.java | 51 ++++ .../server/client/jms/ResponseHandler.java | 61 +++++ .../java/org/kie/server/jms/KieServerMDB.java | 112 ++++---- .../JmsResponseHandlerIntegrationTest.java | 253 ++++++++++++++++++ 30 files changed, 1248 insertions(+), 156 deletions(-) create mode 100644 kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/jms/AsyncResponseHandler.java create mode 100644 kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/jms/BlockingResponseCallback.java create mode 100644 kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/jms/FireAndForgetResponseHandler.java create mode 100644 kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/jms/RequestReplyResponseHandler.java create mode 100644 kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/jms/ResponseCallback.java create mode 100644 kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/jms/ResponseHandler.java create mode 100644 kie-server-parent/kie-server-tests/kie-server-integ-tests-jbpm/src/test/java/org/kie/server/integrationtests/jbpm/jms/JmsResponseHandlerIntegrationTest.java diff --git a/kie-server-parent/kie-server-api/src/main/java/org/kie/server/api/jms/JMSConstants.java b/kie-server-parent/kie-server-api/src/main/java/org/kie/server/api/jms/JMSConstants.java index a3ce82c32e..73fae2731e 100644 --- a/kie-server-parent/kie-server-api/src/main/java/org/kie/server/api/jms/JMSConstants.java +++ b/kie-server-parent/kie-server-api/src/main/java/org/kie/server/api/jms/JMSConstants.java @@ -26,4 +26,16 @@ public class JMSConstants { public static final String TARGET_CAPABILITY_PROPERTY_NAME = "kie_target_capability"; public static final String USER_PROPERTY_NAME = "kie_user"; public static final String PASSWRD_PROPERTY_NAME = "kie_password"; + + public static final String INTERACTION_PATTERN_PROPERTY_NAME = "kie_interaction_pattern"; + + // from 1 to 99 means response should be sent from the server + public static final int UPPER_LIMIT_REPLY_INTERACTION_PATTERNS = 100; + + public static final int REQUEST_REPLY_PATTERN = 1; + public static final int ASYNC_REPLY_PATTERN = 2; + + // from 100 up means server should not send any response + public static final int FIRE_AND_FORGET_PATTERN = 101; + } diff --git a/kie-server-parent/kie-server-api/src/main/java/org/kie/server/api/model/ServiceResponse.java b/kie-server-parent/kie-server-api/src/main/java/org/kie/server/api/model/ServiceResponse.java index f6dc1784fc..2c97e36199 100644 --- a/kie-server-parent/kie-server-api/src/main/java/org/kie/server/api/model/ServiceResponse.java +++ b/kie-server-parent/kie-server-api/src/main/java/org/kie/server/api/model/ServiceResponse.java @@ -56,7 +56,7 @@ @XmlAccessorType(XmlAccessType.NONE) public class ServiceResponse { public static enum ResponseType { - SUCCESS, FAILURE; + SUCCESS, FAILURE, NO_RESPONSE; } @XmlAttribute diff --git a/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/DocumentServicesClient.java b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/DocumentServicesClient.java index 5d1d54aa57..53a3869803 100644 --- a/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/DocumentServicesClient.java +++ b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/DocumentServicesClient.java @@ -18,6 +18,7 @@ import java.util.List; import org.kie.server.api.model.instance.DocumentInstance; +import org.kie.server.client.jms.ResponseHandler; public interface DocumentServicesClient { @@ -32,4 +33,6 @@ public interface DocumentServicesClient { void deleteDocument(String identifier); List listDocuments(Integer page, Integer pageSize); + + void setResponseHandler(ResponseHandler responseHandler); } diff --git a/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/JobServicesClient.java b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/JobServicesClient.java index c68fc2035c..e24c314d77 100644 --- a/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/JobServicesClient.java +++ b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/JobServicesClient.java @@ -20,6 +20,7 @@ import org.kie.server.api.model.instance.JobRequestInstance; import org.kie.server.api.model.instance.RequestInfoInstance; import org.kie.server.api.model.instance.RequestInfoInstanceList; +import org.kie.server.client.jms.ResponseHandler; public interface JobServicesClient { @@ -38,4 +39,6 @@ public interface JobServicesClient { List getRequestsByCommand(String command, Integer page, Integer pageSize); RequestInfoInstance getRequestById(Long requestId, boolean withErrors, boolean withData); + + void setResponseHandler(ResponseHandler responseHandler); } diff --git a/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/KieServicesClient.java b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/KieServicesClient.java index c9fff0a999..ee94ef9ecc 100644 --- a/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/KieServicesClient.java +++ b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/KieServicesClient.java @@ -26,6 +26,7 @@ import org.kie.server.api.model.ReleaseId; import org.kie.server.api.model.ServiceResponse; import org.kie.server.api.model.ServiceResponsesList; +import org.kie.server.client.jms.ResponseHandler; public interface KieServicesClient { @@ -85,4 +86,6 @@ public interface KieServicesClient { String getConversationId(); void completeConversation(); + + void setResponseHandler(ResponseHandler responseHandler); } diff --git a/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/KieServicesConfiguration.java b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/KieServicesConfiguration.java index 55d19ba763..33ce010216 100644 --- a/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/KieServicesConfiguration.java +++ b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/KieServicesConfiguration.java @@ -17,6 +17,7 @@ import org.kie.server.api.marshalling.MarshallingFormat; import org.kie.server.client.balancer.LoadBalancer; +import org.kie.server.client.jms.ResponseHandler; import javax.jms.ConnectionFactory; import javax.jms.Queue; @@ -98,4 +99,12 @@ public static enum Transport { LoadBalancer getLoadBalancer(); + void setResponseHandler(ResponseHandler responseHandler); + + ResponseHandler getResponseHandler(); + + void setJmsTransactional(boolean transacted); + + boolean isJmsTransactional(); + } diff --git a/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/ProcessServicesClient.java b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/ProcessServicesClient.java index b4ed8862bd..cd93eb56af 100644 --- a/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/ProcessServicesClient.java +++ b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/ProcessServicesClient.java @@ -29,6 +29,7 @@ import org.kie.server.api.model.definition.VariablesDefinition; import org.kie.server.api.model.instance.ProcessInstance; import org.kie.server.api.model.instance.WorkItemInstance; +import org.kie.server.client.jms.ResponseHandler; public interface ProcessServicesClient { @@ -91,4 +92,6 @@ public interface ProcessServicesClient { WorkItemInstance getWorkItem(String containerId, Long processInstanceId, Long id); List getWorkItemByProcessInstance(String containerId, Long processInstanceId); + + void setResponseHandler(ResponseHandler responseHandler); } diff --git a/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/QueryServicesClient.java b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/QueryServicesClient.java index 9268012c10..a5bfde7b30 100644 --- a/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/QueryServicesClient.java +++ b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/QueryServicesClient.java @@ -25,6 +25,7 @@ import org.kie.server.api.model.instance.NodeInstance; import org.kie.server.api.model.instance.ProcessInstance; import org.kie.server.api.model.instance.VariableInstance; +import org.kie.server.client.jms.ResponseHandler; public interface QueryServicesClient { @@ -130,4 +131,6 @@ public interface QueryServicesClient { List query(String queryName, String mapper, QueryFilterSpec filterSpec, Integer page, Integer pageSize, Class resultType); List query(String queryName, String mapper, String builder, Map parameters, Integer page, Integer pageSize, Class resultType); + + void setResponseHandler(ResponseHandler responseHandler); } diff --git a/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/RuleServicesClient.java b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/RuleServicesClient.java index 79c2c485f0..a27b43dde4 100644 --- a/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/RuleServicesClient.java +++ b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/RuleServicesClient.java @@ -18,6 +18,7 @@ import org.kie.api.command.Command; import org.kie.api.runtime.ExecutionResults; import org.kie.server.api.model.ServiceResponse; +import org.kie.server.client.jms.ResponseHandler; public interface RuleServicesClient { @@ -36,5 +37,7 @@ public interface RuleServicesClient { ServiceResponse executeCommandsWithResults(String id, String payload); ServiceResponse executeCommandsWithResults(String id, Command cmd); + + void setResponseHandler(ResponseHandler responseHandler); } diff --git a/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/SolverServicesClient.java b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/SolverServicesClient.java index 4677e3824e..e08e95a9b9 100644 --- a/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/SolverServicesClient.java +++ b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/SolverServicesClient.java @@ -18,7 +18,7 @@ import org.kie.server.api.model.ServiceResponse; import org.kie.server.api.model.instance.SolverInstance; import org.kie.server.api.model.instance.SolverInstanceList; -import org.optaplanner.core.api.domain.solution.Solution; +import org.kie.server.client.jms.ResponseHandler; public interface SolverServicesClient { @@ -34,5 +34,7 @@ public interface SolverServicesClient { ServiceResponse disposeSolver( String containerId, String solverId ); + void setResponseHandler(ResponseHandler responseHandler); + } diff --git a/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/UIServicesClient.java b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/UIServicesClient.java index 5f60580304..734a55c9ed 100644 --- a/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/UIServicesClient.java +++ b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/UIServicesClient.java @@ -15,6 +15,8 @@ package org.kie.server.client; +import org.kie.server.client.jms.ResponseHandler; + public interface UIServicesClient { /** @@ -67,4 +69,11 @@ public interface UIServicesClient { * @return svg (xml) representing process image annotated with active (in red) and completed (in grey) nodes */ String getProcessInstanceImage(String containerId, Long processInstanceId); + + /** + * Override default response handler to change interaction pattern. Applies only to JMS + * based integration. + * @param responseHandler + */ + void setResponseHandler(ResponseHandler responseHandler); } diff --git a/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/UserTaskServicesClient.java b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/UserTaskServicesClient.java index 87af848136..e38502859e 100644 --- a/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/UserTaskServicesClient.java +++ b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/UserTaskServicesClient.java @@ -25,6 +25,7 @@ import org.kie.server.api.model.instance.TaskEventInstance; import org.kie.server.api.model.instance.TaskInstance; import org.kie.server.api.model.instance.TaskSummary; +import org.kie.server.client.jms.ResponseHandler; public interface UserTaskServicesClient { @@ -151,4 +152,6 @@ public interface UserTaskServicesClient { List findTasksByVariable(String userId, String variableName, List status, Integer page, Integer pageSize, String sort, boolean sortOrder); List findTasksByVariableAndValue(String userId, String variableName, String variableValue, List status, Integer page, Integer pageSize, String sort, boolean sortOrder); + + void setResponseHandler(ResponseHandler responseHandler); } diff --git a/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/impl/AbstractKieServicesClientImpl.java b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/impl/AbstractKieServicesClientImpl.java index c435f77f99..272c816224 100644 --- a/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/impl/AbstractKieServicesClientImpl.java +++ b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/impl/AbstractKieServicesClientImpl.java @@ -54,6 +54,7 @@ import org.kie.server.client.KieServicesException; import org.kie.server.client.impl.KieServicesClientImpl; import org.kie.server.client.balancer.LoadBalancer; +import org.kie.server.client.jms.ResponseHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,15 +73,15 @@ public abstract class AbstractKieServicesClientImpl { protected KieServicesClientImpl owner; - public LoadBalancer getLoadBalancer() { - return loadBalancer; - } + // used by JMS to handle response via different interaction patterns + private ResponseHandler responseHandler; public AbstractKieServicesClientImpl(KieServicesConfiguration config) { this.config = config.clone(); this.loadBalancer = config.getLoadBalancer() == null ? LoadBalancer.getDefault(config.getServerUrl()) : config.getLoadBalancer(); this.classLoader = Thread.currentThread().getContextClassLoader() != null ? Thread.currentThread().getContextClassLoader() : CommandScript.class.getClassLoader(); this.marshaller = MarshallerFactory.getMarshaller(config.getExtraJaxbClasses(), config.getMarshallingFormat(), classLoader); + this.responseHandler = config.getResponseHandler(); } public AbstractKieServicesClientImpl(KieServicesConfiguration config, ClassLoader classLoader) { @@ -88,6 +89,7 @@ public AbstractKieServicesClientImpl(KieServicesConfiguration config, ClassLoade this.loadBalancer = config.getLoadBalancer() == null ? LoadBalancer.getDefault(config.getServerUrl()) : config.getLoadBalancer(); this.classLoader = classLoader; this.marshaller = MarshallerFactory.getMarshaller( config.getExtraJaxbClasses(), config.getMarshallingFormat(), classLoader ); + this.responseHandler = config.getResponseHandler(); } /** @@ -128,6 +130,13 @@ public void setOwner(KieServicesClientImpl owner) { this.owner = owner; } + public LoadBalancer getLoadBalancer() { + return loadBalancer; + } + + public void setResponseHandler(ResponseHandler responseHandler) { + this.responseHandler = responseHandler; + } protected void throwExceptionOnFailure(ServiceResponse serviceResponse) { if (serviceResponse != null && ServiceResponse.ResponseType.FAILURE.equals(serviceResponse.getType())){ @@ -135,6 +144,15 @@ protected void throwExceptionOnFailure(ServiceResponse serviceResponse) { } } + protected boolean shouldReturnWithNullResponse(ServiceResponse serviceResponse) { + if (serviceResponse != null && ServiceResponse.ResponseType.NO_RESPONSE.equals(serviceResponse.getType())){ + logger.debug("Returning null as the response type is NO_RESPONSE"); + return true; + } + + return false; + } + protected void sendTaskOperation(String containerId, Long taskId, String operation, String queryString) { Map valuesMap = new HashMap(); valuesMap.put(CONTAINER_ID, containerId); @@ -458,16 +476,15 @@ protected ServiceResponsesList executeJmsCommand( CommandScript command, String try { // setup MessageProducer producer; - MessageConsumer consumer; + try { if( config.getPassword() != null ) { connection = factory.createConnection(config.getUserName(), config.getPassword()); } else { connection = factory.createConnection(); } - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + session = connection.createSession(config.isJmsTransactional(), Session.AUTO_ACKNOWLEDGE); producer = session.createProducer(sendQueue); - consumer = session.createConsumer(responseQueue, selector); connection.start(); } catch( JMSException jmse ) { @@ -490,6 +507,7 @@ protected ServiceResponsesList executeJmsCommand( CommandScript command, String textMsg.setJMSCorrelationID(corrId); // 2. serialization info textMsg.setIntProperty( JMSConstants.SERIALIZATION_FORMAT_PROPERTY_NAME, config.getMarshallingFormat().getId() ); + textMsg.setIntProperty( JMSConstants.INTERACTION_PATTERN_PROPERTY_NAME, responseHandler.getInteractionPattern() ); if (classType != null) { textMsg.setStringProperty(JMSConstants.CLASS_TYPE_PROPERTY_NAME, classType); } @@ -515,45 +533,11 @@ protected ServiceResponsesList executeJmsCommand( CommandScript command, String } // receive - Message response; - try { - response = consumer.receive( config.getTimeout() ); - } catch( JMSException jmse ) { - logger.warn("JMS exception while waiting for response - {}", jmse.getMessage()); - throw new KieServicesException("Unable to receive or retrieve the JMS response.", jmse); - } + cmdResponse = responseHandler.handleResponse(selector, connection, session, responseQueue, config, marshaller, owner); - if( response == null ) { - logger.warn("Response is empty"); - // return actual instance to avoid null points on client side - List> responses = new ArrayList>(); - responses.add(new ServiceResponse(ServiceResponse.ResponseType.FAILURE, "Response is empty")); - return new ServiceResponsesList(responses); - } - // extract response - assert response != null: "Response is empty."; - try { - owner.setConversationId(response.getStringProperty(JMSConstants.CONVERSATION_ID_PROPERTY_NAME)); - - String responseStr = ((TextMessage) response).getText(); - logger.debug("Received response from server '{}'", responseStr); - cmdResponse = marshaller.unmarshall(responseStr, ServiceResponsesList.class); - return cmdResponse; - } catch( JMSException jmse ) { - throw new KieServicesException("Unable to extract " + ServiceResponsesList.class.getSimpleName() - + " instance from JMS response.", jmse); - } + return cmdResponse; } finally { - if( connection != null ) { - try { - connection.close(); - if( session != null ) { - session.close(); - } - } catch( JMSException jmse ) { - logger.warn("Unable to close connection or session!", jmse); - } - } + responseHandler.dispose(connection, session); } } diff --git a/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/impl/DocumentServicesClientImpl.java b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/impl/DocumentServicesClientImpl.java index 9e91dc5bb8..7fc1718472 100644 --- a/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/impl/DocumentServicesClientImpl.java +++ b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/impl/DocumentServicesClientImpl.java @@ -64,6 +64,9 @@ public DocumentInstance getDocument(String identifier) { ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0); throwExceptionOnFailure(response); + if (shouldReturnWithNullResponse(response)) { + return null; + } result = response.getResult(); } @@ -86,6 +89,9 @@ public String createDocument(DocumentInstance documentInstance) { ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0); throwExceptionOnFailure(response); + if (shouldReturnWithNullResponse(response)) { + return null; + } result = response.getResult(); } if (result instanceof Wrapped) { @@ -150,6 +156,9 @@ public List listDocuments(Integer page, Integer pageSize) { ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0); throwExceptionOnFailure(response); + if (shouldReturnWithNullResponse(response)) { + return null; + } result = response.getResult(); } diff --git a/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/impl/JobServicesClientImpl.java b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/impl/JobServicesClientImpl.java index 7ed3d5c0c5..36230f3335 100644 --- a/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/impl/JobServicesClientImpl.java +++ b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/impl/JobServicesClientImpl.java @@ -67,6 +67,9 @@ public Long scheduleRequest(String containerId, JobRequestInstance jobRequest) { ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0); throwExceptionOnFailure(response); + if (shouldReturnWithNullResponse(response)) { + return null; + } result = deserialize(response.getResult(), Object.class); } @@ -130,6 +133,9 @@ public List getRequestsByStatus(List statuses, Inte ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0); throwExceptionOnFailure(response); + if (shouldReturnWithNullResponse(response)) { + return null; + } list = response.getResult(); } @@ -158,6 +164,9 @@ public List getRequestsByBusinessKey(String businessKey, In ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0); throwExceptionOnFailure(response); + if (shouldReturnWithNullResponse(response)) { + return null; + } list = response.getResult(); } @@ -187,6 +196,9 @@ public List getRequestsByCommand(String command, Integer pa ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0); throwExceptionOnFailure(response); + if (shouldReturnWithNullResponse(response)) { + return null; + } list = response.getResult(); } @@ -213,6 +225,9 @@ public RequestInfoInstance getRequestById(Long requestId, boolean withErrors, bo ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0); throwExceptionOnFailure(response); + if (shouldReturnWithNullResponse(response)) { + return null; + } return deserialize(response.getResult(), RequestInfoInstance.class); } } diff --git a/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/impl/KieServicesConfigurationImpl.java b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/impl/KieServicesConfigurationImpl.java index dcd507e49f..c5462a86f1 100644 --- a/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/impl/KieServicesConfigurationImpl.java +++ b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/impl/KieServicesConfigurationImpl.java @@ -21,14 +21,13 @@ import org.kie.server.client.KieServicesException; import org.kie.server.client.balancer.LoadBalancer; import org.kie.server.client.credentials.EnteredCredentialsProvider; +import org.kie.server.client.jms.RequestReplyResponseHandler; +import org.kie.server.client.jms.ResponseHandler; import javax.jms.ConnectionFactory; import javax.jms.Queue; import javax.naming.InitialContext; import javax.naming.NamingException; -import java.net.MalformedURLException; -import java.net.URISyntaxException; -import java.net.URL; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -61,6 +60,8 @@ public final class KieServicesConfigurationImpl private ConnectionFactory connectionFactory; private Queue requestQueue; private Queue responseQueue; + private ResponseHandler responseHandler = new RequestReplyResponseHandler(); + private boolean jmsTransactional = false; private MarshallingFormat format = MarshallingFormat.JAXB; private Set> extraJaxbClasses = new HashSet>(); @@ -394,6 +395,27 @@ public LoadBalancer getLoadBalancer() { return this.loadBalancer; } + @Override + public void setResponseHandler(ResponseHandler responseHandler) { + this.responseHandler = responseHandler; + } + + @Override + public ResponseHandler getResponseHandler() { + return this.responseHandler; + } + + @Override + public boolean isJmsTransactional() { + return jmsTransactional; + } + + @Override + public void setJmsTransactional(boolean jmsTransactional) { + this.jmsTransactional = jmsTransactional; + } + + // Clone --- private KieServicesConfigurationImpl(KieServicesConfigurationImpl config) { this.connectionFactory = config.connectionFactory; @@ -411,6 +433,8 @@ private KieServicesConfigurationImpl(KieServicesConfigurationImpl config) { this.capabilities = config.capabilities; this.credentialsProvider = config.credentialsProvider; this.loadBalancer = config.loadBalancer; + this.responseHandler = config.responseHandler; + this.jmsTransactional = config.jmsTransactional; } @Override diff --git a/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/impl/ProcessServicesClientImpl.java b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/impl/ProcessServicesClientImpl.java index a90d5c42bd..2390dcfb82 100644 --- a/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/impl/ProcessServicesClientImpl.java +++ b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/impl/ProcessServicesClientImpl.java @@ -68,6 +68,9 @@ public ProcessDefinition getProcessDefinition(String containerId, String process ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM", containerId ).getResponses().get(0); throwExceptionOnFailure(response); + if (shouldReturnWithNullResponse(response)) { + return null; + } return response.getResult(); } } @@ -88,6 +91,9 @@ public SubProcessesDefinition getReusableSubProcessDefinitions(String containerI ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM", containerId ).getResponses().get(0); throwExceptionOnFailure(response); + if (shouldReturnWithNullResponse(response)) { + return null; + } return response.getResult(); } } @@ -108,6 +114,9 @@ public VariablesDefinition getProcessVariableDefinitions(String containerId, Str ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM", containerId ).getResponses().get(0); throwExceptionOnFailure(response); + if (shouldReturnWithNullResponse(response)) { + return null; + } return response.getResult(); } } @@ -128,6 +137,9 @@ public ServiceTasksDefinition getServiceTaskDefinitions(String containerId, Stri ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM", containerId ).getResponses().get(0); throwExceptionOnFailure(response); + if (shouldReturnWithNullResponse(response)) { + return null; + } return response.getResult(); } } @@ -148,6 +160,9 @@ public AssociatedEntitiesDefinition getAssociatedEntityDefinitions(String contai ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM", containerId ).getResponses().get(0); throwExceptionOnFailure(response); + if (shouldReturnWithNullResponse(response)) { + return null; + } return response.getResult(); } } @@ -168,6 +183,9 @@ public UserTaskDefinitionList getUserTaskDefinitions(String containerId, String ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM", containerId ).getResponses().get(0); throwExceptionOnFailure(response); + if (shouldReturnWithNullResponse(response)) { + return null; + } return response.getResult(); } } @@ -189,6 +207,9 @@ public TaskInputsDefinition getUserTaskInputDefinitions(String containerId, Stri ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM", containerId ).getResponses().get(0); throwExceptionOnFailure(response); + if (shouldReturnWithNullResponse(response)) { + return null; + } return response.getResult(); } } @@ -210,6 +231,9 @@ public TaskOutputsDefinition getUserTaskOutputDefinitions(String containerId, St ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM", containerId ).getResponses().get(0); throwExceptionOnFailure(response); + if (shouldReturnWithNullResponse(response)) { + return null; + } return response.getResult(); } } @@ -239,6 +263,9 @@ public Long startProcess(String containerId, String processId, Map response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM", containerId ).getResponses().get(0); throwExceptionOnFailure(response); + if (shouldReturnWithNullResponse(response)) { + return null; + } result = deserialize(response.getResult(), Object.class); } @@ -274,6 +301,9 @@ public Long startProcess(String containerId, String processId, CorrelationKey co ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM", containerId ).getResponses().get(0); throwExceptionOnFailure(response); + if (shouldReturnWithNullResponse(response)) { + return null; + } result = deserialize(response.getResult(), Object.class); } @@ -346,6 +376,9 @@ public T getProcessInstanceVariable(String containerId, Long processInstance ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM", containerId ).getResponses().get(0); throwExceptionOnFailure(response); + if (shouldReturnWithNullResponse(response)) { + return null; + } result = deserialize(response.getResult(), type); } @@ -375,6 +408,9 @@ public Map getProcessInstanceVariables(String containerId, Long ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM", containerId ).getResponses().get(0); throwExceptionOnFailure(response); + if (shouldReturnWithNullResponse(response)) { + return null; + } variables = deserialize(response.getResult(), Object.class); } @@ -466,6 +502,9 @@ public List getAvailableSignals(String containerId, Long processInstance ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM", containerId ).getResponses().get(0); throwExceptionOnFailure(response); + if (shouldReturnWithNullResponse(response)) { + return null; + } signals = deserialize(response.getResult(), Object.class); } @@ -528,6 +567,9 @@ public ProcessInstance getProcessInstance(String containerId, Long processInstan ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM", containerId ).getResponses().get(0); throwExceptionOnFailure(response); + if (shouldReturnWithNullResponse(response)) { + return null; + } return deserialize(response.getResult(), ProcessInstance.class); } } @@ -548,6 +590,9 @@ public ProcessInstance getProcessInstance(String containerId, Long processInstan ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM", containerId ).getResponses().get(0); throwExceptionOnFailure(response); + if (shouldReturnWithNullResponse(response)) { + return null; + } return deserialize(response.getResult(), ProcessInstance.class); } } @@ -608,6 +653,9 @@ public WorkItemInstance getWorkItem(String containerId, Long processInstanceId, ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM", containerId ).getResponses().get(0); throwExceptionOnFailure(response); + if (shouldReturnWithNullResponse(response)) { + return null; + } return deserialize(response.getResult(), WorkItemInstance.class); } } @@ -631,6 +679,9 @@ public List getWorkItemByProcessInstance(String containerId, L ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM", containerId ).getResponses().get(0); throwExceptionOnFailure(response); + if (shouldReturnWithNullResponse(response)) { + return null; + } list = deserialize(response.getResult(), WorkItemInstanceList.class); } diff --git a/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/impl/QueryServicesClientImpl.java b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/impl/QueryServicesClientImpl.java index a70464c1fe..ed1c267dd1 100644 --- a/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/impl/QueryServicesClientImpl.java +++ b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/impl/QueryServicesClientImpl.java @@ -75,7 +75,9 @@ public List findProcessesById(String processId) { ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } result = response.getResult(); } @@ -120,7 +122,9 @@ public ProcessDefinition findProcessByContainerIdProcessId(String containerId, S ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } result = response.getResult(); } @@ -144,7 +148,9 @@ public List findProcesses(Integer page, Integer pageSize, Str ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } result = response.getResult(); } @@ -172,7 +178,9 @@ public List findProcesses(String filter, Integer page, Intege ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } result = response.getResult(); } @@ -202,7 +210,9 @@ public List findProcessesByContainerId(String containerId, In ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } result = response.getResult(); } @@ -275,7 +285,9 @@ public List findProcessInstances(Integer page, Integer pageSize ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } result = response.getResult(); } @@ -305,7 +317,9 @@ public List findProcessInstancesByCorrelationKey(CorrelationKey ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } result = response.getResult(); } @@ -335,7 +349,9 @@ public List findProcessInstancesByProcessId(String processId, L ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } result = response.getResult(); } @@ -365,7 +381,9 @@ public List findProcessInstancesByProcessName(String processNam ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } result = response.getResult(); } @@ -396,7 +414,9 @@ public List findProcessInstancesByContainerId(String containerI ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } result = response.getResult(); } @@ -425,7 +445,9 @@ public List findProcessInstancesByStatus(List status, ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } result = response.getResult(); } @@ -456,7 +478,9 @@ public List findProcessInstancesByInitiator(String initiator, L ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } result = response.getResult(); } @@ -488,7 +512,9 @@ public List findProcessInstancesByVariable(String variableName, ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } result = response.getResult(); } @@ -520,7 +546,9 @@ public List findProcessInstancesByVariableAndValue(String varia ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } result = response.getResult(); } @@ -547,7 +575,9 @@ public ProcessInstance findProcessInstanceById(Long processInstanceId) { ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } result = response.getResult(); } @@ -570,7 +600,9 @@ public ProcessInstance findProcessInstanceById(Long processInstanceId, boolean w ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } result = response.getResult(); } @@ -593,7 +625,9 @@ public ProcessInstance findProcessInstanceByCorrelationKey(CorrelationKey correl ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } return response.getResult(); } @@ -617,7 +651,9 @@ public NodeInstance findNodeInstanceByWorkItemId(Long processInstanceId, Long wo ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } result = response.getResult(); } @@ -644,7 +680,9 @@ public List findActiveNodeInstances(Long processInstanceId, Intege ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } result = response.getResult(); } @@ -674,7 +712,9 @@ public List findCompletedNodeInstances(Long processInstanceId, Int ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } result = response.getResult(); } @@ -705,7 +745,9 @@ public List findNodeInstances(Long processInstanceId, Integer page ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } result = response.getResult(); } @@ -735,7 +777,9 @@ public List findVariablesCurrentState(Long processInstanceId) ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } result = response.getResult(); } @@ -767,7 +811,9 @@ public List findVariableHistory(Long processInstanceId, String ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } result = response.getResult(); } @@ -855,7 +901,9 @@ public QueryDefinition getQuery(String queryName) { ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } result = response.getResult(); } @@ -880,7 +928,9 @@ public List getQueries(Integer page, Integer pageSize) { ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } result = response.getResult(); } @@ -912,7 +962,9 @@ public List query(String queryName, String mapper, String orderBy, Intege ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } result = response.getResult(); } @@ -957,7 +1009,9 @@ public List query(String queryName, String mapper, QueryFilterSpec filter ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } result = response.getResult(); } @@ -997,7 +1051,9 @@ public List query(String queryName, String mapper, String builder, Map response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } result = response.getResult(); } diff --git a/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/impl/RuleServicesClientImpl.java b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/impl/RuleServicesClientImpl.java index 0e21d2adf1..511efb6c07 100644 --- a/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/impl/RuleServicesClientImpl.java +++ b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/impl/RuleServicesClientImpl.java @@ -55,7 +55,9 @@ public ServiceResponse executeCommandsWithResults(String id, S } else { CommandScript script = new CommandScript( Collections.singletonList((KieServerCommand) new CallContainerCommand(id, payload)) ); ServiceResponse response = executeJmsCommand( script, null, null, id ).getResponses().get( 0 ); - + if (shouldReturnWithNullResponse(response)) { + return null; + } if (response.getResult() instanceof String) { response.setResult(deserialize((String) response.getResult(), (Class) ExecutionResultImpl.class)); } @@ -71,7 +73,9 @@ public ServiceResponse executeCommandsWithResults(String id, C } else { CommandScript script = new CommandScript( Collections.singletonList( (KieServerCommand) new CallContainerCommand( id, serialize(cmd) ) ) ); ServiceResponse response = executeJmsCommand( script, cmd.getClass().getName(), null, id ).getResponses().get( 0 ); - + if (shouldReturnWithNullResponse(response)) { + return null; + } if (response.getResult() instanceof String) { response.setResult(deserialize((String) response.getResult(), (Class) ExecutionResultImpl.class)); } diff --git a/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/impl/SolverServicesClientImpl.java b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/impl/SolverServicesClientImpl.java index b4dd4ead57..d5cd2b91b2 100644 --- a/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/impl/SolverServicesClientImpl.java +++ b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/impl/SolverServicesClientImpl.java @@ -58,6 +58,9 @@ public ServiceResponse getSolvers(String containerId) { ServiceResponse response = (ServiceResponse) executeJmsCommand( script, GetSolversCommand.class.getName(), KieServerConstants.CAPABILITY_BRP, containerId ).getResponses().get( 0 ); throwExceptionOnFailure( response ); + if (shouldReturnWithNullResponse(response)) { + return null; + } return response; } } @@ -77,7 +80,9 @@ public ServiceResponse createSolver(String containerId, String s } else { CommandScript script = new CommandScript( Collections.singletonList( (KieServerCommand) new CreateSolverCommand( containerId, solverId, configFile ) ) ); ServiceResponse response = (ServiceResponse) executeJmsCommand( script, CreateSolverCommand.class.getName(), KieServerConstants.CAPABILITY_BRP, containerId ).getResponses().get( 0 ); - + if (shouldReturnWithNullResponse(response)) { + return null; + } return response; } } @@ -94,6 +99,9 @@ public ServiceResponse getSolverState(String containerId, String ServiceResponse response = (ServiceResponse) executeJmsCommand( script, GetSolverStateCommand.class.getName(), KieServerConstants.CAPABILITY_BRP, containerId ).getResponses().get( 0 ); throwExceptionOnFailure( response ); + if (shouldReturnWithNullResponse(response)) { + return null; + } return response; } } @@ -110,6 +118,9 @@ public ServiceResponse getSolverBestSolution(String containerId, ServiceResponse response = (ServiceResponse) executeJmsCommand( script, GetBestSolutionCommand.class.getName(), KieServerConstants.CAPABILITY_BRP, containerId ).getResponses().get( 0 ); throwExceptionOnFailure( response ); + if (shouldReturnWithNullResponse(response)) { + return null; + } return response; } } @@ -129,6 +140,9 @@ public ServiceResponse updateSolverState(String containerId, Str ServiceResponse response = (ServiceResponse) executeJmsCommand( script, UpdateSolverStateCommand.class.getName(), KieServerConstants.CAPABILITY_BRP, containerId ).getResponses().get( 0 ); throwExceptionOnFailure( response ); + if (shouldReturnWithNullResponse(response)) { + return null; + } return response; } } @@ -145,6 +159,9 @@ public ServiceResponse disposeSolver(String containerId, String solverId) ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DisposeSolverCommand.class.getName(), KieServerConstants.CAPABILITY_BRP, containerId ).getResponses().get( 0 ); throwExceptionOnFailure( response ); + if (shouldReturnWithNullResponse(response)) { + return null; + } return response; } } diff --git a/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/impl/UIServicesClientImpl.java b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/impl/UIServicesClientImpl.java index 84ef6a8aa8..4c03254290 100644 --- a/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/impl/UIServicesClientImpl.java +++ b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/impl/UIServicesClientImpl.java @@ -58,6 +58,9 @@ public String getProcessForm(String containerId, String processId, String langua ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM-UI", containerId ).getResponses().get(0); throwExceptionOnFailure(response); + if (shouldReturnWithNullResponse(response)) { + return null; + } return response.getResult(); } } @@ -78,6 +81,9 @@ public String getProcessForm(String containerId, String processId) { ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM-UI", containerId ).getResponses().get(0); throwExceptionOnFailure(response); + if (shouldReturnWithNullResponse(response)) { + return null; + } return response.getResult(); } } @@ -98,6 +104,9 @@ public String getTaskForm(String containerId, Long taskId, String language) { ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM-UI", containerId ).getResponses().get(0); throwExceptionOnFailure(response); + if (shouldReturnWithNullResponse(response)) { + return null; + } return response.getResult(); } } @@ -118,6 +127,9 @@ public String getTaskForm(String containerId, Long taskId) { ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM-UI", containerId ).getResponses().get(0); throwExceptionOnFailure(response); + if (shouldReturnWithNullResponse(response)) { + return null; + } return response.getResult(); } } @@ -141,6 +153,9 @@ public String getProcessImage(String containerId, String processId) { ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM-UI", containerId ).getResponses().get(0); throwExceptionOnFailure(response); + if (shouldReturnWithNullResponse(response)) { + return null; + } return response.getResult(); } } @@ -164,6 +179,9 @@ public String getProcessInstanceImage(String containerId, Long processInstanceId ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM-UI", containerId ).getResponses().get(0); throwExceptionOnFailure(response); + if (shouldReturnWithNullResponse(response)) { + return null; + } return response.getResult(); } } diff --git a/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/impl/UserTaskServicesClientImpl.java b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/impl/UserTaskServicesClientImpl.java index 77163005bf..f3efbd6509 100644 --- a/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/impl/UserTaskServicesClientImpl.java +++ b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/impl/UserTaskServicesClientImpl.java @@ -398,7 +398,9 @@ public Long saveTaskContent(String containerId, Long taskId, Map ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM", containerId ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } contentId = deserialize(response.getResult(), Object.class); } @@ -426,7 +428,9 @@ public Map getTaskOutputContentByTaskId(String containerId, Long ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM", containerId ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } variables = deserialize(response.getResult(), Object.class); } if (variables instanceof Wrapped) { @@ -453,6 +457,9 @@ public Map getTaskInputContentByTaskId(String containerId, Long ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM", containerId ).getResponses().get(0); throwExceptionOnFailure(response); + if (shouldReturnWithNullResponse(response)) { + return null; + } variables = deserialize(response.getResult(), Object.class); } if (variables instanceof Wrapped) { @@ -508,7 +515,9 @@ public Long addTaskComment(String containerId, Long taskId, String text, String ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM", containerId ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } commentId = deserialize(response.getResult(), Object.class); } @@ -556,7 +565,9 @@ public List getTaskCommentsByTaskId(String containerId, Long taskId ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM", containerId ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } commentList = deserialize(response.getResult(), TaskCommentList.class); } @@ -585,7 +596,9 @@ public TaskComment getTaskCommentById(String containerId, Long taskId, Long comm ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM", containerId ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } taskComment = deserialize(response.getResult(), TaskComment.class); } @@ -610,7 +623,9 @@ public Long addTaskAttachment(String containerId, Long taskId, String userId, St ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM", containerId ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } attachmentId = deserialize(response.getResult(), Object.class); } if (attachmentId instanceof Wrapped) { @@ -659,7 +674,9 @@ public TaskAttachment getTaskAttachmentById(String containerId, Long taskId, Lon ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM", containerId ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } attachment = deserialize(response.getResult(), TaskAttachment.class); } @@ -684,7 +701,9 @@ public Object getTaskAttachmentContentById(String containerId, Long taskId, Long ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM", containerId ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } result = deserialize(response.getResult(), Object.class); } if (result instanceof Wrapped) { @@ -714,7 +733,9 @@ public List getTaskAttachmentsByTaskId(String containerId, Long ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM", containerId ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } attachmentList = deserialize(response.getResult(), TaskAttachmentList.class);; } @@ -742,6 +763,9 @@ public TaskInstance getTaskInstance(String containerId, Long taskId) { ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM", containerId ).getResponses().get(0); throwExceptionOnFailure(response); + if (shouldReturnWithNullResponse(response)) { + return null; + } result = deserialize(response.getResult(), TaskInstance.class); } @@ -770,6 +794,9 @@ public TaskInstance getTaskInstance(String containerId, Long taskId, boolean wit ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM", containerId ).getResponses().get(0); throwExceptionOnFailure(response); + if (shouldReturnWithNullResponse(response)) { + return null; + } result = deserialize(response.getResult(), TaskInstance.class); } @@ -793,7 +820,9 @@ public TaskInstance findTaskByWorkItemId(Long workItemId) { ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } return response.getResult(); } } @@ -813,7 +842,9 @@ public TaskInstance findTaskById(Long taskId) { ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } return response.getResult(); } } @@ -896,7 +927,9 @@ public List findTasksAssignedAsBusinessAdministrator(String userId, ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } taskSummaryList = response.getResult(); } @@ -927,7 +960,9 @@ public List findTasksAssignedAsBusinessAdministrator(String userId, ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } taskSummaryList = response.getResult(); } if (taskSummaryList != null && taskSummaryList.getTasks() != null) { @@ -955,7 +990,9 @@ public List findTasksAssignedAsPotentialOwner(String userId, Intege ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } taskSummaryList = response.getResult(); } if (taskSummaryList != null && taskSummaryList.getTasks() != null) { @@ -985,7 +1022,9 @@ public List findTasksAssignedAsPotentialOwner(String userId, List response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } taskSummaryList = response.getResult(); } @@ -1017,7 +1056,9 @@ public List findTasksAssignedAsPotentialOwner(String userId, List response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } taskSummaryList = response.getResult(); } @@ -1045,7 +1086,9 @@ public List findTasksOwned(String userId, Integer page, Integer pag ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } taskSummaryList = response.getResult(); } if (taskSummaryList != null && taskSummaryList.getTasks() != null) { @@ -1074,7 +1117,9 @@ public List findTasksOwned(String userId, List status, Inte ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } taskSummaryList = response.getResult(); } if (taskSummaryList != null && taskSummaryList.getTasks() != null) { @@ -1103,7 +1148,9 @@ public List findTasksByStatusByProcessInstanceId(Long processInstan ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } taskSummaryList = response.getResult(); } if (taskSummaryList != null && taskSummaryList.getTasks() != null) { @@ -1131,7 +1178,9 @@ public List findTasks(String userId, Integer page, Integer pageSize ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } taskSummaryList = response.getResult(); } @@ -1161,7 +1210,9 @@ public List findTaskEvents(Long taskId, Integer page, Integer ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } taskSummaryList = response.getResult(); } @@ -1194,7 +1245,9 @@ public List findTasksByVariable(String userId, String variableName, ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } result = response.getResult(); } @@ -1227,7 +1280,9 @@ public List findTasksByVariableAndValue(String userId, String varia ServiceResponse response = (ServiceResponse) executeJmsCommand( script, DescriptorCommand.class.getName(), "BPM" ).getResponses().get(0); throwExceptionOnFailure(response); - + if (shouldReturnWithNullResponse(response)) { + return null; + } result = response.getResult(); } diff --git a/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/jms/AsyncResponseHandler.java b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/jms/AsyncResponseHandler.java new file mode 100644 index 0000000000..55ca43ac7b --- /dev/null +++ b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/jms/AsyncResponseHandler.java @@ -0,0 +1,162 @@ +/* + * Copyright 2016 Red Hat, Inc. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.kie.server.client.jms; + +import java.util.Arrays; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.kie.server.api.jms.JMSConstants; +import org.kie.server.api.marshalling.Marshaller; +import org.kie.server.api.model.ServiceResponse; +import org.kie.server.api.model.ServiceResponsesList; +import org.kie.server.client.KieServicesClient; +import org.kie.server.client.KieServicesConfiguration; +import org.kie.server.client.KieServicesException; +import org.kie.server.client.impl.KieServicesClientImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Async response handler that receives message from response queue using message listener. + * It requires callback to be invoked upon message delivery otherwise will throw IllegalStateException on runtime. + *
+ * Due to nature of message listener (cannot clean up its connection and session) another thread is used to perform the cleanup + * in finally block of message listener. + *
+ * Response is only delivered via callback thus return value of handleResponse is always single ServiceResponse of type NO_RESPONSE + */ +public class AsyncResponseHandler implements ResponseHandler { + + private static final Logger logger = LoggerFactory.getLogger(AsyncResponseHandler.class); + + private ResponseCallback callback; + private ExecutorService executorService = Executors.newSingleThreadExecutor(); + + public AsyncResponseHandler(ResponseCallback callback) { + this.callback = callback; + } + + @Override + public int getInteractionPattern() { + return JMSConstants.ASYNC_REPLY_PATTERN; + } + + @Override + public ServiceResponsesList handleResponse(String selector, Connection connection, Session session, Queue responseQueue, KieServicesConfiguration config, Marshaller marshaller, KieServicesClient owner) { + + if (callback == null) { + throw new IllegalStateException("There is no callback defined, can't continue..."); + } + + MessageConsumer consumer = null; + try { + consumer = session.createConsumer(responseQueue, selector); + consumer.setMessageListener(new AsyncMessageListener(connection, session, selector, consumer, marshaller, owner)); + logger.debug("Message listener for async message retrieval successfully registered on consumer {}", consumer); + + } catch( JMSException jmse ) { + throw new KieServicesException("Unable to retrieve JMS response from queue " + responseQueue + " with selector " + selector, jmse); + } + + ServiceResponse messageSentResponse = new ServiceResponse(ServiceResponse.ResponseType.NO_RESPONSE, "Message sent"); + return new ServiceResponsesList(Arrays.asList(messageSentResponse)); + } + + @Override + public void dispose(Connection connection, Session session) { + // no op as the resources are closed from within message listener (via separate thread) + } + + private class AsyncMessageListener implements MessageListener { + private String selector; + private MessageConsumer consumer; + private Marshaller marshaller; + private KieServicesClient owner; + + private Connection connection; + private Session session; + + public AsyncMessageListener(Connection connection, Session session, String selector, MessageConsumer consumer, Marshaller marshaller, KieServicesClient owner) { + this.selector = selector; + this.consumer = consumer; + this.marshaller = marshaller; + this.owner = owner; + this.connection = connection; + this.session = session; + } + + @Override + public void onMessage(Message message) { + try { + ((KieServicesClientImpl) owner).setConversationId(message.getStringProperty(JMSConstants.CONVERSATION_ID_PROPERTY_NAME)); + + String responseStr = ((TextMessage) message).getText(); + logger.debug("Received response from server '{}'", responseStr); + + ServiceResponsesList cmdResponse = marshaller.unmarshall(responseStr, ServiceResponsesList.class); + logger.debug("Unmarshalled response from async delivery {} calling callback {}", cmdResponse, callback); + + callback.onResponse(selector, cmdResponse); + logger.debug("Callback {} successfully invoked with response {}", callback, cmdResponse); + } catch (Exception e) { + logger.error("Error while receiving message due to {}, this means response from the server won't be delivered to client", e.getMessage(), e); + } finally { + if (consumer != null) { + try { + consumer.close(); + } catch (JMSException e) { + logger.warn("Error when closing JMS consumer due to {}", e.getMessage()); + } + } + // submit work to executor service to close resources + // as they cannot be closed from message listener + // due to AMQ129006: It is illegal to call this method (session.close()) from within a Message Listener + executorService.submit(new Runnable() { + @Override + public void run() { + try { + if (session != null) { + session.close(); + logger.debug("Session closed via separate thread."); + } + if (connection != null) { + connection.close(); + logger.debug("Connection closed via separate thread."); + } + } catch (JMSException jmse) { + logger.warn("Unable to close connection or session!", jmse); + } + + } + }); + logger.debug("Cleanup of JMS resources requested via separate thread."); + } + } + } +} diff --git a/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/jms/BlockingResponseCallback.java b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/jms/BlockingResponseCallback.java new file mode 100644 index 0000000000..4aad80347a --- /dev/null +++ b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/jms/BlockingResponseCallback.java @@ -0,0 +1,110 @@ +/* + * Copyright 2016 Red Hat, Inc. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.kie.server.client.jms; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; + +import org.kie.server.api.marshalling.Marshaller; +import org.kie.server.api.model.ServiceResponse; +import org.kie.server.api.model.ServiceResponsesList; +import org.kie.server.api.model.Wrapped; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Simple blocking response callback backed by blocking queue that will allow sequential access to + * responses and block + *
    + *
  • client if there is no message yet
  • + *
  • server if the queue is full
  • + *
. + */ +public class BlockingResponseCallback implements ResponseCallback { + + private static final Logger logger = LoggerFactory.getLogger(BlockingResponseCallback.class); + private BlockingQueue responses; + + private Marshaller marshaller; + + public BlockingResponseCallback(Marshaller marshaller) { + this.marshaller = marshaller; + this.responses = new ArrayBlockingQueue(10); + } + + public BlockingResponseCallback(Marshaller marshaller, int queueSize) { + this.marshaller = marshaller; + this.responses = new ArrayBlockingQueue(queueSize); + } + + @Override + public void onResponse(String selector, ServiceResponsesList response) { + logger.debug("Message response {} for selector {} delivered to callback", response, selector); + try { + responses.put(response); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.debug("Interrupted exception while putting message to response queue in callback"); + } + } + + @Override + public ServiceResponsesList get() { + try { + return responses.take(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.debug("Interrupted exception while taking message from responses queue in callback"); + } + + return null; + } + + @Override + public T get(Class type) { + if (marshaller == null) { + throw new IllegalStateException("No marshaller given, can't use get(Class) to return response"); + } + ServiceResponsesList responsesList = get(); + + if (responsesList.getResponses() == null || responsesList.getResponses().isEmpty()) { + logger.debug("No data found in the response, returning null"); + return null; + } + + ServiceResponse response = responsesList.getResponses().get(0); + if (response.getType().equals(ServiceResponse.ResponseType.SUCCESS)) { + + Object result = response.getResult(); + + if (result instanceof String) { + logger.debug("Response '{}' of type string, unmarshalling it...", result); + + result = marshaller.unmarshall((String) result, type); + logger.debug("Result after unmarshall operation {}", result); + } + // handle wrapped objects + if (result instanceof Wrapped) { + result = ((Wrapped)result).unwrap(); + } + + return (T)result; + } else { + logger.debug("Non successful response '{}', returning null", response.getMsg()); + return null; + } + } +} diff --git a/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/jms/FireAndForgetResponseHandler.java b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/jms/FireAndForgetResponseHandler.java new file mode 100644 index 0000000000..c277b71bfa --- /dev/null +++ b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/jms/FireAndForgetResponseHandler.java @@ -0,0 +1,68 @@ +/* + * Copyright 2016 Red Hat, Inc. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.kie.server.client.jms; + +import java.util.Arrays; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Queue; +import javax.jms.Session; + +import org.kie.server.api.jms.JMSConstants; +import org.kie.server.api.marshalling.Marshaller; +import org.kie.server.api.model.ServiceResponse; +import org.kie.server.api.model.ServiceResponsesList; +import org.kie.server.client.KieServicesClient; +import org.kie.server.client.KieServicesConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Fire and forget response handler meaning it does not wait for any response as it actually + * instructs the server to not even send any response via interaction pattern constant. + * + * It always returns single ServiceResponse of type NO_RESPONSE. Client cannot expect any response from integration + * when using this handler, as the name suggest it sends the message and forgets about it directly. + */ +public class FireAndForgetResponseHandler implements ResponseHandler { + + private static final Logger logger = LoggerFactory.getLogger(FireAndForgetResponseHandler.class); + + @Override + public int getInteractionPattern() { + return JMSConstants.FIRE_AND_FORGET_PATTERN; + } + + @Override + public ServiceResponsesList handleResponse(String selector, Connection connection, Session session, Queue responseQueue, KieServicesConfiguration config, Marshaller marshaller, KieServicesClient owner) { + ServiceResponse messageSentResponse = new ServiceResponse(ServiceResponse.ResponseType.NO_RESPONSE, "Message sent"); + return new ServiceResponsesList(Arrays.asList(messageSentResponse)); + } + + @Override + public void dispose(Connection connection, Session session) { + try { + if ( session != null ) { + session.close(); + } + if ( connection != null ) { + connection.close(); + } + } catch( JMSException jmse ) { + logger.warn("Unable to close connection or session!", jmse); + } + } +} diff --git a/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/jms/RequestReplyResponseHandler.java b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/jms/RequestReplyResponseHandler.java new file mode 100644 index 0000000000..53757f3935 --- /dev/null +++ b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/jms/RequestReplyResponseHandler.java @@ -0,0 +1,97 @@ +/* + * Copyright 2016 Red Hat, Inc. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.kie.server.client.jms; + +import java.util.ArrayList; +import java.util.List; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.kie.server.api.jms.JMSConstants; +import org.kie.server.api.marshalling.Marshaller; +import org.kie.server.api.model.ServiceResponse; +import org.kie.server.api.model.ServiceResponsesList; +import org.kie.server.client.KieServicesClient; +import org.kie.server.client.KieServicesConfiguration; +import org.kie.server.client.KieServicesException; +import org.kie.server.client.impl.KieServicesClientImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RequestReplyResponseHandler implements ResponseHandler { + + private static final Logger logger = LoggerFactory.getLogger(RequestReplyResponseHandler.class); + @Override + public int getInteractionPattern() { + return JMSConstants.REQUEST_REPLY_PATTERN; + } + + @Override + public ServiceResponsesList handleResponse(String selector, Connection connection, Session session, Queue responseQueue, KieServicesConfiguration config, Marshaller marshaller, KieServicesClient owner) { + + MessageConsumer consumer = null; + + try { + consumer = session.createConsumer(responseQueue, selector); + + Message response = consumer.receive( config.getTimeout() ); + + if( response == null ) { + logger.warn("Response is empty"); + // return actual instance to avoid null points on client side + List> responses = new ArrayList>(); + responses.add(new ServiceResponse(ServiceResponse.ResponseType.FAILURE, "Response is empty")); + return new ServiceResponsesList(responses); + } + + ((KieServicesClientImpl)owner).setConversationId(response.getStringProperty(JMSConstants.CONVERSATION_ID_PROPERTY_NAME)); + + String responseStr = ((TextMessage) response).getText(); + logger.debug("Received response from server '{}'", responseStr); + ServiceResponsesList cmdResponse = marshaller.unmarshall(responseStr, ServiceResponsesList.class); + return cmdResponse; + } catch( JMSException jmse ) { + throw new KieServicesException("Unable to retrieve JMS response from queue " + responseQueue + " with selector " + selector, jmse); + } finally { + if (consumer != null) { + try { + consumer.close(); + } catch (JMSException e) { + logger.warn("Error when closing JMS consumer due to {}", e.getMessage()); + } + } + } + } + + @Override + public void dispose(Connection connection, Session session) { + try { + if ( session != null ) { + session.close(); + } + if ( connection != null ) { + connection.close(); + } + } catch( JMSException jmse ) { + logger.warn("Unable to close connection or session!", jmse); + } + } +} diff --git a/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/jms/ResponseCallback.java b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/jms/ResponseCallback.java new file mode 100644 index 0000000000..402e2052e8 --- /dev/null +++ b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/jms/ResponseCallback.java @@ -0,0 +1,51 @@ +/* + * Copyright 2016 Red Hat, Inc. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.kie.server.client.jms; + +import org.kie.server.api.model.ServiceResponsesList; + +/** + * Receives callbacks upon received response from the server. + * Used with ASYNC_REPLY_PATTERN to allow processing of further work before receiving response. + */ +public interface ResponseCallback { + + /** + * Invoked by async message listener when response was received that matches given selector + * @param selector message selector used to filter messages + * @param response actual (unmarshalled) response received from the server. + */ + void onResponse(String selector, ServiceResponsesList response); + + /** + * Returns received value if any. It's up to implementation to either block + * while waiting for the response or return directly with null in case there is no + * response available + * @return returns message received from the server, if exists + */ + ServiceResponsesList get(); + + /** + * Returns deserialized version of the response - it's taken from + * ServiceResponseList.getResponses().get(0).getResult(). + * It attempts to provide as much as possible smooth usage as it would be + * directly via *ServiceClient + * @param type class type of expected result + * @param actual type expected + * @return + */ + T get(Class type); +} diff --git a/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/jms/ResponseHandler.java b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/jms/ResponseHandler.java new file mode 100644 index 0000000000..0c6673613e --- /dev/null +++ b/kie-server-parent/kie-server-remote/kie-server-client/src/main/java/org/kie/server/client/jms/ResponseHandler.java @@ -0,0 +1,61 @@ +/* + * Copyright 2016 Red Hat, Inc. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.kie.server.client.jms; + +import javax.jms.Connection; +import javax.jms.Queue; +import javax.jms.Session; + +import org.kie.server.api.jms.JMSConstants; +import org.kie.server.api.marshalling.Marshaller; +import org.kie.server.api.model.ServiceResponsesList; +import org.kie.server.client.KieServicesClient; +import org.kie.server.client.KieServicesConfiguration; + +/** + * Used to define how JMS response should be handled + */ +public interface ResponseHandler { + + /** + * Returns int identifying supported interaction pattern for JMS + * @see JMSConstants for interaction pattern constants + * @return + */ + int getInteractionPattern(); + + /** + * Deals with response if needed according to given interaction pattern it supports. + * @param selector message selector to pick only response for given message + * @param connection JMS connection to be used + * @param session JMS session to be used + * @param responseQueue queue that should have response delivered to + * @param config kie server client configuration + * @param marshaller marshaller to be used after message is received + * @param owner top level kie server client that owns the service client + * @return ServiceResponseList produced from response message + */ + ServiceResponsesList handleResponse(String selector, Connection connection, Session session, Queue responseQueue, + KieServicesConfiguration config, Marshaller marshaller, KieServicesClient owner); + + /** + * Responsible for close of resources. Up to implementation if they can be closed directly + * or after async processing, etc + * @param connection jms connection used + * @param session jms session used + */ + void dispose(Connection connection, Session session); +} diff --git a/kie-server-parent/kie-server-remote/kie-server-jms/src/main/java/org/kie/server/jms/KieServerMDB.java b/kie-server-parent/kie-server-remote/kie-server-jms/src/main/java/org/kie/server/jms/KieServerMDB.java index 0241aa1828..8993b4105f 100644 --- a/kie-server-parent/kie-server-remote/kie-server-jms/src/main/java/org/kie/server/jms/KieServerMDB.java +++ b/kie-server-parent/kie-server-remote/kie-server-jms/src/main/java/org/kie/server/jms/KieServerMDB.java @@ -23,8 +23,6 @@ import javax.annotation.Resource; import javax.ejb.ActivationConfigProperty; import javax.ejb.MessageDriven; -import javax.ejb.TransactionAttribute; -import javax.ejb.TransactionAttributeType; import javax.ejb.TransactionManagement; import javax.ejb.TransactionManagementType; import javax.jms.Connection; @@ -45,7 +43,6 @@ import org.kie.server.api.marshalling.Marshaller; import org.kie.server.api.marshalling.MarshallerFactory; import org.kie.server.api.marshalling.MarshallingFormat; -import org.kie.server.api.model.KieContainerStatus; import org.kie.server.api.model.ReleaseId; import org.kie.server.api.model.ServiceResponsesList; import org.kie.server.services.api.KieContainerCommandService; @@ -159,35 +156,11 @@ public void onMessage(Message message) { throw new JMSRuntimeException(errMsg, jmse); } - String targetCapability = "KieServer"; // for backward compatibility default to KieServer - try { - if (message.propertyExists(TARGET_CAPABILITY_PROPERTY_NAME)) { - targetCapability = message.getStringProperty(TARGET_CAPABILITY_PROPERTY_NAME); - } - } catch (JMSException jmse) { - String errMsg = "Unable to retrieve property '" + TARGET_CAPABILITY_PROPERTY_NAME + "' from message " + msgCorrId + "."; - throw new JMSRuntimeException(errMsg, jmse); - } - - String containerId = null; - try { - if (message.propertyExists(CONTAINER_ID_PROPERTY_NAME)) { - containerId = message.getStringProperty(CONTAINER_ID_PROPERTY_NAME); - } - } catch (JMSException jmse) { - String logMsg = "Unable to retrieve property '" + CONTAINER_ID_PROPERTY_NAME + "' from message " + msgCorrId + "."; - logger.debug(logMsg); - } + String targetCapability = getStringProperty(message, TARGET_CAPABILITY_PROPERTY_NAME, "KieServer"); // for backward compatibility default to KieServer + String containerId = getStringProperty(message, CONTAINER_ID_PROPERTY_NAME, null); + String conversationId = getStringProperty(message, CONVERSATION_ID_PROPERTY_NAME, null); - String conversationId = null; - try { - if (message.propertyExists(CONVERSATION_ID_PROPERTY_NAME)) { - conversationId = message.getStringProperty(CONVERSATION_ID_PROPERTY_NAME); - } - } catch (JMSException jmse) { - String logMsg = "Unable to retrieve property '" + CONVERSATION_ID_PROPERTY_NAME + "' from message " + msgCorrId + "."; - logger.debug(logMsg); - } + int interactionPattern = getIntProperty(message, INTERACTION_PATTERN_PROPERTY_NAME, REQUEST_REPLY_PATTERN); // 1. get marshalling info MarshallingFormat format = null; @@ -237,34 +210,39 @@ public void onMessage(Message message) { // 4. process request ServiceResponsesList response = executor.executeScript(script, format, classType); - // 5. serialize response - Message msg = marshallResponse(session, msgCorrId, format, marshaller, response); - // set conversation id for routing - if (containerId != null && (conversationId == null || conversationId.trim().isEmpty())) { - try { - KieContainerInstance containerInstance = kieServer.getServerRegistry().getContainer(containerId); - if (containerInstance != null) { - ReleaseId releaseId = containerInstance.getResource().getResolvedReleaseId(); - if (releaseId == null) { - releaseId = containerInstance.getResource().getReleaseId(); + if (interactionPattern < UPPER_LIMIT_REPLY_INTERACTION_PATTERNS) { + logger.debug("Response message is about to be sent according to selected interaction pattern {}", interactionPattern); + // 5. serialize response + Message msg = marshallResponse(session, msgCorrId, format, marshaller, response); + // set conversation id for routing + if (containerId != null && (conversationId == null || conversationId.trim().isEmpty())) { + try { + KieContainerInstance containerInstance = kieServer.getServerRegistry().getContainer(containerId); + if (containerInstance != null) { + ReleaseId releaseId = containerInstance.getResource().getResolvedReleaseId(); + if (releaseId == null) { + releaseId = containerInstance.getResource().getReleaseId(); + } + + conversationId = ConversationId.from(KieServerEnvironment.getServerId(), containerId, releaseId).toString(); } - - conversationId = ConversationId.from(KieServerEnvironment.getServerId(), containerId, releaseId).toString(); + } catch (Exception e) { + logger.warn("Unable to build conversation id due to {}", e.getMessage(), e); } - } catch (Exception e) { - logger.warn("Unable to build conversation id due to {}", e.getMessage(), e); } - } - try { - if (conversationId != null) { - msg.setStringProperty(CONVERSATION_ID_PROPERTY_NAME, conversationId); + try { + if (conversationId != null) { + msg.setStringProperty(CONVERSATION_ID_PROPERTY_NAME, conversationId); + } + } catch (JMSException e) { + logger.debug("Unable to set conversation id on response message due to {}", e.getMessage()); } - } catch (JMSException e) { - logger.debug("Unable to set conversation id on response message due to {}", e.getMessage()); - } - // 6. send response - sendResponse(msgCorrId, format, msg); + // 6. send response + sendResponse(msgCorrId, format, msg); + } else { + logger.debug("Response message is skipped according to selected interaction pattern {}", FIRE_AND_FORGET_PATTERN); + } } finally { @@ -353,4 +331,30 @@ protected Marshaller getMarshaller(String containerId, MarshallingFormat format) return marshallers.get(format); } + protected String getStringProperty(Message message, String name, String defaultValue) { + try { + if (message.propertyExists(name)) { + return message.getStringProperty(name); + } + } catch (JMSException jmse) { + String errMsg = "Unable to retrieve property '" + name + "' from message " + message + "."; + logger.debug(errMsg, jmse); + } + + return defaultValue; + } + + protected int getIntProperty(Message message, String name, int defaultValue) { + try { + if (message.propertyExists(name)) { + return message.getIntProperty(name); + } + } catch (JMSException jmse) { + String errMsg = "Unable to retrieve property '" + name + "' from message " + message + "."; + logger.debug(errMsg, jmse); + } + + return defaultValue; + } + } diff --git a/kie-server-parent/kie-server-tests/kie-server-integ-tests-jbpm/src/test/java/org/kie/server/integrationtests/jbpm/jms/JmsResponseHandlerIntegrationTest.java b/kie-server-parent/kie-server-tests/kie-server-integ-tests-jbpm/src/test/java/org/kie/server/integrationtests/jbpm/jms/JmsResponseHandlerIntegrationTest.java new file mode 100644 index 0000000000..7197eadde1 --- /dev/null +++ b/kie-server-parent/kie-server-tests/kie-server-integ-tests-jbpm/src/test/java/org/kie/server/integrationtests/jbpm/jms/JmsResponseHandlerIntegrationTest.java @@ -0,0 +1,253 @@ +/* + * Copyright 2015 Red Hat, Inc. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.kie.server.integrationtests.jbpm.jms; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; + +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runners.Parameterized; +import org.kie.server.api.marshalling.Marshaller; +import org.kie.server.api.marshalling.MarshallerFactory; +import org.kie.server.api.marshalling.MarshallingFormat; +import org.kie.server.api.model.ReleaseId; +import org.kie.server.api.model.ServiceResponse; +import org.kie.server.api.model.ServiceResponsesList; +import org.kie.server.api.model.instance.ProcessInstance; +import org.kie.server.api.model.instance.ProcessInstanceList; +import org.kie.server.api.model.instance.TaskSummary; +import org.kie.server.api.model.instance.TaskSummaryList; +import org.kie.server.client.KieServicesConfiguration; +import org.kie.server.client.jms.AsyncResponseHandler; +import org.kie.server.client.jms.BlockingResponseCallback; +import org.kie.server.client.jms.FireAndForgetResponseHandler; +import org.kie.server.client.jms.RequestReplyResponseHandler; +import org.kie.server.client.jms.ResponseCallback; +import org.kie.server.client.jms.ResponseHandler; +import org.kie.server.integrationtests.category.JMSOnly; +import org.kie.server.integrationtests.jbpm.JbpmKieServerBaseIntegrationTest; +import org.kie.server.integrationtests.shared.KieServerAssert; +import org.kie.server.integrationtests.shared.KieServerDeployer; + +import static org.junit.Assert.*; + +@Category({JMSOnly.class}) +public class JmsResponseHandlerIntegrationTest extends JbpmKieServerBaseIntegrationTest { + + private static final ReleaseId RELEASE_ID = new ReleaseId("org.kie.server.testing", "definition-project", "1.0.0.Final"); + + @Parameterized.Parameters(name = "{index}: {0} {1}") + public static Collection data() { + KieServicesConfiguration jmsConfiguration = createKieServicesJmsConfiguration(); + Collection parameterData = new ArrayList(Arrays.asList(new Object[][] + { + {MarshallingFormat.JAXB, jmsConfiguration} , + {MarshallingFormat.JSON, jmsConfiguration}, + {MarshallingFormat.XSTREAM, jmsConfiguration} + } + )); + + return parameterData; + } + + @BeforeClass + public static void buildAndDeployArtifacts() { + KieServerDeployer.buildAndDeployCommonMavenParent(); + KieServerDeployer.buildAndDeployMavenProject(ClassLoader.class.getResource("/kjars-sources/definition-project").getFile()); + + createContainer(CONTAINER_ID, RELEASE_ID); + } + + @Test + public void testStartProcessUseOfFireAndForgetResponseHandler() throws Exception { + testStartProcessResponseHandler(new FireAndForgetResponseHandler()); + } + + @Test + public void testStartProcessUseOfAsyncResponseHandler() throws Exception { + ResponseCallback callback = new BlockingResponseCallback(null); + + testStartProcessResponseHandler(new AsyncResponseHandler(callback)); + // now let's check if response has arrived + ServiceResponsesList response = callback.get(); + assertNotNull(response); + assertNotNull(response.getResponses()); + assertEquals(1, response.getResponses().size()); + KieServerAssert.assertSuccess(response.getResponses().get(0)); + + ServiceResponse serviceResponse = response.getResponses().get(0); + Object result = serviceResponse.getResult(); + assertNotNull(result); + + } + + @Test + public void testStartProcessUseOfAsyncResponseHandlerWithMarshaller() throws Exception { + Marshaller marshaller = MarshallerFactory.getMarshaller(new HashSet>(extraClasses.values()), configuration.getMarshallingFormat(), client.getClassLoader()); + ResponseCallback callback = new BlockingResponseCallback(marshaller); + + testStartProcessResponseHandler(new AsyncResponseHandler(callback)); + // now let's check if response has arrived + Long processInstanceId = callback.get(Long.class); + assertNotNull(processInstanceId); + assertTrue(processInstanceId.longValue() > 0); + } + + @Test + public void testGetProcessInstancesUseOfAsyncResponseHandlerWithMarshaller() throws Exception { + Marshaller marshaller = MarshallerFactory.getMarshaller(new HashSet>(extraClasses.values()), configuration.getMarshallingFormat(), client.getClassLoader()); + ResponseCallback callback = new BlockingResponseCallback(marshaller); + + testGetProcessInstancesResponseHandler(new AsyncResponseHandler(callback)); + // now let's check if response has arrived + Long processInstanceId = callback.get(Long.class); + assertNotNull(processInstanceId); + assertTrue(processInstanceId.longValue() > 0); + + ProcessInstanceList processInstanceList = callback.get(ProcessInstanceList.class); + assertNotNull(processInstanceList); + + List instances = processInstanceList.getItems(); + assertNotNull(instances); + assertEquals(1, instances.size()); + } + + @Test + public void testGetTasksUseOfAsyncResponseHandlerWithMarshaller() throws Exception { + Marshaller marshaller = MarshallerFactory.getMarshaller(new HashSet>(extraClasses.values()), configuration.getMarshallingFormat(), client.getClassLoader()); + ResponseCallback callback = new BlockingResponseCallback(marshaller); + + testGetTaskResponseHandler(new AsyncResponseHandler(callback)); + // now let's check if response has arrived + Long processInstanceId = callback.get(Long.class); + assertNotNull(processInstanceId); + assertTrue(processInstanceId.longValue() > 0); + + TaskSummaryList taskSummaryList = callback.get(TaskSummaryList.class); + assertNotNull(taskSummaryList); + + List tasks = taskSummaryList.getItems(); + assertNotNull(tasks); + assertEquals(1, tasks.size()); + } + + /* + * helper methods that comes with tests that can be invoked with various response handlers + */ + + private void testStartProcessResponseHandler(ResponseHandler responseHandler) throws Exception { + List processInstances = queryClient.findProcessInstances(0, 100); + assertEquals(0, processInstances.size()); + + // change response handler for processClient others are not affected + processClient.setResponseHandler(responseHandler); + Long processInstanceId = processClient.startProcess(CONTAINER_ID, PROCESS_ID_USERTASK); + // since we use fire and forget there will always be null response + assertNull(processInstanceId); + + delay(); + // Process should be started completely async - fire and forget. + processInstances = queryClient.findProcessInstances(0, 100); + assertEquals(1, processInstances.size()); + + ProcessInstance pi = processInstances.get(0); + assertEquals(org.kie.api.runtime.process.ProcessInstance.STATE_ACTIVE, pi.getState().intValue()); + + // set to reqreply so it finishes the test properly + processClient.setResponseHandler(new RequestReplyResponseHandler()); + processClient.abortProcessInstance(CONTAINER_ID, pi.getId()); + } + + private void testGetProcessInstancesResponseHandler(ResponseHandler responseHandler) throws Exception { + List processInstances = queryClient.findProcessInstances(0, 100); + assertEquals(0, processInstances.size()); + + // change response handler for processClient others are not affected + processClient.setResponseHandler(responseHandler); + Long processInstanceId = processClient.startProcess(CONTAINER_ID, PROCESS_ID_USERTASK); + // since we use fire and forget there will always be null response + assertNull(processInstanceId); + + delay(); + // change response handler for queryClient others are not affected + queryClient.setResponseHandler(responseHandler); + // Process should be started completely async - fire and forget. + processInstances = queryClient.findProcessInstances(0, 100); + assertNull(processInstances); + + // set it back for the sake of verification + queryClient.setResponseHandler(new RequestReplyResponseHandler()); + // Process should be started completely async - fire and forget. + processInstances = queryClient.findProcessInstances(0, 100); + assertNotNull(processInstances); + assertEquals(1, processInstances.size()); + + ProcessInstance pi = processInstances.get(0); + assertEquals(org.kie.api.runtime.process.ProcessInstance.STATE_ACTIVE, pi.getState().intValue()); + + // set to reqreply so it finishes the test properly + processClient.setResponseHandler(new RequestReplyResponseHandler()); + processClient.abortProcessInstance(CONTAINER_ID, pi.getId()); + } + + private void testGetTaskResponseHandler(ResponseHandler responseHandler) throws Exception { + List processInstances = queryClient.findProcessInstances(0, 100); + assertEquals(0, processInstances.size()); + + // change response handler for processClient others are not affected + processClient.setResponseHandler(responseHandler); + Long processInstanceId = processClient.startProcess(CONTAINER_ID, PROCESS_ID_USERTASK); + // since we use fire and forget there will always be null response + assertNull(processInstanceId); + + delay(); + // set it back for the sake of verification + queryClient.setResponseHandler(new RequestReplyResponseHandler()); + // Process should be started completely async - fire and forget. + processInstances = queryClient.findProcessInstances(0, 100); + assertNotNull(processInstances); + assertEquals(1, processInstances.size()); + + ProcessInstance pi = processInstances.get(0); + assertEquals(org.kie.api.runtime.process.ProcessInstance.STATE_ACTIVE, pi.getState().intValue()); + + // change response handler for taskClient others are not affected + taskClient.setResponseHandler(responseHandler); + List tasks = taskClient.findTasksAssignedAsPotentialOwner(USER_YODA, 0, 10); + assertNull(tasks); + + // set to reqreply so it finishes the test properly + processClient.setResponseHandler(new RequestReplyResponseHandler()); + processClient.abortProcessInstance(CONTAINER_ID, pi.getId()); + } + + /* + * since these tests are about async processing on the server we need to introduce delay, + * even though it might not be reliable... + */ + private void delay() { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + logger.debug("InterruptedException caught while delaying execution..."); + } + } +}