Event Streams at Groupon w/ Storm, Mesos & Griddle

AJ & E.Weathers ft. B.McCallister

Recorded at GOTO 2015

So, hello, good afternoon. I'm Brian McCallister, the special guest,
and today we want to talk about processing very large event streams at
Groupon. Is the mic coming through clearly if I hold it at about this
distance? Good. So there is myself and two co-workers, AJ and Eric
Weathers, who actually have all the fun, meaty parts of the
presentation. I get to do the setup and talk about the problem, because
I'm going to start coughing in about ten minutes.
At Groupon we have a whole lot of events that we process, be they web
logs, mobile logs, application-level events, or operational-metric-type
events. If you just talk about the ones feeding into the bulk of our
Storm topologies, which this talk is going to center around a lot, it's
on the order of 100 to 200 thousand events per second, nominally,
throughout the interesting part of the day. That isn't gargantuan, but
it's big enough that you have to start doing some interesting things,
particularly when you couple it with the dozens of different ways of
processing this that you want to do, sometimes creating new streams out
of those streams to feed back in. So this talk is going to focus on
what we have learned in doing this, both developmentally and
operationally.
The starting point for almost all of our stream processing is Storm,
which came out of BackType and Twitter a few years ago. I don't want to
go too deeply into how Storm works, so I'm going to do a quick poll:
who here is comfortable with the basic abstractions of Storm? That's
not very many. Okay, so I'm going to go into them a little bit more.
Storm works on what it calls a topology, and a topology is made up of
spouts, which send events into the system. In our case the starting
source is almost always Kafka, so there's something called a Kafka
spout that reads out of... actually, hold on, quick show of hands: who
does not know Kafka? Kafka is a high-throughput pub/sub system that
works pretty well, and you can just think about it that way: it's a
high-throughput pub/sub that works pretty well, as compared to
high-throughput pub/sub that doesn't work well, which is the bulk of
them, including the ones I've written.
So spouts feed things into Storm. You then have processors, and an
event feeds in as what Storm calls a tuple. This is probably because it
started its life in Clojure, and Clojure hates types, so everything is
a tuple, although it gets complicated now that most people write their
stuff in Java or even Ruby within Storm. Conceptually, a bolt does a
little bit of processing and then emits more tuples that go to another
bolt, which emits more tuples that go to another bolt. So you can start
gluing together how you want to process the data, a lot like piping
things together in the shell, which is a way of thinking about dealing
with streams of data that most developers are pretty familiar with. And
you can emit multiple ones. This little tiny topology I have here
doesn't really do that, but we will see much more complicated
topologies later in the talk. And so the
thing that Storm does is provide a programming model for doing this
that's pleasant to work in as a developer. Very importantly, it also
assumes that you can't do all of this on one machine, because you just
have too much data or too much processing, and it provides very good
error-recovery and error-handling mechanisms to replay streams of
tuples from points in time to handle failures. That's really the main
benefit that Storm brings over something hand-rolled: it has some
clever things for handling errors and recovery in a distributed system.
We're not actually going to go into that, except to say that you turn
them off when you want high throughput. So, from a developer
perspective, what are you writing? A bolt. It really is: a tuple comes
in, you do some stuff. Like here, we get a span; this is actually a
subset of a topology for processing Zipkin spans. You do some stuff
with it, and then you emit a different tuple. Typically the tuple comes
in, you emit, and then you ack. A lot of the colors really don't show
up on the projector. This is all colorized, but that's not going to
help you; it's a little too subtle. I apologize. Because we don't want
to go into how Storm works to a large degree from a programmer
perspective, we want to talk about how to do it when you've got lots
and lots of teams using it and lots and lots of events, and how you do
that reasonably efficiently. It's important to understand the core
abstractions that Storm uses, because we're going to talk about them,
and then we're going to twist them around sideways and put them through
a wringer and... well, Eric's going to do horrible things to them. So
when you look
at Storm, there are a few other things, like a ZooKeeper cluster,
because you can't have a distributed system without ZooKeeper in 2015,
and Nimbus. Nimbus is conceptually the controller for the whole thing.
You submit your topology, which is your graph of processing, to Nimbus,
and it puts it out to a bunch of worker nodes. So this is like three
different servers. Now, the worker nodes have a supervisor running, and
this is similar to an Erlang supervisor but just different enough to be
confusing. A supervisor runs a worker, and a worker contains executors.
The workers are separate processes; they're child processes of the
supervisor, and each one it starts is its own JVM. The executor is not
a Java Executor, despite being in Java. It is a single execution thread
and a scheduling thread, and you usually put the tasks for a topology
into an executor. You can technically put more than one task on an
executor running in a worker, but you don't ever want to do that,
because it's bad. If anybody actually knows why you'd ever put more
than one task into an executor, find me afterwards and explain it,
because I do not understand why it's possible. So the topology that
we saw is split up into a whole bunch of little tasks that do the work.
If you think about the spout, the spout is set up so that you run three
instances of it here. Say this is Kafka and we have three partitions,
so you can actually get three simultaneous readers that spew things in.
Then you have a stream of stuff going from the spout, pushing into bolt
A and bolt X. You can do various things to say "try to put it on the
same machine, but if you can't, put it somewhere else." And it's always
one to one: this will emit a tuple into here, this will emit a tuple
into here or into here, and Storm takes care of plumbing the tuples all
the way through the system. The worker takes care of running all the
tasks for a topology. And here we have approximately what Storm out of
the box looks like when trying to run just that one topology. Now, we
have dozens of topologies, so it gets even more complicated, which is
what the rest of the talk is about. Well, half of the rest of the talk.
Um, I do
want to stop here, though, because I went through the explanation
quickly, and I want to recap in case you do not understand the
different components here. A physical server or virtual machine runs a
supervisor. A supervisor runs workers, possibly more than one. A worker
runs executors, which run tasks associated with a topology. There are a
lot of layers of nesting of control here, and it's going to get murky,
so this is your opportunity to ask questions before it gets murky. I'm
way better at explaining than I thought I was. If there are no
questions: going once, going twice. Okay then. So that's the basics of
Storm out of the box. Now, to talk about how we actually make this work
for a large number of teams and a large number of topologies on a
slightly smaller set of machines, we have Eric Weathers, who leads the
Storm-on-Mesos work at Groupon.
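
A minimal sketch of the spout-and-bolt model described above, written in plain Python rather than against the Storm API. The function names, the `parse` and `normalize` bolts, and the "country,phone" event format are all hypothetical and only illustrate how tuples flow from a spout through chained bolts, much like a shell pipeline.

```python
from typing import Callable, Iterable, Iterator, Tuple


def spout(lines: Iterable[str]) -> Iterator[Tuple]:
    """Source of the stream: emits one single-field tuple per raw event."""
    for line in lines:
        yield (line,)


def bolt(fn: Callable[[Tuple], Iterable[Tuple]],
         upstream: Iterator[Tuple]) -> Iterator[Tuple]:
    """Apply fn to each incoming tuple; fn may emit zero, one, or many tuples."""
    for tup in upstream:
        yield from fn(tup)


def parse(tup: Tuple) -> Iterator[Tuple]:
    # Hypothetical bolt: split a "country,phone" event into two fields.
    country, phone = tup[0].split(",")
    yield (country.strip(), phone.strip())


def normalize(tup: Tuple) -> Iterator[Tuple]:
    # Hypothetical bolt: upper-case the country code, drop events with no phone.
    country, phone = tup
    if phone:
        yield (country.upper(), phone)


def run_topology(lines: Iterable[str]) -> list:
    """Wire spout -> parse -> normalize and collect what comes out the end."""
    return list(bolt(normalize, bolt(parse, spout(lines))))
```

Calling `run_topology(["es, 123", "us, "])` pushes two events through both bolts. A real topology would run each stage as many parallel tasks on different machines, which this single-process sketch deliberately ignores.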
Is this working? Yeah, cool. Okay, so as Brian said, we have a ton of
different topologies running inside of Groupon, and it used to be that
individual teams would have to manage their own Storm clusters, gain
all the expertise on running Storm, and learn about all the different
operational aspects. We also had inefficiencies where we had
underutilized clusters, because of general issues with needing to
acquire hardware in advance to provision your machines for clusters,
and also because Storm itself doesn't lend itself to efficient
utilization of the underlying hardware, due to the resource scheduling
model of Storm, which I'll get into here in a second. So first I want
to introduce the concept of Mesos and how it works in general, and then
I'll apply it specifically to our use case of Storm. So if no one's
heard
of Mesos... or, I'll do the survey thing: how many people are familiar
with Mesos? So that's actually pretty cool; I don't have to go into
super detail. Basically, Mesos does resource scheduling on worker
nodes. Given an application framework like Hadoop or Storm or MPI or
anything else you can imagine, it might want to run on many different
hosts spread across a cluster, and Mesos allows you to schedule your
tasks onto these various worker nodes. It has its own mechanisms for
maintaining state for the master so that it can be HA. Two of the core
concepts of Mesos that need to be understood for you to really grasp
the rest of the talk, my section at least, are the scheduler and the
executor. Each framework, Storm, Hadoop, whatever, has a scheduler,
which is the thing that Mesos talks to. Mesos says, "Hey, do you want
to run anything on Mesos?" and the scheduler says, "Yes, please run
these tasks." Then Mesos will realize those tasks on the actual worker
slave nodes. There are different ways of doing it, but the standard
way, which we are also using in Storm, is to run an executor. This is
where we get into a little bit of terminology confusion, because I've
talked about tasks and executors here, and Brian also talked about
tasks and executors, and these are different things. So again, going
back to
the Storm model, you have executors and tasks. These are the most
low-level things in this whole system. They operate on the individual
tuples that are flowing through the system; they might do things like
aggregating together two different log lines, whatever you can imagine,
and the tasks just run in this threaded model of the executor. Now for
Mesos, going back: the executor is the actual controller of everything
that's running on a particular slave for your framework, and it's going
to run individual processes for your framework as Mesos tasks. So this
is kind of the matryoshka doll (I forgot how you said it, babushka
dolls, I don't know) of the different layers. On the outside you have a
worker host. Within that you have a Storm supervisor, which is actually
the Mesos executor. Within that we have the Storm worker, which is
actually the Mesos task process. Then you have the Storm executor and
then the Storm task. So whenever you're talking about tasks and
executors with respect to these two different systems, you need to be
very clear about which level you're talking about.
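
To keep the two vocabularies straight, the nesting just described can be written down as a small data model. This is only an illustrative sketch; the class names and the `describe` helper are made up for this example and are not types from Storm or Mesos.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class StormExecutor:
    """A thread inside a worker; normally it runs exactly one Storm task."""
    tasks: List[str]


@dataclass
class StormWorker:
    """A JVM process for one topology; this is what Mesos calls a task."""
    topology: str
    executors: List[StormExecutor] = field(default_factory=list)


@dataclass
class StormSupervisor:
    """Launches workers on a host; it plays the role of the Mesos executor."""
    workers: List[StormWorker] = field(default_factory=list)


@dataclass
class WorkerHost:
    name: str
    supervisors: List[StormSupervisor] = field(default_factory=list)


def describe(host: WorkerHost) -> List[str]:
    """Flatten the nesting into indented lines, outermost layer first."""
    lines = [f"host {host.name}"]
    for sup in host.supervisors:
        lines.append("  storm supervisor (mesos executor)")
        for worker in sup.workers:
            lines.append(f"    storm worker (mesos task) for {worker.topology}")
            for ex in worker.executors:
                lines.append("      storm executor (thread)")
                for task in ex.tasks:
                    lines.append(f"        storm task {task}")
    return lines
```

Printing `describe(...)` for a host with one supervisor, one worker, and one executor gives five lines, one per layer, which makes it easy to see that a "task" in Mesos terms sits three layers above a "task" in Storm terms.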
So to bridge together these two different systems, we leveraged an
outside, or rather existing, open source project for creating a
framework on top of Mesos for Storm. It was actually written by the
original author of Storm, Nathan Marz. We ran it for a while
unadulterated, and we've recently started making changes to it. The way
it works is that it interfaces between Storm and Mesos by implementing
various Java interfaces, and I'll get in a little bit to exactly how
they work. But the most important thing to understand, really, when
thinking about Storm versus Mesos, is that they're very different in
terms of their resource scheduling. Storm has no knowledge about CPUs
and memory and things like that, which Mesos knows all about. How Mesos
works is that it looks at a given host and says, "Okay, we have 20
cores and 20 gigabytes of RAM available," and it offers that out to the
schedulers, saying, "Hey, do you want to use this?" But Storm doesn't
know anything about that. All it knows is this concept of a slot, which
I'll explain. So imagine this top
box is a single host. It has four slots, and really all they are is a
place where Storm can run a worker process. Storm's Nimbus has these
worker processes that are going to do all the processing, and it needs
to put them onto nodes. So what it does is ask the cluster, "Hey, how
many slots do I have available?" and the answer is, "Okay, I have four
from this host," and Nimbus says, "Okay, I'll just shove some
topologies into it." That's what I represent below, where the red
topology is in the top left, the teal one is on the right with two
different worker processes in the single host, and there's a third
topology in the bottom left. One of the bad things about this whole
model is that it's very static. As you set up your Storm cluster, you
have to decide how many worker processes you're going to create. You
can't make it dynamic to the actual demands of the various topologies
that are going to be running within your system. So it's pretty
inflexible, and if you're allocating different types of machines, you
have to think, "Well, this is a much beefier machine; how many slots
does this really represent?" So we'd prefer a model that instead takes
into account the actual, real resources within the slave nodes and
allows us to schedule what the actual topologies need.
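
One way to picture the resource-aware model is to derive the number of worker slots from what a host actually offers instead of fixing it up front. The sketch below is an assumption about how such a calculation could look, not the code of the Storm-on-Mesos framework; `Offer`, `WorkerSpec`, and `slots_from_offer` are hypothetical names, and the 20-core, 20 GB host is the example figure from the talk.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Offer:
    """Resources a host currently has available (a Mesos-style offer)."""
    host: str
    cpus: float
    mem_mb: int


@dataclass(frozen=True)
class WorkerSpec:
    """What one worker process of a given topology asks for."""
    cpus: float
    mem_mb: int


def slots_from_offer(offer: Offer, spec: WorkerSpec) -> int:
    """How many workers of this shape fit; the scarcer resource sets the limit."""
    if spec.cpus <= 0 or spec.mem_mb <= 0:
        raise ValueError("worker spec must request positive cpu and memory")
    by_cpu = int(offer.cpus // spec.cpus)
    by_mem = offer.mem_mb // spec.mem_mb
    return max(0, min(by_cpu, by_mem))
```

With `Offer("host-1", cpus=20, mem_mb=20 * 1024)`, a memory-heavy worker of 2 CPUs and 4096 MB is limited by memory, while a CPU-heavy worker of 4 CPUs and 1024 MB is limited by cores. The same host therefore represents a different number of slots depending on what the topology needs, which a static slot count cannot express.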
The other thing I'll reference is that when you're running inside a
standard Storm cluster, you have no real incentive to think about how
many CPUs or how much memory you need per worker process. That can lead
to inefficiencies in how you tune your topology: you might just throw a
bunch of hardware at it and not really think, "Oh, this process really
doesn't need a ton of memory; it's only doing CPU work." When we put
this stuff on top of Mesos, it forces us to start thinking about this,
and we gain efficiencies in usage of the hardware. I already kind of
covered this, but basically: Mesos thinks of CPU and memory; Storm
doesn't. Mesos deals with allocating the CPU and memory for tasks on
hosts; Storm, again, only thinks about slots. This is another picture
of the earlier representation Brian showed of Storm, where you have a
worker host, you have a supervisor, and you have different topologies
that run under the supervision of that single supervisor. In the
Storm-on-Mesos model, it's actually different.
We allocate a different supervisor for every topology that runs on a
particular host. The advantage of this is that you can isolate the
topologies from one another. You can decide that this topology needs
more RAM and this topology needs more CPU, and you can allocate them
appropriately. The other thing that's really important to note,
something that's kind of hidden in this diagram, is that these purple
boxes also represent Mesos executors, which, if you're using cgroups,
are actually containers. So they have hard bounds on the amount of
memory or CPU, or other things as Mesos improves its resource handling.
That way you can avoid different topologies impacting each other,
whereas in the former model topology C could potentially take over all
the RAM on the box and squeeze out topologies A and B. So that's one
advantage.
So this diagram is a little bit busy, but what it's trying to present
is that we need to somehow map the resources that Mesos knows about
into slots. Storm has this model where it knows beforehand, "I have X
slots," but in the Mesos world there's no such thing. There's no
pre-existing number of slots. We have to somehow take the existing
amount of resources that have been offered up by Mesos and munge them
into a set of slots for the worker processes to get scheduled onto by
Storm. So this is representing how the two systems interface. They go
through Java interfaces: resources come in from Mesos and get
calculated into slots, and then Storm schedules worker processes onto
those slots, sending that back through the interface again into Mesos,
where supervisors are created that then start worker processes. I'm
going too fast, I guess, but let me talk a little bit about how Groupon
Storm as a service actually looks on top of this existing technology
that is out there in the real world. So we have a dedicated
Mesos cluster that we run Storm on. We created a submitter application
where we can gate the topologies that are coming into our system, so
that we can control and know who's running topologies. Eventually we
could use this to block topologies from even launching if they go
beyond certain resource request limits. We've also overcome something
that I haven't touched on yet, but that you might have noticed in an
earlier slide: this is a SPOF, a single point of failure. If the Nimbus
goes down, then Storm is unable to keep launching new workers, to
notice that workers are down and that it needs to create new ones, or
to take new topologies and launch them. So you have to work hard to get
the Nimbus back up as soon as possible if it goes down. Storm, as Brian
referenced, does store a bunch of state in ZooKeeper, but it also has
state that isn't in ZooKeeper, such as the Storm jars themselves for
your topology. That's why it's a single point of failure right now and
doesn't have HA. So that leads us to
the rsync thing. We rsync from our current master Nimbus onto a backup
one, so that we can bring up that one pretty quickly, because all the
state is either on that host or in ZooKeeper. We've gone through that
exercise a couple of times, and it works well. We've also had to create
a couple of libraries to give us visibility into the system and to
allow our users to use the system nicely. We allow you to log out to
Kafka or Splunk using our libraries. We also provide a metrics system
so that people can get a view of how their topology is performing, like
how many tuples per second are going through different bolts and how
many acks are coming back. You can also put application-level metrics
out into the system so that you can graph various interesting
application-level things for your particular topology. We implemented
this on top of a built-in Storm thing that's not very well documented,
called the IMetricsConsumer.
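
As a rough illustration of the kind of per-bolt view described above (tuples per second and acks per second), here is a tiny in-memory counter. It is not Groupon's library and does not use Storm's `IMetricsConsumer`; the `BoltMetrics` class and its method names are invented for this sketch.

```python
from collections import Counter
from typing import Dict


class BoltMetrics:
    """Counts emits and acks per bolt and reports them as rates over a window."""

    def __init__(self) -> None:
        self.emitted: Counter = Counter()
        self.acked: Counter = Counter()

    def record_emit(self, bolt: str, count: int = 1) -> None:
        self.emitted[bolt] += count

    def record_ack(self, bolt: str, count: int = 1) -> None:
        self.acked[bolt] += count

    def rates(self, window_seconds: float) -> Dict[str, Dict[str, float]]:
        """Per-bolt rates, assuming all counts were collected in one window."""
        if window_seconds <= 0:
            raise ValueError("window_seconds must be positive")
        bolts = set(self.emitted) | set(self.acked)
        return {
            bolt: {
                "tuples_per_sec": self.emitted[bolt] / window_seconds,
                "acks_per_sec": self.acked[bolt] / window_seconds,
            }
            for bolt in bolts
        }
```

A gap between the two rates for the same bolt is the kind of signal that lets an operator compare topologies running on the same shared cluster.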
The pros of using this system instead of native Storm are, first, that
it provides true isolation. Storm does provide something called the
isolation scheduler; however, it statically partitions the cluster. The
way it works is that you dedicate particular worker nodes to a
topology, so this topology would only ever run on those nodes, and
those nodes would only ever run that topology's processes. We didn't
really like that, so that's an advantage of using the Mesos-based
system. It provides us flexibility for increasing or decreasing the
number and size of the worker processes that you have for your Storm
cluster, or rather for your topology in the cluster. It avoids us
having a bunch of separate, underutilized clusters that everybody has
to manage themselves, looking into the future, trying to predict how
stuff is growing, and keeping extra hosts. It reduces a lot of the
burden. It also gives us consistent visibility into the operations of
our Storm clusters, so that we can compare across topologies and say,
"Well, this topology is behaving well and this one isn't, so it really
is something about your topology and not the underlying system that
we've built."
Well, I guess I will yield the floor to AJ at this point. All right,
I'll turn myself off. All right, so this is a cat. We like cats at
Groupon, but this is a bolt cat, and it's called a cat tuple. So, the
Griddle part of the talk. The Griddle part of the talk is going to be
focused on not just throughput but latency. Storm is really focused on
getting as many tuples as you can flowing through the topology. If we
go back to that Storm picture with all of those boxes, both Brian and
Eric talked a lot about how a tuple will leave the machine and go
somewhere else. Okay, so how about cases where latency matters and you
don't want to bear the cost of serializing, deserializing, and network
I/O? Okay, so this is called Griddle. Griddle
basically gives teams that want to use Storm a DSL, which expresses a
DSP-like workflow. What I mean by that is that DSP traditionally has
tiny little things that you want to combine together to make the
Gestalt behavior, and it should be easy to mix and match. The syntax of
the DSL is like an adjacency graph: you declare vertices, you declare
your adjacencies. The topologies that I work with are specifically
place data, business data. We do lots of normalization, so we're really
focused on how we get Storm to be useful for data scientists or machine
learning folks. And also we talk a lot, or I can talk a lot, about
mechanical sympathy, which is how you get those things to be in CPU
cache. You want to run as fast as possible; latency matters. Okay, all
right, so now we'll talk about some pictures, and they're just going to
be DAGs. This is a very small portion of the place data topology.
Having very complex topologies in Storm is a bit rough; it's hard to
declare them.
So our place topology is, I think, twenty to thirty nodes, which is a
lot for Storm. It also means that you have 20 to 30 queues. Okay, all
right, so let's just talk about this. We want to normalize the country
code. We want to do some things with the website and the postcode. This
one stands for "out of business," not "out of band." And we geocode.
This special little blue border indicates that this is the only node
here that needs to do any I/O. It goes out to our happy cloud; that
cloud is getting data, so it's happy. And then we want to do things
with the phone numbers. All right, so this is very abstract. It doesn't
really capture some really important business use cases, which are
described here. Whoa.
Okay, so I recognize that this picture is ugly. It's intentionally
ugly, because it's trying to express that in the real world things get
complicated. What this box up here means is that we have a whole
subgraph that, until we have QA'd the effect of these vertices on a
country's data, we don't actually want to run. We would like to leave
the data as it is. Let's say we haven't onboarded a particular country
yet. We get place data from all over the world, and we've optimized the
architecture so that we can add country by country by country. Okay, so
let's say we want to add a new country. Let's say we're doing Spain,
for example, and let's say these don't QA well. We don't want to delay
the business from being able to call businesses in Spain, so we're just
going to do this part. Or it may be the case that we're starting to
accept countries that we haven't even started training on. That's fine;
they'll go through here too. All right, so go back to here. This is a
really simple graph: every vertex has an in-degree of one, and every
edge is going to get walked. All of a sudden, here, we now have
vertices that have an in-degree of two, and not all vertices are going
to be walked.
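
The difference between the two pictures can be checked mechanically by writing each graph as an adjacency list and counting incoming edges. The vertex names below echo the talk, but the exact wiring of both graphs is a guess made for illustration.

```python
from typing import Dict, List


def in_degrees(adjacency: Dict[str, List[str]]) -> Dict[str, int]:
    """Count incoming edges for every vertex in an adjacency-list graph."""
    degrees = {vertex: 0 for vertex in adjacency}
    for targets in adjacency.values():
        for target in targets:
            degrees[target] = degrees.get(target, 0) + 1
    return degrees


# Hypothetical "simple" graph: every non-root vertex has exactly one parent.
simple_graph = {
    "country_code": ["website", "postcode", "geocode", "phone"],
    "website": [],
    "postcode": [],
    "geocode": [],
    "phone": [],
}

# Hypothetical "real world" graph: post_condition is reachable two ways,
# either directly (country not processed) or after the dependent subgraph.
branching_graph = {
    "country_code": ["begin_country_dependent", "post_condition"],
    "begin_country_dependent": ["postcode"],
    "postcode": ["post_condition"],
    "post_condition": [],
}
```

In the first graph no vertex has more than one incoming edge, so every edge is walked for every record. In the second, `post_condition` has two incoming edges but only one of them carries data for any given record.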
In fact, let's say we do want to process our country. Well, then we're
going to go here, or go here. The post condition exists just to say,
"Yep, we did" or "we didn't." If we don't want to process, given the
quality of these normalizations, well, then we're going to come here.
That's interesting in the sense that those two edge sets are not
disjoint. All right, so let's go ahead and maybe obviate these things
to make this graph look at least a little more manageable. The first
thing we're going to do is say that we have this thing called an active
edge chooser. We also want to state that this means the country code
normalizer has no idea what vertices it is actually going to. So you
decouple the behavior: this can change independently from this, so
that's nice. We also now have a couple of different types of vertices.
All three of these have an in-degree of two, but they get data on each
of their edges. So, like I said, that's very different from this one.
There's no waiting necessary here, there's no joining, there's none of
this business.
These kinds of joins are also very difficult in Storm topologies. So
now what I'd like to do is jump to what this looks like in a Griddle
declaration. We'll just show the whole thing briefly, and then we're
going to start to dive in. This is a DSL, and there is a compiler. All
right, so let's get started. This is a subset of the vertices that we
declare. We're going to see "start." Very simple syntax. This is what I
was really trying to get to with you: (a) why a DSL, and (b) if you're
going to do a DSL, make sure you don't force people to learn a bunch of
weird random stuff. It should make a lot of sense. This vertex: we name
it and we say what the type is. All right, so now we want to talk about
adjacencies. We're saying that we emit to postcode and website from
"begin country dependent." If we go back, you could consider the
conditional to be the country dependent; here's postcode and website.
These are all pretty basic, and by basic what I mean is that we emit on
every out edge. And here's a really simple little one. All right, let's
go to a complicated one.
All right, so here's our active edge chooser. This is basically saying
you emit to any subset of this adjacency list from "start," and you use
the chooser of this type. And by any set I literally mean any set; it
can be empty as well, which could alternate. This is just making sure
this is obvious, so let's go back to the picture again. We're saying we
emit to conditional, post conditional, or out of business from country
code. But what we're doing is that this active edge chooser knows:
"Okay, we've normalized the country code. What is the country? Oh, we
support that one, so I'm going to go to these two." Or, "No, we don't
support that one, so I'm going to go to these two." Country code has no
idea about that edge choice. All right. Oh, here's a picture. I could
show the cat again too. Okay.
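
The active edge chooser idea can be sketched as a function that sits between a vertex and its declared adjacency list and picks which out-edges to follow for each value. This is not Griddle's DSL or runtime; `ADJACENCY`, `SUPPORTED_COUNTRIES`, `country_chooser`, and `emit` are hypothetical names, and the set of supported countries is made up.

```python
from typing import Callable, Dict, List

# Declared out-edges for the vertex; the vertex itself never inspects this.
ADJACENCY: Dict[str, List[str]] = {
    "country_code": ["conditional", "post_conditional", "out_of_business"],
}

# Assumption for the example: countries whose normalizations have passed QA.
SUPPORTED_COUNTRIES = {"US", "GB"}

Chooser = Callable[[dict, List[str]], List[str]]


def country_chooser(value: dict, candidates: List[str]) -> List[str]:
    """Pick a subset of the declared edges based on the normalized country."""
    if value.get("country") in SUPPORTED_COUNTRIES:
        wanted = {"conditional", "out_of_business"}
    else:
        wanted = {"post_conditional", "out_of_business"}
    return [edge for edge in candidates if edge in wanted]


def emit(vertex: str, value: dict, chooser: Chooser) -> List[str]:
    """Return the vertices this value is sent to; may legitimately be empty."""
    return chooser(value, ADJACENCY[vertex])
```

Because the choice lives in the chooser, the country code normalizer can change independently of the routing rule, and a chooser that returns an empty list is a valid way of sending the value nowhere.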
Any time there's a DSL, people very reasonably say, "Why do you do
that? What does it do?" So I'll give... okay, so there's a compiler,
and it creates a binary graph. That binary graph is then processed by a
Griddle runtime framework. The runtime framework is simply a framework;
it's not a process, so you can put it in any sort of server you want
to, and it provides optimal concurrency for the graph that you've
expressed. So basically what we have now at Groupon is that for our big
place topology, where we have areas of bolts that really don't need any
I/O, if they were on the same CPU we would get a huge latency
improvement. We've been porting them to what we call the Griddle
operator, or Griddle bolt, which is a bolt running over a compiled
Griddle graph and getting this really good concurrency. Every time we
pull a bolt into a Griddle graph, we get back at least 10 milliseconds,
because of the queue. I'm not saying that the queue is either a good or
a bad thing. It's a bad thing for latency; it's something you don't
really care about for throughput. If you say "we can do 10,000 a
second," that means that every second you have 10,000 processed. That
doesn't say anything about how long each one takes. All right, so now
I'm going to
cross my fingers, dive into the deep end, and show you this picture
that talks a little bit about the threading model. If what I say is
confusing, just interrupt me, because I really do want to motivate why
we would create another DSL. All right, good, the colors showed up,
kind of. So this is a pretty simple graph. It's got six vertices; those
are blue, and you've got all your edges. These little red circles are
basically indicating that what a Griddle graph processes is a domain
value. Basically, these are unary functions: they have a domain and
they have a range. As your domain value moves through the graph until
it gets spit out, obviously it's going to change state. If we think of
that place: the website is going to change, the geocoding is going to
change, the postcode is going to get normalized, things like that. All
right, so here we are. The yellow boxes are a vertex label and a thread
indicator for what thread will be running that node. One thing I should
indicate here is that I don't dive into the conditional paths; mostly I
just want to talk about the concurrency. Okay, so the green
rounded rectangle is saying that there's a scheduler thread, and what
that means is that scheduling does not get in the way of any work
that's already being done. All right, so we have D0. It's a pretty
trivial one; it's just trying to get this graph started. So we trigger
these three to get started. Obviously this one is handled by T0, but
now we have T0, T1, T2: we have three threads. These emit two range
values, and we're able to transform D0 into D1. This is one of these
join vertices. In fact, in the syntax we have this idea of aggregate
inputs, which means you need to wait for all of your inputs. For any
vertex that doesn't have that, one input will trigger it to continue.
That becomes interesting with this one, for example. You'll notice,
since it doesn't have that box, if it gets in here or if it gets in
here, it goes to T0. So now we're back down to only having two threads,
and then we come to here and we just have one final thread.
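
The threading picture can be approximated with a small scheduler that submits a vertex to a thread pool as soon as all of its inputs have arrived. This is a simplified sketch, not the Griddle runtime: it treats every multi-input vertex as an "aggregate inputs" join, and `run_graph`, the vertex names, and the arithmetic in the example are invented for illustration.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from typing import Callable, Dict, List


def run_graph(adjacency: Dict[str, List[str]],
              work: Dict[str, Callable[[list], int]],
              initial: int,
              max_workers: int = 3) -> Dict[str, int]:
    """Run each vertex once, as soon as all of its predecessors have finished."""
    preds: Dict[str, List[str]] = {v: [] for v in adjacency}
    for vertex, outs in adjacency.items():
        for out in outs:
            preds[out].append(vertex)
    waiting = {v: len(p) for v, p in preds.items()}
    results: Dict[str, int] = {}

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        running = {}

        def submit(vertex: str) -> None:
            # Root vertices receive the initial domain value instead.
            inputs = [results[p] for p in preds[vertex]] or [initial]
            running[pool.submit(work[vertex], inputs)] = vertex

        for vertex, count in waiting.items():
            if count == 0:
                submit(vertex)

        # The calling thread acts as the scheduler; workers only do work.
        while running:
            done, _ = wait(running, return_when=FIRST_COMPLETED)
            for future in done:
                vertex = running.pop(future)
                results[vertex] = future.result()
                for nxt in adjacency[vertex]:
                    waiting[nxt] -= 1
                    if waiting[nxt] == 0:
                        submit(nxt)
    return results


graph = {"start": ["a", "b", "c"], "a": ["join"], "b": ["join"],
         "c": ["end"], "join": ["end"], "end": []}
work = {"start": lambda xs: xs[0], "a": lambda xs: sum(xs) + 1,
        "b": lambda xs: sum(xs) * 10, "c": lambda xs: sum(xs) - 1,
        "join": sum, "end": sum}
```

Running `run_graph(graph, work, initial=1)` fans out to three vertices at once, narrows to the join, and finishes on a single vertex. What gets scheduled is whichever vertex is ready, not a fixed path through the graph.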
Brian and I were talking this morning, and based on the real
implementation, one could very reasonably criticize this. Showing T0
running through this path is something I probably shouldn't have done,
because we are not scheduling paths; we are scheduling vertices that
are ready to do work. So I could just as well have said T0, T1, T2. And
this is very important, because what it means is that a vertex is
scheduled as it becomes ready. Let's take this example: let's say R1
takes a millisecond but R2 takes 10 milliseconds. I don't know,
actually; I hope it doesn't, because that doesn't sound like very good
latency. As soon as this one is ready, it's going to come up and be
scheduled, because it says, "I'm ready to do some work." So that
ability of being ready isn't related to the particular path; it's
related to the state of the