目录
一, BIO 最简使用姿势
二, connect 方法
2.1 Socket.connect 方法
2.2 AbstractPlainSocketImpl.connect 方法
2.3 DualStackPlainSocketImpl.socketConnect 方法
三, SocketInputStream
3.1 构造方法
3.2 read 方法
四, SocketOutputStream
Netty 系列目录 (https://www.cnblogs.com/binarylei/p/10117436.html)
在上一篇文章中详细分析了 ServerSocket 的源码. Socket 和 ServerSocket 一样也只是一个门面, 真正的实现同样是 SocksSocketImpl, 所以 setImpl, createImpl, new, bind, listen 等方法的逻辑都是类似的. 本文重点关注其 connect 方法和 IO 流的读取方法.
一, BIO 最简使用姿势
- // 1. Connect to the server (timeout 0 = wait indefinitely)
- Socket socket = new Socket();
- socket.connect(new InetSocketAddress(HOST, PORT), 0);
- BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
- PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
- // 2. Send the request
- out.println("客户端发送请求数据...");
- // 3. Read the server's response
- String response = in.readLine();
- System.out.println("Client:" + response);
ok, 代码已经完成!!! 下面的源码分析都会基于这个 demo.
二, connect 方法
2.1 Socket.connect 方法
- // Connects this socket to endpoint. timeout=0 blocks indefinitely; timeout>0 bounds how long the connect may take
- public void connect(SocketAddress endpoint, int timeout) throws IOException {
- InetSocketAddress epoint = (InetSocketAddress) endpoint;
- InetAddress addr = epoint.getAddress ();
- int port = epoint.getPort();
- // 1. Create the underlying socket if it has not been created yet
- if (!created)
- createImpl(true);
- // 2. oldImpl defaults to false, so the first branch is the one normally taken.
- // checkOldImpl decides oldImpl by checking whether impl declares
- // connect(SocketAddress address, int timeout)
- if (!oldImpl)
- impl.connect(epoint, timeout);
- else if (timeout == 0) {
- if (epoint.isUnresolved())
- impl.connect(addr.getHostName(), port);
- else
- impl.connect(addr, port);
- } else
- throw new UnsupportedOperationException("SocketImpl.connect(addr, timeout)"); // old impl with a non-zero timeout is rejected
- connected = true;
- bound = true; // the socket is flagged as bound as well as connected
- }
总结: Socket 首先和 ServerSocket 一样调用 createImpl 创建底层 socket 对象, 然后委托给 impl 完成连接操作
2.2 AbstractPlainSocketImpl.connect 方法
- protected void connect(SocketAddress address, int timeout) throws IOException { // connects to address; calls close() if the attempt does not complete
- boolean connected = false;
- try {
- InetSocketAddress addr = (InetSocketAddress) address;
- this.port = addr.getPort();
- this.address = addr.getAddress();
- connectToAddress(this.address, port, timeout);
- connected = true; // only reached when connectToAddress returned normally
- } finally {
- if (!connected) {
- close(); // an exception was thrown before connected was set
- }
- }
- }
- private void connectToAddress(InetAddress address, int port, int timeout) throws IOException { // chooses the target address and delegates to doConnect
- if (address.isAnyLocalAddress()) {
- doConnect(InetAddress.getLocalHost(), port, timeout); // wildcard address: connect to the local host instead
- } else {
- doConnect(address, port, timeout);
- }
- }
总结: connect 的具体连接操作最终由 doConnect 完成.
- synchronized void doConnect(InetAddress address, int port, int timeout) throws IOException { // performs the connect via socketConnect; closes this impl and rethrows on IOException
- synchronized (fdLock) {
- if (!closePending && (socket == null || !socket.isBound())) { // hook runs only when no close is pending and the socket is absent or unbound
- NetHooks.beforeTcpConnect(fd, address, port);
- }
- }
- try {
- acquireFD(); // paired with releaseFD() in the inner finally
- try {
- socketConnect(address, port, timeout);
- /* socket may have been closed during poll/select */
- synchronized (fdLock) {
- if (closePending) {
- throw new SocketException ("Socket closed");
- }
- }
- if (socket != null) { // propagate the new state to the Socket facade when there is one
- socket.setBound();
- socket.setConnected();
- }
- } finally {
- releaseFD();
- }
- } catch (IOException e) {
- close(); // any I/O failure closes this impl before the exception propagates
- throw e;
- }
- }
2.3 DualStackPlainSocketImpl.socketConnect 方法
- void socketConnect(InetAddress address, int port, int timeout) // connects the native fd to address:port, with an optional timeout
- throws IOException {
- int nativefd = checkAndReturnNativeFD();
- if (address == null)
- throw new NullPointerException("inet address argument is null.");
- int connectResult;
- if (timeout <= 0) {
- connectResult = connect0(nativefd, address, port); // no timeout: connect0 is called without changing the blocking mode
- } else {
- configureBlocking(nativefd, false); // presumably switches the fd to non-blocking mode so connect0 can return WOULDBLOCK
- try {
- connectResult = connect0(nativefd, address, port);
- if (connectResult == WOULDBLOCK) {
- waitForConnect(nativefd, timeout); // connect still in progress: wait for completion or timeout
- }
- } finally {
- configureBlocking(nativefd, true); // always restore the blocking mode, even when an exception is thrown
- }
- }
- if (localport == 0)
- localport = localPort0(nativefd); // local port not recorded yet: read it from the native socket
- }
补充 1:connect0 在 JVM 中的实现
- JNIEXPORT jint JNICALL Java_java_net_DualStackPlainSocketImpl_connect0 // JNI entry: calls connect() on fd for the given InetAddress and port
- (JNIEnv *env, jclass clazz, jint fd, jobject iaObj, jint port) {
- SOCKETADDRESS sa;
- int rv;
- int sa_len = sizeof(sa);
- if (NET_InetAddressToSockaddr(env, iaObj, port, (struct sockaddr *)&sa, // fill sa/sa_len from the Java InetAddress and port
- &sa_len, JNI_TRUE) != 0) {
- return -1;
- }
- rv = connect(fd, (struct sockaddr *)&sa, sa_len);
- if (rv == SOCKET_ERROR) {
- int err = WSAGetLastError();
- if (err == WSAEWOULDBLOCK) {
- return java_net_DualStackPlainSocketImpl_WOULDBLOCK; // not an error: reported to the Java caller, which then waits for completion
- } else if (err == WSAEADDRNOTAVAIL) {
- JNU_ThrowByName(env, JNU_JAVANETPKG "ConnectException",
- "connect: Address is invalid on local machine, or port is not valid on remote machine");
- } else {
- NET_ThrowNew(env, err, "connect");
- }
- return -1; // return value not important.
- }
- return rv;
- }
总结: rv = connect(fd, (struct sockaddr *)&sa, sa_len) 建立远程连接.
补充 2:waitForConnect 在 JVM 中的实现
和 ServerSocket.waitForNewConnection 一样, 也是通过 Winsock 库的 select 函数来实现超时的功能.
- JNIEXPORT void JNICALL Java_java_net_DualStackPlainSocketImpl_waitForConnect // JNI entry: waits up to timeout for a pending connect on fd to finish
- (JNIEnv *env, jclass clazz, jint fd, jint timeout) {
- int rv, retry;
- int optlen = sizeof(rv);
- fd_set wr, ex;
- struct timeval t;
- FD_ZERO(&wr);
- FD_ZERO(&ex);
- FD_SET(fd, &wr);
- FD_SET(fd, &ex);
- t.tv_sec = timeout / 1000; // timeout is split into whole seconds ...
- t.tv_usec = (timeout % 1000) * 1000; // ... plus the remainder converted to microseconds
- rv = select(fd+1, 0, &wr, &ex, &t); // wait on the write and exception sets only
- if (rv == 0) { // select returned 0: nothing became ready before the timeout
- JNU_ThrowByName(env, JNU_JAVANETPKG "SocketTimeoutException",
- "connect timed out");
- shutdown( fd, SD_BOTH );
- return;
- }
- if (!FD_ISSET(fd, &ex)) {
- return; /* connection established */
- }
- for (retry=0; retry<3; retry++) { // fd is in the exception set: query SO_ERROR, up to three times
- NET_GetSockOpt(fd, SOL_SOCKET, SO_ERROR,
- (char*)&rv, &optlen);
- if (rv) {
- break;
- }
- Sleep(0); // SO_ERROR still 0: yield before asking again
- }
- if (rv == 0) { // no error code obtained after the retries
- JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException",
- "Unable to establish connection");
- } else {
- NET_ThrowNew(env, rv, "connect");
- }
- }
总结: rv = select(fd+1, 0, &wr, &ex, &t) 会阻塞当前线程, 直到连接建立, 连接出错或者超时.
三, SocketInputStream
3.1 构造方法
- SocketInputStream(AbstractPlainSocketImpl impl) throws IOException { // builds the stream on top of impl's file descriptor
- super(impl.getFileDescriptor());
- this.impl = impl; // kept for the fd acquire/release and reset-state checks done in read
- socket = impl.getSocket();
- }
总结: SocketInputStream 内部实现上也是对 impl 的封装. SocketInputStream.read 其实也是调用底层 socket 的 read 方法.
3.2 read 方法
- int read(byte b[], int off, int length, int timeout) throws IOException { // reads up to length bytes into b at off; returns the count, 0 for length == 0, or -1 at EOF
- int n;
- // EOF already encountered
- if (eof) {
- return -1;
- }
- // connection reset
- if (impl.isConnectionReset()) {
- throw new SocketException("Connection reset");
- }
- // bounds check
- if (length <= 0 || off <0 || off + length> b.length) {
- if (length == 0) { // a zero-length read returns 0 without checking off
- return 0;
- }
- throw new ArrayIndexOutOfBoundsException();
- }
- boolean gotReset = false;
- // acquire file descriptor and do the read
- FileDescriptor fd = impl.acquireFD();
- try {
- n = socketRead(fd, b, off, length, timeout);
- if (n> 0) {
- return n; // the finally block below still releases the fd
- }
- } catch (ConnectionResetException rstExc) {
- gotReset = true;
- } finally {
- impl.releaseFD();
- }
- /*
- * We receive a "connection reset" but there may be bytes still
- * buffered on the socket
- */
- if (gotReset) {
- impl.setConnectionResetPending();
- impl.acquireFD();
- try {
- n = socketRead(fd, b, off, length, timeout); // second attempt, to pick up any bytes still buffered
- if (n> 0) {
- return n;
- }
- } catch (ConnectionResetException rstExc) { // deliberately ignored: the reset state is handled below
- } finally {
- impl.releaseFD();
- }
- }
- /*
- * If we get here we are at EOF, the socket has been closed,
- * or the connection has been reset.
- */
- if (impl.isClosedOrPending()) {
- throw new SocketException("Socket closed");
- }
- if (impl.isConnectionResetPending()) {
- impl.setConnectionReset(); // promote the pending reset to a definite reset
- }
- if (impl.isConnectionReset()) {
- throw new SocketException("Connection reset");
- }
- eof = true; // remember EOF so later calls return -1 immediately
- return -1;
- }
- private int socketRead(FileDescriptor fd, byte b[], int off, int len, // thin wrapper that forwards all arguments unchanged
- int timeout) throws IOException {
- return socketRead0(fd, b, off, len, timeout); // delegates to socketRead0
- }
补充 2:socketRead0 在 JVM 中的实现
- // src/windows/native/java/net/SocketInputStream.c
- JNIEXPORT jint JNICALL Java_java_net_SocketInputStream_socketRead0(JNIEnv *env, jobject this, // JNI entry: recv() up to len bytes from the socket into data[off..]
- jobject fdObj, jbyteArray data, jint off, jint len, jint timeout) {
- char *bufP;
- char BUF[MAX_BUFFER_LEN];
- jint fd, newfd;
- jint nread;
- if (IS_NULL(fdObj)) {
- JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException", "socket closed");
- return -1;
- }
- fd = (*env)->GetIntField(env, fdObj, IO_fd_fdID);
- if (fd == -1) { // an fd field of -1 is treated as a closed socket
- NET_ThrowSocketException(env, "Socket closed");
- return -1;
- }
- /*
- * If the caller buffer is larger than our stack buffer then we allocate
- * from the heap (up to a limit). If memory is exhausted we always use
- * the stack buffer.
- */
- if (len <= MAX_BUFFER_LEN) {
- bufP = BUF;
- } else {
- if (len> MAX_HEAP_BUFFER_LEN) {
- len = MAX_HEAP_BUFFER_LEN; // cap the read size at the heap-buffer limit
- }
- bufP = (char *)malloc((size_t)len);
- if (bufP == NULL) {
- /* allocation failed so use stack buffer */
- bufP = BUF;
- len = MAX_BUFFER_LEN;
- }
- }
- if (timeout) {
- if (timeout <= 5000 || !isRcvTimeoutSupported) { // NOTE(review): longer timeouts presumably rely on a receive-timeout socket option set elsewhere - confirm
- int ret = NET_Timeout (fd, timeout);
- if (ret <= 0) {
- if (ret == 0) {
- JNU_ThrowByName(env, JNU_JAVANETPKG "SocketTimeoutException",
- "Read timed out");
- } else if (ret == JVM_IO_ERR) {
- JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException", "socket closed");
- } else if (ret == JVM_IO_INTR) {
- JNU_ThrowByName(env, JNU_JAVAIOPKG "InterruptedIOException",
- "Operation interrupted");
- }
- if (bufP != BUF) { // free only a heap-allocated buffer
- free(bufP);
- }
- return -1;
- }
- /*check if the socket has been closed while we were in timeout*/
- newfd = (*env)->GetIntField(env, fdObj, IO_fd_fdID);
- if (newfd == -1) {
- NET_ThrowSocketException(env, "Socket Closed");
- if (bufP != BUF) {
- free(bufP);
- }
- return -1;
- }
- }
- }
- // The key call: recv reads data from the socket fd
- nread = recv(fd, bufP, len, 0);
- if (nread> 0) {
- (*env)->SetByteArrayRegion(env, data, off, nread, (jbyte *)bufP); // copy the received bytes into the Java array at off
- } else {
- if (nread <0) {
- // Check if the socket has been closed since we last checked.
- // This could be a reason for recv failing.
- if ((*env)->GetIntField(env, fdObj, IO_fd_fdID) == -1) {
- NET_ThrowSocketException(env, "Socket closed");
- } else {
- switch (WSAGetLastError()) {
- case WSAEINTR:
- JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException",
- "socket closed");
- break;
- case WSAECONNRESET:
- case WSAESHUTDOWN:
- /*
- * Connection has been reset - Windows sometimes reports
- * the reset as a shutdown error.
- */
- JNU_ThrowByName(env, "sun/net/ConnectionResetException", "");
- break;
- case WSAETIMEDOUT :
- JNU_ThrowByName(env, JNU_JAVANETPKG "SocketTimeoutException", "Read timed out");
- break;
- default:
- NET_ThrowCurrent(env, "recv failed");
- }
- }
- }
- }
- if (bufP != BUF) {
- free(bufP);
- }
- return nread; // nread == 0 is returned as-is, with no exception raised
- }
总结: socketRead0 的实现很长, 其实我们只需要关注核心的实现 nread = recv(fd, bufP, len, 0); 即可, 毕竟我们不是专门做 C/C++ 开发的.
四, SocketOutputStream
SocketOutputStream 的实现和 SocketInputStream 类似, 就不继续分析了.
每天用心记录一点点. 内容也许不重要, 但习惯很重要!
来源: http://www.bubuko.com/infodetail-3115700.html