Reliable packet delivery using socket.io

tldr: socket.io does not provide reliable packet delivery, but you can write a wrapper around it, or design your app to be ready for everything.

socket.io seems to be well documented and yet it is poorly documented because it's hard to understand the underlying logic in non-ideal network conditions. It seems like socket.io tries to be a dead simple solution, but what are the constraints and limits of it? They are not well documented. As a consequence, there is a bit of misunderstanding of what socket.io can and can not do.

For example, does socket.io provide a guarantee of delivery? Does it guarantee at least an order of packets?

In this article, we investigate how a packet can be lost when using socket.io.

Also, I'll describe my wrapper library around socket.io to fight its limitations.

Event-driver vs RPC architecture

Event-driven

A loss of a single packet can be tolerated by the fault-tolerant logic of our client-server application.

So, one true solution to all the network problems is to design the application to be ready for anything.

But this is not simple anymore (socket.io advertised point).

RPC

RPC (remote procedure call) is a completely different approach where we try to hide the network and make client-server interop almost transparent. In this case, we write in an imperative (or functional) way, as if there is no network at all (more or less).

The problem is this approach is much more fragile and harder to be done correctly.

One can not completely ignore the nature of our networks, otherwise, even the simplest piece of code could enter an infinite loop of waiting for a client to receive a packet.

Reliable socket.io

I am thinking about some middle ground where we go event-driven but without too much worrying about the network.

So I am writing a wrapper around the socket.io to make it more resilient.

The goals of the library are:

1) try harder to send and not lose packets 2) allow to keep old connection (with all its state) upon reconnection of a client with all its listeners attached and even with all callbacks waiting

The first point should allow a programmer to not worry so much about losing packets and the state of the connection. Less bookkeeping, more magic!

The second point should allow a programmer to not worry about the reconnection of the same client (from the same tab in the same session of the same browser). Should allow writing simpler, more straightforward code, which automatically handles reconnection under the hood.

Besides, I am thinking about callbacks for callbacks to summon callback hell to JavaScript again!

Something like this:

emit('message', {data}, (answer, cb) => {
  if (answer) {
    cb({answer-to-answer}, (answer-to-answer-to-answer) => {
      /* ... */
    }
  }
})

But it is another topic.

The state of emitting in socket.io

Server side (io.emit)

Server-side socket.io's emit function in v4.1 just always returns true:

public emit(...): boolean {
  if (RESERVED_EVENTS.has(ev)) {
    throw new Error(`"${ev}" is a reserved event name`);
  }
  const data: any[] = [ev, ...args];
  const packet: any = {
    type: PacketType.EVENT,
    data: data,
  };

  // access last argument to see if it's an ACK callback
  if (typeof data[data.length - 1] === "function") {
    debug("emitting packet with ack id %d", this.nsp._ids);
    this.acks.set(this.nsp._ids, data.pop());
    packet.id = this.nsp._ids++;
  }

  const flags = Object.assign({}, this.flags);
  this.flags = {};

  this.packet(packet, flags);

  return true; /* <------ */
}

Source line

So we can not know if the packet was sent successfully or not.

But the socket.io buffers everything (even the packets sent before the actual connection is established). Or not?

packet function< in socket.ts:

packet(packet, opts) {
  packet.nsp = this.nsp.name;
  opts.compress = false !== opts.compress;
  this.client._packet(packet as Packet, opts);  /* <-- */
}

just calls _packet in client.ts:

_packet(packet: Packet | any[], opts: WriteOptions = {}): void {
  if (this.conn.readyState !== "open") {
    debug("ignoring packet write %j", packet);
    return; /* <------ oops */
  }
  const encodedPackets = opts.preEncoded
    ? (packet as any[]) // previous versions of the adapter incorrectly used socket.packet() instead of writeToEngine()
    : this.encoder.encode(packet as Packet);
  for (const encodedPacket of encodedPackets) {
    this.writeToEngine(encodedPacket, opts);
  }
}

And this function does not do anything if the socket is not in the open state.

(The socket can be in opening, open, closing, or close states.)

So we can easily lose a packet if the connection drops right before server-side emit.

To avoid it we can first check for socket.connected

if (socket.connected) {
  socket.emit(...)
}

But there is still a probability that the connection is ok while checking for connected but lost right before emit.

connected is changed by socket.io itself, but js is single-threaded, so there is no possibility for connected and readyState to change while we executing a synchronous block of code like:

while (true) {
  console.log(socket.connected)
}

It will always print true infinitely, or it will always print false infinitely.

I tested it by dropping the connection through iptables like this:

#!/bin/bash
sudo iptables -I INPUT -i lo -p tcp --dport 10104 -j DROP
sudo iptables -I INPUT -i lo -p tcp --sport 10104 -j DROP

To undrop:

#!/bin/bash
sudo iptables -D INPUT -i lo -p tcp --dport 10104 -j DROP
sudo iptables -D INPUT -i lo -p tcp --sport 10104 -j DROP

where 10104 is my HTTP port to which socket.io is attached.

So, from the point of socket.io the connection is still there even if it is not really true when we emit with connected guard:

if (socket.connected) {
  socket.emit(...)
}

Let's move further. What writeToEngine does?

_packet(packet: Packet | any[], opts: WriteOptions = {}): void {
  if (this.conn.readyState !== "open") {
    debug("ignoring packet write %j", packet);
    return;
  }
  const encodedPackets = opts.preEncoded
    ? (packet as any[]) // previous versions of the adapter incorrectly used socket.packet() instead of writeToEngine()
    : this.encoder.encode(packet as Packet);
  for (const encodedPacket of encodedPackets) {
    this.writeToEngine(encodedPacket, opts); /* <--- */
  }
}

writeToEngine:

private writeToEngine(
  encodedPacket: String | Buffer,
  opts: WriteOptions
): void {
  if (opts.volatile && !this.conn.transport.writable) {
    debug(
      "volatile packet is discarded since the transport is not currently writable"
    );
    return;
  }
  this.conn.write(encodedPacket, opts);
}

First, it discards volatile packets if they are not lucky to be sent at the same time as normal packets are sending.

Then it calls write on conn.

I believe that it is write from engine.io/lib/socket.js:

write(data, options, callback) {
  this.sendPacket("message", data, options, callback);
  return this;
}

where sendPacket is:

sendPacket(type, data, options, callback) {
  /* ...omitted some parameters normaization here... */

  if ("closing" !== this.readyState && "closed" !== this.readyState) {
    debug('sending packet "%s" (%s)', type, data);

    const packet = {
      type: type,
      options: options
    };
    if (data) packet.data = data;

    // exports packetCreate event
    this.emit("packetCreate", packet);

    this.writeBuffer.push(packet);

    // add send callback to object, if defined
    if (callback) this.packetsFn.push(callback);

    this.flush();
  }
}

It checks readyState for closing and close, but, again, if the connection is lost right before emit, readyState will still be open.

So we indeed buffer our packet if the connection was open before emitting.

You can confirm it by running the app with DEBUG=engine* variable:

DEBUG=engine* node myapp

(This is documented in README.md of engine.io.)

Then you get this log on emit with simultaneous connection drop:

engine:socket sending packet "message" (2["stc","hi",null]) +16s
engine:socket flushing buffer to transport +1ms
engine:ws writing "42["stc","hi",null]" +17s

As we see the sendPacket saves our packet in writeBuffer and then calls flush.

flush:

flush() { if ( "closed" !== this.readyState && this.transport.writable && this.writeBuffer.length ) { debug("flushing buffer to transport"); this.emit("flush", this.writeBuffer); this.server.emit("flush", this, this.writeBuffer); const wbuf = this.writeBuffer; this.writeBuffer = []; if (!this.transport.supportsFraming) { this.sentCallbackFn.push(this.packetsFn); } else { this.sentCallbackFn.push.apply(this.sentCallbackFn, this.packetsFn); } this.packetsFn = []; this.transport.send(wbuf); this.emit("drain"); this.server.emit("drain", this); } }

This calls transport.send:

send(packets) {
  const packet = packets.shift();
  if (typeof packet === "undefined") {
    this.writable = true;
    this.emit("drain");
    return;
  }

  // always creates a new object since ws modifies it
  const opts = {};
  if (packet.options) {
    opts.compress = packet.options.compress;
  }

  const send = data => {
    if (this.perMessageDeflate) {
      const len =
        "string" === typeof data ? Buffer.byteLength(data) : data.length;
      if (len < this.perMessageDeflate.threshold) {
        opts.compress = false;
      }
    }
    debug('writing "%s"', data);
    this.writable = false;

    this.socket.send(data, opts, err => { /* <--- */
      if (err) return this.onError("write error", err.stack);
      this.send(packets); /* <--- */
    });
  };

  if (packet.options && typeof packet.options.wsPreEncoded === "string") {
    send(packet.options.wsPreEncoded);
  } else {
    this.parser.encodePacket(packet, this.supportsBinary, send);
  }
}

This calls this.socket.send which has a callback with err argument. But I do not see "write error" message in the log of my test app. Why is that? Maybe WebSocket too has some timeout logic, so it treats the connection as alive for a while. Notice that I drop packets of the connection in my test, I do not close the connection.

(If everything is ok it will proceed with sending the remaining packets recursively. Here it just calls itself.)

After send, write buffer is empty, because we shift-ed it before sending:

$ socket.conn.writeBuffer
[]

So no buffer and the packet is somewhere in WebSockets internals. Or, in ws library internals, the library socket.io uses for WebSockets on server side. (There are also other transports.)

But if I quickly restore the connection the packet will reach the client!

So WebSocket library seems to buffer it.

Interestingly bufferedAmount is zero:

$ socket.conn.transport.socket.bufferedAmount
0

According to ws library documentation, it means that the packet was sent immediately.

Maybe it is buffered at OS, driver, or even network card level now.

But if I wait for some time for the client to disconnect, - for disconnect event to be fired:

disconnect transport close

then the packet is not arriving anymore. I think it is because the client-side endpoint of the connection is closed. So no transmission of data is possible anymore.

As a result, if:

1) we emit data
2) the connection drops at the same moment
3) and the connection is not restored shortly after the loss

then the packet will be lost. And no error listener is activated, through disconnect event is emitted of course.

Client-side

To recap: on the server-side, I found two buffers involved in storing packets:

1) `writeBuffer` of engine.io
2) buffer of WebSocket

But on the client-side there are 3 buffers involved:

1) `sendBuffer` of socket.io
2) `writeBuffer` of engine.io
3) buffer of WebSocket

sendBuffer is used to store packets while we are not connected (this includes reconnection time I believe):

if (this.connected) {
  this.packet(packet);
} else {
  this.sendBuffer.push(packet);
}

So most packets are not added in sendBuffer in normal conditions.

writeBuffer is the real buffer. All packets are going through it.

emit calls packet, packet calls io._packet, io._packet calls write of engine.io:

 function _packet(packet) {
  debug("writing packet %j", packet);
  var encodedPackets = this.encoder.encode(packet);

  for (var i = 0; i < encodedPackets.length; i++) {
    this.engine.write(encodedPackets[i], packet.options);
  }
}

Client library of socket.io - socket.io.js - has this comment in the write function:

// Sometimes the websocket has already been closed but the browser didn't
// have a chance of informing us about it yet, in that case send will
// throw an error

And the following code swallows that error:

try {
  if (usingBrowserWebSocket) {
    // TypeError is thrown when passing the second argument on Safari
    _this3.ws.send(data);
  } else {
    _this3.ws.send(data, opts);
  }
} catch (e) {
  debug("websocket closed before onclose event");
}

debug statement is logging, there would be simply an empty block in production code.

So as I understand if the connection drops just before ws.send, the packet is lost.

Because the sendBuffer is not used while we are connected. And this connected property is just a field in an object: it can not instantly change based on the real WebSocket state at moment.

Next, writeBuffer is cleared

this.writeBuffer.splice(0, this.prevBufferLen);

when all packets are "written" (ignoring whether there were any errors).

And packet is not added to the buffer of WebSocket, because WebSocket already knows we are disconnected.

Or, even if the packet is added to a WebSocket, after the reconnection new socket connection will be established, and these packets will not be resent to the server.

I managed to reproduce this behavior on my machine with socket.io v4.1.3.

I just drop the connection right before emit, and there are no packets either in sendBuffer or writeBuffer, and no packet is received by the server after reconnection.

But again there will be a disconnect event if the connection stays disconnected for a while.

Big packets and timeouts

Another problem is that socket.io will disconnect if one sends a big payload. Because if there are no ping and pong packets for a while - the connection is considered disconnected.

The server-side handles this a bit better - if there are any incoming packets, the timer will reset.

The client-side library wants the true ping packets only. In my testing, I was able to have a disconnection event flooding the client with a lot of packets. I published an issue about it.

For now one can workaroud this by this code on client-side:

socket.on-any(fun(event, data)
  // reset ping timeout on any activity
  socket.io.engine.reset-ping-timeout()
)

But I believe that if the packet is too big, socket.io will disconnect even with this modification. This is a well known issue.

A radical solution to all this can be a second control connection just for ping-pongs. But it looks like a too complicated approach. Another approach is to split payloads into smaller packets to allow ping packets to slip between. socket.io does neither of these.

The problem with big packets becomes more difficult in face of a truly bad connection. In this case, socket.io can infinitely disconnect because of no ping/pongs and force reconnection. And the problem is that we are required to start sending the big payload again over the new connection. With the same consequences. Again, you can add some logic to your application to work around this problem.

How to not lose packets (or lose them not so easily)

One way to fix it - and I remind you that you can just design your app so it is not a problem, - but to fix it, we can make another socket.io atop socket.io with numbering of packets, buffering, and other logic. This makes more overhead and leads to much code, but it seems to me as the simplest solution honestly.

Another way is to somehow get the success/failure status of our last emit from socket.io, or engine.io, or WebSocket, or pooling transport reliably.

Looking at this SO answer on WebSocket's send WebSocket does not expose TCP ACK events, and the same answer correctly points at that TCP ACK is not enough, because it is just a network card, driver, OS-level, not the application itself.

See also this SO answer specifically on socket.io. Seems like we need to do all the bookkeeping ourselves.

So we either build a wrapper around socket.io or make transport replacement.

Bookkeeping

I decided to write a wrapper around socket.io with compatible API (through only the minimum of the API is implemented).

Both server-side and client-side need to do bookkeeping.

We assign a random UUID to every packet.

People say that a sequencial numbering is a way to go, but I do not get it. If the server and the client can reconnect at any time, there can be conflicting situations.

For example:

server sends packet with number 0
client answers with packet 0
but then client reconnects
and sends two packets:
one with number 0
second with number 1

The second 0-packet is a duplicate or a new genuine one? May be sequence numbering can work if we have guaranteed order of packets and treat every connection as a completely new transport. But the goal of the library is to provide a virtual connection that can survive reconnects.

The same is true (from my point of view!) with Date.now() + number. So UUID looks for me like a reliable and simple solution. Sequencial numbering has some good properties, but I do not see how to use it reliably here.

Delivery can be reasonably guaranteed, and the order of packets can usually be preserved. socket.io does not guarantee ordering in the first place. Through it has it over WebSockets. Usage of UUID complicates ordering of packets, so I have not done it.

Loss of packets is possible if, for example, you are flooding the socket with packets the client can not proceed. I did not decide now what to do in this case, but there is a check for too many queued packets (10 000 packets max).

The receiver explicitly acknowledges all packets it receives automatically.

The sender assumes a packet is delivered only if the receiver sent him its ack packet.

There is logic to recognize duplicate packets and answers to them.

We cache the receiver's responses for a while because the line can be broken at any moment, so even if we received a packet from the sender, it does not mean we can safely assume our response will be delivered. And while there is no confirmation of delivery, the sender waits for acks.

For this to work one does need a cache. The cache is needed if we have callbacks (so-called acknowledgments in socket.io). They return data, and we do not want to call callbacks again and again on every duplicate packet.

There is some simple logic to manage this cache to avoid memory leaks. This can too lead to packet loss in extreme situations.

Additionally, we split payloads into smaller packets to allow the continuation of transmission of a payload after reconnection, and to make ping/pong mechanics work better.

All this introduces a significant overhead, but the library is simpler to use as a result. So it is a tradeoff.

As a whole, there is nothing new about the wrapper, but I couldn't find such a library so I wrote one.

And one still can build an event-driven fault-tolerant application on top of this library.

Unfortunately, I have not implemented the ability to chain acks in a callback hell yet.

 

shitpoet@gmail.com