下载
中文
注册

智能小车控制逻辑入口

“main.py”是小车控制代码的主入口,定义了小车控制模式及对应模式的后续调用实现。

首先导入一系列与参数有关的模块,以及启动摄像头的关键模块。

import os
from argparse import ArgumentParser
from multiprocessing import Process, Queue
from src.scenes import Manual, scene_initiator
from src.utils import getkey, log, CameraBroadcaster, CAMERA_INFO

定义出可选参数更改小车的控制模式,可选命令行cmd控制,手动控制,声音控制(正在规划中)以及测试等。

def parse_args():
    parser = ArgumentParser()
    parser.add_argument('--mode', type=str, required=False, default='manual',
                        choices=['cmd', 'voice', 'manual', 'test'])
    return parser.parse_args()

遵循简单性的原则,在程序的主入口需判断模式的设定后,进入到对应模式的实现中,包含摄像头端口的调用及行驶过程中各类消息队列的处理。

if __name__ == '__main__':
    args = parse_args()
    log.info('start')
    msg_queue = Queue(maxsize=1)
    camera = CameraBroadcaster(CAMERA_INFO)
    shared_memory_name = camera.memory_name
    camera_process = Process(target=camera.run)
    camera_process.start()
    if args.mode == 'manual':
        task = Manual(shared_memory_name, CAMERA_INFO, msg_queue)
        process = Process(target=task.loop)
        process.start()
        try:
            while True:
                key = getkey()
                if key == 'esc':
                    process.join()
                    camera.stop_sign.value = True
                    camera_process.join()
                    break
                else:
                    msg_queue.put(key)
        except (KeyboardInterrupt, SystemExit):
            camera.stop_sign.value = True
            camera_process.join()
            os.system('stty sane')
            log.info('stopping.')
    elif args.mode == 'cmd':
        process_list = []
        record_map = {}
        try:
            log.info(f'start reading cmd')
            while True:
                command = input().strip()
                if command == 'stop':
                    for p in process_list:
                        p.kill()
                    log.info(f'start put stop sign')
                    camera.stop_sign.value = True
                    camera_process.join()
                    break
                elif command == 'clear':
                    for p in process_list:
                        p.kill()
                    process_list.clear()
                    log.info(f'clear succ')
                    continue
                elif command == 'Manual':
                    log.error(f'Does not support switching from cmd mode to manual mode')
                    continue
                log.info(f'building scene {command}')
                scene = scene_initiator(command)
                log.info(f'{scene}')
                if scene is not None:
                    scene_obj = scene(shared_memory_name, CAMERA_INFO, msg_queue)
                    process = Process(target=scene_obj.loop)
                    process.start()
                    process_list.append(process)
 
        except (KeyboardInterrupt, SystemExit):
            camera.stop_sign.value = True
            camera_process.join()
            for process in process_list:
                process.kill()
            log.info('stopping.')
 
    elif args.mode == 'voice':
        raise NotImplementedError('voice control is not currently supported.')
    elif args.mode == 'test':
        test_list = []
        test1 = Manual(shared_memory_name, CAMERA_INFO, msg_queue)
        test_list.append(Process(target=test1.loop))
        test2 = scene_initiator('Tracking')(shared_memory_name, CAMERA_INFO, msg_queue)
        test_list.append(Process(target=test2.loop))
        test3 = scene_initiator('Tracking')(shared_memory_name, CAMERA_INFO, msg_queue)
        test_list.append(Process(target=test3.loop))
 
        for process in test_list:
            process.start()
        try:
            while True:
                key = getkey()
                if key == 'esc':
                    for process in test_list:
                        process.kill()
                    camera.stop_sign.value = True
                    camera_process.join()
                    break
                else:
                    msg_queue.put(key)
        except (KeyboardInterrupt, SystemExit):
            camera.stop_sign.value = True
            camera_process.join()
            os.system('stty sane')
            log.info('stopping.')