별집사의 IT세상

[Kafka] kafka connect transform issue 정리 본문

IT/Kafka

[Kafka] kafka connect transform issue 정리

별집사 2022. 9. 7. 18:07
반응형

프로젝트 시 issue를 계속 추가할 예정.

  • 특정 필드에 타임스탬프 시간 insert
        "transforms": "InsertField",
        "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.InsertField.timestamp.field": "create_dtm",

 

  • debizium connector timestamp issue

debizium source connector에서 timestamp를 연결할 시 ZonedTimedStamp 형식으로 값이 전환되어 오류가 발생한다.

 

        "transforms": "TimestampConverter",
        "transforms.TimestampConverter.field": "create_dtm",
        "transforms.TimestampConverter.format": "yyyy-MM-dd'T'HH:mm:ss'Z'",
        "transforms.TimestampConverter.target.type": "Timestamp",
        "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",

 

  • source db의 column이 sink db에 없는 경우

 sink db에 column이 더 많은 것은 오류가 되지 않지만, source db의 column이 sink db에 매칭이 안되면 connect 오류가 발생한다. 이는 blacklist를 사용해 제외해야한다.

        "transforms": "ReplaceField",
        "transforms.ReplaceField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
        "transforms.ReplaceField.blacklist": "user_id",

 

  • source db의 column과 sink db의 column이 다른 경우

컬럼명이 source db와 sink db가 다른 경우 renameField를 이용하여 변환을 해줘야한다.

"transforms.RenameField.renames": "source column:sink column",

        "transforms": "RenameField",
        "transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
        "transforms.RenameField.renames": "user_id:emp_id",

 

  • source db의 varchar column을 sink db의 timestamp column으로 바꾸는 경우

컬럼명까지 바뀌는경우 위의 RenameField와 혼합하여 사용하면된다. 주의점은 varchar가 timestamp에 변환되는 기준이 있어야 변환이 되는것 같다.

ex) source column value : 2022-09-07 >>       "transforms.TimeStamp.format": "yyyy-MM-dd"

20220907 >>       "transforms.TimeStamp.format": "yyyyMMdd"

        "transforms": "RenameField, TimeStamp",
        "transforms.RenameField.renames": "snd_dt:snd_dtm",
        "transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",

        "transforms.TimeStamp.field": "snd_dtm",
        "transforms.TimeStamp.format": "yyyyMMdd",
        "transforms.TimeStamp.target.type": "Timestamp",
        "transforms.TimeStamp.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",

 

반응형

'IT > Kafka' 카테고리의 다른 글

[Kafka] Kafka Config Ackmode  (0) 2022.09.06
[JAVA] kafka sub 리스너 자동 실행 방법  (0) 2022.09.05
Comments