"""
A colection of controlling interfaces for drivetrains (both external and internal).
"""
import struct
PYSERIAL = True
try:
from serial import Serial as UART
except ImportError:
PYSERIAL = False # not running on Win32 nor Linux
try:
from busio import UART
except ImportError: # running on a MicroPython board
from .usart_serial_ctx import SerialUART as UART
from circuitpython_nrf24l01 import RF24
IS_TREADED = PYSERIAL
[docs]class NRF24L01():
"""This class acts as a wrapper for circuitpython-nrf24l01 library for using a
peripheral device with nRF24L01 radio transceivers. This is a base class to
:class:`~drivetrain.interfaces.NRF24L01tx` and :class:`~drivetrain.interfaces.NRF24L01rx`
classes.
:param ~circuitpython_nrf24l01.rf24.RF24 nrf24_object: The instantiated object of the nRF24L01
transceiver radio.
:param bytearray address: This will be the RF address used to transmit/receive drivetrain
commands via the nRF24L01 transceiver. For more information on this parameter's usage,
please read the documentation on the using the
:py:meth:`~circuitpython_nrf24l01.rf24.RF24.open_tx_pipe()`
:param str cmd_template: This variable will be used as the `"fmt" (Format String of
Characters) <https://docs.python.org/3.6/library/struct.html#format-strings>`_ parameter
internally passed to the :py:func:`struct.pack()` and :py:func:`struct.unpack()` for
transmiting and receiving drivetrain commands. The number of characters in this string must
correspond to the number of commands in the ``cmds`` list passed to
:py:meth:`~drivetrain.interfaces.NRF24L01rx.go()`.
"""
def __init__(self, nrf24_object, address=b'rfpi0', cmd_template="ll"):
if not isinstance(nrf24_object, RF24):
raise ValueError('nRf24L01 object not recognized or supported.')
self._rf = nrf24_object
self._address = address
self.address = address
self._rf.listen = False
# self._rf.what_happened(1) # prints radio condition
self._prev_cmds = [None, None]
self._fmt = cmd_template
@property
def cmd_template(self):
"""Use this attribute to change or check the format string used to pack or unpack
drivetrain commands in `bytearray` form. Refer to `Format String and Format Characters
<https://docs.python.org/3.6/library/struct.html#format-strings>`_ for allowed datatype
aliases. The number of characters in this string must correspond to the number of commands
in the ``cmds`` list passed to :py:meth:`~drivetrain.interfaces.NRF24L01rx.go()`."""
return self._fmt
@cmd_template.setter
def cmd_template(self, fmt):
self._fmt = fmt
@property
def value(self):
"""The most previous list of commands that were processed by the drivetrain object"""
return self._prev_cmds
@property
def address(self):
"""This `bytearray` will be the RF address used to transmit/receive drivetrain
commands via the nRF24L01 transceiver. For more information on this parameter's usage,
please read the documentation on the using the
:py:meth:`~circuitpython_nrf24l01.rf24.RF24.open_tx_pipe()`"""
return self._address
@address.setter
def address(self, address):
self._address = address
self._rf.open_tx_pipe(address)
self._rf.open_rx_pipe(1, address)
[docs]class NRF24L01tx(NRF24L01):
"""This child class allows the remote controlling of an external drivetrain by transmitting
commands to another MCU via the nRF24L01 transceiver. See also the :class:`~drivetrain.interfaces.NRF24L01` base class for
details about instantiation."""
[docs] def go(self, cmds):
"""Assembles a bytearray to be used for transmitting commands over the air to a receiving
nRF24L01 transceiver.
:param list,tuple cmds: A `list` or `tuple` of `int` commands to be sent over the air
using the nRF24L01. This `list`/`tuple` must have a length equal to the number of
characters in the :py:attr:`~drivetrain.interfaces.NRF24L01.cmd_template` string.
"""
self._prev_cmds = cmds
command = self._fmt.encode() + b';'
for i, c in enumerate(self._fmt):
try:
command += struct.pack(c, cmds[i])
except struct.error:
raise ValueError("command argument, {cmds[i]}, not in range of datatype '{c}'. "
"Refer to 'Format Characters' in python's struct docs for "
"adequate datatype aliases.")
except IndexError:
raise ValueError("expected {} commands, but {} were given".format(
len(self._fmt), len(cmds)))
self._rf.send(command)
# success = self._rf.send(command)
# print('transmit', repr(cmds), 'returned:', success)
[docs]class NRF24L01rx(NRF24L01):
"""This child class allows the external remote controlling of an internal drivetrain by
receiving commands from another MCU via the nRF24L01 transceiver.
:param Tank,Automotive,Locomotive drivetrain: The
pre-instantiated drivetrain configuration object that is to be controlled.
See also the :class:`~drivetrain.interfaces.NRF24L01` base class for details about instantiation.
"""
def __init__(self, nrf24_object, drivetrain, address=b'rfpi0', cmd_template="ll"):
self._d_train = drivetrain
super(NRF24L01rx, self).__init__(nrf24_object, address=b'rfpi0', cmd_template=cmd_template)
self._rf.listen = True
[docs] def sync(self):
"""Checks if there are new commands waiting in the nRF24L01's RX FIFO buffer to be
processed by the drivetrain object (passed to the constructor upon instantiation).
Any data that is waiting to be received is interpreted and passed to the drivetrain object."""
if self._rf.any():
rx = self._rf.recv()
fmt = b''
for i, byte in enumerate(rx):
if byte == 59: # found ';' now convert all prior chars to str & break
# str() doesn't supprot "encoding" kwarg in circuitpython
fmt = ''.join(chr(c) for c in rx[:i])
break
self.go(list(struct.unpack(fmt, rx[len(fmt) + 1:])))
if not IS_TREADED:
self._d_train.sync()
[docs] def go(self, cmds):
"""Assembles a list of drivetrain commands from the received bytearray via the nRF24L01
transceiver.
:param list,tuple cmds: A `list` or `tuple` of `int` commands to be sent the
drivetrain object (passed to the constructor upon instantiation). This `list`/`tuple`
must have a length equal to the number of characters in the
:py:attr:`~drivetrain.interfaces.NRF24L01.cmd_template` string.
"""
self._prev_cmds = list(cmds)
self._d_train.go(self.value)
[docs]class USB():
"""
This base class acts as a wrapper to pyserial module for communicating to an external USB
serial device. Specifically designed for an Arduino running custom code.
:param busio.UART,serial.Serial,machine.UART serial_object: The instantiated serial object to
be used for the serial connection.
:param str cmd_template: This variable will be used as the `"fmt" (Format String of
Characters) <https://docs.python.org/3.6/library/struct.html#format-strings>`_ parameter
internally passed to the :py:func:`struct.pack()` and :py:func:`struct.unpack()` for
transmiting and receiving drivetrain commands. The number of characters in this string must
correspond to the number of commands in the ``cmds`` list passed to
:py:meth:`~drivetrain.interfaces.USBrx.go()`.
"""
def __init__(self, serial_object, cmd_template="ll"):
if not isinstance(serial_object, UART):
raise ValueError("serial_object not recognized or unsupported")
self._ser = serial_object
# print('Successfully opened port {} @ {} to Arduino device'.format(address, baud))
if PYSERIAL:
self._ser.close()
else:
self._ser.deinit()
self._prev_cmds = [None, None]
self._fmt = cmd_template
@property
def cmd_template(self):
"""Use this `str` attribute to change or check the format string used to pack or unpack
drivetrain commands in `bytearray` form. Refer to `Format String and Format Characters
<https://docs.python.org/3.6/library/struct.html#format-strings>`_ for allowed datatype
aliases. The number of characters in this string must correspond to the number of commands
in the ``cmds`` list passed to :py:meth:`~drivetrain.interfaces.USBrx.go()`."""
return self._fmt
@cmd_template.setter
def cmd_template(self, fmt):
self._fmt = fmt
@property
def value(self):
"""The most previous list of commands that were processed by the drivetrain object"""
return self._prev_cmds
[docs]class USBtx(USB):
"""This child class allows the remote controlling of an external drivetrain by transmitting
commands to another MCU via USB serial connection. See also the `USB` base class for
details about instantiation."""
[docs] def go(self, cmds):
"""Assembles a bytearray for outputting over the Serial connection.
:param list,tuple cmds: A `list` or `tuple` of `int` commands to be sent over the Serial
connection. This `list`/`tuple` must have a length equal to the number of characters
in the :py:attr:`~drivetrain.interfaces.USB.cmd_template` string.
"""
self._prev_cmds = cmds
command = self._fmt.encode() + b';'
for i, c in enumerate(self._fmt):
try:
command += struct.pack(c, cmds[i])
except struct.error:
raise ValueError("command argument, {cmds[i]}, not in range of datatype {i}. "
"Refer to 'Format Characters' in python's struct docs for "
"adequate datatype aliases.")
except IndexError:
raise ValueError("expected {} commands, but {} were given".format(
len(self._fmt), len(cmds)))
with self._ser:
self._ser.write(command + b'\n') # terminate command w/ '\n' character for readline()
[docs]class USBrx(USB):
"""This child class allows the remote controlling of an external drivetrain by receiving
commands from another MCU via USB serial connection.
:param Tank,Automotive,Locomotive drivetrain: The
pre-instantiated drivetrain configuration object that is to be controlled.
See also the :class:`~drivetrain.interfaces.USB` base class for details about instantiation.
"""
def __init__(self, drivetrain, serial_object, cmd_template="ll"):
self._d_train = drivetrain
super(USBrx, self).__init__(serial_object=serial_object, cmd_template=cmd_template)
[docs] def sync(self):
"""Checks if there are new commands waiting in the USB serial device's input stream to be
processed by the drivetrain object (passed to the constructor upon instantiation).
Any data that is waiting to be received is interpreted and passed to the drivetrain object."""
rx = b''
with self._ser:
if self._ser.in_waiting:
if PYSERIAL:
# pyserial object doesn't use internal timeout value for read_line()
rx = self._ser.read_until() # defaults to '\n' character
else:
rx = self._ser.read_line()
if rx:
fmt = ''
for i, byte in enumerate(rx):
if byte == 59:
fmt = str(rx[:i], encoding='utf-8')
break
self.go(list(struct.unpack(fmt, rx[len(fmt) + 1 : -1]))) # ignore fmt & '\n
if IS_TREADED:
self._d_train.sync()
[docs] def go(self, cmds):
"""Assembles a list of drivetrain commands from the received bytearray over the USB
serial connection.
:param list,tuple cmds: A `list` or `tuple` of `int` commands to be sent the
drivetrain object (passed to the constructor upon instantiation). This `list`/`tuple`
must have a length equal to the number of characters in the
:py:attr:`~drivetrain.interfaces.USB.cmd_template` string.
"""
self._prev_cmds = cmds
self._d_train.go(self.value)