봄 부츠를 신은 카프카 스트림
봄부트 프로젝트에서 카프카 스트림과 실시간 처리 작업을 하고 싶습니다.그래서 Kafka Streams 설정이 필요하거나 Kstreams 또는 KTable을 사용하고 싶은데 인터넷에서 예제를 찾을 수 없습니다.
생산자와 소비자를 했습니다. 이제 실시간으로 스트리밍하고 싶습니다.
카프카 스트림을 처음 접하는 분들은 스프링부트를 그 위에 추가하는 것이 또 다른 수준의 복잡성을 가중시키고 있으며, 카프카 스트림은 큰 학습 곡선을 그대로 가지고 있습니다.시작하기 위한 기본 사항은 다음과 같습니다.
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${spring.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.version}</version>
</dependency>
이제 구성 개체입니다.아래 코드는 당신이 두 개의 스트림 앱을 만들고 있다고 가정하고, 각 앱은 자체 처리 토폴로지를 나타낸다는 것을 명심하십시오.
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.core.StreamsBuilderFactoryBean;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaStreamConfig {
@Value("${delivery-stats.stream.threads:1}")
private int threads;
@Value("${delivery-stats.kafka.replication-factor:1}")
private int replicationFactor;
@Value("${messaging.kafka-dp.brokers.url:localhost:9092}")
private String brokersUrl;
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public StreamsConfig kStreamsConfigs() {
Map<String, Object> config = new HashMap<>();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "default");
setDefaults(config);
return new StreamsConfig(config);
}
public void setDefaults(Map<String, Object> config) {
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokersUrl);
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, FailOnInvalidTimestamp.class);
}
@Bean("app1StreamBuilder")
public StreamsBuilderFactoryBean app1StreamBuilderFactoryBean() {
Map<String, Object> config = new HashMap<>();
setDefaults(config);
config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app1");
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);
config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, threads);
config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, replicationFactor);
return new StreamsBuilderFactoryBean(config);
}
@Bean("app2StreamBuilder")
public StreamsBuilderFactoryBean app2StreamBuilderFactoryBean() {
Map<String, Object> config = new HashMap<>();
setDefaults(config);
config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app2");
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);
config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, threads);
config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, replicationFactor);
return new StreamsBuilderFactoryBean(config);
}
}
이제 streamsBuilder를 사용하여 앱을 구축하는 재미있는 부분이 나옵니다(이 예에서는 app1).
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class App1 {
@SuppressWarnings("unchecked")
@Bean("app1StreamTopology")
public KStream<String, Long> startProcessing(@Qualifier("app1StreamBuilder") StreamsBuilder builder) {
final KStream<String, Long> toSquare = builder.stream("toSquare", Consumed.with(Serdes.String(), Serdes.Long()));
toSquare.map((key, value) -> { // do something with each msg, square the values in our case
return KeyValue.pair(key, value * value);
}).to("squared", Produced.with(Serdes.String(), Serdes.Long())); // send downstream to another topic
return toSquare;
}
}
이게 도움이 되길 바랍니다.
Kafka 명령어를 사용하여 주제를 작성하고 데이터를 주제로 전송
주제 만들기:
kafka-topics.bat --zookeeper localhost:2181 --create --topic toSquare --replication-factor 1 --partitions 1
항목으로 데이터 전송:
kafka-console-producer --broker-list localhost:9092 --topic testStreamsIn --property parse.key=true --property key.separator=,
test,12345678
Spring Boot의 Kafka Streams로 쉽게 시작하는 방법:
https://start.spring.io 을 사용하여 프로젝트를 부트스트랩합니다.종속성으로 클라우드 스트림 및 Apache Kafka 스트림의 스프링을 선택합니다.다음은 사전 구성된 프로젝트 템플릿에 대한 링크입니다. https://start.spring.io/ #!language=flanguage&dependencies=kafka-timeout, 클라우드 스트림
앱에서 Kstream bean을 정의합니다.예를 들어, 이것은 매우 기본적인 소비자 애플리케이션입니다.데이터를 소비하고 Kstream에서 표준 출력으로 레코드를 기록하기만 하면 됩니다.
@SpringBootApplication public class Application { public static void main(String[] args) { SpringApplication.run(Main.class, args); } @Bean public java.util.function.Consumer<KStream<String, String>> process() { return stream -> stream.foreach((key, value) -> { System.out.println(key + ":" + value); }); } }
이 응용 프로그램에서는 단일 입력 바인딩을 정의했습니다.Spring에서 이름으로 이 바인딩을 만듭니다.
process-in-0
즉, 콩 함수의 이름 다음에 오는 것.-in-
매개 변수의 순서 위치가 그 뒤에 나옵니다.이 바인딩 이름을 사용하여 항목 이름과 같은 다른 속성을 설정할 수 있습니다.예를들면,spring.cloud.stream.bindings.process-in-0.destination=my-topic
.Spring Cloud Stream Kafka Binder Reference, Programming Model 섹션의 추가 예제를 참조하십시오.
설정하다
application.yaml
다음과 같이:spring: cloud: stream: bindings: process-in-0.destination: my-topic kafka: streams: binder: applicationId: my-app brokers: localhost:9092 configuration: default: key: serde: org.apache.kafka.common.serialization.Serdes$StringSerde value: serde: org.apache.kafka.common.serialization.Serdes$StringSerde
https://start.spring.io/ 을 사용하여 필요한 버전/의존성을 선택하고 프로젝트를 생성/수정하여 새 스프링 부트 프로젝트를 처음부터 만들 수 있습니다.
kstream api 메소드 구현을 시작할 수 있습니다(https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/kstream/KStream.html)
언급URL : https://stackoverflow.com/questions/51733039/kafka-streams-with-spring-boot
'programing' 카테고리의 다른 글
문자열 내의 특정 문자 바꾸기 (0) | 2023.07.04 |
---|---|
세션이 생성되지 않음:이 버전의 ChromeDriver는 셀레늄을 사용하는 ChromeDriver Chrome에서 Chrome 버전 74 오류만 지원합니다. (0) | 2023.07.04 |
Spring @Cacheable 기본 TTL (0) | 2023.07.04 |
데이터베이스에 계정에 대한 항목을 만들기 전에 이미지 업로드를 처리하는 방법은 무엇입니까? (0) | 2023.07.04 |
mongodump에 대한 디렉토리/폴더를 지정할 수 있습니까? (0) | 2023.07.04 |