Subversion Repositories Programming Utils

Rev

Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
86 rm5248 1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *   http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19
package org.apache.sshd.common.channel;
20
 
21
import java.io.IOException;
22
import java.io.InterruptedIOException;
23
import java.io.OutputStream;
24
 
25
import org.apache.sshd.common.SshConstants;
26
import org.apache.sshd.common.SshException;
27
import org.apache.sshd.common.util.Buffer;
28
import org.slf4j.Logger;
29
 
30
/**
31
 * TODO Add javadoc
32
 *
33
 * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
34
 */
35
public class ChannelOutputStream extends OutputStream {
36
 
37
    private final AbstractChannel channel;
38
    private final Window remoteWindow;
39
    private final Logger log;
40
    private final byte cmd;
41
    private final byte[] b = new byte[1];
42
    private Buffer buffer;
43
    private boolean closed;
44
    private int bufferLength;
45
    private int lastSize;
46
    private boolean noDelay = false;
47
 
48
    public ChannelOutputStream(AbstractChannel channel, Window remoteWindow, Logger log, byte cmd) {
49
        this.channel = channel;
50
        this.remoteWindow = remoteWindow;
51
        this.log = log;
52
        this.cmd = cmd;
53
        newBuffer(0);
54
    }
55
 
56
    public void setNoDelay(boolean noDelay) {
57
        this.noDelay = noDelay;
58
    }
59
 
60
    public boolean isNoDelay() {
61
        return noDelay;
62
    }
63
 
64
    public synchronized void write(int w) throws IOException {
65
        b[0] = (byte) w;
66
        write(b, 0, 1);
67
    }
68
 
69
    public synchronized void write(byte[] buf, int s, int l) throws IOException {
70
        if (closed) {
71
            throw new SshException("Already closed");
72
        }
73
        while (l > 0) {
74
            // The maximum amount we should admit without flushing again
75
            // is enough to make up one full packet within our allowed
76
            // window size.  We give ourselves a credit equal to the last
77
            // packet we sent to allow the producer to race ahead and fill
78
            // out the next packet before we block and wait for space to
79
            // become available again.
80
            //
81
            int _l = Math.min(l, Math.min(remoteWindow.getSize() + lastSize, remoteWindow.getPacketSize()) - bufferLength);
82
            if (_l <= 0) {
83
                if (bufferLength > 0) {
84
                    flush();
85
                } else {
86
                    try {
87
                        remoteWindow.waitForSpace();
88
                    } catch (WindowClosedException e) {
89
                        closed = true;
90
                        throw e;
91
                    } catch (InterruptedException e) {
92
                        throw (IOException)new InterruptedIOException().initCause(e);
93
                    }
94
                }
95
                continue;
96
            }
97
            buffer.putRawBytes(buf, s, _l);
98
            bufferLength += _l;
99
            s += _l;
100
            l -= _l;
101
        }
102
        if (noDelay) {
103
            flush();
104
        }
105
    }
106
 
107
    @Override
108
    public synchronized void flush() throws IOException {
109
        if (closed) {
110
            throw new SshException("Already closed");
111
        }
112
        try {
113
            while (bufferLength > 0) {
114
                Buffer buf = buffer;
115
                int total = bufferLength;
116
                int length = Math.min(Math.min(remoteWindow.waitForSpace(), total), remoteWindow.getPacketSize());
117
                int pos = buf.wpos();
118
                buf.wpos(cmd == SshConstants.SSH_MSG_CHANNEL_EXTENDED_DATA ? 14 : 10);
119
                buf.putInt(length);
120
                buf.wpos(buf.wpos() + length);
121
                if (total == length) {
122
                    newBuffer(length);
123
                } else {
124
                    int leftover = total - length;
125
                    newBuffer(Math.max(leftover, length));
126
                    buffer.putRawBytes(buf.array(), pos - leftover, leftover);
127
                    bufferLength = leftover;
128
                }
129
                lastSize = length;
130
                remoteWindow.waitAndConsume(length);
131
                log.debug("Send {} on channel {}", cmd == SshConstants.SSH_MSG_CHANNEL_DATA ? "SSH_MSG_CHANNEL_DATA" : "SSH_MSG_CHANNEL_EXTENDED_DATA", channel.getId());
132
                channel.writePacket(buf);
133
            }
134
        } catch (WindowClosedException e) {
135
            closed = true;
136
            throw e;
137
        } catch (SshException e) {
138
            throw e;
139
        } catch (Exception e) {
140
            throw new SshException(e);
141
        }
142
    }
143
 
144
    @Override
145
    public synchronized void close() throws IOException {
146
        flush();
147
        closed = true;
148
    }
149
 
150
    private void newBuffer(int size) {
151
        buffer = channel.getSession().createBuffer(cmd, size <= 0 ? 0 : 12 + size);
152
        buffer.putInt(channel.getRecipient());
153
        if (cmd == SshConstants.SSH_MSG_CHANNEL_EXTENDED_DATA) {
154
            buffer.putInt(1);
155
        }
156
        buffer.putInt(0);
157
        bufferLength = 0;
158
    }
159
 
160
}