下载
中文
注册

机械狗控制逻辑入口

“main.py”是机械狗控制代码的主入口,定义了控制模式及对应模式的后续调用实现。

首先导入一系列与参数有关的模块。

import os
from argparse import ArgumentParser
from multiprocessing import Process, Queue

from src.actions import Stop
from src.scenes import Manual, scene_initiator
from src.utils import getkey, log, CameraBroadcaster, SystemInfo, Controller, get_port, STM32_NAME, ESP32_NAME

定义出可选参数更改小车的控制模式,可选命令行cmd控制(预留接口),手动控制,简易模式(自动巡线行走),声音控制(预留接口)、手动控制行走等。

def parse_args():
    parser = ArgumentParser()
    parser.add_argument('--mode', type=str, required=False, default='manual',
                        choices=['cmd', 'voice', 'tracking', 'easy', 'manual'])
    return parser.parse_args()
遵循简单性的原则,在程序的主入口需判断模式的设定后,进入到对应模式的实现中。
if __name__ == '__main__':

    # 获取STM32和ESP32设备的端口
    stm32_port = get_port(STM32_NAME)
    esp32_port = get_port(ESP32_NAME)

    # 使用获得的端口创建SystemInfo实例
    system_info = SystemInfo(stm32_port=stm32_port, esp32_port=esp32_port)

    # 初始化Controller
    ctrl = Controller()

    # 解析命令行参数
    args = parse_args()
    log.info('start')

    # 创建一个最大大小为1的消息队列
    msg_queue = Queue(maxsize=1)
    camera = CameraBroadcaster(system_info)
    shared_memory_name = camera.memory_name

    # 在后台启动摄像头进程
    camera_process = Process(target=camera.run)
    camera_process.start()

    # 检查选择的模式并执行相应的操作
    if args.mode == 'manual':
        task = Manual(shared_memory_name, system_info, msg_queue)
        process = Process(target=task.loop)
        process.start()
        try:
            while True:
                key = getkey()
                if key == 'esc':
                    process.join()
                    camera.stop_sign.value = True
                    camera_process.join()
                    break
                else:
                    msg_queue.put(key)
        except (KeyboardInterrupt, SystemExit):
            camera.stop_sign.value = True
            camera_process.join()
            os.system('stty sane')
            log.info('stopping.')
    elif args.mode == 'cmd':
        process_list = []
        record_map = {}
        try:
            log.info(f'start reading cmd')
            while True:
                command = input().strip()
                if command == 'stop':
                    # 如果命令是'stop',终止所有进程,包括摄像头,并退出循环
                    for p in process_list:
                        p.kill()
                    log.info(f'start put stop sign')

                    ctrl.execute(Stop())
                    camera.stop_sign.value = True
                    camera_process.join()
                    break
                elif command == 'clear':
                    # 如果命令是'clear',清空进程列表,重置控制器,并继续
                    for p in process_list:
                        p.kill()
                    process_list.clear()
                    ctrl = Controller()
                    ctrl.execute(Stop())
                    log.info(f'clear succ')
                    continue
                elif command == 'Manual':
                    log.error(f'Does not support switching from cmd mode to manual mode')
                    continue
                # 根据命令构建场景,并在单独的进程中启动它
                log.info(f'building scene {command}')
                scene = scene_initiator(command)
                log.info(f'{scene}')
                if scene is not None:
                    scene_obj = scene(shared_memory_name, system_info, msg_queue)
                    process = Process(target=scene_obj.loop)
                    process.start()
                    process_list.append(process)

        # 处理键盘中断或系统退出,停止摄像头进程和所有场景进程,并记录事件
        except (KeyboardInterrupt, SystemExit):
            camera.stop_sign.value = True
            camera_process.join()
            for process in process_list:
                process.kill()
            log.info('stopping.')

    elif args.mode == 'voice':
        raise NotImplementedError('voice control is not currently supported.')
    elif args.mode == 'easy':
        process_list = []
        task2 = scene_initiator('LF')(shared_memory_name, system_info, msg_queue)
        process_list.append(Process(target=task2.loop))

        for process in process_list:
            process.start()
        try:
            while True:
                key = getkey()
                if key == 'esc':
                    for process in process_list:
                        process.kill()
                    camera.stop_sign.value = True
                    camera_process.join()
                    break
                else:
                    # 将按键放入消息队列供场景处理
                    msg_queue.put(key)
        except (KeyboardInterrupt, SystemExit):
            camera.stop_sign.value = True
            camera_process.join()
            os.system('stty sane')
            log.info('stopping.')
    elif args.mode == 'tracking':
        # 对于跟踪模式,启动对应场景(Tracking)的单独进程
        process_list = []
        task1 = scene_initiator('Tracking')(shared_memory_name, system_info, msg_queue)
        process_list.append(Process(target=task1.loop))

        for process in process_list:
            process.start()
        try:
            while True:
                key = getkey()
                if key == 'esc':
                    for process in process_list:
                        process.kill()
                    camera.stop_sign.value = True
                    camera_process.join()
                    break
                else:
                    msg_queue.put(key)
        except (KeyboardInterrupt, SystemExit):
            # 处理键盘中断或系统退出,停止摄像头进程,并记录事件
            camera.stop_sign.value = True
            camera_process.join()
            os.system('stty sane')
            log.info('stopping.')