StreamCruncher - Basics

StreamCruncher is a pure Java™ application. It requires Java 1.5 or above; Java 1.6+ is recommended. It sits on top of a Database and communicates with it through the JDBC Driver provided by that Database. It is a full-fledged, multi-threaded application. Since it contains no native, Database-specific code and all Database access goes through JDBC, the Database should ideally be located on the same machine as StreamCruncher, or be reachable over a high-speed LAN, for optimum performance. For best performance, an "In-Memory and Embedded" Database should be used. Various Databases are supported, and each provides a unique set of features and modes of operation.

StreamCruncher uses Database Tables that are created and maintained automatically by the Kernel and hidden from the User. These Tables are created and used when Partitions are defined over Input Event Streams as part of a "Running Query". Once an Input Event Stream is registered in StreamCruncher, Queries that monitor the Event Streams can be registered. For each such "Running Query", there is another Table into which the Query results are written; this Table is also created and maintained automatically by the Kernel. StreamCruncher provides a simple API that acts as an elegant wrapper over the Kernel and the underlying Database.

In its most basic setting, StreamCruncher might be likened to running an SQL Query in a timed loop over the Database. However, it can do a lot more than that. For starters, if the Query is set up to run against multiple Event Streams (Event Correlation), the Query is triggered whenever an Event arrives in any of those Input Streams. Note that while the Query is already running, an Event cannot trigger a second instance of the Query.
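The triggering behaviour described above can be modelled, very roughly, as follows. This is a hypothetical Python sketch, not StreamCruncher's API: the QueryTrigger class and its run_query callback are illustrative names, it is single-threaded (a re-entrant call stands in for an Event arriving mid-run), and it assumes that Events arriving while the Query runs simply wait and are consumed in a follow-up cycle.

```python
class QueryTrigger:
    """Hypothetical model: a Query over several Streams runs when an Event
    arrives on any of them, but never starts a second, overlapping run."""

    def __init__(self, streams, run_query):
        self.streams = set(streams)
        self.run_query = run_query  # callable receiving the batch of Events
        self.pending = []
        self.running = False

    def on_event(self, stream, event):
        if stream not in self.streams:
            raise ValueError(f"unregistered stream: {stream}")
        self.pending.append((stream, event))
        if self.running:
            # Assumption: the Event waits for the next cycle instead of
            # starting another instance of the Query.
            return
        self.running = True
        try:
            while self.pending:  # re-run while Events arrived mid-run
                batch, self.pending = self.pending, []
                self.run_query(batch)
        finally:
            self.running = False
```

An Event on Stream "B" that arrives while the first run is still in progress does not start a nested run; it is handed to the Query in the next cycle instead.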

Event Processing Constructs

Among the most distinctive features of StreamCruncher are the Window and Partition constructs. Three types of Windows are supported over Event Streams.

1) Sliding Window: A fixed size moving Window over the Stream - store last N.

2) Time Window: A fixed-width Window, where the "width" is the time in Milliseconds/Seconds/Minutes/Hours after which an Event is pushed out of the Window - store last N milliseconds/seconds/minutes/hours. Any SQL Timestamp column in the Event Table can be used as the Time Window's reference by pointing it out through the API.

A maximum Window size can also be specified, which limits the number of Events the Window consumes in each run to at most that number. If at least one Event was not included in a run, then the Query runs again immediately afterwards and expels the first/all Events in the Window to make room for the pending Events, even if they have not yet expired.

3) Tumbling Window: Events stay in the Window for one cycle only and expire in the next cycle - store latest N.
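As a rough mental model of the three Window types, the following Python sketch keeps Events in plain in-memory deques. The class names are hypothetical, timestamps are assumed to arrive in non-decreasing order, and the Tumbling Window is assumed to keep the latest N of the Events offered in a cycle; StreamCruncher itself maintains Windows in Database Tables.

```python
from collections import deque

class SlidingWindow:
    """store last N: keeps the N most recent Events (N >= 1)."""
    def __init__(self, n):
        self.n = n
        self.events = deque()

    def add(self, event):
        expelled = []
        if len(self.events) == self.n:
            expelled.append(self.events.popleft())  # oldest Event leaves
        self.events.append(event)
        return expelled

class TimeWindow:
    """store last T ms: an Event expires once now >= timestamp + T."""
    def __init__(self, width_ms):
        self.width_ms = width_ms
        self.events = deque()  # (timestamp_ms, event), oldest first

    def add(self, timestamp_ms, event):
        self.events.append((timestamp_ms, event))

    def expire(self, now_ms):
        expelled = []
        while self.events and self.events[0][0] + self.width_ms <= now_ms:
            expelled.append(self.events.popleft()[1])
        return expelled

class TumblingWindow:
    """store latest N: Events stay for one cycle and expire in the next."""
    def __init__(self, n):
        self.n = n
        self.current = []

    def cycle(self, arrived):
        expelled = self.current
        self.current = list(arrived)[-self.n:]  # assumption: latest N kept
        return expelled
```

The Sliding Window expels an Event only when a new one arrives, the Time Window expels Events purely as time passes, and the Tumbling Window replaces its whole contents every cycle.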

In addition to the Windows, StreamCruncher allows Events in an Event Stream to be partitioned into smaller Windows, based on a group of Columns in the Event. For example: "the 5 most recent Shipping Order Events at the Country -> State -> Department level".

A simple Time Window is defined like this: partition store last N seconds. A hierarchy of Windows is defined like this: partition by country, state, city store last N seconds.
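A hierarchy of Windows can be pictured as one independent Window per distinct combination of partition column values. The sketch below is a hypothetical in-memory model of "partition by ... store last N" (a Sliding Window, to keep it short); it flattens the Country -> State -> Department hierarchy into a tuple key, which is a simplification.

```python
from collections import defaultdict, deque

class PartitionedSlidingWindow:
    """Hypothetical model of 'partition by c1, c2, ... store last N':
    one independent Sliding Window per distinct combination of values."""
    def __init__(self, by, n):
        self.by = tuple(by)  # partition columns, outermost first
        # deque(maxlen=n) silently drops its oldest entry when full
        self.windows = defaultdict(lambda: deque(maxlen=n))

    def add(self, event):
        key = tuple(event[column] for column in self.by)
        self.windows[key].append(event)
        return key
```

Events for ("US", "CA", "toys") never push out Events held for ("US", "NY", "toys"), because each key has its own Window.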

Apart from defining Partitions, StreamCruncher also supports Aggregates on columns. Several Aggregate functions are supported, and custom Aggregate functions can be plugged in. An Average Aggregate function defined on a column in the Stream, with a Sliding Window, maintains a rolling average over the Events in the Window. Aggregates are akin to Materialized Views in an RDBMS, where the Database automatically keeps a View up to date.
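The rolling average mentioned above can be maintained incrementally, adding each value as an Event enters and subtracting it as the Event is expelled. The sketch below is a hypothetical model of avg(...) over a Sliding Window, not StreamCruncher's Aggregate plug-in interface.

```python
from collections import deque

class RollingAverage:
    """Hypothetical avg(column) over 'store last N', updated incrementally."""
    def __init__(self, n):
        self.n = n
        self.values = deque()
        self.total = 0.0

    def add(self, value):
        self.values.append(value)
        self.total += value                      # entering Event joins the sum
        if len(self.values) > self.n:
            self.total -= self.values.popleft()  # expelled Event leaves it
        return self.total / len(self.values)
```

Each update costs constant time regardless of the Window size, which is what makes such Aggregates behave like a continuously refreshed Materialized View.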

Here is a list of a few features provided by Partitions:

1) A Partition allows a hierarchy of Windows to be created on an Event Stream, much like the SQL Group By clause groups Rows based on Column values, whereas a simple Window Partition provides just one Window on the Stream.

2) A Partition also allows Aggregates to be specified for each Window in the hierarchy of Windows. Several Aggregate Functions are supported - Count, Sum, Min, Max, Geometric Mean, Kurtosis, Variance etc.

Aggregates also provide features that prove very useful when implementing real-world problems, by means of the pinned and entrance only clauses. Aggregate Functions maintain values computed from the column values currently inside the Window, so when Events enter the Window, or exit it on expiry, the aggregated value changes. This behaviour can be limiting in cases where a simple counter-like functionality is required.

select src_ip_address, recent_requests, total_requests
from request_events (partition by src_ip_address
                     store last 30 minutes
                     with pinned
                     count(id) as recent_requests,
                     count(id) entrance only as total_requests)
                     as denial_attack_check
where denial_attack_check.$row_status is new
and recent_requests > 75;

The example above maintains 2 counters, at the Source IP Address level, on the incoming requests to a Website. One counter (recent_requests) maintains a count of requests made in the last 30 minutes; when an Event (Request) expires after 30 minutes, the counter decrements by 1. The other counter (total_requests) uses the entrance only clause, so it does not decrement when an Event expires: it counts Events only as they enter the Window. This keeps an account of the total number of requests made since the Query was set up. However, the Window is a "Time Based Window", and such a Window is discarded once all its Events have expired and no new ones have arrived. That would reset the total_requests counter every time it happened. To prevent this, the pinned clause is used to hold on to the Window, even when it is empty, for as long as the Query is running. The recent_requests counter will drop to 0 when the Window becomes empty, but the total_requests counter will not.
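The interplay of pinned and entrance only can be traced with a small hypothetical model of the denial_attack_check Partition. The RequestCounters class is an illustrative name; it assumes an Event expires once the current time reaches its timestamp plus the Window width, and that an unpinned Window is discarded as soon as it becomes empty.

```python
from collections import deque

class RequestCounters:
    """Hypothetical model of 'partition by src_ip_address store last T
    [with pinned] count(id), count(id) entrance only'."""
    def __init__(self, width_ms, pinned):
        self.width_ms = width_ms
        self.pinned = pinned
        self.partitions = {}  # src_ip -> {"window": timestamps, "total": int}

    def request(self, src_ip, ts_ms):
        part = self.partitions.setdefault(src_ip, {"window": deque(), "total": 0})
        part["window"].append(ts_ms)
        part["total"] += 1  # entrance only: counted on entry, never decremented

    def expire(self, now_ms):
        for src_ip in list(self.partitions):
            window = self.partitions[src_ip]["window"]
            while window and window[0] + self.width_ms <= now_ms:
                window.popleft()  # recent_requests drops as Events expire
            if not window and not self.pinned:
                del self.partitions[src_ip]  # empty, unpinned Window is discarded

    def counters(self, src_ip):
        """Return (recent_requests, total_requests)."""
        part = self.partitions.get(src_ip)
        if part is None:
            return 0, 0
        return len(part["window"]), part["total"]
```

With pinned=False, total_requests is lost as soon as the Window empties, which is exactly the reset that the pinned clause prevents.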

3) Partitions also provide a feature that makes specific kinds of Events visible to the Query. Using $row_status is new, $row_status is dead or $row_status is not dead, the Query can be made to see, respectively, the Events added to the Window in the current Query cycle, the Events that have just been expelled from the Window, or the new Events together with the older Events still inside the Window.
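For a single Query cycle, the three $row_status views can be modelled as set operations on the Window contents before and after the cycle. This is a hypothetical sketch (row_status_view is an illustrative name); it ignores Events that enter and are expelled within the same cycle.

```python
def row_status_view(before, after, status):
    """Hypothetical model of '$row_status is ...' for one Query cycle.

    before: ids in the Window at the end of the previous cycle
    after:  ids in the Window at the end of the current cycle
    """
    before, after = set(before), set(after)
    if status == "new":
        return after - before   # added in the current cycle
    if status == "dead":
        return before - after   # expelled in the current cycle
    if status == "not dead":
        return after            # new Events plus older ones still inside
    raise ValueError(f"unknown row status: {status}")
```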

4) Partitions can have Pre-Filters, which are defined using the Where-clause.

select .. ..
from stream1 (partition by a, b, c
              store last 30 minutes
              where a > 10 and b = c)
              as filtered_stream
where a > 15 and filtered_stream.$row_status is not dead;

As can be seen in the example above, Events can be filtered even before they enter the Partitions. This is extremely useful where the Windows must not be polluted by Events that are not required. It also helps to split the incoming Event Stream and process the parts in different Queries, which can be thought of as Load-Distribution.

Pre-Filter expressions are limited to basic operations such as: <, >, !=, =, *, /, +, -, in (..), not in (..), and, or. Functions (min, max, current_timestamp() and others), the like operator, the exists clause and other such syntax are not supported. Although the Pre-Filter syntax is limited, a lot can be accomplished with just these operators. The (not) in clause can refer either to a hard-coded list of values or to a Sub-Query:

select .. ..
from stream1 (partition store last 30 minutes
              where a > b and b in (100, 120, 140))
              as filtered_stream
where .. .;

By making use of a Sub-Query with the in clause, Events can be filtered based on a dynamic condition; in the example below, this is a regular Database Table containing the details of Priority Customers. Since the reference list is a Table, it can be updated and/or refreshed without having to stop the Query. This feature might also be called Content-Based Routing.

select .. ..
from stream1 (partition store last 30 minutes
              where customer_id
              in (select customer_id from priority_customers))
              as filtered_stream
where .. .;
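Both Pre-Filters above can be pictured as predicates applied before an Event is admitted into the Partition. In this hypothetical Python sketch (all function names are illustrative), a plain set stands in for the priority_customers Table; because the set is consulted on every Event, updating it changes which Events get through without rebuilding the filter, which mirrors the Content-Based Routing idea.

```python
def hard_coded_pre_filter(event):
    # where a > b and b in (100, 120, 140)
    return event["a"] > event["b"] and event["b"] in (100, 120, 140)

def make_sub_query_pre_filter(priority_customers):
    # where customer_id in (select customer_id from priority_customers);
    # the set stands in for the Table and is read on every Event.
    def pre_filter(event):
        return event["customer_id"] in priority_customers
    return pre_filter

def admit(window, events, pre_filter):
    """Only Events that pass the Pre-Filter ever enter the Window."""
    for event in events:
        if pre_filter(event):
            window.append(event)
    return window
```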

5) Partitions can be Chained to hold and/or expel Events, calculate Aggregates, etc., in cases where a single Partition clause on a Stream does not suffice.

select country, num_skus, avg_qty

from
 test (partition by country store last 5 seconds
        with count(item_sku) as num_skus, avg(item_qty) as avg_qty)

      to

      (partition by store last 5 seconds
       where
       $row_status is new
       and
       (num_skus >= 10 and num_skus <= 30))
      as testStr

where testStr.$row_status is new;

In the Query above, the first Partition maintains 2 aggregates over a 5-second Window per Country. Its output is consumed by a second, anonymous Partition, which accepts only the latest aggregate values whose count is between 10 and 30, and retains them in a 5-second Window. Events that make it into the second Partition are retrieved by the Query.

Chained Partitions must always have a Pre-Filter Where-clause, and its first condition must be either a $row_status is new or a $row_status is dead clause. Any additional conditions must follow the status clause, joined to it with and. For example: where $row_status is new, or a more complex where $row_status is new and ((price > 10.0 and price <= 12.5) or symbol = 'XYZ'). The Pre-Filter Where-clause in a Chained Partition must not use $row_status anywhere other than at the beginning.
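The rule for Chained Partition Pre-Filters can be expressed as a simple check. The function below is a hypothetical, purely textual validator written for illustration; it is not part of StreamCruncher, and a real implementation would work on the parsed Query rather than on raw strings.

```python
import re

_STATUS_FIRST = re.compile(
    r"^\s*\$row_status\s+is\s+(?:new|dead)\b(?P<rest>.*)$",
    re.IGNORECASE | re.DOTALL,
)

def is_valid_chained_pre_filter(where_clause):
    """Hypothetical textual check of the Chained Partition Pre-Filter rule."""
    match = _STATUS_FIRST.match(where_clause)
    if match is None:
        return False            # must start with $row_status is new/dead
    rest = match.group("rest").strip()
    if "$row_status" in rest.lower():
        return False            # $row_status allowed only at the beginning
    # Either nothing follows, or the extra conditions are joined with 'and'.
    return rest == "" or re.match(r"and\b", rest, re.IGNORECASE) is not None
```

Note that $row_status is not dead is rejected here, since only new and dead are permitted as the leading condition.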

For an Input Event defined as {a, b, c, d, e}, each of the constructs will produce results with different structures:

1) Simple Partition - (partition store last 10). The structure will be {a, b, c, d, e}.

2) Partition - (partition by c, d, e store last 10). The structure will still be {a, b, c, d, e}, but Windows will be maintained at the "c -> d -> e" level.

3) Aggregated Partition - (partition by c, d store last 8 with avg(a) as avg_a). The structure will be {c, d, avg_a}, and Windows will be maintained at the "c -> d" level.

StreamCruncher also provides other Windows in Partitions that are slight variants of the Sliding Window:

a) Random Events Window: This maintains a fixed-size Window of Events, where whether a new Event is consumed into the Window or discarded is decided at random. If the Window is full, then a new Event that is allowed to enter the Window expels the oldest Event from the Window, just like in a Sliding Window.

b) Highest/Lowest values Window: This type of fixed-size Window requires an Event property/column to be specified (with the using column-x clause) whose highest/lowest values are used as the basis to consume, retain or discard Events in the Window.
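The two Window variants can be sketched in Python as follows. Both classes are hypothetical; in particular, the admission probability p of the Random Events Window is an assumption, since the text only says the decision is random, and the Highest values Window simply re-sorts its contents on every Event (a Lowest values Window would sort in ascending order instead).

```python
import random
from collections import deque

class RandomEventsWindow:
    """Hypothetical fixed-size Window; each new Event is admitted with
    probability p (an assumption) and, if the Window is full, expels the
    oldest Event as a Sliding Window would."""
    def __init__(self, size, p=0.5, rng=None):
        self.size, self.p = size, p
        self.rng = rng if rng is not None else random.Random()
        self.events = deque()

    def offer(self, event):
        if self.rng.random() >= self.p:
            return False                 # discarded at random
        if len(self.events) == self.size:
            self.events.popleft()        # make room by expelling the oldest
        self.events.append(event)
        return True

class HighestValuesWindow:
    """Hypothetical 'store highest N using column': keeps the N Events with
    the largest value in `column` and returns whatever falls out."""
    def __init__(self, size, column):
        self.size, self.column = size, column
        self.events = []

    def offer(self, event):
        self.events.append(event)
        self.events.sort(key=lambda e: e[self.column], reverse=True)
        expelled = self.events[self.size:]
        del self.events[self.size:]
        return expelled
```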

Highest/Lowest values Windows also provide a unique feature called the Update Group. If the Window is not provided with an Update Group, then each Event that enters the Window is treated as a unique entry. The example below holds the sum of quantities sold by a store over a 30-day period. The Sum, maintained at the "Country -> State -> Category -> SKU" level, is fed to another Partition in the Chain. The second Partition holds the Top 5 highest-selling items based on the Sums calculated by the first Partition in the Chain.

select order_country, order_state, order_category,
       order_item_sku, order_total_qty
from order_events
(partition by order_country, order_state,
              order_category, order_item_sku
              store last 30 days with sum(order_quantity) as order_total_qty)
to
(partition by order_country, order_state,
              order_category
              store highest 5 using order_total_qty
              with update group order_country, order_state,
              order_category, order_item_sku where $row_status is new)
where order_events.$row_status is not dead;

Every time the Sum changes, the new Sum is sent to the second Partition. If the with update group... clause is not specified, then the updated Sum is treated as an unrelated Event. When it enters the Window, assuming it makes it to the Top 5 and the previous Sum for that item is already in the Top 5, both the old and the new Sums will be present in the Window. This might not be the expected behaviour. But if an Update Group is specified, the values of the columns provided to the Group (Country, State, Category and SKU) are used to generate a Group-Key. If the Window already holds another Event with the same Group-Key, then that Event is updated with the new value. If the new value cannot make it to the Top 5, then both the old and new values are moved out of the Window. This is the expected behaviour for the Top-Selling-Items Use Case mentioned above.
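The effect of an Update Group can be seen in a small hypothetical model of "store highest N using column with update group ...". TopNWindow is an illustrative name; the sketch only tracks the Events currently in the Window and does not model how StreamCruncher re-ranks Events that were previously expelled.

```python
class TopNWindow:
    """Hypothetical 'store highest N using column [with update group ...]'.

    With group_columns=None every Event is a separate entry; otherwise an
    Event replaces the entry that has the same Group-Key.
    """
    def __init__(self, n, column, group_columns=None):
        self.n, self.column, self.group_columns = n, column, group_columns
        self.entries = []  # (key, event) pairs, highest value first
        self._seq = 0

    def _key(self, event):
        if self.group_columns is None:
            self._seq += 1
            return ("unique", self._seq)  # no Update Group: always a new entry
        return tuple(event[c] for c in self.group_columns)

    def offer(self, event):
        key = self._key(event)
        # Drop any older entry with the same Group-Key so the new value
        # replaces it instead of appearing alongside it.
        self.entries = [(k, e) for k, e in self.entries if k != key]
        self.entries.append((key, event))
        self.entries.sort(key=lambda pair: pair[1][self.column], reverse=True)
        del self.entries[self.n:]

    def values(self):
        return [event[self.column] for _, event in self.entries]
```

Feeding the Sums 10 (SKU A), 8 (SKU B) and then 12 (SKU A again) into a Top 2 Window leaves [12, 10] without an Update Group, so SKU A occupies both slots, but [12, 8] with an Update Group on the SKU.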

Multi-Stream Correlation/Pattern Matching

Multi-Stream Event Correlation is one of the more advanced features offered by StreamCruncher. Where the SQL not exists clause, which works for correlating 2 Streams, does not suffice, the alert..using..when.. clause should be used to monitor specific Patterns across multiple Event Streams.

select stg1_id, stg2_id, stg3_id, priority,
       case
          when stg2_id is null and stg3_id is not null
              then 'Stage 2 missing!'
          when stg2_id is not null and stg3_id is null
              then 'Stage 3 missing!'
          when stg2_id is null and stg3_id is null
              then 'Stage 2 & 3 missing!'
          else 'All OK!'
          end as comment

from
    alert
        one_event.event_1_id as stg1_id,
        two_event.event_2_id as stg2_id,
        three_event.event_3_id as stg3_id,
        one_event.event_priority as priority
    using
        stg1_event (partition store last 5 seconds where priority > 5)
                    as one_event correlate on event_1_id,
        stg2_event (partition store last 5 seconds where priority > 5)
                    as two_event correlate on event_2_id,
        stg3_event (partition store last 5 seconds where priority > 5)
                    as three_event correlate on event_3_id
    when
        present(one_event and two_event and not three_event) or
        present(one_event and not two_event and three_event) or
        present(one_event and two_event and three_event)

where priority < 7.5;

This clause is composed of several parts. The first part, after the alert keyword, projects columns from the different Streams as a single consolidated Event. The second is the using clause, where the Partitions that consume Events from the different Streams being correlated/monitored are defined. Each Partition ends with a correlate on .. clause, which names the Column from that Stream/Partition used to Correlate Events against the other Streams. This column must have the same Data Type in all the Streams, and its values must uniquely identify an Event.

The when .. clause is where the Patterns to watch for are specified. A Pattern is specified by the present(..) clause, where each Partition alias states whether an Event from that Partition must be present or, when prefixed with not, absent for that Pattern. Multiple Patterns can be specified by separating them with or. The when present(..) or present... syntax follows the Disjunctive Normal Form.

If one of the columns projected by the alert clause comes from a Partition that is negated in a Pattern (present(.. and not partitionx)), and that Pattern fires, then that column will be null in the composite Output Event.

The alert clause must be surrounded by a select clause, which can introduce additional pseudo-columns like the comment column shown above, in addition to projecting the columns provided by the alert definition. An additional where clause can be provided after the alert clause; it acts as a post-filter on the Events produced by the Patterns that fire.

If a present clause does not contain a not, then that Pattern fires as soon as the Event Id appears in all the Streams named in that clause. This can be thought of as an "Immediate Matcher".

If a present clause contains at least one not, then for that Pattern to fire, the Events from all the Streams without a not must appear and stay for at least one cycle, while the Events from the Streams with a not must not appear. While this condition holds, the Pattern fires when one of the "present" Events exits its Partition. So, this can be thought of as a "Delayed Matcher".

Also, a Pattern with at least one not must remain true throughout its lifetime in order to fire.

Delayed Matcher Examples: For a pattern present(a and b and not c), the following scenarios show when it fires.

1) (_,_,_) | a->(a,_,_) | c->(a,_,c)

   No fire. b did not appear and c appeared instead.

2) (_,_,_) | a->(a,_,_) | b->(a,b,_) | (_,b,_)->a

   Fire as soon as a exits the Partition Window.

3) (_,_,_) | a->(a,_,_) | b->(a,b,_) | (a,b,_) | c->(a,b,c)

   No fire. Appearance of c taints the Pattern.

4) (_,_,_) | a->(a,_,_) | b->(a,b,_) | (a,b,_) |
   c->(a,b,c) | (a,b,_)->c | (a,b,_)

   No fire. Pattern tainted by appearance of c even though
   it disappears in a subsequent cycle.

It must also be noted that a Pattern that has fired once will not fire again merely because, after one of its Events has expired but before the whole Pattern has expired, the Pattern is asserted again. However, if the whole Pattern has expired and it gets asserted again some time later, then it will fire again.
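The Immediate and Delayed Matcher rules, and the four scenarios above, can be checked against a small hypothetical model. It tracks a single correlation Id through one lifetime of the Pattern; each cycle is described by the set of Partitions currently holding that Id. The function name and its parameters are illustrative, not part of StreamCruncher.

```python
def pattern_fires_at(cycles, required, forbidden=()):
    """Hypothetical model of present(<required> and not <forbidden>) for one
    correlation Id over one lifetime of the Pattern.

    cycles: one set per Query cycle, naming the Partitions that hold the Id.
    Returns the index of the cycle in which the Pattern fires, or None.
    """
    required, forbidden = set(required), set(forbidden)
    held = False  # all required Events have been present together
    for i, present in enumerate(cycles):
        if present & forbidden:
            return None         # tainted: a 'not' Event appeared
        if required <= present:
            if not forbidden:
                return i        # Immediate Matcher: fire right away
            held = True
        elif held:
            return i            # Delayed Matcher: a required Event exited
    return None
```

For present(a and b and not c), scenario 2 fires in the cycle where a exits, while scenarios 1, 3 and 4 never fire, because c appears at some point during the Pattern's lifetime.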

Event Weights

Events from each Stream can be assigned different weights. For example, Stream A can be set up such that it triggers the Query only if there are 5 or more Events pending. This might be required where Events from Stream A are very frequent but don't carry very important information. Stream B might be less frequent but bring in more critical data, so even a single Event from Stream B can be made to trigger the Query. In other words, the Query executes whenever the combined weight of the pending Events across all Streams reaches 1.0. However, Event weights are ignored if Time Windows or Tumbling Windows have been defined. In such cases, the Weight cannot stop an Event from being pushed out of the Window after it has expired, which in turn triggers the Query execution. The Query can also be set up to execute at fixed intervals, which overrides all other settings.

If a Partition is configured to use a Pre-Filter, i.e. if it has a where clause as part of the partition definition, then the Event Weight applies only to those Events that make it through the Filter.

select .. from ..
(partition by store last ..
       where item_sku in (select item_sku from priority_item))
       as order_events
where order_events.$row_status is dead;
              

In the Query above, suppose the Event Weight is defined as 0.1. If 10 Events arrive and only 5 of them make it through the Filter (their Item SKU is in the Priority Item list), then the accumulated weight will be 0.5, which is still less than 1.0, so the Query will not be triggered.
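The weight accounting in that example can be sketched as follows. WeightedTrigger is a hypothetical name; the sketch assumes that the accumulated weight resets to zero after each run, and uses exact fractions because adding 0.1 ten times in floating point gives 0.9999999999999999 rather than 1.0.

```python
from fractions import Fraction

class WeightedTrigger:
    """Hypothetical model of Event Weights with a Pre-Filter."""
    def __init__(self, weights, pre_filter=lambda event: True):
        # str() first so a float weight such as 0.1 becomes exactly 1/10
        self.weights = {s: Fraction(str(w)) for s, w in weights.items()}
        self.pre_filter = pre_filter
        self.accumulated = Fraction(0)

    def on_event(self, stream, event):
        """Return True if this Event triggers the Query."""
        if not self.pre_filter(event):
            return False                      # filtered out: adds no weight
        self.accumulated += self.weights[stream]
        if self.accumulated >= 1:
            self.accumulated = Fraction(0)    # assumption: reset after a run
            return True
        return False
```

With a weight of 0.2 on Stream A and 1 on Stream B, the fifth Event from A triggers the Query, as does every single Event from B.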

When the Event weights are left at their default values, the Query executes whenever an Event arrives. If several Events arrive in close succession, then the Query might execute just once, consuming all the Events. This behaviour, however, depends on the type of Window configured over the Stream.

1) If a Sliding Window has been configured on a Stream, then the Query is forced to execute once for each Event, because the Sliding Window moves forward one Event at a time.

2) If it is a Time Window, then the Query can consume all new Events at once. The Query also executes when an Event expires and has to be expunged from the Window.

3) A Tumbling Window consumes the specified number of Events at once and discards them in the next cycle.

4) It is also possible for a Query with, say, a Sliding Window (A) and a Time Based Window (B) to run just once if the arrival of an Event in A and an Event expiry in B coincide. Thus, it is possible for "Query trigger conditions" to bunch up.

Row status in Partitions

Windowing constructs are useful only if there is a mechanism to identify the Events that got into, out of, or are still inside the Window. The $row_status clause is useful in several situations:

1) You want to observe the Events that were expelled from a Window in the Partition in the current cycle ($row_status is dead).

2) You want to observe the contents of a Window, which include all the Events still inside the Window and the new Events added in the current cycle ($row_status is not dead).

3) You want to correlate the Events added to the Window in the current cycle with Events/Rows from another Stream/Table ($row_status is new). The older Events in the Window were already matched in some previous cycle and should not be matched again; also, using the Table with fewer rows as the "driving table" of a Join against an Indexed column improves performance. The SLA-Failure-Alert example makes best use of this feature.

Self Joins in Partitions

Most often, Partitions are created on multiple Streams and they are correlated using some shared characteristics like a common Customer Id, the same Bank transaction Id etc. There are some cases where the Events inside Partitions have to be compared with other Events inside the same Partition. In standard SQL, it is quite common to see such Self-Joins. On Streams, the same concept can be applied to implement some very interesting solutions.

Example: Police reports containing vehicle descriptions of suspects are sent as a continuous Stream of Events. Each report/description of a suspected vehicle is held for 30 minutes. Reports come from different Police jurisdictions. The Stream is Partitioned into several 30-minute Time Based Windows, i.e. each jurisdiction has a separate Partition/Window. Every time a new report comes in, the other Windows (other jurisdictions) are scanned for a similar vehicle description. If one matches, then the results are sent as Query output. So, alerts are raised when suspects drive their vehicles across Police jurisdictions.

Performing such a Self-Join the way it is done in SQL, by defining 2 identical Partitions on the same Stream, would be very inefficient. Instead, StreamCruncher provides the self# clause, through which a Partition on a Stream can be referred to with another alias, which can then be used to perform the Self-Join.

select veh_events.state, veh_events.county,
       previous_veh_events.state, previous_veh_events.county,
       veh_events.veh_make, ....
from veh_report (partition by state, county store last
     30 minutes) as veh_events,
     self#veh_events as previous_veh_events
where veh_events.$row_status is new
         and previous_veh_events.$row_status is not dead
         and veh_events.county != previous_veh_events.county
         and veh_events.veh_make = previous_veh_events.veh_make
         and veh_events.veh_year = previous_veh_events.veh_year
         and veh_events.veh_color = previous_veh_events.veh_color
order by previous_veh_events.report_time desc;
              

The Query above matches new vehicle reports with older reports made from other Counties in the last 30 minutes. Such a report can help the Police plan better and respond quicker. By prefixing the alias of the original Partition with self# and naming the reference with a new alias, the same Partition can be Joined with itself, which helps to scan and identify Patterns within the same Stream.
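The matching logic of that Query can be sketched for one cycle as a nested loop over the Partition. match_new_reports is a hypothetical name, and the dictionary keys mirror the column names in the Query; StreamCruncher itself performs this Join in the Database.

```python
def match_new_reports(window, new_ids):
    """Hypothetical model of the self# Query for one cycle.

    window:  all reports currently in the Partition ($row_status is not dead)
    new_ids: ids of reports added in this cycle ($row_status is new)
    Returns (new, previous) pairs from different counties with the same
    make, year and color, most recent previous report first.
    """
    keys = ("veh_make", "veh_year", "veh_color")
    matches = []
    for new in (r for r in window if r["id"] in new_ids):
        for prev in window:
            if (prev["county"] != new["county"]
                    and all(prev[k] == new[k] for k in keys)):
                matches.append((new, prev))
    # order by previous_veh_events.report_time desc
    matches.sort(key=lambda pair: pair[1]["report_time"], reverse=True)
    return matches
```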