虽然guava中的eventbus已经很方便了,但是还是想要实现一个更为方便,同时支持延迟事件、同时带eventbus的组件。在Apache 项目中,有一个eventbus的组件,这个组件写得挺好的,想着用在业务系统上,因此自己抽取了一下,拿到业务系统中来用。话不多说,我们把它抽取出来吧,同时进行demo的运行。还是要感谢Apache DolphinScheduler的开源,让这个很简单,但是很高效的组件能够让我们便捷地使用。
首先是定义事件接口:
public interface IEvent {}
针对事件接口,我们抽象出共性方法接口:延迟时间和过期时间。
public abstractclass AbstractDelayEvent implements IEvent, Delayed { privatefinallong delayTime; privatefinallong expireTime; public long getDelayTime() { return delayTime; } public long getExpireTime() { return expireTime; } public AbstractDelayEvent(long delayTime) { this.delayTime = delayTime; this.expireTime = System.currentTimeMillis() + delayTime; } @Override public long getDelay(TimeUnit unit) { long diff = expireTime - System.currentTimeMillis(); return unit.convert(diff, TimeUnit.MILLISECONDS); } @Override public int compareTo(Delayed o) { if (this.expireTime < ((AbstractDelayEvent) o).expireTime) { return -1; } if (this.expireTime > ((AbstractDelayEvent) o).expireTime) { return1; } return0; }}
主要的信息:
定义eventbus中,我们需要使用的方法:
public interface IEventBus<T extends IEvent> { void publish(T event); Optional<T> poll() throws InterruptedException; Optional<T> peek(); Optional<T> remove(); boolean isEmpty(); int size();}
可以看到主要是:发布事件、消费、移除、删除、判断当前的事件是否为空,以及事件大小等方法。其中最重要的方法为发布事件和消费处理事件方法。
针对当前的事件bus接口进行抽象,抽取出共性方法,方便复用:
public abstractclass AbstractDelayEventBus<T extends AbstractDelayEvent> implements IEventBus<T> { protectedfinal DelayQueue<T> delayEventQueue = new DelayQueue<>(); @Override public void publish(T event) { delayEventQueue.put(event); } @Override public Optional<T> poll() throws InterruptedException { // 使用带超时的 poll 方法,等待事件到期 return Optional.ofNullable(delayEventQueue.poll(1000, TimeUnit.MILLISECONDS)); } @Override public Optional<T> peek() { return Optional.ofNullable(delayEventQueue.peek()); } @Override public Optional<T> remove() { return Optional.ofNullable(delayEventQueue.poll()); } @Override public boolean isEmpty() { return delayEventQueue.isEmpty(); } @Override public int size() { return delayEventQueue.size(); }}
接下来,我们使用它,来进行处理:
定义自己的延迟事件:
如果是在业务中,可以定义自己的业务数据信息事件对象
public class MyDelayEvent extends AbstractDelayEvent { private final String message; public MyDelayEvent(long delayTime, String message) { super(delayTime); this.message = message; } public String getMessage() { return message; }}
定义事件延迟事件bus
当然也可以进行自己的可定制化特性。
public class MyDelayEventBus extends AbstractDelayEventBus<MyDelayEvent> { // 不需要额外的修改}
进行测试:
思路:创建事件总线、发布事件,然后针对发布的事件信息,进行消费,然后等待延迟时间的到来,从而实现消费,从而进行业务的处理。
import java.util.Optional;publicclass EventBusExample { public static void main(String[] args) throws InterruptedException { // 创建事件总线 IEventBus<MyDelayEvent> eventBus = new MyDelayEventBus(); // 发布单个事件 eventBus.publish(new MyDelayEvent(100, "Single Event")); System.out.println("After publish, event bus size: " + eventBus.size()); // 持续尝试消费事件 while (true) { Optional<MyDelayEvent> event = eventBus.poll(); if (event.isPresent()) { System.out.println("Received event: " + event.get().getMessage()); } else { System.out.println("No event received within the timeout."); break; } } // 检查总线大小 System.out.println("Event bus size: " + eventBus.size()); }}
运行结果:
可以看到实现自己的业务逻辑还是很方便的,可以自己实现吧,这里给出的代码是可以运行的。
作者 | 刘亚洲