diff --git a/daemon/call.c b/daemon/call.c index a62c580d3..9584deeb0 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -1379,6 +1379,11 @@ void free_sink_handler(void *p) { struct sink_handler *sh = p; g_slice_free1(sizeof(*sh), sh); } + +/** + * A transfer of flags from the subscription (call_subscription) to the sink handlers (sink_handler) is done + * using the __init_streams() through __add_sink_handler(). + */ void __add_sink_handler(GQueue *q, struct packet_stream *sink, const struct sink_attrs *attrs) { struct sink_handler *sh = g_slice_alloc0(sizeof(*sh)); sh->sink = sink; @@ -1397,6 +1402,7 @@ static void __reset_streams(struct call_media *media) { g_queue_clear_full(&ps->rtp_mirrors, free_sink_handler); } } + // called once on media A for each sink media B // B can be NULL // attrs can be NULL @@ -3878,7 +3884,15 @@ restart: return c; } -/* returns call with master_lock held in W, or NULL if not found */ +/** returns call with master_lock held in W, or NULL if not found + * + * The lookup of a call is performed via its call-ID. + * A reference to the call object is returned with + * the reference-count increased by one. + * + * Therefore the code must use obj_put() on the call after call_get() + * and after it's done operating on the object. + */ struct call *call_get(const str *callid) { struct call *ret; diff --git a/daemon/media_socket.c b/daemon/media_socket.c index 9f27c6319..dade55a64 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -1166,8 +1166,12 @@ static void reset_ps_kernel_stats(struct packet_stream *ps) { } -/* called with in_lock held */ -// sink_handler can be NULL +/** + * The linkage between userspace and kernel module is in the kernelize_one(). + * + * Called with in_lock held. + * sink_handler can be NULL. + */ static const char *kernelize_one(struct rtpengine_target_info *reti, GQueue *outputs, struct packet_stream *stream, struct sink_handler *sink_handler, GQueue *sinks, GList **payload_types) @@ -2334,7 +2338,17 @@ static void count_stream_stats_userspace(struct packet_stream *ps) { } -/* called lock-free */ +/** + * Packet handling starts in stream_packet(). + * + * This operates on the originating stream_fd (fd which received the packet) + * and on its linked packet_stream. + * + * Eventually proceeds to going through the list of sinks, + * either rtp_sinks or rtcp_sinks (egress handling). + * + * called lock-free. + */ static int stream_packet(struct packet_handler_ctx *phc) { /** * Incoming packets: diff --git a/include/call.h b/include/call.h index 74c62f71d..9782f0232 100644 --- a/include/call.h +++ b/include/call.h @@ -308,12 +308,27 @@ struct loop_protector { +/** + * The packet_stream itself can be marked as: + * - SRTP endpoint + * - ICE endpoint + * - send/receive-only + * + * This is done through the various bit flags. + */ struct packet_stream { - mutex_t in_lock, - out_lock; /* Both locks valid only with call->master_lock held in R. * Preempted by call->master_lock held in W. - * If both in/out are to be locked, in_lock must be locked first. */ + * If both in/out are to be locked, in_lock must be locked first. + * + * The in_lock protects fields relevant to packet reception on that stream, + * meanwhile the out_lock protects fields relevant to packet egress. + * + * This allows packet handling on multiple ports and streams belonging + * to the same call to happen at the same time. + */ + mutex_t in_lock, + out_lock; struct call_media *media; /* RO */ struct call *call; /* RO */ @@ -325,22 +340,22 @@ struct packet_stream { struct stream_fd * selected_sfd; endpoint_t last_local_endpoint; struct dtls_connection ice_dtls; /* LOCK: in_lock */ - GQueue rtp_sinks; // LOCK: call->master_lock, in_lock for streamhandler - GQueue rtcp_sinks; // LOCK: call->master_lock, in_lock for streamhandler + GQueue rtp_sinks; /* LOCK: call->master_lock, in_lock for streamhandler */ + GQueue rtcp_sinks; /* LOCK: call->master_lock, in_lock for streamhandler */ struct packet_stream *rtcp_sibling; /* LOCK: call->master_lock */ - GQueue rtp_mirrors; // LOCK: call->master_lock, in_lock for streamhandler + GQueue rtp_mirrors; /* LOCK: call->master_lock, in_lock for streamhandler */ struct endpoint endpoint; /* LOCK: out_lock */ - struct endpoint detected_endpoints[4]; /* LOCK: out_lock */ - struct timeval ep_detect_signal; /* LOCK: out_lock */ - struct endpoint advertised_endpoint; /* RO */ - struct endpoint learned_endpoint; /* LOCK: out_lock */ - struct crypto_context crypto; /* OUT direction, LOCK: out_lock */ + struct endpoint detected_endpoints[4]; /* LOCK: out_lock */ + struct timeval ep_detect_signal; /* LOCK: out_lock */ + struct endpoint advertised_endpoint; /* RO */ + struct endpoint learned_endpoint; /* LOCK: out_lock */ + struct crypto_context crypto; /* OUT direction, LOCK: out_lock */ struct ssrc_ctx *ssrc_in[RTPE_NUM_SSRC_TRACKING], /* LOCK: in_lock */ *ssrc_out[RTPE_NUM_SSRC_TRACKING]; /* LOCK: out_lock */ - unsigned int ssrc_in_idx, // LOCK: in_lock - ssrc_out_idx; // LOCK: out_lock - struct send_timer *send_timer; /* RO */ - struct jitter_buffer *jb; /* RO */ + unsigned int ssrc_in_idx, /* LOCK: in_lock */ + ssrc_out_idx; /* LOCK: out_lock */ + struct send_timer *send_timer; /* RO */ + struct jitter_buffer *jb; /* RO */ struct stream_stats stats_in; struct stream_stats stats_out; @@ -348,7 +363,7 @@ struct packet_stream { struct stream_stats kernel_stats_out; unsigned char in_tos_tclass; atomic64 last_packet; - GHashTable *rtp_stats; /* LOCK: call->master_lock */ + GHashTable *rtp_stats; /* LOCK: call->master_lock */ struct rtp_stats *rtp_stats_cache; unsigned int stats_flags; enum endpoint_learning el_flags; @@ -360,19 +375,24 @@ struct packet_stream { unsigned int lp_count; #endif - X509 *dtls_cert; /* LOCK: in_lock */ + X509 *dtls_cert; /* LOCK: in_lock */ /* in_lock must be held for SETTING these: */ volatile unsigned int ps_flags; }; -/* protected by call->master_lock, except the RO elements */ +/** + * Protected by call->master_lock, except the RO elements. + * + * call_media is not reference-counted and is completely owned by the call object. + * Therefore call_media is released when the call is destroyed. + */ struct call_media { - struct call_monologue *monologue; /* RO */ - struct call *call; /* RO */ + struct call_monologue *monologue; /* RO */ + struct call *call; /* RO */ - unsigned int index; /* RO */ - unsigned int unique_id; /* RO */ + unsigned int index; /* RO */ + unsigned int unique_id; /* RO */ str type; enum media_type type_id; str protocol_str; @@ -386,21 +406,21 @@ struct call_media { str media_id; str label; GQueue sdes_in, sdes_out; - struct dtls_fingerprint fingerprint; /* as received */ - const struct dtls_hash_func *fp_hash_func; // outgoing + struct dtls_fingerprint fingerprint; /* as received */ + const struct dtls_hash_func *fp_hash_func; /* outgoing */ - GQueue streams; /* normally RTP + RTCP */ + GQueue streams; /* normally RTP + RTCP */ GQueue endpoint_maps; struct codec_store codecs; - GQueue sdp_attributes; // str_sprintf() - GHashTable *codec_handlers; // int payload type -> struct codec_handler - // XXX combine this with 'codecs' hash table? - GQueue codec_handlers_store; // storage for struct codec_handler + GQueue sdp_attributes; /* str_sprintf() */ + GHashTable *codec_handlers; /* int payload type -> struct codec_handler + XXX combine this with 'codecs' hash table? */ + GQueue codec_handlers_store; /* storage for struct codec_handler */ struct codec_handler *codec_handler_cache; struct rtcp_handler *rtcp_handler; - struct rtcp_timer *rtcp_timer; // master lock for scheduling purposes - struct mqtt_timer *mqtt_timer; // master lock for scheduling purposes + struct rtcp_timer *rtcp_timer; /* master lock for scheduling purposes */ + struct mqtt_timer *mqtt_timer; /* master lock for scheduling purposes */ //struct codec_handler *dtmf_injector; struct t38_gateway *t38_gateway; struct codec_handler *t38_handler; @@ -408,7 +428,7 @@ struct call_media { unsigned int buffer_delay; mutex_t dtmf_lock; - unsigned long dtmf_ts; // TS of last processed end event + unsigned long dtmf_ts; /* TS of last processed end event */ // lists are append-only GQueue dtmf_recv; GQueue dtmf_send; @@ -417,12 +437,23 @@ struct call_media { encoder_callback_t encoder_callback; #endif - int ptime; // either from SDP or overridden + int ptime; /* either from SDP or overridden */ volatile unsigned int media_flags; }; -// link between subscribers and subscriptions +/** + * Link between subscribers and subscriptions. + * + * Contain flags and attributes, which can be used + * to mark a subscription (for example, as an egress subscription). + * + * During signalling events, the list of subscriptions for each call_monologue + * is used to create the list of rtp_sink and rtcp_sink given in each packet_stream. + * + * Each entry in these lists is a sink_handler object, which again contains flags and attributes. + * Flags from a call_subscription are copied into the sink_handler. + */ struct call_subscription { struct call_monologue *monologue; GList *link; // link into the corresponding opposite list @@ -430,28 +461,36 @@ struct call_subscription { struct sink_attrs attrs; }; -/* half a dialogue */ -/* protected by call->master_lock, except the RO elements */ +/** + * Half a dialogue. + * Protected by call->master_lock, except the RO elements. + * + * call_monologue (call participant) contains a list of subscribers + * and subscriptions, which are other call_monologue's. + * + * These lists are mutual. + * A regular A/B call has two call_monologue objects with each subscribed to the other. + */ struct call_monologue { - struct call *call; /* RO */ - unsigned int unique_id; /* RO */ + struct call *call; /* RO */ + unsigned int unique_id; /* RO */ str tag; str viabranch; enum tag_type tagtype; str label; - time_t created; /* RO */ + time_t created; /* RO */ time_t deleted; - struct timeval started; /* for CDR */ - struct timeval terminated; /* for CDR */ + struct timeval started; /* for CDR */ + struct timeval terminated; /* for CDR */ enum termination_reason term_reason; const struct logical_intf *logical_intf; GHashTable *other_tags; GHashTable *branches; - GQueue subscriptions; // who am I subscribed to (sources) - GHashTable *subscriptions_ht; // for quick lookup - GQueue subscribers; // who is subscribed to me (sinks) - GHashTable *subscribers_ht; // for quick lookup + GQueue subscriptions; /* who am I subscribed to (sources) */ + GHashTable *subscriptions_ht; /* for quick lookup */ + GQueue subscribers; /* who is subscribed to me (sinks) */ + GHashTable *subscribers_ht; /* for quick lookup */ GQueue medias; GHashTable *media_ids; struct media_player *player; @@ -536,23 +575,61 @@ struct call_iterator_entry { mutex_unlock(&rtpe_call_iterators[__which].lock); \ } while (0) +/** + * stuct call is the main parent structure of all call-related objects. + * + * The logical object hierarchy under the 'stuct call': + * call > call_monologue > call_media > packet_stream > stream_fd + * + * struct call usually has multiple call_monologue objects. + * Meanwhile each sub-object of call, as a parent of own sub-objects, + * can also contain multiple child objects. + * + * Furthermore, each child object contains a back ptr to its parent object. + * + * The parent call object contains one list (as GQueue) for each kind of child object. + * These lists are what is used to free and release the child objects + * during a call teardown. + * Every child object owned by the call is added to its respective list exactly once. + * + * Call object is reference-counted through the struct obj. + */ struct call { + + /* struct obj member must always be the first member in a struct. + * + * obj is created with a cleanup handler, see obj_alloc(), + * and this handler is executed whenever the reference count drops to zero. + * + * References are acquired and released through obj_get() and obj_put() + * (plus some other wrapper functions). + */ struct obj obj; mutex_t buffer_lock; call_buffer_t buffer; - /* everything below protected by master_lock */ + /* master_lock protects the entire call and all the contained objects. + * + * All the fields and any nested sub-object must: + * - only be accessed with the master_lock held as a read lock + * - only be modified with the master_lock held as a write lock + * + * Therefore, during signalling events acquire a write-lock, + * and during RTP packets handling acquire a read-lock. + */ rwlock_t master_lock; - GQueue monologues; - GQueue medias; + + /* everything below is protected by the master_lock */ + GQueue monologues; /* call_monologue */ + GQueue medias; /* call_media */ GHashTable *tags; GHashTable *viabranches; GHashTable *labels; GQueue streams; - GQueue stream_fds; + GQueue stream_fds; /* stream_fd */ GQueue endpoint_maps; - struct dtls_cert *dtls_cert; /* for outgoing */ + struct dtls_cert *dtls_cert; /* for outgoing */ struct mqtt_timer *mqtt_timer; struct janus_session *janus_session; @@ -596,7 +673,12 @@ struct call { }; - +/** + * The main entry point into call objects for signalling events is the call-ID: + * Therefore the main entry point is the global hash table rtpe_callhash (protected by rtpe_callhash_lock), + * which uses call-IDs as keys and call objects as values, + * while holding a reference to each contained call. + */ extern rwlock_t rtpe_callhash_lock; extern GHashTable *rtpe_callhash; extern struct call_iterator_list rtpe_call_iterators[NUM_CALL_ITERATORS]; diff --git a/include/media_socket.h b/include/media_socket.h index 9b047f1c7..360b75c40 100644 --- a/include/media_socket.h +++ b/include/media_socket.h @@ -116,11 +116,39 @@ struct intf_list { const struct local_intf *local_intf; GQueue list; }; + +/** + * stream_fd is an entry-point object for RTP packets handling, + * because of that it's also reference-counted. + * + * stream_fd object us only released, when it is removed from the poller + * and also removed from the call object. + * + * Contains an information required for media processing, such as media ports. + */ struct stream_fd { + + /* struct obj member must always be the first member in a struct. + * + * obj is created with a cleanup handler, see obj_alloc(), + * and this handler is executed whenever the reference count drops to zero. + * + * References are acquired and released through obj_get() and obj_put() + * (plus some other wrapper functions). + */ struct obj obj; + unsigned int unique_id; /* RO */ socket_t socket; /* RO */ const struct local_intf *local_intf; /* RO */ + + /* stream_fd object holds a reference to the call it belongs to. + * Which in turn holds references to all stream_fd objects it contains, + * what makes these references circular. + * + * The call is only released when it has been dissociated from all stream_fd objects, + * which happens during call teardown. + */ struct call *call; /* RO */ struct packet_stream *stream; /* LOCK: call->master_lock */ struct crypto_context crypto; /* IN direction, LOCK: stream->in_lock */ @@ -128,6 +156,7 @@ struct stream_fd { int error_strikes; struct poller *poller; }; + struct sink_attrs { bool block_media; bool silence_media; @@ -137,6 +166,11 @@ struct sink_attrs { unsigned int transcoding:1; unsigned int egress:1; }; + +/** + * During actual packet handling and forwarding, + * only the sink_handler objects (and the packet_stream objects they are related to) are used. + */ struct sink_handler { struct packet_stream *sink; const struct streamhandler *handler;