|
41 | 41 | import com.treasuredata.client.model.TDJobRequest; |
42 | 42 | import com.treasuredata.client.model.TDJobRequestBuilder; |
43 | 43 | import com.treasuredata.client.model.TDJobSummary; |
44 | | -import com.treasuredata.client.model.TDPartialDeleteJob; |
45 | 44 | import com.treasuredata.client.model.TDResultFormat; |
46 | 45 | import com.treasuredata.client.model.TDSaveQueryRequest; |
47 | 46 | import com.treasuredata.client.model.TDSavedQuery; |
@@ -1001,85 +1000,6 @@ private static String newTemporaryName(String prefix) |
1001 | 1000 | return prefix + "_" + dateStr; |
1002 | 1001 | } |
1003 | 1002 |
|
1004 | | - @Test |
1005 | | - public void partialDeleteTest() |
1006 | | - throws Exception |
1007 | | - { |
1008 | | - String t = newTemporaryName("td_client_test"); |
1009 | | - try { |
1010 | | - client.deleteTableIfExists(SAMPLE_DB, t); |
1011 | | - |
1012 | | - String jobId = client.submit(TDJobRequest.newPrestoQuery(SAMPLE_DB, |
1013 | | - String.format("CREATE TABLE %s AS SELECT * FROM (VALUES TD_TIME_PARSE('2015-01-01', 'UTC'), TD_TIME_PARSE('2015-02-01', 'UTC')) as sample(time)", t, t))); |
1014 | | - |
1015 | | - waitJobCompletion(jobId); |
1016 | | - |
1017 | | - String before = queryResult(SAMPLE_DB, String.format("SELECT * FROM %s", t)); |
1018 | | - |
1019 | | - assertTrue(before.contains("1420070400")); |
1020 | | - assertTrue(before.contains("1422748800")); |
1021 | | - |
1022 | | - // delete 2015-01-01 entry |
1023 | | - try { |
1024 | | - client.partialDelete(SAMPLE_DB, t, 1420070400, 1420070400 + 1); |
1025 | | - fail("should not reach here"); |
1026 | | - } |
1027 | | - catch (TDClientException e) { |
1028 | | - assertEquals(TDClientException.ErrorType.INVALID_INPUT, e.getErrorType()); |
1029 | | - } |
1030 | | - long from = 1420070400 - (1420070400 % 3600); |
1031 | | - long to = from + 3600; |
1032 | | - TDPartialDeleteJob partialDeleteJob = client.partialDelete(SAMPLE_DB, t, from, to); |
1033 | | - logger.debug("partial delete job: " + partialDeleteJob); |
1034 | | - assertEquals(from, partialDeleteJob.getFrom()); |
1035 | | - assertEquals(to, partialDeleteJob.getTo()); |
1036 | | - assertEquals(SAMPLE_DB, partialDeleteJob.getDatabase()); |
1037 | | - assertEquals(t, partialDeleteJob.getTable()); |
1038 | | - |
1039 | | - waitJobCompletion(partialDeleteJob.getJobId()); |
1040 | | - |
1041 | | - String after = queryResult(SAMPLE_DB, String.format("SELECT * FROM %s", t)); |
1042 | | - assertFalse(after.contains("1420070400")); |
1043 | | - assertTrue(after.contains("1422748800")); |
1044 | | - } |
1045 | | - finally { |
1046 | | - client.deleteTableIfExists(SAMPLE_DB, t); |
1047 | | - } |
1048 | | - } |
1049 | | - |
1050 | | - @Test |
1051 | | - public void partialDeleteWithDomainKeyTest() |
1052 | | - throws Exception |
1053 | | - { |
1054 | | - String domainKey = randomDomainKey(); |
1055 | | - |
1056 | | - String t = newTemporaryName("td_client_test"); |
1057 | | - try { |
1058 | | - client.deleteTableIfExists(SAMPLE_DB, t); |
1059 | | - |
1060 | | - String jobId = client.submit(TDJobRequest.newPrestoQuery(SAMPLE_DB, |
1061 | | - String.format("CREATE TABLE %s AS SELECT * FROM (VALUES TD_TIME_PARSE('2015-01-01', 'UTC'), TD_TIME_PARSE('2015-02-01', 'UTC')) as sample(time)", t, t))); |
1062 | | - |
1063 | | - waitJobCompletion(jobId); |
1064 | | - |
1065 | | - int from = 1420070400; |
1066 | | - int to = from + 3600; |
1067 | | - |
1068 | | - TDPartialDeleteJob deleteJob = client.partialDelete(SAMPLE_DB, t, from, to, domainKey); |
1069 | | - |
1070 | | - try { |
1071 | | - client.partialDelete(SAMPLE_DB, t, from, to, domainKey); |
1072 | | - fail("Expected " + TDClientHttpConflictException.class.getName()); |
1073 | | - } |
1074 | | - catch (TDClientHttpConflictException e) { |
1075 | | - assertThat(e.getConflictsWith(), is(Optional.of(deleteJob.getJobId()))); |
1076 | | - } |
1077 | | - } |
1078 | | - finally { |
1079 | | - client.deleteTableIfExists(SAMPLE_DB, t); |
1080 | | - } |
1081 | | - } |
1082 | | - |
1083 | 1003 | @Test |
1084 | 1004 | public void swapTest() |
1085 | 1005 | throws Exception |
@@ -1248,9 +1168,23 @@ public void testBulkImport() |
1248 | 1168 | } |
1249 | 1169 |
|
1250 | 1170 | // Check the data |
| 1171 | + // It seems it needs some time to reflect the data in TD, |
| 1172 | + // so we will wait for few mins before checking the data |
| 1173 | + deadline = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(5); |
| 1174 | + |
1251 | 1175 | TDTable imported = client.listTables(SAMPLE_DB).stream().filter(input -> { |
1252 | 1176 | return input.getName().equals(bulkImportTable); |
1253 | 1177 | }).findFirst().get(); |
| 1178 | + while (imported.getRowCount() == 0) { |
| 1179 | + if (System.currentTimeMillis() > deadline) { |
| 1180 | + throw new IllegalStateException("timeout error: data is not imported yet"); |
| 1181 | + } |
| 1182 | + logger.info("Waiting data import step completion"); |
| 1183 | + Thread.sleep(TimeUnit.SECONDS.toMillis(5)); |
| 1184 | + imported = client.listTables(SAMPLE_DB).stream().filter(input -> { |
| 1185 | + return input.getName().equals(bulkImportTable); |
| 1186 | + }).findFirst().get(); |
| 1187 | + } |
1254 | 1188 |
|
1255 | 1189 | assertEquals(numRowsInPart * 2, imported.getRowCount()); |
1256 | 1190 | List<TDColumn> columns = imported.getColumns(); |
|
0 commit comments