Backpressure and Unbounded Concurrency in Node.js
Posted by Matt Ranney on 16 Sep 2013
Backpressure can be a hard problem in both distributed systems and in evented systems. Most busy node.js services will find themselves overwhelmed at some point due to lack of backpressure somewhere. Yes, I know that "backpressure" isn't a proper word. Neither is "performant", but it's a word we want to exist, so it does.
To do backpressure in node, you obviously need streams. Just listen to any talk or blog post from long time node users, and you'll be convinced. Streams are here to help, and they are part of a balanced breakfast. The thing is, streams only fix a small part of this backpressure problem. In some ways, streams may make the problem worse. Certainly streams are better than "not streams", but there are some pretty serious problems here.
Since streams are "fire and forget", it's hard to tell what they are doing once they start. Streams can get stuck for various reasons, and it's hard to detect this. On the Actual Internet, things get stuck or in unexpected states all the time. Gracefully handling and retrying errors becomes difficult if not impossible with streams.
More importantly, when a node process gets busy it is very easy for performance to degrade to where the process is essentially "stuck", even when streams are used as recommended.
Tradeoffs
Before we can fix the problem, we should explore how we got here in the first place. As with anything interesting in life, there are tradeoffs involved. These problems with streams and backpressure are largely because of a tradeoff that node makes, perhaps without even meaning to. Node handles a large number of sockets with a pretty low memory footprint, and node also lets you pretty easily write network programs in a high level language, JavaScript. In exchange for those two awesome features, many of the tricky details of work scheduling are hidden, whether you like it or not. At small scale and for most workloads, this is indeed a feature. By not worrying about threads, scheduling, and socket readiness, you can focus on the logic of whatever application you are writing. It's nice that node maps all of this I/O onto events which are handled by JavaScript, because this eliminates a whole class of potential problems with threads and locking, and there is a long and well understood history of event handling in JS from the browser world.
Node makes it easy to queue work, without any real visibility into whether scheduling this work is a good idea. In fact, node's API explicitly hides the fact that anything is being scheduled or queued on your behalf. Operations that obviously create work are things like: redis.get(), http.get(), fs.createReadStream(), etc. You can happily call these functions without understanding the real costs, and this is part of why programming in node is so fun. However, you can run these functions in a tight loop, and they will all still return immediately, but your happiness will be over. If you've never tried this before, it's worth experimenting with. Try making 200,000 HTTP client requests in a tight loop, starting each request without waiting for the previous response.
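A scaled-down version of that experiment can be sketched without touching the network. This is only an illustration: startFakeRequest is a made-up stand-in for a call like http.get(), assumed to complete on a later turn of the event loop. The point is that the loop finishes with every operation still pending.

```javascript
// Hypothetical stand-in for an async call such as http.get() or redis.get():
// it returns immediately and completes on a later turn of the event loop.
function startFakeRequest(state) {
  state.pending += 1;
  setImmediate(() => {
    state.pending -= 1;
  });
}

// Start `count` operations back to back without waiting for any of them.
function floodWithRequests(count) {
  const state = { pending: 0 };
  for (let i = 0; i < count; i++) {
    startFakeRequest(state);
  }
  return state;
}

const state = floodWithRequests(200000);
// Nothing has completed yet: every operation is still queued.
console.log('in flight after the loop:', state.pending);
```

With real sockets, each of those pending operations also holds buffers, closures, and possibly a file descriptor.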
Internally, streams use the return value of the write() function to know whether they should tell the producer of data to slow down. If write() returns false then the writer wants less work to do, but node still queues the work. It's up to the producer to try to slow down somehow, and when connecting streams with pipe(), node's internals figure out these details for you. If you are writing directly to a stream, it's interesting to note that returning false is merely a suggestion to slow down. New writes will continue to be accepted without error, and node will queue these writes until they can be processed. This output queue requires memory from the V8 heap for writing Strings or from both the V8 heap and the node process heap when writing Buffers. The memory from the output queue cannot be recovered until the stream finishes, and for queues that live a long time, serious heap fragmentation often results.
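When writing directly to a stream, that suggestion can be honored by hand. Here is a minimal sketch, assuming a reasonably modern Node. writeAll and makeSlowSink are names invented for this example, and the 4-byte highWaterMark is deliberately tiny so the effect shows up after two writes.

```javascript
const { Writable } = require('stream');

// Write every chunk, pausing whenever write() returns false and resuming
// on 'drain'. `done` runs once all chunks have been handed to the stream
// and the stream is no longer asking us to wait.
function writeAll(stream, chunks, done) {
  let i = 0;
  function next() {
    while (i < chunks.length) {
      const ok = stream.write(chunks[i++]);
      if (!ok) {
        // The stream asked us to slow down; wait until its queue empties.
        stream.once('drain', next);
        return;
      }
    }
    done();
  }
  next();
}

// Hypothetical slow sink: records each chunk and finishes the write on a
// later turn of the event loop.
function makeSlowSink(received) {
  return new Writable({
    highWaterMark: 4,
    write(chunk, encoding, callback) {
      received.push(chunk.toString());
      setImmediate(callback);
    },
  });
}

const received = [];
writeAll(makeSlowSink(received), ['aaa', 'bbb', 'ccc', 'ddd'], () => {
  console.log('wrote', received.length, 'chunks');
});
```

Calling stream.write() in a plain loop would also appear to work, but every chunk would pile up in the output queue at once.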
The Node concurrency model is kind of like a credit card for computing work. Credit cards free you from the hassle and risks of carrying cash, and they are completely great in almost every way, unless you spend more than you make. It's hard to even know you have spent more than you make until the next bill comes. Similarly, node lets you do more work, and it'll call you back when it's done, whenever that is. You might not even realize that you've been scheduling work, but any time you have an event listener, you must handle that event when it goes off, on the stack where the event was fired. There is no way in node's event system to defer callbacks until the system is "less busy", whatever that means.
This is a subtle but important tradeoff, so it's worth repeating. Any time you have an event listener, you must handle that event when it goes off. This applies to everything from new connection listeners, new request listeners, data events, or anything else. Node will not defer the dispatch of an event when one is emitted.
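The synchronous dispatch is easy to see with a plain EventEmitter. A small sketch (traceEmit and the 'request' event name are just illustrative):

```javascript
const { EventEmitter } = require('events');

// Record the order in which things happen around a single emit().
function traceEmit() {
  const order = [];
  const emitter = new EventEmitter();

  emitter.on('request', () => {
    order.push('listener ran');
  });

  order.push('before emit');
  emitter.emit('request'); // listeners run right here, on this stack
  order.push('after emit');
  return order;
}

console.log(traceEmit());
```

The listener runs between the two pushes, before emit() returns. Nothing in the emitter gets a chance to postpone it.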
The Shape of the Problem
When you schedule work in a node process, some memory needs to be allocated, typically some from the V8 heap for JS objects and closures, and some in the node process heap for Buffers or TLS and gzip resources. This memory allocation happens automatically, and it's hard, perhaps impossible, to know in advance how much memory an operation will require, or how much space you have left. Of course the actual size or cost of an operation is probably irrelevant to a program at runtime, but it's interesting to notice where in the system this backpressure problem starts.
If you schedule too much work for a node process, here is how things usually break down and ultimately fail. As work is accepted faster than it is finished, more memory is required. The cost of garbage collection is directly related to the number of objects in the heap, and all of these new objects will steadily grow the heap. As the heap grows, GC cost grows, which cuts into the rate that useful work can be done. Since the process was already unable to keep up, this causes even more work to queue up, growing the heap and GC time in a vicious feedback cycle. Eventually the process spends almost all of its time doing GC and very little time doing actual work. If you are lucky, the heap will grow so large that V8 will exceed its own heap size limits and the process will abort. If you are unlucky, the process will languish in pain for hours, blindly accepting new connections that it'll never be able to service until you eventually run out of file descriptors. Processes will often go into a tight logging loop complaining about "EMFILE", which will then start filling up your disks.
If you've got a node process that's using more memory than you expect, the CPU is at 100%, but work is still proceeding slowly, chances are good that you've hit this problem. If you are on a system with DTrace, you can watch the GC rate or overall performance profile to definitively diagnose this problem. Whether or not you have DTrace, at this point your process is pretty much screwed. You've spent past your credit limit, and now you are stuck paying GC interest payments. Recovering gracefully is nearly impossible, so restarting the process is usually the best move.
But wait, it gets worse. Once a process can't keep up with its work and starts to get slow, this slowness often cascades into other processes in the system. Streams are not the solution here. In fact, streams are kind of what gets us into this mess in the first place by providing a convenient fire and forget API for an individual socket or HTTP request without regard to the health of the rest of the system.
Concurrency
The Hardest Problem in node, in my view, is unbounded concurrency. Most tricky problems that we've tracked down at Voxer have ultimately been problems of too much concurrency. This is a kind of backpressure problem, but at a different level than streams address. Node makes it very easy and relatively efficient to manage high concurrency driven primarily by network events, which is why we use node in the first place. There just aren't good ways to limit this concurrency before it causes problems. Node's unbounded concurrency might also cause problems in other systems it touches, like disk drives and databases. Even if node itself isn't out of memory or CPU, it can easily overwhelm these other systems.
The trick is, when resources are exhausted, something, somewhere, has to give. As demand increases, you can't magically get more performance forever. At some threshold, some users somewhere will need to get an error response like an HTTP 503 in order to slow things down.
One way to limit concurrency at the inbound edge is to throttle incoming connections by setting maxConnections on the net.Server object. This will cause node to accept then immediately drop any connections once the overall connection count goes beyond a certain level. Another way is to change the server socket's listen queue of unaccepted connections. Both of these mechanisms are crude and difficult to tune appropriately. As a new connection is being accepted, it's hard to distinguish a brief spike of requests from a generally overloaded state. These inbound limits also do not interact properly with clients that use keepalive, which is handled inside of node at a complex layer that most applications would wisely not want to interact with. Also, the retry logic of different clients is often surprising and bad when connections are dropped. For example, if a connection is dropped right away, many clients will immediately retry, which only makes the problem worse. If a new connection languishes in the accept queue for too long, many clients will try again with a new connection, which also exacerbates the problem.
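For reference, both knobs look roughly like this. The helper name createLimitedServer and the numbers (1000 connections, port 8000, backlog 128) are placeholders for this sketch, not recommendations.

```javascript
const net = require('net');

// Build a TCP server that drops connections beyond `maxConnections`.
function createLimitedServer(maxConnections, onConnection) {
  const server = net.createServer(onConnection);
  server.maxConnections = maxConnections;
  return server;
}

const server = createLimitedServer(1000, (socket) => {
  socket.end('hello\n');
});

// Not started here. The third argument to listen() is the backlog, the
// length of the queue of connections waiting to be accepted:
// server.listen(8000, '0.0.0.0', 128);
```

The operating system may clamp the backlog to its own maximum, which is one more reason these limits are hard to tune.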
To limit incoming work, a good first step is some kind of site-wide rate limiting, by IP address, user, session, or hopefully something meaningful to the application. Many load balancers can do rate limiting in a way that's more sophisticated than node's incoming server limits, but they usually don't notice a problem until your process is already deep in trouble.
We've implemented distributed rate limiting in node using some custom code and some Redis servers, and it works OK. It's only slightly more effective than what a load balancer might do, but it does often prevent us from runaway feedback loops caused by our own software.
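To make the idea concrete, here is a single-process sketch of a fixed-window rate limiter keyed by something like an IP address. It is not our Redis-backed implementation. createRateLimiter, its parameters, and the injectable clock are all invented for this example.

```javascript
// Fixed-window rate limiter: at most `limit` requests per key per window.
// `now` is injectable so the behavior can be exercised without waiting.
function createRateLimiter(limit, windowMs, now = Date.now) {
  const windows = new Map(); // key -> { start, count }

  return function allow(key) {
    const t = now();
    let w = windows.get(key);
    if (!w || t - w.start >= windowMs) {
      // First request for this key, or the previous window has expired.
      w = { start: t, count: 0 };
      windows.set(key, w);
    }
    if (w.count >= limit) return false;
    w.count += 1;
    return true;
  };
}

// Illustrative numbers: 100 requests per second per client address.
const allow = createRateLimiter(100, 1000);
console.log(allow('203.0.113.7'));
```

Note that this sketch never evicts old keys, so the Map itself grows without bound. A real limiter would need to expire entries, and a distributed one needs shared state.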
Some Solutions
The only real solution to these concurrency problems is to limit work before it is scheduled. Since all event handlers must be serviced when they fire, we must handle these new requests, and then make sure to not schedule any additional work as a result. We can also hopefully send something back to the clients to make them slow down.
To put some bounds on concurrency across the entire process, we need to know if we are too busy before scheduling new work. If we are too busy, then we'll have to signal upstream somehow to slow down. In most cases, this looks like an HTTP 503 (Service Unavailable) response, which is inconvenient for the user, but something has to give.
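One way this can look at the inbound HTTP edge is a wrapper that counts in-flight requests and answers with a 503 before any further work is scheduled. This is a minimal sketch. limitInFlight is a made-up helper, and the limit of 500 and the Retry-After value are arbitrary placeholders.

```javascript
const http = require('http');

// Wrap a request handler so that at most `limit` requests are in flight.
// Extra requests get an immediate 503 and schedule no further work.
function limitInFlight(limit, handler) {
  let inFlight = 0;

  return function guarded(req, res) {
    if (inFlight >= limit) {
      res.statusCode = 503;
      res.setHeader('Retry-After', '1'); // hint to well-behaved clients
      res.end('Server busy\n');
      return;
    }

    inFlight += 1;
    let released = false;
    const release = () => {
      if (!released) {
        released = true;
        inFlight -= 1;
      }
    };
    res.once('finish', release); // response fully handed off
    res.once('close', release); // or the connection went away first
    handler(req, res);
  };
}

// Hypothetical usage; the server is created but not started here.
const server = http.createServer(
  limitInFlight(500, (req, res) => res.end('ok\n'))
);
// server.listen(8000);
```

The rejected request still costs an event and a small response, but nothing downstream is touched on its behalf.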
HTTP
HTTP is our primary protocol for both client to server communication across the Internet as well as for server to server communication across the datacenter. Node is pretty good at HTTP, especially as a server. The node HTTP client has a number of deficiencies, and it's with the HTTP client that we've spent most of our time addressing backpressure. The module we've written at Voxer to solve this problem is called poolee, and it acts like an in-process load balancer with concurrency controls. poolee will dispatch HTTP requests to a set of equivalent servers, balancing the load and doing real HTTP client keepalive. We make a pool for every service that a process wants to use, including Riak, since it has an HTTP interface. Failed requests are retried if possible, and "stuck" requests are handled as well.
When the count of pending requests to a given pool exceeds its preconfigured limit, the request is failed immediately. This helps save the current process, the downstream service, and any upstream services. Unfortunately, it does require some careful tuning that's specific to your application and environment, but at least the controls are there.
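The fail-fast behavior can be sketched in a few lines. To be clear, this is not poolee's API or implementation. createBoundedClient, the send function it wraps, and the EPENDINGLIMIT error code are all invented here to show the shape of the idea.

```javascript
// Hypothetical wrapper, NOT poolee's API: caps the number of pending calls
// to one downstream service and fails immediately once the cap is reached.
function createBoundedClient(maxPending, send) {
  let pending = 0;

  async function request(payload) {
    if (pending >= maxPending) {
      const err = new Error('too many pending requests');
      err.code = 'EPENDINGLIMIT'; // invented code for this sketch
      throw err;
    }
    pending += 1;
    try {
      return await send(payload);
    } finally {
      pending -= 1; // runs on success and on failure
    }
  }

  request.pending = () => pending;
  return request;
}

// Illustrative use with a fake downstream call.
const request = createBoundedClient(2, async (payload) => payload.toUpperCase());
request('ping').then((reply) => console.log(reply));
```

The rejected call never reaches send(), so an overloaded downstream service sees no extra traffic from it.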
This brings up another problem with streams, which is recovering from errors. If a request fails for whatever reason under normal load conditions, it is useful to retry that request. If the request is handling a stream, it is generally impossible to retry. When using streams, errors must be propagated all the way back to the origin. Obviously there are some requests that only make sense as streams, and their errors must go all the way back to the clients. But for requests that are small enough to be buffered by intermediate layers, it's much better to not use streams. With poolee you can use either streams or complete requests. With complete requests, they will be automatically retried when errors are encountered. This helps minimize the impact of rolling restarts, crashing bugs, and rare or mysterious behavior.
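The reason complete requests can be retried is simply that the whole body is still in memory and can be sent again. A minimal sketch of that idea, with sendWithRetry and flakySend invented for illustration (this is not how poolee does it internally):

```javascript
// Retry an operation whose input is fully buffered, so it can be re-sent.
// `send(body, attempt)` is any function returning a promise.
async function sendWithRetry(body, send, maxAttempts) {
  if (maxAttempts < 1) throw new RangeError('maxAttempts must be >= 1');
  let lastError;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await send(body, attempt);
    } catch (err) {
      lastError = err; // the same buffered body is reused next time around
    }
  }
  throw lastError;
}

// Hypothetical flaky backend: fails twice, then succeeds.
let calls = 0;
async function flakySend(body) {
  calls += 1;
  if (calls < 3) throw new Error('connection reset');
  return 'stored ' + body.length + ' bytes';
}

sendWithRetry(Buffer.from('hello'), flakySend, 5)
  .then((result) => console.log(result, 'after', calls, 'attempts'));
```

This sketch retries immediately against the same target, which is only reasonable under normal load. Under overload, immediate retries make things worse, so a real client would want a delay or a different server. A stream offers no equivalent, because the data has already been consumed by the time the error shows up.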
Redis
Since it's an in-memory database, Redis is usually plenty fast and not the obvious bottleneck in an application. Unfortunately, sometimes Redis is slow to respond due to swapping on the server or using expensive operations like SINTERSTORE. When your Redis server starts responding slowly, things can get behind in your application pretty quickly. Many applications are designed with the assumption that Redis is always fast, so many operations typically need to be processed before responding to a client. An impatient user can retry a slow request, but there's no way to cancel an in-flight Redis request, so the retried request will consume even more resources.
The depth of the request queue is exposed by node_redis both as a number (if you care to look) and as the boolean return value of every command. A return value of false means the command queue is "too full" and you should probably slow down. This return value is confusing to many users who expect a synchronous response to a command. Sorry about that.
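The pattern of checking queue depth before adding more work can be shown with a toy client. Everything here is hypothetical: QueueingClient only imitates the "return false when too full" convention and is not node_redis code, and getIfNotBusy is an invented helper.

```javascript
// Hypothetical client with a command queue; NOT node_redis's real code.
class QueueingClient {
  constructor(highWater) {
    this.highWater = highWater;
    this.queue = [];
  }

  // Always accepts the command. Returns false once the queue is "too full".
  get(key, callback) {
    this.queue.push({ key, callback });
    return this.queue.length < this.highWater;
  }

  // Simulate the server answering the oldest queued command.
  answerOldest(value) {
    const entry = this.queue.shift();
    if (entry) entry.callback(null, value);
  }
}

// Caller-side check: returns false WITHOUT queueing when the queue is
// already `maxDepth` deep; otherwise queues the command and returns true.
function getIfNotBusy(client, key, maxDepth, callback) {
  if (client.queue.length >= maxDepth) return false;
  client.get(key, callback);
  return true;
}

const client = new QueueingClient(2);
console.log(client.get('user:1', () => {})); // queue has room
console.log(client.get('user:2', () => {})); // "too full", but still queued
console.log(getIfNotBusy(client, 'user:3', 2, () => {})); // shed instead
```

When getIfNotBusy returns false, the caller can fail the request early instead of adding to a queue that is already behind.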
Checking every single request into Redis is kind of a pain. Most people don't ever assume that Redis can be slow or bad, and it usually isn't. Therefore, it may not be worth the additional complexity in your code to handle these conditions. As the number and size of the Redis servers in our infrastructure has grown, we've had many cascading failures from a single Redis server being slow.
Conclusion
Using poolee is one way to guard against this problem if your primary RPC mechanism is HTTP. If you are talking to other databases with non-HTTP interfaces, you'll need to apply a similar detection and early exit mechanism, probably somewhere in the client library.
All systems degrade differently under stress. The convenient abstraction that node provides can make it easy to fall into traps if you aren't careful. There are ways to avoid these traps, but you probably need to fall into them a few times before your system is tuned properly to avoid them.
If you haven't yet hit these unbounded concurrency problems, it can be hard to know where they'll crop up and where you can most effectively stop them. I recommend graphing the V8 heap size, process heap size, server connection count, and file descriptor counts at least at 1 minute resolution. While this won't prevent the problem, at least you'll know where to look when things go wrong.
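Collecting those numbers takes very little code. A rough sketch, with several assumptions: sampleProcess and countOpenFds are invented names, rss is used as an approximation of overall process memory, and the file descriptor count relies on /proc/self/fd, which only exists on Linux-like systems.

```javascript
const fs = require('fs');

// Count open file descriptors via /proc (assumption: Linux-style /proc).
// Returns null on platforms where that directory is not available.
function countOpenFds() {
  try {
    return fs.readdirSync('/proc/self/fd').length;
  } catch (err) {
    return null;
  }
}

// Take one sample of the numbers worth graphing.
function sampleProcess() {
  const mem = process.memoryUsage();
  return {
    time: Date.now(),
    v8HeapUsed: mem.heapUsed,
    v8HeapTotal: mem.heapTotal,
    rss: mem.rss, // resident set size of the whole process
    openFds: countOpenFds(),
  };
}

console.log(sampleProcess());

// In a real service this would run on a timer and feed a metrics system:
// setInterval(() => report(sampleProcess()), 60 * 1000).unref();
// For a net or http server, the connection count is available through
// server.getConnections((err, count) => { ... }).
```

Here report() stands for whatever your metrics pipeline expects and is not defined in this sketch.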
 Voxer Engineering