
Oldies but Goldies: Scaling Erlang

This article was first published at the Inaka Blog on October 07, 2011. (It's mostly unedited, so some stuff might already be outdated.)



Brujo Benavides


So, since TealFeed allows me to republish old articles, I thought I would start rescuing some very old ones that were lost when the Inaka Blog was closed. You can still find them all at the webarchive, but I thought it would be a good idea to reshare them here for visibility. This is the first one… Enjoy!

-Who is that “Fernando” guy, I don't know-

One of the most common reasons why people choose Erlang is to build highly scalable systems. And Erlang does a great job helping developers reach those goals. But creating a scalable system is not a matter of just writing it in Erlang.

At Inaka, we usually had complex systems written in Erlang or Ruby and for each one of them, at some point, we needed to be sure that they could handle many concurrent users. We've spent a lot of time testing and improving systems and we've learned a lot from that process. I would like to share part of this knowledge in this post. To help in understanding the process and the reasoning behind our techniques, we'll create a sample project and we'll walk through our usual scale test plan with it.

The Sample Project (Match Stream)

In order to provide real examples and work on something concrete, we've created a sample project on GitHub. MatchStream is a very basic back-end for an application like ESPN's MatchCast. The user chooses a match and MatchStream keeps him up-to-date on its score and teams and provides a report of what's happening on the field.

On the other side, somebody that's actually watching the game (we call him "the watcher") is supposed to be registering events in the system. Since that is not our main concern, we've created a fake_watcher module that just reads events from a file.

To make our lives easier, MatchStream clients work over plain TCP connections with a very simple protocol. Here is a sample client session (the first line is sent by the client, the rest of it is the server response):

VERSION:1:CONNECT:elbrujohalcon:MATCH:elp-tig-2011-09-10

2011-09-13 13:48:48: status:

home: <<"elp">>

home_players:

Albil (25)

Mercado (14)

Desabato (2)

Cellay (3)

Dominguez (27)

Veron (11)

Fernandez (10)

Sanchez (5)

Gonzalez (19)

Fernandez (18)

Boselli (9)

home_score: 0

visit: <<"tig">>

visit_players:

Garcia (12)

Casteglione (13)

Echeverria (21)

Blengio (3)

Diaz (15)

Castano (5)

Martinez (6)

Leone (11)

Morales (10)

Luna (7)

Maggiolo (9)

visit_score: 0

period: first

2011-09-13 13:48:51: goal:

player: Luna (7)

team: <<"tig">>

2011-09-13 13:49:03: penalty:

player: Martinez (6)

team: <<"tig">>

2011-09-13 13:49:04: card:

player: Albil (25)

card: red

team: <<"elp">>

2011-09-13 13:49:05: substitution:

player_out: Fernandez (18)

team: <<"elp">>

player_in: Silva (21)

2011-09-13 13:49:07: card:

player: Desabato (2)

card: yellow

team: <<"elp">>

2011-09-13 13:49:08: goal:

player: Morales (10)

team: <<"tig">>

...
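
The first line of that session is the only thing the client ever sends, so it is worth seeing how little parsing it needs. What follows is a minimal sketch and not the actual MatchStream code: the module name connect_line_sketch, the returned map, and the bad_request error are all assumptions made for illustration.

```erlang
-module(connect_line_sketch).
-export([parse/1]).

%% Parses a connection line such as
%% <<"VERSION:1:CONNECT:elbrujohalcon:MATCH:elp-tig-2011-09-10">>.
%% The result shape (a map) is an assumption of this sketch.
-spec parse(binary()) -> {ok, map()} | {error, bad_request}.
parse(Line) ->
    %% Keep only what comes before the first line terminator, if any.
    [Trimmed | _] = binary:split(Line, [<<"\r\n">>, <<"\n">>]),
    case binary:split(Trimmed, <<":">>, [global]) of
        [<<"VERSION">>, Vsn, <<"CONNECT">>, User, <<"MATCH">>, Match]
          when User =/= <<>>, Match =/= <<>> ->
            try binary_to_integer(Vsn) of
                V -> {ok, #{version => V, user => User, match => Match}}
            catch
                error:badarg -> {error, bad_request}
            end;
        _ ->
            {error, bad_request}
    end.
```

Anything that does not match the six expected fields is rejected as a whole, which keeps the handling of broken clients trivial.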

Finally, the system provides a RESTful API (built on mochiweb) that lets users get information about the available matches.

The initial project architecture was quite simple. The documentation is on GitHub and here is the graph that appmon shows when it's running:

MatchStream's supervision tree

As you can see, MatchStream was designed as a regular OTP application, with one main supervisor (match_stream_sup) that managed three other supervisors (match_stream_match_sup, match_stream_user_sup, and match_stream_client_sup) and three workers (match_stream_web, match_stream_db, and match_stream_client_listener).

On the server-side, new matches were stored using match_stream_db. We implemented it using Redis through our own fork of erldis. We decided to use it as a database just because it simplifies our samples, but since all persistency is abstracted behind match_stream_db, it is easy to switch to a different technology.

When a match starts (and it may be started with any event), a new match_stream_match process is spawned under the supervision of match_stream_match_sup and it then starts a linked gen_event manager to dispatch events.

On the other side, the application listens for client TCP connections using match_stream_client_listener (a non-blocking TCP listener implemented as seen in this article). Each connection is handled by a match_stream_client process. When a client sends the connection message to the server, the match_stream_client process notifies the corresponding match_stream_user process. If that process is not already started, match_stream_user_sup starts it right away. The user process then subscribes (and links) itself to the match event manager. It will then send every event it gets from the match to the associated client process, which will in turn format it and send it over the TCP connection.
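
To make the subscription step concrete, here is a minimal sketch of a gen_event handler that forwards every match event to a subscriber process. It is not the MatchStream implementation: the module name forwarder_sketch and the {match_event, Event} message shape are assumptions, and the linking between user and manager described above is left out.

```erlang
-module(forwarder_sketch).
-behaviour(gen_event).

-export([subscribe/2]).
-export([init/1, handle_event/2, handle_call/2, handle_info/2,
         terminate/2, code_change/3]).

%% Adds a handler to Manager that forwards every event to Pid.
%% Using {?MODULE, Pid} as handler id allows one handler per subscriber.
subscribe(Manager, Pid) ->
    gen_event:add_handler(Manager, {?MODULE, Pid}, Pid).

init(Pid) ->
    {ok, Pid}.

%% The handler state is just the subscriber's pid.
handle_event(Event, Pid) ->
    Pid ! {match_event, Event},
    {ok, Pid}.

handle_call(_Request, Pid) -> {ok, ok, Pid}.
handle_info(_Info, Pid) -> {ok, Pid}.
terminate(_Reason, _Pid) -> ok.
code_change(_OldVsn, Pid, _Extra) -> {ok, Pid}.
```

With something like this, the match process only needs to call gen_event:notify/2 on its manager, and each subscribed user process receives the event as a regular message.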

Finally, there is a mochiweb server called match_stream_web that processes API requests from web users. The users should use this API to know which matches are available before trying to connect to them.

Throughout this post, we'll describe a series of improvements. You can follow each of them by checking the project's commits on GitHub, starting from this commit.

Now that we introduced the system, let's start scaling it!

The following sections represent the five stages that make up our scalability procedure. Each of them starts by defining our goals for it and the tools we'll need. Then we describe the steps involved in reaching the goals. If possible, with each step you'll find the related commit on MatchStream so you can see an example of how to implement what the step describes. Finally, we show you our results (i.e., how much our test system improved with the proposed changes). It's important to notice that the procedure is rather generic, even when the examples are focused on our sample project.

1. Is it really working?

Goals

This is the first stage, where we set the grounds for the next ones. At this point, we want to be sure the system is working correctly and we want to be able to describe standard user interaction paths. At this stage, we should build the user simulator, a fundamental tool for all the remaining stages.

Tools

  • An HTTP logging system.
    • We usually install all our web apps behind Nginx, if only to be able to collect and analyze its logs, but also because Nginx's connection handling semantics, basic load balancing, and support for compression and SSL connections are so easy to configure.
  • Logging tools.
    • We will need to direct the server's own logs as well as those generated by SASL to file(s) in order to check them later. We generally use elog for this since we know it well and since, for speed, we can compile it out of the modules that don't need logging, but you may want to try using Lager or any other such application.

Steps

  • Manual test: trying the system by ourselves.
    • We need to be sure the system is working so we just use it.
    • In our case (for MatchStream), we start a match using the fake_watcher module, run $ curl http://localhost:8888/matches in another console to verify that the match is registered, open a telnet session, type in the necessary parameters and watch the match flow before our eyes until it ends.
  • Improving the logging mechanisms.
    • We add the corresponding rebar dependencies and turn all system logging into our chosen logger. (related commit)
    • We run the test manually once more but now we check the generated log files instead of the server console. It should give us enough information to see that everything is working as it should.
  • Creating the simulator.
    • This must be a process, as simple as possible, that reproduces the standard user behavior.
    • It must not be smart: it doesn’t matter if it understands the information it receives, it just needs to connect and get the events.
    • The main requirement for this piece of code is that it will emit an alert if it can’t connect or gets disconnected too early. It will also show a success message if it manages to do what a regular user expects to do when he uses the system.
    • Another important requirement is that it must be able to run without any internal knowledge of the server. In other words, this component is allowed to know about the server only those things that any possible client knows.
    • For our sample system, we call it fake_client and we place it on the test folder. (related commit)
    • As you can see, at this point its main function just connects with the server and loops until the connection is closed. It's as simple as it can be: it detects incomplete matches and events as well as connection drops, but nothing else. We'll eventually add functionality to it if we need it.
    • After that, we use the simulator to run a single test, on its own, just to verify that it's working as expected.
    • We ignore web requests in this stage, we're going to test them later.
  • Testing with the simulator.
    • We run the simulator along with us.
    • We start a match on the server, then we start a simulator, and, at the same time, we open a telnet session and watch the match.
    • As the last test, we run a bunch of simulators together. We're not trying to test the server's ability to scale, but the simulator's ability to run multiple instances of itself, so we run just four or five of them.
  • Checking the user interaction paths.
    • We want to be sure the simulator behaves like a regular user.
    • For MatchStream, that's not really difficult because user interactions with the system are simple: the user connects, sends a message, and receives responses until the server hangs up and we know that's exactly what we coded in fake_client. But in more complex applications, you may need to check other stuff (e.g. the user may usually disconnect and connect again, the system may have different kinds of clients, etc.). In such cases, it's useful to detect and compile a list of common user interaction paths and then make sure you test all of them in the following stages.
    • It's also important to determine the massive user interaction paths: Do all users tend to connect at the same moment? Do they usually disconnect together? How will the connections be spread over time? etc.
    • To gather the info we want, we make sure nobody else is using the system, then we clean all the logs and reproduce standard user interactions (in our case: call the API, connect, watch a match, wait for the end of it, disconnect) by ourselves.
    • We move server log files (describing the user interaction during the game) to a different folder. Then we let simulators watch a game. After that, both log files (i.e. human interaction logs and simulator interaction logs) are compared. They must be very similar, except for the API part. If they’re not, the simulator is modified to reduce the differences and testing goes back to previous steps until we're satisfied.
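
The simulator requirements listed above fit in a few lines of Erlang. This is a minimal sketch and not the fake_client from the repository: the module name fake_client_sketch, the "\r\n" terminator on the connection line, the timeouts, and the rule that a session counts as successful once at least one line arrived before the server hung up are all assumptions.

```erlang
-module(fake_client_sketch).
-export([run/4, connect_line/2]).

%% Builds the only message the client ever sends.
connect_line(User, Match) ->
    iolist_to_binary(["VERSION:1:CONNECT:", User, ":MATCH:", Match, "\r\n"]).

%% Connects, sends the connection line and reads until the server hangs up.
%% Returns {ok, NumberOfLines} or {error, Reason}.
run(Host, Port, User, Match) ->
    Opts = [binary, {packet, line}, {active, false}],
    case gen_tcp:connect(Host, Port, Opts, 5000) of
        {ok, Socket} ->
            Result =
                case gen_tcp:send(Socket, connect_line(User, Match)) of
                    ok -> loop(Socket, 0);
                    {error, SendError} -> {error, {cant_send, SendError}}
                end,
            gen_tcp:close(Socket),
            Result;
        {error, Reason} ->
            {error, {cant_connect, Reason}}
    end.

%% The client is not smart: it only counts the lines it receives.
loop(Socket, Count) ->
    case gen_tcp:recv(Socket, 0, 60000) of
        {ok, _Line} -> loop(Socket, Count + 1);
        {error, closed} when Count > 0 -> {ok, Count};
        {error, closed} -> {error, disconnected_too_early};
        {error, Reason} -> {error, Reason}
    end.
```

Running several instances at once is then just a matter of spawning one process per call to run/4 and collecting the results.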

Our Results

In each of the previous steps, we checked the server to see if it was working before, during, and after the match. When we found errors (and you can see from the commits we've found some of them), we fixed them and we tested again. After a while, we were pretty sure everything was working as expected.

2. Finding the Boundaries

Goals

Now that we have our system working and a good simulator to help us with the tests, we want to know how good it is. At the end of this stage, we want to be able to tell (in numbers) how many concurrent users can be handled by the system.

Tools

  • Two computers: one for server, the other to run simulators.
    • This way we make communications between server and clients go through the network and not just inside the machine. Also with this schema, if we hang our server machine, we can still check the results on the clients.
  • A multi-ssh tool.
    • It proves useful because we'll need to run many things on both computers at once.
    • You can use csshX on OSX or mussh on Linux.
  • An HTTP load tester.
    • Like ApacheBench or Tsung.
    • The right choice will depend on how complicated your HTTP user interaction paths are.
    • For MatchStream, ApacheBench seems to be the best choice.

Steps

  • Base Numbers.
    • The idea is to start with a small number of connections and then keep increasing until we reach the maximum available. We usually start with something really small (like 16 users) just to be sure. Let's call that number N.
    • Another important number that we will call C is the number of concurrent attempts (they may be TCP connection attempts or HTTP requests). We'll try to increase it along with N to represent the user interaction paths we gather in stage 1 in the most accurate possible way.
  • HTTP Test.
    • We create an HTTP test based on the user interaction paths.
    • In our case, it's just a shell script with 2 ApacheBench calls. (related commit)
  • Running the tests.
    • We run N HTTP simultaneous tests, then N simulators, and then both together.
    • At the same time, one person is using the system himself to have a visual feeling of how it behaves.
    • We do it this way to be able to detect if the problem is the web, the TCP connections, or the fact that they both run together. If your project has other different types of connections you should test each one of them alone at first and then everything combined. Don't forget to have somebody actually using the system at the same time: that's how you know it is in fact working.
    • The bigger the N, the more client machines we need. A good practice is to create an image for a machine that’s working and then replicate it as many times as needed. Amazon servers, combined with csshX or mussh are great for that.
    • If everything worked and we were able to use the system correctly, we go back to the previous step but this time with a bigger N, say 2N, and a bigger C if possible.
    • On the other hand, if something failed, we mark the current N as our current limit. We will use that value later, and we will come back here as many times as we need until we get N to be higher than the number of users we expect our system to handle.
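
The "double N until something breaks" loop described above can be written down as a small driver. This is only a sketch under assumptions: find_limit_sketch and its return values are hypothetical, and RunTest stands for whatever launches N simulators (and the HTTP test) and reports ok or {error, Reason}.

```erlang
-module(find_limit_sketch).
-export([find_limit/3]).

%% RunTest is a fun(N) -> ok | {error, Reason}.
%% Starts at N0 and doubles N after every successful run.
%% Returns {limit, LastGoodN, {failed_at, FailedN, Reason}} on the first
%% failure, or {no_failure_up_to, LastGoodN} if the next N would exceed Max.
find_limit(RunTest, N0, Max) when is_function(RunTest, 1), N0 > 0 ->
    find_limit(RunTest, N0, Max, 0).

find_limit(_RunTest, N, Max, LastGood) when N > Max ->
    {no_failure_up_to, LastGood};
find_limit(RunTest, N, Max, LastGood) ->
    case RunTest(N) of
        ok ->
            find_limit(RunTest, 2 * N, Max, N);
        {error, Reason} ->
            {limit, LastGood, {failed_at, N, Reason}}
    end.
```

For example, with a server that starts refusing connections above 1000 users and a starting N of 16, the driver would report 512 as the last good value and 1024 as the failing one.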

Our Results

We needed to make some changes to our tests. With those, we discovered that our MatchStream server could handle up to 1000 concurrent TCP connections, but no more than 4 at a time (i.e. C = 4). Not a really good result. We also discovered that our web API can handle 2048 (128 at a time) with no errors. With higher values, the history API call fails a lot. That's something we definitely wanted to improve, too.

3. Blackbox Tests

Goals

In our experience, many scaling problems are not so much related to the system itself as to the environment in which it's running. In order to get the best from your system, you need to tune the machine, the operating system, and the virtual machine in which it's running. When this stage is through we expect to have a set of servers and nodes properly configured to run at their optimal performance. It’s not a goal in this stage to make any code changes - only to tune the deployed system.

Tools

  • A privileged account (e.g. a sudoer on Linux systems) on the server.
  • Tools for checking server status.
    • Tools like htop, watch, netstat, etc. will be very helpful.

Steps

  • Kernel Variables.
    • We check server kernel variables to see if their values fit the system needs. A non exhaustive list of commands for *NIX servers follows (we use all of these in our production environments):

# Increase the ipv4 port range:

sysctl -w net.ipv4.ip_local_port_range="1024 65535"

# General gigabit tuning:

sysctl -w net.core.rmem_max=16777216

sysctl -w net.core.wmem_max=16777216

sysctl -w net.ipv4.tcp_rmem="4096 87380 16777216"

sysctl -w net.ipv4.tcp_wmem="4096 65536 16777216"

sysctl -w net.ipv4.tcp_syncookies=1

# This gives the kernel more memory for tcp which you need with many (100k+) open socket connections

sysctl -w net.ipv4.tcp_mem="50576 64768 98152"

sysctl -w net.core.netdev_max_backlog=2500

# This sets the maximum number of connections tracked by netfilter (conntrack)

sysctl -w net.netfilter.nf_conntrack_max=1233000

  • Open files limit.
    • For TCP connections and also for HTTP connections, servers usually require higher than usual numbers of simultaneously open files. In other words, the value returned by the next command should be high enough: $ ulimit -n
    • If it’s not (the default value is usually 1024), it must be set up to a bigger number, for instance running the following command: $ ulimit -n 999999
  • Erlang VM parameters.
    • We verify the command line parameters we pass erl when we start our system nodes.
    • erl presents a long list of command-line options, but we usually need to use just a few of them (+P to have a higher number of processes, +K to enable kernel poll).
    • In our project, we have the parameters in Makefile, so we only need to modify that file.
    • We tweak a parameter at a time, running Stage 2 again with bigger values for N until we're satisfied.
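
After changing the erl parameters, it helps to confirm from inside the node that the new limits are actually in effect. A minimal sketch, assuming a hypothetical helper module called vm_limits_sketch; it only relies on erlang:system_info/1, and the values it reports depend on your OTP release and on the flags the node was started with.

```erlang
-module(vm_limits_sketch).
-export([report/0]).

%% Returns the limits that +P (processes) and the open files limit (ports)
%% are meant to raise, next to the current usage.
report() ->
    #{process_limit => erlang:system_info(process_limit),
      process_count => erlang:system_info(process_count),
      port_limit    => erlang:system_info(port_limit),
      port_count    => erlang:system_info(port_count),
      kernel_poll   => erlang:system_info(kernel_poll)}.
```

Calling vm_limits_sketch:report() on the server console before and after each tweak makes it easy to notice a parameter that was silently ignored.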

Our Results

With all the tweaks, we got our N up to 4096 but we couldn't change the fact that we couldn't connect more than 4 clients at a time. We deal with that in the next stages.

4. Erlang Tuning

Goals

Now that we covered all aspects but the system code itself, it's time to focus on it. This stage tends to be unique in every project. There may be a lot of different reasons why a process or a function may become a bottleneck and a lot of ways to solve each problem. This stage is usually the longest one, so prepare yourself to spend days (or even weeks) on it. We don't write down our assumptions about what will be a problem until we start this phase. Then we iteratively work on this section, expanding it and listing our findings. However, over time we have compiled a list of classic problem classes that we usually encounter. We show you the list after the steps. The goal of this stage is to be as sure as we can that we have nothing else to tweak in the system in order to improve its performance.

Tools

  • An Erlang top-like tool.
  • Ad-Hoc Erlang functions or processes.
    • We'll need helpers to analyze the load on different processes, nodes, etc...
    • These may range from properly inserted log lines with time measures to processes that keep an eye on some Erlang measures and traces, etc...

Steps

  • Checking Message Queue Lengths.
    • We start an instance of the top-like tool of choice (we’ll call it top to abbreviate).
    • We sort lines by message queue length in order to detect processes with long lists of unprocessed messages. These processes may or may not be stalling the system, but they're a great starting point for further checks.
    • We run the steps from stage 2 with the last N that worked fine checking the top console to see if there’s any process(es) with increasing message queues.
    • We run the steps from stage 2 with the first N that didn't work checking the top console, too.
  • Checking Memory.
    • We repeat the same procedure as the previous step but this time keeping an eye on Memory instead of Message Queue Length.
  • Fixes.
    • In the previous steps, we should have found a group of processes to analyze.
    • First, we need to find the associated modules for them.
    • If the process is registered with a name, finding the module is easy. If it's not, we usually get enough information to describe it by running erlang:process_info(PID). on the server console.
    • Once we've found the module, it's also useful to run erlang:process_info(PID, [messages]). to see which messages are increasing the process message queue or just erlang:process_info(PID). to see how the process is using the memory.
    • Then we try to "fix the problem". We can't describe exactly how to do it, but we have a list of tips and tricks that worked in the past...
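
When no top-like tool is at hand, the message queue check can be done with an ad-hoc function like the ones mentioned in the tools list. A minimal sketch, assuming a hypothetical module queue_top_sketch; it uses only erlang:processes/0 and erlang:process_info/2.

```erlang
-module(queue_top_sketch).
-export([top/1]).

%% Returns the Count processes with the longest message queues as
%% {QueueLength, Pid, Info} tuples, longest queue first.
top(Count) ->
    Infos = [{Len, Pid, describe(Pid)}
             || Pid <- erlang:processes(),
                %% process_info/2 returns undefined for dead processes;
                %% the pattern below silently skips them.
                {message_queue_len, Len}
                    <- [erlang:process_info(Pid, message_queue_len)]],
    lists:sublist(lists:reverse(lists:sort(Infos)), Count).

%% Collects what we usually need to find the module behind a pid.
describe(Pid) ->
    case erlang:process_info(Pid, [registered_name, initial_call, memory]) of
        undefined -> dead;
        Props -> Props
    end.
```

Sorting by the memory value instead of the queue length gives the equivalent helper for the memory check.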

Tips & Tricks

  • Mnesia startup delays.
    • When Mnesia databases grow, it takes them longer and longer to boot up.
    • If this happens, it's time to consider moving from Mnesia to other DB engines.
  • Timers.
    • Timers created using the timer module are usually worse for performance than those created with functions like erlang:send_after.
    • When we detect this kind of stuff, we change those we find appropriate.
  • TCP connection backlog.
    • The default backlog value for listening TCP connections (both on listeners and websites) is 5.
    • This may be too low if you want to handle lots of concurrent connections.
    • We increase it, usually over 128K.
    • For MatchStream, this effectively removed our C limitations. (related commit)
  • gen_event supervised handlers.
    • We've already written a blog post about this.
    • gen_event managers are linked to all their gen_event supervised handlers, thus multiplying the delivery of termination messages exponentially.
    • We replace calls to gen_event:add_sup_handler/3 with proper erlang process monitoring tools. (related commit)
  • Logging too much.
    • A crowded logging system is usually a bottleneck.
    • We always have different logging configurations for production and development and spend a considerable amount of time deciding which events must be logged and how. (related commit)
  • gen_servers timing out on calls.
    • Among those processes with long queues, there are usually gen_servers stuck with messages that are mostly calls of the same kind (i.e. they’re handled by the same handle_call clause).
    • It’s convenient to check them and see if that clause implementation may be divided into two parts: one that must be executed on the main gen_server process because it affects its state, and the other that does not affect the server state and therefore may be executed in an ad hoc process spawn_link’ed for it.
    • That child process must include a call to gen_server:reply/2.
    • handle_call/3 in match_stream_db is the perfect example for that. (related commit)
  • Unregistered processes.
    • Sometimes processes are not registered, but their PIDs are kept in dictionaries or ETS tables monitored by other processes.
    • Keeping track of those PIDs in this way can create too much work, and it creates potential failure points in the system.
    • We register those processes with a dynamic but consistently built process name to make them easier to find (i.e. with no need for a gen_server call) by the processes that need to. (related commit)
  • Too few outbound TCP connections.
    • Outbound TCP connections (usually, connections to DBs) are generally limited (e.g. just one connection per application).
    • If there's no real need for that limit, systems may benefit from using multiple connections to the same database, with a proper controlling process in front of them.
    • That's exactly our case in MatchStream when connecting with Redis. (related commit)
  • gen_servers consuming too much memory.
    • If we detect gen_servers with large memory footprints, we consider making them hibernate by returning {reply, Reply, State, hibernate} to calls and/or {noreply, State, hibernate} to casts or infos.
    • This reduces the process footprint and, unless the process is constantly in use, the cost is barely noticeable.
    • In our sample project, we changed two gen_servers this way. (related commit)
  • Overcrowded supervisors.
    • This is a critical design change that also sets the foundation for cross-node monitoring.
    • If the process with a long queue is a simple_one_for_one supervisor that’s required to start/stop many child processes at once, we consider the possibility to turn that supervisor into a supervisor tree.
    • To do that, instead of the current child processes, the supervisor must have a (maybe long) list of supervisor children.
    • Every one of those new supervisors behaves exactly as the original one behaves now, and the function that currently calls supervisor:start_child/2 (if there’s not just one function that does that in the current supervisor module, then there should be) will use a randomly chosen supervisor child as its first parameter (e.g. supervisor:start_child(list_to_atom("module-name_" ++ integer_to_list(random:uniform(NumberOfSupervisors))), Args)).
    • In MatchStream, we've implemented this strategy on the match_stream_user_sup hierarchy. (related commit)
  • gen_servers taking too long to initialize.
    • If we detect gen_servers that take too long to initialize (i.e. their init/1 takes too long), we try to reply {ok, State, 0} there and then implement proper initialization on handle_info(timeout, State).
      • Note from the Future: Now you can use continuations (i.e. handle_continue/2) for this exact purpose. No hacking needed.
    • We implemented this trick in match_stream_user. (related commit)
  • Long delivery queues.
    • One problem usually found on gen_events is that sometimes the publisher takes too long to deliver messages to all its subscribers.
    • If that’s the case we consider the addition of repeaters that let subscribers be distributed.
    • This works in the same way as the previous item works for supervisors.
    • We didn't need this for MatchStream but we show you a gen_event_repeater here, just in case.
  • Too many connections on one inbound TCP port.
    • This may be seen on TCP listeners or websites.
    • When being able to listen to too many connections at once is an issue, we try to set things up so there are multiple open ports for each service and then we place a load balancing mechanism in front of it.
    • For websites, we implement load balancing using a proper Nginx (or we could use Varnish) configuration (defining a proper upstream using ip_hash) in front of the website.
    • For TCP listeners, we implement a new API call that tells the client which TCP port to connect to. (related commit)
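
Several of the gen_server tips above can live together in one small server. The sketch below is not taken from MatchStream: the module tips_server_sketch, the slow_lookup call, and do_lookup/1 are hypothetical. It shows a deferred initialization (using handle_continue/2, which needs OTP 21 or later), a call answered from a helper process through gen_server:reply/2, and hibernation on idle returns.

```erlang
-module(tips_server_sketch).
-behaviour(gen_server).

-export([start_link/0, slow_lookup/2]).
-export([init/1, handle_continue/2, handle_call/3,
         handle_cast/2, handle_info/2]).

start_link() ->
    gen_server:start_link(?MODULE, [], []).

slow_lookup(Server, Key) ->
    gen_server:call(Server, {slow_lookup, Key}).

%% Return from init/1 right away; the expensive part runs afterwards.
init([]) ->
    {ok, #{ready => false}, {continue, finish_init}}.

handle_continue(finish_init, State) ->
    %% A real server would do its slow setup here.
    {noreply, State#{ready => true}}.

%% The lookup does not touch the server state, so it runs in a linked
%% helper process that replies to the caller on the server's behalf.
handle_call({slow_lookup, Key}, From, State) ->
    spawn_link(fun() -> gen_server:reply(From, do_lookup(Key)) end),
    {noreply, State, hibernate};
handle_call(is_ready, _From, State = #{ready := Ready}) ->
    {reply, Ready, State}.

%% Hibernating on every return only pays off for mostly idle servers.
handle_cast(_Msg, State) -> {noreply, State, hibernate}.
handle_info(_Info, State) -> {noreply, State, hibernate}.

%% Stand-in for a slow operation such as a database query.
do_lookup(Key) ->
    timer:sleep(10),
    {ok, Key}.
```

Since the helper is linked, a crash inside do_lookup/1 also takes the server down; whether that is desirable depends on the system, and a monitor may be a better fit in some cases.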

Our Results

After all the tweaks, we got our N up to 64000 and, with the TCP backlog and the multi-port changes, we made our system able to connect over 8000 clients at a time. We have similar results on the web side with one server node.
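
For reference, the two changes that lifted the limit on C boil down to a few socket options. A minimal sketch, not the MatchStream listener: listeners_sketch is a hypothetical module, and the packet mode is an assumption. Keep in mind that the operating system may cap the backlog you ask for.

```erlang
-module(listeners_sketch).
-export([listen_on/2]).

%% Opens one listening socket per port, each with the given accept backlog.
%% Returns a list of {ActualPort, ListenSocket} tuples.
listen_on(Ports, Backlog) ->
    Opts = [binary, {packet, line}, {active, false},
            {reuseaddr, true}, {backlog, Backlog}],
    [begin
         {ok, Socket} = gen_tcp:listen(Port, Opts),
         %% Asking for port 0 lets the OS choose; report the real one.
         {ok, Actual} = inet:port(Socket),
         {Actual, Socket}
     end || Port <- Ports].
```

Each returned socket still needs its own acceptor process (or several), and clients need a way to learn which port to use.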

5. Adding Nodes

Goals

One of the greatest things when scaling Erlang projects is how easy it is to just add a new node to the cluster to make it run better. With a well-developed system, it's fairly easy to do so. Our experience shows that the hardest one is the second node: once you've managed to have your server running in two nodes, to keep adding nodes on demand is straightforward. Those nodes may or may not need to be connected and they may or may not run in the same machine. At the end of this stage, we want to know how much the system improves in terms of performance with the addition of new nodes and we expect to have the system ready to let us do that whenever we need it. At this point, we should know the number of users that we are able to handle with one server instance. One important thing to keep in mind is that if we find any code improvements, we should go back to the previous stage and retest the system with just one node before testing with many of them.

Tools

We won't need any extra tools for this stage, but we'll need at least two computers to work as servers.

Steps

  • Preparing the system to run on many nodes.
    • We make the code changes needed to be able to run more than one instance of the server using the same binaries.
    • For instance, we move configuration values from hrl files to application environment variables.
    • We change our startup mechanisms in order to ease this task. (related commit)
    • In order to add interconnected nodes, we determine which processes should be run just once (i.e. the system must have just one instance of them) and which ones may be started once in each node. How to make sure that happens involves per-system decisions, but usually pg2 and/or global process registration come in handy.
    • For MatchStream, since it's a highly parallelizable system and users and matches are totally independent of each other, almost every process may run in every node.
    • One exception is match_stream_db: even when it may have different running copies, we don't want it to duplicate data. Then we split its functionality in two: a reader that may run in every node and a writer that needs to be unique. (related commit)
    • On the other hand, we want match_stream_match processes to run independently in each node, but we want exactly one of them in every node for each live match. So, we need to start and stop them together. We use pg2 for that. (related commit)
  • Interconnected instances.
    • We start two interconnected server instances in the same machine and run the tests from previous stages against them just to see everything works.
    • We start adding new nodes and running the tests again until we hit a limit (i.e. adding another node doesn't improve performance at all).
  • Independent instances.
    • Sometimes it's better to just run independent instances of the system instead of interconnected nodes.
    • In those cases, it's important to consider which external resources (e.g. databases) should be shared or not.
    • In our case, that would be painful for the watcher since he'll need to update many nodes at once but if we come up with an interface (say, a simple website) that does that for him, we can effectively increase our system capacity by just booting up new independent servers.
    • We start two independent server instances in the same machine and run the tests from previous stages against them just to see everything works.
    • We start adding new instances (in the same server or in new ones) and running the tests until we hit a limit (i.e. adding another node doesn't improve performance at all).
  • The optimal configuration.
    • At this point, we should be able to state the optimal configuration for the system: how many servers we need to start, and how many interconnected or independent nodes each of them should run, to handle the number of users we want, and how much we can later improve performance by adding another interconnected or independent node to the cluster.
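
For processes that must exist only once in the whole cluster, like the database writer mentioned above, global registration is usually enough. A minimal sketch, assuming a hypothetical module unique_writer_sketch, a made-up registered name, and a placeholder writer loop; it is not the match_stream_db code.

```erlang
-module(unique_writer_sketch).
-export([ensure_writer/0]).

%% Name under which the single writer is known cluster-wide (hypothetical).
-define(NAME, db_writer_sketch).

%% Returns {started, Pid} if this call created the writer, or
%% {existing, Pid} if some node had already registered one.
ensure_writer() ->
    case global:whereis_name(?NAME) of
        undefined ->
            Pid = spawn(fun writer_loop/0),
            case global:register_name(?NAME, Pid) of
                yes ->
                    {started, Pid};
                no ->
                    %% Somebody else won the race: drop our candidate.
                    exit(Pid, kill),
                    {existing, global:whereis_name(?NAME)}
            end;
        Pid ->
            {existing, Pid}
    end.

%% Placeholder for the real writer: acknowledges every write request.
writer_loop() ->
    receive
        {write, From, Ref, _Data} ->
            From ! {written, Ref},
            writer_loop();
        stop ->
            ok
    end.
```

Every node can call ensure_writer/0 on startup; only the first one ends up hosting the writer, and the others find it through global:whereis_name/1.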

Our Results

For MatchStream we only tried with interconnected nodes and we found that, with this configuration, every new node adds capacity for 25K extra users without problems until the computer that holds them runs out of memory. They may be able to handle greater amounts of users, but 25K is a good approximation at a comfortable state. We started 3 nodes on our test server and we were able to effectively handle 75K concurrent users, which was enough for us.

Summary

Along the lines of this (maybe too long) post we tried to describe something that actually happens a lot in our company, a process that we repeat (and usually improve) for each of our projects. Designing highly scalable systems is not an easy task, but it is one of the most interesting and challenging tasks we do. And it's a job that begets the greatest rewards when it succeeds.

With this process, we managed to make one of our systems that choked with just 1K users at a time able to handle 100K users on just one server. I hope this article helps you achieve even more significant results.
