Subversion Repositories Programming Utils

Rev

Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
86 rm5248 1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *   http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19
package org.apache.sshd.common.channel;
20
 
21
import java.io.IOException;
22
import java.io.InputStream;
23
import java.io.InterruptedIOException;
24
import java.net.SocketException;
25
import java.util.concurrent.TimeUnit;
26
import java.util.concurrent.locks.Condition;
27
import java.util.concurrent.locks.Lock;
28
import java.util.concurrent.locks.ReentrantLock;
29
 
30
import org.apache.sshd.common.util.Buffer;
31
 
32
/**
33
 * TODO Add javadoc
34
 *
35
 * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
36
 */
37
public class ChannelPipedInputStream extends InputStream {
38
 
39
    private final Window localWindow;
40
    private final Buffer buffer = new Buffer();
41
    private final byte[] b = new byte[1];
42
    private boolean closed;
43
    private boolean eofSent;
44
 
45
    private final Lock lock = new ReentrantLock();
46
    private final Condition dataAvailable = lock.newCondition();
47
 
48
    private int timeout = 0; // zero is infinite
49
 
50
    /**
51
     * {@link ChannelPipedOutputStream} is already closed and so we will not receive additional data.
52
     * This is different from the {@link #closed}, which indicates that the reader of this {@link InputStream}
53
     * will not be reading data any more.
54
     */
55
    private boolean writerClosed;
56
 
57
    public ChannelPipedInputStream(Window localWindow) {
58
        this.localWindow = localWindow;
59
    }
60
 
61
    public void setTimeout(int timeout) {
62
        this.timeout = timeout;
63
    }
64
 
65
    public int getTimeout() {
66
        return timeout;
67
    }
68
 
69
    @Override
70
    public int available() throws IOException {
71
        lock.lock();
72
        try {
73
            int avail = buffer.available();
74
            if (avail == 0 && writerClosed) {
75
                return -1;
76
            }
77
            return avail;
78
        } finally {
79
            lock.unlock();
80
        }
81
    }
82
 
83
    public int read() throws IOException {
84
        synchronized (b) {
85
            int l = read(b, 0, 1);
86
            if (l == -1) {
87
                return -1;
88
            }
89
            return ((int) b[0] & 0xff);
90
        }
91
    }
92
 
93
    @Override
94
    public int read(byte[] b, int off, int len) throws IOException {
95
        int avail;
96
        long startTime = System.currentTimeMillis();
97
        lock.lock();
98
        try {
99
            for (;;) {
100
                if (closed && writerClosed && eofSent || closed && !writerClosed) {
101
                    throw new IOException("Pipe closed");
102
                }
103
                if (buffer.available() > 0) {
104
                    break;
105
                }
106
                if (writerClosed) {
107
                    eofSent = true;
108
                    return -1; // no more data to read
109
                }
110
                try {
111
                    if (timeout > 0) {
112
                        long remaining = timeout - (System.currentTimeMillis() - startTime);
113
                        if (remaining <= 0) {
114
                            throw new SocketException("timeout");
115
                        }
116
                        dataAvailable.await(remaining, TimeUnit.MILLISECONDS);
117
                    } else {
118
                        dataAvailable.await();
119
                    }
120
                } catch (InterruptedException e) {
121
                    throw (IOException) new InterruptedIOException().initCause(e);
122
                }
123
            }
124
            if (len > buffer.available()) {
125
                len = buffer.available();
126
            }
127
            buffer.getRawBytes(b, off, len);
128
            if (buffer.rpos() > localWindow.getPacketSize() || buffer.available() == 0) {
129
                buffer.compact();
130
            }
131
            avail = localWindow.getMaxSize() - buffer.available();
132
        } finally {
133
            lock.unlock();
134
        }
135
        localWindow.check(avail);
136
        return len;
137
    }
138
 
139
    public void eof() {
140
        lock.lock();
141
        try {
142
            writerClosed = true;
143
            dataAvailable.signalAll();
144
        } finally {
145
            lock.unlock();
146
        }
147
    }
148
 
149
    @Override
150
    public void close() throws IOException {
151
        lock.lock();
152
        try {
153
            closed = true;
154
            dataAvailable.signalAll();
155
        } finally {
156
            lock.unlock();
157
        }
158
    }
159
 
160
    public void receive(byte[] bytes, int off, int len) throws IOException {
161
        lock.lock();
162
        try {
163
            if (writerClosed || closed) {
164
                throw new IOException("Pipe closed");
165
            }
166
            buffer.putRawBytes(bytes, off, len);
167
            dataAvailable.signalAll();
168
        } finally {
169
            lock.unlock();
170
        }
171
        localWindow.consume(len);
172
    }
173
}