Implement a bounded-buffer producer-consumer using JCSP's message passing.
The final version of the solution is shown below, with all the intermediate stages incorporated.
The first class shown here is the producer. This is fairly straightforward, generating 100 random numbers
then sending -1
to indicate that the producer is finished.
import jcsp.lang.*; /** Producer class: produces 100 random integers and sends them on * output channel, then sends -1 and terminates. * The random integers are in a given range [start...start+100) */ public class Producer2 implements CSProcess { private One2OneChannelInt channel; private int start; public Producer2 (final One2OneChannelInt out, int start) { channel = out; this.start = start; } // constructor public void run () { int item; for (int k = 0; k < 100; k++) { item = (int)(Math.random()*100)+1+start; channel.write(item); } // for channel.write(-1); System.out.println("Producer" + start + " ended."); } // run } // class Producer2
The consumer is a little more complicated. This needs to signal to the buffer process that it
is ready to read an item. This is due to the fact that the JCSP "alternative" mechanism only works
with "input" channels — there is no direct way of telling whether an output channel is ready to be read.
The consumer "signals" the buffer that it is ready-to-read by writing to a second channel,
called req
in the class below. The buffer process then uses this "request" channel with a guarded
alternative construct in order to tell when the consumers are ready to read/consume more data.
import jcsp.lang.*; /** Consumer class: reads ints from input channel, displays them, then * terminates when a negative value is read. */ public class Consumer2 implements CSProcess { private One2OneChannelInt in; private One2OneChannelInt req; public Consumer2 (final One2OneChannelInt req, final One2OneChannelInt in) { this.req = req; this.in = in; } // constructor public void run () { int item; while (true) { req.write(0); // Request data - blocks until data is available item = in.read(); if (item < 0) break; System.out.println(item); } // for System.out.println("Consumer ended."); } // run } // class Consumer2
The buffer is the most complex part of the system. Note how it uses an Alternative
construct with the input channels (from the two producers, and the "request" channels from the two consumers)
to respond to whichever of the four processes is ready (assuming that space and/or data is available).
Note too how the "clean termination" is handled, by counting down the number of negative values read from the producers and written to the consumers.
There are a number of other possible solutions to this, and over the years some people have come up
with some
very ingenious methods using the "select" methods of the Alternative
class which
take a boolean array of preconditions.
Note that using the fairSelect()
method with a Skip
is somewhat
problematic, as it ensures that the skip option gets a fair chance (it is always "ready") even
when a producer or consumer may be ready. This is not optimally efficient. Likewise,
using a priSelect
is not a good idea here, as it will prioritise either the producers
or the consumers, depending on how you have ordered your guard array.
import jcsp.lang.*; /** Buffer class: Manages communication between Producer2 * and Consumer2 classes. */ public class Buffer implements CSProcess { private One2OneChannelInt[] in; // Input from Producer private One2OneChannelInt[] req; // Request for data from Consumer private One2OneChannelInt[] out; // Output to Consumer // The buffer itself private int[] buffer = new int[10]; // Subscripts for buffer int hd = -1; int tl = -1; public Buffer (final One2OneChannelInt[] in, final One2OneChannelInt[] req, final One2OneChannelInt[] out) { this.in = in; this.req = req; this.out = out; } // constructor public void run () { final Guard[] guards = { in[0], in[1], req[0], req[1] }; final Alternative alt = new Alternative(guards); int countdown = 4; // Number of processes running while (countdown > 0) { int index = alt.select(); switch (index) { case 0: case 1: // A Producer is ready to send if (hd < tl + 11) // Space available { int item = in[index].read(); if (item < 0) countdown--; else { hd++; buffer[hd%buffer.length] = item; } } break; case 2: case 3: // A Consumer is ready to read if (tl < hd) // Item(s) available { req[index-2].read(); // Read and discard request tl++; int item = buffer[tl%buffer.length]; out[index-2].write(item); } else if (countdown <= 2) // Signal consumer to end { req[index-2].read(); // Read and discard request out[index-2].write(-1); // Signal end countdown--; } break; } // switch } // while System.out.println("Buffer ended."); } // run } // class Buffer
Finally, the main program is relatively straightforward, simply needing to create the necessary processes and channels and connect them all together appropriately.
import jcsp.lang.*; /** Main program class for Producer/Consumer example. * Sets up channels, creates processes then * executes them in parallel, using JCSP. */ public final class PCMain2 { public static void main (String[] args) { new PCMain2(); } // main public PCMain2 () { // Create channel objects final One2OneChannelInt[] prodChan = { new One2OneChannelInt(), new One2OneChannelInt() }; // Producers final One2OneChannelInt[] consReq = { new One2OneChannelInt(), new One2OneChannelInt() }; // Consumer requests final One2OneChannelInt[] consChan = { new One2OneChannelInt(), new One2OneChannelInt() }; // Consumer data // Create parallel construct CSProcess[] procList = { new Producer2(prodChan[0], 0), new Producer2(prodChan[1], 100), new Buffer(prodChan, consReq, consChan), new Consumer2(consReq[0], consChan[0]), new Consumer2(consReq[1], consChan[1]) }; // Processes Parallel par = new Parallel(procList); // PAR construct par.run(); // Execute processes in parallel } // PCMain constructor } // class PCMain2
Diagrammatically this looks like the following.