After building tens of data pipelines for companies of different scales (including my own), I’ve come up with an architecture that I’ve thrown at multiple problems, and it works *nearly* every time.
1. Velocity: hundreds to thousands of data per second
2. The need for real time operations: the need to serve real-time machine learning algorithms, update real-time dashboards, run real-time recommendations that are served to users
3. Scalability: The need to scale up or down, dynamically based on demand
4. Cost: Maximizing storage and optimizing access patterns for efficiency
There are literally hundreds of vendors and OS tools that you can use: Airflow, Cassandra, Hbase, Hive, Redshift, Kafka, etc. I am going to talk in terms of abstractions and allow you to fill in the specific details that fit your needs. You will soon see that the structure I lay out here will certainly solve your problem. I want to stress the flexibility of this architecture, as you can use different tooling for each component.
I’ve used this for
- Startups that handle impressions and real-time user feedback
- Real time financial applications
- Video processing
But, I can see it being used elsewhere to great success. I’ve yet to find a problem that I can’t apply this to.
It consists of 6 different tools. You can add ancillary services depending on your needs, but it must have these 6 components. Whether you are managing 100,000 unique data per day, or 1,000,000,000, these components remain the same, with the only difference being scale and devops.
1 . The Front-End
2. The Stream
The Front End
The “front-end” serves as the routing for the rest of your data pipeline. It is scalable and asynchronous, which means it can serve thousands of potential sources which generate data, which could be sensors, user connections, etc. Even though Kafka and other message brokers have APIs for producers, I don’t like the idea of having producers directly access the message bus. There are several complications that arise. This also makes it possible for an individual user web session to be a producer in your data-pipeline. By having a front end, you can enforce specific standards, and reject a producer if it doesn’t meet the standards, For this, I like Tornado, but there are other options.
What it does:
- Producers push data to this front-end via a webservice call. This never “fetches”
- It should “wrap” data in an envelope that can be understood by every agent, worker, etc. in your system. (example provided below)
“created”: “2020–05–05 00:11:11”,
The envelope has some lightweight metadata, but not much else. The only real requirement here is the “data-id”, which is used by every component in the architecture. I like JSON because it’s universal and compresses nicely. You could also explore things like message-pack or bson if you’re approaching the billions of data per day level, as space will be a major consideration.
- It should “route” requests based off the data type. Say for example, you have 10,000 sensors, which vary by type. The webservice routes these to the appropriate topic or your stream.
- The front end can also control security somewhat because it will be the only point in your data pipeline that is accessible to the outside world. You can add additional layers of security here, such as authentication of each client, etc.
- It should place data in the stream. Every item goes in the stream
The stream serves as a source that allows concurrent processing from multiple real-time applications.
For example, you are running a power grid company, with 500 generating stations. You are also taking in demand data from 1000 different aggregation stations. You need to 1) update your real-time dashboard, 2) run machine learning algorithms that take demand and modify the generator output for the next hour and 3) store the data in multiple databases that are used by your operating team, all with a different format.
For this, I like Kafka. Kinesis is also a good fit, but the sharding poses some problems with the routing methods I described below. Also, if your 100% on-premise, AWS doesn’t work.
What it does
- It provides multiple channels for data routed for your front end. For example your demand data are in one Kafka topic: “demand” and your generator data are in another Kafka topic: “generation”.
- The stream can work in input and output as well. For example, your demand data are consumed by a machine learning algorithm and then the predictions are piped into another topic ‘demand-prediction’, which is consumed by another agent which sends messages to your generation station.
Your stream does nothing but serve as the message bus for the rest of your architecture. With Kubernetes, you can add some advanced autoscaling as well. But that discussion is beyond the scope of this.
Here’s where this entire thing gets interesting. Because you have standardized all your data, and any agent can digest any message and do something with it. You can have 1 agent for every topic, or 100. An agent can be a complex application or a simple lambda function.
Each agent has a unique name or id. You decide the naming conventions, but it’s important that you can identify agents uniquely. Bonus points if you incorporate versions into your agent-naming
- They vary in complexity: The agent can be a full-fledged application (like a dockerized python application, or node-js application), or something simple like a AWS lambda function.
- Agents are single purpose: Each agent should perform a single task on a single data type coming from your stream. Don’t multipurpose your agents since they’re cheap to deploy. That way, when an input data type changes, you don’t have to parse and update all your agents. You just go directly to the agents which operate on that data type. This is important when your agents start to number in the 10s or 100s.
- They’re mostly stateless: If an agent fails, you can just spin up another one that does the exact same thing! Your system becomes extremely fault tolerant with the right monitoring. I say mostly stateless, because occasionally I’d have an agent collect data for a bit prior to performing it’s calculation, let’s say a 20 period moving average.
- They only store data if they create data: An agent only stores data if it produces something unique. That is, don’t have the RealtimeDashboardAgent store the raw message. Create a separate S3StorageAgent that stores all records in partitioned s3 buckets.
What they do
- Deserialize the data into something it understands
For example, in one case I had a deep learning model which needed to make a prediction on each new observed datum. The agent kept a lightweight queue of items (the last 20 records). First, the data had to be transformed to a numpy array. Then I had a bit of feature engineering that needed to be performed on the raw item, including min-max scaling and one-hot encoding.
- Make a trace
In order to prevent an agent from operating on a datum twice, it’s important that you use the trace cache to acknowledge that an agent, identified uniquely, has picked up the data, also identified uniquely. That way, the first step in any agent’s processing flow should be a check and set in your trace cache. This also allows for interesting use-cases, such as data lineage (identifying the exact source, and order of operations on any unique piece of data for your company).
- Do the task:
To use the same example above, once the data was deserialized into an array, I could feed it to the trained machine learning model. The model was also versioned, so I could then predict the value (sentiment), then also store the record-id, the prediction and the version of the model that created the prediction in my RDBMS for future analysis.
Because the record-id is used throughout the entire system, I don’t have “re-store” the raw data. A data scientist could just query the raw record and recreate the features for retraining in the future. If a new version of the model is created, or even something more complex like switching from regression to LSTMs, absolutely nothing needs to change with the way the data comes in. Just roll a new agent and add your new model. You don’t even need to turn off the initial agent. You can deploy a new agent with the new model, then once your happy, decommission the original agent.
For another example, Let’s say you have a real time dashboard used by your operations team. The team needs to see average power generation data from all its stations in a heatmap, and it needs to be real time.
The agent reads the “generation” topic from the stream. It deserializes it into a raw record, decodes it from Base64.
It then reads the current value “output_average_generator_10” from the cache that feeds your dashboard, then sets it to the new average after incorporating the new data.
Since the dashboard application is polling your cache every 5 seconds for new values, it doesn’t need to be pinged to know everything’s updated. It just works seamlessly. If the dashboard needs new values, just add another agent or modify the existing one.
One important note is the idea of “StorageAgents” being first class citizens in this framework. More details provided below.
You should start to see how valuable this is
This is highly dependent on the scale and maturity of your organization. Do you have data scientists that need to query data regularly? Do you need to store everything, or just aggregated data?
Let me provide an example of a pattern that worked well for me.
I had an S3StorageAgent that read records from a real time financial data source. Each topic was a different security, and every 5 seconds I’d receive a new update. The S3StorageAgent had two threads — one which would append each new record to a flat-file, organized by security and time_interval. The other, which would periodically poll the number of records in each file, then compress and upload them to s3 once the limit was reached.
Now that I had compressed flat files containing the data, I could easily run a batch job that ran daily to upload all the flat files for the day to redshift. This is something that you could certainly schedule with an Airflow cluster if you preferred.
You could also have a RedshiftStorageAgent, that did something similar if you wanted your cluster to reflect the latest values instead of waiting a day for the new updates.
It’s important to note here that StorageAgents need to be their own “first-class” citizen in your Agent framework. An agent never stores data, unless it’s a Storage Agent or if it’s generating something unique.
Think of the possibilities here. You can mixin whatever you like. Dynamo. Postgres. Cassandra. Whatever you want.
Your trace cache should be a single source of truth for your agent framework. It is organized by the “data-id”, or the unique identifier that you use to identify any data produced in your system. Because you collect potentially thousands of producers into a single front-end, you can ensure that the data and identifiers are standardized. I like Redis for this because it provides O(1) guarantees for the operations I listed, but other options exist.
Items that need to be traced
A key is created by the producer initially
HSET fdc7d2f2–062c-4683–9a41–1d7912723c1b created timestamp
An agent has received an operation on a record — this prevents duplicated messages
HSET fdc7d2f2–062c-4683–9a41–1d7912723c1b agent_1503 timestamp
Aside from that, anything goes. You can store the final source of the data, amongst other things. You can also analyze the end-to-end lifecycle of your data, such as the time between specific operations or whether an agent is successfully reading data that needs to be read.
A consideration that needs to be made here is, how detailed you need to be and whether you need to analyze traces for a long period of time. This will become prohibitive as you start to scale into the millions-billions of records per day. Then you might want to hold traces for a single day.
You need to be able to understand the “flow-rates” and whether each component is functioning properly. Also, if an agent is performing well, or not, or hanging, and the various response times between components of your architecture.
This will inform decisions such as:
1) Whether to add another agent to “scale up” processing on a task
2) Whether to scale up your front end to handle producer traffic
3) Adding new topics or shards to your streams
4) Re-engineering agents that perform poorly or are impacting SLAs
You could try to build all of this yourself, which is it’s own major project, or select a vendor. I’ve used DataDog successfully in the past, and it’s easy to add instrumentation and track everything via a centralized dashboard. I have nothing but high regards for the product.
Just to give an example, you can operate at small to medium size business scale for $450-$750 a month on AWS given the appropriate selections in server size, making sure you are fully utilizing your machines (running multiple dockerized applications on the same hardware). This excludes the upfront development time. I find that this architecture would fully support a data team of 10–15, including business analysis and machine learning
I hope this was useful!