本篇概览
- 本文是《Kurento实战》的第五篇,咱们用KMS的现有能力开发一个简单的媒体播放器,整体架构如下图:
- 从上图可见,实战主要内容是开发player-with-record应用,整个过程如下:
- 部署KMS
- 开发名为player-with-record的springboot应用,含简单的网页
- 浏览器打开网页,与player-with-record建立websocket连接,将流媒体地址发送到player-with-record
- player-with-record通过kurento SDK向KMS发指令,创建媒体播放和webrtc组件实例
- player-with-record还负责浏览器和前端页面之间的WebRTC信令传输
- 浏览器和KMS之前的媒体连接建立好之后,即可接收流媒体数据再播放出来
- 接下来进入实战,从部署KMS开始
源码下载
- 本篇实战中的完整源码可在GitHub下载到,地址和链接信息如下表所示(https://github.com/zq2599/blog_demos):
名称 | 链接 | 备注 |
---|---|---|
项目主页 | https://github.com/zq2599/blog_demos | 该项目在GitHub上的主页 |
git仓库地址(https) | https://github.com/zq2599/blog_demos.git | 该项目源码的仓库地址,https协议 |
git仓库地址(ssh) | git@github.com:zq2599/blog_demos.git | 该项目源码的仓库地址,ssh协议 |
- 这个git项目中有多个文件夹,本次实战的源码在kurentordemo文件夹下,如下图红框所示:
- kurentordemo是整个《Kurento实战》系列的父工程,里面有多个子工程,本篇对应的源码是子工程player-with-record,如下图红框:
部署KMS
- 为了简单操作,KMS还是采用docker的方式部署,执行如下命令即可:
docker run -d
--restart always
--name kms
--network host
kurento/kurento-media-server:6.15
- 和之前实战不同的是,KMS和player-with-record应用分别部署在不同的电脑上,因此,KMS所在机器记得关闭防火墙或者开放8888端口;
开发PlayerWithRecorder应用
- 在kurentodemo工程下,新增名为player-with-record的子工程,其pom.xml内容如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>kurentodemo</artifactId>
<groupId>com.bolingcavalry</groupId>
<version>1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<groupId>com.bolingcavalry</groupId>
<artifactId>player-with-record</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>player-with-record</name>
<packaging>jar</packaging>
<description>show how to play and record the file</description>
<!--不用spring-boot-starter-parent作为parent时的配置-->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>2.3.3.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-websocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.webjars</groupId>
<artifactId>webjars-locator</artifactId>
</dependency>
<dependency>
<groupId>org.webjars.bower</groupId>
<artifactId>jquery</artifactId>
</dependency>
<dependency>
<groupId>org.webjars.bower</groupId>
<artifactId>bootstrap</artifactId>
</dependency>
<dependency>
<groupId>org.webjars.bower</groupId>
<artifactId>demo-console</artifactId>
</dependency>
<dependency>
<groupId>org.webjars.bower</groupId>
<artifactId>ekko-lightbox</artifactId>
</dependency>
<dependency>
<groupId>org.webjars.bower</groupId>
<artifactId>webrtc-adapter</artifactId>
</dependency>
<dependency>
<groupId>org.kurento</groupId>
<artifactId>kurento-client</artifactId>
</dependency>
<dependency>
<groupId>org.kurento</groupId>
<artifactId>kurento-utils-js</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.3.3.RELEASE</version>
<configuration>
<mainClass>com.bolingcavalry.playerwithrecord.PlayerWithRecorder</mainClass>
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
<resources>
<resource>
<directory>src/main/resources</directory>
<includes>
<include>**/*.*</include>
</includes>
</resource>
</resources>
</build>
</project>
- 配置文件application.properties很简单:
# 端口
server.port=8080
#
spring.application.name=PlayerWithRecorder
- 新增一个数据结构UserSession.java,每个网页都对应一个UserSession实例,重点关注的是release方法,在停止播放时调用此方法释放播放器和WebRTC连接资源:
package com.bolingcavalry.playerwithrecord;
import org.kurento.client.IceCandidate;
import org.kurento.client.MediaPipeline;
import org.kurento.client.PlayerEndpoint;
import org.kurento.client.WebRtcEndpoint;
public class UserSession {
private WebRtcEndpoint webRtcEndpoint;
private MediaPipeline mediaPipeline;
private PlayerEndpoint playerEndpoint;
public UserSession() {
}
public WebRtcEndpoint getWebRtcEndpoint() {
return webRtcEndpoint;
}
public void setWebRtcEndpoint(WebRtcEndpoint webRtcEndpoint) {
this.webRtcEndpoint = webRtcEndpoint;
}
public MediaPipeline getMediaPipeline() {
return mediaPipeline;
}
public void setMediaPipeline(MediaPipeline mediaPipeline) {
this.mediaPipeline = mediaPipeline;
}
public void addCandidate(IceCandidate candidate) {
webRtcEndpoint.addIceCandidate(candidate);
}
public PlayerEndpoint getPlayerEndpoint() {
return playerEndpoint;
}
public void setPlayerEndpoint(PlayerEndpoint playerEndpoint) {
this.playerEndpoint = playerEndpoint;
}
public void release() {
this.playerEndpoint.stop();
this.mediaPipeline.release();
}
}
- 启动类PlayerWithRecorder.java,有两处要注意,一个是registerWebSocketHandlers方法用来绑定websocket的处理类,另一个是kurentoClient,KurentoClient.create方法的入参是KMS的服务地址:
package com.bolingcavalry.playerwithrecord;
import org.kurento.client.KurentoClient;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import org.springframework.web.socket.server.standard.ServletServerContainerFactoryBean;
@EnableWebSocket
@SpringBootApplication
public class PlayerWithRecorder implements WebSocketConfigurer {
@Bean
public PlayerHandler handler() {
return new PlayerHandler();
}
/**
* 实例化KurentoClient,入参是KMS地址
* @return
*/
@Bean
public KurentoClient kurentoClient() {
return KurentoClient.create("ws://192.168.91.128:8888/kurento");
}
@Bean
public ServletServerContainerFactoryBean createServletServerContainerFactoryBean() {
ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
container.setMaxTextMessageBufferSize(32768);
return container;
}
/**
* 标准的WebSocket处理类绑定
* @param registry
*/
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(handler(), "/player");
}
public static void main(String[] args) throws Exception {
SpringApplication.run(PlayerWithRecorder.class, args);
}
}
- 接下来就是websocket的处理类PlayerHandler.java,这是本篇的核心,有几处重点稍后会提到:
package com.bolingcavalry.playerwithrecord;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.concurrent.ConcurrentHashMap;
import org.kurento.client.EndOfStreamEvent;
import org.kurento.client.ErrorEvent;
import org.kurento.client.EventListener;
import org.kurento.client.IceCandidate;
import org.kurento.client.IceCandidateFoundEvent;
import org.kurento.client.KurentoClient;
import org.kurento.client.MediaPipeline;
import org.kurento.client.MediaState;
import org.kurento.client.MediaStateChangedEvent;
import org.kurento.client.PlayerEndpoint;
import org.kurento.client.ServerManager;
import org.kurento.client.VideoInfo;
import org.kurento.client.WebRtcEndpoint;
import org.kurento.commons.exception.KurentoException;
import org.kurento.jsonrpc.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonObject;
public class PlayerHandler extends TextWebSocketHandler {
@Autowired
private KurentoClient kurento;
private final Logger log = LoggerFactory.getLogger(PlayerHandler.class);
private final Gson gson = new GsonBuilder().create();
private final ConcurrentHashMap<String, UserSession> users = new ConcurrentHashMap<>();
@Override
public void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
JsonObject jsonMessage = gson.fromJson(message.getPayload(), JsonObject.class);
String sessionId = session.getId();
log.debug("用户[{}]收到websocket命令: {} from sessionId", sessionId, jsonMessage);
try {
switch (jsonMessage.get("id").getAsString()) {
// 开始播放
case "start":
start(session, jsonMessage);
break;
// 停止播放
case "stop":
stop(sessionId);
break;
// 暂停
case "pause":
pause(sessionId);
break;
// 恢复
case "resume":
resume(session);
break;
// 生成监控内容
case "debugDot":
debugDot(session);
break;
// 前进或者倒退
case "doSeek":
doSeek(session, jsonMessage);
break;
// 取位置
case "getPosition":
getPosition(session);
break;
// 更新WebRTC的ICE数据
case "onIceCandidate":
onIceCandidate(sessionId, jsonMessage);
break;
default:
sendError(session, "Invalid message with id " jsonMessage.get("id").getAsString());
break;
}
} catch (Throwable t) {
log.error("Exception handling message {} in sessionId {}", jsonMessage, sessionId, t);
sendError(session, t.getMessage());
}
}
private void start(final WebSocketSession session, JsonObject jsonMessage) {
// 1.新建MediaPipeline对象
MediaPipeline pipeline = kurento.createMediaPipeline();
// 2. 新建连接浏览器的WebRtcEndpoint对象
WebRtcEndpoint webRtcEndpoint = new WebRtcEndpoint.Builder(pipeline).build();
// 3.1 取出要播放的地址
String videourl = jsonMessage.get("videourl").getAsString();
// 3.2 新建负责播放的PlayerEndpoint对象
final PlayerEndpoint playerEndpoint = new PlayerEndpoint.Builder(pipeline, videourl).build();
// 4 playerEndpoint连接webRtcEndpoint,这样playerEndpoint解码出的内容通过webRtcEndpoint给到浏览器
playerEndpoint.connect(webRtcEndpoint);
// 5. WebRtc相关的操作
// 5.1 一旦收到KMS的candidate就立即给到前端
webRtcEndpoint.addIceCandidateFoundListener(new EventListener<IceCandidateFoundEvent>() {
@Override
public void onEvent(IceCandidateFoundEvent event) {
JsonObject response = new JsonObject();
response.addProperty("id", "iceCandidate");
response.add("candidate", JsonUtils.toJsonObject(event.getCandidate()));
try {
synchronized (session) {
session.sendMessage(new TextMessage(response.toString()));
}
} catch (IOException e) {
log.debug(e.getMessage());
}
}
});
// SDP offer是前端给的
String sdpOffer = jsonMessage.get("sdpOffer").getAsString();
// 给前端准备SDP answer
String sdpAnswer = webRtcEndpoint.processOffer(sdpOffer);
log.info("[Handler::start] SDP Offer from browser to KMS:n{}", sdpOffer);
log.info("[Handler::start] SDP Answer from KMS to browser:n{}", sdpAnswer);
JsonObject response = new JsonObject();
response.addProperty("id", "startResponse");
response.addProperty("sdpAnswer", sdpAnswer);
sendMessage(session, response.toString());
// 6. 和媒体播放有关的操作
// 6.1 KMS会发送和媒体播放有关的消息过来,如果连接媒体成功,就把获取到的相关参数给到前端
webRtcEndpoint.addMediaStateChangedListener(new EventListener<MediaStateChangedEvent>() {
@Override
public void onEvent(MediaStateChangedEvent event) {
if (event.getNewState() == MediaState.CONNECTED) {
// 媒体相关的信息可以用getVideoInfo去的
VideoInfo videoInfo = playerEndpoint.getVideoInfo();
JsonObject response = new JsonObject();
response.addProperty("id", "videoInfo");
response.addProperty("isSeekable", videoInfo.getIsSeekable());
response.addProperty("initSeekable", videoInfo.getSeekableInit());
response.addProperty("endSeekable", videoInfo.getSeekableEnd());
response.addProperty("videoDuration", videoInfo.getDuration());
// 把这些媒体信息给前端
sendMessage(session, response.toString());
}
}
});
// 让KMS把它的ICD Candidate发过来(前面的监听会收到)
webRtcEndpoint.gatherCandidates();
// 7.1 添加媒体播放的监听:异常消息
playerEndpoint.addErrorListener(new EventListener<ErrorEvent>() {
@Override
public void onEvent(ErrorEvent event) {
log.info("ErrorEvent: {}", event.getDescription());
// 通知前端停止播放
sendPlayEnd(session);
}
});
// 7.2 添加媒体播放的监听:播放结束
playerEndpoint.addEndOfStreamListener(new EventListener<EndOfStreamEvent>() {
@Override
public void onEvent(EndOfStreamEvent event) {
log.info("EndOfStreamEvent: {}", event.getTimestamp());
// 通知前端停止播放
sendPlayEnd(session);
}
});
// 通过KMS开始连接远程媒体
playerEndpoint.play();
// 将pipeline、webRtcEndpoint、playerEndpoint这些信息放入UserSession对象中,
// 这样方便处理前端发过来的各种命令
final UserSession user = new UserSession();
user.setMediaPipeline(pipeline);
user.setWebRtcEndpoint(webRtcEndpoint);
user.setPlayerEndpoint(playerEndpoint);
users.put(session.getId(), user);
}
/**
* 暂停播放
* @param sessionId
*/
private void pause(String sessionId) {
UserSession user = users.get(sessionId);
if (user != null) {
user.getPlayerEndpoint().pause();
}
}
/**
* 从暂停恢复
* @param session
*/
private void resume(final WebSocketSession session) {
UserSession user = users.get(session.getId());
if (user != null) {
user.getPlayerEndpoint().play();
VideoInfo videoInfo = user.getPlayerEndpoint().getVideoInfo();
JsonObject response = new JsonObject();
response.addProperty("id", "videoInfo");
response.addProperty("isSeekable", videoInfo.getIsSeekable());
response.addProperty("initSeekable", videoInfo.getSeekableInit());
response.addProperty("endSeekable", videoInfo.getSeekableEnd());
response.addProperty("videoDuration", videoInfo.getDuration());
sendMessage(session, response.toString());
}
}
/**
* 停止播放
* @param sessionId
*/
private void stop(String sessionId) {
UserSession user = users.remove(sessionId);
if (user != null) {
user.release();
}
}
/**
* 取得Gstreamer的dot内容,这样的内容可以被graphviz工具解析成拓扑图
* @param session
*/
private void debugDot(final WebSocketSession session) {
UserSession user = users.get(session.getId());
if (user != null) {
final String pipelineDot = user.getMediaPipeline().getGstreamerDot();
try (PrintWriter out = new PrintWriter("player.dot")) {
out.println(pipelineDot);
} catch (IOException ex) {
log.error("[Handler::debugDot] Exception: {}", ex.getMessage());
}
final String playerDot = user.getPlayerEndpoint().getElementGstreamerDot();
try (PrintWriter out = new PrintWriter("player-decoder.dot")) {
out.println(playerDot);
} catch (IOException ex) {
log.error("[Handler::debugDot] Exception: {}", ex.getMessage());
}
}
ServerManager sm = kurento.getServerManager();
log.warn("[Handler::debugDot] CPU COUNT: {}", sm.getCpuCount());
log.warn("[Handler::debugDot] CPU USAGE: {}", sm.getUsedCpu(1000));
log.warn("[Handler::debugDot] RAM USAGE: {}", sm.getUsedMemory());
}
/**
* 跳转到指定位置
* @param session
* @param jsonMessage
*/
private void doSeek(final WebSocketSession session, JsonObject jsonMessage) {
UserSession user = users.get(session.getId());
if (user != null) {
try {
user.getPlayerEndpoint().setPosition(jsonMessage.get("position").getAsLong());
} catch (KurentoException e) {
log.debug("The seek cannot be performed");
JsonObject response = new JsonObject();
response.addProperty("id", "seek");
response.addProperty("message", "Seek failed");
sendMessage(session, response.toString());
}
}
}
/**
* 取得当前播放位置
* @param session
*/
private void getPosition(final WebSocketSession session) {
UserSession user = users.get(session.getId());
if (user != null) {
long position = user.getPlayerEndpoint().getPosition();
JsonObject response = new JsonObject();
response.addProperty("id", "position");
response.addProperty("position", position);
sendMessage(session, response.toString());
}
}
/**
* 收到前端的Ice candidate后,立即发给KMS
* @param sessionId
* @param jsonMessage
*/
private void onIceCandidate(String sessionId, JsonObject jsonMessage) {
UserSession user = users.get(sessionId);
if (user != null) {
JsonObject jsonCandidate = jsonMessage.get("candidate").getAsJsonObject();
IceCandidate candidate =
new IceCandidate(jsonCandidate.get("candidate").getAsString(), jsonCandidate
.get("sdpMid").getAsString(), jsonCandidate.get("sdpMLineIndex").getAsInt());
user.getWebRtcEndpoint().addIceCandidate(candidate);
}
}
/**
* 通知前端停止播放
* @param session
*/
public void sendPlayEnd(WebSocketSession session) {
if (users.containsKey(session.getId())) {
JsonObject response = new JsonObject();
response.addProperty("id", "playEnd");
sendMessage(session, response.toString());
}
}
/**
* 将错误信息发给前端
* @param session
* @param message
*/
private void sendError(WebSocketSession session, String message) {
if (users.containsKey(session.getId())) {
JsonObject response = new JsonObject();
response.addProperty("id", "error");
response.addProperty("message", message);
sendMessage(session, response.toString());
}
}
/**
* 给前端发送消息的方法用synchronized修饰,
* 因为收到KMS通知的时机不确定,此时可能正在给前端发送消息,存在同时调用sendMessage的可能
* @param session
* @param message
*/
private synchronized void sendMessage(WebSocketSession session, String message) {
try {
session.sendMessage(new TextMessage(message));
} catch (IOException e) {
log.error("Exception sending message", e);
}
}
/**
* 和前端的websocket连接断开后,此方法会被调用
* @param session
* @param status
* @throws Exception
*/
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
stop(session.getId());
}
}
- PlayerHandler.java的代码略多,但逻辑还是很清楚的,此处整理如下:
- handleTextMessage方法负责接收websocket命令,根据不同的命令调用对应的方法,如播放,暂停等
- 最重要的就是start方法了,这里面会通知KMS创建播放器(PlayerEndpoint),WebRTC连接组件(WebRtcEndpoint),还有SDP相关的处理,如offer、answer、candidate等
- 其余的如pause、seek等方法都是调用PlayerEndpoint对应的API,并不复杂,了解即可
- 接下来是前端开发,作者欣宸并不擅长前端,直接使用了kurento官方demo的前端代码,具体代码请根据前面的提示去我的github下载,唯一要注意的是:kurento官方demo是https协议,我这里为了简单是http的,对应的index.js中websocket连接请使用http协议:
- 如下图红框,所有前端资源和代码都在static目录下,篇幅所限就不展开了:
- 至此,编码完成,可以启动应用了
验证
- 启动应用player-with-record,浏览器访问:http://localhost:8080/,效果如下:
- 找一个在线流流媒体试试播放效果,我用的是广东卫视的地址:rtmp://58.200.131.2:1935/livetv/gdtv,填入上图红框中,再点击绿色的Start按钮,效果如下图,并且声音也正常:
- 广东卫视是直播类型的,无法执行暂停、快进等操作,咱们换一个点播类型的流媒体试试,我这里用的是http://clips.vorwaerts-gmbh.de/big_buck_bunny.mp4,如下图,各种操作可以进行:
- 至此,一个简单的媒体播放器就完成了,接下来的实战,咱们给这个播放器增加一个功能:云端录制。