简介

Kurento-group-call是一个类似视频会议的官方例子,在每个用户端都会创建N个端点,相比one2one-call/one2many-call要复杂些。

在用户之间连接多个客户端,创建一个视频会议;同样,运行之前要安装Kurento媒体服务。先克隆项目,然后运行主类:

git clone https://github.com/Kurento/kurento-tutorial-java.git
cd kurento-tutorial-java/kurento-group-call
运行项目
mvn -U clean spring-boot:run \
    -Dspring-boot.run.jvmArguments="-Dkms.url=ws://{KMS_HOST}:8888/kurento"

功能分析

  1. 用户进入房间时,会创建一个新的Media,另外会通知其他用户有新用户连接,然后所有参于者将请求服务器接收新参于者的媒体。
  2. 新用户依次获取所有已连接的参于者列表,并请求服务器接收房间中所有客户端的media。
  3. 每个客户端都发送自己的media,然后从其他用户那里接收媒体,每个客户端都会有n个端点,房间总共会有n*n个端点。
  4. 当用户离开房间时,服务器会通知所有客户端。然后,客户端代码请求服务器取消与离开的客户端相关的所有媒体元素。
服务端

实例化Kurento客户端后,您就可以与Kurento媒体服务器通信并控制其多媒体功能了。GroupCallApp实现了接口WebSocketConfigurer,注册了一个WebSocketHandler用于处理WebSocket请求

@EnableWebSocket
@SpringBootApplication
public class GroupCallApp implements WebSocketConfigurer {

  @Bean
  public UserRegistry registry() {
    return new UserRegistry();
  }

  @Bean
  public RoomManager roomManager() {
    return new RoomManager();
  }

  @Bean
  public CallHandler groupCallHandler() {
    return new CallHandler();
  }

  @Bean
  public KurentoClient kurentoClient() {
    return KurentoClient.create();
  }

  public static void main(String[] args) throws Exception {
    SpringApplication.run(GroupCallApp.class, args);
  }

  @Override
  public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
    registry.addHandler(groupCallHandler(), "/groupcall");
  }
}

CallHandler类实现TextWebSocketHandler处理文本WebSocket请求。这个类的核心部分是方法handleTextMessage。此方法实现请求的操作,通过WebSocket返回响应

public class CallHandler extends TextWebSocketHandler {

  private static final Logger log = LoggerFactory.getLogger(CallHandler.class);

  private static final Gson gson = new GsonBuilder().create();

  @Autowired
  private RoomManager roomManager;

  @Autowired
  private UserRegistry registry;

  @Override
  public void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
    final JsonObject jsonMessage = gson.fromJson(message.getPayload(), JsonObject.class);

    final UserSession user = registry.getBySession(session);

    if (user != null) {
      log.debug("Incoming message from user '{}': {}", user.getName(), jsonMessage);
    } else {
      log.debug("Incoming message from new user: {}", jsonMessage);
    }

    switch (jsonMessage.get("id").getAsString()) {
      case "joinRoom":
        joinRoom(jsonMessage, session);
        break;
      case "receiveVideoFrom":
        final String senderName = jsonMessage.get("sender").getAsString();
        final UserSession sender = registry.getByName(senderName);
        final String sdpOffer = jsonMessage.get("sdpOffer").getAsString();
        user.receiveVideoFrom(sender, sdpOffer);
        break;
      case "leaveRoom":
        leaveRoom(user);
        break;
      case "onIceCandidate":
        JsonObject candidate = jsonMessage.get("candidate").getAsJsonObject();

        if (user != null) {
          IceCandidate cand = new IceCandidate(candidate.get("candidate").getAsString(),
              candidate.get("sdpMid").getAsString(), candidate.get("sdpMLineIndex").getAsInt());
          user.addCandidate(cand, jsonMessage.get("name").getAsString());
        }
        break;
      default:
        break;
    }
  }

  @Override
  public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
      ...
  }

  private void joinRoom(JsonObject params, WebSocketSession session) throws IOException {
      ...
  }

  private void leaveRoom(UserSession user) throws IOException {
      ...
  }
}

在afterConnectionClosed方法里,它会将用户userSession从registry房间中移除并驱逐出去

@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
   UserSession user = registry.removeBySession(session);
   roomManager.getRoom(user.getRoomName()).leave(user);
}

在joinRoom方法中,服务器检查是否有指定名称的已注册房间,将用户添加到该房间并注册用户

private void joinRoom(JsonObject params, WebSocketSession session) throws IOException {
   final String roomName = params.get("room").getAsString();
   final String name = params.get("name").getAsString();
   log.info("PARTICIPANT {}: trying to join room {}", name, roomName);

   Room room = roomManager.getRoom(roomName);
   final UserSession user = room.join(name, session);
   registry.register(user);
}

leaveRoom方法结束一个用户的视频通话

private void leaveRoom(UserSession user) throws IOException {
    final Room room = roomManager.getRoom(user.getRoomName());
    room.leave(user);
    if (room.getParticipants().isEmpty()) {
      roomManager.removeRoom(room);
    }
}
客户端

创建WebSocket,在onMessage方法下监听JSON信令协议。有5个不用的消息传入到客户端:existingParticipants,newParticipantArrived,participantLeft,receiveVideoAnswer和iceCandidate。

var ws = new WebSocket('wss://' + location.host + '/groupcall');
var participants = {};
var name;

window.onbeforeunload = function() {
   ws.close();
};

ws.onmessage = function(message) {
   var parsedMessage = JSON.parse(message.data);
   console.info('Received message: ' + message.data);

   switch (parsedMessage.id) {
   case 'existingParticipants':
      onExistingParticipants(parsedMessage);
      break;
   case 'newParticipantArrived':
      onNewParticipant(parsedMessage);
      break;
   case 'participantLeft':
      onParticipantLeft(parsedMessage);
      break;
   case 'receiveVideoAnswer':
      receiveVideoResponse(parsedMessage);
      break;
   case 'iceCandidate':
      participants[parsedMessage.name].rtcPeer.addIceCandidate(parsedMessage.candidate, function (error) {
           if (error) {
            console.error("Error adding candidate: " + error);
            return;
           }
       });
       break;
   default:
      console.error('Unrecognized message', parsedMessage);
   }
}

function register() {
   name = document.getElementById('name').value;
   var room = document.getElementById('roomName').value;

   document.getElementById('room-header').innerText = 'ROOM ' + room;
   document.getElementById('join').style.display = 'none';
   document.getElementById('room').style.display = 'block';

   var message = {
      id : 'joinRoom',
      name : name,
      room : room,
   }
   sendMessage(message);
}

function onNewParticipant(request) {
   receiveVideo(request.name);
}

function receiveVideoResponse(result) {
   participants[result.name].rtcPeer.processAnswer (result.sdpAnswer, function (error) {
      if (error) return console.error (error);
   });
}

function callResponse(message) {
   if (message.response != 'accepted') {
      console.info('Call not accepted by peer. Closing call');
      stop();
   } else {
      webRtcPeer.processAnswer(message.sdpAnswer, function (error) {
         if (error) return console.error (error);
      });
   }
}

function onExistingParticipants(msg) {
   var constraints = {
      audio : true,
      video : {
         mandatory : {
            maxWidth : 320,
            maxFrameRate : 15,
            minFrameRate : 15
         }
      }
   };
   console.log(name + " registered in room " + room);
   var participant = new Participant(name);
   participants[name] = participant;
   var video = participant.getVideoElement();

   var options = {
         localVideo: video,
         mediaConstraints: constraints,
         onicecandidate: participant.onIceCandidate.bind(participant)
       }
   participant.rtcPeer = new kurentoUtils.WebRtcPeer.WebRtcPeerSendonly(options,
      function (error) {
        if(error) {
           return console.error(error);
        }
        this.generateOffer (participant.offerToReceiveVideo.bind(participant));
   });

   msg.data.forEach(receiveVideo);
}

function leaveRoom() {
   sendMessage({
      id : 'leaveRoom'
   });

   for ( var key in participants) {
      participants[key].dispose();
   }

   document.getElementById('join').style.display = 'block';
   document.getElementById('room').style.display = 'none';

   ws.close();
}

function receiveVideo(sender) {
   var participant = new Participant(sender);
   participants[sender] = participant;
   var video = participant.getVideoElement();

   var options = {
      remoteVideo: video,
      onicecandidate: participant.onIceCandidate.bind(participant)
    }

   participant.rtcPeer = new kurentoUtils.WebRtcPeer.WebRtcPeerRecvonly(options,
         function (error) {
           if(error) {
              return console.error(error);
           }
           this.generateOffer (participant.offerToReceiveVideo.bind(participant));
   });;
}

function onParticipantLeft(request) {
   console.log('Participant ' + request.name + ' left');
   var participant = participants[request.name];
   participant.dispose();
   delete participants[request.name];
}

function sendMessage(message) {
   var jsonMessage = JSON.stringify(message);
   console.log('Sending message: ' + jsonMessage);
   ws.send(jsonMessage);
}

这里主要依赖于kurento-utils来完成

Android

上面是官方的代码分析,接下来主要用Android来实现客户端功能,跟one2one一样,都是引用org.webrtc:

    implementation 'org.webrtc:google-webrtc:1.0.32006'
    implementation 'org.java-websocket:Java-WebSocket:1.5.3'
    implementation "com.google.code.gson:gson:2.+"

初始化WebSocket,在onMessage里监听JSON信令消息,实现五个方法:existingParticipants,newParticipantArrived,participantLeft,receiveVideoAnswer和iceCandidate

        when (id) {
            "existingParticipants" -> {
                onExistingParticipants(jsonObject)
            }
            "newParticipantArrived" -> {
                onNewParticipantArrived(jsonObject)
            }
            "participantLeft" -> {
                onParticipantLeft(jsonObject)
            }
            "receiveVideoAnswer" -> {
                onReceiveVideoAnswer(jsonObject)
            }
            "iceCandidate" -> {
                iceCandidate(jsonObject)
            }
        }
  • existingParticipants:在加入房间后会收到existingParticipants,并告诉有哪些参于者
  • newParticipantArrived:如果有新的参于者加入时,会收到此信息
  • participantLeft:有参于者离开,会收到此信息
  • receiveVideoAnswer:接收到服务器发送的Answer信令
  • iceCandidate:candidates事件
发送给服务端的请求:
  • joinRoom:加入到房间的请求
  • receiveVideoFrom:发起接收某参于者时间的请求
  • leaveRoom:离开房间的请求
  • onIceCandidate:将candidate发送到服务端
手机与电脑完成连接