티스토리 뷰

공부/Kafka

카프카 프로듀서

흑개1 2024. 10. 13. 21:59

프로듀서

프로듀서 개요

애플리케이션에서 카프카에서 메시지를 써야 하는 상황들

  • 성능 메트릭 기록
  • 로그 메시지 저장
  • 센서 정보 수집 ..

각 case 마다 throughput, latency가 다름

카프카에서 메시지를 쓰는 작업(=produce)는 어떻게 이루어질까?

  1. ProducerRecord 객체 생성 : topic, value 필수 지정 (partition 과 key는 선택 사항)
  2. 객체를 byte 배열로 직렬화
  3. partitioner 가 파티션을 결정함
    1. 보통 기준은 ProducerRecord 객체의 키 값
    2. 키 값이 지정되지 않았을 경우 random
  4. 프로듀서는 이 레코드를 같은 토픽 파티션으로 전송될 레코드를 모은 record batch 에 추가, 이 record batch 가 카프카 브로커의 partition leader 에게 전송됨
    • partition replicas 들은 partition leader와 sync 가 맞춰짐, 만약 leader에 장애 발생 시 in-sync replica 가 leader로 선출될 수 있게 함
    • producer를 어떻게 설정하느냐에 따라 각 프로듀서들은 replica 에서도 record가 저장될 때까지 기다릴 수 있음 (⇒ durability 우선일 경우)
    • partition leader에 쓰였다고 해서 바로 읽을 수 있는 건 아니고, in-sync-replicas 에서 write 되면 message는 committed 되며 이건 읽을 수 있는 상태임, 이 메시지를 읽은 후에도 메시지가 lost 되지 않음. leader만 write acknowledge 되면 (acks=1) 메시지는 손실될 수 있음
  5. 브로커가 메시지를 받으면 응답을 돌려줌
    • 성공적으로 저장 시 토픽, 파티션, 파티션 안에 레코드의 오프셋 담은 RecordMetadata 리턴
    • 에러 시 에러 리턴 .. 프로듀서가 재시도 가능

초기 설정

  //create Producer Properties
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");  //connect to localhost

//set Producer properties
properties.setProperty("key.serializer", StringSerializer.class.getName());
properties.setProperty("value.serializer", StringSerializer.class.getName());

//create the Producer
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

bootstrap.servers : 카프카 클러스터와 연결 생성

key.serializer, value.serializer : 키, 밸류 값 직렬화,

key.serializer 에는 org.apache.kafka.common.serialization 를 구현하는 클래스 이름이 저장

메시지 전달

// create a Producer Record
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");

//send data (async)
producer.send(producerRecord);

메시지는 버퍼에 저장되었다가 별도 스레드에 의해 브로커로 보내짐

public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
    return this.send(record, (Callback)null);
}

send() 메서드는 Future 객체를 리턴, 위 코드에서는 리턴값 무시하기 때문에 메시지 전송 실패해도 알아낼 방법이 없음

메시지를 직렬화하는데 실패할 경우 SerializationException, 버퍼 가득 찰 경우 TimeoutException, 전송 작업 수행하는 스레드에 인터럽트 걸릴 경우 InterruptException

동기적으로 메시지 전송하기

    // create a Producer Record
  ProducerRecord<String, String> producerRecord = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");

  //send data (sync)
  try {
      RecordMetadata recordMetadata = producer.send(producerRecord).get();
      log.info(recordMetadata.toString());
  } catch (InterruptedException e) {
      throw new RuntimeException(e);
  } catch (ExecutionException e) {
      throw new RuntimeException(e);
  }

응답이 올때까지 Future.get() 으로 대기함

BUT 동기적으로 전송할 경우 전송하는 요청하는 스레드는 blocking 됨 .. 실제 production 에는 잘 안씀

재시도 가능한 에러 는 메시지를 다시 전송함으로써 해결될 수 있음

(e.g. 연결 에러, 전송받은 브로커가 파티션 리더가 아닐 경우) — 재전송 횟수가 소진되고 나서도 에러가 해결되지 않은 경우에 한해 재시도 가능한 예외가 발생함

비동기적으로 메시지 전송하기

콜백 함수와 함께 send() 메서드를 호출하면 카프카 브로커로부터 응답을 받는 시점에서 콜백 함수가 호출된다

//send data (async)
producer.send(producerRecord, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null){
            exception.printStackTrace();
        }
    }
});

성공했을 경우 → RecordMetaData 를 반환하는데 대부분 애플리케이션에는 이 정보 필요없다

하지만 메시지 전송에 실패했을 경우에는 로그에 쓰던지, 예외를 발생시키던지 해야되기 때문에 콜백을 쓸 수 있음

  • 카프카가 에러를 리턴한다면 onCompletion() 메서드가 Exception 객체를 반환하게 됨
  • 콜백은 메인 스레드에서 실행됨, 따라서 프로듀서가 지연되는 상황을 막기 위해서는 콜백이 빨라야 하며 블로킹 작업은 콜백 내에서 수행하지 말아야 함. 블로킹 작업을 수행하는 다른 스레드를 사용해야 함

프로듀서 설정값들

  • client.id : 프로듀서를 사용하는 애플리케이션을 구분하기 위한 논리적 식별자. 주로 브로커가 프로듀서가 보내온 메시지를 app instance 별로 구분하기 위해 사용. 주로 디버깅, 트러블 슈팅용
  • acks : durability 와 관련된 설정값. 메시지가 유실될 가능성에 큰 영향을 미침. 이 매개변수는 프로듀서가 임의의 쓰기 작업이 성공했다고 판별하기 위해 얼마나 많은 파티션 레플리카가 해당 레코드를 받아야 하는지 결정함
    • acks = 0 브로커의 응답을 기다리지 않음, 브로커가 메시지 받지 못했을 경우 메시지는 그대로 유실. 주로 높은 처리량이 필요할 때 사용
    • acks = 1 리더 레플리카가 메시지 받는 순간 브로커로부터 성공했다는 응답 받음
    • acks = all 리더 레플리카 + 모든 in-sync 레플리카가 메시지 받는 순간 성공했다는 응답 받음, 가장 안전한 형태 BUT 지연 시간 길어짐
    BUT 컨슈머가 읽을 수 있을 때까지 시간(=종단 지연)은 세 값이 모두 똑같음따라서 종단 지연을 주로 고려해야 하는 상황이라면 어차피 세 값 모두 다 종단 지연은 같기 때문에 가장 신뢰성 있는 선택 acks= all 을 해도 무방
  • 카프카는 일관성을 유지하기 위해서 모든 in-sync 레플리카에 복제가 완료된 후에야 컨슈머가 레코드를 읽어갈 수 있게 함
  • 메시지 전달 시간

producerRecord 를 보낼때 걸리는 시간을 두 구간으로 나누면 ..

  • send() 에 대한 비동기 호출이 이뤄진 시각부터 결과를 리턴할 때까지 걸리는 시간 - 이 시간 동안 send 를 호출한 스레드는 블록됨 — 1
  • send() 에 대한 비동기 호출이 성공적으로 리턴한 시간부터 콜백이 호출될 때까지 걸리는 시간 — 2

 

 

 

  • max.block.ms : 카프카 프로듀서가 send(), partitionsFor(), initTransactions(), sendOffsetsToTransaction(), commitTransaction() and abortTransaction() 를 호출했을 때 얼마나 오랫동안 블록되는지 결정
    send()는 메타데이터가 페치되는 시간 + buffer 할당을 기다리는 총 시간을 의미
  • delivery.timeout.ms : 레코드 전송 준비가 완료된 시점 (send() 가 문제없이 리턴되고 레코드가 배치에 저장 완료된 시점)에서부터 브로커로부터 응답을 받거나 전송을 포기하기 되는 시점까지의 제한시간을 정함.
    linger.ms + request.timeout.ms + retry.backoff.ms 보다 커야 함
    현재 배치를 전송하기 전까지 대기하는 시간 + broker 로부터 ack 를 기다리는 시간 + 전송 실패를 재시도하는 시간을 포함
  • request.timeout.ms : client 가 요청에 대한 응답을 받을 때까지 얼마나 기다릴 것인지 설정함. timeout 내에 받지 못하면 재시도 횟수가 다 채워지지 않을 경우 다시 재전송하게 됨, 아니면 timeout exception

 

  • retries, retry.backoff.ms : 재전송 횟수와 재전송 간격. 카프카에서는 이 값을 조절하는 것을 권장하지 않음 .. 크래시 난 브로커가 정상으로 돌아오기까지(모든 파티션에 대해 새 리더가 선출될때까지 얼마나 걸리는지)의 시간을 테스트한 뒤 delivery.timeout.ms 매개변수 잡아주는 것을 권장. 재전송을 시도하는 전체 시간이 크래시로부터 복구되기까지 시간보다 더 길게 잡히도록 설정.

  • linger.ms : 현재 배치를 전송하기 전까지 대기하는 시간을 결정. 배치가 가득 차거나 linger.ms 에 설정된 제한 시간이 다 되었을 때 배치를 전송함
    linger.ms 를 0보다 큰 값으로 설정하면 인위적인 지연을 추가하여 메시지를 즉시 보내는 대신 다른 레코드가 더 추가되서 전송할 수 있도록함. 지연 시간까지 기다려 전송을 일괄 처리할 수 있음. latency는 조금 줄어들지만 throughput 을 크게 증가시킴.

  • buffer.memory : 프로듀서가 브로커에 메시지를 전송하기 전에 메시지를 대기시키는 버퍼의 크기를 결정. 레코드가 브로커에 전달될 수 있는 것보다 더 빨리 전송된다면 버퍼 메모리가 가득 찰 수 있으며 max.block.ms 동안 블록되어 버퍼 메모리에 공간이 생기기를 기다리고 공간이 확보되지 않을 경우 예외 발생

  • compression.type : 압축 알고리즘 설정해서 메시지를 압축할 수 있게 함
  • batch.size : 같은 파티션에 여러 개 레코드가 전송될 경우 프로듀서는 이것들을 배치 단위로 모아서 한꺼번에 저장함. 이 매개변수는 각각의 배치에 사용될 메모리의 양을 결정.
    배치가 가득 차면 모든 메시지가 한꺼번에 전송, 이 값을 너무 작게 설정할 경우 프로듀서가 메시지를 자주 전송해야 되기 때문에 오버헤드 발생 가능
  • max.in.flight.request.per.connection : 프로듀서가 서버로부터 응답을 받지 못한 상태에서 전송할 수 있는 최대 메시지의 수. 이 값을 올리면 메모리 사용량 증가, BUT 처리량 증가.처리량 고려 + 신뢰성 보장 위해 재시도 횟수 높아야 된다는 점을 고려하면 enable.idempotentce=true 로 설정하면 됨 .. in-flight 요청 허용하면서 순서 보장하고 재전송하더라도 중복이 발생하는것을 막아줌. 만약 retries > 0, 이 매개변수를 1 이상으로 잡아줄 경우 메시지의 순서가 뒤집어 질 수 있음 (1번째 배치는 실패, 2번째 배치는 성공, 1번째 배치는 재시도 후 성공할 경우)

  • max.request.size : 프로듀서가 전송하는 쓰기 요청의 크기를 결정. 프로듀서가 single request 에서 보낼 레코드 배치 수를 제한하여 큰 요청을 보내는걸 방지. 브로커의 message.max.bytes 매개 변수와 값을 맞추는게 좋음

  • receiver.buffer.bytes, send.buffer.bytes : 데이터를 읽거나 쓸 때 소켓이 사용하는 TCP 송수신 버퍼의 크기를 결정
  • enable.idempotence : 멱등적 프로듀서(??) 관련 내용.
    만약 acks=all, 재시도는 충분히 하도록 delivery.timeout.ms 는 크게 잡는다. 브로커가 프로듀서로부터 레코드 받아서 디스크에 쓰고, 다른 브로커에도 성공적으로 복제되었지만 첫번째 브로커가 프로듀서로 응답 보내기 전에 크래시 난다면 프로듀서는 request.timeout.ms 만큼 대기한 뒤 다시 재전송 하게 되며 이때 새로 보내진 메시지는 이미 메시지를 받은 브로커로 전달되게 된다 .. 즉 메시지가 중복된다

    이 매개변수 설정을 하게 되면 프로듀서는 레코드를 보낼 때마다 순차적인 번호를 붙여서 보내고, 브로커가 동일한 번호를 가진 레코드를 2개 이상 받을 경우 하나만 저장하게 됨. (흠.. 근데 만약 브로커가 달라지면 어케되는거임?) 프로듀서는 DuplicateSequenceException 받게 됨.

    이 기능을 활성화하기 위해선  max.in.flight.requests.per.connection to be less than or equal to 5 (with message ordering preserved for any allowable value), retries to be greater than 0, and acks must be ‘all’.

시리얼라이저

커스텀 시리얼라이저

public interface Serializer<T> extends Closeable {
    default void configure(Map<String, ?> configs, boolean isKey) {
    }

    byte[] serialize(String var1, T var2);

    default byte[] serialize(String topic, Headers headers, T data) {
        return this.serialize(topic, data);
    }

    default void close() {
    }
}

Seralizer 클래스를 구현해서 커스텀 시리얼라이저를 생성

BUT 이렇게 하면 새 version 으로 객체를 업데이트 했을 때, 기존 형식 - 새 형식 사이의 호환성을 유지하기 힘듦.

Apache avro를 사용해 직렬화

Avro는 언어 중립적인 데이터 직렬화 형식으로, 언어에 독립적인 스키마의 형태로 기술됨.

스키마는 주로 json 형식으로 정의됨

보통 스키마 레지스트리 에 스키마를 저장해두고 사용

사용되는 모든 스키마를 레지스트리에 저장, 카프카에 쓰는 레코드에는 사용된 스키마의 고유 식별자를 넣어주고 이 식별자를 가져와서 데이터를 역직렬화

스키마를 레지스트리에 저장하고 필요할 때 가져오는 작업이 시리얼라이저, 디시리얼라이저 내부에서 실행됨

 

파티션

키 값으로 파티션 결정

키 값이 지정되는 이유? 메시지에 밸류값과 함께 저장되는 추가적인 정보 저장, 토픽 중 파티션을 지정하기 위함

키 값이 null 일 경우 기본적으로 라운드 로빈 으로 지정된다

접착성 처리 를 위해서 프로듀서가 메시지 배치를 채울 때 다음 배치로 넘어가기 전 이전 배치를 먼저 채우게 되어있음 → 더 작은 요청으로 같은 수의 메시지를 전송할 수 있게 됨

이외에도 RoundRobinPartitioner (⇒ 키 값을 포함하고 있을 때도 랜덤 파티션 할당),

UniformStickyPartitioner (⇒ 전체 파티션이 균등한 분포 가지도록 할당 ) 이 있음 ..

헤더

레코드 헤더는 추가 메타데이터를 심을 때 사용

메시지의 전달 내역을 기록하기 위해 사용 → 메시지를 파싱할 필요 없이 헤더에 저장된 정보로 메시지 라우팅 하거나 출처 추적

인터셉터

카프카 클라이언트에 코드 변경 없이 공통된 동작을 추가하고 싶을 경우

onSend : 프로듀서가 레코드를 브로커로 보내기 전 직렬화 되기 전 호출. 유효한 ProducerRecord 를 리턴하기만 하면 됨.

onAcknowledgement : 브로커가 보낸 응답을 클라이언트가 받았을 때 호출. 내용 변경은 못하지만 그 안에 담긴 정보는 읽을 수 있음

주로 모니터링, 정보 추적, 표준 헤더 삽입에 사용됨 (implements ProducerInterceptor )

출처:

- 카프카 핵심 가이드 2nd Edition

'공부 > Kafka' 카테고리의 다른 글

카프카 내부 매커니즘 - 2  (0) 2024.11.10
카프카 컨슈머  (0) 2024.10.20
카프카 기본 개념  (0) 2024.10.06
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2024/11   »
1 2
3 4 5 6 7 8 9
10 11 12 13 14 15 16
17 18 19 20 21 22 23
24 25 26 27 28 29 30
글 보관함