defmodule Counter do @doc """ This is the accumulator thread. All it does is wait to get a message. It handles the message and then restarts itself to wait for the next message. (Except when receiving an end message.) When the message is a new number store that in the value that is passed in the recursion. """ def loop(value) do receive do {:print} -> IO.puts "Current sum #{value}" loop(value) {:end, caller} -> #IO.puts "Got END" send(caller, {:final, value}) {:update, num, th_num} -> IO.puts("Got #{num} from thread #{th_num}") loop(value+num) end value end @doc """ This is the thing that is doing the work in the thread 4 parameters: 1 the number to count up to 2 the sum of the numbers 3 the place to report the final sum to 4 the "number" of the thread (just for giggles) """ def partt(0, tot, rcvr, th_num) do send(rcvr, {:update, tot, th_num}) if rem(th_num, 10)==0 do send(rcvr, {:print}) end end def partt(num, tot, rcvr, th_num) do partt(num-1, tot+num, rcvr, th_num) end @doc """ Do the work """ def mainThread(threads, times) do # start a thread to be the message receiver {:ok, rcvr} = Task.start_link(Counter, :loop, [0]) max_concurrency = System.schedulers_online() * 2 # Note that this line just sets up the stream, it does not actually start stream = Task.async_stream(1..threads, fn sv -> partt(times, 0, rcvr, sv) end , max_concurrency: max_concurrency, timeout: :infinity) # run and wait for completion Stream.run(stream) # Once the run completes send a message to the receiver thread # asking to print the sum of messages received send(rcvr, {:print}) # Tell the receiver thread to shut down send(rcvr, {:end, self()}) # finally wait to get a message from the receiver thread that it # really did shut down receive do {:final, aa} -> IO.puts("Got message back indicating shutdown #{aa}") end end end Counter.mainThread(100, 1000000000)