背景
最近又收到新需求,原本项目已经集成了开源kafka进行功能开发,但是又因为开源的kafka不稳定,也不安全,所以就要求要集成华为RC6.5.1安全版kafka,并且能够跟开源的kafka进行动态切换,想用哪个用哪个。。
注意:kafka集成是通过bean管理,所以有注册bean的操作
废话不多说,开搞,我把我经历的踩坑和埋坑经验分享给有需要的人
解决了哪些问题
1.如何通过配置文件控制初始化注册哪个版本的kafka的bean
1
| 简单说怎么解让springboot在启动时动态根据配置文件的配置项确定初始化注册指定版本的kafka的bean呢?
|
使用@Bean
+@ConditionalOnPoroperty
+@ConditionalOnMissingBean
做到可通过配置文件进行动态初始化
示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @Configuration public KafkaInitAutoConfigure{
@Bean @ConditionalOnPoroperty(prefix="kafka", name="type", havingValue="apache") @ConditionalOnMissingBean({KafkaInitTemplate.class,ApacheKafkaTemplate.class}) public KafkaInitTemplate apacheKafkaTemplateInit(){ }
@Bean @ConditionalOnPoroperty(prefix="kafka", name="type", havingValue="huawei") @ConditionalOnMissingBean({KafkaInitTemplate.class,HuaweiKafkaTemplate.class}) public HuaweiKafkaTemplate huaweiKafkaTemplateInit(){ } }
|
2.认证文件读取问题
使用华为安全版RC6.5.1,需要使用krb5.conf
、user.keytab
、以及jass.conf文件
jass.conf
文件可代码生成,也可自己创建填写,内容格式如下:
1 2 3 4 5 6 7 8 9 10
| KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="src/main/resources/user.keytab" principal="developuser" useTicketCache=false storeKey=true debug=true; refreshKrb5Config=true; };
|
其中keyTab
为user.keytab
文件的绝对路径,principal
是认证用户.
就如jass.conf
文件中keytab的路径要求是一个绝对路径,所以,你的项目如果打成jar包去运行的话,就得考虑把这个认证文件放在一个固定的路径。
如果你是上k8s,也不用担心,挂载对应的路径去读取就好了。
3.运行报错:could not login: the client is being …
如果运行时初始化kafka生产者出现这个错误,一定是你的认证文件不正确,或者jass.conf
中的配置信息填写不正确
- 仔仔细细确认程序读取的那两个认证文件是否正确
- 仔仔细细确认
jass.conf
中配置的user.keytab
路径是否正确
- 仔仔细细确认
jass.conf
中配置的principal
是否正确
4. 运行报错:Clock skew too great (37) - PROCESS_TGS
原因就是:客户端和服务器系统时间相隔超过5分钟
确认下两个系统之间的时间之差吧,通过相应的命令修改好即可
注意:k8s启动的服务是看配置的时区是什么,即timezone
,并不是所谓的系统时间。
5. 运行报错:Server not found in Kerberos database (7) - LOOKING_UP_SERVER
原因:是因为kafka-clients版本问题
要使用华为提供的kafka-clients,它兼容开源的kafka-clients,不用担心使用它,就不能切换到开源版的kafka
完整的【同时集成华为RC6.5.1安全版kafka和原生kafka,通过配置文件动态控制】的代码如下
kafka自动装配类:KafkaInitAutoConfigure.java
作用:用于项目启动时,根据指定配置初始化指定类型的kafkaTemple的bean,以便在各个service层使用。同一返回KafkaInitTemplate便于统一使用KafkaInitTemplate去进行kafka的生产和消费,关键在于底层的生产和消费使用不同的版本kafka即可,不需要把所有的类型都引用一遍
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| @Configuration @EnableConfigurationProperties({KafkaInitProperties.class,HuaweiKafkaProperties.class}) @AutoConfigureAfter(KafkaAutoConfiguration.class) public KafkaInitAutoConfigure{ private static final Logger logger = LoggerFactory.getLogger(KafkaInitAutoConfigure.class)
@Bean @ConditionalOnPoroperty(prefix="kafka", name="type", havingValue="apache") @ConditionalOnMissingBean({KafkaInitTemplate.class,ApacheKafkaTemplate.class}) public KafkaInitTemplate apacheKafkaTemplateInit(KafkaTemplate kafkaTemplate, ComsumerFactory consumerFactory, KafkaInitProperties kafkaInitProperties) throws Exception{ logger.info("初始化apache的kafka"); KafkaInitTemplate kafkaInitTemplate = new KafkaInitTemplate(); kafkaInitTemplate.setKafkaInitProperties(kafkaInitProperties); kafkaInitTemplate.setKafkaTemplate(kafkaTemplate); kafkaInitTemplate.setComsumerFactory(consumerFactory); kafkaInitTemplate.afterPropertiesSet(); return kafkaInitTemplate; }
@Bean @ConditionalOnPoroperty(prefix="kafka", name="type", havingValue="huawei") @ConditionalOnMissingBean({KafkaInitTemplate.class,HuaweiKafkaTemplate.class}) public KafkaInitTemplate huaweiKafkaTemplateInit(HuaweiKafkaProperties huaweiKafkaProperties,KafkaInitProperties kafkaInitProperties) throws Exception{ logger.info("初始化apache的kafka"); KafkaInitTemplate kafkaInitTemplate = new KafkaInitTemplate(); kafkaInitTemplate.setKafkaInitProperties(kafkaInitProperties); HuaweiKafkaTemplate huaweiKafkaTemplate = new HuaweiKafkaTemplate(); kafkaInitTemplate.setHuaweiKafkaTemplate(huaweiKafkaTemplate); kafkaInitTemplate.afterPropertiesSet(); return kafkaInitTemplate; } }
|
统一处理的kafka处理类:KafkaInitTemplate.java
作用:提供一个各个类型的kafka装饰类,提供各个类型kafka的get和set方法,以及配置文件的get和set。提供send(生产)和recieve(消费)两个方法,recieve(消费)提供一个监听器,你可以通过代码起一个线程异步实时监听获取kafka消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
| public class KafkaInitTemplate implements InitalizingBean{ private KafkaInitProperties kafkaInitProperties; private KafkaTemplate<String,String> kafkaTemplate; private ConsumerFactory consumerFactory; private HuaweiKafkaTemplate huaweiKafkaTemplate; @Override public void afterPropertiesSet() throws Exception{ switch(kafkaInitProperties.getKafkaType){ case "huawei": Assert.state(huaweiKafkaTemplate != null, "huaweiKafkaTemplate未初始化"); break; default: Assert.state(kafkaTemplate != null, "kafkaTemplate未初始化"); Assert.state(consumerFactory != null, "consumerFactory未初始化"); } }
public KafkaTemplate<String,String> getKafkaTemplate(){ Assert.state(kafkaTemplate != null, "kafkaTemplate未初始化"); return kafkaTemplate; } public void setKafkaTemplate(KafkaTemplate<String,String> kafkaTemplate){ this.kafkaTemplate = kafkaTemplate; }
public KafkaInitProperties getKafkaInitProperties(){ return kafkaInitProperties; } public void setKafkaInitProperties(KafkaInitProperties kafkaInitProperties){ this.kafkaInitProperties = kafkaInitProperties; }
public ConsumerFactory getConsumerFactory(){ Assert.state(consumerFactory != null, "consumerFactory未初始化"); return consumerFactory; } public void setConsumerFactory(ConsumerFactory consumerFactory){ this.consumerFactory = consumerFactory; }
public HuaweiKafkaTemplate getHuaweiKafkaTemplate(){ Assert.state(huaweiKafkaTemplate != null, "huaweiKafkaTemplate未初始化"); return huaweiKafkaTemplate; } public void setKafkaTemplate(HuaweiKafkaTemplate huaweiKafkaTemplate){ this.huaweiKafkaTemplate = huaweiKafkaTemplate; } }
public void send(String topic,String key, Object value){ Assert.notNull(topic,"topic不能为空"); Assert.notNull(key,"key不能为空"); Assert.notNull(value,"value不能为空"); switch(this.kafkaInitProperties.getKafkaType){ case "huawei": this.huaweiKafkaTemplate.send(topic,key,JSON.toJSONString(value)); break; default: this.kafkaTemplate.send(topic,key,JSON.toJSONString(value)); } }
public AbstractMessageListenerContainer<String,String> receive(String topic, String groupId,MessageListener messageListener){ Assert.notNull(topic,"topic不能为空"); Assert.notNull(groupId,"groupId不能为空"); Assert.notNull(messageListener,"messageListener不能为空"); ContainerProperties containerProperties = new ContainerProperties(new String[]{topic}); containerProperties.setGroupId(groupId); containerProperties.setMessageListener(messageListener); KafkaMessageListenerContainer<String,String> messageListenerContainer; switch(this.kafkaInitProperties.getKafkaType){ case "huawei": messageListenerContainer = new KafkaMessageListenerContainer(this.huaweiKafkaTemplate.createConsumerFactory(), containerProperties); break; default: messageListenerContainer = new KafkaMessageListenerContainer(this.consumerFactory, containerProperties); } messageListenerContainer.setBeanName(topic + "_" + groupId); messageListenerContainer.start(); return messageListenerContainer; }
|
至于HuaweiKafkaTemplate.java,参照从华为集群下载的kafka客户端示例中,初始化即可,下面就不一一手打了,太累了,提供一些照片给大家看看
- 这是构造方法里面初始化了一个生产者

- 这是创建一个消费者工厂的方法

- 这是用消费者发送kafka消息的方法

- 这是安全认证的方法





如果你通过自己生成jass.conf
文件,就没要调用writeJassFile
方法
以上只是提供一个实现思路,和一些问题的解决方案,大家在实操过程中可以参考,切不可生搬硬套,有问题可以问我,我会及时回复大家
文章知识点与官方知识档案匹配,可进一步学习相关知识
云原生入门技能树首页概览17248 人正在系统学习中
本文转自 https://blog.csdn.net/zhangtao0417/article/details/125466693,如有侵权,请联系删除。