|
| 1 | +""" csv_to_custom_json by n4n5 """ |
1 | 2 | from csv import reader as CSVreader
|
2 | 3 | from os.path import isfile
|
3 | 4 | from json import dumps as JSONstringify
|
4 | 5 |
|
| 6 | +# pylint: disable=W0102 |
| 7 | +# -> dangerous arguments |
| 8 | +# pylint: disable=C0103 |
| 9 | +# -> Function name "parseFile" |
5 | 10 |
|
def parseFile(path_to_file="", schema=None, options_user=None):
    """Parse CSV content into a list of Python objects shaped by ``schema``.

    Args:
        path_to_file: Either the path (``str``) of a CSV file, or a ``list``
            of raw text lines. In both cases the first line is consumed as
            the header row.
        schema: Optional ``dict`` or ``list`` describing the output shape.
            Leaf values may be ``"int"``, ``"float"``, ``"string"``, a
            callable, or any other value. Without a schema every line
            becomes a ``{column_name: raw_value}`` dict.
        options_user: Optional dict overriding the defaults below.
            - ``arrayParse`` (True): inside a list schema, an item equal to a
              column name is bound to that column and converted with ``str``.
            - ``callBackForce`` (False): keep the callback result even when
              it is ``None``.
            - ``debug`` (False): print diagnostics to stdout.
            - ``error`` (False): when equal to ``"no"``, a missing file
              yields ``[]`` instead of raising.
            - ``lineCallBack`` (None): callable invoked as
              ``callback(parsed_line, raw_cells)`` for every kept line.
            - ``parse`` (True): apply the schema conversions.
            - ``separator`` (","): cell delimiter.
            - ``privateSeparator`` ("..."): internal separator used to join
              the segments of a nested schema path.
            - ``overrideFirstLine`` (False): a list that replaces the header
              read from the input (the input's first line is still consumed).
            - ``avoidVoidLine`` (False): skip lines that have no cell or
              whose first cell is empty.

    Returns:
        A list with one converted object per kept line; ``[]`` when the
        input is empty.

    Raises:
        ValueError: the path does not point to a file (unless ``error`` is
            ``"no"``).
        TypeError: ``path_to_file`` is neither a ``str`` nor a ``list``.
    """
    if options_user is None:
        options_user = {}
    defaults = {
        "arrayParse": True,
        "callBackForce": False,
        "debug": False,
        "error": False,
        "lineCallBack": None,
        "parse": True,
        "separator": ",",
        "privateSeparator": "...",
        "overrideFirstLine": False,
        "avoidVoidLine": False,
    }
    # Only the known option names are kept; unknown user keys are ignored.
    options = {name: options_user.get(name, fallback)
               for name, fallback in defaults.items()}

    def not_serializable(_value):
        """Placeholder used when debug output meets a non-JSON value."""
        return '<not serializable>'

    if options["debug"]:
        if isinstance(schema, (list, dict)):
            print("HAS SCHEMA")
        else:
            print("NO SCHEMA")
        # ``default=`` keeps the dump from failing on a callable lineCallBack.
        print("OPTIONS", JSONstringify(options, default=not_serializable))
        if options["error"] == "no":
            print("Useless informations : just use try catch if you don't want error :)")

    def create_fields_binding(schema_object, first_line, start_path=""):
        """Flatten ``schema_object`` into a list of binding dicts.

        Each binding has a ``name``, a ``path`` (segments joined with the
        private separator) and either a ``value`` (conversion or callable)
        or a ``type`` of ``"helper-array"`` / ``"static"``.
        """
        bindings = []
        is_list = isinstance(schema_object, list)
        # Lists are addressed by index, dicts by key.
        keys = range(len(schema_object)) if is_list else list(schema_object)
        for one_element in keys:
            if start_path == "":
                path = '{}'.format(one_element)
            else:
                path = '{}{}{}'.format(
                    start_path, options["privateSeparator"], one_element)
            target = schema_object[one_element]
            if isinstance(target, (dict, list)):
                if isinstance(target, list):
                    # Marks where an empty list must be created before its
                    # items are appended.
                    bindings.append({
                        "name": one_element,
                        "path": path,
                        "type": "helper-array"
                    })
                bindings.extend(create_fields_binding(target, first_line, path))
            elif is_list and options["arrayParse"] and target in first_line:
                bindings.append({
                    "name": target,
                    "path": path,
                    "value": "string"
                })
            elif one_element in first_line or callable(target):
                bindings.append({
                    "name": one_element,
                    "path": path,
                    "value": target
                })
            else:
                bindings.append({
                    "name": one_element,
                    "path": path,
                    "type": "static",
                    "value": target
                })
        return bindings

    def parse_line(line, rows, first_line):
        """Build the output object for one list of cells using ``rows``."""
        obj = [] if isinstance(schema, list) else {}
        for one_row in rows:
            one_path_row = one_row["path"]
            one_path_name = one_row["name"]
            all_path = one_path_row.split(options["privateSeparator"])
            row_type = one_row.get("type")
            current_value = None
            if row_type is None:
                schema_value = one_row.get("value")
                if one_path_name in first_line:
                    index = first_line.index(one_path_name)
                    if index < len(line):
                        current_value = line[index]
                else:
                    # No matching column: the schema value itself is used.
                    current_value = schema_value
                if options["parse"] and current_value is not None:
                    if schema_value == "int" and current_value != '':
                        current_value = int(current_value)
                    elif schema_value == "float":
                        current_value = float(current_value)
                    elif schema_value == "string":
                        current_value = str(current_value)
                    elif callable(schema_value):
                        if callable(current_value):
                            # The callable is not bound to a column, so it
                            # receives every cell of the line.
                            current_value = schema_value(line)
                        else:
                            current_value = schema_value(current_value)
            elif row_type == "helper-array":
                current_value = []
            elif row_type == "static":
                current_value = one_row["value"]

            if len(all_path) > 1:
                # Walk the path (creating intermediate containers) down to
                # the container that receives the value.
                good_place = obj
                last = len(all_path) - 1
                for count, next_path in enumerate(all_path):
                    if isinstance(good_place, list):
                        next_path_int = int(next_path)
                    if count == last:
                        if isinstance(good_place, dict):
                            good_place[next_path] = ""
                    else:
                        if ((isinstance(good_place, list)
                             and next_path_int not in good_place)
                                or next_path not in good_place):
                            if isinstance(good_place, list):
                                if len(good_place) < (next_path_int + 1):
                                    # len() returns 0 and the first index of the list is 0 !
                                    good_place.insert(next_path_int, {})
                            else:
                                good_place[next_path] = {}
                        if isinstance(good_place, list):
                            if next_path_int < len(good_place):
                                good_place = good_place[next_path_int]
                        else:
                            good_place = good_place[next_path]
                if isinstance(good_place, list):
                    good_place.append(current_value)
                elif isinstance(good_place, dict):
                    good_place[one_path_name] = current_value
            elif isinstance(obj, list):
                obj.insert(int(one_path_row), current_value)
            elif isinstance(obj, dict):
                obj[one_path_row] = current_value
        return obj

    def convert(first_line, data_lines):
        """Convert every list of cells in ``data_lines`` using the header."""
        if isinstance(options["overrideFirstLine"], list):
            first_line = options["overrideFirstLine"]
        if schema is not None:
            # None is default value for schema
            rows = create_fields_binding(schema, first_line)
            if options["debug"]:
                print("BINDINGS:", JSONstringify(rows, default=not_serializable))
        else:
            rows = [{"name": column, "path": column} for column in first_line]
        final_json = []
        for one_line in data_lines:
            # NOTE: a line whose first cell is empty is skipped as well, even
            # when later cells hold data.
            if options["avoidVoidLine"] and (not one_line or one_line[0] == ""):
                continue
            parsed_line = parse_line(one_line, rows, first_line)
            if callable(options["lineCallBack"]):
                res_callback = options["lineCallBack"](parsed_line, one_line)
                if res_callback is not None or options["callBackForce"]:
                    parsed_line = res_callback
                elif options["debug"]:
                    print(
                        "CallBack force at False and callBack result is not correct")
            final_json.append(parsed_line)
        return final_json

    if isinstance(path_to_file, str):
        if not isfile(path_to_file):
            if options["error"] == "no":
                return []
            raise ValueError("Can't access to the file : '{}'".format(path_to_file))
        # The context manager closes the file even when parsing or the user
        # callback raises; newline="" is what the csv module expects.
        with open(path_to_file, newline="") as csv_file:
            line_reader = CSVreader(csv_file, delimiter=options["separator"])
            first_line = next(line_reader, None)
            if first_line is None:
                # Empty file: no header, no data.
                return []
            # A blank line is read as [] and turned into a fake void line.
            return convert(first_line,
                           (cells if cells else [''] for cells in line_reader))
    if isinstance(path_to_file, list):
        if not path_to_file:
            return []
        separator = options["separator"]
        return convert(path_to_file[0].split(separator),
                       (raw_line.split(separator) for raw_line in path_to_file[1:]))
    raise TypeError(
        "path_to_file must be a str or a list, got '{}'".format(
            type(path_to_file).__name__))
0 commit comments