本文最后更新于18 天前,其中的信息可能已经过时,如有错误请发送邮件到2275012286@qq.com,或者在下方留言。
想了解更多项目请搜索“项目”标签或者访问我的github仓库。
仓库地址:QianmoNai’s Repositories
基于树莓派docker容器部署下的OpenClaw获取USB摄像头推流画面项目
1.项目背景
最近尝试扩展一下openclaw的玩法,于是就想着能不能用usb摄像头来给龙虾“长眼睛”,关于如何在树莓派中部署openclaw,本文不展示部署过程,详细可以参考这个视频⚡极致挑战:树莓派上部署 OpenClaw 小龙虾🦞!
2.项目介绍
项目所用到的:部署完成并且成功接入飞书的openclaw、树莓派5(8G)、usb摄像头。
实现效果(点我)



如上所示,最终成果是能够得到一个局域网内任意设备都能实时看摄像头的推流网页,和一个能感知外界“长眼睛的龙虾”
①推流视频网页部分:
工作原理:
摄像头 → ffmpeg (采集+MJPEG编码) → Python HTTP服务器 → 客户端浏览器
核心代码:
Python HTTP服务器代码 (点我)
#!/usr/bin/env python3
"""
树莓派摄像头 MJPEG 流服务器 - 抢占式单客户端
新客户端连接时立即终止旧客户端
"""
import subprocess
import socket
import sys
import signal
import threading
import time
import select
HOST = '0.0.0.0'
PORT = 8080
DEVICE = '/dev/video0'
BOUNDARY = b'--BoundaryString'
# 全局资源引用
current_proc = None
current_client = None
current_thread = None
lock = threading.Lock()
def get_local_ip():
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
ip = s.getsockname()[0]
s.close()
return ip
except:
return "127.0.0.1"
def kill_old(reset_global=True):
"""终止旧资源"""
global current_proc, current_client, current_thread
with lock:
# 保存当前资源引用
proc = current_proc if reset_global else None
client = current_client if reset_global else None
thread = current_thread if reset_global else None
# 重置全局变量(先重置,避免新请求拿到旧资源)
if reset_global:
current_proc = None
current_client = None
current_thread = None
# 终止线程(如果需要)
if thread and thread.is_alive():
# 线程是守护线程,主要靠关闭客户端和进程来终止
pass
# 终止ffmpeg进程
if proc:
try:
proc.terminate()
try:
proc.wait(timeout=0.5)
except subprocess.TimeoutExpired:
proc.kill()
except Exception as e:
print(f"终止旧进程失败: {e}")
# 关闭客户端连接
if client:
try:
client.shutdown(socket.SHUT_RDWR)
except Exception:
pass
try:
client.close()
print("旧客户端连接已关闭")
except Exception as e:
print(f"关闭旧客户端失败: {e}")
def read_http_request(client):
"""读取并解析客户端的HTTP请求(非阻塞)"""
try:
# 设置非阻塞模式,避免卡死
client.setblocking(False)
ready = select.select([client], [], [], 2.0)
if ready[0]:
# 读取请求头(最多4096字节)
request = client.recv(4096)
if b'GET' in request:
return True
except Exception:
pass
return False
def stream_to_client(client, proc):
"""向客户端发送视频流"""
try:
while True:
# 检查客户端是否还连接
try:
# 非阻塞检查客户端是否关闭
client.setblocking(False)
data = client.recv(1, socket.MSG_PEEK)
if not data:
break
except BlockingIOError:
pass
except Exception:
break
# 检查ffmpeg进程是否还在运行
if proc.poll() is not None:
print("ffmpeg进程已退出")
break
client.sendall(BOUNDARY + b'\r\n')
client.sendall(b'Content-Type: image/jpeg\r\n\r\n')
frame = b''
found_start = False
while True:
chunk = proc.stdout.read(8192)
if not chunk:
return
if not found_start:
start = chunk.find(b'\xff\xd8')
if start >= 0:
frame = chunk[start:]
found_start = True
continue
frame += chunk
end = frame.rfind(b'\xff\xd9')
if end >= 0:
frame = frame[:end+2]
break
client.sendall(frame)
client.sendall(b'\r\n')
except Exception as e:
print(f"流发送异常: {e}")
finally:
# 清理当前连接的资源
kill_old(reset_global=False)
def main():
global current_proc, current_client, current_thread
ip = get_local_ip()
print(f"视频流服务器已启动")
print(f"访问地址: http://{ip}:{PORT}/")
print(f"摄像头: {DEVICE}")
print(f"模式: 新客户端抢占连接")
print(f"按 Ctrl+C 停止\n")
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind((HOST, PORT))
server.listen(5)
# 设置服务器套接字超时,避免accept卡死
server.settimeout(1.0)
def handler(sig, frame):
print("\n停止服务器...")
kill_old()
server.close()
sys.exit(0)
signal.signal(signal.SIGINT, handler)
while True:
try:
client, addr = server.accept()
except socket.timeout:
continue
except Exception as e:
print(f"接受连接失败: {e}")
continue
print(f"\n新客户端连接: {addr}")
# 1. 先终止所有旧资源(核心:先杀旧的,再处理新的)
kill_old()
# 2. 读取并验证客户端的HTTP请求
if not read_http_request(client):
print(f"客户端 {addr} 未发送有效HTTP请求,关闭连接")
client.close()
continue
# 3. 启动新的ffmpeg进程
cmd = [
'ffmpeg',
'-f', 'v4l2',
'-input_format', 'mjpeg',
'-video_size', '1280x720',
'-framerate', '15',
'-i', DEVICE,
'-c:v', 'mjpeg',
'-q:v', '5',
'-f', 'image2pipe',
'-fflags', 'nobuffer', # 禁用缓冲,降低延迟
'-'
]
try:
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
preexec_fn=lambda: signal.signal(signal.SIGPIPE, signal.SIG_DFL)
)
except Exception as e:
print(f"启动ffmpeg失败: {e}")
client.close()
continue
# 4. 发送HTTP响应头
header = (
"HTTP/1.1 200 OK\r\n"
"Content-Type: multipart/x-mixed-replace; boundary=BoundaryString\r\n"
"Cache-Control: no-cache\r\n"
"Pragma: no-cache\r\n"
"Connection: close\r\n" # 明确关闭连接
"\r\n"
).encode()
try:
client.sendall(header)
except Exception as e:
print(f"发送响应头失败: {e}")
proc.terminate()
client.close()
continue
# 5. 注册新资源到全局变量
with lock:
current_proc = proc
current_client = client
# 6. 启动流发送线程
t = threading.Thread(target=stream_to_client, args=(client, proc))
t.daemon = True
t.start()
with lock:
current_thread = t
print(f"客户端 {addr} 抢占成功,流已开始传输")
if __name__ == '__main__':
main()
- 采集视频: 用 ffmpeg 从 /dev/video0 采集 MJPEG 格式
- HTTP 响应: 返回 multipart/x-mixed-replace 类型,浏览器可实时显示
- 帧解析:从 ffmpeg 输出中提取完整 JPEG 帧 (\xff\xd8 到 \xff\xd9)
- 抢占机制: 新客户端连接时,终止旧的 ffmpeg 进程和 socket
同时发现单单只有服务器代码运行小概率会出现进程卡死的情况,可能是视频流阻塞?所以加上了一个的专门用于视频服务器自动监控与重启程序,在服务器出现问题的时候重启服务器,后续也只需要开机自启动这个程序即可。
视频服务器自动监控与重启程序代码(点我)
#!/usr/bin/env python3
"""
视频服务器监控程序
每隔一段时间检测服务器运行状态,出现问题就重启
"""
import subprocess
import time
import socket
import sys
import os
SERVER_SCRIPT = "/home/qianmo/mjpeg_server.py"
HOST = "127.0.0.1"
PORT = 8080
CHECK_INTERVAL = 60 # 检测间隔(秒)
MAX_RETRIES = 1 # 最大重试次数
def is_process_running(name):
"""检查进程是否运行"""
try:
result = subprocess.run(
["pgrep", "-f", name],
capture_output=True,
text=True
)
return result.returncode == 0
except:
return False
def is_port_responding(host, port, timeout=3):
"""检查端口是否响应"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
sock.connect((host, port))
sock.close()
return True
except:
return False
def check_http_response(host, port, timeout=3):
"""检查HTTP响应是否正常"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
sock.connect((host, port))
sock.send(b"GET / HTTP/1.0\r\n\r\n")
response = sock.recv(1024)
sock.close()
return b"200 OK" in response
except:
return False
def stop_server():
"""停止服务器"""
print("停止服务器...")
subprocess.run(["pkill", "-f", "mjpeg_server.py"], capture_output=True)
subprocess.run(["pkill", "-f", "ffmpeg.*video0"], capture_output=True)
time.sleep(1)
def start_server():
"""启动服务器"""
print(f"启动服务器: {SERVER_SCRIPT}")
subprocess.Popen(
["python3", SERVER_SCRIPT],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
start_new_session=True
)
time.sleep(2)
def restart_server():
"""重启服务器"""
stop_server()
start_server()
def get_timestamp():
"""获取时间戳"""
return time.strftime("%Y-%m-%d %H:%M:%S")
def main():
print(f"视频服务器监控程序启动")
print(f"监控地址: {HOST}:{PORT}")
print(f"检测间隔: {CHECK_INTERVAL}秒")
print(f"按 Ctrl+C 停止\n")
consecutive_failures = 0
while True:
timestamp = get_timestamp()
# 检查进程
process_ok = is_process_running("mjpeg_server.py")
port_ok = is_port_responding(HOST, PORT)
http_ok = check_http_response(HOST, PORT)
if process_ok and port_ok and http_ok:
consecutive_failures = 0
print(f"[{timestamp}] 状态正常")
else:
consecutive_failures += 1
print(f"[{timestamp}] 检测异常 (失败次数: {consecutive_failures})")
print(f" - 进程运行: {'是' if process_ok else '否'}")
print(f" - 端口响应: {'是' if port_ok else '否'}")
print(f" - HTTP正常: {'是' if http_ok else '否'}")
if consecutive_failures >= MAX_RETRIES:
print(f"[{timestamp}] 连续失败{consecutive_failures}次,重启服务器...")
restart_server()
consecutive_failures = 0
print(f"[{timestamp}] 服务器已重启\n")
time.sleep(CHECK_INTERVAL)
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("\n监控程序已停止")
sys.exit(0)
- 多维度检测:进程存活 + 端口连通 + HTTP 响应,避免单一检测维度的误判;
- 自动恢复:异常达到阈值后自动重启服务器,无需人工干预;
- 日志清晰:输出带时间戳的状态日志,便于定位问题;
- 鲁棒性:捕获异常(如网络超时、命令执行失败),避免监控程序自身崩溃;
- 后台运行:启动服务器时重定向输出,且新建会话,保证服务独立运行。
其他:
要让局域网下其他设备能够访问要开放防火墙(8080端口)
sudo ufw allow 8080/tcp
访问地址——http://树莓派ip:8080/
①openclaw容器部分:
最简单的方法是把推流地址发给openclaw然后让它自己想办法把画面给你,最后别忘记把流程总结添加到TOOLS.md中让龙虾能记住操作流程。我添加的内容如下如下:
### 局域网摄像头
**地址:** http://192.168.1.112:8080/
**查看流程:**
1. 使用 browser 工具打开地址(profile: openclaw)
2. 刷新页面(navigate 同一地址)
3. 截图(screenshot)
4. 用 image 工具分析画面内容
3.项目重点内容
①推流核心是 ffmpeg 采集 MJPEG 格式视频,结合 Python HTTP 服务器实现浏览器实时播放;
②设计了抢占式单客户端机制,保证同一时间只有一个最新访问的客户端能访问摄像头流;
③OpenClaw 集成只需配置推流地址和预设工具流程,即可实现摄像头画面的获取与分析。










