Blame |
Last modification |
View Log
| RSS feed
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sshd.common.channel;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
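* A Window for a given channel.
* Windows are used to avoid overflowing the client or server when sending data.
* Both clients and servers have a local and a remote window and won't send
* any more data until the window has been expanded. When the local window
* is consumed through {@link #consumeAndCheck(int)} and drops below half of its
* maximum size, a window adjustment is sent to restore it to that maximum.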
*
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
public class Window {

    private static final Logger log = LoggerFactory.getLogger(Window.class);

    /** Channel used by {@link #check(int)} to send window adjustments. */
    private final AbstractChannel channel;
    /** Monitor guarding the mutable state below; also used for wait/notify. */
    private final Object lock;
    /** Human readable label used in log messages only. */
    private final String name;

    private int size;
    private int maxSize;
    private int packetSize;
    /** Set while at least one thread has entered a wait loop; used for logging only. */
    private boolean waiting;
    private boolean closed;

    /**
     * Creates a window bound to the given channel.
     *
     * @param channel the channel on which window adjustments are sent
     * @param lock    the monitor to synchronize and wait on; when {@code null}
     *                this window instance itself is used
     * @param client  {@code true} to label the window "client", {@code false} for "server"
     * @param local   {@code true} to label the window "local", {@code false} for "remote"
     */
    public Window(AbstractChannel channel, Object lock, boolean client, boolean local) {
        this.channel = channel;
        this.lock = lock != null ? lock : this;
        // "local " carries a trailing blank so that it has the same width as "remote".
        this.name = (client ? "client" : "server") + " " + (local ? "local " : "remote") + " window";
    }

    /**
     * @return the current window size, read while holding the lock
     */
    public int getSize() {
        synchronized (lock) {
            return size;
        }
    }

    /**
     * @return the size this window was given by the last call to {@link #init(int, int)};
     *         note that, unlike {@link #getSize()}, this read is not synchronized
     */
    public int getMaxSize() {
        return maxSize;
    }

    /**
     * @return the packet size given to the last call to {@link #init(int, int)};
     *         note that this read is not synchronized
     */
    public int getPacketSize() {
        return packetSize;
    }

    /**
     * Sets the initial (and maximum) size and the packet size of this window,
     * then wakes up every thread waiting on the lock.
     *
     * @param size       the initial size, also recorded as the maximum size
     * @param packetSize the packet size
     */
    public void init(int size, int packetSize) {
        synchronized (lock) {
            this.size = size;
            this.maxSize = size;
            this.packetSize = packetSize;
            lock.notifyAll();
        }
    }

    /**
     * Grows the window and wakes up every thread waiting on the lock.
     *
     * @param window the number of bytes to add to the current size
     */
    public void expand(int window) {
        synchronized (lock) {
            size += window;
            if (log.isDebugEnabled()) {
                log.debug("Increase " + name + " by " + window + " up to " + size);
            }
            lock.notifyAll();
        }
    }

    /**
     * Shrinks the window by the given amount without waiting.
     *
     * @param len the number of bytes to subtract from the current size
     */
    public void consume(int len) {
        synchronized (lock) {
            doConsume(len);
        }
    }

    /**
     * Shrinks the window by the given amount and then runs
     * {@link #check(int)} against the maximum size.
     *
     * @param len the number of bytes to subtract from the current size
     * @throws IOException if sending the window adjustment fails
     */
    public void consumeAndCheck(int len) throws IOException {
        synchronized (lock) {
            doConsume(len);
            // Nested acquisition of the same monitor; Java monitors are reentrant.
            check(maxSize);
        }
    }

    /**
     * If the current size is below half of {@code maxFree}, sends a window
     * adjustment of {@code maxFree - size} on the channel and sets the size
     * to {@code maxFree}. Otherwise does nothing.
     *
     * @param maxFree the size to restore the window to
     * @throws IOException if sending the window adjustment fails
     */
    public void check(int maxFree) throws IOException {
        synchronized (lock) {
            if (size < maxFree / 2) {
                int adjustment = maxFree - size;
                if (log.isDebugEnabled()) {
                    log.debug("Increase " + name + " by " + adjustment + " up to " + maxFree);
                }
                channel.sendWindowAdjust(adjustment);
                size = maxFree;
            }
        }
    }

    /**
     * Blocks until at least {@code len} bytes are available or the window is
     * closed, then subtracts {@code len} from the current size.
     *
     * @param len the number of bytes required
     * @throws InterruptedException  if the thread is interrupted while waiting
     * @throws WindowClosedException if the window has been closed
     */
    public void waitAndConsume(int len) throws InterruptedException, WindowClosedException {
        synchronized (lock) {
            while (size < len && !closed) {
                log.debug("Waiting for {} bytes on {}", len, name);
                waiting = true;
                lock.wait();
            }
            finishWaiting();
            doConsume(len);
        }
    }

    /**
     * Blocks while the window size is zero and the window is not closed.
     *
     * @return the window size observed once the wait is over
     * @throws InterruptedException  if the thread is interrupted while waiting
     * @throws WindowClosedException if the window has been closed
     */
    public int waitForSpace() throws InterruptedException, WindowClosedException {
        synchronized (lock) {
            while (size == 0 && !closed) {
                log.debug("Waiting for some space on {}", name);
                waiting = true;
                lock.wait();
            }
            finishWaiting();
            return size;
        }
    }

    /**
     * Marks this window as closed and wakes up every thread waiting on the lock
     * so that blocked callers fail with {@link WindowClosedException}.
     */
    public void notifyClosed() {
        synchronized (lock) {
            closed = true;
            // Notify unconditionally: 'waiting' is one flag shared by all waiting
            // threads and is cleared by the first one that proceeds, so it can be
            // false while another thread is still blocked in wait(). Guarding the
            // notification with it could leave that thread blocked forever.
            lock.notifyAll();
        }
    }

    /**
     * Subtracts {@code len} from the current size and traces the result.
     * Must be called with the lock held.
     * NOTE: the size is not checked against {@code len}, so it may become negative.
     */
    private void doConsume(int len) {
        size -= len;
        if (log.isTraceEnabled()) {
            log.trace("Consume " + name + " by " + len + " down to " + size);
        }
    }

    /**
     * Common epilogue of the wait loops: clears the waiting flag (logging if it
     * was set) and fails if the window has been closed.
     * Must be called with the lock held.
     */
    private void finishWaiting() throws WindowClosedException {
        if (waiting) {
            log.debug("Space available for {}", name);
            waiting = false;
        }
        if (closed) {
            throw new WindowClosedException();
        }
    }
}