Insights in 'Designing Data-Intensive Applications' - Part 3/3

A post to share key insights after reading Martin Kleppmann's essential book. Part 3/3

Welcome to my blog on the book "Designing Data-Intensive Applications" by Martin Kleppmann. In this post, I will share the key principles, trade-offs, and technologies that I found most interesting for building robust and scalable data systems.

Most of these notes appear as they do in the original book, or are lightly tailored by me, but all ideas belong to Martin or to the authors he quotes in his book.

This is Part 3. Check Part 1, Foundations of Data Systems, here, and Part 2, Distributed Data, here.

Derived Data

Intro

The first two parts went from the layout of data on disk to the limits of distributed consistency. However, a single database was assumed at all times. In real life, access patterns are more complex, and no single database can satisfy all the different use cases at once. Thus, applications use a combination of data stores, caches, indexes, and processing mechanisms.

In this final part, we'll have a look at the complexities around integrating multiple different systems. If a vendor offers you a product that can "satisfy all your needs", think again.

Two categories of store-and-process systems

  • Systems of record Also known as the "source of truth", this is the place where new data first comes in, represented exactly once and normally normalized.

  • Derived data systems Existing data is taken from one system and processed or transformed in some way, e.g., a cache, denormalized values, indexes, or materialized views. These typically offer good read performance.

10. Batch-oriented dataflows

Types of Systems:

  1. Services (online) A service waits for a request or instruction from a client to arrive. When one is received, the service tries to handle it as quickly as possible and sends a response back. Response time is usually the primary measure of performance of a service, and availability is often very important (if the client can’t reach the service, the user will probably get an error message).
  2. Batch processing (offline) Such a system takes a large amount of input data and runs a job to process it. Since normally there isn’t a user waiting for the job to finish, batch jobs are often scheduled to run periodically. The primary performance measure of a batch job is usually throughput (the time it takes to crunch through an input dataset of a certain size). MapReduce, for instance, is a batch processing algorithm.
  3. Stream processing (near-real-time) This type is somewhere between the previous two. Like a batch processing system, a stream processor consumes inputs and produces outputs. However, a stream job operates on events shortly after they happen, whereas a batch job operates on a fixed set of input data. This difference allows stream processing systems to have lower latency than the equivalent batch systems.

MapReduce workflows

A batch job’s output is only considered valid when the job has completed successfully (MapReduce discards the partial output of a failed job). To handle these dependencies between job executions, various workflow schedulers for Hadoop have been developed, including Oozie, Azkaban, Airflow and Luigi.

While MPP (massively parallel processing) databases focus on parallel execution of analytic SQL queries on a cluster of machines, the combination of MapReduce and a distributed filesystem provides something much more like a general-purpose operating system that can run arbitrary programs.

Joins

When we talk about joins in the context of batch processing, we mean resolving all occurrences of some association within a dataset. For example, we assume that a job is processing the data for all users simultaneously, not merely looking up the data for one particular user.

Figure 10-3. Batch join example: a log of user activity events reduce-side sort-merge joined with a database of user profiles.

Making random-access requests over the network for every record you want to process is too slow. Moreover, querying a remote database would mean that the batch job becomes nondeterministic, because the data in the remote database might change while the job is running.

Thus, a better approach would be to take a copy of the user database and to put it in the same distributed filesystem as the log of user activity events. You would then have the user database in one set of files in HDFS and the user activity records in another set of files, and you could use MapReduce to bring together all of the relevant records in the same place and process them efficiently. If the input datasets are partitioned into multiple files, each could be processed with multiple mappers in parallel.
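As a minimal sketch of how such a reduce-side sort-merge join comes together (the record fields and the in-process "shuffle" below are illustrative assumptions, not Hadoop APIs):

```python
from itertools import groupby
from operator import itemgetter

# Hypothetical inputs: user profiles and activity events, both keyed by user_id.
profiles = [{"user_id": 1, "date_of_birth": "1991-03-29"},
            {"user_id": 2, "date_of_birth": "1984-07-12"}]
events = [{"user_id": 1, "url": "/products/7"},
          {"user_id": 2, "url": "/checkout"},
          {"user_id": 1, "url": "/cart"}]

def mapper():
    # Tag each record so that, within a user_id, the profile sorts before the
    # activity events (mimicking a secondary sort in the shuffle).
    for p in profiles:
        yield (p["user_id"], 0, p)
    for e in events:
        yield (e["user_id"], 1, e)

def reducer(sorted_records):
    # All records for one user_id arrive together, profile first.
    for user_id, group in groupby(sorted_records, key=itemgetter(0)):
        profile = None
        for _, tag, record in group:
            if tag == 0:
                profile = record
            else:
                yield {**record, "date_of_birth": profile["date_of_birth"]}

# The "shuffle": sort mapper output by (key, tag), then run the reducer.
joined = list(reducer(sorted(mapper(), key=itemgetter(0, 1))))
print(joined)
```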

Sessionization

A common use for grouping is collating all the activity events for a particular user session, in order to find out the sequence of actions that the user took—a process called sessionization. For example, such analysis could be used to work out whether users who were shown a new version of your website are more likely to make a purchase than those who were shown the old version (A/B testing), or to calculate whether some marketing activity is worthwhile.

Handling skew

Collecting all activity related to a celebrity (e.g., replies to something they posted) in a single reducer can lead to significant skew (also known as hot spots)—that is, one reducer that must process significantly more records than the others. Since a MapReduce job is only complete when all of its mappers and reducers have completed, any subsequent jobs must wait for the slowest reducer to complete before they can start.

If a join input has hot keys, there are a few algorithms you can use to compensate. For example, the skewed join method in Pig first runs a sampling job to determine which keys are hot.
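The sampling idea can be sketched roughly as follows; this is a simplification for illustration, not Pig's actual implementation, and the sample rate and threshold are arbitrary:

```python
import random
from collections import Counter

def find_hot_keys(records, key_fn, sample_rate=0.01, hot_threshold=100_000):
    """Estimate which join keys are 'hot' by counting keys in a random sample.

    A key is flagged as hot if its extrapolated total count exceeds
    hot_threshold; both parameters are illustrative.
    """
    sample_counts = Counter(
        key_fn(record) for record in records if random.random() < sample_rate
    )
    return {key for key, count in sample_counts.items()
            if count / sample_rate > hot_threshold}

# A subsequent join job can then treat hot keys specially, e.g. by spreading
# a hot key's records over several reducers instead of sending them all to one.
```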

Output of Batch Workflows

Where does batch processing fit in? It is not transaction processing, nor is it analytics. It is closer to analytics, in that a batch process typically scans over large portions of an input dataset. However, a workflow of MapReduce jobs is not the same as a SQL query used for analytic purposes. The output of a batch process is often not a report, but some other kind of structure.

Philosophy of outputs

By treating inputs as immutable and avoiding side effects (such as writing to external databases), batch jobs not only achieve good performance but also become much easier to maintain:

  • ease of rolling back: being able to recover from buggy code ('human fault tolerance')
  • feature development can proceed more quickly than in an environment where mistakes could mean irreversible damage. This principle of minimizing irreversibility is beneficial for Agile software development
  • automatic retry is safe because inputs are immutable and outputs from failed tasks are discarded by the MapReduce framework

Designing for frequent faults

At Google, a MapReduce task that runs for an hour has an approximately 5% risk of being terminated to make space for a higher-priority process. This rate is more than an order of magnitude higher than the rate of failures due to hardware issues, machine reboot, or other reasons. At this rate of preemptions, if a job has 100 tasks that each run for 10 minutes, there is a risk greater than 50% that at least one task will be terminated before it is finished.

And this is why MapReduce is designed to tolerate frequent unexpected task termination: it’s not because the hardware is particularly unreliable, it’s because the freedom to arbitrarily terminate processes enables better resource utilization in a computing cluster.
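As a quick back-of-the-envelope check of those numbers, assuming the 5%-per-hour termination risk scales roughly linearly with task duration:

```python
# ~5% risk of preemption for a task that runs for one hour; assume the risk
# scales roughly linearly with task duration.
p_hour = 0.05
p_10min = p_hour * (10 / 60)            # ≈ 0.83% per 10-minute task

# Probability that at least one of 100 independent tasks gets preempted.
p_at_least_one = 1 - (1 - p_10min) ** 100
print(f"{p_at_least_one:.0%}")          # ≈ 57%, i.e. greater than 50%
```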

Downsides

MapReduce’s approach of fully materializing (writing to disk) intermediate state has downsides compared to other batch mechanisms like Unix pipes:

  • A MapReduce job can only start when all tasks in the preceding jobs (that generate its inputs) have completed
  • Mappers are often redundant: they just read back the same file that was just written by a reducer
  • Storing intermediate state in a distributed filesystem means those files are replicated across several nodes, which is often overkill for such temporary data

Dataflow Engines

In order to fix these problems, several new engines were developed, like Spark. They handle an entire workflow as one job, rather than breaking it up into independent subjobs. Since they explicitly model the flow of data through several processing stages, these systems are known as dataflow engines. Like MapReduce, they work by repeatedly calling a user-defined function to process one record at a time on a single thread. They parallelize work by partitioning inputs, and they copy the output of one function over the network to become the input to another function.

Unlike in MapReduce, these operators need not take the strict roles of alternating map and reduce, but instead can be assembled in more flexible ways. One option is to repartition and sort records by key, like in the shuffle stage of MapReduce. This feature enables sort-merge joins and grouping in the same way as in MapReduce.

Advantages

  • Expensive work such as sorting need only be performed in places where it is actually required, rather than always happening by default between every map and reduce stage.
  • There are no unnecessary map tasks, since the work done by a mapper can often be incorporated into the preceding reduce operator (because a mapper does not change the partitioning of a dataset).
  • Because all joins and data dependencies in a workflow are explicitly declared, the scheduler has an overview of what data is required where, so it can make locality optimizations. For example, it can try to place the task that consumes some data on the same machine as the task that produces it, so that the data can be exchanged through a shared memory buffer rather than having to copy it over the network.
  • It is usually sufficient for intermediate state between operators to be kept in memory or written to local disk, which requires less I/O than writing it to HDFS (where it must be replicated to several machines and written to disk on each replica). MapReduce already uses this optimization for mapper output, but dataflow engines generalize the idea to all intermediate state.
  • Operators can start executing as soon as their input is ready; there is no need to wait for the entire preceding stage to finish before the next one starts.
  • Existing Java Virtual Machine (JVM) processes can be reused to run new operators, reducing startup overheads compared to MapReduce (which launches a new JVM for each task).

Fault Tolerance

Spark, Flink, and Tez avoid writing intermediate state to HDFS, so they take a different approach to tolerating faults: if a machine fails and the intermediate state on that machine is lost, it is recomputed from other data that is still available.

To recover reliably from faults, any nondeterminism in the recomputation needs to be removed, for example by generating pseudorandom numbers from a fixed seed.

Recomputing data is not always the right answer: if the intermediate data is much smaller than the source data, or if the computation is very CPU-intensive, it is probably cheaper to materialize the intermediate data to files than to recompute it.

Thanks to the framework, your code in a batch processing job does not need to worry about implementing fault-tolerance mechanisms: the framework can guarantee that the final output of a job is the same as if no faults had occurred, even though in reality various tasks perhaps had to be retried. These reliable semantics are much stronger than what you usually have in online services that handle user requests and that write to databases as a side effect of processing a request.

11. Stream Processing: batch, but with lower delays

A complex system that works is invariably found to have evolved from a simple system that works. The inverse proposition also appears to be true: A complex system designed from scratch never works and cannot be made to work.

  • John Gall, Systemantics (1975)

The problem with daily batch processes is that changes in the input are only reflected in the output a day later, which is too slow for many impatient users. To reduce the delay, we can run the processing more frequently—say, processing a second’s worth of data at the end of every second—or even continuously, abandoning the fixed time slices entirely and simply processing every event as it happens. That is the idea behind stream processing.

Crucially, in batch processing jobs, the input data is bounded: it has a known, fixed size (for example, it consists of a set of log files at some point in time, or a snapshot of a database’s contents). Because it is bounded, a job knows when it has finished reading the entire input, and so a job eventually completes when it is done. In stream processing, the input is unbounded. In this case, a job is never complete, because at any time there may still be more work coming in. Both are similar in some respects, but the assumption of unbounded streams also changes a lot about how we build systems.

Some examples that are naturally represented as streams are user activity events, sensors providing periodic readings, and data feeds (e.g., market data in finance).

The writes to a database can also be thought of as a stream: we can capture the changelog—i.e., the history of all changes made to a database—either implicitly through change data capture or explicitly through event sourcing. Log compaction allows the stream to retain a full copy of the contents of a database.

Transmitting Event Streams

In a stream processing context, a record is more commonly known as an event, but it is essentially the same thing: a small, self-contained, immutable object containing the details of something that happened at some point in time.

In streaming terminology, an event is generated once by a producer (also known as a publisher or sender), and then potentially processed by multiple consumers (subscribers or recipients). In a filesystem, a filename identifies a set of related records; in a streaming system, related events are usually grouped together into a topic or stream.

Direct messaging from producers to consumers

UDP multicast is widely used in the financial industry for streams such as stock market feeds, where low latency is important.

Figure 11-1. Load balancing versus fan-out patterns.

Partitioned Logs

Sending a packet over a network or making a request to a network service is normally a transient operation that leaves no permanent trace. Message brokers are built around this transient messaging mindset. Databases and filesystems take the opposite approach: everything that is written to a database or file is normally expected to be permanently recorded.

A key feature of batch processes is that you can run them repeatedly, since the input is read-only. This is not the case with message brokers: receiving a message is destructive if the acknowledgment causes it to be deleted from the broker, so you cannot run the same consumer again and expect to get the same result.

Further, if you add a new consumer to a messaging system, it typically only starts receiving messages sent after the time it was registered. We need a hybrid solution.

Using logs for message storing

Combining the durable storage approach of databases with the low-latency notification facilities of messaging, we get the hybrid approach behind log-based message brokers. Message brokers and event logs serve as the streaming equivalent of a filesystem.

Apache Kafka, Amazon Kinesis Streams, and Twitter’s DistributedLog are log-based message brokers that work like this. Even though these message brokers write all messages to disk, they are able to achieve throughput of millions of messages per second by partitioning across multiple machines, and fault tolerance by replicating messages.

Figure 11-3. Topic partitioning.
Since partitioned logs typically preserve message ordering only within a single partition, all messages that need to be ordered consistently need to be routed to the same partition. For example, an application may require that the events relating to one particular user appear in a fixed order. This can be achieved by choosing the partition for an event based on the user ID of that event (in other words, making the user ID the partitioning key).
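A minimal sketch of that routing rule, hashing an assumed user_id field to pick a partition, much like a producer would do with a partitioning key:

```python
import hashlib

NUM_PARTITIONS = 8

def partition_for(user_id: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Route every event of a given user to the same partition, so the
    per-partition ordering guarantee applies to that user's events."""
    digest = hashlib.md5(user_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

# All of alice's events land in the same partition, in the order they are sent:
assert partition_for("alice") == partition_for("alice")
```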

You can keep derived data systems such as search indexes, caches, and analytics systems continually up to date by consuming the log of changes and applying them to the derived system.

When consumers cannot keep up with producers

Even if a consumer does fall too far behind and starts missing messages, only that consumer is affected; it does not disrupt the service for other consumers. This fact is a big operational advantage: you can experimentally consume a production log for development, testing, or debugging purposes, without having to worry much about disrupting production services. When a consumer is shut down or crashes, it stops consuming resources—the only thing that remains is its consumer offset.

Search on streams

Conventional search engines first index the documents and then run queries over the index. By contrast, searching a stream turns the processing on its head: the queries are stored, and the documents run past the queries. In the simplest case, you can test every document against every query, although this can get slow if you have a large number of queries. To optimize the process, it is possible to index the queries as well as the documents, and thus narrow down the set of queries that may match.
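A toy sketch of that optimization: store each query as a set of required terms, index the queries by term, and check an incoming document only against the queries that share at least one term with it. Real systems use richer query languages and indexes; the structures below are assumptions for illustration.

```python
from collections import defaultdict

# Stored queries: each is a set of terms that must all appear in a document.
queries = {
    "q1": {"kafka", "outage"},
    "q2": {"tax", "rates"},
}

# Index the queries themselves: term -> ids of queries mentioning that term.
term_index = defaultdict(set)
for qid, terms in queries.items():
    for term in terms:
        term_index[term].add(qid)

def matching_queries(document_text: str):
    doc_terms = set(document_text.lower().split())
    # Narrow down to queries sharing at least one term with the document...
    candidates = set()
    for term in doc_terms:
        candidates |= term_index.get(term, set())
    # ...then verify each remaining candidate in full.
    return [qid for qid in candidates if queries[qid] <= doc_terms]

print(matching_queries("kafka cluster outage resolved"))   # ['q1']
```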

Event time versus processing time

Confusing event time and processing time leads to bad data. For example, say you have a stream processor that measures the rate of requests (counting the number of requests per second). If you redeploy the stream processor, it may be shut down for a minute and process the backlog of events when it comes back up. If you measure the rate based on the processing time, it will look as if there was a sudden anomalous spike of requests while processing the backlog, when in fact the real rate of requests was steady.

Figure 11-7. Windowing by processing time introduces artifacts due to variations in processing rate.

Whose clock are you using, anyway?

The timestamp on the events should really be the time at which the user interaction occurred, according to the mobile device’s local clock. However, the clock on a user-controlled device often cannot be trusted, as it may be accidentally or deliberately set to the wrong time. The time at which the event was received by the server (according to the server’s clock) is more likely to be accurate, since the server is under your control, but less meaningful in terms of describing the user interaction.

To adjust for incorrect device clocks, one approach is to log three timestamps:

  • The time at which the event occurred, according to the device clock
  • The time at which the event was sent to the server, according to the device clock
  • The time at which the event was received by the server, according to the server clock

By subtracting the second timestamp from the third, you can estimate the offset between the device clock and the server clock (assuming the network delay is negligible). You can then apply that offset to the event timestamp, and thus estimate the true time at which the event actually occurred.
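In code, the adjustment is just a couple of subtractions; the epoch-second timestamps below are made up for illustration:

```python
def adjusted_event_time(device_event_time: float,
                        device_send_time: float,
                        server_receive_time: float) -> float:
    """Estimate the true event time, correcting for a skewed device clock.

    Assumes the network delay between sending and receiving is negligible.
    """
    clock_offset = server_receive_time - device_send_time
    return device_event_time + clock_offset

# Device clock runs 2 minutes slow: event at device-time 1000, sent at 1005,
# received at server-time 1125.
print(adjusted_event_time(1000, 1005, 1125))   # 1120.0
```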

Session window

Unlike other window types, a session window has no fixed duration. Instead, it is defined by grouping together all events for the same user that occur closely together in time, and the window ends when the user has been inactive for some time (for example, if there have been no events for 30 minutes). Sessionization is a common requirement for website analytics.
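A minimal sketch of that grouping for a single user's events, assuming each event carries a timestamp in seconds and using a 30-minute inactivity gap:

```python
SESSION_GAP = 30 * 60   # 30 minutes of inactivity closes a session

def sessionize(events):
    """Group one user's events into session windows by inactivity gap."""
    sessions, current, last_ts = [], [], None
    for event in sorted(events, key=lambda e: e["timestamp"]):
        if last_ts is not None and event["timestamp"] - last_ts > SESSION_GAP:
            sessions.append(current)
            current = []
        current.append(event)
        last_ts = event["timestamp"]
    if current:
        sessions.append(current)
    return sessions

events = [{"timestamp": 0, "url": "/"},
          {"timestamp": 600, "url": "/pricing"},
          {"timestamp": 10_000, "url": "/docs"}]   # > 30 min later: new session
print(len(sessionize(events)))   # 2
```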

Stream enrichment

It is natural to think of the user activity events as a stream, and to perform a join with a user-profile database on a continuous basis within a stream processor: the input is a stream of activity events containing a user ID, and the output is a stream of activity events in which the user ID has been augmented with profile information about the user.

To perform this join, the stream processor can keep a local copy of the database, either as an in-memory hash table if it is small enough, or as an index on local disk. With this approach, the database can be queried locally without a network round-trip.
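A sketch of that enrichment step, keeping the local copy as a plain dict that a (hypothetical) change-data-capture stream of profile updates keeps fresh:

```python
# Local copy of the user-profile database inside the stream processor,
# kept up to date by consuming a change-data-capture stream.
local_profiles = {}

def on_profile_change(change_event):
    """Apply one change event from the profile database to the local copy."""
    local_profiles[change_event["user_id"]] = change_event["profile"]

def enrich(activity_event):
    """Join an activity event with the locally cached profile: no network hop."""
    profile = local_profiles.get(activity_event["user_id"], {})
    return {**activity_event, "profile": profile}

on_profile_change({"user_id": "u1", "profile": {"plan": "pro"}})
print(enrich({"user_id": "u1", "url": "/dashboard"}))
```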

Consider the Twitter timeline example. We don't want to iterate over all the people the user is following, find their recent tweets, and merge them to build it. Instead, we want a timeline cache: a kind of per-user “inbox” to which tweets are written as they are sent, so that reading the timeline is a single lookup.

Sloooowly Changing Dimension

For example, if you sell things, you need to apply the right tax rate to invoices, which depends on the country or state, the type of product, and the date of sale (since tax rates change from time to time). When joining sales to a table of tax rates, you probably want to join with the tax rate at the time of the sale, which may be different from the current tax rate if you are reprocessing historical data.

If the ordering of events across streams is undetermined, the join becomes nondeterministic, which means you cannot rerun the same job on the same input and necessarily get the same result: the events on the input streams may be interleaved in a different way when you run the job again.

In data warehouses, this issue is known as a slowly changing dimension (SCD), and it is often addressed by using a unique identifier for a particular version of the joined record: for example, every time the tax rate changes, it is given a new identifier, and the invoice includes the identifier for the tax rate at the time of sale. This change makes the join deterministic, but has the consequence that log compaction is not possible, since all versions of the records in the table need to be retained.
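A tiny sketch of that scheme: every tax-rate change gets a fresh version identifier, the invoice records the version it was billed under, and the join looks up by that version rather than by the current rate. The rates and identifiers below are illustrative.

```python
# Every change to a tax rate gets a fresh version identifier.
tax_rate_versions = {
    "vat-de-v1": 0.19,
    "vat-de-v2": 0.16,   # temporary reduction
    "vat-de-v3": 0.19,
}

invoice = {"invoice_id": 42, "net_amount": 100.0, "tax_rate_version": "vat-de-v2"}

def join_invoice_with_rate(invoice):
    # Deterministic join: the result is the same no matter when we reprocess,
    # because the invoice pins the exact tax-rate version it was billed under.
    rate = tax_rate_versions[invoice["tax_rate_version"]]
    return {**invoice, "tax": invoice["net_amount"] * rate}

print(join_invoice_with_rate(invoice))   # tax: 16.0
```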

Fault Tolerance

The batch approach to fault tolerance ensures that, although restarting tasks means that records may in fact be processed multiple times, the visible effect in the output is as if they had only been processed once. This principle is known as exactly-once semantics.

The same issue arises in stream processing, but it is less straightforward to handle: waiting until a task is finished before making its output visible is not an option, because a stream is infinite and so you can never finish processing it.

Microbatching and checkpointing

One solution is to break the stream into small blocks, and treat each block like a miniature batch process. This approach is called microbatching, and it is used in Spark Streaming. The batch size is typically around one second, which is the result of a performance compromise: smaller batches incur greater scheduling and coordination overhead, while larger batches mean a longer delay before results of the stream processor become visible.

However, as soon as output leaves the stream processor (for example, by writing to a database, sending messages to an external message broker, or sending emails), the framework is no longer able to discard the output of a failed batch. In this case, restarting a failed task causes the external side effect to happen twice, and microbatching or checkpointing alone is not sufficient to prevent this problem.

Idempotence

Our goal is to discard the partial output of any failed tasks so that they can be safely retried without taking effect twice.

An idempotent operation is one that you can perform multiple times, and it has the same effect as if you performed it only once. For example, setting a key in a key-value store to some fixed value is idempotent (writing the value again simply overwrites the value with an identical value), whereas incrementing a counter is not idempotent (performing the increment again means the value is incremented twice).

For example, when consuming messages from Kafka, every message has a persistent, monotonically increasing offset. When writing a value to an external database, you can include the offset of the message that triggered the last write with the value. Thus, you can tell whether an update has already been applied, and avoid performing the same update again.
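A sketch of that pattern: store the offset of the last applied message alongside the value, and ignore any message whose offset is not newer. The in-memory "store" below is a stand-in for the external database.

```python
# External store stand-in: key -> (value, offset of the last applied message).
store = {}

def apply_message(key, value, offset):
    """Apply a message only if it has not been applied before.

    Returns True if the write took effect, False for a duplicate replay.
    """
    _, last_offset = store.get(key, (None, -1))
    if offset <= last_offset:
        return False            # already applied: safe to ignore the retry
    store[key] = (value, offset)
    return True

assert apply_message("user:1:plan", "pro", offset=41) is True
assert apply_message("user:1:plan", "pro", offset=41) is False   # replay ignored
```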

Rebuilding state after a failure

Any stream process that requires state—for example, any windowed aggregations (such as counters, averages, and histograms) and any tables and indexes used for joins—must ensure that this state can be recovered after a failure.

In some systems, network delay may be lower than disk access latency, and network bandwidth may be comparable to disk bandwidth. There is no universally ideal trade-off for all situations, and the merits of local versus remote state may also shift as storage and networking technologies evolve.

12. Ideas for reliable, scalable, and maintainable applications

Schema Migration in a... railway? 🚂

In the 19th century, there were multiple competing standards for the gauge of railway tracks in England, which created limitations in the train network. To convert tracks to a single standard gauge without disrupting train services for an extended period, a solution was found in the form of dual or mixed gauge tracks, where a third rail was added. This allowed for a gradual conversion as trains of both gauges could operate on the line. Once all trains were converted to the standard gauge, the nonstandard rail could be removed. This approach, although expensive, enabled the gradual change of gauge over several years. As a result, nonstandard gauges still exist today, such as in the BART system in the San Francisco Bay Area.

Just like with an extra gauge, derived views offer a method of gradually evolving a dataset without the need for an abrupt migration. Rather than immediately switching to a new schema, the old and new schemas can coexist as separate derived views of the same underlying data. By gradually shifting a small number of users to the new view while most users continue to use the old view, the performance and bugs of the new view can be tested. Over time, more users can be transitioned to the new view until eventually the old view can be dropped. This gradual approach allows for easy reversibility at each stage, reducing the risk of irreversible damage and increasing confidence in system improvements.

Lambda architecture

If batch processing is used to reprocess historical data, and stream processing is used to process recent updates, then how do you combine the two?

With the lambda approach, the stream processor consumes the events and quickly produces an approximate update to the view; the batch processor later consumes the same set of events and produces a corrected version of the derived view. The reasoning behind this design is that batch processing is simpler and thus less prone to bugs, while stream processors are thought to be less reliable and harder to make fault-tolerant. Moreover, the stream process can use fast approximate algorithms while the batch process uses slower exact algorithms.

It does have some disadvantages, like having to maintain the same logic to run both in a batch and in a stream processing framework.

Designing Applications Around Dataflow

Unbundled databases: unifying writes across different systems

Within a single database, creating a consistent index is a built-in feature. When we compose several storage systems, we similarly need to ensure that all data changes end up in all the right places, even in the face of faults. Making it easier to reliably plug together storage systems (e.g., through change data capture and event logs) is like unbundling a database’s index-maintenance features in a way that can synchronize writes across disparate technologies.

Even spreadsheets have dataflow programming capabilities that are miles ahead of most mainstream programming languages. In a spreadsheet, you can put a formula in one cell (for example, the sum of cells in another column), and whenever any input to the formula changes, the result of the formula is automatically recalculated. This is exactly what we want at a data system level: when a record in a database changes, we want any index for that record to be automatically updated, and any cached views or aggregations that depend on the record to be automatically refreshed.

Stream processors and services

Say a customer is purchasing an item that is priced in one currency but paid for in another currency. In order to perform the currency conversion, you need to know the current exchange rate. This operation could be implemented in two ways:

  1. In the microservices approach, the code that processes the purchase queries an exchange-rate service.
  2. In the dataflow approach, the code that processes purchases would subscribe to a stream of exchange rate updates ahead of time, and record the current rate in a local database whenever it changes. When it comes to processing the purchase, it only needs to query the local database. Not only is the dataflow approach faster, but it is also more robust to the failure of another service. Instead of RPC, we now have a stream join between purchase events and exchange rate update events.

The fastest and most reliable network request is no network request at all!

The join is time-dependent: if the purchase events are reprocessed at a later point in time, the exchange rate will have changed. If you want to reconstruct the original output, you will need to obtain the historical exchange rate at the original time of purchase. No matter whether you query a service or subscribe to a stream of exchange rate updates, you will need to handle this time dependence.

Expressing dataflows as transformations from one dataset to another also helps evolve applications: if you want to change one of the processing steps, for example to change the structure of an index or cache, you can just rerun the new transformation code on the whole input dataset in order to rederive the output. Similarly, if something goes wrong, you can fix the code and reprocess the data in order to recover.

Observing Derived State

Pushing state changes to clients

Traditional web pages rely on the user manually reloading the page to see updated data from the server, resulting in stale information. However, newer protocols such as server-sent events and WebSockets allow for real-time communication between the server and the browser, enabling the server to actively push messages and updates to the client. This reduces the staleness of the client-side state and extends the write path all the way to end-user devices. While clients still need to use a read path to initially retrieve their state, they can subsequently rely on a continuous stream of state changes from the server. This concept of actively pushing state changes can be extended beyond datacenters to end-user devices.

The devices will be offline some of the time, and unable to receive any notifications of state changes from the server during that time. But that problem is solved: with consumer offsets, a consumer of a log-based message broker can reconnect after failing or becoming disconnected, and ensure that it doesn’t miss any messages that arrived while it was disconnected. The same technique works for individual users, where each device is a small subscriber to a small stream of events.

Figure 12-1. In a search index, writes (document updates) meet reads (queries).

In order to extend the write path all the way to the end user, we would need to fundamentally rethink the way we build many of these systems: moving away from request/response interaction and toward publish/subscribe dataflow. If you are designing data systems, keep in mind the option of subscribing to changes, not just querying the current state.

Aiming for Correctness

If your application can tolerate occasionally corrupting or losing data in unpredictable ways, life is a lot simpler, and you might be able to get away with simply crossing your fingers and hoping for the best. On the other hand, if you need stronger assurances of correctness, then serializability and atomic commit are established approaches, but they come at a cost: they typically only work in a single datacenter (ruling out geographically distributed architectures), and they limit the scale and fault-tolerance properties you can achieve.

The End-to-End Argument for Databases

Just because an application uses a data system that provides comparatively strong safety properties, such as serializable transactions, that does not mean the application is guaranteed to be free from data loss or corruption. For example, if an application has a bug that causes it to write incorrect data, or delete data from a database, serializable transactions aren’t going to save you.

Exactly-once execution of an operation

Processing twice is a form of data corruption: it is undesirable to charge a customer twice for the same service (billing them too much) or increment a counter twice (overstating some metric). In this context, exactly-once means arranging the computation such that the final effect is the same as if no faults had occurred, even if the operation actually was retried due to some fault.

One of the most effective approaches to this is to make the operation idempotent. However, taking an operation that is not naturally idempotent and making it idempotent requires some effort and care: you may need to maintain some additional metadata (such as the set of operation IDs that have updated a value), and ensure fencing when failing over from one node to another.

Uniquely identifying requests

To make the request idempotent through several hops of network communication, it is not sufficient to rely just on a transaction mechanism provided by a database—you need to consider the end-to-end flow of the request.

For example, you could generate a unique identifier for a request (such as a UUID) and include it as a hidden form field in the client application, or calculate a hash of all the relevant form fields to derive the request ID. If the web browser submits the POST request twice, the two requests will have the same request ID. You can then pass that request ID all the way through to the database and check that you only ever execute one request with a given ID. If a transaction attempts to insert an ID that already exists, an INSERT would fail and the transaction would be aborted, preventing it from taking effect twice.
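A minimal end-to-end sketch of that check, here using SQLite; any database that enforces a unique or primary-key constraint would do, and the table names are made up:

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE processed_requests (request_id TEXT PRIMARY KEY)")
db.execute("CREATE TABLE payments (request_id TEXT, amount REAL)")

def process_payment(request_id, amount):
    """Execute the payment at most once per request_id."""
    try:
        with db:   # one transaction: either both inserts commit or neither does
            db.execute("INSERT INTO processed_requests VALUES (?)", (request_id,))
            db.execute("INSERT INTO payments VALUES (?, ?)", (request_id, amount))
        return True
    except sqlite3.IntegrityError:
        return False   # duplicate submission: the first attempt already took effect

assert process_payment("req-123", 9.99) is True
assert process_payment("req-123", 9.99) is False   # resubmitted form, no double charge
```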

Transaction Abstraction

Transactions have long been seen as a good abstraction. They take a wide range of possible issues (concurrent writes, constraint violations, crashes, network interruptions, disk failures) and collapse them down to two possible outcomes: commit or abort.

When we refuse to use distributed transactions because they are too expensive, we end up having to reimplement fault-tolerance mechanisms in application code. Reasoning about concurrency and partial failure is difficult and counterintuitive, and so if you attempt this in your applications, it most probably won't work correctly. The consequence is lost or corrupted data.

For these reasons, Martin is right in thinking it is worth exploring fault-tolerance abstractions that make it easy to provide application-specific end-to-end correctness properties, but also maintain good performance and good operational characteristics in a large-scale distributed environment.

Uniqueness constraints require consensus

If there are several concurrent requests with the same value, the system somehow needs to decide which one of the conflicting operations is accepted, and reject the others as violations of the constraint. The most common way of achieving this consensus is to make a single node the leader, and put it in charge of making all the decisions.

Uniqueness checking can be scaled out by partitioning based on the value that needs to be unique. For example, if you need to ensure uniqueness by request ID, you can ensure all requests with the same request ID are routed to the same partition.

However, asynchronous multi-master replication is ruled out, because it could happen that different masters concurrently accept conflicting writes, and thus the values are no longer unique. If you want to be able to immediately reject any writes that would violate the constraint, synchronous coordination is unavoidable.

Multi-partition request processing

Ensuring that an operation is executed atomically, while satisfying constraints, becomes more interesting when several partitions are involved. Imagine a payments scenario where there are potentially three partitions: the one containing the request ID, the one containing the payee account, and the one containing the payer account. There is no reason why those three things should be in the same partition, since they are all independent from each other.

It turns out that equivalent correctness can be achieved with partitioned logs, and without an atomic commit:

  1. The request to transfer money from account A to account B is given a unique request ID by the client, and appended to a log partition based on the request ID.
  2. A stream processor reads the log of requests. For each request message it emits two messages to output streams: a debit instruction to the payer account A (partitioned by A), and a credit instruction to the payee account B (partitioned by B). The original request ID is included in those emitted messages.
  3. Further processors consume the streams of credit and debit instructions, deduplicate by request ID, and apply the changes to the account balances.

Steps 1 and 2 are necessary because if the client directly sent the credit and debit instructions, it would require an atomic commit across those two partitions to ensure that either both or neither happen. To avoid the need for a distributed transaction, we first durably log the request as a single message, and then derive the credit and debit instructions from that first message. The request either appears in the log or it doesn’t, without any need for a multi-partition atomic commit.
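A compact sketch of steps 2 and 3, modelling the partitioned streams as plain Python lists rather than a real log such as Kafka:

```python
from collections import defaultdict

# Step 1 output: the request log, partitioned by request ID.
request_log = [
    {"request_id": "r1", "payer": "A", "payee": "B", "amount": 50},
]

# Step 2: derive one debit and one credit instruction per request, each
# carrying the original request ID and partitioned by account.
instruction_streams = defaultdict(list)
for req in request_log:
    instruction_streams[req["payer"]].append(
        {"request_id": req["request_id"], "delta": -req["amount"]})
    instruction_streams[req["payee"]].append(
        {"request_id": req["request_id"], "delta": +req["amount"]})

# Step 3: per-account processors apply the instructions, deduplicating by
# request ID so that redelivered messages have no further effect.
balances = defaultdict(int)
applied = defaultdict(set)
for account, stream in instruction_streams.items():
    for instr in stream + stream:            # replay the stream to show idempotence
        if instr["request_id"] in applied[account]:
            continue                         # duplicate delivery: ignore
        balances[account] += instr["delta"]
        applied[account].add(instr["request_id"])

print(dict(balances))   # {'A': -50, 'B': 50} even though the streams were replayed
```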

Timeliness and Integrity

The term consistency conflates two different requirements that are worth considering separately:

  • Timeliness Timeliness means ensuring that users observe the system in an up-to-date state. We saw previously that if a user reads from a stale copy of the data, they may observe it in an inconsistent state. However, that inconsistency is temporary, and will eventually be resolved simply by waiting and trying again.

The CAP theorem uses consistency in the sense of linearizability (a writer waits until a transaction is committed, and thereafter its writes are immediately visible to all readers), which is a strong way of achieving timeliness. Weaker timeliness properties like read-after-write consistency can also be useful.

  • Integrity Integrity means absence of corruption; i.e., no data loss, and no contradictory or false data. In particular, if some derived dataset is maintained as a view onto some underlying data, the derivation must be correct. For example, a database index must correctly reflect the contents of the database—an index in which some records are missing is not very useful.

On your credit card statement, it is not surprising if a transaction that you made within the last 24 hours does not yet appear—it is normal that these systems have a certain lag. We know that banks reconcile and settle transactions asynchronously, and timeliness is not very important here. However, it would be very bad if the statement balance was not equal to the sum of the transactions plus the previous statement balance (an error in the sums), or if a transaction was charged to you but not paid to the merchant (disappearing money). Such problems would be violations of the integrity of the system.

Loosely interpreted constraints

As discussed previously, enforcing a uniqueness constraint requires consensus, typically implemented by funneling all events in a particular partition through a single node. This limitation is unavoidable if we want the traditional form of uniqueness constraint, and stream processing cannot avoid it.

However, another thing to realize is that many real applications can actually get away with much weaker notions of uniqueness:

Many airlines overbook airplanes in the expectation that some passengers will miss their flight, and many hotels overbook rooms, expecting that some guests will cancel. In these cases, the constraint of “one person per seat” is deliberately violated for business reasons, and compensation processes (refunds, upgrades, providing a complimentary room at a neighboring hotel) are put in place to handle situations in which demand exceeds supply. Even if there was no overbooking, apology and compensation processes would be needed in order to deal with flights being cancelled due to bad weather or staff on strike—recovering from such issues is just a normal part of business.

These applications do require integrity: you would not want to lose a reservation, or have money disappear due to mismatched credits and debits. But they don’t require timeliness on the enforcement of the constraint: if you have sold more items than you have in the warehouse, you can patch up the problem after the fact by apologizing.

How many apologies?

Although strict uniqueness constraints require timeliness and coordination, many applications are actually fine with loose constraints that may be temporarily violated and fixed up later, as long as integrity is preserved throughout. In other words, coordination and constraint enforcement reduce the number of apologies you have to make due to inconsistencies, but they potentially also reduce the performance and availability of your system, and thus potentially increase the number of apologies you have to make due to outages. You cannot reduce the number of apologies to zero, but you can aim to find the best trade-off for your needs—the sweet spot where there are neither too many inconsistencies nor too many availability problems.

This approach is much more scalable and robust than the traditional approach of using distributed transactions, and fits with how many business processes work in practice.

"But hardware never fails!"

It seems unlikely, but if you have enough devices running your software, even very unlikely things do happen. Besides random memory corruption due to hardware faults or radiation, certain pathological memory access patterns can flip bits even in memory that has no faults—an effect that can be used to break security mechanisms in operating systems (this technique is known as rowhammer). Once you look closely, hardware isn’t quite the perfect abstraction that it may seem.

HDFS and Amazon S3 do not fully trust disks: they run background processes that continually read back files, compare them to other replicas, and move files from one disk to another, in order to mitigate the risk of silent corruption.

It is important to try restoring from your backups from time to time—otherwise you may only find out that your backup is broken when it is too late and you have already lost data. Don’t just blindly trust that it is all working.

Designing for Auditability

Today, weaker consistency guarantees have become the norm under the banner of NoSQL, and less mature storage technologies are widely used. Yet, because audit mechanisms have not been developed alongside them, we keep building applications on the basis of blind trust, even though this approach has now become more dangerous.

For example, for an event log, we can use hashes to check that the event storage has not been corrupted. For any derived state, we can rerun the batch and stream processors that derived it from the event log in order to check whether we get the same result, or even run a redundant derivation in parallel.
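One way to picture the hashing idea: chain each event's hash to the hash of everything before it, so that re-deriving the digest later reveals any silent corruption of the stored log. This is a simplification for illustration, not a full audit scheme:

```python
import hashlib
import json

def chain_hash(previous_hash, event):
    """Hash an event together with the hash of everything that came before it."""
    payload = previous_hash + json.dumps(event, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()

def log_digest(events):
    digest = ""
    for event in events:
        digest = chain_hash(digest, event)
    return digest

events = [{"type": "credit", "amount": 50}, {"type": "debit", "amount": 50}]
original_digest = log_digest(events)

# Re-reading the stored log later should reproduce the same digest;
# any silent corruption changes it.
assert log_digest(events) == original_digest
events[0]["amount"] = 500
assert log_digest(events) != original_digest
```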

A deterministic and well-defined dataflow also makes it easier to debug and trace the execution of a system in order to determine why it did something. If something unexpected occurred, it is valuable to have the diagnostic capability to reproduce the exact circumstances that led to the unexpected event—a kind of time-travel debugging capability.

Do the Right Thing

Kleppmann is accurate in writing that a technology is not good or bad in itself—what matters is how it is used and how it affects people. I think it is not sufficient for software engineers to focus exclusively on the technology and ignore its consequences: the ethical responsibility is ours to bear also. Think about this as you get closer to users' personal data.

On a personal note, I have always looked contemptuously at recruiters pushing me towards positions for "the next big thing", which mostly ended up masquerading as an online casino app, a dodgy mental health app, or scammy crypto platforms. Choosing the companies one works for (or even applies to) is also an act that cannot escape this responsibility.

Reasoning about ethics is difficult, but it is too important to ignore.

Not-so-data-driven

A blind belief in the supremacy of data for making decisions is not only delusional, it is positively dangerous. As data-driven decision making becomes more widespread, we will need to figure out how to make algorithms accountable and transparent, how to avoid reinforcing existing biases, and how to fix them when they inevitably make mistakes.

Privacy

If the service is funded through advertising, the advertisers are the actual customers, and the users’ interests take second place. Tracking data becomes more detailed, analyses become further-reaching, and data is retained for a long time in order to build up detailed profiles of each person for marketing purposes.

As a thought experiment, try replacing the word data with surveillance, and observe if common phrases still sound so good. How about this: “In our surveillance-driven organization we collect real-time surveillance streams and store them in our surveillance warehouse. Our surveillance scientists use advanced analytics and surveillance processing in order to derive new insights.”

When sharing medical data, there are clear risks to privacy, but there are also potential opportunities: how many deaths could be prevented if data analysis was able to help us achieve better diagnostics or find better treatments? Over-regulation may prevent such breakthroughs. It is difficult to balance such potential opportunities with the risks.

The End

If you have reached this far, thank you! The purpose of these blog posts is to, through Kleppmann's magnificent book and ideas, spark different thoughts that you may have not encountered yet as a software engineer. Let's work together to build better services and products, and ultimately to leave the world a better place than we found it.
