2026/4/17 22:13:27
网站建设
项目流程
国外平面设计师网站,阿里指数怎么没有了,360免费做网站电话,wordpress widget插件大家好，我是小悟。
一、Kafka简介
1.1 什么是Kafka
Apache Kafka是一个分布式流处理平台，具有以下核心特性：
高吞吐量#xff1a;支持每秒百万级消息处理可扩展性#xff1a;支持水平扩展#xff0c;可动态添加节点持久化存储#xf…大家好我是小悟。一、Kafka简介1.1 什么是KafkaApache Kafka是一个分布式流处理平台具有以下核心特性高吞吐量支持每秒百万级消息处理可扩展性支持水平扩展可动态添加节点持久化存储消息可持久化到磁盘支持数据保留策略高可用性通过副本机制保证数据不丢失分布式架构支持多生产者和消费者1.2 Kafka核心概念BrokerKafka集群中的单个节点Topic消息的分类主题PartitionTopic的分区实现并行处理Replica分区副本保证高可用Producer消息生产者Consumer消息消费者Consumer Group消费者组二、搭建Kafka高可用集群2.1 集群架构规划建议至少3个节点的Kafka集群 3个节点的Zookeeper集群Zookeeper集群zk1:2181, zk2:2181, zk3:2181 Kafka集群kafka1:9092, kafka2:9092, kafka3:9092三、SpringBoot整合Kafka详细步骤3.1 创建SpringBoot项目使用Spring Initializr创建项目添加依赖!-- pom.xml -- dependencies dependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-web/artifactId /dependency dependency groupIdorg.springframework.kafka/groupId artifactIdspring-kafka/artifactId /dependency dependency groupIdorg.projectlombok/groupId artifactIdlombok/artifactId optionaltrue/optional /dependency dependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-validation/artifactId /dependency /dependencies3.2 配置文件# application.yml spring: kafka: # Kafka集群配置高可用 bootstrap-servers: kafka1:9092,kafka2:9092,kafka3:9092 # 生产者配置 producer: retries: 3 # 发送失败重试次数 acks: all # 所有副本确认才认为发送成功 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.springframework.kafka.support.serializer.JsonSerializer properties: compression.type: snappy # 压缩类型 linger.ms: 5 # 等待时间批量发送提高吞吐量 # 消费者配置 consumer: group-id: ${spring.application.name}-group auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer properties: spring.json.trusted.packages: com.example.kafka.dto max.poll.records: 500 # 一次拉取最大记录数 session.timeout.ms: 10000 # 会话超时时间 heartbeat.interval.ms: 3000 # 心跳间隔 # 监听器配置 listener: concurrency: 3 # 并发消费者数量 ack-mode: batch # 批量确认 missing-topics-fatal: false # 主题不存在时不报错 # 高可用配置 properties: # 分区副本配置 replication.factor: 3 
min.insync.replicas: 2 # 生产者的高可用配置 enable.idempotence: true # 幂等性 max.in.flight.requests.per.connection: 5 # 自定义配置 kafka: topics: order-topic: order-topic payment-topic: payment-topic retry-topic: retry-topic retry: max-attempts: 3 backoff-interval: 10003.3 配置类// KafkaConfig.java Configuration EnableKafka Slf4j public class KafkaConfig { Value(${kafka.topics.order-topic}) private String orderTopic; Value(${kafka.topics.payment-topic}) private String paymentTopic; Value(${kafka.topics.retry-topic}) private String retryTopic; Bean public KafkaAdmin kafkaAdmin() { MapString, Object configs new HashMap(); configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafka1:9092,kafka2:9092,kafka3:9092); return new KafkaAdmin(configs); } Bean public NewTopic orderTopic() { // 创建Topic3个分区3个副本 return new NewTopic(orderTopic, 3, (short) 3); } Bean public NewTopic paymentTopic() { return new NewTopic(paymentTopic, 2, (short) 3); } Bean public NewTopic retryTopic() { return new NewTopic(retryTopic, 1, (short) 3); } // 死信队列配置 Bean public DeadLetterPublishingRecoverer dlqRecoverer(KafkaTemplateString, Object template) { return new DeadLetterPublishingRecoverer(template, (record, ex) - { log.error(消息处理失败发送到死信队列: {}, record.value(), ex); return new TopicPartition(dlq-topic, record.partition()); }); } Bean public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer dlqRecoverer) { // 重试3次后进入死信队列 DefaultErrorHandler handler new DefaultErrorHandler(dlqRecoverer, new FixedBackOff(1000L, 3)); handler.addNotRetryableExceptions(IllegalArgumentException.class); return handler; } // 生产者工厂增强配置 Bean public ProducerFactoryString, Object producerFactory() { MapString, Object configProps new HashMap(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka1:9092,kafka2:9092,kafka3:9092); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); 
configProps.put(ProducerConfig.ACKS_CONFIG, all); // 所有副本确认 configProps.put(ProducerConfig.RETRIES_CONFIG, 3); // 重试次数 configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 幂等性 configProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); return new DefaultKafkaProducerFactory(configProps); } }3.4 消息实体类// OrderMessage.java Data NoArgsConstructor AllArgsConstructor Builder public class OrderMessage implements Serializable { private String orderId; private String userId; private BigDecimal amount; private String productName; private Integer quantity; private LocalDateTime createTime; private MessageStatus status; public enum MessageStatus { PENDING, PROCESSING, SUCCESS, FAILED } } // PaymentMessage.java Data NoArgsConstructor AllArgsConstructor Builder public class PaymentMessage { private String paymentId; private String orderId; private BigDecimal amount; private PaymentMethod paymentMethod; private PaymentStatus status; private LocalDateTime paymentTime; public enum PaymentMethod { ALIPAY, WECHAT, CREDIT_CARD } public enum PaymentStatus { INIT, PROCESSING, SUCCESS, FAILED } }3.5 生产者服务// KafkaProducerService.java Service Slf4j public class KafkaProducerService { Autowired private KafkaTemplateString, Object kafkaTemplate; Value(${kafka.topics.order-topic}) private String orderTopic; Value(${kafka.topics.payment-topic}) private String paymentTopic; /** * 发送订单消息同步 */ public SendResultString, Object sendOrderSync(OrderMessage orderMessage) { try { // 设置消息头 MessageHeaders headers new MessageHeaders(Map.of( message-id, UUID.randomUUID().toString(), message-time, String.valueOf(System.currentTimeMillis()) )); MessageOrderMessage message MessageBuilder .withPayload(orderMessage) .copyHeaders(headers) .build(); // 同步发送等待确认 ListenableFutureSendResultString, Object future kafkaTemplate.send(orderTopic, orderMessage.getOrderId(), message); // 等待发送结果 SendResultString, Object result future.get(5, TimeUnit.SECONDS); log.info(订单消息发送成功: topic{}, 
partition{}, offset{}, result.getRecordMetadata().topic(), result.getRecordMetadata().partition(), result.getRecordMetadata().offset()); return result; } catch (Exception e) { log.error(订单消息发送失败: {}, orderMessage, e); throw new RuntimeException(消息发送失败, e); } } /** * 发送订单消息异步 */ public void sendOrderAsync(OrderMessage orderMessage) { kafkaTemplate.send(orderTopic, orderMessage.getOrderId(), orderMessage) .addCallback(new ListenableFutureCallbackSendResultString, Object() { Override public void onSuccess(SendResultString, Object result) { log.info(异步发送成功: topic{}, offset{}, result.getRecordMetadata().topic(), result.getRecordMetadata().offset()); } Override public void onFailure(Throwable ex) { log.error(异步发送失败: {}, orderMessage, ex); // 可以添加重试逻辑或写入本地文件 } }); } /** * 批量发送消息 */ public void batchSendOrders(ListOrderMessage orderMessages) { orderMessages.forEach(message - { kafkaTemplate.send(orderTopic, message.getOrderId(), message); }); kafkaTemplate.flush(); // 确保所有消息都发送 } /** * 发送到指定分区 */ public void sendToPartition(OrderMessage orderMessage, int partition) { kafkaTemplate.send(orderTopic, partition, orderMessage.getOrderId(), orderMessage); } /** * 事务消息发送 */ Transactional(transactionManager kafkaTransactionManager) public void sendTransactionalMessage(OrderMessage orderMessage) { // 数据库操作 // orderRepository.save(order); // Kafka消息发送与数据库操作在同一个事务中 kafkaTemplate.send(orderTopic, orderMessage.getOrderId(), orderMessage); // 其他业务操作 } }3.6 消费者服务// KafkaConsumerService.java Service Slf4j public class KafkaConsumerService { private static final String ORDER_CONTAINER_FACTORY orderContainerFactory; private static final String PAYMENT_CONTAINER_FACTORY paymentContainerFactory; /** * 订单消息消费者 - 批量消费 */ KafkaListener( topics ${kafka.topics.order-topic}, containerFactory ORDER_CONTAINER_FACTORY, groupId order-consumer-group ) public void consumeOrderMessages(ListOrderMessage messages) { log.info(收到批量订单消息数量: {}, messages.size()); for (OrderMessage message : messages) { try { 
processOrderMessage(message); } catch (Exception e) { log.error(订单处理失败: {}, message.getOrderId(), e); // 记录失败消息可以发送到重试队列 } } } /** * 单个订单消息消费 */ KafkaListener( topics ${kafka.topics.order-topic}, groupId order-single-consumer-group ) public void consumeSingleOrderMessage( Payload OrderMessage message, Header(KafkaHeaders.RECEIVED_TOPIC) String topic, Header(KafkaHeaders.RECEIVED_PARTITION) int partition, Header(KafkaHeaders.OFFSET) long offset) { log.info(收到单个订单消息: topic{}, partition{}, offset{}, orderId{}, topic, partition, offset, message.getOrderId()); try { // 业务处理逻辑 processOrderMessage(message); // 处理成功后可以发送确认消息到下游 sendPaymentMessage(message); } catch (Exception e) { log.error(订单处理失败: {}, message.getOrderId(), e); throw e; // 抛出异常会触发重试机制 } } /** * 支付消息消费者 */ KafkaListener( topics ${kafka.topics.payment-topic}, containerFactory PAYMENT_CONTAINER_FACTORY, groupId payment-consumer-group ) public void consumePaymentMessage(PaymentMessage message) { log.info(收到支付消息: {}, message.getPaymentId()); // 支付处理逻辑 try { processPayment(message); } catch (Exception e) { log.error(支付处理失败: {}, message.getPaymentId(), e); } } private void processOrderMessage(OrderMessage message) { // 模拟业务处理 log.info(处理订单: {}金额: {}, message.getOrderId(), message.getAmount()); // 业务逻辑如 // 1. 验证订单 // 2. 扣减库存 // 3. 记录日志 // 4. 
更新订单状态 // 模拟处理时间 try { Thread.sleep(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } private void sendPaymentMessage(OrderMessage orderMessage) { PaymentMessage paymentMessage PaymentMessage.builder() .paymentId(UUID.randomUUID().toString()) .orderId(orderMessage.getOrderId()) .amount(orderMessage.getAmount()) .paymentMethod(PaymentMessage.PaymentMethod.ALIPAY) .status(PaymentMessage.PaymentStatus.INIT) .paymentTime(LocalDateTime.now()) .build(); // 这里可以使用KafkaTemplate发送支付消息 } private void processPayment(PaymentMessage message) { // 支付处理逻辑 log.info(处理支付: {}订单: {}, message.getPaymentId(), message.getOrderId()); } }3.7 消费者容器工厂配置// ConsumerConfig.java Configuration public class ConsumerConfig { Value(${spring.kafka.bootstrap-servers}) private String bootstrapServers; // 订单消费者容器工厂批量消费 Bean(ORDER_CONTAINER_FACTORY) public ConcurrentKafkaListenerContainerFactoryString, OrderMessage orderContainerFactory() { ConcurrentKafkaListenerContainerFactoryString, OrderMessage factory new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(orderConsumerFactory()); factory.setConcurrency(3); // 并发消费者数量 factory.getContainerProperties().setPollTimeout(3000); factory.setBatchListener(true); // 启用批量消费 factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH); // 设置批量消费参数 factory.getContainerProperties().setIdleBetweenPolls(1000); return factory; } Bean public ConsumerFactoryString, OrderMessage orderConsumerFactory() { MapString, Object props new HashMap(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, order-consumer-group); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); props.put(JsonDeserializer.TRUSTED_PACKAGES, com.example.kafka.dto); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); // 批量拉取数量 
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, earliest); // 高可用配置 props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000); props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000); return new DefaultKafkaConsumerFactory(props); } }3.8 监控和管理端点// KafkaMonitorController.java RestController RequestMapping(/api/kafka) Slf4j public class KafkaMonitorController { Autowired private KafkaAdmin kafkaAdmin; Autowired private KafkaTemplateString, Object kafkaTemplate; /** * 获取Topic列表 */ GetMapping(/topics) public ResponseEntityListString getTopics() throws Exception { try (AdminClient adminClient AdminClient.create(kafkaAdmin.getConfigurationProperties())) { ListTopicsResult topicsResult adminClient.listTopics(); SetString topicNames topicsResult.names().get(); return ResponseEntity.ok(new ArrayList(topicNames)); } } /** * 获取Topic详情 */ GetMapping(/topics/{topic}/details) public ResponseEntityMapInteger, ListInteger getTopicDetails( PathVariable String topic) throws Exception { try (AdminClient adminClient AdminClient.create(kafkaAdmin.getConfigurationProperties())) { DescribeTopicsResult describeResult adminClient.describeTopics(Collections.singleton(topic)); TopicDescription topicDescription describeResult.values().get(topic).get(); MapInteger, ListInteger partitionInfo new HashMap(); for (TopicPartitionInfo partition : topicDescription.partitions()) { ListInteger replicas partition.replicas().stream() .map(Node::id) .collect(Collectors.toList()); partitionInfo.put(partition.partition(), replicas); } return ResponseEntity.ok(partitionInfo); } } /** * 发送测试消息 */ PostMapping(/send-test) public ResponseEntityString sendTestMessage(RequestParam String topic) { OrderMessage testMessage OrderMessage.builder() .orderId(TEST- System.currentTimeMillis()) .userId(test-user) .amount(new BigDecimal(100.00)) .productName(测试商品) .quantity(1) .createTime(LocalDateTime.now()) .status(OrderMessage.MessageStatus.PENDING) 
.build(); kafkaTemplate.send(topic, testMessage.getOrderId(), testMessage); return ResponseEntity.ok(测试消息发送成功); } /** * 获取消费者组信息 */ GetMapping(/consumer-groups) public ResponseEntityMapString, Object getConsumerGroups() throws Exception { try (AdminClient adminClient AdminClient.create(kafkaAdmin.getConfigurationProperties())) { ListConsumerGroupsResult groupsResult adminClient.listConsumerGroups(); CollectionConsumerGroupListing groups groupsResult.all().get(); MapString, Object result new HashMap(); result.put(consumerGroups, groups); result.put(count, groups.size()); return ResponseEntity.ok(result); } } }3.9 异常处理和重试机制// KafkaExceptionHandler.java Component Slf4j public class KafkaExceptionHandler { /** * 全局Kafka监听器异常处理 */ EventListener public void handleException(ListenerContainerConsumerFailedEvent event) { log.error(Kafka消费者异常: {}, event.getContainer().getListenerId(), event.getException()); // 记录异常信息 // 发送告警 // 写入错误日志 } /** * 自定义重试策略 */ Bean public RetryTemplate kafkaRetryTemplate() { RetryTemplate retryTemplate new RetryTemplate(); // 重试策略最多重试3次每次间隔1秒 SimpleRetryPolicy retryPolicy new SimpleRetryPolicy(); retryPolicy.setMaxAttempts(3); // 退避策略 FixedBackOffPolicy backOffPolicy new FixedBackOffPolicy(); backOffPolicy.setBackOffPeriod(1000L); retryTemplate.setRetryPolicy(retryPolicy); retryTemplate.setBackOffPolicy(backOffPolicy); return retryTemplate; } }3.10 健康检查// KafkaHealthIndicator.java Component public class KafkaHealthIndicator implements HealthIndicator { Autowired private KafkaTemplateString, Object kafkaTemplate; Override public Health health() { try { // 尝试发送一个测试消息来检查Kafka连接 kafkaTemplate.send(health-check-topic, health-check, ping) .get(5, TimeUnit.SECONDS); return Health.up() .withDetail(status, Kafka集群连接正常) .withDetail(timestamp, LocalDateTime.now()) .build(); } catch (Exception e) { return Health.down() .withDetail(status, Kafka集群连接异常) .withDetail(error, e.getMessage()) .withDetail(timestamp, LocalDateTime.now()) .build(); } } }四、高可用性保障措施4.1 
集群配置建议# kafka-server.properties 关键配置 broker.id1 listenersPLAINTEXT://:9092 advertised.listenersPLAINTEXT://kafka1:9092 num.network.threads3 num.io.threads8 socket.send.buffer.bytes102400 socket.receive.buffer.bytes102400 socket.request.max.bytes104857600 # 日志配置 log.dirs/data/kafka-logs num.partitions3 num.recovery.threads.per.data.dir1 # 副本和ISR配置 offsets.topic.replication.factor3 transaction.state.log.replication.factor3 transaction.state.log.min.isr2 default.replication.factor3 min.insync.replicas2 # 日志保留 log.retention.hours168 log.segment.bytes1073741824 log.retention.check.interval.ms300000 # Zookeeper配置 zookeeper.connectzk1:2181,zk2:2181,zk3:2181 zookeeper.connection.timeout.ms60004.2 生产环境部署建议硬件配置至少3个Kafka节点 3个Zookeeper节点SSD磁盘提高IO性能充足的内存和CPU资源网络配置使用专用网络配置合理的防火墙规则监控告警使用Kafka Manager或Confluent Control Center监控指标吞吐量、延迟、副本同步状态五、测试示例// KafkaIntegrationTest.java SpringBootTest Slf4j class KafkaIntegrationTest { Autowired private KafkaProducerService producerService; Test void testSendAndReceiveMessage() throws InterruptedException { // 创建测试消息 OrderMessage orderMessage OrderMessage.builder() .orderId(TEST- UUID.randomUUID()) .userId(user-001) .amount(new BigDecimal(199.99)) .productName(测试商品) .quantity(2) .createTime(LocalDateTime.now()) .status(OrderMessage.MessageStatus.PENDING) .build(); // 发送消息 SendResultString, Object result producerService.sendOrderSync(orderMessage); assertNotNull(result); assertNotNull(result.getRecordMetadata()); log.info(消息发送成功分区: {}, offset: {}, result.getRecordMetadata().partition(), result.getRecordMetadata().offset()); // 等待消费者处理 Thread.sleep(2000); } Test void testBatchSend() { ListOrderMessage messages new ArrayList(); for (int i 0; i 100; i) { OrderMessage message OrderMessage.builder() .orderId(BATCH- i) .userId(user- i) .amount(new BigDecimal(i * 10)) .productName(商品 i) .quantity(1) .createTime(LocalDateTime.now()) .status(OrderMessage.MessageStatus.PENDING) .build(); messages.add(message); } 
producerService.batchSendOrders(messages); } }六、总结6.1 实现的高可用特性数据冗余通过副本机制Replication Factor3保证数据安全故障转移Leader选举机制确保节点故障时自动切换负载均衡分区机制实现水平扩展和负载均衡容错处理死信队列和重试机制保障消息不丢失监控告警完善的健康检查和监控体系6.2 最佳实践建议合理规划分区根据业务吞吐量和消费者数量设置分区数监控副本同步确保ISRIn-Sync Replicas数量足够配置重试机制针对网络波动和临时故障进行重试实施消息幂等避免重复消费问题定期清理数据设置合理的消息保留策略6.3 性能优化建议批量操作使用批量发送和批量消费提高吞吐量压缩传输启用消息压缩减少网络带宽消耗合理批大小根据业务场景调整批量大小异步确认非关键业务使用异步发送提高响应速度通过以上方案SpringBoot整合Kafka实现了高可用的消息队列集群具备生产级的可靠性、可扩展性和容错能力能够满足企业级应用的需求。谢谢你看我的文章既然看到这里了如果觉得不错随手点个赞、转发、在看三连吧感谢感谢。那我们下次再见。您的一键三连是我更新的最大动力谢谢山水有相逢来日皆可期谢谢阅读我们再会我手中的金箍棒上能通天下能探海