树莓派&旭日X3派通过TCP指令控制继电器的通断

2023-10-16 14:17:21 浏览数 (2)

  • 理论上地平线的旭日X3派同样适用(未测试,文末提供方案)
  • 功能:通过TCP通讯实现远程控制继电器通断,进而控制机器人本体的电器等开关

继电器模块

继电器直接在淘宝上随便找的树莓派适用的(旭日X3派同样适用),如图:


TCP指令(HEX格式):

  • 继电器开:55 02 04 00 88 01
  • 继电器关:55 02 04 00 88 02

树莓派

以下是树莓派的代码示意:

代码语言:javascript复制
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 4/1/23 9:52 PM
# @Author : Chenan_Wang
# @File : socket_test.py
# @Software : PyCharm

import os
import RPi.GPIO as GPIO
import sys
import math
import time
from threading import Thread
from socket import *


# 接受TCP消息
def Receive(TCPsocket):
    while True:
        socket_data = TCPsocket.recv(1024)
        # print('raw', socket_data)
        socket_data = socket_data.hex()    # 转成hex编码格式
        # print('2hex', socket_data)
        if len(socket_data):
            key = socket_data
            print("n")
            frame_header = key[0:2]
            frame_mark = key[2:4]
            data_length = key[4:6]
            data_cmd = key[6:10]
            data_data = key[10:12]

            # print('接收数据:{!r}'.format(key.decode('utf-8')))
            if frame_header   frame_mark == '5502':
                print('接收到数据: '   frame_header   " "   frame_mark   " "   data_length   " "   data_cmd   " "   data_data)

                if data_cmd == '0088':  # 继电器
                    print('-----风机控制-----')
                    try:
                        if data_data == '01':       # 继电器开
                            GPIO.output(Relay_Ch1, GPIO.LOW)
                            print('------风机开------')
                            TCPsocket.send(bytes.fromhex('550204008801'))   # 返回继电器开的指令
                            time.sleep(0.5)
                        elif data_data == '02':     # 继电器关
                            GPIO.output(Relay_Ch1, GPIO.HIGH)
                            print('------风机关------')
                            TCPsocket.send(bytes.fromhex('550204008802'))  # 返回继电器关的指令
                            time.sleep(0.5)
                    except:
                        print("except")
                        GPIO.cleanup()
            elif key == '88':
                print("-----关闭TCP连接-----")
                TCPsocket.close()
                break
            else:
                print("Error, The Target Message is WRONG!!!")
                # break
        else:
            print('-----未接收到客户端数据,可能连接已经断开-----')
            # 数据中断时进行服务重启程序,先close 再accept等待重新连线
            # 可以防止出现当client意外终止导致server的中断(Broken pipe错误)
            print('-----正在重新建立连接-----')
            TCPsocket.close()
            TCPsocket, clientInfo = serverSocket.accept()


if __name__ == '__main__':

    # 配置树莓派
    Relay_Ch1 = 26
    Relay_Ch2 = 20
    Relay_Ch3 = 21
    GPIO.setwarnings(False)
    GPIO.setmode(GPIO.BCM)           # 设置BMC编码模式
    GPIO.setup(Relay_Ch1, GPIO.OUT)  # 设置26引脚为输出
    GPIO.setup(Relay_Ch2, GPIO.OUT)  # 设置20引脚为输出
    GPIO.setup(Relay_Ch3, GPIO.OUT)  # 设置21引脚为输出
    print("-----Setup The Relay Module is SUCCESS!-----n")

    # 启动TCP通讯
    serverSocket = socket(AF_INET, SOCK_STREAM)
    serverSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    serverSocket.bind(('192.168.72.134', 8200))     # ×××××这里改IP和端口号××××× #
    serverSocket.listen(5)
    print("-----树莓派服务器正在启动!-----")

    TCPsocket, clientInfo = serverSocket.accept()
    # Receive(Sockets)
    t1 = Thread(target=Receive, args=(TCPsocket,))
    t1.start()

旭日X3派

  • 仅提供测试方案,尚未在X3派实机上验证

首先安装hobot-arm64-gpiopy.deb(默认已安装)

代码语言:javascript复制
dpkg –i hobot-arm64-gpiopy.deb

然后将代码修改为:

代码语言:javascript复制
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 4/1/23 9:52 PM
# @Author : Chenan_Wang
# @File : socket_test.py
# @Software : PyCharm

import os
import Hobot.GPIO as GPIO
import sys
import math
import time
from threading import Thread
from socket import *


# 接受TCP消息
def Receive(TCPsocket):
    while True:
        socket_data = TCPsocket.recv(1024)
        # print('raw', socket_data)
        socket_data = socket_data.hex()    # 转成hex编码格式
        # print('2hex', socket_data)
        if len(socket_data):
            key = socket_data
            print("n")
            frame_header = key[0:2]
            frame_mark = key[2:4]
            data_length = key[4:6]
            data_cmd = key[6:10]
            data_data = key[10:12]

            # print('接收数据:{!r}'.format(key.decode('utf-8')))
            if frame_header   frame_mark == '5502':
                print('接收到数据: '   frame_header   " "   frame_mark   " "   data_length   " "   data_cmd   " "   data_data)

                if data_cmd == '0088':  # 继电器
                    print('-----风机控制-----')
                    try:
                        if data_data == '01':       # 继电器开
                            GPIO.output(Relay_Ch1, GPIO.LOW)
                            print('------风机开------')
                            TCPsocket.send(bytes.fromhex('550204008801'))   # 返回继电器开的指令
                            time.sleep(0.5)
                        elif data_data == '02':     # 继电器关
                            GPIO.output(Relay_Ch1, GPIO.HIGH)
                            print('------风机关------')
                            TCPsocket.send(bytes.fromhex('550204008802'))  # 返回继电器关的指令
                            time.sleep(0.5)
                    except:
                        print("except")
                        GPIO.cleanup()
            elif key == '88':
                print("-----关闭TCP连接-----")
                TCPsocket.close()
                break
            else:
                print("Error, The Target Message is WRONG!!!")
                # break
        else:
            print('-----未接收到客户端数据,可能连接已经断开-----')
            # 数据中断时进行服务重启程序,先close 再accept等待重新连线
            # 可以防止出现当client意外终止导致server的中断(Broken pipe错误)
            print('-----正在重新建立连接-----')
            TCPsocket.close()
            TCPsocket, clientInfo = serverSocket.accept()


if __name__ == '__main__':

    # 配置X3派
    Relay_Ch1 = 26
    Relay_Ch2 = 20
    Relay_Ch3 = 21
    GPIO.setwarnings(False)
    GPIO.setmode(GPIO.BCM)           # 设置BMC编码模式
    GPIO.setup(Relay_Ch1, GPIO.OUT)  # 设置26引脚为输出
    GPIO.setup(Relay_Ch2, GPIO.OUT)  # 设置20引脚为输出
    GPIO.setup(Relay_Ch3, GPIO.OUT)  # 设置21引脚为输出
    print("-----Setup The Relay Module is SUCCESS!-----n")

    # 启动TCP通讯
    serverSocket = socket(AF_INET, SOCK_STREAM)
    serverSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    serverSocket.bind(('192.168.72.134', 8200))     # ×××××这里改IP和端口号××××× #
    serverSocket.listen(5)
    print("-----X3派服务器正在启动!-----")

    TCPsocket, clientInfo = serverSocket.accept()
    # Receive(Sockets)
    t1 = Thread(target=Receive, args=(TCPsocket,))
    t1.start()

0 人点赞