Event Based Systems
Introduction
Dr. Emanuel Onica
Faculty of Computer Science, Alexandru Ioan Cuza University of Iaşi
Contents
1. Course overview
2. Background. The producer/consumer model
3. The event based interaction model
4. Case studies
5. Tools of the trade
Course overview
The course covers the topic of event-based systems.
Particular focus: the distributed systems area and the publish/subscribe (pub/sub) information dissemination model.
Curriculum overview:
• The event based interaction model
• Publish/subscribe architecture
• Content-based information filtering
• Data storage in pub/sub systems
• Complex event processing
• Data diffusion in distributed event-based systems
• Specific pub/sub routing algorithms
• Scalability aspects in event based systems
• Pub/sub security
• Fault tolerance in event-based systems
Course overview
Bibliography Books:
• Distributed Event-Based Systems – G. Mühl, L. Fiege, P. Pietzuch – Springer-Verlag 2006 (main ground for current course)
• Event Processing in Action – O. Etzion, P. Niblett – Manning Publications Co. 2011
Other sources (conference proceedings, journals, industry reports):
• ACM International Conference on Distributed Event Based Systems
• IEEE International Conference on Distributed Computing Systems
• USENIX Symposium on Networked Systems Design and Implementation
• ACM/IFIP/USENIX Middleware Conference
• IEEE Transactions on Parallel and Distributed Systems
Course overview
Evaluation
• 30% course evaluation (C)
• 70% lab evaluation (L)
• Pass grade:
• >= 50% of C + L
• C: written test, open book, probably in week 13
• L: will be discussed during the lab
Background
What is an event?
• the rise or drop of a stock market value
• a change in a patient's monitoring data
• a new kWh reading on your electricity meter
An event is any observable occurrence of interest that has happened in a specific context.
Background. Producer/Consumer
The producer represents the entity generating the event (can be associated with the software that emits the data describing the event):
• a stock market publication service for stock quotes
• a sensor data recording device
• a power consumption monitoring software
The consumer represents the entity that is interested in the event (can be associated with the software used to capture the event):
• an application used by a financial investment firm
• software used by a medical facility to get reports on a patient's condition
• an app used by a home user to track and get statistics on power consumption
7/45
Background. Producer/Consumer
What is the middleware for the producer/consumer communication?
How is it organized?
How do entities interact?
Producer/Consumer interaction models
Aspects to consider in particular:
• Who initiates the event data delivery? (Do we want to keep asking whether it happened until it does, or do we want to get it "magically" when it happens?)
• Whom do we ask to find out about an event? (Do we want to get the weather broadcast exclusively from Radio Erevan, or do we just want to get the weather broadcast?)
Producer/Consumer interaction models
The resulting coupling issues:
• If we have to ask the producer, then when do we do it?
• performance issue: too often = useless communication overhead
• functionality issue: too seldom = missing urgent notifications
• If we have to ask a producer, then which one do we pick?
• performance issue: overburden on the same producer by multiple consumers
• functionality issue: not getting the event on time or at all
(Radio Erevan reports late and takes unexpected vacations)
Producer/Consumer interaction models
It seems we don't want tight coupling in the producer/consumer communication middleware.
This might, however, depend on the scenario: e.g., if we have a single producer and we want to query for events only at specific times, the above does not apply.
Still, if we don't want to ask and we don't know whom to ask, then what can we do? ...
Producer/Consumer interaction models
Request/Reply
• Most widely used model (client/server, RPC)
• Initiator: the consumer (getting data/events only when asking for it)
• Addressing: direct (must know exactly who to ask for data/events)
+ simple model
+ easy to implement
- tight coupling
- scalability issues:
  • short-interval event polling = waste of resources
  • long-interval event polling = increased service latency
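The polling tradeoff above can be illustrated with a small Python sketch. This is purely illustrative; `Producer`, `check`, and `poll` are hypothetical names, not part of any real middleware:

```python
class Producer:
    """Hypothetical producer whose event becomes available at some point in time."""
    def __init__(self, event_ready_at):
        self.event_ready_at = event_ready_at  # time at which the event occurs

    def check(self, now):
        # Request/reply: the consumer must ask; the producer only answers.
        return "event" if now >= self.event_ready_at else None

def poll(producer, interval, deadline=100):
    """Poll every `interval` ticks; return (requests_sent, detection_latency)."""
    requests = 0
    for now in range(0, deadline, interval):
        requests += 1
        if producer.check(now) is not None:
            return requests, now - producer.event_ready_at
    return requests, None

# Short interval: many useless requests, but zero detection latency.
print(poll(Producer(event_ready_at=42), interval=1))   # (43, 0)
# Long interval: few requests, but the event is noticed late.
print(poll(Producer(event_ready_at=42), interval=25))  # (3, 8)
```

The two calls make the tradeoff concrete: 43 requests for zero latency versus 3 requests at the cost of noticing the event 8 ticks late.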
Producer/Consumer interaction models
Anonymous Request/Reply
• Multiple ways of implementing (figure is just one example)
• Initiator: the consumer (getting data/events only when asking for it)
• Addressing: indirect (don’t need to know the exact producer, only the set)
• At least one reply is expected for a request (can be more if the request is sent to multiple producers)
+ reduced coupling
+ relatively easy to implement
+ more flexible (data can be obtained from any source producing it)
- scalability issues (polling is still used)
- requires dedicated middleware/implementation for indirect communication
Producer/Consumer interaction models
Design patterns background: Observer
Diagram source: Design Patterns Explained Simply (A. Shvetz – Sourcemaking.com)
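The observer pattern referenced in the diagram can be sketched in a few lines of Python (an illustrative sketch; `Subject` and `LoggingObserver` are hypothetical names):

```python
class Subject:
    """Observer pattern: the subject keeps a list of registered observers
    and notifies each of them when an event occurs."""
    def __init__(self):
        self._observers = []

    def attach(self, observer):
        self._observers.append(observer)

    def notify(self, event):
        for obs in self._observers:
            obs.update(event)

class LoggingObserver:
    """A trivial observer that just records the events it receives."""
    def __init__(self):
        self.received = []

    def update(self, event):
        self.received.append(event)

subject = Subject()
obs = LoggingObserver()
subject.attach(obs)
subject.notify("stock quote changed")
print(obs.received)  # ['stock quote changed']
```

Note the direct addressing: the observer must know the subject in order to register with it, which is exactly the coupling carried over into the distributed callback model.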
Producer/Consumer interaction models
Callback
• Resembles the observer design pattern (distributed implementation)
• Initiator: the producer (the consumer gets data/events whenever they occur)
• Addressing: direct (must know exactly where to register for data/events)
+ relatively simple model
+ some flexibility (producers can notify selected sets of consumers)
- tight coupling
- some scalability issues (the producer has to retain the identities of multiple consumers and manage data delivery to them)
Producer/Consumer interaction models
Design patterns background: Mediator
Diagram source: Design Patterns Explained Simply (A. Shvetz – Sourcemaking.com)
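The mediator pattern in the diagram can likewise be sketched in Python. This is a toy sketch; `Mediator` and `Colleague` are hypothetical names:

```python
class Mediator:
    """Mediator pattern: colleagues communicate only through the mediator,
    never directly with each other."""
    def __init__(self):
        self._colleagues = []

    def register(self, colleague):
        self._colleagues.append(colleague)
        colleague.mediator = self

    def relay(self, sender, message):
        # Deliver the message to every colleague except the sender.
        for c in self._colleagues:
            if c is not sender:
                c.receive(message)

class Colleague:
    def __init__(self, name):
        self.name = name
        self.mediator = None
        self.inbox = []

    def send(self, message):
        self.mediator.relay(self, message)

    def receive(self, message):
        self.inbox.append(message)

m = Mediator()
a, b, c = Colleague("a"), Colleague("b"), Colleague("c")
for col in (a, b, c):
    m.register(col)
a.send("hello")
print(b.inbox, c.inbox, a.inbox)  # ['hello'] ['hello'] []
```

The colleagues know only the mediator, which is the indirection that the event-based model generalizes into a distributed communication middleware.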
Producer/Consumer interaction models
Event-Based
• Follows the mediator design pattern (distributed implementation)
• Multiple ways of implementing (figure is just one example)
• Initiator: the producer (the consumer gets data/events whenever they occur)
• Addressing: indirect (consumers and producers don't know each other)
+ minimal or no coupling
+ most flexible model (consumers can receive from any producer; producers do not send to a specific consumer)
- the communication middleware can be complicated to implement, with an impact on scalability, availability, and security
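A minimal sketch of the event-based interaction, assuming a topic-based broker in the middle (illustrative only; `Broker`, `subscribe`, and `publish` are hypothetical names, not a real middleware API):

```python
from collections import defaultdict

class Broker:
    """Minimal topic-based broker: producers and consumers know only the
    broker, never each other (indirect addressing, producer-initiated)."""
    def __init__(self):
        self._subscriptions = defaultdict(list)

    def subscribe(self, topic, callback):
        self._subscriptions[topic].append(callback)

    def publish(self, topic, event):
        # The producer initiates delivery; matching subscribers are notified.
        for callback in self._subscriptions[topic]:
            callback(event)

broker = Broker()
received = []
broker.subscribe("stocks", received.append)
broker.publish("stocks", ("Google", 1034))  # delivered: a subscription matches
broker.publish("weather", "sunny")          # dropped: no subscriber for the topic
print(received)  # [('Google', 1034)]
```

The producer publishes without naming a consumer, and the consumer subscribes without naming a producer: the coupling lives entirely in the broker.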
Case studies
Information dissemination
(focus in this course)
• Applications: news dissemination, stock market monitoring, etc.
• Typically large number of consumers
• Potentially wide geographical distribution
• Complex information flows
Figure source: Distributed Event-Based Systems (G. Mühl et al. – Springer 2006)
Case studies
Network monitoring
• Applications: network management, runtime statistics, intrusion detection, etc.
• High level view of the system based on multiple low-level generated events
• Real-time processing and event aggregation
Case studies
Enterprise Application Integration
• Applications: connecting heterogeneous modules and systems in joint business processes
• Focus on the loose coupling that guarantees independent evolution/changes in the components
• Communication middleware relies on a mediator for decoupling interfaces
Case studies
Mobile and ubiquitous systems
• Applications: mobile distributed applications, sensor networks
• Highly dynamic environments where component membership constantly changes
• Loose coupling and event aggregation are useful for scalability purposes
Figure source: Distributed Event-Based Systems (G. Mühl et al. – Springer 2006)
Tools of the trade
First ... Tools for what?
Event based systems involve multiple areas:
• Data storage
• Communication
• Processing algorithms and others ...
Trend: convergence between event processing and distributed data processing
In recent years the focus of distributed processing has been Hadoop.
The difference: we're less interested in static big data and more in continuous streams of event data produced dynamically.
Tools of the trade – the evolution
• Context: richer (more data) and more popular (more users) event streams to process in real time
• From database engines to stream processing engines (the high level view):
• Major differences:
• Queries stored in memory for low latencies
• Low query volatility, high data volatility
• No fixed size for data as processing unit (can include windows to be processed)
and others... (more in ACID vs BASE – in a future lecture)
Tools of the trade
Table source: A Survey of the State-of-the-art in Event Processing (O.M. de Carvalho et al. – WSPPD 2013)
Notations:
DBMS – Database Management System
DSMS – Data Stream Management System
CEP – Complex Event Processing solution
Note 1: typically distributed stream processing = distributed event processing
Note 2: a CEP module can be custom-built on top of a DSMS
Tools of the trade
• Updated list of event stream processing platforms – commercial solutions:
• TIBCO Streaming
• SAP Sybase Aleri Streaming
• WSO2 Stream Processor
• Esper High Availability
• Microsoft SQL Server StreamInsight/Azure Stream Analytics
• Oracle Complex Event Processing
• IBM WebSphere Business Events
• Amazon Kinesis Data Streams
• Google Cloud Dataflow ...and others
Tools of the trade
• Updated list of event stream processing platforms – the Apache world:
• Apache Storm (started 2011 at Twitter)
• Apache Heron (started 2014 at Twitter)
• Apache Spark Streaming (started 2014 at Univ. of California, Berkeley)
• Apache Flink (started 2011)
• Apache Samza (started 2013 at LinkedIn)
• Apache Kafka Streams (started 2011 at LinkedIn)
• Apache Beam (started 2016 at Google – integrates with Dataflow)
• Apache Ignite (started 2015 – in-memory DB with stream support)
• Apache Flume (started 2009 – oriented on log streams) ...and others
The generic architecture overview
(not applicable to all solutions)
• Data producers often represented as source operators (no tuple stream input)
• Consumers often represented as sink operators (no tuple stream output)
The generic architecture overview
Operators can be stateless or stateful:
• Stateless operators typically perform a tuple-by-tuple based operation (e.g., splitting the tuple)
• Stateful operators – terminology debate:
• Version a): implement operations that depend on multiple tuples stored as operator state (e.g., average on a window of tuples)
• Version b): any operator that must maintain some evolving state (e.g., also a filter that can change by modifying a stored query)
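The stateless/stateful distinction can be made concrete with a minimal Python sketch (`split_fields` and `WindowAverage` are hypothetical illustrations, not part of any engine's API):

```python
# Stateless operator: each output depends only on the current tuple.
def split_fields(tup):
    return list(tup)  # e.g., ("Google", 1034) -> ["Google", 1034]

# Stateful operator: the output depends on state accumulated across tuples.
class WindowAverage:
    def __init__(self, size):
        self.size = size
        self.window = []  # operator state: the last `size` values seen

    def process(self, value):
        self.window.append(value)
        if len(self.window) > self.size:
            self.window.pop(0)  # evict the oldest value
        return sum(self.window) / len(self.window)

avg = WindowAverage(size=3)
print([avg.process(v) for v in [10, 20, 30, 40]])  # [10.0, 15.0, 20.0, 30.0]
```

Restarting a stateless operator loses nothing, while restarting `WindowAverage` loses its window, which is why statefulness matters for fault tolerance later in the course.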
Operator processing units
Depending on implemented operation, an operator can process:
• one tuple at a time (typically stateless operators, but not only)
• a window formed of multiple tuples (stateful operators)
Window characteristics
A window size can be determined by:
• time – often using tuples’ origin timestamp (e.g., size 10s)
• number – using a count of tuples (e.g., size 3 tuples)
Either case can require tuple ordering, particularly in time-based windows:
• typically relying on origin timestamps
• sometimes split based on origin id (e.g., size 10s per id)
Window characteristics
Types of advancement – using time windows as example:
• tumbling window: sequential fixed time size (size 10 in example), no overlapping
• example: window #1 t:10-19, window #2 t:20-29, ...
• hopping window: sequential fixed time size, fixed time overlapping interval (size 5 in example)
• a tuple can be part of more than one processing window
• example: window #1 t:10-19, window #2 t:15-24, window #3 t:20-29, ...
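Tumbling and hopping window assignment can be sketched as follows. This is an illustrative sketch that assumes window starts aligned to multiples of the size/hop; `tumbling_windows` and `hopping_windows` are hypothetical helpers:

```python
def tumbling_windows(timestamps, size):
    """Assign each timestamp to exactly one non-overlapping window."""
    windows = {}
    for t in timestamps:
        start = (t // size) * size  # the single window containing t
        windows.setdefault(start, []).append(t)
    return windows

def hopping_windows(timestamps, size, hop):
    """With hop < size, a tuple can fall into several overlapping windows."""
    windows = {}
    for t in timestamps:
        # every window start s (a multiple of hop) with s <= t < s + size
        first = ((t - size) // hop + 1) * hop
        for start in range(max(first, 0), t + 1, hop):
            windows.setdefault(start, []).append(t)
    return windows

ts = [10, 12, 15, 21, 24]
print(tumbling_windows(ts, size=10))        # {10: [10, 12, 15], 20: [21, 24]}
print(hopping_windows(ts, size=10, hop=5))  # tuple 12 appears in windows 5 and 10
```

With a hop equal to the size, the hopping case degenerates into the tumbling case, which matches the definitions above.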
Window characteristics
• sliding window: sequential fixed time size (10 in example), flexible overlapping time interval:
• essentially removing the fixed-overlap restriction of hopping windows => all possible windows of the fixed size
• windows are effectively determined by changes in tuple inclusion (a new window whenever the set of events in the last t time units changes)
• example: window #1 t:10-19, window #2 t:12-21, window #3 t:15-24, window #4 t:21-30...
Window characteristics
• session window: sequential maximum time size (10 in example), with a fixed timeout (3 in example):
• a new window time count typically starts with the first event tuple in the window
• the timeout counts the time to the next event tuple and is the primary criterion for closing the window
• the second criterion for closing the window is the maximum time size since the first tuple
• sometimes the max time is checked at fixed intervals, i.e., every t seconds, independent of the time of the first tuple, allowing the window to extend up to two maximum durations
• example: window #1 t:10-18 (timeout triggered before max limit 19); window #2 t:21-24 (timeout triggered before max limit 30)...
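Session windowing can be sketched as a grouping of sorted timestamps (an illustrative sketch of the two closing criteria above; `session_windows` is a hypothetical helper):

```python
def session_windows(timestamps, timeout, max_size):
    """Group sorted timestamps into sessions: a window closes when the gap
    to the next tuple exceeds `timeout` (primary criterion), or when the
    window would reach `max_size` counted from its first tuple."""
    windows = []
    current = []
    for t in sorted(timestamps):
        if current and (t - current[-1] > timeout or t - current[0] >= max_size):
            windows.append(current)  # close the current session
            current = []
        current.append(t)
    if current:
        windows.append(current)
    return windows

# The gap between 18 and 21 exceeds the timeout of 2, so a new session starts.
print(session_windows([10, 11, 13, 18, 21, 22, 24], timeout=2, max_size=10))
# [[10, 11, 13], [18], [21, 22, 24]]
```

Unlike tumbling or hopping windows, the boundaries here are data-driven: they depend on when the tuples actually arrive, not on a fixed grid of window starts.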
Tools of the trade
Let's pick a toy to play with (= work with in the lab):
STORM
Distributed stream processing platform used by:
Tools of the trade - Storm
• Initially developed at Twitter
• Currently an open source Apache project
• Integrates with a variety of other tools/platforms (RabbitMQ, Kafka, Amazon Kinesis, etc.)
• Simple API (multiple languages supported – written in Java/Clojure)
• Scalable
• Fault tolerant
More on all these in future courses/labs. Let’s just get the basics now...
Tools of the trade - Storm
• Event data abstraction:
• The Tuple = a named list of values
<symbol name, quote>:
("Google", 1034) ("Apple", 1723) ("Microsoft", 1321)
• The Stream = a collection of tuples
[("Google", 1034), ("Apple", 1723), ("Microsoft", 1321), ...]
Tools of the trade - Storm
Spout:
• The Storm abstraction for the source of streams
• Reads data from an external source (database, crawler, etc.)
• Can also produce/generate data (e.g., testing purposes)
• Emits the data as tuple streams
Bolt:
• The Storm abstraction for a stream processing operator
• Reads tuple streams emitted by spouts or other bolts
• Performs processing over tuples
• Emits the resulting data as tuple streams
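The spout/bolt roles can be simulated in plain Python. This is a toy sketch of the concepts, not the actual Storm API; `QuoteSpout` and `FilterBolt` are hypothetical names:

```python
class QuoteSpout:
    """Plays the role of a spout: emits a tuple stream (here generated
    from a fixed list, e.g. for testing, instead of an external source)."""
    def __init__(self, quotes):
        self.quotes = quotes

    def emit(self):
        yield from self.quotes

class FilterBolt:
    """Plays the role of a bolt: consumes a tuple stream and emits a new
    one (here, keeping only quotes above a threshold)."""
    def __init__(self, threshold):
        self.threshold = threshold

    def process(self, stream):
        for symbol, quote in stream:
            if quote > self.threshold:
                yield (symbol, quote)

spout = QuoteSpout([("Google", 1034), ("Apple", 1723), ("Microsoft", 1321)])
bolt = FilterBolt(threshold=1300)
print(list(bolt.process(spout.emit())))  # [('Apple', 1723), ('Microsoft', 1321)]
```

In real Storm, spouts and bolts are classes implementing the Storm API and the framework distributes and wires them; the data-flow idea, however, is exactly this chaining of emitted streams.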
Tools of the trade - Storm
Topology:
• a directed graph (can have cycles) of spouts and bolts
• the directed edges represent data streams
• represents an actual Storm event processing application
Tools of the trade - Storm
Spouts and bolts logic is implemented by the developer using the Storm API.
Ok ... Spouts are quite clear: read/generate and send data.
What should I actually do in bolts?
Whatever you need for your application:
• Filter data tuples based on some conditions
• Join data from multiple streams into one
• Modify data tuples according to some function
• other stuff ...
Tools of the trade - Storm
Global bolt organization:
• How many? – Typically one functionality per bolt
• Why one? – Separation = flexibility; it is easier to distribute bolts
• Why distribute? – Scalability increase for massive data processing needs
Inner bolt organization:
• A bolt is composed of multiple running "tasks"
• Incoming data streams are split among tasks
• Distributing tuples to the tasks: depends on grouping strategy
Tools of the trade - Storm
Types of grouping:
1. Shuffle
• Round-robin distribution of tuples among the tasks in a bolt
2. By fields
• Tuples grouped by a subset of their fields are sent to a specific task
3. All to all
• All tasks receive all tuples
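The three groupings can be sketched in Python (illustrative only, not the Storm API; `shuffle_grouping`, `fields_grouping`, and `all_grouping` are hypothetical helpers):

```python
def shuffle_grouping(tuples, n_tasks):
    """Shuffle grouping: spread tuples evenly among tasks (round-robin here)."""
    tasks = [[] for _ in range(n_tasks)]
    for i, tup in enumerate(tuples):
        tasks[i % n_tasks].append(tup)
    return tasks

def fields_grouping(tuples, n_tasks, key):
    """Fields grouping: tuples with the same value in the grouping field
    always go to the same task (hash of the field modulo the task count)."""
    tasks = [[] for _ in range(n_tasks)]
    for tup in tuples:
        tasks[hash(tup[key]) % n_tasks].append(tup)
    return tasks

def all_grouping(tuples, n_tasks):
    """All grouping: every task receives every tuple."""
    return [list(tuples) for _ in range(n_tasks)]

quotes = [("Google", 1034), ("Apple", 1723), ("Google", 1050)]
# Both "Google" tuples are guaranteed to land in the same task's list.
by_symbol = fields_grouping(quotes, n_tasks=2, key=0)
```

Fields grouping is what makes stateful bolts work: a task accumulating, say, a per-symbol average must see every tuple for that symbol, which the hash-based routing guarantees.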
Tools of the trade - Storm
Storm cluster infrastructure organization (= how does all this get actually deployed in practice):
1. One Nimbus coordinating master node
• Code distribution on other nodes
• Task assignment on other nodes
• Monitoring activity/failures of other nodes
2. Worker nodes
• Servers running spout and/or bolt operators
• A worker machine also runs a Supervisor daemon listening for tasks to be assigned
3. Zookeeper server(s)
• Coordinate synchronization of the communication between Storm nodes
Tools of the trade - Storm
Figure source: ERICSSON RESEARCH BLOG
Storm cluster infrastructure summary
Tools of the trade - Storm
Problems
What if the workers have too much event data to process?
What if Nimbus or the supervisors fail?
What if we want to deploy our Storm cluster on a public cloud and use it to process confidential data?
Scalability, Dependability, Security – we'll talk about all of these, in a general or Storm-related context, in future courses/labs
Tools of the trade - Storm
Discussion
Why use Storm? Why use any stream processing engine?
Why not just use a map-reduce approach for performing computations over event data?