Blame |
Last modification |
View Log
| RSS feed
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sshd.common.channel;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.net.SocketException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.sshd.common.util.Buffer;
/**
 * An {@link InputStream} whose content is pushed in by another party through
 * {@link #receive(byte[], int, int)} and pulled out by the {@code read} methods.
 * <p>
 * Received bytes are kept in an internal {@link Buffer}. A reader blocks until
 * data is available, the writer signals {@link #eof()}, the stream is
 * {@link #close() closed}, or the configured {@link #setTimeout(int) timeout}
 * elapses. All access to the buffer and to the state flags is guarded by a
 * single {@link Lock}.
 * <p>
 * The stream also notifies the {@link Window} it was created with: on every
 * {@code receive} via {@code consume(len)} and after every successful read via
 * {@code check(...)}. NOTE(review): presumably this drives SSH channel flow
 * control - confirm against {@link Window}.
 *
 * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
 */
public class ChannelPipedInputStream extends InputStream {

    /** Window notified when data is received and after data has been read. */
    private final Window localWindow;
    /** Holds the bytes that were received but not yet read. */
    private final Buffer buffer = new Buffer();
    /** Scratch array used by {@link #read()}; also the monitor that serializes its users. */
    private final byte[] b = new byte[1];
    /** Set by {@link #close()}: the reader side has been closed. */
    private boolean closed;
    /** Set once a read has reported end-of-stream ({@code -1}) to the caller. */
    private boolean eofSent;
    /** Guards {@link #buffer} and the state flags. */
    private final Lock lock = new ReentrantLock();
    /** Signalled whenever data arrives or the open/closed state changes. */
    private final Condition dataAvailable = lock.newCondition();
    /** Read timeout in milliseconds; zero (or negative) means wait indefinitely. */
    private int timeout = 0; // zero is infinite

    /**
     * {@link ChannelPipedOutputStream} is already closed and so we will not receive additional data.
     * This is different from the {@link #closed}, which indicates that the reader of this {@link InputStream}
     * will not be reading data any more.
     */
    private boolean writerClosed;

    /**
     * @param localWindow the window to notify about received and consumed data
     */
    public ChannelPipedInputStream(Window localWindow) {
        this.localWindow = localWindow;
    }

    /**
     * @param timeout maximum time in milliseconds a read may wait for data;
     *                a value that is not positive disables the timeout
     */
    public void setTimeout(int timeout) {
        this.timeout = timeout;
    }

    /**
     * @return the read timeout in milliseconds (not positive means no timeout)
     */
    public int getTimeout() {
        return timeout;
    }

    /**
     * Reports how many buffered bytes can be read without blocking.
     * <p>
     * Unlike the general {@link InputStream#available()} contract this returns
     * {@code -1} once the buffer is empty and the writer has signalled
     * {@link #eof()}; that behavior is kept as is for existing callers.
     *
     * @return the number of buffered bytes, or {@code -1} at end of stream
     */
    @Override
    public int available() throws IOException {
        lock.lock();
        try {
            int avail = buffer.available();
            if (avail == 0 && writerClosed) {
                return -1;
            }
            return avail;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Reads a single byte, blocking as described in {@link #read(byte[], int, int)}.
     *
     * @return the byte as a value in {@code 0..255}, or {@code -1} at end of stream
     * @throws IOException see {@link #read(byte[], int, int)}
     */
    @Override
    public int read() throws IOException {
        // The scratch array is shared, so concurrent single-byte readers are serialized on it.
        synchronized (b) {
            int l = read(b, 0, 1);
            if (l == -1) {
                return -1;
            }
            return b[0] & 0xff;
        }
    }

    /**
     * Reads up to {@code len} buffered bytes into {@code b}, waiting for data if
     * none is buffered yet.
     *
     * @param b   destination array
     * @param off offset in {@code b} of the first byte written
     * @param len maximum number of bytes to read; {@code 0} returns immediately
     * @return the number of bytes read, {@code 0} if {@code len} is zero, or
     *         {@code -1} if no data is buffered and the writer signalled {@link #eof()}
     * @throws IOException            with message "Pipe closed" if this stream was closed
     *                                and either the writer is still open or end-of-stream
     *                                was already reported
     * @throws SocketException        if the timeout elapsed before any data arrived
     * @throws InterruptedIOException if the waiting thread was interrupted
     */
    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        int avail;
        // Monotonic clock: the wait must not be affected by wall-clock adjustments.
        long startNanos = System.nanoTime();
        lock.lock();
        try {
            for (;;) {
                if (closed && (!writerClosed || eofSent)) {
                    throw new IOException("Pipe closed");
                }
                if (len == 0) {
                    // InputStream contract: a zero-length read never blocks.
                    return 0;
                }
                if (buffer.available() > 0) {
                    break;
                }
                if (writerClosed) {
                    eofSent = true;
                    return -1; // no more data to read
                }
                try {
                    if (timeout > 0) {
                        long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeout)
                                - (System.nanoTime() - startNanos);
                        if (remainingNanos <= 0) {
                            throw new SocketException("timeout");
                        }
                        // May wake up early or spuriously; the loop re-checks state and deadline.
                        dataAvailable.awaitNanos(remainingNanos);
                    } else {
                        dataAvailable.await();
                    }
                } catch (InterruptedException e) {
                    throw (IOException) new InterruptedIOException().initCause(e);
                }
            }
            if (len > buffer.available()) {
                len = buffer.available();
            }
            buffer.getRawBytes(b, off, len);
            // Compact once the read position passed one packet size or the buffer is drained.
            if (buffer.rpos() > localWindow.getPacketSize() || buffer.available() == 0) {
                buffer.compact();
            }
            avail = localWindow.getMaxSize() - buffer.available();
        } finally {
            lock.unlock();
        }
        // Called outside the lock. NOTE(review): presumably lets the window decide
        // whether to re-open itself given the free space - confirm in Window.check.
        localWindow.check(avail);
        return len;
    }

    /**
     * Marks the writer side as closed and wakes up any waiting reader. Data that
     * is already buffered can still be read; afterwards reads return {@code -1}.
     */
    public void eof() {
        lock.lock();
        try {
            writerClosed = true;
            dataAvailable.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Marks the reader side as closed and wakes up any waiting reader.
     */
    @Override
    public void close() throws IOException {
        lock.lock();
        try {
            closed = true;
            dataAvailable.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Appends data to the internal buffer and wakes up waiting readers, then
     * reports {@code len} to the local window via {@code consume}.
     *
     * @param bytes array holding the data
     * @param off   offset of the first byte to append
     * @param len   number of bytes to append
     * @throws IOException with message "Pipe closed" if either side was already
     *                     closed, or whatever {@code Window.consume} throws
     */
    public void receive(byte[] bytes, int off, int len) throws IOException {
        lock.lock();
        try {
            if (writerClosed || closed) {
                throw new IOException("Pipe closed");
            }
            buffer.putRawBytes(bytes, off, len);
            dataAvailable.signalAll();
        } finally {
            lock.unlock();
        }
        // Called outside the lock, as in the rest of this class for window calls.
        localWindow.consume(len);
    }
}