主要通过MQTT实现通信。图片的上传和显示使用了服务器。 对于没有服务器的同学,可以进入官网领取免费1个月的轻量云服务器:云产品免费试用;需要选购的进:轻量应用服务器专场;不清楚怎么操作的可以看教程:腾讯云产品免费试用教程
实现效果:
当然这只是一个示例,只要能实现手机间的通信,那就可以借助termux-api实现非常多的功能!
被控端手机(termux)
termux安装mqtt等库:
代码语言:javascript复制pip install paho-mqtt, requests, imagemagick
在termux中的Python代码为:
代码语言:javascript复制# coding:utf-8
import paho.mqtt.client as pahomqtt
import os
import base64
import requests
import time
class MQTT:
def __init__(self):
self.CLIENTID = 'redmi8'
self.MQTTHOST = "test.mosquitto.org"
self.MQTTPORT = 1883
self.USERNAME = 'lh'
self.PASSWORD = "hualala"
self.HEARTBEAT = 60
self.topic_subscribe = r'/mqtt/redmi8'
self.topic_publish = r'/mqtt/redmi8/send/msg'
self.topic_publish_image = r'/mqtt/redmi8/send/image'
self.client = pahomqtt.Client()
self.CloudAllowRun = 1
# MQTT连接回调
def on_connect(self, client, userdata, flags, rc):
if rc == 0:
print("Connected")
client.subscribe(self.topic_subscribe)
# MQTT接收回调
def on_message(self, client, userdata, message):
result = 'ok'
msg = str(message.payload.decode("utf-8"))
print("Received message: " msg)
if msg.startswith('cmd:img'):
_, cmd, path = msg.split(' ')
print('path:', path)
if cmd == 'upload':
img = self.upload_image(path)
if img:
self.client.publish(self.topic_publish_image, img)
result = 'upload ok'
else:
result = 'upload fail'
elif cmd == 'remove':
self.remove_image(path)
result = 'remove ok'
else:
# 执行本地cmd命令,并将结果保存到变量result中
result = os.popen(msg).read().strip() or 'capture ok'
time.sleep(1)
if self.upload_image(path=msg.split()[-1]):
result = 'nupload ok'
print("Command result: " result)
self.client.publish(self.topic_publish, result)
def remove_image(self, path="a.png"):
url = 'http://xfxuezhang.cn/web/picture/remove_file.php?name=' path
if os.path.exists(path):
os.remove(path)
return requests.get(url).text
def upload_image(self, path="a.png"):
result = None
try:
url = 'http://xfxuezhang.cn/web/picture/upload_file.php'
os.popen(f'convert {path} -resize 200x150 {path}')
filename = path.split('/')[-1]
files = {"file": (filename, open(path, "rb"), "image/png")}
response = requests.post(url, files=files)
print(response.text)
result = response.json()['data']
except Exception as e:
print(e)
return result
def convert_image(self, path="a.png"):
os.popen(f'convert {path} -resize 200x150 {path}')
# 从文件中读取图片并将其转换为二进制数据
with open(path, "rb") as f:
image_bytes = f.read()
# 将二进制数据编码为base64字符串
image_base64 = base64.b64encode(image_bytes).decode("utf-8")
return image_base64
def mqtt(self):
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
self.client.username_pw_set(self.USERNAME, self.PASSWORD)
try:
self.client.connect(self.MQTTHOST, self.MQTTPORT, self.HEARTBEAT)
# self.client.loop_start() # 线程
self.client.loop_forever()
except:
pass
def publish(self, msg):
self.client.publish(topic=self.topic_publish, payload=msg, qos=1)
if __name__ == '__main__':
Mqtt = MQTT()
Mqtt.mqtt()
# while True:
# time.sleep(10)
# msg = input('>> ')
# Mqtt.publish(msg)
运行记录:
控制端手机
另一台控制端的手机上安装“MQTT Dashboard”,并添加一些组件,其中的主题根据上面的代码来填。实现效果: