Subversion Repositories Programming Utils

Rev

Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
86 rm5248 1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *   http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19
package org.apache.sshd.client.channel;
20
 
21
import java.io.IOException;
22
import java.io.InputStream;
23
 
24
import org.apache.sshd.common.SshConstants;
25
import org.apache.sshd.common.channel.ChannelOutputStream;
26
import org.apache.sshd.common.channel.ChannelPipedInputStream;
27
import org.apache.sshd.common.channel.ChannelPipedOutputStream;
28
import org.apache.sshd.common.channel.ChannelAsyncInputStream;
29
import org.apache.sshd.common.channel.ChannelAsyncOutputStream;
30
import org.apache.sshd.common.future.CloseFuture;
31
 
32
/**
33
 * TODO Add javadoc
34
 *
35
 * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
36
 */
37
public class ChannelSession extends AbstractClientChannel {
38
 
39
    private Thread streamPumper;
40
 
41
    public ChannelSession() {
42
        super("session");
43
    }
44
 
45
    @Override
46
    protected void doOpen() throws IOException {
47
        if (streaming == Streaming.Async) {
48
            asyncIn = new ChannelAsyncOutputStream(this, SshConstants.SSH_MSG_CHANNEL_DATA) {
49
                @Override
50
                protected CloseFuture doCloseGracefully() {
51
                    try {
52
                        sendEof();
53
                    } catch (IOException e) {
54
                        session.exceptionCaught(e);
55
                    }
56
                    return super.doCloseGracefully();
57
                }
58
            };
59
            asyncOut = new ChannelAsyncInputStream(this);
60
            asyncErr = new ChannelAsyncInputStream(this);
61
        } else {
62
            invertedIn = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA);
63
            if (out == null) {
64
                ChannelPipedInputStream pis = new ChannelPipedInputStream(localWindow);
65
                ChannelPipedOutputStream pos = new ChannelPipedOutputStream(pis);
66
                out = pos;
67
                invertedOut = pis;
68
            }
69
            if (err == null) {
70
                ChannelPipedInputStream pis = new ChannelPipedInputStream(localWindow);
71
                ChannelPipedOutputStream pos = new ChannelPipedOutputStream(pis);
72
                err = pos;
73
                invertedErr = pis;
74
            }
75
            if (in != null) {
76
                streamPumper = new Thread("ClientInputStreamPump") {
77
                    @Override
78
                    public void run() {
79
                        pumpInputStream();
80
                    }
81
                };
82
                // Interrupt does not really work and the thread will only exit when
83
                // the call to read() will return.  So ensure this thread is a daemon
84
                // to avoid blocking the whole app
85
                streamPumper.setDaemon(true);
86
                streamPumper.start();
87
            }
88
        }
89
    }
90
 
91
    @Override
92
    protected void doCloseImmediately() {
93
        if (streamPumper != null) {
94
            streamPumper.interrupt();
95
            streamPumper = null;
96
        }
97
        super.doCloseImmediately();
98
    }
99
 
100
    protected void pumpInputStream() {
101
        try {
102
            byte[] buffer = new byte[remoteWindow.getPacketSize()];
103
            while (!closeFuture.isClosed()) {
104
                int len = securedRead(in, buffer, 0, buffer.length);
105
                if (len > 0) {
106
                    invertedIn.write(buffer, 0, len);
107
                    invertedIn.flush();
108
                } else {
109
                    sendEof();
110
                    break;
111
                }
112
            }
113
        } catch (Exception e) {
114
            if (!isClosing()) {
115
                log.info("Caught exception", e);
116
                close(false);
117
            }
118
        }
119
    }
120
 
121
    //
122
    // On some platforms, a call to System.in.read(new byte[65536], 0,32768) always throws an IOException.
123
    // So we need to protect against that and chunk the call into smaller calls.
124
    // This problem was found on Windows, JDK 1.6.0_03-b05.
125
    //
126
    protected int securedRead(InputStream in, byte[] buf, int off, int len) throws IOException {
127
        int n = 0;
128
        for (;;) {
129
            int nread = in.read(buf, off + n, Math.min(1024, len - n));
130
            if (nread <= 0) {
131
                return (n == 0) ? nread : n;
132
            }
133
            n += nread;
134
            if (n >= len) {
135
                return n;
136
            }
137
            // if not closed but no bytes available, return
138
            if (in.available() <= 0) {
139
                return n;
140
            }
141
        }
142
    }
143
 
144
}