1 package benchmarks.scheduler.actors
2
3 import benchmarks.*
4 import benchmarks.akka.*
5 import kotlinx.coroutines.*
6 import kotlinx.coroutines.channels.*
7 import org.openjdk.jmh.annotations.*
8 import java.util.concurrent.*
9
10 /*
11 * Benchmark (dispatcher) Mode Cnt Score Error Units
12 * PingPongActorBenchmark.coresCountPingPongs experimental avgt 10 185.066 ± 21.692 ms/op
13 * PingPongActorBenchmark.coresCountPingPongs fjp avgt 10 200.581 ± 22.669 ms/op
14 * PingPongActorBenchmark.coresCountPingPongs ftp_1 avgt 10 494.334 ± 27.450 ms/op
15 * PingPongActorBenchmark.coresCountPingPongs ftp_2 avgt 10 498.754 ± 27.743 ms/op
16 * PingPongActorBenchmark.coresCountPingPongs ftp_8 avgt 10 804.498 ± 69.826 ms/op
17 *
18 * PingPongActorBenchmark.singlePingPong experimental avgt 10 45.521 ± 3.281 ms/op
19 * PingPongActorBenchmark.singlePingPong fjp avgt 10 217.005 ± 18.693 ms/op
20 * PingPongActorBenchmark.singlePingPong ftp_1 avgt 10 57.632 ± 1.835 ms/op
21 * PingPongActorBenchmark.singlePingPong ftp_2 avgt 10 112.723 ± 5.280 ms/op
22 * PingPongActorBenchmark.singlePingPong ftp_8 avgt 10 276.958 ± 21.447 ms/op
23 */
24 @Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
25 @Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
26 @Fork(value = 1)
27 @BenchmarkMode(Mode.AverageTime)
28 @OutputTimeUnit(TimeUnit.MILLISECONDS)
29 @State(Scope.Benchmark)
30 open class PingPongActorBenchmark : ParametrizedDispatcherBase() {
31 data class Letter(val message: Any?, val sender: SendChannel<Letter>)
32
33 @Param("scheduler", "fjp", "ftp_1")
34 override var dispatcher: String = "fjp"
35
36 @Benchmark
<lambda>null37 fun singlePingPong() = runBlocking {
38 runPingPongs(1)
39 }
40
41 @Benchmark
<lambda>null42 fun coresCountPingPongs() = runBlocking {
43 runPingPongs(Runtime.getRuntime().availableProcessors())
44 }
45
runPingPongsnull46 private suspend fun runPingPongs(count: Int) {
47 val me = Channel<Letter>()
48 repeat(count) {
49 val pong = pongActorCoroutine()
50 val ping = pingActorCoroutine(pong)
51 ping.send(Letter(Start(), me))
52 }
53
54 repeat(count) {
55 me.receive()
56 }
57 }
58 }
59
CoroutineScopenull60 fun CoroutineScope.pingActorCoroutine(
61 pingChannel: SendChannel<PingPongActorBenchmark.Letter>,
62 capacity: Int = 1
63 ) =
64 actor<PingPongActorBenchmark.Letter>(capacity = capacity) {
65 var initiator: SendChannel<PingPongActorBenchmark.Letter>? = null
66 for (letter in channel) with(letter) {
67 when (message) {
68 is Start -> {
69 initiator = sender
70 pingChannel.send(PingPongActorBenchmark.Letter(Ball(0), channel))
71 }
72 is Ball -> {
73 pingChannel.send(PingPongActorBenchmark.Letter(Ball(message.count + 1), channel))
74 }
75 is Stop -> {
76 initiator!!.send(PingPongActorBenchmark.Letter(Stop(), channel))
77 return@actor
78 }
79 else -> error("Cannot happen $message")
80 }
81 }
82 }
83
pongActorCoroutinenull84 fun CoroutineScope.pongActorCoroutine(capacity: Int = 1) =
85 actor<PingPongActorBenchmark.Letter>(capacity = capacity) {
86 for (letter in channel) with(letter) {
87 when (message) {
88 is Ball -> {
89 if (message.count >= N_MESSAGES) {
90 sender.send(PingPongActorBenchmark.Letter(Stop(), channel))
91 return@actor
92 } else {
93 sender.send(PingPongActorBenchmark.Letter(Ball(message.count + 1), channel))
94 }
95 }
96 else -> error("Cannot happen $message")
97 }
98 }
99 }
100