Community

๐Ÿ•Š๏ธ KStream๊ณผ KTable์˜ ์ดํ•ด์™€ ์กฐ์ธ ํ™œ์šฉ

Kafka Streams๋Š” ์‹ค์‹œ๊ฐ„ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ๋ฅผ ์œ„ํ•œ ๊ฐ•๋ ฅํ•œ ๋„๊ตฌ๋กœ, KStream๊ณผ KTable์ด๋ผ๋Š” ๋‘ ๊ฐ€์ง€ ๋ฐ์ดํ„ฐ ๊ตฌ์กฐ๋ฅผ ์ œ๊ณตํ•ฉ๋‹ˆ๋‹ค. ์ด๋ฒˆ ๊ธ€์—์„œ๋Š” ์ด ๋‘ ๋ฐ์ดํ„ฐ ๊ตฌ์กฐ์˜ ๊ฐœ๋…๊ณผ ์ฐจ์ด๋ฅผ ์‚ดํŽด๋ณด๊ณ , ์‹ค์ œ KStream-KStream, KStream-KTable, KTable-KTable ์กฐ์ธ์˜ ์˜ˆ์ œ๋ฅผ ์„ค๋ช…ํ•˜๊ฒ ์Šต๋‹ˆ๋‹ค. ย  KStreams - KStreams ์กฐ์ธ KStream-KStream ์กฐ์ธ์€ ๋‘ ๊ฐœ์˜ ์‹ค์‹œ๊ฐ„ ๋ฐ์ดํ„ฐ ์ŠคํŠธ๋ฆผ์„ ๊ฒฐํ•ฉํ•ฉ๋‹ˆ๋‹ค. ์ด ์กฐ์ธ์€ ํƒ€์ž„ ์œˆ๋„์šฐ๋ฅผ ๊ธฐ์ค€์œผ๋กœ ์ด๋ฃจ์–ด์ง€๋ฉฐ, ์„ค์ •๋œ ๊ธฐ๊ฐ„ ๋‚ด์— ๋™์ผํ•œ ํ‚ค๋ฅผ ๊ฐ€์ง„ ๋ฐ์ดํ„ฐ๋ฅผ ๊ฒฐํ•ฉํ•ฉ๋‹ˆ๋‹ค. * ํƒ€์ž„ ์œˆ๋„์šฐ๊ฐ€ ํ•„์ˆ˜์ ์ž…๋‹ˆ๋‹ค. * ๋ฐ์ดํ„ฐ๊ฐ€ ์‹ค์‹œ๊ฐ„์œผ๋กœ ๋“ค์–ด์˜ค๋ฏ€๋กœ, ์‹œ๊ฐ„์ด ์ง€๋‚˜๋ฉด ์œˆ๋„์šฐ๊ฐ€ ๋‹ซํžˆ๊ณ  ๋” ์ด์ƒ ๋ฐ์ดํ„ฐ๋ฅผ ๊ฒฐํ•ฉํ•  ์ˆ˜ ์—†์Šต๋‹ˆ๋‹ค. [์‹œ๋‚˜๋ฆฌ์˜ค: ๊ฒฐ์ œ์™€ ๋ฐฐ์†ก ์š”์ฒญ ์ด๋ฒคํŠธ ๊ฒฐํ•ฉ] * payments ์ŠคํŠธ๋ฆผ: ๊ฒฐ์ œ ์ •๋ณด Key: ์ฃผ๋ฌธ ID, Value: ๊ฒฐ์ œ ์ƒํƒœ * shipments ์ŠคํŠธ๋ฆผ: ๋ฐฐ์†ก ์š”์ฒญ ์ •๋ณด Key: ์ฃผ๋ฌธ ID, Value: ๋ฐฐ์†ก ์ƒํƒœ KStream payments = builder.stream("payments"); KStream shipments = builder.stream("shipments"); // ํƒ€์ž„ ์œˆ๋„์šฐ๋ฅผ ์ ์šฉํ•œ ์กฐ์ธ KStream joinedStream = payments.join( shipments, (payment, shipment) -> "Payment: " + payment + ", Shipment: " + shipment, JoinWindows.of(Duration.ofMinutes(5)) // 5๋ถ„ ์œˆ๋„์šฐ ์„ค์ • ); joinedStream.to("joined-orders"); Key: "order123", Value: "Payment: completed, Shipment: dispatched" ย  KStreams - KTable ์กฐ์ธ KStream-KTable ์กฐ์ธ์€ ์‹ค์‹œ๊ฐ„ ์ŠคํŠธ๋ฆผ(KStream)๊ณผ ์ƒํƒœ ๊ธฐ๋ฐ˜ ๋ฐ์ดํ„ฐ(KTable)๋ฅผ ๊ฒฐํ•ฉํ•ฉ๋‹ˆ๋‹ค. KStream์˜ ๊ฐ ์ด๋ฒคํŠธ๊ฐ€ KTable์˜ ํ˜„์žฌ ์ƒํƒœ์™€ ์กฐ์ธ๋ฉ๋‹ˆ๋‹ค. * KStream์˜ ๋ฐ์ดํ„ฐ๋Š” ์‹ค์‹œ๊ฐ„์œผ๋กœ ๊ณ„์† ํ๋ฅด์ง€๋งŒ, KTable์€ ํ˜„์žฌ ์ƒํƒœ๋งŒ ์œ ์ง€ํ•ฉ๋‹ˆ๋‹ค. * KStream์˜ ๊ฐ ๋ ˆ์ฝ”๋“œ๋Š” KTable์˜ ์ตœ์‹  ์ƒํƒœ์™€ ๊ฒฐํ•ฉ๋ฉ๋‹ˆ๋‹ค. [์‹œ๋‚˜๋ฆฌ์˜ค: ์‚ฌ์šฉ์ž ํ–‰๋™ ๋กœ๊ทธ์™€ ์‚ฌ์šฉ์ž ํ”„๋กœํ•„ ์กฐ์ธ] * user-actions ์ŠคํŠธ๋ฆผ: ์‚ฌ์šฉ์ž๊ฐ€ ์ˆ˜ํ–‰ํ•œ ํ–‰๋™ Key: ์‚ฌ์šฉ์ž ID, Value: ํ–‰๋™ ํƒ€์ž… (click, purchase) * user-profiles ํ…Œ์ด๋ธ”: ์‚ฌ์šฉ์ž ํ”„๋กœํ•„ ์ •๋ณด Key: ์‚ฌ์šฉ์ž ID, Value: ์‚ฌ์šฉ์ž ์ด๋ฆ„๊ณผ ์ง€์—ญ KStream userActions = builder.stream("user-actions"); KTable userProfiles = builder.table("user-profiles"); KStream joinedStream = userActions.leftJoin( userProfiles, (action, profile) -> { if (profile == null) return action + " by unknown user"; return action + " by " + profile; } ); joinedStream.to("user-actions-with-profile"); Key: "user123", Value: "click by Alice, US" Key: "user456", Value: "purchase by unknown user" ย  KStreams - KTable ์กฐ์ธ KTable-KTable ์กฐ์ธ์€ ๋‘ ๊ฐœ์˜ ์ƒํƒœ ๊ธฐ๋ฐ˜ ๋ฐ์ดํ„ฐ(KTable)๋ฅผ ๊ฒฐํ•ฉํ•ฉ๋‹ˆ๋‹ค. ์ด๋Š” KTable์˜ ํ‚ค๋ฅผ ๊ธฐ์ค€์œผ๋กœ ๋ฐ์ดํ„ฐ๊ฐ€ ์—…๋ฐ์ดํŠธ๋  ๋•Œ๋งˆ๋‹ค ๊ฒฐ๊ณผ ํ…Œ์ด๋ธ”๋„ ์—…๋ฐ์ดํŠธ๋ฉ๋‹ˆ๋‹ค. * ์กฐ์ธ์€ ์ƒํƒœ ๊ธฐ๋ฐ˜์œผ๋กœ ๋™์ž‘ํ•˜๋ฉฐ, ์ž…๋ ฅ ๋ฐ์ดํ„ฐ๊ฐ€ ๋ณ€๊ฒฝ๋  ๋•Œ๋งˆ๋‹ค ๊ฒฐ๊ณผ ํ…Œ์ด๋ธ”๋„ ๊ฐฑ์‹ ๋ฉ๋‹ˆ๋‹ค. * ๊ฒฐํ•ฉ๋œ ๋ฐ์ดํ„ฐ๋Š” ํ•ญ์ƒ ์ตœ์‹  ์ƒํƒœ๋ฅผ ์œ ์ง€ํ•ฉ๋‹ˆ๋‹ค. [์‹œ๋‚˜๋ฆฌ์˜ค: ์ƒํ’ˆ ์ •๋ณด์™€ ์นดํ…Œ๊ณ ๋ฆฌ ์ •๋ณด ๊ฒฐํ•ฉ] * product-info ํ…Œ์ด๋ธ”: ์ƒํ’ˆ์˜ ์ด๋ฆ„๊ณผ ๊ฐ€๊ฒฉKey: ์ƒํ’ˆ ID, Value: ์ƒํ’ˆ ์ด๋ฆ„๊ณผ ๊ฐ€๊ฒฉ * category-info ํ…Œ์ด๋ธ”: ์ƒํ’ˆ์˜ ์นดํ…Œ๊ณ ๋ฆฌ ์ •๋ณดKey: ์ƒํ’ˆ ID, Value: ์นดํ…Œ๊ณ ๋ฆฌ ์ด๋ฆ„ KTable productInfo = builder.table("product-info"); KTable categoryInfo = builder.table("category-info"); KTable joinedTable = productInfo.join( categoryInfo, (product, category) -> product + " in category: " + category ); joinedTable.toStream().to("product-with-category"); Key: "product123", Value: "Coffee Machine, $120 in category: Appliances" ์‹ค๋ฌด ์‚ฌ๋ก€: KTable - KTable ์กฐ์ธ์œผ๋กœ ๋ฐ์ดํ„ฐ ์ •ํ™•๋„ ๊ฐœ์„  ์‹ค๋ฌด์—์„œ ์ œ๊ฐ€ ๊ฒฐํ•ฉํ•˜๊ณ ์ž ํ–ˆ๋˜ ๋ฐ์ดํ„ฐ๋Š” ์ƒํ’ˆ ์ •๋ณด A์™€ ์ƒํ’ˆ ์ •๋ณด B์˜€์Šต๋‹ˆ๋‹ค. ๊ธฐ์กด์—๋Š” KTable-KStream ์กฐ์ธ์„ ์‚ฌ์šฉํ–ˆ์œผ๋‚˜, ์ด ๋ฐฉ์‹์—์„œ๋Š” ์˜ค๋ž˜๋œ ๋ฐ์ดํ„ฐ A์™€ ์ตœ์‹  ๋ฐ์ดํ„ฐ B๊ฐ€ ๊ฒฐํ•ฉ๋˜๋Š” ๋ฌธ์ œ๊ฐ€ ์žˆ์—ˆ์Šต๋‹ˆ๋‹ค. ์ด๋Š” ๋‘ ์ •๋ณด๊ฐ€ ๋ชจ๋‘ ์‹ค์‹œ๊ฐ„ ๋ฐ์ดํ„ฐ๊ฐ€ ์•„๋‹ˆ๋ผ ์ƒํƒœ์„ฑ ๋ฐ์ดํ„ฐ์˜€๊ธฐ ๋•Œ๋ฌธ์— ๋ฐœ์ƒํ•œ ์ด์Šˆ์˜€์Šต๋‹ˆ๋‹ค. ย  ์ด๋ฅผ ํ•ด๊ฒฐํ•˜๊ธฐ ์œ„ํ•ด KTable-KTable ์กฐ์ธ์œผ๋กœ ๋ณ€๊ฒฝํ•˜๊ณ , TTL์„ ์„ค์ •ํ•˜์—ฌ KTable ์ƒํƒœ์˜ ๋งŒ๋ฃŒ ์‹œ๊ฐ„์„ ๊ด€๋ฆฌํ–ˆ์Šต๋‹ˆ๋‹ค. ์ด๋กœ์จ ๊ณผ๊ฑฐ ๋ฐ์ดํ„ฐ๊ฐ€ ๊ฒฐํ•ฉ๋˜์–ด ์ƒํ’ˆ ์ •๋ณด์— ๋ฐ˜์˜๋˜๋Š” ๋ฌธ์ œ๋ฅผ ํ•ด๊ฒฐํ•  ์ˆ˜ ์žˆ์—ˆ์Šต๋‹ˆ๋‹ค. ย  ๊ฒฐ๋ก  Kafka Streams์˜ ์กฐ์ธ ์—ฐ์‚ฐ์€ ๋‹ค์–‘ํ•œ ๋ฐ์ดํ„ฐ ์†Œ์Šค๋ฅผ ๊ฒฐํ•ฉํ•˜์—ฌ ๊ฐ•๋ ฅํ•œ ์‹ค์‹œ๊ฐ„ ๋ฐ์ดํ„ฐ ํŒŒ์ดํ”„๋ผ์ธ์„ ๊ตฌ์ถ•ํ•  ์ˆ˜ ์žˆ๋Š” ๋„๊ตฌ์ž…๋‹ˆ๋‹ค * KStream-KStream: ์ด๋ฒคํŠธ ๊ฐ„ ์‹œ๊ฐ„ ๊ด€๊ณ„๋ฅผ ๋ถ„์„ํ•˜๊ณ  ์ฒ˜๋ฆฌ * KStream-KTable: ์‹ค์‹œ๊ฐ„ ์ŠคํŠธ๋ฆผ ๋ฐ์ดํ„ฐ๋ฅผ ์ฐธ์กฐ ๋ฐ์ดํ„ฐ์™€ ๊ฒฐํ•ฉ * KTable-KTable: ์ƒํƒœ ๊ธฐ๋ฐ˜ ๋ฐ์ดํ„ฐ์˜ ์ง€์†์ ์ธ ๋™๊ธฐํ™” ๋ฐ ์ตœ์‹  ์ƒํƒœ ์œ ์ง€ Kafka Streams๋ฅผ ํ™œ์šฉํ•œ ์กฐ์ธ ๋ฐฉ์‹์€ ๋ฐ์ดํ„ฐ์˜ ํŠน์„ฑ๊ณผ ์š”๊ตฌ์‚ฌํ•ญ์— ๋”ฐ๋ผ ๋‹ฌ๋ผ์ ธ์•ผ ํ•ฉ๋‹ˆ๋‹ค.์‹ค์‹œ๊ฐ„ ์ŠคํŠธ๋ฆผ์ธ์ง€, ์ƒํƒœ ๊ธฐ๋ฐ˜ ๋ฐ์ดํ„ฐ์ธ์ง€ ์ •ํ™•ํžˆ ์ดํ•ดํ•˜๊ณ  ์˜ฌ๋ฐ”๋ฅธ ์กฐ์ธ ๋ฐฉ์‹์„ ์„ ํƒํ•˜๋Š” ๊ฒƒ์ด ๋ฐ์ดํ„ฐ ํŒŒ์ดํ”„๋ผ์ธ์˜ ์ •ํ™•์„ฑ๊ณผ ํšจ์œจ์„ฑ์„ ๋†’์ด๋Š” ํ•ต์‹ฌ์ž…๋‹ˆ๋‹ค. ์›๋ฌธ: [https://ducktopia.tistory.com/141]

์•Œ๋ฆผ

์•Œ๋ฆผ์ด ์—†์Šต๋‹ˆ๋‹ค