# Mirror of https://github.com/rjNemo/xafsManager
# Synced 2026-06-06 02:36:42 +00:00 -- 425 lines, 17 KiB, Python
import logging
import os
import subprocess
import sys
from time import time

import numpy as np
import pyqtgraph as pg
import zmq
from pyqtgraph.Qt import QtCore, QtGui
from scipy.interpolate import LSQUnivariateSpline
from scipy.ndimage import gaussian_filter1d
from scipy.optimize import curve_fit

try:
    from scipy.integrate import simps
except ImportError:
    # SciPy 1.14 removed the ``simps`` alias; ``simpson`` is its replacement.
    from scipy.integrate import simpson as simps
|
|
|
|
# Upper bound on how many message batches per second ZeroMQ_Listener emits:
# a batch is forwarded once more than 1/FPS_LIMIT seconds have elapsed.
FPS_LIMIT = 10
|
|
# The accumulated spectra are handed to the EXAFS worker only when the sample
# counter is a multiple of this value (i.e. on every N-th data sample).
EXAFS_SLOWDOWN_FACTOR = 10
|
|
|
|
class ZeroMQ_Listener(QtCore.QObject):
    """Receive messages from a ZeroMQ SUB socket and forward them in batches.

    The object connects to ``tcp://127.0.0.1:6400`` and subscribes to every
    topic.  ``loop`` collects the raw messages and emits them through
    ``message`` as a list, at most ``FPS_LIMIT`` times per second.
    """

    # Carries the list of raw messages received since the previous emission.
    message = QtCore.pyqtSignal(list)

    def __init__(self):
        QtCore.QObject.__init__(self)
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.SUB)
        self.socket.connect("tcp://127.0.0.1:6400")
        # An empty prefix subscribes to all published messages.
        self.socket.setsockopt_string(zmq.SUBSCRIBE, "")
        # Set to False (from any thread) to make ``loop`` return.
        self.running = True

    def loop(self):
        """Collect messages until ``running`` is cleared.

        The socket is polled with a timeout instead of blocking in
        ``recv()``.  This way the ``running`` flag is re-checked regularly
        and a pending batch is flushed even when no further message arrives
        (with a blocking ``recv()`` the last points of a scan stayed in the
        buffer until the next message showed up).
        """
        flush_interval = 1. / FPS_LIMIT
        poll_ms = max(1, int(1000 * flush_interval))
        buf = []
        t0 = time()
        while self.running:
            try:
                if self.socket.poll(poll_ms):
                    buf.append(self.socket.recv())
            except zmq.ZMQError:
                # Expected when the socket/context is torn down during
                # shutdown; anything else is reported.  Retrying on a broken
                # socket would only spin, so the loop ends either way.
                if self.running:
                    logging.getLogger(__name__).exception(
                        "ZeroMQ receive failed; listener stopped")
                break
            now = time()
            if buf and now - t0 > flush_interval:
                self.message.emit(buf)
                # Rebind instead of clearing: the emitted list now belongs
                # to the receivers.
                buf = []
                t0 = now

    def finish(self):
        """Close the socket and terminate the ZeroMQ context."""
        self.socket.close()
        self.context.term()
|
|
|
|
|
|
class EXAFS_Extractor(QtCore.QObject):
    """Worker object deriving EXAFS quantities from two absorption spectra.

    ``calculate`` is the entry point.  For each spectrum it locates the edge
    (maximum of the smoothed derivative), fits a Victoreen pre-edge, removes
    a spline post-edge background and computes the magnitude of the Fourier
    transform of the k^2-weighted oscillations.  The results are published
    through the ``message`` signal.
    """

    # Carries [e0_idx1, deriv1, k1, chi1, r1, ft_mag1,
    #          e0_idx2, deriv2, k2, chi2, r2, ft_mag2, e].
    message = QtCore.pyqtSignal(list)

    def __init__(self):
        QtCore.QObject.__init__(self)
        self.running = True
        # Offsets added to the edge energy e[idx]: the pre-edge fit uses the
        # range [pre1, pre2], the post-edge spline starts at post1.
        self.pre1 = -150
        self.pre2 = -50
        self.post1 = 50

    def victoreen(self, e, c, d):
        """Return the Victoreen-type background ``c*f**3 - d*f**4``.

        ``f = 1.23986e4 / e``; the constant looks like hc in eV*Angstrom,
        which would make ``f`` a wavelength -- NOTE(review): confirm units.
        """
        f = 1.23986e4 / e
        return c * f**3 - d * f**4

    def find_e0_idx(self, e, mu):
        """Locate the absorption edge.

        Returns ``(index, derivative)`` where ``derivative`` is d(mu)/d(e)
        smoothed with a Gaussian filter (sigma = 3 samples) and ``index`` is
        the position of its maximum.
        """
        d = gaussian_filter1d(np.gradient(mu) / np.gradient(e), 3)
        return np.argmax(d), d

    def pre_edge(self, e, idx, mu):
        """Fit ``victoreen`` below the edge and evaluate it on the whole grid.

        The fit window runs between the samples closest to
        ``e[idx] + pre1`` and ``e[idx] + pre2``.  ``curve_fit`` raises when
        that window holds too few points; callers are expected to handle it.
        """
        idx1 = np.abs(e[idx] + self.pre1 - e).argmin()
        idx2 = np.abs(e[idx] + self.pre2 - e).argmin()
        popt, _pcov = curve_fit(self.victoreen, e[idx1:idx2], mu[idx1:idx2])
        return self.victoreen(e, popt[0], popt[1])

    def post_edge(self, e, idx, mu, pre, kw=2):
        """Remove the post-edge background and return ``(k, chi * k**2)``.

        ``k`` starts at the edge sample ``idx``.  A least-squares spline is
        fitted to the ``k**kw``-weighted, pre-edge-subtracted spectrum and
        used as the smooth background above ``e[idx] + post1``; its value at
        that point serves as the edge step for normalisation.  The returned
        signal is smoothed with a Gaussian filter (sigma = 3 samples).
        """
        # k = sqrt(f * (E - E0)); with E in eV this gives k in 1/Angstrom
        # (the plots label the axes that way).
        f = 0.262467
        k = np.sqrt(f * (e[idx:] - e[idx]))
        # Index (relative to idx) of the sample closest to e0 + post1.
        kmin_idx = np.abs(e[idx] + self.post1 - e).argmin() - idx
        fun_fit = (mu[idx:] - pre[idx:]) * k**kw
        n_knots = int(2.0 * (k.max() - k.min()) / np.pi) + 1
        # Interior knots stop at k[-2] so they stay strictly inside the data.
        knots = np.linspace(k[kmin_idx], k[-2], n_knots)
        spl = LSQUnivariateSpline(k, fun_fit, knots)
        post = spl(k[kmin_idx:]) / (k[kmin_idx:])**kw
        edge_step = post[0]
        norm = mu - pre
        norm[idx + kmin_idx:] += -post + edge_step
        norm /= edge_step
        chi = norm[idx:] - 1
        return k, gaussian_filter1d(chi * k * k, 3)

    def hann_win(self, k, kmin, kmax, dk):
        """Return a Hann-type window evaluated on ``k``.

        The window is 0 below ``kmin - dk/2`` and above ``kmax + dk/2``,
        1 strictly between ``kmin + dk/2`` and ``kmax - dk/2``, and follows
        raised-cosine ramps of width ``dk`` centred on ``kmin`` and ``kmax``.
        Non-finite ``k`` values get a weight of 0 (the element-wise loop this
        replaces left those slots uninitialised).
        """
        k = np.asarray(k, dtype=float)
        half = 0.5 * dk
        rising = (k >= kmin - half) & (k <= kmin + half)
        plateau = (k > kmin + half) & (k < kmax - half)
        falling = (k >= kmax - half) & (k <= kmax + half)
        ramp_up = 0.5 * (1.0 - np.cos(np.pi * (k - kmin + half) / dk))
        ramp_down = 0.5 * (1.0 + np.cos(np.pi * (k - kmax + half) / dk))
        # np.select takes the first matching condition, which reproduces the
        # priority of the former if/elif chain when the ramps overlap.
        return np.select([rising, plateau, falling],
                         [ramp_up, np.ones_like(k), ramp_down],
                         default=0.0)

    def make_ft(self, k, chi, r):
        """Return the Fourier-transform magnitude of ``chi`` at each ``r``.

        ``chi`` is multiplied by ``hann_win(k, 2.0, max(k) - 1, 1.0)`` and
        the sine/cosine transforms with kernel ``2*k*r`` are integrated over
        ``k`` with Simpson's rule.  All ``r`` values are handled in one
        vectorised call.
        """
        k = np.asarray(k, dtype=float)
        r = np.asarray(r, dtype=float)
        win = self.hann_win(k, 2.0, k.max() - 1., 1.0)
        weighted = np.asarray(chi) * win
        phase = 2.0 * np.outer(r, k)  # shape (len(r), len(k))
        # ``x`` is passed by keyword: it is keyword-only in recent SciPy.
        ft_im = simps(np.sin(phase) * weighted, x=k, axis=-1)
        ft_re = simps(np.cos(phase) * weighted, x=k, axis=-1)
        return 1.0 / np.sqrt(np.pi) * np.sqrt(ft_im * ft_im + ft_re * ft_re)

    def _extract_channel(self, e, mu):
        """Run the full pipeline on one spectrum.

        Returns ``(e0_idx, deriv, k, chi, r, ft_mag)``.  Stages that fail
        (typically because too few points are available yet) leave their
        outputs as empty lists; results of earlier stages are kept.
        """
        log = logging.getLogger(__name__)
        e0_idx, deriv, k, chi, r, ft_mag = None, [], [], [], [], []
        try:
            e0_idx, deriv = self.find_e0_idx(e, mu)
            pre = self.pre_edge(e, e0_idx, mu)
            k, chi = self.post_edge(e, e0_idx, mu, pre)
        except Exception:
            # Best effort: failures are routine early in a scan.
            log.debug("EXAFS background removal failed", exc_info=True)
            k, chi = [], []
        if len(k) > 0:
            try:
                r = np.arange(0.0, 6.0, 0.02)
                ft_mag = self.make_ft(k, chi, r)
            except Exception:
                log.debug("EXAFS Fourier transform failed", exc_info=True)
                r, ft_mag = [], []
        return e0_idx, deriv, k, chi, r, ft_mag

    def calculate(self, list_data):
        """Process ``[energy, mu1, mu2]`` and emit the results via ``message``.

        The emitted list holds the six outputs of the first spectrum, the
        six outputs of the second spectrum, then the energy array.
        """
        e = np.array(list_data[0])
        mu1 = np.array(list_data[1])
        mu2 = np.array(list_data[2])
        first = self._extract_channel(e, mu1)
        second = self._extract_channel(e, mu2)
        self.message.emit(list(first) + list(second) + [e])
|
|
|
|
class MonitorWidget(QtGui.QWidget):
    """Four-panel live monitor with three switchable views.

    View 0 plots the raw counters (I0, I1, I2, IP), view 1 the absorption
    spectra and their derivatives, view 2 the EXAFS signal and its Fourier
    transform.  Data arrive from a ``ZeroMQ_Listener`` running in one thread;
    the derived quantities are computed by an ``EXAFS_Extractor`` running in
    a second thread.
    """

    # Carries [energy, mu, muf] snapshots to the EXAFS worker.
    message_data = QtCore.pyqtSignal(list)

    # Stylesheet shared by both push buttons.
    _BUTTON_STYLE = '''
        QPushButton{
            color: #777; font-family:'Arial';
            font-weight: bold;
            font-size:16px;
            border: 2px solid #777;
            border-radius: 10px;
            min-width: 200px;
            padding: 7px;
            outline: none;
        }'''

    _LABEL_STYLE = '''QLabel{
            color: #fff; font-family:'Arial';
            font-weight: bold;
            font-size:16px;
            min-width: 200px;
            max-width: 200px;
            outline: none;
        }'''

    # (view label, text of the switch button, titles of the four plots)
    _VIEWS = (
        ("Currents", "Switch to XANES", (
            "I0 [counts] / Energy[eV]",
            "I1 [counts] / Energy[eV]",
            "I2 [counts] / Energy[eV]",
            "IP [counts] / Energy[eV]",
        )),
        ("XANES", "Switch to EXAFS", (
            "mu trans. / Energy[eV]",
            "deriv. mu trans. / Energy[eV]",
            "mu fluo. / Energy[eV]",
            "deriv. mu fluo. / Energy[eV]",
        )),
        ("EXAFS", "Switch to Currents", (
            u"\u03C7(k)*k^2 trans. / k[\u212B^-1]",
            u"| FT(\u03C7(k)*k^2 trans.) | / R[\u212B]",
            u"\u03C7(k)*k^2 fluo. / k[\u212B^-1]",
            u"| FT(\u03C7(k)*k^2 fluo.) | / R[\u212B]",
        )),
    )

    def __init__(self, app):
        QtGui.QWidget.__init__(self)
        self.app = app

        # Listener thread: loop() starts as soon as the thread is running.
        self.thread = QtCore.QThread()
        self.zeromq_listener = ZeroMQ_Listener()
        self.zeromq_listener.moveToThread(self.thread)
        self.thread.started.connect(self.zeromq_listener.loop)
        self.zeromq_listener.message.connect(self.signal_received)
        QtCore.QTimer.singleShot(0, self.thread.start)

        # Worker thread for the EXAFS calculations.
        self.thread2 = QtCore.QThread()
        self.exafs_extractor = EXAFS_Extractor()
        self.exafs_extractor.moveToThread(self.thread2)
        self.exafs_extractor.message.connect(self.update_exafs)
        self.message_data.connect(self.exafs_extractor.calculate)
        QtCore.QTimer.singleShot(1, self.thread2.start)

        self.view = 0
        label_text, button_text, titles = self._VIEWS[self.view]

        self.layout = QtGui.QGridLayout()
        self.pw1 = pg.PlotWidget(title=titles[0])
        self.pw2 = pg.PlotWidget(title=titles[1])
        self.pw3 = pg.PlotWidget(title=titles[2])
        self.pw4 = pg.PlotWidget(title=titles[3])
        self._plot_widgets = (self.pw1, self.pw2, self.pw3, self.pw4)

        # Top bar: folder button, file name, current view, view switch.
        self.hbl = QtGui.QHBoxLayout()
        self.flabel = QtGui.QLabel("N/A")
        self.flabel.setStyleSheet("color: #fff")
        self.fbtn = QtGui.QPushButton("Open data folder")
        self.fbtn.clicked.connect(self.open_folder)
        self.fbtn.setSizePolicy(QtGui.QSizePolicy.Maximum,
                                QtGui.QSizePolicy.Maximum)
        self.fbtn.setStyleSheet(self._BUTTON_STYLE)
        self.viewlabel = QtGui.QLabel(label_text)
        self.viewlabel.setAlignment(QtCore.Qt.AlignCenter)
        self.viewlabel.setStyleSheet(self._LABEL_STYLE)
        self.btn = QtGui.QPushButton(button_text)
        self.btn.clicked.connect(self.switchView)
        self.btn.setSizePolicy(QtGui.QSizePolicy.Maximum,
                               QtGui.QSizePolicy.Maximum)
        self.btn.setStyleSheet(self._BUTTON_STYLE)
        for widget in (self.fbtn, self.flabel, self.viewlabel, self.btn):
            self.hbl.addWidget(widget)
        self.datafilename = None

        self.layout.addLayout(self.hbl, 0, 0, 1, 2)
        self.layout.addWidget(self.pw1, 1, 0)
        self.layout.addWidget(self.pw2, 1, 1)
        self.layout.addWidget(self.pw3, 2, 0)
        self.layout.addWidget(self.pw4, 2, 1)
        self.setLayout(self.layout)

        self._reset_data()
        self.curve1 = self.pw1.plot(self.energy, self.data_i0, pen="b")
        self.curve2 = self.pw2.plot(self.energy, self.data_i1, pen="g")
        self.curve3 = self.pw3.plot(self.energy, self.data_i2, pen="r")
        self.curve4 = self.pw4.plot(self.energy, self.data_ip, pen="c")
        self._curves = (self.curve1, self.curve2, self.curve3, self.curve4)
        self.pen_orchid = pg.mkPen(color=(230, 170, 250))
        self.pen_lime = pg.mkPen(color=(190, 250, 0))
        # Edge markers shown on the derivative plots of the XANES view.
        self.text2 = pg.TextItem("", anchor=(0.5, 2))
        self.arrow2 = pg.ArrowItem(angle=-90)
        self.text4 = pg.TextItem("", anchor=(0.5, 2))
        self.arrow4 = pg.ArrowItem(angle=-90)

    def _reset_data(self):
        """Drop every accumulated sample and every derived result."""
        self.counter = 0
        self.energy = []
        self.denergy = []
        self.data_i0 = []
        self.data_i1 = []
        self.data_i2 = []
        self.data_ip = []
        self.mu = []
        self.muf = []
        self.dmu = []
        self.dmuf = []
        self.k = []
        self.kf = []
        self.chi = []
        self.chif = []
        self.r = []
        self.rf = []
        self.ft_mag = []
        self.ft_magf = []
        self.e0i = None
        self.e0if = None

    def _append_sample(self, message):
        """Parse one data row and append it to every series.

        A row holds at least five whitespace-separated numbers: energy, I0,
        I1, I2, IP.  All values are computed *before* anything is stored, so
        a short or non-numeric row, or a zero count in a denominator, can
        never leave the series with different lengths.  Returns True when
        the row was stored, False when it was skipped.
        """
        try:
            values = [float(v) for v in message.split()]
            energy, i0, i1, i2, ip = values[:5]
            mu = np.log(i0 / i1)
            muf = ip / i0
        except (ValueError, ZeroDivisionError):
            logging.getLogger(__name__).warning(
                "Skipping malformed data message: %r", message)
            return False
        self.energy.append(energy)
        self.data_i0.append(i0)
        self.data_i1.append(i1)
        self.data_i2.append(i2)
        self.data_ip.append(ip)
        self.mu.append(mu)
        self.muf.append(muf)
        if not self.counter % EXAFS_SLOWDOWN_FACTOR:
            # Hand over copies: the worker lives in another thread and must
            # not read lists that are still being appended to here.
            self.message_data.emit(
                [list(self.energy), list(self.mu), list(self.muf)])
        self.counter += 1
        return True

    def signal_received(self, message_buf):
        """Handle one batch of messages from the listener.

        Recognised messages: ``test`` (ignored), ``clear`` (reset all data),
        a name ending in ``.dat`` or ``.nxs`` (current data file), anything
        else is treated as a data row.  The plots are refreshed and pending
        GUI events processed once per batch, after every message has been
        consumed, so a newly queued batch cannot be interleaved with the
        current one.
        """
        dirty = False
        for message in message_buf:
            if isinstance(message, bytes):
                # recv() delivers bytes; the comparisons below need text.
                message = message.decode("utf-8", "replace")
            if message == "test":
                continue
            if message == "clear":
                self._reset_data()
                dirty = True
            elif message.endswith((".dat", ".nxs")):
                self.flabel.setText(message)
                self.datafilename = message
            elif self._append_sample(message):
                dirty = True
        if dirty:
            self.updatePlot()
        self.app.processEvents()

    def _remove_markers(self):
        """Take the edge markers off the two derivative plots."""
        self.pw2.removeItem(self.text2)
        self.pw2.removeItem(self.arrow2)
        self.pw4.removeItem(self.text4)
        self.pw4.removeItem(self.arrow4)

    def switchView(self):
        """Cycle Currents -> XANES -> EXAFS -> Currents and redraw."""
        self.view = (self.view + 1) % len(self._VIEWS)
        label_text, button_text, titles = self._VIEWS[self.view]
        self.viewlabel.setText(label_text)
        self.btn.setText(button_text)
        for plot, title in zip(self._plot_widgets, titles):
            plot.setTitle(title)
        if self.view != 1:
            # Markers belong to the XANES view only; updatePlot() adds them.
            self._remove_markers()

        for plot in self._plot_widgets:
            # Force the left axis to recompute its width, see
            # https://github.com/pyqtgraph/pyqtgraph/issues/821
            axis = plot.getAxis('left')
            axis.textWidth = 0
            axis._updateWidth()
        for plot in self._plot_widgets:
            plot.enableAutoRange()
        self.updatePlot()

    def _update_e0_marker(self, plot, text, arrow, idx, deriv):
        """Place (or hide, when ``idx`` is None) an edge marker on ``plot``."""
        plot.removeItem(text)
        plot.removeItem(arrow)
        if idx is None or idx >= len(self.denergy) or idx >= len(deriv):
            return
        x, y = self.denergy[idx], deriv[idx]
        text.setText("%.2f" % x)
        text.setPos(x, y)
        arrow.setPos(x, y)
        plot.addItem(text)
        plot.addItem(arrow)

    def updatePlot(self):
        """Push the series of the active view into the four curves."""
        lime, orchid = self.pen_lime, self.pen_orchid
        if self.view == 0:
            series = (
                (self.energy, self.data_i0, "b"),
                (self.energy, self.data_i1, "g"),
                (self.energy, self.data_i2, "r"),
                (self.energy, self.data_ip, "c"),
            )
        elif self.view == 1:
            series = (
                (self.energy, self.mu, lime),
                (self.denergy, self.dmu, lime),
                (self.energy, self.muf, orchid),
                (self.denergy, self.dmuf, orchid),
            )
        elif self.view == 2:
            series = (
                (self.k, self.chi, lime),
                (self.r, self.ft_mag, lime),
                (self.kf, self.chif, orchid),
                (self.rf, self.ft_magf, orchid),
            )
        else:
            return
        for curve, (x, y, pen) in zip(self._curves, series):
            curve.setData(x, y)
            curve.setPen(pen)
        if self.view == 1:
            self._update_e0_marker(self.pw2, self.text2, self.arrow2,
                                   self.e0i, self.dmu)
            self._update_e0_marker(self.pw4, self.text4, self.arrow4,
                                   self.e0if, self.dmuf)

    def update_exafs(self, data_list):
        """Store the results emitted by the EXAFS worker and redraw.

        ``data_list`` is laid out as [e0_idx, deriv, k, chi, r, ft_mag] for
        the first spectrum, the same six entries for the second one, then
        the energy axis the derivatives refer to.
        """
        (self.e0i, dmu, k, chi, r, ft_mag,
         self.e0if, dmuf, kf, chif, rf, ft_magf, denergy) = data_list[:13]
        self.dmu = dmu[:]
        self.k, self.chi = k[:], chi[:]
        self.r, self.ft_mag = r[:], ft_mag[:]
        self.dmuf = dmuf[:]
        self.kf, self.chif = kf[:], chif[:]
        self.rf, self.ft_magf = rf[:], ft_magf[:]
        self.denergy = denergy[:]
        self.updatePlot()

    def open_folder(self):
        """Open the folder of the current data file with ``xdg-open``.

        The file name comes from the network, so it is passed as a plain
        argument rather than interpolated into a shell command line.
        """
        if not self.datafilename:
            return
        folder = os.path.dirname(self.datafilename)
        if not folder:
            return
        try:
            subprocess.call(["xdg-open", folder])
        except OSError:
            logging.getLogger(__name__).warning(
                "Could not run xdg-open on %r", folder, exc_info=True)

    def closeEvent(self, event):
        """Stop both threads and release the ZeroMQ resources."""
        self.zeromq_listener.running = False
        self.thread.quit()
        # Give the listener a moment to leave its loop on its own, so the
        # socket is not closed underneath it; then fall back to closing it
        # regardless, which is what unblocks a listener stuck in recv().
        self.thread.wait(500)
        self.zeromq_listener.finish()
        self.thread.wait()
        self.thread2.quit()
        self.thread2.wait()
        self.app.processEvents()
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: build the Qt application, show the monitor on a
    # black background and hand control to the event loop.
    application = QtGui.QApplication([])
    monitor = MonitorWidget(application)
    monitor.setWindowTitle("Data monitor")
    monitor.setStyleSheet("background-color: #000")
    monitor.show()
    exit_code = application.exec_()
    sys.exit(exit_code)
|