I run the 25-person engineering team at RJMetrics. Since our founding in 2009, we’ve built a full-stack business intelligence product. Full-stack BI involves a lot of technology: we manage the entire ETL process and the data warehouse (built on Redshift), and we have our own custom caching layer and an entire analysis front-end. We sell this entire stack of technology as an integrated solution.
Last month, we announced the launch of our new product: RJMetrics Pipeline. Pipeline is a modified version of the data infrastructure layer from our full-stack product, released as a standalone product. It solves a very straightforward, yet always vexing, problem — consolidating all of an organization’s data into a single, high-performance data warehouse on an ongoing basis. It does this in near-real-time (seconds to minutes, depending on data source), and it can scale to support arbitrarily large data volumes.
In this post, I’m going to do a deep dive on the technology that’s at the heart of Pipeline. I’ll lay out our initial requirements, our core challenges, and the design decisions we made as a result.
Aside: why did we build Pipeline?
We think that today’s biggest analytical challenge is data proliferation. Data no longer just lives in a handful of relational databases on the corporate network. Today, data is located throughout the cloud, stored in varying formats by many different providers, and consolidating it into a single warehouse for analysis is challenging.
Many are solving this problem with a combination of home-grown and open-source technology, but the frequency with which data sources and business needs change means these solutions require frequent updates and maintenance. RJMetrics Pipeline is our solution to that problem.
Pipeline design criteria
Pipeline has a straightforward job: Continuously extract data from many source systems and load it into a single destination.
It sounds simple at this level of abstraction, but the realities of doing this quickly and reliably across a large universe of source systems are complex. Bringing any sufficiently complex system to life involves trade-offs, so we created design criteria at the outset of the project to inform how such trade-offs would be made. We arrived at these criteria after an extensive market validation process that we plan on covering in a separate post. Here they are:
Some decisions need to be made in real time, so data freshness is critical. While there will be latency constraints imposed by particular source data integrations, data should flow through the Pipeline with as close to zero latency as possible.
We can’t have the system break down when large volumes of data start pouring in. All components of the Pipeline should scale to support arbitrarily large throughput.
Data cannot be dropped or changed in a way that corrupts its meaning. Every data point should be auditable at every stage in the pipeline.
Since we intend to consolidate data from many sources, we need to accommodate data of different shapes and types.
Latency and scalability were the most important criteria to establish at the outset of this project. Implementing a high-latency (batch-based) pipeline is technically very different from implementing a low-latency one. And scalability needed to be built in from the ground up: it isn’t something you can bolt on later.
Solidifying these criteria up front enabled us to parallelize our development effort while knowing that each of our teams was working towards the same goal. We honestly didn’t face any serious gotchas in the development process, which allowed us to move quickly.
Core engineering challenges
With the team aligned on functional requirements, the next step was to distill out our technical requirements. We concluded that a service for delivering the above functionality would need to also meet these technical goals:
RJMetrics has always been a cloud platform, and we want to maintain that. Running a separate instance for each customer was not an option given our go-to-market strategy. Instead, each client maintains their own destination database, while the pipeline uses shared resources to load those databases. Multi-tenancy interacts in challenging ways with scalability. One customer may send us a billion rows of data in an hour, and that can’t delay the thousand rows another customer sent us in that same hour. More on the solution to this later.
High availability is critical for the data-receiving component: unavailability means we are missing data, and we can’t assume that all data producers will resend data when availability is restored. We intend to guarantee high availability once Pipeline hits general availability, which means just a small number of hours of unavailability per year.
We have to distribute our workload among many machines in order to meet our scalability and availability requirements, but making this leap introduces complexity. Once a system is distributed, coordination between individual components is a challenge. In Pipeline, this manifests as an ordering problem; we need to guarantee that two data points submitted in order are realized in the same order in Redshift, even if they are handled by different machines.
If latency is important, then we need to measure it at every step along the way. Any problems with any stage in the pipeline should be pinpointed and appropriate alerts triggered.
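To make the measurement idea concrete, here is a minimal sketch in Python. It is illustrative only and not Pipeline's actual monitoring code: the stage names, the thresholds, and the `Trace` structure are all assumptions made for the example.

```python
from dataclasses import dataclass, field

# Hypothetical stage names and latency limits in seconds (assumptions, not from the post).
STAGE_LIMITS = {"gate": 0.05, "fat_queue": 0.5, "thin_queue": 5.0, "redshift": 60.0}


@dataclass
class Trace:
    """Timestamps (in seconds) recorded as one data point leaves each stage."""
    received_at: float
    stage_done_at: dict = field(default_factory=dict)


def stage_latencies(trace):
    """Return the time spent in each stage, in the order the stages were recorded."""
    latencies = {}
    previous = trace.received_at
    for stage, done_at in trace.stage_done_at.items():
        latencies[stage] = done_at - previous
        previous = done_at
    return latencies


def slow_stages(trace, limits=STAGE_LIMITS):
    """Return the stages whose latency exceeded the configured limit."""
    return [
        stage
        for stage, seconds in stage_latencies(trace).items()
        if seconds > limits.get(stage, float("inf"))
    ]


trace = Trace(
    received_at=0.0,
    stage_done_at={"gate": 0.01, "fat_queue": 0.02, "thin_queue": 10.0, "redshift": 20.0},
)
print(slow_stages(trace))
```

Recording a timestamp at every hop is what lets an alert name the specific stage that fell behind, rather than only reporting that end-to-end latency grew.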
A trip down the pipeline
Let’s follow a single data point through the pipeline to see how these requirements have influenced the system design.
Step 1: make data
“namespace”: “MyWebsite.com Activity”,
Many of the design decisions mentioned above are visible right here.
First, the data format, as you can probably tell, is JSON. JSON is great for usability; it’s easy to read and write, and pretty much every language out there has library support for it. It also supports arbitrary schemas, nested objects and collections, which is necessary for our flexibility requirement.
But, sometimes JSON is too simple. For example, JSON doesn’t have a datatype for dates, and doesn’t differentiate between integer and floating point numbers. To make up for these shortcomings, we also accept data in Transit format. Transit is essentially JSON with an extended type system, and, although it’s not as widely supported as JSON, it has libraries in a number of popular languages.
Identifier fields such as namespace and table_id serve our multi-tenancy requirement. We separate data based on these values, and they dictate the ultimate destination for the data. (Data must also be submitted with a secret credential to ensure the request is authorized.)
Finally, the “sequence” field in the request is required to meet our consistency goal. That field allows the user to specify how to order data points in the request relative to all the others that have been received. Without it, two updates to the same data point could be received in the wrong order, resulting in inaccurate data. In most cases the sequence can simply be the timestamp when the data point was generated.
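The role of the sequence field can be sketched in a few lines of Python. This is a simplified illustration, not the actual Pipeline logic: `table_id` and `sequence` come from the description above, while the `key` and `data` field names and the "highest sequence wins" rule are assumptions made for the example.

```python
def apply_updates(updates):
    """Keep, for each (table_id, key), the update with the highest sequence value."""
    latest = {}
    for update in updates:
        ident = (update["table_id"], update["key"])  # "key" is a hypothetical field
        current = latest.get(ident)
        if current is None or update["sequence"] > current["sequence"]:
            latest[ident] = update
    return latest


# Two updates to the same record arrive out of order (newest first).
updates = [
    {"table_id": "orders", "key": 42, "sequence": 1700000002, "data": {"status": "shipped"}},
    {"table_id": "orders", "key": 42, "sequence": 1700000001, "data": {"status": "pending"}},
]

final = apply_updates(updates)
print(final[("orders", 42)]["data"]["status"])  # prints "shipped"
```

Because the comparison uses the submitted sequence rather than arrival time, the result is the same no matter which machine handles which update or in what order they arrive.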
Step 2: send data
To send the above data to Pipeline, it is attached to the body of an HTTP POST request to pipeline-gate.rjmetrics.com. This fact alone reveals some design choices. First, the Pipeline has a single entry point for data. This makes basic auditing easy — we can just compare the volume of data flowing into the pipe against the volume flowing out and know at a glance that we’re not missing any.
Second, we use HTTP in service of our flexibility goal. It’s not the most efficient protocol to use for data transfer, but it is the most widely used. We have built dozens of replicators that submit data this way, and we can be confident that clients on any platform will be able to send data via HTTP.
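As a rough sketch of what a client-side submission could look like, the Python below builds such a request with the standard library. Only the host name comes from this post; the `https` scheme, the root path, the `Authorization` header, and the payload field names are assumptions for illustration.

```python
import json
import urllib.request

# Host from the post; the scheme and path are assumptions.
GATE_URL = "https://pipeline-gate.rjmetrics.com/"


def build_request(data_points, token):
    """Build (but do not send) an HTTP POST carrying JSON-encoded data points."""
    body = json.dumps(data_points).encode("utf-8")
    return urllib.request.Request(
        GATE_URL,
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/json",
            # Hypothetical way of passing the secret credential.
            "Authorization": "Bearer " + token,
        },
    )


request = build_request(
    [{"table_id": "orders", "sequence": 1, "data": {"id": 42}}],
    token="SECRET",
)
# urllib.request.urlopen(request) would actually send it; it is left out so the
# sketch runs without network access.
print(request.get_method(), request.full_url)
```

Nothing here needs a special client library, which is the point of choosing HTTP: any platform that can make a POST request can submit data.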
Step 3: into the queues
In order to meet our scalability and availability requirements, once data enters the pipeline it must be handled by systems that are resilient to node failure and scale horizontally.
The HTTP request is load balanced to an available server with capacity to take on the workload. When data hits the HTTP server it’s a hot potato — now that it has been received, we can’t lose it! The next stop — dubbed the fat queue — is an Apache Kafka queue that serves as persistent, low-latency storage for the HTTP servers to pass the potato off to. Kafka is an ideal choice at this stage because it can support a large volume of low-latency writes while guaranteeing durability.
Data submitted to the fat queue isn’t acknowledged until it is replicated to three different Kafka servers to mitigate the possibility of data loss, a requirement of our accuracy goal. The HTTP servers get acknowledgement that the data has been persisted in a small number of milliseconds — fast enough that they can wait for this confirmation before themselves confirming that the data was accepted.
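The "acknowledge only after replication" rule can be modeled with a toy example. The Python below is an in-memory stand-in, not Kafka's API and not Pipeline's code: the `Replica` class, the function names, and the HTTP status codes are all hypothetical.

```python
class Replica:
    """Stand-in for one queue server; a real broker would persist to disk."""

    def __init__(self, healthy=True):
        self.healthy = healthy
        self.log = []

    def append(self, message):
        """Store the message and report success, or report failure if unhealthy."""
        if not self.healthy:
            return False
        self.log.append(message)
        return True


def durable_write(message, replicas, required_acks=3):
    """Return True only if at least `required_acks` replicas stored the message."""
    acks = sum(1 for replica in replicas if replica.append(message))
    return acks >= required_acks


def handle_post(message, replicas):
    """HTTP-handler sketch: confirm to the client only after a durable write."""
    return 200 if durable_write(message, replicas) else 503


print(handle_post("data point", [Replica(), Replica(), Replica()]))
print(handle_post("data point", [Replica(), Replica(), Replica(healthy=False)]))
```

In this model a client that receives an error is expected to resend, so a message may end up stored more than once on the healthy replicas; that is consistent with at-least-once delivery rather than exactly-once.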
Data is picked off of the fat queue by an Apache Spark streaming cluster that performs “demultiplexing” (separating data by client, namespace and table) and micro-batching. We chose Spark because it guarantees at-least-once delivery of all data points, and has a simple language for defining the operations we required.
Our Spark jobs write to a second set of Kafka queues — the “thin queues” — that are dedicated to each client’s data set. This ensures the data stays separated, and protects against the possibility that a single large data set could crowd out smaller data sets, fulfilling our multi-tenancy and scalability requirements.
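The demultiplexing and micro-batching step can be illustrated in plain Python. This is a stand-in for the idea, not Spark code and not the actual job: the `client_id` field name and the batch size are assumptions.

```python
from collections import defaultdict


def demultiplex(fat_queue, batch_size=2):
    """Group mixed data points by (client, namespace, table), then micro-batch each group."""
    grouped = defaultdict(list)
    for point in fat_queue:
        key = (point["client_id"], point["namespace"], point["table_id"])
        grouped[key].append(point)

    thin_queues = {}
    for key, points in grouped.items():
        # Split each group's points into consecutive batches of at most batch_size.
        thin_queues[key] = [
            points[i:i + batch_size] for i in range(0, len(points), batch_size)
        ]
    return thin_queues


fat_queue = [
    {"client_id": "big", "namespace": "web", "table_id": "events", "sequence": 1},
    {"client_id": "small", "namespace": "web", "table_id": "events", "sequence": 1},
    {"client_id": "big", "namespace": "web", "table_id": "events", "sequence": 2},
    {"client_id": "big", "namespace": "web", "table_id": "events", "sequence": 3},
]

thin_queues = demultiplex(fat_queue)
for key, batches in thin_queues.items():
    print(key, [len(batch) for batch in batches])
```

Once each data set has its own queue, a loader can work through the small client's single batch without waiting behind the large client's backlog.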
Step 4: to Redshift
The final step is to copy the data from the thin queue into its final destination, Redshift. At this point our flexibility and accuracy requirements crash head-first into each other, presenting some difficult challenges. The incoming data is a JSON blob with a flexible, possibly nested, schema. Redshift, on the other hand, doesn’t support nesting, and requires the schema to be specified up front. We have to use some complex logic to break nested records apart into separate tables, and evolve schemas as new columns and datatypes are identified.
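To give a feel for what breaking nested records apart involves, here is a minimal Python sketch. It is not the logic Pipeline uses: the `__` naming convention, the `_parent_id` column, the reliance on an `id` field, and the `columns` helper are all assumptions made for illustration.

```python
def flatten(record, table, parent_id=None, tables=None):
    """Split a nested record into flat rows for a parent table and child tables."""
    tables = {} if tables is None else tables
    row = {} if parent_id is None else {"_parent_id": parent_id}
    child_lists = []

    def walk(obj, prefix):
        for name, value in obj.items():
            column = prefix + name
            if isinstance(value, dict):
                # Nested object: fold its fields into prefixed columns on this row.
                walk(value, column + "__")
            elif isinstance(value, list):
                # Collection: its items become rows in a separate child table.
                child_lists.append((column, value))
            else:
                row[column] = value

    walk(record, "")
    tables.setdefault(table, []).append(row)
    for column, items in child_lists:
        for item in items:
            child = item if isinstance(item, dict) else {"value": item}
            flatten(child, table + "__" + column, record.get("id"), tables)
    return tables


def columns(rows):
    """Union of column names across rows: the schema grows as new fields appear."""
    seen = []
    for row in rows:
        for name in row:
            if name not in seen:
                seen.append(name)
    return seen


order = {
    "id": 1,
    "customer": {"name": "Ann"},
    "items": [{"sku": "a"}, {"sku": "b", "qty": 2}],
}
tables = flatten(order, "orders")
print(tables["orders"])
print(columns(tables["orders__items"]))
```

The second item introduces a `qty` field the first one lacked, which is the kind of change that forces the destination schema to evolve with a new column instead of rejecting the row.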
Why Redshift? This was purely a question of market pull. Redshift has become, at least for the companies we find ourselves talking to, the default option for warehousing. And as Pipeline loads a warehouse that you manage, we obviously prefer to support the warehouse that most of our potential customers use.
We’ve been incredibly impressed with Redshift’s performance for analytical and data write workloads. Over the next few months I’m going to share some data we’ve gathered in our testing of the platform in a series of “Benchmarking Redshift” posts on our blog.
What about transformation?
You’ll notice our design criteria didn’t mention anything about data modeling or transformation. In our market validation, we certainly found many use cases where modeling and transformation were important to customers, but we also found that transformation needs have shifted significantly because of the performance profile of Amazon Redshift. Simply put, Redshift is performant enough to handle most transformations, and performing transformations in a familiar language — SQL — and alongside the original raw data is great for auditability. These facts have given rise to the “ELT” model of data processing.
While we did need to build some basic transformation functionality to convert data types, decode JSON, and normalize nested data structures, we decided to do as little modeling and transformation in Pipeline as possible. We may build functionality like this in the future, but specifically leaving it out of scope allowed us to focus on executing against the problems that we cared most about: latency and scalability.
I’ve been reading articles on how other engineering organizations have built their data pipelines for a year or so at this point, and now I’ve finally written my own. I hope you learned something.
There is, of course, the hook. If you’re looking for a data pipeline, I’m not going to discourage you from building one yourself. But we’ve spent tens of thousands of engineering hours solving this problem, and we’re giving you 5 million events per month for free, forever. Before you dive into a major engineering effort, give RJMetrics Pipeline a try and see what you think.