October 19, 2022

ksqlDB in Data Engineering at Games24x7


Within Games24x7, we have built a platform called Data as a Service (DaaS). It primarily helps internal stakeholders, including data scientists, define a set of attributes that are updated in real time and can be fetched through an API whenever they are needed.

For this use case, depending on the SLA and requests per second (RPS), we use multiple query engines, one of which is ksqlDB.

ksqlDB in Games24x7

ksqlDB is built on top of Kafka Streams, a lightweight, powerful Java library for enriching, transforming, and processing real-time streams of data. Having Kafka Streams at its core means ksqlDB is built on well-designed and easily understood layers of abstractions.

ksqlDB supports interactive querying of tables materialized by persistent queries via pull queries. ksqlDB leverages the Kafka Streams library to provide fault tolerance and table state replication for these persistent queries. You can execute a pull query by sending an HTTP request to the ksqlDB REST API, and the API responds with a single response.

Pull queries enable you to fetch the current state of a materialized view. Because materialized views are incrementally updated as new events arrive, pull queries run with predictably low latency. They’re a great match for request/response flows.
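As an illustration of this flow (the stream, table, and column names below are hypothetical, not our actual DaaS schema), a persistent query materializes a table from an event stream, and a pull query then reads the current row for a single key:

-- Hypothetical persistent query: keep the latest attribute values per player.
CREATE TABLE player_attributes AS
  SELECT player_id,
         LATEST_BY_OFFSET(total_deposit)    AS total_deposit,
         LATEST_BY_OFFSET(last_game_played) AS last_game_played
  FROM player_events
  GROUP BY player_id;

-- Pull query: fetch the current state for one key as a single request/response.
SELECT * FROM player_attributes WHERE player_id = 'P12345';

The same pull query can also be issued over the REST API, for example with curl (host and port assumed):

curl -s -X POST "http://<ksqldb-host>:8088/query" \
     -H "Content-Type: application/vnd.ksql.v1+json" \
     -d "{\"ksql\": \"SELECT * FROM player_attributes WHERE player_id = 'P12345';\", \"streamsProperties\": {}}"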

In DaaS, depending on the use case, we need to handle anywhere from 10 RPS to 10,000 RPS with an SLA of <50 ms in a synchronous manner, i.e. once the DaaS API is called, the data scientists' applications and models wait for the response from DaaS before taking subsequent actions.

Basic Flow of DaaS — ksqlDB Use Case

To serve such high-concurrency, low-latency real-time requests, we came across ksqlDB, which fits the purpose. Generally ksqlDB handles this out of the box, but depending on the number of data points (columns) and the size of the data that needs to be returned, throughput and latency can become a bottleneck.

So here we are sharing some of the configuration and code changes we made to increase throughput and reduce latency.

Scenario 1: Latency is low for a few thousand RPS, but if we increase RPS further, throughput goes down and latency goes up.

1. Horizontal / Vertical Scaling

The first step in almost any distributed system is to check whether hardware resources are the bottleneck, i.e. whether we are maxing out on CPU, memory, network bandwidth, etc. In our observation, CPU is usually the first to peak.

If so, try to scale ksqlDB horizontally by adding more nodes to your cluster. You can scale it vertically as well, depending on your environment.
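As a rough sketch of horizontal scaling (the service id and other values below are assumptions, not our production settings): all ksqlDB servers that share the same ksql.service.id form one cluster, so scaling out is essentially starting another server pointed at the same Kafka cluster with the same service id.

# ksql-server.properties on the new node (illustrative values)
ksql.service.id=daas_ksqldb_cluster_
bootstrap.servers=<kafka-broker-1>:9092,<kafka-broker-2>:9092
listeners=http://0.0.0.0:8088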

2. Tuning the number of state stores of the final ksqlDB table on which pull queries run

If a table's input stream has X partitions, stateful persistent queries create one state store per partition, so too many partitions can result in an excessive number of state stores, each with a very thin shard. At the same time, you need enough partitions to parallelize your workload across all nodes in your cluster.

The ksqlDB docs recommend that a good balance is achieved with twice as many partitions as nodes in your cluster. A given node in a ksqlDB deployment can handle more than one input partition, so the partition count doesn't need to match the node count.

Having said that, it is always best to benchmark for your own use case; we tried different partition counts and settled on a number based on our throughput/latency SLA.
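As one hedged example of controlling this knob, if ksqlDB itself creates the source topic, the partition count can be declared up front in the WITH clause (the stream name, schema, and the count of 12 are purely illustrative; pick the number your own benchmarks support):

CREATE STREAM player_events (
    player_id VARCHAR KEY,
    total_deposit DOUBLE,
    last_game_played VARCHAR
  ) WITH (
    KAFKA_TOPIC = 'player_events',
    PARTITIONS = 12,
    VALUE_FORMAT = 'JSON'
  );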

3. Other ksqlDB Configs

ksql.streams.cache.max.bytes.buffering / ksql.streams.statestore.cache.max.bytes

The maximum number of bytes of memory to be used for buffering across all threads. The default value in ksqlDB is 10000000 (~10 MB). This buffer also speeds up reads, so it can be tuned based on application needs, data size, and available resources.

These properties can be set either in a ksqlDB terminal session or in 'ksql-server.properties', for example:

ksql.streams.cache.max.bytes.buffering=20000000
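For example, the same property can be set for the current session from the ksqlDB CLI (the value 20000000 is just the example above, not a universal recommendation):

SET 'ksql.streams.cache.max.bytes.buffering'='20000000';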

Scenario 2: Throughput and latency are in the expected range, but intermittently throughput goes down and latency goes up without CPU or memory maxing out. In a few cases, queries return errors.

1. Some nodes in the ksqlDB cluster might be getting marked as dead

Since ksqlDB runs in distributed mode, it is possible that one or more nodes in the cluster are down for any reason. To avoid failing pull queries during that time, ksqlDB supports high availability via a heartbeat mechanism.

Every ksqlDB server broadcasts its heartbeat using the existing internal REST API to the other nodes of the cluster. ksqlDB servers register the heartbeats they receive and process them to determine which server failed to send its heartbeat within a window of two seconds, for example. Using this information, a server can determine the health status of the other servers in the cluster and avoid wastefully routing queries to failed servers. For more detail, see the article Highly Available, Fault-Tolerant Pull Queries in ksqlDB.

To confirm that nodes are being marked as dead, look for the following message in the log files:

INFO Host: <HostIP>:8088 marked as dead.
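In addition to grepping the logs, the cluster status endpoint (available when heartbeats are enabled) shows which hosts each server currently considers alive or dead; the host and port here are assumptions:

curl -s "http://<ksqldb-host>:8088/clusterStatus"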

If this is happening, it means the active nodes in the cluster stop forwarding pull query requests to the dead nodes and therefore do extra work, which sometimes contributes to the increase in latency. Also, requests that were already forwarded to the dead nodes as remote requests might result in errors.

In most such scenarios, what we have seen is that the "dead" node is actually alive but unable to communicate with the other nodes; sometimes, due to high concurrency, a few heartbeats are missed by the other nodes and they mark it as dead.

So we can change the default values of the heartbeat send interval and the missed-heartbeat threshold after which a node is marked as dead.

The relevant properties are ksql.heartbeat.send.interval.ms and ksql.heartbeat.missed.threshold.ms, and you can change them to other values, for example:

ksql.heartbeat.send.interval.ms=200
ksql.heartbeat.missed.threshold.ms=10

Note: change these configurations only after understanding them and their impact on your application, because increasing these values results in later detection of node failures.
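For reference, here is a sketch of how the related server properties might appear together in ksql-server.properties; heartbeats (and the /clusterStatus endpoint) only work when ksql.heartbeat.enable is true, and the interval/threshold values are just the examples discussed above:

# ksql-server.properties (illustrative; enable heartbeats before tuning the intervals)
ksql.heartbeat.enable=true
ksql.lag.reporting.enable=true
ksql.heartbeat.send.interval.ms=200
ksql.heartbeat.missed.threshold.ms=10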

2. Garbage Collection

Garbage collection is another problem we have observed occasionally in ksqlDB. At the time of writing, ksqlDB (v0.27) uses the Concurrent Mark Sweep (CMS) collector, and an issue is already open for changing it to G1GC.

You can change the heap size of the ksqlDB application in the "bin/ksql-run-class" file like this:

KSQL_HEAP_OPTS="-Xms3g -Xmx9g"

By default ksqlDB only sets Xmx in KSQL_HEAP_OPTS, but we can add Xms as well. In our case, setting Xms solved the initial frequent GC issues: we observed that ksqlDB initially takes very little heap (in MBs) and then gradually grows it, doing major GCs frequently along the way.
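As a minimal sketch (file paths assumed), the same heap settings can also be exported as an environment variable before starting the server instead of editing bin/ksql-run-class, since the script typically applies its default heap only when KSQL_HEAP_OPTS is unset:

export KSQL_HEAP_OPTS="-Xms3g -Xmx9g"
bin/ksql-server-start etc/ksqldb/ksql-server.properties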

Honestly, individual applications might need to be tuned differently if GC is the culprit.

Scenario 3: Latency is low for a few hundred RPS, but if RPS is increased further, ksqlDB stops responding altogether.

There is already an open issue with ksqlDB where, in some corner cases, it can deadlock. We hit this in ksqlDB v0.25. However, after tuning the following two properties, we could run at RPS levels that previously caused the lock-up.

ksql.query.pull.thread.pool.size : Size of thread pool used for coordinating pull queries
ksql.query.pull.router.thread.pool.size : Size of thread pool used for routing pull queries
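A sketch of how these could be set in ksql-server.properties; the values below are purely illustrative and should be benchmarked against your own RPS, node count, and available cores:

ksql.query.pull.thread.pool.size=100
ksql.query.pull.router.thread.pool.size=100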