Life Engineering
코루틴 빌더 기초 본문
코루틴 빌더
코루틴 빌더 함수 호출 시 새로운 코루틴 생성
모든 코루틴 빌더 함수는 코루틴을 추상화한 Job 객체 생성
runBlocking
public actual fun <T> runBlocking
(context: CoroutineContext, block: suspend CoroutineScope.() -> T): T
새로운 코루틴을 실행한 뒤 완료될 때까지 현재 스레드를 중단 가능한 상태로 blocking,
따라서 runBlocking 내부에서 delay(1000L) 호출 시 Thread.sleep(1000L) 과 비슷하게 작동
runBlocking 이 사용되는 경우
- 메인 함수: 프로그램이 끝나는 걸 방지하기 위해 스레드를 블로킹할 필요가 있을 경우
- 유닛 테스트
현재는 거의 사용되지 않고 유닛 테스트에는 runTest
주로 사용됨
메인 함수는 runBlocking 대신에 suspend
를 붙여 중단 함수로 사용
launch
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job
CoroutineScope 인터페이스의 확장 함수
순차 처리
join
: 코루틴 간 순차 처리 가능, 코드를 진행하기 전에 시작된 모든 코루틴이 실행을 완료하도록 함
import kotlinx.coroutines.*
suspend fun doTaskA() {
println(“Task A started”)
delay(2000) // Simulate some asynchronous work for 2 seconds
println(“Task A finished”)
}
suspend fun doTaskB() {
println(“Task B started”)
delay(3000) // Simulate some asynchronous work for 3 seconds
println(“Task B finished”)
}
fun main() = runBlocking {
val jobA = launch { doTaskA() }
val jobB = launch { doTaskB() }
println(“Launched both coroutines, now waiting for them to complete…”)
// Wait for both coroutines to complete using join()
jobA.join()
jobB.join()
println(“Both coroutines have completed.”)
}
//Launched both coroutines, now waiting for them to complete…
//Task A started
//Task B started
//Task A finished
//Task B finished
//Both coroutines have completed.
join
을 호출한 코루틴은 join의 대상이 된 코루틴이 완료될 때까지 일시 중단
joinAll
: 복수 코루틴 순차 처리, 모든 코루틴 작업이 완료되야 Caller 재개
fun main() = runBlocking {
println("main starts")
joinAll(
launch { coroutine(1, 500L) },
launch { coroutine(2, 300L) }
)
println("main ends")
}
private suspend fun coroutine(number: Int, t: Long) {
println("Routine $number starts to work")
delay(t)
println("Routine $number finished")
//main starts
//Routine 1 starts to work
//Routine 2 starts to work
//Routine 2 finished
//Routine 1 finishedmain
//ends
지연 실행
CoroutineStart.LAZY
: coroutine을 lazy하게 필요할 때만 실행
실행 전에 coroutine이 취소되면, 실행이 되지 않고 exception과 함께 완료됨
start 함수 사용하여 명시적으로 실행 OR join 호출하여 암시적으로 실행
import kotlinx.coroutines.*
fun main() = runBlocking<Unit> {
val job = launch(start = CoroutineStart.LAZY) {
stallForTime()
println("This is executed after the delay")
}
println("This is executed immediately")
job.start()
println("This is executed after starting the job")
}
suspend fun stallForTime() {
withContext(Dispatchers.Default) {
delay(2000L)
}
}
This is executed immediately
This is executed after starting the job
This is executed after the delay
취소
- 코루틴의 취소 특징
- 취소 시점으로부터 첫 번째 중단 지점으로부터 취소 이뤄짐
- 하위 Job이 있는 경우 모두 취소
- 한번 Job이 취소되면 해당 Job은 취소 상태가 되기 때문에 하위 Job 만들 수 없음
cancel
을 호출하면 코루틴은 즉시 취소되는 것이 아니라 Job 객체 내부 취소 확인용 플래그를 “취소 요청됨” 이라고 변경 → 코루틴이 이 플래그를 확인하는 시점에 취소됨
suspend fun main(): Unit = coroutineScope {
val job = launch {
repeat(1_000) { i ->
delay(200)
println("Printing $i")
}
}
delay(1100)
job.cancel()
job.join() //취소를 기다리기 위해서 join 함수 호출
println("Cancelled successfully")
}
// Printing 0
// Printing 1
// Printing 2
// Printing 3
// Printing 4
// Cancelled successfully
cancelAndJoin
함수 통해서 코루틴의 취소가 완료될 때까지 호출부의 코루틴이 일시 중단될 수 있음
- 코루틴의 취소 동작 방식
CancellationException
에 의해 취소됨 → 첫번째 suspend 포인트에서 CancellationException이 발생
import kotlinx.coroutines.*
import kotlin.random.Random
suspend fun main(): Unit = coroutineScope {
val job = Job()
launch(job) {
try {
delay(2000)
println("Job is done")
} finally {
println("Finally")
launch { // will be ignored
println("Will not be printed")
}
delay(1000) // here exception is thrown
println("Will not be printed")
}
}
delay(1000)
job.cancelAndJoin()
println("Cancel done")
}
// (1 sec)
// Finally
// Cancel done
위 코드처럼, finally
블록 이용해 사용중인 리소스 회수해야 함
Job이 이미 Cancelling 상태라면, suspend 또는 다른 코루틴을 시작하는 것은 불가능 → Cancelling 상태에서 다른 코루틴 시작하려고 하면 무시됨
사용중인 리소스 회수 위해 invokeOnCompletion
메서드 호출 가능, Job이 Completed
나 Cancelled
같은 마지막 상태에 도달했을 때 호출할 핸들러 지정
suspend fun main(): Unit = coroutineScope {
val job = launch {
delay(Random.nextLong(2400))
println("Finished")
}
delay(800)
job.invokeOnCompletion { exception: Throwable? ->
println("Will always be printed")
println("The exception was: $exception")
}
delay(800)
job.cancelAndJoin()
}
// Will always be printed
// The exception was:
// kotlinx.coroutines.JobCancellationException
// (or)
// Finished
// Will always be printed
// The exception was null
파라미터인 exception
의 종류는 다음과 같음
- 잡이 예외 없이 끝나면 null
- 코루틴이 취소되었으면
CancellationException
- 코루틴을 종료시킨 예외?
- 중단할 수 없는 걸 중단하기
취소는 중단점에서 일어나기 때문에 중단점이 없으면 취소를 할 수 없다, BUT 중단하려면..
yield
사용: 코루틴을 중단하고 즉시 재실행함. 중단 가능하지 않으면서 CPU 집약적인 연산들이 중단 함수에 있다면 각 연산들 사이에 yield 실행
suspend fun cpuIntensiveOperations() =
withContext(Dispatchers.Default) {
cpuIntensiveOperation1()
yield()
cpuIntensiveOperation2()
yield()
cpuINtensiveOperation3()
}
isActive
사용: Job의 상태를 추적함
public val CoroutineScope.isActive:Boolean
get() = coroutineContext[Job]?.isActive ?: true
CoroutineScope 은 coroutineConext 프로퍼티를 사용해 참조할 수 있는 컨텍스트 가지고 있음 > 코루틴 잡 (coroutineContext.job) 에 접근해 현재 상태가 무엇인지 확인가능
import kotlinx.coroutines.*
suspend fun main(): Unit = coroutineScope {
val job = Job()
launch(job) {
repeat(1000) { num ->
Thread.sleep(200)
ensureActive()
println("Printing $num")
}
}
delay(1100)
job.cancelAndJoin()
println("Cancelled successfully")
}
ensureActive
: Job이 active 상태가 아니면 CancellationException
을 던지는 함수
ensureActive vs yield?
ensureActive
함수는 coroutineScope 에서 호출 되어야 함yield
함수는 스코프를 필요로 하지 않음, CPU 사용량이 크거나 스레드를 블로킹하는 중단 함수에서 주로 사용됨 - yield 호출 시 재분배도 가능하기 때문에 한 프로세스가 다른 프로세스를 기아 상태에 빠지게 만드는 경우가 없게 함
Job의 상태
Active
: 잡이 실행되고 코루틴은 잡을 수행, 자식 코루틴 시작 가능New
: 지연 시작되는 코루틴, 그 코루틴이 작업 실행되면 Active로Completing
: 실행 완료되고, 자식들을 기다림Completed
: 자식들의 실행 모두 끝난 경우Cancelling
: 잡이 실행 도중(Active or Completing) 취소하거나 실패하게 될 경우Cancelled
: 취소 후 후처리 작업이 완료되면 Cancelled 상태 됨
import kotlinx.coroutines.*
suspend fun main() = coroutineScope {
// 빌더에 의해 생성된 Job은
val job = Job()
println(job) // JobImpl{Active}@ADD
// 완료될 때까지 Active 상태
job.complete()
println(job) // JobImpl{Completed}@ADD
// launch 로 생성되면 기본적으로 active 상태
val activeJob = launch {
delay(1000)
}
println(activeJob) // StandaloneCoroutine{Active}@ADD
// job 완료될 때까지 기다림
activeJob.join() // (1 sec)
println(activeJob) // StandaloneCoroutine{Completed}@ADD
// launch started lazily is in New state
val lazyJob = launch(start = CoroutineStart.LAZY) {
delay(1000)
}
println(lazyJob) // LazyStandaloneCoroutine{New}@ADD
// we need to start it, to make it active
lazyJob.start()
println(lazyJob) // LazyStandaloneCoroutine{Active}@ADD
lazyJob.join() // (1 sec)
println(lazyJob) //LazyStandaloneCoroutine{Completed}@ADD
}
- 잡의 상태를 확인하기 위해서는
isActive
,isCompleted
,isCancelled
를 확인하면 됨
async
async
코루틴 빌더는 값을 생성하도록 설계되어 있음, Deferred<T>
타입의 객체를 리턴하고 T는 생성되는 값의 타입임
public fun <T> CoroutineScope.async(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
): Deferred<T>
Deferred
객체- 미래의 어느 시점에 결과값이 반환될 수 있음을 나타냄,
await
함수 - 코루틴이 실행 완료되어 결과값을 반환할 때까지 호출부의 코루틴을 일시 중단시킴 (Job 객체의 join 과 유사)public interface Deferred<out T> : Job { public suspend fun await(): T ... }
- Job 인터페이스의 서브타입, Job 객체의 모든 함수와 프로퍼티 사용 가능..(e.g.
cancel
,isActive
..) awaitAll
함수: 모든 Deferred 코루틴들로부터 결과가 수신될 때까지 호출부 코루틴 일시 중단deferreds.map {it.await() }
와 동일하지 않음 -awaitAll
은 deferred 객체 중 하나라도 fail 하면 즉시 fail,deferreds.map {it.await() }
은 실패하는 작업에 맞닥드릴 때까지 하나씩 기다림suspend fun <T> awaitAll(vararg deferreds: Deferred<T>): List<T> suspend fun <T> Collection<Deferred<T>>.awaitAll(): List<T>
예제 코드 ..
import contributors.*
import kotlinx.coroutines.*
suspend fun loadContributorsConcurrent(service: GitHubService, req: RequestData): List<User> = coroutineScope {
val repos = service //특정 org의 repo list를 반환
.getOrgRepos(req.org)
.also { logRepos(req, it) }
.body() ?: emptyList()
//repo list의 contributor list 가져옴
val deferreds: List<Deferred<List<User>>> = repos.map { repo ->
async {
log("starting loading for ${repo.name}")
service
.getRepoContributors(req.org, repo.name)
.also { logUsers(repo, it) }
.bodyList()
}
}
deferreds.awaitAll().flatten().aggregate()
}
withContext
Calls the specified suspending block with a given coroutine context, suspends until it completes, and returns the result.
suspend fun <T> withContext(context: CoroutineContext, block: suspend CoroutineScope.() -> T): T
block
은 CoroutineDispatcher
에서 실행을 위해 디스패치 되어야 하며,
블록이 완료되면 실행은 원래 디스패쳐로 다시 전환
withContext
로 async-await
쌍을 대체 가능함
import kotlinx.coroutines.*
fun main() = runBlocking<Unit> {
val result: String = withContext(Dispatchers.IO) {
delay(1000L) // 네트워크 요청
return@withContext "Dummy Response" // 문자열 반환
}
println(result)
}
/*
// 결과:
Dummy Response
*/
withContext 동작 방식
withContext
함수는 실행 중이던 코루틴을 그대로 유지하면서 실행 환경만 변경해 작업 처리
기존의 코루틴 유지 BUT CoroutineContext
객체만 바꿔서 실행(⇒ context-switch), 실행 스레드만 변경됨
withContext 사용시 주의점
독립적인 작업이 병렬로 실행되야 하는 상황에 순차적으로 실행되는 문제 발생
→ withContext가 새로운 코루틴을 생성하지 않기 때문
import kotlinx.coroutines.*
private val myDispatcher1 = newSingleThreadContext("MyThread1")
private val myDispatcher2 = newSingleThreadContext("MyThread2")
fun main() = runBlocking<Unit> {
println("[${Thread.currentThread().name}] 코루틴 실행")
withContext(myDispatcher1) {
println("[${Thread.currentThread().name}] 코루틴 실행")
withContext(myDispatcher2) {
println("[${Thread.currentThread().name}] 코루틴 실행")
}
println("[${Thread.currentThread().name}] 코루틴 실행")
}
println("[${Thread.currentThread().name}] 코루틴 실행")
}
/*
// 결과:
[main @coroutine#1] 코루틴 실행
[MyThread1 @coroutine#1] 코루틴 실행
[MyThread2 @coroutine#1] 코루틴 실행
[MyThread1 @coroutine#1] 코루틴 실행
[main @coroutine#1] 코루틴 실행
*/
withContext
함수를 통해 바뀐 CoroutineDispatcher 객체가 유효한 것은 withContext
블록 내부임,
블록을 벗어나면 이전의 원래 dispatcher 객체로 전환
'공부 > Kotlin' 카테고리의 다른 글
코루틴 컨텍스트 (Coroutine Context) (0) | 2024.06.21 |
---|---|
코루틴 디스패처(Coroutine Dispatcher) (0) | 2024.06.20 |
Kotlin 코루틴의 기본 (1) | 2024.06.09 |