|
8 | 8 | SQLBookReader,
|
9 | 9 | SQLBookWriter,
|
10 | 10 | from_query_sets,
|
11 |
| - OrderedDict |
| 11 | + OrderedDict, |
| 12 | + PyexcelSQLSkipRowException |
12 | 13 | )
|
13 | 14 | from pyexcel_io.sqlbook import SQLTableReader, SQLTableWriter
|
14 | 15 | from sqlalchemy.orm import relationship, backref
|
@@ -121,6 +122,72 @@ def test_one_table(self):
|
121 | 122 | assert results == self.results
|
122 | 123 | mysession.close()
|
123 | 124 |
|
| 125 | + def test_update_existing_row(self): |
| 126 | + mysession = Session() |
| 127 | + # write existing data |
| 128 | + writer = SQLTableWriter(mysession, |
| 129 | + [Pyexcel,self.data[0], None, None]) |
| 130 | + writer.write_array(self.data[1:]) |
| 131 | + writer.close() |
| 132 | + query_sets=mysession.query(Pyexcel).all() |
| 133 | + results = from_query_sets(self.data[0], query_sets) |
| 134 | + assert results == self.results |
| 135 | + # update data using custom initializer |
| 136 | + update_data = [ |
| 137 | + ['birth', 'id', 'name', 'weight'], |
| 138 | + [datetime.date(2014, 11, 11), 0, 'Adam_E', 12.25], |
| 139 | + [datetime.date(2014, 11, 12), 1, 'Smith_E', 11.25] |
| 140 | + ] |
| 141 | + updated_results = [ |
| 142 | + ['birth', 'id', 'name', 'weight'], |
| 143 | + ['2014-11-11', 0, 'Adam_E', 12.25], |
| 144 | + ['2014-11-12', 1, 'Smith_E', 11.25] |
| 145 | + ] |
| 146 | + def row_updater(row): |
| 147 | + an_instance = mysession.query(Pyexcel).get(row['id']) |
| 148 | + if an_instance is None: |
| 149 | + an_instance = Pyexcel() |
| 150 | + for name in row.keys(): |
| 151 | + setattr(an_instance, name, row[name]) |
| 152 | + return an_instance |
| 153 | + writer = SQLTableWriter(mysession, |
| 154 | + [Pyexcel,update_data[0], None, row_updater]) |
| 155 | + writer.write_array(update_data[1:]) |
| 156 | + writer.close() |
| 157 | + query_sets=mysession.query(Pyexcel).all() |
| 158 | + results = from_query_sets(self.data[0], query_sets) |
| 159 | + assert results == updated_results |
| 160 | + mysession.close() |
| 161 | + |
| 162 | + def test_skipping_rows_if_data_exist(self): |
| 163 | + mysession = Session() |
| 164 | + # write existing data |
| 165 | + writer = SQLTableWriter(mysession, |
| 166 | + [Pyexcel,self.data[0], None, None]) |
| 167 | + writer.write_array(self.data[1:]) |
| 168 | + writer.close() |
| 169 | + query_sets=mysession.query(Pyexcel).all() |
| 170 | + results = from_query_sets(self.data[0], query_sets) |
| 171 | + assert results == self.results |
| 172 | + # update data using custom initializer |
| 173 | + update_data = [ |
| 174 | + ['birth', 'id', 'name', 'weight'], |
| 175 | + [datetime.date(2014, 11, 11), 0, 'Adam_E', 12.25], |
| 176 | + [datetime.date(2014, 11, 12), 1, 'Smith_E', 11.25] |
| 177 | + ] |
| 178 | + def row_updater(row): |
| 179 | + an_instance = mysession.query(Pyexcel).get(row['id']) |
| 180 | + if an_instance is not None: |
| 181 | + raise PyexcelSQLSkipRowException() |
| 182 | + writer = SQLTableWriter(mysession, |
| 183 | + [Pyexcel,update_data[0], None, row_updater]) |
| 184 | + writer.write_array(update_data[1:]) |
| 185 | + writer.close() |
| 186 | + query_sets=mysession.query(Pyexcel).all() |
| 187 | + results = from_query_sets(self.data[0], query_sets) |
| 188 | + assert results == self.results |
| 189 | + mysession.close() |
| 190 | + |
124 | 191 | def test_one_table_with_empty_rows(self):
|
125 | 192 | mysession = Session()
|
126 | 193 | data = [
|
|
0 commit comments