-
Notifications
You must be signed in to change notification settings - Fork 65
Thread parking for Kotlin/Common #498
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
atomicfu/src/jvmMain/kotlin/kotlinx/atomicfu/parking/KThread.kt
Outdated
Show resolved
Hide resolved
atomicfu/src/jvmMain/kotlin/kotlinx/atomicfu/parking/JvmParkingDelegator.kt
Outdated
Show resolved
Hide resolved
atomicfu/src/jvmMain/kotlin/kotlinx/atomicfu/parking/JvmParkingDelegator.kt
Outdated
Show resolved
Hide resolved
atomicfu/src/androidNative32BitMain/kotlin/kotlinx/atomicfu/parking/PosixParkingDelegator.kt
Outdated
Show resolved
Hide resolved
atomicfu/src/commonMain/kotlin/kotlinx/atomicfu/parking/ParkingDelegator.kt
Outdated
Show resolved
Hide resolved
atomicfu/src/jvmMain/kotlin/kotlinx/atomicfu/parking/JvmParkingDelegator.kt
Outdated
Show resolved
Hide resolved
atomicfu/src/androidNative32BitMain/kotlin/kotlinx/atomicfu/parking/PosixParkingDelegator.kt
Outdated
Show resolved
Hide resolved
atomicfu/src/androidNative64BitMain/kotlin/kotlinx/atomicfu/parking/PosixParkingDelegator.kt
Outdated
Show resolved
Hide resolved
atomicfu/src/androidNative32BitMain/kotlin/kotlinx/atomicfu/parking/PosixParkingDelegator.kt
Outdated
Show resolved
Hide resolved
atomicfu/src/androidNative32BitMain/kotlin/kotlinx/atomicfu/parking/PosixParkingDelegator.kt
Outdated
Show resolved
Hide resolved
atomicfu/src/jsAndWasmSharedMain/kotlin/kotlinx/atomicfu/parking/KThread.kt
Outdated
Show resolved
Hide resolved
… Commonizing is difficult due to absence of common threading api.
…arking behaviour on jvm and native.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The implementation is almost ready to go, I believe. Good work! Only some documentation fixes are left, and there's maybe potential to simplify the code.
I haven't looked at the tests yet, though.
atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/ParkingSupport.kt
Outdated
Show resolved
Hide resolved
atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/ParkingSupport.kt
Outdated
Show resolved
Hide resolved
if (timout == Duration.INFINITE) threadLocalParkingHandle.parker.park() | ||
else threadLocalParkingHandle.parker.parkNanos(timout.toLong(DurationUnit.NANOSECONDS)) | ||
} | ||
actual fun parkUntil(deadline: TimeMark) = park(deadline.elapsedNow() * -1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question: can the code be simplified if we take parkUntil
as the fundamental operation and then implement parkNanos
on top of it? The idea is that POSIX exposes something that's similar to parkUntil
already, so waitWithDeadline
may be easier to write than timedWait
, and LockSupport.parkUntil
may be somewhat simpler than LockSupport.parkNanos
(due to the simpler "is it time to exit yet?" check).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AFAIK, a TimeMark
is not directly convertible to nanos from epoch. The only way I have found so far is by somehow getting the current time from epoch and subtracting TimeMark.elapsedNow
(which is converting to duration.)
Additionally, to get current time in nanos from epoch (on native) we either need kotlinx-datetime
which doesn't seem to be more granular than milliseconds. Or rely on clock_gettime
from POSIX and convert TimeMark to duration, which is what we currently do.
But I might be missing some api?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right, TimeMark
does not expose its relationship to absolute time.
kotlin.time.Instant
(which used to be in kotlinx-datetime
) does provide nanosecond precision, and in fact, delegates to clock_gettime
. So, with Kotlin 2.1.20, which introduced Instant
, it's possible to do this (pseudocode):
fun parkUntil(deadline: TimeMark) {
val estimatedTargetTime: Instant = Clock.System.now() - deadline.elapsedNow()
sleepUntil(instant.epochSeconds, instant.nanosecondsOfSecond)
}
The problem is, we're not on Kotlin 2.1.20, so we indeed don't have access to Instant
.
But! This gets me thinking: why do we need an Instant
at all? It's semantically wrong. In fact, we may have a subtle bug.
See, CLOCK_REALTIME
(https://man7.org/linux/man-pages/man2/clock_gettime.2.html), kotlin.time.Instant
, and by default, pthread_cond_timedwait
(https://docs.oracle.com/cd/E19120-01/open.solaris/816-5137/gfvid/index.html) all have to do with system time, that is, what your clock in the corner of the screen says. This clock may get out of sync, and then get back in sync when your system queries the Internet for the up-to-date time.
It seems like the following may happen:
- At 12:13:59, we park for three seconds. We are waiting until 12:14:02.
- One second later, at 12:14:00, we query the Internet and see that our clock is too fast and is into the future by two seconds.
- One second later, it's 12:13:59 again.
- Then, three seconds later, it's finally 12:14:02... but five seconds have passed.
TimeSource.Monotonic
, though, works with elapsed time, which is what we want.
Proposed fix: https://stackoverflow.com/a/14397906. With this, we don't need to extract epoch time at all.
What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is quite the edge case! However, the fix proposed relies on pthread_condattr_setclock
which is not available on darwin.
Also, I am wondering, is the kotlin TimeSource.Monotonic
synced to the posix CLOCK_MONOTONIC
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is quite the edge case!
Indeed, but I'd estimate that every day, someone somewhere encounters this. Having clocks drift by a couple of seconds per day is typical.
not available on darwin.
Oh, ok. If you also can't think of a way to fix this there, we need to at least document that on Native, the system clock is used and is susceptible to such issues.
Also, I am wondering, is the kotlin
TimeSource.Monotonic
synced to the posixCLOCK_MONOTONIC
?
Kotlin/Native delegates to C++'s std::conditional<high_resolution_clock::is_steady, high_resolution_clock, steady_clock>
. I don't know how our C++ standard library implements those, but it's not necessarily a wrapper around POSIX.
atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/ParkingSupport.kt
Outdated
Show resolved
Hide resolved
atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/BarrierTest.kt
Outdated
Show resolved
Hide resolved
val threadSetSize = (iteration + 1) * 5 | ||
val bar = CyclicBarrier(threadSetSize) | ||
val syncBar = CyclicBarrier(threadSetSize * 5) | ||
val ar = Arrs(threadSetSize * 5) | ||
repeat(threadSetSize * 5) { tId -> | ||
val t = Fut { | ||
repeat(50) { internalIteration -> | ||
sleepMills(Random.nextLong(100)) | ||
bar.await() | ||
sleepMills(Random.nextLong(100)) | ||
val newN = ar.before[tId].value!! + 1 | ||
ar.before[tId].value = newN | ||
syncBar.await() | ||
repeat(ar.before.size) { otherThread -> | ||
if (ar.before[otherThread].value!! < newN) { | ||
fail("Thread $tId (value: $newN, id: ${ParkingSupport.currentThreadHandle()}) continued too early: $otherThread had value ${ar.before[otherThread].value!!}") | ||
} | ||
if (ar.before[otherThread].value!! > newN + 1) { | ||
fail("Thread $tId (value: $newN, id: ${ParkingSupport.currentThreadHandle()}) too far behind: $otherThread had value ${ar.before[otherThread].value!!}") | ||
} | ||
} | ||
} | ||
} | ||
threads.add(t) | ||
} | ||
Fut.waitAllAndThrow(threads) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are quite a few constants mentioned here: 5
, 50
, 100
. Could you give them clear names? This goes for other tests, too.
repeat(threadSetSize * 5) { tId -> | ||
val t = Fut { | ||
repeat(50) { internalIteration -> | ||
sleepMills(Random.nextLong(100)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In total, these sleepMills
affect test execution time considerably. Are they necessary?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
They are not necessary for the test to pass no. However, I believe that adding random waits gives us more variantions in thread arrival time at the barrier relative to each other. Which could increase the chance of catching bugs.
But 100 milliseconds might be a bit excessive, I have decreased it to 5 wdyt?
ParkingSupport.park(Duration.INFINITE) | ||
} | ||
|
||
assertTrue(time.inWholeMilliseconds > 900) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A more reliable (and still less time-consuming) way to test this would be to spin for several thousands of times and see if park
ever finishes earlier than the corresponding unpark
(basically, that happens-before is established between the code before unpark
and the code after park
).
Also, can't this test be flaky due to spurious wakeups? The thread being parked should at least be a separate one and not the test thread, I believe; otherwise, the tests can interfere with one another.
atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/TestThread.kt
Outdated
Show resolved
Hide resolved
atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/ThreadParkerTest.kt
Outdated
Show resolved
Hide resolved
- in ThreadParkerTest.kt move parking behaviour to sub thread to prevent potential interference with other tests. - Create consts for numbers - Removed simple cyclic barrier test (as this should be covered byt the stress variant) - Decreased wait times
atomicfu/src/androidNativeMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt
Outdated
Show resolved
Hide resolved
atomicfu/src/androidNativeMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt
Outdated
Show resolved
Hide resolved
atomicfu/src/mingwMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt
Outdated
Show resolved
Hide resolved
atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/NativeParkingUtils.kt
Outdated
Show resolved
Hide resolved
actual fun createRef(): ParkingData { | ||
val mut = nativeHeap.alloc<pthread_mutex_t>().ptr | ||
val cond = nativeHeap.alloc<pthread_cond_t>().ptr | ||
val attr = nativeHeap.alloc<pthread_condattr_tVar>().ptr |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: this is irritatingly close to pthread_condattr_t
, which is used by the other Unix-like platforms, but after several attempts, I did not manage to find an incantation that would allow avoiding the code duplication for androidNativeMain
.
atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/ExchangerTest.kt
Outdated
Show resolved
Hide resolved
atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/ThreadParkerTest.kt
Outdated
Show resolved
Hide resolved
atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/ThreadParkerTest.kt
Outdated
Show resolved
Hide resolved
atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/ThreadParkerTest.kt
Outdated
Show resolved
Hide resolved
atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/TimedParkingTest.kt
Outdated
Show resolved
Hide resolved
- move destruction of condattr to `createRef` - common callAndVerify + happy paths - timout typos
… redundant null checks, refactor waitAllAndThrow
Please update the target branch to |
There are small formatting issues here and there (like trailing spaces), please run the "Code/Reformat Code" in IJ (you can perform a bulk cleanup by selecting all the necessary files and then running the reformatting). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nicely done! I left a few comments regarding tests and documentation, but overall it looks good.
} | ||
} | ||
|
||
private interface ParkingState |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it make sense to make it sealed
?
* - (Only on JVM) The thread was interrupted. The interrupted flag stays set after wakeup. | ||
* A future call to [park] this thread will return immediately, unless the `Thread.interrupted` flag is cleared. | ||
*/ | ||
fun parkUntil(deadline: TimeMark) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are no tests for this function.
|
||
/** | ||
* Unparks the thread corresponding to [handle]. | ||
* If [unpark] is called while the corresponding thread is not parked, the next [park] call will return immediately |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please add a test covering this part of the contract?
fun unpark(handle: ParkingHandle) | ||
|
||
/** | ||
* Returns the [ParkingHandle] that can be used to [unpark] the current thread. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing parts:
- does this function return the same instance for the same thread on each call?
- is it safe to cache returned value?
- is it safe to share it with other threads and call
unpark
concurrently?
* Parking and unparking support for threads on Kotlin/Native and Kotlin/JVM. | ||
* Can be used as a building block to create locks and other synchronization primitives. | ||
* | ||
* A call to [ParkingSupport.park] or [ParkingSupport.parkUntil] will suspend the current thread. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might be worth using the word "block" instead of the "suspend" to avoid potential confusion with coroutines suspension.
internal fun Long.addNanosToSeconds(nanos: Long): Long { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
internal fun Long.addNanosToSeconds(nanos: Long): Long { | |
/** | |
* Adds nano seconds to current time in seconds. | |
*/ | |
internal fun Long.addNanosToSeconds(nanos: Long): Long { |
* Is used to unpark a thread. | ||
* Can be obtained by calling [ParkingSupport.currentThreadHandle]. | ||
* Is required by [ParkingSupport.unpark]. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe something like?
* Is used to unpark a thread. | |
* Can be obtained by calling [ParkingSupport.currentThreadHandle]. | |
* Is required by [ParkingSupport.unpark]. | |
* A handle allowing to unpark a thread of execution using [ParkingSupport.unpark]. | |
* There is a one-to-one mapping between threads and parking handles. | |
* A handle can be obtained by calling [ParkingSupport.currentThreadHandle]. | |
* Refer to [ParkingSupport] documentation for more details on how to use [ParkingHandle] and how parking works in general. |
class TimedParkingTest { | ||
|
||
@Test | ||
fun testNanosFirstUnpark400() = retry(3) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If test cases differ only by constants, maybe we can create a single function that accepts corresponding parameters and call it from each test method?
internal expect val clockId: Int | ||
internal expect fun setClock(attr: CPointer<pthread_condattr_t>): Int |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would recommend either using more specific names, or scoping the property and the function somehow. Right now they are too generic and are quite confusing because of that.
class TimedParkingTest { | ||
|
||
@Test | ||
fun testNanosFirstUnpark400() = retry(3) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reliability of these tests worries me. 100ms slack feels huge, but in practice we can accidentally exceed it and got a test failure (and I guess we're already doing that given the test failures on Windows).
For deadline-specific scenarios, I don't think we need to unpark a future from the main thread at all, and we should only verify that it was parked for at least timeout
-time-units.
For first-unpark-specific scenarios, we can't get away from verifying that the park
was actually "interrupted" by the unpark
call, but maybe we can increase the slack to a few minutes, instead of 100ms?
Allows to pause and resume thread execution. On native platforms it is based on pthread_cond_wait, on JVM it uses LockSupport.
Threads can be pre unparked (calling unpark before park cancels the parking operation). And thread can be parker with a timeout.
Suspend the current thread by calling:
Resume a thread by calling:
Get current thread reference by calling: