1 package benchmarks.scheduler.actors
2 
3 import benchmarks.*
4 import benchmarks.akka.*
5 import benchmarks.scheduler.actors.PingPongActorBenchmark.*
6 import kotlinx.coroutines.*
7 import kotlinx.coroutines.channels.*
8 import org.openjdk.jmh.annotations.*
9 import java.util.concurrent.*
10 
11 /*
 * CORES_COUNT actors are chained into a single cycle; each actor receives a message,
 * processes it using its private state and passes it on to the next actor.
13  *
14  * Benchmark                           (actorStateSize)  (dispatcher)  Mode  Cnt     Score     Error  Units
15  * CycledActorsBenchmark.cycledActors                 1           fjp  avgt   14    22.751 ±   1.351  ms/op
16  * CycledActorsBenchmark.cycledActors                 1         ftp_1  avgt   14     4.535 ±   0.076  ms/op
17  * CycledActorsBenchmark.cycledActors                 1  experimental  avgt   14     6.728 ±   0.048  ms/op
18  *
19  * CycledActorsBenchmark.cycledActors              1024           fjp  avgt   14    43.725 ±  14.393  ms/op
20  * CycledActorsBenchmark.cycledActors              1024         ftp_1  avgt   14    13.827 ±   1.554  ms/op
21  * CycledActorsBenchmark.cycledActors              1024  experimental  avgt   14    23.823 ±   1.643  ms/op
22  *
23  * CycledActorsBenchmark.cycledActors            262144           fjp  avgt   14  1885.708 ± 532.634  ms/op
24  * CycledActorsBenchmark.cycledActors            262144         ftp_1  avgt   14  1394.997 ± 101.938  ms/op
25  * CycledActorsBenchmark.cycledActors            262144  experimental  avgt   14  1804.146 ±  57.275  ms/op
26  */
/**
 * JMH benchmark in which `CORES_COUNT` actors are linked into a ring and pass a single `Ball` around it.
 *
 * One "trailing" actor (see [lastActor]) closes the ring; the remaining `CORES_COUNT - 1` actors
 * (see [createActor]) forward the ball to their successor after mixing it with their private state.
 * The first of those actors to receive more than 1000 balls sends `Stop` instead of forwarding the ball;
 * `Stop` is then relayed along the ring, and every actor reports once on the stop channel and finishes.
 */
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Fork(value = 1)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
open class CycledActorsBenchmark : ParametrizedDispatcherBase() {

    companion object {
        /** Placeholder sender (capacity 0) attached to letters whose `sender` is never read by the receiver. */
        val NO_CHANNEL = Channel<Letter>(0)
    }

    /**
     * Dispatcher selector supplied by JMH.
     *
     * NOTE(review): the values are presumably interpreted by ParametrizedDispatcherBase; confirm that
     * "scheduler" is a recognised name (the results table in the file header lists "experimental").
     */
    @Param("fjp", "ftp_1", "scheduler")
    override var dispatcher: String = "fjp"

    /** Number of `Long` coefficients held in each actor's private state array. */
    @Param("1", "1024")
    var actorStateSize = 1

    /**
     * Benchmark body: builds the ring, starts the ball and blocks until all `CORES_COUNT` actors
     * have reported on the stop channel.
     */
    @Benchmark
    fun cycledActors() = runBlocking {
        // Capacity equals the number of actors, so every completion signal fits in the buffer.
        val stopChannel = Channel<Unit>(CORES_COUNT)
        runCycle(stopChannel)
        repeat(CORES_COUNT) {
            stopChannel.receive()
        }
    }

    /**
     * Creates the ring of actors and injects the `Start` letter.
     *
     * Actors are created back to front: the trailing actor first, then each new actor is wired to send
     * to the one created before it. The last actor created is passed to the trailing actor as the
     * `Start` sender, which closes the ring.
     *
     * NOTE(review): with `CORES_COUNT == 1` the trailing actor is wired to itself and nothing ever
     * emits `Stop`, so the benchmark would not terminate; this assumes `CORES_COUNT >= 2`.
     *
     * @param stopChannel channel on which every actor reports its completion
     */
    private suspend fun runCycle(stopChannel: Channel<Unit>) {
        val trailingActor = lastActor(stopChannel)

        var head: SendChannel<Letter> = trailingActor
        repeat(CORES_COUNT - 1) {
            head = createActor(head, stopChannel)
        }

        trailingActor.send(Letter(Start(), head))
    }

    /**
     * Starts the actor that closes the ring.
     *
     * On `Start` it remembers the letter's sender as its successor and sends it the first ball (a random
     * count in `1 until 100`). On `Ball` it forwards a transformed ball to that successor. On `Stop` it
     * reports on [stopChannel] and finishes. Unlike [createActor], it does not count balls.
     *
     * @param stopChannel channel on which the actor reports its completion
     */
    private fun lastActor(stopChannel: Channel<Unit>) = actor<Letter>(capacity = 1024) {
        var nextChannel: SendChannel<Letter>? = null
        val state = newState()

        for (letter in channel) with(letter) {
            when (message) {
                is Start -> {
                    nextChannel = sender
                    sender.send(Letter(Ball(ThreadLocalRandom.current().nextInt(1, 100)), NO_CHANNEL))
                }
                is Ball -> {
                    // The successor is only known after Start; a Ball before that is a protocol violation.
                    val next = checkNotNull(nextChannel) { "Ball received before Start" }
                    next.send(Letter(Ball(transmogrify(message.count, state)), NO_CHANNEL))
                }
                is Stop -> {
                    stopChannel.send(Unit)
                    return@actor
                }
                else -> error("Can't happen")
            }
        }
    }

    /**
     * Starts a regular ring actor that forwards to [nextActor].
     *
     * Each received `Ball` increments a counter. While the counter is at most 1000 the ball is transformed
     * and forwarded; on the next ball the actor sends `Stop` to [nextActor] instead, reports on
     * [stopChannel] and finishes. A received `Stop` is relayed to [nextActor], reported on [stopChannel],
     * and also finishes the actor.
     *
     * @param nextActor   mailbox of the successor in the ring
     * @param stopChannel channel on which the actor reports its completion
     */
    private fun createActor(nextActor: SendChannel<Letter>, stopChannel: Channel<Unit>) = actor<Letter>(capacity = 1024) {
        var received = 0
        val state = newState()

        for (letter in channel) with(letter) {
            when (message) {
                is Ball -> {
                    if (++received > 1_000) {
                        nextActor.send(Letter(Stop(), NO_CHANNEL))
                        stopChannel.send(Unit)
                        return@actor
                    } else {
                        nextActor.send(Letter(Ball(transmogrify(message.count, state)), NO_CHANNEL))
                    }
                }
                is Stop -> {
                    nextActor.send(Letter(Stop(), NO_CHANNEL))
                    stopChannel.send(Unit)
                    // The predecessor finishes right after relaying Stop, so no further letters can
                    // arrive; finish here rather than staying suspended on an empty mailbox forever.
                    return@actor
                }
                else -> error("Can't happen")
            }
        }
    }

    /** Allocates an actor's private state: [actorStateSize] random coefficients in `0 until 1024`. */
    private fun newState(): LongArray =
        LongArray(actorStateSize) { ThreadLocalRandom.current().nextLong(1024) }

    /**
     * Mixes [value] with an actor's private state.
     *
     * @param value        count carried by the incoming ball
     * @param coefficients the actor's private state
     * @return the sum of `coefficient * value` over all coefficients, accumulated as a `Long`
     *         and narrowed to `Int` with `toInt()`
     */
    private fun transmogrify(value: Int, coefficients: LongArray): Int {
        var result = 0L
        for (coefficient in coefficients) {
            result += coefficient * value
        }

        return result.toInt()
    }
}
120