티스토리 뷰
데이터 파이프라인 구축하기
카프카는 데이터 파이프라인 단계 사이사이에서 매우 크고 안정적인 버퍼 역할을 해줄 수 있음
데이터 파이프라인의 읽는 쪽, 쓰는 쪽을 분리함으로써 하나의 원본에서 가져온 동일한 데이터를 다른 적시성 / 가용성 요구 조건을 갖는 여러 대상 애플리케이션이나 시스템으로 보낼 수 있음
데이터 파이프라인 구축 시 고려사항
- 적시성
- 좋은 데이터 통합시스템은 시스템의 각기 다른 적시성의 요구 조건을 지원해야 하고 요구 조건이 변경되었을 때도 유연하게 대처해야 함
- 카프카는 실시간으로 작동되는 데이터 파이프라인부터 배치 작업에 이르는 모든 작업에 사용될 수 있음
- 카프카는 쓰는 쪽, 읽는 쪽 모두 시간적 민감도에 대한 요구 조건을 분리시키는 거대한 버퍼가 될 수 있음 (쓰는 쪽이 어떤 속도로 쓰든 읽는 쪽은 스스로의 요구 조건에 맞춰 읽을 수 있고, 반대으 경우도 마찬가지)
- 신뢰성
- 장애 발생에 대해 신속하고 자동화된 복구를 수행
- 또한 전달 보장 을 통해 원본 시스템에서 발생한 모든 이벤트가 유실, 중복 없이 목적지에 도착하는 것이 중요
- 처리율
- 매우 높은 처리율을 가질 수 있도록 확장 가능해야 하며 / 처리율이 증가하는 경우에도 적응할 수있어야 함
- 카프카가 읽는 쪽 / 쓰는 쪽 사이의 버퍼 역할을 하기 때문에 프로듀서의 처리율과 컨슈머의 처리율을 분리할 수 있고 파이프라인의 한 쪽을 독립적으로 확장할 수 있음
- 카프카 커넥트 API 는 작업을 병렬화하는데 초점을 맞춰서 시스템 요구 조건에 따라 싱글 노드든 여러 개의 노드든 아무 상관없이 실행될 수 있음
- 압축 코덱 지원
- 데이터 형식
- 카프카 , 커넥트 API 는 데이터 형식에 완전히 독립적
- 카프카 커넥트는 자료형, 스키마를 포함하는 고유한 인메모리 객체를 가지고 있고 이 레코드를 어떤 형식으로든 저장할 수 있도록 장착 가능한 컨버터 지원
- 카프카 데이터를 외부 시스템에 쓸 경우 싱크 커넥터가 외부 시스템에 쓰여지는 데이터 형식을 책임짐
- 변환
- ETL: 추출-변환-적재, 연산과 저장을 데이터 파이프라인 내부에서 수행 이 방식의 단점은 파이프라인 하단에서 데이터를 처리하고자 할 경우 이미 삭제되거나 변환된 데이터는 처리할 수 없음
- ELT: 추출-적재-변환: 원본 시스템 데이터, 대상 시스템 데이터가 최대한 비슷하고 최소한의 변환만 수행, 데이터 처리 작업이 하나의 시스템에서 처리됨
- 카프카 커넥트는 원본 시스템의 데이터를 카프카로 옮길 때, 카프카 데이터를 대상 시스템으로 옮길 때 단위 레코드를 변환할 수 있게 해주는 단일 메시지 변환(Single Message Transformation) 제공 → 다른 토픽으로 메시지 보내거나, 필터링, 자료형 변환 등의 기능 수항
- 장애 처리
- 모든 이벤트를 장기간에 걸쳐 저장하도록 카프카를 설정해서 필요할 경우 이전 시점으로 돌아가서 에러 복구를 함. 대상 시스템 유실되었을 경우 카프카에 저장된 이벤트를 다시 replay 해서 복구함
- 결함과 민첩성
- 데이터 원본과 대상을 분리
- 강결합이 생기는 다음 케이스가 있음
- 임기응변 파이프라인: 데이터 파이프라인이 특정한 엔드포인트와 강하게 결합되는 경우 (e.g. logstash - elasticsearch), 새로운 시스템을 도입해야 할때마다 추가적으로 파이프라인 구축해야 되는 경우
- 메타데이터 유실: 데이터 파이프라인이 스키마 메타테이터 보존 X, 스키마 진화 및 변경 역시 지원하지 않는다면 소스 시스템 - 대상 시스템 간 강결합 존재 파이프라인에서 스키마 진화를 지원한다면 각 시스템은 독립적으로 애플리케이션 변경 가능중복된 데이터가 들어왔을 때 이를 처리하는 코드도 만들어놔야 ..
- 중복 방지: timestamp + value 값 같이 넣어서 지금 있는 timestamp 보다 이전 값 들어오면 무시 …
- 진짜 중요한 데이터는 transaction 시 rdb 에 insert → kafka produce 형태로
- 과도한 처리: 파이프라인에서 데이터에 대해 과도한 변환 및 처리 하게 되면 하단에 있는 애플리케이션 요구 사항이 변경될 때마다 파이프라인도 변경 .. 가능한 로데이터를 직접 건들이지 않고 데이터 처리는 하단의 애플리케이션이 처리하도록 하는게 낫다
카프카 커넥트
카프카 커넥트 vs 카프카 프로듀서/ 컨슈머 클라이언트
- 카프카 클라이언트는 애플리케이션의 코드를 변경할 수 있으면서 카프카에 데이터를 쓰거나 읽어오고 싶을 때 씀
- 카프카 커넥트는 아파치 카프카의 일부로써, 카프카와 다른 데이터 저장소 사이 확장성과 신뢰성을 가지면서 데이터를 주고받는 수단 제공, 직접 코드나 API 를 작성하지 않은, 변경도 할 수 없는 데이터 저장소 와 카프카를 연결시켜야 할 때 사용
- 카프카 커넥트를 사용하려면 연결하고자 하는 데이터 저장소에 맞는 커넥터가 필요
- 커넥트 API 가 설정 관리, 오프셋 저장, 병렬 처리, 에러 처리, 서로 다른 데이터 형식 지원, 표준화된 관리 기능을 제공하기 때문에 데이터 저장소와 카프카를 연결하고자 한다면 커넥트를 사용하는 것을 권장
- 커넥트: 커넥터 플러그인을 개발하고 실행하기 위한 API 와 런타임 제공
- 커넥터 플러그인: 카프카 커넥트가 실행시키는 라이브러리, 데이터 이동을 담당
- 카프카 커넥트는 워커 프로세스들의 클러스터 형태로 실행, 사용자는 워커에 커넥트 플러그인을 설치하고 Rest API를 사용해서 커넥트 별 설정을 잡아주거나 관리해주면 됨
- 커넥터는 데이터 이동을 병렬화해서 처리하고 워커의 유휴 자원을 효율적으로 사용하기 위해 task 를 추가적으로 실행
- 소스 커넥터 태스크: 원본 시스템으로부터 데이터 읽어와서 커넥트 자료 객체의 형태로 워커 프로세스로 전달
- 싱크 커넥터 태스크: 워커로부터 커넥트 자료 객체를 받아 대상 시스템에 쓰는 작업을 담당
- 카프카 커넥터는 자료 객체를 카프카에 쓸 때 사용되는 형식으로 바꿀 수 있도록 컨버터 제공 (JSON, Avro, ProtoBuf .. 등 스키마 컨버터 지원)
카프카 커넥터 예제
- 핵심 설정은 아래와 같음
- bootstrap.servers: 카프카 커넥트와 함께 작동하는 카프카 브로커의 목록. 이 브로커로 데이터를 전달하거나 이 브로커의 데이터를 읽어 다른 시스템으로 전달
- group.id: 동일한 그룹 ID 를 갖는 모든 워커들은 같은 커넥트 클러스터를 구성
- plugin.path: 카프카 커넥트는 커넥터, 컨버터, 트랜스포메이션 등을 다운로드 받아 플랫폼에 플러그인 할 수 있도록 되어 있음 그 의존성들을 찾을 수 있는 디렉토리를 설정할 수 있음
- key.converter, value.converter: 커넥트는 카프카에 저장된 여러 형식의 데이터를 처리할 수 있음 카프카에 저장될 메시지의 키와 밸류 부분 각각에 컨버터 설정 가능
변경 데이터 캡쳐와 디비지움 프로젝트
- 모든 관계형 데이터베이스는 transaction log (redo log, binlog, write-ahead log) 를 포함하고 있음 이 외부 시스템이 트랜잭션 로그를 직접 읽어서 관계형 데이터베이스의 내용물의 변경을 탐지하는 방법을 CDC (Change Data Capture) 이라 부름
- 디비지움 프로젝트는 다양한 DB 에 대한 오픈소스 CDC 커넥터를 제공함, CDC 기능을 사용하고자 한다면 Debezium 에 포함된 CDC 커넥터를 사용하자
MySQL - Kafka - Elastic Search 예제
다음 링크를 참고하여 진행
# docker-compose.yml
version: '2.1'
services:
zookeeperdbz:
image: debezium/zookeeper:1.1
container_name: zookeeperdbz
ports:
- "2181:2181"
- "2888:2888"
- "3888:3888"
kafkadbz:
image: debezium/kafka:1.1
container_name: kafkadbz
ports:
- "9092:9092"
depends_on:
- zookeeperdbz
environment:
ZOOKEEPER_CONNECT: zookeeperdbz:2181
LISTENERS: PLAINTEXT://0.0.0.0:9092
ADVERTISED_LISTENERS: PLAINTEXT://kafkadbz:9092
OFFSETS_TOPIC_REPLICATION_FACTOR: 1
mysqldbz:
image: debezium/example-mysql:1.1
container_name: mysqldbz
ports:
- "3306:3306"
environment:
MYSQL_ROOT_PASSWORD: debezium
MYSQL_USER: mysqluser
MYSQL_PASSWORD: mysqlpw
elasticdbz:
image: docker.elastic.co/elasticsearch/elasticsearch:7.7.0
container_name: elasticdbz
ports:
- "9200:9200"
- "9300:9300"
environment:
discovery.type: single-node
connectdbz:
image: debezium/connect:1.1
container_name: connectdbz
ports:
- "8083:8083"
depends_on:
- zookeeperdbz
- kafkadbz
- mysqldbz
- elasticdbz
environment:
GROUP_ID: "1"
CONFIG_STORAGE_TOPIC: my_connect_configs
OFFSET_STORAGE_TOPIC: my_connect_offsets
STATUS_STORAGE_TOPIC: my_connect_statuses
BOOTSTRAP_SERVERS: kafkadbz:9092
CONNECT_GROUP_ID: "1"
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
- mysql connector 설정
curl -i -X POST -H
"Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysqldbz",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.whitelist": "inventory",
"database.history.kafka.bootstrap.servers": "kafkadbz:9092",
"database.history.kafka.topic": "schema-changes.inventory"
}
}'
이후 topic 조회해서 topic 생성 확인
[kafka@e061fae58215 bin]$ ./kafka-topics.sh --bootstrap-server localhost:9092 --list
__consumer_offsets
dbserver1
dbserver1.inventory.addresses
dbserver1.inventory.customers
dbserver1.inventory.geom
dbserver1.inventory.orders
dbserver1.inventory.products
dbserver1.inventory.products_on_hand
my_connect_configs
my_connect_offsets
my_connect_statuses
2. mysql table row 1개 update 후 kafka topic 에 데이터 써지는지 확인
mysql> UPDATE `inventory`.`customers` SET `last_name` = 'Kretchmar Kretchmer' WHERE `id` = 1004;
Query OK, 1 row affected (0.07 sec)
Rows matched: 1 Changed: 1 Warnings: 0
3. 의존성 .jar 복사 및 ElasticsearchSinkConnector 등록
https://www.confluent.io/hub/confluentinc/kafka-connect-elasticsearch 에서 .jar 파일 복사 후 컨테이너에 복사
docker cp /path-to-file/confluentinc-kafka-connect-elasticsearch-5.5.0/kafka-connect-jdbc/* connectdbz:/kafka/connect/
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '
{
"name": "elastic-sink",
"config": {
"connector.class":
"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "dbserver1.inventory.customers",
"connection.url": "<http://elasticdbz:9200>",
"transforms": "unwrap,key",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.key.field": "id",
"key.ignore": "false",
"type.name": "customer"
}
}'
4. mysql ↔ elasticsearch 간 싱크 체크
엘라스틱 서치 인덱스 조회
curl <http://localhost:9200/dbserver1.inventory.customers/_search\\?pretty\\=true>
{
"took" : 267,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 4,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "dbserver1.inventory.customers",
"_type" : "_doc",
"_id" : "1001",
"_score" : 1.0,
"_source" : {
"id" : 1001,
"first_name" : "Sally",
"last_name" : "Thomas",
"email" : "sally.thomas@acme.com"
}
},
{
"_index" : "dbserver1.inventory.customers",
"_type" : "_doc",
"_id" : "1002",
"_score" : 1.0,
"_source" : {
"id" : 1002,
"first_name" : "George",
"last_name" : "Bailey",
"email" : "gbailey@foobar.com"
}
},
{
"_index" : "dbserver1.inventory.customers",
"_type" : "_doc",
"_id" : "1003",
"_score" : 1.0,
"_source" : {
"id" : 1003,
"first_name" : "Edward",
"last_name" : "Walker",
"email" : "ed@walker.com"
}
},
{
"_index" : "dbserver1.inventory.customers",
"_type" : "_doc",
"_id" : "1004",
"_score" : 1.0,
"_source" : {
"id" : 1004,
"first_name" : "Anne",
"last_name" : "Kretchmar Kretchmer",
"email" : "annek@noanswer.org"
}
}
]
}
}
mysql 테이블에 row insert
insert into customers values(default, 'halohalo', 'kim', 'halohalo.kim@example.com');
Query OK, 1 row affected (0.06 sec)
kafka topic
[kafka@e061fae58215 bin]$ ./kafka-console-consumer.sh --bootstrap-server=localhost:9092 --topic dbse
rver1.inventory.customers
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "dbserver1.inventory.customers.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "dbserver1.inventory.customers.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false"
},
"default": "false",
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "table"
},
{
"type": "int64",
"optional": false,
"field": "server_id"
},
{
"type": "string",
"optional": true,
"field": "gtid"
},
{
"type": "string",
"optional": false,
"field": "file"
},
{
"type": "int64",
"optional": false,
"field": "pos"
},
{
"type": "int32",
"optional": false,
"field": "row"
},
{
"type": "int64",
"optional": true,
"field": "thread"
},
{
"type": "string",
"optional": true,
"field": "query"
}
],
"optional": false,
"name": "io.debezium.connector.mysql.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "id"
},
{
"type": "int64",
"optional": false,
"field": "total_order"
},
{
"type": "int64",
"optional": false,
"field": "data_collection_order"
}
],
"optional": true,
"field": "transaction"
}
],
"optional": false,
"name": "dbserver1.inventory.customers.Envelope"
},
"payload": {
"before": null,
"after": {
"id": 1005,
"first_name": "halohalo",
"last_name": "kim",
"email": "halohalo.kim@example.com"
},
"source": {
"version": "1.1.2.Final",
"connector": "mysql",
"name": "dbserver1",
"ts_ms": 1734261990000,
"snapshot": "false",
"db": "inventory",
"table": "customers",
"server_id": 223344,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 729,
"row": 0,
"thread": 2,
"query": null
},
"op": "c",
"ts_ms": 1734261990750,
"transaction": null
}
}
elastic index status
curl <http://localhost:9200/dbserver1.inventory.customers/_search\\?pretty\\=true>
{
"took" : 653,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 5,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "dbserver1.inventory.customers",
"_type" : "_doc",
"_id" : "1001",
"_score" : 1.0,
"_source" : {
"id" : 1001,
"first_name" : "Sally",
"last_name" : "Thomas",
"email" : "sally.thomas@acme.com"
}
},
{
"_index" : "dbserver1.inventory.customers",
"_type" : "_doc",
"_id" : "1002",
"_score" : 1.0,
"_source" : {
"id" : 1002,
"first_name" : "George",
"last_name" : "Bailey",
"email" : "gbailey@foobar.com"
}
},
{
"_index" : "dbserver1.inventory.customers",
"_type" : "_doc",
"_id" : "1003",
"_score" : 1.0,
"_source" : {
"id" : 1003,
"first_name" : "Edward",
"last_name" : "Walker",
"email" : "ed@walker.com"
}
},
{
"_index" : "dbserver1.inventory.customers",
"_type" : "_doc",
"_id" : "1004",
"_score" : 1.0,
"_source" : {
"id" : 1004,
"first_name" : "Anne",
"last_name" : "Kretchmar Kretchmer",
"email" : "annek@noanswer.org"
}
},
{ #new index is created ..
"_index" : "dbserver1.inventory.customers",
"_type" : "_doc",
"_id" : "1005",
"_score" : 1.0,
"_source" : {
"id" : 1005,
"first_name" : "halohalo",
"last_name" : "kim",
"email" : "halohalo.kim@example.com"
}
}
]
}
}
개별 메시지 변환
개별 메시지 변환(Single Message transofmration) 은 카프카 커넥트가 메시지를 복사하는 도중에
데이터 변환 작업의 일부로서 코드를 작성할 필요 없이 수행
Cast: 필드 타입 변환
MaskField: 필드 내용물 null 로 채움
Filter: 특정한 조건에 부합하는 메시지 제외하거나 포함
Flattern: 중첩된 자료 구조 폄
HeaderForm: 메시지에 포함된 필드를 헤더로 이동시키거나 복사
InsertHeader: 각 메시지 헤더에 정적인 문자열 추가
InsertField: 메시지에 새로운 필드를 추가
RegexRouter: 정규식과 교체된 문자열 사용해서 목적지 토픽의 이름을 바꿈
TimestampRouter: 메시지에 포함된 타임스탬프 값 기준으로 토픽 변경, 토픽 이름만으로 목적지 시스템 데이터 세트를 찾아야 할 경우 유용
카프카 커넥트 구성 요소
- 커넥터
- 다음 3가지 작업을 수행
- 커넥터에서 몇 개의 태스크가 실행되어야 하는지 결정
- JDBC 소스 커넥터에서 DB에 연결한 뒤 테이블 찾고 그 결과 근거로 얼마나 많은 태스크 필요한지 결정
- 데이터 복사 작업을 각 태스크에 어떻게 분할해 줄 지 결정
- 각 태스크에 복사 작업을 할당해 줄 테이블 목록 사용해서 각 태스크에 전달된 설정들을 생성
- 워커로부터 태스크 설정 얻어와서 태스크에 전달
- 워커들이 태스크 실행 시키고, 이 태스크들이 데이터베이스 서로 다른 테이블 복사할 수 있도록 각각에 대한 설정 전달
- 다음 3가지 작업을 수행
- 태스크
- 태스크는 데이터를 실제로 카프카에 넣거나 가져오는 작업을 담당
- 모든 태스크는 워커로부터 컨텍스트 를 받아서 초기화됨
- 소스 컨텍스트는 소스 태스크가 소스 레코드의 오프셋을 저장할 수 있게 해주는 객체를 포함 (e.g. JDBC 의 소스 커넥터의 경우 테이블의 타임스탬프 열 값이 됨 )
- 싱크 컨텍스트는 싱크 커넥터가 카프카로부터 받는 레코드를 제어할 수 있게 하는 메서드들이 있음 (재시도 하거나, exactly-once 전달을 위해 오프셋 외부에 저장하거나 할 때 사용)
- 태스크는 초기화한 뒤 Connector 가 생성해서 태스크에게 전달해준 설정값을 갖고 있는 Properies 객체와 함께 리턴
- 소스 태스크는 외부 시스템 폴링해서 워커가 카프카로 보낼 레코드 리스트 리턴
- 싱크 태스크는 워커 통해 카프카 레코드 받아서 외부 시스템에 쓰게 됨
- 워커
- 워커 프로세스는 커넥터와 태스크를 실행시키는 역할을 맡는 컨테이너 프로세스
- 커넥터와 설정을 정의하는 HTTP 요청을 처리, 커넥터 설정을 내부 카프카 토픽에 저장, 커넥터와 태스크를 실행시키고 설정값 전달, 소스와 싱크 커넥터의 오프셋을 내부 카프카 토픽에 자동으로 커밋하는 작업과 태스크에서 에러가 발생할 경우 재시도하는 역할을 함
- 워커 프로세스거 크래시 날 경우 다른 워커들이 이걸 감지해서 다른 워커들로 재할당
- 새로운 워커가 추가될 경우 다른 워커들이 감지해서 모든 워커의 부하가 균형이 잡히도록 커넥터와 태스크 할당
- 커넥터, 태스크: 데이터 이동
- 워커: REST API, 설정 관리, 신뢰성, 고가용성, 부하 분산, 규모 확장성 담당
- 워커가 설정 관리, 에러 처리, 재시도, 모니터링, 규모 확장 및 축소 등을 수행함
- 컨버터 및 커넥트 데이터 모델
- 카프카 커넥트 데이터 API : 데이터 객체와 데이터 객체 구조를 나타내는 스키마를 다룸
- 소스 커넥터: 데이터 AP 사용해서 원본 시스템의 이벤트를 읽어와 Schema, Value 순서쌍 생성
- e.g. JDBC 소스 커넥터가 데이터베이스 열 읽어온 뒤, 데이터베이스 열에 따라 ConnectSchema 객체 생성한 후, 데이터베이스 레코드의 모든 필드를 포함하는 Struct 객체 생성 뒤 각각의 열에 대해 열의 이름과 저장된 값을 저장함
- 커넥트 워커가 데이터 객체를 카프카에 쓰는 방법 → 컨버터
- 사용자가 워커나 커넥터를 설정할 때 카프카에 데이터를 저장할 때 사용하고자 하는 컨버터 선택 → json, Avro, ProtoBuf .. 등등이 사용 가능
- 커넥터가 데이터 API 객체를 워커에 리턴하면 워커는 설정된 컨버터를 사용해서 이 레코드를 정해진 타입 (JSON, Avro 객체 등) 에 따라 kafka 에 쓰게 됨
- 싱크 커넥터: 커넥트 워커는 카프카로부터 데이터 읽어온 뒤, 컨버터 사용해서 레코드를 카프카에 저장된 형식에서 커넥트 데이터 API 객체로 변환하고 이 객체는 싱크 커넥터로 전달되어 대상 시스템에 쓰여지게 됨
- 즉 데이터 형식과 무관하게 어떤 커넥터도 카프카에서 데이터를 읽고 쓸 수 있게 됨
- 오프셋 관리:
- 커넥터는 어떤 데이터를 이미 처리했는지 알아야 함, 커넥터는 카프카가 제공하는 API 를 이용해서 어느 이벤트가 미리 처리되었는지에 대한 정보를 유지 관리함
- 소스 커넥터가 커넥터 워커에 리턴하는 레코드에 논리적인 파티션과 오프셋이 포함 (여기서 오프셋이란 JDBC 커넥터인 경우 테이블 레코드의 ID 나 타임스탬프를 의미) 됨
- 소스 커넥터가 레코드를 리턴하고 워커는 이 레코드를 카프카 브로커로 보낸 후, 이 레코드가 브로커에 성공적으로 써지고 성공 응답을 워커가 받게 되면 워커는 카프카로 보낸 레코드에 대한 오프셋을 저장하게 됨.
- 이 오프셋 정보는 카프카 토픽에 저장됨 (offset.storage.topic 설정을 이용해서 토픽 이름 바꿀 수 있음), 저장된 오프셋 정보는 추후 재시작 / 크래시가 발생 하더라도 마지막 저장된 오프셋에서부터 이벤트 처리를 시작할 수 있게 됨
- 카프카 커넥트는 생성한 커넥트의 모든 설정을 config.storage.topic , 각 커넥터의 현재 상태를 status.storage.topic 에 저장함
- 싱크 커넥터의 경우에도 토픽, 파티션, 오프셋 식별자가 있는 카프카 레코드를 읽은 후 이 레코드를 대상 시스템에 저장하고 이 작업이 성공하면 싱크 커넥터는 커넥터에 주어졌던 오프셋을 카프카에 커밋하게 됨
카프카 커넥트 대안
데이터 시스템에 카프카를 통합하는 건 카프카 커넥터 뿐만이 아니라 다른 방식도 충분히 사용할 수 있다 ..
- 다른 데이터 저장소를 위한 수집 프레임워크: 카프카가 아키텍처의 핵심 부분이고 많은 소스, 싱크를 연결하는 것이 목표라면 카프카 커넥트를 사용하는 걸 권장, 하지만 카프카가 데이터 컴포넌트의 일부일 뿐이라면 다른 걸 쓰는게 더 바람직
- GUI 기반 ETL 툴
- 스트림 프로세싱 프레임워크: 스트림 처리 프레임워크가 시스템에 대한 쓰기, 읽기 모두 지원하면 이걸 쓰는게 낫다. 스트림 처리 워크플로우에서 한 단계를 단축시킬 수 있다 (처리된 이벤트를 카프카에 저장할 필요 없이 바로 다른 시스템에 쓰면 됨)
'공부 > Kafka' 카테고리의 다른 글
카프카 모니터링하기 (0) | 2024.12.29 |
---|---|
신뢰성 있는 데이터 전달, Exactly at-once semantics (1) | 2024.11.17 |
카프카 내부 매커니즘 - 2 (0) | 2024.11.10 |
카프카 컨슈머 (0) | 2024.10.20 |
카프카 프로듀서 (0) | 2024.10.13 |