Queryable States in Apache Flink – Part 2: Implementation
This is part 2 of the blog series Queryable States in Apache Flink. In the previous post, we saw how Apache Flink enables Queryable States. In this part, we will create a streaming job with a Queryable State and a QueryClient to query that state. I assume that Flink is already installed and set up. If not, you can check out my earlier blog post on installation here. I will be using a tumbling window in this example; to read about windows in Flink, please read this blog post.
All the code used in this blog post will be available on my GitHub.
Creating the Pipeline
Let us now create a streaming job with a QueryableState. In this example, the input is a climate log of the format country,state,temperature,humidity, where country and state are Strings and temperature and humidity are Floats. We will first create a case class to hold these logs.
```scala
case class ClimateLog(country: String, state: String, temperature: Float, humidity: Float)

object ClimateLog {
  def apply(line: String): Option[ClimateLog] = {
    val parts = line.split(",")
    try {
      Some(ClimateLog(parts(0), parts(1), parts(2).toFloat, parts(3).toFloat))
    } catch {
      case e: Exception => None
    }
  }
}
```
We can then read the logs from a socket using
```scala
val climateLogStream = senv.socketTextStream("localhost", 2222)
  .flatMap(ClimateLog(_))
```
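Because ClimateLog.apply returns an Option, flatMap keeps only the lines that parse successfully and silently drops the rest. The same pattern can be illustrated with plain Scala collections, no Flink required (the sample lines below are made up):

```scala
// Repeating the ClimateLog definition from above so this snippet compiles standalone.
case class ClimateLog(country: String, state: String, temperature: Float, humidity: Float)

object ClimateLog {
  def apply(line: String): Option[ClimateLog] = {
    val parts = line.split(",")
    try Some(ClimateLog(parts(0), parts(1), parts(2).toFloat, parts(3).toFloat))
    catch { case e: Exception => None }
  }
}

// One malformed line among the input:
val lines = Seq("USA,California,20.0,55.0", "garbage", "India,Kerala,30.0,80.0")

// flatMap over an Option-returning function keeps only the parsed records,
// exactly like the flatMap on the socket stream:
val parsed = lines.flatMap(ClimateLog(_))
// parsed has 2 elements; "garbage" was dropped
```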
We will create a KeyedStream and apply a tumbling time window of 10 seconds, so the window is evaluated each time it tumbles. In the apply function, we do a simple aggregation, summing up all the temperature and humidity values seen in that window.
```scala
val climateLogAgg = climateLogStream
  .keyBy("country", "state")
  .timeWindow(Time.seconds(10))
  .apply((key: Tuple, w: TimeWindow, clogs: Iterable[ClimateLog], out: Collector[ClimateLog]) => {
    val agg = clogs.reduce((c1: ClimateLog, c2: ClimateLog) => c1.copy(
      temperature = c1.temperature + c2.temperature,
      humidity = c1.humidity + c2.humidity))
    out.collect(agg)
  })
```
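The window's apply function does nothing more than a pairwise reduce over the logs collected in those 10 seconds. Stripped of the Flink plumbing, the aggregation is just this (sample values are hypothetical):

```scala
case class ClimateLog(country: String, state: String, temperature: Float, humidity: Float)

// Logs that might arrive within one 10-second window for the key ("USA", "California"):
val windowLogs = Seq(
  ClimateLog("USA", "California", 20.0f, 55.0f),
  ClimateLog("USA", "California", 22.0f, 60.0f),
  ClimateLog("USA", "California", 21.0f, 50.0f))

// The same pairwise reduce used inside the window's apply function:
val agg = windowLogs.reduce((c1, c2) => c1.copy(
  temperature = c1.temperature + c2.temperature,
  humidity = c1.humidity + c2.humidity))
// agg = ClimateLog("USA", "California", 63.0f, 165.0f)
```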
QueryableStateStream
Now we will create a stream that is queryable. To do that, we need a StateDescriptor that describes the type of elements that are going to be stored in the state. We will create a ReducingStateDescriptor that aggregates the values seen so far. The ReducingStateDescriptor takes three parameters: the state name, the reduce function to apply when new elements are added to the state, and the type information of the values that will be stored in the state.
```scala
// Define the reduce function before the descriptor that references it,
// so the forward reference does not evaluate to null.
val reduceFunction = new ReduceFunction[ClimateLog] {
  override def reduce(c1: ClimateLog, c2: ClimateLog): ClimateLog = {
    c1.copy(
      temperature = c1.temperature + c2.temperature,
      humidity = c1.humidity + c2.humidity)
  }
}

val climateLogStateDesc = new ReducingStateDescriptor[ClimateLog](
  "climate-record-state",
  reduceFunction,
  TypeInformation.of(new TypeHint[ClimateLog]() {}))
```
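Note that the reducing state keeps one running value per key: each time a window fires and emits its aggregate, that aggregate is folded into the stored value via the reduce function. Conceptually, the queryable value evolves like a running reduce over the window outputs (the values below are made up):

```scala
case class ClimateLog(country: String, state: String, temperature: Float, humidity: Float)

// Aggregates emitted by two successive window firings for the same key:
val windowOutputs = Seq(
  ClimateLog("USA", "California", 63.0f, 165.0f),
  ClimateLog("USA", "California", 40.0f, 110.0f))

// The ReducingState applies the reduce function on every update, so the
// stored (queryable) value is effectively a running reduce:
val stateValue = windowOutputs.reduce((c1, c2) => c1.copy(
  temperature = c1.temperature + c2.temperature,
  humidity = c1.humidity + c2.humidity))
// stateValue.temperature = 103.0f, stateValue.humidity = 275.0f
```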
Once that is done, we call the asQueryableState function on the stream and pass in the state descriptor we just created. This is shown below.
```scala
val queryableStream = climateLogAgg
  .keyBy("country")
  .asQueryableState("climatelog-stream", climateLogStateDesc)

senv.execute("Queryable state example streaming job")
```
Note the first parameter of asQueryableState: this is the queryableStateName, which identifies the stream. The QueryClient will use this name later while querying.
QueryClient
Now we move on to creating the QueryClient. The client is a separate application that queries the state of an already running streaming job. The first thing the client needs to know is how to connect to the JobManager (remember the diagram from the previous blog?), which can be configured as follows:
```scala
val config = new Configuration
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost")
config.setString(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, "6123")
```
Next, we create an instance of QueryableStateClient, along with serializers for the key and the value. The key serializer is used to create a serializedKey. The value serializer will be used later to deserialize the result returned by the query. In the example below, we ask the state for the current running value for the country “USA”.
```scala
val client = new QueryableStateClient(config)
val execConfig = new ExecutionConfig

val keySerializer = createTypeInformation[String].createSerializer(execConfig)
val valueSerializer = TypeInformation.of(new TypeHint[ClimateLog]() {})
  .createSerializer(execConfig)

val key = "USA"
val serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace(
  key,
  keySerializer,
  VoidNamespace.INSTANCE,
  VoidNamespaceSerializer.INSTANCE)
```
Now we can query the state using the client, passing the serializedKey, JobID, and queryableStateName as parameters. The JobID can be obtained either from the Flink UI or from the job submission log. Note the climatelog-stream parameter, which must be the same as the queryableStateName used during job submission.
```scala
val serializedResult = client.getKvState(jobId, "climatelog-stream", key.hashCode(), serializedKey)
```
The query returns a Future, which can be handled as follows. If the query was successful, we use the valueSerializer to deserialize and read the result; in this case, the deserialized result is an instance of the ClimateLog case class.
```scala
serializedResult onSuccess {
  case result => {
    try {
      val clog: ClimateLog = KvStateRequestSerializer.deserializeValue(result, valueSerializer)
      println(s"State value: $clog")
    } catch {
      case e: Exception => e.printStackTrace()
    }
  }
}

serializedResult onFailure {
  case uk: UnknownKeyOrNamespace => println(uk.getMessage)
  case e: Exception => println(e.getMessage)
}
```
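As an aside, onSuccess and onFailure are partial callbacks on a plain Scala Future, and newer Scala versions recommend a single onComplete instead. A minimal plain-Scala sketch of that style, using a stand-in future rather than the real getKvState result:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

// Stand-in for the future returned by getKvState (a real result would be a
// serialized byte array; a String keeps the sketch simple):
val serializedResult: Future[String] = Future { "ClimateLog(USA,California,63.0,165.0)" }

// Equivalent success/failure handling with a single onComplete callback:
serializedResult.onComplete {
  case Success(result) => println(s"State value: $result")
  case Failure(e)      => println(e.getMessage)
}

// Await only for illustration; a real client would stay asynchronous:
val value = Await.result(serializedResult, 5.seconds)
```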
To test the job, open a terminal and run netcat.
```shell
nc -lk 2222
```
Now submit the job using the Flink command-line interface:
```shell
flink run target/scala-2.11/flink-vishnu-assembly-1.0.jar
Submitting job with JobID: ec685d96da49644ab025c8f9a27ca07a. Waiting for job completion
```
Now all that is left to do is send some sample messages through netcat and run the QueryClient with the JobID and other parameters.
There are a few possible Exceptions that can occur at this point.
1) Actor not found
```
Actor not found for: ActorSelection[Anchor(akka.tcp://flink@localhost:6123/), Path(/user/jobmanager)]
```
Make sure that your Flink cluster is up and running. Also, the job has to be submitted through the command line, not from the IDE.
2) Job not found
```
java.lang.IllegalStateException: Job d8a3b9f9b8e6da33aa714633cee61c3b not found
```
This is an easy one: just make sure that the JobID passed matches that of the running job.
3) No KvStateLocation found
```
org.apache.flink.runtime.query.UnknownKvStateLocation: No KvStateLocation found for KvState instance with name 'climatelog-stream-temp'
```
Make sure that the state name (climatelog-stream) in the client matches the one that was used during job submission.
4) KvState does not hold any state for key/namespace
```
org.apache.flink.runtime.query.netty.UnknownKeyOrNamespace: KvState does not hold any state for key/namespace
```
This means that the stream you are trying to query does not hold any state for the key you are looking for (in this example, “USA”). Did the messages that were sent through netcat contain the key being used in the query?
5) Could not deserialize value
```
java.io.EOFException
    at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:310)
    at org.apache.flink.types.StringValue.readString(StringValue.java:770)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize
```
This indicates that something is wrong with the value serializer. The easiest way to fix it is to go back to your streaming job code and make sure the client uses the exact same TypeInformation as the job. For example, using createTypeInformation[ClimateLog] in one and TypeInformation.of(new TypeHint[ClimateLog]() {}) in the other can cause this exception.
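This class of failure is easy to reproduce with plain java.io streams, independent of Flink: bytes written under one scheme generally cannot be read back under another. The sketch below writes a Float and then tries to read those bytes as a UTF string, hitting the same EOFException pattern as a mismatched value serializer:

```scala
import java.io._

// Serialize a Float...
val bos = new ByteArrayOutputStream()
new DataOutputStream(bos).writeFloat(63.0f)
val bytes = bos.toByteArray

// ...then deserialize with the "wrong" reader, as a mismatched value
// serializer would: readUTF interprets the first two bytes as a string
// length and runs off the end of the buffer.
val in = new DataInputStream(new ByteArrayInputStream(bytes))
val mismatch = try { in.readUTF(); false } catch { case _: IOException => true }
// mismatch == true: the bytes don't decode under the wrong scheme
```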
To summarize, we saw how Apache Flink enables querying its internal state, and how we can develop a pipeline and a query client to do so. Apart from Flink, Kafka Streams also provides a similar feature.
That concludes the post and hope it was useful. Thanks for reading!
Vishnu Viswanath
Vishnu Viswanath is a Data Engineer at MediaMath, with over 5 years of experience designing and building scalable and efficient systems, and has expertise in most of the Big Data stack. He is a relentless tech enthusiast who likes to keep up to date with the technologies in vogue, and a keen open source believer and contributor. When he is not coding his way to “geek” status, he loves traveling and has a serious case of wanderlust.