From 70eb6b9e81616d0ea54e18dab83f9cced4ab405b Mon Sep 17 00:00:00 2001 From: Eric Green Date: Thu, 17 Sep 2015 11:10:05 -0400 Subject: [PATCH] Record raw RTP to recording files on the filesystem. Pass in "record-call" flag over `rtpengine_offer` or `rtpengine_answer` message. RTP Engine tracks files used to record pcaps and send them back in the response message. Pipes call audio (unencrypted from both ends) to recording files. Sets up file descriptors for local files to dump RTP recordings. A file and a file descriptor per monologue in a call. Recorded streams will be running in user daemon mode, not in kernel mode. This removes first 12 octets from packet to record just the rtp. --- daemon/call.c | 29 +++++++++++++++++++++++ daemon/call.h | 11 +++++---- daemon/call_interfaces.c | 16 ++++++++++++- daemon/media_socket.c | 50 +++++++++++++++++++++++++++++++++++----- daemon/media_socket.h | 1 + 5 files changed, 96 insertions(+), 11 deletions(-) diff --git a/daemon/call.c b/daemon/call.c index d00cc5e55..6728e5903 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -2210,6 +2210,11 @@ void call_destroy(struct call *c) { obj_put(sfd); } + while (c->recording_pcaps) { + free(c->recording_pcaps->data); + c->recording_pcaps = g_slist_delete_link(c->recording_pcaps, c->recording_pcaps); + } + rwlock_unlock_w(&c->master_lock); } @@ -2401,6 +2406,27 @@ struct call_monologue *__monologue_create(struct call *call) { ret->call = call; ret->created = poller_now; ret->other_tags = g_hash_table_new(str_hash, str_equal); + if (call->record_call) { + char recording_path[15]; + char logbuf[15]; + /* + * + * create a file descriptor per monologue which can be used for writing rtp to disk + * aka call recording. + */ + + sprintf(recording_path, "/tmp/%d", rand()); + GSList *list = NULL; + call->recording_pcaps = g_slist_prepend(call->recording_pcaps, g_strdup(recording_path)); + ilog(LOG_INFO, "xxegreen: path2 %s", call->recording_pcaps->data); + ilog(LOG_INFO, "XXXECT: Creating new file descriptor for recording at path %s", recording_path); + ret->recording_fd = open(recording_path, O_WRONLY | O_CREAT | O_TRUNC); + sprintf(logbuf, "%d", ret->recording_fd); + ilog(LOG_INFO, "XXXECT: FD created: %s", logbuf); + } else { + ret->recording_fd = -1; + } + g_queue_init(&ret->medias); gettimeofday(&ret->started, NULL); @@ -2474,6 +2500,9 @@ static void __monologue_destroy(struct call_monologue *monologue) { GList *l; call = monologue->call; + /* XXXECT BEGIN */ + close(monologue->recording_fd); + /* XXXECT END */ g_hash_table_remove(call->tags, &monologue->tag); diff --git a/daemon/call.h b/daemon/call.h index 9894f3730..99f04a89e 100644 --- a/daemon/call.h +++ b/daemon/call.h @@ -146,7 +146,7 @@ enum call_type { #define PS_FLAG_CONFIRMED 0x00200000 #define PS_FLAG_KERNELIZED 0x00400000 #define PS_FLAG_NO_KERNEL_SUPPORT 0x00800000 -#define PS_FLAG_UNUSED 0x01000000 +#define PS_FLAG_FORCE_DAEMON_MODE 0x01000000 #define PS_FLAG_FINGERPRINT_VERIFIED 0x02000000 #define PS_FLAG_STRICT_SOURCE SHARED_FLAG_STRICT_SOURCE #define PS_FLAG_MEDIA_HANDOVER SHARED_FLAG_MEDIA_HANDOVER @@ -397,7 +397,7 @@ struct call_monologue { enum termination_reason term_reason; GHashTable *other_tags; struct call_monologue *active_dialogue; - + int recording_fd; GQueue medias; }; @@ -413,14 +413,17 @@ struct call { rwlock_t master_lock; GQueue monologues; GQueue medias; - GHashTable *tags; + GHashTable *tags; GHashTable *viabranches; GQueue streams; GQueue stream_fds; GQueue endpoint_maps; struct dtls_cert *dtls_cert; /* for outgoing */ - str callid; + unsigned int record_call; + GSList *recording_pcaps; + + str callid; time_t created; time_t last_signal; time_t deleted; diff --git a/daemon/call_interfaces.c b/daemon/call_interfaces.c index 0353379bf..fbe23a4b1 100644 --- a/daemon/call_interfaces.c +++ b/daemon/call_interfaces.c @@ -647,7 +647,7 @@ static const char *call_offer_answer_ng(bencode_item_t *input, struct callmaster bencode_item_t *output, enum call_opmode opmode, const char* addr, const endpoint_t *sin) { - str sdp, fromtag, totag = STR_NULL, callid, viabranch; + str sdp, fromtag, totag = STR_NULL, callid, viabranch, recordcall = STR_NULL; char *errstr; GQueue parsed = G_QUEUE_INIT; GQueue streams = G_QUEUE_INIT; @@ -670,6 +670,7 @@ static const char *call_offer_answer_ng(bencode_item_t *input, struct callmaster str_swap(&totag, &fromtag); } bencode_dictionary_get_str(input, "via-branch", &viabranch); + bencode_dictionary_get_str(input, "record-call", &recordcall); if (sdp_parse(&sdp, &parsed)) return "Failed to parse SDP"; @@ -686,6 +687,12 @@ static const char *call_offer_answer_ng(bencode_item_t *input, struct callmaster if (!call) goto out; + if (recordcall.s && !str_cmp(&recordcall, "yes")) { + call->record_call = 1; + } else { + call->record_call = 0; + } + if (!call->created_from && addr) { call->created_from = call_strdup(call, addr); call->created_from_addr = sin->address; @@ -733,6 +740,13 @@ static const char *call_offer_answer_ng(bencode_item_t *input, struct callmaster bencode_dictionary_add_iovec(output, "sdp", &g_array_index(chopper->iov, struct iovec, 0), chopper->iov_num, chopper->str_len); bencode_dictionary_add_string(output, "result", "ok"); + bencode_item_t *recordings = bencode_dictionary_add_list(output, "recordings"); + GList *l; + char *recording_path; + for (l = call->recording_pcaps; l; l = l->next) { + ilog(LOG_INFO, "xxegreen: Recording path %s", l->data); + bencode_list_add_string(recordings, l->data); + } errstr = NULL; out: diff --git a/daemon/media_socket.c b/daemon/media_socket.c index 8a5cc27cd..78a566134 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -237,7 +237,7 @@ static int has_free_ports_loc(struct local_intf *loc, unsigned int num_ports) { ilog(LOG_ERR, "has_free_ports_loc - NULL local interface"); return 0; } - + if (num_ports > loc->spec->port_pool.free_ports) { ilog(LOG_ERR, "Didn't found %d ports available for %.*s/%s", num_ports, loc->logical->name.len, loc->logical->name.s, @@ -297,7 +297,7 @@ static int has_free_ports_log_all(struct logical_intf *log, unsigned int num_por return 1; } -/* run round-robin-calls algorithm */ +/* run round-robin-calls algorithm */ static struct logical_intf* run_round_robin_calls(GQueue *q, unsigned int num_ports) { struct logical_intf *log = NULL; volatile unsigned int nr_tries = 0; @@ -340,7 +340,7 @@ select_log: // 2 streams => 4 x get_logical_interface calls at offer selection_count ++; if (selection_count % (num_ports / 2) == 0) { - selection_count = 0; + selection_count = 0; selection_index ++; selection_index = selection_index % nr_logs; } @@ -834,6 +834,8 @@ void kernelize(struct packet_stream *stream) { if (PS_ISSET(stream, KERNELIZED)) return; + if (PS_ISSET(stream, FORCE_DAEMON_MODE)) + return; if (cm->conf.kernelid < 0) goto no_kernel; nk_warn_msg = "interface to kernel module not open"; @@ -938,6 +940,8 @@ void __unkernelize(struct packet_stream *p) { return; if (PS_ISSET(p, NO_KERNEL_SUPPORT)) return; + if (PS_ISSET(p, FORCE_DAEMON_MODE)) + return; if (p->call->callmaster->conf.kernelfd >= 0) { __re_address_translate_ep(&rea, &p->selected_sfd->socket.local); @@ -1023,6 +1027,8 @@ static int stream_packet(struct stream_fd *sfd, str *s, const endpoint_t *fsin, int ret = 0, update = 0, stun_ret = 0, handler_ret = 0, muxed_rtcp = 0, rtcp = 0, unk = 0; int i; + // XXEGREEN... This makes me nervous. + int recording_fd = sfd->stream->media->monologue->recording_fd; struct call *call; struct callmaster *cm; /*unsigned char cc;*/ @@ -1142,7 +1148,6 @@ loop_ok: } } - /* do we have somewhere to forward it to? */ if (!sink || !sink->selected_sfd || !out_srtp->selected_sfd || !in_srtp->selected_sfd) { @@ -1172,8 +1177,24 @@ loop_ok: /* return values are: 0 = forward packet, -1 = error/dont forward, * 1 = forward and push update to redis */ - if (rwf_in) + if (rwf_in) { handler_ret = rwf_in(s, in_srtp); + ilog(LOG_INFO, "xxegreen peer address as %s", endpoint_print_buf(fsin)); + } + // This might be the hook that rfuchs might be referring to + // ilog(LOG_WARNING, "xxegreen0: %s", s->s); + // EGREEN: This is working pretty nicely but we need to remove the first 12 bytes from each packet that it is dumping + if (recording_fd && recording_fd != -1) { + // I am aware that we need to do better and that this is a naive approach + int writelen = (s->len)-12; + char towrite[writelen]; + memcpy(towrite, &s->s[12], writelen); + write(recording_fd, towrite, writelen); + + // EGREEN: This is going to happen for every packet. We need to do better + PS_SET(stream, FORCE_DAEMON_MODE); + } + if (handler_ret >= 0) { if (rtcp) parse_and_log_rtcp_report(sfd, s, fsin, tv); @@ -1296,6 +1317,7 @@ kernel_check: kernelize(stream); forward: + // ilog(LOG_INFO, "XXEGREENSTREAM: %s", s->s); if (sink) mutex_lock(&sink->out_lock); @@ -1306,6 +1328,8 @@ forward: || stun_ret || handler_ret < 0) goto drop; + // s is my packet? + ilog(LOG_INFO, "XXEGREEN NOT"); ret = socket_sendto(&sink->selected_sfd->socket, s->s, s->len, &sink->endpoint); __C_DBG("Forward to sink endpoint: %s:%d", sockaddr_print_buf(&sink->endpoint.address), sink->endpoint.port); @@ -1367,6 +1391,20 @@ static void stream_fd_readable(int fd, void *p, uintptr_t u) { log_info_stream_fd(sfd); + /* + * I should be able to create a filedescriptor here for each stream and pass it + * to stream_packet. Each file descriptor should then receive one side of the + * rtp for each call. I fully expect some packets to get dropped using this + * naive method. If we are seeing problems in testing we could use a RAM disk. + */ + + // var egreentmp1[15] = "/tmp" + rand(); + // ilog(LOG_INFO, "XXEGREEN: Creting new file descriptor for recording %s", egreentmp1); + // int egreenFD = open(egreentmp1, O_WRONLY | O_CREAT | O_TRUNC); + // char egreentmp[15]; + // sprintf(egreentmp, "%d", egreenFD); + // ilog(LOG_INFO, "XXEGREEN: FD created: %s", egreentmp); + for (iters = 0; ; iters++) { #if MAX_RECV_ITERS if (iters >= MAX_RECV_ITERS) { @@ -1434,7 +1472,7 @@ struct stream_fd *stream_fd_new(socket_t *fd, struct call *call, const struct lo sfd->call = obj_get(call); sfd->local_intf = lif; g_queue_push_tail(&call->stream_fds, sfd); /* hand over ref */ - g_slice_free1(sizeof(*fd), fd); /* moved into sfd, thus free */ + //sfd->recording_fd = recording_fd; __C_DBG("stream_fd_new localport=%d", sfd->socket.local.port); diff --git a/daemon/media_socket.h b/daemon/media_socket.h index 44277dfca..78e3453a4 100644 --- a/daemon/media_socket.h +++ b/daemon/media_socket.h @@ -64,6 +64,7 @@ struct stream_fd { struct packet_stream *stream; /* LOCK: call->master_lock */ struct crypto_context crypto; /* IN direction, LOCK: stream->in_lock */ struct dtls_connection dtls; /* LOCK: stream->in_lock */ + int recording_fd; /* XXEGREEN file descriptor to record rtp to */ };