Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
Tags
- kafka connect #debizium #transform
- Queue
- 토마토(고)
- Floyd
- vue
- 페이지전환
- kafka #ackmode #manual #acknowledge
- 김씨만행복한세상
- 새로운 방
- sql #오라클 #oracle #sequence #foreach #insert #mybatis
- 정올
- 1045
- 최단거리
- 페이지 전환
- 스택
- hexagonal architecture #layer architecture #아키텍쳐 #헥사고날
- Floyd 알고리즘
- 코드그라운드
- 태그를 입력해 주세요.
- 2613
- 새로운방
- 1108
- 큐
- 오류교정
- maven #메이븐 #빌드 #build #lifecycle
- kafka #consumer #autoStartup
- JAVA #필수값
- 알고리즘
- 1037
- 암스트롱 수
Archives
- Today
- Total
별집사의 IT세상
[Kafka] kafka connect transform issue 정리 본문
반응형
프로젝트 시 issue를 계속 추가할 예정.
- 특정 필드에 타임스탬프 시간 insert
"transforms": "InsertField",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.timestamp.field": "create_dtm",
- debizium connector timestamp issue
debizium source connector에서 timestamp를 연결할 시 ZonedTimedStamp 형식으로 값이 전환되어 오류가 발생한다.
"transforms": "TimestampConverter",
"transforms.TimestampConverter.field": "create_dtm",
"transforms.TimestampConverter.format": "yyyy-MM-dd'T'HH:mm:ss'Z'",
"transforms.TimestampConverter.target.type": "Timestamp",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
- source db의 column이 sink db에 없는 경우
sink db에 column이 더 많은 것은 오류가 되지 않지만, source db의 column이 sink db에 매칭이 안되면 connect 오류가 발생한다. 이는 blacklist를 사용해 제외해야한다.
"transforms": "ReplaceField",
"transforms.ReplaceField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.ReplaceField.blacklist": "user_id",
- source db의 column과 sink db의 column이 다른 경우
컬럼명이 source db와 sink db가 다른 경우 renameField를 이용하여 변환을 해줘야한다.
"transforms.RenameField.renames": "source column:sink column",
"transforms": "RenameField",
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameField.renames": "user_id:emp_id",
- source db의 varchar column을 sink db의 timestamp column으로 바꾸는 경우
컬럼명까지 바뀌는경우 위의 RenameField와 혼합하여 사용하면된다. 주의점은 varchar가 timestamp에 변환되는 기준이 있어야 변환이 되는것 같다.
ex) source column value : 2022-09-07 >> "transforms.TimeStamp.format": "yyyy-MM-dd"
20220907 >> "transforms.TimeStamp.format": "yyyyMMdd"
"transforms": "RenameField, TimeStamp",
"transforms.RenameField.renames": "snd_dt:snd_dtm",
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.TimeStamp.field": "snd_dtm",
"transforms.TimeStamp.format": "yyyyMMdd",
"transforms.TimeStamp.target.type": "Timestamp",
"transforms.TimeStamp.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
반응형
'IT > Kafka' 카테고리의 다른 글
[Kafka] Kafka Config Ackmode (0) | 2022.09.06 |
---|---|
[JAVA] kafka sub 리스너 자동 실행 방법 (0) | 2022.09.05 |
Comments