The low-level features necessary to perform distributed parallelism in
Magma are sockets and I/O multiplexing; these have been in place for a
considerable time. However, using them effectively was often challenging
and fiddly, especially when trying to be robust in the face of network
errors. Among other issues, communicating anything but the simplest
data meant having to account for possible fragmentation of that data
and to manage its reassembly explicitly.
Previously two key features for simplifying distributed communication have
been introduced: object transmission and
asynchronous I/O. Together, these remove most or all of the complexity
in the communication component of a distributed parallel computation.
The range of objects that support transmission in this way continues
to be expanded as part of our ongoing efforts.
The next step up is to simplify the process of implementing distributed
parallel computations themselves. There are several ways this might be
done; for the V2.26 release the focus is on the manager/worker
model, with the goal of making such a system as easy as possible to
implement. Other models are under consideration, but are left for
the future.
In this model of distributed parallelism, there is one special process
designated the manager, and one or more other processes
designated workers.
The manager has a list of data defining work that needs to be done.
Workers contact the manager to register their availability to do work,
then settle into a cycle of performing tasks received from the manager
and reporting the results.
The manager allocates tasks to workers and collates the results, then
returns them to the user when the computation is finished.
It also needs to manage the pool of workers as a whole, handling the
addition and removal of workers from this pool as necessary.
We focus on a particular way of using this model. In this approach, a
contact socket is set up for the manager, and a list defining work tasks
is given to it. Then workers are started on various machines and told
to contact the manager (using the known socket information). Once all
of the work has been completed the workers exit and the manager reports
the results.
This particular approach is well-suited to the situation where the
entire computation is distributed, with a clearly defined set of work that
is known up-front. For instance, suppose one wants to compute some
data for each of a large number of objects and then save the results
to a file for later integration into a database. This is naturally
handled by setting up a manager with the list of those objects and
workers that know how to compute that data for a single object.
We have found this approach to be a good fit for many of our needs,
and the implementation of the manager/worker model to be a powerful
tool for this. However, it is not always the best approach to take;
consequently, we also show details of an implementation so that it can
be adjusted to meet different circumstances.
At the simplest level, the interface consists of two intrinsics:
DistributedManager and DistributedWorker.
A user wishing to use distributed parallelism first creates a
socket for the manager to use and then calls DistributedManager.
Then workers are started on the desired machines, with each worker calling
DistributedWorker.
The manager will allocate tasks to each worker and collect the results.
When all tasks are complete the DistributedManager
returns the collated results, and each call to
DistributedWorker exits.
DistributedManager(socket, items : parameters) : IO, SeqEnum ->
DistributedManager(socket, items : parameters) : IO, List ->
DistributedManager(socket, items : parameters) : IO, Process ->
The arguments for DistributedManager are a previously-created
server socket for communication purposes and a sequence (or list) of work
items. Each work item will be sent to a worker. The return value
is the sequence of results computed by the workers, in the order of
the corresponding work items.
There is also a version that takes a Process for cases where
it is not desirable to generate all inputs at once.
The parameters are explained later in the sections on Alternative Results
Handling and Task Group Management.
DistributedWorker(host, port, work_function) : MonStgElt, RngIntElt, UserProgram ->
The arguments to DistributedWorker are the host and port
information for the manager's socket, and a function that takes a work
item as input and returns the appropriate result.
For example purposes we will use a setup with four machines, named
circle, triangle, square, and star. The manager will run on circle,
while one worker will run on each of triangle, square, and star.
We will use port 10000 on circle for communication.
The workload in question will be some very basic information about
the Collatz map.
This map acts on a positive integer n to produce another positive
integer, as follows: If n is even, then the result is n/2; if
n is odd, then the result is 3n + 1.
It is widely believed that repeated iteration of the Collatz map
will always eventually produce the number 1, regardless of the
original starting number, but a proof has yet to be found.
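For instance, starting from n = 3 the iterates are 10, 5, 16, 8, 4, 2, 1:
seven applications of the map, with a largest encountered value of 16.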
Workers will be given a number n and return a tuple containing the
original value of n, the number of iterations of the Collatz map
required to reach 1, and the largest value that was encountered in the
process.
The manager will compute this data for each of the first 10 integers.
The worker code is the following, which we put in a file
"collatz_worker.m"
to make it easier to spawn workers. We will use this worker code
unchanged in the basic examples.
host := "circle";
port := 10000;
procedure sleep_ms(ms) // "sleeps" for the given number of milliseconds
_ := WaitForIO([] : TimeLimit := ms);
end procedure;
collatz := func<n | IsEven(n) select n div 2 else 3*n + 1>;
function collatz_info(n)
k := n;
niters := 0;
maxval := k;
while k gt 1 do
k := collatz(k);
niters +:= 1;
maxval := Max(maxval, k);
end while;
sleep_ms(100*Random(1, 3)); // random delay for example purposes only
return <n, niters, maxval>;
end function;
DistributedWorker(host, port, collatz_info);
quit;
The implementation is straightforward except for the call to
sleep_ms; that line causes the work function to delay
for one of 100ms, 200ms, or 300ms. This is done purely for example
purposes as the workload is so small that the first worker would
likely accept and compute the data for all tasks before the second
worker could even be started. Obviously actual workloads should not
have such a delay.
The manager session (from a Magma launched on circle) might look
like this:
> socket := Socket(: LocalHost := "circle", LocalPort := 10000);
> DistributedManager(socket, [1..10]);
Now this Magma is waiting for workers. We start them on the other
machines with the following command:
magma collatz_worker.m
A short while after doing the above, the workers on each machine exit
and the call to DistributedManager returns:
[ <1, 0, 1>, <2, 1, 2>, <3, 7, 16>, <4, 2, 4>, <5, 5, 16>,
<6, 8, 16>, <7, 16, 52>, <8, 3, 8>, <9, 19, 52>, <10, 6, 16> ]
Two verbose flags are available for the manager's use. These provide
a little information about the workers, and can be useful in order
to track down issues, to reassure yourself that things are actually
happening, or to log partial progress in case of a crash.
Setting this verbose flag to 1 will cause a message to be printed
each time a worker joins or leaves. This message includes the
worker's local host and port.
Setting this verbose flag to 2 will additionally cause a message to
be printed each time a worker is assigned a task or reports the
results of an assigned task. The task data (and result) will also
be shown.
Setting this verbose flag to 1 will add timestamps to the other
verbose output.
We run the basic example again, but with verbose output on.
> SetVerbose("ManagerWorker", 2);
> socket := Socket(: LocalHost := "circle", LocalPort := 10000);
> DistributedManager(socket, [1..10]);
New worker joined from triangle:14342
Worker triangle:14342 assigned task 1
Worker triangle:14342 completed task 1 with result <1, 0, 1>
Group 1 complete, terminating 0 other workers
Worker triangle:14342 assigned task 2
New worker joined from square:12711
Worker square:12711 assigned task 3
Worker triangle:14342 completed task 2 with result <2, 1, 2>
Group 2 complete, terminating 0 other workers
Worker triangle:14342 assigned task 4
Worker triangle:14342 completed task 3 with result <3, 7, 16>
Group 3 complete, terminating 0 other workers
Worker square:12711 assigned task 5
Worker triangle:14342 completed task 4 with result <4, 2, 4>
Group 4 complete, terminating 0 other workers
Worker triangle:14342 assigned task 6
New worker joined from star:17338
Worker star:17338 assigned task 7
Worker star:17338 completed task 7 with result <7, 16, 52>
Group 7 complete, terminating 0 other workers
Worker star:17338 assigned task 8
Worker square:12711 completed task 5 with result <5, 5, 16>
Group 5 complete, terminating 0 other workers
Worker square:12711 assigned task 9
Worker triangle:14342 completed task 6 with result <6, 8, 16>
Group 6 complete, terminating 0 other workers
Worker triangle:14342 assigned task 10
Worker square:12711 completed task 9 with result <9, 19, 52>
Group 9 complete, terminating 0 other workers
Worker star:17338 completed task 8 with result <8, 3, 8>
Group 8 complete, terminating 0 other workers
Worker triangle:14342 completed task 10 with result <10, 6, 16>
Group 10 complete, terminating 0 other workers
[ <1, 0, 1>, <2, 1, 2>, <3, 7, 16>, <4, 2, 4>, <5, 5, 16>,
<6, 8, 16>, <7, 16, 52>, <8, 3, 8>, <9, 19, 52>, <10, 6, 16> ]
Here workers are identified by their host and local port number.
The "task n" components are not the task number, but the task data;
in this example they happen to be the same. The meaning of the groups
and worker termination messages is explained later in the section on
group tasks.
The simple approach described above works for many cases, but there will
be other situations where it does not suffice. One common example is
when the results are incompatible elements that cannot be put into the
same sequence. To handle such situations, DistributedManager
has three parameters related to results that may be set.
var initial_results: Any Default: [ ]
var update_results: UserProgram Default:
var process_results: UserProgram Default:
The parameter
initial_results provides the initial value to use for the
results of the routine. If not set, this defaults to the empty
sequence. If it is instead set to the empty list ([* *]) then
the results will be returned in a list rather than a sequence.
The parameter
update_results is a procedure that modifies the results
value to incorporate the result that has been computed for a
particular input. It must have one of two forms:
- procedure(~results, result, index)
or
- procedure(~results, result, index, taskdata)
The arguments are:
- results is the current value of the results information,
which is initially set to initial_results.
- result is the new result to incorporate into the results.
- index is the index of the item (in the items array passed to
DistributedManager) that this result is for.
- taskdata (in the second form) is the specific task data used
to compute this result. This will be the original input item unless task
groups are used (see later).
The parameter
process_results is a procedure that modifies the results
information after it has all been computed and before returning it
to the user. It must have the form:
- procedure(~results)
This is completely equivalent to a similar call on the results
after delivery. Its uses are thus limited, but it is occasionally
convenient to do things this way.
These parameters are all optional, except that update_results
must be provided if initial_results is set to
something other than the empty sequence or the empty list.
We run the basic example again, but with list results.
> socket := Socket(: LocalHost := "circle", LocalPort := 10000);
> DistributedManager(socket, [1..10] : initial_results := [* *]);
[* <1, 0, 1>, <2, 1, 2>, <3, 7, 16>, <4, 2, 4>, <5, 5, 16>,
<6, 8, 16>, <7, 16, 52>, <8, 3, 8>, <9, 19, 52>, <10, 6, 16> *]
Suppose that, instead of the results for all items, we only wanted the
maximum number encountered (and the associated initial value that led
to it). Then we could change our results item to be a tuple of this
maximum value and input value, as follows:
> initial_results_maxval := < 0, 0 >;
> update_results_maxval := procedure(~results, result, index)
> n, niters, maxval := Explode(result);
> assert index eq n;
> if results[1] lt maxval then
> results := < maxval, n >;
> end if;
> end procedure;
>
> socket := Socket(: LocalHost := "circle", LocalPort := 10000);
> DistributedManager(socket, [1..10] :
> initial_results := initial_results_maxval,
> update_results := update_results_maxval
> );
< 52, 7 >
Of course, for such a small example we could have just examined the
results at the end to compute this, but for more interesting workloads
it may be undesirable to keep large outputs around when only a small
amount of data is wanted.
This time, we will only care about the average number of iterations
taken over all inputs. We thus accumulate the total number of iterations
during the computation, then process the result at the end.
> initial_results_niters := < 0, 0 >;
> update_results_niters := procedure(~results, result, index)
> n, niters, maxval := Explode(result);
> assert index eq n;
> results := < results[1] + niters, results[2] + 1 >;
> end procedure;
> process_results_niters := procedure(~results)
> results := results[1] / results[2];
> end procedure;
>
> socket := Socket(: LocalHost := "circle", LocalPort := 10000);
> DistributedManager(socket, [1..10] :
> initial_results := initial_results_niters,
> update_results := update_results_niters,
> process_results := process_results_niters
> );
67/10
While the options already presented work well for simple situations,
there are a few common patterns that arise sufficiently often that
special handling has been provided for them.
One such pattern is based on trying several different approaches for
an item, and taking whichever result finishes first.
This is particularly useful when you have a set of heuristics about
ways to solve the problem, but also want to cater for the cases where
those heuristics are not correct.
Indeed, it can be a powerful tool to aid in the development of such
heuristics.
A subcategory of this pattern is the ability to try a task with a
small bound, and then retry it with a higher bound if unsuccessful.
Another common approach is to split the work for a particular input
into several components, such as localisations at certain primes,
and then combine all of the factors together to produce the desired
result for that item.
While this could be handled in the existing framework by making separate
items for each local component, doing so can be less clear.
In order to handle the above usage patterns, and others, the notion
of task groups is introduced. With task groups, each input item can
be split into multiple tasks that are all associated with the original
item. The results of these subtasks can be combined, and once a
satisfactory result is computed any remaining workers still assigned
to the no-longer-necessary tasks of that group are caused to exit.
(This requires a means of restarting such workers, which we describe
below.)
There are three further parameters to DistributedManager
that provide functionality for task groups.
var group_tasks: UserProgram Default:
var update_group: UserProgram Default:
var incomplete_group: UserProgram Default:
The parameter
group_tasks is a function that takes an input item and
returns the initial data defining the associated task group. It
must have the following form:
- function(item) -> group_result, task_data
The first return value is an initial value for the group result (in
a similar way to how the parameter initial_results provides
an initial value for the overall results). The second return value is
a sequence, list, or other iterable aggregate of task data that will
be passed to workers.
The parameter
update_group is a function that will take the result of a
task and use it to adjust the group result (in a similar manner to
how update_results updates the overall results). It must
have the following form:
- function(item, task, task_result, group_result) -> group_result, task_data, done
The arguments are:
- item is the original input item with which this task group is
associated.
- task is the data for the specific task within that group that
was just completed.
- task_result is the result of that computation.
- group_result is the current partial result for the task
group.
The first return value is the new value for the group result that
incorporates the result of this task.
The second return value is a sequence, list, etc. of task data to be
used for more tasks.
The third return value is a boolean status indicating whether the
computation for this group is now complete.
The parameter
incomplete_group is a function used to handle groups which
ran out of associated tasks without reaching a definitive conclusion,
i.e., for which the third value returned by update_group was
always false.
It must be a function with the following form:
- function(item, group_result) -> result
The first argument is the original input item that the task group is
associated with, while the second is the partial group result that was
computed for it.
It returns a result value to include in the results (and processed by
the update_results routine, if one is given).
Note that update_group may produce further tasks to try;
this allows first trying a simple approach, and then spawning more
complicated (or time-consuming) approaches if that is unsuccessful.
All of these parameters are optional.
We will treat each input item as a shorthand expressing a range of
one hundred values of interest (item n indicating the range
100n - 99 .. 100n).
For a given range, we return the largest value encountered and the
associated starting value, as in variation one of the basic example.
> group_tasks_hundred := function(n)
> return < 0, <0, 0> >, [ 100*n - 99 .. 100*n ];
> end function;
>
> update_group_maxval := function(item, task, tresult, gresult)
> n, niters, maxval := Explode(tresult);
> gcount, gtuple := Explode(gresult);
> if gtuple[1] lt maxval then
> gtuple := < maxval, n >;
> end if;
> gcount +:= 1;
> done := gcount eq 100;
> gresult := done select gtuple else < gcount, gtuple >;
> return gresult, [], done;
> end function;
>
> socket := Socket(: LocalHost := "circle", LocalPort := 10000);
> DistributedManager(socket, [1..10] :
> group_tasks := group_tasks_hundred,
> update_group := update_group_maxval
> );
[ <9232, 27>, <9232, 103>, <13120, 255>, <13120, 383>, <39364, 447>,
<39364, 511>, <41524, 639>, <250504, 703>, <190996, 871>,
<250504, 937> ]
Note: If running this example, we recommend adjusting the millisecond delay
in the worker function down by using a multiplier of 10 instead of 100.
A key point to note in this example is that our partial group result
includes an extra value (gcount in the code) that is not in the
final result. This is so we can tell when the computation is actually
complete; since some tasks finish earlier than others, we cannot just
check the task data in order to decide whether the group computation is
complete or not.
Of course, we then have to remove it from the returned group result
if the computation is complete. This is done in the second-to-last
line of that function, where we conditionally assemble the new group result.
We could have left the count in and removed it with a suitable
update_results routine (or even with process_results),
but in this instance it felt tidier to do it in the group update function.
The original example generated all tasks in the range at the beginning,
and no new ones after that. We demonstrate another alternative by
making each task in the group generate the next one, until the group
is complete.
> group_tasks_hundred := function(n)
> return < 0, 0 >, [ 100*n - 99 ];
> end function;
>
> update_group_maxval := function(item, task, tresult, gresult)
> n, niters, maxval := Explode(tresult);
> assert task eq n;
> if gresult[1] lt maxval then
> gresult := < maxval, n >;
> end if;
> done := IsDivisibleBy(n, 100);
> tasks := done select [] else [ task + 1 ];
> return gresult, tasks, done;
> end function;
>
> socket := Socket(: LocalHost := "circle", LocalPort := 10000);
> DistributedManager(socket, [1..10] :
> group_tasks := group_tasks_hundred,
> update_group := update_group_maxval
> );
[ <9232, 27>, <9232, 103>, <13120, 255>, <13120, 383>, <39364, 447>,
<39364, 511>, <41524, 639>, <250504, 703>, <190996, 871>,
<250504, 937> ]
Here there is only ever one task in the group at a time; when that
task completes it either spawns the next one (by returning the increased
task number in a sequence) or finishes if this was the last task in
the group.
In this scenario we can use the task data instead of needing a
gcount value or similar, because we have guaranteed the order
in which tasks within the group run.
However, note that by using this approach we have also greatly reduced
the amount of parallelism available --- there is at most one task
associated with a group at any time, so if we had more workers than
task groups some of the workers would be idle.
Still, it is sometimes useful to do this; an example might be a search
for "small" points on a curve where the nature of the two-dimensional
search can make this better than an incremental approach.
The result of a group could become known while there are still workers
processing (no longer needed) tasks in that group.
When that happens, it would clearly be desirable if the manager could
tell those workers to stop those tasks and work on new ones instead.
Unfortunately, this is not straightforward; the worker might be deep
in the throes of a long computation that cannot be interrupted, or at
least not without causing the worker process to exit.
Instead, a less sophisticated approach is used: The manager will close
its communication port with such workers, and when they finally try to
report a result they will fail and so exit. Whatever spawned the worker
originally can then detect this abnormal exit and spawn a new worker to
replace it.
A variation of the DistributedWorker intrinsic is provided
for these situations:
MonitoredDistributedWorker(host, port, work_function) : MonStgElt, RngIntElt, UserProgram ->
This procedure behaves the same as
DistributedWorker, except that it expects that it may be
unexpectedly terminated and uses exit code statuses to communicate
with the calling environment.
Additionally, on non-Windows platforms, a monitored worker will notice
when the communication channel closes and exit immediately, instead of
having to finish whatever computation it is working on.
The special exit status of 5 is used to indicate that a worker should
not be respawned.
The recommended procedure for spawning monitored workers, then, is to
have a loop that creates a worker and waits for it to exit. The loop
terminates if the exit status is 5, otherwise it continues.
Using a standard UNIX shell, the following code would do:
while :
do
    magma collatz_worker.m
    [ $? -eq 5 ] && break
done
In this example we shall not require the best answer (the largest
possible value encountered in the range), but simply one that is
"large enough". More precisely, if the upper limit of our range
is 100n then we shall be satisfied with any answer that is at
least 6000n. If there is no such value, we will return false.
> group_tasks_hundred := function(n)
> return < 0, 0 >, [ 100*n - 99 .. 100*n ];
> end function;
>
> update_group_maxval := function(item, task, tresult, gresult)
> n, niters, maxval := Explode(tresult);
> if maxval ge 6000*item then
> return < maxval, n >, [], true;
> else
> return gresult, [], false;
> end if;
> end function;
>
> incomplete_group_maxval := function(item, gresult)
> return false;
> end function;
>
> socket := Socket(: LocalHost := "circle", LocalPort := 10000);
> DistributedManager(socket, [1..10] :
> initial_results := [* *],
> group_tasks := group_tasks_hundred,
> update_group := update_group_maxval,
> incomplete_group := incomplete_group_maxval
> );
[* <9232, 27>, false, false, false, <39364, 447>, <39364, 511>,
false, <250504, 703>, <190996, 871>, <250504, 937> *]
> delete socket; // ensure that workers terminate
Note the use of the incomplete group handling here; in particular, how
using it enables the group updating code to be quite simple. We could
also have used this approach in the original advanced example and thereby
avoided the need to track counts, but for expository purposes chose
not to.
Also note the explicit deletion of the socket at the end --- this ensures
that the workers will terminate promptly. It would be possible to keep
the socket around and re-use it for multiple runs instead, as long as
the workers were doing the same kind of work.
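For instance, a manager session might reuse one socket for two batches of
the same kind of work before deleting it (a sketch assuming the workers
stay connected between runs):
> socket := Socket(: LocalHost := "circle", LocalPort := 10000);
> results1 := DistributedManager(socket, [1..10]);
> results2 := DistributedManager(socket, [11..20]);
> delete socket; // only now do the workers terminate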
We show a stand-alone implementation of the manager/worker model for a
particular workload.
This will demonstrate the usage of both the asynchronous I/O and object
transmission facilities provided by Magma.
For more complete details about these facilities, see Chapter INPUT AND OUTPUT,
particularly the section on asynchronous I/O
(Section Asynchronous I/O).
Of course, the DistributedManager and
DistributedWorker intrinsics are more convenient to use
(and more powerful) than the example shown here.
However, this example will serve to illustrate the relevant features so
that users can implement alternative models of distributed parallelism
as needed.
The foundational concepts and the associated intrinsics are:
1. Protocol version negotiation: ExchangeVersions.
2. Object transmission: ReadObject, WriteObject, and similar.
These are synchronous operations: The intrinsics return only after
the corresponding I/O operation has completed.
3. Asynchronous I/O: AsyncReadObject, AsyncWriteObject, and similar.
These operations are asynchronous: The intrinsics return immediately,
while the corresponding I/O operation may occur later.
NB: For those I/O operations to proceed, the associated channel must be
waited for using WaitForIO.
4. Management of multiple I/O streams: WaitForIO.
This routine exchanges information about the version of the object
transmission protocol understood by each side of the channel.
This allows newer versions of Magma to still communicate correctly with
older versions, using a commonly understood format.
Each side of the communication channel should call this before performing
any I/O on the channel.
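As a minimal sketch (the host and port are placeholders), each endpoint
of a newly connected channel would begin like this before any object
transmission:
> S := Socket("circle", 10000); // or the socket from WaitForConnection
> ExchangeVersions(S);
> WriteObject(S, [1, 2, 3]); // object I/O is now safe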
Reads Magma Object Format data from I and returns the corresponding
Magma object.
Writes the Magma object x to I, using the Magma Object Format.
Queues an asynchronous read request of a Magma object to I.
Queues an asynchronous write request of x to I.
TimeLimit: RngIntElt Default: ∞
Given a sequence of I/O objects R, returns the sequence of those elements
of R which are ready to have read operations performed upon them.
If no elements of R are ready then this intrinsic will wait until at
least one does become ready, or until the specified time limit has elapsed,
whichever comes first.
The time limit is measured in milliseconds.
Server sockets will be considered ready for reading when a connection
attempt has been made (and so a call to WaitForConnection will
return without delay).
Other channels will be considered ready for reading if either a
pending (asynchronous) read request has been fulfilled, or there are no
pending read requests and some data is available for reading.
TimeLimit: RngIntElt Default: ∞
Given sequences of I/O objects R and W, returns two sequences:
the sequence of those elements of R which are ready to have read
operations performed upon them, and
the sequence of those elements of W which are ready to have write
operations performed upon them.
If no elements of R or W are ready then this intrinsic will wait until
at least one does become ready, or until the specified time limit has
elapsed, whichever comes first.
The time limit is measured in milliseconds.
The condition for channels to be considered ready for reading are as
described in the previous intrinsic.
A channel will be considered ready for writing if all pending (asynchronous)
write requests have been fulfilled and it would be possible to write some
data without delay.
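As a sketch, with R and W standing for previously created sequences of
channels, a caller might poll both directions with a one-second limit:
> ready_reads, ready_writes := WaitForIO(R, W : TimeLimit := 1000);
> // ready_reads and ready_writes are subsequences of R and W;
> // both being empty means the time limit elapsed first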
As in earlier sections, the workload will be based on computing information
about the Collatz map.
Given an integer n, we iterate the Collatz map until we reach 1;
the number of iterations taken and the maximum intermediate value will be
returned to the manager.
For the sake of simplicity we assume that 1 will always be reached.
The worker side of the computation involves a work function to actually
do the work and a simple wrapper around it that interacts with the manager.
The work function will take whatever object the manager sends, interpret it,
do the computation, and return a single object as the result.
Since we have multiple values to return, we combine them into a single
tuple.
A small random time delay is inserted in each computation for the purposes
of this example so that the effects of multiple workers can be seen;
real workloads would not include such a delay.
> collatz := function(n)
> orign := n;
> niters := 0;
> maxval := n;
> while n gt 1 do
> n := IsEven(n) select n div 2 else 3*n + 1;
> niters +:= 1;
> maxval := Max(maxval, n);
> end while;
> // delay by 100, 200, or 300 milliseconds:
> _ := WaitForIO([] : TimeLimit := 100*Random(1,3));
> return <orign, niters, maxval>;
> end function;
The wrapper for the worker takes a host and port to contact the manager on,
and is a simple work-processing loop thereafter.
It uses synchronous I/O calls throughout, as the only I/O it could be
waiting for involves the server.
> worker := procedure(host, port)
> S := Socket(host, port);
> ExchangeVersions(S);
> while true do
> ok, data := ReadObjectCheck(S);
> if not ok then break; end if;
> result := collatz(data);
> WriteObject(S, result);
> end while;
> end procedure;
First we create the communication channel and then immediately exchange
versions with the server.
This ensures that the program will work even if the manager and workers
are using different versions of Magma.
Then we loop on the work, despatching it and returning the results;
using object I/O avoids any concerns about incomplete I/O operations.
We use ReadObjectCheck in order to gracefully exit on EOF, which
we will receive when the server has no more work to allocate.
Now for the manager side of things.
The manager will use asynchronous I/O calls in order to remain responsive.
For the purposes of this example we will print output about the workers,
so we start with a simple function to produce an identifier for each
communication channel.
> channelid := func<I | t where _,t := SocketInformation(I)>;
The manager takes a sequence of work items as its argument (in this
example, the work items are simply integers).
It would also be reasonable for it to take the host and port information
that it should listen on (since the workers will need to know this),
but we instead let the system select those values and print them out.
> manager := procedure(workleft)
> server := Socket();
> host, port := Explode(SocketInformation(server));
> printf "Manager listening on port %o of %o\n", port, host;
> busy := { };
> while #workleft gt 0 or #busy gt 0 do
> ready := WaitForIO([server] cat Setseq(busy));
> idle := [ ]; // workers awaiting work
> for I in ready do
> if I eq server then // new worker has connected
> W := WaitForConnection(I);
> printf "New worker %o joined\n", channelid(W);
> AsyncExchangeVersions(W);
> Append(~idle, W);
> else
> result := ReadObject(I);
> n, count, maxv := Explode(result);
> printf "%o: %o reached 1 (max %o) after %o iterations\n",
> channelid(I), n, maxv, count;
> Exclude(~busy, I);
> Append(~idle, I);
> end if;
> end for;
> for W in idle do
> if #workleft eq 0 then break; end if;
> data := workleft[1];
> Remove(~workleft, 1);
> AsyncWriteObject(W, data);
> AsyncReadObject(W);
> Include(~busy, W);
> end for;
> end while;
> end procedure;
First the manager creates the server socket for workers to connect to; it
prints out the associated information so that we know what arguments to give
when we start the workers.
During the computation, the set busy will keep track of those
channels associated with workers that currently have work assigned to them,
and the sequence idle will track those workers which are waiting to
have work assigned to them.
The manager is done only when all work has been assigned, and
all assigned work has been completed.
We wait for new workers to connect (on the server socket) and for completed
work to be reported by the busy workers.
A subtle point in the above is that we used the version of WaitForIO
that only waits for read events.
This is the typical case --- the manager queues both the write and the read,
and the worker will only send results after having read the data.
Thus only the completion of the read event is of interest to the manager.
After a new worker joins, the version exchange is queued immediately,
since we will be sending and receiving objects.
Note the use of the asynchronous AsyncExchangeVersions
for this, in order to keep the manager responsive.
The worker is then added to the idle list.
When a worker has completed its task and the result is ready, we perform
the synchronous ReadObject to get this result.
This is safe (that is, will not delay arbitrarily) since the manager
earlier issued the corresponding asynchronous request AsyncReadObject
and the channel becoming ready for reading indicates that this request has
completed.
We then mark the worker as idle (and no longer busy).
Finally, for each idle worker we attempt to (asynchronously) assign
remaining work to it.
We queue both the write and the following read, and mark the worker as busy.
We now demonstrate the code in action with two workers.
First we start the manager in order to get the channel information for
the workers to use.
> // manager
> manager([1..10]);
Manager listening on port 46119 of 0.0.0.0
Then we launch the workers (in this case, on the same machine).
> // worker 1
> worker("localhost", 46119);
> // worker 2
> worker("localhost", 46119);
And the manager displays the progress of the computation for us.
Note that without the introduced delay the first worker would probably
have performed all the computation before we could launch the second one.
New worker <"127.0.0.1", 43352> joined
<"127.0.0.1", 43352>: 1 reached 1 (max 1) after 0 iterations
New worker <"127.0.0.1", 43354> joined
<"127.0.0.1", 43354>: 3 reached 1 (max 16) after 7 iterations
<"127.0.0.1", 43352>: 2 reached 1 (max 2) after 1 iterations
<"127.0.0.1", 43352>: 5 reached 1 (max 16) after 5 iterations
<"127.0.0.1", 43354>: 4 reached 1 (max 4) after 2 iterations
<"127.0.0.1", 43354>: 7 reached 1 (max 52) after 16 iterations
<"127.0.0.1", 43352>: 6 reached 1 (max 16) after 8 iterations
<"127.0.0.1", 43354>: 8 reached 1 (max 8) after 3 iterations
<"127.0.0.1", 43352>: 9 reached 1 (max 52) after 19 iterations
<"127.0.0.1", 43354>: 10 reached 1 (max 16) after 6 iterations
A manager cannot tell which user created the worker that is connecting to it.
(Indeed, the very concept is not well-defined if the worker is running
on another machine.)
Thus it is possible for someone on another machine on the same network
to connect to "your" manager, whether by accident or otherwise, and
this cannot be directly distinguished from an intended connection from
a worker under your control.
Problems may then arise if this worker does not interact in the expected
way with the manager.
If you are reasonably sure that no-one else with access to your machine
or its network will act in such a way, then you can simply use the
manager/worker functions as described above.
If your machine is accessible to a wider network, however, then
some kind of authentication process is probably desirable.
A simple method is to require that client connections first send some
data known only to the server, and have the server reject connections
which do not do so.
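As a rough sketch of such a check (the shared token is illustrative, and
the assumption that deleting a socket closes the underlying channel should
be verified against the handbook), the two sides might proceed as follows.
> secret := "some-shared-token"; // known only to manager and workers
> // Worker side: after the version exchange, send the token first.
> S := Socket(host, port);
> ExchangeVersions(S);
> WriteObject(S, secret);
> // Manager side: the first object on a new connection must be the token.
> W := WaitForConnection(server);
> ExchangeVersions(W);
> ok, tok := ReadObjectCheck(W);
> if not ok or tok ne secret then
>     delete W; // reject the connection
> end if;
In a responsive manager one would use the asynchronous variants
(AsyncExchangeVersions, AsyncReadObject) as in the code above, at the
cost of tracking which connections are still awaiting authentication.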
Unfortunately, such behaviour is not currently integrated with the
manager/worker routines described here, and so they cannot be used
when authentication is a requirement.
The code in Section Implementing Distributed Parallelism can be used as a base
for re-implementing manager/worker functionality with authentication
added.
A more robust and convenient version of authentication is being worked
upon, and will appear in a later patch release.