Set Up a Local AWS SQS Service with ElasticMQ and Spring Cloud


Introduction

Amazon Simple Queue Service (SQS) is an Amazon Web Services offering that provides scalable message queuing for loosely coupled, reliable communication among distributed software components and microservices. In this article, we will set up a local ElasticMQ service, which simulates SQS, and use Spring Cloud to push messages to that queue. With a minimal amount of configuration, the same service can be deployed to an EC2 instance and integrated with an actual SQS queue.

We will focus on the standard SQS queue. The other option, a FIFO queue, is limited by default to 300 TPS (transactions per second) without batching, which rules it out for many high-throughput applications.

Characteristics of a Standard SQS Queue


Fully-managed, high throughput messaging queue. Standard queues have nearly-unlimited transactions per second (TPS).

Designed for concurrency with multiple message producers and consumers. Multiple parts of your system can send or receive messages at the same time.
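
To make the concurrency point concrete, here is a minimal in-memory sketch. It uses a java.util.concurrent.BlockingQueue purely as an analogy for a queue shared by several producers and consumers; it does not talk to SQS or ElasticMQ, and the class name, message format and thread counts are assumptions chosen for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrentQueueSketch {

    // Runs several producers and consumers against one shared in-memory queue
    // and returns the total number of messages the consumers received.
    public static int run(int producers, int consumers, int messagesPerProducer) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        int expected = producers * messagesPerProducer;
        CountDownLatch remaining = new CountDownLatch(expected);
        AtomicInteger received = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(producers + consumers);

        // Producers: each one adds its own messages, independently of the others.
        for (int p = 0; p < producers; p++) {
            final int id = p;
            pool.submit(() -> {
                for (int i = 0; i < messagesPerProducer; i++) {
                    queue.add("producer-" + id + "-message-" + i);
                }
            });
        }

        // Consumers: each one polls the shared queue until every message is accounted for.
        for (int c = 0; c < consumers; c++) {
            pool.submit(() -> {
                try {
                    while (remaining.getCount() > 0) {
                        String message = queue.poll(50, TimeUnit.MILLISECONDS);
                        if (message != null) {
                            received.incrementAndGet();
                            remaining.countDown();
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        try {
            if (!remaining.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Timed out waiting for consumers");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } finally {
            pool.shutdownNow();
        }
        return received.get();
    }

    public static void main(String[] args) {
        // 3 producers sending 100 messages each, drained by 2 consumers.
        System.out.println(run(3, 2, 100));
    }
}
```

Neither side needs to know how many peers exist; that decoupling is the property the standard queue provides across processes and machines.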

Add Dependencies to Gradle

First, let's add the dependencies: ElasticMQ's REST SQS server, Spring Cloud AWS Messaging, and the AWS SDK for SQS. Add the following to your build.gradle.

build.gradle


dependencies {
    compile('org.elasticmq:elasticmq-rest-sqs_2.11:0.10.1')
    compile group: 'org.springframework.cloud', name: 'spring-cloud-aws-messaging', version: '1.1.3.RELEASE'
    compile group: 'com.amazonaws', name: 'aws-java-sdk-sqs', version: '1.11.49'
    
    testCompile('org.elasticmq:elasticmq-rest-sqs_2.11:0.10.1')
}    



Set up our Properties

Next let's set up our properties to configure our local ElasticMQ SQS service and Spring Cloud.

sqs-services.yaml

awsConfig:
  accessKey: x
  secretKey: x
  sqsQueueName: sqs-queue-name

queueBuffer:
  maxBatchOpenMs: 200  #Maximum amount of time, in milliseconds, that an outgoing call waits for other calls of the same type to batch with
  maxBatchSize: 10  #The maximum number of messages that will be batched together in a single batch request
  maxInflightOutboundBatches: 5  #The maximum number of active outbound batches that can be processed at the same time

elasticMqLocalSqsUri:
  scheme: http
  host: localhost
  path:
  port: 9324

awsSqsUri:
  scheme: https
  host: sqs.us-east-1.amazonaws.com
  path: /1234567890/
  port: 443

echoSqsMessagesLocal: false
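
As a rough illustration of how the scheme, host, port and path entries above end up as a single endpoint string, here is a small sketch that uses only java.net.URI. The class and method names are hypothetical; the article itself builds the endpoint with Spring's UriComponentsBuilder later on. The second call uses 443, the standard HTTPS port.

```java
import java.net.URI;
import java.net.URISyntaxException;

public class EndpointSketch {

    // Joins scheme, host, port and an optional path into one endpoint string.
    // A null path is treated as empty, matching the blank "path:" entry in the YAML.
    public static String endpoint(String scheme, String host, int port, String path) {
        try {
            String safePath = (path == null) ? "" : path;
            return new URI(scheme, null, host, port, safePath, null, null).toString();
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Invalid endpoint parts", e);
        }
    }

    public static void main(String[] args) {
        // Local ElasticMQ endpoint, using the values from elasticMqLocalSqsUri.
        System.out.println(endpoint("http", "localhost", 9324, null));
        // AWS-style endpoint with an explicit path.
        System.out.println(endpoint("https", "sqs.us-east-1.amazonaws.com", 443, "/1234567890/"));
    }
}
```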

Let's also add some flags to our application properties.

application.properties

#Local SQS Config
aws.local.sqs.localElasticMq.enable=true
aws.local.sqs.localElasticMq.startServer=false



Create POJOs to capture our Configuration Properties

Next, since we are working with a lot of properties, let's make them manageable. We will create POJOs (Plain Old Java Objects) that hold the various properties from our configuration. Here is the first POJO, which captures the queue buffer properties:

QueueBufferMappedProperties.java

public class QueueBufferMappedProperties {

    /*
        The maximum amount of time, in milliseconds, that an outgoing call waits for other calls of the same type to batch with.
        The higher the setting, the fewer batches are required to perform the same amount of work.
        Of course, the higher the setting, the more the first call in a batch has to spend waiting.
        If this parameter is set to zero, submitted requests do not wait for other requests, effectively disabling batching.
        The default value of this setting is 200 milliseconds.
     */
    private Long maxBatchOpenMs;

    /*
        The maximum number of messages that will be batched together in a single batch request.
        The higher the setting, the fewer batches will be required to carry out the same number of requests.
        The default value of this setting is 10 requests per batch, which is also the maximum batch size currently allowed by Amazon SQS.
     */
    private Integer maxBatchSize;

    /*
        The maximum number of active outbound batches that can be processed at the same time.
        The higher the setting, the faster outbound batches can be sent (subject to other limits, such as CPU or bandwidth).
        The higher the setting, the more threads are consumed by the AmazonSQSBufferedAsyncClient. The default value is 5 batches.
     */
    private Integer maxInflightOutboundBatches;

    public Long getMaxBatchOpenMs() {
        return maxBatchOpenMs;
    }

    public Integer getMaxBatchSize() {
        return maxBatchSize;
    }

    public Integer getMaxInflightOutboundBatches() {
        return maxInflightOutboundBatches;
    }

    public void setMaxBatchOpenMs(Long maxBatchOpenMs) {
        this.maxBatchOpenMs = maxBatchOpenMs;
    }

    public void setMaxBatchSize(Integer maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    public void setMaxInflightOutboundBatches(Integer maxInflightOutboundBatches) {
        this.maxInflightOutboundBatches = maxInflightOutboundBatches;
    }
}
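
The trade-offs described in the comments above can be checked with a little arithmetic. The sketch below is a back-of-the-envelope helper, not part of the AWS SDK; the class and method names are made up for illustration. It assumes every batch is filled completely, which is the best case: in practice the number of batches also depends on how quickly messages arrive within maxBatchOpenMs.

```java
public class BatchMathSketch {

    // Fewest batch requests needed to send messageCount messages when each
    // request carries at most maxBatchSize messages (ceiling division).
    public static int batchesNeeded(int messageCount, int maxBatchSize) {
        if (messageCount < 0 || maxBatchSize < 1) {
            throw new IllegalArgumentException("messageCount must be >= 0 and maxBatchSize >= 1");
        }
        return (messageCount + maxBatchSize - 1) / maxBatchSize;
    }

    // Rough upper bound on messages being sent at the same moment:
    // every in-flight outbound batch is assumed to be full.
    public static int maxMessagesInFlight(int maxBatchSize, int maxInflightOutboundBatches) {
        return maxBatchSize * maxInflightOutboundBatches;
    }

    public static void main(String[] args) {
        System.out.println(batchesNeeded(95, 10));      // with maxBatchSize = 10
        System.out.println(batchesNeeded(95, 1));       // batching effectively disabled
        System.out.println(maxMessagesInFlight(10, 5)); // values from sqs-services.yaml
    }
}
```

With the values from sqs-services.yaml, 95 messages need at least 10 requests instead of 95, and at most 50 messages are in flight at once.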

Next, here is a POJO to capture the URI of our local ElasticMQ service:

LocalServiceUri.java

public class LocalServiceUri {

    private String scheme;
    private String host;
    private String path;
    private String port;

    public String getScheme() {
        return scheme;
    }

    public String getHost() {
        return host;
    }

    public String getPath() {
        return path;
    }

    public String getPort() {
        return port;
    }

    public void setScheme(String scheme) {
        this.scheme = scheme;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public void setPath(String path) {
        this.path = path;
    }

    public void setPort(String port) {
        this.port = port;
    }
}

Next, here is a POJO to capture various AWS config properties:

AwsMappedProperties.java

public class AwsMappedProperties {

    private String accessKey;
    private String secretKey;
    private String sqsQueueName;

    public String getAccessKey() {
        return accessKey;
    }

    public String getSecretKey() {
        return secretKey;
    }

    public String getSqsQueueName() {
        return sqsQueueName;
    }

    public void setAccessKey(String accessKey) {
        this.accessKey = accessKey;
    }

    public void setSecretKey(String secretKey) {
        this.secretKey = secretKey;
    }

    public void setSqsQueueName(String sqsQueueName) {
        this.sqsQueueName = sqsQueueName;
    }

}



Enable Configuration Properties

Next up, let's create a Spring configuration class that automatically reads the parameters from our configuration YAML into our POJOs.

SqsConfigMappingProperties.java

@Configuration
@EnableConfigurationProperties
@ConfigurationProperties(locations = "classpath:sqs-services.yaml")
public class SqsConfigMappingProperties {

    private LocalServiceUri elasticMqLocalSqsUri;
    private LocalServiceUri awsSqsUri;
    private AwsMappedProperties awsConfig;
    private QueueBufferMappedProperties queueBuffer;

    private Boolean echoSqsMessagesLocal;

    public LocalServiceUri getElasticMqLocalSqsUri() {
        return elasticMqLocalSqsUri;
    }

    public LocalServiceUri getAwsSqsUri() {
        return awsSqsUri;
    }

    public AwsMappedProperties getAwsConfig() {
        return awsConfig;
    }

    public QueueBufferMappedProperties getQueueBuffer() {
        return queueBuffer;
    }

    public Boolean getEchoSqsMessagesLocal() {
        return echoSqsMessagesLocal;
    }

    public void setElasticMqLocalSqsUri(LocalServiceUri elasticMqLocalSqsUri) {
        this.elasticMqLocalSqsUri = elasticMqLocalSqsUri;
    }

    public void setAwsSqsUri(LocalServiceUri awsSqsUri) {
        this.awsSqsUri = awsSqsUri;
    }

    public void setAwsConfig(AwsMappedProperties awsConfig) {
        this.awsConfig = awsConfig;
    }

    public void setQueueBuffer(QueueBufferMappedProperties queueBuffer) {
        this.queueBuffer = queueBuffer;
    }

    public void setEchoSqsMessagesLocal(Boolean echoSqsMessagesLocal) {
        this.echoSqsMessagesLocal = echoSqsMessagesLocal;
    }
}



Create AWS Spring Config

We will set up our SQS queue such that it will be able to use ElasticMQ locally and AWS SQS when deployed.

AwsConfig.java

@Configuration
@EnableConfigurationProperties(SqsConfigMappingProperties.class)
public class AwsConfig {

    @Value("${aws.local.sqs.localElasticMq.enable}")
    Boolean enableLocalElasticMq;

    @Value("${aws.local.sqs.localElasticMq.startServer}")
    Boolean startLocalElasticMq;

    @Autowired
    SqsConfigMappingProperties sqsConfigMappingProperties;

    @Bean
    public UriComponents elasticMqLocalSqsUri() {

        LocalServiceUri elasticMqLocalSqsUri = sqsConfigMappingProperties.getElasticMqLocalSqsUri();

        return UriComponentsBuilder.newInstance()
                .scheme(elasticMqLocalSqsUri.getScheme())
                .host(elasticMqLocalSqsUri.getHost())
                .port(elasticMqLocalSqsUri.getPort())
                .build()
                .encode();
    }

    @Bean
    public SQSRestServer sqsRestServer(UriComponents elasticMqLocalSqsUri) {
        SQSRestServer sqsRestServer = SQSRestServerBuilder
                .withPort(Integer.valueOf(elasticMqLocalSqsUri.getPort()))
                .withInterface(elasticMqLocalSqsUri.getHost())
                .start();

        return sqsRestServer;
    }

    @Lazy
    @Bean(name = "amazonSqsLocal")
    @DependsOn("sqsRestServer")
    @ConditionalOnExpression("${aws.local.sqs.localElasticMq.enable}")
    public AmazonSQSAsync amazonSqsLocal(AWSCredentials amazonAWSCredentials) {

        AmazonSQSAsyncClient awsSQSAsyncClient = new AmazonSQSAsyncClient(amazonAWSCredentials);
        awsSQSAsyncClient.setEndpoint(createURI(sqsConfigMappingProperties.getElasticMqLocalSqsUri()));
        awsSQSAsyncClient.createQueue(sqsConfigMappingProperties.getAwsConfig().getSqsQueueName());

        QueueBufferMappedProperties queueBufferMappedProperties = sqsConfigMappingProperties.getQueueBuffer();
        QueueBufferConfig config = new QueueBufferConfig()
                .withMaxBatchOpenMs(queueBufferMappedProperties.getMaxBatchOpenMs())
                .withMaxBatchSize(queueBufferMappedProperties.getMaxBatchSize())
                .withMaxInflightOutboundBatches(queueBufferMappedProperties.getMaxInflightOutboundBatches());

        AmazonSQSBufferedAsyncClient amazonSQSBufferedAsyncClient = new AmazonSQSBufferedAsyncClient(awsSQSAsyncClient,config);



        return amazonSQSBufferedAsyncClient;
    }

    @Lazy
    @Bean(name = "amazonSqs")
    @ConditionalOnExpression("!${aws.local.sqs.localElasticMq.enable}")
    public AmazonSQSAsync amazonSqs(AWSCredentials amazonAWSCredentials) {

        AmazonSQSAsyncClient awsSQSAsyncClient = new AmazonSQSAsyncClient(amazonAWSCredentials);
        awsSQSAsyncClient.setEndpoint(createURI(sqsConfigMappingProperties.getAwsSqsUri()));
        awsSQSAsyncClient.createQueue(sqsConfigMappingProperties.getAwsConfig().getSqsQueueName());

        QueueBufferMappedProperties queueBufferMappedProperties = sqsConfigMappingProperties.getQueueBuffer();
        QueueBufferConfig config = new QueueBufferConfig()
                .withMaxBatchOpenMs(queueBufferMappedProperties.getMaxBatchOpenMs())
                .withMaxBatchSize(queueBufferMappedProperties.getMaxBatchSize())
                .withMaxInflightOutboundBatches(queueBufferMappedProperties.getMaxInflightOutboundBatches());

        AmazonSQSBufferedAsyncClient amazonSQSBufferedAsyncClient = new AmazonSQSBufferedAsyncClient(awsSQSAsyncClient,config);

        return amazonSQSBufferedAsyncClient;
    }

    @Bean
    public QueueMessagingTemplate queueMessagingTemplate(AmazonSQSAsync amazonSqsClient, SQSRestServer sqsRestServer) {
        // Only one AmazonSQSAsync bean is registered (amazonSqsLocal or amazonSqs, depending on
        // aws.local.sqs.localElasticMq.enable), so it is injected by type rather than by name.
        QueueMessagingTemplate queueMessagingTemplate = new QueueMessagingTemplate(amazonSqsClient);

        queueMessagingTemplate.setDefaultDestinationName(sqsConfigMappingProperties.getAwsConfig().getSqsQueueName());

        if(!startLocalElasticMq)
            sqsRestServer.stopAndWait();

        return queueMessagingTemplate;
    }

    @Bean
    public AWSCredentials amazonAWSCredentials() {
        if ("local".equals(ApplicationInfo.getEnvironment())) {
            return new BasicAWSCredentials(sqsConfigMappingProperties.getAwsConfig().getAccessKey(),
                    sqsConfigMappingProperties.getAwsConfig().getSecretKey());
        }

        return new DefaultAWSCredentialsProviderChain().getCredentials();
    }

    private static String createURI(LocalServiceUri localServiceUri) {

        return UriComponentsBuilder.newInstance()
                .scheme(localServiceUri.getScheme())
                .host(localServiceUri.getHost())
                .port(localServiceUri.getPort())
                .path(localServiceUri.getPath())
                .build()
                .encode().toUriString();
    }
}



SQS Service Implementation



SqsService.java

public interface SqsService {
    void sendSqsMessage(DomainObject domainObject);
}



SqsServiceImpl.java

@Service
@EnableConfigurationProperties(SqsConfigMappingProperties.class)
public class SqsServiceImpl implements SqsService {

    @Autowired
    SqsConfigMappingProperties sqsConfigMappingProperties;

    @Autowired
    QueueMessagingTemplate queueMessagingTemplate;

    @Override
    public void sendSqsMessage(DomainObject domainObject) {
        queueMessagingTemplate.convertAndSend(sqsConfigMappingProperties.getAwsConfig().getSqsQueueName(), domainObject);
    }
}



Using the SQS Service in a Controller

Here we use the SQS service in a controller that exposes a REST endpoint and puts the request body on the queue as a serialized JSON message.

DomainObjectIngestController.java

@RestController
@RequestMapping("/domainObject")
public class DomainObjectIngestController {

    @Autowired
    SqsService sqsService;

    /**
     * Accepts a domain object as JSON and places it on the SQS queue.
     *
     * @return 204 No Content once the message has been handed to the queue.
     */
    @RequestMapping(value = "", method = RequestMethod.POST, consumes = MediaType.APPLICATION_JSON_VALUE)
    public ResponseEntity<Void> postDomainObject(@RequestBody DomainObject input) {

        sqsService.sendSqsMessage(input);
        return new ResponseEntity<>(HttpStatus.NO_CONTENT);
    }

}
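
If you want to exercise code that depends on SqsService without any queue running, one option is an in-memory test double. The sketch below is self-contained, so it declares its own copies of the interface and a placeholder DomainObject; the nested class names and the id field are assumptions for illustration only, since the article does not show DomainObject.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class InMemorySqsSketch {

    // Hypothetical placeholder for the article's DomainObject.
    public static class DomainObject {
        private final String id;

        public DomainObject(String id) {
            this.id = id;
        }

        public String getId() {
            return id;
        }
    }

    // Same shape as the SqsService interface shown earlier.
    public interface SqsService {
        void sendSqsMessage(DomainObject domainObject);
    }

    // Test double that records messages instead of sending them to a queue.
    public static class RecordingSqsService implements SqsService {
        private final List<DomainObject> sent = new ArrayList<>();

        @Override
        public synchronized void sendSqsMessage(DomainObject domainObject) {
            sent.add(domainObject);
        }

        // Returns a read-only snapshot of everything "sent" so far.
        public synchronized List<DomainObject> getSent() {
            return Collections.unmodifiableList(new ArrayList<>(sent));
        }
    }

    public static void main(String[] args) {
        RecordingSqsService fake = new RecordingSqsService();
        fake.sendSqsMessage(new DomainObject("order-1"));
        System.out.println(fake.getSent().size());
    }
}
```

A controller test could be handed the recording implementation and then assert on getSent(), leaving the ElasticMQ-backed test for the real service implementation.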



Testing your Service



SqsServiceImplTest.java

@Profile("local")
public class SqsServiceImplTest {

    private static final String QUEUE_NAME = "sqs-queue-name";
    private static final int SQS_PORT = 9324;
    private static final String SQS_HOSTNAME = "localhost";

    private static final String EXPECTED_SELLING_PRICE = "100";

    private SQSRestServer sqsRestServer;
    private SqsServiceImpl classUnderTest;
    private QueueMessagingTemplate queueMessagingTemplate;
    private SqsConfigMappingProperties sqsMappingProperties;

    @Before
    public void setUp() {
        //Run local ElasticMQ SQS queue
        try {
            sqsRestServer = SQSRestServerBuilder
                    .withPort(SQS_PORT)
                    .withInterface(SQS_HOSTNAME)
                    .start();
        } catch (BindFailedException e) {
            e.printStackTrace();
        }

        AmazonSQSAsyncClient awsSQSAsyncClient = new AmazonSQSAsyncClient(new BasicAWSCredentials("x", "x"));
        awsSQSAsyncClient.setEndpoint("http://" + SQS_HOSTNAME + ":" + SQS_PORT);
        awsSQSAsyncClient.createQueue(QUEUE_NAME);

        // Assign the field rather than declaring a local variable that shadows it
        sqsMappingProperties = new SqsConfigMappingProperties();
        sqsMappingProperties.setAwsConfig(buildAwsConfig());

        queueMessagingTemplate = new QueueMessagingTemplate(awsSQSAsyncClient);
        queueMessagingTemplate.setDefaultDestinationName(QUEUE_NAME);
        classUnderTest = new SqsServiceImpl();

        ReflectionTestUtils.setField(classUnderTest, "sqsConfigMappingProperties", sqsMappingProperties, SqsConfigMappingProperties.class);
        ReflectionTestUtils.setField(classUnderTest, "queueMessagingTemplate", queueMessagingTemplate, QueueMessagingTemplate.class);
    }

    @After
    public void tearDown() throws Exception {
        if(sqsRestServer != null)
            sqsRestServer.stopAndWait();
    }

    @Test
    public void givenValidPriceChange_whenSendSqsMsg_thenVerifyReceivedMsg() throws Exception {

        // buildPricingChange() is a helper (not shown here) that creates the domain object to send
        classUnderTest.sendSqsMessage(buildPricingChange());
        PricingChange actualResponse = queueMessagingTemplate.receiveAndConvert(QUEUE_NAME,PricingChange.class);
        assertEquals(EXPECTED_SELLING_PRICE, actualResponse.getSellingPrice().getAmount());
    }

    public AwsMappedProperties buildAwsConfig() {

        AwsMappedProperties awsConfig = new AwsMappedProperties();
        awsConfig.setSqsQueueName(QUEUE_NAME);

        return awsConfig;
    }

}
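
Note that setUp() only prints the stack trace when the port is already in use, so the test would then talk to whatever is listening on 9324. One way to reduce that risk is to pick a free port at runtime. The helper below is a sketch using only java.net.ServerSocket; the class and method names are hypothetical, and another process could still grab the port between the check and the server start.

```java
import java.io.IOException;
import java.net.ServerSocket;

public class FreePortSketch {

    // Asks the operating system for a currently unused TCP port.
    public static int findFreePort() {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        } catch (IOException e) {
            throw new IllegalStateException("No free port available", e);
        }
    }

    // Returns true if the given port can be bound at this moment.
    public static boolean isPortFree(int port) {
        try (ServerSocket socket = new ServerSocket(port)) {
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        int port = findFreePort();
        System.out.println("Free port: " + port + ", still free: " + isPortFree(port));
    }
}
```

The returned value could be passed to SQSRestServerBuilder.withPort(...) and used when building the client endpoint, in place of the fixed SQS_PORT constant.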
