Commit c35e60bf authored by Zhuoyu Zhang's avatar Zhuoyu Zhang
Browse files

1

parent c9e2cc6a
import math, sympy
from numpy import angle
# 计算单位: cm
L1 = 8.0000 # 大腿长
L2 = 15.321 # 小腿长
X0 = 15 # 初始腿的末端到舵机根的距离
# 定义初始位置满足的关系
# L1*cos(base1)+L2*cos(base2)=x0
# L1*sin(base1)+L2*sin(base2)=0
base1 = 1.3427 #arccos((L1**2+X0**2-L2**2)/(2*X0*L1))
base2 = -0.5336 #base1代入方程反解
def calc_turn(input_angle):
# input_angle 实际转过的角度, 返回舵机转的角度
input_angle = input_angle / 180 * 3.1415926
temp = math.asin(8 / 11 * math.sin(input_angle)) + input_angle
res = int(-temp / 3.1415926 * 180)
# print("SdjakdjakljdklaJdljadk: ", res)
return res
# return 2 * math.atan(11*math.sin(input_angle)/(11*math.cos(input_angle)+8)) / 3.1415926 * 180
#小腿153.21, 大腿80
def calc_pos(angle1, angle2):
rad1 = angle1 / 180 * 3.1415926
rad2 = angle2 / 180 * 3.1415926
x = L1 * math.cos(rad1 + base1) + L2 * math.cos(rad1 + rad2 + base2)
y = L1 * math.sin(rad1 + base1) + L2 * math.sin(rad1 + rad2 + base2)
return x, y
#temp1 = (L2*L2-L1*L1 - (x**2+y**2)) / 2*L1
#temp2 = (L2*L2-L1*L1 + (x**2+y**2)) / 2*L2
def calc_angle(x, y):
# x*sin(r1)+y*cos(r1)=sqrt(x**2+y**2)sin(r1+atan(y/x))
# 两个常数分别为L2**2-L1**2, 2*L1
square = x**2 + y**2
temp1 = (square - (L2 * L2 - L1 * L1)) / (2 * L1)
# 辅助角
at = math.pi/2 if x == 0 else math.atan(y/x)
rad1 = math.acos(temp1 / math.sqrt(square)) + at - base1
angle1 = int(rad1 / 3.1415926 * 180)
temp2 = (square + (L2 * L2 - L1 * L1)) / (2 * L2)
rad2 = -math.acos(temp2 / math.sqrt(square)) + at - rad1 - base2
angle2 = int(rad2 / 3.1415926 * 180)
flag = 1
# if(angle2 > 90):
# angle2 = angle2 - 180
return flag, angle1, angle2
if __name__ == "__main__":
# pass
print(calc_turn(-17))
# print(int(calc_turn(20) / 3.1415926 * 180))
# print(int(calc_turn(30) / 3.1415926 * 180))
# print(int(calc_turn(40) / 3.1415926 * 180))
# print(int(calc_turn(50) / 3.1415926 * 180))
# x, y = calc_pos(28, -27)
# print(x, y)
# _, angle1, angle2 = calc_angle(x, y)
# print(angle1, angle2)
# print(calc_pos(-46, 17))
# print(calc_pos(angle1, angle2))
# print(calc_pos(0, 0))
# print(calc_pos(-29, -14))
import cv2
import rospy
from sensor_msgs.msg import CameraInfo, Image
from std_msgs.msg import String
from cv_bridge import CvBridge
import numpy as np
import math
def find_color(image, color='blue'):
'''
寻找色块
'''
color_dist = {
'red': {'Lower': np.array([0, 60, 60]), 'Upper': np.array([6, 255, 255])},
'blue': {'Lower': np.array([97, 224, 70]), 'Upper': np.array([124, 255, 255])},
'green': {'Lower': np.array([35, 43, 35]), 'Upper': np.array([90, 255, 255])},
}
gs_frame = cv2.GaussianBlur(image, (5, 5), 0) # 高斯模糊
hsv = cv2.cvtColor(gs_frame, cv2.COLOR_BGR2HSV) # 转化成HSV图像
erode_hsv = cv2.erode(hsv, None, iterations=2) # 腐蚀 粗的变细
inRange_hsv = cv2.inRange(erode_hsv, color_dist[color]['Lower'], color_dist[color]['Upper']) #识别颜色
cnts = cv2.findContours(inRange_hsv.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)[-2] #寻找轮廓
try:
c = max(cnts, key=cv2.contourArea) #寻找最大色块
except:
return 0
rect = cv2.minAreaRect(c) #找最小的包裹色块的矩形
box = cv2.boxPoints(rect) #box的点
sumx = 0.0
sumy = 0.0
for pi in box:
sumx += pi[0]
sumy += pi[1]
center = (int(sumx / 4), int(sumy / 4))
sita = math.atan(math.tan(34.5 / 180 * 3.1415926) * (center[0] - 320) / 320) / 3.1415926 * 180
# print("center, sita = ", center, sita)
# cv2.drawContours(image, [np.int0(box)], -1, (0, 255, 255), 2)
# cv2.circle(image, center, 7, (0, 0, 0), -1)
return sita
def image_process(msg):
image = CvBridge().imgmsg_to_cv2(msg, "bgr8")
sita = find_color(image)
# cv2.imshow('image', new_image)
# cv2.waitKey(1)
return sita
# cv2.imwrite('1.jpg', image)
# 目标所在未知在图片中的角度
# def get_angle():
# rospy.spinOnce()
# return .sita
def depth_process(msg):
image = CvBridge().imgmsg_to_cv2(msg, '16UC1')
image_arr = np.array(image)
# print(image_arr.shape[0])
divide_p = [image_arr.shape[1] // 4, image_arr.shape[1] // 2, image_arr.shape[1] * 3 // 4]
center_part = image_arr[int(image_arr.shape[0] / 2):, divide_p[0]:divide_p[2]]
left_part = image_arr[int(image_arr.shape[0] / 2):, 0:divide_p[1]]
right_part = image_arr[int(image_arr.shape[0] / 2):, divide_p[1]:image_arr.shape[1]]
center_part_mean = center_part.mean()
left_part_mean = left_part.mean()
right_part_mean = right_part.mean()
return center_part_mean, left_part_mean, right_part_mean
def depth_callback(msg):
print(depth_process(msg))
image = CvBridge().imgmsg_to_cv2(msg, '16UC1')
image -= image.min()
image = image / (image.max() - image.min())
image *= 255
# image_arr = np.array(image)
# print(image_arr)
cv2.imwrite('2.jpg', image)
# np.savetxt( "a.csv", image_arr, delimiter=",")
if __name__ == "__main__":
rospy.init_node("camera")
# sub = rospy.Subscriber("/camera/color/image_raw", Image, image_process)
depth = rospy.Subscriber("/camera/aligned_depth_to_color/image_raw", Image, depth_callback)
rate = rospy.Rate(1000)
rospy.spin()
rate.sleep()
\ No newline at end of file
#!/usr/bin/env python
import rospy
from std_msgs.msg import String
import time
import cmd
import logging
def SendCommand(msg):
print(msg.data)
pub.publish(msg)
class CommandCLI(cmd.Cmd):
intro = 'XLAB机器人控制平台, 输入 help 或者?查看帮助。\n'
prompt = '> 请输入指令: '
msg = String()
def ErrorArgs(self):
print('参数个数错误,请检查输入')
def checkArgs(self, arg, aim):
'检查参数合法性; arg1:输入参数; arg2:包含符合要求的参数个数'
args = arg.split(' ')
argnum = 0 if arg == '' else len(args)
invalid = False
if(argnum not in aim):
self.ErrorArgs()
invalid = True
return args, argnum, invalid
def default(self, line):
print('无效指令,请检查输入')
def precmd(self, line: str) -> str:
logger.info(f'{line}')
return super().precmd(line)
def do_EOF(self, line):
'退出快捷键, 输入ctrl+D发送EOF使得控制终端退出'
return True
def do_scan(self, arg):
'扫描舵机'
args, argnum, invalid = self.checkArgs(arg, [0])
if invalid: return
self.msg.data = 'manager.' + 'ServoScan()'
SendCommand(self.msg)
def do_rst(self, arg):
'复位至初始位置'
args, argnum, invalid = self.checkArgs(arg, [0])
if invalid: return
self.msg.data = 'manager.' + 'Init()'
SendCommand(self.msg)
def do_set(self, arg):
'设置初始位置'
args, argnum, invalid = self.checkArgs(arg, [0])
if invalid: return
self.msg.data = 'manager.' + 'SetInit()'
SendCommand(self.msg)
def do_damp(self, arg):
'使所有舵机转变为低阻尼模式'
args, argnum, invalid = self.checkArgs(arg, [0, 1])
if invalid: return
if(argnum == 0):
self.msg.data = 'manager.' + 'DampAll()'
else:
self.msg.data = 'manager.' + f'Damp({args[0]})'
SendCommand(self.msg)
def do_show(self, arg):
'显示舵机状态'
args = arg.split(' ')
if(len(args) > 1):
self.ErrorArgs()
return
elif(arg == ''):
self.msg.data = 'manager.' + f'ShowServoState()'
else:
self.msg.data = 'manager.' + f'ShowServoState({args[0]})'
SendCommand(self.msg)
def do_showabs(self, arg):
'显示舵机状态'
args = arg.split(' ')
if(len(args) > 1):
self.ErrorArgs()
return
elif(arg == ''):
self.msg.data = 'manager.' + f'ShowServoAbsState()'
else:
self.msg.data = 'manager.' + f'ShowServoAbsState({args[0]})'
SendCommand(self.msg)
def do_showf(self, arg):
'显示文件; arg1: 子目录名; arg2: 文件名'
args = arg.split(' ')
if(len(args) != 2):
self.ErrorArgs()
return
self.msg.data = 'manager.' + f'ShowFile("{args[0]}", "{args[1]}")'
SendCommand(self.msg)
def do_wt(self, arg):
'保存时间流; arg1: 时间流文件名; arg2: 时间流时长;'
args = arg.split(' ')
if(len(args) != 2):
self.ErrorArgs()
return
self.msg.data = 'manager.' + f'SaveServoFlow("{args[0]}", {args[1]}, "TimeFlow")'
SendCommand(self.msg)
def do_rt(self, arg):
'加载时间流; arg1: 时间流文件名;'
args = arg.split(' ')
if(len(args) != 1):
self.ErrorArgs()
return
self.msg.data = 'manager.' + f'LoadServoFlow("{args[0]}", "TimeFlow")'
SendCommand(self.msg)
def do_ws(self, arg):
'保存单步动作; arg1: 文件名; arg2: 单步动作序号'
args = arg.split(' ')
if(len(args) != 2):
self.ErrorArgs()
return
self.msg.data = 'manager.' + f'SaveServo("{args[0]}", "Step", {args[1]})'
SendCommand(self.msg)
def do_ra(self, arg):
'读取action; arg1: action名'
args, argnum, invalid = self.checkArgs(arg, [1])
if invalid: return
self.msg.data = 'manager.' + f'ReadAction("{args[0]}")'
SendCommand(self.msg)
def do_wa(self, arg):
'记录action; arg1: action名'
args, argnum, invalid = self.checkArgs(arg, [1])
if invalid: return
self.msg.data = 'manager.' + f'WriteAction("{args[0]}")'
SendCommand(self.msg)
def do_rs(self, arg):
'加载单步动作; arg1: 文件名; arg2: 单步动作序号'
args = arg.split(' ')
if(len(args) != 2):
self.ErrorArgs()
return
self.msg.data = 'manager.' + f'LoadServo("{args[0]}", "Step", {args[1]})'
SendCommand(self.msg)
def do_exec(self, arg):
'将传入的参数视为函数调用语句'
args = arg.split(' ')
self.msg.data = args[0]
SendCommand(self.msg)
def do_vol(self, arg):
'获取电量'
args = arg.split(' ')
if(arg != ''):
self.ErrorArgs()
return
self.msg.data = 'manager.' + f'GetVol()'
SendCommand(self.msg)
def do_exit(self, arg):
'退出'
print('退出')
exit(0)
if __name__ == "__main__":
# 日志系统初始化
logger = logging.getLogger("Controller")
logger.setLevel(logging.INFO)
log_file = "/home/yyf/ros03/log/controller.log"
try:
with open(log_file, 'r') as f:
print('打开日志成功')
except:
with open(log_file, 'w') as f:
print("创建新日志")
fh = logging.FileHandler(log_file, "a+", encoding="utf-8")
fh.setLevel(logging.INFO)
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
fh.setFormatter(formatter)
logger.addHandler(fh)
#2.初始化 ROS 节点:命名(唯一)
rospy.init_node("Controller")
#3.实例化 发布者 对象
pub = rospy.Publisher("Manager", String, queue_size=1)
#4.组织被发布的数据,并编写逻辑发布数据
# 进入主循环
cli = CommandCLI()
cli.cmdloop()
#!/usr/bin/env python
from cgitb import reset
import time
import math
import sys
sys.path.append("/home/yyf/ros03/src/hotwheelspkg/scripts")#导入模块,搜索当前目录、已安装内置模块、第三方模块
from tracemalloc import start
from uservo import serial, UartServoManager
from FileManager import *
import rospy
from std_msgs.msg import String
#from calculate import calc_angleup, calc_angledown, calc_posup, calc_posdown
# UART
SERVO_PORT_NAME = '/dev/ttyAMA0' # 舵机串口号
SERVO_BAUDRATE = 115200 # 舵机的波特率
# 初始化串口
uart = serial.Serial(port=SERVO_PORT_NAME, baudrate=SERVO_BAUDRATE,
parity=serial.PARITY_NONE, stopbits=1,
bytesize=8,timeout=0)
uservo = UartServoManager(uart)
filefolder = dict()
class ServoManager:
# 自定义舵机名称
neck = 1 #脖子
left_hand = 2 #左上肢
left_arm = 3
left_shoulder = 4
right_hand = 5 #右上肢
right_arm = 6
right_shoulder = 7
left_legdown = 8 #左下肢
left_legup = 9
left_foot = 10
right_legdown = 11 #右下肢
right_legup = 12
right_foot = 13
left_hip = 14 #臀部
right_hip = 15
neck = [neck]
left_up = [left_hand, left_arm, left_shoulder]
right_up = [right_hand, right_arm, right_shoulder]
left_down = [left_legdown, left_legup, left_foot ]
right_down = [right_legdown, right_legup, right_foot]
hip = [left_hip, right_hip]
ALL = [neck, left_up, right_up, left_down, right_down, hip]
servo_num = 15
base = []
sign = [ +1, #neck
+1, -1, +1, #left_up
-1, -1, -1, #right_up
-1, +1, -1, #left_down
+1, +1, +1, #right_down
+1, +1] #hip
#舵机初始化,扫描舵机个数,获得舵机基准状态参数
def __init__(self):
print("> Manager: 开始初始化舵机...")
self.ServoScan()#扫描所有舵机
self.LoadBase()#读取base配置文件
#展示初始状态动作
def Init(self):
print("> Manager: 读取舵机RESET配置文件")
self.LoadServo('reset', 'Init')
#保存初始状态动作
def SetInit(self):
print("> Manager: 保存初始RESET状态")
self.SaveServo('reset', 'Init')
#文件操作
#显示指定的记录文件内容
def ShowFolder(self):
print('> FILE: 文件管理器列表: ')
for folder in filefolder:
print(f' {folder}/{filefolder[folder].FILE_NAME + filefolder[folder].FILE_EXTENSION}')
# 获得单个舵机相对角度
def GetSingleAngle(self, servo_index):
return self.sign[servo_index - 1] * (int(uservo.query_servo_angle(servo_index)) - self.base[servo_index - 1])
# 获得单个舵机绝对角度
def GetSingleAngleAbs(self, servo_index):
return int(uservo.query_servo_angle(servo_index))
# 获得所有舵机相对角度
def GetAllAngle(self):
data = []
for servo_index in range(1, self.servo_num + 1):
data.append(str(self.GetSingleAngle(servo_index)))
return ','.join(data) + '\n'
# 获得所有舵机绝对角度
def GetAllAngleAbs(self):
data = []
for servo_index in range(1, self.servo_num + 1):
data.append(str(self.GetSingleAngleAbs(servo_index)))
return ','.join(data) + '\n'
print(data)
#保存动作,保存相对角度
def SaveServo(self, filename, subpath='Defalut', index=0):
data = self.GetAllAngle()
filefolder[subpath] = FileManager(filename, subpath)
filefolder[subpath].WriteFile(data, index)
#保存基准状态
def SaveBase(self):
data = self.GetAllAngleAbs()
FileManager('base', 'Init', '.abs').WriteFile(data, 0)
#记录时间流
def SaveServoFlow(self, filename, recordtime, subpath='TimeFlow'):
print('> Manager: 时间流开始记录')
self.DampAll()
start_time = time.time()
lines = []
while time.time() - start_time < recordtime:
line = []
# uservo.query_all_srv_angle()
for servo_id in uservo.servos:
line.append(str(self.GetSingleAngle(servo_id)))
lines.append(line)
print("\r> Manager: The program has processed {:2}%".format(100 * (time.time() - start_time) / recordtime), end="")
print('')
liness = []
for line in lines:
liness.append(','.join(line) + '\n')
filefolder[subpath] = FileManager(filename, subpath, extension='.flow')
filefolder[subpath].WriteFileAll(liness)
print('> Maneger: 底层时间流记录完成, 用时', time.time() - start_time)
#读取基准状态#
def LoadBase(self):
print("> Manager: 读取舵机BASE配置文件")
filefolder['Init'] = GetFileObject('base', 'Init', '.abs')
line = filefolder['Init'].lines[0]
data = line[:-1].split(',')
for servo_index in range(1, self.servo_num + 1):
self.base.append(int(data[servo_index - 1]))
#展示动作
def LoadServo(self, filename='defalut', subpath='Defalut', index=0, delay=0.0):
filefolder[subpath] = GetFileObject(filename, subpath)
line = filefolder[subpath].lines[index]
data = line[:-1].split(',')
for servo_index in range(1, self.servo_num + 1):
self.SettingSingleAngle(servo_index, int(data[servo_index - 1]), delay)
time.sleep(0.1) #THE LAST STEP
#底层时间流运行
def LoadServoFlow(self, filename='defalut', subpath='TimeFlow'):
print('> Manager: 底层时间流开始运行')
start_time = time.time()
filefolder[subpath + '/' + filename] = FileManager(filename, subpath, ".flow")
lines = filefolder[subpath + '/' + filename].lines
for line in lines:
step_start_time = time.time()
data = line[:-1].split(',')
i = 0
for servo_id in uservo.servos:
self.SettingSingleAngle(servo_id, int(data[i]))
i += 1
# time.sleep(5/136 - (time.time() - step_start_time))
print('> Manager: 底层时间流运行完成, 用时', time.time() - start_time)
#发送控制单个舵机角度请求#
def SettingSingleAngle(self, servo_index, angle, t=0.0):
t = int(1000 * t)
angle = self.base[servo_index - 1] + self.sign[servo_index - 1] * angle
uservo.set_servo_angle(servo_index, angle, interval=t)
#显示所有舵机相对角度#
def ShowServoState(self, t=0):
showname = ['脖子', '左上肢', '右上肢', '左下肢', '右下肢', '臀部']
print("> Manager: 相对角度")
for i in range(6):
print(f' {showname[i]}: {[(self.GetSingleAngle(servo_index)) for servo_index in self.ALL[i]]}')
#显示所有舵机绝对角度#
def ShowServoAbsState(self, t=0):
showname = ['脖子', '左上肢', '右上肢', '左下肢', '右下肢', '臀部']
print("> Manager: 绝对角度")
for i in range(6):
print(f' {showname[i]}: {[(self.GetSingleAngleAbs(servo_index)) for servo_index in self.ALL[i]]}')
#扫描舵机,所有的舵机必须首先扫描才可以接受其他指令的控制#
def ServoScan(self):
global uservo
print("> Uservo: 开始进行舵机扫描")
uservo.scan_servo()
servo_list = list(uservo.servos.keys())
print(f"> Uservo: 舵机扫描结束,共{len(servo_list)}个舵机", ", 舵机列表: {}".format(servo_list))
#使舵机转变为低阻尼模式,在这种状态下即可手动掰动对应的舵机#
def Damp(self, servo_index):#单个
uservo.set_damping(servo_index)
def DampAll(self):
print(uservo.servos)
for servo_index in range(1, self.servo_num -7) :
uservo.set_damping(servo_index)
time.sleep(0.1)
#显示电量
def GetVol(self):
vol_list = [uservo.query_voltage(servo_index) for servo_index in uservo.servos]
print('> Manager: Vol: ', sum(vol_list) / len(vol_list))
#写入动作,与ws/rs命令类似,但是一个文件只保存一个动作
#因此可以通过文件名直接索引动作,不用通过文件名+序号的繁琐方式。
#默认子目录为Action
def WriteAction(self, act_name):
self.ShowServoState(0)
self.SaveServo(act_name, 'Action')
self.ShowServoState(0)
#读取动作
def ReadAction(self, act_name):
self.ShowServoState(0)
self.LoadServo(act_name, 'Action')
self.ShowServoState(0)
def response(msg):
cmd = msg.data
exec(cmd)
#main程序
if __name__ == "__main__":
manager = ServoManager()
msg = String()
rospy.init_node("UservoSDK")
sub = rospy.Subscriber("Manager", String, response)
rate = rospy.Rate(1000)
rospy.spin()
rate.sleep()
#!/usr/bin/env python
import time
import logging
import serial
import struct
from hotwheelspkg.srv import *
# 设置日志等级
class Packet:
'''数据包'''
# 使用pkt_type来区分请求数据还是响应数据
PKT_TYPE_REQUEST = 0 # 请求包
PKT_TYPE_RESPONSE = 1 # 响应包
HEADER_LEN = 2 # 帧头校验数据的字节长度
HEADERS = [b'\x12\x4c', b'\x05\x1c']
CODE_LEN = 1 # 功能编号长度
SIZE_LEN = 1 # 字节长度
CHECKSUM_LEN = 1 # 校验和长度
@classmethod
def calc_checksum(cls, code, param_bytes=b'', pkt_type=1):
'''计算校验和'''
header = cls.HEADERS[pkt_type]
return sum(header + struct.pack('<BB', code, len(param_bytes)) + param_bytes) %256
@classmethod
def verify(cls, packet_bytes, pkt_type=1):
'''检验数据是否合法'''
# 获取帧头
header = cls.HEADERS[pkt_type]
# 帧头检验
if packet_bytes[:cls.HEADER_LEN] != cls.HEADERS[pkt_type]:
return False
code, size = struct.unpack('<BB', packet_bytes[cls.HEADER_LEN : cls.HEADER_LEN + cls.CODE_LEN + cls.SIZE_LEN])
# 长度校验
param_bytes = packet_bytes[cls.HEADER_LEN + cls.CODE_LEN + cls.SIZE_LEN : -cls.CHECKSUM_LEN]
if len(param_bytes) != size:
return False
# 校验和检验
checksum = packet_bytes[-cls.CHECKSUM_LEN]
# logging.info('实际的Checksum : {} 计算得到的Checksum: {}'.format(checksum, cls.calc_checksum(code , param_bytes, pkt_type=pkt_type)))
# 校验和检查
if checksum != cls.calc_checksum(code , param_bytes, pkt_type=pkt_type):
return False
# 数据检验合格
return True
@classmethod
def pack(cls, code, param_bytes=b''):
'''数据打包为二进制数据'''
size = len(param_bytes)
checksum = cls.calc_checksum(code, param_bytes, pkt_type=cls.PKT_TYPE_REQUEST)
frame_bytes = cls.HEADERS[cls.PKT_TYPE_REQUEST] + struct.pack('<BB', code, size) + param_bytes + struct.pack('<B', checksum)
return frame_bytes
@classmethod
def unpack(cls, packet_bytes):
'''二进制数据解包为所需参数'''
if not cls.verify(packet_bytes, pkt_type=cls.PKT_TYPE_RESPONSE):
# 数据非法
return None
code = struct.unpack('<B', packet_bytes[cls.HEADER_LEN:cls.HEADER_LEN+cls.CODE_LEN])[0]
param_bytes = packet_bytes[cls.HEADER_LEN + cls.CODE_LEN + cls.SIZE_LEN : -cls.CHECKSUM_LEN]
return code, param_bytes
class PacketBuffer:
'''Packet中转站'''
def __init__(self, is_debug=False):
self.is_debug = is_debug
self.packet_bytes_list = []
# 清空缓存区域
self.empty_buffer()
def update(self, next_byte):
'''将新的字节添加到Packet中转站'''
# logging.info('[INFO]: next byte: 0x%02x'%next_byte[0])
if not self.header_flag:
# 填充头部字节
if len(self.header) < Packet.HEADER_LEN:
# 向Header追加字节
self.header += next_byte
if len(self.header) == Packet.HEADER_LEN and self.header == Packet.HEADERS[Packet.PKT_TYPE_RESPONSE]:
self.header_flag = True
elif len(self.header) == Packet.HEADER_LEN:
# 首字节出队列
self.header = self.header[1:] + next_byte
# 查看Header是否匹配
if self.header == Packet.HEADERS[Packet.PKT_TYPE_RESPONSE]:
# print('header: {}'.format(self.header))
self.header_flag = True
elif not self.code_flag:
# 填充Code
if len(self.code) < Packet.CODE_LEN:
self.code += next_byte
if len(self.code) == Packet.CODE_LEN:
# print('code: {}'.format(self.code))
self.code_flag = True
elif not self.size_flag:
# 填充参数尺寸
if len(self.size) < Packet.SIZE_LEN:
self.size += next_byte
if len(self.size) == Packet.SIZE_LEN:
self.size_flag = True
# 更新参数个数
self.param_len = struct.unpack('<B', self.size)[0]
elif not self.param_bytes_flag:
# 填充参数
if len(self.param_bytes) < self.param_len:
self.param_bytes += next_byte
if len(self.param_bytes) == self.param_len:
self.param_bytes_flag = True
else:
# 计算校验和
# 构建一个完整的Packet
tmp_packet_bytes = self.header + self.code + self.size + self.param_bytes + next_byte
ret = Packet.verify(tmp_packet_bytes, pkt_type=Packet.PKT_TYPE_RESPONSE)
if ret:
self.checksum_flag = True
# 将新的Packet数据添加到中转列表里
self.packet_bytes_list.append(tmp_packet_bytes)
# 重新清空缓冲区
self.empty_buffer()
def empty_buffer(self):
# 数据帧是否准备好
self.param_len = None
self.header = b''
self.header_flag = False
self.code = b''
self.code_flag = False
self.size = b''
self.size_flag = False
self.param_bytes = b''
self.param_bytes_flag = False
def has_valid_packet(self):
'''是否有有效的包'''
return len(self.packet_bytes_list) > 0
def get_packet(self):
'''获取队首的Bytes'''
return self.packet_bytes_list.pop(0)
class UartServoInfo:
'''串口舵机的信息'''
SERVO_DEADBLOCK = 1.0 # 舵机死区
SERVO_ANGLE_LOWERB = -175 # 舵机角度下限
SERVO_ANGLE_UPPERB = 175 # 舵机角度上限
def __init__(self, id, lowerb=None, upperb=None):
self.id = id # 舵机的ID
self.cur_angle = None # 当前的角度
self.target_angle = None # 目标角度
self.is_online = False # 舵机是否在线
# 舵机角度上下限
self.lowerb = lowerb if lowerb is not None else None # self.SERVO_ANGLE_LOWERB
self.upperb = upperb if upperb is not None else None # self.SERVO_ANGLE_UPPERB
self.last_angle_error = None # 上一次的角度误差
self.last_sample_time = None # 上一次的采样时间
# 内存表数据
self.data_table_raw_dict = {} # 原始数据 字典类型
# 内存表写入标志位
self.data_write_success = False
# 是否开启多圈模式
self.is_mturn = False
def is_stop(self):
'''判断舵机是否已经停止'''
# 如果没有指定目标角度, 就将其设置为当前角度
if self.target_angle is None:
self.target_angle = self.cur_angle
# 角度误差判断
angle_error = self.target_angle - self.cur_angle
if abs(angle_error) <= self.SERVO_DEADBLOCK:
return True
if self.last_angle_error is None:
self.last_angle_error = angle_error
self.last_sample_time = time.time()
# 角度误差在死区范围以内则判断为已经到达目标点
# 更新采样数据
if abs(self.last_angle_error - angle_error) > 0.2:
self.last_angle_error = angle_error
self.last_sample_time = time.time() # 更新采样时间
if (time.time() - self.last_sample_time) > 1:
#
self.last_angle_error = None
self.last_sample_time = None
return True
return False
@property
def angle(self):
'''获取当前舵机的角度'''
return self.cur_angle
def move(self, angle):
'''设置舵机的目标角度'''
# 角度边界约束
if self.lowerb is not None:
angle = self.lowerb if angle < self.lowerb else angle
if self.upperb is not None:
angle = self.upperb if angle > self.upperb else angle
# 设置目标角度
self.target_angle = angle
def update(self, angle):
'''更新当前舵机的角度'''
self.cur_angle = angle
def __str__(self):
return "目标角度:{:.1f} 实际角度:{:.1f} 角度误差:{:.2f}".format(self.target_angle, self.angle, self.target_angle-self.angle)
class UartServoManager:
'''串口舵机管理器'''
UPDATE_INTERVAL_MS = 10 # ms
# 指令定义
CODE_PING = 1 # 舵机检测
CODE_QUERY_SERVO_ANGLE = 10 # 查询舵机的角度
CODE_QUERY_SERVO_ANGLE_MTURN = 16 # 查询舵机角度(多圈)
CODE_QUERY_SERVO_INFO = 5 # 查询舵机所有的信息 (未使用)
CODE_SET_SERVO_ANGLE = 8 # 设置舵机角度
CODE_SET_SPIN = 7 # 设置轮式模式
CODE_SET_DAMPING = 9 # 设置阻尼模式
CODE_SET_SERVO_ANGLE_BY_INTERVAL = 11 # 角度设置(指定周期)
CODE_SET_SERVO_ANGLE_BY_VELOCITY = 12 # 角度设置(指定转速)
CODE_SET_SERVO_ANGLE_MTURN = 13 # 多圈角度设置
CODE_SET_SERVO_ANGLE_MTURN_BY_INTERVAL = 14 # 多圈角度设置(指定周期)
CODE_SET_SERVO_ANGLE_MTURN_BY_VELOCITY = 15 # 多圈角度设置(指定转速)
CODE_RESET_USER_DATA = 2 # 用户表数据重置
CODE_READ_DATA = 3 # 读取内存表
CODE_WRITE_DATA = 4 # 写入内存表
# 响应数据包黑名单
RESPONSE_CODE_NEGLECT = []
# 定义轮式模式的四种控制方法
WHEEL_MODE_STOP = 0x00 # 停止
WHEEL_MODE_NORMAL = 0x01 # 常规模式
WHEEL_MODE_TURN = 0x02 # 定圈
WHEEL_MODE_TIME = 0x03 # 定时
# 内存表
ADDRESS_VOLTAGE = 1
ADDRESS_CURRENT = 2
ADDRESS_POWER = 3
ADDRESS_TEMPERATURE = 4
# 自定义变量
FILE_NAME = '/home/yyf/ros03/rec/data.t'
FILE_PATH = '/home/yyf/ros03/rec/'
RECORD_TIME = 0
RECORD_UNIT = 0
ACT_UNIT = 0
def __init__(self, uart, is_scan_servo=True, srv_num=254, mean_dps=100, is_debug=False):
self.is_debug = is_debug
self.uart = uart
self.pkt_buffer = PacketBuffer(is_debug=True)
self.mean_dps = mean_dps # 默认的舵机旋转角速度
# 存放舵机信息
self.servos = {}
# 返回的CODE与函数的映射
self.response_handle_funcs = {
self.CODE_QUERY_SERVO_ANGLE: self.response_query_servo_angle,
self.CODE_QUERY_SERVO_ANGLE_MTURN: self.response_query_servo_angle_mturn,
self.CODE_PING: self.response_ping,
self.CODE_RESET_USER_DATA: self.response_reset_user_data,
self.CODE_READ_DATA: self.response_read_data,
self.CODE_WRITE_DATA: self.response_write_data,
}
# self.cur_ping_servo_id = 0 # 当前Ping的舵机ID号
if is_scan_servo:
self.scan_servo(srv_num=srv_num)
def send_request(self, code, param_bytes):
'''发送请数据'''
packet_bytes = Packet.pack(code, param_bytes)
try:
self.uart.write(packet_bytes)
# logging.info('串口发送请求数据 code:{}'.format(code))
# logging.info('数据帧内容:')
# logging.info(''.join(['0x%02x ' % b for b in packet_bytes]))
except serial.SerialException as e:
print(e)
logging.error('串口数据发送异常, 请检查是否是USB口松动或设备号变更, 需重新初始化舵机')
#
def ping(self, servo_id:int):
'''发送Ping请求'''
# self.cur_ping_servo_id = servo_id # 为了可视化显示
self.send_request(self.CODE_PING, struct.pack('<B', servo_id))
if self.is_debug:
logging.info('PING 舵机 id={}'.format(servo_id))
self.update(wait_response=True)
ret = servo_id in self.servos
if self.is_debug and ret:
logging.info('[fs_uservo]舵机ID={} 响应ping'.format(servo_id))
if ret:
# 更新舵机角度
self.query_servo_angle(servo_id)
return ret
def scan_servo(self, srv_num=254):
'''扫描所有的舵机'''
# ping所有的舵机
for servo_id in range(srv_num):
# 发送ping请求
self.ping(servo_id)
if self.is_debug:
logging.info("有效的舵机ID列表: {}".format(list(self.servos.keys())))
def response_ping(self, param_bytes):
'''响应PING请求'''
servo_id, = struct.unpack('<B', param_bytes)
if servo_id not in self.servos:
self.servos[servo_id] = UartServoInfo(servo_id)
self.servos[servo_id].is_online = True # 设置舵机在线的标志位
if self.is_debug:
logging.info('[fs_uservo]ECHO 添加一个新的舵机 id={}'.format(servo_id))
else:
self.servos[servo_id].is_online = True # 设置舵机在线的标志位
if self.is_debug:
logging.info('[fs_uservo]ECHO 已知舵机 id={}'.format(servo_id))
def query_servo_angle(self, servo_id):
'''更新单个舵机的角度'''
if self.is_debug:
logging.info('查询单个舵机的角度 id={}'.format(servo_id))
# print("查询舵机角度 多圈模式? {}".format(self.servos[servo_id].is_mturn))
if self.servos[servo_id].is_mturn:
# 多圈模式
self.send_request(self.CODE_QUERY_SERVO_ANGLE_MTURN, struct.pack('<B', servo_id))
else:
# 单圈模式
self.send_request(self.CODE_QUERY_SERVO_ANGLE, struct.pack('<B', servo_id))
# start_time = time.time()
self.update(wait_response=True) # 等待数据回传
# print(servo_id, time.time() - start_time)
return self.servos[servo_id].angle
def query_all_srv_angle(self):
'''更新所有的舵机角度'''
for servo_id in self.servos:
self.query_servo_angle(servo_id)
def response_query_servo_angle(self, param_bytes):
'''响应查询单个舵机角度'''
# 数据解包
servo_id, angle = struct.unpack('<Bh', param_bytes)
# 舵机的分辨率是0.1度
angle /= 10
# print("查询到角度单圈: {}".format(angle))
if servo_id not in self.servos:
pass
else:
# 更新当前的角度
self.servos[servo_id].update(angle)
if self.is_debug:
logging.info('[INFO] 更新舵机角度 id={} 角度: {:.2f} deg'.format(servo_id, angle))
def response_query_servo_angle_mturn(self, param_bytes):
'''响应舵机角度查询角度(多圈)'''
# 数据解包
servo_id, angle, mturn = struct.unpack('<Bih', param_bytes)
# 舵机的分辨率是0.1度
angle /= 10.0
if servo_id not in self.servos:
pass
else:
# 更新当前的角度
self.servos[servo_id].update(angle)
if self.is_debug:
logging.info('[INFO] 更新舵机角度 id={} 角度: {:.2f} deg'.format(servo_id, angle))
def refresh_srv_list(self, max_servo_id=254):
'''刷新当前的舵机列表'''
# 清空已有的字典
self.servos = {}
for servo_idx in range(max_servo_id):
self.ping(servo_idx)
for ti in range(1, 16):
# 查询一个舵机最多等待1000ms
self.update()
if servo_idx in self.servos:
break
time.sleep(0.05)
def query_srv_info(self, servo_id):
'''查询单个舵机的所有配置'''
self.send_request(self.CODE_QUERY_SERVO_INFO, struct.pack('<B', servo_id))
# logging.info('查询单个舵机的所有配置 id={}'.format(servo_id))
self.update(wait_response=True)
def set_servo_angle(self, servo_id:int, angle:float, is_mturn:bool=False, interval:float=None, velocity:float=None, t_acc:int=20, t_dec:int=20, power:int=0, mean_dps:float=100.0):
'''发送舵机角度控制请求
@param servo_id
舵机的ID号
@param angle
舵机的目标角度
@param is_mturn
是否开启多圈模式
@param interval
中间间隔 单位ms
@param velocity
目标转速,单位dps
@param t_acc
加速时间,在指定目标转速时有效. 单位ms
@param t_dec
减速时间, 在指定减速时间时有效. 单位ms
@param power
功率限制, 单位mW
@param mean_dps
平均转速, 单位dps
'''
if servo_id not in self.servos:
# logging.warn('未知舵机序号: {}'.format(servo_id))
return False, '未知舵机序号: {}'.format(servo_id)
# 同步修改srv_info
self.servos[servo_id].move(angle)
# 发送控制指令
# 单位转换为0.1度
angle = int(angle * 10)
# 角度约束
if is_mturn:
if angle < -3686400:
angle = -3686400
elif angle > 3686400:
angle = 3686400
else:
if angle < -1800:
angle = -1800
elif angle > 1800:
angle = 1800
# 加减速时间约束
# if t_acc < 20:
# t_acc = 20
# if t_dec < 20:
# t_dec = 20
# 去除时间约束
t_acc = 20
t_dec = 20
# 获取舵机信息
srv_info = self.servos[servo_id]
self.servos[servo_id].is_mturn = is_mturn
if interval is not None and interval != 0:
# 指定周期
# 判断周期设定是否合法
interval = int(interval)
if is_mturn:
if interval < t_acc + t_dec:
interval = t_acc + t_dec
elif interval > 4096000:
interval = 4096000
param_bytes = struct.pack('<BiIHHH', servo_id, angle, interval, t_acc, t_dec, power)
self.send_request(self.CODE_SET_SERVO_ANGLE_MTURN_BY_INTERVAL, param_bytes)
else:
param_bytes = struct.pack('<BhHH', servo_id, angle, interval, power)
self.send_request(self.CODE_SET_SERVO_ANGLE, param_bytes)
# param_bytes = struct.pack('<BhHHHH', servo_id, angle, interval, t_acc, t_dec, power)
# self.send_request(self.CODE_SET_SERVO_ANGLE_BY_INTERVAL, param_bytes)
elif velocity is not None:
# 指定目标转速
# 转速约束
if velocity < 1.0:
velocity = 1.0
elif velocity > 750.0:
velocity = 750.0
velocity = int(velocity*10.0) # 单位dps -> 0.1dps
if is_mturn:
param_bytes = struct.pack('<BiHHHH', servo_id, angle, velocity, t_acc, t_dec, power)
self.send_request(self.CODE_SET_SERVO_ANGLE_MTURN_BY_VELOCITY, param_bytes)
else:
param_bytes = struct.pack('<BhHHHH', servo_id, angle, velocity, t_acc, t_dec, power)
self.send_request(self.CODE_SET_SERVO_ANGLE_BY_VELOCITY, param_bytes)
else:
# 根据平均转速,计算周期
if interval is None:
# if srv_info.angle is None:
# 查询角度
srv_info.update(self.query_servo_angle(servo_id))
interval = int((abs(angle*0.1 - srv_info.angle) / mean_dps) * 1000)
if is_mturn:
param_bytes = struct.pack('<BiIH', servo_id, angle, interval, power)
self.send_request(self.CODE_SET_SERVO_ANGLE_MTURN, param_bytes)
else:
param_bytes = struct.pack('<BhHH', servo_id, angle, interval, power)
self.send_request(self.CODE_SET_SERVO_ANGLE, param_bytes)
# # 周期如果是0的话就意味着需要马上旋转到目标的角度
# 如果不是0则需要计算所需的周期interval
# if interval != 0:
# if srv_info.cur_angle is None:
# # 初始状态还没有角度
# interval = 800
# elif interval is None and mean_dps is None:
# # 延时时间差不多是15ms旋转一度,可以比较平滑
# interval = int((abs(angle - srv_info.angle) / self.mean_dps) *1000)
# elif mean_dps is not None:
# # 根据mean_dps计算时间间隔 (转换为ms)
# interval = int((abs(angle - srv_info.angle) / mean_dps) *1000)
return True, '设置成功'
def set_wheel(self, servo_id, mode, value=0, is_cw=True, mean_dps=None):
'''设置舵机轮式模式控制
@param servo_id
舵机的ID号
@param mode
舵机的模式 取值范围[0,3]
@param value
定时模式下代表时间(单位ms)
定圈模式下代表圈数(单位圈)
@param is_cw
轮子的旋转方向, is_cw代表是否是顺指针旋转
@param speed
轮子旋转的角速度, 单位 °/s
'''
# 轮式模式的控制方法
method = mode | 0x80 if is_cw else mode
# 设置旋转的角速度
mean_dps = self.mean_dps if mean_dps is None else mean_dps
mean_dps = int(mean_dps)
# 发送请求
self.send_request(self.CODE_SET_SPIN, struct.pack('<BBHH', servo_id, method, mean_dps, value))
def wheel_stop(self, servo_id):
'''停止'''
self.set_wheel(servo_id, self.WHEEL_MODE_STOP, 0, False, 0)
def set_wheel_norm(self, servo_id, is_cw=True, mean_dps=None):
'''设置轮式为普通模式, 转速单位: °/s'''
self.set_wheel(servo_id, self.WHEEL_MODE_NORMAL, value=0, is_cw=is_cw, mean_dps=mean_dps)
def set_wheel_turn(self, servo_id, turn=1, is_cw=True, mean_dps=None, is_wait=True):
'''设置轮式为定圈模式, 圈数单位: 圈数'''
if mean_dps is None:
mean_dps = self.mean_dps
self.set_wheel(servo_id, self.WHEEL_MODE_TURN, value=turn, is_cw=is_cw, mean_dps=mean_dps)
# 根据圈数与转速, 估计延时的时间
if is_wait:
time.sleep(turn*360.0 / mean_dps)
def set_wheel_time(self, servo_id, interval=1000, is_cw=True, mean_dps=None, is_wait=True):
'''设置轮子为定时模式,时间单位: ms'''
self.set_wheel(servo_id, self.WHEEL_MODE_TIME, value=interval, is_cw=is_cw, mean_dps=mean_dps)
if is_wait:
time.sleep(interval/1000.0)
def set_damping(self, servo_id, power=0):
'''设置阻尼模式
@param servo_id
舵机ID
@param power
舵机保持功率
'''
self.send_request(self.CODE_SET_DAMPING, struct.pack('<BH', servo_id, power))
print('done')
def reset_user_data(self, servo_id):
'''用户表数据重置'''
# 发送请求数据
self.send_request(self.CODE_RESET_USER_DATA, struct.pack('<B', servo_id))
# 等待数据回传
# 注: 舵机并不会回传指令
# self.update(wait_response=True)
return True
def response_reset_user_data(self, param_bytes):
'''用户数据重置响应'''
servo_id, result = struct.unpack('<BB', param_bytes)
if self.is_debug:
# logging.info("舵机用户数据重置 舵机ID={} 是否成功={}".format(servo_id, result))
print("舵机用户数据重置 舵机ID={} 是否成功={}".format(servo_id, result))
def read_data(self, servo_id, address):
'''读取内存表数据
注: 返回的是字节流数据, 需要进一步处理
'''
# 发送请求
self.send_request(self.CODE_READ_DATA, struct.pack('<BB', servo_id, address))
self.update(wait_response=True) # 等待数据回传
if self.is_debug:
logging.info("READ DATA 舵机ID={} Address={}".format(servo_id, address))
logging.info("DATA : {}".format(self.servos[servo_id].data_table_raw_dict[address]))
# 返回读取到的数据
return self.servos[servo_id].data_table_raw_dict[address]
def response_read_data(self, param_bytes):
'''处理内存表数据读取回传'''
# 数据提取
servo_id, address = struct.unpack('<BB', param_bytes[:2])
content = param_bytes[2:]
self.servos[servo_id].data_table_raw_dict[address] = content
def write_data(self, servo_id, address, content):
'''写入数据'''
# 发送请求
self.send_request(self.CODE_WRITE_DATA, struct.pack('<BB', servo_id, address)+content)
# 初始化标志位
self.servos[servo_id].data_write_success = False
if self.is_debug:
logging.info("WRITE DATA 舵机ID={} Address={} Value={}".format(servo_id, address, content))
# 等待数据回传
self.update(wait_response=True)
return self.servos[servo_id].data_write_success
def response_write_data(self, param_bytes):
'''处理写入数据回传'''
servo_id, address, result = struct.unpack('<BBB', param_bytes)
self.servos[servo_id].data_write_success = result == 1
if self.is_debug:
logging.info("WRITE DATA 舵机ID={} Address={} Result={}".format(servo_id, address, result))
def query_voltage(self, servo_id):
'''查询总线电压,单位V'''
voltage_bytes = self.read_data(servo_id, self.ADDRESS_VOLTAGE)
return struct.unpack('<H', voltage_bytes)[0] / 1000.0
def query_current(self, servo_id):
'''查询电流, 单位A'''
current_bytes = self.read_data(servo_id, self.ADDRESS_CURRENT)
return struct.unpack('<H', current_bytes)[0] / 1000.0
def query_power(self, servo_id):
'''查询功率, 单位W'''
power_bytes = self.read_data(servo_id, self.ADDRESS_POWER)
return struct.unpack('<H', power_bytes)[0] / 1000.0
def query_temperature(self, servo_id):
'''查询温度,为ADC值'''
temp_bytes = self.read_data(servo_id, self.ADDRESS_TEMPERATURE)
return float(struct.unpack('<H', temp_bytes)[0])
def update(self, is_empty_buffer=False, wait_response=False, timeout=0.02):
'''舵机管理器的定时器回调函数'''
t_start = time.time() # 获取开始时间
while True:
# 读入所有缓冲区的Bytes
buffer_bytes = self.uart.readall()
if len(buffer_bytes) != 0:
if self.is_debug:
logging.info('Recv Bytes: ')
logging.info(' '.join(['0x%02x'%b for b in buffer_bytes]))
# 将读入的Bytes打包成数据帧
if buffer_bytes is not None:
for b in buffer_bytes:
self.pkt_buffer.update( struct.pack('<B', b))
t_cur = time.time() # 获取当前的时间戳
is_timeout = (t_cur - t_start) > timeout # 是否超时
if not wait_response:
break
elif self.pkt_buffer.has_valid_packet() or is_timeout:
break
t_end = time.time()
# print(t_end - t_start)
# 相应回调数据
while self.pkt_buffer.has_valid_packet():
# 处理现有的返回数据帧
response_bytes = self.pkt_buffer.get_packet()
# 解包
code, param_bytes = Packet.unpack(response_bytes)
# 根据code找到相应处理函数
if code in self.response_handle_funcs:
self.response_handle_funcs[code](param_bytes)
else:
logging.warn('未知功能码 : {}'.format(code))
# 清空原来的缓冲区
if is_empty_buffer:
self.pkt_buffer.empty_buffer()
def is_stop(self):
'''判断所有的舵机是否均停止旋转'''
self.query_all_srv_angle()
for servo_id, srv_info in self.servos.items():
if not srv_info.is_stop():
return False
return True
def wait(self, timeout=None):
'''等待舵机旋转到特定的角度'''
t_start = time.time()
while True:
# 更新舵机角度
self.query_all_srv_angle()
if self.is_stop():
break
# 超时检测
if timeout is not None:
t_current = time.time()
if t_current - t_start > timeout:
break
def write_data_sr(self):
print('底层时间流开始记录')
st = time.time()
index = 0
start_time = time.time()
# index_max = int(time_interval / RECORD_TIME_STREAM_UNIT)
lines = []
while time.time() - start_time < self.RECORD_TIME:
line = []
uservo.query_all_srv_angle()
for servo_id in self.servos:
line.append(str(int(self.servos[servo_id].angle)))
line = ' '.join(line) + '\n'
lines.append(line)
time.sleep(self.RECORD_UNIT)
with open(self.FILE_NAME, "w") as f:
f.writelines(lines)
print('底层时间流记录完成', time.time() - st)
def write_data_sr_once(self):
print('底层时间流开始记录')
st = time.time()
index = 0
start_time = time.time()
# index_max = int(time_interval / RECORD_TIME_STREAM_UNIT)
lines = []
line = []
uservo.query_all_srv_angle()
for servo_id in self.servos:
line.append(str(int(self.servos[servo_id].angle)))
line = ' '.join(line) + '\n'
lines.append(line)
time.sleep(self.RECORD_UNIT)
with open(self.FILE_NAME, "w") as f:
f.writelines(lines)
print('底层时间流记录完成', time.time() - st)
def read_data_sr(self):
print("ACT_UNIT", self.ACT_UNIT)
print('底层时间流开始运行')
st = time.time()
with open(self.FILE_NAME, "r") as f:
lines = f.readlines()
tmp = 0
flag = 1
for line in lines:
if(len(lines) > 1 and flag < 5):
flag += 1
continue
data = line[:-1].split(' ')
# for servo_id in self.servos:
# self.set_servo_angle(servo_id, int(data[servo_id - 1]), interval=self.ACT_UNIT)
i = 0
for servo_id in self.servos:
self.set_servo_angle(servo_id, int(data[i]), interval=uservo.ACT_UNIT)
i += 1
# self.set_servo_angle(15, int(data[12]), interval=uservo.ACT_UNIT)
# self.set_servo_angle(12, int(data[9]), interval=uservo.ACT_UNIT)
# self.set_servo_angle(14, int(data[11]), interval=uservo.ACT_UNIT)
time.sleep(uservo.ACT_UNIT)
tmp+=1
print(tmp)
print('底层时间流运行done', time.time() - st)
# UART
SERVO_PORT_NAME = '/dev/ttyAMA0' # 舵机串口号
SERVO_BAUDRATE = 115200 # 舵机的波特率
# 初始化串口
# for i in range(4):
# try:
# uart = serial.Serial(port=f'/dev/ttyUSB{i}', baudrate=SERVO_BAUDRATE,
# parity=serial.PARITY_NONE, stopbits=1,
# bytesize=8,timeout=0)
# print('Uservo: Using ', f'/dev/ttyUSB{i}')
# break
# except:
# continue
uart = serial.Serial(port=SERVO_PORT_NAME, baudrate=SERVO_BAUDRATE,
parity=serial.PARITY_NONE, stopbits=1,
bytesize=8,timeout=0)
try:
uservo = UartServoManager(uart)
print("Uservo: Uart initiate successful")
except:
print("Uservo: Uart not found")
def servo_scan():
global uservo
print("Uservo: 开始进行舵机扫描")
uservo.scan_servo()
servo_list = list(uservo.servos.keys())
print(f"Uservo: 舵机扫描结束,共{len(servo_list)}个舵机", ", 舵机列表: {}".format(servo_list))
def response(req):
ret = servoResponse()
# print(time.time())
if (req.cmd == "set"):
ret.flag, ret.log = uservo.set_servo_angle(req.index, req.angle, interval=req.time)
ret.angle = 0
elif (req.cmd == "read"):
ret.angle = int(uservo.query_servo_angle(req.index))
ret.log = ""
ret.flag = 1
elif (req.cmd == "read_all"):
uservo.query_all_srv_angle()
data = []
for index in uservo.servos:
data.append(str(int(uservo.servos[index].angle)))
ret.log = ",".join(data)
ret.flag = 1
elif (list(req.cmd.split())[0] == 'wtsr'):
for index in range(1, 16):
uservo.set_damping(index)
uservo.FILE_NAME = uservo.FILE_PATH + req.cmd.split()[1]
uservo.RECORD_TIME = int(req.cmd.split()[2])
try:
uservo.RECORD_UNIT = float(req.cmd.split()[3])
except:
uservo.RECORD_UNIT = 0
uservo.write_data_sr()
elif (list(req.cmd.split())[0] == 'rtsr'):
uservo.FILE_NAME = uservo.FILE_PATH + req.cmd.split()[1]
try:
uservo.ACT_UNIT = float(req.cmd.split()[2])
except:
uservo.ACT_UNIT = 0
uservo.read_data_sr()
elif (req.cmd == "damp"):
for index in range(1, 16):
uservo.set_damping(index)
else:
print("请求错误")
return ret
def response_2(cmd):
# print(time.time())
if (list(cmd.split())[0] == 'ws'):
uservo.FILE_NAME = uservo.FILE_PATH + cmd.split()[1]
uservo.write_data_sr_once()
elif (list(cmd.split())[0] == 'wtsr'):
for index in uservo.servos:
uservo.set_damping(index)
uservo.FILE_NAME = uservo.FILE_PATH + cmd.split()[1]
uservo.RECORD_TIME = int(cmd.split()[2])
try:
uservo.RECORD_UNIT = float(cmd.split()[3])
except:
uservo.RECORD_UNIT = 0
uservo.write_data_sr()
elif (list(cmd.split())[0] in ['rtsr', 'rs']):
uservo.FILE_NAME = uservo.FILE_PATH + cmd.split()[1]
try:
uservo.ACT_UNIT = float(cmd.split()[2])
except:
uservo.ACT_UNIT = 0
uservo.read_data_sr()
elif (cmd == "damp"):
for index in uservo.servos:
uservo.set_damping(index)
elif(cmd == 'scan'):
servo_scan()
else:
print("请求错误")
if __name__ == "__main__":
servo_scan()
# rospy.init_node("UservoSDK")
# rate = rospy.Rate(1000)
# server = rospy.Service("Servo", servo, response)
# rospy.spin()
while(1):
print('输入:')
cmd = input()
try:
response_2(cmd)
except Exception as e:
print(e)
continue
\ No newline at end of file
#!/usr/bin/env python3
import rospy
from std_msgs.msg import String
import socket
#2.初始化 ROS 节点:命名(唯一)
rospy.init_node("Remote_Controller")
#3.实例化 发布者 对象
pub = rospy.Publisher("Manager", String, queue_size=1)
#4.组织被发布的数据,并编写逻辑发布数据
rate = rospy.Rate(2)
server = socket.socket()
server.bind(('localhost', 6789))
server.listen(5)
exit = ''
msg = String()
while True:
con, addr = server.accept()
print('connect_addr: ', addr)
while con:
data = con.recv(1024).decode('utf-8')
if data[0:2] == 'ab' and data[-2:] == 'cd':
while not rospy.is_shutdown():
msg.data = data[2:-2]
pub.publish(msg)
rate.sleep()
if data == 'break':
con.close()
exit = 'break'
break
if exit == 'break':
break
server.close()
\ No newline at end of file
#!/usr/bin/env python
import pyrealsense2 as rs
import numpy as np
import cv2
import time
if __name__ == "__main__":
# Configure depth and color streams
import sys
print(sys.version)
pipeline = rs.pipeline()
config = rs.config()
config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 60)
config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 60)
# Start streaming
pipeline.start(config)
try:
while True:
# Wait for a coherent pair of frames: depth and color
frames = pipeline.wait_for_frames()
depth_frame = frames.get_depth_frame()
color_frame = frames.get_color_frame()
if not depth_frame or not color_frame:
continue
# Convert images to numpy arrays
depth_image = np.asanyarray(depth_frame.get_data())
color_image = np.asanyarray(color_frame.get_data())
# Apply colormap on depth image (image must be converted to 8-bit per pixel first)
depth_colormap = cv2.applyColorMap(cv2.convertScaleAbs(depth_image, alpha=0.03), cv2.COLORMAP_JET)
# Stack both images horizontally
images = np.hstack((color_image, depth_colormap))
# Show images
cv2.namedWindow('RealSense', cv2.WINDOW_AUTOSIZE)
cv2.imshow('RealSense', images)
key = cv2.waitKey(1)
# Press esc or 'q' to close the image window
if key & 0xFF == ord('q') or key == 27:
cv2.destroyAllWindows()
break
finally:
# Stop streaming
pipeline.stop()
#!/usr/bin/env python
import time
import logging
import serial
import struct
# 设置日志等级
class Packet:
'''数据包'''
# 使用pkt_type来区分请求数据还是响应数据
PKT_TYPE_REQUEST = 0 # 请求包
PKT_TYPE_RESPONSE = 1 # 响应包
HEADER_LEN = 2 # 帧头校验数据的字节长度
HEADERS = [b'\x12\x4c', b'\x05\x1c']
CODE_LEN = 1 # 功能编号长度
SIZE_LEN = 1 # 字节长度
CHECKSUM_LEN = 1 # 校验和长度
@classmethod
def calc_checksum(cls, code, param_bytes=b'', pkt_type=1):
'''计算校验和'''
header = cls.HEADERS[pkt_type]
return sum(header + struct.pack('<BB', code, len(param_bytes)) + param_bytes) %256
@classmethod
def verify(cls, packet_bytes, pkt_type=1):
'''检验数据是否合法'''
# 获取帧头
header = cls.HEADERS[pkt_type]
# 帧头检验
if packet_bytes[:cls.HEADER_LEN] != cls.HEADERS[pkt_type]:
return False
code, size = struct.unpack('<BB', packet_bytes[cls.HEADER_LEN : cls.HEADER_LEN + cls.CODE_LEN + cls.SIZE_LEN])
# 长度校验
param_bytes = packet_bytes[cls.HEADER_LEN + cls.CODE_LEN + cls.SIZE_LEN : -cls.CHECKSUM_LEN]
if len(param_bytes) != size:
return False
# 校验和检验
checksum = packet_bytes[-cls.CHECKSUM_LEN]
# logging.info('实际的Checksum : {} 计算得到的Checksum: {}'.format(checksum, cls.calc_checksum(code , param_bytes, pkt_type=pkt_type)))
# 校验和检查
if checksum != cls.calc_checksum(code , param_bytes, pkt_type=pkt_type):
return False
# 数据检验合格
return True
@classmethod
def pack(cls, code, param_bytes=b''):
'''数据打包为二进制数据'''
size = len(param_bytes)
checksum = cls.calc_checksum(code, param_bytes, pkt_type=cls.PKT_TYPE_REQUEST)
frame_bytes = cls.HEADERS[cls.PKT_TYPE_REQUEST] + struct.pack('<BB', code, size) + param_bytes + struct.pack('<B', checksum)
return frame_bytes
@classmethod
def unpack(cls, packet_bytes):
'''二进制数据解包为所需参数'''
if not cls.verify(packet_bytes, pkt_type=cls.PKT_TYPE_RESPONSE):
# 数据非法
return None
code = struct.unpack('<B', packet_bytes[cls.HEADER_LEN:cls.HEADER_LEN+cls.CODE_LEN])[0]
param_bytes = packet_bytes[cls.HEADER_LEN + cls.CODE_LEN + cls.SIZE_LEN : -cls.CHECKSUM_LEN]
return code, param_bytes
class PacketBuffer:
'''Packet中转站'''
def __init__(self, is_debug=False):
self.is_debug = is_debug
self.packet_bytes_list = []
# 清空缓存区域
self.empty_buffer()
def update(self, next_byte):
'''将新的字节添加到Packet中转站'''
# logging.info('[INFO]: next byte: 0x%02x'%next_byte[0])
if not self.header_flag:
# 填充头部字节
if len(self.header) < Packet.HEADER_LEN:
# 向Header追加字节
self.header += next_byte
if len(self.header) == Packet.HEADER_LEN and self.header == Packet.HEADERS[Packet.PKT_TYPE_RESPONSE]:
self.header_flag = True
elif len(self.header) == Packet.HEADER_LEN:
# 首字节出队列
self.header = self.header[1:] + next_byte
# 查看Header是否匹配
if self.header == Packet.HEADERS[Packet.PKT_TYPE_RESPONSE]:
# print('header: {}'.format(self.header))
self.header_flag = True
elif not self.code_flag:
# 填充Code
if len(self.code) < Packet.CODE_LEN:
self.code += next_byte
if len(self.code) == Packet.CODE_LEN:
# print('code: {}'.format(self.code))
self.code_flag = True
elif not self.size_flag:
# 填充参数尺寸
if len(self.size) < Packet.SIZE_LEN:
self.size += next_byte
if len(self.size) == Packet.SIZE_LEN:
self.size_flag = True
# 更新参数个数
self.param_len = struct.unpack('<B', self.size)[0]
elif not self.param_bytes_flag:
# 填充参数
if len(self.param_bytes) < self.param_len:
self.param_bytes += next_byte
if len(self.param_bytes) == self.param_len:
self.param_bytes_flag = True
else:
# 计算校验和
# 构建一个完整的Packet
tmp_packet_bytes = self.header + self.code + self.size + self.param_bytes + next_byte
ret = Packet.verify(tmp_packet_bytes, pkt_type=Packet.PKT_TYPE_RESPONSE)
if ret:
self.checksum_flag = True
# 将新的Packet数据添加到中转列表里
self.packet_bytes_list.append(tmp_packet_bytes)
# 重新清空缓冲区
self.empty_buffer()
def empty_buffer(self):
# 数据帧是否准备好
self.param_len = None
self.header = b''
self.header_flag = False
self.code = b''
self.code_flag = False
self.size = b''
self.size_flag = False
self.param_bytes = b''
self.param_bytes_flag = False
def has_valid_packet(self):
'''是否有有效的包'''
return len(self.packet_bytes_list) > 0
def get_packet(self):
'''获取队首的Bytes'''
return self.packet_bytes_list.pop(0)
class UartServoInfo:
'''串口舵机的信息'''
SERVO_DEADBLOCK = 1.0 # 舵机死区
SERVO_ANGLE_LOWERB = -165 # 舵机角度下限
SERVO_ANGLE_UPPERB = 165 # 舵机角度上限
def __init__(self, id, lowerb=None, upperb=None):
self.id = id # 舵机的ID
self.cur_angle = None # 当前的角度
self.target_angle = None # 目标角度
self.is_online = False # 舵机是否在线
# 舵机角度上下限
self.lowerb = lowerb if lowerb is not None else None # self.SERVO_ANGLE_LOWERB
self.upperb = upperb if upperb is not None else None # self.SERVO_ANGLE_UPPERB
self.last_angle_error = None # 上一次的角度误差
self.last_sample_time = None # 上一次的采样时间
# 内存表数据
self.data_table_raw_dict = {} # 原始数据 字典类型
# 内存表写入标志位
self.data_write_success = False
# 是否开启多圈模式
self.is_mturn = False
def is_stop(self):
'''判断舵机是否已经停止'''
# 如果没有指定目标角度, 就将其设置为当前角度
if self.target_angle is None:
self.target_angle = self.cur_angle
# 角度误差判断
angle_error = self.target_angle - self.cur_angle
if abs(angle_error) <= self.SERVO_DEADBLOCK:
return True
if self.last_angle_error is None:
self.last_angle_error = angle_error
self.last_sample_time = time.time()
# 角度误差在死区范围以内则判断为已经到达目标点
# 更新采样数据
if abs(self.last_angle_error - angle_error) > 0.2:
self.last_angle_error = angle_error
self.last_sample_time = time.time() # 更新采样时间
if (time.time() - self.last_sample_time) > 1:
#
self.last_angle_error = None
self.last_sample_time = None
return True
return False
@property
def angle(self):
'''获取当前舵机的角度'''
return self.cur_angle
def move(self, angle):
'''设置舵机的目标角度'''
# 角度边界约束
if self.lowerb is not None:
angle = self.lowerb if angle < self.lowerb else angle
if self.upperb is not None:
angle = self.upperb if angle > self.upperb else angle
# 设置目标角度
self.target_angle = angle
def update(self, angle):
'''更新当前舵机的角度'''
self.cur_angle = angle
def __str__(self):
return "目标角度:{:.1f} 实际角度:{:.1f} 角度误差:{:.2f}".format(self.target_angle, self.angle, self.target_angle-self.angle)
class UartServoManager:
SERVO_NUM = 20
'''串口舵机管理器'''
UPDATE_INTERVAL_MS = 10 # ms
# 指令定义
CODE_PING = 1 # 舵机检测
CODE_QUERY_SERVO_ANGLE = 10 # 查询舵机的角度
CODE_QUERY_SERVO_ANGLE_MTURN = 16 # 查询舵机角度(多圈)
CODE_QUERY_SERVO_INFO = 5 # 查询舵机所有的信息 (未使用)
CODE_SET_SERVO_ANGLE = 8 # 设置舵机角度
CODE_SET_SPIN = 7 # 设置轮式模式
CODE_SET_DAMPING = 9 # 设置阻尼模式
CODE_SET_SERVO_ANGLE_BY_INTERVAL = 11 # 角度设置(指定周期)
CODE_SET_SERVO_ANGLE_BY_VELOCITY = 12 # 角度设置(指定转速)
CODE_SET_SERVO_ANGLE_MTURN = 13 # 多圈角度设置
CODE_SET_SERVO_ANGLE_MTURN_BY_INTERVAL = 14 # 多圈角度设置(指定周期)
CODE_SET_SERVO_ANGLE_MTURN_BY_VELOCITY = 15 # 多圈角度设置(指定转速)
CODE_RESET_USER_DATA = 2 # 用户表数据重置
CODE_READ_DATA = 3 # 读取内存表
CODE_WRITE_DATA = 4 # 写入内存表
# 响应数据包黑名单
RESPONSE_CODE_NEGLECT = []
# 定义轮式模式的四种控制方法
WHEEL_MODE_STOP = 0x00 # 停止
WHEEL_MODE_NORMAL = 0x01 # 常规模式
WHEEL_MODE_TURN = 0x02 # 定圈
WHEEL_MODE_TIME = 0x03 # 定时
# 内存表
ADDRESS_VOLTAGE = 1
ADDRESS_CURRENT = 2
ADDRESS_POWER = 3
ADDRESS_TEMPERATURE = 4
# 自定义变量
FILE_NAME = 'data.t'
FILE_PATH = '/home/yyf/ros03/rec/'
RECORD_TIME = 0
RECORD_UNIT = 0
ACT_UNIT = 0
def __init__(self, uart, is_scan_servo=True, srv_num=254, mean_dps=100, is_debug=False):
self.is_debug = is_debug
self.uart = uart
self.pkt_buffer = PacketBuffer(is_debug=True)
self.mean_dps = mean_dps # 默认的舵机旋转角速度
# 存放舵机信息
self.servos = {}
# 返回的CODE与函数的映射
self.response_handle_funcs = {
self.CODE_QUERY_SERVO_ANGLE: self.response_query_servo_angle,
self.CODE_QUERY_SERVO_ANGLE_MTURN: self.response_query_servo_angle_mturn,
self.CODE_PING: self.response_ping,
self.CODE_RESET_USER_DATA: self.response_reset_user_data,
self.CODE_READ_DATA: self.response_read_data,
self.CODE_WRITE_DATA: self.response_write_data,
}
# self.cur_ping_servo_id = 0 # 当前Ping的舵机ID号
if is_scan_servo:
self.scan_servo(srv_num=srv_num)
def send_request(self, code, param_bytes):
'''发送请数据'''
packet_bytes = Packet.pack(code, param_bytes)
try:
self.uart.write(packet_bytes)
# logging.info('串口发送请求数据 code:{}'.format(code))
# logging.info('数据帧内容:')
# logging.info(''.join(['0x%02x ' % b for b in packet_bytes]))
except serial.SerialException as e:
print(e)
logging.error('串口数据发送异常, 请检查是否是USB口松动或设备号变更, 需重新初始化舵机')
#
def ping(self, servo_id:int):
'''发送Ping请求'''
# self.cur_ping_servo_id = servo_id # 为了可视化显示
self.send_request(self.CODE_PING, struct.pack('<B', servo_id))
if self.is_debug:
logging.info('PING 舵机 id={}'.format(servo_id))
self.update(wait_response=True)
ret = servo_id in self.servos
if self.is_debug and ret:
logging.info('[fs_uservo]舵机ID={} 响应ping'.format(servo_id))
if ret:
# 更新舵机角度
self.query_servo_angle(servo_id)
return ret
def scan_servo(self, srv_num=254):
'''扫描所有的舵机'''
# ping所有的舵机
for servo_id in range(srv_num):
# 发送ping请求
self.ping(servo_id)
if self.is_debug:
logging.info("有效的舵机ID列表: {}".format(list(self.servos.keys())))
def response_ping(self, param_bytes):
'''响应PING请求'''
servo_id, = struct.unpack('<B', param_bytes)
if servo_id not in self.servos:
self.servos[servo_id] = UartServoInfo(servo_id)
self.servos[servo_id].is_online = True # 设置舵机在线的标志位
if self.is_debug:
logging.info('[fs_uservo]ECHO 添加一个新的舵机 id={}'.format(servo_id))
else:
self.servos[servo_id].is_online = True # 设置舵机在线的标志位
if self.is_debug:
logging.info('[fs_uservo]ECHO 已知舵机 id={}'.format(servo_id))
def query_servo_angle(self, servo_id):
'''更新单个舵机的角度'''
if self.is_debug:
logging.info('查询单个舵机的角度 id={}'.format(servo_id))
# print("查询舵机角度 多圈模式? {}".format(self.servos[servo_id].is_mturn))
if self.servos[servo_id].is_mturn:
# 多圈模式
self.send_request(self.CODE_QUERY_SERVO_ANGLE_MTURN, struct.pack('<B', servo_id))
else:
# 单圈模式
self.send_request(self.CODE_QUERY_SERVO_ANGLE, struct.pack('<B', servo_id))
# start_time = time.time()
self.update(wait_response=True) # 等待数据回传
# print(servo_id, time.time() - start_time)
return self.servos[servo_id].angle
def query_all_srv_angle(self):
'''更新所有的舵机角度'''
for servo_id in self.servos:
self.query_servo_angle(servo_id)
def response_query_servo_angle(self, param_bytes):
'''响应查询单个舵机角度'''
# 数据解包
servo_id, angle = struct.unpack('<Bh', param_bytes)
# 舵机的分辨率是0.1度
angle /= 10
# print("查询到角度单圈: {}".format(angle))
if servo_id not in self.servos:
pass
else:
# 更新当前的角度
self.servos[servo_id].update(angle)
if self.is_debug:
logging.info('[INFO] 更新舵机角度 id={} 角度: {:.2f} deg'.format(servo_id, angle))
def response_query_servo_angle_mturn(self, param_bytes):
'''响应舵机角度查询角度(多圈)'''
# 数据解包
servo_id, angle, mturn = struct.unpack('<Bih', param_bytes)
# 舵机的分辨率是0.1度
angle /= 10.0
if servo_id not in self.servos:
pass
else:
# 更新当前的角度
self.servos[servo_id].update(angle)
if self.is_debug:
logging.info('[INFO] 更新舵机角度 id={} 角度: {:.2f} deg'.format(servo_id, angle))
def refresh_srv_list(self, max_servo_id=254):
'''刷新当前的舵机列表'''
# 清空已有的字典
self.servos = {}
for servo_idx in range(max_servo_id):
self.ping(servo_idx)
for ti in range(1, self.SERVO_NUM + 1):
# 查询一个舵机最多等待1000ms
self.update()
if servo_idx in self.servos:
break
time.sleep(0.05)
def query_srv_info(self, servo_id):
'''查询单个舵机的所有配置'''
self.send_request(self.CODE_QUERY_SERVO_INFO, struct.pack('<B', servo_id))
# logging.info('查询单个舵机的所有配置 id={}'.format(servo_id))
self.update(wait_response=True)
def set_servo_angle(self, servo_id:int, angle:float, is_mturn:bool=False, interval:float=None, velocity:float=None, t_acc:int=20, t_dec:int=20, power:int=0, mean_dps:float=100.0):
'''发送舵机角度控制请求
@param servo_id
舵机的ID号
@param angle
舵机的目标角度
@param is_mturn
是否开启多圈模式
@param interval
中间间隔 单位ms
@param velocity
目标转速,单位dps
@param t_acc
加速时间,在指定目标转速时有效. 单位ms
@param t_dec
减速时间, 在指定减速时间时有效. 单位ms
@param power
功率限制, 单位mW
@param mean_dps
平均转速, 单位dps
'''
if servo_id not in self.servos:
# logging.warn('未知舵机序号: {}'.format(servo_id))
return False, '未知舵机序号: {}'.format(servo_id)
# 同步修改srv_info
self.servos[servo_id].move(angle)
# 发送控制指令
# 单位转换为0.1度
angle = int(angle * 10)
# 角度约束
if is_mturn:
if angle < -3686400:
angle = -3686400
elif angle > 3686400:
angle = 3686400
else:
if angle < -1800:
angle = -1800
elif angle > 1800:
angle = 1800
# 加减速时间约束
# if t_acc < 20:
# t_acc = 20
# if t_dec < 20:
# t_dec = 20
# 去除时间约束
t_acc = 20
t_dec = 20
# 获取舵机信息
srv_info = self.servos[servo_id]
self.servos[servo_id].is_mturn = is_mturn
if interval is not None and interval != 0:
# 指定周期
# 判断周期设定是否合法
interval = int(interval)
if is_mturn:
if interval < t_acc + t_dec:
interval = t_acc + t_dec
elif interval > 4096000:
interval = 4096000
param_bytes = struct.pack('<BiIHHH', servo_id, angle, interval, t_acc, t_dec, power)
self.send_request(self.CODE_SET_SERVO_ANGLE_MTURN_BY_INTERVAL, param_bytes)
else:
param_bytes = struct.pack('<BhHH', servo_id, angle, interval, power)
self.send_request(self.CODE_SET_SERVO_ANGLE, param_bytes)
# param_bytes = struct.pack('<BhHHHH', servo_id, angle, interval, t_acc, t_dec, power)
# self.send_request(self.CODE_SET_SERVO_ANGLE_BY_INTERVAL, param_bytes)
elif velocity is not None:
# 指定目标转速
# 转速约束
if velocity < 1.0:
velocity = 1.0
elif velocity > 750.0:
velocity = 750.0
velocity = int(velocity*10.0) # 单位dps -> 0.1dps
if is_mturn:
param_bytes = struct.pack('<BiHHHH', servo_id, angle, velocity, t_acc, t_dec, power)
self.send_request(self.CODE_SET_SERVO_ANGLE_MTURN_BY_VELOCITY, param_bytes)
else:
param_bytes = struct.pack('<BhHHHH', servo_id, angle, velocity, t_acc, t_dec, power)
self.send_request(self.CODE_SET_SERVO_ANGLE_BY_VELOCITY, param_bytes)
else:
# 根据平均转速,计算周期
if interval is None:
# if srv_info.angle is None:
# 查询角度
srv_info.update(self.query_servo_angle(servo_id))
interval = int((abs(angle*0.1 - srv_info.angle) / mean_dps) * 1000)
if is_mturn:
param_bytes = struct.pack('<BiIH', servo_id, angle, interval, power)
self.send_request(self.CODE_SET_SERVO_ANGLE_MTURN, param_bytes)
else:
param_bytes = struct.pack('<BhHH', servo_id, angle, interval, power)
self.send_request(self.CODE_SET_SERVO_ANGLE, param_bytes)
# # 周期如果是0的话就意味着需要马上旋转到目标的角度
# 如果不是0则需要计算所需的周期interval
# if interval != 0:
# if srv_info.cur_angle is None:
# # 初始状态还没有角度
# interval = 800
# elif interval is None and mean_dps is None:
# # 延时时间差不多是15ms旋转一度,可以比较平滑
# interval = int((abs(angle - srv_info.angle) / self.mean_dps) *1000)
# elif mean_dps is not None:
# # 根据mean_dps计算时间间隔 (转换为ms)
# interval = int((abs(angle - srv_info.angle) / mean_dps) *1000)
return True, '设置成功'
def set_wheel(self, servo_id, mode, value=0, is_cw=True, mean_dps=None):
'''设置舵机轮式模式控制
@param servo_id
舵机的ID号
@param mode
舵机的模式 取值范围[0,3]
@param value
定时模式下代表时间(单位ms)
定圈模式下代表圈数(单位圈)
@param is_cw
轮子的旋转方向, is_cw代表是否是顺指针旋转
@param speed
轮子旋转的角速度, 单位 °/s
'''
# 轮式模式的控制方法
method = mode | 0x80 if is_cw else mode
# 设置旋转的角速度
mean_dps = self.mean_dps if mean_dps is None else mean_dps
mean_dps = int(mean_dps)
# 发送请求
self.send_request(self.CODE_SET_SPIN, struct.pack('<BBHH', servo_id, method, mean_dps, value))
def wheel_stop(self, servo_id):
'''停止'''
self.set_wheel(servo_id, self.WHEEL_MODE_STOP, 0, False, 0)
def set_wheel_norm(self, servo_id, is_cw=True, mean_dps=None):
'''设置轮式为普通模式, 转速单位: °/s'''
self.set_wheel(servo_id, self.WHEEL_MODE_NORMAL, value=0, is_cw=is_cw, mean_dps=mean_dps)
def set_wheel_turn(self, servo_id, turn=1, is_cw=True, mean_dps=None, is_wait=True):
'''设置轮式为定圈模式, 圈数单位: 圈数'''
if mean_dps is None:
mean_dps = self.mean_dps
self.set_wheel(servo_id, self.WHEEL_MODE_TURN, value=turn, is_cw=is_cw, mean_dps=mean_dps)
# 根据圈数与转速, 估计延时的时间
if is_wait:
time.sleep(turn*360.0 / mean_dps)
def set_wheel_time(self, servo_id, interval=1000, is_cw=True, mean_dps=None, is_wait=True):
'''设置轮子为定时模式,时间单位: ms'''
self.set_wheel(servo_id, self.WHEEL_MODE_TIME, value=interval, is_cw=is_cw, mean_dps=mean_dps)
if is_wait:
time.sleep(interval/1000.0)
def set_damping(self, servo_id, power=0):
'''设置阻尼模式
@param servo_id
舵机ID
@param power
舵机保持功率
'''
self.send_request(self.CODE_SET_DAMPING, struct.pack('<BH', servo_id, power))
def reset_user_data(self, servo_id):
'''用户表数据重置'''
# 发送请求数据
self.send_request(self.CODE_RESET_USER_DATA, struct.pack('<B', servo_id))
# 等待数据回传
# 注: 舵机并不会回传指令
# self.update(wait_response=True)
return True
def response_reset_user_data(self, param_bytes):
'''用户数据重置响应'''
servo_id, result = struct.unpack('<BB', param_bytes)
if self.is_debug:
# logging.info("舵机用户数据重置 舵机ID={} 是否成功={}".format(servo_id, result))
print("舵机用户数据重置 舵机ID={} 是否成功={}".format(servo_id, result))
def read_data(self, servo_id, address):
'''读取内存表数据
注: 返回的是字节流数据, 需要进一步处理
'''
# 发送请求
self.send_request(self.CODE_READ_DATA, struct.pack('<BB', servo_id, address))
self.update(wait_response=True) # 等待数据回传
if self.is_debug:
logging.info("READ DATA 舵机ID={} Address={}".format(servo_id, address))
logging.info("DATA : {}".format(self.servos[servo_id].data_table_raw_dict[address]))
# 返回读取到的数据
return self.servos[servo_id].data_table_raw_dict[address]
def response_read_data(self, param_bytes):
'''处理内存表数据读取回传'''
# 数据提取
servo_id, address = struct.unpack('<BB', param_bytes[:2])
content = param_bytes[2:]
self.servos[servo_id].data_table_raw_dict[address] = content
def write_data(self, servo_id, address, content):
'''写入数据'''
# 发送请求
self.send_request(self.CODE_WRITE_DATA, struct.pack('<BB', servo_id, address)+content)
# 初始化标志位
self.servos[servo_id].data_write_success = False
if self.is_debug:
logging.info("WRITE DATA 舵机ID={} Address={} Value={}".format(servo_id, address, content))
# 等待数据回传
self.update(wait_response=True)
return self.servos[servo_id].data_write_success
def response_write_data(self, param_bytes):
'''处理写入数据回传'''
servo_id, address, result = struct.unpack('<BBB', param_bytes)
self.servos[servo_id].data_write_success = result == 1
if self.is_debug:
logging.info("WRITE DATA 舵机ID={} Address={} Result={}".format(servo_id, address, result))
def query_voltage(self, servo_id):
'''查询总线电压,单位V'''
voltage_bytes = self.read_data(servo_id, self.ADDRESS_VOLTAGE)
return struct.unpack('<H', voltage_bytes)[0] / 1000.0
def query_current(self, servo_id):
'''查询电流, 单位A'''
current_bytes = self.read_data(servo_id, self.ADDRESS_CURRENT)
return struct.unpack('<H', current_bytes)[0] / 1000.0
def query_power(self, servo_id):
'''查询功率, 单位W'''
power_bytes = self.read_data(servo_id, self.ADDRESS_POWER)
return struct.unpack('<H', power_bytes)[0] / 1000.0
def query_temperature(self, servo_id):
'''查询温度,为ADC值'''
temp_bytes = self.read_data(servo_id, self.ADDRESS_TEMPERATURE)
return float(struct.unpack('<H', temp_bytes)[0])
def update(self, is_empty_buffer=False, wait_response=False, timeout=0.02):
'''舵机管理器的定时器回调函数'''
t_start = time.time() # 获取开始时间
while True:
# 读入所有缓冲区的Bytes
buffer_bytes = self.uart.readall()
if len(buffer_bytes) != 0:
if self.is_debug:
logging.info('Recv Bytes: ')
logging.info(' '.join(['0x%02x'%b for b in buffer_bytes]))
# 将读入的Bytes打包成数据帧
if buffer_bytes is not None:
for b in buffer_bytes:
self.pkt_buffer.update( struct.pack('<B', b))
t_cur = time.time() # 获取当前的时间戳
is_timeout = (t_cur - t_start) > timeout # 是否超时
if not wait_response:
break
elif self.pkt_buffer.has_valid_packet() or is_timeout:
break
t_end = time.time()
# print(t_end - t_start)
# 相应回调数据
while self.pkt_buffer.has_valid_packet():
# 处理现有的返回数据帧
response_bytes = self.pkt_buffer.get_packet()
# 解包
code, param_bytes = Packet.unpack(response_bytes)
# 根据code找到相应处理函数
if code in self.response_handle_funcs:
self.response_handle_funcs[code](param_bytes)
else:
logging.warn('未知功能码 : {}'.format(code))
# 清空原来的缓冲区
if is_empty_buffer:
self.pkt_buffer.empty_buffer()
def is_stop(self):
'''判断所有的舵机是否均停止旋转'''
self.query_all_srv_angle()
for servo_id, srv_info in self.servos.items():
if not srv_info.is_stop():
return False
return True
def wait(self, timeout=None):
'''等待舵机旋转到特定的角度'''
t_start = time.time()
while True:
# 更新舵机角度
self.query_all_srv_angle()
if self.is_stop():
break
# 超时检测
if timeout is not None:
t_current = time.time()
if t_current - t_start > timeout:
break
uint8 index
int16 angle
int16 time
string cmd
---
int16 angle
string log
uint8 flag
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment