Real-time Streaming Attribution Using Apache Flink

// 09.12.2016 // Data

In this blog post, I will share a proof of concept for real-time attribution using Apache Flink over streaming sources of impressions and events, and how we handled some of the problems inherent in windowing and processing real-time data streams at scale. Our goal was to determine whether we could use Flink to stream impression and event data and compute attribution in real time, so that advertising strategies can be optimized immediately.

In digital advertising, we refer to ads – whether they are served on social networks, mobile, video, or display – as impressions. Once the impressions are served, we need to know whether users responded to the ad – for example by clicking on it or converting on the advertiser’s website – and we classify these actions as events. Attribution is the process of matching these events with impressions so that the credit for a conversion event is assigned to the impressions that led to it. This lets advertisers see which creatives, audience segments, and publishers lead to the most valuable conversions and optimize their campaigns accordingly.

A stream processing system has several advantages over a batch processing system. In a batch world, you define a batch size – usually at least an hour – and all the data buffered within that hour is used for the computation, which means you have to wait at least an hour to see any result. Note the word “buffered”: almost every data source is essentially a streaming source, producing data one record at a time, which aligns naturally with the architecture of a streaming system. A streaming system also gives you much more granular control (per message) over the actions to be performed, compared to batch processing, where you only ever see all the buffered data together.

Architecture

[Figure 1: Architecture – Kafka topics → Flink (parse, enrich, connect, attribute) → Druid / queryable state]

Flink reads raw messages from Kafka topics and parses them into streams of EventRecords and ImpressionRecords. It then does some filtering and joins with a metadata file stored in S3 to enrich the records. The two streams are connected using Flink’s ConnectedStream API, since we need both Impressions and Events in the same window to do the attribution. We apply an attribution function to this connected stream and make the results available for querying through the QueryableStateStream API as well as in the Druid datastore.
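
As a minimal sketch, the ingestion side could look roughly like the code below. The topic names, broker address, Kafka 0.9 connector, and the ImpressionRecord/EventRecord classes with their parse() helpers and POJO fields are assumptions for illustration, and the filtering and S3 enrichment steps are omitted:

    import java.util.Properties;

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.ConnectedStreams;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class AttributionJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties kafkaProps = new Properties();
            kafkaProps.setProperty("bootstrap.servers", "kafka:9092");   // placeholder broker
            kafkaProps.setProperty("group.id", "attribution-poc");

            // Raw messages from the two Kafka topics.
            DataStream<String> rawImpressions = env.addSource(
                new FlinkKafkaConsumer09<>("impressions", new SimpleStringSchema(), kafkaProps));
            DataStream<String> rawEvents = env.addSource(
                new FlinkKafkaConsumer09<>("events", new SimpleStringSchema(), kafkaProps));

            // Parse the raw strings into typed records (filtering and S3 enrichment omitted).
            DataStream<ImpressionRecord> impressions = rawImpressions.map(
                new MapFunction<String, ImpressionRecord>() {
                    @Override
                    public ImpressionRecord map(String raw) { return ImpressionRecord.parse(raw); }
                });
            DataStream<EventRecord> events = rawEvents.map(
                new MapFunction<String, EventRecord>() {
                    @Override
                    public EventRecord map(String raw) { return EventRecord.parse(raw); }
                });

            // Both sides are needed together for attribution, so key each stream by
            // (uuid, advertiserId) and connect them.
            ConnectedStreams<ImpressionRecord, EventRecord> connected =
                impressions.keyBy("uuid", "advertiserId")
                           .connect(events.keyBy("uuid", "advertiserId"));

            // ... attribution, aggregation and sinks go here ...
            env.execute("real-time attribution PoC");
        }
    }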

Windowing and Attribution

Since a stream is an unbounded set of elements, we need to apply some kind of boundary (in this case a time-based window) before we can apply a function to it. A RollingWindow rolls over the elements, keeping only the latest 30 days of data while maintaining a single window. The connected stream is keyed by UUID (user ID) and AdvertiserID, and a 30-day RollingWindow* is applied to it.

*(Note: RollingWindow is not an API available in Flink; we simulate one by managing our own state and removing elements older than 30 days from it. We chose this abstraction because the windowing options provided by Flink – such as SlidingWindow and TumblingWindow – do not give us exactly the behavior we expect.)

Keying the stream of records splits the window into multiple smaller windows, called Panes, one per key. This reduces the number of elements that have to be held in memory during attribution, since for a given UUID and AdvertiserID the number of Impressions and Events over the last 30 days is not huge. (I will use the term Window instead of Pane in the rest of the blog, even though the attribution logic is applied per Pane.) A sketch of this keyed, state-backed rolling window is shown below.

[Figure 2: The keyed stream split into per-key window Panes]
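
Below is a sketch of how this per-key rolling window can be simulated on the connected stream with keyed state. The class name, the record classes and their accessors are illustrative assumptions: impressions are simply remembered, and when an Event arrives we evict anything older than 30 days before running the attribution over what remains.

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
    import org.apache.flink.util.Collector;

    public class RollingAttributionFunction
            extends RichCoFlatMapFunction<ImpressionRecord, EventRecord, AttributedRecord> {

        private static final long THIRTY_DAYS_MS = 30L * 24 * 60 * 60 * 1000;

        private transient ListState<ImpressionRecord> impressionState;
        private transient ListState<EventRecord> eventState;

        @Override
        public void open(Configuration parameters) {
            // Keyed state: one list of impressions and one list of events per (uuid, advertiserId).
            impressionState = getRuntimeContext().getListState(
                new ListStateDescriptor<>("impressions", ImpressionRecord.class));
            eventState = getRuntimeContext().getListState(
                new ListStateDescriptor<>("events", EventRecord.class));
        }

        @Override
        public void flatMap1(ImpressionRecord impression, Collector<AttributedRecord> out) throws Exception {
            impressionState.add(impression);          // impressions are only remembered
        }

        @Override
        public void flatMap2(EventRecord event, Collector<AttributedRecord> out) throws Exception {
            eventState.add(event);
            evictImpressionsOlderThan(event.getEventTimestamp() - THIRTY_DAYS_MS);
            // ... attribution logic runs here over the impressions/events still in state ...
        }

        private void evictImpressionsOlderThan(long cutoff) throws Exception {
            List<ImpressionRecord> kept = new ArrayList<>();
            for (ImpressionRecord impression : impressionState.get()) {
                if (impression.getEventTimestamp() >= cutoff) {
                    kept.add(impression);
                }
            }
            impressionState.clear();
            for (ImpressionRecord impression : kept) {
                impressionState.add(impression);      // keep only the latest 30 days
            }
        }
    }

The function is then applied to the connected stream with connected.flatMap(new RollingAttributionFunction()).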

The window is triggered whenever an Event arrives, and the attribution logic is applied to the elements within the window. We manage the window contents ourselves using the RocksDB state backend: all events and impressions are saved into the state we maintain and are evicted from the state that Flink maintains for the window. We did this to enable the complex eviction logic we need, which could not be achieved with the existing capabilities of Flink Evictors.
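
For completeness, a minimal sketch of switching the job to the RocksDB state backend; the checkpoint URI is a placeholder.

    // The simulated rolling-window state lives in RocksDB and is checkpointed.
    // RocksDBStateBackend(String) may throw IOException, so this sits in a method that declares it.
    env.setStateBackend(
        new org.apache.flink.contrib.streaming.state.RocksDBStateBackend("hdfs:///flink/checkpoints"));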

The Attribution function goes through the impressions and events within the window looking for the following patterns:

  1. Impression – Click Event – Conversion Event
  2. Impression – Conversion Event

Pattern 1 is called a PostClick event. There should be a Click that matches the Impression (joined using AuctionID) such that the Click’s event timestamp is after the Impression’s event timestamp. There should also be a Conversion Event that matches the Impression (joined using PixelID) and occurs after the Impression within a specified maximum time period (called the PV time lag). If the function finds a Click-Impression match (let us call it C-I) and a Conversion Event-Impression match (E-I), it then tries to find a (C-I), (E-I) pair such that the Conversion Event occurred within a specified maximum time period (called the PC time lag) after the Impression that matched the Click. If such a match is found, the function emits PC+1, indicating that it has found one new PostClick event.

Pattern 2 is a subset of pattern 1: we could not find any matching Click for the Impression, but we could find a Conversion Event that occurred after the Impression. This is called a PostView event, and we emit PV+1 in this case. A simplified sketch of this matching logic follows.
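
To make the two patterns concrete, here is a simplified sketch of the matching step for a single Conversion Event. The field accessors, the pvTimeLag/pcTimeLag fields and the AttributedRecord factory methods are illustrative assumptions, and the real function also has to handle corrections for out-of-order arrivals (discussed below).

    // Simplified matching for one Conversion Event against the impressions and clicks
    // currently held in state for this (uuid, advertiserId) key.
    // pcTimeLag and pvTimeLag are the configured maximum lags (fields of the enclosing function).
    private AttributedRecord attribute(EventRecord conversion,
                                       Iterable<ImpressionRecord> impressions,
                                       Iterable<EventRecord> clicks) {
        for (ImpressionRecord impression : impressions) {
            long lag = conversion.getEventTimestamp() - impression.getEventTimestamp();
            // The conversion must match the impression on PixelID and occur after it.
            if (!impression.getPixelId().equals(conversion.getPixelId()) || lag <= 0) {
                continue;
            }
            // Pattern 1: Impression -> Click -> Conversion within the PC time lag (PostClick).
            for (EventRecord click : clicks) {
                if (click.getAuctionId().equals(impression.getAuctionId())
                        && click.getEventTimestamp() > impression.getEventTimestamp()
                        && lag <= pcTimeLag) {
                    return AttributedRecord.postClick(impression, conversion);   // PC+1 downstream
                }
            }
            // Pattern 2: Impression -> Conversion within the PV time lag, no matching Click (PostView).
            if (lag <= pvTimeLag) {
                return AttributedRecord.postView(impression, conversion);        // PV+1 downstream
            }
        }
        return null;    // nothing to attribute for this conversion yet
    }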

Handling Out-of-Order Messages

In an ideal situation, we receive the messages (events and impressions) in the same order in which they occurred – that is, the EventTime ordering is the same as the ProcessingTime ordering. But mostly this is not the case. Consider three types of messages: Impression (i), Click Event (c) and Conversion Event (e). There are 6 possible orders in which these could occur in the real world, and for each of those 6 cases there are 6 possible orders in which the messages could arrive at our system. So, considering only 3 messages, we have 36 different cases to handle. The important point is that the state of the system should be based on the EventTime of the messages, irrespective of the order in which they arrive. Consider the example below:

[Figure 3: Example of in-order vs. out-of-order message arrival]

In the ideal case, a user sees an Impression, clicks on it, and then buys the product, so the order of messages is Impression – Click Event – Conversion Event. When we receive these messages, our attribution function emits PC+1 (a PostClick event) – the state of the system is PC+1.

But sometimes we receive these messages out of order. For example, we may receive the Impression, then the Conversion Event, and only later the Click Event. When our system sees an Impression and a Conversion Event, it emits PV+1, saying that it has found a PostView event – the state of the system is PV+1. But there was a Click Event that occurred between the Impression and the Conversion Event, and it arrived only after the Conversion Event.

Now our Flink processor should identify that a correction is needed and emit a PC+1 (found a PostClick event) and a PV-1 (to cancel the previously emitted PV+1). So the final state of the system is (PV+1) + (PC+1) + (PV-1) = PC+1. This should be the final state whether the arrival order was (c – i – e), (c – e – i), (e – i – c) or (e – c – i).
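
In code, the correction is just a pair of deltas emitted by the attribution function once the late Click is matched; the retraction factory method below is an illustrative assumption.

    // Called when a late Click upgrades an earlier PostView attribution to a PostClick:
    // cancel the PV that was already counted and credit a PC instead.
    private void correct(ImpressionRecord impression, EventRecord conversion,
                         Collector<AttributedRecord> out) {
        out.collect(AttributedRecord.postViewRetraction(impression, conversion));  // PV-1
        out.collect(AttributedRecord.postClick(impression, conversion));           // PC+1
    }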

We set the TimeCharacteristic of Flink to EventTime and created a timestamp and Watermark generator that extracts the timestamp field from the messages, so that Flink knows the EventTime of each record. Once event time is enabled, Flink assigns messages to windows based on EventTime rather than ProcessingTime. This is especially useful with SlidingWindows, where multiple windows are open at the same time and the elements must land in the correct ones. In our case, however, we have only one window (the RollingWindow), so all elements are assigned to the same 30-day RollingWindow.
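
A minimal sketch of that setup, assuming each record exposes a getEventTimestamp() accessor returning epoch milliseconds and tolerating (as an example) up to 10 minutes of out-of-orderness:

    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    DataStream<ImpressionRecord> impressionsWithTimestamps =
        impressions.assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor<ImpressionRecord>(Time.minutes(10)) {
                @Override
                public long extractTimestamp(ImpressionRecord record) {
                    return record.getEventTimestamp();   // EventTime comes from the message itself
                }
            });
    // The events stream gets the same treatment.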

(Note: Even though Flink assigns the messages to windows based on EventTime, the messages are currently not sorted within the window by EventTime, although there are plans to change this in upcoming releases.)

Querying the Stream

The window function emits a stream of AttributedRecords, each containing a set of keys such as CampaignId and AdvertiserId along with the delta (+1 or -1) for PVs, PCs, Clicks and Impressions. The metrics are aggregated using a ReduceFunction, which adds up all the counts to give the aggregated state. The next step is to be able to query the state of the stream even before the results are stored in an external datastore. We made use of Flink’s upcoming QueryableStateStream feature, which lets us query Flink’s internal state and see the attribution results before they are written to the external datastore. A sketch of this step is shown below.
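
A sketch of that last step against the queryable-state API as it existed at the time, assuming attributedRecords is the stream emitted by the attribution function, that AttributedRecord is a POJO with campaignId/advertiserId fields, and that its add() method sums the deltas:

    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.common.state.ReducingStateDescriptor;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.streaming.api.datastream.QueryableStateStream;

    ReducingStateDescriptor<AttributedRecord> totalsDescriptor =
        new ReducingStateDescriptor<>(
            "attribution-totals",
            new ReduceFunction<AttributedRecord>() {
                @Override
                public AttributedRecord reduce(AttributedRecord a, AttributedRecord b) {
                    return a.add(b);   // sums the PV, PC, Click and Impression deltas
                }
            },
            AttributedRecord.class);

    // Every update to the reducing state becomes queryable from outside the job
    // under the name "attribution-totals".
    QueryableStateStream<Tuple, AttributedRecord> queryable =
        attributedRecords
            .keyBy("campaignId", "advertiserId")
            .asQueryableState("attribution-totals", totalsDescriptor);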

Conclusion

We were able to process about 40 GB of data in 4 hours, running this job on an AWS cluster with 5 task managers and 15 task slots, with 1.3 GB of memory per slot. Doing attribution in real time can help customers see how well their campaigns are performing without having to wait hours for a batch report to run, and we could easily modify the attribution function to implement more complex attribution logic.

Thanks for reading – I hope this article gave some insight into the domain of real-time streaming attribution with Apache Flink.


Vishnu Viswanath

Vishnu Viswanath is a Data Engineer at MediaMath with over 5 years of experience designing and building scalable, efficient systems, and has expertise across most of the big data stack. He is a relentless tech enthusiast who likes to keep up to date with the technologies in vogue, and a keen open source believer and contributor. When he is not coding his way to “geek” status, he loves traveling and has a serious case of wanderlust.