MSC UDP Unit Test Quick Start Manual
: MORAI Simulator Control(MSC) Quick Start Manual.
MSC provides control of Simulator via API without having to manipulate the UI. This page provides the Unit Test Example Manual using the MSC.
Setup
Download Example Code
The donwloaded files are configured as follows.
msc_udp : msc_udp_exmaple files
Python version
The example code was written and tested in Python 3.7.5.
Run Morai Launcher
To use MSC, use Morai Launcher and check for the message 'MSC is Connect' on the upper left-hand corner of the screen.
* If 'MSC in DisConnect' message appears instead, check if another launcher is being run.
Run Code
If the Simulator and code are running in different environments (environments with different IP addresses), corresponding IP addresses in
params.txt
andparams.json
files must be edited.Open params.txt from downloaded files and enter ID, password, version, vehicle, and map (case sensitive). Refer to example below.
receive_user_ip : 127.0.0.1
receive_user_port : 10329
request_dst_ip : 127.0.0.1
request_dst_port : 10509
user_id : id
user_pw : pw
version: v.3.9.210226.3
map/vehicle: K-City/NiroRunning on
unit_test_ros.py
, the simulator is executed to check the test algorithm using the corresponding ID, password, version, vehicle, and map information in params.txt.
Simulator Network setting
Network Settings configure network information for the saved scenario.
Network Settings are configured as below by selecting Network → Network Settings on the upper menu bar.
Host IP: 127.0.0.1
Destination IP: 127.0.0.1
Ego Ctrl Cmd
Host PORT: 7601
Destination PORT: 7600
Ego Vehicle Status
Host PORT: 8001
Destination PORT: 8000
Object Info
Host PORT: 8101
Destination PORT: 8100
코드
class sim_ctrl
Class for running simulations via MSC by reading information entered in parmas.txtclass planner
Algorithm for unit test
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from lib.udp_parser import udp_parser,udp_sender
import time
import threading
import threading
from math import cos,sin,sqrt,pow,atan2,pi
import sys,os,time
import struct
import csv,json
from inputimeout import inputimeout, TimeoutOccurred
from lib.utils import pathReader,findLocalPath,purePursuit,Point,cruiseControl,vaildObject,velocityPlanning,pidController
#read params.txt
file = open('params.txt', 'r')
line=file.read()
params=line.split('\n')
for i in range(0,len(params)):
params[i]=params[i].split(':')[1].replace(" ","")
recive_user_ip = params[0]
recive_user_port = int(params[1])
request_dst_ip = params[2]
request_dst_port = int(params[3])
class sim_ctrl :
def __init__(self):
self.get_status = udp_parser(recive_user_ip, recive_user_port,'get_sim_status')
self.set_status = udp_sender(request_dst_ip, request_dst_port,'set_sim_status')
self.main_loop()
def main_loop(self):
while True:
self.status_data=self.get_status.get_data()
if not len(self.status_data)==0:
self.data_platform = self.status_data[0]
self.data_stage = self.status_data[1]
self.data_status = self.status_data[2]
self.command_platform = self.status_data[3]
self.command_cmd = self.status_data[4]
self.command_option = self.status_data[5]
self.command_result = self.status_data[6]
if self.data_platform == "0x01" and self.data_stage=="0x01": #Launcher platform에서 로그인 전 상태
cmd_platform="0x01"
cmd_command="0x0001"
cmd_option=params[4]+"/"+params[5]#(ID/PW)
self.send_data([cmd_platform,cmd_command,cmd_option])#Login명령
if self.data_platform=="0x01" and self.data_stage=="0x02": #Launcher platform에서 로그인 후 상태
cmd_platform = "0x01"
cmd_command = "0x0002"
cmd_option = params[6]#simulator version
self.send_data([cmd_platform,cmd_command,cmd_option]) #version 선택 명령
if self.command_result=="0x01": #Result = 성공
cmd_platform="0x01"
cmd_command="0x0004"
cmd_option=""
self.send_data([cmd_platform,cmd_command,cmd_option]) #Simulator 실행 명령
time.sleep(3)
elif self.command_result=="0x26": #Result = Simulator 설치 안됨
cmd_platform="0x01"
cmd_command="0x0003"
cmd_option=""
self.send_data([cmd_platform,cmd_command,cmd_option]) #Simulator 설치 명령
if self.data_platform=="0x02" and self.data_stage=="0x01" and self.data_status=="0x0001": #Simulator platform에서 로비 Stage의 대기상태
cmd_platform="0x02"
cmd_command="0x0001"
cmd_option=params[7]#(MAP/Vehicle)
self.send_data([cmd_platform,cmd_command,cmd_option]) #시뮬레이션/옵션 변경 실행 명령
while True: #로딩이 정상적으로 완료될때까지 대기.
_,self.data_stage,_,_,_,_,_=self.get_status.get_data()
if self.data_stage== "0x02":
break
# break
if self.data_platform=="0x02" and self.data_stage=="0x02" and self.data_status=="0x0001": #Simulator platform에서 Play상태
cmd_platform="0x02"
cmd_command="0x0013"
cmd_option="scenario_example"#(Scenario file name)
cmd_option2="0x01 0x02 0x04 0x08" #option2 or 연산
temp_option=cmd_option2.split()
for i in range(0,len(temp_option)):
temp_option[i]=int(temp_option[i],0)
result=temp_option[0]
for i in range(1,len(temp_option)):
result=(result|temp_option[i])
cmd_option=cmd_option+"/"+str(result)
time.sleep(1)
self.send_data([cmd_platform,cmd_command,cmd_option]) #시나리오 세팅 명령
break
time.sleep(1)
else :
print("[NO Simulator Control Data]")
time.sleep(0.5)
planner() ##알고리즘 class
def send_data(self, data):
try:
cmd_platform=int(data[0],0)
cmd_command=int(data[1],0)
cmd_option=data[2]
self.set_status.send_data([cmd_platform,cmd_command,cmd_option])
except ValueError:
print("Invalid input")
class planner :
def __init__(self):
path = os.path.dirname( os.path.abspath( __file__ ) )
with open(os.path.join(path,("params.json")),'r') as fp :
params = json.load(fp)
params=params["params"]
user_ip = params["user_ip"]
host_ip = params["host_ip"]
status_port =params["vehicle_status_dst_port"]
object_port =params["object_info_dst_port"]
get_traffic_port=params["get_traffic_dst_port"]
set_traffic_port=params["set_traffic_host_port"]
ctrl_cmd_port = params["ctrl_cmd_host_port"]
planner_path_file_name = params["planner_path_file_name"]
traffic_greenlight_setting= params["traffic_greenlight_setting"]
self.status=udp_parser(user_ip, status_port,'status')
self.obj=udp_parser(user_ip, object_port,'obj')
self.traffic=udp_parser(user_ip, get_traffic_port,'get_traffic')
self.ctrl_cmd=udp_sender(host_ip,ctrl_cmd_port,'ctrl_cmd')
self.set_traffic=udp_sender(host_ip,set_traffic_port,'set_traffic')
self.txt_reader=pathReader()
self.global_path=self.txt_reader.read(planner_path_file_name)
vel_planner=velocityPlanning(40,0.7) ## 경로기반 속도 계획
self.vel_profile=vel_planner.curveBasedVelocity(self.global_path,50)
self.pure_pursuit=purePursuit()
self.cc=cruiseControl(0.5,1) ## cruiseControl import (object_vel_gain, object_dis_gain)
self.vo=vaildObject()## 장애물 유무 확인
self.pid=pidController() ## pidController import
self._is_status=False
while not self._is_status :
if not self.status.get_data() :
print('No Status Data Cannot run main_loop')
time.sleep(0.5)
else :
self._is_status=True
self.main_loop()
def main_loop(self):
self.timer=threading.Timer(0.1,self.main_loop)
self.timer.start()
status_data=self.status.get_data()
obj_data=self.obj.get_data()
traffic_data = self.traffic.get_data()
position_x=status_data[4]
position_y=status_data[5]
position_z=status_data[6]
heading=status_data[9]+90# degree
velocity=status_data[10]
if not len(traffic_data) == 0 and traffic_greenlight_setting == "True": #set trafficlight (green)
self.set_traffic.send_data([False,traffic_data[1],16])
traffic_data[3]=16
#fine_local_path, waypoint
local_path,current_waypoint =findLocalPath(self.global_path,position_x,position_y)
## 장애물의 숫자와 Type 위치 속도 (object_num, object type, object pose_x, object pose_y, object velocity)
self.vo.get_object(obj_data)
global_obj,local_obj=self.vo.calc_vaild_obj([position_x,position_y,(heading)/180*pi])
self.cc.checkObject(local_path,global_obj,local_obj)
self.pure_pursuit.getPath(local_path)
self.pure_pursuit.getEgoStatus(position_x,position_y,position_z,velocity,heading)
steering_angle=self.pure_pursuit.steering_angle()
#ACC
cc_vel = self.cc.acc(local_obj,velocity,self.vel_profile[current_waypoint]) ## advanced cruise control 적용한 속도 계획
target_velocity = cc_vel
control_input=self.pid.pid(target_velocity,velocity) ## 속도 제어를 위한 PID 적용 (target Velocity, Status Velocity)
if control_input > 0 :
accel= control_input
brake= 0
else :
accel= 0
brake= -control_input
ctrl_mode = 2 # 2 = AutoMode / 1 = KeyBoard
Gear = 4 # 4 1 : (P / parking ) 2 (R / reverse) 3 (N / Neutral) 4 : (D / Drive) 5 : (L)
self.ctrl_cmd.send_data([2,Gear,accel,brake,steering_angle])
if __name__ == "__main__":
ctrl=sim_ctrl()