智能小车控制逻辑入口
“main.py”是小车控制代码的主入口,定义了小车控制模式及对应模式的后续调用实现。
首先导入一系列与参数有关的模块,以及启动摄像头的关键模块。
import os from argparse import ArgumentParser from multiprocessing import Process, Queue from src.scenes import Manual, scene_initiator from src.utils import getkey, log, CameraBroadcaster, CAMERA_INFO
定义出可选参数更改小车的控制模式,可选命令行cmd控制,手动控制,声音控制(正在规划中)以及测试等。
def parse_args(): parser = ArgumentParser() parser.add_argument('--mode', type=str, required=False, default='manual', choices=['cmd', 'voice', 'manual', 'test']) return parser.parse_args()
遵循简单性的原则,在程序的主入口需判断模式的设定后,进入到对应模式的实现中,包含摄像头端口的调用及行驶过程中各类消息队列的处理。
if __name__ == '__main__': args = parse_args() log.info('start') msg_queue = Queue(maxsize=1) camera = CameraBroadcaster(CAMERA_INFO) shared_memory_name = camera.memory_name camera_process = Process(target=camera.run) camera_process.start() if args.mode == 'manual': task = Manual(shared_memory_name, CAMERA_INFO, msg_queue) process = Process(target=task.loop) process.start() try: while True: key = getkey() if key == 'esc': process.join() camera.stop_sign.value = True camera_process.join() break else: msg_queue.put(key) except (KeyboardInterrupt, SystemExit): camera.stop_sign.value = True camera_process.join() os.system('stty sane') log.info('stopping.') elif args.mode == 'cmd': process_list = [] record_map = {} try: log.info(f'start reading cmd') while True: command = input().strip() if command == 'stop': for p in process_list: p.kill() log.info(f'start put stop sign') camera.stop_sign.value = True camera_process.join() break elif command == 'clear': for p in process_list: p.kill() process_list.clear() log.info(f'clear succ') continue elif command == 'Manual': log.error(f'Does not support switching from cmd mode to manual mode') continue log.info(f'building scene {command}') scene = scene_initiator(command) log.info(f'{scene}') if scene is not None: scene_obj = scene(shared_memory_name, CAMERA_INFO, msg_queue) process = Process(target=scene_obj.loop) process.start() process_list.append(process) except (KeyboardInterrupt, SystemExit): camera.stop_sign.value = True camera_process.join() for process in process_list: process.kill() log.info('stopping.') elif args.mode == 'voice': raise NotImplementedError('voice control is not currently supported.') elif args.mode == 'test': test_list = [] test1 = Manual(shared_memory_name, CAMERA_INFO, msg_queue) test_list.append(Process(target=test1.loop)) test2 = scene_initiator('Tracking')(shared_memory_name, CAMERA_INFO, msg_queue) test_list.append(Process(target=test2.loop)) test3 = scene_initiator('Tracking')(shared_memory_name, CAMERA_INFO, msg_queue) test_list.append(Process(target=test3.loop)) for process in test_list: process.start() try: while True: key = getkey() if key == 'esc': for process in test_list: process.kill() camera.stop_sign.value = True camera_process.join() break else: msg_queue.put(key) except (KeyboardInterrupt, SystemExit): camera.stop_sign.value = True camera_process.join() os.system('stty sane') log.info('stopping.')
父主题: 代码实现