๐๏ธ Kafka Streams: ์ค์๊ฐ ๋ฐ์ดํฐ ์ฒ๋ฆฌ๋ฅผ ์ํ ๋๊ตฌ
Kafka Streams๋ Apache Kafka์์ ์ ๊ณตํ๋ ๋ถ์ฐ ์คํธ๋ฆฌ๋ฐ ์ฒ๋ฆฌ ์ ํ๋ฆฌ์ผ์ด์ ๋ผ์ด๋ธ๋ฌ๋ฆฌ์ ๋๋ค. ์ด ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ ๋๊ท๋ชจ ๋ฐ์ดํฐ ์คํธ๋ฆผ์ ์ค์๊ฐ์ผ๋ก ์ฒ๋ฆฌํ๋ฉฐ ๋ฐ์ดํฐ ๋ถ์, ๋ณํ, ์ง๊ณ ๊ฐ์ ์์ ์ ์์ฝ๊ฒ ๊ตฌํํ ์ ์๋๋ก ์ค๊ณ๋์์ต๋๋ค. ย ์ด๋ฒ ๊ธ์์๋ Kafka Streams์ ํต์ฌ ๊ฐ๋ ๊ณผ ์๋ ๋ฐฉ์์ ์ด๋ณด์๋ ์ดํดํ ์ ์๋๋ก ๊ฐ๋จํ ์ค๋ช ํ๊ฒ ์ต๋๋ค. ย Kafka Streams๋ ๋ฌด์์ธ๊ฐ? Kafka Streams๋ ์ค์๊ฐ ์คํธ๋ฆฌ๋ฐ ๋ฐ์ดํฐ ์ฒ๋ฆฌ๋ฅผ ์ํ Apache Kafka์ ํด๋ผ์ด์ธํธ ๋ผ์ด๋ธ๋ฌ๋ฆฌ์ ๋๋ค. ๋ณต์กํ ๋ถ์ฐ ์์คํ ์ ์ธ๋ถ์ฌํญ์ ๊ฐ๋ฐ์๊ฐ ์ง์ ๊ด๋ฆฌํ์ง ์์๋, ๊ฐ๋จํ ์ฝ๋๋ฅผ ํตํด ์ค์๊ฐ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ๊ณ ๋ถ์ํ ์ ์๊ฒ ๋์ต๋๋ค. Kafka Streams๋ ๋ฐ์ดํฐ์ ํํฐ๋ง, ๋ณํ, ์ง๊ณ ๊ฐ์ ์์ ์ ์ฝ๊ฒ ์ํํ๋๋ก ์ค๊ณ๋์์ต๋๋ค. ย [์ฃผ์ ํน์ง] * ์ฌ์ฉ ํธ์์ฑ Kafka Streams๋ Java ๋ฐ Scala ์ ํ๋ฆฌ์ผ์ด์ ์์ ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ก ์ฌ์ฉ๋๋ฉฐ, ๋ณ๋์ ํด๋ฌ์คํฐ ๊ด๋ฆฌ๊ฐ ํ์ ์์ต๋๋ค. * ํ์ฅ์ฑ Kafka Streams๋ ์๋์ผ๋ก ์์ ์ ๋ถ์ฐ ์ฒ๋ฆฌํ์ฌ ๋๋์ ๋ฐ์ดํฐ๋ ์์ ์ ์ผ๋ก ์ฒ๋ฆฌํ ์ ์์ต๋๋ค. * ์ค์๊ฐ ์ฒ๋ฆฌ ๋ฐ์ดํฐ๋ฅผ ์์งํ์๋ง์ ํํฐ๋ง, ๋ณํ, ์ง๊ณ ๊ฐ์ ์์ ์ ์ฆ์ ์ํํ ์ ์์ต๋๋ค. * ๋ด๊ฒฐํจ์ฑ Kafka์ ๋ด์ฅ๋ ๋ด๊ฒฐํจ์ฑ ๋ฉ์ปค๋์ฆ์ ํ์ฉํด ์ฅ์ ๋ฐ์ ์์๋ ์์ ์ ์ผ๋ก ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ ์ ์์ต๋๋ค. ย ๊ธฐ๋ณธ ๊ตฌ์ฑ ์์ Kafka Streams ์ ํ๋ฆฌ์ผ์ด์ ์ ์ฃผ์ ๊ฐ๋ ์ ๋ค์๊ณผ ๊ฐ์ต๋๋ค. ย [Stream] Stream์ Kafka ํ ํฝ์ ๋ฐ์ดํฐ๋ฅผ ๋ํ๋ ๋๋ค. Kafka Streams์์ Stream์ ๋์์ด ๋ณํํ๋ ๋ ์ฝ๋์ ํ๋ฆ์ผ๋ก, ์๋ฅผ ๋ค์ด ์ฌ์ฉ์ ํด๋ฆญ ๋ก๊ทธ๋ ์ผ์ ๋ฐ์ดํฐ๋ฅผ ์๊ฐํ ์ ์์ต๋๋ค. ย [Stream Processing Topology] Topology๋ ๋ฐ์ดํฐ ์ฒ๋ฆฌ๋ฅผ ์ ์ํ๋ ์ฐ์ฐ์ ๊ตฌ์กฐ์ ๋๋ค. ์ด๋ ๋ฐ์ดํฐ๋ฅผ ์ฝ๊ณ , ๋ณํํ๊ณ , ๊ฒฐ๊ณผ๋ฅผ ์ ์ฅํ๋ ์ฒ๋ฆฌ ํ๋ฆ์ผ๋ก ๊ตฌ์ฑ๋ฉ๋๋ค. * Source Processor: Kafka ํ ํฝ์์ ๋ฐ์ดํฐ๋ฅผ ์ฝ์ต๋๋ค. * Stream Processor: ๋ฐ์ดํฐ๋ฅผ ๋ณํํ๊ฑฐ๋ ์ฒ๋ฆฌํฉ๋๋ค. * Sink Processor: ์ฒ๋ฆฌ๋ ๋ฐ์ดํฐ๋ฅผ ๋ค๋ฅธ Kafka ํ ํฝ์ ์ ์ฅํฉ๋๋ค. [KStream๊ณผ KTable] Kafka Streams์์ ๋ฐ์ดํฐ๋ฅผ ํํํ๋ ์ฃผ์ ๋ฐ์ดํฐ ๊ตฌ์กฐ์ ๋๋ค. * KStream: ๋ ์ฝ๋์ ์ฐ์์ ์ธ ์คํธ๋ฆผ์ ๋ํ๋ ๋๋ค. ์: Order ์ด๋ฒคํธ ์คํธ๋ฆผ * KTable: ์คํธ๋ฆผ์์ ์ง๊ณ๋ ์ํ ๋ฐ์ดํฐ๋ฅผ ๋ํ๋ ๋๋ค. ์: ํ์ฌ ์ฌ๊ณ ์ํ, ์ฌ์ฉ์ ํ๋กํ ย ์๋ ๋ฐฉ์ Kafka Streams ์ ํ๋ฆฌ์ผ์ด์ ์ ๋ค์๊ณผ ๊ฐ์ ๋จ๊ณ๋ฅผ ๊ฑฐ์ณ ์๋ํฉ๋๋ค. ย 1. Kafka ํ ํฝ์์ ๋ฐ์ดํฐ๋ฅผ ์ฝ์ด๋ค์ ๋๋ค. StreamsBuilder builder = new StreamsBuilder(); KStream inputStream = builder.stream("input-topic"); * StreamsBuilder๋ ๋ฐ์ดํฐ ์ฒ๋ฆฌ๋ฅผ ์ ์ํ๋ ์์์ ์ ๋๋ค. * ์ ์ฝ๋์์ input-topic์ ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์ฌ Kafka ํ ํฝ์ ๋๋ค. ย 2. ๋ฐ์ดํฐ๋ฅผ ๋ณํํ๊ฑฐ๋ ํํฐ๋งํฉ๋๋ค. KStream filteredStream = inputStream.filter( (key, value) -> value.contains("important") ); * filter ์ฐ์ฐ์ ํน์ ์กฐ๊ฑด์ ๋ง๋ ๋ฐ์ดํฐ๋ง ๋จ๊น๋๋ค. ์๋ฅผ ๋ค์ด, ๊ฐ์ "important"๊ฐ ํฌํจ๋ ๋ฐ์ดํฐ๋ง ํํฐ๋งํฉ๋๋ค. 3. ์ง๊ณ๋ฅผ ์ํํฉ๋๋ค. KTable wordCounts = inputStream .flatMapValues(value -> Arrays.asList(value.split(" "))) .groupBy((key, word) -> word) .count(); * ์ ์ฝ๋์์๋ ๊ฐ ๋จ์ด์ ๋ฐ์ ํ์๋ฅผ ๊ณ์ฐํฉ๋๋ค. ์ด๋ ๋ฐ์ดํฐ๋ฅผ ๋ถํด(flatMap)ํ๊ณ , ๊ทธ๋ฃนํํ ํ, ์ง๊ณํ๋ ๊ณผ์ ์ ํฌํจํฉ๋๋ค. 4. ์ฒ๋ฆฌ๋ ๋ฐ์ดํฐ๋ฅผ ์๋ก์ด Kafka ํ ํฝ์ ์ ์ฅํฉ๋๋ค. filteredStream.to("output-topic"); * to ์ฐ์ฐ์ ์ฌ์ฉํ์ฌ ์ฒ๋ฆฌ๋ ๋ฐ์ดํฐ๋ฅผ output-topic์ผ๋ก ์ ์ฅํฉ๋๋ค. 5. Kafka Streams ์ ํ๋ฆฌ์ผ์ด์ ์ ์์ํฉ๋๋ค. KafkaStreams streams = new KafkaStreams(builder.build(), props); streams.start(); ย ์ค์ ํ์ฉ ์ฌ๋ก Kafka Streams๋ ๋ค์ํ ์ค์๊ฐ ๋ฐ์ดํฐ ์ฒ๋ฆฌ ์์ ์ ์ฌ์ฉ๋ฉ๋๋ค. 1. ๋ก๊ทธ ๋ถ์ ์ค์๊ฐ์ผ๋ก ๋ก๊ทธ ๋ฐ์ดํฐ๋ฅผ ๋ถ์ํ์ฌ ์ค๋ฅ๋ฅผ ๊ฐ์งํ๊ฑฐ๋ ์ฑ๋ฅ์ ๋ชจ๋ํฐ๋งํฉ๋๋ค. 2. IoT ๋ฐ์ดํฐ ์ฒ๋ฆฌ ์ผ์ ๋ฐ์ดํฐ๋ฅผ ์์งํ๊ณ , ์ด๋ฅผ ๋ถ์ํ์ฌ ์ค์๊ฐ์ผ๋ก ์๋ฆผ์ ์์ฑํฉ๋๋ค. ์: ์จ๋ ์ผ์ ๋ฐ์ดํฐ๋ฅผ ๊ธฐ๋ฐ์ผ๋ก ๊ฒฝ๊ณ ๋ฉ์์ง ์ ์ก. 3. ์ค์๊ฐ ์ถ์ฒ ์์คํ ์ฌ์ฉ์ ํ๋ ๋ฐ์ดํฐ๋ฅผ ๊ธฐ๋ฐ์ผ๋ก ์ฆ๊ฐ์ ์ธ ์ถ์ฒ์ ์ ๊ณตํฉ๋๋ค. ์: ํน์ ์ํ์ ํด๋ฆญํ๋ฉด ๊ด๋ จ ์ํ ์ถ์ฒ. 4. ๊ธ์ต ๊ฑฐ๋ ๋ชจ๋ํฐ๋ง ๊ธ์ต ๊ฑฐ๋ ๋ฐ์ดํฐ๋ฅผ ์ค์๊ฐ์ผ๋ก ์ฒ๋ฆฌํ์ฌ ์ฌ๊ธฐ ํ์๋ฅผ ํ์งํฉ๋๋ค. ์: ๋น์ ์์ ์ผ๋ก ํฐ ๊ฑฐ๋๊ฐ ๋ฐ์ํ๋ฉด ๊ฒฝ๊ณ ๋ฅผ ์์ฑ. ย ์ฅ๋จ์ [์ฅ์ ] * ๊ฐ๋จํ API: ๋ณต์กํ ์ฝ๋๋ฅผ ์์ฑํ์ง ์๊ณ ๋ ์ค์๊ฐ ์คํธ๋ฆฌ๋ฐ ์ฒ๋ฆฌ๊ฐ ๊ฐ๋ฅ * ํ์ฅ์ฑ: ๋๊ท๋ชจ ๋ฐ์ดํฐ ์ฒ๋ฆฌ์๋ ์ ํฉ * Kafka์์ ํตํฉ: Kafka์ ๊ธด๋ฐํ ํตํฉ๋์ด ๋์ ์ฑ๋ฅ์ ๋ณด์ฅ [๋จ์ ] * ์ด๊ธฐ ํ์ต ๊ณก์ : ์คํธ๋ฆฌ๋ฐ ์ฒ๋ฆฌ ๊ฐ๋ ๊ณผ Kafka์ ์๋ ๋ฐฉ์์ ์ดํดํด์ผ ํจ * Java ๊ธฐ๋ฐ: ๋ค๋ฅธ ์ธ์ด๋ก ๊ฐ๋ฐ์ ์ํ๋ ๊ฒฝ์ฐ ์ ์ฝ์ด ์์ ์ ์์ ย ๊ฒฐ๋ก Kafka Streams๋ ์ค์๊ฐ ์คํธ๋ฆฌ๋ฐ ๋ฐ์ดํฐ ์ฒ๋ฆฌ์ ๊ฐ๋ ฅํ ๋๊ตฌ์ ๋๋ค. ๊ฐ๋จํ ์ฝ๋๋ก ๋๊ท๋ชจ ๋ฐ์ดํฐ๋ฅผ ์์ ์ ์ด๊ณ ํจ์จ์ ์ผ๋ก ์ฒ๋ฆฌํ ์ ์์ด ๋ก๊ทธ ๋ถ์, IoT ๋ฐ์ดํฐ ์ฒ๋ฆฌ, ์ค์๊ฐ ์ถ์ฒ ์์คํ ๋ฑ ๋ค์ํ ๋ถ์ผ์์ ํ์ฉ๋ฉ๋๋ค. ย ๋ค์ ํฌ์คํ ์์๋ KStream๊ณผ KTable์ ํ์ฉํ ๋ฐ์ดํฐ ์คํธ๋ฆผ ๊ฐ์ ์กฐ์ธ(Join) ๋ฐฉ์์ ๋ํด ๋ค๋ฃจ๊ฒ ์ต๋๋ค. Inner Join, Left Join, Outer Join ๊ฐ์ ๋ค์ํ ๋ฐฉ์์ด ์ด๋ค ๋ฌธ์ ๋ฅผ ํด๊ฒฐํ ์ ์๋์ง ๊ตฌ์ฒด์ ์ธ ์์ ์ ํจ๊ป ์ดํด๋ณผ ์์ ์ ๋๋ค. ๋ณธ๋ฌธ: [https://ducktopia.tistory.com/140]