Thread parking for Kotlin/Common #498

Open: wants to merge 61 commits into base: master

bbrockbernd
Collaborator

@bbrockbernd bbrockbernd commented Dec 19, 2024

Allows pausing and resuming thread execution. On native platforms it is based on pthread_cond_wait; on JVM it uses LockSupport.

Threads can be pre-unparked (calling unpark before park cancels the parking operation), and a thread can be parked with a timeout.

Suspend the current thread by calling:

ParkingSupport.park(timeout: kotlin.time.Duration)
// or
ParkingSupport.parkUntil(deadline: TimeMark)
// or park without timeout
ParkingSupport.park(kotlin.time.Duration.INFINITE)

Resume a thread by calling:

ParkingSupport.unpark(handle: ParkingHandle)

Get current thread reference by calling:

val handle = ParkingSupport.currentThreadHandle()
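
The pre-unpark behaviour described above can be illustrated on the JVM with plain `java.util.concurrent.locks.LockSupport`, which the description names as the JVM backend. This is only a sketch of the semantics, not the PR's code; the function name `preUnparkedParkReturnsPromptly` is made up for illustration.

```kotlin
import java.util.concurrent.locks.LockSupport

// Hypothetical helper: grants the current thread a permit first, then parks.
// Because the permit is already available, the park call should return
// promptly instead of waiting for the full two-second timeout.
fun preUnparkedParkReturnsPromptly(): Boolean {
    LockSupport.unpark(Thread.currentThread())   // "pre-unpark"
    val startNanos = System.nanoTime()
    LockSupport.parkNanos(2_000_000_000L)        // consumes the permit
    val elapsedMillis = (System.nanoTime() - startNanos) / 1_000_000
    return elapsedMillis < 1_000
}
```

A timed park is used here so that the sketch cannot hang even if the permit were somehow lost.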

@bbrockbernd bbrockbernd marked this pull request as ready for review December 19, 2024 11:10
@dkhalanskyjb dkhalanskyjb left a comment

The implementation is almost ready to go, I believe. Good work! Only some documentation fixes are left, and there's maybe potential to simplify the code.

I haven't looked at the tests yet, though.

if (timout == Duration.INFINITE) threadLocalParkingHandle.parker.park()
else threadLocalParkingHandle.parker.parkNanos(timout.toLong(DurationUnit.NANOSECONDS))
}
actual fun parkUntil(deadline: TimeMark) = park(deadline.elapsedNow() * -1)

Question: can the code be simplified if we take parkUntil as the fundamental operation and then implement parkNanos on top of it? The idea is that POSIX exposes something that's similar to parkUntil already, so waitWithDeadline may be easier to write than timedWait, and LockSupport.parkUntil may be somewhat simpler than LockSupport.parkNanos (due to the simpler "is it time to exit yet?" check).
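
One way to picture this layering on the JVM is sketched below. It is an assumption-laden illustration, not the PR's implementation: `DeadlineParker` is a hypothetical name, and the deadline is a `TimeMark` that is turned back into a remaining duration for `LockSupport.parkNanos`, because a `TimeMark` cannot be passed to `LockSupport.parkUntil` (which expects epoch milliseconds).

```kotlin
import java.util.concurrent.locks.LockSupport
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.TimeMark
import kotlin.time.TimeSource

// Hypothetical object, used only to show the layering.
object DeadlineParker {
    // Fundamental operation: returns when the deadline has passed, when the
    // thread is unparked, or on a spurious wakeup.
    fun parkUntil(deadline: TimeMark) {
        val remaining = -deadline.elapsedNow()
        if (remaining.isPositive()) {
            LockSupport.parkNanos(remaining.inWholeNanoseconds)
        }
    }

    // Derived operation: a timeout is a deadline relative to "now".
    fun park(timeout: Duration) {
        when {
            timeout == Duration.INFINITE -> LockSupport.park()
            !timeout.isPositive() -> return
            else -> parkUntil(TimeSource.Monotonic.markNow() + timeout)
        }
    }
}

// Example call: parks the current thread for at most 20 ms.
fun parkBriefly() = DeadlineParker.park(20.milliseconds)
```

The infinite timeout is handled separately so that no arithmetic on an infinite duration is needed.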

Collaborator Author

AFAIK, a TimeMark is not directly convertible to nanoseconds since the epoch. The only way I have found so far is to get the current time since the epoch and subtract TimeMark.elapsedNow() (which means converting to a Duration).

Additionally, to get the current time in nanoseconds since the epoch (on native), we either need kotlinx-datetime, which doesn't seem to be more granular than milliseconds, or we rely on clock_gettime from POSIX and convert the TimeMark to a duration, which is what we currently do.

But I might be missing some API?

You're right, TimeMark does not expose its relationship to absolute time.

kotlin.time.Instant (which used to be in kotlinx-datetime) does provide nanosecond precision, and in fact, delegates to clock_gettime. So, with Kotlin 2.1.20, which introduced Instant, it's possible to do this (pseudocode):

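fun parkUntil(deadline: TimeMark) {
  // elapsedNow() is negative for a deadline in the future, so subtracting it moves forward in time
  val estimatedTargetTime: Instant = Clock.System.now() - deadline.elapsedNow()
  sleepUntil(estimatedTargetTime.epochSeconds, estimatedTargetTime.nanosecondsOfSecond)
}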

The problem is, we're not on Kotlin 2.1.20, so we indeed don't have access to Instant.

But! This gets me thinking: why do we need an Instant at all? It's semantically wrong. In fact, we may have a subtle bug.

See, CLOCK_REALTIME (https://man7.org/linux/man-pages/man2/clock_gettime.2.html), kotlin.time.Instant, and by default, pthread_cond_timedwait (https://docs.oracle.com/cd/E19120-01/open.solaris/816-5137/gfvid/index.html) all have to do with system time, that is, what your clock in the corner of the screen says. This clock may get out of sync, and then get back in sync when your system queries the Internet for the up-to-date time.

It seems like the following may happen:

  • At 12:13:59, we park for three seconds. We are waiting until 12:14:02.
  • One second later, at 12:14:00, we query the Internet and see that our clock is two seconds fast, so it is set back to 12:13:58.
  • One second later, it's 12:13:59 again.
  • Then, three seconds later, it's finally 12:14:02... but five seconds have passed.

TimeSource.Monotonic, though, works with elapsed time, which is what we want.
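
The arithmetic of this scenario can be replayed with a tiny simulation. This is a toy model written for illustration only (the function name and the one-second tick granularity are assumptions): it counts how many real seconds pass before a wall clock reaches a deadline when that clock is set back once along the way.

```kotlin
// Toy model: wall-clock values are seconds relative to an arbitrary origin.
// After `adjustAfter` real seconds, the wall clock is set back by `setBack`
// seconds. Returns the number of real seconds that pass before the wall
// clock shows `deadlineWall`.
fun realSecondsUntilWallDeadline(
    startWall: Int,
    deadlineWall: Int,
    adjustAfter: Int,
    setBack: Int,
): Int {
    var wall = startWall
    var real = 0
    while (wall < deadlineWall) {
        real++
        wall++
        if (real == adjustAfter) wall -= setBack
    }
    return real
}
```

With 12:13:59 as second 0 and the deadline at second 3, `realSecondsUntilWallDeadline(0, 3, 1, 2)` yields 5, matching the five seconds in the list above, while the same call with `setBack = 0` yields the intended 3.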

Proposed fix: https://stackoverflow.com/a/14397906. With this, we don't need to extract epoch time at all.

What do you think?

Collaborator Author

That is quite the edge case! However, the fix proposed relies on pthread_condattr_setclock which is not available on darwin.

Also, I am wondering, is the Kotlin TimeSource.Monotonic synced to the POSIX CLOCK_MONOTONIC?

> That is quite the edge case!

Indeed, but I'd estimate that every day, someone somewhere encounters this. Having clocks drift by a couple of seconds per day is typical.

> not available on darwin.

Oh, ok. If you also can't think of a way to fix this there, we need to at least document that on Native, the system clock is used and is susceptible to such issues.

> Also, I am wondering, is the Kotlin TimeSource.Monotonic synced to the POSIX CLOCK_MONOTONIC?

Kotlin/Native delegates to C++'s std::conditional<high_resolution_clock::is_steady, high_resolution_clock, steady_clock>. I don't know how our C++ standard library implements those, but it's not necessarily a wrapper around POSIX.

@dkhalanskyjb dkhalanskyjb self-requested a review April 3, 2025 10:42
@bbrockbernd bbrockbernd requested a review from dkhalanskyjb April 9, 2025 12:33
@dkhalanskyjb dkhalanskyjb requested a review from fzhinkin April 14, 2025 11:09
Comment on lines 57 to 82
val threadSetSize = (iteration + 1) * 5
val bar = CyclicBarrier(threadSetSize)
val syncBar = CyclicBarrier(threadSetSize * 5)
val ar = Arrs(threadSetSize * 5)
repeat(threadSetSize * 5) { tId ->
    val t = Fut {
        repeat(50) { internalIteration ->
            sleepMills(Random.nextLong(100))
            bar.await()
            sleepMills(Random.nextLong(100))
            val newN = ar.before[tId].value!! + 1
            ar.before[tId].value = newN
            syncBar.await()
            repeat(ar.before.size) { otherThread ->
                if (ar.before[otherThread].value!! < newN) {
                    fail("Thread $tId (value: $newN, id: ${ParkingSupport.currentThreadHandle()}) continued too early: $otherThread had value ${ar.before[otherThread].value!!}")
                }
                if (ar.before[otherThread].value!! > newN + 1) {
                    fail("Thread $tId (value: $newN, id: ${ParkingSupport.currentThreadHandle()}) too far behind: $otherThread had value ${ar.before[otherThread].value!!}")
                }
            }
        }
    }
    threads.add(t)
}
Fut.waitAllAndThrow(threads)

There are quite a few constants mentioned here: 5, 50, 100. Could you give them clear names? This goes for other tests, too.

repeat(threadSetSize * 5) { tId ->
val t = Fut {
repeat(50) { internalIteration ->
sleepMills(Random.nextLong(100))

@dkhalanskyjb dkhalanskyjb Apr 14, 2025

In total, these sleepMills affect test execution time considerably. Are they necessary?

Collaborator Author

No, they are not necessary for the test to pass. However, I believe that adding random waits gives us more variation in the threads' arrival times at the barrier relative to each other, which could increase the chance of catching bugs.

But 100 milliseconds might be a bit excessive. I have decreased it to 5, wdyt?

ParkingSupport.park(Duration.INFINITE)
}

assertTrue(time.inWholeMilliseconds > 900)

@dkhalanskyjb dkhalanskyjb Apr 14, 2025

A more reliable (and still less time-consuming) way to test this would be to spin for several thousands of times and see if park ever finishes earlier than the corresponding unpark (basically, that happens-before is established between the code before unpark and the code after park).

Also, can't this test be flaky due to spurious wakeups? The thread being parked should at least be a separate one and not the test thread, I believe; otherwise, the tests can interfere with one another.
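
A rough sketch of such a spinning check is shown below, written against plain `LockSupport` on the JVM rather than the PR's `ParkingSupport`; the function name `countEarlyReturns` is hypothetical. Each iteration parks a fresh helper thread (not the test thread) and records whether `park` returned before the matching `unpark` had been issued. Since `LockSupport.park` is allowed to return spuriously, the sketch counts early returns instead of failing on the first one.

```kotlin
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.LockSupport
import kotlin.concurrent.thread

// Hypothetical helper: returns how many times park() came back before the
// main thread had set the flag that immediately precedes its unpark() call.
fun countEarlyReturns(iterations: Int): Int {
    var earlyReturns = 0
    repeat(iterations) {
        val unparkIssued = AtomicBoolean(false)
        val sawUnpark = AtomicBoolean(false)
        val waiter = thread {
            LockSupport.park()
            // Record what was visible right after park() returned.
            sawUnpark.set(unparkIssued.get())
        }
        unparkIssued.set(true)       // set strictly before unpark
        LockSupport.unpark(waiter)   // if the waiter has not parked yet, the permit is kept
        waiter.join()
        if (!sawUnpark.get()) earlyReturns++
    }
    return earlyReturns
}
```

How many early returns are acceptable depends on whether the API under test documents spurious wakeups.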

- in ThreadParkerTest.kt move parking behaviour to sub thread to prevent potential interference with other tests.
- Create consts for numbers
- Removed simple cyclic barrier test (as this should be covered by the stress variant)
- Decreased wait times
actual fun createRef(): ParkingData {
val mut = nativeHeap.alloc<pthread_mutex_t>().ptr
val cond = nativeHeap.alloc<pthread_cond_t>().ptr
val attr = nativeHeap.alloc<pthread_condattr_tVar>().ptr

Note: this is irritatingly close to pthread_condattr_t, which is used by the other Unix-like platforms, but after several attempts, I did not manage to find an incantation that would allow avoiding the code duplication for androidNativeMain.

- move destruction of condattr to `createRef`
- common callAndVerify + happy paths
- timout typos
… redundant null checks, refactor waitAllAndThrow
@fzhinkin
Collaborator

Please update the target branch to develop.

@fzhinkin
Collaborator

There are small formatting issues here and there (like trailing spaces), please run the "Code/Reformat Code" in IJ (you can perform a bulk cleanup by selecting all the necessary files and then running the reformatting).

@fzhinkin fzhinkin left a comment

Nicely done! I left a few comments regarding tests and documentation, but overall it looks good.

}
}

private interface ParkingState

Does it make sense to make it sealed?

* - (Only on JVM) The thread was interrupted. The interrupted flag stays set after wakeup.
* A future call to [park] this thread will return immediately, unless the `Thread.interrupted` flag is cleared.
*/
fun parkUntil(deadline: TimeMark)

There are no tests for this function.


/**
* Unparks the thread corresponding to [handle].
* If [unpark] is called while the corresponding thread is not parked, the next [park] call will return immediately

Could you please add a test covering this part of the contract?

fun unpark(handle: ParkingHandle)

/**
* Returns the [ParkingHandle] that can be used to [unpark] the current thread.

Missing parts:

  • does this function return the same instance for the same thread on each call?
  • is it safe to cache returned value?
  • is it safe to share it with other threads and call unpark concurrently?

* Parking and unparking support for threads on Kotlin/Native and Kotlin/JVM.
* Can be used as a building block to create locks and other synchronization primitives.
*
* A call to [ParkingSupport.park] or [ParkingSupport.parkUntil] will suspend the current thread.

It might be worth using the word "block" instead of "suspend" to avoid potential confusion with coroutine suspension.

Comment on lines +27 to +28
internal fun Long.addNanosToSeconds(nanos: Long): Long {

Suggested change
- internal fun Long.addNanosToSeconds(nanos: Long): Long {
+ /**
+  * Adds nano seconds to current time in seconds.
+  */
+ internal fun Long.addNanosToSeconds(nanos: Long): Long {

Comment on lines +80 to +82
* Is used to unpark a thread.
* Can be obtained by calling [ParkingSupport.currentThreadHandle].
* Is required by [ParkingSupport.unpark].

Maybe something like?

Suggested change
- * Is used to unpark a thread.
- * Can be obtained by calling [ParkingSupport.currentThreadHandle].
- * Is required by [ParkingSupport.unpark].
+ * A handle that allows unparking a thread of execution using [ParkingSupport.unpark].
+ * There is a one-to-one mapping between threads and parking handles.
+ * A handle can be obtained by calling [ParkingSupport.currentThreadHandle].
+ * Refer to [ParkingSupport] documentation for more details on how to use [ParkingHandle] and how parking works in general.

class TimedParkingTest {

@Test
fun testNanosFirstUnpark400() = retry(3) {

If test cases differ only by constants, maybe we can create a single function that accepts corresponding parameters and call it from each test method?

Comment on lines +65 to +66
internal expect val clockId: Int
internal expect fun setClock(attr: CPointer<pthread_condattr_t>): Int

I would recommend either using more specific names, or scoping the property and the function somehow. Right now they are too generic and are quite confusing because of that.

class TimedParkingTest {

@Test
fun testNanosFirstUnpark400() = retry(3) {

The reliability of these tests worries me. 100ms of slack feels huge, but in practice we can accidentally exceed it and get a test failure (and I guess we're already doing that, given the test failures on Windows).

For deadline-specific scenarios, I don't think we need to unpark a future from the main thread at all, and we should only verify that it was parked for at least timeout-time-units.

For first-unpark-specific scenarios, we can't get away from verifying that the park was actually "interrupted" by the unpark call, but maybe we can increase the slack to a few minutes, instead of 100ms?
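
As a sketch of what a parameterised, slack-tolerant helper could look like, the code below uses plain `LockSupport` on the JVM instead of the PR's `ParkingSupport`. The name `measurePark` and all timings are assumptions chosen for illustration. The helper parks a separate thread with a given timeout, unparks it after a given delay, and reports how long the thread stayed parked, so each scenario becomes a call with different arguments.

```kotlin
import java.util.concurrent.locks.LockSupport
import kotlin.concurrent.thread
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds
import kotlin.time.TimeSource

// Hypothetical helper: parks a worker thread for at most `parkTimeout`,
// unparks it from the calling thread after `unparkDelay`, and returns the
// time the worker actually spent inside the park call.
fun measurePark(parkTimeout: Duration, unparkDelay: Duration): Duration {
    var parkedFor = Duration.ZERO
    val worker = thread {
        val start = TimeSource.Monotonic.markNow()
        LockSupport.parkNanos(parkTimeout.inWholeNanoseconds)
        parkedFor = start.elapsedNow()
    }
    Thread.sleep(unparkDelay.inWholeMilliseconds)
    LockSupport.unpark(worker)
    worker.join()   // join makes the worker's write to parkedFor visible
    return parkedFor
}

// First-unpark scenario with generous slack: the timeout is a full minute,
// so returning in under 30 seconds shows that the unpark ended the park.
fun unparkEndsParkEarly(): Boolean =
    measurePark(parkTimeout = 60.seconds, unparkDelay = 50.milliseconds) < 30.seconds
```

A deadline scenario would call the same helper with a short timeout and a longer unpark delay and assert only a lower bound on the result, provided the API under test rules out spurious wakeups.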
