---
title: "Socket Connections Update"
author: "Tomas Kalibera, Luke Tierney"
date: 2020-03-17
categories: ["User-visible Behavior", "Internals"]
tags: ["sockets", "parallel", "PSOCK cluster"]
---
Starting up a PSOCK cluster is not fast. In R 3.6, on a Windows laptop just a few years old with 8 logical cores, it takes about 1.7s to start a cluster with 8 nodes:
```r
library(parallel)
system.time(cl <- makePSOCKcluster(8))
```
A good design is to start a cluster only once during an R session and then pass it to computations that can take advantage of it. This is needed so that the end user always has full control over how many cores are used in total. Starting a cluster in package code out of direct control of the user often causes big slowdowns by overloading the machine, resulting in much worse performance than sequential execution.
The 1.7s thus may seem acceptable, but if we start a larger cluster on a server machine with many cores, one node per logical core, the startup times become prohibitively long. On a recent Fedora server with 64 logical cores, it takes about 14s. On an old Solaris server with 64 logical cores, it takes 211s!
In R-devel, we have extended the sockets API and re-designed the startup of a PSOCK cluster. The Windows laptop mentioned above now starts the cluster in 0.5s in R-devel. Timings for several other machines with more cores are:

| Machine | R 3.6 | R-devel |
|---|---|---|
| Fedora server (64 cores) | 14s | 0.4s |
| Ubuntu server (40 cores) | 6.6s | 0.4s |
| Windows server (48 cores) | 9.3s | 0.5s |
| Solaris server (64 cores) | 211s | 7s |
| macOS desktop (12 cores) | 4.2s | 0.7s |
The speedup is largest on machines with many cores where the individual cores are slow.
The rest of this post describes in detail how these performance improvements were achieved.
The socket layer improvements do not change documented behavior of the existing API, but they do change observable behavior (e.g. timeouts are now enforced in cases where they previously were not, and `blocking = FALSE` is now respected on the server end of a connection). The usual way of testing R changes by comparing results of package checks does not help much here, because the CRAN policies limit the checks to 2 cores (to prevent overloading the check machines). Users relying on parallelization and socket communication are hence invited and encouraged to run any tests they have on large systems and report any new bugs.
R 3.6 and earlier start a cluster sequentially. For each node, the server issues a `system()` command to start the node, then waits via `socketConnection(server = TRUE)` for the node to connect, and then does the same for the next node, and so on. Nearly all of the time to start a cluster is spent starting the individual R sessions. A rough sketch of this sequential startup is shown below.
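The following sketch only illustrates the idea; `nnodes`, `rscript_cmd`, and `port` are placeholders, and the actual implementation in `parallel` handles many more details.

```r
## Illustrative sketch of the R 3.6 sequential startup (not the real code).
cons <- vector("list", nnodes)
for (i in seq_len(nnodes)) {
    system(rscript_cmd, wait = FALSE)      # launch node i in the background
    ## block until node i connects back to the server
    cons[[i]] <- socketConnection(port = port, server = TRUE,
                                  blocking = TRUE, open = "a+b")
}
```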
Even though this procedure is simple on the server, it is not simple on the nodes: when a node is started, it tries to connect to the server via `socketConnection(server = FALSE)`, but the connection may fail due to a race condition: the operating system may decide to run `socketConnection` in the node before `socketConnection` in the server, and then the connection will fail. Therefore, even in the R 3.6 cluster setup, the nodes had to re-try in case of a failure, and did so with an exponential backoff (sketched below).
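A minimal sketch of the retry-with-backoff idea follows; the host, port, timeout, and retry parameters are made up for illustration, and this is not the actual code used by `parallel`.

```r
## Illustration only: retry connecting to the server with exponential backoff.
connectWithRetry <- function(host, port, retries = 5, delay = 0.1) {
    for (i in seq_len(retries)) {
        con <- tryCatch(
            socketConnection(host, port = port, blocking = TRUE,
                             open = "a+b", timeout = 10),
            error = function(e) NULL)
        if (!is.null(con)) return(con)     # connected
        Sys.sleep(delay)
        delay <- delay * 2                 # exponential backoff
    }
    stop("could not connect to the server")
}
```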
There is no way to avoid this race condition, because `socketConnection(server = TRUE)` performs all three server socket operations: `bind()`, `listen()`, and `accept()`. It always creates and binds a temporary server socket, puts it into listening state, waits for a connection via `accept()`, and then destroys the server socket. In the time intervals when there is no listening server socket on the given port on the server, the connection from the node will fail.
In practice, this has been working well, because on typical operating systems the server will soon be scheduled and the number of retries will be small. However, if we were starting the nodes in parallel, the number of retries would probably increase dramatically with the increasing number of nodes, which would damage performance.
We have, instead, extended the R connections API so that one can work with server socket connections directly, re-using them for accepting multiple socket connections:
```r
serverSocket(port)

socketAccept(socket, blocking = FALSE, open = "a+",
             encoding = getOption("encoding"),
             timeout = getOption("timeout"))
```
`serverSocket` creates a listening server socket connection, which is of a new class `"servsockconn"`. `socketAccept` accepts an incoming connection on the given server socket. `socketSelect` can be used on a listening server socket to wait until a connection is ready to be accepted. The server socket connection can be closed by `close` as usual.
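As an illustration, the new functions can be combined roughly as follows; the port number, node count, and timeout are arbitrary values chosen for this example.

```r
## Illustrative use of the new server socket API (values are arbitrary).
srv <- serverSocket(11000)        # bind and listen once
cons <- vector("list", 4)
for (i in seq_along(cons)) {
    socketSelect(list(srv))       # wait until a connection can be accepted
    cons[[i]] <- socketAccept(srv, blocking = TRUE, open = "a+b",
                              timeout = 60)
}
close(srv)                        # the listening socket is no longer needed
## ... use the node connections, then close them
invisible(lapply(cons, close))
```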
The operating system keeps a queue of partial and established incoming connections. Established connections are ready to be given to the application via `accept()` (`socketAccept()`). This queue has a limited length, which one can influence via an argument to `listen()`, but it is only a hint, and one cannot programmatically find out how large the queue really is. The operating system may impose a limit and make that queue shorter. We have modified R-devel to use the maximum length supported on each system, but there is no guarantee it will be enough for all nodes. The cluster setup hence needs to behave correctly when the queue is too short.
TCP uses a 3-way handshake when establishing a connection. In the normal successful case, when the queue is not full, a node sends a `SYN` packet, the server puts the connection into a queue and responds with `SYN+ACK`, the node then gives the connection to the application as established (via `connect()`/`socketConnection(server = FALSE)`) and sends an `ACK` to the server. The server then flags the connection as established (some systems put it into a different queue) and gives it to the application (via `accept()`/`socketAccept()`).
When the queue is full, a number of things can happen. Linux effectively has two queues, one for already established connections and one for partially established ones. Only the size of the established queue is influenced by the backlog argument to `listen()`, and when that queue is full, Linux already decreases the rate at which connections are added to the partially-established queue. It may simply drop a `SYN` packet and not put an incoming connection into the partially-established queue. The TCP layer on the node will retry a few times, re-sending the `SYN`, but will eventually give up, send a `RST` (reset), and the connection will fail. It is hence necessary to keep the code for retries after failure in the nodes even with the new server socket API.
Moreover, it can still happen, even on Linux, that an `ACK` from the node is received on the server while the established queue is full. The server may then send a `RST` to the node and the node will fail, because it is already blocked waiting for commands from the server on a connection it believes is already established. A similar situation arises when the server drops the `ACK` packet but keeps the connection in the partially-established queue. After a timeout, it may re-send `SYN+ACK` to the node, the node re-sends its `ACK`, and eventually the connection may really succeed on the server, or the server may send a `RST` and remove it from the partially-established queue. More information on how Linux implements the backlog is available here.
An additional complication is that the server may drop the `ACK` from the node and remove the connection from the partially-established queue. Empirically, we have seen this during a stress test when the connections from the client were old, and the number of occurrences increased with the age of the connections, so presumably this happens after a timeout. Consequently, the node does not receive a `RST` and will keep waiting for a command from the server indefinitely.
This situation is known as a half-open connection and can arise in various ways in TCP communication. It would be resolved by the TCP layer if the node started communicating over the connection, but in the PSOCK protocol it is the server that starts communicating, so this issue needs to be handled specially.
We have thus changed the cluster setup procedure so that the server, as soon as it gets a connection from a node, sends an initial command to the node as a handshake. The node waits for this initial command during the setup phase, and if it does not receive the command within some time (half-open connection closed on the server) or if the waiting fails (it gets a `RST` from the server), the node re-tries establishing a connection to the server and waits for a new handshake. This does not change the wire protocol: the handshake is just a regular command, which the node runs and to which it sends a response.
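The idea can be sketched as follows over an already established connection `con`; the message fields used here are made up for the illustration and do not reflect the real PSOCK message format.

```r
## Illustration only: a server-initiated "handshake" as a regular command.
## The message fields (type, fun, args) are invented for this sketch.

## Server side, right after accepting the connection `con`:
serialize(list(type = "EXEC", fun = function() TRUE, args = list()), con)
ok <- unserialize(con)             # the node's response confirms the link

## Node side, during setup (with a short connection timeout):
msg <- unserialize(con)            # fails or times out on a bad connection
serialize(do.call(msg$fun, msg$args), con)   # run the command, send the result
```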
The R 3.6 sockets API allows a timeout to be defined for a socket connection at creation time. The timeout then influences most operations on the socket (it applies individually to low-level operations, so the R-level functions may wait somewhat longer in total). PSOCK clusters use connection timeouts of 30 days: if there is no command from the server (e.g. the server crashes or the connection is lost) within that time, the node exits on its own. For the handshake during cluster setup, we need a much shorter timeout. We have hence extended the API to allow modifying the timeout of a socket connection:
```r
socketTimeout(socket, timeout = -1)
```
This new function allows using a short timeout during the handshake and in the `socketConnection` invocation on the node, which was previously effectively blocking (blocking on Linux due to a `select()` bug, a timeout of 30 days on other systems). After the handshake, `socketTimeout` is called to increase the timeout again to 30 days.
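For instance, a node could connect with a short timeout and relax it only after the handshake; the port and timeout values below are arbitrary illustrations.

```r
## Arbitrary values for illustration: connect with a short setup timeout ...
con <- socketConnection("localhost", port = 11000, blocking = TRUE,
                        open = "a+b", timeout = 20)
## ... the handshake with the server happens here ...
socketTimeout(con, timeout = 30 * 24 * 60 * 60)   # then switch to 30 days
```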
In addition to helping with half-open connections that are open only on the client (via the timeout), the server-initiated handshake also gets rid of any half-open connection that is open only on the server: the TCP layer will detect those and fail when the server starts communicating. We observed these as well during our initial stress testing.
In R-devel, the new parallel cluster setup is used by default for homogeneous clusters with all nodes running on `localhost`, when all nodes are started automatically. The original sequential startup code is still supported for the other cases. There is a new cluster option `setup_strategy` with values `"parallel"` and `"sequential"`. `"parallel"` is the default and tells R to use the parallel strategy on all clusters where it is supported.
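If needed, the sequential startup can still be requested explicitly; the sketch below assumes `setup_strategy` is passed as a cluster option to `makePSOCKcluster`, and the node count is arbitrary.

```r
## Illustration: fall back to the sequential startup (node count arbitrary).
library(parallel)
cl <- makePSOCKcluster(8, setup_strategy = "sequential")
stopCluster(cl)
```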
Several issues in the R socket layer implementation were fixed as part of this work.
The connection timeout with `socketConnection(server = FALSE)` on Linux is now enforced. Before, the call was accidentally blocking due to Linux-specific behavior of `select()`.
`socketConnection(server = FALSE)` on Windows now returns immediately when the connection fails. Before, one had to wait for a timeout to expire due to Windows-specific behavior of `select()` when waiting for a connection.
`socketConnection(server = FALSE)` now detects when a connection is available right away without waiting (probably unlikely and only possible on `localhost`) and returns it. Previously, R would wait anyway and attempt to connect again, possibly leaking the connection.
`socketConnection(server = TRUE)` (and `socketAccept()`) now enforce the connection timeout. Previously, they could block indefinitely due to a race condition when a connection seems available to be accepted by `select()`, but is reset by the client by the time `accept()` is called. On some systems, `accept()` would then block; on others (we triggered this on Solaris), `accept()` would fail. After the change, R keeps waiting for a good connection, respecting the timeout.
Reading from a socket is now robust against spurious readability of the socket (`select()` reports it as readable, but then, e.g. due to an invalid checksum, `recv()` would block). This problem may happen on Linux.
Writing to a socket is now robust against spurious writability of the socket. Previously, this case could lead to unpredictable behavior; however, this kind of `select()` bug has not been reported on any system to our knowledge.
The `blocking = FALSE` argument of a socket connection (`socketConnection()`) is now respected also on the server side of a socket. Previously, read/write operations on the server end of a socket were accidentally blocking even with `blocking = FALSE`.
The internal handling of status codes from socket operations on Windows has been updated for WinSock2.