from __future__ import division
"""
Main Pathway Device Class
=========================
"""
__all__ = ['Pathway']
__author__ = ["Cosan Lab"]
__license__ = "MIT"
import socket
import time
import numpy as np
from collections import OrderedDict
import six
class Pathway(object):
    """
    Pathway is a class to communicate with the Medoc Pathway thermal stimulation machine.

    Every command opens a new TCP connection, sends one binary request, reads
    one reply and closes the connection again.

    Args:
        ip (str): device ip address
        port_number (int): port the device is listening on
        timeout (float): seconds until connection timeouts; default 5s
        verbose (bool): flag whether to print responses; default True
        buffer_size (int): size of connection buffer; default 1024

    Raises:
        IOError: if the initial STATUS request made by the constructor fails
    """

    # Lookup tables translating the numeric codes found in replies into names.
    # They are class attributes, so every instance shares the same objects.
    test_states = {
        0: 'IDLE',
        1: 'RUNNING',
        2: 'PAUSED',
        3: 'READY',
    }
    state_codes = {
        0: 'IDLE',
        1: 'READY',
        2: 'TEST',
    }
    command_codes = {
        0: 'STATUS',
        1: 'TEST_PROGRAM',
        2: 'START',
        3: 'PAUSE',
        4: 'TRIGGER',
        5: 'STOP',
        6: 'ABORT',
        7: 'YES',
        8: 'NO',
    }
    response_codes = {
        0: 'RESULT_OK',
        1: 'RESULT_ILLEGAL_ARG',
        2: 'RESULT_ILLEGAL_STATE',
        3: 'RESULT_ILLEGAL_TEST_STATE',
        4096: 'RESULT_DEVICE_COMM_ERROR',
        8192: 'RESULT_SAFETY_WARNING',
        16384: 'RESULT_SAFETY_ERROR',
    }
    # Byte layout of a reply: (start, stop) slices for multi-byte fields and a
    # plain index for single-byte fields. Built from pairs (not from a dict
    # literal) so the declared order is kept on every Python version.
    segmentation_points = OrderedDict([
        ('LENGTH_OFFSET', (0, 4)),
        ('TIMESTAMP_OFFSET', (4, 8)),
        ('COMMAND_OFFSET', 8),
        ('SYSTEM_STATE_OFFSET', 9),
        ('TEST_STATE_OFFSET', 10),
        ('RESULT_OFFSET', (11, 13)),
        ('TEST_TIME_OFFSET', (13, 17)),
        ('ERROR_MESSAGE_OFFSET', 17),
    ])
    # Number of reply bytes between the 4-byte length field and the error
    # message (17 - 4). A larger reported length means an error message follows.
    _FIXED_PAYLOAD_LENGTH = 13

    def __init__(self, ip, port_number, timeout=5., verbose=True, buffer_size=1024):
        assert isinstance(ip, six.string_types), "IP address must be a string."
        assert isinstance(port_number, six.integer_types), "Port must be an integer"
        self.ip = ip
        self.port_number = port_number
        self.BUFFER_SIZE = buffer_size
        self.timeout = timeout
        self.verbose = verbose
        self.socket = None

        # Probe the device once so a wrong address fails at construction time.
        try:
            self.call('STATUS', verbose=False)
        except Exception as err:
            # Keep raising IOError as before, but report the underlying cause
            # instead of hiding it (and let KeyboardInterrupt pass through).
            raise IOError(
                'Cannot establish connection, check IP and port number is '
                'correct and the host computer is on. '
                '(underlying error: {0!r})'.format(err)
            )
        print('Connection to Pathway successful')

    def _create_connection(self):
        """Create and return a new connected TCP socket.

        The timeout is applied to this socket only, so other sockets in the
        process keep their own timeout settings.
        """
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            s.settimeout(self.timeout)
            s.connect((self.ip, self.port_number))
        except Exception:
            # Do not leak the descriptor when the connection attempt fails.
            s.close()
            raise
        return s

    def call(self, command, protocol=None, reuse_socket=False, verbose=False):
        """
        Send command to device.

        Args:
            command (str/int): command name or command_id number to send to device
            protocol (str/int): protocol number on device to issue command to (only needed for command TEST_PROGRAM)
            reuse_socket (bool): try to reuse the last created socket connection; *NOT CURRENTLY FUNCTIONAL*
            verbose (bool): whether to print out the device callback; the
                response is also printed when the instance-level ``verbose``
                flag is set

        Returns:
            response (dict): response from Medoc system

        Raises:
            NotImplementedError: if ``reuse_socket`` is true
            ValueError: if the command name is unknown, or TEST_PROGRAM is
                requested without a protocol number
        """
        if reuse_socket:
            raise NotImplementedError("Reusing sockets does not currently work.")

        if isinstance(command, six.string_types):
            # Reverse lookup: command name -> numeric command id.
            name_to_id = dict((name, code) for code, name in self.command_codes.items())
            if command not in name_to_id:
                raise ValueError('Unknown command name: {0!r}'.format(command))
            command = name_to_id[command]
        if command == 1 and protocol is None:
            raise ValueError('TEST_PROGRAM command requires a protocol number')

        message = self._format_command(command, protocol)

        s = self._create_connection()
        self.socket = s
        try:
            s.sendall(message)
            nbytes = len(message)
            # Fixed pause between sending the request and reading the reply.
            time.sleep(0.5)
            data = s.recv(self.BUFFER_SIZE)
        finally:
            # A connection is never reused, so always release it.
            s.close()
            self.socket = None

        response = self._format_response(data, nbytes)
        if not response:
            # Nothing could be parsed: repeat the same command on a new
            # connection. There is no limit on the number of retries.
            return self.call(command, protocol, verbose=verbose)

        if verbose or self.verbose:
            print(response)
        return response

    @staticmethod
    def _uint32_le(value):
        """Return the low 32 bits of ``value`` as four ints, least-significant byte first."""
        return [(value >> (8 * i)) & 0xFF for i in range(4)]

    def _format_command(self, command, protocol):
        """
        Format calls to device.

        The message is: 4-byte length of everything that follows, 4-byte
        current Unix time, 1-byte command id and, for TEST_PROGRAM only, a
        4-byte protocol number. Multi-byte fields are sent least-significant
        byte first.

        Args:
            command (int): command_id number to send to device
            protocol (str/int): protocol number on device to issue command to (only needed for command TEST_PROGRAM)

        Returns:
            message (bytes): formatted message to be sent

        Raises:
            ValueError: if the command id does not fit in one byte
        """
        command = int(command)
        payload = self._uint32_le(int(time.time())) + [command]
        if command == 1 and protocol is not None:
            # int() accepts the documented string form of a protocol number.
            payload += self._uint32_le(int(protocol))
        message = self._uint32_le(len(payload)) + payload
        # bytearray -> bytes yields a sendable byte string on Python 2 and 3.
        return bytes(bytearray(message))

    def _format_response(self, data, nbytes):
        """
        Format responses from device.

        Note: Test time is the time since machine was turned on.

        Parsing is best-effort: if a field cannot be decoded, diagnostics are
        printed and the fields decoded so far are returned (possibly none).

        Args:
            data: data bytes from devices
            nbytes: number of bytes that were sent in the request; only used
                in the diagnostic output

        Returns:
            response (dict): dictionary of response data. ``error_message``,
            when present, holds the raw undecoded bytes of the message
            (its text encoding is not known here).
        """
        # bytearray gives integer byte values on both Python 2 and 3.
        data_int = list(bytearray(data))
        points = self.segmentation_points
        response_dict = {}
        try:
            response_dict['response_length'] = self._decode(data_int, 'LENGTH_OFFSET')
            response_dict['time_stamp'] = time.ctime(self._decode(data_int, 'TIMESTAMP_OFFSET'))
            response_dict['command_id'] = self.command_codes[data_int[points['COMMAND_OFFSET']]]
            response_dict['pathway_state'] = self.state_codes[data_int[points['SYSTEM_STATE_OFFSET']]]
            response_dict['test_state'] = self.test_states[data_int[points['TEST_STATE_OFFSET']]]
            response_dict['response'] = self.response_codes[self._decode(data_int, 'RESULT_OFFSET')]
            response_dict['test_time_stamp'] = self._decode_test_time(data_int)
            extra = response_dict['response_length'] - self._FIXED_PAYLOAD_LENGTH
            if extra > 0:
                # Bytes beyond the fixed fields are the error message.
                start = points['ERROR_MESSAGE_OFFSET']
                response_dict['error_message'] = data[start:start + extra]
        except Exception as err:
            print("ERROR FORMATTING RESPONSE")
            print("error: {0!r}".format(err))
            print("data_int: {0}".format(data_int))
            print("data: {0!r}".format(data))
            print("nbytes: {0}".format(nbytes))
        return response_dict

    def _decode(self, data_int, whichtime):
        """
        Helper function to decode response.

        Args:
            data_int (list): reply as a list of integer byte values
            whichtime (str): name of a multi-byte field in
                ``segmentation_points`` (one stored as a (start, stop) pair)

        Returns:
            int: the field interpreted least-significant byte first

        Raises:
            IndexError: if the reply is too short to contain the field
        """
        start, stop = self.segmentation_points[whichtime]
        field = data_int[start:stop]
        if len(field) != stop - start:
            raise IndexError(
                'response too short to contain {0} (bytes {1}-{2})'.format(whichtime, start, stop)
            )
        return sum(byte << (8 * i) for i, byte in enumerate(field))

    def _decode_test_time(self, data_int):
        """
        Helper function to decode response time.

        Returns:
            str: the millisecond counter formatted as ``HH:MM:SS.mmm``
        """
        total_ms = self._decode(data_int, 'TEST_TIME_OFFSET')
        hours, remainder = divmod(total_ms, 3600000)
        mins, remainder = divmod(remainder, 60000)
        secs, msecs = divmod(remainder, 1000)
        # Milliseconds are zero-padded so 5 ms reads ".005" rather than ".  5".
        return '%02d:%02d:%02d.%03d' % (hours, mins, secs, msecs)

    def poll_for_change(self, to_watch, desired_value, poll_interval=.5, poll_max=-1, verbose=False, server_lag=1., reuse_socket=False):
        """
        Poll system for a value change. Useful for waiting until the Medoc system has transitioned to a specific state in order to issue another command, but the transition length is unknowable.

        Args:
            to_watch (str): the response field we should be monitoring; most often 'test_state' or 'pathway_state'
            desired_value (str): the desired value of the field to wait on, i.e. keep checking until response_field has this value
            poll_interval (float): how often to poll; default .5s
            poll_max (int): upper limit on polling attempts; default -1 (unlimited)
            verbose (bool): print poll attempt number and current state
            server_lag (float): sometimes if the socket connection is pinged too quickly after a value change the subsequent command after this method is called can get missed. This adds an additional layer of timing delay before returning from this method to prevent this; default 1s
            reuse_socket (bool): try to reuse the last created socket connection; *NOT CURRENTLY FUNCTIONAL* (ignored, a new connection is always used)

        Returns:
            status (bool): whether desired_value was achieved
        """
        val = ''
        count = 0
        while val != desired_value:
            # The limit is checked before polling again, so a match on the
            # last permitted poll still counts as success.
            if poll_max > 0 and count >= poll_max:
                print("Polling limit exceeded")
                return False
            count += 1
            if verbose:
                print("Poll: {}".format(str(count)))
            resp = self.call('STATUS', reuse_socket=False)
            if resp:
                val = resp[to_watch]
            else:
                val = 'RESPONSE_FORMAT_ERROR'
            if verbose:
                print("Current value: {}".format(val))
            time.sleep(poll_interval)
        time.sleep(server_lag)
        return True

    # Convenience wrappers around call method

    def status(self):
        """Send the STATUS command and return the response."""
        return self.call('STATUS')

    def program(self, protocol):
        """Send the TEST_PROGRAM command for ``protocol`` and return the response."""
        return self.call('TEST_PROGRAM', protocol=protocol)

    def start(self):
        """Send the START command and return the response."""
        return self.call('START')

    def pause(self):
        """Send the PAUSE command and return the response."""
        return self.call('PAUSE')

    def trigger(self):
        """Send the TRIGGER command and return the response."""
        return self.call('TRIGGER')

    def stop(self):
        """Send the STOP command and return the response."""
        return self.call('STOP')

    def abort(self):
        """Send the ABORT command and return the response."""
        return self.call('ABORT')

    def yes(self):
        """Send the YES command and return the response."""
        return self.call('YES')

    def no(self):
        """Send the NO command and return the response."""
        return self.call('NO')