`
zhuhui_zj
  • 浏览: 36134 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

netty代码分析(二)--ChannelPipeline

 
阅读更多

一、简介

    ChannelPipeline是事件(event)的通道,它用于组织事件拦截器ChannelHandler。所以本质上,它只是ChannelHandler的一个链表。从I/O端获取的数据将以事件的方式通过ChannelPipeline,并被ChannelUpstreamHandler拦截,最终被程序的业务逻辑处理。要发送的数据通过pipeline被ChannelDownstreamHandler拦截,并最终到达ChannelSink,由底层nio处理。

二、ChannelHandlerContext

    ChannelHandlerContext是ChannelHandler的包装,使其能以链表的方式存储在ChannelPipeline中:

public class DefaultChannelPipeline implements ChannelPipeline {

    private class DefaultChannelHandlerContext implements ChannelHandlerContext {

        volatile DefaultChannelHandlerContext next;
        volatile DefaultChannelHandlerContext prev;
        private final String name;
        private final ChannelHandler handler;

        /**
         * Passes a downstream event on from this context.
         *
         * @param e the event to send downstream
         */
        public void sendDownstream(ChannelEvent e) {
            // Walk backwards from this.prev to the first context whose handler
            // can handle downstream events.
            DefaultChannelHandlerContext target = getActualDownstreamContext(this.prev);
            if (target != null) {
                DefaultChannelPipeline.this.sendDownstream(target, e);
                return;
            }

            // No such context is left: the bottom of the pipeline has been
            // reached, so the event is handed to the ChannelSink.
            try {
                getSink().eventSunk(DefaultChannelPipeline.this, e);
            } catch (Throwable t) {
                notifyHandlerException(e, t);
            }
        }

        /**
         * Passes an upstream event on from this context.
         * Does nothing when getActualUpstreamContext(this.next) returns null.
         *
         * @param e the event to send upstream
         */
        public void sendUpstream(ChannelEvent e) {
            DefaultChannelHandlerContext target = getActualUpstreamContext(this.next);
            if (target == null) {
                return;
            }
            DefaultChannelPipeline.this.sendUpstream(target, e);
        }

    ....
  }

    ....
}

三、ChannelPipeline

    ChannelPipeline以链表的方式组织ChannelHandler,提供事件的上下行传输:

public class DefaultChannelPipeline implements ChannelPipeline {

    private volatile Channel channel;
    private volatile ChannelSink sink;
    private volatile DefaultChannelHandlerContext head;
    private volatile DefaultChannelHandlerContext tail;

    /**
     * Appends a handler to the end of the pipeline's linked list.
     *
     * @param name    the name under which the handler is registered in name2ctx
     * @param handler the handler to append
     */
    public synchronized void addLast(String name, ChannelHandler handler) {
        // An empty pipeline is set up through init() instead of being linked.
        if (name2ctx.isEmpty()) {
            init(name, handler);
            return;
        }

        checkDuplicateName(name);

        // Link the new context behind the current tail and make it the new tail.
        DefaultChannelHandlerContext previousTail = tail;
        DefaultChannelHandlerContext appended =
                new DefaultChannelHandlerContext(previousTail, null, name, handler);
        previousTail.next = appended;
        tail = appended;
        name2ctx.put(name, appended);
    }

    /**
     * Sends an upstream event into the pipeline, starting the search for a
     * handler at the head of the list.
     *
     * @param e the event to send upstream
     */
    public void sendUpstream(ChannelEvent e) {
        // Starting from head, find the first handler that can handle upstream events.
        DefaultChannelHandlerContext head = getActualUpstreamContext(this.head);
        // NOTE(review): the article elides code at the "...." marker below;
        // presumably it handles the case where no upstream context exists — confirm
        // against the full Netty source.
        ....
        sendUpstream(head, e);
    }

    /**
     * Invokes handleUpstream on the handler held by the given context.
     * Any Throwable raised, including a failed cast of the handler, is passed
     * to notifyHandlerException instead of propagating to the caller.
     *
     * @param ctx the context whose handler receives the event
     * @param e   the upstream event
     */
    void sendUpstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
        try {
            ChannelUpstreamHandler upstreamHandler = (ChannelUpstreamHandler) ctx.getHandler();
            upstreamHandler.handleUpstream(ctx, e);
        } catch (Throwable t) {
            notifyHandlerException(e, t);
        }
    }

    // 以类似方式实现sendDownstream
    ....
}

四、ChannelHandler

    ChannelHandler是事件的拦截器,也是系统主要的扩展点,用户一般通过继承SimpleChannelHandler来实现自己的协议解析方案:

public class SimpleChannelHandler implements 
                         ChannelUpstreamHandler, ChannelDownstreamHandler {

    // 上行事件的分发函数,根据事件的不同,调用不同的方法
    /**
     * Dispatches an upstream event to the matching callback, chosen by the
     * event's type and, for channel state events, by the state and value it
     * carries. Events that match no callback are forwarded upstream unchanged.
     *
     * @param ctx the context used to forward events this handler does not consume
     * @param e   the upstream event to dispatch
     * @throws Exception if the invoked callback throws
     */
    public void handleUpstream(
            ChannelHandlerContext ctx, ChannelEvent e) throws Exception {

        if (e instanceof MessageEvent) {
            messageReceived(ctx, (MessageEvent) e);
        } else if (e instanceof ChildChannelStateEvent) {
            ChildChannelStateEvent evt = (ChildChannelStateEvent) e;
            // The child channel's open flag distinguishes "opened" from "closed".
            if (evt.getChildChannel().isOpen()) {
                childChannelOpen(ctx, evt);
            } else {
                childChannelClosed(ctx, evt);
            }
        } else if (e instanceof ChannelStateEvent) {
            ChannelStateEvent evt = (ChannelStateEvent) e;
            switch (evt.getState()) {
            case OPEN:
                // Boolean.TRUE means opened; any other value means closed.
                if (Boolean.TRUE.equals(evt.getValue())) {
                    channelOpen(ctx, evt);
                } else {
                    channelClosed(ctx, evt);
                }
                break;
            case BOUND:
                // A non-null value means bound; null means unbound.
                if (evt.getValue() != null) {
                    channelBound(ctx, evt);
                } else {
                    channelUnbound(ctx, evt);
                }
                break;
            case CONNECTED:
                // A non-null value means connected; null means disconnected.
                if (evt.getValue() != null) {
                    channelConnected(ctx, evt);
                } else {
                    channelDisconnected(ctx, evt);
                }
                break;
            case INTEREST_OPS:
                channelInterestChanged(ctx, evt);
                break;
            default:
                // An unrecognized state is still an upstream event, so it must keep
                // travelling upstream; sending it downstream would reverse its
                // direction in the pipeline.
                ctx.sendUpstream(e);
            }
        } else if (e instanceof ExceptionEvent) {
            exceptionCaught(ctx, (ExceptionEvent) e);
        } else {
            ctx.sendUpstream(e);
        }
    }

    ....
}

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics