programing

봄 부츠를 신은 카프카 스트림

javamemo 2023. 7. 4. 21:38
반응형

봄 부츠를 신은 카프카 스트림

봄부트 프로젝트에서 카프카 스트림과 실시간 처리 작업을 하고 싶습니다.그래서 Kafka Streams 설정이 필요하거나 Kstreams 또는 KTable을 사용하고 싶은데 인터넷에서 예제를 찾을 수 없습니다.

생산자와 소비자를 했습니다. 이제 실시간으로 스트리밍하고 싶습니다.

카프카 스트림을 처음 접하는 분들은 스프링부트를 그 위에 추가하는 것이 또 다른 수준의 복잡성을 가중시키고 있으며, 카프카 스트림은 큰 학습 곡선을 그대로 가지고 있습니다.시작하기 위한 기본 사항은 다음과 같습니다.

<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>${spring.version}</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>${kafka.version}</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-streams</artifactId>
  <version>${kafka.version}</version>
</dependency>

이제 구성 개체입니다.아래 코드는 당신이 두 개의 스트림 앱을 만들고 있다고 가정하고, 각 앱은 자체 처리 토폴로지를 나타낸다는 것을 명심하십시오.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.core.StreamsBuilderFactoryBean;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaStreamConfig {

  @Value("${delivery-stats.stream.threads:1}")
  private int threads;

  @Value("${delivery-stats.kafka.replication-factor:1}")
  private int replicationFactor;

  @Value("${messaging.kafka-dp.brokers.url:localhost:9092}")
  private String brokersUrl;


  @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
  public StreamsConfig kStreamsConfigs() {
    Map<String, Object> config = new HashMap<>();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "default");
    setDefaults(config);
    return new StreamsConfig(config);
  }


  public void setDefaults(Map<String, Object> config) {
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokersUrl);
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, FailOnInvalidTimestamp.class);
  }

  @Bean("app1StreamBuilder")
  public StreamsBuilderFactoryBean app1StreamBuilderFactoryBean() {
    Map<String, Object> config = new HashMap<>();
    setDefaults(config);
    config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app1");
    config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);
    config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, threads);
    config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, replicationFactor);
    return new StreamsBuilderFactoryBean(config);
  }

  @Bean("app2StreamBuilder")
  public StreamsBuilderFactoryBean app2StreamBuilderFactoryBean() {
    Map<String, Object> config = new HashMap<>();
    setDefaults(config);
    config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app2");
    config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);
    config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, threads);
    config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, replicationFactor);
    return new StreamsBuilderFactoryBean(config);
  }
}

이제 streamsBuilder를 사용하여 앱을 구축하는 재미있는 부분이 나옵니다(이 예에서는 app1).

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class App1 {
  @SuppressWarnings("unchecked")
  @Bean("app1StreamTopology")
  public KStream<String, Long> startProcessing(@Qualifier("app1StreamBuilder") StreamsBuilder builder) {

    final KStream<String, Long> toSquare = builder.stream("toSquare", Consumed.with(Serdes.String(), Serdes.Long()));
    toSquare.map((key, value) -> { // do something with each msg, square the values in our case
      return KeyValue.pair(key, value * value);
    }).to("squared", Produced.with(Serdes.String(), Serdes.Long())); // send downstream to another topic

    return toSquare;
  }
}

이게 도움이 되길 바랍니다.

Kafka 명령어를 사용하여 주제를 작성하고 데이터를 주제로 전송

주제 만들기:

kafka-topics.bat --zookeeper localhost:2181 --create --topic toSquare --replication-factor 1 --partitions 1

항목으로 데이터 전송:

kafka-console-producer --broker-list localhost:9092 --topic testStreamsIn --property parse.key=true --property key.separator=,
test,12345678

Spring Boot의 Kafka Streams로 쉽게 시작하는 방법:

  1. https://start.spring.io 을 사용하여 프로젝트를 부트스트랩합니다.종속성으로 클라우드 스트림 및 Apache Kafka 스트림의 스프링선택합니다.다음은 사전 구성된 프로젝트 템플릿에 대한 링크입니다. https://start.spring.io/ #!language=flanguage&dependencies=kafka-timeout, 클라우드 스트림

  2. 앱에서 Kstream bean을 정의합니다.예를 들어, 이것은 매우 기본적인 소비자 애플리케이션입니다.데이터를 소비하고 Kstream에서 표준 출력으로 레코드를 기록하기만 하면 됩니다.

    @SpringBootApplication
    public class Application {
    
        public static void main(String[] args) {
            SpringApplication.run(Main.class, args);
        }
    
        @Bean
        public java.util.function.Consumer<KStream<String, String>> process() {
            return stream -> stream.foreach((key, value) -> {
                System.out.println(key + ":" + value);
            });
        }
    }
    

    이 응용 프로그램에서는 단일 입력 바인딩을 정의했습니다.Spring에서 이름으로 이 바인딩을 만듭니다.process-in-0즉, 콩 함수의 이름 다음에 오는 것.-in-매개 변수의 순서 위치가 그 뒤에 나옵니다.이 바인딩 이름을 사용하여 항목 이름과 같은 다른 속성을 설정할 수 있습니다.예를들면,spring.cloud.stream.bindings.process-in-0.destination=my-topic.

    Spring Cloud Stream Kafka Binder Reference, Programming Model 섹션의 추가 예제를 참조하십시오.

  3. 설정하다application.yaml다음과 같이:

    spring:
      cloud:
        stream:
          bindings:
            process-in-0.destination: my-topic
          kafka:
            streams:
              binder:
                applicationId: my-app
                brokers: localhost:9092
                configuration:
                  default:
                    key:
                      serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                    value:
                      serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    

https://start.spring.io/ 사용하여 필요한 버전/의존성을 선택하고 프로젝트를 생성/수정하여 새 스프링 부트 프로젝트를 처음부터 만들 수 있습니다.

kstream api 메소드 구현을 시작할 수 있습니다(https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/kstream/KStream.html)

언급URL : https://stackoverflow.com/questions/51733039/kafka-streams-with-spring-boot

반응형