/** * A class which functions as a bounded buffer and all * related operations. */ public class BoundBuffer { /** The buffer that contains all information of messages in it. */ private int[][] buffer; /** size of the buffer. */ private int numSlot; /** The number of consumers, used as a isRead flag for each consumer. */ private int numConsumers; /** * Constructor: creat and initialize the buffer. */ public BoundBuffer(int s, int c) { numSlot = s; numConsumers = c; buffer = new int[s][c+1]; for (int i = 0; i < s; i++) { for (int j = 0; j < c; j++) buffer[i][j] = 1; buffer[i][c] = -1; } } /** * Add a message (including its id) to slot s and * reinitialize that slot. */ public void addMessage(int s, int id) { for (int i = 0; i < numConsumers; i++) buffer[s][i] = 1; buffer[s][numConsumers] = id; //notifyAll(); } /** * Return an available slot, or -1 if no slot available. */ public int findSlot() { int slot = -1; for (int j = 0; j < numSlot; j++) { if (isAvail(j)) { slot = j; break; } } //notifyAll(); return slot; } /** * Check if this message has been read by consumer c. */ public synchronized boolean isRead(int s, int c) { boolean b = false; if (buffer[s][c] == 0) b = true; notifyAll(); return b; } /** * Read/consume the message. */ public void readMsg(int s, int id) { buffer[s][id] = 0; } /** * Get the id of the message. */ public int getID(int s) { int i = buffer[s][numConsumers]; //notifyAll(); return i; } /** * Check if slot s is available for new message to be put in. */ public boolean isAvail(int s) { boolean b = false; if (buffer[s][numConsumers] == -1) b = true; else { for (int i = 0; i < numConsumers; i++) { if (buffer[s][i] == 1) b = false; else b = true; } } //notifyAll(); return b; } } import java.util.*; /** * A child class of Thread, to produce and send messages to the * pool when it is its turn and there is a slot in the pool. */ public class Producer extends Thread { /** The average message generation time. */ private int avgT; /** The number of messages generated. */ private int msgGenerated; /** A monitor varible. */ private Monitor buffer; // A main class variable; a1q3 a; // A new random object with seed. Random r = new Random(a.seed); /** * Constructor. */ public Producer(Monitor b, int i) { buffer = b; avgT = i; } /** * I start to run. */ public void run() { produceMessage(); } /** * Produce a message if condition is satisfied. */ private void produceMessage() { // Loop until the end of the simulation. while (System.currentTimeMillis() < a.simuTime) { long time = System.currentTimeMillis() - a.startTime; System.out.println("<" + time + "> message " + msgGenerated + " ready on slot assignment"); // Get my ticket. int myTurn = buffer.getTicket(); if (myTurn < buffer.readCount()); // Wait until my turn. buffer.wait(myTurn); int slot = buffer.findSlot(); if (slot >= 0) { // Generate a message. buffer.addMessage(slot, msgGenerated); time = System.currentTimeMillis() - a.startTime; System.out.println("<" + time + "> message " + msgGenerated + " assigned slot " + slot); msgGenerated++; long waitingT = (long)( a.avgReqT * r.nextFloat()); // Done. notify and take a short break. buffer.advance(); try { Thread.sleep(waitingT); } catch(InterruptedException e) {} } else // Do nothing except telling I'm done. buffer.advance(); } } } import java.util.*; /** * A child class of Thread, to consume the messages produced by the * producer when conditions are satisfied. */ public class Consumer extends Thread { /** Average message request time. */ private int avgT; /** The slot where I'm going to consume. */ private int slot; // A Monitor variable. private Monitor buffer; // A main class variable; a1q3 a; // A new random object with seed. Random r = new Random(a.seed); /** * Constructor. */ public Consumer(Monitor m, int i) { buffer = m; avgT = i; } /** * I start to run. */ public void run() { readMessage(); } /** * Read/Consume a message when conditions are satisfied. */ private synchronized void readMessage() { int slot = 0; // Loop until the end of the simulation. while (System.currentTimeMillis() < a.simuTime) { // Get my ticket number. int myTurn = buffer.getTicket(); if (myTurn < buffer.readCount()); // Wait my turn. buffer.wait(myTurn); int msgID = buffer.getID(slot); if (msgID >= 0 && !buffer.isRead(slot)) { // Then take a message from pool. buffer.readMsg(slot); long time = System.currentTimeMillis() - a.startTime; System.out.println("<" + time + "> message " + buffer.getID(slot) + " from slot " + slot + " consumed"); // Notify and take a break. long waitingT = (long)(2 * avgT * r.nextFloat()); buffer.advance(); try { Thread.sleep(waitingT); } catch(InterruptedException e) {} } else // Do nothing except telling I'm done. buffer.advance(); slot++; slot %= a.bufferSize; } } }