Computer Science Honours 2011

Distributed & Parallel Processing
Practical Two Solution

Implement a bounded-buffer producer-consumer using JCSP's message passing.

The final version of the solution is shown below, with all the intermediate stages incorporated.

The first class shown here is the producer.  This is fairly straightforward, generating 100 random numbers then sending -1 to indicate that the producer is finished.

import jcsp.lang.*;

/** Producer class: produces 100 random integers and sends them on
  * output channel, then sends -1 and terminates.
  * The random integers are in a given range [start...start+100)
  */
public class Producer2 implements CSProcess
  { private One2OneChannelInt channel;
    private int start;

    public Producer2 (final One2OneChannelInt out, int start)
      { channel = out;
        this.start = start;
      } // constructor

    public void run ()
      { int item;
        for (int k = 0; k < 100; k++)
          { item = (int)(Math.random()*100)+1+start;
            channel.write(item);
          } // for
        channel.write(-1);
        System.out.println("Producer" + start + " ended.");
      } // run

  } // class Producer2

The consumer is a little more complicated.  This needs to signal to the buffer process that it is ready to read an item.  This is due to the fact that the JCSP "alternative" mechanism only works with "input" channels — there is no direct way of telling whether an output channel is ready to be read.  The consumer "signals" the buffer that it is ready-to-read by writing to a second channel, called req in the class below.  The buffer process then uses this "request" channel with a guarded alternative construct in order to tell when the consumers are ready to read/consume more data.

import jcsp.lang.*;

/** Consumer class: reads ints from input channel, displays them, then
  * terminates when a negative value is read.
  */
public class Consumer2 implements CSProcess
  { private One2OneChannelInt in;
    private One2OneChannelInt req;

    public Consumer2 (final One2OneChannelInt req, final One2OneChannelInt in)
      { this.req = req;
        this.in = in;
      } // constructor

    public void run ()
      {  int item;
         while (true)
           { req.write(0); // Request data - blocks until data is available
             item = in.read();
             if (item < 0)
               break;
             System.out.println(item);
           } // for
         System.out.println("Consumer ended.");
      } // run

  } // class Consumer2

The buffer is the most complex part of the system.  Note how it uses an Alternative construct with the input channels (from the two producers, and the "request" channels from the two consumers) to respond to whichever of the four processes is ready (assuming that space and/or data is available).

Note too how the "clean termination" is handled, by counting down the number of negative values read from the producers and written to the consumers.

There are a number of other possible solutions to this, and over the years some people have come up with some very ingenious methods using the "select" methods of the Alternative class which take a boolean array of preconditions.

Note that using the fairSelect() method with a Skip is somewhat problematic, as it ensures that the skip option gets a fair chance (it is always "ready") even when a producer or consumer may be ready.  This is not optimally efficient.  Likewise, using a priSelect is not a good idea here, as it will prioritise either the producers or the consumers, depending on how you have ordered your guard array.

import jcsp.lang.*;

/** Buffer class: Manages communication between Producer2
  * and Consumer2 classes.
  */
public class Buffer implements CSProcess
  { private One2OneChannelInt[] in; // Input from Producer
    private One2OneChannelInt[] req; // Request for data from Consumer
    private One2OneChannelInt[] out; // Output to Consumer
    // The buffer itself
    private int[] buffer = new int[10];
    // Subscripts for buffer
    int hd = -1;
    int tl = -1;

    public Buffer (final One2OneChannelInt[] in, final One2OneChannelInt[] req, final One2OneChannelInt[] out)
      { this.in = in;
        this.req = req;
        this.out = out;
      } // constructor

    public void run ()
      { final Guard[] guards = { in[0], in[1], req[0], req[1] };
        final Alternative alt = new Alternative(guards);
        int countdown = 4; // Number of processes running
        while (countdown > 0)
          { int index = alt.select();
            switch (index)
              { case 0:
                case 1: // A Producer is ready to send
                    if (hd < tl + 11) // Space available
                      { int item = in[index].read();
                        if (item < 0)
                          countdown--;
                        else
                          { hd++;
                            buffer[hd%buffer.length] = item;
                          }
                      }
                    break;
                case 2:
                case 3: // A Consumer is ready to read
                    if (tl < hd) // Item(s) available
                      { req[index-2].read(); // Read and discard request
                        tl++;
                        int item = buffer[tl%buffer.length];
                        out[index-2].write(item);
                      }
                    else if (countdown <= 2) // Signal consumer to end
                      { req[index-2].read(); // Read and discard request
                        out[index-2].write(-1); // Signal end
                        countdown--;
                      }
                    break;
              } // switch
          } // while
        System.out.println("Buffer ended.");
      } // run

  } // class Buffer

Finally, the main program is relatively straightforward, simply needing to create the necessary processes and channels and connect them all together appropriately.

import jcsp.lang.*;

/** Main program class for Producer/Consumer example.
  * Sets up channels, creates processes then
  * executes them in parallel, using JCSP.
  */
public final class PCMain2
  {
    public static void main (String[] args)
      { new PCMain2();
      } // main

    public PCMain2 ()
      { // Create channel objects
        final One2OneChannelInt[] prodChan = { new One2OneChannelInt(), new One2OneChannelInt() }; // Producers
        final One2OneChannelInt[] consReq = { new One2OneChannelInt(), new One2OneChannelInt() }; // Consumer requests
        final One2OneChannelInt[] consChan = { new One2OneChannelInt(), new One2OneChannelInt() }; // Consumer data

        // Create parallel construct
        CSProcess[] procList = { new Producer2(prodChan[0], 0),
                                 new Producer2(prodChan[1], 100),
                                 new Buffer(prodChan, consReq, consChan),
                                 new Consumer2(consReq[0], consChan[0]),
                                 new Consumer2(consReq[1], consChan[1]) }; // Processes
        Parallel par = new Parallel(procList); // PAR construct
        par.run(); // Execute processes in parallel
      } // PCMain constructor

  } // class PCMain2

Diagrammatically this looks like the following.


George Wells, G.Wells@ru.ac.za