package linda.shm;

import linda.Callback;
import linda.Linda;
import linda.Tuple;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Shared memory implementation of Linda.
 *
 * <p>Tuples are kept in a single in-memory list. Access to that list follows a
 * first-come-first-served readers/writer protocol built on one lock
 * ({@link #moniteur}) and its conditions:
 * <ul>
 *   <li>every request takes a ticket and waits on {@link #accessDemand} until
 *       its ticket is the one being served (FIFO ordering);</li>
 *   <li>the request at the head of the queue then waits on {@link #SAS} until
 *       the list is in a compatible state (no writer for a reader; no writer
 *       and no reader for a writer), and only then lets the next ticket in.</li>
 * </ul>
 * Every wait re-checks its predicate in a loop, so the protocol does not depend
 * on a signalled thread running before any other thread.
 *
 * <p>Blocking operations ({@link #take}, {@link #read}, {@link #eventRegister})
 * do not hold any access while they wait for a matching tuple: they release the
 * list and sleep until {@link #write} has added something new.
 *
 * <p>NOTE(review): this relies on every insertion going through
 * {@link #write}; code that appends to {@link #tuples} directly would not wake
 * blocked callers.
 */
public class CentralizedLinda implements Linda {

    /** The tuple space. Only touched while holding reading or writing access. */
    List<Tuple> tuples;
    /** Lock protecting all the bookkeeping fields below. */
    Lock moniteur;
    /** Where queued requests wait for their ticket to be served. */
    Condition accessDemand;
    /** Where the request at the head of the queue waits for a compatible state. */
    Condition SAS;
    /** Number of threads currently holding reading access. */
    int nbReaders;
    /** Number of requests currently queued behind the head of the queue. */
    int nbWaiting;
    /** True while a thread holds writing access. */
    boolean writing;
    /** True while the head-of-queue request is blocked on {@link #SAS}. */
    boolean SASused;

    /** Signalled (all waiters) each time {@link #write} has added a tuple. */
    private Condition tupleWritten;
    /** Number of tuples written so far; lets blocked callers detect new writes. */
    private long writeCount;
    /** Next ticket to hand out to an access request. */
    private long nextTicket;
    /** Ticket of the request currently allowed to try to enter. */
    private long nowServing;

    /** Creates an empty tuple space. */
    public CentralizedLinda() {
        nbReaders = 0;
        nbWaiting = 0;
        writing = false;
        SASused = false;
        writeCount = 0;
        nextTicket = 0;
        nowServing = 0;
        moniteur = new ReentrantLock();
        accessDemand = moniteur.newCondition();
        SAS = moniteur.newCondition();
        tupleWritten = moniteur.newCondition();
        tuples = new ArrayList<Tuple>();
    }

    /**
     * Adds a tuple to the tuple space and wakes every caller blocked waiting
     * for a new tuple.
     *
     * @param t the tuple to add (stored as is, not copied)
     */
    public void write(Tuple t) {
        requestWriting();
        try {
            tuples.add(t);
            announceWrite();
        } finally {
            endWriting();
        }
    }

    /**
     * Removes and returns the first tuple matching the template, blocking until
     * one is available.
     *
     * @param template the template tuples are matched against
     * @return the removed tuple, or {@code null} if the calling thread was
     *         interrupted while waiting (its interrupt flag is then set again)
     */
    public Tuple take(Tuple template) {
        try {
            while (true) {
                long seen;
                requestWriting();
                try {
                    int index = indexOfMatch(template);
                    if (index >= 0) {
                        return tuples.remove(index);
                    }
                    // Read while still holding access, so it reflects exactly what was scanned.
                    seen = currentWriteCount();
                } finally {
                    endWriting();
                }
                awaitWriteAfter(seen);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /**
     * Returns (without removing) the first tuple matching the template,
     * blocking until one is available.
     *
     * @param template the template tuples are matched against
     * @return the matching tuple, or {@code null} if the calling thread was
     *         interrupted while waiting (its interrupt flag is then set again)
     */
    public Tuple read(Tuple template) {
        try {
            while (true) {
                long seen;
                requestReading();
                try {
                    int index = indexOfMatch(template);
                    if (index >= 0) {
                        return tuples.get(index);
                    }
                    seen = currentWriteCount();
                } finally {
                    endReading();
                }
                awaitWriteAfter(seen);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /**
     * Removes and returns the first tuple matching the template, without
     * waiting for one to appear.
     *
     * @param template the template tuples are matched against
     * @return the removed tuple, or {@code null} if none matches
     */
    public Tuple tryTake(Tuple template) {
        requestWriting();
        try {
            int index = indexOfMatch(template);
            return index >= 0 ? tuples.remove(index) : null;
        } finally {
            endWriting();
        }
    }

    /**
     * Returns (without removing) the first tuple matching the template, without
     * waiting for one to appear.
     *
     * @param template the template tuples are matched against
     * @return the matching tuple, or {@code null} if none matches
     */
    public Tuple tryRead(Tuple template) {
        requestReading();
        try {
            int index = indexOfMatch(template);
            return index >= 0 ? tuples.get(index) : null;
        } finally {
            endReading();
        }
    }

    /**
     * Removes and returns every tuple matching the template.
     *
     * @param template the template tuples are matched against
     * @return the removed tuples in list order; empty if none matches
     */
    public Collection<Tuple> takeAll(Tuple template) {
        List<Tuple> results = new ArrayList<Tuple>();
        requestWriting();
        try {
            int index = 0;
            while (index < tuples.size()) {
                if (tuples.get(index).matches(template)) {
                    // Removal shifts the next element into this slot: do not advance.
                    results.add(tuples.remove(index));
                } else {
                    index++;
                }
            }
        } finally {
            endWriting();
        }
        return results;
    }

    /**
     * Returns (without removing) every tuple matching the template.
     *
     * @param template the template tuples are matched against
     * @return the matching tuples in list order; empty if none matches
     */
    public Collection<Tuple> readAll(Tuple template) {
        List<Tuple> results = new ArrayList<Tuple>();
        requestReading();
        try {
            for (Tuple candidate : tuples) {
                if (candidate.matches(template)) {
                    results.add(candidate);
                }
            }
        } finally {
            endReading();
        }
        return results;
    }

    /**
     * Registers a one-shot callback fired when a matching tuple is found.
     *
     * <p>A new thread waits for the tuple and then invokes the callback on that
     * thread. With {@code IMMEDIATE} timing any matching tuple qualifies; with
     * {@code FUTURE} timing, matching tuples already present when this method
     * is called are ignored (they are recognised by object identity). In
     * {@code READ} mode the tuple is left in place; in {@code TAKE} mode it is
     * removed. If the waiting thread is interrupted the callback is not called.
     *
     * @param mode     whether the tuple is read or taken
     * @param timing   whether tuples already present may trigger the callback
     * @param template the template tuples are matched against
     * @param callback the callback to invoke with the tuple found
     */
    public void eventRegister(eventMode mode, eventTiming timing, Tuple template, Callback callback) {
        // Snapshot on the calling thread so that "already present" is relative
        // to the registration, not to whenever the helper thread gets scheduled.
        final List<Tuple> knownTuples = new ArrayList<Tuple>();
        if (timing == eventTiming.FUTURE) {
            knownTuples.addAll(readAll(template));
        }
        // Removing a tuple mutates the list, so TAKE needs writing access.
        final boolean removing = (mode == eventMode.TAKE);

        new Thread() {
            public void run() {
                Tuple result = null;
                boolean found = false;
                try {
                    while (!found) {
                        long seen;
                        if (removing) {
                            requestWriting();
                        } else {
                            requestReading();
                        }
                        try {
                            int index = indexOfEventMatch(template, timing, knownTuples);
                            if (index >= 0) {
                                found = true;
                                switch (mode) {
                                    case READ:
                                        result = tuples.get(index);
                                        break;
                                    case TAKE:
                                        result = tuples.remove(index);
                                        break;
                                }
                            }
                            seen = currentWriteCount();
                        } finally {
                            if (removing) {
                                endWriting();
                            } else {
                                endReading();
                            }
                        }
                        if (!found) {
                            awaitWriteAfter(seen);
                        }
                    }
                    // Called with no access held, so the callback may use this tuple space.
                    callback.call(result);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }.start();
    }

    /**
     * Prints the prefix followed by the content of the tuple space on standard
     * output.
     *
     * @param prefix text printed before the tuple list
     */
    public void debug(String prefix) {
        requestReading();
        try {
            System.out.println(prefix + tuples);
        } finally {
            endReading();
        }
    }

    /** Returns the index of the first tuple matching the template, or -1. Caller must hold access. */
    private int indexOfMatch(Tuple template) {
        for (int i = 0; i < tuples.size(); i++) {
            if (tuples.get(i).matches(template)) {
                return i;
            }
        }
        return -1;
    }

    /**
     * Returns the index of the first tuple that matches the template and is
     * eligible for the given timing, or -1. Caller must hold access.
     */
    private int indexOfEventMatch(Tuple template, eventTiming timing, List<Tuple> knownTuples) {
        for (int i = 0; i < tuples.size(); i++) {
            Tuple candidate = tuples.get(i);
            if (!candidate.matches(template)) {
                continue;
            }
            switch (timing) {
                case IMMEDIATE:
                    return i;
                case FUTURE:
                    if (!containsSame(knownTuples, candidate)) {
                        return i;
                    }
                    break;
            }
        }
        return -1;
    }

    /** Tells whether the list holds this very object (identity, not equality). */
    private static boolean containsSame(List<Tuple> list, Tuple target) {
        for (Tuple element : list) {
            if (element == target) {
                return true;
            }
        }
        return false;
    }

    /** Records that a tuple was added and wakes blocked callers. Caller holds writing access. */
    private void announceWrite() {
        moniteur.lock();
        try {
            writeCount++;
            tupleWritten.signalAll();
        } finally {
            moniteur.unlock();
        }
    }

    /** Returns the number of tuples written so far. */
    private long currentWriteCount() {
        moniteur.lock();
        try {
            return writeCount;
        } finally {
            moniteur.unlock();
        }
    }

    /**
     * Blocks until at least one tuple has been written since the write count
     * was {@code seen}. Returns at once if that already happened, so a write
     * slipping in between the scan and this call is never missed.
     */
    private void awaitWriteAfter(long seen) throws InterruptedException {
        moniteur.lock();
        try {
            while (writeCount == seen) {
                tupleWritten.await();
            }
        } finally {
            moniteur.unlock();
        }
    }

    /**
     * Acquires shared (reading) access. Waits are uninterruptible: access is
     * only held for the duration of a list scan, so they are short, and this
     * keeps the queue bookkeeping consistent. A pending interrupt stays set.
     */
    private void requestReading() {
        moniteur.lock();
        try {
            awaitTurn();
            awaitCompatibleState(false);
            nbReaders++;
            passTurn();
        } finally {
            moniteur.unlock();
        }
    }

    /** Releases reading access; the last reader out wakes the head of the queue. */
    private void endReading() {
        moniteur.lock();
        try {
            nbReaders--;
            if (nbReaders == 0) {
                SAS.signal();
            }
        } finally {
            moniteur.unlock();
        }
    }

    /** Acquires exclusive (writing) access. Waits are uninterruptible, see {@link #requestReading()}. */
    private void requestWriting() {
        moniteur.lock();
        try {
            awaitTurn();
            awaitCompatibleState(true);
            writing = true;
            passTurn();
        } finally {
            moniteur.unlock();
        }
    }

    /** Releases writing access and wakes the head of the queue. */
    private void endWriting() {
        moniteur.lock();
        try {
            writing = false;
            SAS.signal();
        } finally {
            moniteur.unlock();
        }
    }

    /** Takes a ticket and waits until it is served. Caller holds {@link #moniteur}. */
    private void awaitTurn() {
        final long ticket = nextTicket++;
        if (ticket != nowServing) {
            nbWaiting++;
            while (ticket != nowServing) {
                accessDemand.awaitUninterruptibly();
            }
            nbWaiting--;
        }
    }

    /** Lets the next ticket through. Caller holds {@link #moniteur}. */
    private void passTurn() {
        nowServing++;
        // Every queued thread re-checks its own ticket; only the next one proceeds.
        accessDemand.signalAll();
    }

    /**
     * Head-of-queue wait: blocks while a writer is active and, for an exclusive
     * request, while readers remain. Later tickets stay queued meanwhile, so
     * nobody overtakes. Caller holds {@link #moniteur}.
     */
    private void awaitCompatibleState(boolean exclusive) {
        if (writing || (exclusive && nbReaders > 0)) {
            SASused = true;
            while (writing || (exclusive && nbReaders > 0)) {
                SAS.awaitUninterruptibly();
            }
            SASused = false;
        }
    }
}