projet-donnees-reparties/linda/shm/CentralizedLinda.java

370 lines
10 KiB
Java
Raw Normal View History

2021-11-27 16:50:33 +00:00
package linda.shm;
import linda.Callback;
import linda.Linda;
import linda.Tuple;
2021-11-27 19:24:42 +00:00
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
2021-11-27 16:50:33 +00:00
/** Shared memory implementation of Linda. */
public class CentralizedLinda implements Linda {
2021-11-27 19:24:42 +00:00
// Tuples currently stored in the space; write() appends to it, take-style operations remove from it.
List<Tuple> tuples;
// Lock ("moniteur" = monitor) protecting the access-control counters and flags below.
Lock moniteur;
// Condition on which readers and writers queue in requestReading()/requestWriting().
Condition accessDemand;
// Condition on which a writer that left the queue waits, in requestWriting(), while nbReaders > 0.
Condition SAS;
// Number of readers currently admitted (incremented in requestReading(), decremented in endReading()).
int nbReaders;
// Number of threads currently blocked in accessDemand.await().
int nbWaiting;
// True between a successful requestWriting() and the matching endWriting().
boolean writing;
// True while a writer is waiting on the SAS condition.
boolean SASused;
2021-11-27 16:50:33 +00:00
/** Creates an empty tuple space: no stored tuple, no reader, no writer, nobody waiting. */
public CentralizedLinda() {
    // Tuple storage.
    this.tuples = new ArrayList<Tuple>();

    // Monitor lock and the two conditions derived from it.
    this.moniteur = new ReentrantLock();
    this.SAS = this.moniteur.newCondition();
    this.accessDemand = this.moniteur.newCondition();

    // Access-control state starts idle.
    this.writing = false;
    this.SASused = false;
    this.nbWaiting = 0;
    this.nbReaders = 0;
}
/**
 * Adds a tuple to the tuple space.
 *
 * Blocks until writer access is granted, appends {@code t} to the internal list and
 * releases the access. The tuple is stored by reference (no copy is made here).
 * If the calling thread is interrupted while waiting for access, the tuple is not
 * added, the stack trace is printed and the thread's interrupt flag is restored.
 *
 * @param t the tuple to add
 */
public void write(Tuple t) {
    try {
        requestWriting();
        try {
            tuples.add(t);
        } finally {
            // Always hand the write access back, even if the insertion throws,
            // otherwise every other thread would block forever.
            endWriting();
        }
    } catch (InterruptedException e) {
        // Keep the interruption visible to the caller instead of silently clearing it.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    }
}
public Tuple take(Tuple template) {
Tuple result = null;
boolean found = false;
int index;
try {
while (!found) {
// Wait for writing access
requestWriting();
// Find the tuple in the tuple list
for (index = 0; index < tuples.size(); index++) {
if (tuples.get(index).matches(template)) {
found = true;
break;
}
}
if (found) {
// Result found, remove it from the tuple list
result = tuples.remove(index);
}
// End writing
endWriting();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
return result;
2021-11-27 16:50:33 +00:00
}
2021-11-27 19:24:42 +00:00
public Tuple read(Tuple template) {
Tuple result = null;
boolean found = false;
int index;
try {
while (!found) {
// Waiting for reading access
requestReading();
// Find the tuple in the tuple list
for (index = 0; index < tuples.size(); index++) {
if (tuples.get(index).matches(template)) {
found = true;
break;
}
}
if (found) {
2021-11-28 11:44:46 +00:00
// Result found, get it from the tuple list
2021-11-27 19:24:42 +00:00
result = tuples.get(index);
}
// End reading
endReading();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
return result;
}
public Tuple tryTake(Tuple template) {
Tuple result = null;
int index;
try {
// Wait for writing access
requestWriting();
2021-11-28 11:44:46 +00:00
// Extract the tuple in the tuple list
2021-11-27 19:24:42 +00:00
for (index = 0; index < tuples.size(); index++) {
if (tuples.get(index).matches(template)) {
result = tuples.remove(index);
break;
}
}
// End writing
endWriting();
} catch (InterruptedException e) {
e.printStackTrace();
}
return result;
}
/**
 * Returns, without removing it, the first stored tuple matching {@code template}, if any.
 *
 * Unlike {@code read}, a single scan is performed: the method waits only for reader
 * access, never for a matching tuple to appear. The returned object is the stored
 * instance itself, not a copy.
 *
 * @param template the template stored tuples are matched against
 * @return the matching tuple, or {@code null} when nothing matches or when the thread
 *         was interrupted while waiting for access (the interrupt flag is then restored)
 */
public Tuple tryRead(Tuple template) {
    Tuple result = null;
    try {
        requestReading();
        try {
            for (Tuple candidate : tuples) {
                if (candidate.matches(template)) {
                    result = candidate;
                    break;
                }
            }
        } finally {
            // Release the access even if matches() throws.
            endReading();
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        e.printStackTrace();
    }
    return result;
}
public Collection<Tuple> takeAll(Tuple template) {
List<Tuple> results = new ArrayList<Tuple>();
int index;
try {
// Wait for writing access
requestWriting();
2021-11-28 11:44:46 +00:00
// Extract the tuples in the tuple list
2021-11-27 19:24:42 +00:00
for (index = 0; index < tuples.size(); index++) {
if (tuples.get(index).matches(template)) {
2021-11-28 11:44:46 +00:00
results.add(tuples.remove(index));
2021-11-27 19:24:42 +00:00
}
}
// End writing
endWriting();
} catch (InterruptedException e) {
e.printStackTrace();
}
return results;
}
public Collection<Tuple> readAll(Tuple template) {
List<Tuple> results = new ArrayList<Tuple>();
int index;
try {
// Waiting for reading access
requestReading();
2021-11-28 11:44:46 +00:00
// Find the tuples in the tuple list
2021-11-27 19:24:42 +00:00
for (index = 0; index < tuples.size(); index++) {
if (tuples.get(index).matches(template)) {
results.add(tuples.get(index));
}
}
// End reading
endReading();
} catch (InterruptedException e) {
e.printStackTrace();
}
return results;
}
public void eventRegister(eventMode mode, eventTiming timing, Tuple template, Callback callback) {
2021-11-28 11:44:46 +00:00
new Thread() {
public void run() {
Tuple result = null;
boolean found = false;
int index;
// Get known tuples for FUTURE timing
List<Tuple> knownTuples = (List<Tuple>) readAll(template);
try {
while (!found) {
2021-11-30 16:39:17 +00:00
// Waiting for access
switch (mode) {
case READ:
requestReading();
break;
case TAKE:
requestWriting();
break;
}
2021-11-28 11:44:46 +00:00
// Find the tuple in the tuple list
for (index = 0; index < tuples.size(); index++) {
if (tuples.get(index).matches(template)) {
// Tuple matching
2021-11-30 16:39:17 +00:00
if (timing == eventTiming.IMMEDIATE || knownTuples.isEmpty()) {
found = true;
} else {
for (Tuple knownTuple : knownTuples) {
if (knownTuple != tuples.get(index)) {
found = true;
break;
2021-11-28 11:44:46 +00:00
}
2021-11-30 16:39:17 +00:00
}
2021-11-28 11:44:46 +00:00
}
// Rebreak to end searching
if (found) {
break;
}
}
}
if (found) {
// Result found
switch (mode) {
case READ:
// Get it from the tuple list
result = tuples.get(index);
break;
case TAKE:
// Remove it from the tuple list
result = tuples.remove(index);
break;
}
}
2021-11-30 16:39:17 +00:00
// End access
switch (mode) {
case READ:
endReading();
break;
case TAKE:
endWriting();
break;
}
2021-11-28 11:44:46 +00:00
}
// Callback with the result tuple
callback.call(result);
2021-11-27 19:24:42 +00:00
2021-11-28 11:44:46 +00:00
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}.start();
2021-11-27 19:24:42 +00:00
}
public void debug(String prefix) {
2021-11-30 16:39:17 +00:00
try {
// Waiting for reading access
requestReading();
System.out.println(prefix + tuples);
// End reading
endReading();
} catch (InterruptedException e) {
e.printStackTrace();
}
2021-11-27 19:24:42 +00:00
}
/**
 * Blocks until the calling thread is admitted as a reader.
 *
 * A reader queues on {@code accessDemand} when a writer is active, when other threads
 * are already queued, or when a writer is parked in the SAS waiting for the current
 * readers to leave. Once admitted it signals {@code accessDemand} so the next queued
 * thread is woken in turn.
 *
 * NOTE(review): a thread woken from {@code accessDemand} does not re-check the state;
 * Java conditions are signal-and-continue and allow spurious wakeups, so this hand-off
 * deserves a dedicated review.
 *
 * @throws InterruptedException if interrupted while queued; the lock is released and
 *         the waiting counter restored before the exception propagates
 */
private void requestReading() throws InterruptedException {
    moniteur.lock();
    try {
        // SASused is checked too: otherwise a new reader could slip in between the last
        // reader's SAS.signal() and the parked writer actually resuming.
        if (writing || nbWaiting != 0 || SASused) {
            nbWaiting++;
            try {
                accessDemand.await();
            } finally {
                // Also runs on interruption, so the counter never stays too high.
                nbWaiting--;
            }
        }
        nbReaders++;
        // Wake the next queued thread.
        accessDemand.signal();
    } finally {
        moniteur.unlock();
    }
}
/**
 * Signals that the calling reader is done.
 *
 * When the last reader leaves, one thread is woken: the writer parked in the SAS if
 * there is one, otherwise the next thread queued on {@code accessDemand}.
 *
 * @throws InterruptedException declared for symmetry with the request methods; nothing
 *         in this method blocks interruptibly
 */
private void endReading() throws InterruptedException {
    moniteur.lock();
    try {
        nbReaders--;
        if (nbReaders == 0) {
            if (SASused) {
                SAS.signal();
            } else {
                accessDemand.signal();
            }
        }
    } finally {
        // Documented ReentrantLock idiom: unlock in finally so the lock can never leak.
        moniteur.unlock();
    }
}
/**
 * Blocks until the calling thread is admitted as the (single) writer.
 *
 * Stage 1: the writer queues on {@code accessDemand} unless the space is completely
 * idle (no writer, no reader, empty SAS, nobody queued).
 * Stage 2: if readers are still active once it leaves the queue, it parks on the
 * {@code SAS} condition until the reader count drops to zero.
 *
 * NOTE(review): a thread woken from {@code accessDemand} does not re-check the state;
 * Java conditions are signal-and-continue and allow spurious wakeups, so the stage-1
 * hand-off deserves a dedicated review.
 *
 * @throws InterruptedException if interrupted while waiting; the lock is released and
 *         the waiting counter / SAS flag are restored before the exception propagates
 */
private void requestWriting() throws InterruptedException {
    moniteur.lock();
    try {
        // Stage 1: wait in the common queue.
        if (writing || nbReaders != 0 || SASused || nbWaiting != 0) {
            nbWaiting++;
            try {
                accessDemand.await();
            } finally {
                // Also runs on interruption, so the counter never stays too high.
                nbWaiting--;
            }
        }

        // Stage 2: wait in the SAS for the remaining readers to leave.
        if (nbReaders > 0) {
            SASused = true;
            try {
                // Loop: a wakeup (possibly spurious) is not proof that no reader is left.
                while (nbReaders > 0) {
                    SAS.await();
                }
            } catch (InterruptedException e) {
                // If the last reader already left, its SAS.signal() was meant for us and
                // nobody else will wake the queue: pass the turn on before giving up.
                if (nbReaders == 0 && !writing) {
                    accessDemand.signal();
                }
                throw e;
            } finally {
                SASused = false;
            }
        }

        writing = true;
    } finally {
        moniteur.unlock();
    }
}
/**
 * Signals that the calling writer is done and wakes the next thread queued on
 * {@code accessDemand}, if any.
 *
 * @throws InterruptedException declared for symmetry with the request methods; nothing
 *         in this method blocks interruptibly
 */
private void endWriting() throws InterruptedException {
    moniteur.lock();
    try {
        writing = false;
        accessDemand.signal();
    } finally {
        // Documented ReentrantLock idiom: unlock in finally so the lock can never leak.
        moniteur.unlock();
    }
}
2021-11-27 16:50:33 +00:00
}