Skip to content

Commit d7a388d

Browse files
committed
Adding customization to outputs
1 parent f5bd1a0 commit d7a388d

File tree

9 files changed

+216
-102
lines changed

9 files changed

+216
-102
lines changed

server/src/main/java/org/diskproject/server/adapters/AirFlowAdapter.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
1919
import org.apache.http.util.EntityUtils;
2020
import org.diskproject.shared.classes.adapters.MethodAdapter;
21-
import org.diskproject.shared.classes.workflow.Variable;
21+
import org.diskproject.shared.classes.workflow.WorkflowVariable;
2222
import org.diskproject.shared.classes.workflow.VariableBinding;
2323
import org.diskproject.shared.classes.workflow.Workflow;
2424
import org.diskproject.shared.classes.workflow.WorkflowRun;
@@ -64,7 +64,7 @@ public List<Workflow> getWorkflowList() {
6464
return list;
6565
}
6666

67-
public List<Variable> getWorkflowVariables(String id) {
67+
public List<WorkflowVariable> getWorkflowVariables(String id) {
6868
return null;
6969
}
7070

@@ -87,7 +87,7 @@ public String getWorkflowLink(String id) {
8787
}
8888

8989
@Override
90-
public List<String> areFilesAvailable(Set<String> filelist, String dType) {
90+
public Map<String, String> areFilesAvailable(Set<String> filelist, String dType) {
9191
// Auto-generated method stub
9292
return null;
9393
}
@@ -99,13 +99,13 @@ public String addData(String url, String name, String type) {
9999
}
100100

101101
@Override
102-
public Map<String, Variable> getWorkflowInputs(String id) {
102+
public Map<String, WorkflowVariable> getWorkflowInputs(String id) {
103103
// Auto-generated method stub
104104
return null;
105105
}
106106

107107
@Override
108-
public List<String> runWorkflow(String wfId, List<VariableBinding> vBindings, Map<String, Variable> inputVariables) {
108+
public List<String> runWorkflow(String wfId, List<VariableBinding> vBindings, Map<String, WorkflowVariable> inputVariables) {
109109
// Auto-generated method stub
110110
return null;
111111
}

server/src/main/java/org/diskproject/server/adapters/SparqlAdapter.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,6 @@ public byte[] queryCSV(String queryString) throws Exception, QueryParseException
8484

8585
HttpResponse response = httpClient.execute(get);
8686
HttpEntity entity = response.getEntity();
87-
System.out.println("URL> " + url);
88-
System.out.println("SL> " + response.getStatusLine());
8987
ByteArrayOutputStream rawBytes = new ByteArrayOutputStream();
9088
entity.writeTo(rawBytes);
9189
return rawBytes.toByteArray();

server/src/main/java/org/diskproject/server/adapters/StorageManager.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import io.minio.MinioClient;
1111
import io.minio.ObjectWriteResponse;
1212
import io.minio.PutObjectArgs;
13+
import io.minio.StatObjectArgs;
1314
import io.minio.errors.ErrorResponseException;
1415
import io.minio.errors.InsufficientDataException;
1516
import io.minio.errors.InternalException;
@@ -61,8 +62,31 @@ public boolean init () {
6162
}
6263

6364
public String upload (String filename, String datatype, byte[] bytes) {
65+
if (datatype.equals("text/csv") && !filename.endsWith(".csv"))
66+
filename += ".csv";
67+
6468
System.out.println("Trying to upload " + filename + " -- " + bytes.length);
6569
if (client == null) return null;
70+
71+
//Check if the file is already in minio.
72+
boolean exists = false;
73+
try {
74+
client.statObject(StatObjectArgs.builder()
75+
.bucket(StorageManager.BUCKET_NAME)
76+
.object(filename).build());
77+
exists = true;
78+
} catch (ErrorResponseException e) {
79+
exists = false;
80+
} catch (Exception e) {
81+
e.printStackTrace();
82+
throw new RuntimeException(e.getMessage());
83+
}
84+
if (exists) {
85+
String newUrl = this.url + "/" + StorageManager.BUCKET_NAME + "/" + filename;
86+
System.out.println("File already on bucket: " + newUrl);
87+
return newUrl;
88+
}
89+
6690
try {
6791
ObjectWriteResponse resp = client.putObject(
6892
PutObjectArgs.builder()

server/src/main/java/org/diskproject/server/api/impl/DiskResource.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
import org.diskproject.shared.classes.util.ExternalDataRequest;
3838
import org.diskproject.shared.classes.util.QuestionOptionsRequest;
3939
import org.diskproject.shared.classes.vocabulary.Vocabulary;
40-
import org.diskproject.shared.classes.workflow.Variable;
40+
import org.diskproject.shared.classes.workflow.WorkflowVariable;
4141
import org.diskproject.shared.classes.workflow.Workflow;
4242
import org.diskproject.shared.classes.workflow.WorkflowRun;
4343

@@ -378,7 +378,7 @@ public List<Workflow> listWorkflows() {
378378
@GET
379379
@Override
380380
@Path("workflows/{source}/{id}")
381-
public List<Variable> getWorkflowVariables(
381+
public List<WorkflowVariable> getWorkflowVariables(
382382
@PathParam("source") String source,
383383
@PathParam("id") String id) {
384384
return this.repo.getWorkflowVariables(source, id);

server/src/main/java/org/diskproject/server/repository/DiskRepository.java

Lines changed: 103 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@
6666
import org.diskproject.shared.classes.util.KBConstants;
6767
import org.diskproject.shared.classes.util.QuestionOptionsRequest;
6868
import org.diskproject.shared.classes.vocabulary.Vocabulary;
69-
import org.diskproject.shared.classes.workflow.Variable;
69+
import org.diskproject.shared.classes.workflow.WorkflowVariable;
7070
import org.diskproject.shared.classes.workflow.VariableBinding;
7171
import org.diskproject.shared.classes.workflow.Workflow;
7272
import org.diskproject.shared.classes.workflow.WorkflowRun;
@@ -345,7 +345,7 @@ public List<Workflow> getWorkflowList() {
345345
return list;
346346
}
347347

348-
public List<Variable> getWorkflowVariables(String source, String id) {
348+
public List<WorkflowVariable> getWorkflowVariables(String source, String id) {
349349
for (MethodAdapter adapter : this.methodAdapters.values()) {
350350
if (adapter.getName().equals(source)) {
351351
return adapter.getWorkflowVariables(id);
@@ -514,8 +514,8 @@ public TriggeredLOI addTriggeredLOI(String username, TriggeredLOI tloi) {
514514
tloi.setDateModified(dateformatter.format(new Date()));
515515
}
516516
writeTLOI(username, tloi);
517-
518-
TLOIExecutionThread wflowThread = new TLOIExecutionThread(username, tloi, false);
517+
LineOfInquiry loi = getLOI(username, tloi.getParentLoiId());
518+
TLOIExecutionThread wflowThread = new TLOIExecutionThread(username, tloi, loi, false);
519519
executor.execute(wflowThread);
520520
return tloi;
521521
}
@@ -1309,7 +1309,6 @@ else if (qVar.equals("_CSV_")) {
13091309

13101310
dataVarBindings.put("_CSV_", new ArrayList<String>());
13111311
dataVarBindings.get("_CSV_").add(csvUri);
1312-
System.out.println("CSV: " + csvUri);
13131312
}
13141313

13151314
TriggeredLOI tloi = new TriggeredLOI(loi, id);
@@ -1406,7 +1405,7 @@ private List<WorkflowBindings> getTLOIBindings(String username, List<WorkflowBin
14061405
tloiBindings.add(tloiBinding);
14071406
MethodAdapter methodAdapter = getMethodAdapterByName(bindings.getSource());
14081407

1409-
List<Variable> allVars = methodAdapter.getWorkflowVariables(bindings.getWorkflow());
1408+
List<WorkflowVariable> allVars = methodAdapter.getWorkflowVariables(bindings.getWorkflow());
14101409

14111410
for (VariableBinding vBinding : bindings.getBindings()) { // Normal variable bindings.
14121411
// For each Variable binding, check :
@@ -1428,7 +1427,6 @@ private List<WorkflowBindings> getTLOIBindings(String username, List<WorkflowBin
14281427
sparqlVar = "_CSV_";
14291428
}
14301429

1431-
System.out.println("keys:" + dataVarBindings.keySet());
14321430
if (sparqlVar == null)
14331431
continue;
14341432

@@ -1450,7 +1448,8 @@ private List<WorkflowBindings> getTLOIBindings(String username, List<WorkflowBin
14501448
if (bindingsAreFiles) {
14511449
String varName = vBinding.getVariable();
14521450
String dType = null;
1453-
for (Variable v: allVars) {
1451+
for (WorkflowVariable v: allVars) {
1452+
//This does not have in consideration output variables.
14541453
if (varName.equals(v.getName())) {
14551454
List<String> classes = v.getType();
14561455
if (classes != null && classes.size() > 0) {
@@ -1518,11 +1517,10 @@ private List<WorkflowBindings> getTLOIBindings(String username, List<WorkflowBin
15181517
private Map<String, String> addData(List<String> dsUrls, MethodAdapter methodAdapter, DataAdapter dataAdapter, String dType)
15191518
throws Exception {
15201519
// To add files to wings and not replace anything, we need to get the hash from the wiki.
1521-
// TODO: here connect with minio.
1520+
// TODO: We should upload all files to minio
15221521
Map<String, String> nameToUrl = new HashMap<String, String>();
15231522
Map<String, String> urlToName = new HashMap<String, String>();
15241523
Map<String, String> filesETag = dataAdapter.getFileHashesByETag(dsUrls); // File -> ETag
1525-
boolean allOk = true; // All is OK if we have all file ETags.
15261524

15271525
for (String fileUrl: dsUrls) {
15281526
if (filesETag.containsKey(fileUrl)) {
@@ -1533,23 +1531,6 @@ private Map<String, String> addData(List<String> dsUrls, MethodAdapter methodAda
15331531
urlToName.put(fileUrl, uniqueName);
15341532
} else {
15351533
System.err.println("ETag not found: " + fileUrl);
1536-
allOk = false;
1537-
}
1538-
}
1539-
1540-
if (!allOk) { // Get hashes from the data-adapter (SPARQL)
1541-
Map<String, String> hashes = dataAdapter.getFileHashes(dsUrls); // File -> SHA1
1542-
for (String fileUrl : dsUrls) {
1543-
if (hashes.containsKey(fileUrl)) {
1544-
if (!urlToName.containsKey(fileUrl)) {
1545-
String hash = hashes.get(fileUrl);
1546-
String uniqueName = "SHA" + hash.substring(0, 6) + "_" + fileUrl.replaceAll("^.*\\/", "");
1547-
nameToUrl.put(uniqueName, fileUrl);
1548-
urlToName.put(fileUrl, uniqueName);
1549-
}
1550-
} else {
1551-
System.err.println("HASH not found: " + fileUrl);
1552-
}
15531534
}
15541535
}
15551536

@@ -1563,14 +1544,27 @@ private Map<String, String> addData(List<String> dsUrls, MethodAdapter methodAda
15631544

15641545
// avoid to duplicate files
15651546
Set<String> names = nameToUrl.keySet();
1566-
List<String> availableFiles = methodAdapter.areFilesAvailable(names, dType);
1567-
names.removeAll(availableFiles);
1547+
Map<String, String> availableFiles = methodAdapter.areFilesAvailable(names, dType);
1548+
names.removeAll(availableFiles.keySet());
15681549

15691550
// upload the files
15701551
for (String newFilename : names) {
15711552
String newFile = nameToUrl.get(newFilename);
1572-
System.out.println("Uploading to " + methodAdapter.getName() + ": " + newFile + " as " + newFilename + "(" + dType + ")");
1573-
methodAdapter.addData(newFile, newFilename, dType);
1553+
System.out.println("Uploading to " + methodAdapter.getName() + ": " + newFile + " as " + newFilename + " (" + dType + ")");
1554+
String dataId = methodAdapter.addData(newFile, newFilename, dType);
1555+
urlToName.put(newFile, dataId);
1556+
}
1557+
1558+
// Set current available urls
1559+
for (String existingFilename: availableFiles.keySet()) {
1560+
for (String newName: urlToName.keySet()) {
1561+
if (urlToName.get(newName).equals(existingFilename)) {
1562+
String newFile = availableFiles.get(existingFilename);
1563+
System.out.println("Replaced " + existingFilename + " -> " + newFile);
1564+
urlToName.put(newName, newFile);
1565+
}
1566+
}
1567+
15741568
}
15751569

15761570
return urlToName;
@@ -1861,10 +1855,12 @@ class TLOIExecutionThread implements Runnable {
18611855
String username;
18621856
boolean metamode;
18631857
TriggeredLOI tloi;
1858+
LineOfInquiry loi;
18641859

1865-
public TLOIExecutionThread(String username, TriggeredLOI tloi, boolean metamode) {
1860+
public TLOIExecutionThread(String username, TriggeredLOI tloi, LineOfInquiry loi, boolean metamode) {
18661861
this.username = username;
18671862
this.tloi = tloi;
1863+
this.loi = loi;
18681864
this.metamode = metamode;
18691865
}
18701866

@@ -1887,7 +1883,7 @@ public void run() {
18871883
break; // This could be `continue`, so to execute the other workflows...
18881884
}
18891885
// Get workflow input details
1890-
Map<String, Variable> inputs = methodAdapter.getWorkflowInputs(bindings.getWorkflow());
1886+
Map<String, WorkflowVariable> inputs = methodAdapter.getWorkflowInputs(bindings.getWorkflow());
18911887
List<VariableBinding> vBindings = bindings.getBindings();
18921888
List<VariableBinding> sendbindings = new ArrayList<VariableBinding>(vBindings);
18931889

@@ -1926,7 +1922,7 @@ public void run() {
19261922

19271923
// Start monitoring
19281924
if (allOk) {
1929-
TLOIMonitoringThread monitorThread = new TLOIMonitoringThread(username, tloi, metamode);
1925+
TLOIMonitoringThread monitorThread = new TLOIMonitoringThread(username, tloi, loi, metamode);
19301926
monitor.schedule(monitorThread, 15, TimeUnit.SECONDS);
19311927
} else {
19321928
System.out.println("[E] Finished: Something when wrong.");
@@ -1937,7 +1933,67 @@ public void run() {
19371933
}
19381934
}
19391935

1940-
private void addConfidenceToTLOI (TriggeredLOI tloi, WorkflowRun run, MethodAdapter methodAdapter) {
1936+
private void processWorkflowOutputs (TriggeredLOI tloi, LineOfInquiry loi, WorkflowBindings workflow, WorkflowRun run, MethodAdapter methodAdapter, boolean meta) {
1937+
Map<String, RunBinding> outputs = run.getOutputs();
1938+
if (outputs == null) return;
1939+
1940+
Map<String,String> outputAssignations = new HashMap<String,String>();
1941+
for (WorkflowVariable wb: methodAdapter.getWorkflowVariables(workflow.getWorkflow())) {
1942+
if (!wb.isInput() || wb.isOutput()) { // This could be more strict
1943+
outputAssignations.put(wb.getName(), "DO_NO_STORE");
1944+
}
1945+
}
1946+
// We need to get the loi var assignations
1947+
String id = workflow.getWorkflow();
1948+
for (WorkflowBindings wb: (meta? loi.getMetaWorkflows() : loi.getWorkflows())) {
1949+
if (id.contains(wb.getWorkflow())) {
1950+
for (VariableBinding b: wb.getBindings()) {
1951+
String varName = b.getVariable();
1952+
System.out.println("> " + varName);
1953+
if (outputAssignations.containsKey(varName)) {
1954+
outputAssignations.put(varName, b.getBinding());
1955+
}
1956+
}
1957+
}
1958+
}
1959+
1960+
System.out.println("OUT VARS: ");
1961+
System.out.println(outputAssignations);
1962+
1963+
// Now process generated outputs.
1964+
for (String outname : outputs.keySet()) {
1965+
for (String varName: outputAssignations.keySet()) {
1966+
String varBinding = outputAssignations.get(varName);
1967+
if (varBinding.contains("DO_NO_STORE") ||
1968+
varBinding.contains("DOWNLOAD_ONLY") ||
1969+
varBinding.contains("IMAGE") ||
1970+
varBinding.contains("VISUALIZE")) {
1971+
// DO NOTHING, some of these should be upload to MINIO
1972+
} else if (varBinding.contains("CONFIDENCE_VALUE")) {
1973+
String dataid = outputs.get(outname).id;
1974+
FileAndMeta fm = methodAdapter.fetchData(dataid);
1975+
byte[] byteConf = fm.data;
1976+
String wingsP = byteConf != null ? new String(byteConf, StandardCharsets.UTF_8) : null;
1977+
Double pVal = null;
1978+
try {
1979+
String strPVal = wingsP != null ? wingsP.split("\n", 2)[0] : "";
1980+
pVal = Double.valueOf(strPVal);
1981+
} catch (Exception e) {
1982+
System.err.println("[M] Error: " + dataid + " is a non valid p-value: " + wingsP);
1983+
}
1984+
if (pVal != null) {
1985+
System.out.println("[M] Detected p-value: " + pVal);
1986+
tloi.setConfidenceValue(pVal);
1987+
tloi.setConfidenceType("P-VALUE");
1988+
}
1989+
} else {
1990+
System.out.println("Output information not found");
1991+
}
1992+
}
1993+
}
1994+
}
1995+
1996+
/*private void addConfidenceToTLOI (TriggeredLOI tloi, WorkflowRun run, MethodAdapter methodAdapter) {
19411997
// Search for p-value on the outputs
19421998
// TODO: change this to allow any output.
19431999
Map<String, RunBinding> outputs = run.getOutputs();
@@ -1963,7 +2019,7 @@ private void addConfidenceToTLOI (TriggeredLOI tloi, WorkflowRun run, MethodAdap
19632019
}
19642020
}
19652021
}
1966-
}
2022+
}*/
19672023

19682024
private Status getOverallRunStatus (TriggeredLOI tloi, boolean metamode) {
19692025
List<WorkflowBindings> wfList = metamode ? tloi.getMetaWorkflows() : tloi.getWorkflows();
@@ -1994,11 +2050,13 @@ class TLOIMonitoringThread implements Runnable {
19942050
String username;
19952051
boolean metamode, error;
19962052
TriggeredLOI tloi;
2053+
LineOfInquiry loi;
19972054
Map<String, Map<String, WorkflowRun>> queuedRuns;
19982055

1999-
public TLOIMonitoringThread(String username, TriggeredLOI tloi, boolean metamode) {
2056+
public TLOIMonitoringThread(String username, TriggeredLOI tloi, LineOfInquiry loi, boolean metamode) {
20002057
this.username = username;
20012058
this.tloi = tloi;
2059+
this.loi = loi;
20022060
this.metamode = metamode;
20032061
this.error = false;
20042062
this.queuedRuns = new HashMap<String, Map<String, WorkflowRun>>();
@@ -2066,13 +2124,14 @@ public void run() {
20662124
}
20672125
//newStatus = newRun.getStatus();
20682126
}
2069-
if (newExec.status == Status.SUCCESSFUL || newExec.status == Status.FAILED) {
2070-
if (newExec.status == Status.FAILED) {
2071-
this.error = true;
2072-
} else {
2073-
addConfidenceToTLOI(tloi, newRun, methodAdapter);
2074-
}
2127+
2128+
if (newExec.status == Status.SUCCESSFUL) {
2129+
processWorkflowOutputs(tloi, loi, bindings, newRun, methodAdapter, metamode);
2130+
//addConfidenceToTLOI(tloi, newRun, methodAdapter); /// HERE
2131+
} else if (newExec.status == Status.FAILED) {
2132+
this.error = true;
20752133
}
2134+
20762135
newRun.setExecutionInfo(newExec);
20772136
bindings.addRun(newRun);
20782137
updatedBindings.add(bindings);
@@ -2089,7 +2148,7 @@ public void run() {
20892148
} else {
20902149
System.out.println("[M] Starting metamode after n workflows.");
20912150
tloi.setStatus(Status.RUNNING);
2092-
TLOIExecutionThread wflowThread = new TLOIExecutionThread(username, tloi, true);
2151+
TLOIExecutionThread wflowThread = new TLOIExecutionThread(username, tloi, loi, true);
20932152
executor.execute(wflowThread);
20942153
}
20952154
} else if (overallStatus == Status.FAILED) {

0 commit comments

Comments
 (0)