发布于 2017-12-02 00:24:56 | 244 次阅读 | 评论: 0 | 来源: 网友投递
			Apache Kafka 开源消息系统
Apache Kafka是一个开源消息系统项目,由Scala写成。该项目的目标是为处理实时数据提供一个统一、高通量、低等待的平台。		
Spring for Apache Kafka 2.1.0 已发布,同时发布的还有 1.3.2 和 2.0.2 维护版本,包含重要的 Bug 修复。
2.1.0 版本的主要将 kafka-clients 库升级到 1.0.0,以及一些改进:
Sometimes, when a message can’t be processed, you may wish to stop the container so the condition can be corrected and the message re-delivered. The framework now provides the ContainerStoppingErrorHandler for record listeners and ContainerStoppingBatchErrorHandler for batch listeners.
The KafkaAdmin now supports increasing partitions when a NewTopic bean is detected with a larger number of partitions than currently exist on the topic.
StringJsonMessageConverter and JsonSerializer/JsonDeserializer now pass and consume type information in Headers. This allows multiple types to be easily sent/received on the same topic:
@SpringBootApplication  public class Kafka21Application {        public static void main(String[] args) {          SpringApplication.run(Kafka21Application.class, args)              .close();      }        @Bean      public ApplicationRunner runner(KafkaTemplate<Object, Object> template) {          return args -> {              template.send(MessageBuilder.withPayload(42)                      .setHeader(KafkaHeaders.TOPIC, "blog")                      .build());              template.send(MessageBuilder.withPayload("43")                      .setHeader(KafkaHeaders.TOPIC, "blog")                      .build());              Thread.sleep(5_000);          };      }        @Bean      public StringJsonMessageConverter converter() {          return new StringJsonMessageConverter();      }        @Component      @KafkaListener(id = "multi", topics = "blog")      public static class Listener {            @KafkaHandler          public void intListener(Integer in) {              System.out.println("Got an int: " + in);          }            @KafkaHandler          public void stringListener(String in) {              System.out.println("Got a string: " + in);          }        }    }
Got an int: 42 Got a string: 43
 the JsonSerializer and JsonDeserializer can be configured using kafka properties for the producer/consumer.