Setup Local AWS SQS Service with Elastic MQ and Spring Cloud
Introduction
Amazon Simple Queue Service (SQS) is a service on Amazon Web Services that provides a scalable message queuing service for loosely-coupled and reliable communication among distributed software components and microservices. In this article, we will setup a local Elastic MQ service, which will simulate a SQS service and use Spring Cloud to push messages to that queue. With a minimal amount of configuration, this service can be deployed to a EC2 instance and integrate with an actual SQS service.We will be focusing on the standard SQS queue. The other option, a FIFO Queue is limited to 300TPS(300 transactions per second) and is not a valid use-case for many high-throughput applications.
Characteristics of a Standard SQS Queue
Fully-managed, high throughput messaging queue. Standard queues have nearly-unlimited transactions per second (TPS).
Designed for concurrency with multiple message producers and consumers. Multiple parts of your system can send or receive messages at the same time.
Add Dependencies to Gradle
First let's add the dependencies. We will be using the standard dependency here. Add the following dependencies to your build.gradle.build.gradle
dependencies {
compile('org.elasticmq:elasticmq-rest-sqs_2.11:0.10.1')
compile group: 'org.springframework.cloud', name: 'spring-cloud-aws-messaging', version: '1.1.3.RELEASE'
compile group: 'com.amazonaws', name: 'aws-java-sdk-sqs', version: '1.11.49'
testCompile('org.elasticmq:elasticmq-rest-sqs_2.11:0.10.1')
}
Set up our Properties
Next let's set up our properties to configure our local ElasticMQ SQS service and Spring Cloud.sqs-services.yaml
awsConfig:
accessKey: x
secretKey: x
sqsQueueName: sqs-queue-name
queueBuffer:
maxBatchOpenMs: 200 #Maximum amount of time, in milliseconds, that an outgoing call waits for other calls of the same type to batch with
maxBatchSize: 10 #The maximum number of messages that will be batched together in a single batch request
maxInflightOutboundBatches: 5 #The maximum number of active receive batches that can be processed at the same time
elasticMqLocalSqsUri:
scheme: http
host: localhost
path:
port: 9324
awsSqsUri:
scheme: https
host: sqs.us-east-1.amazonaws.com
path: /1234567890/
port: 80
echoSqsMessagesLocal: false
Let's also add some flags to our application properties.
sqs-services.yaml
#Local SQS Config
aws.local.sqs.localElasticMq.enable=true
aws.local.sqs.localElasticMq.startServer=false
Create POJOs to capture our Configuration Properties
Next, since we are working with a lot of properties, let's create a way to deal with them in a manageable way. We will create POJOs(Plain old Java Objects) that will contain various properties from our configuration. Here is the first POJO to capture Queue Buffer Mapped Properties:QueueBufferMappedProperties.java
public class QueueBufferMappedProperties {
/*
The maximum amount of time, in milliseconds, that an outgoing call waits for other calls of the same type to batch with.
The higher the setting, the fewer batches are required to perform the same amount of work.
Of course, the higher the setting, the more the first call in a batch has to spend waiting.
If this parameter is set to zero, submitted requests do not wait for other requests, effectively disabling batching.
The default value of this setting is 200 milliseconds.
*/
private Long maxBatchOpenMs;
/*
The maximum number of messages that will be batched together in a single batch request.
The higher the setting, the fewer batches will be required to carry out the same number of requests.
The default value of this setting is 10 requests per batch, which is also the maximum batch size currently allowed by Amazon SQS.
*/
private Integer maxBatchSize;
/*
The maximum number of active outbound batches that can be processed at the same time.
The higher the setting, the faster outbound batches can be sent (subject to other limits, such as CPU or bandwidth).
The higher the setting, the more threads are consumed by the AmazonSQSBufferedAsyncClient. The default value is 5 batches.
*/
private Integer maxInflightOutboundBatches;
public Long getMaxBatchOpenMs() {
return maxBatchOpenMs;
}
public Integer getMaxBatchSize() {
return maxBatchSize;
}
public Integer getMaxInflightOutboundBatches() {
return maxInflightOutboundBatches;
}
public void setMaxBatchOpenMs(Long maxBatchOpenMs) {
this.maxBatchOpenMs = maxBatchOpenMs;
}
public void setMaxBatchSize(Integer maxBatchSize) {
this.maxBatchSize = maxBatchSize;
}
public void setMaxInflightOutboundBatches(Integer maxInflightOutboundBatches) {
this.maxInflightOutboundBatches = maxInflightOutboundBatches;
}
}
Next, here is a POJO to capture the URI of our local Elastic MQ Service:
LocalServiceUri.java
public class LocalServiceUri {
private String scheme;
private String host;
private String path;
private String port;
public String getScheme() {
return scheme;
}
public String getHost() {
return host;
}
public String getPath() {
return path;
}
public String getPort() {
return port;
}
public void setScheme(String scheme) {
this.scheme = scheme;
}
public void setHost(String host) {
this.host = host;
}
public void setPath(String path) {
this.path = path;
}
public void setPort(String port) {
this.port = port;
}
}
Next, here is a POJO to capture various AWS config properties:
AwsMappedProperties.java
public class AwsMappedProperties {
private String accessKey;
private String secretKey;
private String sqsQueueName;
public String getAccessKey() {
return accessKey;
}
public String getSecretKey() {
return secretKey;
}
public String getSqsQueueName() {
return sqsQueueName;
}
public void setAccessKey(String accessKey) {
this.accessKey = accessKey;
}
public void setSecretKey(String secretKey) {
this.secretKey = secretKey;
}
public void setSqsQueueName(String sqsQueueName) {
this.sqsQueueName = sqsQueueName;
}
}
Enable Configuration Properties
Next up, let's configure a Spring Configuration class which will automatically read in parameters from our configuration YAML into our POJO objects.AwsMappedProperties.java
@Configuration
@EnableConfigurationProperties
@ConfigurationProperties(locations = "classpath:sqs-services.yaml")
public class SqsConfigMappingProperties {
private LocalServiceUri elasticMqLocalSqsUri;
private LocalServiceUri awsSqsUri;
private AwsMappedProperties awsConfig;
private QueueBufferMappedProperties queueBuffer;
private Boolean echoSqsMessagesLocal;
public LocalServiceUri getElasticMqLocalSqsUri() {
return elasticMqLocalSqsUri;
}
public LocalServiceUri getAwsSqsUri() {
return awsSqsUri;
}
public AwsMappedProperties getAwsConfig() {
return awsConfig;
}
public QueueBufferMappedProperties getQueueBuffer() {
return queueBuffer;
}
public Boolean getEchoSqsMessagesLocal() {
return echoSqsMessagesLocal;
}
public void setElasticMqLocalSqsUri(LocalServiceUri elasticMqLocalSqsUri) {
this.elasticMqLocalSqsUri = elasticMqLocalSqsUri;
}
public void setAwsSqsUri(LocalServiceUri awsSqsUri) {
this.awsSqsUri = awsSqsUri;
}
public void setAwsConfig(AwsMappedProperties awsConfig) {
this.awsConfig = awsConfig;
}
public void setQueueBuffer(QueueBufferMappedProperties queueBuffer) {
this.queueBuffer = queueBuffer;
}
public void setEchoSqsMessagesLocal(Boolean echoSqsMessagesLocal) {
this.echoSqsMessagesLocal = echoSqsMessagesLocal;
}
}
Create AWS Spring Config
We will set up our SQS queue such that it will be able to use ElasticMQ locally and AWS SQS when deployed.AwsMappedProperties.java
@Configuration
@EnableConfigurationProperties(SqsConfigMappingProperties.class)
public class AwsConfig {
@Value("${aws.local.sqs.localElasticMq.enable}")
Boolean enableLocalElasticMq;
@Value("${aws.local.sqs.localElasticMq.startServer}")
Boolean startLocalElasticMq;
@Autowired
SqsConfigMappingProperties sqsConfigMappingProperties;
@Bean
public UriComponents elasticMqLocalSqsUri() {
LocalServiceUri elasticMqLocalSqsUri = sqsConfigMappingProperties.getElasticMqLocalSqsUri();
return UriComponentsBuilder.newInstance()
.scheme(elasticMqLocalSqsUri.getScheme())
.host(elasticMqLocalSqsUri.getHost())
.port(elasticMqLocalSqsUri.getPort())
.build()
.encode();
}
@Bean
public SQSRestServer sqsRestServer(UriComponents elasticMqLocalSqsUri) {
SQSRestServer sqsRestServer = SQSRestServerBuilder
.withPort(Integer.valueOf(elasticMqLocalSqsUri.getPort()))
.withInterface(elasticMqLocalSqsUri.getHost())
.start();
return sqsRestServer;
}
@Lazy
@Bean(name = "amazonSqsLocal")
@DependsOn("sqsRestServer")
@ConditionalOnExpression("${aws.local.sqs.localElasticMq.enable}")
public AmazonSQSAsync amazonSqsLocal(AWSCredentials amazonAWSCredentials) {
AmazonSQSAsyncClient awsSQSAsyncClient = new AmazonSQSAsyncClient(amazonAWSCredentials);
awsSQSAsyncClient.setEndpoint(createURI(sqsConfigMappingProperties.getElasticMqLocalSqsUri()));
awsSQSAsyncClient.createQueue(sqsConfigMappingProperties.getAwsConfig().getSqsQueueName());
QueueBufferMappedProperties queueBufferMappedProperties = sqsConfigMappingProperties.getQueueBuffer();
QueueBufferConfig config = new QueueBufferConfig()
.withMaxBatchOpenMs(queueBufferMappedProperties.getMaxBatchOpenMs())
.withMaxBatchSize(queueBufferMappedProperties.getMaxBatchSize())
.withMaxInflightOutboundBatches(queueBufferMappedProperties.getMaxInflightOutboundBatches());
AmazonSQSBufferedAsyncClient amazonSQSBufferedAsyncClient = new AmazonSQSBufferedAsyncClient(awsSQSAsyncClient,config);
return amazonSQSBufferedAsyncClient;
}
@Lazy
@Bean(name = "amazonSqs")
@ConditionalOnExpression("!${aws.local.sqs.localElasticMq.enable}")
public AmazonSQSAsync amazonSqs(AWSCredentials amazonAWSCredentials) {
AmazonSQSAsyncClient awsSQSAsyncClient = new AmazonSQSAsyncClient(amazonAWSCredentials);
awsSQSAsyncClient.setEndpoint(createURI(sqsConfigMappingProperties.getAwsSqsUri()));
awsSQSAsyncClient.createQueue(sqsConfigMappingProperties.getAwsConfig().getSqsQueueName());
QueueBufferMappedProperties queueBufferMappedProperties = sqsConfigMappingProperties.getQueueBuffer();
QueueBufferConfig config = new QueueBufferConfig()
.withMaxBatchOpenMs(queueBufferMappedProperties.getMaxBatchOpenMs())
.withMaxBatchSize(queueBufferMappedProperties.getMaxBatchSize())
.withMaxInflightOutboundBatches(queueBufferMappedProperties.getMaxInflightOutboundBatches());
AmazonSQSBufferedAsyncClient amazonSQSBufferedAsyncClient = new AmazonSQSBufferedAsyncClient(awsSQSAsyncClient,config);
return amazonSQSBufferedAsyncClient;
}
@Bean
public QueueMessagingTemplate queueMessagingTemplate(AmazonSQSAsync amazonSqs, AmazonSQSAsync amazonSqsLocal, SQSRestServer sqsRestServer) {
QueueMessagingTemplate queueMessagingTemplate;
if(enableLocalElasticMq)
queueMessagingTemplate = new QueueMessagingTemplate(amazonSqsLocal);
else
queueMessagingTemplate = new QueueMessagingTemplate(amazonSqs);
queueMessagingTemplate.setDefaultDestinationName(sqsConfigMappingProperties.getAwsConfig().getSqsQueueName());
if(!startLocalElasticMq)
sqsRestServer.stopAndWait();
return queueMessagingTemplate;
}
@Bean
public AWSCredentials amazonAWSCredentials() {
if ("local".equals(ApplicationInfo.getEnvironment())) {
return new BasicAWSCredentials(sqsConfigMappingProperties.getAwsConfig().getAccessKey(),
sqsConfigMappingProperties.getAwsConfig().getSecretKey());
}
return new DefaultAWSCredentialsProviderChain().getCredentials();
}
private static String createURI(LocalServiceUri localServiceUri) {
return UriComponentsBuilder.newInstance()
.scheme(localServiceUri.getScheme())
.host(localServiceUri.getHost())
.port(localServiceUri.getPort())
.path(localServiceUri.getPath())
.build()
.encode().toUriString();
}
}
SQS Service Implementation
SqsService.java
public interface SqsService {
void sendSqsMessage(DomainObject domainObject);
}
SqsService.java
@Service
@EnableConfigurationProperties(SqsConfigMappingProperties.class)
public class SqsServiceImpl implements SqsService {
@Autowired
SqsConfigMappingProperties sqsConfigMappingProperties;
@Autowired
QueueMessagingTemplate queueMessagingTemplate;
@Override
public void sendSqsMessage(DomainObject domainObject) {
queueMessagingTemplate.convertAndSend(sqsConfigMappingProperties.getAwsConfig().getSqsQueueName(), pricingChanges);
}
}
Using the SQS Service in a Controller
Using the SQS Service in a controller to make a REST API call to put a serialized JSON message in the queue.DomainObjectIngestController.java
@RestController
@RequestMapping("/domainObject")
public class DomainObjectIngestController {
@Autowired
SqsService sqsService;
/**
*
* @return version information about the application.
*/
@RequestMapping(value = "", method = RequestMethod.POST, consumes = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity postDomainObject(@RequestBody DomainObject input) {
sqsService.sendSqsMessage(input);
return new ResponseEntity<>(HttpStatus.NO_CONTENT);
}
}
Testing your Service
SqsServiceImplTest.java
@Profile("local")
public class SqsServiceImplTest {
private static final String QUEUE_NAME = "sqs-queue-name";
private static final int SQS_PORT = 9324;
private static final String SQS_HOSTNAME = "localhost";
private static final String EXPECTED_SELLING_PRICE = "100";
private SQSRestServer sqsRestServer;
private SqsServiceImpl classUnderTest;
private QueueMessagingTemplate queueMessagingTemplate;
private SqsConfigMappingProperties sqsMappingProperties;
@Before
public void setUp() {
//Run local ElasticMQ SQS queue
try {
sqsRestServer = SQSRestServerBuilder
.withPort(SQS_PORT)
.withInterface(SQS_HOSTNAME)
.start();
} catch (BindFailedException e) {
e.printStackTrace();
}
AmazonSQSAsyncClient awsSQSAsyncClient = new AmazonSQSAsyncClient(new BasicAWSCredentials("x", "x"));
awsSQSAsyncClient.setEndpoint("http://" + SQS_HOSTNAME + ":" + SQS_PORT);
awsSQSAsyncClient.createQueue(QUEUE_NAME);
SqsConfigMappingProperties sqsMappingProperties = new SqsConfigMappingProperties();
sqsMappingProperties.setAwsConfig(buildAwsConfig());
queueMessagingTemplate = new QueueMessagingTemplate(awsSQSAsyncClient);
queueMessagingTemplate.setDefaultDestinationName(QUEUE_NAME);
classUnderTest = new SqsServiceImpl();
ReflectionTestUtils.setField(classUnderTest, "sqsConfigMappingProperties", sqsMappingProperties, SqsConfigMappingProperties.class);
ReflectionTestUtils.setField(classUnderTest, "queueMessagingTemplate", queueMessagingTemplate, QueueMessagingTemplate.class);
}
@After
public void tearDown() throws Exception {
if(sqsRestServer != null)
sqsRestServer.stopAndWait();
}
@Test
public void givenValidPriceChange_whenSendSqsMsg_theVerifyReceivedMsg() throws Exception {
classUnderTest.sendSqsMessage(buildPricingChange());
PricingChange actualResponse = queueMessagingTemplate.receiveAndConvert(QUEUE_NAME,PricingChange.class);
assertEquals(EXPECTED_SELLING_PRICE, actualResponse.getSellingPrice().getAmount());
}
public AwsMappedProperties buildAwsConfig() {
AwsMappedProperties awsConfig = new AwsMappedProperties();
awsConfig.setSqsQueueName(QUEUE_NAME);
return awsConfig;
}
}
Thanks for sharing this information with us and it was a nice blog.
ReplyDeleteAWS Cloud Support in Delhi
THANK U FOR SHARING THIS POST
ReplyDeletecustom software systems
custom web application development services custom web development
I wanted to thank you for this great read. Your blog is one of the finest blogs . Thanks for posting this informative article.app development company in bhopal
ReplyDeleteGood knowlagde on your blog.... Software development
ReplyDeleterogramming apps for android,organic seo services,php web development company,
creative websites,best website design
Awesome information and thanks for sharing this article.
ReplyDeleteDigital Marketing Agency | Website design and development
SEO company in Australia | Digital Marketing Agency | Website Development Company In Melbourne
I am following your blog regularly and got great information.
ReplyDeleteSEO Company Islamabad
Useful post, thanks for sharing.
ReplyDeleteAWS Online Training
amazing post and written in a very simple and impressive language. Thanks for sharing
ReplyDeleteMicroservices Online Training
Microservices Training in Hyderabad
This comment has been removed by the author.
ReplyDeleteYou always provide quality based posts, enjoy reading your work. Full Stack Developer course in Chennai from our website.
ReplyDeleteStunning! Such an astonishing and supportive post this is. I incredibly love it. It's so acceptable thus wonderful. I am simply astounded.data science course
ReplyDeleteTwo full endorsement for this magnificent article of yours. I've genuinely refreshing scrutinizing this article today and I figure this might be uncommon contrasted with other articles that I've examined now. On the off chance that it's not all that much difficulty prop this work up on in a comparable quality.
ReplyDelete360DigiTMG data science course
This is a great post I saw thanks to sharing. I really want to hope that you will continue to share great posts in the future.
ReplyDeleteartificial intelligence course in noida
ReplyDelete"I can set up my original thought from this post. It gives all around data. A commitment of gratefulness is all together for this essential data for all,
"
HRDF training
I see the best substance on your blog and I unbelievably love getting them.
ReplyDeletehrdf training course
ReplyDeleteBy and by I think thrilled I found the web journals.
hrdf claimable training
This is an excellent post I seen thanks to share it. It is really what I wanted to see hope in future you will continue for sharing such a excellent post.
ReplyDeletebusiness analytics course
Very Nice
ReplyDeleteI need to communicate my deference of your composing aptitude and capacity to make perusers read from the earliest starting point as far as possible. I might want to peruse more up to date presents and on share my musings with you.
ReplyDeletewhat is the difference between analysis and analytics
Cognex is the AWS Training in Chennai. Cognex offers so many services according to the students needs. Cognex is the best place to learn microsoft azure, prince2 foundation, ITI V4 foundation,etc,
ReplyDeleteHappy to read this blog. Very informative and best UI. For more details please check our website
ReplyDeletehttps://kgrnaudit.com/fraud-investigation-audit-in-dubai/
"Thank you very much for your information.
ReplyDeleteFrom,
"data science course in gurgaon
Excellence blog! Thanks For Sharing, The information provided by you is really a worthy. I read this blog and I got the more information about
ReplyDeletedata scientist courses aurangabad
Get Best Food Turnkey Projects
ReplyDeleteNice and very informative blog, glad to learn something through you.
ReplyDeleteai course aurangabad
Nice blog with good information.
ReplyDeleteFull Stack Developer Course in Chennai
Full Stack Developer Online Course
Thank you for this beautiful information
ReplyDeleteCoaching Centre In Bangalore
Selenium Training In Bangalore
Best Training Institute In Marathahalli
Selenium Training In Marathahalli
Great Content...Thanks for sharing
ReplyDeleteGerman Language Course in Chennai
German Language Course Online
German Language Course In Bangalore
Great Content....Thanks for sharing
ReplyDeleteIELTS Coaching Centre in Chennai
Best IELTS Online Course
Best IELTS Coaching In Bangalore
Great Content..
ReplyDeletePython Course in Chennai
Learn Python Online
Python Course in Bangalore
ReplyDeleteNice post Thank for sharing
UI UX Designer Course In Chennai
UI UX Online Course
UI UX Course in Bangalore
UI UX Designer Course in Coimbatore
Nice post Thank for sharing
ReplyDeleteOnline Graphic Design Course
Graphic Design Courses in Chennai
Graphic Design Courses in Bangalore
Graphic Design Course in Coimbatore
I am really happy to say it’s an interesting post to read. I learn new information from your article, you are doing a great job. Keep it up
ReplyDeleteSoftware Testing Tools Training in Hyderabad
"I'm grateful for the diversity of perspectives you bring to your blog. Your willingness to explore different angles and present various viewpoints fosters a more inclusive and open-minded community. Thank you for promoting a space for diverse voices!"
ReplyDeleteplanet fitness coupons
"Your blog is a valuable resource for staying informed about the latest developments. The way you break down complex topics and provide practical insights is incredibly helpful. Thank you for keeping us in the loop with your expertise."
ReplyDeleteraising canes coupons
Looking for a website development company in Melbourne? We build clean, easy-to-use websites tailored to your needs. Our team helps you create an outstanding online presence with professional, reliable website design.
ReplyDelete