Highlights

  • Loading a large DynamoDB dataset all at once can break a batch job. Processing records in small chunks is a far safer approach.
  • With Scanamo, Cats Effect, and FS2 in Scala, data can be read page by page and handled as it arrives, keeping memory use stable.
  • This page-by-page approach is not just memory-safe but also more reliable for long-running jobs. It uses backpressure to pace downstream work and ensures resources like the DynamoDB client are cleaned up properly.

It's 11:00 PM on a Tuesday. Your marketing team sent a Slack message at 5 PM:

"Can we send a promo email to all US users on the Globex tenant tonight?"

You said sure — it's just a query, a filter, and a loop, right? You wire it up in an hour, kick off the job, and head to bed feeling productive.

At 3:00 AM, your phone rings.

The service is down. OutOfMemoryError. The Globex tenant had two million users in that DynamoDB table, and your code tried to load every single one before sending a single email.

You fix the immediate crisis, stare at the ceiling, and think: there has to be a better way.

There is. In this post, we'll explore how to stream large DynamoDB datasets safely and efficiently in Scala, using:

  • Cats Effect for effect management
  • FS2 for functional streaming
  • Scanamo for clean DynamoDB access

By the end, you'll know how to process millions of items lazily — without ever holding them all in memory.


The real problem: It's not DynamoDB

DynamoDB did its job. It returned the data you asked for. The problem was how you asked. A read that collects every page before returning drops every matching record into memory before you can touch a single one. With two million users in one tenant, that's not a data problem; that's a strategy problem.

Here's roughly what ran that night:

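// The code that caused the 3 AM call
import cats.effect.IO
import cats.effect.unsafe.implicits.global   // runtime needed by unsafeRunSync()
import cats.syntax.traverse._
import org.scanamo.{DynamoReadError, ScanamoCats, Table}
import org.scanamo.generic.auto._
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient

val client = DynamoDbAsyncClient.builder().region(Region.US_EAST_1).build()
// ❌ No Resource: if anything throws, this client is never closed

val scanamoClient = ScanamoCats[IO](client)
val table = Table[User]("users")

val allUsers: IO[List[Either[DynamoReadError, User]]] = scanamoClient.exec(table.scan())
// ❌ .scan() follows every page and collects ALL 2M records into one List before returning

allUsers.flatMap { results =>            // nothing runs until the full list is loaded
  results.collect { case Right(user) => user }
    .filter(_.country == "US")
    .traverse(user => sendMail(user))
  // ❌ filtering in memory after loading everyone, US or not
}.unsafeRunSync()
// ❌ OutOfMemoryError thrown here

client.close()                           // ❌ never reached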

Two problems, one crash:

  1. Everything in memory. Scanamo's table.scan() follows every page DynamoDB returns (each response is capped at 1 MB) and collects all of them into a single List before returning control. Two million users × their fields = heap exhausted before a single email is sent.
  2. Client never closed. The DynamoDbAsyncClient is constructed raw, outside any resource management. When the OutOfMemoryError is thrown, execution never gets past unsafeRunSync(), so client.close() is never reached, leaving threads and connections dangling until the process is killed.

What you needed was to process users as they arrive. Twenty-five at a time. Never hold more than a page in memory. That's streaming — and in Scala, Cats Effect, FS2, and Scanamo make it clean and composable.

Streaming gives you:

  • Incremental processing: one small batch at a time
  • Backpressure: downstream consumers are never overwhelmed
  • Lazy evaluation: only fetch what you actually use
  • Early termination: .take(100) stops fetching after 100 items

| Dimension | Batch read | Streaming (FS2 + Scanamo) |
|---|---|---|
| Memory usage | Loads all records into the heap | One page at a time |
| Time to first result | High (waits for the full scan) | Low (starts processing immediately) |
| Failure recovery | Lose all progress | Fails fast; pages already processed stay done |
| Scalability | Degrades with table size | Constant memory footprint |
| Read cost | Billed by the size of the data read | Billed the same way; very small pages add some 4 KB rounding overhead |

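On read costs: streaming itself adds nothing to DynamoDB's bill. Reads are charged by the size of the data read, not by how you consume the results. The one caveat is rounding: Query and Scan round the total size of each response up to the next 4 KB, so many very small pages can consume somewhat more read capacity than a few large ones. Memory stays bounded either way, so if cost matters, raise the page size.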
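DynamoDB computes Query and Scan read capacity per request: it sums the sizes of the items in one response and rounds up to the next 4 KB. The small calculator below applies that rule to a table read in fixed-size pages, so you can compare page sizes for your own items. The 200-byte item size is an assumption for illustration, not a measurement from the Globex table, and ReadUnits is a hypothetical helper.

```scala
// Read-unit arithmetic for DynamoDB Query/Scan: the sizes of all items read by one
// request are summed and rounded up to the next 4 KB. Results are 4 KB units, i.e.
// RCUs for strongly consistent reads (eventually consistent reads consume half).
object ReadUnits {
  val BlockBytes = 4096L

  // Units consumed by one request that reads `items` items of `itemBytes` bytes each.
  def unitsPerRequest(items: Long, itemBytes: Long): Long =
    (items * itemBytes + BlockBytes - 1) / BlockBytes // integer ceiling division

  // Units consumed reading `totalItems` items in requests of at most `pageSize` items.
  def totalUnits(totalItems: Long, pageSize: Long, itemBytes: Long): Long = {
    val fullPages = totalItems / pageSize
    val lastPage = totalItems % pageSize
    fullPages * unitsPerRequest(pageSize, itemBytes) +
      (if (lastPage > 0) unitsPerRequest(lastPage, itemBytes) else 0L)
  }
}

@main def runReadUnitsDemo(): Unit = {
  val itemBytes = 200 // assumed average item size
  println(ReadUnits.totalUnits(2000000, 25, itemBytes))   // 160000: each 5,000-byte page rounds up to 2 units
  println(ReadUnits.totalUnits(2000000, 1000, itemBytes)) // 98000: each 200,000-byte page rounds up to 49 units
}
```

With these assumed numbers, 25-item pages consume about 60% more read capacity than 1,000-item pages, while memory stays bounded in both cases. Larger items shrink the gap, because the per-request rounding becomes a smaller share of each response.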

Before we fix the pipeline, let's establish what we're working with. Our users table is simple:

case class User(tenantId: String, id: String, name: String, country: String, email: String)

DynamoDB Table: "users"

  • Partition Key: tenantId (String) — groups all users belonging to a tenant
  • Sort Key: id (String) — unique identifier per user
  • Attributes: name (String), country (String), email (String)
  • Scale: Millions of user records

Nothing exotic: just the kind of table every production app has, except ours holds millions of rows. That's the table that brought the service down at 3 AM, and that's the table we're going to stream through safely.

The goal is the same as Tuesday night: reach every US user on the Globex tenant and send each one an email. The difference this time is that we'll never ask DynamoDB to hand us all two million at once.

Setting the stage

Add the following dependencies to your build.sbt:

libraryDependencies ++= Seq(
  "org.scanamo"   %% "scanamo"             % "1.1.1",
  "org.scanamo"   %% "scanamo-cats-effect" % "1.1.1",
  "co.fs2"        %% "fs2-core"            % "3.10.0",
  "org.typelevel" %% "cats-effect"         % "3.5.4"
)

Scanamo wraps DynamoDB cleanly. Cats Effect manages effects safely. FS2 gives us back-pressured streams. Together, they let you process ten million users without ever holding them all in memory.

A client that cleans up after itself

The job that crashed had a second problem: when it blew up, the DynamoDB client dangled — never closed. Resource fixes that:

import cats.effect.{IO, Resource}
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.regions.Region
import org.scanamo.{ScanamoCats, Table}
import org.scanamo.generic.auto._

// Auto-closes the client on normal exit, error, or cancellation
val clientResource: Resource[IO, DynamoDbAsyncClient] =
  Resource.fromAutoCloseable(
    IO(DynamoDbAsyncClient.builder().region(Region.US_EAST_1).build())
  )

// Wraps the raw client in effect-safe Scanamo
def createScanamoClient(client: DynamoDbAsyncClient): ScanamoCats[IO] =
  ScanamoCats[IO](client)

Resource pairs acquisition with guaranteed release — success, failure, or cancellation. No leaks, no dangling threads, no matter what happens downstream. This pattern is especially useful for long-running jobs that process large datasets.
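You can check the release guarantee without AWS at all. The sketch below swaps the DynamoDB client for a hypothetical FakeClient that only records whether close() was called, then fails inside use with an ordinary exception. Resource.fromAutoCloseable, use, and attempt are real Cats Effect APIs; FakeClient and ResourceDemo are illustrative names.

```scala
import cats.effect.{IO, Resource}
import cats.effect.unsafe.implicits.global

// Hypothetical stand-in for DynamoDbAsyncClient: it only records whether close() was called.
final class FakeClient extends AutoCloseable {
  @volatile var closed: Boolean = false
  override def close(): Unit = closed = true
}

object ResourceDemo {
  // Acquires a FakeClient through Resource, runs `body` with it, and returns the client
  // together with the outcome so the caller can check that release happened.
  def run(body: FakeClient => IO[Unit]): (FakeClient, Either[Throwable, Unit]) = {
    val client = new FakeClient
    val outcome = Resource
      .fromAutoCloseable(IO(client))
      .use(body)
      .attempt // a failure becomes a Left instead of being thrown
      .unsafeRunSync()
    (client, outcome)
  }
}

@main def runResourceDemo(): Unit = {
  val (client, outcome) = ResourceDemo.run(_ => IO.raiseError(new RuntimeException("boom")))
  println(s"failed: ${outcome.isLeft}, closed: ${client.closed}") // failed: true, closed: true
}
```

The same holds when the body succeeds: close() runs once use completes, which is the behaviour the clientResource above relies on.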

Reading from DynamoDB — Lazily this time

Before we dive into the solution code, it's worth knowing that Scanamo offers two paginated read methods, queryPaginatedM and scanPaginatedM, and which one you use depends on whether you know the partition key.

Both return data as lazy pages rather than a single dump, which is exactly what we needed that night: records are retrieved incrementally rather than all at once.

⚠️ Prefer query-pagination over scan-pagination where possible. Scan reads every item in the table and consumes read capacity proportional to its total size, regardless of how many results you need. Use it only when you genuinely need to process all records.

3.1 Streaming a single tenant's users

Back to the Globex campaign. We know the partition key — tenantId — so we use DynamoDB's Query operation to retrieve only Globex's users efficiently. For our email campaign, we want to process one user at a time — stream them out, filter, send, and move on. That's the approach we'll build first. If your use case needs batch operations instead (like bulk DB writes or BatchWriteItem), there's an alternative at the end of this section.

Approach 1: One user (record) at a time (our approach)

Best for per-record work — emails, webhooks, notifications:

import cats.{Monad, MonadError}
import cats.effect.IO
import cats.syntax.either._        // .leftMap; narrow import because cats' full syntax also defines ===
import fs2.Stream
import org.scanamo.{DynamoReadError, ScanamoCats, Table}
import org.scanamo.generic.auto._
import org.scanamo.syntax._

def streamByTenantIndividual(
  scanamoClient: ScanamoCats[IO],
  tenantId: String
): Stream[IO, User] = {
  type SIO[A] = Stream[IO, A]
  given Monad[SIO] = summon[MonadError[SIO, Throwable]]

  val table = Table[User]("users")

  scanamoClient.execT(ScanamoCats.ToStream)(   // one DynamoDB call per page; next call only when downstream pulls
    table.queryPaginatedM[SIO]("tenantId" === tenantId, 25)  // query by partition key, 25 items per page
  ).flatMap { page =>
    Stream.emits(page).covary[IO]              // burst page of 25 into individual stream elements
      .map(_.leftMap(e => new Exception(DynamoReadError.describe(e))))
      .rethrow                                 // fail fast on any decoding error
  }
}

  • type SIO[A] = Stream[IO, A] plus the given Monad[SIO] (summoned from FS2's MonadError instance) tell Scanamo to use FS2 as its pagination monad, so pages flow lazily instead of being collected into a list.

  • execT(ScanamoCats.ToStream) — fetches one page per DynamoDB call, only making the next call when downstream pulls for more.

  • Stream.emits(page) — bursts each page of 25 into individual stream elements.

  • .rethrow — fails fast on any decoding error rather than silently swallowing it.

DynamoDB sends 25. You process them. DynamoDB sends the next 25. Your heap never knows there are two million total.
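The flatten-then-rethrow shape is easy to try against in-memory data. In this sketch the pages are hard-coded lists of Either, with a Left standing in for a Scanamo decoding error; the data and the FlattenDemo name are hypothetical, while Stream.emits, rethrow, take, and attempt are standard FS2 operators.

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream

// Same shape as the User case class in this post.
final case class User(tenantId: String, id: String, name: String, country: String, email: String)

object FlattenDemo {
  def user(id: String): User = User("globex", id, s"user-$id", "US", s"$id@example.com")

  // Simulated pages: a Left stands in for a decoding error (hypothetical data).
  val pages: List[List[Either[String, User]]] = List(
    List(Right(user("1")), Right(user("2"))),
    List(Left("missing attribute: email"), Right(user("4")))
  )

  def toThrowable(e: Either[String, User]): Either[Throwable, User] = e match {
    case Right(u)  => Right(u)
    case Left(msg) => Left(new Exception(msg))
  }

  // Mirrors streamByTenantIndividual: flatten each page, then fail fast on the first Left.
  val users: Stream[IO, User] =
    Stream.emits(pages).covary[IO].flatMap { page =>
      Stream.emits(page).covary[IO].map(toThrowable).rethrow
    }
}

@main def runFlattenDemo(): Unit = {
  // Only page 1 is needed for two users, so the bad record is never reached.
  println(FlattenDemo.users.take(2).compile.toList.unsafeRunSync().map(_.id)) // List(1, 2)
  // Draining everything surfaces the decoding error after the good records.
  println(FlattenDemo.users.attempt.compile.toList.unsafeRunSync())
}
```

Taking two users never pulls the second page, so the bad record is never decoded. Draining everything yields the two good users and then the error, and the stream stops there rather than skipping the record.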

Approach 2: One page at a time (for batch operations)

If your use case involves bulk operations — batch DB writes or BatchWriteItem — you'll want to preserve the page structure instead of flattening it into individual records:

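import cats.{Monad, MonadError}
import cats.effect.IO
import cats.syntax.either._        // .leftMap, .liftTo
import cats.syntax.traverse._      // .traverse
import fs2.Stream
import org.scanamo.{DynamoReadError, ScanamoCats, Table}
import org.scanamo.generic.auto._
import org.scanamo.syntax._

def streamByTenantPages(
  scanamoClient: ScanamoCats[IO],
  tenantId: String
): Stream[IO, List[User]] = {
  type SIO[A] = Stream[IO, A]
  given Monad[SIO] = summon[MonadError[SIO, Throwable]]

  val table = Table[User]("users")

  scanamoClient.execT(ScanamoCats.ToStream)(
    table.queryPaginatedM[SIO]("tenantId" === tenantId, 25)  // query by partition key, 25 items per page
  ).evalMap { page =>
    // keeps page as List[User] (no flattening); fails the stream on the first decoding error
    page.traverse(_.leftMap(e => new Exception(DynamoReadError.describe(e))).liftTo[IO])
  }
}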

Instead of flattening with Stream.emits, .evalMap keeps the List intact. Each stream element is a full page — ready for a bulk operation like BatchWriteItem or a bulk DB insert.

When to use which:

| Use case | Approach |
|---|---|
| Send one email per user | Individual records |
| Bulk insert into another DB | Pages |
| Filter + transform per record | Individual records |
| DynamoDB BatchWriteItem | Pages |
| Early termination (.take(n)) | Individual records |
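One case the table does not cover is filtering per record while still wanting full batches for a bulk write. Filtering shrinks pages unevenly, so a page-shaped stream may carry far fewer than 25 items per write. FS2's chunkN can regroup individual records into batches of at most 25, the most a single BatchWriteItem call accepts. In this sketch the records are plain integers and writeBatch is a hypothetical bulk writer that only records batch sizes.

```scala
import cats.effect.{IO, Ref}
import cats.effect.unsafe.implicits.global
import fs2.Stream

object RebatchDemo {
  val MaxBatch = 25 // BatchWriteItem accepts at most 25 put/delete requests per call

  // Hypothetical bulk writer: records the size of each batch instead of calling DynamoDB.
  def writeBatch(sizes: Ref[IO, Vector[Int]])(batch: List[Int]): IO[Unit] =
    sizes.update(_ :+ batch.size)

  // The integers play the role of a filtered Stream[IO, User].
  def batchSizes(total: Int): IO[List[Int]] =
    for {
      sizes <- Ref[IO].of(Vector.empty[Int])
      _ <- Stream.range(0, total).covary[IO]
             .chunkN(MaxBatch)                                // groups of 25; the last may be smaller
             .evalMap(chunk => writeBatch(sizes)(chunk.toList))
             .compile
             .drain
      result <- sizes.get
    } yield result.toList
}

@main def runRebatchDemo(): Unit =
  println(RebatchDemo.batchSizes(60).unsafeRunSync()) // List(25, 25, 10)
```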

3.2 Scanning the whole table — When you really need everything

Sometimes you genuinely need every record. Suppose marketing had asked for all US users across every tenant instead: country isn't part of the key, so there's no partition key to filter on and a Scan is the only option. Streaming keeps even that operation safe and predictable:

import cats.{Monad, MonadError}
import cats.effect.IO
import cats.syntax.either._
import fs2.Stream
import org.scanamo.{DynamoReadError, ScanamoCats, Table}
import org.scanamo.generic.auto._

def streamAllUsers(scanamoClient: ScanamoCats[IO]): Stream[IO, User] = {
  type SIO[A] = Stream[IO, A]
  given Monad[SIO] = summon[MonadError[SIO, Throwable]]

  val table = Table[User]("users")

  scanamoClient.execT(ScanamoCats.ToStream)(table.scanPaginatedM[SIO](25))  // walks every partition, 25 items at a time
    .flatMap { page =>
      Stream.emits(page).covary[IO]            // burst page into individual elements
        .map(_.leftMap(e => new Exception(DynamoReadError.describe(e))))
        .rethrow                               // fail fast on decoding errors
    }
}

scanPaginatedM walks every partition, 25 items at a time — same lazy pull model as Query, just without a key filter. If you only consume 100 records (say, a staging test), only enough pages to cover those 100 are ever fetched. That's real laziness — your test runs stay cheap even against production-scale tables.
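That pull behaviour is easy to check with a simulated source. Here Stream.unfoldEval plays the role of the paginated Scan and a Ref counts how many page requests actually run; EarlyStopDemo and the in-memory table are hypothetical stand-ins, not Scanamo APIs.

```scala
import cats.effect.{IO, Ref}
import cats.effect.unsafe.implicits.global
import fs2.Stream

object EarlyStopDemo {
  val PageSize = 25
  val TableSize = 2000000

  // Hypothetical paginated source: each step of unfoldEval stands in for one Scan request,
  // and `requests` counts how many were made.
  def records(requests: Ref[IO, Int]): Stream[IO, Int] =
    Stream
      .unfoldEval[IO, Int, List[Int]](0) { offset =>
        if (offset >= TableSize) IO.pure(None)
        else
          requests.update(_ + 1).as {
            val page = (offset until math.min(offset + PageSize, TableSize)).toList
            Some((page, offset + page.size))
          }
      }
      .flatMap(page => Stream.emits(page)) // burst each page into individual records

  // Consumes only the first n records and returns how many page requests that took.
  def requestsFor(n: Int): IO[Int] =
    for {
      requests <- Ref[IO].of(0)
      _ <- records(requests).take(n.toLong).compile.drain
      count <- requests.get
    } yield count
}

@main def runEarlyStopDemo(): Unit =
  println(EarlyStopDemo.requestsFor(100).unsafeRunSync()) // 4 requests out of 80,000 possible pages
```

Consuming 100 records from the simulated 2,000,000-row table triggers 4 page requests of 25; asking for 101 triggers a fifth.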

The full pipeline — The job that doesn't wake you up

Back to Tuesday night. Here's the complete pipeline — the email campaign that should have been written in the first place:

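import cats.effect.{IO, IOApp}
import fs2.Stream

// Assumes clientResource, createScanamoClient and streamByTenantIndividual from earlier,
// plus your own sendMail(user: User): IO[Unit] (not shown).
object GlobexCampaign extends IOApp.Simple {
  val run: IO[Unit] =
    Stream.resource(clientResource).flatMap { client =>
      val scanamoClient = createScanamoClient(client)

      streamByTenantIndividual(scanamoClient, "globex") // placeholder tenant ID; Query by partition key
        .filter(_.country == "US")                     // discard non-US records inline, no extra DynamoDB call
        .evalMap { user =>
          IO.println(s"Sending email to ${user.name}")
            *> sendMail(user)                          // sequential with backpressure: slows if the email service slows
        }
    }.compile.drain                                    // collapses the stream into IO[Unit]; IOApp runs it
}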

This program:

  • Stream.resource acquires the DynamoDB client once and releases it at the end, no matter how the stream finishes.
  • Streams users lazily, 25 at a time, so no more than one page is held in memory at any moment.
  • .filter discards non-US records inline as they flow through, with no second DynamoDB call.
  • .evalMap sends each email as the user arrives, running sendMail sequentially with full backpressure. If the email service slows down, the stream slows down: DynamoDB is never asked for the next page until the current one is fully processed.
  • .compile.drain collapses the stream into a single IO[Unit] that drives the whole thing.
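To watch that backpressure in action, the sketch below logs every page fetch and every send into a Ref. The three-page source, the user names, and BackpressureDemo are hypothetical; the point is the order in which evalMap and the paginated source interleave.

```scala
import cats.effect.{IO, Ref}
import cats.effect.unsafe.implicits.global
import fs2.Stream

object BackpressureDemo {
  // Hypothetical source: 3 pages of 2 users each. Every page fetch and every send is
  // appended to a shared log so the interleaving is visible.
  def run: IO[List[String]] =
    for {
      log <- Ref[IO].of(Vector.empty[String])
      _ <- Stream
             .unfoldEval[IO, Int, List[String]](1) { n =>
               if (n > 3) IO.pure(None)
               else log.update(_ :+ s"fetch page $n").as(Some((List(s"u$n-a", s"u$n-b"), n + 1)))
             }
             .flatMap(page => Stream.emits(page))
             .evalMap(user => log.update(_ :+ s"send $user")) // sequential: each send finishes before the next pull
             .compile
             .drain
      entries <- log.get
    } yield entries.toList
}

@main def runBackpressureDemo(): Unit =
  BackpressureDemo.run.unsafeRunSync().foreach(println)
```

The log reads fetch page 1, send u1-a, send u1-b, fetch page 2, and so on: each page is requested only after every user on the previous page has been handled.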

No OutOfMemoryError. No 3 AM phone calls. Just clean, functional streaming. The same job that crashed Tuesday night can now process millions of users safely, with predictable memory usage.

Back to sleep

The 3 AM call happened because a well-intentioned job tried to do too much at once. The fix is a shift in how you think about reading data, not as a collection to retrieve but as a sequence to process. Every piece of this pipeline enforces that shift: no more OutOfMemoryErrors, composable functional streams, lazy pagination that fetches only what you consume, and automatic resource management that cleans up no matter what happens. Pull a page, process it, move on: constant memory, predictable behaviour. Whether you're syncing tables, migrating data, or running batch email campaigns, the model stays the same. And the next time marketing asks for a midnight blast to ten million users, you can say yes and actually sleep through the night.


FAQs

1. How do you stream data from DynamoDB in Scala?
You can stream DynamoDB data in Scala by using Scanamo, Cats Effect, and FS2 together. This allows records to be read page by page and processed as they arrive, instead of loading the full dataset into memory at once.

2. Why is streaming DynamoDB data safer than loading everything at once?
Streaming is safer because it processes data in small batches. That keeps memory usage stable and reduces the risk of crashes like OutOfMemoryError when working with large tables.

3. What does FS2 do when reading from DynamoDB?
FS2 provides lazy, backpressured streams. It fetches more data only when the downstream part of the pipeline is ready, which helps large jobs stay controlled and memory-safe.

4. What is the role of Cats Effect in DynamoDB streaming?
Cats Effect helps manage effects and resources safely. In this setup, it ensures the DynamoDB client is properly cleaned up even if the job fails, is cancelled, or ends early.

5. Does pagination increase DynamoDB read cost?
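Not by itself. DynamoDB bills for the data read, whether you fetch it in one pass or page through it incrementally. The caveat is that Query and Scan round the size of each response up to the next 4 KB, so many very small pages can consume somewhat more read capacity than a few large ones.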

6. When should you use Query instead of Scan in DynamoDB?
Use Query when you know the partition key and want a specific subset of records. Use Scan only when you genuinely need to process all records — keep in mind that Scan reads every item in the table and consumes read capacity proportional to its total size, regardless of how many results you actually need.