Browse Source

MT#55897 Document general objects in RTPEgnine code (part 1)

Based on the information gotten from Richard Fuchs
document the main objects in the code, to let the code be more
understandable for other code readers.

Mainly documented:
- call
- call_monologue
- call_subscription
- call_media
- packet_stream
- stream_fd
- sink_handler
- rtpe_callhash / rtpe_callhash_lock

Change-Id: I0cf122bea2d9c3f198b48da134a70301564ff1f9
pull/1577/head
Donat Zenichev 3 years ago
parent
commit
eb9aae1ddb
4 changed files with 199 additions and 55 deletions
  1. +15
    -1
      daemon/call.c
  2. +17
    -3
      daemon/media_socket.c
  3. +133
    -51
      include/call.h
  4. +34
    -0
      include/media_socket.h

+ 15
- 1
daemon/call.c View File

@ -1379,6 +1379,11 @@ void free_sink_handler(void *p) {
struct sink_handler *sh = p;
g_slice_free1(sizeof(*sh), sh);
}
/**
* A transfer of flags from the subscription (call_subscription) to the sink handlers (sink_handler) is done
* using the __init_streams() through __add_sink_handler().
*/
void __add_sink_handler(GQueue *q, struct packet_stream *sink, const struct sink_attrs *attrs) {
struct sink_handler *sh = g_slice_alloc0(sizeof(*sh));
sh->sink = sink;
@ -1397,6 +1402,7 @@ static void __reset_streams(struct call_media *media) {
g_queue_clear_full(&ps->rtp_mirrors, free_sink_handler);
}
}
// called once on media A for each sink media B
// B can be NULL
// attrs can be NULL
@ -3878,7 +3884,15 @@ restart:
return c;
}
/* returns call with master_lock held in W, or NULL if not found */
/** returns call with master_lock held in W, or NULL if not found
*
* The lookup of a call is performed via its call-ID.
* A reference to the call object is returned with
* the reference-count increased by one.
*
* Therefore the code must use obj_put() on the call after call_get()
* and after it's done operating on the object.
*/
struct call *call_get(const str *callid) {
struct call *ret;


+ 17
- 3
daemon/media_socket.c View File

@ -1166,8 +1166,12 @@ static void reset_ps_kernel_stats(struct packet_stream *ps) {
}
/* called with in_lock held */
// sink_handler can be NULL
/**
* The linkage between userspace and kernel module is in the kernelize_one().
*
* Called with in_lock held.
* sink_handler can be NULL.
*/
static const char *kernelize_one(struct rtpengine_target_info *reti, GQueue *outputs,
struct packet_stream *stream, struct sink_handler *sink_handler, GQueue *sinks,
GList **payload_types)
@ -2334,7 +2338,17 @@ static void count_stream_stats_userspace(struct packet_stream *ps) {
}
/* called lock-free */
/**
* Packet handling starts in stream_packet().
*
* This operates on the originating stream_fd (fd which received the packet)
* and on its linked packet_stream.
*
* Eventually proceeds to going through the list of sinks,
* either rtp_sinks or rtcp_sinks (egress handling).
*
* called lock-free.
*/
static int stream_packet(struct packet_handler_ctx *phc) {
/**
* Incoming packets:


+ 133
- 51
include/call.h View File

@ -308,12 +308,27 @@ struct loop_protector {
/**
* The packet_stream itself can be marked as:
* - SRTP endpoint
* - ICE endpoint
* - send/receive-only
*
* This is done through the various bit flags.
*/
struct packet_stream {
mutex_t in_lock,
out_lock;
/* Both locks valid only with call->master_lock held in R.
* Preempted by call->master_lock held in W.
* If both in/out are to be locked, in_lock must be locked first. */
* If both in/out are to be locked, in_lock must be locked first.
*
* The in_lock protects fields relevant to packet reception on that stream,
* meanwhile the out_lock protects fields relevant to packet egress.
*
* This allows packet handling on multiple ports and streams belonging
* to the same call to happen at the same time.
*/
mutex_t in_lock,
out_lock;
struct call_media *media; /* RO */
struct call *call; /* RO */
@ -325,22 +340,22 @@ struct packet_stream {
struct stream_fd * selected_sfd;
endpoint_t last_local_endpoint;
struct dtls_connection ice_dtls; /* LOCK: in_lock */
GQueue rtp_sinks; // LOCK: call->master_lock, in_lock for streamhandler
GQueue rtcp_sinks; // LOCK: call->master_lock, in_lock for streamhandler
GQueue rtp_sinks; /* LOCK: call->master_lock, in_lock for streamhandler */
GQueue rtcp_sinks; /* LOCK: call->master_lock, in_lock for streamhandler */
struct packet_stream *rtcp_sibling; /* LOCK: call->master_lock */
GQueue rtp_mirrors; // LOCK: call->master_lock, in_lock for streamhandler
GQueue rtp_mirrors; /* LOCK: call->master_lock, in_lock for streamhandler */
struct endpoint endpoint; /* LOCK: out_lock */
struct endpoint detected_endpoints[4]; /* LOCK: out_lock */
struct timeval ep_detect_signal; /* LOCK: out_lock */
struct endpoint advertised_endpoint; /* RO */
struct endpoint learned_endpoint; /* LOCK: out_lock */
struct crypto_context crypto; /* OUT direction, LOCK: out_lock */
struct endpoint detected_endpoints[4]; /* LOCK: out_lock */
struct timeval ep_detect_signal; /* LOCK: out_lock */
struct endpoint advertised_endpoint; /* RO */
struct endpoint learned_endpoint; /* LOCK: out_lock */
struct crypto_context crypto; /* OUT direction, LOCK: out_lock */
struct ssrc_ctx *ssrc_in[RTPE_NUM_SSRC_TRACKING], /* LOCK: in_lock */
*ssrc_out[RTPE_NUM_SSRC_TRACKING]; /* LOCK: out_lock */
unsigned int ssrc_in_idx, // LOCK: in_lock
ssrc_out_idx; // LOCK: out_lock
struct send_timer *send_timer; /* RO */
struct jitter_buffer *jb; /* RO */
unsigned int ssrc_in_idx, /* LOCK: in_lock */
ssrc_out_idx; /* LOCK: out_lock */
struct send_timer *send_timer; /* RO */
struct jitter_buffer *jb; /* RO */
struct stream_stats stats_in;
struct stream_stats stats_out;
@ -348,7 +363,7 @@ struct packet_stream {
struct stream_stats kernel_stats_out;
unsigned char in_tos_tclass;
atomic64 last_packet;
GHashTable *rtp_stats; /* LOCK: call->master_lock */
GHashTable *rtp_stats; /* LOCK: call->master_lock */
struct rtp_stats *rtp_stats_cache;
unsigned int stats_flags;
enum endpoint_learning el_flags;
@ -360,19 +375,24 @@ struct packet_stream {
unsigned int lp_count;
#endif
X509 *dtls_cert; /* LOCK: in_lock */
X509 *dtls_cert; /* LOCK: in_lock */
/* in_lock must be held for SETTING these: */
volatile unsigned int ps_flags;
};
/* protected by call->master_lock, except the RO elements */
/**
* Protected by call->master_lock, except the RO elements.
*
* call_media is not reference-counted and is completely owned by the call object.
* Therefore call_media is released when the call is destroyed.
*/
struct call_media {
struct call_monologue *monologue; /* RO */
struct call *call; /* RO */
struct call_monologue *monologue; /* RO */
struct call *call; /* RO */
unsigned int index; /* RO */
unsigned int unique_id; /* RO */
unsigned int index; /* RO */
unsigned int unique_id; /* RO */
str type;
enum media_type type_id;
str protocol_str;
@ -386,21 +406,21 @@ struct call_media {
str media_id;
str label;
GQueue sdes_in, sdes_out;
struct dtls_fingerprint fingerprint; /* as received */
const struct dtls_hash_func *fp_hash_func; // outgoing
struct dtls_fingerprint fingerprint; /* as received */
const struct dtls_hash_func *fp_hash_func; /* outgoing */
GQueue streams; /* normally RTP + RTCP */
GQueue streams; /* normally RTP + RTCP */
GQueue endpoint_maps;
struct codec_store codecs;
GQueue sdp_attributes; // str_sprintf()
GHashTable *codec_handlers; // int payload type -> struct codec_handler
// XXX combine this with 'codecs' hash table?
GQueue codec_handlers_store; // storage for struct codec_handler
GQueue sdp_attributes; /* str_sprintf() */
GHashTable *codec_handlers; /* int payload type -> struct codec_handler
XXX combine this with 'codecs' hash table? */
GQueue codec_handlers_store; /* storage for struct codec_handler */
struct codec_handler *codec_handler_cache;
struct rtcp_handler *rtcp_handler;
struct rtcp_timer *rtcp_timer; // master lock for scheduling purposes
struct mqtt_timer *mqtt_timer; // master lock for scheduling purposes
struct rtcp_timer *rtcp_timer; /* master lock for scheduling purposes */
struct mqtt_timer *mqtt_timer; /* master lock for scheduling purposes */
//struct codec_handler *dtmf_injector;
struct t38_gateway *t38_gateway;
struct codec_handler *t38_handler;
@ -408,7 +428,7 @@ struct call_media {
unsigned int buffer_delay;
mutex_t dtmf_lock;
unsigned long dtmf_ts; // TS of last processed end event
unsigned long dtmf_ts; /* TS of last processed end event */
// lists are append-only
GQueue dtmf_recv;
GQueue dtmf_send;
@ -417,12 +437,23 @@ struct call_media {
encoder_callback_t encoder_callback;
#endif
int ptime; // either from SDP or overridden
int ptime; /* either from SDP or overridden */
volatile unsigned int media_flags;
};
// link between subscribers and subscriptions
/**
* Link between subscribers and subscriptions.
*
* Contain flags and attributes, which can be used
* to mark a subscription (for example, as an egress subscription).
*
* During signalling events, the list of subscriptions for each call_monologue
* is used to create the list of rtp_sink and rtcp_sink given in each packet_stream.
*
* Each entry in these lists is a sink_handler object, which again contains flags and attributes.
* Flags from a call_subscription are copied into the sink_handler.
*/
struct call_subscription {
struct call_monologue *monologue;
GList *link; // link into the corresponding opposite list
@ -430,28 +461,36 @@ struct call_subscription {
struct sink_attrs attrs;
};
/* half a dialogue */
/* protected by call->master_lock, except the RO elements */
/**
* Half a dialogue.
* Protected by call->master_lock, except the RO elements.
*
* call_monologue (call participant) contains a list of subscribers
* and subscriptions, which are other call_monologue's.
*
* These lists are mutual.
* A regular A/B call has two call_monologue objects with each subscribed to the other.
*/
struct call_monologue {
struct call *call; /* RO */
unsigned int unique_id; /* RO */
struct call *call; /* RO */
unsigned int unique_id; /* RO */
str tag;
str viabranch;
enum tag_type tagtype;
str label;
time_t created; /* RO */
time_t created; /* RO */
time_t deleted;
struct timeval started; /* for CDR */
struct timeval terminated; /* for CDR */
struct timeval started; /* for CDR */
struct timeval terminated; /* for CDR */
enum termination_reason term_reason;
const struct logical_intf *logical_intf;
GHashTable *other_tags;
GHashTable *branches;
GQueue subscriptions; // who am I subscribed to (sources)
GHashTable *subscriptions_ht; // for quick lookup
GQueue subscribers; // who is subscribed to me (sinks)
GHashTable *subscribers_ht; // for quick lookup
GQueue subscriptions; /* who am I subscribed to (sources) */
GHashTable *subscriptions_ht; /* for quick lookup */
GQueue subscribers; /* who is subscribed to me (sinks) */
GHashTable *subscribers_ht; /* for quick lookup */
GQueue medias;
GHashTable *media_ids;
struct media_player *player;
@ -536,23 +575,61 @@ struct call_iterator_entry {
mutex_unlock(&rtpe_call_iterators[__which].lock); \
} while (0)
/**
* stuct call is the main parent structure of all call-related objects.
*
* The logical object hierarchy under the 'stuct call':
* call > call_monologue > call_media > packet_stream > stream_fd
*
* struct call usually has multiple call_monologue objects.
* Meanwhile each sub-object of call, as a parent of own sub-objects,
* can also contain multiple child objects.
*
* Furthermore, each child object contains a back ptr to its parent object.
*
* The parent call object contains one list (as GQueue) for each kind of child object.
* These lists are what is used to free and release the child objects
* during a call teardown.
* Every child object owned by the call is added to its respective list exactly once.
*
* Call object is reference-counted through the struct obj.
*/
struct call {
/* struct obj member must always be the first member in a struct.
*
* obj is created with a cleanup handler, see obj_alloc(),
* and this handler is executed whenever the reference count drops to zero.
*
* References are acquired and released through obj_get() and obj_put()
* (plus some other wrapper functions).
*/
struct obj obj;
mutex_t buffer_lock;
call_buffer_t buffer;
/* everything below protected by master_lock */
/* master_lock protects the entire call and all the contained objects.
*
* All the fields and any nested sub-object must:
* - only be accessed with the master_lock held as a read lock
* - only be modified with the master_lock held as a write lock
*
* Therefore, during signalling events acquire a write-lock,
* and during RTP packets handling acquire a read-lock.
*/
rwlock_t master_lock;
GQueue monologues;
GQueue medias;
/* everything below is protected by the master_lock */
GQueue monologues; /* call_monologue */
GQueue medias; /* call_media */
GHashTable *tags;
GHashTable *viabranches;
GHashTable *labels;
GQueue streams;
GQueue stream_fds;
GQueue stream_fds; /* stream_fd */
GQueue endpoint_maps;
struct dtls_cert *dtls_cert; /* for outgoing */
struct dtls_cert *dtls_cert; /* for outgoing */
struct mqtt_timer *mqtt_timer;
struct janus_session *janus_session;
@ -596,7 +673,12 @@ struct call {
};
/**
* The main entry point into call objects for signalling events is the call-ID:
* Therefore the main entry point is the global hash table rtpe_callhash (protected by rtpe_callhash_lock),
* which uses call-IDs as keys and call objects as values,
* while holding a reference to each contained call.
*/
extern rwlock_t rtpe_callhash_lock;
extern GHashTable *rtpe_callhash;
extern struct call_iterator_list rtpe_call_iterators[NUM_CALL_ITERATORS];


+ 34
- 0
include/media_socket.h View File

@ -116,11 +116,39 @@ struct intf_list {
const struct local_intf *local_intf;
GQueue list;
};
/**
* stream_fd is an entry-point object for RTP packets handling,
* because of that it's also reference-counted.
*
* stream_fd object us only released, when it is removed from the poller
* and also removed from the call object.
*
* Contains an information required for media processing, such as media ports.
*/
struct stream_fd {
/* struct obj member must always be the first member in a struct.
*
* obj is created with a cleanup handler, see obj_alloc(),
* and this handler is executed whenever the reference count drops to zero.
*
* References are acquired and released through obj_get() and obj_put()
* (plus some other wrapper functions).
*/
struct obj obj;
unsigned int unique_id; /* RO */
socket_t socket; /* RO */
const struct local_intf *local_intf; /* RO */
/* stream_fd object holds a reference to the call it belongs to.
* Which in turn holds references to all stream_fd objects it contains,
* what makes these references circular.
*
* The call is only released when it has been dissociated from all stream_fd objects,
* which happens during call teardown.
*/
struct call *call; /* RO */
struct packet_stream *stream; /* LOCK: call->master_lock */
struct crypto_context crypto; /* IN direction, LOCK: stream->in_lock */
@ -128,6 +156,7 @@ struct stream_fd {
int error_strikes;
struct poller *poller;
};
struct sink_attrs {
bool block_media;
bool silence_media;
@ -137,6 +166,11 @@ struct sink_attrs {
unsigned int transcoding:1;
unsigned int egress:1;
};
/**
* During actual packet handling and forwarding,
* only the sink_handler objects (and the packet_stream objects they are related to) are used.
*/
struct sink_handler {
struct packet_stream *sink;
const struct streamhandler *handler;


Loading…
Cancel
Save