机械狗控制逻辑入口
“main.py”是机械狗控制代码的主入口,定义了控制模式及对应模式的后续调用实现。
首先导入一系列与参数有关的模块。
import os from argparse import ArgumentParser from multiprocessing import Process, Queue from src.actions import Stop from src.scenes import Manual, scene_initiator from src.utils import getkey, log, CameraBroadcaster, SystemInfo, Controller, get_port, STM32_NAME, ESP32_NAME
定义出可选参数更改小车的控制模式,可选命令行cmd控制(预留接口),手动控制,简易模式(自动巡线行走),声音控制(预留接口)、手动控制行走等。
def parse_args(): parser = ArgumentParser() parser.add_argument('--mode', type=str, required=False, default='manual', choices=['cmd', 'voice', 'tracking', 'easy', 'manual']) return parser.parse_args()
遵循简单性的原则,在程序的主入口需判断模式的设定后,进入到对应模式的实现中。
if __name__ == '__main__': # 获取STM32和ESP32设备的端口 stm32_port = get_port(STM32_NAME) esp32_port = get_port(ESP32_NAME) # 使用获得的端口创建SystemInfo实例 system_info = SystemInfo(stm32_port=stm32_port, esp32_port=esp32_port) # 初始化Controller ctrl = Controller() # 解析命令行参数 args = parse_args() log.info('start') # 创建一个最大大小为1的消息队列 msg_queue = Queue(maxsize=1) camera = CameraBroadcaster(system_info) shared_memory_name = camera.memory_name # 在后台启动摄像头进程 camera_process = Process(target=camera.run) camera_process.start() # 检查选择的模式并执行相应的操作 if args.mode == 'manual': task = Manual(shared_memory_name, system_info, msg_queue) process = Process(target=task.loop) process.start() try: while True: key = getkey() if key == 'esc': process.join() camera.stop_sign.value = True camera_process.join() break else: msg_queue.put(key) except (KeyboardInterrupt, SystemExit): camera.stop_sign.value = True camera_process.join() os.system('stty sane') log.info('stopping.') elif args.mode == 'cmd': process_list = [] record_map = {} try: log.info(f'start reading cmd') while True: command = input().strip() if command == 'stop': # 如果命令是'stop',终止所有进程,包括摄像头,并退出循环 for p in process_list: p.kill() log.info(f'start put stop sign') ctrl.execute(Stop()) camera.stop_sign.value = True camera_process.join() break elif command == 'clear': # 如果命令是'clear',清空进程列表,重置控制器,并继续 for p in process_list: p.kill() process_list.clear() ctrl = Controller() ctrl.execute(Stop()) log.info(f'clear succ') continue elif command == 'Manual': log.error(f'Does not support switching from cmd mode to manual mode') continue # 根据命令构建场景,并在单独的进程中启动它 log.info(f'building scene {command}') scene = scene_initiator(command) log.info(f'{scene}') if scene is not None: scene_obj = scene(shared_memory_name, system_info, msg_queue) process = Process(target=scene_obj.loop) process.start() process_list.append(process) # 处理键盘中断或系统退出,停止摄像头进程和所有场景进程,并记录事件 except (KeyboardInterrupt, SystemExit): camera.stop_sign.value = True camera_process.join() for process in process_list: process.kill() log.info('stopping.') elif args.mode == 'voice': raise NotImplementedError('voice control is not currently supported.') elif args.mode == 'easy': process_list = [] task2 = scene_initiator('LF')(shared_memory_name, system_info, msg_queue) process_list.append(Process(target=task2.loop)) for process in process_list: process.start() try: while True: key = getkey() if key == 'esc': for process in process_list: process.kill() camera.stop_sign.value = True camera_process.join() break else: # 将按键放入消息队列供场景处理 msg_queue.put(key) except (KeyboardInterrupt, SystemExit): camera.stop_sign.value = True camera_process.join() os.system('stty sane') log.info('stopping.') elif args.mode == 'tracking': # 对于跟踪模式,启动对应场景(Tracking)的单独进程 process_list = [] task1 = scene_initiator('Tracking')(shared_memory_name, system_info, msg_queue) process_list.append(Process(target=task1.loop)) for process in process_list: process.start() try: while True: key = getkey() if key == 'esc': for process in process_list: process.kill() camera.stop_sign.value = True camera_process.join() break else: msg_queue.put(key) except (KeyboardInterrupt, SystemExit): # 处理键盘中断或系统退出,停止摄像头进程,并记录事件 camera.stop_sign.value = True camera_process.join() os.system('stty sane') log.info('stopping.')
父主题: 代码实现