---
title: "Socket Connections Update"
author: "Tomas Kalibera, Luke Tierney"
date: 2020-03-17
categories: ["User-visible Behavior", "Internals"]
tags: ["sockets", "parallel", "PSOCK cluster"]

---

```{r setup, include=FALSE}
knitr::opts_chunk$set(collapse = TRUE)
```

Starting up a PSOCK cluster is not fast.  In R 3.6, on a laptop that is only
a few years old, has 8 logical cores, and runs Windows, it takes about 1.7s
to start a cluster with 8 nodes:

```
library(parallel); system.time(cl <- makePSOCKcluster(8))
```

A good design is to start a cluster only once during an R session and then
pass it to computations that can take advantage of it.  This is needed so
that the end user always has full control over how many cores are used in
total.  Starting a cluster in package code out of direct control of the user
often causes big slowdowns by overloading the machine, resulting in much
worse performance than sequential execution.
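
A minimal sketch of this design, assuming a hypothetical helper
`col_sums_par()` that stands in for package code: the caller creates the
cluster once and passes it in, so the helper never decides on its own how
many processes to start.

```r
library(parallel)

# Hypothetical helper: it receives an existing cluster instead of creating one.
col_sums_par <- function(cl, m) {
  cols <- lapply(seq_len(ncol(m)), function(j) m[, j])
  unlist(parLapply(cl, cols, sum))
}

# The user decides once how many nodes to use ...
cl <- makePSOCKcluster(2)
m <- matrix(1:6, nrow = 2)

# ... and the same cluster is re-used by every computation that needs it.
res <- col_sums_par(cl, m)

stopCluster(cl)
```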

The 1.7s thus may seem acceptable, but if we start a larger cluster on a
server machine with many cores, one node per logical core, the startup times
become prohibitively large.  On a recent Fedora server with 64 logical
cores, it takes about 14s.  On an old Solaris server with 64 logical cores,
it takes 211s!

In R-devel, we have extended the sockets API and re-designed the startup of
a PSOCK cluster.  The Windows laptop mentioned above now starts the cluster in
0.5s in R-devel.  Timings for several other machines with more cores are:

|                           | R 3.6  | R-devel |
| :------------------------ | ----:  | ------: |
| Fedora server (64 cores)  |  14s   | 0.4s    |
| Ubuntu server (40 cores)  |   6.6s | 0.4s    |
| Windows server (48 cores) |   9.3s | 0.5s    |
| Solaris server (64 cores) | 211s   | 7s      |
| macOS desktop (12 cores)  |   4.2s | 0.7s    |

The speedup is large when the number of cores is large but the individual
cores are slow.

# Compatibility

The rest of this post describes in detail how these performance improvements
were achieved.
The socket layer improvements do not change documented behavior of the
existing API, but they do change observable behavior (timeouts are sometimes
enforced when they were not before, and `blocking = FALSE` is now respected
on the server end of a connection). The usual way of testing R changes,
comparing results of package checks, does not help much here because the CRAN
policies limit the number of cores to 2 for the checks (to prevent
overloading of the check machines). Users relying on parallelization or
socket communication are hence invited and encouraged to run any tests
they have on large systems and report any new bugs.

# Starting a PSOCK cluster in R 3.6

R 3.6 and earlier start a cluster sequentially.  For each node, the server
issues a `system()` command to start the node, then waits via
`socketConnection(server = TRUE)` for the node to connect, and then does the
same for the second node, etc.  Nearly all the time to start a cluster is
spent in starting all the R sessions.

Although this is simple on the server, it is not simple in the nodes: when a node
is started, it tries to connect to the server via `socketConnection(server =
FALSE)`, but the connection may fail due to a race condition: the operating
system may decide to run `socketConnection` in the node before
`socketConnection` in the server, and then the connection will fail. 
Therefore, even in R 3.6 cluster setup, the nodes had to re-try in case of a
failure, and did so with an exponential backoff.
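
The retry logic can be sketched as follows.  This is only an illustration of
exponential backoff, not the code used in the `parallel` package:
`retry_with_backoff()` and `flaky_connect()` are hypothetical names, and the
delays are arbitrary.

```r
# Hypothetical helper: call `attempt()` until it succeeds, doubling the
# delay after each failure (exponential backoff).
retry_with_backoff <- function(attempt, max_tries = 5, first_delay = 0.01) {
  delay <- first_delay
  for (i in seq_len(max_tries)) {
    result <- tryCatch(attempt(), error = function(e) NULL)
    if (!is.null(result)) return(result)
    if (i < max_tries) {
      Sys.sleep(delay)
      delay <- 2 * delay
    }
  }
  stop("all ", max_tries, " attempts failed")
}

# Stand-in for a connection attempt that fails twice before succeeding.
calls <- 0
flaky_connect <- function() {
  calls <<- calls + 1
  if (calls < 3) stop("connection refused")
  "connected"
}

status <- retry_with_backoff(flaky_connect)
```

In a node, `attempt` would wrap the call to `socketConnection(server = FALSE)`
instead of the stand-in function.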

There is no way to avoid this race condition, because
`socketConnection(server = TRUE)` does all three of the server socket
operations: `bind()`, `listen()`, and `accept()`.  It always creates and
binds a temporary server socket, makes it listening, waits for a connection
via `accept()`, and then destroys the server socket.  In the time intervals
when there is no listening server socket on the given port on the server,
the connection from the node will fail.

In practice, this has been working well, because on typical operating
systems the server will soon be scheduled and the number of retries
will be small.  However, if we were starting the nodes in parallel,
the number of retries would probably increase dramatically with the
increasing number of nodes, which would damage performance.

# Server socket connections

We have, instead, extended the R connections API so that one can work with
server socket connections directly, re-using them for accepting multiple
socket connections:

```
serverSocket(port)
     
socketAccept(socket, blocking = FALSE, open = "a+",
             encoding = getOption("encoding"),
             timeout = getOption("timeout"))
```

`serverSocket` creates a listening server socket connection, which is of a
new class `"servsockconn"`.  `socketAccept` accepts an incoming connection
to the given server socket.  `socketSelect` can be used on a listening
server socket to wait for when a connection is ready to be accepted.  The
server socket connection can be closed by `close` as usual.
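
A minimal sketch of the new functions, run in a single R process for
simplicity (requires an R version that provides `serverSocket`).  The port
number is a hypothetical choice and is assumed to be free; a real server and
its clients would normally be separate processes.

```r
# Hypothetical port, assumed to be free on this machine.
port <- 11000L + sample.int(1000L, 1L)

# One listening socket, re-used for every incoming connection.
server <- serverSocket(port)

# Two clients connect before the server accepts anything; the operating
# system keeps the established connections in the listen queue.
client1 <- socketConnection("localhost", port = port, blocking = TRUE,
                            open = "a+", timeout = 10)
client2 <- socketConnection("localhost", port = port, blocking = TRUE,
                            open = "a+", timeout = 10)

# Wait until a connection is ready, then accept both from the same socket.
ready <- socketSelect(list(server), timeout = 10)
conn_a <- socketAccept(server, blocking = TRUE, open = "a+", timeout = 10)
conn_b <- socketAccept(server, blocking = TRUE, open = "a+", timeout = 10)

writeLines("hello from client 1", client1)
writeLines("hello from client 2", client2)
msg_a <- readLines(conn_a, n = 1)
msg_b <- readLines(conn_b, n = 1)

for (con in list(client1, client2, conn_a, conn_b, server)) close(con)
```

Because the listening socket stays open between the two `socketAccept` calls,
there is no interval in which a client would find the port closed.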

# Listen backlog

The operating system has a queue of partial and established incoming
connections.  Established connections are ready to be given to the
application via `accept()` (`socketAccept()`).  This queue has a limited
length, which one can influence via an argument to `listen()`, but it is
only a hint and one cannot programmatically find out how large the queue
really is.  The operating system may impose a limit and make that queue
shorter.  We have modified R-devel to use the maximum length supported on
each system, but there is no guarantee it would be enough for all nodes. 
The cluster setup hence needs to behave correctly when the queue is too
short.

TCP uses a 3-way handshake when establishing a connection.  In the normal
successful case when the queue is not full, a node sends a `SYN` packet, the
server puts the connection into a queue and responds with `SYN+ACK`, the
node then gives the connection to the application as established (via
`connect()`/`socketConnection(server=FALSE)`) and sends `ACK` to the server. 
The server then flags the connection as established (some systems put it to
a different queue) and gives it to the application (via
`accept()`/`socketAccept()`).

When the queue is full, a number of things can happen.  Linux has
effectively two queues, one for already established connections and one for
partially established ones.  Only the size of the established queue is
influenced by the backlog argument to `listen()`, and when that queue is full,
Linux already decreases the rate of adding connections to the
partially-established queue.  It may simply drop a `SYN` packet and not put
an incoming connection to the partially-established queue.  The TCP layer
on the node will retry a few times, re-sending the `SYN`, but eventually give
up, send a `RST` (reset), and the connection will fail.  It is hence
necessary to keep the code for retries after failure in the nodes even with
the new server socket API.

Moreover, it can still happen even on Linux that an `ACK` from the node is
received on the server, but the established queue is full.  Then the server
may send a `RST` to the node and the node will fail, because it is already
blocked waiting for commands from the server on a connection it believes is
already established.  A similar situation arises when the server drops the
`ACK` packet, but keeps the connection in the partially-established queue. 
It may then after a timeout re-send `SYN+ACK` to the node, the node re-sends
its `ACK`, and eventually the connection may really succeed on the server or
the server may send a `RST` and remove it from the partially-established
queue.  More information on how Linux implements the backlog is available
[here](http://veithen.io/2014/01/01/how-tcp-backlog-works-in-linux.html).

An additional complication is that the server may drop the `ACK` from the
node and remove the connection from the partially-established queue. 
Empirically we have seen this during a stress test when the connections from
the client were old, and the number of occurrences increased with the age of
the connections, so presumably this is after a timeout.  Consequently, the
node then does not receive a `RST` and will keep waiting for a command from
the server indefinitely.

This situation is known as a half-opened connection and can arise in various
ways in TCP communication.  It would be resolved by the TCP layer if the
node started communicating over the connection, but in the PSOCK protocol,
it is the server that starts communicating, so this issue needs to be
handled specially.

# Server-initiated handshake

We have thus changed the cluster setup procedure so that the server, as soon
as it gets a connection from the node, sends an initial command to the node
as a handshake.  The node waits for such an initial command during the setup
phase and if it does not get the command in some time (half-opened
connection closed on server) or if the waiting fails (it gets a `RST` from
the server), the node re-tries establishing a connection to the server and
waiting for a new handshake.  This does not change the wire protocol: the
handshake is just a regular command and the node runs it and sends a
response.
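
The idea can be sketched in a single R process.  This is an illustration
only: the text messages `"HANDSHAKE"` and `"READY"`, the helper
`handshake_ok()`, and the port are hypothetical, and the real PSOCK protocol
sends a regular cluster command rather than a line of text.

```r
port <- 11000L + sample.int(1000L, 1L)  # hypothetical port, assumed free
server <- serverSocket(port)

# Node side: connect.  The TCP layer may report success before the server
# application has accepted the connection.
node <- socketConnection("localhost", port = port, blocking = TRUE,
                         open = "a+", timeout = 10)

# Server side: accept and immediately send the handshake.
conn <- socketAccept(server, blocking = TRUE, open = "a+", timeout = 10)
writeLines("HANDSHAKE", conn)

# Node side: treat the connection as usable only once the handshake has
# arrived; anything else (error, timeout, wrong data) means "retry".
handshake_ok <- function(con) {
  line <- tryCatch(readLines(con, n = 1),
                   error = function(e) character(),
                   warning = function(w) character())
  identical(line, "HANDSHAKE")
}
ok <- handshake_ok(node)
if (ok) writeLines("READY", node)   # the node's response to the server
reply <- readLines(conn, n = 1)

for (con in list(node, conn, server)) close(con)
```

When `handshake_ok()` returns `FALSE`, the node would close the connection
and start over with a new connection attempt.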

# Connection timeouts

The R 3.6 sockets API allows defining a timeout for a socket connection at
creation time.  The timeout then influences most operations on the socket
(applies individually to low-level operations, but the R-level functions may
wait somewhat longer in total).  PSOCK clusters use connection timeouts of
30 days: if there is no command from the server to the node within that time
(e.g. because the server crashed or the connection was lost), the node exits
on its own.
For the handshake during cluster setup, we would need a much shorter
timeout.  We have hence extended the API to allow modifying a timeout of a
socket connection:

```
socketTimeout(socket, timeout = -1)
```

This new function allows using a short timeout during the handshake and in
`socketConnection` invocation on the node, which was previously
effectively blocking (blocking on Linux due to a `select()` bug, timeout of 30
days on other systems).  After the handshake, `socketTimeout` is called to
increase the timeout again to 30 days.
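
A minimal sketch of switching timeouts on the node side, again within one R
process.  The 10-second setup timeout, the `"HANDSHAKE"` text message, and
the port are assumptions made for the example, not the values used by
`parallel`.

```r
port <- 11000L + sample.int(1000L, 1L)  # hypothetical port, assumed free
server <- serverSocket(port)

# Node side: connect with a short timeout for the setup phase.
node <- socketConnection("localhost", port = port, blocking = TRUE,
                         open = "a+", timeout = 10)
conn <- socketAccept(server, blocking = TRUE, open = "a+", timeout = 10)

writeLines("HANDSHAKE", conn)         # server-initiated handshake
handshake <- readLines(node, n = 1)   # read is subject to the short timeout

# Handshake received: switch to the long timeout used while waiting for work.
socketTimeout(node, timeout = 30 * 24 * 3600)   # 30 days, in seconds

for (con in list(node, conn, server)) close(con)
```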

Besides helping with half-opened connections that are open only on the
client (via the timeout), the server-initiated handshake also gets rid of
any half-opened connections that are open only on the server.  The TCP layer
will detect those and fail when the server starts communicating.  We have
observed these as well during our initial stress testing.

# Parallel PSOCK cluster setup

In R-devel, the new parallel cluster setup is used by default for
homogeneous clusters with all nodes running on `localhost`, when all nodes are
started automatically.  The original sequential startup code is still
supported for the other cases.  There is a new cluster option
`setup_strategy` with values `"parallel"` and `"sequential"`.  `"parallel"`
is the default and tells R to use the parallel strategy on all clusters where
it is supported.
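
As a sketch, and assuming `setup_strategy` can be passed to
`makePSOCKcluster` like other cluster options, the sequential startup can be
requested for a single cluster:

```r
library(parallel)

# Request the original sequential startup explicitly for this cluster.
cl <- makePSOCKcluster(2, setup_strategy = "sequential")
squares <- parSapply(cl, 1:4, function(i) i^2)
stopCluster(cl)
```

This can be useful for comparing the two strategies when a startup problem
is suspected.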

# R socket layer improvements

Several issues of the R socket layer implementation were fixed as part of
this work.

The connection timeout with `socketConnection(server = FALSE)` on Linux is
now enforced. Before, the call was accidentally blocking due to
Linux-specific behavior of `select()`.

`socketConnection(server = FALSE)` on Windows now returns immediately when
the connection fails.  Before, one had to wait for a timeout to expire due
to Windows-specific behavior of `select()` when waiting for a connection.

`socketConnection(server = FALSE)` now detects when a connection is
available right away without waiting (probably unlikely and only possible on
a `localhost`) and returns it.  Previously, R would wait anyway and attempt to
connect again, possibly leaking the connection.

`socketConnection(server = TRUE)` (and `socketAccept()`) now enforce the
connection timeout.  Previously, they could block indefinitely due to a race
condition when a connection seems available to be accepted by `select()`,
but is re-set by the client by the time `accept()` is called.  On some
systems, `accept()` would then block.  On others (we triggered this on Solaris),
`accept()` would then fail; after the change, R will keep waiting for a good
connection respecting the timeout.

Read operations from a socket are now robust against spurious readability of
the socket (`select()` reports it as readable, but `recv()` would then block,
e.g. due to an invalid checksum).  This problem may happen on Linux.

Write operations to a socket are now robust against spurious writeability of
the socket.  Previously, this case could lead to unpredictable behavior. 
However, this kind of `select()` bug has not been reported on any system to
our knowledge.

The `blocking = FALSE` argument on a socket connection (`socketConnection()`) is
now respected also on the server side of a socket.  Previously, read/write
operations on the server end of a socket were accidentally blocking even
with `blocking = FALSE`.

The internal handling of status codes from socket operations on Windows has
been updated for WinSock2.