Cloud Native Data Pipelines

Sid Anand

Recorded at GOTO 2017


So welcome to Cloud Native Data Pipelines. I'm Sid Anand. A little bit about me: over the last 16 or so years I've worked on end-to-end data infrastructure. Most of my time has been spent at companies that are typically consumer-facing, high-scale, high-traffic companies like LinkedIn, Netflix, eBay, and Etsy. About two and a half years ago I joined an enterprise tech company that works in the security space called Agari, and they're doing some interesting work with data in the cloud, which I'll talk to you about today. In my spare time I'm a co-chair for QCon San Francisco and London, I'm a committer on the Apache Airflow project, and I'm the father of two rambunctious kids, a four-and-a-half-year-old boy and a seven-month-old girl. So at Agari, what do we do? We make email
safe again, for those of you not familiar with it. Now, the thing is, email has actually never been safe, ever since the 1970s and the advent of the SMTP protocol. It's only more recently that security protocols like SPF, DKIM, and DMARC have been developed, many of them by members of my team, and they're not in widespread use at this time. So you still encounter these sorts of scenarios: you get an email from your bank or credit card company saying there's some sort of suspicious charge and you need to log in immediately and verify the charges, or maybe your account will be closed. In most cases you will essentially get phished, because your bank tries not to send you things like that via email, and most people don't realize it. So what
do we do? Our first product, which was launched a few years before I even joined, was to protect the consumers of B2C companies. Most of my former companies, LinkedIn, eBay, PayPal, Netflix, use Agari to secure the email channel between them and their customers, so that it reduces the chance of their customers being phished by an email that looked like it was from Netflix. When I joined, about two and a half years ago, we tried to take this technology and apply it to the enterprise, so that enterprises could also be safeguarded in this way from inbound email. You might have heard that many years ago LinkedIn sort of lost 17 million passwords; they didn't really lose them, it's just that other people have them now, and there was some discussion of an email-borne route for that attack. And that's sort of the case, right:
you know, more and more loss of data at the enterprise level. So our first version of our product was a batch product. Enterprise customers deployed one of our sensors in their data center, and every 15 minutes they would send us a file which contained email headers. We would analyze that data in the cloud, apply trust models, and for the most part score that data, so we could assign a trust score on a range from trusted to untrusted for each email, and we would provide a visibility dashboard to our customers showing statistics on their mail flow. About a year ago we started developing a near real-time product. In the near real-time use case, mail flow is coming in and our cloud-based service is applying those trust models, or in some cases building some on the fly, and again showing a visibility dashboard, which is now a real-time dashboard. But in addition we're also executing a control flow, where based on some policy we can send a signal back to our sensor to either quarantine the email, label it, or allow it to pass through. In most cases we're running essentially three nines from the sensor and back to the sensor with three seconds of latency, and the majority of that time is actually spent over the public Internet, which we don't control; within our cloud deployment we're actually, in many cases, sub-second. So what's the reason that
this is called Cloud Native Data Pipelines? If you work at big data companies like LinkedIn or Facebook or Twitter or Google, their data engineering teams are huge: 500 is not an unknown number, and they're running their own custom data pipelines on their own hardware. If you remember Dean's talk, there was a reference architecture with lots of open source technologies; typically, in a large company, each of those boxes would be supported by a team. Whether you're running Kafka or something else, you need a team on hand to patch it if it needs to be patched, to do operations on it, to configure it, all of that stuff. Now, if you work at a startup, you don't have hundreds of engineers; you may have one engineer working on the data pipeline. Our company has about 10: the total company size is 20 engineers, ten of whom are working on the data pipeline and data science side, and they're running in the cloud in most cases. So, are there things that these small companies can do to get pipelines comparable to what you would expect at a Facebook or Twitter? The answer is yes: if you adopt some cloud native techniques and some open source technologies in a certain way, you should be able to get highly available, highly scalable, low-latency data pipelines that do interesting things like predictions.
So before we get into an architecture, we should define what some desirable qualities of a resilient data pipeline are, and for us there are four aspects: correctness, timeliness, operability, and cost. Correctness essentially means that we should have data integrity end to end in our data pipeline: we shouldn't lose any data, and we shouldn't corrupt any data. And since the type of pipeline we're working on is a predictive data pipeline, generating predictions and ranks and scores, the data distribution out of the pipeline should be as expected; we shouldn't all of a sudden find that 99% of the messages we're generating are considered phishy or untrusted. In terms of timeliness, whether you're running a daily, hourly, or near real-time pipeline, what's really important is that the SLAs are met; whatever the SLAs are, they need to be met, because in our case we may be holding an email in someone's mailbox until we've analyzed it and said it's safe to deliver. In terms of operability, if you've worked in these large data teams, you'll find that a very common problem is operational fatigue: the majority of a data engineer's time is spent cleaning things up and fixing data problems. I'll talk about quick recoverability, but one major problem in data pipelines is that if there's a bug, once you fix the bug you have to fix all of the data that flowed downstream, contact all of those teams, and basically fix all of the data that you've corrupted all the way downstream, and that results in a lot of operational fatigue, because every bug has a large blast radius. Another thing that's important for operability is that we have some sort of fine-grained monitoring and alerting, so that we can check that we're meeting our correctness and timeliness SLAs. And then there's quick recoverability. There are two
aspects of designing a system for reliability: one is called mean time between failures, and one is MTTR, mean time to recovery. I haven't included a slide here, but there's an interesting thing in the talk I gave at this conference last year: there's this comparison of a Jeep to a Rolls-Royce. A Rolls-Royce is designed to operate really well within its operational boundaries, like nice roads and the Autobahn, but if you take it off-roading it's going to break down, and then you're going to spend a considerable time fixing it. Whereas a Jeep was designed by the people who created it to be taken outside of its design boundaries: they don't really need to know what you're going to do with the Jeep, but they expect that you will break it, and when you break it, it should be really easy to fix. So on YouTube, if you search for "Marine Corps team reassembles Jeep in four minutes", you'll see what I mean. That's sort of what we want in our data pipeline: bugs will happen, we just need to quickly recover when that happens. And finally, since we're running in the cloud, one of the promises of the cloud was pay-as-you-go, so we can leverage some aspects of auto scaling and maybe Lambdas to reduce our cost of running a data pipeline, something you can't really do in your own data center.
So what do predictive analytics look like at Agari? We have two classes of predictive analytics: one is building and training models, and the other is applying those models in either a batch or near real-time pipeline to score data, and today's talk is mostly going to focus on applying the models. So let's look at an example of our batch pipeline architecture. We have, say, three companies, Enterprise A, B, and C, and they're getting mail coming to them all of the time, and they've deployed an Agari sensor in their data centers. In our first version of the product, which was batch, every 15 minutes our sensor would upload a file into S3. At the top of the hour, we use Airflow for our workflow engine, and I'll talk a little bit more about that later, but we essentially have a workflow engine called Airflow that will run a Spark job to score that data and generate some aggregate statistics once an hour, and it will output that data into a different path in S3. We've set up S3 with what's called SNS notifications, so as data lands in S3 in this output bucket, notifications are sent on Simple Notification Service, SNS, which will queue them in Simple Queue Service, SQS. SNS is a push-based topic: essentially, for every event it gets, it will notify a bunch of subscribers. And SQS is a queue service that will actually retain all of those messages, but it allows single-message consumption. If you want to learn more about this, you can have a look at last year's talk, where we actually go into the details of how the two work together, but just to get a raise of hands, how many people are familiar with SNS and SQS? OK, that's very good.
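As a rough illustration of the SNS-plus-SQS fan-out just described, a minimal boto3 sketch might look like this (the topic and queue names are made up, and the queue access policy is glossed over):

```python
# Minimal sketch (not Agari's actual code) of the SNS -> SQS fan-out pattern:
# one topic, one or more subscribed queues, each consumed independently.
import json
import boto3

sns = boto3.client("sns")
sqs = boto3.client("sqs")

# A topic that S3 event notifications (or any producer) can publish to.
topic_arn = sns.create_topic(Name="scored-data-topic")["TopicArn"]

# Each consumer group gets its own queue subscribed to the topic, so every
# event fans out to all queues but is consumed once per queue.
queue_url = sqs.create_queue(QueueName="importer-queue")["QueueUrl"]
queue_arn = sqs.get_queue_attributes(
    QueueUrl=queue_url, AttributeNames=["QueueArn"]
)["Attributes"]["QueueArn"]

# (In practice the queue also needs an access policy allowing SNS to send to it.)
sns.subscribe(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn)

# Publishing one event notifies every subscribed queue.
sns.publish(TopicArn=topic_arn,
            Message=json.dumps({"s3_key": "output/part-0000.avro"}))
```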
Now, as messages flow into SQS, we scale out what we call our importer layer. Our importers are Ruby jobs that read these messages and import them into our Postgres database. S3 contains all of our data: all the input data, all the intermediate data, all the output data. But our Postgres database is used to feed a subset of the data to our web app, so it holds only the subset of the data that we need to show in our visibility product, and the importer's job is to efficiently insert all of that data into the Postgres database. We have Airflow basically managing this entire process, also checking the quality and loss characteristics of the data on each step in this pipeline.
on each step in this pipeline so let's
look at some of the architectural
components here right the first one is s
3 S 3 is where we are essentially
building our data Lake its scalable
available performance its server less we
don't need to manage it it's used to
store all of our data as I mentioned our
input intermediate and
output data and all of our data science
processing goes against s3 so
essentially it's like HDFS but we don't
really need to worry about scaling it
for data growth for messaging we're
For messaging, we're using a combination of SNS plus SQS. By using them together we get the benefits of both, and essentially we get a reliable, transactional pub/sub. What that essentially means is that we get the benefits of pub/sub, such as multiple topic listeners, but we also get transactions: when you consume a message, you actually have to send an ack to let SQS know that you've consumed it. Again, it's serverless, and it's infinitely scalable and performant. We have two cases of processing. For our data science processing, which in this case is scoring, aggregations, and model building, we use Spark, and we've used it in both flavors, both EMR and running our own, and it's spun up in both cases by Airflow: Airflow will spin up a cluster, whether it's EMR or our own image running on EC2, do whatever processing is required, and then spin it down. Spark has a very nice programming model, but it's very complex to debug, and I think one of the speakers in this track later will go into the details of Spark; if you haven't used it, you'll get an appreciation of how complex it is, and how powerful, of course. And for general processing, sort of like some business logic, we just take whatever type of service we have, we encapsulate it in an auto scaling group, and we just let Amazon manage that auto scaling group so it's always up. The workflow engine
we're using is Airflow, and it coordinates all the Spark jobs and other complex tasks. It's lightweight, and it treats DAGs as code, so config as code: if you've used other types of workflow engines like Oozie or Azkaban, you basically wrestle with YAML, and zip files of YAML and XML files, whereas the approach that systems like Luigi and Airflow take is to treat this as normal code and use your development processes to manage it. It does have a little bit of a steep learning curve, especially around operations, and that's because Airflow is not very opinionated about how you run it; it just sort of shows you how to run it on one box, and you sort of have to make it scale. And finally, the database we use is Postgres, for no better reason than that our web developers picked it, so we're using it; they're Rails people, and Rails people like Postgres and vice versa.
So in this model, three of our services are not managed by us and three of them are, and that tells you a little bit about how we run. We are such a small team that we can't afford to run everything ourselves, so when we have a choice we will always pick something provided by Amazon, and when we don't find an adequate solution there, we will choose to manage it ourselves. In this case there was no real good workflow engine provided by Amazon, and, to some extent, as I mentioned, sometimes we use EMR and sometimes we don't, because there are downsides to using it. So let's talk
about cost and timeliness. When we were running every day, for 23 hours of the day we only paid for the EC2 instances that ran Airflow, our database, and our web app, and at the end of the day we ran a full daily run; at that point we paid for running EMR for an hour and the importers for an hour, plus any messages that we had to pay for on SNS and SQS. So we were very cost efficient: we built an infrastructure that didn't just have a bunch of standing nodes that we were paying for, even if they were reserved instances. When we went hourly, things changed: whether we ran for a minute, five minutes, or an hour, EC2 basically charges us for the full hour, so we were actually not saving any money with this architecture. We were when we were daily, but once we went hourly we were saving nothing. So one idea was for us to just move to Lambda. Lambda charges you in 100-millisecond increments rather than by the hour, and we found, in general, with some back-of-the-envelope calculations, that it would save us about 100x: running the equivalent load on Lambda is roughly one-hundredth the cost of running it continuously on EC2, like a penny on the dollar. So it's quite a good cost savings if you're considering Lambda.
What about timeliness? In the batch world we're running every hour, and as new customers come online our data volume grows, our Spark jobs take longer to run, and so do our importers; the challenge we have is completing all of it within an hour, all the time. Since Airflow manages the spin-up of Spark, we can just configure, via an Airflow Variable, the number of workers and the instance type, which defines the memory and CPU characteristics of that Spark cluster, right before the next hourly run, and the next hourly run will automatically spin up a bigger, stronger, larger cluster for Spark.
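As a hedged sketch of that idea, an Airflow task could read the sizing from Variables and pass it to EMR roughly like this; the Variable names and EMR settings are illustrative, not our actual configuration, and in a real DAG this function would be wrapped in an operator:

```python
# Bumping the "spark_worker_count" or "spark_instance_type" Variable before the
# next hourly run changes the cluster that the next run spins up.
import boto3
from airflow.models import Variable

def spin_up_spark_cluster():
    worker_count = int(Variable.get("spark_worker_count", default_var=4))
    instance_type = Variable.get("spark_instance_type", default_var="m4.xlarge")

    emr = boto3.client("emr")
    resp = emr.run_job_flow(
        Name="hourly-scoring",
        ReleaseLabel="emr-5.8.0",
        Applications=[{"Name": "Spark"}],
        Instances={
            "MasterInstanceType": instance_type,
            "SlaveInstanceType": instance_type,
            "InstanceCount": worker_count + 1,      # workers plus a master
            "KeepJobFlowAliveWhenNoSteps": False,   # cluster tears itself down
        },
        JobFlowRole="EMR_EC2_DefaultRole",
        ServiceRole="EMR_DefaultRole",
    )
    return resp["JobFlowId"]
```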
What about the importers? How do we scale them? We use what's called auto scaling. How many of you here, show of hands, use auto scaling? OK, that's about 10 people. So, what does auto scaling give us, and why is this important? We have an hour budget to finish our workload, and sometimes Spark will take 50 minutes, which essentially means we have 10 minutes or less to import all of the data into our database, and we need some way to make a cost/performance trade-off. An auto scaling group lets us define a max and min size for the group, and if we're finding our Spark jobs are just taking longer and longer, we just raise the max; we pay a little extra, and the import still finishes in time. I'm going to talk about the autoscaler now, but essentially what it does is listen to an SQS queue and scale out based on some characteristics of that queue, to get higher throughput as needed. Our first approach at auto scaling used what's called a CPU metric, so I'm going to talk about that right now.
As I mentioned, we have these importers, and one way to scale a cluster of importers in or out is based on the average CPU of the cluster of importers. This is a common first step for anyone using auto scaling: they'll say, OK, I want to scale this EC2 resource, maybe CPU is a good measure, so let's use the average CPU. What I have here are messages coming into SQS from Spark and S3, and in the beginning there are no importers, so this green curve, which is the count of messages processed from SQS, is kind of low here. We actually always have one instance running, but for this demonstration just assume that's basically zero. Then this blue line is the total CPU of the cluster, and you can see some steps here as the autoscaler keeps adding more and more nodes. So what's happening essentially is the autoscaler is increasing the pool, and as it does so the processing rate increases, until there are no more messages to process and it completes processing of all the SQS messages. As I mentioned, the metric used for auto scaling to scale out is average CPU: our scaling policy is that we tell the autoscaler never to let the average CPU go higher than 40 percent, and if it does, to add more machines. So it keeps adding machines to keep that CPU at around 40 percent, and you can see that it does a really great job of doing that, so it's always staying ahead of the load.
There is one problem with this approach: CPU is actually not a great metric when you get down to the last messages in SQS. Let's say you've scaled up to 20 nodes: 19 of the nodes are idle, their CPU is basically around 2 percent, and one node is processing the last batch. Our scale-in policy is something like: when the average CPU falls below 10 percent, hey, autoscaler, start killing nodes. And that's what happens. The autoscaler doesn't know which node is the one processing the last message; it just starts killing nodes, and sometimes it kills the importer that's processing the last message. What happens is we get this really long tail on processing the last message, and it takes a really long time. This is what's called premature scale-in. So the idea was that CPU is not really a good measure, and we tried a different one, which is called
the queue-based approach. When you're reading messages from SQS, messages are in two states: they're either visible or they're in flight. If a message is visible, it means a get request on that queue will give you that message; if it's in flight, it's invisible, meaning no get request will give you that message, and it stays invisible for a certain timeout period. So what we have here is: this first blue curve is a count of visible messages on the queue. As Spark is working and producing messages, the number of visible messages increases, and our scaling policy is: as long as that value is greater than zero, scale the cluster out, make it larger. At this point the cluster starts processing messages and they go in flight; that's the orange curve, these are in-flight messages. Spark stops creating new messages here, and now we're just consuming them, so the visible count drops to zero, and when we hit zero we stop scaling out; the cluster is now at its maximum size. Then we wait until all the in-flight messages go to zero, which means they've all been processed, and then we just immediately scale in our entire cluster. This works very well for us: we no longer have the long tail. We essentially scaled out while there were messages to be processed, and we scaled in when there were no in-flight messages left.
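A minimal sketch of that queue-based policy, assuming boto3 and made-up queue and auto scaling group names (the real autoscaler has more guard rails), might look like this:

```python
# Scale out while any messages are visible; scale the whole group in only once
# nothing is visible or in flight.
import boto3

sqs = boto3.client("sqs")
asg = boto3.client("autoscaling")

QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/importer-queue"  # placeholder
GROUP = "importer-asg"          # placeholder ASG name
MAX_SIZE, MIN_SIZE = 20, 1

def autoscale_step():
    attrs = sqs.get_queue_attributes(
        QueueUrl=QUEUE_URL,
        AttributeNames=["ApproximateNumberOfMessages",           # visible
                        "ApproximateNumberOfMessagesNotVisible"]  # in flight
    )["Attributes"]
    visible = int(attrs["ApproximateNumberOfMessages"])
    in_flight = int(attrs["ApproximateNumberOfMessagesNotVisible"])

    group = asg.describe_auto_scaling_groups(
        AutoScalingGroupNames=[GROUP])["AutoScalingGroups"][0]
    current = group["DesiredCapacity"]

    if visible > 0 and current < MAX_SIZE:
        # Work is still queued: keep growing toward the max.
        asg.set_desired_capacity(AutoScalingGroupName=GROUP,
                                 DesiredCapacity=current + 1)
    elif visible == 0 and in_flight == 0:
        # Nothing queued and nothing being processed: scale the whole group in.
        asg.set_desired_capacity(AutoScalingGroupName=GROUP,
                                 DesiredCapacity=MIN_SIZE)
```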
Some people ask, what's the cloud-native approach to building these auto scaling groups? We've adopted three major tools. The first one is called Terraform. How many of you have used Terraform, Ansible, or Packer? Excellent. Terraform is the way to allocate resources like SQS, Kinesis, etc.; it's basically a declarative model, it's all config as code, and it essentially gives you, say, an SQS resource. It's the tool to allocate these Amazon or GCP resources. Once you get an EC2 machine, you have to install probably some Ubuntu packages and some Python packages to make it useful for you; for example, for our data science side, for the Spark cluster that we build, we build the AMIs using Ansible and Packer, and essentially we get a vanilla instance and then we install things like SciPy or scikit-learn. Ansible is the way you provision a single EC2 machine with all the packages, OS and Python, that you need to run your application. And finally, Packer is a tool, also by HashiCorp, that will generate an AMI, an Amazon Machine Image, from everything that you've got.
So let's look at an example. Step one: we run Packer, and Packer will create an EC2 instance that is basically a blank canvas, just a basic Ubuntu 14.04 image with really nothing that your application needs on it. Then we run an Ansible playbook to provision whatever your app needs: if it's a Python app, it's going to need a lot of pip-installed packages and maybe a couple of OS packages. Packer then takes a snapshot of that machine, registers the AMI with Amazon, and then destroys the machine; it really doesn't need the machine beyond just baking this AMI. And finally, Terraform, along with this AMI, will, in a blue-green fashion, spin up a new auto scaling group and launch configuration with this AMI and replace your existing auto scaling group. So it does blue-green deployments, and this is essentially how we deploy all of our auto scaling groups today.
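Purely as an illustration of that bake-and-deploy sequence, it could be driven from a small Python script like the one below; the template paths, variable names, and Terraform layout are invented, and in practice this kind of thing usually runs from CI:

```python
# Step 1: Packer bakes an AMI (its template runs the Ansible provisioner).
# Step 2: Terraform swaps in a new ASG/launch config with that AMI (blue-green).
import re
import subprocess

def bake_ami(packer_template="importer.json"):
    # Packer launches a temporary EC2 instance, provisions it, snapshots the
    # result as an AMI, and terminates the instance.
    out = subprocess.check_output(
        ["packer", "build", "-machine-readable", packer_template], text=True
    )
    return re.search(r"ami-[0-9a-f]+", out).group(0)

def blue_green_deploy(ami_id, tf_dir="terraform/importer"):
    # Terraform creates the new auto scaling group with the new AMI and retires
    # the old one, giving a blue-green style swap.
    subprocess.check_call(
        ["terraform", "apply", "-var", f"ami_id={ami_id}"], cwd=tf_dir
    )

if __name__ == "__main__":
    blue_green_deploy(bake_ami())
```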
This sort of covers the timeliness and cost aspects of our batch pipeline. What about operability and correctness? When we first started, our data science team was essentially writing cron jobs to run our Spark jobs, and that's not ideal in many ways. So we needed a better way to author, configure, and manage workflows, and we also needed visual insight into the performance, correctness, and state of our workflows: for example, was a given task failing a lot and constantly being retried, and is that why our SLAs were taking longer and longer to complete? We also needed some really good integration with our existing alerting, so we could page an on-call engineer to have a look at one of our failing workflows. The best solution we found, and this was probably about two years ago, was Apache Airflow, which was just starting at that time; it wasn't
even in Apache yet. So this is what it looks like: a given workflow, or DAG, is nothing but Python code, just very simple Python code, and when visualized it looks like this. The DAG that you're looking at essentially manages the batch workflow, the batch architecture shown on the lower right.
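To make the "DAGs as code" point concrete, here is a minimal sketch of an hourly DAG in that style; the task names, commands, and schedule are illustrative, not our actual DAG:

```python
# A two-task hourly DAG: run the Spark scoring job, then run the importers.
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(
    dag_id="hourly_batch_scoring",
    start_date=datetime(2017, 1, 1),
    schedule_interval="@hourly",
)

spark_score = BashOperator(
    task_id="spark_score_and_aggregate",
    bash_command="spark-submit score_and_aggregate.py",  # placeholder command
    dag=dag,
)

import_to_postgres = BashOperator(
    task_id="import_scored_data",
    bash_command="run_importers.sh",  # placeholder command
    dag=dag,
)

# Dependencies are just Python: importers run after the Spark job finishes.
spark_score >> import_to_postgres
```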
Say you want insight into the running of your pipeline: you get a Gantt chart for each and every run, and what this shows you is that the longest step, which we sort of talked about, is the Spark aggregation and scoring. After the aggregation is done and this data is being pumped into S3, the importers complete it. So this takes about ten minutes, and this takes about one minute. We also have a wait-for-S3 step here, because we have a known eventual-consistency issue, so to avoid it we just do a timed wait, and essentially this whole thing takes around 20 minutes or less to run. Now, you might want to know: this is pretty fast for an hourly pipeline, we're completing it within the first twenty minutes of every hour (of course we're paying for the full hour), but is it always running efficiently? You can get a task duration chart, which will give you a historic view of the running of your pipeline, and each of these lines represents a different task. Again, the most expensive task is the aggregate Spark job, and what we found in the first month shown here, last year, was that it was just getting linearly slower. There was a bug; the data science team found it and fixed it, and then over the next four weeks we got consistent run times for our Spark job. At this point it was taking over half an hour to run, and they brought it back down to around ten minutes; then they did another optimization and they brought the time down even further. So one thing that
happens, at least in data science, is that it's like a virtuous cycle: they're using new features, and sometimes those features don't perform like they would expect and things kind of get slower, and then they have to do some cleanup to bring the times back into alignment. Essentially, this is a new feature, then these are two optimizations, then you add another feature and again the time went up. So things like Airflow give you the ability to maintain this virtuous cycle of new feature development, new data science model development, as well as performance optimization. Now, what happens when your pipeline takes longer than an hour, or what if your pipeline generates some sort of errors? In that case we have an integration with Slack, and Airflow will basically dump alerts in here: this one says we're missing our hourly SLA, and this one says, guess what, we've lost data. Then we'll actually page our on-call, with an integration with VictorOps, or whatever you use, like PagerDuty, and they'll be notified that something's wrong with the pipeline. Airflow meets all of those needs for us. So essentially this is how our batch pipeline runs, and with Airflow we get the correctness and operability that we need. How about our
near real-time pipeline? So the way it works is: our customers, say we have three of them, are dumping data as they get it into Kinesis. Kinesis is Amazon's answer to Kafka. We have an auto scaling group of scorers that are getting the data within microseconds, in most cases, of when it was actually dumped onto the stream by a customer, and we score the data and write the scored data out to another Kinesis stream, where importers will read it and import it into our Postgres database. They will also send it to another Kinesis stream, which will be read by our alerters, and our alerters are responsible for doing a match against customer policies to see if an alert condition has been met; if it has been met, they may take an action, like quarantining the email on the sensor, across the internet. This is our near real-time pipeline for our control system.
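As a rough sketch of one scorer in this path, with placeholder stream names and a stubbed-out scoring function (the real scorers load actual trust models):

```python
# Read records from an input Kinesis stream, score them, and write results to
# an output stream.
import json
import time
import boto3

kinesis = boto3.client("kinesis")
IN_STREAM, OUT_STREAM = "inbound-email-events", "scored-email-events"  # placeholders

def score_email(event):
    # Placeholder for the real trust-model scoring.
    return {"message_id": event["message_id"], "trust_score": 0.5}

def run_shard(shard_id):
    it = kinesis.get_shard_iterator(
        StreamName=IN_STREAM, ShardId=shard_id, ShardIteratorType="LATEST"
    )["ShardIterator"]
    while True:
        resp = kinesis.get_records(ShardIterator=it, Limit=100)
        for rec in resp["Records"]:
            scored = score_email(json.loads(rec["Data"]))
            kinesis.put_record(
                StreamName=OUT_STREAM,
                Data=json.dumps(scored),
                PartitionKey=scored["message_id"],
            )
        it = resp["NextShardIterator"]
        time.sleep(0.2)  # stay under the per-shard read limit
```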
Similar to this, we also have some upgrades to this pipeline. The slide was already kind of busy, but you can imagine that we also have Elasticsearch in this mix, on another Kinesis stream downstream of the importers, and I'll talk a little bit about that now, when we talk about the components. So again, S3 is our data lake:
it stores all of our data. Even though data is coming into Kinesis, we use Firehose to write that data to S3, because our model building goes against the data in S3; we still do full model builds every night. Instead of SNS and SQS, well, we actually could have stuck with them for messaging, but because the whole world uses Kafka and we didn't want to be too innovative, we decided, OK, let's go with something that's a hosted Kafka, and we went with Kinesis here. Again, it's serverless, something we don't manage, and it's worked fine for us. In retrospect, I think we might have been able to do this with SNS and SQS, and it might have actually been better, but one thing we looked at was the message size: at that time we were concerned that the 64 KB size limit on SQS messages might have been too restrictive, and with Kinesis you get, I think, 1 MB. That was one of the reasons we went with Kinesis. For general
processing, we started moving everything to Lambda for the cost reason that I mentioned, but Lambda only supports Python, Node, and Java; we still have a bunch of stuff in Ruby, and for that reason we still run the auto scaling groups. Sometimes there's also a limit on memory: the largest Lambda you can get has 1.5 GB of memory, and some things we do may require more than that, because we'll map a huge model into memory and we might end up using more. So that's the second use case for using an auto scaler over Lambda. Spark: this is an interesting point. Our first implementation of our real-time system just took our Spark jobs for scoring and changed the interface to use the micro-batch approach, and the job was written to run in 10 minutes, so it still ran in 10 minutes.
But that didn't really help us for near real-time: we found the performance of the job was not better just because we had micro-batches, because some of the operations in the spaghetti code that became our Spark code were doing some really inefficient things, and the amount of time to rewrite that job would have taken probably a year. I'm not saying the data science folks were not following best practices, but they were sort of writing code with a free hand, and data science means they had to get stuff done, in 10 to 20 minutes, and that code didn't translate well to running in the near real-time streaming model. So we took scoring and aggregation out, and we now use Spark just for model building; we've actually moved the scoring directly into a Lambda, and I'll talk about the aggregation next. We still use Airflow for certain things like the nightly model build, and we're still using the database to hold some data. But because we went near real-time, we could no longer do this aggregation every hour; our users want real-time aggregation. And the other problem with doing hourly aggregation is that what you're essentially doing is pre-aggregation: some data lands in a details table in Postgres,
and then your Airflow job is told to compute a pre-aggregate for that one hour. But data comes late, and when data comes late, your aggregates are completely wrong. So what we did was kill pre-aggregation altogether: we put Elasticsearch downstream of our database, reading off a change-capture stream, and we just do aggregations on the fly over 500 million data points, and it turns out our web app is 100x faster and it's more accurate. So we follow this approach of polyglot persistence, where we say your relational database doesn't have to do everything: if you have a relational database and a change-capture stream, downstream you can put whatever you want. It could be a graph engine, it could be a search engine, it could be whatever, and we currently put Elasticsearch downstream to do both search and aggregation on the fly, and that's been a huge win for us.
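As an illustration of the pattern (not Agari's code), documents coming off the change-capture stream could be indexed and aggregated at query time roughly like this; the index, field, and endpoint names are made up:

```python
# Index each change-capture row, then let Elasticsearch compute the
# aggregation at query time so late data is included automatically.
from elasticsearch import Elasticsearch

es = Elasticsearch(["https://search-agari-demo.us-east-1.es.amazonaws.com"])  # placeholder endpoint

def index_change(doc):
    # Called for each row coming off the change-capture stream.
    es.index(index="scored-messages", doc_type="message",
             id=doc["message_id"], body=doc)

def hourly_trust_histogram(customer_id):
    # One query computes hourly average trust scores over everything indexed
    # so far, instead of reading pre-aggregated rows.
    return es.search(index="scored-messages", body={
        "size": 0,
        "query": {"term": {"customer_id": customer_id}},
        "aggs": {
            "per_hour": {
                "date_histogram": {"field": "received_at", "interval": "hour"},
                "aggs": {"avg_trust": {"avg": {"field": "trust_score"}}},
            }
        },
    })
```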
So what sort of other innovations have we come up with when we started building this NRT pipeline? Oh, I should mention that the Elasticsearch we're using is the hosted Amazon Elasticsearch; again, we don't manage it, so it's completely serverless. I don't know why I said managed; I should actually say serverless there. We also did one other thing: our Spark jobs for scoring needed to do some windowing functions, and what we do now is store that data in ElastiCache, again the Amazon ElastiCache, instead of going to Postgres or writing some complex Spark code.
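A small sketch of that kind of windowing in Redis via ElastiCache, with made-up key names (our real windowing logic is more involved):

```python
# Keep a sorted set per sender keyed by timestamp and count how many messages
# arrived in the last hour (a simple sliding window).
import time
import redis

r = redis.StrictRedis(host="demo.cache.amazonaws.com", port=6379)  # placeholder endpoint

WINDOW_SECONDS = 3600

def record_message(sender, message_id, ts=None):
    ts = ts or time.time()
    key = f"msgs:{sender}"
    r.zadd(key, {message_id: ts})                      # score = arrival time
    r.zremrangebyscore(key, 0, ts - WINDOW_SECONDS)    # drop events outside the window
    r.expire(key, WINDOW_SECONDS * 2)

def messages_last_hour(sender):
    now = time.time()
    return r.zcount(f"msgs:{sender}", now - WINDOW_SECONDS, now)
```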
So, how many of you use Avro? OK, not a huge amount. How many of you know what Avro is? OK, still not a huge number of people. And how many of you work in finance? OK, interesting, for the one of you who raised your hand. I've talked to a couple of people about this, and typically the people in finance don't have this exact requirement that you find in Silicon Valley, because the market closes and they have a downtime. So I'll talk about that right now.
So Apache Avro is a self-describing serialization format. Think about when you put data in a database and you access that data: there's a schema around that data, and there's some sort of code that's validating that all data in or out matches the schema. But what happens when you have a data pipeline and two processes are communicating through a file on the file system, or they're communicating through a pipe, or data is streaming through? What guarantee do they have that the reader and writer are using the same schema? Do they agree on the types of the fields in that data? Do they agree on the cardinality of the fields? This is the problem that Avro solves: Avro applies a schema on top of data, at rest or streaming, so that there's a contract and data doesn't get corrupted. It supports primitive data types, int, boolean, float, string, that sort of stuff, and complex nested data types: records, arrays, unions, maps. It has language bindings for various languages. At least until recently I would say it was the most common structured format used for big data; now I guess we're seeing more and more Parquet, but it was definitely, for a long period of time, the format of choice, and it supports schema evolution. So why is it
useful? Alright, so Agari is an IoT company: we have sensors deployed at our customer sites, those sensors are upgraded at different times, and we have no control over when a customer will upgrade their sensor. So they're all running different versions of the code, and they're actually sending different versions of the format to us. Maybe version one is sending three fields, version two has added two more fields to the data sent to us, and version three has added two fields and dropped two fields, and at any given time that mix of data formats is coming over Kinesis to us. To make matters slightly more complicated, the version that we're expecting on our side is a different version as well. So very early on, before we even had our first customer, we picked Avro as our serialization format to support this
specific use case. We should never need to know the format of the data coming over the wire; it should be transparent. The way this is done is that the code in the server is compiled against, say, version v4, and the code running on all those senders is compiled against different versions. We call one the writer schema version and one the reader schema version: the reader is on v4, and the writers are on v1, v2, v3. When the reader has to read the data, it hands in both the reader and the writer schema, and it gets a clean union of that data, and it never needs to worry about it. So as long as the writer schema is passed with every message over the pipe, and the cloud-based service has access to it, it can do this schema resolution, and we never have to worry about backward compatibility. We do enforce backward compatibility in a Jenkins job; if you have questions, I can talk about that afterwards.
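As a sketch of that resolution step using the fastavro library (not Agari's production code; the schemas and field names here are invented):

```python
# The sensor serializes with its own (writer) schema; the cloud side
# deserializes with both the writer schema and the newer reader schema it was
# built against, and the missing field comes back with its default.
import io
from fastavro import schemaless_writer, schemaless_reader

writer_schema = {  # what an older sensor (v1) was built with
    "type": "record", "name": "EmailEvent",
    "fields": [{"name": "message_id", "type": "string"},
               {"name": "from_domain", "type": "string"}],
}

reader_schema = {  # what the cloud service (v2) expects
    "type": "record", "name": "EmailEvent",
    "fields": [{"name": "message_id", "type": "string"},
               {"name": "from_domain", "type": "string"},
               {"name": "trust_score", "type": ["null", "double"], "default": None}],
}

buf = io.BytesIO()
schemaless_writer(buf, writer_schema,
                  {"message_id": "abc-123", "from_domain": "example.com"})
buf.seek(0)

record = schemaless_reader(buf, writer_schema, reader_schema)
print(record)  # {'message_id': 'abc-123', 'from_domain': 'example.com', 'trust_score': None}
```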
Avro became so useful as a tool to shield us from data format issues with our customers that we used it inside our pipeline for every step. So all our processes are decoupled from each other, not just by virtue of using queues like Kinesis, but also because the data format supports automatic schema evolution. And because we're a Silicon Valley company of 20 engineers, we need to have at least four different languages, so everything is written in a different language: Rails, Python, and Java are all involved in decoding and encoding stuff and sending stuff down the pipeline, and we have Node.js now too; luckily, that also supports Avro. So there are language bindings for all of these languages. As an example, this is
a schema, an Avro schema. You have a record type, which we're calling agari_user; it's a user record, and the user record has the name as well as the user's favorite number and the user's favorite color. The first field, the name, is required, and for the second two fields, favorite number and favorite color, you'll see this notation that looks like a union of int and null, or string and null; it just means it's an optional field.
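Written out, the schema being described looks roughly like this (shown here as a Python dict in the fastavro style; the .avsc file is the same structure in JSON, and the field names follow the example on the slide):

```python
# The union with "null" is the notation that makes favorite_number and
# favorite_color optional.
agari_user_schema = {
    "type": "record",
    "name": "agari_user",
    "fields": [
        {"name": "name", "type": "string"},                      # required
        {"name": "favorite_number", "type": ["int", "null"]},    # optional
        {"name": "favorite_color", "type": ["string", "null"]},  # optional
    ],
}
```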
As I mentioned, Avro is very common in the big data world at rest, on HDFS, where the block size is something like 64 MB: the schema is placed at the top of the file as JSON, and then there are just datums in binary written below it. When you have a million or a billion of these records, the overhead of the schema is tiny, like 0.001 percent; that's a very acceptable overhead for the schema. But what happens if you're streaming one record at a time over Kinesis or Kafka? In that case the schema is 99% of your data, and you actually don't want to send the schema with every single message that you send; that's overhead.
So one thing that we've built, and I think the last time I gave this talk I promised to open-source this and I haven't gotten around to it, so I promise you yet again that we're going to open source it, is a schema registry. One comes with Confluent, there's an implementation there, but we've implemented something on a Lambda that is very effective and very lightweight. Essentially we have a Lambda, and it has two functions. Sometimes people talk about function-as-a-service; we actually don't follow that model, we have a couple of functions in one Lambda. One of the functions is register_schema: our producer, before it sends a message down the pipe, will register a schema and get back a unique ID, in fact a UUID, and it will then send that UUID plus a datum across the wire. The consumer will get the ID and the datum. The problem is, it's got this datum but it doesn't have the writer schema yet; it has an ID. To get the writer schema it will then do a lookup via get_schema_by_id, get the writer schema, and cache it, and then it will use that, along with the reader schema it was compiled against, to decode the datum on the fly. That's essentially how we use it.
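A hedged sketch of that protocol, with an invented Lambda name and payload shape (the real registry isn't open source yet, so none of these names are Agari's):

```python
# Producer registers the writer schema once and ships only the ID plus the
# datum; consumers resolve schemas by ID and cache them locally.
import json
import boto3

lam = boto3.client("lambda")
REGISTRY_FN = "schema-registry"        # hypothetical Lambda name
_schema_cache = {}                     # consumer-side cache: schema_id -> schema

def _call(action, payload):
    resp = lam.invoke(FunctionName=REGISTRY_FN,
                      Payload=json.dumps({"action": action, **payload}))
    return json.loads(resp["Payload"].read())

def register_schema(schema):
    # Producer side: returns a UUID for this writer schema.
    return _call("register_schema", {"schema": schema})["schema_id"]

def get_schema_by_id(schema_id):
    # Consumer side: look up once, then serve from the local cache.
    if schema_id not in _schema_cache:
        _schema_cache[schema_id] = _call("get_schema_by_id",
                                         {"schema_id": schema_id})["schema"]
    return _schema_cache[schema_id]

def encode(schema_id, datum_bytes):
    # What actually goes over Kinesis: the ID plus the Avro-encoded datum.
    return schema_id.encode() + b"|" + datum_bytes
```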
In every place we use Kinesis, we have this Lambda schema registry, and we cut down on all the time taken to process this data, all the time spent in transmission and processing. So I think at this point I'm actually ready for questions. Of course, none of what we've done is a single-person effort; multiple people on my team have been helping. Any questions?
Yes? The question is whether I was talking about a Java lambda function or an AWS Lambda: an AWS Lambda, thank you. Sure. OK, so the way Kinesis works is that, very much like Kafka, you get shards, or partitions, and each partition is provisioned with a very strict I/O limit: let's say for a given partition you can do a thousand puts a second and five reads a second, OK? So what happens when you exceed that? Well, you get throttled by Amazon, so when that happens you have to scale out your shards. The problem with the scale-out of these shards is the way Amazon does what's called a shard split. The way they split the shard is they put an end-of-shard marker in the old shard and create two new ones, splitting the key space. They don't actually cut the amount of data in half; they don't divide the shard and move half the data. What they do is just end the current shard and create two empty shards. That's what a shard split is in Amazon. Are you following that? Yep. Right, so now what
happens? Typically, when you find that you're in trouble with a shard, it's on the reading side: you're falling behind in processing on the reading side, and you're behind by, let's say, a couple of hours. Splitting the shard doesn't help you: you're still behind, and you don't magically get the 2x read capacity that you need. So that's one problem. They say that shard splits are very cheap and fast, and this is why they're cheap and fast: they're not moving the data around. The second problem is that no one should have to worry about this. With SNS and SQS you get magical, automatic horizontal scaling; there's no provisioned I/O. So had we stuck with it, we could just process everything in parallel and never have to worry about "this is our expected capacity and this is how much we've provisioned". So what we currently have is an Airflow job that does pre-emptive scaling: when we're close to hitting a boundary, we start adding shards, and we need a job to maintain that.
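A rough sketch of what such a pre-emptive scaling check could look like with boto3 (the stream name, threshold, and doubling policy are illustrative, not the real job):

```python
# If the incoming record rate gets close to the per-shard write limit,
# double the shard count before Kinesis starts throttling us.
import boto3

kinesis = boto3.client("kinesis")
STREAM = "inbound-email-events"        # placeholder
PUTS_PER_SHARD_LIMIT = 1000            # Kinesis per-shard write limit (records/sec)

def maybe_scale(observed_records_per_sec):
    shards = kinesis.describe_stream_summary(StreamName=STREAM)[
        "StreamDescriptionSummary"]["OpenShardCount"]
    capacity = shards * PUTS_PER_SHARD_LIMIT
    if observed_records_per_sec > 0.8 * capacity:   # "close to hitting a boundary"
        kinesis.update_shard_count(
            StreamName=STREAM,
            TargetShardCount=shards * 2,
            ScalingType="UNIFORM_SCALING",
        )
```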
In the SNS and SQS model, it scales automatically under the hood: Amazon itself keeps adding instances internally when it's hitting, say, 80 percent capacity on any one of the SQS partitions. That's what we would like. I hope that answers your question. OK, in the back? OK, so
his question is: in our implementation of the schema registry, are we always caching the writer schema on the consumer side, or is it a lookup per object? The answer is, yeah, we're always caching. And interestingly enough, half of our functions have moved to Lambdas, and those Lambdas are calling this Lambda, and this is how we found out that our Lambdas are very long-lived: because they're Lambdas reading from Kinesis, they live for three days. So the hits to the schema registry Lambda from one of our normal processing-chain Lambdas are very low; they cache for a few days. Does that answer your question? OK, good.
OK, so that's a really good question. The question is, what is the scale that we are seeing? The scale we're seeing is tens of millions an hour, sometimes more; it depends on the hour of the day. And there are some things we do where we have to reprocess parts of our data, and we can push 400 million through in about a day in a reprocessing use case; if we have to make a change to our scoring algorithm or something, we sometimes reprocess. I mentioned we have Elasticsearch: if I add a new field, and we maintain 60 days of data, which for us is around 500 million records, then for that one Kinesis hop we have to basically push 500 million records in, and it takes about a day. Any other questions?
Perfect, great question. His question is, what's the trade-off of EMR versus self-hosted? It took us about an hour to spin up: the problem with EMR is that we cannot specify our own AMI, so we would get the EMR AMI and then start installing tons of Python packages, which would take 40 minutes, and only then were we ready to use it. So that was the major problem, that spin-up time, and that's why we moved away from EMR. Thank you.