projet-donnees-reparties/linda/shm/CentralizedLinda.java
2021-12-17 09:33:01 +01:00

241 lines
6.9 KiB
Java

package linda.shm;
import linda.Callback;
import linda.Linda;
import linda.Tuple;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class Reveil implements Callback {
static Lock lock = new ReentrantLock();
Tuple template;
Condition condition;
Reveil(Tuple template) {
this.template = template;
this.condition = lock.newCondition();
}
public void call(Tuple t) {
return;
}
void sleep() {
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
void reveil(Tuple t) {
if (t.matches(this.template)) {
condition.signal();
}
}
}
/** Shared memory implementation of Linda. */
public class CentralizedLinda implements Linda {
List<Tuple> tuples;
List<Reveil> reveils;
public CentralizedLinda() {
this.tuples = Collections.synchronizedList(new ArrayList<Tuple>());
this.reveils = Collections.synchronizedList(new ArrayList<Reveil>());
}
public void write(Tuple t) {
tuples.add(t);
reveils.forEach(r -> r.reveil(t));
}
public Tuple take(Tuple template) {
Tuple result = null;
Reveil reveil = new Reveil(template);
result = tryTake(template);
while (result == null) {
reveil.sleep();
result = tryTake(template);
}
return result;
}
public Tuple read(Tuple template) {
Tuple result = null;
Reveil reveil = new Reveil(template);
result = tryRead(template);
while (result == null) {
reveil.sleep();
result = tryRead(template);
}
return result;
}
public Tuple tryTake(Tuple template) {
Tuple result = null;
// Extract the tuple from the tuple list
// TODO: faire avec iterator, hasNext, pour être thread safe :)
for (int i = 0; i < tuples.size(); i++) {
if (tuples.get(i).matches(template)) {
result = tuples.remove(i);
break;
}
}
return result;
}
public Tuple tryRead(Tuple template) {
Tuple result = null;
// Get the tuple from the tuple list
// TODO: faire avec iterator, hasNext, pour être thread safe :)
for (int i = 0; i < tuples.size(); i++) {
result = tuples.get(i);
if (result.matches(template)) {
break;
}
}
return result;
}
public Collection<Tuple> takeAll(Tuple template) {
List<Tuple> results = new ArrayList<Tuple>();
Tuple result = new Tuple();
// Extract the tuples in the tuple list
while (result != null) {
result = tryTake(template);
if (result != null) {
results.add(result);
}
}
return results;
}
public Collection<Tuple> readAll(Tuple template) {
List<Tuple> results = new ArrayList<Tuple>();
Tuple result;
// Extract the tuples in the tuple list
// TODO: faire avec iterator, hasNext, pour être thread safe :)
for (int i = 0; i < tuples.size(); i++) {
result = tuples.get(i);
if (result.matches(template))
results.add(result);
}
return results;
}
public void eventRegister(eventMode mode, eventTiming timing, Tuple template, Callback callback) {
new Thread() {
public void run() {
Tuple result = null;
boolean found = false;
int index;
// Get known tuples for FUTURE timing
List<Tuple> knownTuples = (List<Tuple>) readAll(template);
try {
while (!found) {
// Waiting for access
switch (mode) {
case READ:
requestReading();
break;
case TAKE:
requestWriting();
break;
}
// Find the tuple in the tuple list
for (index = 0; index < tuples.size(); index++) {
if (tuples.get(index).matches(template)) {
// Tuple matching
if (timing == eventTiming.IMMEDIATE || knownTuples.isEmpty()) {
found = true;
} else {
for (Tuple knownTuple : knownTuples) {
if (knownTuple != tuples.get(index)) {
found = true;
break;
}
}
}
// Rebreak to end searching
if (found) {
break;
}
}
}
if (found) {
// Result found
switch (mode) {
case READ:
// Get it from the tuple list
result = tuples.get(index);
break;
case TAKE:
// Remove it from the tuple list
result = tuples.remove(index);
break;
}
}
// End access
switch (mode) {
case READ:
endReading();
break;
case TAKE:
endWriting();
break;
}
}
// Callback with the result tuple
callback.call(result);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}.start();
}
public void debug(String prefix) {
try {
// Waiting for reading access
requestReading();
System.out.println(prefix + tuples);
// End reading
endReading();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}