SEDA是流水线化的事件驱动模型,能够异步地执行服务。和直接用事件驱动模型相比,SEDA更加去中心化与模块化。之前看了SEDA的论文,干脆拿Java NIO自己手撸个玩具实现吧
架构
Accept使用最基本的Reactor模型同步处理,Read则先通过Reactor同步获得数据,然后数据通过SEDA异步处理。这里的数据必须同步获得,不然因为数据没有被读取,会产生大量重复异步事件(踩坑)
Event
代码语言:javascript复制public class Event {
enum Type{Read,Write,ReadRepsonse,WriteResponse};
public Type type;
public SelectionKey key;
public String Packet;
public Event(SelectionKey key,Type type){
this.type = type;
this.key = key;
}
}
懒得写OOP了,字段肯定不同。这里把所有类型的字段做了并集简易实现。
数据获取
代码语言:javascript复制 if (key.isAcceptable()) {
accept(key);
} else if (key.isReadable()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
// Clear out our read buffer so it's ready for new data
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
// Attempt to read off the channel
int numRead;
try {
numRead = socketChannel.read(readBuffer);
} catch (IOException ex) {
// The remote forcibly closed the connection, cancel
// the selection key and close the channel.
key.cancel();
socketChannel.close();
return;
}
String str = new String(readBuffer.array(), 0, numRead);
Event event = new Event(key, Event.Type.Read);
event.Packet = str;
System.out.println("Sync " str);
StageMap.getInstance().stageMap.get("read").Enqueue(event);
}
获取数据后,向ReadStage(维护单例表进行查找)的阻塞队列发送包含数据的事件,这个过程是完全同步的,也就是正常的Reactor。
Stage
代码语言:javascript复制@Override
public void Enqueue(Event e){
synchronized (lock){
Runnable task;
BatchingQ.offer(e);
if(BatchingQ.size() == BatchSize){
ArrayList<Event> elist = new ArrayList<>();
try {
for (int i = 0; i < BatchSize; i ) {
elist.add(i,BatchingQ.poll());
}
}catch(Exception ex){
//do nothing
}
task = new HandleThread(elist);
pool.execute(task);
}
}
}
Stage继承相同的接口,实现Enqueue操作。当队列长度达到BatchSize时,从线程池抽出一个线程异步地处理Batching。到这里,数据的处理过程已经完全和Reactor负责的数据获取过程异步执行了。
代码语言:javascript复制 class HandleThread implements Runnable{
ArrayList<Event> elist;
public HandleThread(ArrayList<Event> elist){
this.elist = elist;
}
@Override
public void run() {
try {
Event e;
for (int i = 0; i < BatchSize; i ) {
e = elist.get(i);
if(e.type == Event.Type.Read){
System.out.println("Async " e.Packet);
Event event = new Event(e.key, Event.Type.ReadRepsonse);
event.Packet = e.Packet;
StageMap.getInstance().stageMap.get("app").Enqueue(event);
}
}
}catch(Exception e){
//do nothing
}
}
}
随后,ReadStage处理完Read事件(简单的Print)后发出ReadResponse事件,要求AppStage进一步处理。这就是事件驱动,不同stage之间完全是异步的,只有事件enqueue是同步的。
AppStage执行的操作也很简单,还是打印,毕竟就是个demo。
WriteStage就是NIO代码复制粘贴,send Client。
代码语言:javascript复制 @Override
public void run() {
try {
Event e;
for (int i = 0; i < BatchSize; i ) {
e = elist.get(i);
if(e.type == Event.Type.Write){
System.out.println("Write " e.Packet);
ByteBuffer sendBuffer = ByteBuffer.allocate(1024);
SocketChannel channel = (SocketChannel) e.key.channel();
sendBuffer.clear();
sendBuffer.put(e.Packet.getBytes());
sendBuffer.flip();
channel.write(sendBuffer);
Event event = new Event(e.key, Event.Type.WriteResponse);
StageMap.getInstance().stageMap.get("app").Enqueue(event);
}
}
}catch(Exception e){
//do nothing
}
}
结果
Batching
简单地把请求搁置到队列长度>5再进行批处理。
代码见https://github.com/sjtuzwj/SEDA-Sample