ahio  1.0.0
I/O Communication Library
generic_tcp_io.py
Go to the documentation of this file.
1 # -*- coding: utf-8; -*-
2 #
3 # Copyright (c) 2016 Álan Crístoffer
4 #
5 # Permission is hereby granted, free of charge, to any person obtaining a copy
6 # of this software and associated documentation files (the "Software"), to deal
7 # in the Software without restriction, including without limitation the rights
8 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 # copies of the Software, and to permit persons to whom the Software is
10 # furnished to do so, subject to the following conditions:
11 #
12 # The above copyright notice and this permission notice shall be included in
13 # all copies or substantial portions of the Software.
14 #
15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21 # THE SOFTWARE.
22 
24 
25 import json
26 import socket
27 from enum import Enum
28 
29 
31  NAME = 'GenericTCPIO'
32  AVAILABLE = True
33 
34 
36  _socket = None
37 
38  def __enter__(self):
39  return self
40 
41  def __exit__(self, exc_type, exc_value, traceback):
42  if self._socket:
43  self._socket.send(b'QUIT\n')
44  self._socket.close()
45 
46 
59  def setup(self, address, port):
60  self._socket = socket.socket()
61  self._socket.connect((address, port))
62  self._socket.send(b'HELLO 1.0\n')
63  with self._socket.makefile() as f:
64  if f.readline().strip() != 'OK':
65  raise RuntimeError('Protocol not supported')
66 
67  def __clamp(self, value, min, max):
68  return sorted((min, value, max))[1]
69 
70  def available_pins(self):
71  self._socket.send(b'LISTPORTS\n')
72  with self._socket.makefile() as f:
73  answer = f.readline()
74  if answer.startswith('OK'):
75  return json.loads(answer[3:])
76  elif answer.startswith('ERROR'):
77  raise RuntimeError(answer[6:])
78  else:
79  raise RuntimeError('Unknown response')
80 
81  def _set_pin_direction(self, pin, direction):
82  direction = 'INPUT' if direction == ahio.Direction.Input else 'OUTPUT'
83  command = ('SETDIRECTION %s %s\n' % (pin, direction)).encode('utf8')
84  self._socket.send(command)
85  with self._socket.makefile() as f:
86  answer = f.readline()
87  if answer.startswith('OK'):
88  return None
89  elif answer.startswith('ERROR'):
90  raise RuntimeError(answer[6:])
91  else:
92  raise RuntimeError('Unknown response')
93 
94  def _pin_direction(self, pin):
95  command = ('DIRECTION %s\n' % pin).encode('utf8')
96  self._socket.send(command)
97  with self._socket.makefile() as f:
98  answer = f.readline()
99  if answer.startswith('OK'):
100  direction = answer[3:].strip()
101  d = ahio.Direction
102  return d.Input if direction == 'INPUT' else d.Output
103  elif answer.startswith('ERROR'):
104  raise RuntimeError(answer[6:])
105  else:
106  raise RuntimeError('Unknown response')
107 
108  def _set_pin_type(self, pin, ptype):
109  ptype = 'DIGITAL' if ptype == ahio.PortType.Digital else 'ANALOG'
110  command = ('SETTYPE %s %s\n' % (pin, ptype)).encode('utf8')
111  self._socket.send(command)
112  with self._socket.makefile() as f:
113  answer = f.readline()
114  if answer.startswith('OK'):
115  return None
116  elif answer.startswith('ERROR'):
117  raise RuntimeError(answer[6:])
118  else:
119  raise RuntimeError('Unknown response')
120 
121  def _pin_type(self, pin):
122  command = ('TYPE %s\n' % pin).encode('utf8')
123  self._socket.send(command)
124  with self._socket.makefile() as f:
125  answer = f.readline()
126  if answer.startswith('OK'):
127  ptype = answer[3:].strip()
128  pt = ahio.PortType
129  return pt.Digital if ptype == 'DIGITAL' else pt.Analog
130  elif answer.startswith('ERROR'):
131  raise RuntimeError(answer[6:])
132  else:
133  raise RuntimeError('Unknown response')
134 
135  def _find_port_info(self, pin):
136  ps = [p for p in self.available_pins() if p['id'] == pin]
137  if ps:
138  return ps[0]
139  else:
140  return None
141 
142  def _write(self, pin, value, pwm):
143  if self._pin_direction(pin) == ahio.Direction.Input:
144  return None
145  pin_info = self._find_port_info(pin)
146  if self._pin_type(pin) == ahio.PortType.Digital:
147  if not pin_info['digital']['output']:
148  raise RuntimeError('Pin does not support digital output')
149  if pwm:
150  if not pin_info['digital']['pwm']:
151  raise RuntimeError('Pin does not support PWM')
152  value = self.__clamp(value, 0, 1)
153  command = ('WRITEPWM %s %s\n' % (pin, value)).encode('utf8')
154  else:
155  value = 'HIGH' if value == ahio.LogicValue.High else 'LOW'
156  command = ('WRITEDIGITAL %s %s\n' %
157  (pin, value)).encode('utf8')
158  else:
159  if not pin_info['analog']['output']:
160  raise RuntimeError('Pin does not support analog output')
161  l = pin_info['analog']['write_range']
162  value = self.__clamp(value, l[0], l[1])
163  command = ('WRITEANALOG %s %s\n' % (pin, value)).encode('utf8')
164  self._socket.send(command)
165  with self._socket.makefile() as f:
166  answer = f.readline()
167  if answer.startswith('OK'):
168  return None
169  elif answer.startswith('ERROR'):
170  raise RuntimeError(answer[6:])
171  else:
172  raise RuntimeError('Unknown response')
173 
174  def _read(self, pin):
175  pin_info = self._find_port_info(pin)
176  pin_type = self._pin_type(pin)
177  if pin_info['digital']['input'] and pin_type == ahio.PortType.Digital:
178  da = ahio.PortType.Digital
179  command = ('READDIGITAL %s\n' % pin).encode('utf8')
180  elif pin_info['analog']['input'] and pin_type == ahio.PortType.Analog:
181  da = ahio.PortType.Analog
182  command = ('READANALOG %s\n' % pin).encode('utf8')
183  else:
184  raise RuntimeError('Pin does not support input or is not set up')
185  self._socket.send(command)
186  with self._socket.makefile() as f:
187  answer = f.readline()
188  if answer.startswith('OK'):
189  value = answer[3:].strip()
190  if da == ahio.PortType.Digital:
191  lv = ahio.LogicValue
192  return lv.High if value == 'HIGH' else lv.Low
193  else:
194  return int(value)
195  elif answer.startswith('ERROR'):
196  raise RuntimeError(answer[6:])
197  else:
198  raise RuntimeError('Unknown response')
199 
200  def analog_references(self):
201  self._socket.send(b'ANALOGREFERENCES\n')
202  with self._socket.makefile() as f:
203  answer = f.readline()
204  if answer.startswith('OK'):
205  __, *opts = answer.strip().split(' ')
206  return opts
207  elif answer.startswith('ERROR'):
208  raise RuntimeError(answer[6:])
209  else:
210  raise RuntimeError('Unknown response')
211 
212  def _set_analog_reference(self, reference, pin):
213  if pin:
214  command = ('SETANALOGREFERENCE %s %s\n' % (reference, pin))
215  else:
216  command = ('SETANALOGREFERENCE %s\n' % reference)
217  self._socket.send(command.encode('utf8'))
218  with self._socket.makefile() as f:
219  answer = f.readline()
220  if answer.startswith('OK'):
221  return
222  elif answer.startswith('ERROR'):
223  raise RuntimeError(answer[6:])
224  else:
225  raise RuntimeError('Unknown response')
226 
227  def _analog_reference(self, pin):
228  if pin:
229  command = 'ANALOGREFERENCE %s\n' % pin
230  else:
231  command = 'ANALOGREFERENCE\n'
232  self._socket.send(command.encode('utf8'))
233  with self._socket.makefile() as f:
234  answer = f.readline()
235  if answer.startswith('OK'):
236  return answer.strip().split(' ')[1]
237  elif answer.startswith('ERROR'):
238  raise RuntimeError(answer[6:])
239  else:
240  raise RuntimeError('Unknown response')
241 
242  def _set_pwm_frequency(self, frequency, pin):
243  if pin:
244  command = 'SETPWMFREQUENCY %s %s\n' % (frequency, pin)
245  else:
246  command = 'SETPWMFREQUENCY %s\n' % frequency
247  self._socket.send(command.encode('utf8'))
248  with self._socket.makefile() as f:
249  answer = f.readline()
250  if answer.startswith('OK'):
251  return
252  elif answer.startswith('ERROR'):
253  raise RuntimeError(answer[6:])
254  else:
255  raise RuntimeError('Unknown response')
def __exit__(self, exc_type, exc_value, traceback)
def available_pins(self)
Returns available pins.
Abstract class containing information about the driver.
def setup(self, address, port)
Connects to server at address:port.
Contains abstract classes that should be implemented by drivers.
def __clamp(self, value, min, max)