package benchmarks.scheduler.actors

import benchmarks.*
import benchmarks.akka.*
import benchmarks.scheduler.actors.PingPongActorBenchmark.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import org.openjdk.jmh.annotations.*
import java.util.concurrent.*

/*
 * CORES_COUNT actors are chained into a single cycle; each actor passes the message on to the next one
 * after processing it with its own private state.
 *
 * Benchmark                              (actorStateSize)  (dispatcher)  Mode  Cnt     Score     Error  Units
 * CycledActorsBenchmark.cycledActors                    1           fjp  avgt   14    22.751 ±   1.351  ms/op
 * CycledActorsBenchmark.cycledActors                    1         ftp_1  avgt   14     4.535 ±   0.076  ms/op
 * CycledActorsBenchmark.cycledActors                    1  experimental  avgt   14     6.728 ±   0.048  ms/op
 *
 * CycledActorsBenchmark.cycledActors                 1024           fjp  avgt   14    43.725 ±  14.393  ms/op
 * CycledActorsBenchmark.cycledActors                 1024         ftp_1  avgt   14    13.827 ±   1.554  ms/op
 * CycledActorsBenchmark.cycledActors                 1024  experimental  avgt   14    23.823 ±   1.643  ms/op
 *
 * CycledActorsBenchmark.cycledActors               262144           fjp  avgt   14  1885.708 ± 532.634  ms/op
 * CycledActorsBenchmark.cycledActors               262144         ftp_1  avgt   14  1394.997 ± 101.938  ms/op
 * CycledActorsBenchmark.cycledActors               262144  experimental  avgt   14  1804.146 ±  57.275  ms/op
 *
 * NOTE(review): the table lists the dispatcher "experimental" and the state size 262144, neither of which
 * is in the current @Param sets below -- presumably recorded with an earlier configuration; confirm.
 */
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Fork(value = 1)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
open class CycledActorsBenchmark : ParametrizedDispatcherBase() {

    companion object {
        // Placeholder sender for Ball/Stop letters: only the sender of the Start letter is ever read.
        val NO_CHANNEL = Channel<Letter>(0)
    }

    @Param("fjp", "ftp_1", "scheduler")
    override var dispatcher: String = "fjp"

    // Number of Long coefficients every actor keeps as private state and folds into each ball.
    @Param("1", "1024")
    var actorStateSize = 1

    /**
     * Builds the actor cycle, kicks it off and blocks until every one of the CORES_COUNT actors
     * has reported on the stop channel.
     */
    @Benchmark
    fun cycledActors() = runBlocking {
        // Buffered with CORES_COUNT slots, i.e. one slot per expected stop signal.
        val stopChannel = Channel<Unit>(CORES_COUNT)
        runCycle(stopChannel)
        repeat(CORES_COUNT) {
            stopChannel.receive()
        }
    }

    /**
     * Creates the trailing actor plus CORES_COUNT - 1 counting actors, each one sending to the actor
     * created just before it, and closes the ring by handing the most recently created actor to the
     * trailing actor inside the Start letter.
     *
     * NOTE(review): when CORES_COUNT is 1 no counting actor is created, so the trailing actor would
     * forward the ball to itself and never see a Stop -- confirm CORES_COUNT > 1 is guaranteed.
     */
    private suspend fun runCycle(stopChannel: Channel<Unit>) {
        val trailingActor = lastActor(stopChannel)

        var previous = trailingActor
        for (i in 1 until CORES_COUNT) {
            previous = createActor(previous, stopChannel)
        }

        trailingActor.send(Letter(Start(), previous))
    }

    /**
     * The actor that closes the ring. It learns its successor from the sender of the Start letter,
     * injects the first ball, relays every following ball and terminates on Stop after signalling
     * [stopChannel].
     */
    private fun lastActor(stopChannel: Channel<Unit>) = actor<Letter>(capacity = 1024) {
        var nextChannel: SendChannel<Letter>? = null
        val state = LongArray(actorStateSize) { ThreadLocalRandom.current().nextLong(1024) }

        for (letter in channel) with(letter) {
            when (message) {
                is Start -> {
                    nextChannel = sender
                    sender.send(Letter(Ball(ThreadLocalRandom.current().nextInt(1, 100)), NO_CHANNEL))
                }
                is Ball -> {
                    // The successor is only known once Start has been processed.
                    val next = checkNotNull(nextChannel) { "Ball received before Start" }
                    next.send(Letter(Ball(transmogrify(message.count, state)), NO_CHANNEL))
                }
                is Stop -> {
                    stopChannel.send(Unit)
                    return@actor
                }
                else -> error("Can't happen")
            }
        }
    }

    /**
     * A counting actor of the ring. It relays balls to [nextActor] until it has received more than
     * 1000 of them, then replaces the ball with a Stop letter. A Stop letter coming from upstream is
     * forwarded as well. In both cases the actor signals [stopChannel] and terminates.
     */
    private fun createActor(nextActor: SendChannel<Letter>, stopChannel: Channel<Unit>) = actor<Letter>(capacity = 1024) {
        var received = 0
        val state = LongArray(actorStateSize) { ThreadLocalRandom.current().nextLong(1024) }

        for (letter in channel) with(letter) {
            when (message) {
                is Ball -> {
                    if (++received > 1_000) {
                        nextActor.send(Letter(Stop(), NO_CHANNEL))
                        stopChannel.send(Unit)
                        return@actor
                    } else {
                        nextActor.send(Letter(Ball(transmogrify(message.count, state)), NO_CHANNEL))
                    }
                }
                is Stop -> {
                    nextActor.send(Letter(Stop(), NO_CHANNEL))
                    stopChannel.send(Unit)
                    // Finish here: the upstream actor has already terminated, so without this return
                    // the actor would stay suspended on its empty mailbox forever and be leaked on
                    // every benchmark invocation.
                    return@actor
                }
                else -> error("Can't happen")
            }
        }
    }

    /**
     * Combines [value] with the actor state: sums `coefficient * value` over all [coefficients] in a
     * Long accumulator and returns the sum converted with `toInt()`.
     */
    private fun transmogrify(value: Int, coefficients: LongArray): Int {
        var result = 0L
        for (coefficient in coefficients) {
            result += coefficient * value
        }

        return result.toInt()
    }
}