JLupin Next Server provides mechanisms of asynchronous processing. The below diagram presents this mechanism:

To describe queues in JLupin, we will use an example.
Let’s start from interface id and a class implementing this interface.

public interface RandomService {
    int getRandomInt();
}

@Service("randomService")
public class RandomServiceImpl implements RandomService {

    private JLupinLogger jLupinLogger;
    private SecureRandom secureRandom;

    public RandomServiceImpl() {
        jLupinLogger = JLupinRuntimeContainer.getInstance().getJLupinLogger();
        secureRandom = new SecureRandom();
    }

    public int getRandomInt() {
        return secureRandom.nextInt();
    }
}

For this example, service will be registered as "randomService" and microservice will be named "randomMicroservice". JLupin automatically provides queue interfaces for asynchronous triggering. Without additional programming work we can simply trigger our previously written queues in asynchronous mode – without waiting for the result.

Let’s take a look at triggering:

public class AsynchronousTest {

    private JLupinLogger jLupinLogger;
    private JLupinSerializer jLupinSerializer;
    private JLupinSocketDelegatorImpl jLupinDelegator;

    @Before
    public void before() {
        JLupinSimpleSystemOutLoggerImpl.initializeWithLogMode(LogMode.DEBUG);
        jLupinLogger     = JLupinSimpleSystemOutLoggerImpl.getInstance();
        jLupinSerializer = JLupinFSTSerializerImpl.getInstance();
        jLupinDelegator = new JLupinSocketDelegatorImpl(jLupinLogger, jLupinSerializer);
        jLupinDelegator.setSocketAddressToConnectServer(new InetSocketAddress("localhost", 9093));
    }

    @org.junit.Test
    public void test() throws Throwable {
        String taskId = addAsynchronousTask();
        jLupinLogger.info("task id:" + taskId );
    }

    private String addAsynchronousTask() throws Throwable {
         JLupinInputParameter jLupinInputParameter = new JLupinInputParameter();
         jLupinInputParameter.setApplicationName("randomMicroservice");
         jLupinInputParameter.setServiceName("randomService");
         jLupinInputParameter.setMethodName("getRandomInt");
         jLupinInputParameter.setAsynchronousBusName("simpleBus");

         JLupinOutputParameter jLupinOutputParameter = jLupinDelegator.delegate(jLupinInputParameter);
         String taskId = jLupinOutputParameter.getResultObject();
         return taskId;
    }
}

In addAsynchronousTask method in JLupinInputParameter input object we provide all input information for asynchronous triggering, such as: microservice name, service name, method name, and the most important "simpleBus" queue name – we will check below where it is configured. Other parameters are optional (sessionId, requestId). As mentioned before, output object from a given method cannot be a trigger effect. In this case, task id is an effect. Variable that means job acceptance to a queue.

Trigger effect is presented below:

[2015-10-27 09:12:01] CLASS:com.jlupin.test.ServiceAsynchronousTest MESSAGE ID:0 LEVEL:INFO THREAD:1 METHOD:test LINE:37 LOG:task id: 35a65c8d364a6a35e6c5bc972e35fb244d6b1f92726fe254908e9ea6d8d10756)

Let's have a look at the server log. To see the entry below on a server it is necessary to change login mode to debug. To enable this it is required to change the following entry in log4j.xml file in server-resources directory:

<priority value="DEBUG"/>

Logs:
Task acceptance to execute in queue:

2015-10-27 09:12:01,597 DEBUG [com.jlupin.impl.entrypoint.multiprocess.binary.asynchronous.bus.JLupinAsynchronousBus] (pool-29-thread-1)  MESSAGE ID:1 LINE:52 LOG:added task:JLupinAsynchronousTask{id='35a65c8d364a6a35e6c5bc972e35fb244d6b1f92726fe254908e9ea6d8d10756', inputTimeInMillis=1445933521597, probeCounter=0, applicationName='quickStartApplication', serviceName='quickStartService', busName='simpleBus', sessionId='sessionId', requestId='requestId', methodName='getRandomInt'} into bus:simpleBus
2015-10-27 09:12:01,751 INFO  [com.jlupin.impl.entrypoint.multiprocess.binary.asynchronous.multithreading.JLupinMultiThreadingBinaryAsynchronousEntryPointImpl] (pool-29-thread-1)  MESSAGE ID:42 LINE:114 LOG:successfully added task:JLupinAsynchronousTask{id='35a65c8d364a6a35e6c5bc972e35fb244d6b1f92726fe254908e9ea6d8d10756', inputTimeInMillis=1445933521597, probeCounter=0, applicationName='quickStartApplication', serviceName='quickStartService', busName='simpleBus', sessionId='sessionId', requestId='requestId', methodName='getRandomInt'} into task list

Task execution:

2015-10-27 09:12:03,766 DEBUG [com.jlupin.impl.entrypoint.multiprocess.binary.asynchronous.bus.JLupinAsynchronousBus] (pool-9-thread-19)  MESSAGE ID:33 LINE:114 LOG:executing task:JLupinAsynchronousTask{id='35a65c8d364a6a35e6c5bc972e35fb244d6b1f92726fe254908e9ea6d8d10756', inputTimeInMillis=1445933521597, probeCounter=0, applicationName='quickStartApplication', serviceName='quickStartService', busName='simpleBus', sessionId='sessionId', requestId='requestId', methodName='getRandomInt'}

Task execution confirmation:

2015-10-27 09:12:04,259 DEBUG [com.jlupin.impl.manager.multiprocess.JLupinDefaultNewMultiProcessManagerImpl] (pool-9-thread-19)  MESSAGE ID:2891 LINE:2085 LOG:task:35a65c8d364a6a35e6c5bc972e35fb244d6b1f92726fe254908e9ea6d8d10756 executed correctly

Let’s take a look at the part of a server configuration file (section of Server Configuration File).

Method part public JLupinGlobalServer[] getJLupinGlobalServers():

JLupinDefaultGlobalServerImpl mThreadCycleListServer = new JLupinDefaultGlobalServerImpl("asynchronous_3");
mThreadCycleListServer.setThreadPoolSize(64);
mThreadCycleListServer.setPort(9093);
mThreadCycleListServer.setBacklog(1024);
mThreadCycleListServer.setEntryPointIndex(4);

Server on 9093 port to which entry point of 4 index (setEntryPointIndex)is connected is defined – requests for asynchronous processing will be accepted on 9093 port 4 index corresponds to the below entry point of the method public JLupinMultiProcessEntryPoint[] getJLupinMultiProcessEntryPoints(), which means:

//JLUPIN ASYNCHRONOUS CYCLE MULTITHREADING QUEUE
JLupinCycleListBinaryAsynchronousEntryPointImpl mThreadAsynchCycleEP = new JLupinCycleListBinaryAsynchronousEntryPointImpl();

JLupinCycleListBinaryAsynchronousEntryPointImpl is responsible for asynchronous support which means queue support. It allows to remotely initiate any method on any class – provided that it is available as a remote service. It is initiated by JLupin Asynchronous Java Object Remote Invocation protocol and task id is returned as an output. Initiation processing for a given method is made at the background asynchronously.
JLupinCycleListBinaryAsynchronousEntryPointImpl allows defining many queues as shown below:

//CYCLE BUS DEFINITION
mThreadAsynchCycleEP.addJLupinCycleTaskListBus("simpleBus",new JLupinCycleTaskListBusImpl(2000,16,0,2,1000,1,false));

//CYCLE BUS DEFINITION WITH 5000 MILLISECONDS DELAY
mThreadAsynchCycleEP.addJLupinCycleTaskListBus("simpleCycleBusWithDelay",new JLupinCycleTaskListBusImpl(2000,16,5000,2,1000,1,false));

The above example presents configuration of two queues. First queue named "simpleBus" processes data immediately after receiving them. Second queue named "simpleCycleBusWithDelay" starts data processing 5 seconds after receiving them. New queue is added by initiating addJLupinCycleTaskListBus method on an object JLupinCycleListBinaryAsynchronousEntryPointImpl. First parameter of the above mention method is the name of a queue, the second one is an object of queue definition of JLupinCycleTaskListBusImpl.

Meaning of parameters of class constructor JLupinCycleTaskListBusImpl:

  1. First parameter is time presented in milliseconds of switching between internal queues (number of internal queues is the last parameter).
  2. Second parameter is the number of threads regarding one internal queue – therefore number of threads for the whole queue is the last parameter increased by the second parameter – because every internal queue has its own poll of threads, they do not block or saturate.
  3. Third parameter is a delay in milliseconds, which is a minimal time that must pass for a task to be ordered to a microservice. If time does not pass, task is moved at the end of internal queue. Therefore it is important to adjust number of internal queues to capacity and pool of threads for that internal queue. Of course, one task is taken by one thread from the pool, therefore at least "numberofthreadsinapool" of tasks from a queue is collected and only date is checked or tasks is sent to a particular microservice.
  4. Fourth parameter is the number of trails to send task to application – task returns to a queue only in two cases: connection timeout or if microservice is off.
  5. Fifth parameter is the number of internal queues.

In the above example two queues are defined: simpleBus and simpleCycleBusWithDelay. In the triggering example in this section messages were sent to simpleBus queue – this name was placed in JLupinInputParameter

jLupinInputParameter.setAsynchronousBusName("simpleBus");

The above parameter decides to which queue a message will be sent. You can create new queues under new names and new settings adding them to the server configuration file as below:

mThreadAsynchCycleEP.addJLupinCycleTaskListBus("yourBus",new JLupinCycleTaskListBusImpl(1000,8,5000,2,2000,10,false));

At the end, explanation of classes appearing in the above example.

Class:

JLupinCycleListBinaryAsynchronousEntryPointImpl

Method:

addJLupinCycleTaskListBus(String name, JLupinCycleTaskListBus jLupinCycleTaskListBus)

JLupinCycleTaskListBus - class constructors:

public JLupinCycleTaskListBusImpl(int timeToSwitchQueueInMillis,
                                  int threadPoolSizeForTaskList,
                                  int delay,
                                  int probeAmount,
                                  long waitTimeToStopInMillis,
                                  int taskListAmount)

public JLupinCycleTaskListBusImpl(int timeToSwitchQueueInMillis,
                                  int threadPoolSizeForTaskList,
                                  int delay,
                                  int probeAmount,
                                  long waitTimeToStopInMillis,
                                  int taskListAmount, boolean logLoopOnDebug)