diff --git a/daemon/Makefile b/daemon/Makefile index 04c689ef0..119d680f2 100644 --- a/daemon/Makefile +++ b/daemon/Makefile @@ -83,7 +83,7 @@ SRCS= main.c kernel.c poller.c aux.c control_tcp.c call.c control_udp.c redis.c crypto.c rtp.c call_interfaces.strhash.c dtls.c log.c cli.c graphite.c ice.c \ media_socket.c homer.c recording.c statistics.c cdr.c ssrc.c iptables.c tcp_listener.c \ codec.c load.c dtmf.c timerthread.c media_player.c jitter_buffer.c t38.c websocket.c \ - mqtt.c + mqtt.c janus.strhash.c LIBSRCS= loglib.c auxlib.c rtplib.c str.c socket.c streambuf.c ssllib.c dtmflib.c ifeq ($(with_transcoding),yes) LIBSRCS+= codeclib.c resample.c diff --git a/daemon/call.c b/daemon/call.c index ceabe14cd..788ec561e 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -48,6 +48,7 @@ #include "jitter_buffer.h" #include "t38.h" #include "mqtt.h" +#include "janus.h" struct iterator_helper { @@ -77,7 +78,6 @@ unsigned int call_socket_cpu_affinity = 0; /* ********** */ static void __monologue_destroy(struct call_monologue *monologue, int recurse); -static int monologue_destroy(struct call_monologue *ml); static struct timeval add_ongoing_calls_dur_in_interval(struct timeval *interval_start, struct timeval *interval_duration); static void __call_free(void *p); @@ -138,7 +138,7 @@ void call_make_own_foreign(struct call *c, bool foreign) { static void call_timer_iterator(struct call *c, struct iterator_helper *hlp) { GList *it; unsigned int check; - int good = 0; + bool good = false; struct packet_stream *ps; struct stream_fd *sfd; int tmp_t_reason = UNKNOWN; @@ -178,8 +178,9 @@ static void call_timer_iterator(struct call *c, struct iterator_helper *hlp) { goto delete; } + // conference: call can be created without participants added if (!c->streams.head) - goto drop; + goto out; // ignore media timeout if call was recently taken over if (c->foreign_media && rtpe_now.tv_sec - c->last_signal <= rtpe_config.timeout) @@ -223,7 +224,7 @@ no_sfd: } if (rtpe_now.tv_sec - atomic64_get(timestamp) < check) - good = 1; + good = true; next: ; @@ -250,7 +251,6 @@ next: ilog(LOG_INFO, "Closing call due to timeout"); -drop: hlp->del_timeout = g_slist_prepend(hlp->del_timeout, obj_get(c)); goto out; @@ -3048,6 +3048,10 @@ static void __call_cleanup(struct call *c) { } recording_finish(c); + + if (c->janus_session) + __obj_put((void *) c->janus_session); + c->janus_session = NULL; } /* called lock-free, but must hold a reference to the call */ @@ -3642,7 +3646,7 @@ static void __monologue_destroy(struct call_monologue *monologue, int recurse) { } /* must be called with call->master_lock held in W */ -static int monologue_destroy(struct call_monologue *ml) { +int monologue_destroy(struct call_monologue *ml) { struct call *c = ml->call; __monologue_destroy(ml, 1); diff --git a/daemon/call_interfaces.c b/daemon/call_interfaces.c index dfa8e3012..93d47b6fa 100644 --- a/daemon/call_interfaces.c +++ b/daemon/call_interfaces.c @@ -306,7 +306,7 @@ static void streams_parse(const char *s, GQueue *q) { i = 0; pcre_multi_match(streams_re, streams_ree, s, 3, streams_parse_func, &i, q); } -static void call_unlock_release(struct call **c) { +void call_unlock_release(struct call **c) { if (!*c) return; rwlock_unlock_w(&(*c)->master_lock); @@ -948,17 +948,22 @@ static void call_ng_flags_flags(struct sdp_ng_flags *out, str *s, void *dummy) { STR_FMT(s)); } } -static void call_ng_process_flags(struct sdp_ng_flags *out, bencode_item_t *input, enum call_opmode opmode) { - bencode_item_t *list, *it, *dict; - int diridx; - str s; +void call_ng_flags_init(struct sdp_ng_flags *out, enum call_opmode opmode) { ZERO(*out); out->opmode = opmode; out->trust_address = trust_address_def; out->dtls_passive = dtls_passive_def; out->dtls_reverse_passive = dtls_passive_def; +} + +static void call_ng_process_flags(struct sdp_ng_flags *out, bencode_item_t *input, enum call_opmode opmode) { + bencode_item_t *list, *it, *dict; + int diridx; + str s; + + call_ng_flags_init(out, opmode); call_ng_flags_list(out, input, "flags", call_ng_flags_flags, NULL); call_ng_flags_list(out, input, "replace", call_ng_flags_replace, NULL); @@ -1202,7 +1207,7 @@ static void call_ng_process_flags(struct sdp_ng_flags *out, bencode_item_t *inpu } } } -static void call_ng_free_flags(struct sdp_ng_flags *flags) { +void call_ng_free_flags(struct sdp_ng_flags *flags) { if (flags->codec_except) g_hash_table_destroy(flags->codec_except); if (flags->codec_set) @@ -1362,7 +1367,7 @@ static void fragments_cleanup(int all) { } -static void save_last_sdp(struct call_monologue *ml, str *sdp, GQueue *parsed, GQueue *streams) { +void save_last_sdp(struct call_monologue *ml, str *sdp, GQueue *parsed, GQueue *streams) { str_free_dup(&ml->last_in_sdp); ml->last_in_sdp = *sdp; *sdp = STR_NULL; diff --git a/daemon/janus.c b/daemon/janus.c new file mode 100644 index 000000000..0889ae30b --- /dev/null +++ b/daemon/janus.c @@ -0,0 +1,1554 @@ +#include "janus.h" +#include +#include +#include "websocket.h" +#include "log.h" +#include "main.h" +#include "obj.h" +#include "call.h" +#include "sdp.h" +#include "call_interfaces.h" +#include "rtplib.h" +#include "ice.h" + + +struct janus_session { // "login" session + struct obj obj; + uint64_t id; + mutex_t lock; + time_t last_act; + GHashTable *websockets; // controlling transports + GHashTable *handles; +}; +struct janus_handle { // corresponds to a conference participant + uint64_t id; + uint64_t session; + uint64_t room; + enum { + HANDLE_TYPE_NONE = 0, + HANDLE_TYPE_CONTROLLING, + HANDLE_TYPE_PUBLISHER, + HANDLE_TYPE_SUBSCRIBER, + } type; +}; +struct janus_room { + uint64_t id; + str call_id; + int num_publishers; + struct janus_session *session; // controlling session + uint64_t handle_id; // controlling handle which created the room + GHashTable *publishers; // handle ID -> feed ID + GHashTable *subscribers; // handle ID -> subscribed feed ID +}; + + +static mutex_t janus_lock; +static GHashTable *janus_tokens; +static GHashTable *janus_sessions; +static GHashTable *janus_handles; +static GHashTable *janus_rooms; +static GHashTable *janus_feeds; + + +static void __janus_session_free(void *p) { + struct janus_session *s = p; + g_hash_table_destroy(s->websockets); + g_hash_table_destroy(s->handles); + mutex_destroy(&s->lock); +} + + +// XXX we have several hash tables that hold references to objs - unify all these +static struct janus_session *janus_get_session(uint64_t id) { + mutex_lock(&janus_lock); + struct janus_session *ret = g_hash_table_lookup(janus_sessions, &id); + if (ret) + obj_hold(ret); + mutex_unlock(&janus_lock); + if (!ret) + return NULL; + mutex_lock(&ret->lock); + ret->last_act = rtpe_now.tv_sec; + mutex_unlock(&ret->lock); + return ret; +} + + +static uint64_t *uint64_dup(uint64_t u) { + uint64_t *ret = g_malloc(sizeof(*ret)); + *ret = u; + return ret; +} +INLINE uint64_t janus_random(void) { + return ssl_random() & 0x7ffffffffffffLL; +} +static uint64_t jr_str_int(JsonReader *r) { + uint64_t ret = json_reader_get_int_value(r); + if (ret) + return ret; + const char *s = json_reader_get_string_value(r); + if (!s || !*s) + return 0; + char *ep; + ret = strtoull(s, &ep, 10); + if (*ep) + return 0; + return ret; +} + + +// frees 'builder' +static const char *janus_send_json_msg(struct websocket_message *wm, JsonBuilder *builder, int code, bool done) { + JsonGenerator *gen = json_generator_new(); + JsonNode *root = json_builder_get_root(builder); + json_generator_set_root(gen, root); + char *result = json_generator_to_data(gen, NULL); + + json_node_free(root); + g_object_unref(gen); + g_object_unref(builder); + + const char *ret = NULL; + + if (wm->method == M_WEBSOCKET) + websocket_write_text(wm->wc, result, done); + else { + if (!code) + ret = "Tried to send asynchronous event to HTTP"; + else if (websocket_http_response(wm->wc, code, "application/json", strlen(result))) + ret = "Failed to write Janus response HTTP headers"; + else if (websocket_write_http(wm->wc, result, done)) + ret = "Failed to write Janus JSON response"; + } + + g_free(result); + + return ret; +} + + +static void janus_send_ack(struct websocket_message *wm, const char *transaction, uint64_t session_id) { + // build and send an early ack + JsonBuilder *ack = json_builder_new(); + json_builder_begin_object(ack); // { + json_builder_set_member_name(ack, "janus"); + json_builder_add_string_value(ack, "ack"); + json_builder_set_member_name(ack, "transaction"); + json_builder_add_string_value(ack, transaction); + json_builder_set_member_name(ack, "session_id"); + json_builder_add_int_value(ack, session_id); + json_builder_end_object(ack); // } + + janus_send_json_msg(wm, ack, 0, false); +} + + +static const char *janus_videoroom_create(struct janus_session *session, struct janus_handle *handle, + JsonBuilder *builder, JsonReader *reader, int *retcode) +{ + *retcode = 436; + if (handle->type != HANDLE_TYPE_NONE && handle->type != HANDLE_TYPE_CONTROLLING) + return "User already exists in a room"; + + // create new videoroom + struct janus_room *room = g_slice_alloc0(sizeof(*room)); + + if (json_reader_read_member(reader, "publishers")) + room->num_publishers = jr_str_int(reader); + json_reader_end_member(reader); + if (room->num_publishers <= 0) + room->num_publishers = 3; + room->session = obj_get(session); // XXX replace with just the ID? + room->handle_id = handle->id; // controlling handle + // XXX optimise for 64-bit archs + room->publishers = g_hash_table_new_full(g_int64_hash, g_int64_equal, g_free, g_free); + room->subscribers = g_hash_table_new_full(g_int64_hash, g_int64_equal, g_free, g_free); + + uint64_t room_id; + mutex_lock(&janus_lock); + while (1) { + room_id = room->id = janus_random(); + if (g_hash_table_lookup(janus_rooms, &room->id)) + continue; + room->call_id.s = g_strdup_printf("janus %" PRIu64, room_id); + room->call_id.len = strlen(room->call_id.s); + struct call *call = call_get_or_create(&room->call_id, false, true); + if (!call) { + ilog(LOG_WARN, "Call with reserved Janus ID '" STR_FORMAT + "' already exists", STR_FMT(&room->call_id)); + g_free(room->call_id.s); + continue; + } + if (!call->created_from) + call->created_from = "janus"; + g_hash_table_insert(janus_rooms, &room->id, room); + call->janus_session = obj_get(session); + rwlock_unlock_w(&call->master_lock); + obj_put(call); + break; + } + + mutex_unlock(&janus_lock); + + handle->type = HANDLE_TYPE_CONTROLLING; + + ilog(LOG_INFO, "Created new videoroom with ID %" PRIu64, room_id); + + json_builder_set_member_name(builder, "videoroom"); + json_builder_add_string_value(builder, "created"); + json_builder_set_member_name(builder, "room"); + json_builder_add_int_value(builder, room_id); + json_builder_set_member_name(builder, "permanent"); + json_builder_add_boolean_value(builder, false); + + return NULL; +} + + +static const char *janus_videoroom_exists(struct janus_session *session, + JsonBuilder *builder, uint64_t room_id) +{ + struct janus_room *room = NULL; + + bool exists = false; + + { + LOCK(&janus_lock); + + if (room_id) + room = g_hash_table_lookup(janus_rooms, &room_id); + if (room && room->session != session) + room = NULL; + if (room) { + struct call *call = call_get(&room->call_id); + if (call) { + exists = true; + rwlock_unlock_w(&call->master_lock); + obj_put(call); + } + } + } + + json_builder_set_member_name(builder, "videoroom"); + json_builder_add_string_value(builder, "success"); + json_builder_set_member_name(builder, "room"); + json_builder_add_int_value(builder, room_id); + json_builder_set_member_name(builder, "exists"); + json_builder_add_boolean_value(builder, exists); + + return NULL; +} + + +static const char *janus_videoroom_destroy(struct janus_session *session, + JsonBuilder *builder, int *retcode, uint64_t room_id) +{ + struct janus_room *room = NULL; + + { + LOCK(&janus_lock); + + if (room_id) + room = g_hash_table_lookup(janus_rooms, &room_id); + if (room && room->session != session) + room = NULL; + *retcode = 426; + if (!room) + return "No such room"; + + ilog(LOG_INFO, "Destroying videoroom with ID %" PRIu64, room_id); + + g_hash_table_remove(janus_rooms, &room_id); + } + + struct call *call = call_get(&room->call_id); + // XXX if call is destroyed separately, room persist -> room should be destroyed too + if (call) { + rwlock_unlock_w(&call->master_lock); + call_destroy(call); + obj_put(call); + } + + g_free(room->call_id.s); + obj_put(room->session); + g_hash_table_destroy(room->publishers); + g_hash_table_destroy(room->subscribers); + g_slice_free1(sizeof(*room), room); + + //XXX notify? + + json_builder_set_member_name(builder, "videoroom"); + json_builder_add_string_value(builder, "destroyed"); + json_builder_set_member_name(builder, "room"); + json_builder_add_int_value(builder, room_id); + json_builder_set_member_name(builder, "permanent"); + json_builder_add_boolean_value(builder, false); + + return NULL; +} + + +static void janus_publishers_list(JsonBuilder *builder, struct janus_room *room, uint64_t feed_id) { + json_builder_begin_array(builder); // [ + + GHashTableIter iter; + gpointer value; + g_hash_table_iter_init(&iter, room->publishers); + + while (g_hash_table_iter_next(&iter, NULL, &value)) { + uint64_t *u64 = value; + if (*u64 == feed_id) // skip self + continue; + json_builder_begin_object(builder); // { + json_builder_set_member_name(builder, "id"); + json_builder_add_int_value(builder, *u64); + // XXX + json_builder_end_object(builder); // } + } + + json_builder_end_array(builder); // ] +} + + +static const char *janus_videoroom_join(struct websocket_message *wm, struct janus_session *session, + const char *transaction, + struct janus_handle *handle, JsonBuilder *builder, JsonReader *reader, const char **successp, + int *retcode, + char **jsep_type_out, str *jsep_sdp_out, + uint64_t room_id) +{ + janus_send_ack(wm, transaction, session->id); + + *retcode = 456; + if (!json_reader_read_member(reader, "ptype")) + return "JSON object does not contain 'message.ptype' key"; + const char *ptype = json_reader_get_string_value(reader); + if (!ptype) + return "JSON object does not contain 'message.ptype' key"; + json_reader_end_member(reader); + + *retcode = 430; + bool is_pub = false; + if (!strcmp(ptype, "publisher")) + is_pub = true; + else if (!strcmp(ptype, "subscriber") || !strcmp(ptype, "listener")) + is_pub = false; + else + return "Invalid 'ptype'"; + + { + LOCK(&janus_lock); + + struct janus_room *room = NULL; + if (room_id) + room = g_hash_table_lookup(janus_rooms, &room_id); + if (room && room->session != session) + room = NULL; + *retcode = 426; + if (!room) + return "No such room"; + + // XXX more granular locking? + *retcode = 436; + if (room->handle_id == handle->id) + return "User already exists in the room as a controller"; + if (g_hash_table_lookup(room->subscribers, &handle->id)) + return "User already exists in the room as a subscriber"; + if (g_hash_table_lookup(room->publishers, &handle->id)) + return "User already exists in the room as a publisher"; + if (handle->type != HANDLE_TYPE_NONE) + return "User already exists in the room"; + + uint64_t feed_id = 0; + if (is_pub) { + // random feed ID + while (1) { + feed_id = janus_random(); + if (!feed_id) + continue; + if (g_hash_table_lookup(janus_feeds, &feed_id)) + continue; + break; + } + + // feed ID points to the handle + g_hash_table_insert(janus_feeds, uint64_dup(feed_id), uint64_dup(handle->id)); + // handle ID points to the feed + g_hash_table_insert(room->publishers, uint64_dup(handle->id), uint64_dup(feed_id)); + } + else { + // subscriber + // get the feed ID + *retcode = 456; + if (!json_reader_read_member(reader, "feed")) + return "JSON object does not contain 'message.feed' key"; + feed_id = jr_str_int(reader); + if (!feed_id) + return "JSON object does not contain 'message.feed' key"; + + // does the feed actually exist? get the feed handle + *retcode = 512; + uint64_t *feed_handle = g_hash_table_lookup(janus_feeds, &feed_id); + if (!feed_handle) + return "No such feed exists"; + if (!g_hash_table_lookup(room->publishers, feed_handle)) + return "No such feed handle exists"; + + // handle ID points to the subscribed feed + g_hash_table_insert(room->subscribers, uint64_dup(handle->id), uint64_dup(feed_id)); + + // add the subscription + AUTO_CLEANUP_NULL(struct call *call, call_unlock_release); + *retcode = 426; + call = call_get(&room->call_id); + if (!call) + return "No such room"; + + AUTO_CLEANUP_GBUF(source_handle_buf); + source_handle_buf = g_strdup_printf("%" PRIu64, *feed_handle); + str source_handle_str; + str_init(&source_handle_str, source_handle_buf); + struct call_monologue *source_ml = call_get_monologue(call, &source_handle_str); + if (!source_ml) + return "Feed not found"; + + AUTO_CLEANUP_GBUF(dest_handle_buf); + dest_handle_buf = g_strdup_printf("%" PRIu64, handle->id); + str dest_handle_str; + str_init(&dest_handle_str, dest_handle_buf); + struct call_monologue *dest_ml = call_get_or_create_monologue(call, &dest_handle_str); + + AUTO_CLEANUP(struct sdp_ng_flags flags, call_ng_free_flags); + call_ng_flags_init(&flags, OP_REQUEST); + + // set all WebRTC-specific attributes + flags.transport_protocol = &transport_protocols[PROTO_UDP_TLS_RTP_SAVPF]; + flags.ice_option = ICE_FORCE; + flags.trickle_ice = 1; + flags.generate_mid = 1; + flags.rtcp_mux_offer = 1; + flags.rtcp_mux_require = 1; + flags.no_rtcp_attr = 1; + flags.sdes_off = 1; + + int ret = monologue_subscribe_request(source_ml, dest_ml, &flags); + if (ret) + return "Subscribe error"; + + struct sdp_chopper *chopper = sdp_chopper_new(&source_ml->last_in_sdp); + ret = sdp_replace(chopper, &source_ml->last_in_sdp_parsed, dest_ml, &flags); + sdp_chopper_destroy_ret(chopper, jsep_sdp_out); + + if (ret) + return "Error generating SDP"; + *jsep_type_out = "offer"; + } + + handle->type = is_pub ? HANDLE_TYPE_PUBLISHER : HANDLE_TYPE_SUBSCRIBER; + handle->room = room_id; + + ilog(LOG_INFO, "Handle %" PRIu64 " has joined room %" PRIu64 " as %s (feed %" PRIu64 ")", + handle->id, room_id, + is_pub ? "publisher" : "subscriber", feed_id); + + *successp = "event"; + + if (is_pub) { + json_builder_set_member_name(builder, "videoroom"); + json_builder_add_string_value(builder, "joined"); + json_builder_set_member_name(builder, "room"); + json_builder_add_int_value(builder, room_id); + json_builder_set_member_name(builder, "id"); + json_builder_add_int_value(builder, feed_id); + json_builder_set_member_name(builder, "publishers"); + janus_publishers_list(builder, room, feed_id); + } + else { + // subscriber + json_builder_set_member_name(builder, "videoroom"); + json_builder_add_string_value(builder, "attached"); + json_builder_set_member_name(builder, "room"); + json_builder_add_int_value(builder, room_id); + json_builder_set_member_name(builder, "id"); + json_builder_add_int_value(builder, feed_id); + } + } + + return NULL; +} + + +static void janus_notify_publishers(struct websocket_message *wm, uint64_t room_id, uint64_t except) { + LOCK(&janus_lock); + + struct janus_room *room = g_hash_table_lookup(janus_rooms, &room_id); + if (!room) + return; + if (!room->session) + return; + + GHashTableIter iter; + gpointer key, value; + g_hash_table_iter_init(&iter, room->publishers); + + while (g_hash_table_iter_next(&iter, &key, &value)) { + uint64_t *handle = key; + if (*handle == except) + continue; + + uint64_t *feed = value; + + JsonBuilder *event = json_builder_new(); + json_builder_begin_object(event); // { + json_builder_set_member_name(event, "janus"); + json_builder_add_string_value(event, "event"); + json_builder_set_member_name(event, "session_id"); + json_builder_add_int_value(event, room->session->id); + json_builder_set_member_name(event, "sender"); + json_builder_add_int_value(event, *handle); // destination of notification + json_builder_set_member_name(event, "plugindata"); + json_builder_begin_object(event); // { + json_builder_set_member_name(event, "plugin"); + json_builder_add_string_value(event, "janus.plugin.videoroom"); + json_builder_set_member_name(event, "data"); + json_builder_begin_object(event); // { + json_builder_set_member_name(event, "videoroom"); + json_builder_add_string_value(event, "event"); + json_builder_set_member_name(event, "room"); + json_builder_add_int_value(event, room_id); + json_builder_set_member_name(event, "publishers"); + janus_publishers_list(event, room, *feed); + json_builder_end_object(event); // } + json_builder_end_object(event); // } + json_builder_end_object(event); // } + + janus_send_json_msg(wm, event, 0, false); + } +} + + +static const char *janus_videoroom_configure(struct websocket_message *wm, struct janus_session *session, + const char *jsep_type, const char *jsep_sdp, + const char *transaction, + struct janus_handle *handle, JsonBuilder *builder, JsonReader *reader, const char **successp, + int *retcode, + char **jsep_type_out, str *jsep_sdp_out, + uint64_t room_id) +{ + janus_send_ack(wm, transaction, session->id); + + *retcode = 456; + if (!json_reader_read_member(reader, "feed")) + return "JSON object does not contain 'message.feed' key"; + //uint64_t feed_id = jr_str_int(reader); // needed? + if (!room_id) + return "JSON object does not contain 'message.room' key"; + json_reader_end_member(reader); + +// bool is_audio = true; +// if (json_reader_read_member(reader, "audio")) +// is_audio = json_reader_get_boolean_value(reader); +// json_reader_end_member(reader); + +// bool is_video = true; +// if (json_reader_read_member(reader, "video")) +// is_video = json_reader_get_boolean_value(reader); +// json_reader_end_member(reader); + + *retcode = 512; + + if (handle->room != room_id || handle->type != HANDLE_TYPE_PUBLISHER) + return "Not a publisher"; + if (!jsep_type || !jsep_sdp) + return "No SDP"; + if (strcmp(jsep_type, "offer")) + return "Not an offer"; + + AUTO_CLEANUP(str sdp_in, str_free_dup) = STR_NULL; + str_init_dup(&sdp_in, jsep_sdp); + + AUTO_CLEANUP(struct sdp_ng_flags flags, call_ng_free_flags); + AUTO_CLEANUP(GQueue parsed, sdp_free) = G_QUEUE_INIT; + AUTO_CLEANUP(GQueue streams, sdp_streams_free) = G_QUEUE_INIT; + call_ng_flags_init(&flags, OP_PUBLISH); + *retcode = 512; + if (sdp_parse(&sdp_in, &parsed, &flags)) + return "Failed to parse SDP"; + if (sdp_streams(&parsed, &streams, &flags)) + return "Incomplete SDP specificiation"; + + AUTO_CLEANUP_NULL(struct call *call, call_unlock_release); + + { + LOCK(&janus_lock); + + struct janus_room *room = g_hash_table_lookup(janus_rooms, &room_id); + if (room && room->session != session) + room = NULL; + *retcode = 426; + if (!room) + return "No such room"; + call = call_get(&room->call_id); + // XXX if call is destroyed separately, room persists -> room should be destroyed too + if (!call) + return "No such room"; + } + + AUTO_CLEANUP_GBUF(handle_buf); + handle_buf = g_strdup_printf("%" PRIu64, handle->id); + str handle_str; + str_init(&handle_str, handle_buf); + struct call_monologue *ml = call_get_or_create_monologue(call, &handle_str); + + // accept unsupported codecs if necessary + flags.accept_any = 1; + + int ret = monologue_publish(ml, &streams, &flags); + if (ret) + return "Publish error"; + + // XXX check there's only one audio and one video stream? + + AUTO_CLEANUP(str sdp_out, str_free_dup) = STR_NULL; + ret = sdp_create(&sdp_out, ml, &flags); + if (!ret) { + save_last_sdp(ml, &sdp_in, &parsed, &streams); + *jsep_sdp_out = sdp_out; + sdp_out = STR_NULL; // ownership passed to output + + *jsep_type_out = "answer"; + + *successp = "event"; + json_builder_set_member_name(builder, "videoroom"); + json_builder_add_string_value(builder, "event"); + json_builder_set_member_name(builder, "room"); + json_builder_add_int_value(builder, room_id); + json_builder_set_member_name(builder, "configured"); + json_builder_add_string_value(builder, "ok"); + + for (GList *l = ml->medias.head; l; l = l->next) { + struct call_media *media = l->data; + const char *ent = NULL; + if (media->type_id == MT_AUDIO) + ent = "audio_codec"; + else if (media->type_id == MT_VIDEO) + ent = "video_codec"; + else + continue; + + const char *codec = NULL; + for (GList *k = media->codecs.codec_prefs.head; k; k = k->next) { + struct rtp_payload_type *pt = k->data; + codec = pt->encoding.s; + // XXX check codec support? + break; + } + if (!codec) + continue; + + json_builder_set_member_name(builder, ent); + json_builder_add_string_value(builder, codec); + } + + janus_notify_publishers(wm, room_id, handle->id); + } + + return NULL; +} + + +static const char *janus_videoroom_start(struct websocket_message *wm, struct janus_session *session, + const char *jsep_type, const char *jsep_sdp, + const char *transaction, + struct janus_handle *handle, JsonBuilder *builder, JsonReader *reader, const char **successp, + int *retcode, + uint64_t room_id) +{ + janus_send_ack(wm, transaction, session->id); + + *retcode = 456; + if (!json_reader_read_member(reader, "feed")) + return "JSON object does not contain 'message.feed' key"; + uint64_t feed_id = jr_str_int(reader); // needed? + if (!feed_id) + return "JSON object does not contain 'message.feed' key"; + if (!room_id) + return "JSON object does not contain 'message.room' key"; + json_reader_end_member(reader); + + if (handle->room != room_id || handle->type != HANDLE_TYPE_SUBSCRIBER) + return "Not a subscriber"; + if (!jsep_type || !jsep_sdp) + return "No SDP"; + if (strcmp(jsep_type, "answer")) + return "Not an answer"; + + AUTO_CLEANUP(str sdp_in, str_free_dup) = STR_NULL; + str_init_dup(&sdp_in, jsep_sdp); + + AUTO_CLEANUP(struct sdp_ng_flags flags, call_ng_free_flags); + AUTO_CLEANUP(GQueue parsed, sdp_free) = G_QUEUE_INIT; + AUTO_CLEANUP(GQueue streams, sdp_streams_free) = G_QUEUE_INIT; + call_ng_flags_init(&flags, OP_PUBLISH); + *retcode = 512; + if (sdp_parse(&sdp_in, &parsed, &flags)) + return "Failed to parse SDP"; + if (sdp_streams(&parsed, &streams, &flags)) + return "Incomplete SDP specificiation"; + + AUTO_CLEANUP_NULL(struct call *call, call_unlock_release); + + { + LOCK(&janus_lock); + + struct janus_room *room = g_hash_table_lookup(janus_rooms, &room_id); + if (room && room->session != session) + room = NULL; + *retcode = 426; + if (!room) + return "No such room"; + call = call_get(&room->call_id); + if (!call) + return "No such room"; + + *retcode = 512; + uint64_t *feed_handle = g_hash_table_lookup(janus_feeds, &feed_id); + if (!feed_handle) + return "No such feed exists"; + + AUTO_CLEANUP_GBUF(source_handle_buf); + source_handle_buf = g_strdup_printf("%" PRIu64, *feed_handle); + str source_handle_str; + str_init(&source_handle_str, source_handle_buf); + struct call_monologue *source_ml = call_get_monologue(call, &source_handle_str); + if (!source_ml) + return "Feed not found"; + + AUTO_CLEANUP_GBUF(dest_handle_buf); + dest_handle_buf = g_strdup_printf("%" PRIu64, handle->id); + str dest_handle_str; + str_init(&dest_handle_str, dest_handle_buf); + struct call_monologue *dest_ml = call_get_monologue(call, &dest_handle_str); + if (!dest_ml) + return "Subscriber not found"; + + int ret = monologue_subscribe_answer(source_ml, dest_ml, &flags, &streams); + if (ret) + return "Failed to process subscription answer"; + } + + *successp = "event"; + json_builder_set_member_name(builder, "videoroom"); + json_builder_add_string_value(builder, "event"); + json_builder_set_member_name(builder, "room"); + json_builder_add_int_value(builder, room_id); + json_builder_set_member_name(builder, "started"); + json_builder_add_string_value(builder, "ok"); + + return NULL; +} + + +// session is locked, which also locks handle +static const char *janus_videoroom(struct websocket_message *wm, struct janus_session *session, + const char *jsep_type, const char *jsep_sdp, + const char *transaction, + struct janus_handle *handle, JsonBuilder *builder, JsonReader *reader, const char **successp, + int *retcodep, char **jsep_type_out, str *jsep_sdp_out) +{ + uint64_t room_id = 0; + + if (json_reader_read_member(reader, "room")) + room_id = jr_str_int(reader); + json_reader_end_member(reader); + + int retcode = 456; + const char *err = "JSON object does not contain 'message.request' key"; + if (!json_reader_read_member(reader, "request")) + goto err; + const char *req = json_reader_get_string_value(reader); + if (!req) + goto err; + str req_str; + str_init(&req_str, (char *) req); + json_reader_end_member(reader); + + switch (__csh_lookup(&req_str)) { + case CSH_LOOKUP("create"): + err = janus_videoroom_create(session, handle, builder, reader, &retcode); + break; + + case CSH_LOOKUP("exists"): + err = janus_videoroom_exists(session, builder, room_id); + break; + + case CSH_LOOKUP("destroy"): + err = janus_videoroom_destroy(session, builder, &retcode, room_id); + break; + + case CSH_LOOKUP("join"): + err = janus_videoroom_join(wm, session, transaction, handle, builder, reader, successp, + &retcode, jsep_type_out, jsep_sdp_out, room_id); + break; + + case CSH_LOOKUP("configure"): + err = janus_videoroom_configure(wm, session, jsep_type, jsep_sdp, transaction, + handle, builder, reader, successp, &retcode, jsep_type_out, jsep_sdp_out, + room_id); + break; + + case CSH_LOOKUP("start"): + err = janus_videoroom_start(wm, session, jsep_type, jsep_sdp, transaction, + handle, builder, reader, successp, + &retcode, room_id); + break; + + default: + retcode = 423; + err = "Unknown videoroom request"; + break; + } + +err: + if (err) + *retcodep = retcode; + return err; +} + + +const char *janus_add_token(JsonReader *reader, JsonBuilder *builder, bool authorised, int *retcode) { + *retcode = 403; + if (!authorised) + return "Janus 'admin_secret' key not provided or incorrect"; + + const char *token = NULL; + if (json_reader_read_member(reader, "token")) + token = json_reader_get_string_value(reader); + json_reader_end_member(reader); + + *retcode = 456; + if (!token) + return "JSON object does not contain 'token' key"; + + time_t *now = g_malloc(sizeof(*now)); + *now = rtpe_now.tv_sec; + mutex_lock(&janus_lock); + g_hash_table_replace(janus_tokens, g_strdup(token), now); + mutex_unlock(&janus_lock); + + json_builder_set_member_name(builder, "data"); + json_builder_begin_object(builder); // { + json_builder_set_member_name(builder, "plugins"); + json_builder_begin_array(builder); // [ + json_builder_add_string_value(builder, "janus.plugin.videoroom"); + json_builder_end_array(builder); // ] + json_builder_end_object(builder); // } + + return NULL; +} + + +const char *janus_create(JsonReader *reader, JsonBuilder *builder, struct websocket_message *wm) { + uint64_t session_id = 0; + if (json_reader_read_member(reader, "id")) + session_id = jr_str_int(reader); + json_reader_end_member(reader); + + struct janus_session *session = obj_alloc0("janus_session", sizeof(*session), __janus_session_free); + mutex_init(&session->lock); + session->last_act = rtpe_now.tv_sec; + session->websockets = g_hash_table_new(g_direct_hash, g_direct_equal); + session->handles = g_hash_table_new(g_int64_hash, g_int64_equal); + + g_hash_table_insert(session->websockets, wm->wc, wm->wc); + + do { + while (!session_id) + session_id = janus_random(); + + mutex_lock(&janus_lock); + if (g_hash_table_lookup(janus_sessions, &session_id)) + session_id = 0; // pick a random one + else { + session->id = session_id; + g_hash_table_insert(janus_sessions, &session->id, obj_get(session)); + } + mutex_unlock(&janus_lock); + } + while (!session_id); + + ilog(LOG_INFO, "Created new Janus session with ID %" PRIu64, session_id); + + websocket_conn_add_session(wm->wc, obj_get(session)); + + json_builder_set_member_name(builder, "data"); + json_builder_begin_object(builder); // { + json_builder_set_member_name(builder, "id"); + json_builder_add_int_value(builder, session_id); + json_builder_end_object(builder); // } + + return NULL; +} + + +void janus_detach_websocket(struct janus_session *session, struct websocket_conn *wc) { + LOCK(&session->lock); + g_hash_table_remove(session->websockets, wc); +} + + +// call is locked in some way +void janus_media_up(struct call_monologue *ml) { + struct call *call = ml->call; + struct janus_session *session = call->janus_session; + if (!session) + return; + + // the monologue tag is the handle ID + uint64_t handle = str_to_ui(&ml->tag, 0); + if (!handle) + return; + + // build json + + JsonBuilder *builder = json_builder_new(); + json_builder_begin_object(builder); // { + json_builder_set_member_name(builder, "janus"); + json_builder_add_string_value(builder, "webrtcup"); + json_builder_set_member_name(builder, "session_id"); + json_builder_add_int_value(builder, session->id); + json_builder_set_member_name(builder, "sender"); + json_builder_add_int_value(builder, handle); + json_builder_end_object(builder); // } + + JsonGenerator *gen = json_generator_new(); + JsonNode *root = json_builder_get_root(builder); + json_generator_set_root(gen, root); + char *result = json_generator_to_data(gen, NULL); + + json_node_free(root); + g_object_unref(gen); + g_object_unref(builder); + + LOCK(&session->lock); + + GHashTableIter iter; + gpointer value; + g_hash_table_iter_init(&iter, session->websockets); + + while (g_hash_table_iter_next(&iter, NULL, &value)) { + struct websocket_conn *wc = value; + websocket_write_text(wc, result, true); + } + + g_free(result); +} + + +const char *janus_attach(JsonReader *reader, JsonBuilder *builder, struct janus_session *session, int *retcode) { + *retcode = 458; + if (!session) + return "Session ID not found"; + // verify the plugin + *retcode = 456; + if (!json_reader_read_member(reader, "plugin")) + return "No plugin given"; + const char *plugin = json_reader_get_string_value(reader); + if (!plugin) + return "No plugin given"; + *retcode = 460; + if (strcmp(plugin, "janus.plugin.videoroom")) + return "Unsupported plugin"; + json_reader_end_member(reader); + + struct janus_handle *handle = g_slice_alloc0(sizeof(*handle)); + mutex_lock(&janus_lock); + handle->session = session->id; + uint64_t handle_id = 0; + while (1) { + handle_id = handle->id = janus_random(); + if (g_hash_table_lookup(janus_handles, &handle->id)) + continue; + g_hash_table_insert(janus_handles, &handle->id, (void *) 0x1); + break; + } + mutex_unlock(&janus_lock); + + mutex_lock(&session->lock); + assert(g_hash_table_lookup(session->handles, &handle_id) == NULL); + g_hash_table_insert(session->handles, &handle->id, handle); + mutex_unlock(&session->lock); + // handle is now owned by session + + json_builder_set_member_name(builder, "data"); + json_builder_begin_object(builder); // { + json_builder_set_member_name(builder, "id"); + json_builder_add_int_value(builder, handle_id); + json_builder_end_object(builder); // } + + return NULL; +} + + +const char *janus_detach(struct websocket_message *wm, JsonReader *reader, JsonBuilder *builder, + struct janus_session *session, + uint64_t handle_id, int *retcode) +{ + *retcode = 458; + if (!session) + return "Session ID not found"; + *retcode = 457; + if (!handle_id) + return "Unhandled request method"; + + uint64_t room_id = 0; + + { + LOCK(&session->lock); + struct janus_handle *handle = g_hash_table_lookup(session->handles, &handle_id); + + *retcode = 463; + if (!handle) + return "Could not detach handle from plugin"; + + room_id = handle->room; + + // destroy handle + g_hash_table_remove(session->handles, &handle_id); + g_slice_free1(sizeof(*handle), handle); + } + + { + LOCK(&janus_lock); + + if (room_id) { + struct janus_room *room = g_hash_table_lookup(janus_rooms, &room_id); + if (room) { + uint64_t *feed = g_hash_table_lookup(room->publishers, &handle_id); + if (feed) { + // was a publisher - send notify + + GHashTableIter iter; + gpointer key; + g_hash_table_iter_init(&iter, room->publishers); + + while (g_hash_table_iter_next(&iter, &key, NULL)) { + uint64_t *pub_handle = key; + + if (*pub_handle == handle_id) // skip self + continue; + + JsonBuilder *event = json_builder_new(); + json_builder_begin_object(event); // { + json_builder_set_member_name(event, "janus"); + json_builder_add_string_value(event, "event"); + json_builder_set_member_name(event, "session_id"); + json_builder_add_int_value(event, room->session->id); + json_builder_set_member_name(event, "sender"); + json_builder_add_int_value(event, *pub_handle); + json_builder_set_member_name(event, "plugindata"); + json_builder_begin_object(event); // { + json_builder_set_member_name(event, "plugin"); + json_builder_add_string_value(event, "janus.plugin.videoroom"); + json_builder_set_member_name(event, "data"); + json_builder_begin_object(event); // { + json_builder_set_member_name(event, "videoroom"); + json_builder_add_string_value(event, "event"); + json_builder_set_member_name(event, "room"); + json_builder_add_int_value(event, room_id); + json_builder_set_member_name(event, "unpublished"); + json_builder_add_int_value(event, *feed); + json_builder_end_object(event); // } + json_builder_end_object(event); // } + json_builder_end_object(event); // } + + janus_send_json_msg(wm, event, 0, false); + + event = json_builder_new(); + json_builder_begin_object(event); // { + json_builder_set_member_name(event, "janus"); + json_builder_add_string_value(event, "event"); + json_builder_set_member_name(event, "session_id"); + json_builder_add_int_value(event, room->session->id); + json_builder_set_member_name(event, "sender"); + json_builder_add_int_value(event, *pub_handle); + json_builder_set_member_name(event, "plugindata"); + json_builder_begin_object(event); // { + json_builder_set_member_name(event, "plugin"); + json_builder_add_string_value(event, "janus.plugin.videoroom"); + json_builder_set_member_name(event, "data"); + json_builder_begin_object(event); // { + json_builder_set_member_name(event, "videoroom"); + json_builder_add_string_value(event, "event"); + json_builder_set_member_name(event, "room"); + json_builder_add_int_value(event, room_id); + json_builder_set_member_name(event, "leaving"); + json_builder_add_int_value(event, *feed); + json_builder_end_object(event); // } + json_builder_end_object(event); // } + json_builder_end_object(event); // } + + janus_send_json_msg(wm, event, 0, false); + } + + struct call *call = call_get(&room->call_id); + if (call) { + // remove publisher monologue + AUTO_CLEANUP_GBUF(handle_buf); + handle_buf = g_strdup_printf("%" PRIu64, handle_id); + str handle_str; + str_init(&handle_str, handle_buf); + struct call_monologue *ml = call_get_or_create_monologue(call, + &handle_str); + if (ml) + monologue_destroy(ml); + + rwlock_unlock_w(&call->master_lock); + obj_put(call); + } + + g_hash_table_remove(room->publishers, &handle_id); + feed = NULL; + } + + if (g_hash_table_remove(room->subscribers, &handle_id)) { + // was a subscriber + struct call *call = call_get(&room->call_id); + if (call) { + // remove subscriber monologue + AUTO_CLEANUP_GBUF(handle_buf); + handle_buf = g_strdup_printf("%" PRIu64, handle_id); + str handle_str; + str_init(&handle_str, handle_buf); + struct call_monologue *ml = call_get_or_create_monologue(call, + &handle_str); + if (ml) + monologue_destroy(ml); + + rwlock_unlock_w(&call->master_lock); + obj_put(call); + } + } + } + } + } + + return NULL; +} + + +const char *janus_message(struct websocket_message *wm, JsonReader *reader, JsonBuilder *builder, + struct janus_session *session, + const char *transaction, + uint64_t handle_id, + const char **successp, + int *retcode) +{ + // we only pretend to support one plugin so ignore the handle + // and just go straight to the message + *retcode = 458; + if (!session) + return "Session ID not found"; + *retcode = 457; + if (!handle_id) + return "No plugin handle given"; + + const char *jsep_type = NULL, *jsep_sdp = NULL; + if (json_reader_read_member(reader, "jsep")) { + if (json_reader_read_member(reader, "type")) + jsep_type = json_reader_get_string_value(reader); + json_reader_end_member(reader); + if (json_reader_read_member(reader, "sdp")) + jsep_sdp = json_reader_get_string_value(reader); + json_reader_end_member(reader); + } + json_reader_end_member(reader); + + *retcode = 456; + if (!json_reader_read_member(reader, "body")) + return "JSON object does not contain 'body' key"; + + json_builder_set_member_name(builder, "plugindata"); + json_builder_begin_object(builder); // { + json_builder_set_member_name(builder, "plugin"); + json_builder_add_string_value(builder, "janus.plugin.videoroom"); + json_builder_set_member_name(builder, "data"); + json_builder_begin_object(builder); // { + + char *jsep_type_out = NULL; + str jsep_sdp_out = STR_NULL; + + mutex_lock(&session->lock); + + struct janus_handle *handle = g_hash_table_lookup(session->handles, &handle_id); + + const char *err = NULL; + if (!handle) { + *retcode = 457; + err = "No plugin handle given or invalid handle"; + } + else + err = janus_videoroom(wm, session, jsep_type, jsep_sdp, transaction, handle, + builder, reader, successp, retcode, &jsep_type_out, + &jsep_sdp_out); + + mutex_unlock(&session->lock); + + json_builder_end_object(builder); // } + json_builder_end_object(builder); // } + + if (jsep_type_out && jsep_sdp_out.len) { + json_builder_set_member_name(builder, "jsep"); + json_builder_begin_object(builder); // { + json_builder_set_member_name(builder, "type"); + json_builder_add_string_value(builder, jsep_type_out); + json_builder_set_member_name(builder, "sdp"); + json_builder_add_string_value(builder, jsep_sdp_out.s); + json_builder_end_object(builder); // } + } + + str_free_dup(&jsep_sdp_out); + + return err; + +} + + +const char *janus_trickle(JsonReader *reader, struct janus_session *session, uint64_t handle_id, + const char **successp, int *retcode) +{ + *retcode = 458; + if (!session) + return "Session ID not found"; + *retcode = 457; + if (!handle_id) + return "Unhandled request method"; + + *retcode = 456; + if (!json_reader_read_member(reader, "candidate")) + return "JSON object does not contain 'candidate' key"; + + if (!json_reader_read_member(reader, "candidate")) + return "ICE candidate string missing"; + const char *candidate = json_reader_get_string_value(reader); + if (!candidate) + return "ICE candidate string missing"; + json_reader_end_member(reader); + + const char *ufrag = NULL; + if (json_reader_read_member(reader, "usernameFragment")) + ufrag = json_reader_get_string_value(reader); + json_reader_end_member(reader); + + const char *sdp_mid = NULL; + int64_t sdp_m_line = -1; + + if (json_reader_read_member(reader, "sdpMid")) + sdp_mid = json_reader_get_string_value(reader); + json_reader_end_member(reader); + + if (json_reader_read_member(reader, "sdpMLineIndex")) { + // make sure what we're reading is an int + JsonNode *node = json_reader_get_value(reader); + if (node && json_node_get_value_type(node) == G_TYPE_INT64) + sdp_m_line = json_node_get_int(node); + } + json_reader_end_member(reader); + + json_reader_end_member(reader); + + if (!sdp_mid && sdp_m_line < 0) + return "Neither sdpMid nor sdpMLineIndex given"; + + // fetch call and monologue + + uint64_t room_id = 0; + { + LOCK(&session->lock); + + struct janus_handle *handle = g_hash_table_lookup(session->handles, &handle_id); + + if (!handle) + return "Unhandled request method"; + + room_id = handle->room; + } + + AUTO_CLEANUP_NULL(struct call *call, call_unlock_release); + { + LOCK(&janus_lock); + + struct janus_room *room = g_hash_table_lookup(janus_rooms, &room_id); + + *retcode = 426; + if (!room || room->session != session) + return "No such room"; + call = call_get(&room->call_id); + if (!call) + return "No such room"; + } + + AUTO_CLEANUP_GBUF(handle_buf); + handle_buf = g_strdup_printf("%" PRIu64, handle_id); + str handle_str; + str_init(&handle_str, handle_buf); + struct call_monologue *ml = call_get_monologue(call, &handle_str); + if (!ml) + return "Handle not found in room"; + + // find our media section + struct call_media *media = NULL; + if (sdp_mid) { + str sdp_mid_str = STR_CONST_INIT_LEN((char *) sdp_mid, strlen(sdp_mid)); + media = g_hash_table_lookup(ml->media_ids, &sdp_mid_str); + } + if (!media && sdp_m_line >= 0) + media = g_queue_peek_nth(&ml->medias, sdp_m_line); + + *retcode = 466; + if (!media) + return "No matching media"; + if (!media->ice_agent) + return "Media is not ICE-enabled"; + + // parse candidate + str cand_str = STR_CONST_INIT_LEN((char *) candidate, strlen(candidate)); + str_shift_cmp(&cand_str, "candidate:"); // skip prefix + if (!cand_str.len) { + // end of candidates + } + else { + struct ice_candidate cand; + *retcode = 466; + int ret = sdp_parse_candidate(&cand, &cand_str); + if (ret < 0) + return "Failed to parse trickle candidate"; + + if (ret == 0) { + // do the actual ICE update + struct stream_params sp = { + .ice_ufrag = cand.ufrag, + .index = media->index, + }; + if (!sp.ice_ufrag.len && ufrag) + str_init(&sp.ice_ufrag, (char *) ufrag); + g_queue_push_tail(&sp.ice_candidates, &cand); + + ice_update(media->ice_agent, &sp); + + g_queue_clear(&sp.ice_candidates); + } + } + + *successp = "ack"; + return NULL; +} + + +const char *websocket_janus_process(struct websocket_message *wm) { + JsonParser *parser = NULL; + JsonReader *reader = NULL; + const char *err = NULL; + int retcode = 200; + const char *transaction = NULL; + const char *success = "success"; + uint64_t session_id = 0; + uint64_t handle_id = 0; + struct janus_session *session = NULL; + + ilog(LOG_DEBUG, "Processing Janus message: '%.*s'", (int) wm->body->len, wm->body->str); + + // prepare response + JsonBuilder *builder = json_builder_new(); + json_builder_begin_object(builder); // { + + // start parsing message + parser = json_parser_new(); + + retcode = 454; + err = "Failed to parse JSON"; + if (!json_parser_load_from_data(parser, wm->body->str, wm->body->len, NULL)) + goto err; + reader = json_reader_new(json_parser_get_root(parser)); + if (!reader) + goto err; + + retcode = 455; + err = "JSON string is not an object"; + if (!json_reader_is_object(reader)) + goto err; + + retcode = 456; + err = "JSON object does not contain 'janus' key"; + if (!json_reader_read_member(reader, "janus")) + goto err; + const char *janus_cmd = json_reader_get_string_value(reader); + err = "'janus' key does not contain a string"; + if (!janus_cmd) + goto err; + json_reader_end_member(reader); + + retcode = 456; + err = "JSON object does not contain 'transaction' key"; + if (!json_reader_read_member(reader, "transaction")) + goto err; + transaction = json_reader_get_string_value(reader); + err = "'transaction' key does not contain a string"; + if (!janus_cmd) + goto err; + json_reader_end_member(reader); + + bool authorised = false; + + if (json_reader_read_member(reader, "admin_secret")) { + const char *admin_secret = json_reader_get_string_value(reader); + if (janus_cmd && rtpe_config.janus_secret && !strcmp(admin_secret, rtpe_config.janus_secret)) + authorised = true; + } + json_reader_end_member(reader); + + if (json_reader_read_member(reader, "session_id")) + session_id = jr_str_int(reader); + json_reader_end_member(reader); + + if (session_id) + session = janus_get_session(session_id); + + if (json_reader_read_member(reader, "handle_id")) + handle_id = jr_str_int(reader); + json_reader_end_member(reader); + + ilog(LOG_DEBUG, "Processing '%s' type Janus message", janus_cmd); + + str janus_cmd_str; + str_init(&janus_cmd_str, (char *) janus_cmd); + + err = NULL; + + switch (__csh_lookup(&janus_cmd_str)) { + case CSH_LOOKUP("add_token"): + err = janus_add_token(reader, builder, authorised, &retcode); + break; + + case CSH_LOOKUP("ping"): + success = "pong"; + break; + + case CSH_LOOKUP("keepalive"): + if (!session) { + retcode = 458; + err = "Session ID not found"; + } + else + success = "ack"; + break; + + case CSH_LOOKUP("info"): + success = "server_info"; + json_builder_set_member_name(builder, "name"); + json_builder_add_string_value(builder, "rtpengine Janus interface"); + json_builder_set_member_name(builder, "version_string"); + json_builder_add_string_value(builder, RTPENGINE_VERSION); + json_builder_set_member_name(builder, "plugins"); + json_builder_begin_object(builder); // { + json_builder_set_member_name(builder, "janus.plugin.videoroom"); + json_builder_begin_object(builder); // { + json_builder_set_member_name(builder, "name"); + json_builder_add_string_value(builder, "rtpengine Janus videoroom"); + json_builder_end_object(builder); // } + json_builder_end_object(builder); // } + break; + + case CSH_LOOKUP("create"): // create new session + err = janus_create(reader, builder, wm); + session_id = 0; // don't add it to the reply + break; + + case CSH_LOOKUP("attach"): // attach to a plugin, obtains handle + err = janus_attach(reader, builder, session, &retcode); + break; + + case CSH_LOOKUP("detach"): + err = janus_detach(wm, reader, builder, session, handle_id, &retcode); + break; + + case CSH_LOOKUP("message"): + err = janus_message(wm, reader, builder, session, transaction, handle_id, &success, + &retcode); + break; + + case CSH_LOOKUP("trickle"): + err = janus_trickle(reader, session, handle_id, &success, &retcode); + handle_id = 0; // don't include sender + break; + + default: + retcode = 457; + err = "Unhandled request method"; + goto err; + } + + // done + +err: + json_builder_set_member_name(builder, "janus"); + if (err) { + json_builder_add_string_value(builder, "error"); + + json_builder_set_member_name(builder, "error"); + json_builder_begin_object(builder); // { + json_builder_set_member_name(builder, "code"); + json_builder_add_int_value(builder, retcode); + json_builder_set_member_name(builder, "reason"); + json_builder_add_string_value(builder, err); + json_builder_end_object(builder); // } + + ilog(LOG_WARN, "Janus processing returning error (code %i): %s", retcode, err); + } + else + json_builder_add_string_value(builder, success); + + if (transaction) { + json_builder_set_member_name(builder, "transaction"); + json_builder_add_string_value(builder, transaction); + } + if (session_id) { + json_builder_set_member_name(builder, "session_id"); + json_builder_add_int_value(builder, session_id); + } + if (handle_id) { + json_builder_set_member_name(builder, "sender"); + json_builder_add_int_value(builder, handle_id); + } + json_builder_end_object(builder); // } + + err = janus_send_json_msg(wm, builder, 200, true); + + if (reader) + g_object_unref(reader); + if (parser) + g_object_unref(parser); + if (session) + obj_put(session); + + return err; +} + + +void janus_init(void) { + mutex_init(&janus_lock); + janus_tokens = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, g_free); + janus_sessions = g_hash_table_new(g_int64_hash, g_int64_equal); + janus_handles = g_hash_table_new(g_int64_hash, g_int64_equal); + janus_rooms = g_hash_table_new(g_int64_hash, g_int64_equal); + janus_feeds = g_hash_table_new_full(g_int64_hash, g_int64_equal, g_free, g_free); + // XXX timer thread to clean up orphaned sessions +} +void janus_free(void) { + mutex_destroy(&janus_lock); + g_hash_table_destroy(janus_tokens); + g_hash_table_destroy(janus_sessions); + g_hash_table_destroy(janus_handles); + g_hash_table_destroy(janus_rooms); + g_hash_table_destroy(janus_feeds); +} diff --git a/daemon/main.c b/daemon/main.c index c8f48c90f..db7b3399a 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -54,6 +54,7 @@ #include "websocket.h" #include "codec.h" #include "mqtt.h" +#include "janus.h" @@ -560,6 +561,7 @@ static void options(int *argc, char ***argv) { #ifdef SO_INCOMING_CPU { "socket-cpu-affinity",0,0,G_OPTION_ARG_INT, &rtpe_config.cpu_affinity,"CPU affinity for media sockets","INT"}, #endif + { "janus-secret", 0,0, G_OPTION_ARG_STRING, &rtpe_config.janus_secret,"Admin secret for Janus protocol","STRING"}, { NULL, } }; @@ -976,6 +978,7 @@ static void options_free(void) { g_free(rtpe_config.mqtt_certfile); g_free(rtpe_config.mqtt_keyfile); g_free(rtpe_config.mqtt_publish_topic); + g_free(rtpe_config.janus_secret); // free common config options config_load_free(&rtpe_config.common); @@ -1024,6 +1027,7 @@ static void init_everything(void) { if (rtpe_config.mqtt_host && mqtt_init()) abort(); codecs_init(); + janus_init(); } @@ -1298,10 +1302,9 @@ int main(int argc, char **argv) { redis_close(rtpe_redis_notify); free_prefix(); - options_free(); - log_free(); + janus_free(); obj_release(rtpe_cli); obj_release(rtpe_udp); diff --git a/daemon/media_socket.c b/daemon/media_socket.c index 49babf989..3c64bedde 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -29,6 +29,7 @@ #include "jitter_buffer.h" #include "dtmf.h" #include "mqtt.h" +#include "janus.h" #ifndef PORT_RANDOM_MIN @@ -2281,7 +2282,10 @@ static int stream_packet(struct packet_handler_ctx *phc) { phc->mp.raw = phc->s; // XXX separate stats for received/sent - atomic64_inc(&phc->mp.stream->stats.packets); + if (atomic64_inc(&phc->mp.stream->stats.packets) == 0) { + if (phc->mp.stream->component == 1 && phc->mp.media->index == 1) + janus_media_up(phc->mp.media->monologue); + } atomic64_add(&phc->mp.stream->stats.bytes, phc->s.len); atomic64_set(&phc->mp.stream->last_packet, rtpe_now.tv_sec); RTPE_STATS_INC(packets, 1); diff --git a/daemon/sdp.c b/daemon/sdp.c index 72f6f995f..6027ca8dd 100644 --- a/daemon/sdp.c +++ b/daemon/sdp.c @@ -2023,9 +2023,19 @@ static int synth_session_connection(struct sdp_chopper *chop, struct sdp_media * } void sdp_chopper_destroy(struct sdp_chopper *chop) { - g_string_free(chop->output, TRUE); + if (chop->output) + g_string_free(chop->output, TRUE); g_slice_free1(sizeof(*chop), chop); } +void sdp_chopper_destroy_ret(struct sdp_chopper *chop, str *ret) { + *ret = STR_NULL; + if (chop->output) { + str_init_len(ret, chop->output->str, chop->output->len); + g_string_free(chop->output, FALSE); + chop->output = NULL; + } + sdp_chopper_destroy(chop); +} static int process_session_attributes(struct sdp_chopper *chop, struct sdp_attributes *attrs, struct sdp_ng_flags *flags) diff --git a/daemon/websocket.c b/daemon/websocket.c index e7fb0f2b1..77614b2e9 100644 --- a/daemon/websocket.c +++ b/daemon/websocket.c @@ -8,6 +8,7 @@ #include "cli.h" #include "control_ng.h" #include "statistics.h" +#include "janus.h" struct websocket_message; @@ -32,6 +33,7 @@ struct websocket_conn { unsigned int jobs; GQueue messages; cond_t cond; + GHashTable *janus_sessions; // output buffer - also protected by lock GQueue output_q; @@ -546,6 +548,8 @@ static int websocket_http_body(struct websocket_conn *wc, const char *body, size if (!strcmp(uri, "/ng") && wm->method == M_POST && wm->content_type == CT_NG) handler = websocket_http_ng; + else if (!strcmp(uri, "/admin") && wm->method == M_POST && wm->content_type == CT_JSON) + handler = websocket_janus_process; if (!handler) { ilogs(http, LOG_WARN, "Unhandled HTTP POST URI: '%s'", wm->uri); @@ -568,6 +572,20 @@ static void websocket_conn_cleanup(struct websocket_conn *wc) { mutex_lock(&wc->lock); while (wc->jobs) cond_wait(&wc->cond, &wc->lock); + + // detach all Janus sessions + if (wc->janus_sessions) { + GHashTableIter iter; + g_hash_table_iter_init(&iter, wc->janus_sessions); + gpointer key; + while (g_hash_table_iter_next(&iter, &key, NULL)) { + janus_detach_websocket(key, wc); + __obj_put(key); + } + g_hash_table_destroy(wc->janus_sessions); + wc->janus_sessions = NULL; + } + mutex_unlock(&wc->lock); assert(wc->messages.length == 0); @@ -593,6 +611,14 @@ static int websocket_conn_init(struct lws *wsi, void *p) { if (!wc) return -1; + memset(wc, 0, sizeof(*wc)); + wc->wsi = wsi; + mutex_init(&wc->lock); + cond_init(&wc->cond); + g_queue_init(&wc->messages); + g_queue_push_tail(&wc->output_q, websocket_output_new()); + wc->janus_sessions = g_hash_table_new(g_direct_hash, g_direct_equal); + struct sockaddr_storage sa = {0,}; socklen_t sl = sizeof(sa); #if LWS_LIBRARY_VERSION_MAJOR >= 3 @@ -633,6 +659,16 @@ static int websocket_conn_init(struct lws *wsi, void *p) { } +void websocket_conn_add_session(struct websocket_conn *wc, struct janus_session *s) { + mutex_lock(&wc->lock); + if (wc->janus_sessions) { + assert(g_hash_table_lookup(wc->janus_sessions, s) == NULL); + g_hash_table_insert(wc->janus_sessions, s, s); + } + mutex_unlock(&wc->lock); +} + + static int websocket_do_http(struct lws *wsi, struct websocket_conn *wc, const char *uri) { ilogs(http, LOG_DEBUG, "HTTP request start: %s", uri); @@ -787,6 +823,11 @@ static int websocket_protocol(struct lws *wsi, enum lws_callback_reasons reason, } +static int websocket_janus(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, + size_t len) +{ + return websocket_protocol(wsi, reason, user, in, len, websocket_janus_process, "janus-protocol"); +} static int websocket_rtpengine_echo(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) { @@ -810,6 +851,11 @@ static const struct lws_protocols websocket_protocols[] = { .callback = websocket_http, .per_session_data_size = sizeof(struct websocket_conn), }, + { + .name = "janus-protocol", + .callback = websocket_janus, + .per_session_data_size = sizeof(struct websocket_conn), + }, { .name = "echo.rtpengine.com", .callback = websocket_rtpengine_echo, diff --git a/debian/control b/debian/control index 15148c190..6ce145eef 100644 --- a/debian/control +++ b/debian/control @@ -40,6 +40,8 @@ Build-Depends: libxmlrpc-core-c3-dev (>= 1.16.07), libxtables-dev (>= 1.4) | iptables-dev (>= 1.4), markdown, + python3, + python3-websockets, zlib1g-dev, Package: ngcp-rtpengine-daemon diff --git a/include/call.h b/include/call.h index a8ee9874e..62b127581 100644 --- a/include/call.h +++ b/include/call.h @@ -218,6 +218,7 @@ struct jitter_buffer; struct codec_tracker; struct rtcp_timer; struct mqtt_timer; +struct janus_session; typedef bencode_buffer_t call_buffer_t; @@ -491,6 +492,7 @@ struct call { GQueue endpoint_maps; struct dtls_cert *dtls_cert; /* for outgoing */ struct mqtt_timer *mqtt_timer; + struct janus_session *janus_session; str callid; struct timeval created; @@ -575,6 +577,7 @@ int monologue_subscribe_request(struct call_monologue *src, struct call_monologu int monologue_subscribe_answer(struct call_monologue *src, struct call_monologue *dst, struct sdp_ng_flags *, GQueue *); int monologue_unsubscribe(struct call_monologue *src, struct call_monologue *dst, struct sdp_ng_flags *); +int monologue_destroy(struct call_monologue *ml); int call_delete_branch(const str *callid, const str *branch, const str *fromtag, const str *totag, bencode_item_t *output, int delete_delay); void call_destroy(struct call *); diff --git a/include/call_interfaces.h b/include/call_interfaces.h index be19de40e..99156263f 100644 --- a/include/call_interfaces.h +++ b/include/call_interfaces.h @@ -187,6 +187,11 @@ const char *call_subscribe_request_ng(bencode_item_t *, bencode_item_t *); const char *call_subscribe_answer_ng(bencode_item_t *, bencode_item_t *); const char *call_unsubscribe_ng(bencode_item_t *, bencode_item_t *); +void save_last_sdp(struct call_monologue *ml, str *sdp, GQueue *parsed, GQueue *streams); +void call_ng_flags_init(struct sdp_ng_flags *out, enum call_opmode opmode); +void call_ng_free_flags(struct sdp_ng_flags *flags); +void call_unlock_release(struct call **c); + int call_interfaces_init(void); void call_interfaces_free(void); void call_interfaces_timer(void); diff --git a/include/janus.h b/include/janus.h new file mode 100644 index 000000000..daa62567b --- /dev/null +++ b/include/janus.h @@ -0,0 +1,18 @@ +#ifndef __JANUS_H__ +#define __JANUS_H__ + +struct websocket_conn; +struct websocket_message; +struct janus_session; +struct call_monologue; + + +void janus_init(void); +void janus_free(void); + +const char *websocket_janus_process(struct websocket_message *wm); +void janus_detach_websocket(struct janus_session *session, struct websocket_conn *wc); +void janus_media_up(struct call_monologue *); + + +#endif diff --git a/include/main.h b/include/main.h index d852dcc69..62bc10c3f 100644 --- a/include/main.h +++ b/include/main.h @@ -148,6 +148,7 @@ struct rtpengine_config { MOS_LQ, } mos; int cpu_affinity; + char *janus_secret; }; diff --git a/include/sdp.h b/include/sdp.h index 857b50470..520cf1b8c 100644 --- a/include/sdp.h +++ b/include/sdp.h @@ -32,6 +32,7 @@ int sdp_parse_candidate(struct ice_candidate *cand, const str *s); // returns -1 struct sdp_chopper *sdp_chopper_new(str *input); void sdp_chopper_destroy(struct sdp_chopper *chop); +void sdp_chopper_destroy_ret(struct sdp_chopper *chop, str *ret); INLINE int is_trickle_ice_address(const struct endpoint *ep) { if (is_addr_unspecified(&ep->address) && ep->port == 9) diff --git a/include/websocket.h b/include/websocket.h index 08c943077..88eca60bc 100644 --- a/include/websocket.h +++ b/include/websocket.h @@ -8,6 +8,7 @@ struct websocket_conn; struct websocket_message; enum lws_write_protocol; +struct janus_session; typedef const char *(*websocket_message_func_t)(struct websocket_message *); @@ -52,4 +53,7 @@ size_t websocket_queue_len(struct websocket_conn *wc); int websocket_http_response(struct websocket_conn *wc, int status, const char *content_type, ssize_t content_length); +// mark a janus session as owned by this transport +void websocket_conn_add_session(struct websocket_conn *, struct janus_session *); + #endif diff --git a/t/.gitignore b/t/.gitignore index 71cded739..3e81fffa3 100644 --- a/t/.gitignore +++ b/t/.gitignore @@ -68,3 +68,6 @@ tcp_listener.c test-kernel-module test-resample mqtt.c +cli.c +janus.c +websocket.c diff --git a/t/Makefile b/t/Makefile index 16e3ed66a..7a6c9d4ff 100644 --- a/t/Makefile +++ b/t/Makefile @@ -22,6 +22,7 @@ CFLAGS+= $(shell pkg-config --cflags spandsp) CFLAGS+= -DWITH_TRANSCODING CFLAGS+= $(shell pkg-config --cflags zlib) CFLAGS+= $(shell pkg-config --cflags json-glib-1.0) +CFLAGS+= $(shell pkg-config --cflags libwebsockets) CFLAGS+= $(shell pkg-config --cflags libevent_pthreads) CFLAGS+= $(shell pkg-config xmlrpc_client --cflags 2> /dev/null || xmlrpc-c-config client --cflags) CFLAGS+= $(shell pkg-config xmlrpc --cflags 2> /dev/null) @@ -49,6 +50,7 @@ LDLIBS+= $(shell pkg-config --libs libavfilter) LDLIBS+= $(shell pkg-config --libs spandsp) LDLIBS+= $(shell pkg-config --libs zlib) LDLIBS+= $(shell pkg-config --libs json-glib-1.0) +LDLIBS+= $(shell pkg-config --libs libwebsockets) LDLIBS+= -lpcap LDLIBS+= $(shell pkg-config --libs libevent_pthreads) LDLIBS+= $(shell pkg-config xmlrpc_client --libs 2> /dev/null || xmlrpc-c-config client --libs) @@ -74,8 +76,8 @@ LIBSRCS+= codeclib.c resample.c socket.c streambuf.c dtmflib.c DAEMONSRCS+= codec.c call.c ice.c kernel.c media_socket.c stun.c bencode.c poller.c \ dtls.c recording.c statistics.c rtcp.c redis.c iptables.c graphite.c \ cookie_cache.c udp_listener.c homer.c load.c cdr.c dtmf.c timerthread.c \ - media_player.c jitter_buffer.c t38.c tcp_listener.c mqtt.c -HASHSRCS+= call_interfaces.c control_ng.c sdp.c + media_player.c jitter_buffer.c t38.c tcp_listener.c mqtt.c websocket.c cli.c +HASHSRCS+= call_interfaces.c control_ng.c sdp.c janus.c endif OBJS= $(SRCS:.c=.o) $(LIBSRCS:.c=.o) $(DAEMONSRCS:.c=.o) $(HASHSRCS:.c=.strhash.o) @@ -111,7 +113,7 @@ daemon-tests: tests-preload.so $(MAKE) -C ../daemon $(MAKE) all-daemon-tests -all-daemon-tests: daemon-tests-main daemon-tests-jb daemon-tests-pubsub +all-daemon-tests: daemon-tests-main daemon-tests-jb daemon-tests-pubsub daemon-tests-websocket daemon-tests-main: rm -rf fake-$@-sockets @@ -153,6 +155,14 @@ daemon-tests-pubsub: test "$$(ls fake-$@-sockets)" = "" rmdir fake-$@-sockets +daemon-tests-websocket: + rm -rf fake-$@-sockets + mkdir fake-$@-sockets + LD_PRELOAD=../t/tests-preload.so RTPE_BIN=../daemon/rtpengine TEST_SOCKET_PATH=./fake-$@-sockets \ + python3 auto-daemon-tests-websocket.py + test "$$(ls fake-$@-sockets)" = "" + rmdir fake-$@-sockets + test-bitstr: test-bitstr.o spandsp_send_fax_pcm: spandsp_send_fax_pcm.o @@ -180,7 +190,8 @@ test-transcode: test-transcode.o $(COMMONOBJS) codeclib.o resample.o codec.o ssr rtcp.o redis.o iptables.o graphite.o call_interfaces.strhash.o sdp.strhash.o rtp.o crypto.o \ control_ng.strhash.o \ streambuf.o cookie_cache.o udp_listener.o homer.o load.o cdr.o dtmf.o timerthread.o \ - media_player.o jitter_buffer.o dtmflib.o t38.o tcp_listener.o mqtt.o + media_player.o jitter_buffer.o dtmflib.o t38.o tcp_listener.o mqtt.o janus.strhash.o websocket.o \ + cli.o test-resample: test-resample.o $(COMMONOBJS) codeclib.o resample.o dtmflib.o diff --git a/t/auto-daemon-tests-websocket.py b/t/auto-daemon-tests-websocket.py new file mode 100644 index 000000000..61daab2c5 --- /dev/null +++ b/t/auto-daemon-tests-websocket.py @@ -0,0 +1,2075 @@ +import asyncio +import json +import os +import re +import socket +import ssl +import subprocess +import sys +import tempfile +import traceback +import unittest +import uuid + +import websockets + + +async def get_ws(cls, proto): + for _ in range(1, 300): + try: + cls._ws = await websockets.connect( + "ws://localhost:9191/", subprotocols=[proto] + ) + break + except: + await asyncio.sleep(0.1) + + +async def testIO(self, msg): + await self._ws.send(msg) + self._res = await asyncio.wait_for(self._ws.recv(), timeout=10) + + +async def testIOJson(self, msg): + await self._ws.send(json.dumps(msg)) + self._res = await asyncio.wait_for(self._ws.recv(), timeout=10) + self._res = json.loads(self._res) + + +async def testIJson(self): + self._res = await asyncio.wait_for(self._ws.recv(), timeout=10) + self._res = json.loads(self._res) + + +async def testIJanus(self): + self._res = await asyncio.wait_for(self._ws.recv(), timeout=10) + self._res = json.loads(self._res) + self.assertEqual(self._res["transaction"], self._trans) + del self._res["transaction"] + + +async def testIOJanus(self, msg): + trans = str(uuid.uuid4()) + msg["transaction"] = trans + self._trans = trans + await self._ws.send(json.dumps(msg)) + await testIJanus(self) + + +async def testOJanus(self, msg): + trans = str(uuid.uuid4()) + msg["transaction"] = trans + self._trans = trans + await self._ws.send(json.dumps(msg)) + + +class TestWSEcho(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls._eventloop = asyncio.get_event_loop() + cls._eventloop.run_until_complete(get_ws(cls, "echo.rtpengine.com")) + + def testEcho(self): + self._eventloop.run_until_complete(testIO(self, b"foobar")) + self.assertEqual(self._res, b"foobar") + + def testEchoText(self): + self._eventloop.run_until_complete(testIO(self, "foobar")) + self.assertEqual(self._res, b"foobar") + + +class TestWSCli(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls._eventloop = asyncio.get_event_loop() + cls._eventloop.run_until_complete(get_ws(cls, "cli.rtpengine.com")) + + def testListNumsessions(self): + # race condition here if this runs at the same as the janus test (creates call) + self._eventloop.run_until_complete(testIO(self, "list numsessions")) + self.assertEqual( + self._res, + b"Current sessions own: 0\n" + + b"Current sessions foreign: 0\n" + + b"Current sessions total: 0\n" + + b"Current transcoded media: 0\n" + + b"Current sessions ipv4 only media: 0\n" + + b"Current sessions ipv6 only media: 0\n" + + b"Current sessions ip mixed media: 0\n", + ) + + +class TestWSJanus(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls._eventloop = asyncio.get_event_loop() + cls._eventloop.run_until_complete(get_ws(cls, "janus-protocol")) + + def testPing(self): + self._eventloop.run_until_complete( + testIOJson(self, {"janus": "ping", "transaction": "test123"}) + ) + self.assertEqual(self._res, {"janus": "pong", "transaction": "test123"}) + + def testPingNoTS(self): + self._eventloop.run_until_complete(testIOJson(self, {"janus": "ping"})) + self.assertEqual( + self._res, + { + "janus": "error", + "error": { + "code": 456, + "reason": "JSON object does not contain 'transaction' key", + }, + }, + ) + + def testInfo(self): + self._eventloop.run_until_complete( + testIOJson(self, {"janus": "info", "transaction": "foobar"}) + ) + # ignore version string + self.assertTrue("version_string" in self._res) + del self._res["version_string"] + self.assertEqual( + self._res, + { + "janus": "server_info", + "name": "rtpengine Janus interface", + "plugins": { + "janus.plugin.videoroom": {"name": "rtpengine Janus videoroom"} + }, + "transaction": "foobar", + }, + ) + + +class TestVideoroom(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls._eventloop = asyncio.get_event_loop() + cls._eventloop.run_until_complete(get_ws(cls, "janus-protocol")) + + def startSession(self): + self.maxDiff = None + + token = str(uuid.uuid4()) + + self._eventloop.run_until_complete( + testIOJanus( + self, + { + "janus": "add_token", + "token": token, + "admin_secret": "dfgdfgdvgLyATjHPvckg", + }, + ) + ) + self.assertEqual( + self._res, + {"janus": "success", "data": {"plugins": ["janus.plugin.videoroom"]}}, + ) + + # create session + self._eventloop.run_until_complete( + testIOJanus( + self, + { + "janus": "create", + "token": token, + "admin_secret": "dfgdfgdvgLyATjHPvckg", + }, + ) + ) + session = self._res["data"]["id"] + self.assertIsInstance(session, int) + self.assertEqual(self._res, {"janus": "success", "data": {"id": session}}) + + return (token, session) + + def startVideoroom(self): + (token, session) = self.startSession() + + handle = self.createHandle(token, session) + + # create room + self._eventloop.run_until_complete( + testIOJanus( + self, + { + "janus": "message", + "body": {"request": "create", "publishers": 16}, + "handle_id": handle, + "session_id": session, + "token": token, + }, + ) + ) + room = self._res["plugindata"]["data"]["room"] + self.assertIsInstance(room, int) + self.assertNotEqual(room, handle) + self.assertNotEqual(room, session) + self.assertEqual( + self._res, + { + "janus": "success", + "session_id": session, + "sender": handle, + "plugindata": { + "plugin": "janus.plugin.videoroom", + "data": { + "videoroom": "created", + "room": room, + "permanent": False, + }, + }, + }, + ) + + return (token, session, handle, room) + + def destroyVideoroom(self, token, session, handle, room): + self._eventloop.run_until_complete( + testIOJanus( + self, + { + "janus": "message", + "body": {"request": "destroy", "room": room}, + "handle_id": handle, + "session_id": session, + "token": token, + }, + ) + ) + self.assertNotEqual(room, handle) + self.assertNotEqual(room, session) + self.assertEqual( + self._res, + { + "janus": "success", + "session_id": session, + "sender": handle, + "plugindata": { + "plugin": "janus.plugin.videoroom", + "data": { + "videoroom": "destroyed", + "room": room, + "permanent": False, + }, + }, + }, + ) + + def createHandle(self, token, session): + self._eventloop.run_until_complete( + testIOJanus( + self, + { + "janus": "attach", + "plugin": "janus.plugin.videoroom", + "session_id": session, + "token": token, + "opaque_id": None, + }, + ) + ) + handle = self._res["data"]["id"] + self.assertIsInstance(handle, int) + self.assertNotEqual(handle, session) + self.assertEqual( + self._res, + {"janus": "success", "session_id": session, "data": {"id": handle}}, + ) + + return handle + + def createPublisher(self, token, session, room, handle, pubs=[]): + self._eventloop.run_until_complete( + testIOJanus( + self, + { + "janus": "message", + "body": {"request": "join", "ptype": "publisher", "room": room}, + "handle_id": handle, + "session_id": session, + "token": token, + }, + ) + ) + # ack is received first + self.assertEqual(self._res, {"janus": "ack", "session_id": session}) + # followed by the joined event + self._eventloop.run_until_complete(testIJanus(self)) + feed = self._res["plugindata"]["data"]["id"] + self.assertIsInstance(feed, int) + self.assertNotEqual(feed, session) + self.assertNotEqual(feed, room) + self.assertNotEqual(feed, handle) + self.assertEqual( + self._res, + { + "janus": "event", + "session_id": session, + "sender": handle, + "plugindata": { + "plugin": "janus.plugin.videoroom", + "data": { + "videoroom": "joined", + "room": room, + "id": feed, + "publishers": pubs, + }, + }, + }, + ) + + return feed + + def testKeepalive(self): + (token, session) = self.startSession() + + self._eventloop.run_until_complete( + testIOJanus( + self, {"janus": "keepalive", "token": token, "session_id": session} + ) + ) + self.assertEqual(self._res, {"janus": "ack", "session_id": session}) + + def testVideoroomWebRTC(self): + (token, session, control_handle, room) = self.startVideoroom() + + # timeout test + self._eventloop.run_until_complete(asyncio.sleep(3)) + + self._eventloop.run_until_complete( + testIOJanus( + self, + { + "janus": "message", + "body": { + "request": "exists", + "room": room, + }, + "handle_id": control_handle, + "session_id": session, + "token": token, + }, + ) + ) + self.assertEqual( + self._res, + { + "janus": "success", + "session_id": session, + "sender": control_handle, + "plugindata": { + "plugin": "janus.plugin.videoroom", + "data": { + "videoroom": "success", + "room": room, + "exists": True, + }, + }, + }, + ) + + pub_handle = self.createHandle(token, session) + self.assertNotEqual(pub_handle, control_handle) + + feed = self.createPublisher(token, session, room, pub_handle) + self.assertNotEqual(feed, control_handle) + + # publish as plain RTP + self._eventloop.run_until_complete( + testIOJanus( + self, + { + "janus": "message", + "body": { + "request": "configure", + "room": room, + "feed": feed, + "data": False, + "audio": True, + "video": True, + }, + "jsep": { + "type": "offer", + "sdp": ( + "v=0\r\n" + "o=x 123 123 IN IP4 203.0.113.3\r\n" + "c=IN IP4 203.0.113.2\r\n" + "s=foobar\r\n" + "t=0 0\r\n" + "m=audio 8000 RTP/AVP 8 0\r\n" + "a=sendonly\r\n" + ), + }, + "handle_id": pub_handle, + "session_id": session, + "token": token, + }, + ) + ) + # ack is received first + self.assertEqual(self._res, {"janus": "ack", "session_id": session}) + # followed by the event notification + self._eventloop.run_until_complete(testIJanus(self)) + sdp = self._res["jsep"]["sdp"] + self.assertIsInstance(sdp, str) + self.assertRegex( + sdp, + re.compile( + "^v=0\r\n" + "o=- \d+ \d+ IN IP4 203.0.113.1\r\n" + "s=rtpengine.*?\r\n" + "t=0 0\r\n" + "m=audio \d+ RTP/AVP 8\r\n" + "c=IN IP4 203.0.113.1\r\n" + "a=rtpmap:8 PCMA/8000\r\n" + "a=recvonly\r\n" + "a=rtcp:\d+\r\n$", + re.DOTALL, + ), + ) + self.assertEqual( + self._res, + { + "janus": "event", + "session_id": session, + "sender": pub_handle, + "plugindata": { + "plugin": "janus.plugin.videoroom", + "data": { + "videoroom": "event", + "room": room, + "configured": "ok", + "audio_codec": "PCMA", + }, + }, + "jsep": {"type": "answer", "sdp": sdp}, + }, + ) + + sub_handle = self.createHandle(token, session) + self.assertNotEqual(sub_handle, pub_handle) + self.assertNotEqual(sub_handle, control_handle) + + # subscriber expects full WebRTC attributes + self._eventloop.run_until_complete( + testIOJanus( + self, + { + "janus": "message", + "body": { + "request": "join", + "ptype": "subscriber", + "room": room, + "feed": feed, + }, + "handle_id": sub_handle, + "session_id": session, + "token": token, + }, + ) + ) + # ack is received first + self.assertEqual(self._res, {"janus": "ack", "session_id": session}) + # followed by the attached event + self._eventloop.run_until_complete(testIJanus(self)) + self.assertEqual(feed, self._res["plugindata"]["data"]["id"]) + self.assertNotEqual(feed, control_handle) + self.assertNotEqual(feed, session) + self.assertNotEqual(feed, room) + self.assertNotEqual(feed, pub_handle) + self.assertNotEqual(feed, sub_handle) + sdp = self._res["jsep"]["sdp"] + self.assertIsInstance(sdp, str) + self.assertRegex( + sdp, + re.compile( + "^v=0\r\n" + "o=x 123 123 IN IP4 203.0.113.3\r\n" + "c=IN IP4 203.0.113.1\r\n" + "s=foobar\r\n" + "t=0 0\r\n" + "m=audio \d+ UDP/TLS/RTP/SAVPF 8\r\n" + "a=mid:1\r\n" + "a=rtpmap:8 PCMA/8000\r\n" + "a=sendonly\r\n" + "a=rtcp-mux\r\n" + "a=setup:actpass\r\n" + "a=fingerprint:sha-256 .{95}\r\n" + "a=ice-ufrag:.{8}\r\n" + "a=ice-pwd:.{26}\r\n" + "a=ice-options:trickle\r\n" + "a=candidate:.{16} 1 UDP 2130706431 203.0.113.1 \d+ typ host\r\n" + "a=end-of-candidates\r\n$", + re.DOTALL, + ), + ) + self.assertEqual( + self._res, + { + "janus": "event", + "session_id": session, + "sender": sub_handle, + "plugindata": { + "plugin": "janus.plugin.videoroom", + "data": { + "videoroom": "attached", + "room": room, + "id": feed, + }, + }, + "jsep": {"type": "offer", "sdp": sdp}, + }, + ) + + # subscriber #1 answer + self._eventloop.run_until_complete( + testIOJanus( + self, + { + "janus": "message", + "body": {"request": "start", "room": room, "feed": feed}, + "jsep": { + "type": "answer", + "sdp": ( + "v=0\r\n" + "o=x 123 123 IN IP4 203.0.113.2\r\n" + "c=IN IP4 0.0.0.0\r\n" + "s=foobar\r\n" + "t=0 0\r\n" + "m=audio 9 RTP/AVP 8\r\n" + "a=mid:audio\r\n" + "a=ice-ufrag:abcd\r\n" + "a=ice-pwd:WD1pLsdgsdfsdWuEBb0vjyZr\r\n" + "a=ice-options:trickle\r\n" + "a=rtcp-mux\r\n" + "a=recvonly\r\n" + ), + }, + "handle_id": sub_handle, + "session_id": session, + "token": token, + }, + ) + ) + # ack is received first + self.assertEqual(self._res, {"janus": "ack", "session_id": session}) + # followed by the attached event + self._eventloop.run_until_complete(testIJanus(self)) + self.assertEqual( + self._res, + { + "janus": "event", + "session_id": session, + "sender": sub_handle, + "plugindata": { + "plugin": "janus.plugin.videoroom", + "data": { + "videoroom": "event", + "started": "ok", + "room": room, + }, + }, + }, + ) + + self.destroyVideoroom(token, session, control_handle, room) + + self._eventloop.run_until_complete( + testIOJanus( + self, + { + "janus": "message", + "body": { + "request": "exists", + "room": room, + }, + "handle_id": control_handle, + "session_id": session, + "token": token, + }, + ) + ) + self.assertEqual( + self._res, + { + "janus": "success", + "session_id": session, + "sender": control_handle, + "plugindata": { + "plugin": "janus.plugin.videoroom", + "data": { + "videoroom": "success", + "room": room, + "exists": False, + }, + }, + }, + ) + + def testVideoroomSDESDTLS(self): + (token, session, control_handle, room) = self.startVideoroom() + + pub_handle = self.createHandle(token, session) + self.assertNotEqual(pub_handle, control_handle) + + feed = self.createPublisher(token, session, room, pub_handle) + self.assertNotEqual(feed, control_handle) + + self._eventloop.run_until_complete( + testIOJanus( + self, + { + "janus": "message", + "body": { + "request": "configure", + "room": room, + "feed": feed, + "data": False, + "audio": True, + "video": True, + }, + "jsep": { + "type": "offer", + "sdp": ( + "v=0\r\n" + "o=x 123 123 IN IP4 203.0.113.5\r\n" + "c=IN IP4 203.0.113.4\r\n" + "s=foobar\r\n" + "t=0 0\r\n" + "m=audio 30000 RTP/SAVP 8 0 96\r\n" + "a=rtpmap:96 opus/48000\r\n" + "a=crypto:1 AES_CM_128_HMAC_SHA1_80 inline:cJOJ7kxQjhFBp2fP6AYjs3vKw7CeBdWZCj0isbJv\r\n" + "a=crypto:2 AES_CM_128_HMAC_SHA1_32 inline:VAzLKvoE3jG9cdH/AZsl/ZqWNXrUzyM4Gw6chrFr\r\n" + "a=crypto:3 AES_256_CM_HMAC_SHA1_80 inline:8AbZePWwsKhLGX3GlXA+yHYPQ3cgraer/9DkFJYCOPZZy3o9wC0NIbIFYZfyHw==\r\n" + "a=crypto:4 AES_256_CM_HMAC_SHA1_32 inline:2GLk3p/csdno4KlGO1TxCVaEt+bifmDlQ5NjnCb5cJYPURiGRSTBEtEq37db8g==\r\n" + "a=fingerprint:sha-256 1A:20:98:16:CA:26:8C:33:62:0B:70:94:73:A0:9B:30:00:1A:EA:26:FC:7D:84:8B:F1:F9:52:2D:A7:92:C5:3D\r\n" + "a=setup:actpass\r\n" + "a=sendonly\r\n" + ), + }, + "handle_id": pub_handle, + "session_id": session, + "token": token, + }, + ) + ) + # ack is received first + self.assertEqual(self._res, {"janus": "ack", "session_id": session}) + # followed by the event notification + self._eventloop.run_until_complete(testIJanus(self)) + sdp = self._res["jsep"]["sdp"] + self.assertIsInstance(sdp, str) + self.assertRegex( + sdp, + re.compile( + "^v=0\r\n" + "o=- \d+ \d+ IN IP4 203.0.113.1\r\n" + "s=rtpengine.*?\r\n" + "t=0 0\r\n" + "m=audio \d+ RTP/SAVP 8\r\n" + "c=IN IP4 203.0.113.1\r\n" + "a=rtpmap:8 PCMA/8000\r\n" + "a=recvonly\r\n" + "a=rtcp:\d+\r\n" + "a=setup:active\r\n" + "a=fingerprint:sha-256 .{95}\r\n$", + re.DOTALL, + ), + ) + self.assertEqual( + self._res, + { + "janus": "event", + "session_id": session, + "sender": pub_handle, + "plugindata": { + "plugin": "janus.plugin.videoroom", + "data": { + "videoroom": "event", + "room": room, + "configured": "ok", + "audio_codec": "PCMA", + }, + }, + "jsep": {"type": "answer", "sdp": sdp}, + }, + ) + + self.destroyVideoroom(token, session, control_handle, room) + + def testVideoroomSDES(self): + (token, session, control_handle, room) = self.startVideoroom() + + pub_handle = self.createHandle(token, session) + self.assertNotEqual(pub_handle, control_handle) + + feed = self.createPublisher(token, session, room, pub_handle) + self.assertNotEqual(feed, control_handle) + + self._eventloop.run_until_complete( + testIOJanus( + self, + { + "janus": "message", + "body": { + "request": "configure", + "room": room, + "feed": feed, + "data": False, + "audio": True, + "video": True, + }, + "jsep": { + "type": "offer", + "sdp": ( + "v=0\r\n" + "o=x 123 123 IN IP4 203.0.113.5\r\n" + "c=IN IP4 203.0.113.4\r\n" + "s=foobar\r\n" + "t=0 0\r\n" + "m=audio 30000 RTP/SAVP 8 0 96\r\n" + "a=rtpmap:96 opus/48000\r\n" + "a=crypto:1 AES_CM_128_HMAC_SHA1_80 inline:cJOJ7kxQjhFBp2fP6AYjs3vKw7CeBdWZCj0isbJv\r\n" + "a=crypto:2 AES_CM_128_HMAC_SHA1_32 inline:VAzLKvoE3jG9cdH/AZsl/ZqWNXrUzyM4Gw6chrFr\r\n" + "a=crypto:3 AES_256_CM_HMAC_SHA1_80 inline:8AbZePWwsKhLGX3GlXA+yHYPQ3cgraer/9DkFJYCOPZZy3o9wC0NIbIFYZfyHw==\r\n" + "a=crypto:4 AES_256_CM_HMAC_SHA1_32 inline:2GLk3p/csdno4KlGO1TxCVaEt+bifmDlQ5NjnCb5cJYPURiGRSTBEtEq37db8g==\r\n" + "a=sendonly\r\n" + ), + }, + "handle_id": pub_handle, + "session_id": session, + "token": token, + }, + ) + ) + # ack is received first + self.assertEqual(self._res, {"janus": "ack", "session_id": session}) + # followed by the event notification + self._eventloop.run_until_complete(testIJanus(self)) + sdp = self._res["jsep"]["sdp"] + self.assertIsInstance(sdp, str) + self.assertRegex( + sdp, + re.compile( + "^v=0\r\n" + "o=- \d+ \d+ IN IP4 203.0.113.1\r\n" + "s=rtpengine.*?\r\n" + "t=0 0\r\n" + "m=audio \d+ RTP/SAVP 8\r\n" + "c=IN IP4 203.0.113.1\r\n" + "a=rtpmap:8 PCMA/8000\r\n" + "a=recvonly\r\n" + "a=rtcp:\d+\r\n" + "a=crypto:1 AES_CM_128_HMAC_SHA1_80 inline:.{40}\r\n", + re.DOTALL, + ), + ) + self.assertEqual( + self._res, + { + "janus": "event", + "session_id": session, + "sender": pub_handle, + "plugindata": { + "plugin": "janus.plugin.videoroom", + "data": { + "videoroom": "event", + "room": room, + "configured": "ok", + "audio_codec": "PCMA", + }, + }, + "jsep": {"type": "answer", "sdp": sdp}, + }, + ) + + self.destroyVideoroom(token, session, control_handle, room) + + def testVideoroomDTLS(self): + (token, session, control_handle, room) = self.startVideoroom() + + pub_handle = self.createHandle(token, session) + self.assertNotEqual(pub_handle, control_handle) + + feed = self.createPublisher(token, session, room, pub_handle) + self.assertNotEqual(feed, control_handle) + + self._eventloop.run_until_complete( + testIOJanus( + self, + { + "janus": "message", + "body": { + "request": "configure", + "room": room, + "feed": feed, + "data": False, + "audio": True, + "video": True, + }, + "jsep": { + "type": "offer", + "sdp": ( + "v=0\r\n" + "o=x 123 123 IN IP4 203.0.113.5\r\n" + "c=IN IP4 203.0.113.4\r\n" + "s=foobar\r\n" + "t=0 0\r\n" + "m=audio 30000 UDP/TLS/RTP/SAVPF 8 0 96\r\n" + "a=mid:audio\r\n" + "a=rtpmap:96 opus/48000\r\n" + "a=fingerprint:sha-256 1A:20:98:16:CA:26:8C:33:62:0B:70:94:73:A0:9B:30:00:1A:EA:26:FC:7D:84:8B:F1:F9:52:2D:A7:92:C5:3D\r\n" + "a=setup:actpass\r\n" + "a=sendonly\r\n" + ), + }, + "handle_id": pub_handle, + "session_id": session, + "token": token, + }, + ) + ) + # ack is received first + self.assertEqual(self._res, {"janus": "ack", "session_id": session}) + # followed by the event notification + self._eventloop.run_until_complete(testIJanus(self)) + sdp = self._res["jsep"]["sdp"] + self.assertIsInstance(sdp, str) + self.assertRegex( + sdp, + re.compile( + "^v=0\r\n" + "o=- \d+ \d+ IN IP4 203.0.113.1\r\n" + "s=rtpengine.*?\r\n" + "t=0 0\r\n" + "m=audio \d+ UDP/TLS/RTP/SAVPF 8\r\n" + "c=IN IP4 203.0.113.1\r\n" + "a=mid:audio\r\n" + "a=rtpmap:8 PCMA/8000\r\n" + "a=recvonly\r\n" + "a=rtcp:\d+\r\n" + "a=setup:active\r\n" + "a=fingerprint:sha-256 .{95}\r\n$", + re.DOTALL, + ), + ) + self.assertEqual( + self._res, + { + "janus": "event", + "session_id": session, + "sender": pub_handle, + "plugindata": { + "plugin": "janus.plugin.videoroom", + "data": { + "videoroom": "event", + "room": room, + "configured": "ok", + "audio_codec": "PCMA", + }, + }, + "jsep": {"type": "answer", "sdp": sdp}, + }, + ) + + self.destroyVideoroom(token, session, control_handle, room) + + def testVideoroomWebrtcup(self): + (token, session, control_handle, room) = self.startVideoroom() + + pub_handle = self.createHandle(token, session) + self.assertNotEqual(pub_handle, control_handle) + + feed = self.createPublisher(token, session, room, pub_handle) + self.assertNotEqual(feed, control_handle) + + self._eventloop.run_until_complete( + testIOJanus( + self, + { + "janus": "message", + "body": { + "request": "configure", + "room": room, + "feed": feed, + "data": False, + "audio": True, + "video": True, + }, + "jsep": { + "type": "offer", + "sdp": ( + "v=0\r\n" + "o=x 123 123 IN IP4 203.0.113.4\r\n" + "c=IN IP4 203.0.113.4\r\n" + "s=foobar\r\n" + "t=0 0\r\n" + "m=audio 30000 RTP/AVP 8 0 96\r\n" + "a=mid:audio\r\n" + "a=rtpmap:96 opus/48000\r\n" + "a=sendonly\r\n" + ), + }, + "handle_id": pub_handle, + "session_id": session, + "token": token, + }, + ) + ) + # ack is received first + self.assertEqual(self._res, {"janus": "ack", "session_id": session}) + # followed by the event notification + self._eventloop.run_until_complete(testIJanus(self)) + sdp = self._res["jsep"]["sdp"] + self.assertIsInstance(sdp, str) + match_re = re.compile( + "^v=0\r\n" + "o=- \d+ \d+ IN IP4 203.0.113.1\r\n" + "s=rtpengine.*?\r\n" + "t=0 0\r\n" + "m=audio (\d+) RTP/AVP 8\r\n" + "c=IN IP4 203.0.113.1\r\n" + "a=mid:audio\r\n" + "a=rtpmap:8 PCMA/8000\r\n" + "a=recvonly\r\n" + "a=rtcp:\d+\r\n$", + re.DOTALL, + ) + self.assertRegex(sdp, match_re) + matches = match_re.search(sdp) + port = int(matches[1]) + self.assertEqual( + self._res, + { + "janus": "event", + "session_id": session, + "sender": pub_handle, + "plugindata": { + "plugin": "janus.plugin.videoroom", + "data": { + "videoroom": "event", + "room": room, + "configured": "ok", + "audio_codec": "PCMA", + }, + }, + "jsep": {"type": "answer", "sdp": sdp}, + }, + ) + + pub_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + pub_sock.settimeout(1) + pub_sock.bind(("203.0.113.4", 30000)) + pub_sock.connect(("203.0.113.1", port)) + + # send fake RTP to trigger event + m = pub_sock.send( + b"\x80\x08\x12\x34\x43\x32\x12\x45\x65\x45\x34\x23\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + ) + + # wait for webrtcup event + self._eventloop.run_until_complete(testIJson(self)) + self.assertEqual( + self._res, + {"janus": "webrtcup", "session_id": session, "sender": pub_handle}, + ) + + self.destroyVideoroom(token, session, control_handle, room) + pub_sock.close() + + def testVideoroomWebRTCVideo(self): + (token, session, control_handle, room) = self.startVideoroom() + + pub_handle = self.createHandle(token, session) + self.assertNotEqual(pub_handle, control_handle) + + feed = self.createPublisher(token, session, room, pub_handle) + self.assertNotEqual(feed, control_handle) + + self._eventloop.run_until_complete( + testIOJanus( + self, + { + "janus": "message", + "body": { + "request": "configure", + "room": room, + "feed": feed, + "data": False, + "audio": True, + "video": True, + }, + "jsep": { + "type": "offer", + "sdp": ( + "v=0\r\n" + "o=- 3959345330719813235 2 IN IP4 127.0.0.1\r\n" + "s=-\r\n" + "t=0 0\r\n" + "a=group:BUNDLE 0 1\r\n" + "a=extmap-allow-mixed\r\n" + "a=msid-semantic: WMS hJifdaJwqEqHxSG0pVbs1DrLAwiHqz7fKlqC\r\n" + "m=audio 9 UDP/TLS/RTP/SAVPF 111 103 104 9 0 8 106 105 13 110 112 113 126\r\n" + "c=IN IP4 0.0.0.0\r\n" + "a=rtcp:9 IN IP4 0.0.0.0\r\n" + "a=ice-ufrag:+JrN\r\n" + "a=ice-pwd:TMWORlSHr9fd+0bUNXnlBs5D\r\n" + "a=ice-options:trickle\r\n" + "a=fingerprint:sha-256 FD:56:1A:DB:3E:7B:8E:0B:75:4E:2E:49:1A:91:52:E4:69:9E:66:91:FF:34:A2:50:58:72:C0:8E:C2:87:CA:1F\r\n" + "a=setup:actpass\r\n" + "a=mid:0\r\n" + "a=extmap:1 urn:ietf:params:rtp-hdrext:ssrc-audio-level\r\n" + "a=extmap:2 http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time\r\n" + "a=extmap:3 http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01\r\n" + "a=extmap:4 urn:ietf:params:rtp-hdrext:sdes:mid\r\n" + "a=extmap:5 urn:ietf:params:rtp-hdrext:sdes:rtp-stream-id\r\n" + "a=extmap:6 urn:ietf:params:rtp-hdrext:sdes:repaired-rtp-stream-id\r\n" + "a=sendonly\r\n" + "a=msid:hJifdaJwqEqHxSG0pVbs1DrLAwiHqz7fKlqC 2de0f1b0-3a39-450e-9804-8305ec87452b\r\n" + "a=rtcp-mux\r\n" + "a=rtpmap:111 opus/48000/2\r\n" + "a=rtcp-fb:111 transport-cc\r\n" + "a=fmtp:111 minptime=10;useinbandfec=1\r\n" + "a=rtpmap:103 ISAC/16000\r\n" + "a=rtpmap:104 ISAC/32000\r\n" + "a=rtpmap:9 G722/8000\r\n" + "a=rtpmap:0 PCMU/8000\r\n" + "a=rtpmap:8 PCMA/8000\r\n" + "a=rtpmap:106 CN/32000\r\n" + "a=rtpmap:105 CN/16000\r\n" + "a=rtpmap:13 CN/8000\r\n" + "a=rtpmap:110 telephone-event/48000\r\n" + "a=rtpmap:112 telephone-event/32000\r\n" + "a=rtpmap:113 telephone-event/16000\r\n" + "a=rtpmap:126 telephone-event/8000\r\n" + "a=ssrc:677770262 cname:NMNDwVd66x2SfiO0\r\n" + "a=ssrc:677770262 msid:hJifdaJwqEqHxSG0pVbs1DrLAwiHqz7fKlqC 2de0f1b0-3a39-450e-9804-8305ec87452b\r\n" + "a=ssrc:677770262 mslabel:hJifdaJwqEqHxSG0pVbs1DrLAwiHqz7fKlqC\r\n" + "a=ssrc:677770262 label:2de0f1b0-3a39-450e-9804-8305ec87452b\r\n" + "m=video 9 UDP/TLS/RTP/SAVPF 96 97 98 99 100 101 102 121 127 120 125 107 108 109 35 36 124 119 123\r\n" + "c=IN IP4 0.0.0.0\r\n" + "a=rtcp:9 IN IP4 0.0.0.0\r\n" + "a=ice-ufrag:+JrN\r\n" + "a=ice-pwd:TMWORlSHr9fd+0bUNXnlBs5D\r\n" + "a=ice-options:trickle\r\n" + "a=fingerprint:sha-256 FD:56:1A:DB:3E:7B:8E:0B:75:4E:2E:49:1A:91:52:E4:69:9E:66:91:FF:34:A2:50:58:72:C0:8E:C2:87:CA:1F\r\n" + "a=setup:actpass\r\n" + "a=mid:1\r\n" + "a=extmap:14 urn:ietf:params:rtp-hdrext:toffset\r\n" + "a=extmap:2 http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time\r\n" + "a=extmap:13 urn:3gpp:video-orientation\r\n" + "a=extmap:3 http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01\r\n" + "a=extmap:12 http://www.webrtc.org/experiments/rtp-hdrext/playout-delay\r\n" + "a=extmap:11 http://www.webrtc.org/experiments/rtp-hdrext/video-content-type\r\n" + "a=extmap:7 http://www.webrtc.org/experiments/rtp-hdrext/video-timing\r\n" + "a=extmap:8 http://www.webrtc.org/experiments/rtp-hdrext/color-space\r\n" + "a=extmap:4 urn:ietf:params:rtp-hdrext:sdes:mid\r\n" + "a=extmap:5 urn:ietf:params:rtp-hdrext:sdes:rtp-stream-id\r\n" + "a=extmap:6 urn:ietf:params:rtp-hdrext:sdes:repaired-rtp-stream-id\r\n" + "a=sendonly\r\n" + "a=msid:hJifdaJwqEqHxSG0pVbs1DrLAwiHqz7fKlqC 6d6ec7a7-e3d7-4c82-b03c-45e017713abd\r\n" + "a=rtcp-mux\r\n" + "a=rtcp-rsize\r\n" + "a=rtpmap:96 VP8/90000\r\n" + "a=rtcp-fb:96 goog-remb\r\n" + "a=rtcp-fb:96 transport-cc\r\n" + "a=rtcp-fb:96 ccm fir\r\n" + "a=rtcp-fb:96 nack\r\n" + "a=rtcp-fb:96 nack pli\r\n" + "a=rtpmap:97 rtx/90000\r\n" + "a=fmtp:97 apt=96\r\n" + "a=rtpmap:98 VP9/90000\r\n" + "a=rtcp-fb:98 goog-remb\r\n" + "a=rtcp-fb:98 transport-cc\r\n" + "a=rtcp-fb:98 ccm fir\r\n" + "a=rtcp-fb:98 nack\r\n" + "a=rtcp-fb:98 nack pli\r\n" + "a=fmtp:98 profile-id=0\r\n" + "a=rtpmap:99 rtx/90000\r\n" + "a=fmtp:99 apt=98\r\n" + "a=rtpmap:100 VP9/90000\r\n" + "a=rtcp-fb:100 goog-remb\r\n" + "a=rtcp-fb:100 transport-cc\r\n" + "a=rtcp-fb:100 ccm fir\r\n" + "a=rtcp-fb:100 nack\r\n" + "a=rtcp-fb:100 nack pli\r\n" + "a=fmtp:100 profile-id=2\r\n" + "a=rtpmap:101 rtx/90000\r\n" + "a=fmtp:101 apt=100\r\n" + "a=rtpmap:102 H264/90000\r\n" + "a=rtcp-fb:102 goog-remb\r\n" + "a=rtcp-fb:102 transport-cc\r\n" + "a=rtcp-fb:102 ccm fir\r\n" + "a=rtcp-fb:102 nack\r\n" + "a=rtcp-fb:102 nack pli\r\n" + "a=fmtp:102 level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42001f\r\n" + "a=rtpmap:121 rtx/90000\r\n" + "a=fmtp:121 apt=102\r\n" + "a=rtpmap:127 H264/90000\r\n" + "a=rtcp-fb:127 goog-remb\r\n" + "a=rtcp-fb:127 transport-cc\r\n" + "a=rtcp-fb:127 ccm fir\r\n" + "a=rtcp-fb:127 nack\r\n" + "a=rtcp-fb:127 nack pli\r\n" + "a=fmtp:127 level-asymmetry-allowed=1;packetization-mode=0;profile-level-id=42001f\r\n" + "a=rtpmap:120 rtx/90000\r\n" + "a=fmtp:120 apt=127\r\n" + "a=rtpmap:125 H264/90000\r\n" + "a=rtcp-fb:125 goog-remb\r\n" + "a=rtcp-fb:125 transport-cc\r\n" + "a=rtcp-fb:125 ccm fir\r\n" + "a=rtcp-fb:125 nack\r\n" + "a=rtcp-fb:125 nack pli\r\n" + "a=fmtp:125 level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f\r\n" + "a=rtpmap:107 rtx/90000\r\n" + "a=fmtp:107 apt=125\r\n" + "a=rtpmap:108 H264/90000\r\n" + "a=rtcp-fb:108 goog-remb\r\n" + "a=rtcp-fb:108 transport-cc\r\n" + "a=rtcp-fb:108 ccm fir\r\n" + "a=rtcp-fb:108 nack\r\n" + "a=rtcp-fb:108 nack pli\r\n" + "a=fmtp:108 level-asymmetry-allowed=1;packetization-mode=0;profile-level-id=42e01f\r\n" + "a=rtpmap:109 rtx/90000\r\n" + "a=fmtp:109 apt=108\r\n" + "a=rtpmap:35 AV1X/90000\r\n" + "a=rtcp-fb:35 goog-remb\r\n" + "a=rtcp-fb:35 transport-cc\r\n" + "a=rtcp-fb:35 ccm fir\r\n" + "a=rtcp-fb:35 nack\r\n" + "a=rtcp-fb:35 nack pli\r\n" + "a=rtpmap:36 rtx/90000\r\n" + "a=fmtp:36 apt=35\r\n" + "a=rtpmap:124 red/90000\r\n" + "a=rtpmap:119 rtx/90000\r\n" + "a=fmtp:119 apt=124\r\n" + "a=rtpmap:123 ulpfec/90000\r\n" + "a=ssrc-group:FID 3005569364 2001490794\r\n" + "a=ssrc:3005569364 cname:NMNDwVd66x2SfiO0\r\n" + "a=ssrc:3005569364 msid:hJifdaJwqEqHxSG0pVbs1DrLAwiHqz7fKlqC 6d6ec7a7-e3d7-4c82-b03c-45e017713abd\r\n" + "a=ssrc:3005569364 mslabel:hJifdaJwqEqHxSG0pVbs1DrLAwiHqz7fKlqC\r\n" + "a=ssrc:3005569364 label:6d6ec7a7-e3d7-4c82-b03c-45e017713abd\r\n" + "a=ssrc:2001490794 cname:NMNDwVd66x2SfiO0\r\n" + "a=ssrc:2001490794 msid:hJifdaJwqEqHxSG0pVbs1DrLAwiHqz7fKlqC 6d6ec7a7-e3d7-4c82-b03c-45e017713abd\r\n" + "a=ssrc:2001490794 mslabel:hJifdaJwqEqHxSG0pVbs1DrLAwiHqz7fKlqC\r\n" + "a=ssrc:2001490794 label:6d6ec7a7-e3d7-4c82-b03c-45e017713abd\r\n" + ), + }, + "handle_id": pub_handle, + "session_id": session, + "token": token, + }, + ) + ) + # ack is received first + self.assertEqual(self._res, {"janus": "ack", "session_id": session}) + # followed by the event notification + self._eventloop.run_until_complete(testIJanus(self)) + sdp = self._res["jsep"]["sdp"] + self.assertIsInstance(sdp, str) + self.assertRegex( + sdp, + re.compile( + "^v=0\r\n" + "o=- \d+ \d+ IN IP4 203.0.113.1\r\n" + "s=rtpengine.*?\r\n" + "t=0 0\r\n" + "m=audio \d+ UDP/TLS/RTP/SAVPF 111\r\n" + "c=IN IP4 203.0.113.1\r\n" + "a=mid:0\r\n" + "a=rtpmap:111 opus/48000/2\r\n" + "a=fmtp:111 minptime=10;useinbandfec=1\r\n" + "a=rtcp-fb:111 transport-cc\r\n" + "a=recvonly\r\n" + "a=rtcp:\d+\r\n" + "a=rtcp-mux\r\n" + "a=setup:active\r\n" + "a=fingerprint:sha-256 .{95}\r\n" + "a=ice-ufrag:.{8}\r\n" + "a=ice-pwd:.{26}\r\n" + "a=ice-options:trickle\r\n" + "a=candidate:.{16} 1 UDP 2130706431 203.0.113.1 \d+ typ host\r\n" + "a=end-of-candidates\r\n" + "m=video \d+ UDP/TLS/RTP/SAVPF 96\r\n" + "c=IN IP4 203.0.113.1\r\n" + "a=mid:1\r\n" + "a=rtpmap:96 VP8/90000\r\n" + "a=rtcp-fb:96 goog-remb\r\n" + "a=rtcp-fb:96 transport-cc\r\n" + "a=rtcp-fb:96 ccm fir\r\n" + "a=rtcp-fb:96 nack\r\n" + "a=rtcp-fb:96 nack pli\r\n" + "a=recvonly\r\n" + "a=rtcp:\d+\r\n" + "a=rtcp-mux\r\n" + "a=setup:active\r\n" + "a=fingerprint:sha-256 .{95}\r\n" + "a=ice-ufrag:.{8}\r\n" + "a=ice-pwd:.{26}\r\n" + "a=ice-options:trickle\r\n" + "a=candidate:.{16} 1 UDP 2130706431 203.0.113.1 \d+ typ host\r\n" + "a=end-of-candidates\r\n$", + re.DOTALL, + ), + ) + self.assertEqual( + self._res, + { + "janus": "event", + "session_id": session, + "sender": pub_handle, + "plugindata": { + "plugin": "janus.plugin.videoroom", + "data": { + "videoroom": "event", + "room": room, + "configured": "ok", + "audio_codec": "opus", + "video_codec": "VP8", + }, + }, + "jsep": {"type": "answer", "sdp": sdp}, + }, + ) + + # subscriber + sub_handle = self.createHandle(token, session) + self._eventloop.run_until_complete( + testIOJanus( + self, + { + "janus": "message", + "body": { + "request": "join", + "ptype": "subscriber", + "room": room, + "feed": feed, + }, + "handle_id": sub_handle, + "session_id": session, + "token": token, + }, + ) + ) + + self.assertEqual(self._res, {"janus": "ack", "session_id": session}) + self._eventloop.run_until_complete(testIJanus(self)) + sdp = self._res["jsep"]["sdp"] + self.assertIsInstance(sdp, str) + self.assertRegex( + sdp, + re.compile( + "^v=0\r\n" + "o=- 3959345330719813235 2 IN IP4 127.0.0.1\r\n" + "s=-\r\n" + "t=0 0\r\n" + "a=extmap-allow-mixed\r\n" + "a=msid-semantic: WMS hJifdaJwqEqHxSG0pVbs1DrLAwiHqz7fKlqC\r\n" + "m=audio \d+ UDP/TLS/RTP/SAVPF 111\r\n" + "c=IN IP4 203.0.113.1\r\n" + "a=extmap:1 urn:ietf:params:rtp-hdrext:ssrc-audio-level\r\n" + "a=extmap:2 http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time\r\n" + "a=extmap:3 http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01\r\n" + "a=extmap:4 urn:ietf:params:rtp-hdrext:sdes:mid\r\n" + "a=extmap:5 urn:ietf:params:rtp-hdrext:sdes:rtp-stream-id\r\n" + "a=extmap:6 urn:ietf:params:rtp-hdrext:sdes:repaired-rtp-stream-id\r\n" + "a=msid:hJifdaJwqEqHxSG0pVbs1DrLAwiHqz7fKlqC 2de0f1b0-3a39-450e-9804-8305ec87452b\r\n" + "a=ssrc:677770262 cname:NMNDwVd66x2SfiO0\r\n" + "a=ssrc:677770262 msid:hJifdaJwqEqHxSG0pVbs1DrLAwiHqz7fKlqC 2de0f1b0-3a39-450e-9804-8305ec87452b\r\n" + "a=ssrc:677770262 mslabel:hJifdaJwqEqHxSG0pVbs1DrLAwiHqz7fKlqC\r\n" + "a=ssrc:677770262 label:2de0f1b0-3a39-450e-9804-8305ec87452b\r\n" + "a=mid:0\r\n" + "a=rtpmap:111 opus/48000/2\r\n" + "a=fmtp:111 minptime=10;useinbandfec=1\r\n" + "a=rtcp-fb:111 transport-cc\r\n" + "a=sendonly\r\n" + "a=rtcp-mux\r\n" + "a=setup:actpass\r\n" + "a=fingerprint:sha-256 .{95}\r\n" + "a=ice-ufrag:.{8}\r\n" + "a=ice-pwd:.{26}\r\n" + "a=ice-options:trickle\r\n" + "a=candidate:.{16} 1 UDP 2130706431 203.0.113.1 \d+ typ host\r\n" + "a=end-of-candidates\r\n" + "m=video \d+ UDP/TLS/RTP/SAVPF 96\r\n" + "c=IN IP4 203.0.113.1\r\n" + "a=extmap:14 urn:ietf:params:rtp-hdrext:toffset\r\n" + "a=extmap:2 http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time\r\n" + "a=extmap:13 urn:3gpp:video-orientation\r\n" + "a=extmap:3 http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01\r\n" + "a=extmap:12 http://www.webrtc.org/experiments/rtp-hdrext/playout-delay\r\n" + "a=extmap:11 http://www.webrtc.org/experiments/rtp-hdrext/video-content-type\r\n" + "a=extmap:7 http://www.webrtc.org/experiments/rtp-hdrext/video-timing\r\n" + "a=extmap:8 http://www.webrtc.org/experiments/rtp-hdrext/color-space\r\n" + "a=extmap:4 urn:ietf:params:rtp-hdrext:sdes:mid\r\n" + "a=extmap:5 urn:ietf:params:rtp-hdrext:sdes:rtp-stream-id\r\n" + "a=extmap:6 urn:ietf:params:rtp-hdrext:sdes:repaired-rtp-stream-id\r\n" + "a=msid:hJifdaJwqEqHxSG0pVbs1DrLAwiHqz7fKlqC 6d6ec7a7-e3d7-4c82-b03c-45e017713abd\r\n" + "a=rtcp-rsize\r\n" + "a=ssrc-group:FID 3005569364 2001490794\r\n" + "a=ssrc:3005569364 cname:NMNDwVd66x2SfiO0\r\n" + "a=ssrc:3005569364 msid:hJifdaJwqEqHxSG0pVbs1DrLAwiHqz7fKlqC 6d6ec7a7-e3d7-4c82-b03c-45e017713abd\r\n" + "a=ssrc:3005569364 mslabel:hJifdaJwqEqHxSG0pVbs1DrLAwiHqz7fKlqC\r\n" + "a=ssrc:3005569364 label:6d6ec7a7-e3d7-4c82-b03c-45e017713abd\r\n" + "a=ssrc:2001490794 cname:NMNDwVd66x2SfiO0\r\n" + "a=ssrc:2001490794 msid:hJifdaJwqEqHxSG0pVbs1DrLAwiHqz7fKlqC 6d6ec7a7-e3d7-4c82-b03c-45e017713abd\r\n" + "a=ssrc:2001490794 mslabel:hJifdaJwqEqHxSG0pVbs1DrLAwiHqz7fKlqC\r\n" + "a=ssrc:2001490794 label:6d6ec7a7-e3d7-4c82-b03c-45e017713abd\r\n" + "a=mid:1\r\n" + "a=rtpmap:96 VP8/90000\r\n" + "a=rtcp-fb:96 goog-remb\r\n" + "a=rtcp-fb:96 transport-cc\r\n" + "a=rtcp-fb:96 ccm fir\r\n" + "a=rtcp-fb:96 nack\r\n" + "a=rtcp-fb:96 nack pli\r\n" + "a=sendonly\r\n" + "a=rtcp-mux\r\n" + "a=setup:actpass\r\n" + "a=fingerprint:sha-256 .{95}\r\n" + "a=ice-ufrag:.{8}\r\n" + "a=ice-pwd:.{26}\r\n" + "a=ice-options:trickle\r\n" + "a=candidate:.{16} 1 UDP 2130706431 203.0.113.1 \d+ typ host\r\n" + "a=end-of-candidates\r\n$", + re.DOTALL, + ), + ) + self.assertEqual( + self._res, + { + "janus": "event", + "session_id": session, + "sender": sub_handle, + "plugindata": { + "plugin": "janus.plugin.videoroom", + "data": { + "videoroom": "attached", + "room": room, + "id": feed, + }, + }, + "jsep": {"type": "offer", "sdp": sdp}, + }, + ) + + self.destroyVideoroom(token, session, control_handle, room) + + def testVideoroomICE(self): + (token, session, control_handle, room) = self.startVideoroom() + + pub_handle = self.createHandle(token, session) + self.assertNotEqual(pub_handle, control_handle) + + feed = self.createPublisher(token, session, room, pub_handle) + self.assertNotEqual(feed, control_handle) + + self._eventloop.run_until_complete( + testIOJanus( + self, + { + "janus": "message", + "body": { + "request": "configure", + "room": room, + "feed": feed, + "data": False, + "audio": True, + "video": True, + }, + "jsep": { + "type": "offer", + "sdp": ( + "v=0\r\n" + "o=x 123 123 IN IP4 203.0.113.2\r\n" + "c=IN IP4 0.0.0.0\r\n" + "s=foobar\r\n" + "t=0 0\r\n" + "m=audio 9 RTP/AVP 8 0 96\r\n" + "a=mid:audio\r\n" + "a=rtpmap:96 opus/48000\r\n" + "a=ice-ufrag:62lL\r\n" + "a=ice-pwd:WD1pLdamJOWH2WuEBb0vjyZr\r\n" + "a=ice-options:trickle\r\n" + "a=rtcp-mux\r\n" + "a=sendonly\r\n" + ), + }, + "handle_id": pub_handle, + "session_id": session, + "token": token, + }, + ) + ) + # ack is received first + self.assertEqual(self._res, {"janus": "ack", "session_id": session}) + # followed by the event notification + self._eventloop.run_until_complete(testIJanus(self)) + sdp = self._res["jsep"]["sdp"] + self.assertIsInstance(sdp, str) + self.assertRegex( + sdp, + re.compile( + "^v=0\r\n" + "o=- \d+ \d+ IN IP4 203.0.113.1\r\n" + "s=rtpengine.*?\r\n" + "t=0 0\r\n" + "m=audio \d+ RTP/AVP 8\r\n" + "c=IN IP4 203.0.113.1\r\n" + "a=mid:audio\r\n" + "a=rtpmap:8 PCMA/8000\r\n" + "a=recvonly\r\n" + "a=rtcp:\d+\r\n" + "a=rtcp-mux\r\n" + "a=ice-ufrag:.{8}\r\n" + "a=ice-pwd:.{26}\r\n" + "a=ice-options:trickle\r\n" + "a=candidate:.{16} 1 UDP 2130706431 203.0.113.1 \d+ typ host\r\n" + "a=end-of-candidates\r\n$", + re.DOTALL, + ), + ) + self.assertEqual( + self._res, + { + "janus": "event", + "session_id": session, + "sender": pub_handle, + "plugindata": { + "plugin": "janus.plugin.videoroom", + "data": { + "videoroom": "event", + "room": room, + "configured": "ok", + "audio_codec": "PCMA", + }, + }, + "jsep": {"type": "answer", "sdp": sdp}, + }, + ) + + pub_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + pub_sock.settimeout(1) + pub_sock.bind(("203.0.113.2", 30000)) + + # trickle update + self._eventloop.run_until_complete( + testIOJanus( + self, + { + "janus": "trickle", + "candidate": { + "candidate": "candidate:3279615273 1 udp 2113937151 203.0.113.2 30000 typ host generation 0 ufrag 62lL network-cost 999", + "sdpMid": "audio", + }, + "handle_id": pub_handle, + "session_id": session, + "token": token, + }, + ) + ) + self.assertEqual(self._res, {"janus": "ack", "session_id": session}) + + m = pub_sock.recv(1000) + self.assertRegex( + m, + re.compile( + b"^\x00\x01\x00.\x21\x12\xa4\x42(............)\x80\x22\x00.rtpengine.*?\x00\x06\x00\x0d62lL:(........)\x00\x00\x00\x80\\\x29\x00\x08........\x00\\\x24\x00\x04\x6e\xff\xff\xff\x00\x08\x00\x14....................\x80\\\x28\x00\x04....$", + re.DOTALL, + ), + ) + + sub_handle = self.createHandle(token, session) + self.assertNotEqual(sub_handle, pub_handle) + self.assertNotEqual(sub_handle, control_handle) + + # subscriber #1 joins publisher #1 + self._eventloop.run_until_complete( + testIOJanus( + self, + { + "janus": "message", + "body": { + "request": "join", + "ptype": "subscriber", + "room": room, + "feed": feed, + }, + "handle_id": sub_handle, + "session_id": session, + "token": token, + }, + ) + ) + # ack is received first + self.assertEqual(self._res, {"janus": "ack", "session_id": session}) + # followed by the attached event + self._eventloop.run_until_complete(testIJanus(self)) + self.assertEqual(feed, self._res["plugindata"]["data"]["id"]) + self.assertNotEqual(feed, control_handle) + self.assertNotEqual(feed, session) + self.assertNotEqual(feed, room) + self.assertNotEqual(feed, pub_handle) + self.assertNotEqual(feed, sub_handle) + sdp = self._res["jsep"]["sdp"] + self.assertIsInstance(sdp, str) + self.assertRegex( + sdp, + re.compile( + "^v=0\r\n" + "o=x 123 123 IN IP4 203.0.113.2\r\n" + "c=IN IP4 203.0.113.1\r\n" + "s=foobar\r\n" + "t=0 0\r\n" + "m=audio \d+ UDP/TLS/RTP/SAVPF 8\r\n" + "a=mid:audio\r\n" + "a=rtpmap:8 PCMA/8000\r\n" + "a=sendonly\r\n" + "a=rtcp-mux\r\n" + "a=setup:actpass\r\n" + "a=fingerprint:sha-256 .{95}\r\n" + "a=ice-ufrag:.{8}\r\n" + "a=ice-pwd:.{26}\r\n" + "a=ice-options:trickle\r\n" + "a=candidate:.{16} 1 UDP 2130706431 203.0.113.1 \d+ typ host\r\n" + "a=end-of-candidates\r\n$", + re.DOTALL, + ), + ) + self.assertEqual( + self._res, + { + "janus": "event", + "session_id": session, + "sender": sub_handle, + "plugindata": { + "plugin": "janus.plugin.videoroom", + "data": { + "videoroom": "attached", + "room": room, + "id": feed, + }, + }, + "jsep": {"type": "offer", "sdp": sdp}, + }, + ) + + # subscriber #1 answer + self._eventloop.run_until_complete( + testIOJanus( + self, + { + "janus": "message", + "body": {"request": "start", "room": room, "feed": feed}, + "jsep": { + "type": "answer", + "sdp": ( + "v=0\r\n" + "o=x 123 123 IN IP4 203.0.113.2\r\n" + "c=IN IP4 0.0.0.0\r\n" + "s=foobar\r\n" + "t=0 0\r\n" + "m=audio 9 RTP/AVP 8\r\n" + "a=mid:audio\r\n" + "a=ice-ufrag:abcd\r\n" + "a=ice-pwd:WD1pLsdgsdfsdWuEBb0vjyZr\r\n" + "a=ice-options:trickle\r\n" + "a=rtcp-mux\r\n" + "a=recvonly\r\n" + ), + }, + "handle_id": sub_handle, + "session_id": session, + "token": token, + }, + ) + ) + # ack is received first + self.assertEqual(self._res, {"janus": "ack", "session_id": session}) + # followed by the attached event + self._eventloop.run_until_complete(testIJanus(self)) + self.assertEqual( + self._res, + { + "janus": "event", + "session_id": session, + "sender": sub_handle, + "plugindata": { + "plugin": "janus.plugin.videoroom", + "data": { + "videoroom": "event", + "started": "ok", + "room": room, + }, + }, + }, + ) + + sub_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sub_sock.settimeout(1) + sub_sock.bind(("203.0.113.2", 30002)) + + # trickle update + self._eventloop.run_until_complete( + testIOJanus( + self, + { + "janus": "trickle", + "candidate": { + "candidate": "candidate:3fgsdfs273 1 udp 2113937151 203.0.113.2 30002 typ host generation 0", + "sdpMid": "audio", + "usernameFragment": "abcd", + }, + "handle_id": sub_handle, + "session_id": session, + "token": token, + }, + ) + ) + self.assertEqual(self._res, {"janus": "ack", "session_id": session}) + + m = sub_sock.recv(1000) + self.assertRegex( + m, + re.compile( + b"^\x00\x01\x00.\x21\x12\xa4\x42(............)\x80\x22\x00.rtpengine.*?\x00\x06\x00\x0dabcd:(........)\x00\x00\x00\x80\\\x29\x00\x08........\x00\\\x24\x00\x04\x6e\xff\xff\xff\x00\x08\x00\x14....................\x80\\\x28\x00\x04....$", + re.DOTALL, + ), + ) + + # TCP trickle test + self._eventloop.run_until_complete( + testIOJanus( + self, + { + "janus": "trickle", + "candidate": { + "candidate": "candidate:6 2 TCP 2105393406 2607:fea8:ab00:33::9f4 9 typ host tcptype active", + "sdpMid": "audio", + "usernameFragment": "abcd", + }, + "handle_id": sub_handle, + "session_id": session, + "token": token, + }, + ) + ) + self.assertEqual(self._res, {"janus": "ack", "session_id": session}) + + self.destroyVideoroom(token, session, control_handle, room) + pub_sock.close() + sub_sock.close() + + def testVideoroomPubSub(self): + (token, session, control_handle, room) = self.startVideoroom() + + # XXX add tests for requests for invalid IDs/handles + + handle_p_1 = self.createHandle(token, session) + self.assertNotEqual(handle_p_1, control_handle) + + # create feed for publisher #1 + feed_1 = self.createPublisher(token, session, room, handle_p_1) + self.assertNotEqual(feed_1, control_handle) + + # configure publisher feed #1 w broken SDP + self._eventloop.run_until_complete( + testIOJanus( + self, + { + "janus": "message", + "body": { + "request": "configure", + "room": room, + "feed": feed_1, + "data": False, + "audio": True, + "video": True, + }, + "jsep": { + "type": "offer", + "sdp": "blah", + }, + "handle_id": handle_p_1, + "session_id": session, + "token": token, + }, + ) + ) + # ack is received first + self.assertEqual(self._res, {"janus": "ack", "session_id": session}) + # followed by the event notification + self._eventloop.run_until_complete(testIJanus(self)) + self.assertEqual( + self._res, + { + "janus": "error", + "session_id": session, + "sender": handle_p_1, + "error": {"code": 512, "reason": "Failed to parse SDP"}, + "plugindata": {"plugin": "janus.plugin.videoroom", "data": {}}, + }, + ) + + # configure publisher feed #1 + self._eventloop.run_until_complete( + testIOJanus( + self, + { + "janus": "message", + "body": { + "request": "configure", + "room": room, + "feed": feed_1, + "data": False, + "audio": True, + "video": True, + }, + "jsep": { + "type": "offer", + "sdp": ( + "v=0\r\n" + "o=x 123 123 IN IP4 203.0.113.2\r\n" + "c=IN IP4 203.0.113.2\r\n" + "s=foobar\r\n" + "t=0 0\r\n" + "m=audio 6000 RTP/AVP 96 8 0\r\n" + "a=rtpmap:96 opus/48000\r\n" + "a=sendonly\r\n" + ), + }, + "handle_id": handle_p_1, + "session_id": session, + "token": token, + }, + ) + ) + # ack is received first + self.assertEqual(self._res, {"janus": "ack", "session_id": session}) + # followed by the event notification + self._eventloop.run_until_complete(testIJanus(self)) + sdp = self._res["jsep"]["sdp"] + self.assertIsInstance(sdp, str) + # XXX check SDP + self.assertEqual( + self._res, + { + "janus": "event", + "session_id": session, + "sender": handle_p_1, + "plugindata": { + "plugin": "janus.plugin.videoroom", + "data": { + "videoroom": "event", + "room": room, + "configured": "ok", + "audio_codec": "opus", + }, + }, + "jsep": {"type": "answer", "sdp": sdp}, + }, + ) + + # attach subscriber handle #1 + handle_s_1 = self.createHandle(token, session) + self.assertNotEqual(handle_s_1, control_handle) + + # subscriber #1 joins publisher #1 + self._eventloop.run_until_complete( + testIOJanus( + self, + { + "janus": "message", + "body": { + "request": "join", + "ptype": "subscriber", + "room": room, + "feed": feed_1, + }, + "handle_id": handle_s_1, + "session_id": session, + "token": token, + }, + ) + ) + # ack is received first + self.assertEqual(self._res, {"janus": "ack", "session_id": session}) + # followed by the attached event + self._eventloop.run_until_complete(testIJanus(self)) + self.assertEqual(feed_1, self._res["plugindata"]["data"]["id"]) + self.assertNotEqual(feed_1, control_handle) + self.assertNotEqual(feed_1, session) + self.assertNotEqual(feed_1, room) + self.assertNotEqual(feed_1, handle_p_1) + self.assertNotEqual(feed_1, handle_s_1) + sdp = self._res["jsep"]["sdp"] + self.assertIsInstance(sdp, str) + # XXX check SDP + self.assertEqual( + self._res, + { + "janus": "event", + "session_id": session, + "sender": handle_s_1, + "plugindata": { + "plugin": "janus.plugin.videoroom", + "data": { + "videoroom": "attached", + "room": room, + "id": feed_1, + }, + }, + "jsep": {"type": "offer", "sdp": sdp}, + }, + ) + + # subscriber #1 answer + self._eventloop.run_until_complete( + testIOJanus( + self, + { + "janus": "message", + "body": {"request": "start", "room": room, "feed": feed_1}, + "jsep": { + "type": "answer", + "sdp": ( + "v=0\r\n" + "o=x 123 123 IN IP4 203.0.113.2\r\n" + "c=IN IP4 203.0.113.2\r\n" + "s=foobar\r\n" + "t=0 0\r\n" + "m=audio 7000 RTP/AVP 96\r\n" + "a=rtpmap:96 opus/48000\r\n" + "a=recvonly\r\n" + ), + }, + "handle_id": handle_s_1, + "session_id": session, + "token": token, + }, + ) + ) + # ack is received first + self.assertEqual(self._res, {"janus": "ack", "session_id": session}) + # followed by the attached event + self._eventloop.run_until_complete(testIJanus(self)) + self.assertEqual( + self._res, + { + "janus": "event", + "session_id": session, + "sender": handle_s_1, + "plugindata": { + "plugin": "janus.plugin.videoroom", + "data": { + "videoroom": "event", + "started": "ok", + "room": room, + }, + }, + }, + ) + + handle_p_2 = self.createHandle(token, session) + self.assertNotEqual(handle_p_2, control_handle) + + feed_2 = self.createPublisher( + token, session, room, handle_p_2, [{"id": feed_1}] + ) + + # configure publisher feed #2 + self._eventloop.run_until_complete( + testIOJanus( + self, + { + "janus": "message", + "body": { + "request": "configure", + "room": room, + "feed": feed_2, + "data": False, + "audio": True, + "video": True, + }, + "jsep": { + "type": "offer", + "sdp": ( + "v=0\r\n" + "o=x 123 123 IN IP4 203.0.113.2\r\n" + "c=IN IP4 0.0.0.0\r\n" + "s=foobar\r\n" + "t=0 0\r\n" + "m=audio 9 RTP/AVP 8 0\r\n" + "a=mid:audio\r\n" + "a=rtpmap:96 opus/48000\r\n" + "a=sendonly\r\n" + ), + }, + "handle_id": handle_p_2, + "session_id": session, + "token": token, + }, + ) + ) + # ack is received first + self.assertEqual(self._res, {"janus": "ack", "session_id": session}) + + # followed by the notification for publisher #1 + self._eventloop.run_until_complete(testIJson(self)) + self.assertEqual( + self._res, + { + "janus": "event", + "session_id": session, + "sender": handle_p_1, + "plugindata": { + "plugin": "janus.plugin.videoroom", + "data": { + "videoroom": "event", + "room": room, + "publishers": [{"id": feed_2}], + }, + }, + }, + ) + + # followed by the "ok" event for publisher #2 + self._eventloop.run_until_complete(testIJanus(self)) + sdp = self._res["jsep"]["sdp"] + self.assertIsInstance(sdp, str) + # XXX check SDP + self.assertEqual( + self._res, + { + "janus": "event", + "session_id": session, + "sender": handle_p_2, + "plugindata": { + "plugin": "janus.plugin.videoroom", + "data": { + "videoroom": "event", + "room": room, + "configured": "ok", + "audio_codec": "PCMA", + }, + }, + "jsep": {"type": "answer", "sdp": sdp}, + }, + ) + + # detach publisher #1 + self._eventloop.run_until_complete( + testOJanus( + self, + { + "janus": "detach", + "handle_id": handle_p_1, + "session_id": session, + "token": token, + }, + ) + ) + # unpublished event is received first + self._eventloop.run_until_complete(testIJson(self)) + self.assertEqual( + self._res, + { + "janus": "event", + "session_id": session, + "sender": handle_p_2, + "plugindata": { + "plugin": "janus.plugin.videoroom", + "data": { + "videoroom": "event", + "room": room, + "unpublished": feed_1, + }, + }, + }, + ) + # followed by leaving event is received first + self._eventloop.run_until_complete(testIJson(self)) + self.assertEqual( + self._res, + { + "janus": "event", + "session_id": session, + "sender": handle_p_2, + "plugindata": { + "plugin": "janus.plugin.videoroom", + "data": { + "videoroom": "event", + "room": room, + "leaving": feed_1, + }, + }, + }, + ) + # and finally the success + self._eventloop.run_until_complete(testIJanus(self)) + self.assertEqual( + self._res, + { + "janus": "success", + "session_id": session, + "sender": handle_p_1, + }, + ) + + self.destroyVideoroom(token, session, control_handle, room) + + +if __name__ == "__main__": + so = tempfile.NamedTemporaryFile(mode="wb", delete=False) + se = tempfile.NamedTemporaryFile(mode="wb", delete=False) + os.environ["GLIB_SLICE"] = "debug-blocks" + proc = subprocess.Popen( + [ + os.environ.get("RTPE_BIN"), + "--config-file=none", + "-t", + "-1", + "-i", + "203.0.113.1", + "-f", + "-L", + "7", + "-E", + "--listen-http=localhost:9191", + "--janus-secret=dfgdfgdvgLyATjHPvckg", + "--delete-delay=0", + ], + stdout=so, + stderr=se, + ) + + code = 255 + + try: + unittest.main() + code = 0 + except SystemExit as e: + if e.code == 0: + code = 0 + else: + code = e.code + traceback.print_exc() + except: + traceback.print_exc() + + proc.terminate() + proc.wait() + + so.close() + se.close() + + if code == 0: + os.unlink(so.name) + os.unlink(se.name) + else: + print(f"HINT: Stdout and stderr are {so.name} and {se.name}") + sys.exit(code) diff --git a/t/test-transcode.c b/t/test-transcode.c index 03422553e..75898895b 100644 --- a/t/test-transcode.c +++ b/t/test-transcode.c @@ -9,6 +9,7 @@ int _log_facility_rtcp; int _log_facility_cdr; int _log_facility_dtmf; struct rtpengine_config rtpe_config; +struct rtpengine_config initial_rtpe_config; struct poller *rtpe_poller; struct poller_map *rtpe_poller_map; GString *dtmf_logs;