๐๏ธ KStream๊ณผ KTable์ ์ดํด์ ์กฐ์ธ ํ์ฉ
Kafka Streams๋ ์ค์๊ฐ ๋ฐ์ดํฐ ์ฒ๋ฆฌ๋ฅผ ์ํ ๊ฐ๋ ฅํ ๋๊ตฌ๋ก, KStream๊ณผ KTable์ด๋ผ๋ ๋ ๊ฐ์ง ๋ฐ์ดํฐ ๊ตฌ์กฐ๋ฅผ ์ ๊ณตํฉ๋๋ค. ์ด๋ฒ ๊ธ์์๋ ์ด ๋ ๋ฐ์ดํฐ ๊ตฌ์กฐ์ ๊ฐ๋ ๊ณผ ์ฐจ์ด๋ฅผ ์ดํด๋ณด๊ณ , ์ค์ KStream-KStream, KStream-KTable, KTable-KTable ์กฐ์ธ์ ์์ ๋ฅผ ์ค๋ช ํ๊ฒ ์ต๋๋ค. ย KStreams - KStreams ์กฐ์ธ KStream-KStream ์กฐ์ธ์ ๋ ๊ฐ์ ์ค์๊ฐ ๋ฐ์ดํฐ ์คํธ๋ฆผ์ ๊ฒฐํฉํฉ๋๋ค. ์ด ์กฐ์ธ์ ํ์ ์๋์ฐ๋ฅผ ๊ธฐ์ค์ผ๋ก ์ด๋ฃจ์ด์ง๋ฉฐ, ์ค์ ๋ ๊ธฐ๊ฐ ๋ด์ ๋์ผํ ํค๋ฅผ ๊ฐ์ง ๋ฐ์ดํฐ๋ฅผ ๊ฒฐํฉํฉ๋๋ค. * ํ์ ์๋์ฐ๊ฐ ํ์์ ์ ๋๋ค. * ๋ฐ์ดํฐ๊ฐ ์ค์๊ฐ์ผ๋ก ๋ค์ด์ค๋ฏ๋ก, ์๊ฐ์ด ์ง๋๋ฉด ์๋์ฐ๊ฐ ๋ซํ๊ณ ๋ ์ด์ ๋ฐ์ดํฐ๋ฅผ ๊ฒฐํฉํ ์ ์์ต๋๋ค. [์๋๋ฆฌ์ค: ๊ฒฐ์ ์ ๋ฐฐ์ก ์์ฒญ ์ด๋ฒคํธ ๊ฒฐํฉ] * payments ์คํธ๋ฆผ: ๊ฒฐ์ ์ ๋ณด Key: ์ฃผ๋ฌธ ID, Value: ๊ฒฐ์ ์ํ * shipments ์คํธ๋ฆผ: ๋ฐฐ์ก ์์ฒญ ์ ๋ณด Key: ์ฃผ๋ฌธ ID, Value: ๋ฐฐ์ก ์ํ KStream payments = builder.stream("payments"); KStream shipments = builder.stream("shipments"); // ํ์ ์๋์ฐ๋ฅผ ์ ์ฉํ ์กฐ์ธ KStream joinedStream = payments.join( shipments, (payment, shipment) -> "Payment: " + payment + ", Shipment: " + shipment, JoinWindows.of(Duration.ofMinutes(5)) // 5๋ถ ์๋์ฐ ์ค์ ); joinedStream.to("joined-orders"); Key: "order123", Value: "Payment: completed, Shipment: dispatched" ย KStreams - KTable ์กฐ์ธ KStream-KTable ์กฐ์ธ์ ์ค์๊ฐ ์คํธ๋ฆผ(KStream)๊ณผ ์ํ ๊ธฐ๋ฐ ๋ฐ์ดํฐ(KTable)๋ฅผ ๊ฒฐํฉํฉ๋๋ค. KStream์ ๊ฐ ์ด๋ฒคํธ๊ฐ KTable์ ํ์ฌ ์ํ์ ์กฐ์ธ๋ฉ๋๋ค. * KStream์ ๋ฐ์ดํฐ๋ ์ค์๊ฐ์ผ๋ก ๊ณ์ ํ๋ฅด์ง๋ง, KTable์ ํ์ฌ ์ํ๋ง ์ ์งํฉ๋๋ค. * KStream์ ๊ฐ ๋ ์ฝ๋๋ KTable์ ์ต์ ์ํ์ ๊ฒฐํฉ๋ฉ๋๋ค. [์๋๋ฆฌ์ค: ์ฌ์ฉ์ ํ๋ ๋ก๊ทธ์ ์ฌ์ฉ์ ํ๋กํ ์กฐ์ธ] * user-actions ์คํธ๋ฆผ: ์ฌ์ฉ์๊ฐ ์ํํ ํ๋ Key: ์ฌ์ฉ์ ID, Value: ํ๋ ํ์ (click, purchase) * user-profiles ํ ์ด๋ธ: ์ฌ์ฉ์ ํ๋กํ ์ ๋ณด Key: ์ฌ์ฉ์ ID, Value: ์ฌ์ฉ์ ์ด๋ฆ๊ณผ ์ง์ญ KStream userActions = builder.stream("user-actions"); KTable userProfiles = builder.table("user-profiles"); KStream joinedStream = userActions.leftJoin( userProfiles, (action, profile) -> { if (profile == null) return action + " by unknown user"; return action + " by " + profile; } ); joinedStream.to("user-actions-with-profile"); Key: "user123", Value: "click by Alice, US" Key: "user456", Value: "purchase by unknown user" ย KStreams - KTable ์กฐ์ธ KTable-KTable ์กฐ์ธ์ ๋ ๊ฐ์ ์ํ ๊ธฐ๋ฐ ๋ฐ์ดํฐ(KTable)๋ฅผ ๊ฒฐํฉํฉ๋๋ค. ์ด๋ KTable์ ํค๋ฅผ ๊ธฐ์ค์ผ๋ก ๋ฐ์ดํฐ๊ฐ ์ ๋ฐ์ดํธ๋ ๋๋ง๋ค ๊ฒฐ๊ณผ ํ ์ด๋ธ๋ ์ ๋ฐ์ดํธ๋ฉ๋๋ค. * ์กฐ์ธ์ ์ํ ๊ธฐ๋ฐ์ผ๋ก ๋์ํ๋ฉฐ, ์ ๋ ฅ ๋ฐ์ดํฐ๊ฐ ๋ณ๊ฒฝ๋ ๋๋ง๋ค ๊ฒฐ๊ณผ ํ ์ด๋ธ๋ ๊ฐฑ์ ๋ฉ๋๋ค. * ๊ฒฐํฉ๋ ๋ฐ์ดํฐ๋ ํญ์ ์ต์ ์ํ๋ฅผ ์ ์งํฉ๋๋ค. [์๋๋ฆฌ์ค: ์ํ ์ ๋ณด์ ์นดํ ๊ณ ๋ฆฌ ์ ๋ณด ๊ฒฐํฉ] * product-info ํ ์ด๋ธ: ์ํ์ ์ด๋ฆ๊ณผ ๊ฐ๊ฒฉKey: ์ํ ID, Value: ์ํ ์ด๋ฆ๊ณผ ๊ฐ๊ฒฉ * category-info ํ ์ด๋ธ: ์ํ์ ์นดํ ๊ณ ๋ฆฌ ์ ๋ณดKey: ์ํ ID, Value: ์นดํ ๊ณ ๋ฆฌ ์ด๋ฆ KTable productInfo = builder.table("product-info"); KTable categoryInfo = builder.table("category-info"); KTable joinedTable = productInfo.join( categoryInfo, (product, category) -> product + " in category: " + category ); joinedTable.toStream().to("product-with-category"); Key: "product123", Value: "Coffee Machine, $120 in category: Appliances" ์ค๋ฌด ์ฌ๋ก: KTable - KTable ์กฐ์ธ์ผ๋ก ๋ฐ์ดํฐ ์ ํ๋ ๊ฐ์ ์ค๋ฌด์์ ์ ๊ฐ ๊ฒฐํฉํ๊ณ ์ ํ๋ ๋ฐ์ดํฐ๋ ์ํ ์ ๋ณด A์ ์ํ ์ ๋ณด B์์ต๋๋ค. ๊ธฐ์กด์๋ KTable-KStream ์กฐ์ธ์ ์ฌ์ฉํ์ผ๋, ์ด ๋ฐฉ์์์๋ ์ค๋๋ ๋ฐ์ดํฐ A์ ์ต์ ๋ฐ์ดํฐ B๊ฐ ๊ฒฐํฉ๋๋ ๋ฌธ์ ๊ฐ ์์์ต๋๋ค. ์ด๋ ๋ ์ ๋ณด๊ฐ ๋ชจ๋ ์ค์๊ฐ ๋ฐ์ดํฐ๊ฐ ์๋๋ผ ์ํ์ฑ ๋ฐ์ดํฐ์๊ธฐ ๋๋ฌธ์ ๋ฐ์ํ ์ด์์์ต๋๋ค. ย ์ด๋ฅผ ํด๊ฒฐํ๊ธฐ ์ํด KTable-KTable ์กฐ์ธ์ผ๋ก ๋ณ๊ฒฝํ๊ณ , TTL์ ์ค์ ํ์ฌ KTable ์ํ์ ๋ง๋ฃ ์๊ฐ์ ๊ด๋ฆฌํ์ต๋๋ค. ์ด๋ก์จ ๊ณผ๊ฑฐ ๋ฐ์ดํฐ๊ฐ ๊ฒฐํฉ๋์ด ์ํ ์ ๋ณด์ ๋ฐ์๋๋ ๋ฌธ์ ๋ฅผ ํด๊ฒฐํ ์ ์์์ต๋๋ค. ย ๊ฒฐ๋ก Kafka Streams์ ์กฐ์ธ ์ฐ์ฐ์ ๋ค์ํ ๋ฐ์ดํฐ ์์ค๋ฅผ ๊ฒฐํฉํ์ฌ ๊ฐ๋ ฅํ ์ค์๊ฐ ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ์ ๊ตฌ์ถํ ์ ์๋ ๋๊ตฌ์ ๋๋ค * KStream-KStream: ์ด๋ฒคํธ ๊ฐ ์๊ฐ ๊ด๊ณ๋ฅผ ๋ถ์ํ๊ณ ์ฒ๋ฆฌ * KStream-KTable: ์ค์๊ฐ ์คํธ๋ฆผ ๋ฐ์ดํฐ๋ฅผ ์ฐธ์กฐ ๋ฐ์ดํฐ์ ๊ฒฐํฉ * KTable-KTable: ์ํ ๊ธฐ๋ฐ ๋ฐ์ดํฐ์ ์ง์์ ์ธ ๋๊ธฐํ ๋ฐ ์ต์ ์ํ ์ ์ง Kafka Streams๋ฅผ ํ์ฉํ ์กฐ์ธ ๋ฐฉ์์ ๋ฐ์ดํฐ์ ํน์ฑ๊ณผ ์๊ตฌ์ฌํญ์ ๋ฐ๋ผ ๋ฌ๋ผ์ ธ์ผ ํฉ๋๋ค.์ค์๊ฐ ์คํธ๋ฆผ์ธ์ง, ์ํ ๊ธฐ๋ฐ ๋ฐ์ดํฐ์ธ์ง ์ ํํ ์ดํดํ๊ณ ์ฌ๋ฐ๋ฅธ ์กฐ์ธ ๋ฐฉ์์ ์ ํํ๋ ๊ฒ์ด ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ์ ์ ํ์ฑ๊ณผ ํจ์จ์ฑ์ ๋์ด๋ ํต์ฌ์ ๋๋ค. ์๋ฌธ: [https://ducktopia.tistory.com/141]