Providing Flexible Database Consistency Levels with Manhattan at Twitter

Boaz Avital

Recorded at GOTO 2016

so yeah today I'll be talking about some
context and some practical specifics
about how we build stronger consistency
guarantees into our existing key value store
my name is Boaz I am on the core storage
team at Twitter we are one of the teams
that provides as a managed service
storage to the engineers at the company
who are building their applications I'll
give you a second to follow me on
Twitter one of the ways that we do that
and have been doing that for a while now
is with a database that we call
Manhattan it is our distributed
key-value store it is the primary key
value store at Twitter that we built from
the ground up about four years ago
Manhattan didn't exist and today it is
the canonical store for a lot of very
important data at the company things
like tweets DMs advertiser data and a
whole long tail of other applications
I'll give you some context about the
database among other reasons one of the
things that we built Manhattan for was
to be very easy for developers to adopt
storage and get into production quickly
with their services in order to do that
we emphasized a few things we provide
self-service access to the database
through a web interface we try to hide
as many infrastructure complexities as
we can from our customers the other
engineers at Twitter things like making
transparent global replication across
all of the regions of our data centers
we allow customers to share space on
large clusters this reduces the
provisioning time when you ask for
storage and it reduces the burden on the
developers from having to set up and
manage new clusters whenever someone has
a new use case and also we provide some
nifty automatic visibility and
observability and alerting so that
people have visibility into how they're
using the database now like with any
successful service as it gets more
popular and gets more adoption people
get used to using it and they want to
start using it for use cases for which
it wasn't exactly designed in our case
that could mean that someone who was
using this eventually consistent key
value store wants to start using it for
parts of their data that needs stronger
consistency guarantees when they try to
do this and they can't with our system
it causes a lot of pain it means that
they have to make trade-offs that they
don't want to make or that they're not
happy with and either they make those
trade-offs through using our system or
through using a different system like an
RDBMS that doesn't provide some of
the guarantees that we provide so you as
an engineer whose job it is to solve
these problems for your customers are
presented with a choice a choice that we
see a lot which is do you build a new
service or use an existing new service
alongside in parallel to the
service that you already have to solve
these problems or do you build new
features into the existing service today
now this is a scale and I don't need to
tell you that you don't want to be on
the far side of the scale where you
build bloated software that's not very
good at anything but also when you're
building infrastructure you don't always
want to build services that are just too
specific and there's a few reasons for
that one is running multiple systems in
production can be really painful every
system is going to have a different
operational profile every system is
going to have its own little quirks and
it's going to be painful for your
on-call engineers to know every system
in depth another is that for
complex infrastructure a lot of things
are going to be common between these
services for a database for instance
you're always going to need to manage
your nodes you're always going to need
to move data around and you're going to
have to either rebuild these components
for the
systems or try to continuously abstract
components out of your systems which can
have diminishing returns and probably
the biggest reason that you don't want
lots of separate systems is that I
guarantee you that your customers do not
want to talk to five different databases
with five different APIs each one that
provides a slightly different guarantee
or feature so in reality you want to be
somewhere here on the scale where you're
building specialized systems that are
good at what they do but they can evolve
in a logical and thoughtful way to
provide more functionality to your
customers so that's what we wanted to do
to solve this problem for our customers
and the engineers at Twitter and in
order to talk about how we did that I'll
give you guys an overview of the
architecture of our
eventually consistent database I'm gonna
go pretty quickly over a lot of general
concepts if you are familiar with
distributed systems or with distributed
databases a lot of this is going to look
very familiar but if not feel free to
ask questions after the talk or find me
at the conference as good a place to
start as any with the database is with
the data model Manhattan as I said is a
key value store and we split our keys
into two components a partition key or P
key and a local key or L key local keys
are stored together on the same machine
underneath the partition key they have
locality that's why we call them that
and that means that you can do
interesting range side queries for
instance in this data set if the p key
is a user ID you can say something like
get me all of the l keys that start with
the word profile for this particular
user and you would get the user's
profile this should look familiar to you
if you've used Cassandra or BigTable as
a row and column scheme or if you've
used DynamoDB as a hash key and range
key so we take these keys and we
distribute them around the cluster the
way we do this is by hashing the
partition key and placing
it randomly into a shard and then
replicating those shards and
distributing them to the nodes in the
cluster when you want to make a request
a client sends their request to
one of many stateless symmetrical
coordinator nodes those nodes take the
request take the key from it find the
shard that that key belongs to find the
storage nodes that that shard lives on
and then forward those requests in
parallel to all of those storage nodes
waiting for a response that response
could be a success if it's a write or the value if it's a read
now when I say that the coordinator
nodes forward this data in parallel to
all of these storage nodes that should
be a giveaway immediately that I'm
talking about a quorum based eventually
consistent system specifically in this
case we use Dynamo style eventual
consistency as in the Amazon Dynamo
paper and we have last write wins
conflict resolution by doing this we can
forward the requests in parallel to all
of the hosts and those requests can
succeed and fail independently of each
other without necessarily affecting the
overall success of the entire request
and as long as you are making requests
to a quorum of hosts in this case two
out of three and those requests succeed
you're guaranteed to get certain
consistency levels such as read your own
writes in this case so if one of these
fails because of a network partition or
because of a rolling restart and
subsequently you do a read from the
database and that read also
succeeds against a quorum of hosts even
if one of those hosts previously failed
you're guaranteed to see at least one
version of the data that is the latest
of course we can't call it eventual
consistency if we don't eventually make
the data consistent across hosts so we
have an anti-entropy process that runs
in the background called replica reconciliation
this goes and compares the shards across
every replica making sure that they have
the same data and if they don't it
copies that data over so these are all
some pretty standard straightforward
concepts why would we choose to build a
system this way
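
As a rough illustration of the quorum fan-out, last-write-wins resolution, and background reconciliation described above, here is a minimal Python sketch. It is not Manhattan's code: the Replica class, the quorum_write, quorum_read, and reconcile helpers, and the use of plain integer versions are all assumptions made for illustration.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple

Entry = Tuple[int, str]  # (version, value); hypothetical on-node representation


@dataclass
class Replica:
    """Hypothetical storage node; `up` simulates a partition or restart."""
    up: bool = True
    data: Dict[str, Entry] = field(default_factory=dict)

    def write(self, key: str, version: int, value: str) -> bool:
        if not self.up:
            return False
        current = self.data.get(key)
        # Last write wins: only a newer version replaces what is stored.
        if current is None or version > current[0]:
            self.data[key] = (version, value)
        return True

    def read(self, key: str) -> Tuple[bool, Optional[Entry]]:
        if not self.up:
            return (False, None)
        return (True, self.data.get(key))


def quorum_size(replicas: List[Replica]) -> int:
    return len(replicas) // 2 + 1


def quorum_write(replicas: List[Replica], key: str, version: int, value: str) -> bool:
    # Fan out to every replica; each one succeeds or fails independently.
    acks = sum(1 for r in replicas if r.write(key, version, value))
    return acks >= quorum_size(replicas)


def quorum_read(replicas: List[Replica], key: str) -> Optional[str]:
    answers = [entry for ok, entry in (r.read(key) for r in replicas) if ok]
    if len(answers) < quorum_size(replicas):
        raise RuntimeError("no quorum of replicas responded")
    found = [e for e in answers if e is not None]
    # Return the value carrying the highest version among the answers.
    return max(found, key=lambda e: e[0])[1] if found else None


def reconcile(replicas: List[Replica], key: str) -> None:
    # Anti-entropy: copy the newest known version to every reachable replica.
    found = [e for ok, e in (r.read(key) for r in replicas) if ok and e is not None]
    if found:
        version, value = max(found, key=lambda e: e[0])
        for r in replicas:
            r.write(key, version, value)


nodes = [Replica(), Replica(), Replica()]
nodes[2].up = False                       # one replica misses the write
write_ok = quorum_write(nodes, "user:1", 1, "x")
nodes[2].up = True
nodes[0].up = False                       # a different replica misses the read
value_read = quorum_read(nodes, "user:1")
nodes[0].up = True
reconcile(nodes, "user:1")
```

In this sketch the read still returns the written value because any two quorums of three overlap in at least one replica, and the reconcile step is what eventually brings the lagging replica up to date.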
the primary benefit that you get from
building a system like this is super
high availability because all of the
storage nodes are equal to each other we
don't have a mastership relationship we
don't get hiccups from things like failover and
master reelection and because we chose
to send the request in parallel to all
of these hosts this is a latency
throughput trade-off you're essentially
waiting for a best-of n responses from
these replicas and you're getting the
best possible latency that the system
can provide at any time and you also get
simplicity and the simplicity is in
exchange for maybe a slightly less than
intuitive consistency model you get last
write wins or really with any kind of
hands-off conflict resolution a way to
transparently move data asynchronously
replicate data from one set of hosts to
another and have that data be seamlessly
integrated into the existing data in the
system so if I copy data over through
replica reconciliation or if I
replicate data over from a different
region that just combines with the data
that's already on the system in a way
that makes our cross region replication
story very nice very easy for people to
use and you don't have to think about
how my cross region data is working in
the database but as I said for some
people it's not quite enough when is it
not enough one use case is much like in
any other kind of concurrent programming
when you need a uniqueness or check and
set constraint in the face of concurrent
updates to the database so an example of
that could be if two users are trying to
both reserve a username
not only do I need only one of these
users to ultimately succeed in reserving
this username I can't have races that
result in both of them thinking that
they succeeded in reserving the username
because this can break the business
logic of your application so for
instance if both of these users tried to reserve it
they both saw that the username was free
if they both got a success on that
assignment but only the second person
who happened to make that request
succeeded this is very awkward behavior
for your application to have to overcome
and it also applies across regions
globally so two people might be trying
to reserve a username one of them is in
Chicago one of them is in Nairobi even
if they're talking to different data
centers they should still get these same
guarantees and in the same example you
want one other thing which is atomicity
of your updates if I send an update to
the database to reserve this username
in the case of success I want
it to succeed everywhere I can't get
into a state in a failure scenario where
a minority of hosts think that a
username is reserved and I can't
guarantee a predictable read when I go
back and try to see who has this
username this is a state that you can
get into with eventual consistency and
that state will be resolved either in
one direction or the other depending on
whether this existing host dies or not
so it's unacceptable for some use cases
there is a genuine need for a stronger consistency
model how do we adapt our current architecture
we can add a new component into our
request path which is logs specifically
sequential replicated distributed logs
in the form for us of Twitter
DistributedLog which is a recently open
sourced log library built on top of
Apache BookKeeper the purpose of a
sequential log is that all of the
updates that go into this system into
this log are able to be read in the same
order from all consumers on the way out
it gets complicated when you replicate
this log for durability and you
distribute this log for resilience and
scalability and the log has to deal with
concepts like consensus or mastership
with failover and fencing and it has to
get all these complicated concepts right
now this kind of component is becoming
more and more popular in lots of
different kinds of infrastructure if you
were at the last talk here then you saw
Kafka which is one type of distributed
log and so I won't talk about how these
logs are implemented but I will talk
about how we use this log what are some
of the complexities of interaction when
we introduce this new kind of component
into our request path and have this new
kind of consistency model so I said
earlier that all of the keys are
partitioned into shards the shard is
the ultimate granular unit of
partitioning and distribution in the
system so we can move shards around in
between hosts what I can choose to do is
say that all operations that are going
to update or act on a shard have to go
through a log and I can make this log
per shard infrastructure and then I can
have all the storage nodes that own
this shard consume all of their updates
for that shard from the log and
have this guarantee that all of those
updates will be guaranteed to be
delivered and be delivered in order from
the log and now once I have this I can
make these operations pretty much
anything that I want them to be they
can be reads or writes as they were
before or I can start sending compare
and set operations I can start sending
increment operations I can send any kind
of operation that is deterministic based
on the data that arrives in the
operation itself and the data that
already exists in the database which is
guaranteed to be the same on every
storage node at the time that that
operation is processed so we can already
see that by introducing this we can
solve some of the problems that we were
worried about before let's take the
reservation example I can model a compare and
set operation where I say if a username
is free assign my user ID to that
username and two users who are
concurrently making this request will
appear in this log in some order which
order doesn't really matter because
they're doing it concurrently and either
order is valid but what's important is
that all of the storage nodes all of the
replicas that process this request see
the same order so what will end up
happening is that the first one that
enters the log will be able to reserve
that username and the second one will
fail its check condition and not reserve
the username and see a happy application
level error this kind of simple change
to the messaging model allows our
guarantees to be strengthened quite a
bit so let's zoom out a little bit and
see how this fits into the larger
architecture now when a client sends a
request to a coordinator instead of
forwarding that request directly to the
storage nodes the coordinator forwards
it to the log that belongs to the shard
on which that key lives and then the
storage nodes
consume from that log play back the
operation and respond with again the
success if it's a write the value if
it's a read or whatever the result
is of your more complex
operation back over to the coordinator
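
The username reservation flow through a per-shard log can be sketched as follows. This is an illustrative model only: ShardLog is a hypothetical in-memory stand-in for a replicated sequential log (it is not the DistributedLog API), and StorageNode and CompareAndSet are names invented for the example.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass(frozen=True)
class CompareAndSet:
    key: str
    expected: Optional[str]  # None means "the key must currently be unset"
    new_value: str


class ShardLog:
    """Hypothetical in-memory stand-in for one shard's sequential log."""

    def __init__(self) -> None:
        self._entries: List[CompareAndSet] = []

    def append(self, op: CompareAndSet) -> int:
        self._entries.append(op)
        return len(self._entries) - 1  # offset of the new entry

    def read_from(self, offset: int) -> List[CompareAndSet]:
        return self._entries[offset:]


class StorageNode:
    """Hypothetical replica that applies operations strictly in log order."""

    def __init__(self, log: ShardLog) -> None:
        self.log = log
        self.data: Dict[str, str] = {}
        self.next_offset = 0
        self.results: List[bool] = []  # result of each operation, by offset

    def catch_up(self) -> None:
        for op in self.log.read_from(self.next_offset):
            self.results.append(self._apply(op))
            self.next_offset += 1

    def _apply(self, op: CompareAndSet) -> bool:
        # Deterministic: depends only on the operation and the local data,
        # which is identical on every replica at this point in the log.
        if self.data.get(op.key) == op.expected:
            self.data[op.key] = op.new_value
            return True
        return False


log = ShardLog()
node_a, node_b = StorageNode(log), StorageNode(log)

# Two users race for the same username; the log picks some order.
log.append(CompareAndSet("username:boaz", None, "user-1"))
log.append(CompareAndSet("username:boaz", None, "user-2"))

node_a.catch_up()
node_b.catch_up()
```

Because both nodes consume the same entries in the same order, they agree that the first reservation succeeded and the second failed its check, so the coordinator can trust the answer from whichever node responds first.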
now if you notice I have a new guarantee
here that I didn't have before which is
that if the write enters the log I know
it and if a storage node processes an
operation I know that all the other
storage nodes that process that
operation will return the same response
because they have the same data so now I
can start to do more interesting things
with my request model so that I can keep
the same guarantees that I want to have
while lowering the latency on the client
side for any given request so for
example let's say I know for sure that
all of my writes and all of my reads to
my data set are going to go
through this sequential log well now I
can say if I'm sending a write as soon
as that write enters the log I'm done I
know that that write will eventually
be applied on to the storage nodes so I
only need to wait for the log to ack
that it has this write durable before I
can return a success back to the client
and I know that any subsequent read will
be queued behind that write in the log
therefore it will see the write as I
expect it to and when I send that read
or if I send out a more complex
operation that needs a result I only
need to wait for one of the storage
nodes to respond because I know that all
the other storage nodes will respond
with the same data so this makes certain
kinds of strong operations happen a
little bit faster we either get to wait
for the log response or the fastest
response from a host but I can still do
something interesting I can say I was
waiting for a quorum of responses before
in my eventually consistent model
what if I wait for a quorum of responses now
so if I send a write or if I send a CAS
or an increment or something I'll wait
for a quorum of responses to come back
and if I do that I know that that
request is durable and consistent on the
storage nodes themselves and when I send
a subsequent read because I didn't
change my request path I only added a
new option for the path a request can take in
this database I can send a read through
the normal eventually consistent fan-out
path and when I do that I get all of the
benefits that I was getting before from
an eventually consistent request high
availability low latency without eating
any of the cost of going through the log
but I still get to do updates in this
more strongly consistent fashion so this
is an interesting model and in a
nutshell this is how we can add this log
component into the database to add
stronger consistency so let's see what
are the results that come out of this
well first we've gained what it is that
we were trying to gain because updates
are ordered per shard and keys are
hashed randomly into shards but
consistently into shards we effectively
gained in order updates on a per key
basis linearizability for our keys
technically we have in order updates for
everything in the shard but because you
as a user don't know for sure if any two
keys will end up in the same shard you
don't really rely on that it's important
to note that we don't have full
distributed transactions across shards
that's a more complex problem and it's
something that would have to be built on
top of this more basic system we also
have or we can say we kept failure
isolation at the shard level because all
of our writes go through logs that are
owned by shards if we have a hiccup or
an issue or a problem with an individual log
that problem is constrained to a very
small sliver of the key space that
belongs to that shard and also because
we've only added this new path into our
architecture instead of replacing it
completely we have this ability to do a
couple things not only can we mix strong
and eventually consistent datasets
within the same cluster we can actually
to some extent mix strong and eventually
consistent requests within the same data
set and this is what I mean when I say
that we're providing flexible
consistency levels a user can move back
and forth across this continuum between
super high availability and pretty
strong per object linearizability by
just modifying the kind of request they
make on a per request basis to the
database and that's really powerful for
our customers and allows you to get
exactly what you want out of the system
of course we made trade-offs the first
trade-off is an obvious one we've added
a new component it's not a
particularly simple component into the
request path and so on average we're
gonna add some amount of latency in the
local data center that could be a few
milliseconds of latency if you're
talking about doing coordination across
regions that is constrained by the
disappointingly slow speed of light and
so you're gonna have an order of
magnitude more latency added to your
requests that have to coordinate far
across the globe we've also as much as
we tried to isolate failures exposed
ourselves to this thing that we were
trying to avoid which is latency hiccups
so when there are some kind of problems
or re-elections on an individual log
that slow down the updates for an
individual shard a customer will see in
their high percentile latencies an
increase and that has to be okay for you
that is a trade-off that you have to be
okay with as an application developer
and finally an interesting one
because we said that all of these
updates have to happen in order and they
have to be applied in order to maintain
the integrity of the database and the
data on the database then we have the
situation where if for some reason
a replica cannot apply an operation that
it reads from a stream from a log the
only option that we have is to
completely halt operations for that log
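
To make the halting behavior concrete, here is a small hypothetical sketch of a replica that stops consuming its log at the first operation it cannot apply. The operation tuples, the ApplyError exception, and the HaltingReplica class are all invented for illustration and say nothing about how Manhattan actually detects or reports such failures.

```python
from typing import Dict, List, Tuple

Op = Tuple[str, str, int]  # (kind, key, argument); hypothetical encoding


class ApplyError(Exception):
    """Raised when an operation cannot be applied (bug, corruption, ...)."""


class HaltingReplica:
    def __init__(self) -> None:
        self.data: Dict[str, int] = {}
        self.applied = 0      # number of log entries applied so far
        self.halted = False

    def consume(self, log: List[Op]) -> None:
        while not self.halted and self.applied < len(log):
            try:
                self._apply(log[self.applied])
            except ApplyError:
                # Skipping the entry would let this replica diverge from the
                # others, so the only safe choice is to stop and alert.
                self.halted = True
                break
            self.applied += 1

    def _apply(self, op: Op) -> None:
        kind, key, arg = op
        if kind == "set":
            self.data[key] = arg
        elif kind == "incr":
            self.data[key] = self.data.get(key, 0) + arg
        else:
            raise ApplyError(f"cannot apply operation of kind {kind!r}")


log: List[Op] = [
    ("set", "n", 1),
    ("incr", "n", 2),
    ("bogus", "n", 0),   # stands in for an entry the replica cannot process
    ("incr", "n", 5),    # never reached until the problem is resolved
]
replica = HaltingReplica()
replica.consume(log)
```

The later increment is deliberately left unapplied: applying it after skipping the bad entry would produce state that no other replica is guaranteed to share.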
so why might we not be able to apply
an operation normally everything should
be applied but operating at scale we
have to take into account that there
might be bugs we have to take into
account that there might be some kind of
exceptional corruption issue and that
individual replicas now cannot continue
to process operations from that log and
we have a form of unavailability this can
be scary certainly sounds scary and it's
where things like excellent
observability excellent alerting
excellent tooling come into play to make
sure that if this happens because you
have to account for all possibilities we
know about it immediately and we resolve
it as quickly as possible so that's what
in and it's important now that you have
it to think about how is this new model
that we've added different from the
existing model and how do we use this
new model and operate our clusters with
this new model in a way that isn't
special or unique that just works with
the system that we already had so that
for us and our customers we're adding
something without losing too much so
when I think about how this
eventually consistent and this
strongly consistent model are different
the first thing that comes to mind
is where the authority of your data
comes from in the dynamo style
eventually consistent model the
authority of your data is delivered
alongside your values in the form of a
version or timestamp this version
information is what's applied to the
backend and decides if the new data can
override the old data so in this example
we have a key with value
X at version 1 already on these nodes
and an update to that key
is coming in with value Y at version 2
and the only reason that that
update is accepted is because the
version is newer this is interesting
because it effectively decouples the
authority of data from the order in
which that data comes in it's what gives
us that conflict resolution it's because
of the conflict resolution it's what
gives us the nice properties that we
got before it's what gives us the
idempotence of the updates that go into
the database but it's interesting to
reason about even for the benefits that
it provides the new model is completely
different the order of the updates is
all that matters for the data in the
back ends and if I do want to keep
around some version information then I
have to maintain that independently on
every single host and if I want that
version information to be the same
across the hosts which is something that
I do want if I still want to be able to
make those eventually consistent style
quorum requests then I have to do that
as an implementation detail of how I
maintain versions because it's not an
intrinsic property of the system that
the data has versioning and when I'm
applying this data I have to store it
locally so that I know that I'm applying
I have to store where I am in the logs
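
The contrast between the two models can be shown in a few lines of Python. This is a sketch under stated assumptions: apply_versioned and apply_from_log are hypothetical helper names, versions are plain integers, and the log position is a simple integer offset kept next to the data.

```python
from typing import Dict, Tuple


def apply_versioned(store: Dict[str, Tuple[int, str]], key: str,
                    version: int, value: str) -> None:
    """Eventually consistent path: the version shipped with the value decides,
    so arrival order and duplicate deliveries do not matter."""
    current = store.get(key)
    if current is None or version > current[0]:
        store[key] = (version, value)


def apply_from_log(store: Dict[str, str], state: Dict[str, int],
                   offset: int, key: str, value: str) -> None:
    """Log path: arrival order decides, so the replica must remember its
    position in the log and skip entries it has already applied."""
    if offset < state["next_offset"]:
        return  # replayed entry, already applied
    if offset > state["next_offset"]:
        raise ValueError("gap in the log; entries must be applied in order")
    store[key] = value
    state["next_offset"] = offset + 1


updates = [("k", 1, "x"), ("k", 2, "y")]

in_order: Dict[str, Tuple[int, str]] = {}
for key, version, value in updates:
    apply_versioned(in_order, key, version, value)

reversed_and_duplicated: Dict[str, Tuple[int, str]] = {}
for key, version, value in list(reversed(updates)) + updates:
    apply_versioned(reversed_and_duplicated, key, version, value)

log_store: Dict[str, str] = {}
log_state = {"next_offset": 0}
apply_from_log(log_store, log_state, 0, "k", "x")
apply_from_log(log_store, log_state, 1, "k", "y")
apply_from_log(log_store, log_state, 0, "k", "x")  # replay is ignored
```

With versioned values both stores converge on the same entry no matter how the updates arrive, whereas the log-driven store is only correct because the saved offset prevents the replayed entry from overwriting newer data.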
so how does this manifest one
interesting thing that comes out of this
is that because I have this guarantee
that I will apply all of the data that
comes in on the log in the order that
it comes in I have no more need for this
replica reconciliation process in fact
if I was to go and compare the data on
shards and try to proactively correct
the data that exists on the shards I
would be introducing more consistency
issues than I would be solving because
of the way the system works so that's
cool but it also turns out that we still
have a need for a very similar type of
system that goes through and compares
the data between hosts because when you
have a cluster with terabytes upon
terabytes of data in it you have to
constantly be checking for things like
bit rot for potential bugs and
potentially correcting them so it's a
savings but not an enormous savings another
thing that changes is when you're doing
data migrations so at Twitter like at
most places when we build a new system
we generally want to take advantage of
the benefits that we built it for by
migrating some services from the old
systems that they were on onto the new
system and to do this for a database and
to keep that service live and not have
any downtime what you generally do is
send requests especially write requests
that are coming into the service to both
databases and then in the background you
backfill old information from the old
database system to the new one in an
eventually consistent key value store
with conflict resolution this turns out
to be pretty trivial to do you simply
send writes to both of these systems you
copy the data over however you want to
the conflict resolution takes care of
everything for you when you're done
copying you can just switch your reads
over to the new system pretty simple for
the more strongly consistent model this
is no longer the case if I want to do
something like move an application that
was on MySQL or an RDBMS onto
Manhattan we might have operations
running that rely on the data that
exists in the system while also updating
that data things like compare and set so
I have to write extra code on a per
application basis to make sure that the
data is populated correctly between the
two systems and that we maintain
integrity and correctness a change that
we had to make in the same vein is to
the process of how we add and remove
nodes from the cluster which is
something that we call a topology
transition so again in an eventually
consistent system doing a topology
transition moving a shard from an old
set of hosts to a new set of hosts is
pretty easy you start directing writes
that belong on that shard to the new
replica set as well as the old one you
snapshot the data you copy it over when
it's done being copied over you move all
of the writes and reads to the new
replica set because order doesn't matter
this works out on its own but with
the more strongly consistent model you
have to think a little bit harder
because you can't just start sending
writes to the new set of hosts you have
to change the topology state machine the
transition state machine that manages
the addition of hosts to accommodate
these new components so the first thing
you do now is say instead of accepting
writes just say I have a new set of
replicas in the cluster and I care about
the updates that are coming in off of
this log for this shard so at the very
least don't delete them from the log before I have a
chance to read them then you snapshot
the data and copy it over as normal and
then you add this new state to the state
machine which for eventually consistent
data is a no op but for strongly
consistent data that reads from a log
says give me a chance to catch up all of
the updates that I've missed from the
log from the point where I got the
snapshot you need to do this catch up
before you start accepting requests
otherwise as soon as you start accepting
requests from these hosts and saying
that they're trying to listen for the
responses that these hosts send back to
the coordinator you suddenly will be
taking a request that was taking ten
milliseconds and making it take multiple
minutes or however long it takes for
these nodes to catch up to the current
state on the log one last change that we
made was in how we
implemented TTLs
so a TTL or a time to live is a really
useful feature where you can indicate
that a value should no longer be
readable after a certain point in time
you might use a TTL because you're
collecting some kind of user data and
it's not relevant anymore after a few
days so you don't want to be able to
read it you want it to get cleaned up
you might use it because a user is
active for 30 days or something so
when a user logs in you'll say
this user is active set the TTL for 30
days automatically after 30 days the
user won't look active anymore
if they haven't logged in again so in
the eventually consistent model it's
implemented in a pretty straightforward
way if you read a piece of data from a
node and that node thinks that that
piece of data has expired
it simply doesn't return it to you this
is going to be subject to the normal
amount of clock skew that happens across
nodes in a data center but it turns out
that for our eventually consistent model
that's okay it doesn't break anything
everything still works as you expect it
to work but of course for strongly
consistent model it's no longer okay
because the exploration of this data is
in data manipulating event just like any
other and we know that all these such
events have to happen in the same order
so you can imagine relying on the clock
of the node and these nodes having clock
skew and doing something like a check and
set operation and then on some nodes
you're doing a check and set operation
against the value that exists and on
some nodes you're doing it against the
value that the node thinks doesn't exist
you've now introduced a divergence into
your data you have an
inconsistency between your replicas and
this is exactly the case that we were
trying to avoid
so in order to get around that we
decided to no longer trust an individual
node's concept of the time instead of
relying on the node's clock we send with
every operation a timestamp indicating
what time the node should think it is we
can do this because our distributed log
architecture allows a single master a
single writer for every log and so we
can ensure that this concept of time
that this clock is always monotonically
increasing so when we send each
operation the node updates its concept
of time and now only across operation
boundaries can data be expired due to
TTL and that way we guarantee that all
these things happen in the same order
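
A minimal sketch of this TTL approach, assuming each log entry carries a timestamp assigned by the log's single writer. The Op fields and the TtlReplica class are hypothetical names for illustration, and timestamps are plain integers rather than real clock values.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass(frozen=True)
class Op:
    ts: int                         # timestamp assigned by the single log writer
    kind: str                       # "set" or "cas"
    key: str
    value: str
    expected: Optional[str] = None  # for "cas": None means "key must be unset"
    ttl: Optional[int] = None       # for "set": lifetime in the same units as ts


class TtlReplica:
    def __init__(self) -> None:
        self.now = 0  # logical time, advanced only by operations from the log
        self.data: Dict[str, Tuple[str, Optional[int]]] = {}

    def apply(self, op: Op) -> bool:
        # The local wall clock is never consulted: time moves only at
        # operation boundaries, and never backwards.
        self.now = max(self.now, op.ts)
        self._expire()
        if op.kind == "set":
            expires_at = self.now + op.ttl if op.ttl is not None else None
            self.data[op.key] = (op.value, expires_at)
            return True
        if op.kind == "cas":
            current = self.data.get(op.key)
            current_value = current[0] if current is not None else None
            if current_value == op.expected:
                self.data[op.key] = (op.value, None)
                return True
            return False
        raise ValueError(f"unknown operation kind {op.kind!r}")

    def _expire(self) -> None:
        expired = [k for k, (_, exp) in self.data.items()
                   if exp is not None and exp <= self.now]
        for k in expired:
            del self.data[k]


log: List[Op] = [
    Op(ts=100, kind="set", key="name", value="alice", ttl=30),
    Op(ts=120, kind="cas", key="name", value="bob", expected=None),
    Op(ts=131, kind="cas", key="name", value="bob", expected=None),
]

replica_1, replica_2 = TtlReplica(), TtlReplica()
results_1 = [replica_1.apply(op) for op in log]
results_2 = [replica_2.apply(op) for op in log]
```

Both replicas reject the check-and-set at timestamp 120 and accept the one at 131, regardless of how far apart their own clocks might be, because expiry is decided purely by the timestamps in the log.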
it's a pretty interesting solution it's
a very popular use of our database and
generally TTLs are not something
that you see in more strongly consistent systems
so one last thing I'll say is that at
the beginning of the talk we discussed
what it is that the users of our
database wanted and in order to support
what they wanted we made all of these
changes to the backend of the database
we introduced two quite differing
consistency models living side by side
and the question is did we succeed in
adding functionality without disrupting
the user's experience so in order to use
this new mode of the database all the
user has to do is when provisioning the
data set in the web UI they choose a
stronger consistency type and when
making a request through our client they
set a stronger consistency guarantee or
if they're doing an operation that
didn't exist before like a compare and
set they can use a simple API for that
and not just that if they try to do
something that we don't think is safe to
do against this data for instance if
they send a request that is trying to
update in a fan out eventually
consistent way a data set that's
strongly consistent so it's trying to
send a write not through the replicated
log we inspect that request we decide
that it's not safe and then we send back
a very long but clear error message
saying what it is that they're trying to
do why that's not OK and what they might do instead
this preserves one of the most important things about
this database that we were trying to
achieve when we built it which is that
it's very usable and easy for people so
that they can adopt quickly without
bringing our team into the loop for most