ThingsBoard Data Collection Performance

One of the key features of the ThingsBoard open-source IoT platform is data collection, and it must work reliably under high load. In this article, we describe the steps and improvements we made to ensure that a single ThingsBoard server instance can constantly handle 20,000+ devices and 30,000+ MQTT publish messages per second, which adds up to roughly 2 million published messages per minute.

Architecture

ThingsBoard performance leverages three main projects:

  • Netty for high-performance MQTT server/broker for IoT devices.
  • Akka for high-performance actor system to coordinate messages between millions of devices.
  • Cassandra for scalable high-performance NoSQL DB to store timeseries data from devices.

We also use Zookeeper for coordination and gRPC when running in cluster mode. See the platform architecture for more details.

Data Flow and Testing Tools

IoT devices connect to the ThingsBoard server via MQTT and issue “publish” commands with a JSON payload. The size of a single publish message is approximately 100 bytes. MQTT is a lightweight publish/subscribe messaging protocol that offers a number of advantages over the HTTP request/response protocol.

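For reference, a minimal sketch of what such a device-side publish could look like, using the Eclipse Paho Java client (not part of ThingsBoard itself). The v1/devices/me/telemetry topic and token-based authentication follow the ThingsBoard MQTT device API; the host, access token, and telemetry values below are placeholders.

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class DevicePublishExample {

    public static void main(String[] args) throws MqttException {
        // The device access token is used as the MQTT user name (ThingsBoard MQTT device API).
        String accessToken = "YOUR_DEVICE_ACCESS_TOKEN";

        MqttClient client = new MqttClient("tcp://thingsboard-host:1883",
                MqttClient.generateClientId(), new MemoryPersistence());
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(accessToken);
        client.connect(options);

        // ~100-byte JSON payload with a couple of telemetry keys (sample values).
        String payload = "{\"temperature\":42.1,\"humidity\":73}";

        // QoS 1: the broker acknowledges the message with a PUBACK once it is processed.
        client.publish("v1/devices/me/telemetry", payload.getBytes(), 1, false);

        client.disconnect();
    }
}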

The ThingsBoard server processes MQTT publish messages and stores them to Cassandra asynchronously. The server may also push data to websocket subscriptions from the Web UI dashboards (if present). We try to avoid any blocking operations, and this is critical for overall system performance. ThingsBoard supports MQTT QoS level 1, which means that a client receives a response to its publish message only after the data is stored to the Cassandra DB. Duplicate messages, which are possible with QoS level 1, simply overwrite the corresponding Cassandra row and therefore do not appear in the persisted data. This functionality provides reliable data delivery and persistence.

We used the Gatling load-testing framework, which is also based on Akka and Netty. Gatling is able to simulate 10K MQTT clients using only 5-10% of a 2-core CPU. See our separate article about how we improved the unofficial Gatling MQTT plugin to support our use case.

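The Gatling scenario itself is covered in that article. Purely to illustrate the load pattern (many devices, each publishing once per second), here is a simplified stand-alone Java sketch based on Eclipse Paho and a scheduled executor. It is not the tool we used for the benchmarks, and the device count, access tokens, and host are placeholders.

import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class SimpleMqttLoadGenerator {

    public static void main(String[] args) throws Exception {
        int deviceCount = 100; // scaled down; the real tests simulated 10,000 devices
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(4);
        byte[] payload = "{\"temperature\":42.1,\"humidity\":73}".getBytes(StandardCharsets.UTF_8);

        for (int i = 0; i < deviceCount; i++) {
            // Placeholder access tokens; each simulated device needs its own token.
            MqttClient client = new MqttClient("tcp://thingsboard-host:1883",
                    "load-client-" + i, new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName("DEVICE_TOKEN_" + i);
            client.connect(options);

            // Each simulated device publishes one telemetry message per second at QoS 1.
            scheduler.scheduleAtFixedRate(() -> {
                try {
                    client.publish("v1/devices/me/telemetry", payload, 1, false);
                } catch (MqttException e) {
                    e.printStackTrace();
                }
            }, 0, 1, TimeUnit.SECONDS);
        }
    }
}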

Performance Improvement Steps

Step 1. Asynchronous Cassandra Driver API

The results of the first performance tests on a modern 4-core laptop with an SSD were quite poor. The platform was able to process only 200 messages per second. The root cause and the main performance bottleneck were quite obvious and easy to find: the processing was not 100% asynchronous, and we were executing a blocking Cassandra driver API call inside the Telemetry plugin actor. A quick refactoring of the plugin implementation resulted in a more than 10X performance improvement, and we received approximately 2,500 published messages per second from 1,000 devices. We would also like to recommend this article about async queries to Cassandra.

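To illustrate the difference, here is a simplified sketch based on the DataStax Java driver 3.x and Guava. It is not the actual ThingsBoard plugin code; the method names and callback bodies are placeholders.

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;

public class AsyncWriteExample {

    // Before: blocks the calling (actor) thread until Cassandra replies.
    static void saveBlocking(Session session, Statement insert) {
        session.execute(insert);
    }

    // After: returns immediately; the callback fires when the write completes,
    // which is also the natural place to trigger the QoS 1 acknowledgement.
    static void saveAsync(Session session, Statement insert) {
        ResultSetFuture future = session.executeAsync(insert);
        Futures.addCallback(future, new FutureCallback<ResultSet>() {
            @Override
            public void onSuccess(ResultSet rs) {
                // e.g. notify the actor so it can ack the MQTT publish message
            }

            @Override
            public void onFailure(Throwable t) {
                // e.g. log the error and report the failure to the device
            }
        }, MoreExecutors.directExecutor());
    }
}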

Step 2. Connection Pooling

We decided to move to AWS EC2 instances to be able to share both the results and the tests we executed. We started running the tests on a c4.xlarge instance (4 vCPUs and 7.5 GB of RAM) with the Cassandra and ThingsBoard services co-located.

Test specification:

  • Number of devices: 10 000
  • Publish frequency per device: once per second
  • Total load: 10 000 messages per second

First test results were obviously unacceptable:

The huge response time above was caused by the fact that the server was simply not able to process 10K messages per second, so they were getting queued.

We started our investigation by monitoring memory and CPU load on the testing instance. Initially, we guessed that the poor performance was caused by a heavy load on the CPU or RAM. But in fact, during load testing we saw that at particular moments the CPU was idle for a couple of seconds. This ‘pause’ event happened every 3-7 seconds; see the chart below.

As a next step, we decided to take a thread dump during these pauses. We expected to see blocked threads, which could give us a clue about what was happening during the pauses. So we opened one console to monitor CPU load and another to take a thread dump while performing the stress tests, using the following command (kill -3 sends SIGQUIT to the JVM, which prints a full thread dump to stdout without terminating the process):

kill -3 THINGSBOARD_PID

We identified that during each pause there was always one thread in the TIMED_WAITING state, and that the root cause was the awaitAvailableConnection method of the Cassandra driver:

java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x0000000092d9d390> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2163)
at com.datastax.driver.core.HostConnectionPool.awaitAvailableConnection(HostConnectionPool.java:287)
at com.datastax.driver.core.HostConnectionPool.waitForConnection(HostConnectionPool.java:328)
at com.datastax.driver.core.HostConnectionPool.borrowConnection(HostConnectionPool.java:251)
at com.datastax.driver.core.RequestHandler$SpeculativeExecution.query(RequestHandler.java:301)
at com.datastax.driver.core.RequestHandler$SpeculativeExecution.sendRequest(RequestHandler.java:281)
at com.datastax.driver.core.RequestHandler.startNewExecution(RequestHandler.java:115)
at com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:91)
at com.datastax.driver.core.SessionManager.executeAsync(SessionManager.java:132)
at org.thingsboard.server.dao.AbstractDao.executeAsync(AbstractDao.java:91)
at org.thingsboard.server.dao.AbstractDao.executeAsyncWrite(AbstractDao.java:75)
at org.thingsboard.server.dao.timeseries.BaseTimeseriesDao.savePartition(BaseTimeseriesDao.java:135)

As a result, we realized that the default connection pool configuration of the Cassandra driver caused poor results in our use case.

The official documentation of the connection pooling feature contains a special option, ‘Simultaneous requests per connection’, that allows you to tune the number of concurrent requests per single connection. We use Cassandra driver protocol v3, which uses the following values by default:

  • 1024 for LOCAL hosts.
  • 256 for REMOTE hosts.

Considering that we are actually collecting data from 10,000 devices, the default values are definitely not enough. So we changed the code and set the values for LOCAL and REMOTE hosts to the maximum possible:

poolingOptions
    .setMaxRequestsPerConnection(HostDistance.LOCAL, 32768)
    .setMaxRequestsPerConnection(HostDistance.REMOTE, 32768);
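
For context, a minimal sketch of how such pooling options can be passed to the driver when building the cluster. The contact point and keyspace below are placeholders; ThingsBoard itself wires these values through its own configuration.

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.Session;

public class ClusterSetupExample {

    public static void main(String[] args) {
        // Raise the per-connection request limit to the protocol v3 maximum.
        PoolingOptions poolingOptions = new PoolingOptions()
                .setMaxRequestsPerConnection(HostDistance.LOCAL, 32768)
                .setMaxRequestsPerConnection(HostDistance.REMOTE, 32768);

        Cluster cluster = Cluster.builder()
                .addContactPoint("127.0.0.1")          // placeholder Cassandra node
                .withPoolingOptions(poolingOptions)
                .build();

        Session session = cluster.connect("thingsboard"); // placeholder keyspace
        // ... issue executeAsync(...) calls as shown in Step 1 ...
        cluster.close();
    }
}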

Test results after the applied changes are listed below.

The results were much better, but still far from even 1 million messages per minute. We no longer saw pauses in CPU load during our tests on c4.xlarge. CPU load was high (80-95%) during the entire test. We took a couple of thread dumps to verify that the Cassandra driver was no longer awaiting available connections, and indeed we did not see this issue anymore.

Step 3. Vertical Scaling

We decided to run the same tests on a twice-as-powerful node, c4.2xlarge, with 8 vCPUs and 15 GB of RAM. The performance increase was not linear, and the CPU was still heavily loaded (80-90%).

We noticed a significant improvement in response time. After a significant peak at the start of the test, the maximum response time stayed within 200 ms and the average response time was ~50 ms.

The number of requests per second was around 10K.

We also executed the test on a c4.4xlarge instance with 16 vCPUs and 30 GB of RAM, but did not notice significant improvements and decided to separate the ThingsBoard server and move Cassandra to a three-node cluster.

Step 4. Horizontal Scaling

Our main goal was to identify how many MQTT messages we can handle using a single ThingsBoard server running on c4.2xlarge. We will cover horizontal scalability of a ThingsBoard cluster in a separate article. So, we decided to move Cassandra to three separate c4.xlarge instances with the default configuration, and to launch the Gatling stress-test tool from two separate c4.xlarge instances simultaneously, to minimize any third-party effect on latency and throughput.

Test specification:

  • Number of devices: 20 000
  • Publish frequency per device: twice per second
  • Total load: 40 000 messages per second

The statistics of the two simultaneous test runs launched on different client machines are listed below.

Based on the data from the two simultaneous test runs, we reached 30,000 published messages per second, which equals 1.8 million messages per minute.

How to Repeat the Tests

We have prepared several AWS AMIs for anyone who is interested in replicating these tests. See the separate documentation page with detailed instructions.

Conclusion

This performance test demonstrates how a small ThingsBoard cluster, which costs approximately $1 per hour, can easily receive, store, and visualize more than 100 million messages from your devices. We will continue our work on performance improvements and are going to publish performance results for a cluster of ThingsBoard servers in our next blog post.

We hope this article will be useful for people who are evaluating the platform and want to execute performance tests on their own. We also hope that performance improvement steps will be useful for any engineers who use similar technologies.
