Publishers / Subscribers

Let's assume that we want to call our method on all instances of registered microservices. We will use service already known from other parts of documentation. The name of microservice we will be calling is sampleMicroservice with service named exampleService. It's interface and implementation:

public interface ExampleService {
    void exampleMethod(Integer i);
}
@Service("exampleService")
public class ExampleServiceImpl implements ExampleService {
    final Logger logger = LoggerFactory.getLogger(ExampleServiceImpl.class);

    @Override
    void void exampleMethod(Integer i) {
        logger.info("example method called with argument: " + i);
    }
}

How to configure communication interfaces is shown in communication chapter. Here we will show how to use publisher and subscriber regardles the client type (JUnit client or servlet microservice - Spring Boot by default).

Queue configuration

Queue named SAMPLE configuration in file queues.yml in microservice queueMicroservice:

DELEGATOR:
    howOftenCheckingServerInMillisByDelegator: 5000
    repeatsAmountByDelegator: 4
    timeToWaitBetweenRepeatProbeInMillisByDelegator: 1000
QUEUES:
  SAMPLE:
    innerQueuesAmount: 4
    waitTimeBetweenCheckingTaskReadyToStartInMillis: 500
    storageClassName: 'com.jlupin.impl.microservice.partofjlupin.asynchronous.storage.queue.impl.memory.JLupinMemoryQueueStorageImpl'
    threadAmount: 128
    maxSendProbeAmount: 2
    maxAcceptExceptionAmount: 1
    exceptionStringToRepeat: 'java.lang.Exception'
    garbageThreadAmount: 4
    howLongTaskStatusWillBeInATransientStateInMillis: 1000
    howLongTaskResultShouldBeOnQueueWithoutDownloadingItInMillis: 30000
    howLongTaskInputShouldBeOnQueueWithoutResultItInMillis: 60000
    waitTimeBetweenCheckingTaskByGarbageManagerInMillis: 5000
    delaySendProcessTaskToExecuteInMillis: 0

Spring Boot client

We will use standard controller to show usage of publishers and subscribers in action. Let's start with Spring Boot configuration (microservice name is example):

package com.example.configuration;

import com.jlupin.impl.client.delegator.balance.JLupinQueueLoadBalancerDelegatorImpl;
import com.jlupin.impl.client.util.queue.JLupinClientPublisherUtil;
import com.jlupin.impl.client.util.JLupinClientUtil;
import com.jlupin.interfaces.client.delegator.JLupinDelegator;
import com.jlupin.interfaces.common.enums.PortType;
import com.jlupin.interfaces.microservice.partofjlupin.asynchronous.service.queue.JLupinQueueManagerService;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

@Configuration
@ComponentScan({
        "com.example",
        "com.jlupin.servlet.monitor.configuration"
})
public class ExampleSpringConfiguration {
    @Bean
    public JLupinDelegator getQueueJLupinDelegator() {
        final JLupinDelegator jLupinDelegator = JLupinClientUtil.generateInnerMicroserviceLoadBalancerDelegator(PortType.QUEUE);
        ((JLupinQueueLoadBalancerDelegatorImpl) jLupinDelegator).setGetStatusAnalyseAndChooseHighestFromAllEnvironment(true);
        return jLupinDelegator;
    }

    @Bean
    public JLupinQueueManagerService getJLupinQueueManagerService() {
        return JLupinClientUtil.generateRemote(getQueueJLupinDelegator(), "queueMicroservice", "jLupinQueueManagerService", JLupinQueueManagerService.class);
    }

    @Bean(name = "sampleClientPublisherUtil")
    public JLupinClientPublisherUtil getSampleClientPublisherUtil() {
        return new JLupinClientPublisherUtil("SAMPLE", getJLupinQueueManagerService());
    }
}

You can now create controller which can send message to all registered microservices:

package com.example.controller;

import com.jlupin.impl.client.util.publisher.JLupinClientPublisherUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ExampleController {
    @Autowired
    private JLupinClientPublisherUtil sampleClientPublisherUtil;

    @PostMapping("/emit")
    public String emit() throws Throwable {
        final String taskId = sampleClientPublisherUtil.publishTaskToAllSubscribers(
                "sampleMicroservice",
                "exampleService",
                "exampleMethod",
                new Object[]{
                        10
                }
        );

        return taskId;
    }
}

Value of taskId object is just an information for following calls. It will be same inside exampleMethod execution. For now it is not possible to retrieve each method call result. To test example just send POST request to /emit endpoint and see what happened (in all sampleMicroservice instances' log files).

JUnit client

Client implementation (as a JUnit test, that's why external ip addresses are used for JLupinDelegator).

Communication delegator definition

jLupinQueueDelegator = JLupinClientUtil.generateOuterMicroserviceLoadBalancerDelegator(5000, 3, 5000,PortType.QUEUE, 1000, 3000000,
        new JLupinMainServerInZoneConfiguration[]{
                new JLupinMainServerInZoneConfiguration("NODE_1", "127.0.0.1", 9090, 9095, 9096, 9097)
        }
);

((JLupinQueueLoadBalancerDelegatorImpl) jLupinQueueDelegator).setGetStatusAnalyseAndChooseHighestFromAllEnvironment(true);

try {
    jLupinQueueDelegator.start();
} catch (JLupinDelegatorException e) {
    throw new IllegalStateException("an error occurred caused by:", e);
}

Execution - on all instances of microservice

There is an option to execute method on all microservices register to load balancer. This way for example events in system could be emitted. It is very similiar to calling just one microservice. Remember that this way you won't get methods' results! So it is good practice to use only void methods here just to not confuse other developers.

public class IntegrationTest {
    private JLupinDelegator jLupinQueueDelegator;
    private JLupinQueueManagerService jLupinQueueManagerService;
    private JLupinClientPublisherUtil jLupinClientPublisherUtil;

    @Before
    public void before() throws Throwable {
        jLupinQueueDelegator = JLupinClientUtil.generateOuterMicroserviceLoadBalancerDelegator(5000, 3, 5000, PortType.QUEUE, 1000, 3000000,
                new JLupinMainServerInZoneConfiguration[]{
                        new JLupinMainServerInZoneConfiguration("NODE_1", "127.0.0.1", 9090, 9095, 9096, 9097)
                }
        );
        jLupinQueueManagerService = JLupinClientUtil.generateRemote(jLupinQueueDelegator, "queueMicroservice", "jLupinQueueManagerService" , JLupinQueueManagerService.class);
        jLupinClientPublisherUtil = new JLupinClientPublisherUtil("SAMPLE", jLupinQueueManagerService);

        ((JLupinQueueLoadBalancerDelegatorImpl) jLupinQueueDelegator).setGetStatusAnalyseAndChooseHighestFromAllEnvironment(true);

        try {
            jLupinQueueDelegator.start();
        } catch (JLupinDelegatorException e) {
            throw new IllegalStateException("an error occurred caused by:", e);
        }
    }

    @After
    public void after() throws Throwable {
        jLupinQueueDelegator.stop();
    }

    @Test
    public void taskTest_1() throws Throwable {
        final String taskId = jLupinClientPublisherUtil.publishTaskToAllSubscribers(
            "SAMPLE",
            "sampleMicroservice",
            "exampleService",
            "exampleMethod",
            new Object[]{
                new Integer(10)
            }
        );
    }
}

Value of taskId object is just an information for following calls. It will be same inside exampleMethod execution. For now it is not possible to retrieve each method call result.