109 lines
2.9 KiB
Python
109 lines
2.9 KiB
Python
import serial
|
|
import serial.tools.list_ports
|
|
import socket
|
|
import sys
|
|
import time
|
|
import traceback
|
|
from threading import Timer
|
|
|
|
class RepeatedTimer:
    """Call ``function(*args, **kwargs)`` repeatedly, once per ``interval``.

    Each cycle is driven by a one-shot ``threading.Timer``; when it fires,
    a new timer is armed and then the callable is invoked.  The instance
    starts itself on construction, so no explicit ``start()`` is needed.

    Attributes set by the constructor: ``interval``, ``function``, ``args``,
    ``kwargs``, ``is_running`` and the private ``_timer`` (the currently
    armed ``Timer``).
    """

    def __init__(self, interval, function, *args, **kwargs):
        """Store the schedule parameters and arm the first timer.

        Args:
            interval: Delay passed to ``threading.Timer`` for every cycle.
            function: Callable invoked each time a timer fires.
            *args: Positional arguments forwarded to ``function``.
            **kwargs: Keyword arguments forwarded to ``function``.
        """
        self.interval = interval
        self.function = function
        self.args = args
        self.kwargs = kwargs
        self._timer = None
        self.is_running = False
        # Auto-start: callers do not need to call start() themselves.
        self.start()

    def _run(self):
        """Timer callback: schedule the next cycle, then call ``function``."""
        # Clear the flag so start() will arm a fresh timer.  Re-arming
        # happens *before* the call, so an exception raised by the callable
        # does not prevent the next cycle from being scheduled.
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        """Arm a new timer unless one is already pending."""
        if self.is_running:
            return
        self._timer = Timer(self.interval, self._run)
        self._timer.start()
        self.is_running = True

    def stop(self):
        """Cancel the pending timer and mark the instance as not running."""
        self._timer.cancel()
        self.is_running = False
|
|
|
|
def feedWDT(s):
    """Feed the watchdog by sending the byte ``b'A'`` over ``s``.

    Args:
        s: Open serial connection; it is handed to ``sendSerChar``.
    """
    feed_byte = b'A'
    sendSerChar(s, feed_byte)
|
|
|
|
def sendSerChar(s, c):
    """Log ``c`` to stdout and write it to the connection ``s``.

    Args:
        s: Object exposing ``write()`` (the serial port in this script).
        c: Payload to send; it is passed to ``s.write`` unchanged.
    """
    message = 'Sending ' + str(c)
    print(message)
    s.write(c)
|
|
|
|
def find_arduino_leonardo():
    """Locate the serial port of the LattePanda's on-board Leonardo.

    Scans the ports reported by ``serial.tools.list_ports.comports()`` and
    picks the first one whose description contains "LattePanda Leonardo".

    Returns:
        The matching port's ``device`` attribute, or ``None`` when no port
        matches.
    """
    wanted = "LattePanda Leonardo"
    matches = (
        info.device
        for info in serial.tools.list_ports.comports()
        if wanted in info.description
    )
    # The generator is lazy, so scanning stops at the first match.
    return next(matches, None)
|
|
|
|
def main():
    """Run the watchdog feeder and the local command relay.

    Steps:
      1. Find the LattePanda Leonardo serial port; if none is found, print a
         message, wait for the user and exit with status 1.
      2. Open the port at 9600 baud and start a ``RepeatedTimer`` that calls
         ``feedWDT`` every 30 seconds.
      3. Listen on ``localhost:9999``.  One byte is read from each client:
           * no data  -> the connection is dropped and the loop continues;
           * ``'T'``  -> the server stops without forwarding anything;
           * ``'Q'``  -> the byte is forwarded to the serial port, then the
                         server stops;
           * other    -> the byte is forwarded to the serial port.

    Any ``Exception`` is reported with its traceback, followed by a prompt.
    The timer, the serial port and all sockets are released on every exit
    path, so no timer thread is left running after ``main`` returns.

    Raises:
        SystemExit: When the device cannot be found (exit status 1).
    """
    ser = None
    rt = None
    try:
        # Locate the Arduino Leonardo on the LattePanda board
        device = find_arduino_leonardo()

        # Exit if the device is not found
        if device is None:
            print("LattePanda Leonardo device not found. Please check if the device is connected and drivers are installed.")
            input("Press any key to exit...")
            sys.exit(1)

        # Open a Serial connection to the Arduino Leonardo
        ser = serial.Serial(device, 9600)

        # Start a repeating timer to feed the watchdog (it auto-starts)
        rt = RepeatedTimer(30, feedWDT, ser)

        # Setup a listening socket; the with-block closes it on any exit.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.bind(('localhost', 9999))
            s.listen(5)
            print('Server is listening')

            while True:
                print('Waiting for connection...')
                c, addr = s.accept()

                # Close the client socket on every path out of this block,
                # including the `continue` and `break` statements below.
                with c:
                    print('Got connection from', addr)

                    raw = c.recv(1)
                    if not raw:
                        # Client disconnected without sending anything.
                        continue

                    try:
                        data = raw.decode('utf-8')
                    except UnicodeDecodeError:
                        # A lone non-UTF-8 byte must not take the whole
                        # server down; ignore it and keep serving.
                        print('Ignoring undecodable byte: ' + repr(raw))
                        continue

                    print('From online user: ' + data)

                    # 'T' terminates the server without touching the serial port.
                    if data == 'T':
                        break

                    # Forward the received byte unchanged.
                    sendSerChar(ser, raw)

                # 'Q' is forwarded first (above) and then ends the server.
                if data == 'Q':
                    break

    except Exception as e:
        # Handle exceptions and print the error message
        print("An error occurred:")
        print(str(e))
        print(traceback.format_exc())
        input("Press any key to exit...")
    finally:
        # Stop the timer first so it cannot write to a closed port.  Leaving
        # it armed would keep its non-daemon thread (and so the process)
        # alive, still feeding the watchdog.
        if rt is not None:
            rt.stop()
        if ser is not None:
            ser.close()
|
|
|
|
# Run the server only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|
|
|