How to Load Billions of JSON Events into Amazon Redshift Every Day Using Go and Kafka

Amazon Redshift forms a crucial part of our ETL pipeline. For our larger customers, we need to load 4 billion+ JSON events into hundreds of tables in Amazon Redshift every day. In this blog post, I dive into how we identified a bottleneck in our ETL pipeline, removed it, and launched a new Go service with horizontal scalability and zero downtime.

Wait, doesn’t Amazon Redshift already load billions of JSON events?

Yes and no. Amazon Redshift is really good at a lot of things – excellent price-to-performance ratio, scalability, PostgreSQL-compatible SQL, and so on. However, it is sorely lacking when it comes to turning raw JSON into schematized tables. Consider a mobile app that generates events like so:

{
  "ts": "2015-01-01T00:31:32Z",
  "entype": "user",
  "evname": "pass_mission",
  "body": {
    "level": 7,
    "mission_name": "caverns",
    "is_premium": false
  }
}

Redshift can easily ingest billions of these events into a single table, say, _events. However, every time you want to query the table, you have to write SQL to extract the columns you want using the JSON_EXTRACT_PATH_TEXT function. For example:

SELECT JSON_EXTRACT_PATH_TEXT(body, 'level')::INTEGER, COUNT(*)
FROM _events
WHERE JSON_EXTRACT_PATH_TEXT(body, 'is_premium')::BOOLEAN = true
GROUP BY 1 ORDER BY 2 DESC;

Wouldn’t you much rather write:

SELECT level, COUNT(*)
FROM _e_user__pass_mission
WHERE is_premium = true
GROUP BY 1 ORDER BY 2 DESC;

If events were “exploded” into per-event tables, you could write the nice query above instead of the horrid mess with JSON_EXTRACT_PATH_TEXT calls. To accomplish this, you need a service that discovers and applies schemas from incoming JSON to Amazon Redshift tables. It also needs to track schema lineage and deal with errors, bad JSON, and so on, but that’s beside the point.
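
To make the idea concrete, here is a minimal sketch of what schema discovery could look like in Go. The function names (inferColumns, createTableSQL), the type mapping, and the table-naming rule are assumptions for illustration, mirroring the _e_user__pass_mission example above; they are not the actual schematizer logic:

package main

import (
	"encoding/json"
	"fmt"
	"math"
	"sort"
	"strings"
)

// Event mirrors the JSON envelope shown above.
type Event struct {
	Ts     string                 `json:"ts"`
	EnType string                 `json:"entype"`
	EvName string                 `json:"evname"`
	Body   map[string]interface{} `json:"body"`
}

// inferColumns maps each value in the event body to a Redshift column type.
// This mapping is a guess for illustration; a real service has to worry about
// column widths, nulls, and type promotion across many events.
func inferColumns(body map[string]interface{}) map[string]string {
	cols := make(map[string]string)
	for name, v := range body {
		switch val := v.(type) {
		case bool:
			cols[name] = "BOOLEAN"
		case float64:
			if val == math.Trunc(val) {
				cols[name] = "BIGINT"
			} else {
				cols[name] = "DOUBLE PRECISION"
			}
		case string:
			cols[name] = "VARCHAR(256)"
		default:
			cols[name] = "VARCHAR(65535)" // nested objects kept as raw JSON text
		}
	}
	return cols
}

// createTableSQL renders a CREATE TABLE statement for a per-event table,
// named _e_<entype>__<evname> as in the example query above.
func createTableSQL(e Event) string {
	cols := inferColumns(e.Body)
	names := make([]string, 0, len(cols))
	for n := range cols {
		names = append(names, n)
	}
	sort.Strings(names) // deterministic column order

	defs := []string{"ts TIMESTAMP"}
	for _, n := range names {
		defs = append(defs, fmt.Sprintf("%s %s", n, cols[n]))
	}
	table := fmt.Sprintf("_e_%s__%s", e.EnType, e.EvName)
	return fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (%s);", table, strings.Join(defs, ", "))
}

func main() {
	raw := `{"ts":"2015-01-01T00:31:32Z","entype":"user","evname":"pass_mission","body":{"level":7,"mission_name":"caverns","is_premium":false}}`
	var e Event
	if err := json.Unmarshal([]byte(raw), &e); err != nil {
		panic(err)
	}
	fmt.Println(createTableSQL(e))
}

Most of the real complexity is not in this one-shot inference but in reconciling the inferred schema against what already exists in Redshift and versioning it over time.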

Exit Stage Left, Java Schematizer

The original service we wrote to do this work was called schematizer. It was a singleton Java service written in the beloved Dropwizard framework. There were several limitations in the design:

  1. Communication with schematizer was over HTTP. When you are dealing with tens of thousands of events per second, the overhead of HTTP can be fairly large.
  2. Schematizer wasn’t horizontally scalable. This was a major limitation, because the only way to make schematizer punch out more events per second was to put it on a bigger box.
  3. The schema registry was a Postgres RDS instance shared with other services. We often ran into connection limit issues.

Enter sqlizer, Optimistic Concurrency, Go and Kafka

As we considered how to solve these issues, we took a closer look at two technologies we have been very happy with – Go and Kafka. Go already forms a large part of our ETL pipeline, and our experience with Kafka has been very positive. The new service, sqlizer, handles schema discovery and versioning; it is horizontally scalable and written in Go instead of Java.

Here’s how the architecture works: the event-sink service is an HTTP front-end that sits behind an Elastic Load Balancer (ELB). It receives, validates, and publishes events to Kafka topics. An array of sqlizer instances running on Amazon Elastic Container Service (ECS) reads from these topics and publishes schematized events back to Kafka, where they are picked up by a service called batcher and uploaded to S3.
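
As a rough illustration of the sqlizer side of that loop, here is a minimal consume-transform-produce sketch using the segmentio/kafka-go client. The broker addresses, topic names, consumer group, and the schematize helper are assumptions for illustration; the production service also batches, retries, and tracks schema versions:

package main

import (
	"context"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Consume raw events for one app topic as part of the "sqlizer" group,
	// so adding more sqlizer instances spreads partitions across them.
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"kafka-1:9092", "kafka-2:9092"},
		GroupID: "sqlizer",
		Topic:   "raw-events-app-123", // hypothetical per-app topic name
	})
	defer r.Close()

	// Publish schematized events to a downstream topic read by batcher.
	w := kafka.NewWriter(kafka.WriterConfig{
		Brokers: []string{"kafka-1:9092", "kafka-2:9092"},
		Topic:   "schematized-events-app-123", // hypothetical
	})
	defer w.Close()

	ctx := context.Background()
	for {
		m, err := r.ReadMessage(ctx)
		if err != nil {
			log.Printf("read: %v", err)
			return
		}
		out, err := schematize(m.Value) // apply the discovered schema (not shown)
		if err != nil {
			log.Printf("bad event at offset %d: %v", m.Offset, err)
			continue // the real service would route this to an error path
		}
		if err := w.WriteMessages(ctx, kafka.Message{Key: m.Key, Value: out}); err != nil {
			log.Printf("write: %v", err)
			return
		}
	}
}

// schematize is a placeholder for the schema discovery/apply step.
func schematize(raw []byte) ([]byte, error) { return raw, nil }

Because the instances share a consumer group, scaling out is just a matter of launching more ECS tasks; Kafka rebalances partitions across them.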

We also built a new service, franz, to manage Kafka topics and partitions. As you would expect, customers with larger event streams get more partitions in Kafka, and we use franz to tune per-app topics. We also gave the schema registry its own dedicated Postgres RDS instance.
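
There is no single right formula for partition counts, but here is a hedged sketch of what a franz-style tuning step might do: derive a partition count from an app's observed event rate and create the topic via kafka-go. The per-partition throughput budget, the cap, the topic naming scheme, and the replication factor are all made-up values for illustration:

package main

import (
	"fmt"
	"net"
	"strconv"

	"github.com/segmentio/kafka-go"
)

// partitionsFor picks a partition count from an app's observed events/sec.
// A budget of 5,000 events/sec per partition is an arbitrary illustrative number.
func partitionsFor(eventsPerSec int) int {
	p := eventsPerSec/5000 + 1
	if p > 64 {
		p = 64 // arbitrary cap
	}
	return p
}

func ensureTopic(broker, appID string, eventsPerSec int) error {
	conn, err := kafka.Dial("tcp", broker)
	if err != nil {
		return err
	}
	defer conn.Close()

	// Topic creation has to go through the cluster controller.
	controller, err := conn.Controller()
	if err != nil {
		return err
	}
	cc, err := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
	if err != nil {
		return err
	}
	defer cc.Close()

	return cc.CreateTopics(kafka.TopicConfig{
		Topic:             fmt.Sprintf("raw-events-%s", appID), // hypothetical naming scheme
		NumPartitions:     partitionsFor(eventsPerSec),
		ReplicationFactor: 3,
	})
}

func main() {
	if err := ensureTopic("kafka-1:9092", "app-123", 40000); err != nil {
		panic(err)
	}
}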

In the old architecture, schematizer could achieve 15k events / second on the largest instance type in EC2. The new sqlizer nets 75k to 110k events / second with horizontal scalability. That’s up to 10B events per day per customer, so there’s no real limit other than Kafka and network performance. Here’s a metric snapshot from DataDog showing sqlizer performance:

Lessons Learned

  1. Build for horizontal scalability. This may seem obvious, but there are services that very reasonably start off as singletons and then end up hitting limits on what can be done with one instance. Horizontally scalable services cause fewer operational fires (“get me a bigger box!!”).
  2. Optimistic concurrency is simple to reason about. With multiple sqlizers writing to the same schema registry, we found optimistic concurrency to be a great pattern to use instead of work-splitting (see the sketch after this list). We had an initial design which relied on Zookeeper for state management, but the optimistic concurrency design is a lot simpler to reason about and has fewer moving parts.
  3. It’s not about Go vs. Java. Could we have written the service in Java and gotten the same performance? Probably. There were times we missed Java profilers – the Go ecosystem is not mature when it comes to performance monitoring and management. We chose Go because of the expertise on our team and Go’s smaller memory footprint.
  4. Kafka is wonderful but needs care and feeding. Kafka is just amazing – but running it on EC2 does take some understanding of network performance and disk IOPS. In a future blog post, I’ll write about how we manage and monitor Kafka.
  5. Monitor everything. We use DataDog as our metrics and monitoring tool. We instrumented critical code paths in all new services and watched dashboards before and during deployments. We just couldn’t have launched this service without in-depth monitoring.
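
To illustrate lesson 2, here is a minimal sketch of optimistic concurrency against a Postgres schema registry. The schemas table, its columns, and the retry policy are assumptions for illustration, not the actual registry design; the point is simply that every writer does a compare-and-swap on a version number and retries on conflict, instead of coordinating through Zookeeper:

package main

import (
	"database/sql"
	"errors"
	"fmt"

	_ "github.com/lib/pq"
)

// saveSchema applies an update function to a table's schema definition using
// optimistic concurrency: read the current version, compute the new schema,
// and only write it back if nobody else bumped the version in the meantime.
// (The insert-if-missing case is omitted for brevity.)
func saveSchema(db *sql.DB, table string, update func(current string) string) error {
	for attempt := 0; attempt < 5; attempt++ {
		var def string
		var version int
		err := db.QueryRow(
			`SELECT definition, version FROM schemas WHERE table_name = $1`,
			table).Scan(&def, &version)
		if err != nil {
			return err
		}

		res, err := db.Exec(
			`UPDATE schemas SET definition = $1, version = version + 1
			 WHERE table_name = $2 AND version = $3`,
			update(def), table, version)
		if err != nil {
			return err
		}
		if n, _ := res.RowsAffected(); n == 1 {
			return nil // our write won
		}
		// Another sqlizer updated the schema first; re-read and try again.
	}
	return errors.New("gave up after repeated write conflicts")
}

func main() {
	db, err := sql.Open("postgres", "postgres://user:pass@registry:5432/schemas?sslmode=disable")
	if err != nil {
		panic(err)
	}
	defer db.Close()

	err = saveSchema(db, "_e_user__pass_mission", func(current string) string {
		return current + `, "level": "BIGINT"` // toy merge for illustration
	})
	fmt.Println(err)
}

When a write loses the race, the loser simply re-reads the latest schema and merges again, which is why this design has fewer moving parts than one coordinated through Zookeeper.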

Want to see a demo of Appuri or just chat? Drop me a line!