How to Learn AWS With Localstack and Reactive Kotlin - A Stamps and Coins Implementation

by João Esperancinha, January 24th, 2025 (35m read)

Too Long; Didn't Read

Localstack emulates AWS cloud services on your local machine. Its first release is registered on GitHub on the 11th of December 2016. It lets you explore AWS features such as S3, DynamoDB and the Parameter Store locally, without paying for AWS usage.

1. Introduction

Amazon Web Services has been around since at least 2002, providing commercially available cloud computing. Whether still as a concept or already physically available, the term cloud computing goes back to 1996, when the first document referring specifically to it appeared in a Compaq business plan. If we consider that cloud computing grew out of another important concept in computer science, distributed computing, then we have to go back even further, to the 1960s, to find the early beginnings of these ideas.


It was in 1963 that J. C. R. Licklider distributed a memo outlining challenges found in implementing real-time computing in a network system. Licklider was the head of the Information Processing Techniques Office (IPTO) at the Advanced Research Projects Agency (ARPA). From this point onwards, and now moving forward in time, we see that the Advanced Research Projects Agency Network (ARPANET) starts being developed having its first computers connected in 1969 and finally being established in 1970. The Computer Science Network (CSNET) was then established in 1981 and led to the expansion of ARPANET in universities and science centers. The National Science Foundation Network (NSFNET) was founded in 1986 as an addition to ARPANET.


The ARPANET was decommissioned in 1990, as the network opened up to the public and the World Wide Web (WWW) began to take shape on top of what we now know as the Internet. Having the Internet available to everyone quickly led to business growth and changes in business models. Not surprisingly, people started exploring the possibility of expanding businesses online. Among many others, this gave rise to Amazon, founded in 1994 by Jeff Bezos as an extremely simple online bookstore.


This was two years before the term Cloud Computing was coined in 1996. Eight years after the company was founded, in 2002, engineers at Amazon released a Beta version of the first installment of what was back then called "Amazon.com Web Service."


This service allowed clients to connect and get product information via XML envelopes through SOAP web services. In 2000, Roy Fielding presented a Ph.D. dissertation describing a new architectural style called REpresentational State Transfer (REST). It would, later on, begin replacing the usage of SOAP (Simple Object Access Protocol), although SOAP is still in use to this day, mostly in legacy systems.


From 2002 to the present day there have been further extreme changes in the way we work with web services, and AWS, just like any other Cloud provider, has evolved quite independently while keeping the fundamental concepts in sight. For example, when it comes to storing files in the Cloud, AWS offers S3, which has been available since its launch on the 14th of March 2006. DynamoDB, a NoSQL database, has been available since January 2012. AWS can be very complicated to explore directly in the Cloud. This mostly has to do with the fact that once the Free Tier is over, we must pay for AWS usage, and depending on the plan we register for, expenses can fluctuate and depend very much on your choices as a Systems Manager.


This is where Localstack comes in. Its first release is registered on GitHub as having been on the 11th of December 2016. Fast-forward and we are now enjoying the benefits of Cloud computing together with the benefits of a system that we can run locally with the same syntax, message formats, and configuration as the real Cloud. Currently, AWS provides more than 50 different types of services, and Localstack seems to aim at covering all of them. I wanted to give this introduction first because it is important to realize that, although we still come across many on-premise systems nowadays, the idea of running locally is quite outdated.


The idea of Cloud Computing has existed since 1996, and some services, such as the cloud storage offered by S3, have been around since 2006. This means that systems that need to be concerned with performance, resource usage, cost reduction, high availability, resilience, maintainability, capacity, and reactivity shouldn’t be run on-premise.


At best, we have control of everything we do locally; at worst, we will not be able to afford the expenses of maintaining such a system on-premise. Unless, of course, we want to serve a small private website for our family, friends, and acquaintances. Then you probably don’t need any cloud computing to support it. In this article, we are going to have a look at an example I’ve created on GitHub.

2. Why use Localstack?

In this article, we are going to explore two projects which use 3 features of LocalStack, also found in AWS. These are S3 (Simple Storage Service), DynamoDB and the Parameter store. I chose the usage of these three technologies and only these because I’m writing this article thinking about people who are using AWS already, or are just beginning to use it and are struggling with understanding the "magic" behind it. Especially on the job, it can be challenging to use AWS the way we intend it or even to just explore it, for a multitude of reasons:


  • You do not have permission to create, write or change anything.


  • Your systems manager may change configurations or properties against your expectations.


  • Adding services, resources and seeing just how they work may mean adding AWS usage costs within your company and maybe those costs aren’t covered for you.


  • You want to check if a dry-run of your Performance Tests works and do not want to risk unnecessary spike usages coming from errors in Performance test implementation.


  • You just want to explore and may not want to associate your email to an AWS account just yet or maybe even never.


  • You don’t want to spend time adjusting things in the Cloud directly and want to make sure you know what you are doing before moving into it in order to make the transition process as smooth as possible.


  • You want to make integration tests with AWS components but not use the real ones. Essentially you just want to use some sort of virtual environment where these components or components alike are available to be used.


  • In your project, everything has been figured out for you and your teammates by someone else who has left the team and is now unreachable. They have designed and created libraries for you to use, and it all seems perfect. Except that now you need to fix a problem, but you just know that some properties work, but have no idea how they are used and where they get applied to.

3. Why use DynamoDB

Through the years we have become used to thinking in terms of foreign keys, primary keys, and essentially everything that relates to an Entity-relationship (ER) model. Hibernate and JPA repositories, or even the Reactive CRUD repositories, help us do that. However, when we have high volumes of data without many relations between tables, ER models become much less relevant. They can make everything slow, and many times they are just not suited to the use case. A database with high volumes of data under an ER model often just doesn’t perform well. We see that even if we use a fine-tuned ORM (Object Relational Mapping) in the back end.


Sometimes, we even dare to use native queries in our code because it gives us the feeling of a great improvement in performance. If we have to serve a lot of data, we probably don’t need many relations, if any. This is where DynamoDB comes in. Among other things, DynamoDB is promoted as offering:

  1. Highly scalable
  2. Suited for OLTP (OnLine Transaction Processing)
  3. Fast reads and writes that are easy to implement
  4. Automated High Availability
  5. Reduction in the workload of the operating system
  6. High-level durability
  7. Managing unpredictable high peak loads


It is also important to mention that DynamoDB comes free in Localstack. Although this has contributed to making my article also about DynamoDB, it is also true that we are retrieving data without any ER relations. We are also simulating the management of high volumes of data, using data coming from lots and lots of stamps and coins. Eventually, we get only one table in PostgreSQL and one in DynamoDB to serve all of this data. Since data keeps coming in, or not, at irregular intervals, this also matches what we need from DynamoDB.

4. Objectives

Unlinked Diagram

From the diagram above, we get an idea of the components we are going to check, their related code and configurations, and finally, we’ll check how this works in the network environment. I am assuming at this point that you know the basics of how Coroutines work and how Coroutines reactive repositories work. In the diagram, we see a layout of all the containers used. The one we are focusing on for this article is the "localstack" container. This is the container I have mentioned above, which simulates a real AWS Cloud environment. If you recognize the symbols within the container, you can easily spot the parameter store in green, DynamoDB in blue, and S3 in red.


We have a PostgreSQL database that starts up with a lot of data related to stamps and coins. The final idea of the first part of this project is to load all the data from PostgreSQL and transfer it to DynamoDB with Quartz Jobs. To get this to work, we’ll use S3 as an intermediary. We will read the data from the PostgreSQL database, create a CSV file with all the contents of that data, compress it to a GZip file format, and ship this file to S3. Once this is done, another Job will download the files from S3, process them and when complete, the file will get deleted from S3. Processing, in this case, means decompressing the file and loading the CSV contents to DynamoDB.


Old stamps and coins will be updated and identified by their PK (Primary Key). In order to access PostgreSQL and to be able to log in, we will use the parameter store to save the credentials we need. We can persist several types of data in the parameter store. In this case, we are using it to save credentials and to access them. Finally, we want to understand how the code differs when we want to visualize paginated content in the front end. For that, we will look at two different services implemented with Spring WebFlux and coroutines.


One uses direct access to the PostgreSQL database using Pagination, and the other one uses direct access to DynamoDB using an async client. Both services are implemented in a reactive way.

5. Implementation

The implementation of this project is in other words a look into these libraries:

<dependency>
    <groupId>software.amazon.awssdk</groupId>
    <artifactId>dynamodb</artifactId>
</dependency>
<dependency>
    <groupId>software.amazon.awssdk</groupId>
    <artifactId>s3</artifactId>
</dependency>
<dependency>
    <groupId>software.amazon.awssdk</groupId>
    <artifactId>ssm</artifactId>
</dependency>

These libraries are part of a collection of libraries named AWS SDK for Java V.2. Although we can use automatic configuration to access AWS, we have to use a manual configuration in order to access Localstack. Thanks to V.2, it is possible to manually configure where AWS is located and which credentials we want to use. Localstack uses test as the credential to identify the Cloud environment running in our container.
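As a minimal sketch of what such a manual configuration can look like, the snippet below builds an S3 async client that points at Localstack instead of AWS. It assumes Localstack is listening on http://localhost:4566 in region eu-central-1 (the defaults this project falls back to) and that the AWS SDK for Java V.2 s3 dependency is on the classpath. The function name localstackS3Client is hypothetical and is not the project's own generic config method.

```kotlin
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.s3.S3AsyncClient
import java.net.URI

// Hypothetical helper: builds an S3 client that talks to Localstack instead of AWS.
fun localstackS3Client(
    endpoint: URI = URI.create("http://localhost:4566") // assumed Localstack address
): S3AsyncClient =
    S3AsyncClient.builder()
        // Send every request to the local container rather than to the AWS endpoints
        .endpointOverride(endpoint)
        .region(Region.EU_CENTRAL_1)
        // Localstack accepts the dummy "test" access key and secret key
        .credentialsProvider(
            StaticCredentialsProvider.create(AwsBasicCredentials.create("test", "test"))
        )
        .build()
```

Building the client does not open a connection, so this can be created at startup even before the Localstack container is reachable. The same three builder calls apply to the DynamoDB and SSM async client builders.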


Some of the configuration the code needs is loaded into Localstack by very small containers, which run a few commands against it. These create a bucket and store, in the parameter store, the username and password for our app and the username and password for PostgreSQL. The DynamoDB table will be created by any of the running Spring Boot processes that use the dependency stamps-and-coins-common-cloud. In what follows, we will go through the code, and I will explain each piece in detail. We’ll then see how to run the Demo.


I have made a video about it, which you can have a look at below. Finally, we’ll have a look at some of the aws commands pointing directly to Localstack, and we will interpret the results we get back.

Sequence Diagram

5.1. Parameter Store

Parameter Store Diagram

In order to do anything with AWS, we need to realize that we will very likely make use of the parameter store. This is because the Parameter Store is exactly where we can store the properties we want to use with the services we decide to run. Searching on Google, we can find several ways to manually implement access to the parameter store while still using the @Value injection provided by Spring. The one that I prefer adds an EnvironmentPostProcessor as the last element to process our properties.

internal class ParameterStorePropertySourceEnvironmentPostProcessor : EnvironmentPostProcessor {
    override fun postProcessEnvironment(environment: ConfigurableEnvironment, application: SpringApplication) {
        val host = (getenv("STACO_AWS_LOCALSTACK_IP") ?: "localhost")
            .let { dns ->
                logger.info("Local stack DNS is $dns")
                InetAddress.getAllByName(dns)[0].hostAddress
            }
        val port = (getenv("STACO_AWS_LOCALSTACK_PORT") ?: "4566").apply { logger.info("Port is $this") }
        val protocol = (getenv("STACO_AWS_LOCALSTACK_PROTOCOL") ?: "http").apply { logger.info("Protocol is $this") }
        val accessKey = "test"
        val secretKey = "test"
        environment.propertySources
            .addLast(
                ParameterStorePropertySource(
                    "AWSParameterStorePropertySource",
                    config(
                        StaCoAwsProperties(
                            URI.create("$protocol://$host:$port"),
                            "eu-central-1",
                            accessKey,
                            secretKey
                        ).apply {
                            logger.info("Configured parameter properties: $this")
                        },
                        SsmAsyncClient.builder(),
                        staticCredentialsProvider(accessKey, secretKey)
                    )
                )
            )
    }
}

In this case, I’ve created the code in such a way that we can also, via environment variables, configure the location of our Localstack instance. We can add this to our spring.factories in order to add this EnvironmentPostProcessor. This will make sure that we’ll try to find all of our still unassigned properties in AWS. This is done by the ParameterStorePropertySource. This is the actual implementation of that system. I made config a generic method to create instances in the same way.


Since they share common characteristics for this article, it made sense to do it this way. This way our client instances for SSM, S3, and DynamoDB will be created to point to the same Localstack and we don’t have to worry about extra implementations.

internal class ParameterStorePropertySource(name: String, ssmAsyncClient: SsmAsyncClient) :
    PropertySource<SsmAsyncClient>(name, ssmAsyncClient) {
    override fun getProperty(propertyName: String): Any? {
        logger.debug("Property $propertyName is not yet configured")
        if (propertyName.startsWith("/")) {
            logger.info("Fetching property ${propertyName.toPathPropertyName()}")
            val localstackValue = source.getParameter(
                GetParameterRequest.builder().name(propertyName.toPathPropertyName())
                    .build()
            )?.get()?.parameter()?.value()
            logger.info("Localstack param ${propertyName.toPathPropertyName()} created with value $localstackValue")
            return localstackValue
        }
        return null
    }
}

In our implementation, we are going to consider only properties that start with a forward slash. In AWS itself, the public parameters published by Amazon start with /aws/. In our case, we are making something very custom-made and so our properties will all start with /config/StaCoLsService/.


Now that we understand these changes, we can simply add this common dependency to the modules where we need to read properties from the Parameter Store.

5.2. S3 — Simple Storage Service — Reader

Time Diagram for S3

Both the reader and the loader Jobs are located in the same module and that is module stamps-and-coins-batch. This module is essentially an independent Spring Boot process that ensures that our data get temporarily stored in S3 and then downloaded and shipped to DynamoDB. As I mentioned above, I use a generic config method for all Localstack services, given that all of them just so happen to share the same configuration:

@Bean
fun dynamoDbClient(staCoAwsProperties: StaCoAwsProperties): DynamoDbAsyncClient =
    config(staCoAwsProperties, DynamoDbAsyncClient.builder(), staticCredentialsProvider)

@Bean
fun s3Client(staCoAwsProperties: StaCoAwsProperties): S3AsyncClient =
    config(staCoAwsProperties, S3AsyncClient.builder(), staticCredentialsProvider)

@Bean
fun ssmClient(staCoAwsProperties: StaCoAwsProperties): SsmAsyncClient =
    config(staCoAwsProperties, SsmAsyncClient.builder(), staticCredentialsProvider)

Our Job starts at the execute point. It is launched in the IO coroutine context and then retrieves all the data from the database. At this point, using a coroutine might not be necessary because, unfortunately, in order to write all the contents to a file, we do need to block the process:

override fun execute(context: JobExecutionContext) {
    logger.info { "Creating S3 file has started..." }
    CoroutineScope(IO).launch {
        awsStacoFileService.createCompressAndUploadToS3(staCoRepository.findAll(Sort.unsorted()).toList())
    }
}

With the list loaded in memory, we now create our file. This is where things can become very difficult, depending on how many resources you have available. There are many different solutions to process high volumes of data into different files or partitions, but that is outside the scope of this article. Let’s just assume that our data will not get past the threshold of a high data volume. Bearing that in mind, we convert all our data into a CSV file and print one record per line to that file. Your file will be saved in the temporary folder defined by your system (the java.io.tmpdir system property). This is usually /tmp on Linux or $TMPDIR on macOS.

val fileName = "stacos-${Instant.now().toEpochMilli()}"
val path = Files.createTempFile(fileName, ".csv")
val writer = newBufferedWriter(path)

val csvPrinter = CSVPrinter(
    writer, CSV_HEADER
)
stacos.forEach {
    it.apply {
        csvPrinter.printRecord(
            stacoId,
            description,
            year,
            value,
            currency,
            type,
            diameterMM,
            internalDiameterMM,
            heightMM,
            widthMM
        )
    }
}

At this point, you might think that, given that we write record by record to that file, this could actually have been implemented without blocking. Unfortunately, we still need to flush and close our writer. Another solution could be to implement a subscriber which would be triggered after the job is complete. Doing this, however, would imply that our Job is subscribed to a reactive publisher. All of these techniques could have been used here, but this is still a job running without any sort of competition for resources, and so the paradigms of reactive and high performance do not really apply in this case. We then create a stream. In order to ship a file to S3, all we need is a byte stream in a given format. We are choosing GZIP here, but S3 accepts any format.

val output: Path = Files.createTempFile(fileName, ".gz")
compressGZIP(path, output)
val fileIn: InputStream = Files.newInputStream(output)
val size: Long = Files.size(output)
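The compressGZIP helper used above is not shown in this article. A minimal sketch of how such a helper could be written with the JDK's java.util.zip classes follows, together with its decompressGzip counterpart, which the loader needs later. The function names mirror the calls in the article, but the bodies are assumptions and not necessarily the project's own code.

```kotlin
import java.nio.file.Files
import java.nio.file.Path
import java.util.zip.GZIPInputStream
import java.util.zip.GZIPOutputStream

// Assumed implementation: streams the input file through a GZIP encoder into the output file.
fun compressGZIP(input: Path, output: Path) {
    Files.newInputStream(input).use { source ->
        // Closing the GZIPOutputStream writes the GZIP trailer and closes the underlying file stream
        GZIPOutputStream(Files.newOutputStream(output)).use { target ->
            source.copyTo(target)
        }
    }
}

// Assumed implementation: the reverse operation, restoring the original bytes.
fun decompressGzip(input: Path, output: Path) {
    GZIPInputStream(Files.newInputStream(input)).use { source ->
        Files.newOutputStream(output).use { target ->
            source.copyTo(target)
        }
    }
}
```

Both functions copy in chunks through streams, so the file contents never need to fit in memory at this step.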

Also, we need a bucket. We call our bucket "stacos-bucket". This specific bucket is created upon the start of the demo. I will explain further how this bucket is created, but for now, it is only important for us to know that a different process will create the bucket. We finally pick up that stream and upload our file to the bucket.

s3AsyncClient.putObject(
    PutObjectRequest
        .builder()
        .bucket(STACOS_BUCKET)
        .key(fileName)
        // Set the actual object headers; metadata() would only store
        // these values as user-defined x-amz-meta-* entries
        .contentType("application/x-gzip")
        .contentLength(size)
        .build(),
    AsyncRequestBody.fromBytes(fileIn.readBytes())
).thenApplyAsync {
    logger.info { "File $output is uploaded!" }
}

This concludes the first part of the loading process. We now have our file in S3, and we are ready to download it in another Job, unpack it, and ship the data to Dynamo DB.

5.3. Dynamo DB — Loader

Time Diagram for Dynamo

Loading data into the database is crucial in order to be able to visualize it. To do so, and according to our algorithm, we are simply going to download the needed files. Which files do we need? To keep our case simple, we will need all of them. This is one way to list them via code:

s3AsyncClient.listObjects(
    ListObjectsRequest
        .builder()
        .bucket(STACOS_BUCKET)
        .build()
)


Having the whole list of files, we can then iterate through all of them.

s3AsyncClient.getObject(
    GetObjectRequest.builder().bucket(STACOS_BUCKET).key(targetFileKey).build(),
    AsyncResponseTransformer.toBytes()
)


For each iteration, we get a byte array. It represents only the content of our file. We write it to a named file, read from that file, and write to DynamoDB:

.thenApplyAsync {
    try {
        Files.write(path, it.asByteArray())
        logger.info { "Downloaded file $path" }
        val output = Files.createTempFile(targetFileKey, ".csv")
        runCatching {
            decompressGzip(path, output)
            val reader = Files.newBufferedReader(output)
            val csvParser = CSVParser(reader, CSV_HEADER)
            val records = csvParser.records
            for (csvRecord in records.takeLast(records.size - 1)) {
                try {
                    staCoDynamoDBRepository.save(csvRecord.toEvent)
                        .subscribe { logger.info { "Saved $csvRecord to DynamoDB!" } }
                } catch (ex: IllegalArgumentException) {
                    logger.info { "Record $csvRecord was rejected!. Reason: $ex" }
                }
            }
        }.onFailure { exception ->
            logger.error("Something failed!", exception)
        }
        logger.info { "Download and parsing of file $output complete!" }
        removeResource(targetFileKey)
    } catch (ex: Exception) {
        removeResource(targetFileKey)
        logger.info { "File with Key $targetFileKey is invalid and has been removed!" }
        logger.error { ex }
    }
}

You will notice throughout the code that I’m using several extension functions in order to convert to and from different types of objects with the same data information. In this case, we want to convert the data we have received in the CSV file to a map which is accepted as an argument to the DynamoDB client:

val CSVRecord.toEvent: Map<String, AttributeValue>
    get() = mapOf(
        "id" to AttributeValue.builder().s(get("id") ?: UUID.randomUUID().toString()).build(),
        "description" to AttributeValue.builder().s(get("description")).build(),
        "year" to AttributeValue.builder().s(get("year")).build(),
        "value" to AttributeValue.builder().s(get("value")).build(),
        "currency" to AttributeValue.builder().s(get("currency")).build(),
        "type" to AttributeValue.builder().s(get("type")).build(),
        "diameterMM" to AttributeValue.builder().s(get("diameterMM") ?: "").build(),
        "internalDiameterMM" to AttributeValue.builder().s(get("internalDiameterMM") ?: "").build(),
        "heightMM" to AttributeValue.builder().s(get("heightMM") ?: "").build(),
        "widthMM" to AttributeValue.builder().s(get("widthMM") ?: "").build()
    )


The conversion we make is based on how we define our table. In the code for this demo, the table is created programmatically:

private fun createStaCosTable(): CompletableFuture<CreateTableResponse> {
    val keySchemaElement: KeySchemaElement = KeySchemaElement
        .builder()
        .attributeName(ID)
        .keyType(KeyType.HASH)
        .build()
    val dynId: AttributeDefinition = AttributeDefinition
        .builder()
        .attributeName(ID)
        .attributeType(ScalarAttributeType.S)
        .build()
    return dynamoDbAsyncClient.createTable(
        CreateTableRequest.builder()
            .tableName(STACOS_TABLE)
            .keySchema(keySchemaElement)
            .attributeDefinitions(dynId)
            .billingMode(BillingMode.PAY_PER_REQUEST)
            .build()
    )
}

Creating tables in DynamoDB is far from trivial. With SQL databases and many NoSQL databases, we have gotten used to the idea of creating the tables first and then making changes as we go along. DynamoDB is not really a fit, as mentioned before, for RDBC connections and ER SQL databases. It is at this point that we need to know how we are going to serve our data.


As we’ll see further on in this article, the way in which DynamoDB works deserves an article of its own. DynamoDB allows for partition configuration, queries work very differently, and we need to define our schema keys ahead of our potential queries. Further down the line, if we do all of this correctly, we’ll get an extremely performant database. This, however, is also off-topic for this article.


For now, let’s just keep in mind that we only have one key, ID, for our schema and that this key is of type ScalarAttributeType.S. It is on the basis of this ID that we’ll be able to paginate.

5.4. Towards Reactive Pagination

Reactive Diagram

In the project, I have created a seemingly unrelated module to this article called: stamps-and-coins-blocking-service. In this module, we are accessing the database via traditional JPA repositories:

fun findStaCosByDescriptionLikeOrYearLikeOrValueLikeOrCurrencyEqualsOrDiameterMMLikeOrInternalDiameterMMLikeOrHeightMMLikeOrWidthMMLike(
    description: String,
    year: String,
    value: String,
    currency: CurrencyType,
    diameterMM: String,
    internalDiameterMM: String,
    heightMM: String,
    widthMM: String,
    pageable: Pageable
): Page<StaCo>

As you can see, pagination is achieved very easily by using the Pageable argument and then using Page as the return type. With these objects, we can keep sending paginated requests and keep exchanging the value of the current page, the size of the page and what we want to filter. We are also filtering on all columns of the table. When trying to translate this to reactive repositories, still against the same PostgreSQL database, we may start questioning the return type.


Reactive programming, whether implemented with coroutines or with WebFlux, is quite different when working with multiple returned rows. We don’t return rows anymore. Instead, we return a Flux or a Flow, which gets processed later on, leaving the service available for more requests and thus more reactive. We can, however, still use Pageable to return the results related to one page. We still lose a precious result, which is the total number of rows found:

interface StaCoSearchRepository : ReactiveSortingRepository<StaCo, Long>, ReactiveCrudRepository<StaCo, Long> {
    fun findStaCoBy(
        pageable: Pageable
    ): Flux<StaCo>

    fun findStaCosByDescriptionLike(
        description: String,
        pageable: Pageable
    ): Flux<StaCo>

    fun countStaCosByDescriptionLike(
        description: String
    ): Mono<Long>
}

So, in this case, the only way I found to let the application know how many pages there are is by performing another count request. This request is just a count based on our initial request. As you can see above, both requests return publishers, which means that both of them work in a reactive way. This means that I am essentially performing yet another query just to check how many elements there are in the database according to my search criteria. There are some improvements, however, in performing pagination in this way. The returned count is a Mono and the filtered results are returned as a Flux.


These alternatives to the coroutine-based implementation also make the application incredibly reactive. However, operations like pagination don’t seem to come out of the box with reactive repositories. We can still use Pageable, but for a website we must indeed perform 2 separate requests. They are not blocking, though, and so it is reasonable to expect that our application becomes more performant. My point is that going Reactive also means going less trivial. The more we ramp up the technology to get an application to work as reactively as possible, the more we seem to ramp up complexity.
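To illustrate why the separate count matters, here is a small sketch of the arithmetic a front end needs once it has both results: the total number of pages is the row count divided by the page size, rounded up. The PageSummary class and its property names are hypothetical and not part of the project, and the page number is assumed to start at 1.

```kotlin
// Hypothetical holder for what the count query and the page request give us together.
data class PageSummary(val pageNumber: Int, val pageSize: Int, val totalRows: Long) {
    init {
        require(pageSize > 0) { "pageSize must be positive" }
        require(pageNumber >= 1) { "pageNumber is 1-based in this sketch" }
    }

    // Ceiling division: 95 rows with a page size of 10 need 10 pages
    val totalPages: Long
        get() = (totalRows + pageSize - 1) / pageSize

    // There is a next page as long as the current page is not the last one
    val hasNext: Boolean
        get() = pageNumber < totalPages
}
```

With the count coming from the Mono and the rows from the Flux, an object like this is all that is missing compared to the blocking Page result.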


Before I continue, I just want to say that this added complexity is a good thing. Surely we don’t need to complicate things, but what we understand as complexity does vary a lot from person to person. I personally prefer to interpret this "complexity" as just something new that we are not used to. Things also become less trivial when moving from a reactive repository against PostgreSQL to a repository against DynamoDB. The save operation (Create and Update), for example, is implemented in this way:

fun save(staCoEvent: Map<String, AttributeValue>): Mono<PutItemResponse> =
    PutItemRequest.builder()
        .tableName(STACOS_TABLE)
        .item(staCoEvent)
        .build().let {
            // Issue the request once, lazily, when the Mono is subscribed to
            Mono.fromFuture { dynamoDbAsyncClient.putItem(it) }
        }

As we have seen before, the payload of a save request is a Map with String keys and AttributeValue values. This, in turn, gets executed by the client using putItem and the PutItemRequest. Let’s reflect a bit on this. Every single request to Localstack, and thus also to AWS, seems to depend on the same builder pattern.


The Async implementations use Futures, which we can very easily adapt to the WebFlux framework in order to make reactive requests with them. This is the reason why we see a Mono as a wrapper around the Future. Also, we should see at this point that the dynamoDbAsyncClient works pretty much the same way, from a programmatic perspective, as the s3AsyncClient we have seen before.


In fact, we have used methods like this in the reader and the loader already. This was just a short intro to how the save method works. We still want to compare the pagination we did against PostgreSQL with how we would do it in DynamoDB. Since we are essentially just doing a crash course on this and not going into details on how to search, scan, and query data, this is how I could do pagination at this point.

fun findByPageNumberAndPageSize(pageSize: Int): Flux<MutableMap<String, AttributeValue>> {
    return Mono.fromFuture(
        dynamoDbAsyncClient.scan(
            ScanRequest
                .builder()
                .limit(pageSize)
                .tableName(STACOS_TABLE)
                .exclusiveStartKey(null)
                .build()
        )
    ).map {
        it.items()
    }.flatMapIterable { it }
}

We can see that we are not really making a paginated request in the traditional sense. What we are doing here is using something akin to LIMIT in SQL queries. We are also not using a Query. Queries and Scans are different things in DynamoDB. The semantics here are very important: when we talk about Queries in DynamoDB, we are talking about accessing and searching data within a partition, and when we talk about Scans, we are talking about accessing and searching data in the whole table. This means that on a programmatic level we could get the same results, but Scans are generally much more expensive to perform than Queries.
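The cost difference can be sketched with a toy model. This is not the AWS SDK: PartitionedTable and its itemsRead counter are hypothetical names, and the model only assumes that a Query touches one partition while a Scan reads everything and filters afterwards.

```kotlin
// Hypothetical in-memory model of a table grouped by partition key
class PartitionedTable(private val partitions: Map<String, List<String>>) {
    var itemsRead = 0
        private set

    // A query only touches the partition addressed by its key
    fun query(partitionKey: String): List<String> {
        val items = partitions[partitionKey].orEmpty()
        itemsRead += items.size
        return items
    }

    // A scan reads every item of every partition and filters afterwards
    fun scan(predicate: (String) -> Boolean): List<String> {
        val all = partitions.values.flatten()
        itemsRead += all.size
        return all.filter(predicate)
    }
}

fun main() {
    val data = mapOf(
        "coins" to listOf("coin-1", "coin-2", "coin-3"),
        "stamps" to listOf("stamp-1", "stamp-2")
    )
    val queried = PartitionedTable(data)
    println(queried.query("coins")) // [coin-1, coin-2, coin-3]
    println(queried.itemsRead)      // 3

    val scanned = PartitionedTable(data)
    println(scanned.scan { it.startsWith("coin") }) // [coin-1, coin-2, coin-3]
    println(scanned.itemsRead)                      // 5
}
```

Both calls return the same items, but the scan had to read the whole table to get there.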


We’ve already discussed above that more workload on the cloud servers means higher billing costs. So, although I recommend avoiding Scans, we are still going to use them in this example to understand how limited we get by creating a table with all the default and minimal configurations and using Scans only. Since I decided not to use Queries in this article, for the reasons already mentioned, I came up with a solution. What if we simply make a first scan up to the record just before where we want to start our page? Then we can limit the results from there onwards with the limit operation as before. There we have it. Very inefficient but working pagination in DynamoDB:

fun findByPageNumberAndPageSize(
    pageSize: Int,
    pageNumber: Int
): Flux<MutableMap<String, AttributeValue>> = Mono.fromFuture(
    dynamoDbAsyncClient.scan(
        ScanRequest
            .builder()
            .limit(pageSize * (pageNumber - 1))
            .tableName(STACOS_TABLE)
            .build()
    )
).flatMap {
    Mono.fromFuture(
        dynamoDbAsyncClient.scan(
            ScanRequest
                .builder()
                .limit(pageSize)
                .tableName(STACOS_TABLE)
                .exclusiveStartKey(it.lastEvaluatedKey())
                .build()
        )
    )
}.map {
    it.items()
}.flatMapIterable { it }

However inefficient, we get to a concept I want to discuss here. Remember when we defined our Schema Key above? Well, this Key, aka ID, is what we use to define the exclusiveStartKey. What is the exclusive start key? It marks where a scan resumes: the scan starts right after the item with this key. In this case, we start right after the last element evaluated by the initial scan, which we get from lastEvaluatedKey. By chaining these calls together and using ID as a key, we are essentially issuing two scans every time the page number is above 1. We shouldn’t need two scans for the first page, but using scans only, we don’t really have many other options. You will see in the Demo that this works. To make this good, we would have to dive into the world of Queries in DynamoDB, which is a subject very much off-topic for this article. Coming from JPA, crossing through reactive CRUD repositories, and finally landing in DynamoDB, it is also quite remarkable how beautifully technology has evolved. Our DynamoDB solution, though not the perfect one in terms of fetching data, is still reactive. We are still using Futures; we are still using Flux. We are now using two scans per page, but no filter yet.
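The two-scan idea can be tried out without any AWS dependency. The following is a minimal sketch under stated assumptions: FakeTable, ScanResult, and findPage are hypothetical names, and the fake scan only mimics the limit, exclusive start key, and last evaluated key semantics. A real scan can also stop early, for example when it hits the 1 MB response limit, so this is an illustration and not a faithful emulation.

```kotlin
// Hypothetical in-memory stand-in for a DynamoDB table scan; not the AWS SDK
data class ScanResult(val items: List<String>, val lastEvaluatedKey: String?)

class FakeTable(private val ids: List<String>) {
    var scans = 0
        private set

    // Starts right after exclusiveStartKey and returns at most `limit` items
    fun scan(limit: Int, exclusiveStartKey: String? = null): ScanResult {
        // Assumption: like the real service, a limit below 1 is rejected
        require(limit >= 1) { "limit must be at least 1" }
        scans++
        val start = if (exclusiveStartKey == null) 0 else ids.indexOf(exclusiveStartKey) + 1
        val items = ids.drop(start).take(limit)
        // A last evaluated key is only reported when more items may follow
        val last = if (start + items.size < ids.size) items.lastOrNull() else null
        return ScanResult(items, last)
    }
}

// Two-scan pagination: skip the previous pages, then read the requested one
fun findPage(table: FakeTable, pageSize: Int, pageNumber: Int): List<String> {
    // The first page needs no skipping scan
    if (pageNumber <= 1) return table.scan(pageSize).items
    val skipped = table.scan(pageSize * (pageNumber - 1))
    // No last evaluated key means the table ended before the requested page
    val startKey = skipped.lastEvaluatedKey ?: return emptyList()
    return table.scan(pageSize, startKey).items
}

fun main() {
    val table = FakeTable((1..10).map { "id-$it" })
    println(findPage(table, pageSize = 3, pageNumber = 1)) // [id-1, id-2, id-3]
    println(findPage(table, pageSize = 3, pageNumber = 3)) // [id-7, id-8, id-9]
    println(table.scans) // 3
}
```

The sketch treats the first page as a special case with a single scan, because a skipping scan with a limit of 0 makes no sense. Every later page still pays for reading all the items that come before it.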

6. Starting the Demo

I have built an application, a small GUI, just to test both of the front-end facing back-end Spring Boot services. I have created a Makefile that contains many commands important to perform tests, builds, cleanups, and startup containers. I use this to keep a reference to important commands and also to make things a bit easier. Having said this, let’s just start everything at once with the following command:

make docker-clean-build-start

Assuming that everything went well, you should see this screen on http://localhost:8080:

Login Page

If you log in with admin/admin, and remember that this is configured in the parameter store, you’ll get into the Reactive Protected application. This application accesses Postgres directly via R2DBC in a reactive way.

Reactive pagination

Once you are in, you’ll see this screen.

Reactive pagination

This is where you can test our filter. This is the filter we have spoken about when we discussed pagination:

Reactive filtered pagination

And if you go back to the main screen using the logout button and just click on Go To DynamoDB, you’ll see the following screen:

DynamoDB pagination

As we have discussed before, there is no filter. There is, however, one possible implementation of pagination. This is done with the algorithm we have discussed before. If you’d like to follow a DEMO and check how I’ve started the containers, I have made a video with a walkthrough of the application created for this article on YouTube:

7. Issuing AWS Commands

If you are interested in trying the command line for AWS against Localstack, you first need to set up environment variables: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and AWS_DEFAULT_REGION. I have made a script about this and placed it in the bash folder at the root of the project. If you run this script with:

. ./bash/docker-setup.sh

You’ll have all the variables you need configured for you. For now, we can practice the following list of commands with Localstack running. Remember that Localstack is running on localhost. Since we don’t need all the services running to test Localstack, please just run this from the root:

make docker-localstack


Or, if you prefer to start it up manually, you can do this:

docker-compose rm -svf
docker-compose rm localstack
docker-compose up -d --build --remove-orphans localstack


After having Localstack running, we can try and see the results we get:

alias aws="aws --endpoint-url $LOCAL_STACK"
aws s3api list-buckets
aws s3api create-bucket --bucket staco
aws s3api put-object --bucket stacos --key warehouse1 --body docker-psql/init-scripts/stamps_coins.json
aws s3api list-objects --bucket staco
aws s3api list-objects --bucket images
aws s3api get-object --bucket stacos --key warehouse1 test.json
aws s3api delete-object --bucket images --key staco-image-60b32714-08d3-4583-a598-969119849745.png
aws s3api delete-object --bucket images --key staco-image-7852abd6-6be5-49e5-a910-f8a78d95e6c0.png
aws s3api delete-object --bucket images --key staco-image-e80bdc01-f837-4fb4-b356-47164f95640e.png
aws s3api delete-object --bucket images --key warehouse1
aws s3api delete-bucket --bucket staco
aws rds create-db-instance --db-instance-identifier staco-app --db-instance-class c1 --engine postgres
aws ecr create-repository --repository-name staco-app
kubectl --namespace localstack logs --selector app.kubernetes.io/name=localstack --tail 100
aws eks create-cluster --name staco-cluster --role-arn staco-role --resources-vpc-config '{}'
aws eks list-clusters
aws configure
aws eks describe-cluster --name staco-cluster
aws dynamodb list-tables
aws dynamodb scan --table-name stacos
aws ssm put-parameter --name love --value "What is love"
aws ssm get-parameter --name love
aws ssm describe-parameters
aws ssm put-parameter --name /dev/postgres/username --value "postgres"
aws ssm put-parameter --name /dev/postgres/password --value "password"
aws s3api get-object --bucket images --key staco-image-e4b80aa3-5b49-49b4-829a-463501279615.png test.png

8. Sending Images Via Web REST Services

One other thing we can discuss is how to use S3 behind services. One way is to store the images of our stamps and coins via REST calls, using byte streams. This way we can create standard abstractions that in turn use the abstractions provided by AWS. We have this RestController in the stamps-and-coins-ls-service module:

@RestController
@RequestMapping("images")
internal class StaCoImageController(
    val s3AsyncClient: S3AsyncClient
) {
    @PostMapping("/save/{id}")
    fun saveUser(
        @RequestPart(value = "image", required = false) filePartMono: Mono<FilePart>,
        @PathVariable("id") uuid: UUID
    ): Mono<Void> {
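        // Join all chunks first so the whole image is uploaded as a single object
        // (DataBufferUtils comes from org.springframework.core.io.buffer)
        return filePartMono.flatMap {
            DataBufferUtils.join(it.content())
        }.flatMap {
            // Copy the bytes out and release the buffer to avoid leaking it
            val bytes = ByteArray(it.readableByteCount())
            it.read(bytes)
            DataBufferUtils.release(it)
            val putObjectRequest =
                PutObjectRequest.builder().bucket(IMAGES_BUCKET).key("staco-image-$uuid.png").build()
            // Wrap the future in a Mono so the response waits for the upload to finish
            Mono.fromFuture(
                s3AsyncClient.putObject(putObjectRequest, AsyncRequestBody.fromBytes(bytes))
            )
        }.then()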
    }
}

In this case, I’m implementing this in a reactive way using the putObject request from the s3AsyncClient we’ve talked about. In the stamps-and-coins-demo, we have some example images we can send:

curl -v -F "image=@stamp-sample.png" http://localhost:8082/api/staco/ls/images/save/$(uuidgen)


And then we get this response:

* Trying ::1:8082...
* Connected to localhost (::1) port 8082 (#0)
> POST /api/staco/ls/images/save/88D0A1B2-561D-46CB-BB7A-A8381437E8E2 HTTP/1.1
> Host: localhost:8082
> User-Agent: curl/7.71.1
> Accept: */*
> Content-Length: 31250
> Content-Type: multipart/form-data; boundary=------------------------0ee837c95bbe60e3
>
* We are completely uploaded and fine
* Mark bundle as not supporting multiuse
< HTTP/1.1 200
< Content-Length: 0
< Date: Wed, 01 Dec 2021 19:03:43 GMT
<
* Connection #0 to host localhost left intact


Reading this, it seems like our image has gone to S3. Let’s now list all buckets with aws s3api list-buckets:

{
    "Buckets": [
        {
            "Name": "images",
            "CreationDate": "2021-12-01T18:39:39.000Z"
        },
        {
            "Name": "stacos",
            "CreationDate": "2021-12-01T18:39:39.000Z"
        }
    ],
    "Owner": {
        "DisplayName": "webfile",
        "ID": "bcaf1ffd86f41161ca5fb16fd081034f"
    }
}

We can see that we have an images bucket. Let’s now list the objects in it:

aws s3api list-objects --bucket images


And the contents of this bucket are:

{
    "Contents": [
        {
            "Key": "staco-image-88d0a1b2-561d-46cb-bb7a-a8381437e8e2.png",
            "LastModified": "2021-12-01T19:03:43.000Z",
            "ETag": "\"ed8d3bffef907bd61ed0c29c7696deea\"",
            "Size": 31056,
            "StorageClass": "STANDARD",
            "Owner": {
                "DisplayName": "webfile",
                "ID": "75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a"
            }
        }
    ]
}
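Notice that the key in this listing is lowercase, although the UUID that uuidgen put into the curl request was uppercase. The controller receives the path variable as a UUID, and the string form of a UUID is always lowercase. A minimal sketch of that conversion, where imageKey is a hypothetical helper that only mirrors the string template used in the controller:

```kotlin
import java.util.UUID

// Hypothetical helper mirroring the key template used in the controller
fun imageKey(uuid: UUID): String = "staco-image-$uuid.png"

fun main() {
    // Parsing accepts uppercase hex digits; toString() always prints lowercase
    val uuid = UUID.fromString("88D0A1B2-561D-46CB-BB7A-A8381437E8E2")
    println(imageKey(uuid)) // staco-image-88d0a1b2-561d-46cb-bb7a-a8381437e8e2.png
}
```

This matters when we look objects up afterwards, because S3 keys are case-sensitive.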

The name of our key is staco-image-88d0a1b2-561d-46cb-bb7a-a8381437e8e2.png. We created this key. Let’s download the image we just uploaded:

aws s3api get-object --bucket images --key staco-image-88d0a1b2-561d-46cb-bb7a-a8381437e8e2.png download.png

The response is:

{
    "AcceptRanges": "bytes",
    "LastModified": "Wed, 01 Dec 2021 19:03:43 GMT",
    "ContentLength": 31056,
    "ETag": "\"ed8d3bffef907bd61ed0c29c7696deea\"",
    "ContentLanguage": "en-US",
    "ContentType": "application/octet-stream",
    "Metadata": {}
}

And if we open the resulting file we get:


9. Conclusion

In this article, we have seen some of the very simple ways we can use Localstack to simulate an AWS cloud environment. We only scratched the surface of S3, DynamoDB, and the Parameter Store. Even scratching the surface can be a complicated task for a typical engineer who is very skilled in programming languages, architecture, and design, and sees the Cloud for the first time. The way I think of this is that the Cloud is here, and it won’t go away. AWS is here and probably won’t ever go away.


Working and learning in the cloud can be a very interesting experience but also a very expensive one. It can also be limiting, mostly because of costs and permissions. Permissions are always a problem when working in a DevSecOps environment. If your team has members with very defined roles, odds are you are going to find yourself asking the System Administrator to give you rights to perform some tasks in your cloud environment. In many cases, you just get temporary permissions, and frequently permissions aren’t given fast enough to keep pace with whatever learning path you are on. As we have seen, this is where Localstack plays a fantastic role.


We do not have the very complete GUI that AWS offers online, but nearly all the things that we can do against remote AWS we can also do in Localstack. This gives us leverage in understanding AWS, making mistakes, correcting them, and developing and testing new ideas and concepts. Localstack is great for a POC (Proof of Concept) or an MVP (Minimum Viable Product). It is also a fantastic tool for writing integration tests. There are quite a few integration tests developed in this project. They all use Testcontainers as a supporting framework.


Thank You!

I hope you enjoyed this article as much as I did making it! Please leave a review, comments, or any feedback you want to give on any of the socials in the links below. I’m very grateful if you want to help me make this article better. I have placed all the source code of this application on GitHub. Thank you for reading!