最近项目中准备使用消息中间件ApachePulsar,借着机会先做个简单了解吧。ApachePulsar ApachePulsar是Apache软件基金会顶级项目,是下一代云原生分布式消息流平台。 Pulsar作为下一代云原生分布式消息流平台,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐以及低延时的高可扩展流数据存储特性,内置诸多其他系统商业版本才有的特性,是云原生时代解决实时消息流数据传输、存储和计算的最佳解决方案。 Pulsar简介系统架构 功能特色租户和命名空间(namespace)是Pulsar支持多租户的两个核心概念。在租户级别,Pulsar为特定的租户预留合适的存储空间、应用授权与认证机制。在命名空间级别,Pulsar有一系列的配置策略(policy),包括存储配额、流控、消息过期策略和命名空间之间的隔离策略。Pulsar做了队列模型和流模型的统一,在Topic级别只需保存一份数据,同一份数据可多次消费。以流式、队列等方式计算不同的订阅模型大大提升了灵活度。Pulsar使用计算与存储分离的云原生架构,数据从Broker搬离,存在共享存储内部。上层是无状态Broker,复制消息分发和服务;下层是持久化的存储层Bookie集群。Pulsar存储是分片的,这种构架可以避免扩容时受限制,实现数据的独立扩展和快速恢复。Pulsar原生支持跨地域复制,因此Pulsar可以跨不同地理位置的数据中心复制数据。当数据中心中断或网络分区时,在多个数据中心存有消息副本尤为重要,提高可用性。PulsarFunctions是基于Pulsar的轻量级流处理方式。PulsarFunctions直接部署在broker节点上(或作为Kubernetes集群中的容器)。通过PulsarFunctions,Pulsar可以直接解决许多流处理任务,简化操作。支持客户端Java客户端C客户端。NetC客户端Go客户端NodeJS客户端Ruby客户端Pulsar安装与部署 目前Pulsar不支持Window,下面通过Docker进行安装,可以参考官网https:pulsar。apache。orgdocsnextgettingstarteddocker 同时可以安装PulsarManager,具体操作可以参考官方文档https:pulsar。apache。orgdocsnextadministrationpulsarmanager 其中PulsarManager是一个网页式可视化管理与监测工具,支持多环境下的动态配置。可用于管理和监测租户、命名空间、topic、订阅、broker、集群等。window环境使用docker推荐使用DockerDesktop,和linux一样可以通过docker命令管理镜像、部署容器等操作。打开并启动DockerDesktop后,在终端执行命令执行 dockersearchpulsar 可以查询到pulsar相关的镜像 镜像下载这里我们选择分别下载红框的两个镜像,执行命令 dockerpullapachepulsarpulsardockerpullapachepulsarpulsarmanager启动启动Pulsardockerrunitp6650:6650p8080:8080mountsourcepulsardata,targetpulsardatamountsourcepulsarconf,targetpulsarconfapachepulsarpulsarbinpulsarstandalone启动PulsarManagerdockerrunnamepulsarmanagerditp9527:9527p7750:7750eSPRINGCONFIGURATIONFILEpulsarmanagerpulsarmanagerapplication。propertiesapachepulsarpulsarmanager 添加用户:forftokens1Ain(curlhttp:localhost:7750pulsarmanagercsrftoken)dosetCSRFTOKENAcurlXPUTXXSRFTOKEN:CSRFTOKENHCookie:XSRFTOKENCSRFTOKEN;HContentType:applicationjsond{name:admin,password:123456,description:superuseradmin,email:admintest。com}http:localhost:7750pulsarmanageruserssuperuser 访问:http:localhost:9527用户名密码:admin123456 配置environments:这里需要保证PulsarManager应用服务能够访问到Pulsar应用,由于都是通过Docker部署,配置ServiceURL需要使用网络IP,不要用localhost。 管理界面: Pulsar与SpringBoot集成springbootversion:2。3。7。RELEASEpulsarclient:2。10。2通过Properties简单定义一些Broker相关的属性DataConfigurationProperties(prefixpulsar)publicclassPulsarProperties{privateStringcluster;privateStringnamespace;privateStringserverUrl;privateStringtoken;}通过配置定义了一些常用的组件,比如生产、消费工厂ConfigurationEnableConfigurationProperties({PulsarProperties。class})publicclassPulsarBootstrapConfiguration{privatefinalPulsarPropertiesproperties;publicPulsarBootstrapConfiguration(PulsarPropertiesproperties){this。propertiesproperties;}Bean(destroyMethodclose)publicPulsarClientpulsarClient()throwsPulsarClientException{ClientBuilderclientBuilderPulsarClient。builder()。serviceUrl(properties。getServerUrl());returnclientBuilder。build();}BeanpublicPulsarProducerFactorypulsarProducerFactory()throwsPulsarClientException{returnnewPulsarProducerFactory(pulsarClient(),properties);}BeanpublicPulsarConsumerFactorypulsarConsumerFactory()throwsPulsarClientException{returnnewPulsarConsumerFactory(pulsarClient(),properties);}}启动服务,在服务启动后,通过实现SmartInitializingSingleton接口,完成容器基本启动(不包含Lazy的Bean)后,开始对消费者Consumer监听Slf4jSpringBootApplicationpublicclassPulsarApplicationimplementsSmartInitializingSingleton{AutowiredprivatePulsarConsumerFactoryconsumerFactory;publicstaticvoidmain(String〔〕args){SpringApplication。run(PulsarApplication。class,args);}OverridepublicvoidafterSingletonsInstantiated(){startConsumerListener();}privatevoidstartConsumerListener(){ConsumerStringconsumercreateConsumer();if(consumer!null){while(!Thread。currentThread()。isInterrupted()){CompletableFuturelt;?extendsMessagelt;?completableFutureconsumer。receiveAsync();Messagelt;?messagenull;try{messagecompletableFuture。get();}catch(InterruptedExceptione){Thread。currentThread()。interrupt();log。error(错误,e);}catch(ExecutionExceptione){log。error(错误,e);}if(message!null){try{log。info(接收消息:{},message。getValue());consumer。acknowledge(message);}catch(PulsarClientExceptione){consumer。negativeAcknowledge(message);thrownewRuntimeException(e);}}}}}privateConsumerStringcreateConsumer(){try{returnconsumerFactory。getConsumer(Constants。TOPICDEMO);}catch(PulsarClientExceptione){log。error(创建consumer出错:{},e。getMessage(),e);}returnnull;}}消息发送测试Slf4jRunWith(SpringRunner。class)SpringBootTestpublicclassPulsarBootTests{AutowiredprivatePulsarProducerFactoryproducerFactory;TestpublicvoidsendMessage()throwsPulsarClientException{ProducerproducerproducerFactory。getProducer(Constants。TOPICDEMO);producer。send(测试消息:newDate());producer。close();}}检查消息接收情况2023020512:05:14。043INFO23472〔ulsartimer61〕o。a。p。c。impl。ConsumerStatsRecorderImpl:〔TOPICDEMO〕〔subTOPICDEMO〕〔7c2b2〕Prefetchedmessages:0Consumethroughputreceived:0。02msgss0。00MbitsAcksentrate:0。02acksFailedmessages:0batchmessages:0Failedacks:02023020512:06:16。425INFO23472〔main〕com。sucl。pulsar。PulsarApplication:接收消息:测试消息:SunFeb0512:06:16CST2023结束语 该篇主要通过官网对ApachePulsar做了简单的了解与尝试,同时基于SpringBoot,以简单的示例代码实现了消息的发送与接收,其中各个组件仅仅使用了默认的配置,在生产环境需要根据Pulsar的特性以及官方API使其具有扩展性与易用性。