diff --git a/src/main/kotlin/io/reactivex/rxjava3/kotlin/Flowables.kt b/src/main/kotlin/io/reactivex/rxjava3/kotlin/Flowables.kt index f8725be..b35c915 100644 --- a/src/main/kotlin/io/reactivex/rxjava3/kotlin/Flowables.kt +++ b/src/main/kotlin/io/reactivex/rxjava3/kotlin/Flowables.kt @@ -10,6 +10,7 @@ import io.reactivex.rxjava3.core.BackpressureStrategy import io.reactivex.rxjava3.core.Flowable import io.reactivex.rxjava3.core.FlowableEmitter import io.reactivex.rxjava3.functions.* +import io.reactivex.rxjava3.internal.functions.Functions import org.reactivestreams.Publisher @@ -28,6 +29,18 @@ object Flowables { ): Flowable = Flowable.combineLatest(source1, source2, BiFunction { t1, t2 -> combineFunction(t1, t2) }) + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + fun combineLatest( + source1: Flowable, + source2: Flowable, + bufferSize: Int, + combineFunction: (T1, T2) -> R + ): Flowable = Flowable.combineLatest(listOf(source1, source2), + Functions.toFunction(combineFunction), bufferSize) + + /** * Emits `Pair` */ @@ -40,6 +53,18 @@ object Flowables { ): Flowable> = Flowable.combineLatest(source1, source2, BiFunction> { t1, t2 -> t1 to t2 }) + /** + * Emits `Pair` + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + fun combineLatest( + source1: Flowable, + source2: Flowable, + bufferSize: Int + ): Flowable> = combineLatest(source1, source2, bufferSize, ::Pair) + @Deprecated("New type inference algorithm in Kotlin 1.4 makes this method obsolete. Method will be removed in future RxKotlin release.", replaceWith = ReplaceWith("Flowable.combineLatest(source1, source2, source3, combineFunction)", "io.reactivex.Flowable"), @@ -55,6 +80,19 @@ object Flowables { ): Flowable = Flowable.combineLatest(source1, source2, source3, Function3 { t1: T1, t2: T2, t3: T3 -> combineFunction(t1, t2, t3) }) + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + fun combineLatest( + source1: Flowable, + source2: Flowable, + source3: Flowable, + bufferSize: Int, + combineFunction: (T1, T2, T3) -> R + ): Flowable = Flowable.combineLatest(listOf(source1, source2, source3), + Functions.toFunction(combineFunction), bufferSize) + + /** * Emits `Triple` */ @@ -68,6 +106,20 @@ object Flowables { ): Flowable> = Flowable.combineLatest(source1, source2, source3, Function3> { t1, t2, t3 -> Triple(t1, t2, t3) }) + /** + * Emits `Triple` + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + fun combineLatest( + source1: Flowable, + source2: Flowable, + source3: Flowable, + bufferSize: Int + ): Flowable> = + combineLatest(source1, source2, source3, bufferSize, ::Triple) + @Deprecated("New type inference algorithm in Kotlin 1.4 makes this method obsolete. Method will be removed in future RxKotlin release.", replaceWith = ReplaceWith("Flowable.combineLatest(source1, source2, source3, source4, combineFunction)", "io.reactivex.Flowable"), @@ -84,6 +136,19 @@ object Flowables { ): Flowable = Flowable.combineLatest(source1, source2, source3, source4, Function4 { t1: T1, t2: T2, t3: T3, t4: T4 -> combineFunction(t1, t2, t3, t4) }) + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + fun combineLatest( + source1: Flowable, + source2: Flowable, + source3: Flowable, + source4: Flowable, + bufferSize: Int, + combineFunction: (T1, T2, T3, T4) -> R + ): Flowable = Flowable.combineLatest(listOf(source1, source2, source3, source4), + Functions.toFunction(combineFunction), bufferSize) + @Deprecated("New type inference algorithm in Kotlin 1.4 makes this method obsolete. Method will be removed in future RxKotlin release.", replaceWith = ReplaceWith("Flowable.combineLatest(source1, source2, source3, source4, source5, combineFunction)", "io.reactivex.Flowable"), @@ -101,6 +166,20 @@ object Flowables { ): Flowable = Flowable.combineLatest(source1, source2, source3, source4, source5, Function5 { t1: T1, t2: T2, t3: T3, t4: T4, t5: T5 -> combineFunction(t1, t2, t3, t4, t5) }) + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + fun combineLatest( + source1: Flowable, + source2: Flowable, + source3: Flowable, + source4: Flowable, + source5: Flowable, + bufferSize: Int, + combineFunction: (T1, T2, T3, T4, T5) -> R + ): Flowable = Flowable.combineLatest(listOf(source1, source2, source3, source4, source5), + Functions.toFunction(combineFunction), bufferSize) + @Deprecated("New type inference algorithm in Kotlin 1.4 makes this method obsolete. Method will be removed in future RxKotlin release.", replaceWith = ReplaceWith("Flowable.combineLatest(source1, source2, source3, source4, source5, source6, combineFunction)", "io.reactivex.Flowable"), @@ -119,12 +198,44 @@ object Flowables { ): Flowable = Flowable.combineLatest(source1, source2, source3, source4, source5, source6, Function6 { t1: T1, t2: T2, t3: T3, t4: T4, t5: T5, t6: T6 -> combineFunction(t1, t2, t3, t4, t5, t6) }) + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + fun combineLatest( + source1: Flowable, + source2: Flowable, + source3: Flowable, + source4: Flowable, + source5: Flowable, + source6: Flowable, + bufferSize: Int, + combineFunction: (T1, T2, T3, T4, T5, T6) -> R + ): Flowable = Flowable.combineLatest(listOf(source1, source2, source3, source4, source5, source6), + Functions.toFunction(combineFunction), bufferSize) + + @Deprecated("New type inference algorithm in Kotlin 1.4 makes this method obsolete. Method will be removed in future RxKotlin release.", replaceWith = ReplaceWith("Flowable.combineLatest(source1, source2, source3, source4, source5, source6, source7, combineFunction)", "io.reactivex.Flowable"), level = DeprecationLevel.WARNING) @CheckReturnValue @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) + fun combineLatest( + source1: Flowable, + source2: Flowable, + source3: Flowable, + source4: Flowable, + source5: Flowable, + source6: Flowable, + source7: Flowable, + bufferSize: Int, + combineFunction: (T1, T2, T3, T4, T5, T6, T7) -> R + ): Flowable = Flowable.combineLatest(listOf(source1, source2, source3, source4, source5, source6, source7), + Functions.toFunction(combineFunction), bufferSize) + + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) inline fun combineLatest( source1: Flowable, source2: Flowable, @@ -157,6 +268,24 @@ object Flowables { ): Flowable = Flowable.combineLatest(source1, source2, source3, source4, source5, source6, source7, source8, Function8 { t1: T1, t2: T2, t3: T3, t4: T4, t5: T5, t6: T6, t7: T7, t8: T8 -> combineFunction(t1, t2, t3, t4, t5, t6, t7, t8) }) + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + fun combineLatest( + source1: Flowable, + source2: Flowable, + source3: Flowable, + source4: Flowable, + source5: Flowable, + source6: Flowable, + source7: Flowable, + source8: Flowable, + bufferSize: Int, + combineFunction: (T1, T2, T3, T4, T5, T6, T7, T8) -> R + ): Flowable = Flowable.combineLatest(listOf(source1, source2, source3, source4, source5, source6, source7, source8), + Functions.toFunction(combineFunction), bufferSize) + + @Deprecated("New type inference algorithm in Kotlin 1.4 makes this method obsolete. Method will be removed in future RxKotlin release.", replaceWith = ReplaceWith("Flowable.combineLatest(source1, source2, source3, source4, source5, source6, source7, source8, source9, combineFunction)", "io.reactivex.Flowable"), level = DeprecationLevel.WARNING) @@ -177,6 +306,24 @@ object Flowables { ): Flowable = Flowable.combineLatest(source1, source2, source3, source4, source5, source6, source7, source8, source9, Function9 { t1: T1, t2: T2, t3: T3, t4: T4, t5: T5, t6: T6, t7: T7, t8: T8, t9: T9 -> combineFunction(t1, t2, t3, t4, t5, t6, t7, t8, t9) }) + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + fun combineLatest( + source1: Flowable, + source2: Flowable, + source3: Flowable, + source4: Flowable, + source5: Flowable, + source6: Flowable, + source7: Flowable, + source8: Flowable, + source9: Flowable, + bufferSize: Int, + combineFunction: (T1, T2, T3, T4, T5, T6, T7, T8, T9) -> R + ): Flowable = Flowable.combineLatest(listOf(source1, source2, source3, source4, source5, source6, source7, source8, source9), + Functions.toFunction(combineFunction), bufferSize) + @CheckReturnValue @BackpressureSupport(BackpressureKind.SPECIAL) @@ -199,6 +346,19 @@ object Flowables { ): Flowable = Flowable.zip(source1, source2, BiFunction { t1, t2 -> combineFunction(t1, t2) }) + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + fun zip( + source1: Flowable, + source2: Flowable, + delayError: Boolean = false, + bufferSize: Int = Flowable.bufferSize(), + combineFunction: (T1, T2) -> R + ): Flowable = Flowable.zip(listOf(source1, source2), + Functions.toFunction(combineFunction), delayError, bufferSize) + + /** * Emits `Pair` */ @@ -208,6 +368,19 @@ object Flowables { fun zip(source1: Flowable, source2: Flowable): Flowable> = Flowable.zip(source1, source2, BiFunction> { t1, t2 -> t1 to t2 }) + /** + * Emits `Pair` + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + fun zip( + source1: Flowable, + source2: Flowable, + delayError: Boolean = false, + bufferSize: Int = Flowable.bufferSize() + ): Flowable> = zip(source1, source2, delayError, bufferSize, ::Pair) + @Deprecated("New type inference algorithm in Kotlin 1.4 makes this method obsolete. Method will be removed in future RxKotlin release.", replaceWith = ReplaceWith("Flowable.zip(source1, source2, source3, combineFunction)", "io.reactivex.Flowable"), @@ -223,6 +396,16 @@ object Flowables { ): Flowable = Flowable.zip(source1, source2, source3, Function3 { t1: T1, t2: T2, t3: T3 -> combineFunction(t1, t2, t3) }) + fun zip( + source1: Flowable, + source2: Flowable, + source3: Flowable, + delayError: Boolean = false, + bufferSize: Int = Flowable.bufferSize(), + combineFunction: (T1, T2, T3) -> R + ): Flowable = Flowable.zip(listOf(source1, source2, source3), + Functions.toFunction(combineFunction), delayError, bufferSize) + /** * Emits `Triple` */ @@ -236,6 +419,21 @@ object Flowables { ): Flowable> = Flowable.zip(source1, source2, source3, Function3> { t1, t2, t3 -> Triple(t1, t2, t3) }) + /** + * Emits `Triple` + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + fun zip( + source1: Flowable, + source2: Flowable, + source3: Flowable, + delayError: Boolean = false, + bufferSize: Int = Flowable.bufferSize() + ): Flowable> = zip(source1, source2, source3, delayError, bufferSize, ::Triple) + + @Deprecated("New type inference algorithm in Kotlin 1.4 makes this method obsolete. Method will be removed in future RxKotlin release.", replaceWith = ReplaceWith("Flowable.zip(source1, source2, source3, source4, combineFunction)", "io.reactivex.Flowable"), level = DeprecationLevel.WARNING) @@ -251,6 +449,21 @@ object Flowables { ): Flowable = Flowable.zip(source1, source2, source3, source4, Function4 { t1: T1, t2: T2, t3: T3, t4: T4 -> combineFunction(t1, t2, t3, t4) }) + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + fun zip( + source1: Flowable, + source2: Flowable, + source3: Flowable, + source4: Flowable, + delayError: Boolean = false, + bufferSize: Int = Flowable.bufferSize(), + combineFunction: (T1, T2, T3, T4) -> R + ): Flowable = Flowable.zip(listOf(source1, source2, source3, source4), + Functions.toFunction(combineFunction), delayError, bufferSize) + + @Deprecated("New type inference algorithm in Kotlin 1.4 makes this method obsolete. Method will be removed in future RxKotlin release.", replaceWith = ReplaceWith("Flowable.zip(source1, source2, source3, source4, source5, combineFunction)", "io.reactivex.Flowable"), level = DeprecationLevel.WARNING) @@ -264,6 +477,16 @@ object Flowables { ): Flowable = Flowable.zip(source1, source2, source3, source4, source5, Function5 { t1: T1, t2: T2, t3: T3, t4: T4, t5: T5 -> combineFunction(t1, t2, t3, t4, t5) }) + fun zip( + source1: Flowable, source2: Flowable, + source3: Flowable, source4: Flowable, + source5: Flowable, + delayError: Boolean = false, + bufferSize: Int = Flowable.bufferSize(), + combineFunction: (T1, T2, T3, T4, T5) -> R + ): Flowable = Flowable.zip(listOf(source1, source2, source3, source4, source5), + Functions.toFunction(combineFunction), delayError, bufferSize) + @Deprecated("New type inference algorithm in Kotlin 1.4 makes this method obsolete. Method will be removed in future RxKotlin release.", replaceWith = ReplaceWith("Flowable.zip(source1, source2, source3, source4, source5, source6, combineFunction)", "io.reactivex.Flowable"), @@ -278,6 +501,20 @@ object Flowables { ): Flowable = Flowable.zip(source1, source2, source3, source4, source5, source6, Function6 { t1: T1, t2: T2, t3: T3, t4: T4, t5: T5, t6: T6 -> combineFunction(t1, t2, t3, t4, t5, t6) }) + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + fun zip( + source1: Flowable, source2: Flowable, + source3: Flowable, source4: Flowable, + source5: Flowable, source6: Flowable, + delayError: Boolean = false, + bufferSize: Int = Flowable.bufferSize(), + combineFunction: (T1, T2, T3, T4, T5, T6) -> R + ): Flowable = Flowable.zip(source1, source2, source3, source4, source5, source6, + Function6 { t1: T1, t2: T2, t3: T3, t4: T4, t5: T5, t6: T6 -> combineFunction(t1, t2, t3, t4, t5, t6) }) + + @Deprecated("New type inference algorithm in Kotlin 1.4 makes this method obsolete. Method will be removed in future RxKotlin release.", replaceWith = ReplaceWith("Flowable.zip(source1, source2, source3, source4, source5, source6, source7, combineFunction)", "io.reactivex.Flowable"), level = DeprecationLevel.WARNING) @@ -292,6 +529,20 @@ object Flowables { ): Flowable = Flowable.zip(source1, source2, source3, source4, source5, source6, source7, Function7 { t1: T1, t2: T2, t3: T3, t4: T4, t5: T5, t6: T6, t7: T7 -> combineFunction(t1, t2, t3, t4, t5, t6, t7) }) + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + fun zip( + source1: Flowable, source2: Flowable, + source3: Flowable, source4: Flowable, + source5: Flowable, source6: Flowable, + source7: Flowable, + delayError: Boolean = false, + bufferSize: Int = Flowable.bufferSize(), + combineFunction: (T1, T2, T3, T4, T5, T6, T7) -> R + ): Flowable = Flowable.zip(listOf(source1, source2, source3, source4, source5, source6, source7), + Functions.toFunction(combineFunction), delayError, bufferSize) + @Deprecated("New type inference algorithm in Kotlin 1.4 makes this method obsolete. Method will be removed in future RxKotlin release.", replaceWith = ReplaceWith("Flowable.zip(source1, source2, source3, source4, source5, source6, source7, source8, combineFunction)", "io.reactivex.Flowable"), @@ -308,6 +559,20 @@ object Flowables { ): Flowable = Flowable.zip(source1, source2, source3, source4, source5, source6, source7, source8, Function8 { t1: T1, t2: T2, t3: T3, t4: T4, t5: T5, t6: T6, t7: T7, t8: T8 -> combineFunction(t1, t2, t3, t4, t5, t6, t7, t8) }) + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + fun zip( + source1: Flowable, source2: Flowable, + source3: Flowable, source4: Flowable, + source5: Flowable, source6: Flowable, + source7: Flowable, source8: Flowable, + delayError: Boolean = false, + bufferSize: Int = Flowable.bufferSize(), + combineFunction: (T1, T2, T3, T4, T5, T6, T7, T8) -> R + ): Flowable = Flowable.zip(listOf(source1, source2, source3, source4, source5, source6, source7, source8), + Functions.toFunction(combineFunction), delayError, bufferSize) + @Deprecated("New type inference algorithm in Kotlin 1.4 makes this method obsolete. Method will be removed in future RxKotlin release.", replaceWith = ReplaceWith("Flowable.zip(source1, source2, source3, source4, source5, source6, source7, source8, source9, combineFunction)", "io.reactivex.Flowable"), level = DeprecationLevel.WARNING) @@ -328,6 +593,25 @@ object Flowables { ): Flowable = Flowable.zip(source1, source2, source3, source4, source5, source6, source7, source8, source9, Function9 { t1: T1, t2: T2, t3: T3, t4: T4, t5: T5, t6: T6, t7: T7, t8: T8, t9: T9 -> combineFunction(t1, t2, t3, t4, t5, t6, t7, t8, t9) }) + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + fun zip( + source1: Flowable, + source2: Flowable, + source3: Flowable, + source4: Flowable, + source5: Flowable, + source6: Flowable, + source7: Flowable, + source8: Flowable, + source9: Flowable, + delayError: Boolean = false, + bufferSize: Int = Flowable.bufferSize(), + combineFunction: (T1, T2, T3, T4, T5, T6, T7, T8, T9) -> R + ): Flowable = Flowable.zip(listOf(source1, source2, source3, source4, source5, source6, source7, source8, source9), + Functions.toFunction(combineFunction), delayError, bufferSize) + } /**