Skip to content

Commit 51df971

Browse files
committed
Initial release
Initial release of pycorn script
1 parent 240c308 commit 51df971

File tree

1 file changed

+363
-0
lines changed

1 file changed

+363
-0
lines changed

pycorn.py

Lines changed: 363 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,363 @@
1+
# -*- coding: utf-8 -*-
2+
'''
3+
PyCORN - script to extract data from .res (results) files generated
4+
by UNICORN Chromatography software supplied with ÄKTA Systems
5+
(c)2014 - Yasar L. Ahmed
6+
v0.1
7+
'''
8+
9+
import struct
10+
import argparse
11+
import codecs
12+
import os
13+
import time
14+
15+
start = time.time()
16+
17+
parser = argparse.ArgumentParser(
18+
description = "Extract data from UNICORN .res files to .csv/.txt",
19+
epilog = "Make it so!")
20+
parser.add_argument("-c", "--check",
21+
help = "Perform simple check if file is supported",
22+
action = "store_true")
23+
parser.add_argument("-n", "--info",
24+
help = "Display entries in header",
25+
action = "store_true")
26+
parser.add_argument("-i", "--inject", type = int, default = 0,
27+
help = "Set injection number # as zero retention, use -t to find injection points",
28+
metavar="#")
29+
parser.add_argument("-t", "--points",
30+
help = "Display injection points",
31+
action = "store_true")
32+
parser.add_argument("-e", "--extract",
33+
help = "Extract supported data blocks",
34+
action = "store_true")
35+
parser.add_argument("-p", "--plot",
36+
help = 'Plot curves',
37+
action = "store_true")
38+
parser.add_argument("-u", "--user",
39+
help = "Show stored user name",
40+
action = "store_true")
41+
parser.add_argument("inp_res",
42+
help = "Input .res file",
43+
metavar = "<file>.res",
44+
type = argparse.FileType('rb'))
45+
46+
args = parser.parse_args()
47+
file_in = args.inp_res.name # file handling is not done by argparse
48+
file_base = args.inp_res.name[:-4]
49+
args.inp_res.close() # see comment above
50+
51+
'''
52+
Magic numbers
53+
'''
54+
RES_magic_id = b'\x11\x47\x11\x47\x18\x00\x00\x00\xB0\x02\x00\x00\x20\x6C\x03\x00'
55+
CNotes_id = b'\x00\x00\x01\x00\x02\x00\x03\x22'
56+
Methods_id = b'\x00\x00\x01\x00\x02\x00\x01\x02'
57+
Logbook_id = b'\x00\x00\x01\x00\x04\x00\x48\x04'
58+
SensData_id = b'\x00\x00\x01\x00\x04\x00\x01\x14'
59+
Fractions_id = b'\x00\x00\x01\x00\x04\x00\x44\x04'
60+
Inject_id = b'\x00\x00\x01\x00\x04\x00\x46\x04'
61+
LogBook_id = b'\x00\x00\x01\x00\x02\x00\x01\x13' #capital B!
62+
63+
inj_sel = 0.0 #injection point is by default 0.0 ml
64+
65+
66+
def input_check(inp):
67+
'''
68+
Checks if input file is a supported res file
69+
x = magic number, y = version string, z = file size/EOF
70+
'''
71+
print((" ---- \n Input file: {0}").format(inp))
72+
with open(inp, 'rb') as fo:
73+
file_read = fo.read()
74+
x=file_read.find(RES_magic_id, 0, 16)
75+
y=file_read.find(b'UNICORN 3.10',16,36)
76+
z=struct.unpack("i", file_read[16:20])
77+
if (x,y) == (0,24):
78+
print(" Input is UNICORN 3.10 file!")
79+
x,y = (0,0)
80+
else:
81+
print(" Input not UNICORN 3.10 file!")
82+
x,y = (1,1)
83+
if z[0] == os.path.getsize(file_in):
84+
print(" File size check - OK")
85+
z = 0
86+
else:
87+
print(" File size mismatch - file corrupted?")
88+
z = 1
89+
return(x,y,z)
90+
91+
92+
def readheader(inp):
93+
'''
94+
Extracts all the entries in the header (starts at position 686)
95+
'''
96+
tmp_list = []
97+
with open(inp, 'rb') as fo:
98+
fread = fo.read()
99+
header_end = fread.find(LogBook_id) + 342
100+
for i in range(686,header_end, 344):
101+
decl = struct.unpack("8s296s4i", fread[i:i+320])
102+
x = decl[0], codecs.decode(decl[1], 'cp1250').rstrip("\x00"),decl[2],decl[3],decl[4],decl[5]
103+
tmp_list.append(x)
104+
return(tmp_list)
105+
106+
107+
def showheader(inp,full="true"):
108+
'''
109+
Prints content of header
110+
'''
111+
header = readheader(file_in)
112+
print((" ---- \n Header of {0}: \n").format(inp))
113+
print(" ENTRY_NAME, BLOCK_SIZE, OFFSET_TO_NEXT, ADRESSE, OFFSET_TO_DATA")
114+
for i in header:
115+
if full == "true":
116+
print(i)
117+
else:
118+
print(i[1:])
119+
120+
121+
def showuser(inp):
122+
'''
123+
Show stored user name
124+
'''
125+
with open(inp, 'rb') as fo:
126+
fread = fo.read(512)
127+
u = struct.unpack("40s", fread[118:158])
128+
dec_u = codecs.decode(u[0], 'cp1250').rstrip("\x00")
129+
print((" ---- \n User: {0}").format(dec_u))
130+
131+
132+
def dataextractor(in_tup):
133+
'''
134+
Identify data type by comparing magic id, then run appropriate
135+
function to extract data, return data as list
136+
'''
137+
inp_id = in_tup[0]
138+
data_tup = in_tup[1:]
139+
#skip empty blocks
140+
if (in_tup[2],in_tup[3]) == (0, 0):
141+
pass
142+
elif inp_id == CNotes_id or inp_id == Methods_id:
143+
x = meta2_read(data_tup)
144+
x.insert(0,inp_id)
145+
return x
146+
elif inp_id == Logbook_id or inp_id == Inject_id or Fractions_id == inp_id:
147+
return meta1_read(data_tup)
148+
elif inp_id == SensData_id:
149+
return sensor_read(data_tup)
150+
else:
151+
pass
152+
153+
154+
def meta1_read(in_tup, silent="false"):
155+
'''
156+
Extracts meta-data/type1, Logbook, fractions and Inject marks
157+
'''
158+
if silent == "true":
159+
pass
160+
else:
161+
print((" Extracting: {0}").format(in_tup[0]))
162+
finaldata = []
163+
start = start = in_tup[3] + in_tup[4]
164+
end = in_tup[3] + in_tup[1]
165+
with open(file_in, 'rb') as fo:
166+
fread = fo.read()
167+
for i in range(start, end, 180):
168+
dp = struct.unpack("dd158s", fread[i:i+174])
169+
#acc_time = dp[0] # not used atm
170+
acc_volume = round(dp[1]-inj_sel,4)
171+
label = codecs.decode(dp[2], 'cp1250')
172+
merged_data=acc_volume,label.rstrip('\x00')#.encode("utf-8")
173+
finaldata.append(merged_data)
174+
finaldata.insert(0, in_tup[0])
175+
return(finaldata)
176+
177+
178+
def meta2_read(in_tup):
179+
'''
180+
Extracts meta-data/type2, Method used in the run
181+
'''
182+
print((" Extracting: {0}").format(in_tup[0]))
183+
start = in_tup[3] + in_tup[4]
184+
with open(file_in,'rb') as fo:
185+
fo.seek(start)
186+
tmp_data = fo.read(in_tup[1])
187+
size = tmp_data.rfind(b'\n') #declared block-size in header is always off
188+
fo.seek(start) #by a few bytes, hence it is redetermined here
189+
data = fo.read(size)
190+
output = [in_tup[0],data]
191+
return(output)
192+
193+
194+
def sensor_read(in_tup):
195+
'''
196+
extracts sensor/run-data and applies correct division
197+
'''
198+
tmp_list0 = []
199+
# set positions
200+
start = in_tup[3] + in_tup[4]
201+
end = in_tup[3] + in_tup[1]
202+
# get the labels
203+
sep0 = in_tup[0].find(":")
204+
sep1 = in_tup[0].find("_")
205+
#run_label = (in_tup[0])[:sep0] # not used atm
206+
sensor_label = (in_tup[0])[sep1+1:]
207+
# set the dividor
208+
if "UV" in sensor_label or "Cond" == sensor_label or "Flow" == sensor_label:
209+
sensor_div = 1000.0
210+
elif "Pressure" in sensor_label:
211+
sensor_div = 100.0
212+
else:
213+
sensor_div = 10.0
214+
print((" Extracting: {0}").format(in_tup[0]))
215+
with open(file_in, 'rb') as fo:
216+
fread = fo.read()
217+
for i in range(start, end, 8):
218+
sread = struct.unpack("ii", fread[i:i+8])
219+
data=round((sread[0]/100.0)-inj_sel,4),sread[1]/sensor_div
220+
tmp_list0.append(data)
221+
tmp_list0.insert(0, in_tup[0])
222+
return(tmp_list0)
223+
224+
225+
def inject_det(inp,show="false"):
226+
'''
227+
Finds injection points - required for adjusting retention
228+
'''
229+
injection_points = []
230+
header = readheader(inp)
231+
for i in header:
232+
if i[0] == Inject_id:
233+
injection = meta1_read(i[1:],silent="true")[1][0]
234+
injection_points.append(injection)
235+
if injection != 0.0:
236+
injection_points.insert(0,0.0)
237+
if injection_points == []:
238+
injection_points = [0.0]
239+
if show == "true":
240+
print((" ---- \n Injection points: \n # \t ml \n 0 \t {0}").format(injection_points[0]))
241+
return(injection_points)
242+
else:
243+
if show == "true":
244+
print(" ---- \n Injection points: \n # \t ml")
245+
for x,y in enumerate(injection_points):
246+
print((" {0} \t {1}").format(x,y))
247+
return(injection_points)
248+
249+
250+
def store_in_list(inp):
251+
'''
252+
extract all data and store in list
253+
'''
254+
global inj_sel
255+
inj_sel = inject_det(file_in,show="false")[args.inject]
256+
header = readheader(inp)
257+
for i in header:
258+
x = dataextractor(i)
259+
if x == None:
260+
pass
261+
else:
262+
data_storage.append(x)
263+
264+
265+
def writer(in_list):
266+
'''
267+
general writer, picks the correct writer function based on data type
268+
'''
269+
if in_list[0] == Methods_id or in_list[0] == CNotes_id:
270+
meta_writer(in_list[1:])
271+
else:
272+
data_writer(in_list)
273+
274+
275+
def meta_writer(in_list):
276+
'''
277+
writes meta-data to txt-files
278+
'''
279+
ext = "_"+in_list[0]+".txt"
280+
inp_data = codecs.decode(in_list[1], 'cp1250')#.replace("\r","")
281+
content = inp_data.encode('utf-8')
282+
with open(file_base+ext,'wb') as fout:
283+
print(" Writing %s" %in_list[0])
284+
fout.write(content)
285+
286+
287+
def data_writer(in_list):
288+
'''
289+
writes sensor/run-data to csv-files
290+
'''
291+
sep = in_list[0].find(":")+1
292+
label = in_list[0][sep:]
293+
run_name = in_list[0][:sep-1]
294+
fname = file_base+"_"+run_name+"_"+label+".csv"
295+
if "Logbook" in label:
296+
pass
297+
else:
298+
with open(fname, 'wb') as fout:
299+
print((" Writing {0}").format(label))
300+
for i in in_list[1:]:
301+
dp = (str(i[0]) + "," + str(i[1]) + str("\n")).encode('utf-8')
302+
fout.write(dp)
303+
304+
305+
def plotter(in_list):
306+
try:
307+
import matplotlib.pyplot as plt
308+
except ImportError:
309+
print(" Matplotlib not found - Plotting will not work!")
310+
endscript()
311+
else:
312+
print(" Matplotlib found - WARNING - PLOTTING IS EXPERIMENTAL")
313+
sep = in_list[0].find(":")+1
314+
label = in_list[0][sep:]
315+
run_name = in_list[0][:sep-1]
316+
f_list = in_list[1:] #filter the list
317+
x_val=[x[0] for x in f_list]
318+
y_val=[x[1] for x in f_list]
319+
plot_x_min = in_list[1][0]
320+
plot_x_max = in_list[-1][0]
321+
if type(y_val[0]) == float:
322+
print(" Plotting %s" %label)
323+
plt.xlim(xmin = plot_x_min, xmax = plot_x_max)
324+
plt.title("%s: " %file_in + run_name + " - " "%s" %label)
325+
plt.xlabel('ml')
326+
plt.grid(which='major', axis='both')
327+
plt.plot(x_val, y_val,linewidth=1.5, alpha=0.85,color='b')
328+
ext = '.pdf'
329+
fname = file_base + "_Plot_" + run_name + "_" + label + ext
330+
plt.savefig(fname, dpi=300)
331+
plt.clf()
332+
333+
334+
def endscript():
335+
print("stopping")
336+
exit
337+
338+
#--- run time---#
339+
340+
if args.user:
341+
showuser(file_in)
342+
if args.check:
343+
input_check(file_in)
344+
if args.info:
345+
showheader(file_in,full="false")
346+
if args.points:
347+
inject_det(file_in,show="true")
348+
if args.extract:
349+
data_storage = []
350+
if input_check(file_in) != (0,0,0):
351+
print("\n Gotta stop!")
352+
else:
353+
print("\n Go go go!")
354+
store_in_list(file_in)
355+
for i in data_storage:
356+
writer(i)
357+
if args.plot:
358+
plotter(i)
359+
360+
end = time.time()
361+
runtime = str(end - start)
362+
print("\n ----")
363+
print((" Runtime: {0} seconds!").format(runtime[:5]))

0 commit comments

Comments
 (0)