说下你可能没用过的EventBus
最近在CodeReview的时候发现了这样一个业务场景,某个业务处理完成之后需要通知审核人员,通知的方式包含短信和邮件,所以代码大致是这样:业务校验validate();处理业务逻辑doBusiness();发送邮件或者发送其他类型消息sendMsg();
这个对不对呢?
基于这种普遍的业务场景来说,一般首先我们会考虑同步或者异步发送的问题。
同步的话对接口RT有影响,而且和业务逻辑耦合在一起,这样的做法肯定不太好。
一般情况下,我们会做成异步的方式,使用MQ自己发送自己消费,或者说一个线程池搞定,这样的话不影响主业务逻辑,可以提高性能,并且代码做到了解耦。
然后还有就是数据一致性的问题,邮件一定要发送成功吗?
大多数时候其实我们并不要求邮件一定100发送成功,失败了就失败好了,监控告警打点做好失败率不要超过阈值就好,还有就是消息服务一旦收到请求应该自己保证消息能够投递。
所以总的来说,使用MQ发送消息自己消费处理,或者线程池异步处理,最后自己搞个补偿的逻辑就能处理好这类问题。
那么,今天要说的是这两个解决方案之外的处理方式,对于这种场景其实我们可以用EventBus来解决。EventBus使用
看名字就知道,EventBus是事件总线的意思,它是GoogleGuava库的一个工具,基于观察者模式可以做到进程内的代码解耦作用。
就拿上面的例子来说,引入一个MQ太重了,其实不太需要这样做,EventBus也能达到这个效果,和MQ相比他只能提供进程内的消息事件传递,这对于我们这种业务场景来说足够了不是吗?
我们先看EventBus怎么来使用,一般先创建一个EventBus实例。1。创建EventBusprivatestaticEventBuseventBusnewEventBus();
第二步,创建一个事件消息订阅者,处理方式非常简单,只要在我们希望去处理事件的方法上加上Subscribe注解即可。
形参只能有一个,如果定义0个或者多个的话运行就会报错。publicclassEmailMsgHandler{Subscribepublicvoidhandle(LongbusinessId){System。out。println(sendemailmsgbusinessId);}}
第三步,注册事件。eventBus。register(newEmailMsgHandler());
第四步,发送事件。eventBus。post(1L);
这就是一个EventBus使用的最简单例子,下面我们看看结合开头说的例子怎么处理。结合实际
比如上面说的案例,举例来说比如注册和用户下单的场景,都需要发送消息和邮件给用户。
EventBus并不强制说我们一定要用单例模式,因为他的创建销毁成本比较低,所以更多是根据我们的业务场景和上下文自己来选择。publicclassUserService{privatestaticEventBuseventBusnewEventBus();publicvoidregist(){LonguserId1L;eventBus。register(newEmailMsgHandler());eventBus。register(newSmsMsgHandler());eventBus。post(userId);}}publicclassBookingService{privatestaticEventBuseventBusnewEventBus();publicvoidbooking(){业务逻辑LongbookingId2L;eventBus。register(newEmailMsgHandler());eventBus。register(newSmsMsgHandler());eventBus。post(bookingId);}}
然后在业务逻辑处理完成之后,分别去注册了邮件和短信两个事件订阅者。publicclassEmailMsgHandler{Subscribepublicvoidhandle(LongbusinessId){System。out。println(sendemailmsgbusinessId);}}publicclassSmsMsgHandler{Subscribepublicvoidhandle(LongbusinessId){System。out。println(sendsmsmsgbusinessId);}}
最后我们发送事件,用户注册我们发送了一个用户ID,下单成功我们发送了一个订单ID。
再写一个测试类去测试一下,分别创建两个service,然后分别调用方法。publicclassEventBusTest{publicstaticvoidmain(String〔〕args){UserServiceuserServicenewUserService();userService。regist();BookingServicebookingServicenewBookingService();bookingService。booking();}}
执行测试类,我们可以看到输出,分别去执行了我们的事件订阅的方法。sendemailmsg1sendsmsmsg1sendemailmsg2sendsmsmsg2
使用起来你会发现非常简单,对于希望轻量级简单地做到解耦使用EventBus非常合适。注意别踩坑
首先,注意一下例子中的参数都是Long类型,如果事件的参数是其他类型的话,那么消息是无法接受到的,比如我们把下单中发送的订单ID改成String类型然后会发现没有消费了,因为我们没有定义一个参数类型是String的方法。publicclassBookingService{privatestaticAsyncEventBuseventBusnewAsyncEventBus(Executors。newFixedThreadPool(3));publicvoidbooking(){业务逻辑StringbookingId2;eventBus。register(newEmailMsgHandler());eventBus。register(newSmsMsgHandler());eventBus。post(bookingId);}}输出sendemailmsg1sendsmsmsg1
去EmailMsgHandler和SmsMsgHandler都新增一个接收String类型的订阅方法,这样就可以接收到了。Subscribepublicvoidhandle(StringbusinessId){System。out。println(sendemailmsgforstringbusinessId);}Subscribepublicvoidhandle(StringbusinessId){System。out。println(sendsmsmsgforstringbusinessId);}输出sendsmsmsg1sendemailmsg1sendemailmsgforstring2sendsmsmsgforstring2
除此之外,其实我们可以定义一个DeadEvent来处理这种情况,它相当于是一个默认的处理方式,当没有匹配的事件类型参数的话就会默认发送一个DeadEvent事件。
定义一个默认处理器。publicclassDefaultEventHandler{Subscribepublicvoidhandle(DeadEventevent){System。out。println(nosubscriber,event);}}
给BookingService新增一个pay()支付方法,下单完了去支付,注册我们的默认事件。publicvoidpay(){业务逻辑eventBus。register(newDefaultEventHandler());eventBus。post(newPayment(UUID。randomUUID()。toString()));}ToStringDataNoArgsConstructorAllArgsConstructorpublicclassPayment{privateStringpaymentId;}
执行测试bookingService。pay()看到输出结果:nosubscriber,DeadEvent{sourceAsyncEventBus{default},eventPayment(paymentId255da94271284bd1bacaf0a8e569ed88)}源码分析
OK,简单的介绍就到这里,那其实到目前为止我们说的这个都是同步调用的,这不太符合我们的要求,我们当然使用异步处理更好。
那就看看源码它是怎么实现的。BetapublicclassEventBus{privatestaticfinalLoggerloggerLogger。getLogger(EventBus。class。getName());privatefinalStringidentifier;privatefinalExecutorexecutor;privatefinalSubscriberExceptionHandlerexceptionHandler;privatefinalSubscriberRegistrysubscribersnewSubscriberRegistry(this);privatefinalDispatcherdispatcher;publicEventBus(){this(default);}publicEventBus(Stringidentifier){this(identifier,MoreExecutors。directExecutor(),Dispatcher。perThreadDispatchQueue(),LoggingHandler。INSTANCE);}}
identifier就是个名字,标记,默认就是default。
executor执行器,默认创建一个MoreExecutors。directExecutor(),事件订阅者根据你自己提供的executor来决定如何执行事件订阅的处理方式。
exceptionHandler是异常处理器,默认创建的就是打点日志。
subscribers就是我们的消费者,订阅者。
dispatcher用来做事件分发。
默认创建的executor是一个MoreExecutors。directExecutor(),看到command。run()你就会发现他这不就是同步执行嘛。publicstaticExecutordirectExecutor(){returnDirectExecutor。INSTANCE;}privateenumDirectExecutorimplementsExecutor{INSTANCE;Overridepublicvoidexecute(Runnablecommand){command。run();}OverridepublicStringtoString(){returnMoreExecutors。directExecutor();}
同步执行还是不太好,我们希望不光给我们解耦,还要异步执行,EventBus给我们提供了AsyncEventBus,Executor我们自己传入就好了。publicclassAsyncEventBusextendsEventBus{publicAsyncEventBus(Stringidentifier,Executorexecutor){super(identifier,executor,Dispatcher。legacyAsync(),LoggingHandler。INSTANCE);}publicAsyncEventBus(Executorexecutor,SubscriberExceptionHandlersubscriberExceptionHandler){super(default,executor,Dispatcher。legacyAsync(),subscriberExceptionHandler);}publicAsyncEventBus(Executorexecutor){super(default,executor,Dispatcher。legacyAsync(),LoggingHandler。INSTANCE);}
上面的代码我们改成异步的,这样不就好起来了嘛,这样的话,实际上可以结合我们自己的线程池来处理了。privatestaticAsyncEventBuseventBusnewAsyncEventBus(Executors。newFixedThreadPool(3));
OK,这个说清楚了,我们可以顺便再看看事件分发的处理,看到DeadEvent了吧,没有当前事件的订阅者,就会发送一个DeadEvent事件,bingo!publicvoidpost(Objectevent){IteratorSubscribereventSubscriberssubscribers。getSubscribers(event);if(eventSubscribers。hasNext()){dispatcher。dispatch(event,eventSubscribers);}elseif(!(eventinstanceofDeadEvent)){theeventhadnosubscribersandwasnotitselfaDeadEventpost(newDeadEvent(this,event));}}总结
OK,这个使用和源码还是比较简单的哈,有兴趣的同学可以自己去瞅瞅,花不了多少工夫。
总的来说,EventBus就是提供了我们一个更优雅的代码解耦的方式,实际工作中的业务你肯定能用上它!