Extending Play’s validation to work with Big Data tools like DynamoDB, S3, and Spark
In this two-part blog series, we are looking at how MediaMath uses Play’s API to perform data validation on big data pipelines. In part one, we covered data validation with Play’s combinator-based API. In part two, we’ll extend that data validation to work with AWS DynamoDB, S3, and Spark.
Extending validation to work with AWS DynamoDB
MediaMath uses a variety of technologies in our analytics stack, including AWS DynamoDB. DynamoDB is a distributed, fault-tolerant key-value store as a service that makes it easy to store and query massive datasets. We use it to power a few internal troubleshooting tools whose front-end/API layers are written in Play.
That said, a downside of using the AWS Java SDK from Scala is that it feels quite verbose and unidiomatic. We really liked the succinct JSON API from Play and wanted to see if it could be extended to create a data-binding layer for DynamoDB’s `Item` instead of JSON documents. It turns out this was quite easy to do, and the results are now open sourced as Play DynamoDB.
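To make the contrast concrete, here is a rough sketch of what hand-rolled extraction with the raw SDK tends to look like. The attribute names match the example further down; the conversions themselves are illustrative assumptions, not code from any library.

```scala
import scala.collection.JavaConverters._
import com.amazonaws.services.dynamodbv2.model.AttributeValue
import org.joda.time.format.DateTimeFormat

// Hand-rolled extraction from a raw DynamoDB item: every attribute is unwrapped,
// converted and checked by hand, and a missing or malformed field fails at runtime.
def extractUser(item: java.util.Map[String, AttributeValue]) = {
  val attrs     = item.asScala
  val username  = attrs("username").getS
  val quotes    = attrs("favoriteQuotes").getSS.asScala.toSet
  val githubUrl = attrs.get("githubUrl").map(_.getS)
  val commits   = attrs("commits").getN.toInt
  val createdAt = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").parseDateTime(attrs("createdAt").getS)
  (username, quotes, githubUrl, commits, createdAt)
}
```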
Working with Play DynamoDB is very similar to working with the Play JSON API.
```scala
/**
 * Example item as returned from DynamoDB API
 */
val item = Map(
  "username"       -> new AttributeValue().withS("themodernlife"),
  "favoriteQuotes" -> new AttributeValue().withSS("Audentes fortuna iuvat", "Festina lente"),
  "githubUrl"      -> new AttributeValue().withS("https://github.com/themodernlife"),
  "commits"        -> new AttributeValue().withN("25"),
  "createdAt"      -> new AttributeValue().withS("2014-05-19 11:26:00")
)

/**
 * Case class for objects stored in DynamoDB
 */
case class User(username: String, favoriteQuotes: Set[String], githubUrl: Option[String], commits: Int, createdAt: DateTime)

/**
 * Override default date parsing
 */
implicit val jodaDateTimeReads = Reads.dateTimeReads("yyyy-MM-dd HH:mm:ss")

/**
 * Formula for validating a User case class
 */
implicit val userReads = (
  DdbKey("username").read[String] and
  DdbKey("favoriteQuotes").read[Set[String]] and
  DdbKey("githubUrl").read[Option[String]] and
  DdbKey("commits").read[Int] and
  DdbKey("createdAt").read[DateTime]
)(User)

/**
 * Perform the validation and convert to case class
 */
val user = Item.parse(item).validate[User]
```
As you can see, the code is almost the same:
- Create your domain object of type `D`
- Create your blueprint for parsing it from some type `T`, in this case a DynamoDB `Item`
- Use Play’s functional/combinator constructs to map from `Item => DdbResult[D]`
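For comparison, here is a minimal sketch of the equivalent `Reads` you would write with the regular Play JSON API for the same `User` case class; only the path type changes. This assumes the stock Play 2.3 JSON combinators and its built-in Joda date reads.

```scala
import play.api.libs.json._
import play.api.libs.functional.syntax._
import org.joda.time.DateTime

// Same combinator shape as the DynamoDB version, but reading from a JsValue
implicit val userJsonReads: Reads[User] = (
  (JsPath \ "username").read[String] and
  (JsPath \ "favoriteQuotes").read[Set[String]] and
  (JsPath \ "githubUrl").readNullable[String] and
  (JsPath \ "commits").read[Int] and
  (JsPath \ "createdAt").read[DateTime](Reads.jodaDateReads("yyyy-MM-dd HH:mm:ss"))
)(User.apply _)

// Usage: Json.parse(jsonString).validate[User]
```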
Let’s take this further!
--------------------------------
OK, how does that relate to processing big data pipelines?
There has been a lot of discussion around splitting Play’s JSON API out into its own project, as can be seen from this pull request. It makes a lot of sense, because it nicely generalizes the problem of translating data to and from a potentially unsafe wire format in a fully type-safe way.
Development work on the new validation API happens on GitHub in the Play data validation library, where it already unifies parsing of JSON and HTML forms. Recently MediaMath submitted a patch to extend it to work with CSV/TSV delimited files, like so:
```scala
case class Contact(name: String, email: String, birthday: Option[LocalDate])

val contactReads = From[Delimited] { __ =>
  (
    (__ \ 0).read[String] and
    (__ \ 1).read(email) and
    (__ \ 2).read(optionR[LocalDate](Rules.equalTo("N/A")))
  )(Contact)
}

val csv1 = "John Doe,joe@example.com,1981-07-24"
val csv2 = "Jane Doe,jane@example.com,N/A"

contactReads.validate(csv1.split(",")) mustEqual Success(Contact("John Doe", "joe@example.com", Some(new LocalDate(1981, 7, 24))))
contactReads.validate(csv2.split(",")) mustEqual Success(Contact("Jane Doe", "jane@example.com", None))
```
With the new API you create and combine `Rule`s that can bind and validate records from TSV files. Add Apache Spark to the mix and you get a very compelling development environment for doing fast, reliable and type-safe data processing over enormous datasets (which are often stored as lines of JSON or CSV/TSV).
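To sketch what that combination looks like, the `contactReads` rule above can be applied unchanged to every line of a file in Spark. This assumes an existing `SparkContext` named `sc`, and the bucket path is made up for illustration.

```scala
// Each line is split and validated with the same Rule used above;
// the result is an RDD of Validation values, one per input record.
val contacts = sc.textFile("s3://example-bucket/contacts.csv")
  .map(line => contactReads.validate(line.split(",")))
```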
Processing S3 Access Logs in Spark
----------------------------------
We are heavy users of S3 at MediaMath and have enabled S3 Access Logging on many of the buckets we use for production datasets. An S3 Access Log looks like this:
```
2a520ba2eac7794c5663c17db741f376756214b1bbb423214f979d1ba95bac73 bidder-data-bucket [16/Jul/2014:15:47:42 +0000] 74.121.142.208 arn:aws:iam::000000000000:user/test@mediamath.com XXXXXXXXXXXXXXXX REST.PUT.OBJECT 2014-07-16-15/bd.ewr-bidder-x36.1405524542.log.lzo "PUT /2014-07-16-15/bd.ewr-bidder-x36.1405524542.log.lzo HTTP/1.1" 200 - - 18091170 931 66 "-" "aws-sdk-java/1.7.5 Linux/3.2.0-4-amd64 OpenJDK_64-Bit_Server_VM/20.0-b12/1.6.0_27 com.amazonaws.services.s3.transfer.TransferManager/1.7.5" -
```
What if we want to do some data analysis on these access logs, using Spark?
First, let’s create our domain object:
```scala
case class S3AccessLog(
  bucketOwner: String,
  bucket: String,
  time: DateTime,
  remoteIp: String,
  requester: String,
  requesterId: String,
  operation: String,
  key: String,
  requestUri: String,
  httpStatus: Int,
  errorCode: Option[String],
  bytesSent: Option[Long],
  objectSize: Option[Long],
  totalTime: Long,
  turnAroundTime: Long,
  referrer: String,
  userAgent: String,
  versionId: String
)
```
The S3 access logs need a little processing before we can simply treat them as a “space-delimited” file. For example, empty values are represented by the string `-`. Fortunately, we can account for all that by chaining multiple `Rule`s together to create a new `Rule` which maps from an `Array[String]` (aliased as `Delimited`) to our `S3AccessLog` domain object.
```scala
class S3AccessLogScheme(csvParser: CSVParser = new CSVParser(' ')) extends Scheme[S3AccessLog] {
  private val dateTimePattern = """(.*)\[(\d\d/\w\w\w/\d\d\d\d:\d\d:\d\d:\d\d (\+|-)?\d\d\d\d)\](.*)""".r

  implicit val reader = From[Delimited] { __ =>
    (
      (__ \ 0).read[String] ~
      (__ \ 1).read[String] ~
      (__ \ 2).read(jodaDateRule("dd/MMM/yyyy:HH:mm:ss Z")) ~
      (__ \ 3).read[String] ~
      (__ \ 4).read[String] ~
      (__ \ 5).read[String] ~
      (__ \ 6).read[String] ~
      (__ \ 7).read[String] ~
      (__ \ 8).read[String] ~
      (__ \ 9).read[Int] ~
      (__ \ 10).read(optionR[String](equalTo("-"))) ~
      (__ \ 11).read(optionR[Long](equalTo("-"))) ~
      (__ \ 12).read(optionR[Long](equalTo("-"))) ~
      (__ \ 13).read[Long] ~
      (__ \ 14).read[Long] ~
      (__ \ 15).read[String] ~
      (__ \ 16).read[String] ~
      (__ \ 17).read[String]
    )(S3AccessLog.apply _)
  }

  // Wrap the bracketed timestamp in quotes so the CSV parser treats it as a single field
  private def fixDate(s: String): String = dateTimePattern.replaceAllIn(s, "$1\"$2\"$4")

  def translate(record: String): VA[S3AccessLog] = {
    val sanitized = fixDate(record)
    reader.validate(csvParser.parseLine(sanitized))
  }
}
```
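Before wiring the scheme into Spark, it can be exercised directly on a single record. A quick sketch, where `rawLine` stands for the access-log line shown earlier:

```scala
val scheme = new S3AccessLogScheme()

// translate returns a VA[S3AccessLog]: either the parsed record or the accumulated validation errors
val result = scheme.translate(rawLine)
```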
Now we’re ready to start crunching these logs with Spark:
```scala
def main(args: Array[String]) = {
  val sc = new SparkContext("local", "")
  lazy val scheme = new S3AccessLogScheme()

  sc.textFile("s3://mm-log-files/S3AccessLogs/bidder-data-bucket/2014-07-16-16-32-16-EC08CB17F0F10717")
    .map(scheme.translate) // Type of the stream at this point is Validation[Seq[ValidationError], S3AccessLog]
    .foreach(println)
}
```
The data pipeline above is fully typed, resilient to bad records, and can be joined, grouped and aggregated with compile-time type checks and IDE-based code completion. Much nicer than hardcoding column names throughout your job!
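For example, here is a hedged sketch of what such an aggregation might look like, reusing `sc` and `scheme` from above and pattern matching on the validation library’s `Success` case; the metric (bytes sent per bucket) is made up for illustration.

```scala
import org.apache.spark.SparkContext._   // pair-RDD functions such as reduceByKey (Spark 1.x)

// Keep only the records that validated, then total the bytes sent per bucket.
// Every field access below is checked at compile time against S3AccessLog.
val bytesPerBucket = sc.textFile("s3://mm-log-files/S3AccessLogs/bidder-data-bucket/")
  .map(scheme.translate)
  .collect { case Success(log) => log }
  .map(log => (log.bucket, log.bytesSent.getOrElse(0L)))
  .reduceByKey(_ + _)

bytesPerBucket.foreach(println)
```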
More to come
------------
This series just scratched the surface of what’s possible with the strong, combinator-based approach to data translation and validation offered by the new Play Validation API. I really hope the project catches on and can stand on its own two feet (without Play). In the future, we’d like to merge our Play DynamoDB library into it as well. As we’ve shown, the enhanced type safety and reusable parsing logic can be used in many ways outside of a traditional web app.
If you’d like more info, check out these links:
- An announcement of the new Play 2.3 validation API from Julien Tournay (http://jto.github.io/articles/play_new_validation_api)
- Play DynamoDB (https://github.com/MediaMath/play-dynamodb)
- The new Play validation API project (https://github.com/jto/validation)
And if you want to learn more about how we use Spark, AWS, and Play to reimagine marketing here at MediaMath, send me a note at @themodernlife or get in touch in the comments.