gRPC API
: This page describes how to use the MORAI Simulator gRPC API example.
The gRPC API lets you control the simulator programmatically, without using the UI.
Through the API, the user can load a map and control NPC vehicles using the modules implemented by the simulator. Because the simulator is driven through the gRPC API, the user can configure whatever test scenario environment they want.
Configuration
Example code Download
The download link is to be uploaded on Github or S3
API Documentation
gRPC python docs
https://grpc.io/docs/languages/python/
Python Configurations
python version
The example code was written and tested with Python 3.7.11.
python module to be installed
grpcio : 1.39.0
grpcio-tools : 1.39.0
How to execute gRPC and to get the result
How to execute
Execute Morai Launcher and log in
Install and run a specific simulator version in Morai Launcher
Execute example.py script while selecting simulator map and vehicle type.
Execution result
reference video to be attached
Specific Explanation
File Structure
example code is written as follows
example.py
Python modules generated from the proto files, used for gRPC communication
morai_openscenario_base_pb2.py
morai_openscenario_base_pb2_grpc.py
morai_openscenario_msgs_pb2.py
morai_openscenario_msgs_pb2_grpc.py
geometry_msgs_pb2.py
geometry_msgs_pb2_grpc.py
Description of example code execution
Create a GRPCClient instance in the main block and call the gRPC API through that instance to communicate with the simulator over gRPC.
The following IP and Port are used to communicate with the current simulator via gRPC.
localhost:7789
The program runs in an infinite loop so that it does not terminate on its own. Press ‘Ctrl + C’ to stop the program.
Example code flow
Start gRPC communication by calling start command
Call map load command and load the map the user requests
Call ego control command to move the ego to the desired location
Start the ego vehicle driving by calling the object pause function with pause set to false
Create npc vehicle by calling create npc vehicle command
Control the npc vehicles by calling the npc vehicle control command
Create obstacle by calling create obstacle command
Periodically receive the status information of the ego and npc vehicles by running start__status_worker in a separate thread
import os, sys
from trace import Trace
current_path = os.path.dirname(os.path.realpath(__file__))
sys.path.append(os.path.normpath(os.path.join(current_path, './')))
import morai_openscenario_base_pb2
import morai_openscenario_base_pb2_grpc
import morai_openscenario_msgs_pb2
import morai_openscenario_msgs_pb2_grpc
import geometry_msgs_pb2
import geometry_msgs_pb2_grpc
import grpc
import asyncio
import math
import threading
import time
import json
class GRPCClient():
    """Example client for the MORAI simulator gRPC scenario service.

    Every ``stub`` parameter is a ``MoraiOpenScenarioBaseServiceStub`` bound
    to a synchronous channel. Those annotations are written as strings so
    they are not evaluated when the class body is executed.

    RPC failures (``grpc.RpcError``) are reported on stdout and otherwise
    ignored, so one failing call does not abort the rest of the example.
    Other exceptions, including ``KeyboardInterrupt``, are no longer
    swallowed.
    """

    # Topics of the status stream whose payload is decoded as JSON.
    STATUS_TOPICS = ("/morai_msgs/MultiEgoState", "/morai_msgs/EgoState")

    def __init__(self, parent=None, address='localhost:7789'):
        """Initialise the client state.

        Args:
            parent: Accepted for compatibility; not used by this class.
            address: ``host:port`` used by the asynchronous status stream.
        """
        self.cnt = 0              # number of NPC control messages sent so far
        self.event_loop = None    # loop of the status worker, set once it starts
        self.address = address
        self.vehicle_status = {}  # latest decoded payload per status topic

    @staticmethod
    def _invoke(rpc, request, label):
        """Call ``rpc(request)``; return its response, or None on an RPC error."""
        try:
            return rpc(request)
        except grpc.RpcError as err:
            print(f"{label} failed: {err}")
            return None

    @staticmethod
    def _decode_status(payload):
        """Decode a JSON status payload given either as bytes or as text.

        ``str()`` on a bytes object yields ``"b'...'"``, which is not JSON, so
        bytes are decoded as UTF-8 before parsing.

        Raises:
            ValueError: If the payload is not valid UTF-8 or not valid JSON.
        """
        if isinstance(payload, (bytes, bytearray)):
            payload = payload.decode('utf-8')
        return json.loads(payload)

    @staticmethod
    def _shutdown_loop(loop):
        """Cancel the tasks still pending on ``loop``, let them unwind, close it."""
        try:
            pending = [task for task in asyncio.all_tasks(loop) if not task.done()]
            for task in pending:
                task.cancel()
            if pending:
                # Running the loop once more lets the cancelled tasks leave
                # their ``async with`` blocks before the loop is closed.
                loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
        finally:
            loop.close()

    async def get_vehicle_state(self):
        """Consume the ``Connect`` stream and keep the latest ego / multi-ego state.

        Each message on one of ``STATUS_TOPICS`` is decoded as JSON and stored
        in ``self.vehicle_status`` under its topic name. A payload that cannot
        be decoded is reported and skipped instead of ending the stream.
        """
        async with grpc.aio.insecure_channel(self.address) as channel:
            stub = morai_openscenario_base_pb2_grpc.MoraiOpenScenarioBaseServiceStub(channel)
            req = morai_openscenario_base_pb2.ServiceRequest()
            req.service_name = ""
            req.msg = bytes("", 'utf-8')
            call = stub.Connect(req)
            async for resp in call:
                if resp.service_name not in self.STATUS_TOPICS:
                    continue
                try:
                    info = self._decode_status(resp.msg)
                except ValueError as err:
                    print(f"ignored malformed {resp.service_name} payload: {err}")
                    continue
                self.vehicle_status[resp.service_name] = info

    def start__status_worker(self):
        """Run ``get_vehicle_state`` on a fresh event loop until it ends or is stopped.

        Intended as a thread target. The loop is published as
        ``self.event_loop`` so that another thread can request a stop.
        """
        loop = asyncio.new_event_loop()
        self.event_loop = loop
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(self.get_vehicle_state())
        except RuntimeError:
            # run_until_complete() raises this when the loop is stopped before
            # the stream finishes, which is the expected shutdown path.
            pass
        except grpc.RpcError as err:
            print(f"status stream failed: {err}")
        finally:
            self._shutdown_loop(loop)

    def send_ego_ctrl_cmd(self, stub: "morai_openscenario_base_pb2_grpc.MoraiOpenScenarioBaseServiceStub"):
        """Send one ``EgoCtrlCmd`` carrying a fixed pose and cruise settings, paused."""
        k_city = {"x": 200.93956082984351, "y": 1766.8720364740955, "z": 0.0, "roll": 0.0, "pitch": 0.0, "yaw": -90.0}
        request = morai_openscenario_msgs_pb2.EgoCtrlCmd()
        request.service_name = "/morai_msgs/EgoCtrlCmd"
        request.position.x = k_city["x"]
        request.position.y = k_city["y"]
        request.position.z = k_city["z"]
        # roll / pitch / yaw are carried in rotation.x / .y / .z respectively.
        request.rotation.x = k_city["roll"]
        request.rotation.y = k_city["pitch"]
        request.rotation.z = k_city["yaw"]
        request.cruise_settings.cruise_on = True
        request.cruise_settings.cruise_type = morai_openscenario_msgs_pb2.EgoCruiseCtrl.CONSTANT
        request.cruise_settings.link_speed_ratio = 100
        request.cruise_settings.constant_velocity = 50
        request.velocity = 50
        request.pause = True
        response = self._invoke(stub.SendEgoCtrlCmd, request, "SendEgoCtrlCmd")
        if response is not None:
            print("SendEgoCtrlCmd - " + str(response))

    def set_multi_ego_ctrl_cmd(self, ctrl_info, unique_id, longCmdType, accel, brake, steering, velocity, acceleration):
        """Copy the given control values onto ``ctrl_info`` and return it.

        ``ctrl_info`` is modified in place; the returned object is the same one.
        """
        ctrl_info.unique_id = unique_id
        ctrl_info.longCmdType = longCmdType
        ctrl_info.accel = accel
        ctrl_info.brake = brake
        ctrl_info.steering = steering
        ctrl_info.velocity = velocity
        ctrl_info.acceleration = acceleration
        return ctrl_info

    def send_multi_ego_ctrl_cmd(self, stub: "morai_openscenario_base_pb2_grpc.MoraiOpenScenarioBaseServiceStub"):
        """Send the NPC_1 / NPC_2 control list once per second until ``self.cnt`` reaches 15.

        Stops early on the first RPC failure.
        """
        # The message is identical on every iteration, so it is built once.
        request = morai_openscenario_msgs_pb2.MultiEgoCtrlCmdList()
        request.service_name = "/morai_msgs/MultiEgoCtrlCmd"
        commands = (
            ("NPC_1", 1, 0.9, 0, 1, 66, 77),
            ("NPC_2", 2, 0.6, 0, 1, 30, 77),
        )
        for values in commands:
            ctrl_info = morai_openscenario_msgs_pb2.MultiEgoCtrlCmd()
            request.multi_ego_ctrl_cmd.append(self.set_multi_ego_ctrl_cmd(ctrl_info, *values))

        while self.cnt < 15:
            self.cnt += 1
            if self._invoke(stub.SendMultiEgoCtrlCmd, request, "SendMultiEgoCtrlCmd") is None:
                return
            time.sleep(1)

    def create_multi_ego_vehicle(self, stub: "morai_openscenario_base_pb2_grpc.MoraiOpenScenarioBaseServiceStub"):
        """Request creation of the two NPC vehicles ``NPC_1`` and ``NPC_2``."""
        request = morai_openscenario_msgs_pb2.CreateMultiEgoVehicleRequestList()
        request.service_name = "/morai_msgs/CreateMultiEgoVehicle"
        # (unique_id, x, y, vehicle model); every other field is shared.
        vehicles = (
            ("NPC_1", 201.1550653096617, 1726.8726365584039, "2016_Hyundai_Genesis_DH"),
            ("NPC_2", 201.70380861467328, 1620.374109200712, "2016_Hyundai_Ioniq"),
        )
        for unique_id, pos_x, pos_y, vehicle_name in vehicles:
            create_info = morai_openscenario_msgs_pb2.CreateMultiEgoVehicleRequest()
            create_info.unique_id = unique_id
            create_info.position.x = pos_x
            create_info.position.y = pos_y
            create_info.position.z = 0.0
            create_info.rotation.x = 0.0
            create_info.rotation.y = 0.0
            create_info.rotation.z = -90.0
            create_info.velocity = 50
            create_info.vehicleName = vehicle_name
            create_info.pause = False
            request.req_list.append(create_info)
        response = self._invoke(stub.SendCreateMultiEgoVehicle, request, "SendCreateMultiEgoVehicle")
        if response is not None:
            print("SendCreateMultiEgoVehicle" + str(response))

    def start_cmd(self, stub: "morai_openscenario_base_pb2_grpc.MoraiOpenScenarioBaseServiceStub"):
        """Send the start command, then wait two seconds whether or not it succeeded."""
        request = morai_openscenario_msgs_pb2.StartRequest()
        request.service_name = "/morai_msgs/StartCmd"
        request.cmd_start = True
        try:
            self._invoke(stub.Start, request, "Start")
        finally:
            # NOTE(review): presumably gives the simulator time to react - confirm.
            time.sleep(2)

    def stop_cmd(self, stub: "morai_openscenario_base_pb2_grpc.MoraiOpenScenarioBaseServiceStub"):
        """Send the stop command, then wait two seconds whether or not it succeeded."""
        request = morai_openscenario_msgs_pb2.StopRequest()
        request.service_name = "/morai_msgs/StopCmd"
        request.cmd_stop = True
        try:
            self._invoke(stub.Stop, request, "Stop")
        finally:
            # NOTE(review): presumably gives the simulator time to react - confirm.
            time.sleep(2)

    def load_map(self, stub: "morai_openscenario_base_pb2_grpc.MoraiOpenScenarioBaseServiceStub",
                 map_name="R_KR_PG_KATRI", ego_name="2016_Hyundai_Ioniq"):
        """Ask the simulator to load ``map_name`` with ``ego_name`` and print the reply.

        To load an extra asset bundle map, pass
        ``map_name=f"V_Extra_Scene,{extra_asset_bundle_name}"``
        (for example with ``extra_asset_bundle_name = "las_test"``).
        """
        request = morai_openscenario_msgs_pb2.Map()
        request.map_name = map_name
        request.ego_name = ego_name
        response = self._invoke(stub.LoadMap, request, "LoadMap")
        if response is not None:
            print(response)

    def send_delete_multi_ego_vehicle(self, stub: "morai_openscenario_base_pb2_grpc.MoraiOpenScenarioBaseServiceStub"):
        """Send a ``DeleteMultiEgoVehicle`` request and print the reply."""
        request = morai_openscenario_msgs_pb2.DeleteMultiEgoVehicleRequest()
        request.service_name = "/morai_msgs/DeleteMultiEgoVehicle"
        request.req_delete = True
        response = self._invoke(stub.SendDeleteMultiEgoVehicle, request, "SendDeleteMultiEgoVehicle")
        if response is not None:
            print(response)

    def object_pause(self, stub: "morai_openscenario_base_pb2_grpc.MoraiOpenScenarioBaseServiceStub", object_list):
        """Pause or resume the listed objects.

        Args:
            stub: Service stub used to send the request.
            object_list: Iterable of dicts with the keys ``'unique_id'``,
                ``'is_ego'`` (selects EGO, otherwise MULTIEGO) and
                ``'is_pause'`` (the pause flag to set).
        """
        request = morai_openscenario_msgs_pb2.ObjectPauseList()
        pause_types = morai_openscenario_msgs_pb2.ObjectPause
        for entry in object_list:
            msg = morai_openscenario_msgs_pb2.ObjectPause()
            msg.unique_id = entry['unique_id']
            msg.obj_type = pause_types.EGO if entry['is_ego'] else pause_types.MULTIEGO
            msg.set_pause = entry['is_pause']
            request.req_list.append(msg)
        self._invoke(stub.ObjectPause, request, "ObjectPause")

    def create_obstacle(self, stub: "morai_openscenario_base_pb2_grpc.MoraiOpenScenarioBaseServiceStub"):
        """Request creation of the two obstacles ``obstacle_1`` and ``obstacle_2``."""
        request = morai_openscenario_msgs_pb2.ObstacleSpawnList()
        # (unique_id, x, y, obstacle model); rotation is zero and scale is one.
        obstacles = (
            ('obstacle_1', 208.68639761359265, 1661.4038564971388, 'CargoBox'),
            ('obstacle_2', 212.4813728324434, 1605.9207673354679, 'WoodBox'),
        )
        for unique_id, pos_x, pos_y, obstacle_name in obstacles:
            create_info = morai_openscenario_msgs_pb2.ObstacleSpawn()
            create_info.unique_id = unique_id
            create_info.position.x = pos_x
            create_info.position.y = pos_y
            create_info.position.z = 0
            create_info.rotation.x = 0.0
            create_info.rotation.y = 0.0
            create_info.rotation.z = 0.0
            create_info.scale.x = 1.0
            create_info.scale.y = 1.0
            create_info.scale.z = 1.0
            create_info.obstacle_name = obstacle_name
            request.req_list.append(create_info)
        self._invoke(stub.CreateObstacle, request, "CreateObstacle")

    def delete_objects(self, stub: "morai_openscenario_base_pb2_grpc.MoraiOpenScenarioBaseServiceStub"):
        """Remove spawned obstacles, with every category flag of the request set."""
        req = morai_openscenario_msgs_pb2.CategoryObstacles()
        req.vehicle = True
        req.pedestrian = True
        req.obstacle = True
        req.spawn_point = True
        req.map_object = True
        self._invoke(stub.DeleteSpawnObstacles, req, "DeleteSpawnObstacles")
if __name__ == "__main__":
    # Example flow: start -> load map -> place and resume the ego -> create and
    # drive NPC vehicles -> create obstacles -> stream vehicle status until the
    # user presses Ctrl + C, then clean up.
    print('start example.')
    print('press ctrl + c for exit.')
    client = GRPCClient()
    channel = grpc.insecure_channel('localhost:7789')
    stub = morai_openscenario_base_pb2_grpc.MoraiOpenScenarioBaseServiceStub(channel)

    # start command
    client.start_cmd(stub)
    # load map command
    client.load_map(stub)
    # ego control
    client.send_ego_ctrl_cmd(stub)

    # resume ego vehicle
    ego_object_list = [
        {
            'unique_id': 'ego',
            'is_ego': True,
            'is_pause': False,
        },
    ]
    client.object_pause(stub, ego_object_list)

    # create npc vehicle
    client.create_multi_ego_vehicle(stub)

    # control npc vehicle (daemon thread, so it never blocks interpreter exit)
    t_npc_ctrl = threading.Thread(target=client.send_multi_ego_ctrl_cmd, daemon=True, args=(stub,))
    t_npc_ctrl.start()

    # create obstacle
    client.create_obstacle(stub)

    # start to receive vehicle status
    t_car_status = threading.Thread(target=client.start__status_worker)
    t_car_status.start()

    try:
        # Sleep instead of spinning: a `pass` loop keeps one core busy and
        # competes with the worker threads for the interpreter lock.
        while True:
            time.sleep(1)
    # press ctrl + c for exit
    except KeyboardInterrupt:
        # delete all scenario objects
        client.delete_objects(stub)

        # stop receiving vehicle status; the loop runs in another thread, so
        # the stop request has to be scheduled through call_soon_threadsafe()
        status_loop = client.event_loop
        if status_loop is not None and not status_loop.is_closed():
            try:
                status_loop.call_soon_threadsafe(status_loop.stop)
            except RuntimeError:
                # The loop was closed between the check and the call.
                pass

        # stop command
        client.stop_cmd(stub)

        # gRPC channel close
        channel.close()
        sys.exit()