Notice
Recent Posts
Recent Comments
Link
«   2024/09   »
1 2 3 4 5 6 7
8 9 10 11 12 13 14
15 16 17 18 19 20 21
22 23 24 25 26 27 28
29 30
Archives
Today
Total
관리 메뉴

Life Engineering

코루틴 빌더 기초 본문

공부/Kotlin

코루틴 빌더 기초

흑개 2024. 6. 14. 00:51

코루틴 빌더

코루틴 빌더 함수 호출 시 새로운 코루틴 생성

모든 코루틴 빌더 함수는 코루틴을 추상화한 Job 객체 생성


runBlocking

public actual fun <T> runBlocking
(context: CoroutineContext, block: suspend CoroutineScope.() -> T): T

새로운 코루틴을 실행한 뒤 완료될 때까지 현재 스레드를 중단 가능한 상태로 blocking,

따라서 runBlocking 내부에서 delay(1000L) 호출 시 Thread.sleep(1000L) 과 비슷하게 작동

runBlocking 이 사용되는 경우

  • 메인 함수: 프로그램이 끝나는 걸 방지하기 위해 스레드를 블로킹할 필요가 있을 경우
  • 유닛 테스트

현재는 거의 사용되지 않고 유닛 테스트에는 runTest 주로 사용됨

메인 함수는 runBlocking 대신에 suspend 를 붙여 중단 함수로 사용


launch

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job

CoroutineScope 인터페이스의 확장 함수

순차 처리

  • join : 코루틴 간 순차 처리 가능, 코드를 진행하기 전에 시작된 모든 코루틴이 실행을 완료하도록 함
import kotlinx.coroutines.*

suspend fun doTaskA() {
    println(“Task A started”)
    delay(2000) // Simulate some asynchronous work for 2 seconds
    println(“Task A finished”)
}

suspend fun doTaskB() {
    println(“Task B started”)
    delay(3000) // Simulate some asynchronous work for 3 seconds
    println(“Task B finished”)
}

fun main() = runBlocking {
val jobA = launch { doTaskA() }
val jobB = launch { doTaskB() }

println(“Launched both coroutines, now waiting for them to complete…”)

// Wait for both coroutines to complete using join()
jobA.join()
jobB.join()

println(“Both coroutines have completed.”)
}

//Launched both coroutines, now waiting for them to complete…
//Task A started
//Task B started
//Task A finished
//Task B finished
//Both coroutines have completed.

join 을 호출한 코루틴은 join의 대상이 된 코루틴이 완료될 때까지 일시 중단

  • joinAll : 복수 코루틴 순차 처리, 모든 코루틴 작업이 완료되야 Caller 재개
fun main() = runBlocking {
    println("main starts")
    joinAll(
        launch { coroutine(1, 500L) },
        launch { coroutine(2, 300L) }
    )
    println("main ends")
}

private suspend fun coroutine(number: Int, t: Long) {
    println("Routine $number starts to work")
    delay(t)
    println("Routine $number finished")

//main starts
//Routine 1 starts to work
//Routine 2 starts to work
//Routine 2 finished
//Routine 1 finishedmain 
//ends

지연 실행

CoroutineStart.LAZY : coroutine을 lazy하게 필요할 때만 실행

실행 전에 coroutine이 취소되면, 실행이 되지 않고 exception과 함께 완료됨

start 함수 사용하여 명시적으로 실행 OR join 호출하여 암시적으로 실행

import kotlinx.coroutines.*

fun main() =  runBlocking<Unit> {
  val job = launch(start = CoroutineStart.LAZY) {
    stallForTime()
    println("This is executed after the delay")
  }

  println("This is executed immediately")

  job.start() 

  println("This is executed after starting the job")
}

suspend fun stallForTime() {
  withContext(Dispatchers.Default) {
    delay(2000L)
  }
}

This is executed immediately
This is executed after starting the job
This is executed after the delay

취소

  • 코루틴의 취소 특징
    • 취소 시점으로부터 첫 번째 중단 지점으로부터 취소 이뤄짐
    • 하위 Job이 있는 경우 모두 취소
    • 한번 Job이 취소되면 해당 Job은 취소 상태가 되기 때문에 하위 Job 만들 수 없음

cancel 을 호출하면 코루틴은 즉시 취소되는 것이 아니라 Job 객체 내부 취소 확인용 플래그를 “취소 요청됨” 이라고 변경 → 코루틴이 이 플래그를 확인하는 시점에 취소됨

suspend fun main(): Unit = coroutineScope {
   val job = launch {
       repeat(1_000) { i ->
           delay(200)
           println("Printing $i")
       }
   }

   delay(1100)
   job.cancel()
   job.join() //취소를 기다리기 위해서 join 함수 호출
   println("Cancelled successfully")
}
// Printing 0
// Printing 1
// Printing 2
// Printing 3
// Printing 4
// Cancelled successfully

cancelAndJoin 함수 통해서 코루틴의 취소가 완료될 때까지 호출부의 코루틴이 일시 중단될 수 있음

  • 코루틴의 취소 동작 방식

CancellationException 에 의해 취소됨 → 첫번째 suspend 포인트에서 CancellationException이 발생

import kotlinx.coroutines.*
import kotlin.random.Random

suspend fun main(): Unit = coroutineScope {
    val job = Job()
    launch(job) {
        try {
            delay(2000)
            println("Job is done")
        } finally {
            println("Finally")
            launch { // will be ignored
                println("Will not be printed")
            }
            delay(1000) // here exception is thrown
            println("Will not be printed")
        }
    }
    delay(1000)
    job.cancelAndJoin()
    println("Cancel done")
}
// (1 sec)
// Finally
// Cancel done

위 코드처럼, finally 블록 이용해 사용중인 리소스 회수해야 함

Job이 이미 Cancelling 상태라면, suspend 또는 다른 코루틴을 시작하는 것은 불가능 → Cancelling 상태에서 다른 코루틴 시작하려고 하면 무시됨

사용중인 리소스 회수 위해 invokeOnCompletion 메서드 호출 가능, Job이 CompletedCancelled 같은 마지막 상태에 도달했을 때 호출할 핸들러 지정

suspend fun main(): Unit = coroutineScope {
    val job = launch {
        delay(Random.nextLong(2400))
        println("Finished")
    }
    delay(800)
    job.invokeOnCompletion { exception: Throwable? ->
        println("Will always be printed")
        println("The exception was: $exception")
    }
    delay(800)
    job.cancelAndJoin()
}
// Will always be printed
// The exception was: 
// kotlinx.coroutines.JobCancellationException
// (or)
// Finished
// Will always be printed
// The exception was null

파라미터인 exception 의 종류는 다음과 같음

  • 잡이 예외 없이 끝나면 null
  • 코루틴이 취소되었으면 CancellationException
  • 코루틴을 종료시킨 예외?
  • 중단할 수 없는 걸 중단하기

취소는 중단점에서 일어나기 때문에 중단점이 없으면 취소를 할 수 없다, BUT 중단하려면..

  • yield 사용: 코루틴을 중단하고 즉시 재실행함. 중단 가능하지 않으면서 CPU 집약적인 연산들이 중단 함수에 있다면 각 연산들 사이에 yield 실행
suspend fun cpuIntensiveOperations() = 
    withContext(Dispatchers.Default) {
        cpuIntensiveOperation1()
        yield()
        cpuIntensiveOperation2()
        yield()
        cpuINtensiveOperation3()
    }
  • isActive 사용: Job의 상태를 추적함
public val CoroutineScope.isActive:Boolean
    get() = coroutineContext[Job]?.isActive ?: true

CoroutineScope 은 coroutineConext 프로퍼티를 사용해 참조할 수 있는 컨텍스트 가지고 있음 > 코루틴 잡 (coroutineContext.job) 에 접근해 현재 상태가 무엇인지 확인가능

 import kotlinx.coroutines.*

suspend fun main(): Unit = coroutineScope {
    val job = Job()
    launch(job) {
        repeat(1000) { num ->
            Thread.sleep(200)
            ensureActive()
            println("Printing $num")
        }
    }
    delay(1100)
    job.cancelAndJoin()
    println("Cancelled successfully")
}

ensureActive : Job이 active 상태가 아니면 CancellationException 을 던지는 함수

ensureActive vs yield?

  • ensureActive 함수는 coroutineScope 에서 호출 되어야 함
  • yield 함수는 스코프를 필요로 하지 않음, CPU 사용량이 크거나 스레드를 블로킹하는 중단 함수에서 주로 사용됨 - yield 호출 시 재분배도 가능하기 때문에 한 프로세스가 다른 프로세스를 기아 상태에 빠지게 만드는 경우가 없게 함

Job의 상태

Untitled

  • Active: 잡이 실행되고 코루틴은 잡을 수행, 자식 코루틴 시작 가능
  • New : 지연 시작되는 코루틴, 그 코루틴이 작업 실행되면 Active로
  • Completing: 실행 완료되고, 자식들을 기다림
  • Completed : 자식들의 실행 모두 끝난 경우
  • Cancelling : 잡이 실행 도중(Active or Completing) 취소하거나 실패하게 될 경우
  • Cancelled : 취소 후 후처리 작업이 완료되면 Cancelled 상태 됨
import kotlinx.coroutines.*

suspend fun main() = coroutineScope {
    // 빌더에 의해 생성된 Job은 
    val job = Job()
    println(job) // JobImpl{Active}@ADD
    // 완료될 때까지 Active 상태
    job.complete()
    println(job) // JobImpl{Completed}@ADD

    // launch 로 생성되면 기본적으로 active 상태
    val activeJob = launch {
        delay(1000)
    }
    println(activeJob) // StandaloneCoroutine{Active}@ADD
    // job 완료될 때까지 기다림
    activeJob.join() // (1 sec)
    println(activeJob) // StandaloneCoroutine{Completed}@ADD

    // launch started lazily is in New state
    val lazyJob = launch(start = CoroutineStart.LAZY) {
        delay(1000)
    }
    println(lazyJob) // LazyStandaloneCoroutine{New}@ADD
    // we need to start it, to make it active
    lazyJob.start()
    println(lazyJob) // LazyStandaloneCoroutine{Active}@ADD
    lazyJob.join() // (1 sec)
    println(lazyJob) //LazyStandaloneCoroutine{Completed}@ADD
}
  • 잡의 상태를 확인하기 위해서는 isActive, isCompleted, isCancelled 를 확인하면 됨

async

async 코루틴 빌더는 값을 생성하도록 설계되어 있음, Deferred<T> 타입의 객체를 리턴하고 T는 생성되는 값의 타입임

public fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T> 
  • Deferred 객체

    • 미래의 어느 시점에 결과값이 반환될 수 있음을 나타냄,
    • await 함수 - 코루틴이 실행 완료되어 결과값을 반환할 때까지 호출부의 코루틴을 일시 중단시킴 (Job 객체의 join 과 유사)
    • public interface Deferred<out T> : Job { public suspend fun await(): T ... }
    • Job 인터페이스의 서브타입, Job 객체의 모든 함수와 프로퍼티 사용 가능..(e.g. cancel, isActive ..)
    • awaitAll 함수: 모든 Deferred 코루틴들로부터 결과가 수신될 때까지 호출부 코루틴 일시 중단deferreds.map {it.await() } 와 동일하지 않음 - awaitAll 은 deferred 객체 중 하나라도 fail 하면 즉시 fail, deferreds.map {it.await() } 은 실패하는 작업에 맞닥드릴 때까지 하나씩 기다림
    • suspend fun <T> awaitAll(vararg deferreds: Deferred<T>): List<T> suspend fun <T> Collection<Deferred<T>>.awaitAll(): List<T>
  • 예제 코드 ..

import contributors.*
import kotlinx.coroutines.*

suspend fun loadContributorsConcurrent(service: GitHubService, req: RequestData): List<User> = coroutineScope {
    val repos = service //특정 org의 repo list를 반환
        .getOrgRepos(req.org)
        .also { logRepos(req, it) } 
        .body() ?: emptyList()   

        //repo list의 contributor list 가져옴
    val deferreds: List<Deferred<List<User>>> = repos.map { repo -> 
        async {
            log("starting loading for ${repo.name}")
            service
                .getRepoContributors(req.org, repo.name)
                .also { logUsers(repo, it) }
                .bodyList()
        }
    }

    deferreds.awaitAll().flatten().aggregate()
}

withContext

Calls the specified suspending block with a given coroutine context, suspends until it completes, and returns the result.

suspend fun <T> withContext(context: CoroutineContext, block: suspend CoroutineScope.() -> T): T

blockCoroutineDispatcher 에서 실행을 위해 디스패치 되어야 하며,

블록이 완료되면 실행은 원래 디스패쳐로 다시 전환

withContextasync-await 쌍을 대체 가능함

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
  val result: String = withContext(Dispatchers.IO) {
    delay(1000L) // 네트워크 요청
    return@withContext "Dummy Response" // 문자열 반환
  }
  println(result)
}
/*
// 결과:
Dummy Response
*/

withContext 동작 방식

withContext 함수는 실행 중이던 코루틴을 그대로 유지하면서 실행 환경만 변경해 작업 처리

기존의 코루틴 유지 BUT CoroutineContext 객체만 바꿔서 실행(⇒ context-switch), 실행 스레드만 변경됨

withContext 사용시 주의점

독립적인 작업이 병렬로 실행되야 하는 상황에 순차적으로 실행되는 문제 발생

→ withContext가 새로운 코루틴을 생성하지 않기 때문

import kotlinx.coroutines.*

private val myDispatcher1 = newSingleThreadContext("MyThread1")
private val myDispatcher2 = newSingleThreadContext("MyThread2")

fun main() = runBlocking<Unit> {
  println("[${Thread.currentThread().name}] 코루틴 실행")
  withContext(myDispatcher1) {
    println("[${Thread.currentThread().name}] 코루틴 실행")
    withContext(myDispatcher2) {
      println("[${Thread.currentThread().name}] 코루틴 실행")
    }
    println("[${Thread.currentThread().name}] 코루틴 실행")
  }
  println("[${Thread.currentThread().name}] 코루틴 실행")
}
/*
// 결과:
[main @coroutine#1] 코루틴 실행
[MyThread1 @coroutine#1] 코루틴 실행
[MyThread2 @coroutine#1] 코루틴 실행
[MyThread1 @coroutine#1] 코루틴 실행
[main @coroutine#1] 코루틴 실행
*/

withContext 함수를 통해 바뀐 CoroutineDispatcher 객체가 유효한 것은 withContext 블록 내부임,

블록을 벗어나면 이전의 원래 dispatcher 객체로 전환

'공부 > Kotlin' 카테고리의 다른 글

코루틴 컨텍스트 (Coroutine Context)  (0) 2024.06.21
코루틴 디스패처(Coroutine Dispatcher)  (0) 2024.06.20
Kotlin 코루틴의 기본  (1) 2024.06.09