41 def __exit__(self, exc_type, exc_value, traceback):
61 self.
_socket.connect((address, port))
62 self.
_socket.send(b
'HELLO 1.0\n')
63 with self.
_socket.makefile()
as f:
64 if f.readline().strip() !=
'OK':
65 raise RuntimeError(
'Protocol not supported')
def __clamp(self, value, min, max):
    """Return ``value`` limited to the range spanned by ``min`` and ``max``.

    The three arguments are sorted and the middle element is returned.
    The result is therefore ``value`` itself when it lies between the two
    bounds, and the nearer bound otherwise. Because the median is taken,
    the bounds are honoured even if they are passed in swapped order.

    ``self`` is not used.
    """
    # The parameters ``min`` and ``max`` shadow the builtins inside this
    # method (their names are kept for callers), so the clamp is expressed
    # as "median of three" instead of ``max(lo, min(value, hi))``.
    _, middle, _ = sorted((min, value, max))
    return middle
71 self.
_socket.send(b
'LISTPORTS\n')
72 with self.
_socket.makefile()
as f:
74 if answer.startswith(
'OK'):
75 return json.loads(answer[3:])
76 elif answer.startswith(
'ERROR'):
77 raise RuntimeError(answer[6:])
79 raise RuntimeError(
'Unknown response')
81 def _set_pin_direction(self, pin, direction):
82 direction =
'INPUT' if direction == ahio.Direction.Input
else 'OUTPUT' 83 command = (
'SETDIRECTION %s %s\n' % (pin, direction)).encode(
'utf8')
85 with self.
_socket.makefile()
as f:
87 if answer.startswith(
'OK'):
89 elif answer.startswith(
'ERROR'):
90 raise RuntimeError(answer[6:])
92 raise RuntimeError(
'Unknown response')
94 def _pin_direction(self, pin):
95 command = (
'DIRECTION %s\n' % pin).encode(
'utf8')
97 with self.
_socket.makefile()
as f:
99 if answer.startswith(
'OK'):
100 direction = answer[3:].strip()
102 return d.Input
if direction ==
'INPUT' else d.Output
103 elif answer.startswith(
'ERROR'):
104 raise RuntimeError(answer[6:])
106 raise RuntimeError(
'Unknown response')
108 def _set_pin_type(self, pin, ptype):
109 ptype =
'DIGITAL' if ptype == ahio.PortType.Digital
else 'ANALOG' 110 command = (
'SETTYPE %s %s\n' % (pin, ptype)).encode(
'utf8')
112 with self.
_socket.makefile()
as f:
113 answer = f.readline()
114 if answer.startswith(
'OK'):
116 elif answer.startswith(
'ERROR'):
117 raise RuntimeError(answer[6:])
119 raise RuntimeError(
'Unknown response')
121 def _pin_type(self, pin):
122 command = (
'TYPE %s\n' % pin).encode(
'utf8')
124 with self.
_socket.makefile()
as f:
125 answer = f.readline()
126 if answer.startswith(
'OK'):
127 ptype = answer[3:].strip()
129 return pt.Digital
if ptype ==
'DIGITAL' else pt.Analog
130 elif answer.startswith(
'ERROR'):
131 raise RuntimeError(answer[6:])
133 raise RuntimeError(
'Unknown response')
135 def _find_port_info(self, pin):
142 def _write(self, pin, value, pwm):
146 if self.
_pin_type(pin) == ahio.PortType.Digital:
147 if not pin_info[
'digital'][
'output']:
148 raise RuntimeError(
'Pin does not support digital output')
150 if not pin_info[
'digital'][
'pwm']:
151 raise RuntimeError(
'Pin does not support PWM')
152 value = self.
__clamp(value, 0, 1)
153 command = (
'WRITEPWM %s %s\n' % (pin, value)).encode(
'utf8')
155 value =
'HIGH' if value == ahio.LogicValue.High
else 'LOW' 156 command = (
'WRITEDIGITAL %s %s\n' %
157 (pin, value)).encode(
'utf8')
159 if not pin_info[
'analog'][
'output']:
160 raise RuntimeError(
'Pin does not support analog output')
161 l = pin_info[
'analog'][
'write_range']
162 value = self.
__clamp(value, l[0], l[1])
163 command = (
'WRITEANALOG %s %s\n' % (pin, value)).encode(
'utf8')
165 with self.
_socket.makefile()
as f:
166 answer = f.readline()
167 if answer.startswith(
'OK'):
169 elif answer.startswith(
'ERROR'):
170 raise RuntimeError(answer[6:])
172 raise RuntimeError(
'Unknown response')
174 def _read(self, pin):
177 if pin_info[
'digital'][
'input']
and pin_type == ahio.PortType.Digital:
178 da = ahio.PortType.Digital
179 command = (
'READDIGITAL %s\n' % pin).encode(
'utf8')
180 elif pin_info[
'analog'][
'input']
and pin_type == ahio.PortType.Analog:
181 da = ahio.PortType.Analog
182 command = (
'READANALOG %s\n' % pin).encode(
'utf8')
184 raise RuntimeError(
'Pin does not support input or is not set up')
186 with self.
_socket.makefile()
as f:
187 answer = f.readline()
188 if answer.startswith(
'OK'):
189 value = answer[3:].strip()
190 if da == ahio.PortType.Digital:
192 return lv.High
if value ==
'HIGH' else lv.Low
195 elif answer.startswith(
'ERROR'):
196 raise RuntimeError(answer[6:])
198 raise RuntimeError(
'Unknown response')
201 self.
_socket.send(b
'ANALOGREFERENCES\n')
202 with self.
_socket.makefile()
as f:
203 answer = f.readline()
204 if answer.startswith(
'OK'):
205 __, *opts = answer.strip().split(
' ')
207 elif answer.startswith(
'ERROR'):
208 raise RuntimeError(answer[6:])
210 raise RuntimeError(
'Unknown response')
212 def _set_analog_reference(self, reference, pin):
214 command = (
'SETANALOGREFERENCE %s %s\n' % (reference, pin))
216 command = (
'SETANALOGREFERENCE %s\n' % reference)
217 self.
_socket.send(command.encode(
'utf8'))
218 with self.
_socket.makefile()
as f:
219 answer = f.readline()
220 if answer.startswith(
'OK'):
222 elif answer.startswith(
'ERROR'):
223 raise RuntimeError(answer[6:])
225 raise RuntimeError(
'Unknown response')
227 def _analog_reference(self, pin):
229 command =
'ANALOGREFERENCE %s\n' % pin
231 command =
'ANALOGREFERENCE\n' 232 self.
_socket.send(command.encode(
'utf8'))
233 with self.
_socket.makefile()
as f:
234 answer = f.readline()
235 if answer.startswith(
'OK'):
236 return answer.strip().split(
' ')[1]
237 elif answer.startswith(
'ERROR'):
238 raise RuntimeError(answer[6:])
240 raise RuntimeError(
'Unknown response')
242 def _set_pwm_frequency(self, frequency, pin):
244 command =
'SETPWMFREQUENCY %s %s\n' % (frequency, pin)
246 command =
'SETPWMFREQUENCY %s\n' % frequency
247 self.
_socket.send(command.encode(
'utf8'))
248 with self.
_socket.makefile()
as f:
249 answer = f.readline()
250 if answer.startswith(
'OK'):
252 elif answer.startswith(
'ERROR'):
253 raise RuntimeError(answer[6:])
255 raise RuntimeError(
'Unknown response')
def __exit__(self, exc_type, exc_value, traceback)
def available_pins(self)
Returns available pins.
def _pin_direction(self, pin)
def analog_references(self)
def _find_port_info(self, pin)
Abstract class containing information about the driver.
def setup(self, address, port)
Connects to server at address:port.
Contains abstract classes that should be implemented by drivers.
def __clamp(self, value, min, max)