/*
 * Decompiled with CFR 0.152.
 */
package com.macromedia.fcs.common;

import com.macromedia.fcs.client.Version;
import com.macromedia.fcs.common.MessageDispatcher;
import com.macromedia.fcs.common.StatusInfo;
import com.macromedia.fcs.shared.ChunkCommon;
import com.macromedia.fcs.shared.Globals;
import com.macromedia.fcs.shared.TAckProc;
import com.macromedia.fcs.shared.TCChunkInputStream;
import com.macromedia.fcs.shared.TCChunkOutputStream;
import com.macromedia.fcs.shared.TCCommand;
import com.macromedia.fcs.shared.TCJavaSerializer;
import com.macromedia.fcs.shared.TCMessage;
import com.macromedia.fcs.shared.TChunkContext;
import com.macromedia.fcs.shared.TGetMsgProc;
import com.macromedia.fcs.shared.TReadProc;
import com.macromedia.fcs.shared.TWriteProc;
import com.macromedia.fcs.util.BaseSocketChannel;
import com.macromedia.fcs.util.Connector;
import com.macromedia.fcs.util.SSLSocketChannel;
import com.macromedia.fcs.util.SSLSocketChannelManager;
import com.macromedia.fcs.util.SimpleSocketChannel;
import com.macromedia.fcs.util.SimpleSocketChannelManager;
import com.macromedia.fcs.util.ThreadPool;
import com.macromedia.fcs.util.Transport;
import com.macromedia.fcs.util.Util;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.NoConnectionPendingException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RtmpTransport
implements Transport,
TAckProc,
TReadProc,
TWriteProc,
Globals,
Version {
    /** Version string sent as the "flashVer" property of the connect command. */
    public static final String FLASH_VERSION = "FCSj/1.13";
    // Connector this transport is added to in connect() and removed from in close().
    Connector _connector = Connector.getDefault();
    // Pool that runs the plain-socket Reader/Writer tasks.
    ThreadPool _iopool = ThreadPool.get("FCSj_IO");
    // Pool that runs SynchronizedJobRunner (see addTask).
    ThreadPool _tpool = ThreadPool.get("FCSj_Worker");
    // Pool that runs the SSLReader/SSLWriter tasks.
    ThreadPool _ssliopool = ThreadPool.get("FCSj_SSL_IO");
    // Monitor shared by SSLReader and SSLWriter so they never run concurrently.
    Object _sslReadWriteMutex = new Object();
    // Bytes read from the channel; drained by doRead().
    ByteBuffer _inbuf = ByteBuffer.allocateDirect(4096);
    // Bytes staged by doWrite(); flushed to the channel by writeBuffer().
    ByteBuffer _outbuf = ByteBuffer.allocateDirect(4096);
    // Created in connect(); closed and cleared in unregister().
    BaseSocketChannel _channel = null;
    // Selection key obtained in register(); cancelled in unregister().
    SelectionKey _selkey = null;
    // Interest ops waiting to be OR-ed into the selection key by onInterestOps().
    int _interestOps = 0;
    // NOTE(review): onSelectorConnectFail() compares against the literal 8 rather than this constant.
    static final int MAX_FINISH_ATTEMPTS = 8;
    // Number of connect-failure callbacks tolerated so far; reset in detach().
    int _numFinishAttempts = 0;
    TCChunkInputStream _inchunks = null;
    TCChunkOutputStream _outchunks = null;
    // Value for the "type" connect property; null means "nonprivate" is sent.
    String _type = null;
    // NOTE(review): only ever set to false in this file; presumably set to true by
    // another class in this package once the connect response arrives — confirm.
    boolean _connected = false;
    // Set once close() has been requested; read/write tasks finish the close when they end.
    boolean _closing = false;
    // Set once close()/abort() has completed.
    boolean _closed = false;
    boolean _reading = false;
    boolean _writing = false;
    Reader _reader = new Reader();
    Writer _writer = new Writer();
    SSLReader _sslReader = new SSLReader();
    SSLWriter _sslWriter = new SSLWriter();
    // Key of the command stream in _streams.
    static final Integer _0 = new Integer(0);
    // Command stream created by connect(); null before connect() and after detach().
    Stream _stream0 = null;
    // Streams by stream id; readBuffer() uses it to route incoming messages.
    Map<Integer, Stream> _streams = new HashMap<Integer, Stream>();
    // Streams requested via createStream(), keyed by the transaction id of the request.
    Map<Integer, Stream> _pendingStreams = new HashMap<Integer, Stream>();
    // Lazily initialised by the constructors.
    private static Logger _log = null;
    Util.URI _uri = null;
    int _objectEncoding = 0;
    // Jobs queued by addTask(); drained in order by SynchronizedJobRunner.
    List<Runnable> _jobList = new LinkedList<Runnable>();

    public RtmpTransport() {
        if (_log == null) {
            _log = LoggerFactory.getLogger(RtmpTransport.class);
        }
        this._inchunks = new TCChunkInputStream();
        this._inchunks.SetCallbacks(this, this);
        this._inchunks.GetAckContext().SetCallbacks(this, this);
        this._outchunks = new TCChunkOutputStream();
        this._outchunks.SetCallbacks(this, this);
    }

    public RtmpTransport(String type) {
        if (_log == null) {
            _log = LoggerFactory.getLogger(RtmpTransport.class);
        }
        this._inchunks = new TCChunkInputStream();
        this._inchunks.SetCallbacks(this, this);
        this._inchunks.GetAckContext().SetCallbacks(this, this);
        this._outchunks = new TCChunkOutputStream();
        this._outchunks.SetCallbacks(this, this);
        this._type = type;
    }

    /**
     * Builds the RTMP "connect" command, creates the command stream (id 0),
     * opens the socket channel and hands this transport to the connector.
     *
     * <p>Side effect: a negative {@code uri.port} is replaced by 443 (SSL) or
     * 1935, and a null {@code uri.host} by "localhost", on the caller's URI.
     *
     * @param uri            target URI; must not be null
     * @param audioProps     sent as "audioCodecs" when non-zero
     * @param videoProps     sent as "videoCodecs" when non-zero
     * @param objectEncoding sent as "objectEncoding" and used for later commands
     * @param args           extra user arguments for the connect command; may be
     *                       {@code null} or empty
     * @param md             dispatcher that receives messages and status for stream 0
     * @return the command stream
     * @throws Exception if the host cannot be resolved or the channel cannot be created
     */
    public Stream connect(Util.URI uri, int audioProps, int videoProps, int objectEncoding, Object[] args, MessageDispatcher md) throws Exception {
        if (uri.port < 0) {
            uri.port = uri.isSSL ? 443 : 1935;
        }
        if (uri.host == null) {
            uri.host = "localhost";
        }
        this._objectEncoding = objectEncoding;

        TCCommand connectCmd = new TCCommand(this._objectEncoding);
        connectCmd.setMethodName("connect");
        connectCmd.setTrxID(Integer.valueOf(1));
        connectCmd.setResponseData("app", uri.app);
        connectCmd.setResponseData("tcUrl", uri.toString());
        connectCmd.setResponseData("flashVer", FLASH_VERSION);
        connectCmd.setResponseData("swfUrl", uri.toString());
        connectCmd.setResponseData("objectEncoding", Integer.valueOf(objectEncoding));
        if (this._type == null) {
            connectCmd.setResponseData("type", "nonprivate");
        } else {
            connectCmd.setResponseData("type", this._type);
        }
        if (audioProps != 0) {
            connectCmd.setResponseData("audioCodecs", Integer.valueOf(audioProps));
        }
        if (videoProps != 0) {
            connectCmd.setResponseData("videoCodecs", Integer.valueOf(videoProps));
        }
        // A null argument array used to throw NullPointerException; treat it as "no arguments".
        if (args != null) {
            for (Object arg : args) {
                connectCmd.setUserArg(arg);
            }
        }

        // The connect command is stored as the stream's first message (see Stream.getMessage).
        this._stream0 = new Stream(md, connectCmd.getCommandResponse());
        this._stream0._id = _0;
        this._stream0._mode = Stream.MODE_COMMANDS;
        this._stream0.createDataContext();
        this._streams.put(_0, this._stream0);
        this._uri = uri;

        InetSocketAddress address = new InetSocketAddress(InetAddress.getByName(uri.host), uri.port);
        _log.info(Thread.currentThread() + " Connecting to " + address + "( " + uri.host + ", " + uri.port + ")");
        this._channel = this._uri.isSSL ? SSLSocketChannelManager.getInstance().createChannel(this, address) : SimpleSocketChannelManager.getInstance().createChannel(this, address);
        this._connector.add(this);
        return this._stream0;
    }

    /**
     * Requests a new stream by queueing a "createStream" command on the
     * command stream.
     *
     * @param md dispatcher for the new stream
     * @return the new stream, recorded in {@code _pendingStreams} under the
     *         request's transaction id, or {@code null} when there is no
     *         command stream
     */
    public Stream createStream(MessageDispatcher md) {
        Stream control = this._stream0;
        if (control != null) {
            Stream pending = new Stream(md);
            Integer requestId = control._md.getNextTrxId();
            this._pendingStreams.put(requestId, pending);

            TCCommand request = new TCCommand(this._objectEncoding);
            request.setMethodName("createStream");
            request.setTrxID(requestId);
            this.queueCommandMessage(request.getCommandResponse());
            return pending;
        }
        return null;
    }

    /**
     * @return the URI passed to {@code connect}, or {@code null} if
     *         {@code connect} has not been called
     */
    @Override
    public Util.URI getURI() {
        return this._uri;
    }

    /**
     * @return the current value of the {@code _connected} flag; it is cleared
     *         by {@code close()} and {@code abort()}
     */
    public boolean isConnected() {
        return this._connected;
    }

    /**
     * Marks the transport closed after an abnormal connector shutdown: sends
     * a "NetConnection.Connect.Closed" status to the command stream's
     * dispatcher (if any) and detaches all streams. Does nothing when the
     * transport is already closed.
     */
    @Override
    public synchronized void abort() {
        if (!this._closed) {
            this._connected = false;
            _log.info("Connection Aborted Abnormally");
            try {
                Stream control = this._stream0;
                if (control != null) {
                    StatusInfo closedStatus = new StatusInfo("status", "NetConnection.Connect.Closed", "FCSj connector shutdown abnormally!", null);
                    control._md.dispatch(closedStatus);
                }
                this.detach();
            }
            finally {
                // Mark closed even if the dispatcher throws.
                this._closed = true;
            }
        }
    }

    /**
     * Closes the transport. If a read or write is still in progress only the
     * closing flag is set and the method returns; the read/write paths call
     * {@code close()} again when they finish. Otherwise the transport is
     * removed from the connector, a "NetConnection.Connect.Closed" status is
     * dispatched on the command stream (if any) and all streams are detached.
     */
    public synchronized void close() {
        if (_log.isDebugEnabled()) {
            StringBuilder trace = new StringBuilder();
            StackTraceElement[] frames = Thread.currentThread().getStackTrace();
            for (int i = 0; i < frames.length; ++i) {
                trace.append(frames[i].toString());
            }
            _log.debug("Close called with following tack trace:" + trace.toString());
        }
        if (this._closed) {
            return;
        }
        this._connected = false;
        this._closing = true;
        boolean busy = this._reading || this._writing;
        if (busy) {
            _log.info("Attempt to close connection while still reading or writing. URI: " + this._uri + " R: " + this._reading + " W: " + this._writing);
            return;
        }
        try {
            this._connector.remove(this);
            Stream control = this._stream0;
            if (control != null) {
                StatusInfo closedStatus = new StatusInfo("status", "NetConnection.Connect.Closed", null, null);
                control._md.dispatch(closedStatus);
            }
            this.detach();
        }
        finally {
            // Mark closed even if removal or dispatch throws.
            this._closed = true;
        }
    }

    /**
     * Drops all stream state: resets the connect-failure counter, empties the
     * stream table and forgets the command stream.
     */
    public void detach() {
        this._numFinishAttempts = 0;
        this._streams.clear();
        this._stream0 = null;
    }

    /**
     * Registers the channel with {@code selector} for connect, read and write
     * events, attaching this transport to the resulting key.
     *
     * @param selector selector to register with
     */
    @Override
    public void register(Selector selector) {
        try {
            // OP_READ | OP_WRITE | OP_CONNECT == 13, the value previously hard-coded here.
            int ops = SelectionKey.OP_READ | SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT;
            this._selkey = this._channel.register(selector, ops, this);
        }
        catch (ClosedChannelException ccx) {
            // Report through the class logger, as the rest of this class does, not stderr.
            _log.warn("Cannot register closed channel for " + this._uri, (Throwable)ccx);
        }
    }

    /**
     * Cancels the selection key and closes the channel. Both references are
     * cleared even when closing the channel fails.
     *
     * @param selector selector the transport was registered with (unused)
     */
    @Override
    public synchronized void unregister(Selector selector) {
        SelectionKey sk = this._selkey;
        if (sk != null) {
            sk.cancel();
        }
        this._selkey = null;

        BaseSocketChannel chan = this._channel;
        // Clear the field before closing: previously an IOException from close()
        // skipped the assignment and left a stale channel reference behind.
        this._channel = null;
        if (chan != null) {
            try {
                chan.close();
            }
            catch (IOException iox) {
                _log.warn("Error closing channel for " + this._uri, (Throwable)iox);
            }
        }
    }

    /**
     * Adds {@code ops} to the pending interest set and asks the connector to
     * apply it (the pending set is merged into the key by {@code onInterestOps}).
     *
     * @param ops {@link SelectionKey} operation bits to add
     */
    @Override
    public synchronized void addInterestOps(int ops) {
        this._interestOps |= ops;
        this._connector.setInterestOps(this);
    }

    /**
     * Merges the pending interest ops into the selection key, when there is a
     * valid key, and then clears the pending set.
     */
    @Override
    public synchronized void onInterestOps() {
        SelectionKey key = this._selkey;
        boolean usable = key != null && key.isValid();
        if (usable) {
            int merged = key.interestOps() | this._interestOps;
            key.interestOps(merged);
        }
        this._interestOps = 0;
    }

    /**
     * Replaces the pending interest set; unlike {@code addInterestOps} this
     * does not notify the connector.
     *
     * @param ops new pending operation bits
     */
    @Override
    public synchronized void setInterestOps(int ops) {
        this._interestOps = ops;
    }

    /**
     * @return the pending interest ops not yet merged into the selection key
     */
    @Override
    public synchronized int getInterestOps() {
        return this._interestOps;
    }

    /**
     * Called from {@code onSelection} once {@code finishConnect()} succeeds;
     * runs the channel's own initialization.
     */
    @Override
    public void onChannelConnection() {
        try {
            this._channel.initialization();
        }
        catch (IOException iox) {
            // NOTE(review): an initialization failure is only logged at debug level
            // and the transport carries on — confirm this best-effort handling is intended.
            _log.debug("cannot initialize", (Throwable)iox);
        }
    }

    /**
     * Handles a selected key: completes a pending connect, or hands readable
     * and writable events to the read/write tasks. Interest in an event is
     * removed from the key before its task is scheduled.
     *
     * @param sk the selected key; its attachment is the transport registered in
     *           {@code register}
     */
    @Override
    public synchronized void onSelection(SelectionKey sk) {
        if (this._closed) {
            return;
        }
        if (_log.isDebugEnabled()) {
            _log.debug(Thread.currentThread() + " sk.interestOps =" + this.dumpSelectionKey(sk));
        }
        if (sk.isConnectable()) {
            Transport t = (Transport)sk.attachment();
            try {
                if (t.getChannel().finishConnect()) {
                    t.onChannelConnection();
                    // Connected: from now on only read/write events matter.
                    sk.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                    return;
                }
            }
            catch (NoConnectionPendingException e) {
                _log.warn("Received NoConnectionPendingException for transport " + t.getURI());
            }
            catch (Exception e) {
                _log.error("Exception for transport " + t.getURI(), (Throwable)e);
            }
            t.onSelectorConnectFail();
        }
        // The failure handling above may have closed the transport and released
        // the channel, so treat a missing channel like a disconnected one
        // instead of dereferencing null.
        BaseSocketChannel chan = this._channel;
        if (chan == null || !chan.isConnected()) {
            this.close();
            return;
        }
        if (sk.isReadable()) {
            sk.interestOps(sk.interestOps() & ~SelectionKey.OP_READ);
            if (_log.isDebugEnabled()) {
                _log.debug(Thread.currentThread() + " sk is readable - interest ops:" + this.dumpSelectionKey(sk));
            }
            this.onSelectorRead();
        }
        if (sk.isWritable()) {
            sk.interestOps(sk.interestOps() & ~SelectionKey.OP_WRITE);
            if (_log.isDebugEnabled()) {
                _log.debug(Thread.currentThread() + " sk is writable - interest ops:" + this.dumpSelectionKey(sk));
            }
            this.onSelectorWrite();
        }
    }

    /**
     * Schedules the read task that matches the channel type on its I/O pool.
     * Nothing is scheduled while the transport is closing or when the channel
     * is of neither known type.
     */
    @Override
    public void onSelectorRead() {
        if (!this.isClosing()) {
            if (this._channel instanceof SSLSocketChannel) {
                this._ssliopool.addTask(this._sslReader);
            } else if (this._channel instanceof SimpleSocketChannel) {
                this._iopool.addTask(this._reader);
            }
        }
    }

    /**
     * Schedules the write task that matches the channel type on its I/O pool.
     * Nothing is scheduled while the transport is closing or when the channel
     * is of neither known type.
     */
    @Override
    public void onSelectorWrite() {
        if (!this.isClosing()) {
            if (this._channel instanceof SSLSocketChannel) {
                this._ssliopool.addTask(this._sslWriter);
            } else if (this._channel instanceof SimpleSocketChannel) {
                this._iopool.addTask(this._writer);
            }
        }
    }

    /**
     * Handles a connect attempt that did not complete. While the connection is
     * still pending, up to {@link #MAX_FINISH_ATTEMPTS} callbacks are tolerated
     * silently; after that a "NetConnection.Connect.Failed" status is
     * dispatched on the command stream (if there is one) and the transport is
     * closed unless the channel turned out to be connected.
     */
    @Override
    public void onSelectorConnectFail() {
        if (this.isClosing()) {
            return;
        }
        Stream s0 = this._stream0;
        // There may be no command stream (e.g. after detach()); the status
        // dispatch used to throw NullPointerException in that case.
        MessageDispatcher md0 = s0 != null ? s0._md : null;
        // Snapshot the channel: the status callback below runs user code that
        // may close the transport and release the field.
        BaseSocketChannel chan = this._channel;
        if (chan == null) {
            this.close();
            return;
        }
        try {
            if (chan.isConnectionPending() && this._numFinishAttempts < MAX_FINISH_ATTEMPTS) {
                ++this._numFinishAttempts;
                return;
            }
            _log.error("Connection failed; Channel Connection Pending: " + chan.isConnectionPending() + " Finish Attempts: " + this._numFinishAttempts + " URL: " + this._uri);
            if (md0 != null) {
                md0.dispatch(new StatusInfo("status", "NetConnection.Connect.Failed", null, null));
            }
        }
        catch (IOException iox) {
            _log.error("Connection failed for " + this._uri, (Throwable)iox);
            if (md0 != null) {
                md0.dispatch(new StatusInfo("status", "NetConnection.Connect.Failed", null, null));
            }
            return;
        }
        if (!chan.isConnected()) {
            this.close();
        }
    }

    /**
     * Requests selector attention for pending output by adding interest ops
     * 5, i.e. {@code SelectionKey.OP_READ | SelectionKey.OP_WRITE}.
     */
    public void triggerWrite() {
        this.addInterestOps(5);
    }

    /**
     * Queues {@code msg} on the command stream (stream id 0) and triggers a
     * write. The message is dropped, with an error log entry, when the
     * transport has no command stream or {@code msg} is null.
     *
     * @param msg command message to send
     */
    public void queueCommandMessage(TCMessage msg) {
        // Check explicitly rather than catching NullPointerException, which also
        // hid unrelated NPEs thrown from inside queueMessage().
        Stream control = this._streams.get(_0);
        if (control == null) {
            _log.error("Disconnected, message discarded!");
            return;
        }
        if (msg == null) {
            _log.error("Null command message discarded!");
            return;
        }
        control.queueMessage(msg);
        this.triggerWrite();
    }

    /**
     * Returns statistics for one of the transport's thread pools.
     *
     * @param which "fcsj.tpool.workers" (also used when {@code null}) or
     *              "fcsj.tpool.io"
     * @return the selected pool's statistics, or {@code null} for any other name
     */
    public ThreadPool.Stats getStats(String which) {
        String poolName = which == null ? "fcsj.tpool.workers" : which;
        if ("fcsj.tpool.workers".equals(poolName)) {
            return this._tpool.getStats();
        }
        return "fcsj.tpool.io".equals(poolName) ? this._iopool.getStats() : null;
    }

    /**
     * @return the object encoding passed to {@code connect} (0 before that)
     */
    public int getObjectEncoding() {
        return this._objectEncoding;
    }

    /**
     * Reads from the channel into {@code _inbuf} (at most two passes), decodes
     * every complete message and dispatches it to the stream it belongs to.
     * End of stream, a disconnected channel or any exception closes the
     * transport. On normal completion an acknowledgement is triggered and
     * read/write interest is re-armed.
     */
    @Override
    public synchronized void readBuffer() {
        try {
            if (this.isClosing()) {
                return;
            }
            this.setReading(true);
            // Out-parameter filled by doRead(): true when the decoder wanted more bytes than were buffered.
            boolean[] pbBytesRemaining = new boolean[]{true};
            for (int i = 0; i < 2; ++i) {
                int nread = 0;
                if (this._channel != null && this._inbuf != null) {
                    nread = this._channel.read(this._inbuf);
                } else if (this._channel == null) {
                    // Channel already released by unregister(); nothing to read.
                    return;
                }
                _log.debug("Read from channel: " + nread);
                if (nread == -1 || !this._channel.isConnected()) {
                    _log.info("Connection read error: " + nread + " LP: " + this._channel.socket().getLocalPort() + " RP: " + this._channel.socket().getPort() + " URI: " + this._uri);
                    // Clear the reading flag first, otherwise close() would defer itself.
                    this.setReading(false);
                    this.close();
                    return;
                }
                if (nread == 0) {
                    _log.debug("Read from channel is ZERO, break out of loop");
                    break;
                }
                // Switch the buffer from filling to draining for the decoder.
                this._inbuf.flip();
                _log.debug("Read " + nread + " into _inbuf " + this._inbuf);
                TCMessage m = null;
                while ((m = this._inchunks.ReadMessage(pbBytesRemaining)) != null) {
                    _log.debug("Got message " + m + " on stream id " + m.getStreamID());
                    Stream s = this._streams.get(new Integer(m.getStreamID()));
                    if (s != null) {
                        s._md.dispatch(this, m);
                        continue;
                    }
                    _log.warn("Unknown stream id " + m.getStreamID() + " - " + this._uri);
                }
                if (this._channel instanceof SSLSocketChannel && pbBytesRemaining[0]) {
                    // SSL channel with a partial message: empty the buffer and ask the
                    // channel to schedule another read instead of looping here.
                    _log.debug("*************Still need to read more bytes");
                    this._inbuf.clear();
                    ((SSLSocketChannel)this._channel).flagForReading();
                    return;
                }
                // Keep undecoded bytes and switch back to filling.
                this._inbuf.compact();
            }
            this._inchunks.TriggerAck();
            // 5 == SelectionKey.OP_READ | SelectionKey.OP_WRITE
            this.addInterestOps(5);
        }
        catch (Exception x) {
            _log.debug("error reading from " + this._uri, (Throwable)x);
            // _reading is still set here, so this call only marks the transport as
            // closing; the finally block below completes the close.
            this.close();
        }
        finally {
            this.setReading(false);
            if (this.isClosing()) {
                this.close();
            }
        }
    }

    /**
     * Fills {@code _outbuf} from the chunk output stream and writes it to the
     * channel (at most two passes). When data is left over, or the chunk
     * stream still reported output, write interest is re-armed. An exception
     * marks the transport as closing; the close itself happens in the finally
     * block.
     */
    @Override
    public synchronized void writeBuffer() {
        try {
            // NOTE(review): unlike readBuffer() this does not return after close(),
            // so the write logic below still runs — confirm whether that is intended.
            if (this.isClosing()) {
                this.close();
            }
            this.setWriting(true);
            // Out-parameter of WriteMessage(); presumably false once the chunk stream
            // has nothing more to emit — confirm against TCChunkOutputStream.
            boolean[] pbDidWrite = new boolean[]{true};
            for (int i = 0; i < 2; ++i) {
                // Stage messages (via doWrite) until the buffer is full or nothing was written.
                while (this._outbuf.position() < this._outbuf.limit() && pbDidWrite[0]) {
                    this._outchunks.WriteMessage(pbDidWrite);
                }
                // Switch the buffer from filling to draining for the channel write.
                this._outbuf.flip();
                int nwrite = 0;
                if (this._channel != null && this._outbuf != null && this._outbuf.hasRemaining()) {
                    nwrite = this._channel.write(this._outbuf);
                } else if (this._channel == null) {
                    // NOTE(review): returns with _outbuf still flipped (no compact()).
                    return;
                }
                if (nwrite != 0) {
                    _log.debug(i + "Write " + nwrite + " into _outbuf " + this._outbuf);
                }
                // Keep unsent bytes and switch back to filling.
                this._outbuf.compact();
                if (nwrite == -1) {
                    // Clear the writing flag first, otherwise close() would defer itself.
                    this.setWriting(false);
                    this.close();
                    return;
                }
                if (nwrite == 0) break;
                if (!this._outbuf.hasRemaining() || !(this._channel instanceof SSLSocketChannel)) continue;
                _log.debug("+++++++++Check if there are more to be written. Reactivate interest in writing. We will try again");
                ((SSLSocketChannel)this._channel).flagForWriting();
            }
            if (this._outbuf.position() > 0 || pbDidWrite[0]) {
                _log.info("######## writer has buffer remaining bytes ##########");
                // 4 == SelectionKey.OP_WRITE
                this.addInterestOps(4);
            }
        }
        catch (Exception x) {
            _log.warn("error writing to " + this._uri, (Throwable)x);
            // Only flag the close here; the finally block performs it once _writing is cleared.
            this._closing = true;
        }
        finally {
            this.setWriting(false);
            if (this.isClosing()) {
                this.close();
            }
        }
    }

    /**
     * Read callback: copies up to {@code length} bytes from {@code _inbuf}
     * into {@code buffer}.
     *
     * @param context          unused
     * @param buffer           destination array
     * @param offset           first destination index
     * @param length           number of bytes requested
     * @param pbBytesRemaining element 0 is set to true when fewer than
     *                         {@code length} bytes were available
     * @return number of bytes copied
     */
    @Override
    public int doRead(Object context, byte[] buffer, int offset, int length, boolean[] pbBytesRemaining) {
        int available = this._inbuf.remaining();
        int nbytes = ChunkCommon.CoreMin(available, length);
        pbBytesRemaining[0] = length - nbytes > 0;
        if (_log.isDebugEnabled()) {
            _log.debug(Thread.currentThread() + " Getting " + nbytes + " from _inbuf " + this._inbuf);
        }
        this._inbuf.get(buffer, offset, nbytes);
        return nbytes;
    }

    /**
     * Write callback: copies up to {@code length} bytes from {@code buffer}
     * into {@code _outbuf}, limited by the space left in it.
     *
     * @param context unused
     * @param buffer  source array
     * @param offset  first source index
     * @param length  number of bytes offered
     * @return number of bytes accepted
     */
    @Override
    public int doWrite(Object context, byte[] buffer, int offset, int length) {
        int space = this._outbuf.remaining();
        int nbytes = ChunkCommon.CoreMin(space, length);
        this._outbuf.put(buffer, offset, nbytes);
        if (_log.isDebugEnabled()) {
            _log.debug(Thread.currentThread() + " Putting " + nbytes + " into _outbuf " + this._outbuf);
        }
        return nbytes;
    }

    /**
     * Ack/flow-control callback. Event 5 carries a user control message whose
     * first two bytes are a big-endian type: type 6 is answered with a type 7
     * message echoing bytes 2..5, and other types are ignored. Every other
     * event is passed to the output stream's flow-control handler, and a write
     * is triggered when that handler asks for one.
     *
     * @param context unused
     * @param iEvent  event code
     * @param pArg1   for event 5 the {@link TCMessage}; otherwise handler-specific
     * @param pArg2   handler-specific
     */
    @Override
    public void onAck(Object context, int iEvent, Object pArg1, Object pArg2) {
        if (iEvent != 5) {
            if (this._outchunks.HandleFlowControlEvent(iEvent, pArg1, pArg2)) {
                this.triggerWrite();
            }
            return;
        }
        TCMessage ctrlMsg = (TCMessage)pArg1;
        byte[] payload = ctrlMsg.getMsgBuffer();
        short ctrlType = (short)(((payload[0] & 0xFF) << 8) | (payload[1] & 0xFF));
        // NOTE(review): 6/7 look like RTMP ping request/response — confirm against the spec.
        if (ctrlType == 6) {
            byte[] reply = new byte[]{0, 7, payload[2], payload[3], payload[4], payload[5]};
            this._outchunks.SendUserCtrlMsg(reply, reply.length);
            this.triggerWrite();
        }
    }

    /**
     * Renders the key's interest set for log output.
     *
     * @param sk key to describe
     * @return the set operation names, each preceded by a space, in the order
     *         OP_ACCEPT, OP_CONNECT, OP_READ, OP_WRITE; empty when none is set
     */
    private String dumpSelectionKey(SelectionKey sk) {
        int ops = sk.interestOps();
        StringBuilder names = new StringBuilder();
        if ((ops & SelectionKey.OP_ACCEPT) != 0) {
            names.append(" OP_ACCEPT");
        }
        if ((ops & SelectionKey.OP_CONNECT) != 0) {
            names.append(" OP_CONNECT");
        }
        if ((ops & SelectionKey.OP_READ) != 0) {
            names.append(" OP_READ");
        }
        if ((ops & SelectionKey.OP_WRITE) != 0) {
            names.append(" OP_WRITE");
        }
        return names.toString();
    }

    /** Sets the closing flag under the transport's monitor. Not called anywhere in this file. */
    private synchronized void setClosing(boolean b) {
        this._closing = b;
    }

    /**
     * @return true once a close has been requested (set by {@code close()} and
     *         by a failed {@code writeBuffer()})
     */
    @Override
    public synchronized boolean isClosing() {
        return this._closing;
    }

    /**
     * @return the current channel, or {@code null} before {@code connect} and
     *         after {@code unregister}
     */
    @Override
    public BaseSocketChannel getChannel() {
        return this._channel;
    }

    /** Sets the flag that makes {@code close()} defer itself while a read is in progress. */
    private synchronized void setReading(boolean b) {
        this._reading = b;
    }

    /** Returns the reading flag. Not called anywhere in this file. */
    private synchronized boolean isReading() {
        return this._reading;
    }

    /** Sets the flag that makes {@code close()} defer itself while a write is in progress. */
    private synchronized void setWriting(boolean b) {
        this._writing = b;
    }

    /** Returns the writing flag. Not called anywhere in this file. */
    private synchronized boolean isWriting() {
        return this._writing;
    }

    /**
     * Appends {@code job} to the transport's job queue. A runner is submitted
     * to the worker pool only when the queue was empty before this call; in
     * every other case no new runner is started.
     *
     * @param job work to run after all previously queued jobs
     */
    public void addTask(Runnable job) {
        boolean queueWasEmpty;
        synchronized (this._jobList) {
            this._jobList.add(job);
            queueWasEmpty = this._jobList.size() == 1;
        }
        if (queueWasEmpty) {
            this._tpool.addTask(new SynchronizedJobRunner());
        }
    }

    class SynchronizedJobRunner
    implements Runnable {
        SynchronizedJobRunner() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            int i = 0;
            while (true) {
                Runnable job;
                List<Runnable> list = RtmpTransport.this._jobList;
                synchronized (list) {
                    if (RtmpTransport.this._jobList.isEmpty()) {
                        return;
                    }
                    job = RtmpTransport.this._jobList.get(0);
                }
                if (i >= 2) {
                    RtmpTransport.this._tpool.addTask(this);
                    return;
                }
                try {
                    job.run();
                }
                finally {
                    list = RtmpTransport.this._jobList;
                    synchronized (list) {
                        RtmpTransport.this._jobList.remove(job);
                    }
                }
                ++i;
            }
        }
    }

    /** Task scheduled on the I/O pool by {@code onSelectorWrite} for plain-socket channels. */
    public class Writer
    implements Runnable {
        /** Runs {@code writeBuffer()} on the enclosing transport. */
        @Override
        public synchronized void run() {
            RtmpTransport.this.writeBuffer();
        }
    }

    /** Task scheduled on the I/O pool by {@code onSelectorRead} for plain-socket channels. */
    public class Reader
    implements Runnable {
        /** Runs {@code readBuffer()} on the enclosing transport. */
        @Override
        public synchronized void run() {
            RtmpTransport.this.readBuffer();
        }
    }

    /**
     * Task scheduled on the SSL I/O pool by {@code onSelectorWrite}. Runs the
     * SSL channel's write processing under the shared SSL read/write mutex.
     */
    public class SSLWriter
    implements Runnable {
        /**
         * Processes pending SSL output unless the transport is closing. The
         * writing flag is always cleared afterwards and a pending close is
         * completed.
         */
        @Override
        public void run() {
            Object mutex = RtmpTransport.this._sslReadWriteMutex;
            synchronized (mutex) {
                try {
                    // When closing, skip the write. The cleanup in finally handles the
                    // rest; it used to be duplicated in this branch, so it ran twice.
                    if (!RtmpTransport.this.isClosing()) {
                        RtmpTransport.this.setWriting(true);
                        ((SSLSocketChannel)RtmpTransport.this._channel).processWriting();
                    }
                }
                catch (Exception e) {
                    _log.warn("Exception for " + RtmpTransport.this._uri, (Throwable)e);
                }
                finally {
                    RtmpTransport.this.setWriting(false);
                    if (RtmpTransport.this.isClosing()) {
                        RtmpTransport.this.close();
                    }
                    _log.debug("Finish SSL Writing ...");
                }
            }
        }
    }

    /**
     * Task scheduled on the SSL I/O pool by {@code onSelectorRead}. Runs the
     * SSL channel's read processing under the shared SSL read/write mutex.
     */
    public class SSLReader
    implements Runnable {
        /**
         * Processes pending SSL input unless the transport is closing. The
         * reading flag is always cleared afterwards and a pending close is
         * completed.
         */
        @Override
        public void run() {
            Object mutex = RtmpTransport.this._sslReadWriteMutex;
            synchronized (mutex) {
                try {
                    // When closing, skip the read. The cleanup in finally handles the
                    // rest; it used to be duplicated in this branch, so it ran twice.
                    if (!RtmpTransport.this.isClosing()) {
                        RtmpTransport.this.setReading(true);
                        ((SSLSocketChannel)RtmpTransport.this._channel).processReading();
                    }
                }
                catch (Exception e) {
                    _log.warn("Exception for " + RtmpTransport.this._uri, (Throwable)e);
                }
                finally {
                    RtmpTransport.this.setReading(false);
                    if (RtmpTransport.this.isClosing()) {
                        RtmpTransport.this.close();
                    }
                    _log.debug("Finish SSL Reading ...");
                }
            }
        }
    }

    /**
     * One logical stream multiplexed over the enclosing transport. Outgoing
     * messages are kept in per-slot queues; the chunk output stream pulls them
     * through {@link #getMessage}. Stream 0 is the command stream created by
     * {@code connect}.
     */
    public class Stream
    implements TGetMsgProc {
        // Stream id; null until assigned (connect() sets 0, setStreamID() sets others).
        Integer _id = null;
        MessageDispatcher _md = null;
        // Outgoing message queues by message slot.
        Map<Integer, List<TCMessage>> _mqs = new HashMap<Integer, List<TCMessage>>();
        // Chunk contexts registered with the output stream, by message slot.
        Map<Integer, TChunkContext> _contexts = new HashMap<Integer, TChunkContext>();
        // Message returned by the first getMessage() call, ahead of any queue and
        // regardless of the connected/blocked state (the connect command for stream 0).
        TCMessage _firstMsg = null;
        // While true, getMessage() hands out nothing except _firstMsg.
        boolean _blocked = true;
        // Requested buffer time in seconds (see sendBufferSize).
        double _buftime = 0.1;
        double _buflen = 0.0;
        double _time = 0.0;
        // System.currentTimeMillis() at publish(); base for send() timestamps.
        double _publishTime;
        boolean _recvAudio = true;
        boolean _recvVideo = true;
        boolean _recvData = true;
        boolean _paused = false;
        static final int MODE_UNDEFINED = 0;
        static final int MODE_COMMANDS = 1;
        static final int MODE_SUBSCRIBE = 2;
        static final int MODE_PUBLISH = 3;
        int _mode = MODE_UNDEFINED;

        /**
         * @param md dispatcher for messages arriving on this stream
         */
        public Stream(MessageDispatcher md) {
            this._md = md;
        }

        /**
         * @param md       dispatcher for messages arriving on this stream
         * @param firstMsg message to send before anything else
         */
        public Stream(MessageDispatcher md, TCMessage firstMsg) {
            this._md = md;
            this._firstMsg = firstMsg;
        }

        /** Opens message slot 2 with priority 10 so its queue exists before any message is queued. */
        public void createDataContext() {
            this.openSlot(Integer.valueOf(2), 10);
        }

        /**
         * Creates the chunk context and the queue for {@code slot} and
         * registers the context with the transport's output stream.
         */
        private List<TCMessage> openSlot(Integer slot, int priority) {
            TChunkContext ctx = new TChunkContext();
            this._contexts.put(slot, ctx);
            ctx.SetCallbacks(this, ctx);
            RtmpTransport.this._outchunks.Register(ctx, priority, slot);
            List<TCMessage> mq = Collections.synchronizedList(new LinkedList<TCMessage>());
            this._mqs.put(slot, mq);
            return mq;
        }

        /**
         * Assigns the stream id and announces the buffer size for it.
         *
         * @param id stream id; must not be null
         */
        public void setStreamID(Integer id) {
            this._id = id;
            this.sendBufferSize();
        }

        /**
         * Queues {@code m} on the queue of its message slot, opening the slot
         * on first use (priority 1 for slot 1, 100 for slot 0, otherwise 10),
         * and triggers a write.
         *
         * @param m message to send; must not be null
         */
        public void queueMessage(TCMessage m) {
            Integer slot = Integer.valueOf(m.getMsgSlot());
            List<TCMessage> mq = this._mqs.get(slot);
            if (mq == null) {
                int priority;
                switch (slot.intValue()) {
                    case 1: {
                        priority = 1;
                        break;
                    }
                    case 0: {
                        priority = 100;
                        break;
                    }
                    default: {
                        priority = 10;
                    }
                }
                mq = this.openSlot(slot, priority);
            }
            mq.add(m);
            RtmpTransport.this.triggerWrite();
        }

        /**
         * Callback through which the chunk output stream fetches the next
         * message for queue {@code qid}.
         *
         * @param context   unused
         * @param qid       message slot to read from
         * @param streamID  element 0 receives the message's stream id
         * @param timeStamp element 0 receives the message's time
         * @return the pending first message if there is one (the out-parameters
         *         are not filled in that case); otherwise the head of the
         *         queue, or {@code null} when the transport is not connected,
         *         the stream is blocked or the queue is empty
         */
        @Override
        public TCMessage getMessage(Object context, int qid, int[] streamID, long[] timeStamp) {
            if (this._firstMsg != null) {
                TCMessage m = this._firstMsg;
                this._firstMsg = null;
                return m;
            }
            if (!RtmpTransport.this._connected || this._blocked) {
                return null;
            }
            Integer slot = Integer.valueOf(qid);
            List<TCMessage> mq = this._mqs.get(slot);
            if (mq == null) {
                mq = Collections.synchronizedList(new LinkedList<TCMessage>());
                this._mqs.put(slot, mq);
            }
            if (mq.isEmpty()) {
                return null;
            }
            TCMessage m = mq.remove(0);
            m.setStreamID(this._id);
            streamID[0] = m.getStreamID();
            timeStamp[0] = m.getMsgTime();
            return m;
        }

        /**
         * Stops playing or publishing by sending "closeStream" and resets the
         * stream to undefined mode. The command stream is left untouched.
         */
        public synchronized void close() {
            if (this._mode == MODE_COMMANDS) {
                return;
            }
            if (this._mode == MODE_SUBSCRIBE || this._mode == MODE_PUBLISH) {
                TCCommand cmd = new TCCommand(RtmpTransport.this._objectEncoding);
                cmd.setMethodName("closeStream");
                cmd.setTrxID(Integer.valueOf(0));
                this.queueMessage(cmd.getCommandResponse());
                RtmpTransport.this.triggerWrite();
            }
            this._paused = false;
            this._time = 0.0;
            this._mode = MODE_UNDEFINED;
        }

        /**
         * Starts publishing under {@code name}.
         *
         * @param name stream name
         * @param type publish type, passed to the server as given
         * @return false when the stream is already in use, true once the
         *         command has been queued
         */
        public boolean publish(String name, String type) {
            if (this._mode != MODE_UNDEFINED) {
                return false;
            }
            this._mode = MODE_PUBLISH;
            this._publishTime = System.currentTimeMillis();
            TCCommand cmd = new TCCommand(RtmpTransport.this._objectEncoding);
            cmd.setMethodName("publish");
            cmd.setUserArg(name);
            cmd.setUserArg(type);
            this.queueMessage(cmd.getCommandResponse());
            return true;
        }

        /**
         * Sends a data message (message id 18) carrying {@code methodName}
         * and {@code args} on a publishing stream, timestamped in seconds
         * since {@code publish}.
         *
         * @param methodName handler name for subscribers
         * @param args       handler arguments; may be {@code null} or empty
         * @return true when the message was queued, false when the stream is
         *         not publishing
         */
        public boolean send(String methodName, Object[] args) {
            if (this._mode != MODE_PUBLISH) {
                return false;
            }
            TCJavaSerializer serializer = new TCJavaSerializer(RtmpTransport.this._objectEncoding);
            serializer.PutVar(methodName, false);
            if (args != null) {
                for (Object arg : args) {
                    serializer.PutVar(arg, false);
                }
            }
            TCMessage pMsg = new TCMessage();
            pMsg.setMsgID(18);
            pMsg.setTime(((double)System.currentTimeMillis() - this._publishTime) / 1000.0);
            pMsg.write(serializer.GetBuf(), serializer.GetPos());
            this.queueMessage(pMsg);
            // Report success like sendMessage() does; this used to return false
            // even after the message had been queued.
            return true;
        }

        /**
         * Queues a ready-made message on a publishing stream.
         *
         * @param message message to send
         * @return true when queued, false when the stream is not publishing
         */
        public boolean sendMessage(TCMessage message) {
            if (this._mode != MODE_PUBLISH) {
                return false;
            }
            this.queueMessage(message);
            return true;
        }

        /**
         * Starts playback by sending a "play" command.
         *
         * @param whatToPlay stream name
         * @param start      start position, passed to the server as given
         * @param length     duration, passed to the server as given
         * @param reset      combined with {@code ignore} into the reset argument
         * @param ignore     combined with {@code reset}: 0 = neither, 1 = reset
         *                   only, 2 = ignore only, 3 = both
         * @return false when the stream is already in use, true once queued
         */
        public boolean play(String whatToPlay, double start, double length, boolean reset, boolean ignore) {
            if (this._mode != MODE_UNDEFINED) {
                return false;
            }
            this._mode = MODE_SUBSCRIBE;
            this._paused = false;
            TCCommand cmdPlay = new TCCommand(RtmpTransport.this._objectEncoding);
            cmdPlay.setMethodName("play");
            cmdPlay.setUserArg(whatToPlay);
            cmdPlay.setUserArg(Double.valueOf(start));
            cmdPlay.setUserArg(Double.valueOf(length));
            int iResetType;
            if (reset) {
                iResetType = ignore ? 3 : 1;
            } else {
                iResetType = ignore ? 2 : 0;
            }
            cmdPlay.setUserArg(Integer.valueOf(iResetType));
            this.queueMessage(cmdPlay.getCommandResponse());
            return true;
        }

        /**
         * Sends a "seek" command on a subscribing stream.
         *
         * @param offset target position, passed to the server as given
         * @return true when queued, false when the stream is not subscribing
         */
        public boolean seek(double offset) {
            if (this._mode != MODE_SUBSCRIBE) {
                return false;
            }
            TCCommand cmd = new TCCommand(RtmpTransport.this._objectEncoding);
            cmd.setMethodName("seek");
            cmd.setUserArg(Double.valueOf(offset));
            this.queueMessage(cmd.getCommandResponse());
            return true;
        }

        /**
         * Pauses or resumes a subscribing stream; the command carries the new
         * paused state and the current stream time.
         *
         * @param pausePlay true to pause, false to resume
         * @return true when queued, false when the stream is not subscribing
         */
        public boolean pause(boolean pausePlay) {
            if (this._mode != MODE_SUBSCRIBE) {
                return false;
            }
            this._paused = pausePlay;
            TCCommand cmd = new TCCommand(RtmpTransport.this._objectEncoding);
            cmd.setMethodName("pause");
            cmd.setTrxID(Integer.valueOf(0));
            cmd.setUserArg(Boolean.valueOf(this._paused));
            cmd.setUserArg(Double.valueOf(this._time));
            this.queueMessage(cmd.getCommandResponse());
            RtmpTransport.this.triggerWrite();
            return true;
        }

        /**
         * Records whether a kind of content should be received. For types 8
         * and 9 a "receiveAudio" / "receiveVideo" command is also sent; type
         * 18 only updates the local data flag; other types are ignored.
         *
         * @param type message type: 8 (audio), 9 (video) or 18 (data)
         * @param recv true to receive, false to stop receiving
         */
        public void receive(int type, boolean recv) {
            switch (type) {
                case 8:
                case 9: {
                    TCCommand cmd = new TCCommand(RtmpTransport.this._objectEncoding);
                    if (type == 8) {
                        this._recvAudio = recv;
                        cmd.setMethodName("receiveAudio");
                    } else {
                        this._recvVideo = recv;
                        cmd.setMethodName("receiveVideo");
                    }
                    cmd.setUserArg(Boolean.valueOf(recv));
                    this.queueMessage(cmd.getCommandResponse());
                    break;
                }
                case 18: {
                    this._recvData = recv;
                    break;
                }
                default: {
                    break;
                }
            }
        }

        /**
         * Sets the buffer time and, once the stream has an id, announces it
         * to the server.
         *
         * @param buftime buffer time in seconds
         */
        public void setBufferTime(double buftime) {
            this._buftime = buftime;
            if (this._id != null) {
                this.sendBufferSize();
            }
        }

        /** @return the requested buffer time in seconds */
        public double getBufferTime() {
            return this._buftime;
        }

        /** @return the current value of the buffer-length field */
        public double getBufferLength() {
            return this._buflen;
        }

        /** @return the current value of the stream-time field */
        public double getTime() {
            return this._time;
        }

        /**
         * Sends a 10-byte user control message: a two-byte type of 3, the
         * four-byte stream id and the four-byte buffer time in milliseconds.
         */
        private void sendBufferSize() {
            byte[] buf = new byte[10];
            buf[0] = 0;
            buf[1] = 3;
            ChunkCommon.Set4ByteInt(buf, 2, this._id);
            // Convert seconds to milliseconds before narrowing to int. The old
            // "(int)_buftime * 1000" truncated first, so 0.1 s was sent as 0 ms.
            int bufferMillis = (int)Math.round(this._buftime * 1000.0);
            ChunkCommon.Set4ByteInt(buf, 6, bufferMillis);
            RtmpTransport.this._outchunks.SendUserCtrlMsg(buf, buf.length);
            RtmpTransport.this.triggerWrite();
        }
    }
}

