One difficulty that arises when coordinating multiple channels is
dealing with incomplete data.
Due to network delays or other factors, data written by one side may
arrive at the reader's side in several chunks with time delays between
them.
If we insist on reading all of the data, we must wait for it, and in the
meantime we become unresponsive to the other channels.
If we instead manage partial data and reassemble it ourselves, the I/O
handling becomes extremely complicated.
File objects are considered to be "fast" --- it is expected that I/O on
them will always complete immediately, without waiting.
This is not always true (for instance, with network-mounted filesystems),
but reflects an equivalent assumption in Linux.
Consequently, although file objects may be passed to WaitForIO, doing so
is rarely useful,
and asynchronous I/O operations on them will attempt to complete immediately.
Pipe objects are not properly handled yet, and attempting to perform
asynchronous I/O on them will produce an error.
When used with WaitForIO, pipes will incorrectly be treated as fast
channels, which may cause undesired blocking.
These issues will be fixed in an upcoming version of Magma.
We show an implementation of the manager/worker model that uses
asynchronous I/O and object transmission, greatly simplifying the I/O loop.
The workload chosen will be computing hailstone sequences.
Given an integer n, we iterate the Collatz map until we reach 1;
the number of iterations taken and the maximum intermediate value will be
returned to the manager.
For the sake of simplicity we assume that 1 will always be reached.
The worker side of the computation involves a work function to actually
do the work and a simple wrapper around it that interacts with the manager.
The work function will take whatever object the manager sends, interpret it,
do the computation, and return a single object as the result.
Since we have multiple values to return, we combine them into a single
tuple.
A small random time delay is inserted in each computation for the purposes
of this example so that the effects of multiple workers can be seen;
real workloads would not include such a delay.
> collatz := function(n)
>     orign := n;
>     niters := 0;
>     maxval := n;
>     while n gt 1 do
>         n := IsEven(n) select n div 2 else 3*n + 1;
>         niters +:= 1;
>         maxval := Max(maxval, n);
>     end while;
>     // delay by 100, 200, or 300 milliseconds:
>     _ := WaitForIO([] : TimeLimit := 100*Random(1,3));
>     return <orign, niters, maxval>;
> end function;
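As a quick sanity check, the work function can be called directly, with no
manager involved; each call pauses for the artificial delay before returning.
The two inputs below are chosen only for illustration, and the results agree
with the manager output shown at the end of this example.
> // 7 -> 22 -> 11 -> 34 -> 17 -> 52 -> ... -> 1 takes 16 steps
> collatz(7);
<7, 16, 52>
> // 9 -> 28 -> 14 -> 7, then continues as above: 3 + 16 = 19 steps
> collatz(9);
<9, 19, 52>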
The wrapper for the worker takes a host and port to contact the manager on,
and is a simple work-processing loop thereafter.
It uses synchronous I/O calls throughout, as the only I/O it could be
waiting for involves the server.
> worker := procedure(host, port)
>     S := Socket(host, port);
>     ExchangeVersions(S);
>     while true do
>         ok, data := ReadObjectCheck(S);
>         if not ok then break; end if;
>         result := collatz(data);
>         WriteObject(S, result);
>     end while;
> end procedure;
First we create the communication channel and then immediately exchange
versions with the server.
This ensures that the program will work even if the manager and workers
are using different versions of Magma.
Then we loop on the work, despatching it and returning the results;
using object I/O avoids any concerns about incomplete I/O operations.
We use ReadObjectCheck in order to exit gracefully on EOF.
Now for the manager side of things.
The manager will use asynchronous I/O calls in order to remain responsive.
For the purposes of this example we will print output about the workers,
so we start with a simple function to produce an identifier for each
communication channel.
> channelid := func<I | t where _,t := SocketInformation(I)>;
The manager takes a sequence of work items as its argument (in this
example, the work items are simply integers).
It would also be reasonable for it to take the host and port information
that it should listen on (since the workers will need to know this),
but we instead let the system select those values and print them out.
> manager := procedure(workleft)
>     server := Socket();
>     host, port := Explode(SocketInformation(server));
>     printf "Manager listening on port %o of %o\n", port, host;
>     busy := {};    // channels of workers with work assigned
>     while #workleft gt 0 or #busy gt 0 do
>         ready := WaitForIO([server] cat Setseq(busy));
>         idle := [];    // workers awaiting work
>         for I in ready do
>             if I eq server then    // new worker has connected
>                 W := WaitForConnection(I);
>                 printf "New worker %o joined\n", channelid(W);
>                 AsyncExchangeVersions(W);
>                 Append(~idle, W);
>             else
>                 result := ReadObject(I);
>                 n, count, maxv := Explode(result);
>                 printf "%o: %o reached 1 (max %o) after %o iterations\n",
>                     channelid(I), n, maxv, count;
>                 Exclude(~busy, I);
>                 Append(~idle, I);
>             end if;
>         end for;
>         for W in idle do
>             if #workleft eq 0 then break; end if;
>             data := workleft[1];
>             Remove(~workleft, 1);
>             AsyncWriteObject(W, data);
>             AsyncReadObject(W);
>             Include(~busy, W);
>         end for;
>     end while;
> end procedure;
First the manager creates the server socket for workers to connect to; it
prints out the associated information so that we know which arguments to
pass when starting the workers.
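If a fixed address is preferred, so that the workers' arguments are known
in advance, the server socket could be created with explicit values instead.
The following is only a sketch: it assumes that Socket accepts the LocalHost
and LocalPort parameters when creating a server socket, and the port number
is simply the one that appears in the sample session later in this example.
> // Hypothetical variant of the first line of manager: listen on a
> // caller-chosen address instead of letting the system select one
> // (assumes the LocalHost and LocalPort parameters of Socket)
> server := Socket( : LocalHost := "localhost", LocalPort := 46119);
> host, port := Explode(SocketInformation(server));
> printf "Manager listening on port %o of %o\n", port, host;
The rest of the manager would be unchanged; the chosen port must be free on
the machine for the socket to be created.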
During the computation, the set busy will keep track of those
channels associated with workers that currently have work assigned to them,
and the sequence idle will track those workers which are waiting to
have work assigned to them.
The manager is done only when all work has been assigned, and
all assigned work has been completed.
We wait for new workers to connect (on the server socket) and for completed
work to be reported by the busy workers.
A subtle point in the above is that we used the version of WaitForIO
that only waits for read events.
This is the typical case --- the manager queues both the write and the read,
and the worker will only send results after having read the data.
Thus only the completion of the read event is of interest to the manager.
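The TimeLimit parameter that collatz uses for its delay can also bound the
manager's wait, for example to report progress while all workers are busy.
The following is a minimal sketch of a replacement for the WaitForIO line
inside the manager's loop; it assumes that WaitForIO returns an empty
sequence when the limit expires with no channel ready, and the limit of 5000
milliseconds is an arbitrary choice.
> // Assumption: an empty result means that the time limit expired
> ready := WaitForIO([server] cat Setseq(busy) : TimeLimit := 5000);
> if #ready eq 0 then
>     printf "No activity: %o workers busy, %o items unassigned\n",
>         #busy, #workleft;
> end if;
When ready is empty, both of the loops that follow have nothing to process,
so the manager simply returns to waiting.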
After a new worker joins, version exchange is queued immediately,
since we will be sending and receiving objects.
The worker is then added to the idle list.
When a worker has completed its task and the result is ready, we perform
the synchronous ReadObject to get this result.
We then mark the worker as idle (and no longer busy).
Finally, for each idle worker we attempt to (asynchronously) assign
remaining work to it.
We queue both the write and the following read, and mark the worker as busy.
We now demonstrate the code in action with two workers.
First we start the manager in order to get the channel information for
the workers to use.
> // manager
> manager([1..10]);
Manager listening on port 46119 of 0.0.0.0
Then we launch the workers (in this case, on the same machine).
> // worker 1
> worker("localhost", 46119);
> // worker 2
> worker("localhost", 46119);
And the manager displays the progress of the computation for us.
Note that without the introduced delay the first worker would probably
have performed all the computation before we could launch the second one.
New worker <"127.0.0.1", 43352> joined
<"127.0.0.1", 43352>: 1 reached 1 (max 1) after 0 iterations
New worker <"127.0.0.1", 43354> joined
<"127.0.0.1", 43354>: 3 reached 1 (max 16) after 7 iterations
<"127.0.0.1", 43352>: 2 reached 1 (max 2) after 1 iterations
<"127.0.0.1", 43352>: 5 reached 1 (max 16) after 5 iterations
<"127.0.0.1", 43354>: 4 reached 1 (max 4) after 2 iterations
<"127.0.0.1", 43354>: 7 reached 1 (max 52) after 16 iterations
<"127.0.0.1", 43352>: 6 reached 1 (max 16) after 8 iterations
<"127.0.0.1", 43354>: 8 reached 1 (max 8) after 3 iterations
<"127.0.0.1", 43352>: 9 reached 1 (max 52) after 19 iterations
<"127.0.0.1", 43354>: 10 reached 1 (max 16) after 6 iterations