Hi, I am Saša Jurić, a software developer with 10+ years of professional experience programming web and desktop applications using Elixir, Erlang, Ruby, JavaScript, C# and C++. I'm also the author of the upcoming Elixir in Action book. In this blog you can read about Erlang and other programming related topics. You can subscribe to the feed, follow me on Twitter or fork me on GitHub.

Outside Elixir: running external programs with ports

Occasionally it might be beneficial to implement some part of the system in something other than Erlang/Elixir. I see at least two reasons for doing this. First, it might happen that a library for some particular functionality is not available, or not as mature as its counterparts in other languages, and creating a proper Elixir implementation might require a lot of effort. Another reason could be raw CPU speed, which is not Erlang's forte, although in my personal experience that rarely matters. Still, if there are strong speed requirements in some CPU-intensive part of the system, and every microsecond is important, Erlang might not suffice.

There may be other situations where Erlang is not the best tool for the job. Still, that's not necessarily a reason to dismiss it completely. Just because it's not suitable for some features doesn't mean it's not a good choice to power most of the system. Moreover, even if you stick with Erlang, you can still resort to other languages to implement some parts of it. Erlang provides a couple of techniques to do this, but in my personal opinion the most compelling option is to start external programs from Erlang via ports. This is the approach I'd consider first, turning to other alternatives only in special cases. So in this article I'll talk about ports, but before parting I'll briefly mention other options and discuss some trade-offs.


Basic theory

An Erlang port is a process-specific resource. It is owned by some process and that process is the only one that can talk to it. If the owner process terminates, the port will be closed. You can create many ports in the system, and a single process can own multiple ports. It's worth mentioning that a process can hand over the ownership of the port to another process.
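For illustration, a handover might look something like this (a minimal sketch; port is an open port, and new_owner is assumed to be the pid of some other process):

# make new_owner the port owner; messages from the port now go to it
Port.connect(port, new_owner)

# the old owner remains linked to the port, so we unlink explicitly
Process.unlink(port)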

Examples of ports are file handles and network sockets, which are connected to the owner process and closed if that process terminates. This allows proper cleanup in a well-structured OTP application. Whenever you take down some part of the supervision tree, all resources owned by the terminated processes will be closed.

From the implementation standpoint, ports come in two flavors. They can either be powered by code which runs directly in the VM itself (a port driver), or they can run as an external OS process outside of the BEAM. Either way, the principles above hold, and you use mostly the same set of functions exposed in the Port module - tiny wrappers around port-related functions from the :erlang module. In this article I'll focus on ports as external processes. While not the fastest option, I believe this is often a sensible approach because it preserves fault-tolerance properties.

Before starting, I should also mention the Porcelain library, by Alexei Sholik, which can simplify working with ports in some cases. You should definitely check it out, but in this article I will just use the Port module to avoid the extra layer of abstraction.

First take

Let's see a simple example. In this exercise we'll introduce support for running Ruby code from the Erlang VM. Under the hood, we'll start a Ruby process from Erlang and send it Ruby commands. The process will eval those commands and optionally send back responses to Erlang. We'll also make the Ruby interpreter stateful, allowing Ruby commands to share the same state. Of course, it will be possible to start multiple Ruby instances and achieve isolation as well.

The initial take is simple. To run an external program via a port, you need to open the port via Port.open/2, providing a command to start the external program. Then you can use Port.command/2 to issue requests to the program. If the program sends something back, the owner process will receive a message. This closely resembles the classic message-passing approach.

On the other side, the external program uses standard input/output to talk to its owner process. Basically, it needs to read from stdin, decode the input, do its stuff, and optionally print the response on stdout which will result in a message back to the Erlang process. When the program detects EOF on stdin, it can assume that the owner process has closed the port.

Let's see this in action. First, we'll define the command to start the external program, in this case a Ruby interpreter:

cmd = ~S"""
  ruby -e '
    STDOUT.sync = true
    context = binding

    while (cmd = gets) do
      eval(cmd, context)
    end
  '
"""

This is a simple program that reads lines from stdin and evals them in the same context, thus ensuring that the side effects of previous commands are visible to the current one. The STDOUT.sync = true bit ensures that whatever we output is immediately flushed, and thus sent back to the owner Erlang process.

Now we can start the port:

port = Port.open({:spawn, cmd}, [:binary])

The second argument contains port options. For now, we'll just provide the :binary option to specify that we want to receive data from the external program as binaries. We'll use a couple more options later on, but you're advised to read the official documentation to learn about all the available options.

Assuming you have a Ruby interpreter somewhere in the path, the code above should start a corresponding OS process, and you can now use Port.command/2 to talk to it:

Port.command(port, "a = 1\n")
Port.command(port, "a += 2\n")
Port.command(port, "puts a\n")

This is fairly straightforward. We just send some messages to the port, inserting newlines to make sure the other side gets them (since it uses gets to read line by line). The Ruby program will eval these expressions (since we've written it that way). In the very last expression, we print the contents of the variable, which results in a message to the owner process. We can receive this message as usual:

receive do
  {^port, {:data, result}} ->
    IO.puts("Elixir got: #{inspect result}")
end

# Elixir got: "3\n"

The full code is available here.

Program termination

It's worth noting again that a port is closed when the owner process terminates. In addition, the owner process can close the port explicitly with Port.close/1. When a port is closed, the external program is not automatically terminated, but the pipes used for communication are closed. When the external program then reads from stdin it will get EOF and can do something about it, for example terminate.

This is what we already do in our Ruby program:

while (cmd = gets) do
  eval(cmd, context)
end

By stopping the loop when gets returns nil we ensure that the program will terminate when the port is closed.

There are a few caveats though. Notice how we eval inside the loop. If the code in cmd takes a long time to run, the external program might linger after the port is closed. This is simply due to the fact that the program is busy processing the current request, so it can't detect that the other side has closed the port. If you want to ensure immediate termination, you can consider doing processing in a separate thread, while keeping the main thread focused on the communication part.

Another issue is the fact that closing the port closes both pipes. This may present a problem if you want to directly use tools which produce their output only after they receive EOF. In the context of a port, when this happens, both pipes are already closed, so the tool can't send anything back via stdout. There are quite a few discussions on this issue (see here for example). Essentially, you shouldn't worry about it if you implement your program to act as a server which waits for requests, does some processing, and optionally spits out the result. However, if you're trying to reuse a program which wasn't originally written to run as a port, you may need to wrap it in some custom script, or resort to libraries which offer some workarounds, such as the aforementioned Porcelain.

Packing messages

The communication between the owner process and the port is by default streamed, which means there are no guarantees about message chunks, so you need to somehow parse messages yourself, character by character.

In the previous example the Ruby code relies on newlines to serve as command separators (by using gets). This is a quick solution, but it prevents us from running multiline commands. Moreover, when receiving messages in Elixir, we don't have any guarantees about chunking. Data is streamed back to us as it is printed, so a single message might contain multiple responses, or a single response might span multiple messages.

A simple solution for this is to include the information about the message size in the message itself. This can be done by providing the {:packet, n} option to Port.open/2:

port = Port.open({:spawn, cmd}, [:binary, {:packet, 4}])

Each message sent to the port will start with n bytes (in this example 4) which represent the byte size of the rest of the message. The size is encoded as an unsigned big-endian integer.
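To illustrate the framing, here's the same encoding and decoding done manually in Elixir. This is for illustration only; with {:packet, 4} the VM does this for you:

payload = "puts a"

# prepend the payload size as a 4-byte unsigned big-endian integer
framed = <<byte_size(payload)::32-unsigned-big, payload::binary>>

# decoding on the receiving end
<<length::32-unsigned-big, message::binary-size(length)>> = framed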

The external program then needs to read this 4-byte integer, and then read the corresponding number of bytes to obtain the message payload:

def receive_input
  encoded_length = STDIN.read(4)                # get message size
  return nil unless encoded_length

  length = encoded_length.unpack("N").first     # convert to int
  STDIN.read(length)                            # read message
end

Now we can use receive_input in the eval loop:

while (cmd = receive_input) do
  eval(cmd, context)
end

These changes allow the Elixir client to send multi-line statements:

Port.command(port, "a = 1")
Port.command(port, ~S"""
  while a < 10 do
    a *= 3
  end
""")

When the Ruby program needs to send a message back to Erlang, it must also include the size of the message:

def send_response(value)
  response = value.inspect
  STDOUT.write([response.bytesize].pack("N"))
  STDOUT.write(response)
  true
end

The commands we send from Elixir can then call send_response to return something to the owner process. To prove that responses are properly chunked, let's send two responses:

Port.command(port, ~S"""
  send_response("response")
  send_response(a)
""")

Which will result in two messages on the Elixir side:

receive do
  {^port, {:data, result}} ->
    IO.puts("Elixir got: #{inspect result}")
end

receive do
  {^port, {:data, result}} ->
    IO.puts("Elixir got: #{inspect result}")
end

# Elixir got: "\"response\""
# Elixir got: "27"

The complete code is available here.

Encoding/decoding messages

The examples so far use plain strings as messages. In more involved scenarios you may need to deal with various data types. There's no special support for this. Essentially, a process and a port exchange byte sequences, and it is up to you to implement some encoding/decoding scheme to facilitate data typing. You can resort to popular formats such as JSON for this purpose.

In this example, I'll use Erlang's External Term Format (ETF). You can easily encode/decode any Erlang term to ETF via :erlang.term_to_binary/1 and :erlang.binary_to_term/1. A nice benefit of this is that you don't need any third party library on the Elixir side.
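A quick roundtrip demonstrates the idea:

encoded = :erlang.term_to_binary({:eval, "a = 1"})
:erlang.binary_to_term(encoded)
# => {:eval, "a = 1"}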

Let's see this in action. Instead of plain strings, we'll send {:eval, command} tuples to the Ruby side. The Ruby program will execute the command only if it receives an :eval-tagged tuple. In addition, when responding back, we'll again send the message as a tuple in the form {:response, value}, where value will also be an Erlang term.

On the Elixir side we'll introduce a helper lambda to send {:eval, command} tuples to the port. It will simply pack the command into a tuple and encode it to ETF binary:

send_eval = fn(port, command) ->
  Port.command(port, :erlang.term_to_binary({:eval, command}))
end

The function can then be used as:

send_eval.(port, "a = 1")
send_eval.(port, ~S"""
  while a < 10 do
    a *= 3
  end
""")
send_eval.(port, "send_response(a)")

On the Ruby side, we need to decode the ETF byte sequence. For this, we need to resort to some 3rd party library. After some quick (and very shallow) research, I opted for erlang-etf. We need to create a Gemfile with the following content:

source "https://rubygems.org"

gem 'erlang-etf'

And then run bundle install to fetch gems.

Now, in our Ruby code, we can require necessary gems:

require "bundler"
require "erlang/etf"
require "stringio"

Then, we can modify the receive_input function to decode the byte sequence:

def receive_input
  # ...

  Erlang.binary_to_term(STDIN.read(length))
end

The eval loop now needs to check that the input message is a tuple and that it contains the :eval atom as the first element:

while (cmd = receive_input) do
  if cmd.is_a?(Erlang::Tuple) && cmd[0] == :eval
    eval(cmd[1], context)
  end
end

Then we need to adapt the send_response function to encode the response message as {:response, value}:

def send_response(value)
  response = Erlang.term_to_binary(Erlang::Tuple[:response, value])
  # ...
end

Going back to the Elixir side, we now need to decode the response message with :erlang.binary_to_term/1:

receive do
  {^port, {:data, result}} ->
    IO.puts("Elixir got: #{inspect :erlang.binary_to_term(result)}")
end

# Elixir got: {:response, 27}

Take special note of how the received value is now an integer (previously it was a string). This happens because the response is now encoded to ETF on the Ruby side.

The complete code is available here.

Bypassing stdio

Communication via stdio is somewhat unfortunate. If we want to print something from the external program, perhaps for debugging purposes, the output will just be sent back to Erlang. Luckily, this can be avoided by instructing Erlang to use file descriptors 3 and 4 for communication with the program. Possible caveat: I'm not sure whether this feature works on Windows.

The change is simple enough. We need to provide the :nouse_stdio option to Port.open/2:

port = Port.open({:spawn, cmd}, [:binary, {:packet, 4}, :nouse_stdio])

Then, in Ruby, we need to open file descriptors 3 and 4, making sure that the output is not buffered:

@input = IO.new(3)
@output = IO.new(4)
@output.sync = true

Finally, we can simply replace references to STDIN and STDOUT with @input and @output respectively. The code is omitted for the sake of brevity.

After these changes, we can print debug messages from the Ruby process:

while (cmd = receive_input) do
  if cmd.is_a?(Erlang::Tuple) && cmd[0] == :eval
    puts "Ruby: #{cmd[1]}"
    res = eval(cmd[1], context)
    puts "Ruby: => #{res.inspect}\n\n"
  end
end

puts "Ruby: exiting"

Which gives the output:

Ruby: a = 1
Ruby: => 1

Ruby:   while a < 10 do
    a *= 3
  end
Ruby: => nil

Ruby: send_response(a)
Ruby: => true

Elixir got: {:response, 27}
Ruby: exiting

The code is available here.

Wrapping the port in a server process

Since the communication with the port relies heavily on message passing, it's worth managing the port inside a GenServer. This gives us some nice benefits:
  • The server process can provide an abstract API to its clients. For example, we could expose RubyServer.cast and RubyServer.call. The first operation just issues a command without producing output. The second one instructs the Ruby program to invoke send_response and send the response back. In addition, the server process handles the response message by notifying the client process. The coupling between Erlang and the program remains in the code of the server process.
  • The server process can include an additional unique id in each request issued to the port. The Ruby program will include this id in the response message, so the server can reliably match the response to a particular client request.
  • The server process can be notified if the Ruby program crashes, and in turn crash itself.
Let's see an example usage of such a server:

{:ok, server} = RubyServer.start_link

RubyServer.cast(server, "a = 1")
RubyServer.cast(server, ~S"""
  while a < 10 do
    a *= 3
  end
""")

RubyServer.call(server, "Erlang::Tuple[:response, a]")
|> IO.inspect

# {:response, 27}

Of course, nothing stops you from creating another Ruby interpreter:

{:ok, another_server} = RubyServer.start_link
RubyServer.cast(another_server, "a = 42")
RubyServer.call(another_server, "Erlang::Tuple[:response, a]")
|> IO.inspect

# {:response, 42}

These two servers communicate with different interpreter instances so there's no overlap:

RubyServer.call(server, "Erlang::Tuple[:response, a]")
|> IO.inspect

# {:response, 27}

Finally, a crash in the Ruby program will be noticed by the GenServer which will in turn crash itself:

RubyServer.call(server, "1/0")

# ** (EXIT from #PID<0.48.0>) an exception was raised:
#     ** (ErlangError) erlang error: {:port_exit, 1}
#         ruby_server.ex:43: RubyServer.handle_info/2
#         (stdlib) gen_server.erl:593: :gen_server.try_dispatch/4
#         (stdlib) gen_server.erl:659: :gen_server.handle_msg/5
#         (stdlib) proc_lib.erl:237: :proc_lib.init_p_do_apply/3

The implementation is mostly a rehash of the previously mentioned techniques, so I won't explain it here. The only new thing is providing the :exit_status option to Port.open/2. With this option, we ensure that the owner process will receive the {port, {:exit_status, status}} message and can do something about the port crash. You're advised to try and implement such a GenServer yourself, or analyze my basic solution.
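For reference, here's a minimal sketch of what such a server might look like. This is not the linked solution: it skips the unique request ids mentioned above, supports only one pending call at a time, and assumes that cmd holds the shell command for the ETF-based Ruby eval loop built in the previous sections:

defmodule RubyServer do
  use GenServer

  def start_link(cmd), do: GenServer.start_link(__MODULE__, cmd)

  def cast(server, ruby_code), do: GenServer.cast(server, {:cast, ruby_code})
  def call(server, ruby_code), do: GenServer.call(server, {:call, ruby_code})

  def init(cmd) do
    port = Port.open({:spawn, cmd}, [:binary, {:packet, 4}, :exit_status])
    {:ok, %{port: port, pending: nil}}
  end

  def handle_cast({:cast, ruby_code}, state) do
    send_eval(state.port, ruby_code)
    {:noreply, state}
  end

  def handle_call({:call, ruby_code}, from, state) do
    send_eval(state.port, ruby_code)
    # don't reply yet; the reply is sent from handle_info
    # once the Ruby side responds
    {:noreply, %{state | pending: from}}
  end

  # a response arrived from the port; notify the pending client
  # (assumes responses arrive only for pending calls)
  def handle_info({port, {:data, data}}, %{port: port} = state) do
    GenServer.reply(state.pending, :erlang.binary_to_term(data))
    {:noreply, %{state | pending: nil}}
  end

  # the external program terminated; crash the server
  def handle_info({port, {:exit_status, status}}, %{port: port}) do
    :erlang.error({:port_exit, status})
  end

  defp send_eval(port, ruby_code) do
    Port.command(port, :erlang.term_to_binary({:eval, ruby_code}))
  end
end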

Alternatives to ports

Like everything else, ports come with some associated trade-offs. The most obvious one is the performance hit due to encoding and communicating via pipes. If the actual processing in the port is very short, this overhead might not be tolerable. With a lot of hand waving I'd say that ports are more appropriate when the external program will do some "significant" amount of work, something that's measured at least in milliseconds.

In addition, ports are coupled to the owner (and vice-versa). If the owner stops, you probably want to stop the external program. Otherwise the restarted owner will start another instance of the program, while the previous instance won't be able to talk to Erlang anymore.

If these issues are relevant for your specific case, you might consider some alternatives:
  • Port drivers (sometimes called linked-in drivers) have characteristics similar to ports, but there is no external program involved. Instead, the code, implemented in C/C++, is running directly in the VM.
  • NIFs (native implemented functions) can be used to implement Erlang functions in C and run them inside the BEAM. Unlike port drivers, NIFs are not tied to a particular process.
  • It is also possible to make your program look like an Erlang node. Some helper libraries are provided for C and Java. Your Erlang node can then communicate with the program, just like it would do with any other node in the cluster.
  • Of course, you can always go the "microservices" style: start a separate program, and expose some HTTP interface so your Erlang system can talk to it.
The first two alternatives might give you a significant speed improvement at the cost of safety. An unhandled exception in a NIF or a port driver will crash the entire BEAM. Moreover, both NIFs and port drivers run in scheduler threads, so you need to keep your computations short (<= 1 ms), otherwise you may end up compromising the responsiveness of the schedulers. This can be worked around with threads and the use of dirty schedulers, but the implementation might be significantly more involved.

The third option provides looser coupling between two parties, allowing them to restart separately. Since distributed Erlang is used, you should still be able to detect crashes of the other side.

A custom HTTP interface is more general than an Erlang-like node (since it doesn't require an Erlang client), but you lose the ability to detect crashes. If one party needs to detect that the other party has crashed, you'll need to roll your own health checking (or reuse some 3rd party component for that).

I'd say that nodes and separate services seem suitable when two parties are more like peers, and each one can exist without the other. On the other hand, ports are more interesting when the external program makes sense only in the context of the whole system, and should be taken down if some other part of the system terminates.

As you can see, there are various options available, so I think it's safe to say that Erlang is not an island. Moving to Erlang/Elixir doesn't mean you lose the ability to implement some parts of the system in other languages. So if for whatever reasons you decide that something else is more suitable to power a particular feature, you can definitely take that road and still enjoy the benefits of Erlang/Elixir in the rest of your system.

Optimizing a function with the help of Elixir macros


Author: Tallak Tveide


Today I have the pleasure of hosting a post by Tallak Tveide, who dived into Elixir macros, came back alive, and decided to share his experience with us. This is his story.

In this blog post we will cover optimizing an existing function for certain known inputs, using macros. The function that we are going to optimize is 2D vector rotation. The problem was chosen for its simplicity. When I first used these techniques there were a few extra complexities that have been left out; please keep this in mind if the code seems like overkill.

If you are unfamiliar with macros, this blog post may be difficult to read. In that case one tip is to read Saša Jurić's articles about Elixir macros first, then revisit this post.

Two dimensional vector rotation

We want to take a vector {x, y} and apply any number of translate and rotate transforms to it. We want to end up with code looking like:

transformed_point =
  {10.0, 10.0}
  |> rotate(90.0)
  |> translate(1.0, 1.0)

The translate function would look something like this:

defmodule TwoD do
  def translate({x, y}, dx, dy) do
    {x + dx, y + dy}
  end
end

And then the rotate function might look like:

defmodule TwoD do
  @deg_to_rad :math.pi / 180.0

  def rotate({x, y}, angle) do
    radians = angle * @deg_to_rad
    { x * :math.cos(radians) - y * :math.sin(radians),
      x * :math.sin(radians) + y * :math.cos(radians) }
  end
end

The first bit of subtle macro magic is already happening at this point. We are precalculating the module attribute @deg_to_rad at compile time, to avoid calling :math.pi and performing a division at runtime.

I have left out translate from here on for clarity.

The idea

When I first started to look at these transforms, most of my rotations were in multiples of 90 degrees. For these angles, :math.sin(x) and :math.cos(x) return the values -1.0, 0.0 or 1.0, and the rotate function reduces to reordering and changing signs of the vector tuple values in {x, y}.

If we spelled the code out, it would look something like this:

defmodule TwoD do
  def rotate({x, y}, 90.0), do: {-y, x}
  def rotate({x, y}, 180.0), do: {-x, -y}
  # ... more optimized versions here

  # failing an optimized match, use the generic rotate
  def rotate({x, y}, angle) do
    radians = angle * @deg_to_rad
    { x * :math.cos(radians) - y * :math.sin(radians),
      x * :math.sin(radians) + y * :math.cos(radians) }
  end
end

For this particular problem, the code above, without macros, is the most readable and maintainable, and is also as efficient as any other code.


The first attempt

There are basically just four variants at [0, 90, 180, 270] degrees that are interesting to us, since sin and cos are cyclic. Our initial approach will select one of these four variants based on a parameter, and then inject some code into the TwoD module:

  defmodule TwoD.Helpers do
    @deg_to_rad :math.pi / 180.0

    def rotate({x, y}, angle) do
      radians = angle * @deg_to_rad
      { x * :math.cos(radians) - y * :math.sin(radians),
        x * :math.sin(radians) + y * :math.cos(radians) }
    end

    defmacro def_optimized_rotate(angle_quoted) do
      # angle is still code, so it must be evaluated to get a number
      {angle, _} = Code.eval_quoted(angle_quoted)

      x_quoted = Macro.var(:x, __MODULE__)
      y_quoted = Macro.var(:y, __MODULE__)
      neg_x_quoted = quote do: (-unquote(Macro.var(:x, __MODULE__)))
      neg_y_quoted = quote do: (-unquote(Macro.var(:y, __MODULE__)))

      # normalize to 0..360; must add 360 in case of negative angle values
      normalized = angle |> round |> rem(360) |> Kernel.+(360) |> rem(360)

      result_vars_quoted = case normalized do
        0 ->
          [x_quoted, y_quoted]
        90 ->
          [neg_y_quoted, x_quoted]
        180 ->
          [neg_x_quoted, neg_y_quoted]
        270 ->
          [y_quoted, neg_x_quoted]
        _ ->
          raise "Optimized angles must be right or straight"
      end

      # at last return a quoted function definition
      quote do
        def rotate({x, y}, unquote(angle * 1.0)) do
          {unquote_splicing(result_vars_quoted)}
        end
      end
    end
  end

  defmodule TwoD do
    require TwoD.Helpers

    # Optimized versions of the code
    TwoD.Helpers.def_optimized_rotate(-270)
    TwoD.Helpers.def_optimized_rotate(-180)
    TwoD.Helpers.def_optimized_rotate(-90)
    TwoD.Helpers.def_optimized_rotate(0)
    TwoD.Helpers.def_optimized_rotate(90)
    TwoD.Helpers.def_optimized_rotate(180)
    TwoD.Helpers.def_optimized_rotate(270)

    def rotate(point, angle), do: TwoD.Helpers.rotate(point, angle)
  end

The rotate function has been moved to the TwoD.Helpers module, and replaced with a simple forwarding call. This will be useful when we later want to test our optimized function against the unoptimized one.

When I first implemented def_optimized_rotate I was caught a bit off guard, as the parameters to the macro are not available as the simple numbers that I passed in. The parameter angle_quoted is actually passed as a quoted expression (code). So in order for the macro to be able to precalculate the result, we have to add {angle, _} = Code.eval_quoted(angle_quoted) at the top of our macro to evaluate the quoted number into an actual value.

Please note that I would not recommend using Code.eval_quoted for reasons that will hopefully become clear later.

For this particular problem, I am quite happy spelling out all seven values that I want to optimize. But if there were many more interesting optimizations (for instance if the rotation were in 3D), spelling all of these out would not be a good option. Let's wrap the macro call in a for comprehension instead.

Inserting dynamic module definitions

Before writing the for comprehension, let's look at how a function may be defined dynamically. We'll start by making a function that simply returns its name, but that name is assigned to a variable at compile time, before the function is defined:

defmodule Test do
  function_name = "my_test_function"

  def unquote(function_name |> String.to_atom)() do
    unquote(function_name)
  end
end

And when we run it in IEx, we get:

iex(2)> Test.my_test_function
"my_test_function"

The thing to note is that when we are defining a module, we are in a way already inside an implicit quote statement, and that we may use unquote to expand dynamic code into our module. The first unquote inserts an atom containing the function name, the second inserts the return value.

Actually, I have yet to see unquote used like this in a module definition. Normally you would prefer to use module attributes as often as possible, as they will automatically unquote their values. On the other hand, it seems unquote offers a bit more flexibility.

defmodule Test do
  @function_name "my_test_function"

  def unquote(@function_name |> String.to_atom)() do
    @function_name
  end
end

Our next step is to let the for comprehension enumerate all the angles that we want to optimize. Our TwoD module now looks like this:

defmodule TwoD do
  require TwoD.Helpers

  @angles for n <- -360..360, rem(n, 90) == 0, do: n

  # Optimized versions of the code
  for angle <- @angles, do: TwoD.Helpers.def_optimized_rotate(angle)

  # This general purpose implementation will serve any other angle
  def rotate(point, angle), do: TwoD.Helpers.rotate(point, angle)
end

This introduces a new problem to our code. Our macro def_optimized_rotate now receives a quoted reference to angle, which is not possible to evaluate in the macro context. Actually, our first implementation implicitly required that the angle parameter be spelled out as a number. It seems wrong that the user of our macro has to know that the parameter must have a particular form.

This is the first time we will see a pattern with macro programming, and one reason to be wary of using macros: The macro might work well in one instance, but changes made in code outside of the macro could easily break it. To paraphrase a saying: The code is far from easy to reason about.

Delaying the macro logic

If the mountain will not come to Muhammad, Muhammad must go to the mountain.

There are two ways to use the angle values from the for comprehension in our macro:
  • move the for comprehension into our macro, thus hardcoding the optimized angles
  • inject everything into the resulting module definition
We'll choose the latter option because I think it is clearer that the optimized angles are stated in the TwoD module rather than in the macro.

There is no way to evaluate the code in the macro parameter correctly inside the macro. Instead we must move all the code into a context where the parameter may be evaluated correctly.

defmodule TwoD.Helpers do
  @deg_to_rad :math.pi / 180.0

  def rotate({x, y}, angle) do
    radians = angle * @deg_to_rad
    { x * :math.cos(radians) - y * :math.sin(radians),
      x * :math.sin(radians) + y * :math.cos(radians) }
  end

  defmacro def_optimized_rotate(angle) do
    quote(bind_quoted: [angle_copy: angle], unquote: false) do
      x_quoted = Macro.var(:x, __MODULE__)
      y_quoted = Macro.var(:y, __MODULE__)
      neg_x_quoted = quote do: (-unquote(Macro.var(:x, __MODULE__)))
      neg_y_quoted = quote do: (-unquote(Macro.var(:y, __MODULE__)))

      # normalize to 0..360; must add 360 in case of negative angle values
      normalized = angle_copy |> round |> rem(360) |> Kernel.+(360) |> rem(360)

      result_vars_quoted = case normalized do
        0 ->
          [x_quoted, y_quoted]
        90 ->
          [neg_y_quoted, x_quoted]
        180 ->
          [neg_x_quoted, neg_y_quoted]
        270 ->
          [y_quoted, neg_x_quoted]
        _ ->
          raise "Optimized angles must be right or straight"
      end

      def rotate({unquote_splicing([x_quoted, y_quoted])}, unquote(1.0 * angle_copy)) do
        {unquote_splicing(result_vars_quoted)}
      end
    end
  end
end

Compared to the initial rotate function, this code is admittedly quite dense. This is where I gradually realize why everyone warns against macro overuse.

The first thing to note is that all the generated code is contained inside a giant quote statement. Because we want to insert unquote calls into our result (to be evaluated inside the module definition), we have to use the option unquote: false.

We may no longer use unquote to insert the angle parameter quoted. To mend this, we add the option bind_quoted: [angle_copy: angle]. The result of adding the bind_quoted option is best shown with an example:

iex(1)> angle = quote do: 90 * 4.0
{:*, [context: Elixir, import: Kernel], [90, 4.0]}

iex(2)> Macro.to_string(quote(bind_quoted: [angle_copy: angle]) do
...(2)> rot_x = TwoD.Helpers.prepare_observed_vector {1, 0}, angle_copy, :x
...(2)> # more code
...(2)> end) |> IO.puts
(
  angle_copy = 90 * 4.0
  rot_x = TwoD.Helpers.prepare_observed_vector({1, 0}, angle_copy, :x)
)
:ok

bind_quoted is really quite simple. It just adds an assignment before any other code. This also has the benefit of ensuring that the parameter code is only evaluated once. It seems we should be using bind_quoted rather than inline unquoting in most circumstances.

As we don't really use the angle in the macro anymore, we no longer need Code.eval_quoted. I admit using it was a bad idea in the first place.

This is the second time the macro stopped working due to changes in the calling code. It seems the first version of our macro worked more or less by accident. The code:

def rotate({x, y}, unquote(angle_copy)) do
  {unquote_splicing(result_vars_quoted)}
end

had to be replaced with:

def rotate({unquote_splicing([x_quoted, y_quoted])}, unquote(angle_copy)) do
  {unquote_splicing(result_vars_quoted)}
end

The reason for this is that, due to macro hygiene, the quoted code for the result did not map directly to {x, y}.

This does the trick, and the code now works as intended.
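To see why the contexts matter, note that the same variable name quoted in different contexts yields different (hygienic) variables, even though both render as x:

# a plain quoted variable carries the context of the quoting code
# (for example, {:x, [], Elixir} when quoted in IEx)
quote do: x

# while Macro.var lets us pick the context explicitly
Macro.var(:x, TwoD.Helpers)
# => {:x, [], TwoD.Helpers}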

Testing

To test the code, we will compare the output of our optimized function and the generic implementation. The test might look like this:

# in file test/two_d_test.exs
defmodule TwoD.Tests do
  use ExUnit.Case, async: true
  alias TwoD.Helpers, as: H

  @point {123.0, 456.0}

  def round_point({x, y}), do: {round(x), round(y)}

  test "optimized rotates must match generic version" do
    assert (TwoD.rotate(@point, -270.0) |> round_point) ==
      (H.rotate(@point, -270.0) |> round_point)

    assert (TwoD.rotate(@point, 0.0) |> round_point) ==
      (H.rotate(@point, 0.0) |> round_point)

    assert (TwoD.rotate(@point, 90.0) |> round_point) ==
      (H.rotate(@point, 90.0) |> round_point)
  end

  test "the non right/straight angles should still work" do
    assert (TwoD.rotate(@point, 85.0) |> round_point) ==
      (H.rotate(@point, 85.0) |> round_point)
  end
end

Benchmarking the results

A final difficulty remains: we are still not sure whether our optimized code is actually running, or whether the generic implementation is still handling all function calls.

If the optimization is working, a benchmark should show us that. In any event, it is useful to verify that the optimization is worthwhile. I decided to use the benchwarmer package for this. The mix.exs file is modified to include:

  defp deps do
    [
      { :benchwarmer, "~> 0.0.2" }
    ]
  end

And then we'll add a simple benchmark script like this:

# in file lib/mix/tasks/benchmark.ex
defmodule Mix.Tasks.Benchmark do
  use Mix.Task

  def run(_) do
    IO.puts "Checking optimized vs unoptimized"
    Benchwarmer.benchmark(
      [&TwoD.Helpers.rotate/2, &TwoD.rotate/2], [{123.0, 456.0}, 180.0]
    )

    IO.puts "Checking overhead of having optimizations"
    Benchwarmer.benchmark(
      [&TwoD.Helpers.rotate/2, &TwoD.rotate/2], [{123.0, 456.0}, 182.0]
    )
  end
end

in turn giving us:

$ mix benchmark
Checking optimized vs unoptimized
*** &TwoD.Helpers.rotate/2 ***
1.6 sec   524K iterations   3.18 μs/op

*** &TwoD.rotate/2 ***
1.4 sec     2M iterations   0.71 μs/op

Checking overhead of having optimizations
*** &TwoD.Helpers.rotate/2 ***
1.3 sec     1M iterations   1.34 μs/op

*** &TwoD.rotate/2 ***
1.8 sec     1M iterations   1.78 μs/op

I find it a bit interesting that we are getting a 4X speedup for the straight and right angles, while at the same time the general purpose call is 20% slower. Neither of these results should come as a big surprise.

In conclusion, this technique is worthwhile if you have a slow computation that is mostly called with a specific range of arguments. It also seems wise to factor in the loss of readability.

You may browse the complete source code at GitHub.

Thanks

Thanks to @mgwidmann for pointing out that unquote is so useful inside a module definition.

Thanks to Saša Jurić for getting me through difficult compiler issues, and then helping me out with the code examples and text.

Beyond Task.async

In this post I'll talk about less typical patterns of parallelization with tasks. Arguably, the most common use case for tasks is to start some jobs concurrently with Task.async and then collect the results with Task.await. By doing this we might run separate jobs in parallel, and thus perform the total work more efficiently. This can be done very elegantly with async/await, without much overhead in the code.

However, async/await has some properties which may not be suitable in some cases, so you might need a different approach. That is the topic of this post, but first, let's quickly recap the basic async/await pattern.

Parallelizing with async/await

Async/await makes sense when we need to perform multiple independent computations and aggregate their results into the total output. If computations take some time, we might benefit by running them concurrently, possibly reducing the total execution time from sum(computation_times) to max(computation_times).

The computation can be any activity, such as a database query, a call to a 3rd party service, or some CPU-bound calculation. In this post, I'll just use a contrived stub:

defmodule Computation do
  def run(x) when x > 0 do
    :timer.sleep(x)  # simulates a long-running operation
    x
  end
end

This "computation" takes a positive integer x, sleeps for x milliseconds, and returns the number back. It's just a simulation of a possibly long running operation.

Now, let's say that we need to aggregate the results of multiple computations. Again, I'll introduce a simple stub:

defmodule Aggregator do
  def new, do: 0
  def value(aggregator), do: aggregator

  def add_result(aggregator, result) do
    :timer.sleep(50)
    aggregator + result
  end
end

This is just a simple wrapper which sums input numbers. In real life, this might be a more involved aggregator that somehow combines results of multiple queries into a single "thing".

Assuming that different computations are independent, there is potential to run them concurrently, and this is where tasks come in handy. For example, let's say we need to run this computation for ten different numbers:

defmodule AsyncAwait do
  def run do
    :random.seed(:os.timestamp)

    1..10
    |> Enum.map(fn(_) -> :random.uniform(1000) end)
    |> Enum.map(&Task.async(fn -> Computation.run(&1) end))
    |> Enum.map(&Task.await/1)
    |> Enum.reduce(Aggregator.new, &Aggregator.add_result(&2, &1))
    |> Aggregator.value
  end
end

This is a fairly simple technique. First, we generate some random input and start a task to handle each element. Then, we await the results of each task and reduce the responses into the final value. This allows us to improve the running time, since computations might run in parallel. The total time should be the time of the longest-running computation plus the fixed penalty of 500 ms (10 * 50 ms) to include each result into the total output. In this example it shouldn't take longer than 1500 ms to get the final result.

Properties of async/await

Async/await is very elegant and brings some nice benefits, but it also has some limitations.

The first problem is that we await the results in the order we started the tasks. In some cases, this might not be optimal. For example, imagine that the first task takes 500 ms, while all others take 1 ms. This means that we'll process the results of the short-running tasks only after we handle the slow task. The total execution time in this example will be about 1 second. From a performance point of view, it would be better if we took results as they arrive. This would allow us to aggregate most of the results while the slowest task is still running, reducing the execution time to 550 ms.

Another issue is that it's not easy to enforce a global timeout. You can't easily say, "I want to give up if all the results don't arrive in 500 ms". You can provide a timeout to Task.await (it's five seconds by default), but this applies only to a single await operation. Hence, a five-second timeout actually means we might end up waiting 50 seconds for ten tasks to time out.
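You can approximate a global timeout with plain await by shrinking each per-await timeout against a common deadline. A sketch (tasks is a list of started tasks); note that on timeout this still exits the caller instead of returning the partial results:

deadline = System.monotonic_time(:millisecond) + 500

results =
  Enum.map(tasks, fn task ->
    # give each await only the time remaining until the common deadline
    remaining = max(deadline - System.monotonic_time(:millisecond), 0)
    Task.await(task, remaining)
  end)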

Finally, you should be aware that the async/await pattern takes an all-or-nothing approach. If any task or the master process crashes, all involved processes will be taken down (unless they're trapping exits). This happens because Task.async links the caller and the spawned task process.

In most situations, these issues won't really matter, and async/await combo will be perfectly fine. However, sometimes you might want to change the default behaviour.

Eliminating await

Let's start by making the "master" process handle results in the order of arrival. This is fairly simple if we rely on the fact that Task.async reports the result back to the caller process via a message. We can therefore receive a message and check whether it comes from one of our tasks. If so, we can add the result to the aggregator.

To do this, we can rely on Task.find/2, which takes a list of tasks and a message, and returns either {result, task} if the message corresponds to a task in the list, or nil if it doesn't:

defmodule AsyncFind do
  def run do
    :random.seed(:os.timestamp)

    1..10
    |> Enum.map(fn(_) -> :random.uniform(1000) end)
    |> Enum.map(&Task.async(fn -> Computation.run(&1) end))
    |> collect_results
  end

  defp collect_results(tasks, aggregator \\ Aggregator.new)

  defp collect_results([], aggregator), do: Aggregator.value(aggregator)
  defp collect_results(tasks, aggregator) do
    receive do
      msg ->
        case Task.find(tasks, msg) do
          {result, task} ->
            collect_results(
              List.delete(tasks, task),
              Aggregator.add_result(aggregator, result)
            )

          nil ->
            collect_results(tasks, aggregator)
        end
    end
  end
end

Most of the action happens in collect_results. Here, we loop recursively, waiting for a message to arrive. Then we invoke Task.find/2 to determine whether the message comes from a task. If yes, we delete the task from the list of pending tasks, aggregate the response and resume the loop. The loop stops when there are no more pending tasks in the list. Then, we simply return the aggregated value.

In this example I'm using explicit receive, but in production you should be careful about it. If the master process is a server, such as GenServer or Phoenix.Channel, you should let the underlying behaviour receive messages, and invoke Task.find/2 from the handle_info callback. For the sake of brevity, I didn't take that approach here, but as an exercise you could try to implement it yourself.

One final note: by receiving results as they arrive, we lose the ordering. In this case, where we simply sum numbers, this doesn't matter. If you must preserve the ordering, you'll need to include additional ordering info with each result, and then sort the results after they are collected.
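For example (a sketch; inputs and collected are hypothetical names), we could tag each task's result with the input's index, collect the {index, result} pairs in any order, and sort at the end:

tasks =
  inputs
  |> Enum.with_index
  |> Enum.map(fn {input, index} ->
    Task.async(fn -> {index, Computation.run(input)} end)
  end)

# ... collect the {index, result} pairs as they arrive, then:
sorted_results =
  collected
  |> Enum.sort_by(fn {index, _result} -> index end)
  |> Enum.map(fn {_index, result} -> result end)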

Handling timeouts

Once we moved away from Task.await, the master process becomes more flexible. For example, we can now easily introduce a global timeout. The idea is simple: after the tasks are started, we can use Process.send_after/3 to send a message to the master process after some time:

defmodule Timeout do
  def run do
    # exactly the same as before
  end

  defp collect_results(tasks) do
    timeout_ref = make_ref
    timer = Process.send_after(self, {:timeout, timeout_ref}, 900)
    try do
      collect_results(tasks, Aggregator.new, timeout_ref)
    after
      :erlang.cancel_timer(timer)
      receive do
        {:timeout, ^timeout_ref} -> :ok
        after 0 -> :ok
      end
    end
  end

  # ...
end

Here, we create a timer, and a reference which will be a part of the timeout message. Then we enqueue the timeout message to be sent to the master process after 900 ms. Including the reference in the message ensures that the timeout message is unique for this run, and will not interfere with some other message.

Finally, we start the receive loop and return its result.

Take special note of the after block where we cancel the timer to avoid sending a timeout message if all the results arrive on time. However, since the timer works concurrently with the master process, it is still possible that the message was sent just before we canceled the timer, but after all the results were already collected. Thus, we do a receive with a zero timeout to flush the message if it's already in the queue.

With this setup in place, we now need to handle the timeout message:

defp collect_results([], aggregator, _), do: {:ok, Aggregator.value(aggregator)}
defp collect_results(tasks, aggregator, timeout_ref) do
  receive do
    {:timeout, ^timeout_ref} ->
      {:timeout, Aggregator.value(aggregator)}

    msg ->
      case Task.find(tasks, msg) do
        {result, task} ->
          collect_results(
            List.delete(tasks, task),
            Aggregator.add_result(aggregator, result),
            timeout_ref
          )

        nil -> collect_results(tasks, aggregator, timeout_ref)
      end
  end
end

The core change here is the new clause that matches the timeout message, where we explicitly deal with the timeout. In this example, we just return what we currently have. Depending on the particular use case, you may want to do something different, for example raise an error.

Explicitly handling errors

The next thing we'll tackle is error handling. Task.async is built in such a way that if something fails, everything fails. When you start the task via async, the process is linked to the caller. This holds even if you use Task.Supervisor.async. As a result, if some task crashes, the master process will crash as well, taking down all other tasks.

If this is not a problem, then Task.async is a perfectly valid solution. However, sometimes you may want to explicitly deal with errors. For example, you might want to just ignore failing tasks, reporting back whatever succeeded. Or you may want to keep the tasks running even if the master process crashes.

There are two basic ways you can go about it: catch errors in the task, or use Task.Supervisor with start_child.

Catching errors

The simplest approach is to wrap the task code in a try/catch block:

Task.async(fn ->
  try do
    {:ok, Computation.run(...)}
  catch _, _ ->
    :error
  end
end)


Then, when you receive results, you can explicitly handle each case, ignoring :error results. The implementation is mostly mechanical and left to you as an exercise.
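As a sketch, the receive loop from the AsyncFind example might handle the tagged results like this:

defp collect_results(tasks, aggregator) do
  receive do
    msg ->
      case Task.find(tasks, msg) do
        {{:ok, result}, task} ->
          # a successful computation; aggregate the result
          collect_results(
            List.delete(tasks, task),
            Aggregator.add_result(aggregator, result)
          )

        {:error, task} ->
          # a failed computation; drop the task and move on
          collect_results(List.delete(tasks, task), aggregator)

        nil ->
          collect_results(tasks, aggregator)
      end
  end
end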

I've occasionally seen some concerns that catching is not the Erlang/Elixir way, so I'd like to touch on this. If you can do something meaningful with an error, catching is a reasonable approach. In this case, we want to collect all the successful responses, so ignoring failed ones is completely fine.

So catching is definitely a simple way of explicitly dealing with errors, but it's not without shortcomings. The main issue is that catch doesn't handle exit signals. Thus, if the task links to some other process, and that other process terminates, the task process will crash as well. Since the task is linked to the master process, this will cause the master process to crash, and in turn crash all other tasks. The link between the caller and the task also means that if the master process crashes, for example while aggregating, all tasks will be terminated.

To overcome this, we can either make all processes trap exits, or remove the link between processes. Trapping exits might introduce some subtle issues (see here for some information), so I'll take the second approach.

Replacing async

The whole issue arises because async links the caller and the task process, which ensures the "all-or-nothing" property. This is a perfectly fine decision, but it's not necessarily suitable for all cases. I wonder whether linking should be made optional, but I don't have a strong opinion at the moment.

As it is, Task.async currently establishes a link, and if we want to avoid this, we need to reimplement async ourselves. Here's what we'll do:
  • Start a Task.Supervisor and use Task.Supervisor.start_child to start tasks.
  • Manually implement sending of the return message from the task to the caller.
  • Have the master process monitor tasks so it can be notified about potential crashes. Explicitly handle such messages by removing the crashed task from the list of tasks we await on.
The first point allows us to run tasks in a different part of the supervision tree from the master. Tasks and the master process are no longer linked, and the failure of one process doesn't cause the failure of others.

However, since we're not using async anymore, we need to manually send the return message to the caller process.

Finally, using the monitor ensures that the master process will be notified if some task crashes and can stop awaiting on their results.

This requires more work, but it provides stronger guarantees. We can now be certain that:
  • A failing task won't crash anyone else.
  • The master process will be informed about the task crash and can do something about it.
  • Even a failure of master process won't cause tasks to crash.
If the third property doesn't suit your purposes, you can simply place the master process and the tasks supervisor under the same common supervisor, with a one_for_all or rest_for_one strategy.

This is what I like about the Erlang fault-tolerance approach. There are various options with strong guarantees. You can isolate crashes, but you can also connect failures if needed. Some scenarios may require more work, but the implementation is still straightforward. Supporting these scenarios without process isolation and crash propagation would be harder, and you might end up reinventing parts of Erlang.

Let's implement this. The top-level run/0 function is now changed a bit:

defmodule SupervisedTask do
  def run do
    :random.seed(:os.timestamp)
    Task.Supervisor.start_link(name: :task_supervisor)

    work_ref = make_ref

    1..10
    |> Enum.map(fn(_) -> :random.uniform(1000) - 500 end)
    |> Enum.map(&start_computation(work_ref, &1))
    |> collect_results(work_ref)
  end

  # ...
end

First, a named supervisor is started. This is a quick hack to keep the example short. In production, this supervisor should of course reside somewhere in the supervision hierarchy.

Then, a work reference is created, which will be included in task response messages. Finally, we generate some random numbers and start our computations. Notice the :random.uniform(1000) - 500. This ensures that some numbers will be negative, which will cause some tasks to crash.

Tasks now have to be started under the supervisor:

defp start_computation(work_ref, arg) do
  caller = self

  # Start the task under the named supervisor
  {:ok, pid} = Task.Supervisor.start_child(
    :task_supervisor,
    fn ->
      result = Computation.run(arg)

      # Send the result back to the caller
      send(caller, {work_ref, self, result})
    end
  )

  # Monitor the started task
  Process.monitor(pid)
  pid
end

Finally, we need to expand the receive loop to handle :DOWN messages, which we'll receive when the task terminates:

defp collect_results(tasks, work_ref) do
  timeout_ref = make_ref
  timer = Process.send_after(self, {:timeout, timeout_ref}, 400)
  try do
    collect_results(tasks, work_ref, Aggregator.new, timeout_ref)
  after
    :erlang.cancel_timer(timer)
    receive do
      {:timeout, ^timeout_ref} -> :ok
      after 0 -> :ok
    end
  end
end

defp collect_results([], _, aggregator, _), do: {:ok, Aggregator.value(aggregator)}
defp collect_results(tasks, work_ref, aggregator, timeout_ref) do
  receive do
    {:timeout, ^timeout_ref} ->
      {:timeout, Aggregator.value(aggregator)}

    {^work_ref, task, result} ->
      collect_results(
        List.delete(tasks, task),
        work_ref,
        Aggregator.add_result(aggregator, result),
        timeout_ref
      )

    {:DOWN, _, _, pid, _} ->
      if Enum.member?(tasks, pid) do
        # Handling task termination. In this case, we simply delete the
        # task from the list of tasks, and wait for other tasks to finish.
        collect_results(List.delete(tasks, pid), work_ref, aggregator, timeout_ref)
      else
        collect_results(tasks, work_ref, aggregator, timeout_ref)
      end
  end
end

This is mostly straightforward, with the major change being the new clause that handles :DOWN messages. It's worth mentioning that we'll receive a :DOWN message even if the task doesn't crash. However, this message will arrive after the response message has been sent back, so the master process will first handle the response message. Since we remove the task from the list, the subsequent :DOWN message of that task will be ignored. This is not super efficient, and we could improve it by doing some extra bookkeeping and demonitoring the task after it returns, but I refrained from this for the sake of brevity.
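That bookkeeping could be as simple as keeping a pid-to-monitor-reference map instead of a bare pid list (a sketch; tasks here is that hypothetical map):

# when starting a task, remember its monitor reference
ref = Process.monitor(pid)
tasks = Map.put(tasks, pid, ref)

# when a result arrives from pid, demonitor and flush a pending :DOWN message
{ref, tasks} = Map.pop(tasks, pid)
Process.demonitor(ref, [:flush])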

In any case, we can now test it. If I start SupervisedTask.run, I'll see some errors logged (courtesy of Logger), but I'll still get whatever is collected. You can also try it yourself. The code is available here.

Reducing the boilerplate

As we moved to more complex patterns, our master process became way more involved. The plain async/await version has only 12 lines of code, while the final implementation has 66. The master process is burdened with a lot of mechanics, such as keeping references, starting a timer, and handling received messages. There's a lot of potential to extract some of that boilerplate, so we can keep the master process more focused.

There are different approaches to extracting the boilerplate. If a process has to behave in a special way, you can consider creating a generic OTP-like behaviour that powers the process. The concrete implementation then just has to fill in the blanks by providing necessary callback functions.

However, in this particular case, I don't think creating a behaviour is a good option. The thing is that the master process might already be powered by a behaviour, such as GenServer or Phoenix.Channel. If we implement our generic code as a behaviour, we can't really combine it with another behaviour. Thus, we'll always need to have one more process that starts all these tasks and collects their results. This may result in excessive message passing, and have an impact on performance.

An alternative is to implement a helper module that can be used to start tasks and process task related messages. For example, we could have the following interface for starting tasks:

runner = TaskRunner.run(
  [
    {:supervisor1, {SomeModule, :some_function, args}},
    {:supervisor2, {AnotherModule, :some_function, other_args}},
    {:supervisor3, fn -> ... end},
    # ...
  ],
  timeout
)

Under the hood, TaskRunner would start tasks under the given supervisors, set up work and timer references, and send the timeout message to the caller process. By allowing different tasks to run under different supervisors, we gain more flexibility. In particular, this allows us to start different tasks on different nodes.
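For example, since a Task.Supervisor can be referenced with a {name, node} tuple, a task could be started on another node in the cluster (a sketch; the node name is made up):

Task.Supervisor.start_child(
  {:task_supervisor, :"worker@some_host"},
  fn -> Computation.run(arg) end
)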

The responsibility of receiving messages now lies on the caller process. It has to receive a message either via receive or for example in the handle_info callback. When the process gets a message, it has to first pass it to TaskRunner.handle_message which will return one of the following:
  • nil - the message is not task-runner specific, feel free to handle it yourself
  • {{:ok, result}, runner} - a result arrived from a task
  • {{:task_error, reason}, runner} - a task has crashed
  • {:timeout, runner} - timeout has occurred
Finally, we'll introduce a TaskRunner.done?/1 function, which can be used to determine whether all tasks have finished.

This is all we need to make various decisions in the client process. The previous example can now be rewritten as:

defmodule TaskRunnerClient do
  def run do
    :random.seed(:os.timestamp)
    Task.Supervisor.start_link(name: :task_supervisor)

    1..10
    |> Enum.map(fn(_) -> :random.uniform(1000) - 500 end)
    |> Enum.map(&{:task_supervisor, {Computation, :run, [&1]}})
    |> TaskRunner.run(400)
    |> handle_messages(Aggregator.new)
  end


  defp handle_messages(runner, aggregator) do
    if TaskRunner.done?(runner) do
      {:ok, Aggregator.value(aggregator)}
    else
      receive do
        msg ->
          case TaskRunner.handle_message(runner, msg) do
            nil -> handle_messages(runner, aggregator)

            {{:ok, result}, runner} ->
              handle_messages(runner, Aggregator.add_result(aggregator, result))

            {{:task_error, _reason}, runner} ->
              handle_messages(runner, aggregator)

            {:timeout, _runner} ->
              {:timeout, Aggregator.value(aggregator)}
          end
      end
    end
  end
end

This is less verbose than the previous version, and the receive loop is now focused only on handling of success, error, and timeout, without worrying how these situations are detected.

The code is still more involved than the simple async/await pattern, but it offers more flexibility. You can support various scenarios, such as stopping on first success or reporting the timeout back to the user while letting the tasks finish their jobs. If this flexibility is not important for your particular scenarios, then this approach is overkill, and async/await should do just fine.

I will not describe the implementation of TaskRunner as it is mostly a refactoring of the code from SupervisedTask. You're advised to try and implement it yourself as an exercise. A basic (definitely not complete or tested) take can be found here.

Parting words

While this article focuses on tasks, in a sense they serve more as an example to illustrate concurrent thinking in Erlang.

Stepping away from Task.await and receiving messages manually allowed the master process to be more flexible. Avoiding links between the master and the tasks decoupled their lifecycles and gave us better error isolation. Using monitors made it possible to detect failures and perform some special handling. Pushing everything to a helper module, without implementing a dedicated behaviour, gave us generic code that can be used in different types of processes.

These are, in my opinion, the more important takeaways of this article. In the future the Elixir team may introduce additional support for tasks which will make most of these techniques unnecessary. But the underlying reasoning should be applicable in many other situations, not necessarily task-related.