How to use Ecto's Repo.stream/1 to process large amounts of data
This is not a post about Big Data. This is about datasets that are too big to handle entirely in-memory but too small to justify spending a few hours writing a custom data pipeline.
Here is an easy way to deal with data that takes up to a few hours to process:
```elixir
Repo.transaction(
  fn ->
    YourSchema
    |> order_by(asc: :inserted_at)
    |> any_query()
    |> Repo.stream()
    |> Stream.map(fn user -> any_transformation(user) end)
    |> Stream.filter(&any_filter/1)
    |> Stream.each(fn user -> do_something_with_user(user) end)
    |> Stream.run()
  end,
  timeout: :infinity
)
```
- `Repo.stream/1` has to be used inside a transaction for both PostgreSQL and MySQL. It queries the DB in chunks (the default chunk size is 500 rows).
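If the default chunk size doesn't suit your workload, you can tune it with the `:max_rows` option of `Repo.stream/2` (a sketch; `YourSchema`, `Repo`, and `do_something/1` are placeholders):

```elixir
# Fetch rows from the database in chunks of 1_000 instead of the default 500.
Repo.transaction(
  fn ->
    YourSchema
    |> Repo.stream(max_rows: 1_000)
    |> Stream.each(&do_something/1)
    |> Stream.run()
  end,
  timeout: :infinity
)
```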
- We’re passing `timeout: :infinity` to `Repo.transaction/2` to allow it to run as long as it needs. The default is 15 seconds, after which the transaction times out.
- Streaming the results will block one of the connections from Ecto’s connection pool, so watch out for that if your connection pool size is small.
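The pool size is part of your repo's configuration (Ecto's default is 10 connections). If long-running streams regularly hold connections, you may want to raise it — a sketch for `config/config.exs`, with hypothetical app and repo names:

```elixir
# config/config.exs — :pool_size controls how many DB connections Ecto keeps
# open; a long-running stream occupies one for its whole duration.
config :my_app, MyApp.Repo,
  pool_size: 20
```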
- Since it’s run in a transaction, rows added after the function starts won’t be read. You can re-run the script with a `where` clause on a timestamp to synchronise those entries.
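Such a catch-up run could look like this (a sketch; `last_run_at` is a hypothetical timestamp you recorded before the first pass):

```elixir
import Ecto.Query

# Hypothetical: the moment the first pass started.
last_run_at = ~U[2024-01-01 00:00:00Z]

Repo.transaction(
  fn ->
    YourSchema
    # Only process rows inserted after the previous run began.
    |> where([s], s.inserted_at > ^last_run_at)
    |> Repo.stream()
    |> Stream.each(fn user -> do_something_with_user(user) end)
    |> Stream.run()
  end,
  timeout: :infinity
)
```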
- `Repo.stream` returns an Elixir stream, so use the `Stream` module instead of `Enum` to keep the processing lazy. `Stream` covers most of what you’d typically need from `Enum`. Just call `Stream.run/1` at the end to actually run the processing.
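The distinction matters because `Enum` functions are eager while their `Stream` counterparts are lazy — a `Stream` pipeline only describes the work, and nothing runs until it's forced:

```elixir
# Enum.map runs immediately and returns a list.
Enum.map(1..3, fn x -> x * 2 end)
# => [2, 4, 6]

# Stream.map only builds a lazy recipe; forcing it (here with Enum.to_list/1,
# or Stream.run/1 when you only care about side effects) does the work.
lazy = Stream.map(1..3, fn x -> x * 2 end)
Enum.to_list(lazy)
# => [2, 4, 6]
```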
- Before you run the function on the entire dataset, you can add a `|> limit(10)` clause to your query and test that the processing is correct.
Example use cases
- Migrate the data from one table to another.
- Migrate the data to another database (for example: build an initial index in ElasticSearch).
- Export the data to a CSV file.
- Fill missing values for old data (after adding a new column).
- Any manual, one-time task you might want to do for some data.
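The CSV export case, for instance, can be streamed straight to disk without ever holding the whole dataset in memory (a sketch: `User` is a hypothetical schema with an `email` field, and the "CSV" here is a single unescaped column for simplicity):

```elixir
Repo.transaction(
  fn ->
    User
    |> Repo.stream()
    # Naive CSV: one column, no quoting/escaping — use a CSV library for real data.
    |> Stream.map(fn user -> [user.email, "\n"] end)
    # File.stream!/1 is collectable, so rows are written as they arrive.
    |> Stream.into(File.stream!("users.csv"))
    |> Stream.run()
  end,
  timeout: :infinity
)
```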
- If you only need a CSV and don’t need any in-memory transformations, you can export data from PostgreSQL with the `COPY` statement. You can import data from CSV, too.
- If you want to make the processing parallel, take a look at Flow (or GenStage).
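A Flow version might look like the sketch below. One caveat: the DB stream is tied to the process that opened the transaction, so here the transaction process consumes `Repo.stream` itself in chunks and hands each in-memory chunk to Flow. The chunk size and stage count are arbitrary assumptions:

```elixir
Repo.transaction(
  fn ->
    YourSchema
    |> Repo.stream()
    # Consume the DB stream in this (transaction-owning) process.
    |> Stream.chunk_every(1_000)
    |> Enum.each(fn chunk ->
      chunk
      # The chunk is a plain list, safe to fan out across Flow stages.
      |> Flow.from_enumerable(stages: 4)
      |> Flow.map(&any_transformation/1)
      |> Flow.filter(&any_filter/1)
      |> Flow.each(fn user -> do_something_with_user(user) end)
      |> Flow.run()
    end)
  end,
  timeout: :infinity
)
```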