Some care is needed when using multiple communication channels at once
in order to maintain responsiveness, since any individual I/O operation
might take arbitrarily long to complete.
(For instance, reading from a channel will not complete until the other
end of the channel has written the requisite data.)
This is particularly important when implementing distributed parallelism,
as processes which are waiting for I/O to occur are not doing useful work.
In particular, consider the manager/worker model of parallelism, where
a manager process assigns pieces of work to individual worker processes,
collects the results, and assigns new work, repeating until all of the
desired work has been completed.
If the manager waits for the results from a particular worker (i.e.,
performs a read operation on the associated channel) then it will do
nothing else until that worker has finished its task.
Other workers that finish their tasks in the meantime will not be given
more work until the first worker is finished, so they may sit idle for
a very long time.
One solution to this problem is for the manager not to wait unconditionally
on a specific worker, but instead to wait until at least one of the set of
workers has results to report, and then process those.
In this way the manager never spends time waiting for one worker while
another worker has results to report, and thus workers never spend
significant time waiting for the manager to receive their results and
give them new work to do.
Reinterpreting this approach in terms of I/O channels, the above indicates
that we want a way to wait until at least one of a set of channels is
ready for I/O to be performed on it.
(By "ready" we mean that an I/O operation of the appropriate type
will make at least some progress on the channel without further waiting.)
This is accomplished with the WaitForIO intrinsic, as described
below.
Note that, in principle, I/O operations may only partially complete before
waiting becomes necessary.
(For instance, network congestion may delay some network packets.)
Thus some delays might still arise when using this approach with I/O
operations that cannot partially complete (all writes; reads where the
byte count is specified; reads of objects).
This is rarely a serious issue in practice, but a more robust approach
is possible using asynchronous I/O, as described in the next section.
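The manager loop described above might be sketched as follows.
This is an illustrative outline only: the name manage and its arguments
workers (a sequence of already-connected channels) and tasks (a sequence
of work items) are hypothetical, and the sketch assumes that each worker
answers every task with exactly one object, exchanged using ReadObject
and WriteObject.

```magma
> // Hypothetical sketch: workers is a sequence of connected channels and
> // tasks is a sequence of work items; one reply object per task is assumed.
> manage := function(workers, tasks)
>     results := [* *];
>     k := 1;          // index of the next task to hand out
>     busy := {};      // workers that currently hold a task
>     for W in workers do
>         if k le #tasks then
>             WriteObject(W, tasks[k]);
>             k +:= 1;
>             Include(~busy, W);
>         end if;
>     end for;
>     while #busy gt 0 do
>         // wait for whichever workers report first, not for a specific one
>         for W in WaitForIO(Setseq(busy)) do
>             Append(~results, ReadObject(W));
>             if k le #tasks then
>                 WriteObject(W, tasks[k]);
>                 k +:= 1;
>             else
>                 Exclude(~busy, W);
>             end if;
>         end for;
>     end while;
>     return results;
> end function;
```

Results are collected in order of completion rather than in task order.
Since reads of objects cannot partially complete, a worker whose reply
has only partly arrived may still delay the manager briefly.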
WaitForIO(R)
TimeLimit: RngIntElt Default: ∞
Given a sequence of I/O objects R, returns the sequence of those elements
of R which are ready to have read operations performed upon them.
If no elements of R are ready then this intrinsic will wait until at
least one does become ready, or until the specified time limit has elapsed,
whichever comes first.
The time limit is measured in milliseconds.
Server sockets will be considered ready for reading when a connection
attempt has been made and so a call to WaitForConnection will
return without delay.
Other channels will be considered ready for reading if either a
pending read request has been fulfilled, or there are no
pending read requests and some data is available for reading.
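As a small illustration of the TimeLimit parameter, the following sketch
waits at most one second for any channel to become ready for reading.
The helper name poll and its argument channels are hypothetical, and the
sketch assumes that an empty sequence is returned when the time limit
elapses before any channel becomes ready.

```magma
> // Hypothetical helper: channels is a sequence of I/O objects.
> poll := function(channels)
>     // wait at most 1000 milliseconds for any channel to become readable
>     ready := WaitForIO(channels : TimeLimit := 1000);
>     if #ready eq 0 then
>         print "No channel became ready within 1000 ms";
>     end if;
>     return ready;
> end function;
```

A finite time limit of this kind lets the caller do other work
periodically instead of blocking indefinitely.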
WaitForIO(R, W)
TimeLimit: RngIntElt Default: ∞
Given sequences of I/O objects R and W, returns two sequences:
the sequence of those elements of R which are ready to have read
operations performed upon them, and
the sequence of those elements of W which are ready to have write
operations performed upon them.
If no elements of R or W are ready then this intrinsic will wait until
at least one does become ready, or until the specified time limit has
elapsed, whichever comes first.
The time limit is measured in milliseconds.
The conditions under which channels are considered ready for reading
are as described for the previous intrinsic.
Channels will be considered ready for writing if all pending write
requests have been fulfilled and it would be possible to write some
data without delay.
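The two-sequence form might be used as in the following sketch, which
reads from whichever channels have data and writes to whichever channels
can accept it.
The name service, the arguments R and W (sequences of already-connected
channels), the 500 millisecond limit and the "ping" message are all
hypothetical choices made for illustration.

```magma
> // Hypothetical helper: R and W are sequences of connected channels.
> service := procedure(R, W)
>     rready, wready := WaitForIO(R, W : TimeLimit := 500);
>     for C in rready do
>         ok, msg := ReadCheck(C);
>         if ok and not IsEof(msg) then
>             printf "received: %o", msg;
>         end if;
>     end for;
>     for C in wready do
>         Write(C, "ping\n");
>     end for;
> end procedure;
```

The loops simply do nothing for a returned sequence that has no elements,
which is what the sketch assumes happens when the time limit elapses first.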
We demonstrate the use of WaitForIO by writing an extremely simple
"chat" server. This will accept connections and read messages from
connected channels, forwarding those messages to all connected channels.
A simple ID scheme based on port numbers is implemented in order to
distinguish senders.
> chatserver := procedure(host, port)
>     server := Socket(: LocalHost := host, LocalPort := port);
>     chatters := {};
>     ids := AssociativeArray();
>     while true do
>         text := "";
>         ready := WaitForIO([server] cat Setseq(chatters));
>         for I in ready do
>             if I eq server then // new chatter
>                 C := WaitForConnection(server);
>                 // derive the ID from the port number of the new connection
>                 id := Sprintf("[%o]", r[2]) where _,r := SocketInformation(C);
>                 text cat:= Sprintf("%o has joined\n", id);
>                 ids[C] := id;
>                 Include(~chatters, C);
>             else
>                 ok, msg := ReadCheck(I);
>                 if not ok or IsEof(msg) then // error or EOF, assume they left
>                     text cat:= Sprintf("%o has left\n", ids[I]);
>                     Remove(~ids, I);
>                     Exclude(~chatters, I);
>                 else // all is well; add message to text to send
>                     text cat:= Sprintf("%o: %o", ids[I], msg);
>                 end if;
>             end if;
>         end for;
>         if #text gt 0 then
>             for I in chatters do
>                 Write(I, text);
>             end for;
>         end if;
>     end while;
> end procedure;
> chatserver("localhost", 7000);
After starting the chat server, we test it (not shown here) using the UNIX
telnet program to connect to the given host and port.
As desired, messages sent by connected telnet instances are seen by all
of the connected telnet instances.
Exercise:
Modify the chat server to allow a client to change its reported ID by
sending an appropriate message.
Suggested syntax: /name new-name.
Be careful with newlines.
Further exercise: Disallow changing ID to an already-used ID.
If that is attempted, send an error message to the relevant client only.