1 package benchmarks.scheduler.actors
2 
3 import benchmarks.*
4 import benchmarks.akka.*
5 import kotlinx.coroutines.*
6 import kotlinx.coroutines.channels.*
7 import org.openjdk.jmh.annotations.*
8 import java.util.concurrent.*
9 
10 /*
11  * Benchmark                                   (dispatcher)  Mode  Cnt    Score    Error  Units
12  * PingPongActorBenchmark.coresCountPingPongs  experimental  avgt   10  185.066 ± 21.692  ms/op
13  * PingPongActorBenchmark.coresCountPingPongs           fjp  avgt   10  200.581 ± 22.669  ms/op
14  * PingPongActorBenchmark.coresCountPingPongs         ftp_1  avgt   10  494.334 ± 27.450  ms/op
15  * PingPongActorBenchmark.coresCountPingPongs         ftp_2  avgt   10  498.754 ± 27.743  ms/op
16  * PingPongActorBenchmark.coresCountPingPongs         ftp_8  avgt   10  804.498 ± 69.826  ms/op
17  *
18  * PingPongActorBenchmark.singlePingPong       experimental  avgt   10   45.521 ±  3.281  ms/op
19  * PingPongActorBenchmark.singlePingPong                fjp  avgt   10  217.005 ± 18.693  ms/op
20  * PingPongActorBenchmark.singlePingPong              ftp_1  avgt   10   57.632 ±  1.835  ms/op
21  * PingPongActorBenchmark.singlePingPong              ftp_2  avgt   10  112.723 ±  5.280  ms/op
22  * PingPongActorBenchmark.singlePingPong              ftp_8  avgt   10  276.958 ± 21.447  ms/op
23  */
24 @Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
25 @Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
26 @Fork(value = 1)
27 @BenchmarkMode(Mode.AverageTime)
28 @OutputTimeUnit(TimeUnit.MILLISECONDS)
29 @State(Scope.Benchmark)
30 open class PingPongActorBenchmark : ParametrizedDispatcherBase() {
31     data class Letter(val message: Any?, val sender: SendChannel<Letter>)
32 
33     @Param("scheduler", "fjp", "ftp_1")
34     override var dispatcher: String = "fjp"
35 
36     @Benchmark
<lambda>null37     fun singlePingPong() = runBlocking {
38         runPingPongs(1)
39     }
40 
41     @Benchmark
<lambda>null42     fun coresCountPingPongs() = runBlocking {
43         runPingPongs(Runtime.getRuntime().availableProcessors())
44     }
45 
runPingPongsnull46     private suspend fun runPingPongs(count: Int) {
47         val me = Channel<Letter>()
48         repeat(count) {
49             val pong = pongActorCoroutine()
50             val ping = pingActorCoroutine(pong)
51             ping.send(Letter(Start(), me))
52         }
53 
54         repeat(count) {
55             me.receive()
56         }
57     }
58 }
59 
CoroutineScopenull60 fun CoroutineScope.pingActorCoroutine(
61     pingChannel: SendChannel<PingPongActorBenchmark.Letter>,
62     capacity: Int = 1
63 ) =
64     actor<PingPongActorBenchmark.Letter>(capacity = capacity) {
65         var initiator: SendChannel<PingPongActorBenchmark.Letter>? = null
66         for (letter in channel) with(letter) {
67             when (message) {
68                 is Start -> {
69                     initiator = sender
70                     pingChannel.send(PingPongActorBenchmark.Letter(Ball(0), channel))
71                 }
72                 is Ball -> {
73                     pingChannel.send(PingPongActorBenchmark.Letter(Ball(message.count + 1), channel))
74                 }
75                 is Stop -> {
76                     initiator!!.send(PingPongActorBenchmark.Letter(Stop(), channel))
77                     return@actor
78                 }
79                 else -> error("Cannot happen $message")
80             }
81         }
82     }
83 
pongActorCoroutinenull84 fun CoroutineScope.pongActorCoroutine(capacity: Int = 1) =
85     actor<PingPongActorBenchmark.Letter>(capacity = capacity) {
86         for (letter in channel) with(letter) {
87             when (message) {
88                 is Ball -> {
89                     if (message.count >= N_MESSAGES) {
90                         sender.send(PingPongActorBenchmark.Letter(Stop(), channel))
91                         return@actor
92                     } else {
93                         sender.send(PingPongActorBenchmark.Letter(Ball(message.count + 1), channel))
94                     }
95                 }
96                 else -> error("Cannot happen $message")
97             }
98         }
99     }
100