Events are a fashionable topic in IT these days. What you probably have in mind is asynchronous processing of data, which is considered good because it saves precious CPU time and code acts only when it needs to act: no polling, no repeating, just like hardware interrupts. Tech-savvy people understand this well, but where does the “complex” come from? It’s all about reasoning.

Use case

Imagine you want your house to be smart in terms of controlling the temperature and saving electricity. Let’s assume the following requirements:

  • there are 5 rooms in total (2 bedrooms, 1 living room, 1 kitchen, 1 bathroom),
  • there are 2 inhabitants equipped with smartphones, each owns one bedroom and the other rooms are shared,
  • there is a temperature sensor in every room which delivers readings every minute,
  • there is an air conditioner in every room which we can control at a basic level (described later),
  • we have access to light switch states (lights on or off),
  • the base desired temperature is 19°C,
  • we want the temperature to be 22°C when the room is used,
  • the room is used when the user is at home at daytime or when lights are on at night,
  • we want to heat the shared space when a specific user is coming home (this assumes we have access to GPS data),
  • we also want to heat a bedroom when it’s night time,
  • if we have an external source of weather predictions, we want to exploit it (e.g. the expected outside temperature is 23°C, so we keep the AC turned off).

 

What do we have exactly?

  • 5 temperature sensors delivering readings every minute (Sensor Stream),
  • 5 light switches with binary states (Light Switch Stream),
  • source of weather prediction (let’s assume it returns predicted temperature every hour) (Weather Prediction Stream),
  • 2 smartphones delivering their GPS coordinates (User Stream),
  • timer (which just returns current time) (Timer Stream).

These were our input sources. What output sources do we have?

  • 5 air conditioners.

Every air conditioner exposes a simple API:

  • startHeating()
  • stopHeating()
  • startCooling()
  • stopCooling()

With that in place, we need to implement everything in between. We are the controller: the hub which reads data from all the inputs and acts accordingly by controlling the outputs. This data will be delivered to us, the controller, in an asynchronous manner. The data doesn’t start or stop; it is a continuous stream. We need to reason about whether, in which room and how strongly heating or cooling should be started. Can you see the ifs, whiles, threads and other unpleasant stuff yet? Can you come up with any naive algorithm, simple enough to start (pseudo) coding? I tried for half an hour and found it too complicated. I’d like to offer a different approach. What do we really need? If something told us to start a specific air conditioner in a specific mode, we would have all we need.

 

Let’s make assumptions about what input streams are offering us:

// every minute
SensorStream {
  timestamp: long,
  roomId: int,
  temperature: float
}
// every minute
LightSwitchStream {
  timestamp: long,
  roomId: int,
  isEnabled: boolean
}
// every hour
PredictionStream {
  timestamp: long,
  predictedTemperature: float
}
// irregular
UserStream {
  timestamp: long,
  latitude: float,
  longitude: float,
  userId: int
}
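
As a minimal sketch, the four event shapes above could be modelled as immutable Python dataclasses. The class names, the snake_case field names and the choice of epoch milliseconds for the timestamp are assumptions made for illustration; the article only fixes the field types.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SensorEvent:
    timestamp: int       # assumed unit: epoch milliseconds
    room_id: int
    temperature: float   # degrees Celsius


@dataclass(frozen=True)
class LightSwitchEvent:
    timestamp: int
    room_id: int
    is_enabled: bool


@dataclass(frozen=True)
class PredictionEvent:
    timestamp: int
    predicted_temperature: float


@dataclass(frozen=True)
class UserEvent:
    timestamp: int
    latitude: float
    longitude: float
    user_id: int


# One sample reading, as a temperature sensor might deliver it.
reading = SensorEvent(timestamp=1_700_000_000_000, room_id=1, temperature=21.0)
```

Frozen dataclasses are used here because an event describes something that has already happened, so nothing downstream should be able to modify it.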

We have 5 input streams. Suppose we also had a single output stream with events such as:

OutputStream {
  sensorNumber: int,
  mode: HEAT|COOL
}

where sensorNumber is the roomId. We could easily act on these events, because translating them into calls to the air conditioner API is straightforward.
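
To illustrate how simple that translation could be, here is a hedged sketch of a dispatcher. `RecordingAirConditioner` is a hypothetical stand-in for a real device that only records the calls it receives, and stopping the opposite mode before starting the requested one is an assumption, not something the requirements state. The method names are kept exactly as in the air conditioner API listed earlier.

```python
from dataclasses import dataclass
from enum import Enum


class Mode(Enum):
    HEAT = "HEAT"
    COOL = "COOL"


@dataclass(frozen=True)
class OutputEvent:
    sensor_number: int  # same value as roomId
    mode: Mode


class RecordingAirConditioner:
    """Hypothetical stand-in for a device; it only records the calls made."""

    def __init__(self):
        self.calls = []

    def startHeating(self):
        self.calls.append("startHeating")

    def stopHeating(self):
        self.calls.append("stopHeating")

    def startCooling(self):
        self.calls.append("startCooling")

    def stopCooling(self):
        self.calls.append("stopCooling")


def dispatch(event, air_conditioners):
    """Translate one output event into air conditioner API calls."""
    ac = air_conditioners[event.sensor_number]
    if event.mode is Mode.HEAT:
        ac.stopCooling()   # assumption: never heat and cool at the same time
        ac.startHeating()
    else:
        ac.stopHeating()
        ac.startCooling()


air_conditioners = {room_id: RecordingAirConditioner() for room_id in range(1, 6)}
dispatch(OutputEvent(sensor_number=1, mode=Mode.HEAT), air_conditioners)
```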

Layers of events

I mentioned reasoning earlier. This is what the black box in the previous picture does. We can create virtual streams that join events from several sources and apply small pieces of logic, e.g. filtering, leading us to the final output stream. In other words, we pick one or more input streams, transform them, and publish the result to a newly created stream which can be reused later. Let’s think about what we need. If we had the following streams, we would be much closer to the goal:

  • stream of events indicating if it’s daytime (DayTime Stream),
  • stream of events indicating if the user is near/inside the home (User Place Stream),
  • stream of events indicating if the room is used (Room Stream).

DayTime Stream

Using the Timer Stream and the sunrise equation, we can create the DayTime Stream. It would emit events twice a day, at sunrise and at sunset:

{
  timestamp: long,
  isDaytime: boolean
}
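
A rough sketch of this derivation is shown below. To keep it short it does not implement the sunrise equation; it assumes fixed sunrise and sunset times (06:00 and 20:00, both hypothetical) and uses `datetime` objects as timer ticks instead of a long timestamp. What it does show is the important part: a tick produces an event only when the day/night state flips.

```python
from datetime import datetime, time

# Assumption: fixed times stand in for the sunrise equation.
SUNRISE = time(6, 0)
SUNSET = time(20, 0)


def is_daytime(now):
    return SUNRISE <= now.time() < SUNSET


def daytime_stream(timer_ticks):
    """Turn frequent timer ticks into events emitted only on a day/night change."""
    previous = None
    for tick in timer_ticks:
        current = is_daytime(tick)
        if current != previous:
            previous = current
            yield {"timestamp": tick, "isDaytime": current}


ticks = [
    datetime(2024, 1, 1, 5, 59),
    datetime(2024, 1, 1, 6, 0),
    datetime(2024, 1, 1, 12, 0),
    datetime(2024, 1, 1, 20, 0),
    datetime(2024, 1, 1, 23, 0),
]
events = list(daytime_stream(ticks))
```

Note that the very first tick always yields an event, because there is no previous state to compare against.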

User Place Stream

Using the User Stream, we have access to the user’s coordinates. We can assume that if the user is within a specific radius of the home for X consecutive readings, we mark that user as AT_HOME, and as OUTSIDE_HOME otherwise.

{
  timestamp: long,
  userId: int,
  userPlace: AT_HOME|OUTSIDE_HOME
}
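
One possible reading of this rule, sketched in Python: the distance to the home is computed with the haversine formula, a per-user counter tracks consecutive readings inside the radius, and an event is emitted only when the user’s place changes. The home coordinates, the 200 m radius, X = 3 and the emit-on-change behaviour are all assumptions chosen for the example.

```python
import math
from collections import defaultdict

HOME = (52.2297, 21.0122)   # hypothetical home coordinates (lat, lon)
RADIUS_M = 200.0            # hypothetical "specific radius"
REQUIRED_READINGS = 3       # the "X" consecutive readings


def distance_m(lat1, lon1, lat2, lon2):
    """Great-circle distance in metres (haversine formula)."""
    earth_radius_m = 6_371_000.0
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * earth_radius_m * math.asin(math.sqrt(a))


def user_place_stream(user_events):
    streak = defaultdict(int)  # consecutive near-home readings per user
    place = {}                 # last emitted place per user
    for ev in user_events:
        uid = ev["userId"]
        near = distance_m(ev["latitude"], ev["longitude"], *HOME) <= RADIUS_M
        streak[uid] = streak[uid] + 1 if near else 0
        new_place = "AT_HOME" if streak[uid] >= REQUIRED_READINGS else "OUTSIDE_HOME"
        if place.get(uid) != new_place:
            place[uid] = new_place
            yield {"timestamp": ev["timestamp"], "userId": uid, "userPlace": new_place}


def gps(ts, lat, lon, uid=1):
    return {"timestamp": ts, "latitude": lat, "longitude": lon, "userId": uid}


readings = [gps(1, *HOME), gps(2, *HOME), gps(3, *HOME), gps(4, 52.4, 21.0)]
places = list(user_place_stream(readings))
```

Requiring several consecutive readings protects against a single noisy GPS fix flipping the state back and forth.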

Room Stream

This is a good place to provide information about rooms: to whom they belong and whether they are used or not.

{
  timestamp: long,
  roomId: int,
  userId: int, // null if common space
  isUsed: boolean
}

You may ask why we even need them. Well, keep in mind that input streams might not be easy to use directly, e.g. the Light Switch Stream delivers data constantly, like this:

LightSwitchStream(roomId = 1, isEnabled = true)
LightSwitchStream(roomId = 1, isEnabled = true)
LightSwitchStream(roomId = 1, isEnabled = true)
LightSwitchStream(roomId = 1, isEnabled = true)
LightSwitchStream(roomId = 1, isEnabled = false)
LightSwitchStream(roomId = 1, isEnabled = false)

Do we need to act on every event? Most likely not. We are interested only in state changes, hence we create a virtual stream, Room Stream, which tracks only the changes of state (true to false and vice versa) and also adds the information whether a particular room is a common space:

RoomStream(roomId = 1, userId = 2, isUsed = true)
RoomStream(roomId = 1, userId = 2, isUsed = false)
RoomStream(roomId = 1, userId = 2, isUsed = true)
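
The change-detection step can be sketched as a small generator. It is a simplification: isUsed is derived from the light switch alone, and the `ROOM_OWNER` mapping is a hypothetical lookup table (None marks a common space).

```python
# Hypothetical ownership table: roomId -> userId, None for a common space.
ROOM_OWNER = {1: 2, 2: 1, 3: None, 4: None, 5: None}


def room_stream(light_switch_events):
    """Emit a room event only when a room's light state actually changes."""
    last_state = {}  # roomId -> last seen isEnabled
    for ev in light_switch_events:
        room_id = ev["roomId"]
        if last_state.get(room_id) == ev["isEnabled"]:
            continue  # same state as before, nothing new to report
        last_state[room_id] = ev["isEnabled"]
        yield {
            "timestamp": ev["timestamp"],
            "roomId": room_id,
            "userId": ROOM_OWNER[room_id],
            "isUsed": ev["isEnabled"],
        }


# The six light switch readings from the sample above.
states = [True, True, True, True, False, False]
light_events = [
    {"timestamp": ts, "roomId": 1, "isEnabled": state}
    for ts, state in enumerate(states)
]
room_events = list(room_stream(light_events))
```

Six raw readings collapse into two room events: one when the light is first seen on and one when it goes off.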

Putting it all together, if we have access to the latest events of the streams we are interested in, we can decide what to do with the air conditioners (i.e. emit an output event). Let’s assume our latest combined state looks as follows (timestamps omitted for readability):

DayTimeStream(isDaytime = false)
UserPlaceStream(userId = 1, userPlace = AT_HOME)
RoomStream(roomId = 1, userId = 1, isUsed = true)
SensorStream(roomId = 1, temperature = 21C)
PredictionStream(predictedTemperature = 15C)

We know:

  • it is night time,
  • user 1 is at home,
  • user 1 is using his room,
  • there is 21°C in his room,
  • predicted temperature outside will be 15°C.

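Comparing it against the requirements, we know what to do: the room is in use, so the target is 22°C, the reading is 21°C and the predicted 15°C outside will not help, so we emit the event OutputStream(sensorNumber = 1, mode = HEAT). If the latest combined state changes (e.g. the room stops being used), the logic is invoked again and we take the appropriate action.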
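
The decision step could look roughly like the sketch below. It is one interpretation of the requirements, not a definitive rule set: the target is 22°C for a used room and 19°C otherwise, and the weather prediction is only used to skip heating when the outside temperature is expected to reach the target anyway. The `LatestState` class and the `decide` function are hypothetical names, and the day/night and user place values are assumed to be already reflected in isUsed.

```python
from dataclasses import dataclass
from typing import Optional

BASE_TARGET_C = 19.0   # desired temperature for an unused room
USED_TARGET_C = 22.0   # desired temperature for a used room


@dataclass
class LatestState:
    is_daytime: bool              # carried for completeness, see note above
    user_place: str               # carried for completeness, see note above
    is_used: bool
    temperature: float            # latest reading in the room
    predicted_temperature: float  # latest outside prediction


def decide(room_id: int, state: LatestState) -> Optional[dict]:
    """Return an output event for the room, or None if nothing should be done."""
    target = USED_TARGET_C if state.is_used else BASE_TARGET_C
    if state.temperature < target:
        if state.predicted_temperature >= target:
            return None  # assumption: the weather will warm the room for us
        return {"sensorNumber": room_id, "mode": "HEAT"}
    if state.temperature > target:
        return {"sensorNumber": room_id, "mode": "COOL"}
    return None


# The combined state from the example above.
state = LatestState(
    is_daytime=False,
    user_place="AT_HOME",
    is_used=True,
    temperature=21.0,
    predicted_temperature=15.0,
)
decision = decide(1, state)
```

For the example state this yields a HEAT event for room 1. Re-running `decide` whenever any field of the latest state changes gives the "logic is invoked again" behaviour.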

Summary

Obviously, there are many issues still to be resolved (the order in which events are received, handling different rooms and users at once, etc.), but they require an introduction to a specific CEP engine, which will be discussed in the second part. Hopefully, this has broadened your eventful thinking so far and you are ready for the more technical part.