Skip to main content
Skip table of contents

MSC ROS Unit Test Quick Start Manual

MSC provides control of the Simulator via an API without directly using the UI, enabling script-based control and automation of all basic commands within MORAI SIM: Drive.

This page contains the tutorial for using the MSC API with ROS as the main interface between the simulator and the user.

Setup

Download Example Code

Python version

  • Example code was written and tested in python 3.7.5.

Run Morai Launcher

  • To use MSC, use Morai Launcher and check for the message 'MSC is Connect' on the upper left-hand corner of the screen.
    * If 'MSC in DisConnect' message appears instead, check if another launcher is being run.

Run Code

  • If the Simulator and code are run in different environments (environments with different IP addresses), corresponding IP addresses in params.txt and params.json files must be edited.

  • Open params.txt from downloaded files and enter ID, password, version, vehicle, and map (case sensitive). Refer to example below.
    receive_user_ip : 127.0.0.1
    receive_user_port : 10329
    request_dst_ip : 127.0.0.1
    request_dst_port : 10509
    user_id : id
    user_pw : pw
    version: v.3.9.210226.3
    map/vehicle: K-City/Niro

  • Running on unit_test_ros.py, the simulator is executed to check the test algorithm using the corresponding ID, password, version, vehicle, and map information in params.txt.

Simulator Network setting

  • Network Settings configure network information for the saved scenario.

  • For example, Network Settings can be found in the upper menu bar by selecting Network → Network Settings.

    • Bridge IP: 127.0.0.1


Code

  • class sim_ctrl Class for running simulations via MSC by reading information entered in parmas.txt

  • class gen_planner Algorithm for unit test

PY
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import rospy,rospkg
from math import cos,sin,sqrt,pow,atan2,pi
import sys,os,time
import json
from morai_msgs.msg import EgoVehicleStatus,ObjectInfo,CtrlCmd,ScenarioLoad
from lib.ros_utils import pathReader,findLocalPath,purePursuit,Point,cruiseControl,vaildObject,velocityPlanning,pidController
from lib.udp_parser import udp_parser,udp_sender

#read params.txt
file = open('params.txt', 'r')
line=file.read()
params=line.split('\n')
for i in range(0,len(params)):
    params[i]=(params[i].split(':')[1].replace(" ","")).replace('\r','')


recive_user_ip = params[0]
recive_user_port = int(params[1])

request_dst_ip = params[2]
request_dst_port = int(params[3])

class sim_ctrl :

    def __init__(self):            
        self.get_status = udp_parser(recive_user_ip, recive_user_port,'get_sim_status')        
        self.set_status = udp_sender(request_dst_ip, request_dst_port,'set_sim_status') 
                
        self.main_loop()                           

    def main_loop(self):               
        while True:
            self.status_data=self.get_status.get_data()
            print("data : ",self.status_data)
            
            
            if not len(self.status_data)==0:
                
                self.data_platform = self.status_data[0]           
                self.data_stage = self.status_data[1]
                self.data_status = self.status_data[2]

                self.command_platform = self.status_data[3]
                self.command_cmd = self.status_data[4]
                self.command_option = self.status_data[5]
                self.command_result = self.status_data[6]                               
                
                if self.data_platform == "0x01" and self.data_stage=="0x01": #Launcher platform에서 로그인 전 상태
                    cmd_platform="0x01"
                    cmd_command="0x0001"
                    cmd_option=params[4]+"/"+params[5]#(ID/PW)       
                    self.send_data([cmd_platform,cmd_command,cmd_option])#Login명령
                
                if self.data_platform=="0x01" and self.data_stage=="0x02": #Launcher platform에서 로그인 후 상태
                    cmd_platform = "0x01"
                    cmd_command = "0x0002"
                    cmd_option = params[6]#simulator version
                    self.send_data([cmd_platform,cmd_command,cmd_option]) #version 선택 명령
                    
                    if self.data_status=="0x0001": #Result = 성공
                        cmd_platform="0x01"
                        cmd_command="0x0004"
                        cmd_option=""                                                
                        
                        self.send_data([cmd_platform,cmd_command,cmd_option]) #Simulator 실행 명령                       
                        time.sleep(3)

                        
                        
                    elif self.data_status=="0x0012": #Result = Simulator 설치 안됨
                        cmd_platform="0x01"
                        cmd_command="0x0003"
                        cmd_option=""
                        self.send_data([cmd_platform,cmd_command,cmd_option]) #Simulator 설치 명령
                  

                if self.data_platform=="0x02" and self.data_stage=="0x01" and self.data_status=="0x0001": #Simulator platform에서 로비 Stage의 대기상태
                    cmd_platform="0x02"
                    cmd_command="0x0001"
                    cmd_option=params[7]#(MAP/Vehicle)                    
                    
                    self.send_data([cmd_platform,cmd_command,cmd_option]) #시뮬레이션/옵션 변경 실행 명령
                    time.sleep(3)
                    
                    while True: #로딩이 정상적으로 완료될때까지 대기.                        
                        _,self.data_stage,_,_,_,_,_=self.get_status.get_data()                     
                        print(self.data_stage)
                        if self.data_stage== "0x02":
                            if self.data_platform=="0x02" and self.data_stage=="0x02" and self.data_status=="0x0001": #Simulator platform에서 Play상태                                
                                break           
                        time.sleep(1)
                    break

                time.sleep(1)
                
            else :
                print("[NO Simulator Control Data]")
                time.sleep(0.5)          

        gen_planner() ##test알고리즘
        
            

    def send_data(self, data):
   
        try:
            print("send",data)
            cmd_platform=int(data[0],0)
            cmd_command=int(data[1],0)
            cmd_option=data[2]         
            self.set_status.send_data([cmd_platform,cmd_command,cmd_option])
        except ValueError:
            print("Invalid input")


class gen_planner():
    def __init__(self):
        rospy.init_node('gen_planner', anonymous=True)
        
        #publisher
        
        ctrl_pub = rospy.Publisher('/ctrl_cmd',CtrlCmd, queue_size=1) ## Vehicl Control
        scenario_pub = rospy.Publisher('/ScenarioLoad',ScenarioLoad, queue_size=1) ## Vehicl Control
        
        ctrl_msg= CtrlCmd()

        scenarioLoad_msgs = ScenarioLoad()
        scenarioLoad_msgs.file_name = "scenario_example_ros"
        scenarioLoad_msgs.load_ego_vehicle_data=True
        scenarioLoad_msgs.load_pedestrian_data=True
        scenarioLoad_msgs.load_surrounding_vehicle_data=True
        scenarioLoad_msgs.load_object_data=True
             
        
        
        
        #subscriber
        rospy.Subscriber("/Ego_topic", EgoVehicleStatus, self.statusCB) ## Vehicl Status Subscriber 
        rospy.Subscriber("/Object_topic", ObjectInfo, self.objectInfoCB) ## Object information Subscriber
        

        #def
        self.is_status=False ## 차량 상태 점검
        self.is_obj=False ## 장애물 상태 점검

        

        #class
        path_reader=pathReader('gen_ros') ## 경로 파일의 위치
        pure_pursuit=purePursuit() ## purePursuit import
        self.cc=cruiseControl(0.5,1) ## cruiseControl import (object_vel_gain, object_dis_gain)
        self.vo=vaildObject()        
        pid=pidController() ## pidController import
        

        #read path
        self.global_path=path_reader.read_txt("scenario_example.txt") ## 출력할 경로의 이름
        
        vel_planner=velocityPlanning(40/3.6,1.5) ## 속도 계획
        vel_profile=vel_planner.curveBasedVelocity(self.global_path,50)

    
        #time var
        count=0
        rate = rospy.Rate(30) # 30hz

        while not rospy.is_shutdown():
                        
            if self.is_status==True  and self.is_obj==True: ## 차량의 상태, 장애물 상태 점검
                if count==0:
                    time.sleep(5)                    
                    scenario_pub.publish(scenarioLoad_msgs)
                ## global_path와 차량의 status_msg를 이용해 현제 waypoint와 local_path를 생성
                local_path,self.current_waypoint=findLocalPath(self.global_path,self.status_msg,0) 
                
                ## 장애물의 숫자와 Type 위치 속도 (object_num, object type, object pose_x, object pose_y, object velocity)
                self.vo.get_object(self.object_info_msg.num_of_objects,self.object_info_msg.object_type,self.object_info_msg.pose_x,self.object_info_msg.pose_y,self.object_info_msg.velocity)
                global_obj,local_obj=self.vo.calc_vaild_obj([self.status_msg.pose_x,self.status_msg.pose_y,(self.status_msg.heading+90)/180*pi])
                
                self.cc.checkObject(local_path,global_obj,local_obj)

                
                pure_pursuit.getPath(local_path) ## pure_pursuit 알고리즘에 Local path 적용
                pure_pursuit.getEgoStatus(self.status_msg) ## pure_pursuit 알고리즘에 차량의 status 적용

                ctrl_msg.steering=-pure_pursuit.steering_angle()/180*pi
                
                cc_vel = self.cc.acc(local_obj,self.status_msg.velocity,vel_profile[self.current_waypoint],self.status_msg) ## advanced cruise control 적용한 속도 계획
                target_velocity = cc_vel              


                control_input=pid.pid(target_velocity,self.status_msg.velocity) ## 속도 제어를 위한 PID 적용 (target Velocity, Status Velocity)

                if control_input > 0 :
                    ctrl_msg.accel= control_input
                    ctrl_msg.brake= 0
                else :
                    ctrl_msg.accel= 0
                    ctrl_msg.brake= -control_input

                if self.status_msg.velocity < 3.0  and target_velocity<=0.0:
                    ctrl_msg.accel=0
                    ctrl_msg.brake=1

    
                print(self.current_waypoint,len(self.global_path.poses)-1)
                print(ctrl_msg)
                ctrl_pub.publish(ctrl_msg) ## Vehicl Control 출력
                if self.current_waypoint == (len(self.global_path.poses)-1) :
                    print("??????????")
                    count=0
                    self.current_waypoint=0                               
                    scenario_pub.publish(scenarioLoad_msgs)
                    time.sleep(3)
                
            
            
                count+=1
            rate.sleep()

    def statusCB(self,data): ## Vehicl Status Subscriber 
        
        self.status_msg=data
        self.is_status=True

    def objectInfoCB(self,data): ## Object information Subscriber
        
        self.object_info_msg=data
        self.is_obj=True   
   
                      


if __name__ == "__main__":
    ctrl=sim_ctrl()
    

JavaScript errors detected

Please note, these errors can depend on your browser setup.

If this problem persists, please contact our support.