diff --git a/README.md b/README.md index e21f94fad..c32600465 100644 --- a/README.md +++ b/README.md @@ -47,6 +47,7 @@ the following additional features are available: - Transcoding between RFC 2833/4733 DTMF event packets and in-band DTMF tones (and vice versa) - Injection of DTMF events or PCM DTMF tones into running audio streams - Playback of pre-recorded streams/announcements +- Transcoding between T.38 and PCM (G.711 or other audio codecs) *Rtpengine* does not (yet) support: @@ -411,9 +412,6 @@ the necessary conversions. If repacketization (using the `ptime` option) is requested, the transcoding feature will also be engaged for the call, even if no additional codecs were requested. -Non-audio pseudo-codecs (such as T.38) are not currently supported, with the exception of RFC -2833/4733 DTMF event packets (`telephone-event`) as described below. - G.729 support ------------- @@ -464,6 +462,16 @@ Enabling DTMF transcoding (in one of the two ways described above) implicitly en `always transcode` for the call and forces all of the audio to pass through the transcoding engine. Therefore, for performance reasons, this should only be done when really necessary. +T.38 +---- +*Rtpengine* can translate between fax endpoints that speak T.38 over UDPTL and fax endpoints that speak +T.30 over regular audio channels. Any audio codec can theoretically be used for T.30 transmissions, but +codecs that are too compressed will make the fax transmission fail. The most commonly used audio codecs +for fax are the G.711 codecs (`PCMU` and `PCMA`), which are the default codecs *rtpengine* will use in +this case if no other codecs are specified. + +For further information, see the section on the `T.38` dictionary key below. + Call recording ============== @@ -1058,6 +1066,37 @@ Optionally included keys are: who has sent the offer. It will be inserted in the `answer` SDP. This option is also ignored in `answer` messages. +* `T.38` + + Contains a list of strings. Each string is a flag that controls the behaviour regarding + T.38 transcoding. These flags are ignored if the message is not an `offer`. + Recognised flags are: + + - `decode` + + If the received SDP contains a media section with an `image` type, `UDPTL` + transport, and `t38` format string, this flag instructs *rtpengine* to convert + this media section into an `audio` type using RTP as transport protocol. + Other transport protocols (such as SRTP) can be selected using `transport protocol` + as described above. + + The default audio codecs to be offered are `PCMU` and `PCMA`. Other audio codecs + can be specified using the `transcode=` flag described above, in which case the + default codecs will not be offered automatically. + + - `force` + + If the received SDP contains an audio media section using RTP transport, this flag + instructs *rtpengine* to convert it to an `image` type media section using the UDPTL + protocol. The first supported audio codec that was offered will be used to transport + T.30. Default options for T.38 are used for the generated SDP. + + - `stop` + + Stops a currently active T.38 gateway that was previously engaged using the `decode` + or `force` flags. This is useful to handle a rejected T.38 offer and revert the + session back to media passthrough. + * `supports` Contains a list of strings. Each string indicates support for an additional feature diff --git a/daemon/Makefile b/daemon/Makefile index 7d0b11254..d5c83938e 100644 --- a/daemon/Makefile +++ b/daemon/Makefile @@ -127,7 +127,7 @@ SRCS= main.c kernel.c poller.c aux.c control_tcp.c call.c control_udp.c redis.c bencode.c cookie_cache.c udp_listener.c control_ng.strhash.c sdp.strhash.c stun.c rtcp.c \ crypto.c rtp.c call_interfaces.strhash.c dtls.c log.c cli.c graphite.c ice.c \ media_socket.c homer.c recording.c statistics.c cdr.c ssrc.c iptables.c tcp_listener.c \ - codec.c load.c dtmf.c timerthread.c media_player.c jitter_buffer.c + codec.c load.c dtmf.c timerthread.c media_player.c jitter_buffer.c t38.c LIBSRCS= loglib.c auxlib.c rtplib.c str.c socket.c streambuf.c ssllib.c dtmflib.c ifeq ($(with_transcoding),yes) LIBSRCS+= codeclib.c resample.c diff --git a/daemon/call.c b/daemon/call.c index e5becbbc9..7729d4840 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -46,6 +46,7 @@ #include "codec.h" #include "media_player.h" #include "jitter_buffer.h" +#include "t38.h" /* also serves as array index for callstream->peers[] */ @@ -1816,6 +1817,19 @@ static void __update_media_id(struct call_media *media, struct call_media *other } } +static void __t38_reset(struct call_media *media, struct call_media *other_media) { + ilog(LOG_DEBUG, "Stopping T.38 gateway and resetting %s/" STR_FORMAT " to %s/" STR_FORMAT, + media->protocol->name, + STR_FMT(&media->format_str), + other_media->protocol->name, + STR_FMT(&other_media->format_str)); + + media->protocol = other_media->protocol; + media->type_id = other_media->type_id; + call_str_cpy(media->call, &media->type, &other_media->type); + call_str_cpy(media->call, &media->format_str, &other_media->format_str); +} + static void __update_media_protocol(struct call_media *media, struct call_media *other_media, struct stream_params *sp, struct sdp_ng_flags *flags) { @@ -1845,6 +1859,42 @@ static void __update_media_protocol(struct call_media *media, struct call_media if (!flags) return; + // T.38 decoder? + if (other_media->type_id == MT_IMAGE && proto_is(other_media->protocol, PROTO_UDPTL) + && flags->t38_decode) + { + media->protocol = flags->transport_protocol; + if (!media->protocol) + media->protocol = &transport_protocols[PROTO_RTP_AVP]; + media->type_id = MT_AUDIO; + call_str_cpy_c(media->call, &media->type, "audio"); + return; + } + + // T.38 encoder? + if (other_media->type_id == MT_AUDIO && proto_is_rtp(other_media->protocol) + && flags->t38_force) + { + media->protocol = &transport_protocols[PROTO_UDPTL]; + media->type_id = MT_IMAGE; + call_str_cpy_c(media->call, &media->type, "image"); + call_str_cpy_c(media->call, &media->format_str, "t38"); + return; + } + + // previous T.38 gateway but now stopping? + if (flags->t38_stop) { + if (other_media->type_id == MT_AUDIO && proto_is_rtp(other_media->protocol) + && media->type_id == MT_IMAGE + && proto_is(media->protocol, PROTO_UDPTL)) + __t38_reset(media, other_media); + else if (media->type_id == MT_AUDIO && proto_is_rtp(media->protocol) + && other_media->type_id == MT_IMAGE + && proto_is(other_media->protocol, PROTO_UDPTL)) + __t38_reset(media, other_media); + // drop through for protocol override + } + /* allow override of outgoing protocol even if we know it already */ /* but only if this is an RTP-based protocol */ if (flags->transport_protocol @@ -1962,10 +2012,13 @@ int monologue_offer_answer(struct call_monologue *other_ml, GQueue *streams, } if (str_cmp_str(&other_media->format_str, &sp->format_str)) call_str_cpy(call, &other_media->format_str, &sp->format_str); - if (str_cmp_str(&media->format_str, &sp->format_str)) - call_str_cpy(call, &media->format_str, &sp->format_str); + if (str_cmp_str(&media->format_str, &sp->format_str)) { + // update opposite side format string only if protocols match + if (media->protocol == other_media->protocol) + call_str_cpy(call, &media->format_str, &sp->format_str); + } codec_rtp_payload_types(media, other_media, &sp->rtp_payload_types, flags); - codec_handlers_update(media, other_media, flags); + codec_handlers_update(media, other_media, flags, sp); /* send and recv are from our POV */ bf_copy_same(&media->media_flags, &sp->sp_flags, @@ -2064,6 +2117,7 @@ init: ice_update(media->ice_agent, NULL); /* this is in case rtcp-mux has changed */ recording_setup_media(media); + t38_gateway_start(media->t38_gateway); } return 0; @@ -2324,6 +2378,8 @@ no_stats_output: for (l = c->medias.head; l; l = l->next) { md = l->data; ice_shutdown(&md->ice_agent); + t38_gateway_stop(md->t38_gateway); + t38_gateway_put(&md->t38_gateway); } for (l = c->monologues.head; l; l = l->next) { @@ -2411,6 +2467,8 @@ static void __call_free(void *p) { g_queue_clear_full(&md->codecs_prefs_recv, (GDestroyNotify) payload_type_free); g_queue_clear_full(&md->codecs_prefs_send, (GDestroyNotify) payload_type_free); codec_handlers_free(md); + codec_handler_free(&md->t38_handler); + t38_gateway_put(&md->t38_gateway); g_queue_clear_full(&md->sdp_attributes, free); g_slice_free1(sizeof(*md), md); } @@ -2804,6 +2862,16 @@ struct call_monologue *call_get_mono_dialogue(struct call *call, const str *from } + +static void monologue_stop(struct call_monologue *ml) { + media_player_stop(ml->player); + for (GList *l = ml->medias.head; l; l = l->next) { + struct call_media *m = l->data; + t38_gateway_stop(m->t38_gateway); + } +} + + int call_delete_branch(const str *callid, const str *branch, const str *fromtag, const str *totag, bencode_item_t *output, int delete_delay) { @@ -2866,7 +2934,7 @@ do_delete: if (output) ng_call_stats(c, fromtag, totag, output, NULL); - media_player_stop(ml->player); + monologue_stop(ml); if (delete_delay > 0) { ilog(LOG_INFO, "Scheduling deletion of call branch '" STR_FORMAT_M "' " @@ -2887,7 +2955,7 @@ do_delete: del_all: for (i = c->monologues.head; i; i = i->next) { ml = i->data; - media_player_stop(ml->player); + monologue_stop(ml); } if (delete_delay > 0) { diff --git a/daemon/call_interfaces.c b/daemon/call_interfaces.c index 92ef5d3aa..e226d3f54 100644 --- a/daemon/call_interfaces.c +++ b/daemon/call_interfaces.c @@ -548,6 +548,26 @@ INLINE void ng_sdes_option(struct sdp_ng_flags *out, str *s, void *dummy) { } +#ifdef WITH_TRANSCODING +INLINE void ng_t38_option(struct sdp_ng_flags *out, str *s, void *dummy) { + switch (__csh_lookup(s)) { + case CSH_LOOKUP("decode"): + out->t38_decode = 1; + break; + case CSH_LOOKUP("force"): + out->t38_force = 1; + break; + case CSH_LOOKUP("stop"): + out->t38_stop = 1; + break; + default: + ilog(LOG_WARN, "Unknown 'T.38' flag encountered: '" STR_FORMAT "'", + STR_FMT(s)); + } +} +#endif + + static void call_ng_flags_list(struct sdp_ng_flags *out, bencode_item_t *input, const char *key, void (*callback)(struct sdp_ng_flags *, str *, void *), void *parm) { @@ -728,6 +748,10 @@ static void call_ng_flags_flags(struct sdp_ng_flags *out, str *s, void *dummy) { if (call_ng_flags_prefix(out, s, "codec-set-", call_ng_flags_str_ht_split, &out->codec_set)) return; + if (call_ng_flags_prefix(out, s, "T38-", ng_t38_option, NULL)) + return; + if (call_ng_flags_prefix(out, s, "T.38-", ng_t38_option, NULL)) + return; #endif } @@ -809,6 +833,10 @@ static void call_ng_process_flags(struct sdp_ng_flags *out, bencode_item_t *inpu call_ng_flags_list(out, input, "rtcp-mux", call_ng_flags_rtcp_mux, NULL); call_ng_flags_list(out, input, "SDES", ng_sdes_option, NULL); +#ifdef WITH_TRANSCODING + call_ng_flags_list(out, input, "T38", ng_t38_option, NULL); + call_ng_flags_list(out, input, "T.38", ng_t38_option, NULL); +#endif bencode_get_alt(input, "transport-protocol", "transport protocol", &out->transport_protocol_str); out->transport_protocol = transport_protocol(&out->transport_protocol_str); @@ -1234,6 +1262,7 @@ static void ng_stats_media(bencode_item_t *list, const struct call_media *m, BF_M("unidirectional", UNIDIRECTIONAL); BF_M("loop check", LOOP_CHECK); BF_M("transcoding", TRANSCODE); + BF_M("generator/sink", GENERATOR); stats: for (l = m->streams.head; l; l = l->next) { diff --git a/daemon/codec.c b/daemon/codec.c index bf0ca16fe..31436c8ab 100644 --- a/daemon/codec.c +++ b/daemon/codec.c @@ -16,6 +16,8 @@ #include "call_interfaces.h" #include "dtmf.h" #include "dtmflib.h" +#include "t38.h" +#include "media_player.h" @@ -25,8 +27,6 @@ static codec_handler_func handler_func_passthrough; static struct rtp_payload_type *__rtp_payload_type_copy(const struct rtp_payload_type *pt); static void __rtp_payload_type_dup(struct call *call, struct rtp_payload_type *pt); static void __rtp_payload_type_add_name(GHashTable *, struct rtp_payload_type *pt); -static int packet_encoded_rtp(encoder_t *enc, void *u1, void *u2); -static int packet_decoded_fifo(decoder_t *decoder, AVFrame *frame, void *u1, void *u2); static struct codec_handler codec_handler_stub = { @@ -107,6 +107,7 @@ static codec_handler_func handler_func_transcode; static codec_handler_func handler_func_playback; static codec_handler_func handler_func_inject_dtmf; static codec_handler_func handler_func_dtmf; +static codec_handler_func handler_func_t38; static struct ssrc_entry *__ssrc_handler_transcode_new(void *p); static struct ssrc_entry *__ssrc_handler_new(void *p); @@ -114,6 +115,10 @@ static void __free_ssrc_handler(void *); static void __transcode_packet_free(struct transcode_packet *); +static int packet_encoded_rtp(encoder_t *enc, void *u1, void *u2); +static int packet_decoded_fifo(decoder_t *decoder, AVFrame *frame, void *u1, void *u2); +static int packet_decoded_direct(decoder_t *decoder, AVFrame *frame, void *u1, void *u2); + static struct codec_handler codec_handler_stub_ssrc = { .source_pt.payload_type = -1, @@ -150,7 +155,8 @@ void codec_handler_free(struct codec_handler **handler) { static struct codec_handler *__handler_new(const struct rtp_payload_type *pt) { struct codec_handler *handler = g_slice_alloc0(sizeof(*handler)); - handler->source_pt = *pt; + if (pt) + handler->source_pt = *pt; handler->output_handler = handler; // default handler->dtmf_payload_type = -1; handler->packet_encoded = packet_encoded_rtp; @@ -236,14 +242,17 @@ reset: check_output:; // check if we have multiple decoders transcoding to the same output PT - struct codec_handler *output_handler = g_hash_table_lookup(output_transcoders, - GINT_TO_POINTER(dest->payload_type)); + struct codec_handler *output_handler = NULL; + if (output_transcoders) + output_handler = g_hash_table_lookup(output_transcoders, + GINT_TO_POINTER(dest->payload_type)); if (output_handler) { ilog(LOG_DEBUG, "Using existing encoder context"); handler->output_handler = output_handler; } else { - g_hash_table_insert(output_transcoders, GINT_TO_POINTER(dest->payload_type), handler); + if (output_transcoders) + g_hash_table_insert(output_transcoders, GINT_TO_POINTER(dest->payload_type), handler); handler->output_handler = handler; // make sure we don't have a stale pointer } } @@ -528,6 +537,7 @@ static void __check_dtmf_injector(const struct sdp_ng_flags *flags, struct call_ + static struct codec_handler *__get_pt_handler(struct call_media *receiver, struct rtp_payload_type *pt) { ensure_codec_def(pt, receiver); struct codec_handler *handler; @@ -560,13 +570,150 @@ static struct codec_handler *__get_pt_handler(struct call_media *receiver, struc return handler; } + + + +static void __check_t38_decoder(struct call_media *t38_media) { + if (t38_media->t38_handler) + return; + ilog(LOG_DEBUG, "Creating T.38 packet handler"); + t38_media->t38_handler = __handler_new(NULL); + t38_media->t38_handler->func = handler_func_t38; +} + +static int packet_encoded_t38(encoder_t *enc, void *u1, void *u2) { + struct media_packet *mp = u2; + + if (!mp->media) + return 0; + + return t38_gateway_input_samples(mp->media->t38_gateway, + (int16_t *) enc->avpkt.data, enc->avpkt.size / 2); +} + +static void __generator_stop(struct call_media *media) { + if (media->t38_gateway) { + t38_gateway_stop(media->t38_gateway); + t38_gateway_put(&media->t38_gateway); + } +} + +static void __check_t38_gateway(struct call_media *pcm_media, struct call_media *t38_media, + const struct stream_params *sp) +{ + const struct t38_options *t_opts; + struct t38_options t_opts_stor = {0,}; + + if (sp) + t_opts = &sp->t38_options; + else { + // create our own options + t_opts_stor.max_ec_entries = 3; + t_opts = &t_opts_stor; + } + + MEDIA_SET(pcm_media, TRANSCODE); + MEDIA_SET(pcm_media, GENERATOR); + MEDIA_SET(t38_media, TRANSCODE); + MEDIA_SET(t38_media, GENERATOR); + + if (t38_gateway_pair(t38_media, pcm_media, t_opts)) + return; + + // need a packet handler on the T.38 side + __check_t38_decoder(t38_media); + + + // for each codec type supported by the pcm_media, we create a codec handler that + // links to the T.38 encoder + for (GList *l = pcm_media->codecs_prefs_recv.head; l; l = l->next) { + struct rtp_payload_type *pt = l->data; + struct codec_handler *handler = __get_pt_handler(pcm_media, pt); + if (!pt->codec_def) { + // should not happen + ilog(LOG_WARN, "Unsupported codec " STR_FORMAT " for T.38 transcoding", + STR_FMT(&pt->encoding_with_params)); + continue; + } + + ilog(LOG_DEBUG, "Creating T.38 encoder for " STR_FORMAT, STR_FMT(&pt->encoding_with_params)); + + __make_transcoder(handler, &pcm_media->t38_gateway->pcm_pt, NULL, -1, 0); + + handler->packet_decoded = packet_decoded_direct; + handler->packet_encoded = packet_encoded_t38; + } +} + +// call must be locked in W +static int codec_handler_udptl_update(struct call_media *receiver, struct call_media *sink) { + // anything to do? + if (proto_is(sink->protocol, PROTO_UDPTL)) + return 0; + + if (sink->type_id == MT_AUDIO && proto_is_rtp(sink->protocol) && receiver->type_id == MT_IMAGE) { + if (!str_cmp(&receiver->format_str, "t38")) { + __check_t38_gateway(sink, receiver, NULL); + return 1; + } + } + ilog(LOG_WARN, "Unsupported non-RTP protocol: " STR_FORMAT "/" STR_FORMAT + " -> " STR_FORMAT "/" STR_FORMAT, + STR_FMT(&receiver->type), STR_FMT(&receiver->format_str), + STR_FMT(&sink->type), STR_FMT(&sink->format_str)); + return 0; +} + +// call must be locked in W +// for transcoding RTP types to non-RTP +static int codec_handler_non_rtp_update(struct call_media *receiver, struct call_media *sink, + const struct sdp_ng_flags *flags, const struct stream_params *sp) +{ + if (proto_is(sink->protocol, PROTO_UDPTL) && !str_cmp(&sink->format_str, "t38")) { + __check_t38_gateway(receiver, sink, sp); + return 1; + } + ilog(LOG_WARN, "Unsupported non-RTP protocol: " STR_FORMAT "/" STR_FORMAT + " -> " STR_FORMAT "/" STR_FORMAT, + STR_FMT(&receiver->type), STR_FMT(&receiver->format_str), + STR_FMT(&sink->type), STR_FMT(&sink->format_str)); + return 0; +} + + + // call must be locked in W void codec_handlers_update(struct call_media *receiver, struct call_media *sink, - const struct sdp_ng_flags *flags) + const struct sdp_ng_flags *flags, const struct stream_params *sp) { + MEDIA_CLEAR(receiver, GENERATOR); + MEDIA_CLEAR(sink, GENERATOR); + + // non-RTP protocol? + if (proto_is(receiver->protocol, PROTO_UDPTL)) { + if (codec_handler_udptl_update(receiver, sink)) + return; + } + // everything else is unsupported: pass through + if (proto_is_not_rtp(receiver->protocol)) { + __generator_stop(receiver); + __generator_stop(sink); + return; + } + if (!receiver->codec_handlers) receiver->codec_handlers = g_hash_table_new(g_direct_hash, g_direct_equal); + // should we transcode to a non-RTP protocol? + if (proto_is_not_rtp(sink->protocol)) { + if (codec_handler_non_rtp_update(receiver, sink, flags, sp)) + return; + } + + // we're doing some kind of media passthrough - shut down local generators + __generator_stop(receiver); + __generator_stop(sink); + MEDIA_CLEAR(receiver, TRANSCODE); receiver->rtcp_handler = NULL; GSList *passthrough_handlers = NULL; @@ -800,6 +947,11 @@ static struct codec_handler *codec_handler_get_rtp(struct call_media *m, int pay return h; } +static struct codec_handler *codec_handler_get_udptl(struct call_media *m) { + if (m->t38_handler) + return m->t38_handler; + return NULL; +} #endif @@ -814,6 +966,8 @@ struct codec_handler *codec_handler_get(struct call_media *m, int payload_type) if (m->protocol->rtp) ret = codec_handler_get_rtp(m, payload_type); + else if (m->protocol->index == PROTO_UDPTL) + ret = codec_handler_get_udptl(m); out: if (ret) @@ -1179,6 +1333,13 @@ static int handler_func_dtmf(struct codec_handler *h, struct media_packet *mp) { return __handler_func_sequencer(mp, packet); } + +static int handler_func_t38(struct codec_handler *h, struct media_packet *mp) { + if (!mp->media) + return 0; + + return t38_gateway_input_udptl(mp->media->t38_gateway, &mp->raw); +} #endif @@ -1621,6 +1782,9 @@ discard: static int packet_decoded_fifo(decoder_t *decoder, AVFrame *frame, void *u1, void *u2) { return packet_decoded_common(decoder, frame, u1, u2, encoder_input_fifo); } +static int packet_decoded_direct(decoder_t *decoder, AVFrame *frame, void *u1, void *u2) { + return packet_decoded_common(decoder, frame, u1, u2, encoder_input_data); +} static int packet_decode(struct codec_ssrc_handler *ch, struct transcode_packet *packet, struct media_packet *mp) { @@ -1805,6 +1969,10 @@ void __rtp_payload_type_add_recv(struct call_media *media, { if (!pt) return; + if (proto_is_not_rtp(media->protocol)) { + payload_type_free(pt); + return; + } // update ptime in case it was overridden if (media->ptime > 0) pt->ptime = media->ptime; @@ -1818,6 +1986,10 @@ void __rtp_payload_type_add_send(struct call_media *other_media, { if (!pt) return; + if (proto_is_not_rtp(other_media->protocol)) { + payload_type_free(pt); + return; + } // update ptime in case it was overridden if (other_media->ptime > 0) pt->ptime = other_media->ptime; @@ -1829,6 +2001,8 @@ void __rtp_payload_type_add_send(struct call_media *other_media, void __rtp_payload_type_add_send_dup(struct call_media *other_media, struct rtp_payload_type *pt) { + if (proto_is_not_rtp(other_media->protocol)) + return; pt = __rtp_payload_type_copy(pt); __rtp_payload_type_add_send(other_media, pt); } @@ -1900,10 +2074,19 @@ void codec_rtp_payload_types(struct call_media *media, struct call_media *other_ int strip_all = 0, mask_all = 0; // start fresh - // receiving part for 'media' - g_queue_clear_full(&media->codecs_prefs_recv, (GDestroyNotify) payload_type_free); - g_hash_table_remove_all(media->codecs_recv); - g_hash_table_remove_all(media->codec_names_recv); + if (!proto_is_rtp(other_media->protocol) && proto_is_rtp(media->protocol) && flags->opmode == OP_OFFER) { + // leave them alone if incoming proto is not RTP but outgoing is, + // as this is needed for T.38 decoding during a re-invite. + // this special case is only needed in an offer as in the answer + // we can go by media->codecs_prefs_send. + ; + } + else { + // receiving part for 'media' + g_queue_clear_full(&media->codecs_prefs_recv, (GDestroyNotify) payload_type_free); + g_hash_table_remove_all(media->codecs_recv); + g_hash_table_remove_all(media->codec_names_recv); + } // and sending part for 'other_media' g_queue_clear_full(&other_media->codecs_prefs_send, (GDestroyNotify) payload_type_free); g_hash_table_remove_all(other_media->codecs_send); @@ -1990,6 +2173,56 @@ void codec_rtp_payload_types(struct call_media *media, struct call_media *other_ STR_FMT(&pt->encoding_with_params), pt->payload_type); __rtp_payload_type_add_recv(media, pt); } + + if (media->type_id == MT_AUDIO && other_media->type_id == MT_IMAGE) { + if (media->codecs_prefs_recv.length == 0) { + // find some codecs to put into our outgoing SDP body + + if (media->codecs_prefs_send.length && media->t38_gateway + && flags->opmode == OP_ANSWER) + { + // audio -> T.38 transcoder, answer: + // we answer with the codec that we're sending audio with, taken from + // our PCM player + if (media->t38_gateway && media->t38_gateway->pcm_player + && media->t38_gateway->pcm_player->handler) + __rtp_payload_type_add_recv(media, + __rtp_payload_type_copy(&media->t38_gateway->pcm_player->handler->dest_pt)); + } + else if (flags->opmode == OP_OFFER) { + // T.38 -> audio transcoder, initial offer, and no codecs have been given. + // Default to PCMA and PCMU + // XXX can we improve the codec lookup/synthesis? + static const str PCMU_str = STR_CONST_INIT("PCMU"); + static const str PCMA_str = STR_CONST_INIT("PCMA"); + pt = codec_add_payload_type(&PCMU_str, media); + assert(pt != NULL); + __rtp_payload_type_add_recv(media, pt); + pt = codec_add_payload_type(&PCMA_str, media); + assert(pt != NULL); + __rtp_payload_type_add_recv(media, pt); + + ilog(LOG_DEBUG, "Using default codecs PCMU and PCMA for T.38 gateway"); + } + } + else if (flags->opmode == OP_OFFER) { + // re-invite - we remember some codecs from before, or perhaps they + // were added manually through the transcoding options. make sure + // they're all supported by us + + for (GList *l = media->codecs_prefs_recv.head; l;) { + pt = l->data; + ensure_codec_def(pt, media); + if (pt->codec_def) { + l = l->next; + continue; + } + ilog(LOG_DEBUG, "Eliminating unsupported codec " STR_FORMAT, + STR_FMT(&pt->encoding_with_params)); + l = __delete_receiver_codec(media, l); + } + } + } #endif g_hash_table_destroy(removed); diff --git a/daemon/media_player.c b/daemon/media_player.c index 0c4e7b346..91ca9c910 100644 --- a/daemon/media_player.c +++ b/daemon/media_player.c @@ -25,7 +25,10 @@ #ifdef WITH_TRANSCODING static struct timerthread media_player_thread; static MYSQL __thread *mysql_conn; + +static void media_player_read_packet(struct media_player *mp); #endif + static struct timerthread send_timer_thread; @@ -33,7 +36,6 @@ static struct timerthread send_timer_thread; static void send_timer_send_nolock(struct send_timer *st, struct codec_packet *cp); static void send_timer_send_lock(struct send_timer *st, struct codec_packet *cp); -static void media_player_read_packet(struct media_player *mp); @@ -51,6 +53,7 @@ static void media_player_shutdown(struct media_player *mp) { ilog(LOG_DEBUG, "shutting down media_player"); timerthread_obj_deschedule(&mp->tt_obj); + mp->next_run.tv_sec = 0; avformat_close_input(&mp->fmtctx); if (mp->sink) { @@ -222,6 +225,7 @@ int media_player_setup(struct media_player *mp, const struct rtp_payload_type *s struct rtp_payload_type *dst_pt; for (GList *l = mp->media->codecs_prefs_send.head; l; l = l->next) { dst_pt = l->data; + ensure_codec_def(dst_pt, mp->media); if (dst_pt->codec_def && !dst_pt->codec_def->supplemental) goto found; } @@ -241,7 +245,17 @@ found: mp->sync_ts += ts_diff_us * dst_pt->clock_rate / 1000000 / dst_pt->codec_def->clockrate_mult; } - mp->handler = codec_handler_make_playback(src_pt, dst_pt, mp->sync_ts); + // if we already have a handler, see if anything needs changing + if (mp->handler) { + if (rtp_payload_type_cmp(&mp->handler->dest_pt, dst_pt) + || rtp_payload_type_cmp(&mp->handler->source_pt, src_pt)) + { + ilog(LOG_DEBUG, "Resetting codec handler for media player"); + codec_handler_free(&mp->handler); + } + } + if (!mp->handler) + mp->handler = codec_handler_make_playback(src_pt, dst_pt, mp->sync_ts); if (!mp->handler) return -1; diff --git a/daemon/media_socket.c b/daemon/media_socket.c index e6ca24c88..64d94e894 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -1030,6 +1030,7 @@ void kernelize(struct packet_stream *stream) { struct packet_stream *sink = NULL; const char *nk_warn_msg; int non_forwarding = 0; + struct call_media *media = stream->media; if (PS_ISSET(stream, KERNELIZED)) return; @@ -1046,9 +1047,11 @@ void kernelize(struct packet_stream *stream) { else goto no_kernel; } + if (MEDIA_ISSET(media, GENERATOR)) + goto no_kernel; if (!stream->selected_sfd) goto no_kernel; - if (stream->media->monologue->block_media || call->block_media) + if (media->monologue->block_media || call->block_media) goto no_kernel; if (!stream->endpoint.address.family) goto no_kernel; @@ -1090,16 +1093,16 @@ void kernelize(struct packet_stream *stream) { __re_address_translate_ep(&reti.local, &stream->selected_sfd->socket.local); reti.tos = call->tos; - reti.rtcp_mux = MEDIA_ISSET(stream->media, RTCP_MUX); - reti.dtls = MEDIA_ISSET(stream->media, DTLS); - reti.stun = stream->media->ice_agent ? 1 : 0; + reti.rtcp_mux = MEDIA_ISSET(media, RTCP_MUX); + reti.dtls = MEDIA_ISSET(media, DTLS); + reti.stun = media->ice_agent ? 1 : 0; reti.non_forwarding = non_forwarding; __re_address_translate_ep(&reti.dst_addr, &sink->endpoint); __re_address_translate_ep(&reti.src_addr, &sink->selected_sfd->socket.local); if (stream->ssrc_in) { reti.ssrc = htonl(stream->ssrc_in->parent->h.ssrc); - if (MEDIA_ISSET(stream->media, TRANSCODE)) { + if (MEDIA_ISSET(media, TRANSCODE)) { reti.ssrc_out = htonl(stream->ssrc_in->ssrc_map_out); reti.transcoding = 1; } @@ -1119,7 +1122,7 @@ void kernelize(struct packet_stream *stream) { ZERO(stream->kernel_stats); - if (proto_is_rtp(stream->media->protocol)) { + if (proto_is_rtp(media->protocol)) { GList *values, *l; struct rtp_stats *rs; @@ -1133,13 +1136,17 @@ void kernelize(struct packet_stream *stream) { } rs = l->data; // only add payload types that are passthrough - struct codec_handler *ch = codec_handler_get(stream->media, rs->payload_type); + struct codec_handler *ch = codec_handler_get(media, rs->payload_type); if (!ch->kernelize) continue; reti.payload_types[reti.num_payload_types++] = rs->payload_type; } g_list_free(values); } + else { + if (MEDIA_ISSET(media, TRANSCODE)) + goto no_kernel; + } recording_stream_kernel_info(stream, &reti); @@ -1813,7 +1820,7 @@ static int stream_packet(struct packet_handler_ctx *phc) { if (G_UNLIKELY(!phc->sink || !phc->sink->selected_sfd || !phc->out_srtp || !phc->out_srtp->selected_sfd || !phc->in_srtp->selected_sfd)) { - ilog(LOG_WARNING, "RTP packet from %s%s%s discarded", FMT_M(endpoint_print_buf(&phc->mp.fsin))); + ilog(LOG_WARNING, "Media packet from %s%s%s discarded", FMT_M(endpoint_print_buf(&phc->mp.fsin))); atomic64_inc(&phc->mp.stream->stats.errors); atomic64_inc(&rtpe_statsps.errors); goto out; diff --git a/daemon/redis.c b/daemon/redis.c index 269e65c36..0b8b303cc 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -1452,7 +1452,7 @@ static int json_link_medias(struct call *c, struct redis_list *medias, for (GList *l = other_ml->medias.head; l; l = l->next) { struct call_media *other_m = l->data; if (other_m->index == med->index) { - codec_handlers_update(med, other_m, NULL); + codec_handlers_update(med, other_m, NULL, NULL); break; } } diff --git a/daemon/sdp.c b/daemon/sdp.c index ce7bb355b..1ccbbc8d3 100644 --- a/daemon/sdp.c +++ b/daemon/sdp.c @@ -169,6 +169,31 @@ struct attribute_fmtp { unsigned int payload_type; }; +struct attribute_t38faxratemanagement { + enum { + RM_UNKNOWN = 0, + RM_LOCALTCF, + RM_TRANSFERREDTCF, + } rm; +}; + +struct attribute_t38faxudpec { + enum { + EC_UNKNOWN = 0, + EC_NONE, + EC_REDUNDANCY, + EC_FEC, + } ec; +}; + +struct attribute_t38faxudpecdepth { + str minred_str; + str maxred_str; + + int minred; + int maxred; +}; + struct sdp_attribute { /* example: a=rtpmap:8 PCMA/8000 */ str full_line, /* including a= and \r\n */ line_value, /* without a= and without \r\n */ @@ -204,6 +229,16 @@ struct sdp_attribute { /* example: a=rtpmap:8 PCMA/8000 */ ATTR_RTPENGINE, ATTR_PTIME, ATTR_RTCP_FB, + ATTR_T38FAXVERSION, + ATTR_T38FAXUDPEC, + ATTR_T38FAXUDPECDEPTH, + ATTR_T38FAXUDPFECMAXSPAN, + ATTR_T38FAXMAXDATAGRAM, + ATTR_T38FAXMAXIFP, + ATTR_T38FAXFILLBITREMOVAL, + ATTR_T38FAXTRANSCODINGMMR, + ATTR_T38FAXTRANSCODINGJBIG, + ATTR_T38FAXRATEMANAGEMENT, ATTR_END_OF_CANDIDATES, } attr; @@ -217,6 +252,10 @@ struct sdp_attribute { /* example: a=rtpmap:8 PCMA/8000 */ struct attribute_setup setup; struct attribute_rtpmap rtpmap; struct attribute_fmtp fmtp; + struct attribute_t38faxudpec t38faxudpec; + int i; + struct attribute_t38faxudpecdepth t38faxudpecdepth; + struct attribute_t38faxratemanagement t38faxratemanagement; } u; }; @@ -769,6 +808,71 @@ static int parse_attribute_fmtp(struct sdp_attribute *output) { return 0; } +static int parse_attribute_int(struct sdp_attribute *output, int attr_id, int defval) { + output->attr = attr_id; + output->u.i = str_to_i(&output->value, defval); + return 0; +} + +// XXX combine this with parse_attribute_setup ? +static int parse_attribute_t38faxudpec(struct sdp_attribute *output) { + output->attr = ATTR_T38FAXUDPEC; + + switch (__csh_lookup(&output->value)) { + case CSH_LOOKUP("t38UDPNoEC"): + output->u.t38faxudpec.ec = EC_NONE; + break; + case CSH_LOOKUP("t38UDPRedundancy"): + output->u.t38faxudpec.ec = EC_REDUNDANCY; + break; + case CSH_LOOKUP("t38UDPFEC"): + output->u.t38faxudpec.ec = EC_FEC; + break; + default: + output->u.t38faxudpec.ec = EC_UNKNOWN; + break; + } + + return 0; +} + +// XXX combine this with parse_attribute_setup ? +static int parse_attribute_t38faxratemanagement(struct sdp_attribute *output) { + output->attr = ATTR_T38FAXRATEMANAGEMENT; + + switch (__csh_lookup(&output->value)) { + case CSH_LOOKUP("localTFC"): + output->u.t38faxratemanagement.rm = RM_LOCALTCF; + break; + case CSH_LOOKUP("transferredTCF"): + output->u.t38faxratemanagement.rm = RM_TRANSFERREDTCF; + break; + default: + output->u.t38faxratemanagement.rm = RM_UNKNOWN; + break; + } + + return 0; +} + +static int parse_attribute_t38faxudpecdepth(struct sdp_attribute *output) { + PARSE_DECL; + struct attribute_t38faxudpecdepth *a; + + output->attr = ATTR_T38FAXUDPECDEPTH; + a = &output->u.t38faxudpecdepth; + + PARSE_INIT; + EXTRACT_TOKEN(u.t38faxudpecdepth.minred_str); + a->maxred_str = *value_str; + + a->minred = str_to_i(&a->minred_str, 0); + a->maxred = str_to_i(&a->maxred_str, -1); + + return 0; +} + + static int parse_attribute(struct sdp_attribute *a) { int ret; @@ -873,6 +977,36 @@ static int parse_attribute(struct sdp_attribute *a) { case CSH_LOOKUP("rtcp-fb"): a->attr = ATTR_RTCP_FB; break; + case CSH_LOOKUP("T38FaxVersion"): + ret = parse_attribute_int(a, ATTR_T38FAXVERSION, -1); + break; + case CSH_LOOKUP("T38FaxUdpEC"): + ret = parse_attribute_t38faxudpec(a); + break; + case CSH_LOOKUP("T38FaxUdpECDepth"): + ret = parse_attribute_t38faxudpecdepth(a); + break; + case CSH_LOOKUP("T38FaxUdpFECMaxSpan"): + ret = parse_attribute_int(a, ATTR_T38FAXUDPFECMAXSPAN, 0); + break; + case CSH_LOOKUP("T38FaxMaxDatagram"): + ret = parse_attribute_int(a, ATTR_T38FAXMAXDATAGRAM, -1); + break; + case CSH_LOOKUP("T38FaxMaxIFP"): + ret = parse_attribute_int(a, ATTR_T38FAXMAXIFP, -1); + break; + case CSH_LOOKUP("T38FaxFillBitRemoval"): + a->attr = ATTR_T38FAXFILLBITREMOVAL; + break; + case CSH_LOOKUP("T38FaxTranscodingMMR"): + a->attr = ATTR_T38FAXTRANSCODINGMMR; + break; + case CSH_LOOKUP("T38FaxTranscodingJBIG"): + a->attr = ATTR_T38FAXTRANSCODINGJBIG; + break; + case CSH_LOOKUP("T38FaxRateManagement"): + ret = parse_attribute_t38faxratemanagement(a); + break; } return ret; @@ -1228,6 +1362,63 @@ no_cand: sp->ice_pwd = attr->value; } +static void __sdp_t38(struct stream_params *sp, struct sdp_media *media) { + struct sdp_attribute *attr; + struct t38_options *to = &sp->t38_options; + + attr = attr_get_by_id(&media->attributes, ATTR_T38FAXVERSION); + if (attr) + to->version = attr->u.i; + + attr = attr_get_by_id(&media->attributes, ATTR_T38FAXUDPEC); + if (attr) { + if (attr->u.t38faxudpec.ec == EC_REDUNDANCY) + to->max_ec_entries = to->min_ec_entries = 3; // defaults + else if (attr->u.t38faxudpec.ec == EC_FEC) { + // defaults + to->max_ec_entries = to->min_ec_entries = 3; + to->fec_span = 3; + } + // else default to 0 + } + else // no EC specified, defaults: + to->max_ec_entries = to->min_ec_entries = 3; // defaults + + attr = attr_get_by_id(&media->attributes, ATTR_T38FAXUDPECDEPTH); + if (attr) { + to->min_ec_entries = attr->u.t38faxudpecdepth.minred; + to->max_ec_entries = attr->u.t38faxudpecdepth.maxred; + } + + attr = attr_get_by_id(&media->attributes, ATTR_T38FAXUDPFECMAXSPAN); + if (attr) + to->fec_span = attr->u.i; + + attr = attr_get_by_id(&media->attributes, ATTR_T38FAXMAXDATAGRAM); + if (attr) + to->max_datagram = attr->u.i; + + attr = attr_get_by_id(&media->attributes, ATTR_T38FAXMAXIFP); + if (attr) + to->max_ifp = attr->u.i; + + attr = attr_get_by_id(&media->attributes, ATTR_T38FAXFILLBITREMOVAL); + if (attr) + to->fill_bit_removal = 1; + + attr = attr_get_by_id(&media->attributes, ATTR_T38FAXTRANSCODINGMMR); + if (attr) + to->transcoding_mmr = 1; + + attr = attr_get_by_id(&media->attributes, ATTR_T38FAXTRANSCODINGJBIG); + if (attr) + to->transcoding_jbig = 1; + + attr = attr_get_by_id(&media->attributes, ATTR_T38FAXRATEMANAGEMENT); + if (attr) + to->local_tcf = (attr->u.t38faxratemanagement.rm == RM_LOCALTCF) ? 1 : 0; +} + /* XXX split this function up */ int sdp_streams(const GQueue *sessions, GQueue *streams, struct sdp_ng_flags *flags) { @@ -1340,6 +1531,7 @@ int sdp_streams(const GQueue *sessions, GQueue *streams, struct sdp_ng_flags *fl SP_SET(sp, RTCP_FB); __sdp_ice(sp, media); + __sdp_t38(sp, media); /* determine RTCP endpoint */ @@ -1720,6 +1912,10 @@ static int process_media_attributes(struct sdp_chopper *chop, struct sdp_media * for (l = attrs->list.head; l; l = l->next) { attr = l->data; + // strip all attributes if we're sink and generator - make our own clean SDP + if (MEDIA_ISSET(media, GENERATOR)) + goto strip; + switch (attr->attr) { case ATTR_ICE: case ATTR_ICE_UFRAG: @@ -2145,7 +2341,7 @@ int sdp_replace(struct sdp_chopper *chop, GQueue *sessions, struct call_monologu chopper_append_c(chop, "\r\n"); } - if (call_media->protocol && call_media->protocol->rtp) + if (proto_is_rtp(call_media->protocol)) insert_codec_parameters(chop, call_media); insert_sdp_attributes(chop, call_media); diff --git a/daemon/t38.c b/daemon/t38.c new file mode 100644 index 000000000..689775310 --- /dev/null +++ b/daemon/t38.c @@ -0,0 +1,726 @@ +#include "t38.h" + + + +#ifdef WITH_TRANSCODING + + +#include +#include +#include "codec.h" +#include "call.h" +#include "log.h" +#include "str.h" +#include "media_player.h" + + + +struct udptl_packet { + seq_packet_t p; + str *s; +}; + + + +static void __add_udptl_len(GString *s, const void *buf, unsigned int len) { + if (len < 0x80) { + g_string_append_c(s, len); + if (len) + g_string_append_len(s, buf, len); + return; + } + + if (len < 0x4000) { + uint16_t enc_len = htons(0x8000 | len); + g_string_append_len(s, (void *) &enc_len, 2); + g_string_append_len(s, buf, len); + return; + } + + // fragmented - we don't support more than 65535 bytes + unsigned int mult = len >> 14; + g_string_append_c(s, 0xc0 | mult); + mult <<= 14; + // one portion + g_string_append_len(s, buf, mult); + // remainder - may be zero length + __add_udptl_len(s, buf + mult, len - mult); +} + +static void __add_udptl_raw(GString *s, const char *buf, size_t len) { + assert(len < 0x10000); + + if (len == 0) { + // add a single zero byte, length 1 + __add_udptl_len(s, "\x00", 1); + return; + } + + __add_udptl_len(s, buf, len); +} + +static void __add_udptl(GString *s, const str *buf) { + __add_udptl_raw(s, buf->s, buf->len); +} + + +static void g_string_null_extend(GString *s, size_t len) { + if (s->len >= len) + return; + + size_t oldb = s->len; + size_t newb = len - s->len; + g_string_set_size(s, len); + memset(s->str + oldb, 0, newb); +} + +// call is locked in R or W +static int t38_gateway_handler(t38_core_state_t *stat, void *user_data, const uint8_t *b, int len, int count) { + struct t38_gateway *tg = user_data; + + // cap the max length of packet we can handle + if (len < 0 || len >= 0x10000) { + ilog(LOG_ERR, "Received %i bytes from T.38 encoder - discarding", len); + return -1; + } + + ilog(LOG_DEBUG, "Received %i bytes from T.38 encoder", len); + + // build udptl packet: use a conservative guess for required buffer + GString *s = g_string_sized_new(512); + + // add seqnum + uint16_t seq = htons(tg->seqnum); + g_string_append_len(s, (void *) &seq, 2); + + // add primary IFP packet + str buf = STR_CONST_INIT_LEN((char *) b, len); + __add_udptl(s, &buf); + + // add error correction packets + if (tg->options.fec_span > 1) { + // forward error correction + g_string_append_c(s, 0x80); + + // figure out how many packets we have and which span to use + unsigned int packets = tg->options.fec_span * tg->options.max_ec_entries; + if (packets > tg->udptl_ec_out.length) + packets = tg->udptl_ec_out.length; + unsigned int span = packets / tg->options.max_ec_entries; + if (!span) + span = 1; + packets = span * tg->options.max_ec_entries; // our own packets we use + unsigned int entries = packets / span; // FEC entries in the output + if (entries > tg->udptl_ec_out.length) + entries = tg->udptl_ec_out.length; + packets = entries * span; + + assert(span < 0x80); + assert(entries < 0x80); + + g_string_append_c(s, 0x01); + g_string_append_c(s, span); + + // create needed number of FEC packet entries + GQueue fec = G_QUEUE_INIT; + for (int i = 0; i < entries; i++) + g_queue_push_tail(&fec, g_string_new("")); + + // take each input packet, going backwards in time, and XOR it into + // the respective output FEC packet + GList *inp = tg->udptl_ec_out.head; + for (int i = 0; i < packets; i++) { + assert(inp != NULL); + str *ip = inp->data; + // just keep shifting the list around + GString *outp = g_queue_pop_head(&fec); + + // extend string as needed + g_string_null_extend(outp, ip->len); + + for (size_t j = 0; j < ip->len; j++) + outp->str[j] ^= ip->s[j]; + + g_queue_push_tail(&fec, outp); + inp = inp->next; + } + + // output list is now complete, but in reverse. append it to output buffer + GString *ec = g_string_sized_new(512); + entries = 0; + int going = 1; + while (fec.length) { + GString *outp = g_queue_pop_tail(&fec); + if (going) { + if (s->len + ec->len + outp->len > tg->options.max_datagram) + going = 0; + else { + __add_udptl_raw(ec, outp->str, outp->len); + entries++; + } + } + g_string_free(outp, TRUE); + } + + g_string_append_c(s, entries); + g_string_append_len(s, ec->str, ec->len); + g_string_free(ec, TRUE); + } + else { + // redundancy error correction + g_string_append_c(s, 0x00); + + GString *ec = g_string_sized_new(512); + int entries = 0; + + for (GList *l = tg->udptl_ec_out.head; l; l = l->next) { + str *ec_s = l->data; + // stop when we exceed max datagram length + if (s->len + ec->len + ec_s->len > tg->options.max_datagram) + break; + // add redundancy packet + __add_udptl(ec, ec_s); + entries++; + } + + // number of entries - must be <0x80 as verified in settings + g_string_append_c(s, entries); + g_string_append_len(s, ec->str, ec->len); + g_string_free(ec, TRUE); + } + + // done building our packet - add primary to our error correction buffer + tg->seqnum++; + unsigned int q_entries = tg->options.max_ec_entries * tg->options.fec_span; + if (q_entries) { + while (tg->udptl_ec_out.length >= q_entries) { + str *ec_s = g_queue_pop_tail(&tg->udptl_ec_out); + free(ec_s); + } + g_queue_push_head(&tg->udptl_ec_out, str_dup(&buf)); + } + + // send our packet if we can + struct packet_stream *ps = NULL; + if (tg->t38_media && tg->t38_media->streams.head) + ps = tg->t38_media->streams.head->data; + if (ps) + mutex_lock(&ps->out_lock); + struct stream_fd *sfd = NULL; + if (ps) + sfd = ps->selected_sfd; + if (sfd) { + for (int i = 0; i < count; i++) { + ilog(LOG_DEBUG, "Sending %u UDPTL bytes", (unsigned int) s->len); + socket_sendto(&sfd->socket, s->str, s->len, &ps->endpoint); + } + } + else + ilog(LOG_WARN | LOG_FLAG_LIMIT, "Unable to send T.38 UDPTL packet due to lack of " + "socket or stream"); + if (ps) + mutex_unlock(&ps->out_lock); + + g_string_free(s, TRUE); + + return 0; +} + +void __t38_gateway_free(void *p) { + struct t38_gateway *tg = p; + ilog(LOG_DEBUG, "Destroying T.38 gateway"); + if (tg->gw) + t38_gateway_free(tg->gw); + if (tg->pcm_player) { + media_player_stop(tg->pcm_player); + media_player_put(&tg->pcm_player); + } + if (tg->udptl_fec) + g_hash_table_destroy(tg->udptl_fec); + g_queue_clear_full(&tg->udptl_ec_out, free); + packet_sequencer_destroy(&tg->sequencer); +} + +// call is locked in R and mp is locked +static void t38_pcm_player(struct media_player *mp) { + if (!mp || !mp->media) + return; + + struct t38_gateway *tg = mp->media->t38_gateway; + if (!tg) + return; + + ilog(LOG_DEBUG, "Generating T.38 PCM samples"); + + mutex_lock(&tg->lock); + + int16_t smp[80]; + int num = t38_gateway_tx(tg->gw, smp, 80); + if (num <= 0) { + // use a fixed interval of 10 ms + timeval_add_usec(&mp->next_run, 10000); + timerthread_obj_schedule_abs(&mp->tt_obj, &mp->next_run); + mutex_unlock(&tg->lock); + return; + } + + ilog(LOG_DEBUG, "Generated %i T.38 PCM samples", num); + + // this reschedules our player as well + media_player_add_packet(tg->pcm_player, (char *) smp, num * 2, num * 1000000 / 8000, tg->pts); + + tg->pts += num; + + mutex_unlock(&tg->lock); +} + + +static void __udptl_packet_free(struct udptl_packet *p) { + if (p->s) + free(p->s); + g_slice_free1(sizeof(*p), p); +} + + +static void __t38_options_normalise(struct t38_options *opts) { + if (opts->version < 0) + opts->version = 0; + if (opts->fec_span < 1) + opts->fec_span = 1; + if (opts->min_ec_entries < 0) + opts->min_ec_entries = 0; + if (opts->min_ec_entries >= 0x80) + opts->min_ec_entries = 0x7f; + if (opts->max_ec_entries < 0) + opts->max_ec_entries = 0; + if (opts->max_ec_entries >= 0x80) + opts->max_ec_entries = 0x7f; + if (opts->max_ifp <= 0 || opts->max_ifp >= 0x4000) + opts->max_ifp = 0x3fff; + if (opts->max_datagram <= 0 || opts->max_datagram >= 0x4000) + opts->max_datagram = 0x3fff; +} + +// call is locked in W +int t38_gateway_pair(struct call_media *t38_media, struct call_media *pcm_media, + const struct t38_options *options) +{ + const char *err = NULL; + + if (!t38_media || !pcm_media || !options) + return -1; + + struct t38_options opts = *options; + __t38_options_normalise(&opts); + + // do we have one yet? + if (t38_media->t38_gateway + && t38_media->t38_gateway == pcm_media->t38_gateway) + { + // XXX check options here? + return 0; + } + + // release old structs, if any + t38_gateway_put(&t38_media->t38_gateway); + t38_gateway_put(&pcm_media->t38_gateway); + + ilog(LOG_DEBUG, "Creating new T.38 gateway"); + + // create and init new + struct t38_gateway *tg = obj_alloc0("t38_gateway", sizeof(*tg), __t38_gateway_free); + + tg->t38_media = t38_media; + tg->pcm_media = pcm_media; + mutex_init(&tg->lock); + tg->udptl_fec = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, + (GDestroyNotify) __udptl_packet_free); + tg->options = opts; + + tg->pcm_pt.payload_type = -1; + str_init(&tg->pcm_pt.encoding, "PCM-S16LE"); + tg->pcm_pt.encoding_with_params = tg->pcm_pt.encoding; + tg->pcm_pt.clock_rate = 8000; + tg->pcm_pt.channels = 1; + + err = "Failed to init PCM codec"; + ensure_codec_def(&tg->pcm_pt, pcm_media); + if (!tg->pcm_pt.codec_def) + goto err; + + err = "Failed to create spandsp T.38 gateway"; + if (!(tg->gw = t38_gateway_init(NULL, t38_gateway_handler, tg))) + goto err; + + err = "Failed to create media player"; + if (!(tg->pcm_player = media_player_new(pcm_media->monologue))) + goto err; + // even though we call media_player_set_media() here, we need to call it again in + // t38_gateway_start because our sink might not have any streams added here yet, + // leaving the media_player setup incomplete + media_player_set_media(tg->pcm_player, pcm_media); + tg->pcm_player->run_func = t38_pcm_player; + + // set options + t38_core_state_t *t38 = t38_gateway_get_t38_core_state(tg->gw); + t38_gateway_set_ecm_capability(tg->gw, TRUE); + t38_gateway_set_transmit_on_idle(tg->gw, TRUE); + t38_gateway_set_supported_modems(tg->gw, T30_SUPPORT_V17 | T30_SUPPORT_V27TER | T30_SUPPORT_V29 + | T30_SUPPORT_V34HDX | T30_SUPPORT_IAF); + t38_set_t38_version(t38, opts.version); + t38_set_data_rate_management_method(t38, + opts.local_tcf ? 1 : 2); + t38_set_fill_bit_removal(t38, opts.fill_bit_removal); + t38_set_mmr_transcoding(t38, opts.transcoding_mmr); + t38_set_jbig_transcoding(t38, opts.transcoding_jbig); + t38_set_max_datagram_size(t38, opts.max_ifp); + + packet_sequencer_init(&tg->sequencer, (GDestroyNotify) __udptl_packet_free); + tg->sequencer.seq = 0; + + // done - add references to media structs + t38_media->t38_gateway = tg; + pcm_media->t38_gateway = obj_get(tg); + + // add SDP options for T38 + g_queue_clear_full(&t38_media->sdp_attributes, free); + + g_queue_push_tail(&t38_media->sdp_attributes, str_sprintf("T38FaxVersion:%i", tg->options.version)); + g_queue_push_tail(&t38_media->sdp_attributes, str_sprintf("T38MaxBitRate:14400")); + g_queue_push_tail(&t38_media->sdp_attributes, str_sprintf("T38FaxRateManagement:%s", + tg->options.local_tcf ? "localTFC" : "transferredTCF")); + g_queue_push_tail(&t38_media->sdp_attributes, str_sprintf("T38FaxMaxBuffer:1800")); + g_queue_push_tail(&t38_media->sdp_attributes, str_sprintf("T38FaxMaxDatagram:512")); + + if (tg->options.max_ec_entries == 0) + g_queue_push_tail(&t38_media->sdp_attributes, str_sprintf("T38FaxUdpEC:t38UDPNoEC")); + else if (tg->options.fec_span > 1) + g_queue_push_tail(&t38_media->sdp_attributes, str_sprintf("T38FaxUdpEC:t38UDPFEC")); + else + g_queue_push_tail(&t38_media->sdp_attributes, str_sprintf("T38FaxUdpEC:t38UDPRedundancy")); + // XXX more options possible here + + return 0; + +err: + if (err) + ilog(LOG_ERR, "Failed to create T.38 gateway: %s", err); + t38_gateway_put(&tg); + return -1; +} + + +// call is locked in W +void t38_gateway_start(struct t38_gateway *tg) { + if (!tg) + return; + + // set up our player first + media_player_set_media(tg->pcm_player, tg->pcm_media); + if (media_player_setup(tg->pcm_player, &tg->pcm_pt)) + return; + + // now start our player if we can or should + // already running? + if (tg->pcm_player->next_run.tv_sec) + return; + + // only start our player only if we can send both ways + if (!tg->pcm_media->codecs_prefs_send.length) + return; + if (!tg->pcm_media->streams.length) + return; + if (!tg->t38_media->streams.length) + return; + + struct packet_stream *ps; + ps = tg->pcm_media->streams.head->data; + if (!PS_ISSET(ps, FILLED)) + return; + ps = tg->t38_media->streams.head->data; + if (!PS_ISSET(ps, FILLED)) + return; + + ilog(LOG_DEBUG, "Starting T.38 PCM player"); + + // start off PCM player + tg->pcm_player->next_run = rtpe_now; + timerthread_obj_schedule_abs(&tg->pcm_player->tt_obj, &tg->pcm_player->next_run); +} + + +// call is locked in R +int t38_gateway_input_samples(struct t38_gateway *tg, int16_t amp[], int len) { + if (!tg) + return 0; + if (len <= 0) + return 0; + + ilog(LOG_DEBUG, "Adding %i samples to T.38 encoder", len); + + mutex_lock(&tg->lock); + + int left = t38_gateway_rx(tg->gw, amp, len); + if (left) + ilog(LOG_WARN | LOG_FLAG_LIMIT, "%i PCM samples were not processed by the T.38 gateway", + left); + + mutex_unlock(&tg->lock); + + return 0; +} + + +static ssize_t __get_udptl_len(str *s) { + ssize_t ret; + + if (s->len < 1) + return -1; + + if (!(s->s[0] & 0x80)) { + ret = s->s[0]; + str_shift(s, 1); + return ret; + } + + if (s->len < 2) + return -1; + + if (!(s->s[0] & 0x40)) { + ret = ntohs(*((uint16_t *) s->s)) & 0x3fff; + str_shift(s, 2); + return ret; + } + + ilog(LOG_INFO | LOG_FLAG_LIMIT, "Decoding UDPTL fragments is not supported"); + return -1; +} + +static int __get_udptl(str *piece, str *s) { + ssize_t len = __get_udptl_len(s); + if (len < 0) + return -1; + + return str_shift_ret(s, len, piece); +} + + +static struct udptl_packet *__make_udptl_packet(const str *piece, uint16_t seq) { + struct udptl_packet *up = g_slice_alloc0(sizeof(*up)); + up->p.seq = seq; + up->s = str_dup(piece); + return up; +} + +static void __fec_save(struct t38_gateway *tg, const str *piece, uint16_t seq) { + struct udptl_packet *up = __make_udptl_packet(piece, seq); + g_hash_table_insert(tg->udptl_fec, GUINT_TO_POINTER(seq), up); +} + +int t38_gateway_input_udptl(struct t38_gateway *tg, const str *buf) { + const char *err = NULL; + + if (!tg) + return 0; + if (!buf || !buf->len) + return 0; + + if (buf->len < 4) { + ilog(LOG_INFO | LOG_FLAG_LIMIT, "Ignoring short UDPTL packet (%i bytes)", buf->len); + return 0; + } + + ilog(LOG_DEBUG, "Processing %i UDPTL bytes", buf->len); + + str s = *buf; + str piece; + + // get seq num + uint16_t seq; + if (str_shift_ret(&s, 2, &piece)) + goto err; + seq = ntohs(*((uint16_t *) piece.s)); + + err = "Invalid primary UDPTL packet"; + if (__get_udptl(&piece, &s)) + goto err; + + ilog(LOG_DEBUG, "Received primary IFP packet, len %i, seq %i", piece.len, seq); + str primary = piece; + struct udptl_packet *up = __make_udptl_packet(&primary, seq); + + err = "Error correction mode byte missing"; + if (str_shift_ret(&s, 1, &piece)) + goto err; + char fec = piece.s[0]; + + mutex_lock(&tg->lock); + + // XXX possible short path here without going through the sequencer + int ret = packet_sequencer_insert(&tg->sequencer, &up->p); + if (ret < 0) { + // main seq is dupe - everything else must be dupe too + __udptl_packet_free(up); + goto out; + } + + up = NULL; + + if (!(fec & 0x80)) { + // packet redundancy + if (packet_sequencer_next_ok(&tg->sequencer)) + goto seq_ok; + + // process EC packets as well as something's wrong + ssize_t num_packets = __get_udptl_len(&s); + err = "Invalid number of EC packets"; + if (num_packets < 0 || num_packets > 100) + goto err; + for (int i = 0; i < num_packets; i++) { + if (__get_udptl(&piece, &s)) { + ilog(LOG_WARN | LOG_FLAG_LIMIT, + "Invalid UDPTL error correction packet at index %i", + i); + break; + } + // ignore zero-length packets + if (!piece.len) + continue; + ilog(LOG_DEBUG, "Received secondary IFP packet, len %i, seq %i", piece.len, + seq - 1 - i); + up = __make_udptl_packet(&piece, seq - 1 - i); + packet_sequencer_insert(&tg->sequencer, &up->p); + up = NULL; + + // can we stop here? + if (packet_sequencer_next_ok(&tg->sequencer)) + break; + } + } + else { + // FEC + // start by saving the new packet + __fec_save(tg, &primary, seq); + + if (packet_sequencer_next_ok(&tg->sequencer)) + goto seq_ok; + + // process all FEC packets + err = "Invalid number of FEC packets"; + if (str_shift_ret(&s, 2, &piece)) + goto err; + if (piece.s[0] != 0x01) + goto err; + unsigned int span = piece.s[1]; + if (span <= 0 || span >= 0x80) + goto err; + ssize_t entries = __get_udptl_len(&s); + if (entries < 0 || entries > 100) + goto err; + + // first seq we can possibly recover + uint16_t seq_start = seq - span * entries; + + while (entries) { + // get our entry + if (__get_udptl(&piece, &s)) { + ilog(LOG_WARN | LOG_FLAG_LIMIT, + "Invalid UDPTL error correction packet at index %i", + seq_start); + break; + } + // check each of the entries covered by `span` + for (int i = 0; i < span; i++) { + uint16_t seq_fec = seq_start + i * span; + // skip if we already know this packet + if (g_hash_table_lookup(tg->udptl_fec, GUINT_TO_POINTER(seq_fec))) + continue; + + // can we recover it? we need all other packets from the series + GString *rec_s = g_string_new(""); + int complete = 1; + + for (int j = 0; j < span; j++) { + uint16_t seq_rec = seq_start + i * span; + if (seq_rec == seq_fec) + continue; + struct udptl_packet *recp = + g_hash_table_lookup(tg->udptl_fec, GUINT_TO_POINTER(seq_rec)); + if (!recp) { + ilog(LOG_WARN | LOG_FLAG_LIMIT, "Unable to recover UDPTL FEC " + "packet with seq %i due to missing seq %i", + seq_fec, seq_rec); + complete = 0; + break; + } + + // XOR in packet + for (size_t j = 0; j < recp->s->len; j++) + rec_s->str[j] ^= recp->s->s[j]; + } + + if (complete) { + ilog(LOG_WARN | LOG_FLAG_LIMIT, "Recovered UDPTL " + "packet with seq %i from FEC", + seq_fec); + + str rec_str = STR_CONST_INIT_LEN(rec_s->str, rec_s->len); + __fec_save(tg, &rec_str, seq_fec); + up = __make_udptl_packet(&rec_str, seq_fec); + packet_sequencer_insert(&tg->sequencer, &up->p); + up = NULL; + } + + g_string_free(rec_s, TRUE); + + // no point in continuing further: one packet was missing, which means + // that no other packet in this span can be recovered + break; + } + + // proceed to next entry + entries--; + seq_start++; + } + } + +seq_ok:; + + t38_core_state_t *t38 = t38_gateway_get_t38_core_state(tg->gw); + + // process any packets that we can + while (1) { + up = packet_sequencer_next_packet(&tg->sequencer); + if (!up) + break; + + ilog(LOG_DEBUG, "Processing %i IFP bytes, seq %i", up->s->len, up->p.seq); + + t38_core_rx_ifp_packet(t38, (uint8_t *) up->s->s, up->s->len, up->p.seq); + + __udptl_packet_free(up); + } + +out: + mutex_unlock(&tg->lock); + return 0; + +err: + if (err) + ilog(LOG_ERR | LOG_FLAG_LIMIT, "Failed to process UDPTL/T.38/IFP packet: %s", err); + return -1; +} + + +void t38_gateway_stop(struct t38_gateway *tg) { + if (!tg) + return; + if (tg->pcm_player) + media_player_stop(tg->pcm_player); + if (tg->t38_media) + g_queue_clear_full(&tg->t38_media->sdp_attributes, free); +} + + + +#endif diff --git a/include/call.h b/include/call.h index da33d994c..cabe9d030 100644 --- a/include/call.h +++ b/include/call.h @@ -20,6 +20,7 @@ #include "recording.h" #include "statistics.h" #include "codeclib.h" +#include "t38.h" #define UNDEFINED ((unsigned int) -1) @@ -151,6 +152,7 @@ enum call_type { #define MEDIA_FLAG_TRANSCODE 0x00800000 #define MEDIA_FLAG_PTIME_OVERRIDE 0x01000000 #define MEDIA_FLAG_RTCP_FB SHARED_FLAG_RTCP_FB +#define MEDIA_FLAG_GENERATOR 0x02000000 /* access macros */ #define SP_ISSET(p, f) bf_isset(&(p)->sp_flags, SP_FLAG_ ## f) @@ -223,6 +225,7 @@ struct stream_params { str ice_pwd; int ptime; str media_id; + struct t38_options t38_options; }; struct endpoint_map { @@ -330,6 +333,8 @@ struct call_media { struct codec_handler *codec_handler_cache; struct rtcp_handler *rtcp_handler; struct codec_handler *dtmf_injector; + struct t38_gateway *t38_gateway; + struct codec_handler *t38_handler; int ptime; // either from SDP or overridden diff --git a/include/call_interfaces.h b/include/call_interfaces.h index f225e7450..89d2d2413 100644 --- a/include/call_interfaces.h +++ b/include/call_interfaces.h @@ -76,6 +76,9 @@ struct sdp_ng_flags { always_transcode:1, asymmetric_codecs:1, inject_dtmf:1, + t38_decode:1, + t38_force:1, + t38_stop:1, supports_load_limit:1, dtls_off:1, sdes_off:1, diff --git a/include/codec.h b/include/codec.h index 702a8956c..6f205eb07 100644 --- a/include/codec.h +++ b/include/codec.h @@ -18,6 +18,7 @@ struct ssrc_hash; struct sdp_ng_flags; struct codec_ssrc_handler; struct rtp_header; +struct stream_params; typedef int codec_handler_func(struct codec_handler *, struct media_packet *); @@ -35,8 +36,10 @@ struct codec_handler { struct ssrc_hash *ssrc_hash; struct codec_handler *output_handler; // == self, or other PT handler +#ifdef WITH_TRANSCODING int (*packet_encoded)(encoder_t *enc, void *u1, void *u2); int (*packet_decoded)(decoder_t *, AVFrame *, void *, void *); +#endif // for media playback struct codec_ssrc_handler *ssrc_handler; @@ -54,7 +57,6 @@ struct codec_handler *codec_handler_get(struct call_media *, int payload_type); void codec_handlers_free(struct call_media *); struct codec_handler *codec_handler_make_playback(const struct rtp_payload_type *src_pt, const struct rtp_payload_type *dst_pt, unsigned long ts); -void codec_handler_free(struct codec_handler **handler); void ensure_codec_def(struct rtp_payload_type *pt, struct call_media *media); void codec_add_raw_packet(struct media_packet *mp); @@ -76,7 +78,9 @@ void __rtp_payload_type_add_send(struct call_media *other_media, struct rtp_payl #ifdef WITH_TRANSCODING -void codec_handlers_update(struct call_media *receiver, struct call_media *sink, const struct sdp_ng_flags *); +void codec_handler_free(struct codec_handler **handler); +void codec_handlers_update(struct call_media *receiver, struct call_media *sink, const struct sdp_ng_flags *, + const struct stream_params *); void codec_add_dtmf_event(struct codec_ssrc_handler *ch, int code, int level, uint64_t ts); uint64_t codec_last_dtmf_event(struct codec_ssrc_handler *ch); uint64_t codec_encoder_pts(struct codec_ssrc_handler *ch); @@ -86,7 +90,8 @@ uint64_t codec_decoder_unskip_pts(struct codec_ssrc_handler *ch); #else INLINE void codec_handlers_update(struct call_media *receiver, struct call_media *sink, - const struct sdp_ng_flags *flags) { } + const struct sdp_ng_flags *flags, const struct stream_params *sp) { } +INLINE void codec_handler_free(struct codec_handler **handler) { } #endif diff --git a/include/media_socket.h b/include/media_socket.h index 5ab3e8827..d3f9df3eb 100644 --- a/include/media_socket.h +++ b/include/media_socket.h @@ -202,6 +202,11 @@ INLINE int proto_is_not_rtp(const struct transport_protocol *protocol) { return 0; return protocol->rtp ? 0 : 1; } +INLINE int proto_is(const struct transport_protocol *protocol, enum transport_protocol_index idx) { + if (!protocol) + return 0; + return (protocol->index == idx) ? 1 : 0; +} #endif diff --git a/include/t38.h b/include/t38.h new file mode 100644 index 000000000..857eb8378 --- /dev/null +++ b/include/t38.h @@ -0,0 +1,97 @@ +#ifndef _T38_H_ +#define _T38_H_ + + +struct t38_gateway; + +struct t38_options { + int version; + int fec_span; // 1 means no FEC + int min_ec_entries; // currently ignored + int max_ec_entries; + int max_ifp; + int max_datagram; + + int local_tcf:1; + int fill_bit_removal:1; + int transcoding_mmr:1; + int transcoding_jbig:1; +}; + + + +#ifdef WITH_TRANSCODING + + + +#include +#include +#include +#include +#include +#include + +#include "rtplib.h" +#include "aux.h" +#include "obj.h" +#include "codeclib.h" + + + +struct call_media; +struct media_packet; +struct media_player; + + +struct t38_gateway { + struct obj obj; // use refcount as this struct is shared between two medias + mutex_t lock; + struct call_media *t38_media; + struct call_media *pcm_media; + struct rtp_payload_type pcm_pt; // PCM input for spandsp + t38_gateway_state_t *gw; + + struct t38_options options; + + // udptl outgoing stuff + uint16_t seqnum; + GQueue udptl_ec_out; // seq, seq-1, seq-2, ... + // udptl incoming stuff + packet_sequencer_t sequencer; + GHashTable *udptl_fec; + + // player for PCM data + struct media_player *pcm_player; + unsigned long long pts; +}; + + + +int t38_gateway_pair(struct call_media *t38_media, struct call_media *pcm_media, const struct t38_options *); +void t38_gateway_start(struct t38_gateway *); +int t38_gateway_input_samples(struct t38_gateway *, int16_t amp[], int len); +int t38_gateway_input_udptl(struct t38_gateway *, const str *); +void t38_gateway_stop(struct t38_gateway *); + + +INLINE void t38_gateway_put(struct t38_gateway **tp) { + if (!tp || !*tp) + return; + obj_put(*tp); + *tp = NULL; +} + + +#else + +#include "compat.h" + +// stubs +INLINE void t38_gateway_start(struct t38_gateway *tg) { } +INLINE void t38_gateway_stop(struct t38_gateway *tg) { } +INLINE void t38_gateway_put(struct t38_gateway **tp) { } + + +#endif + +#endif diff --git a/lib/codeclib.c b/lib/codeclib.c index 72b9741ff..df5934343 100644 --- a/lib/codeclib.c +++ b/lib/codeclib.c @@ -958,6 +958,12 @@ void *packet_sequencer_force_next_packet(packet_sequencer_t *ps) { return __packet_sequencer_next_packet(ps, 0); } +int packet_sequencer_next_ok(packet_sequencer_t *ps) { + if (g_tree_lookup(ps->packets, GINT_TO_POINTER(ps->seq))) + return 1; + return 0; +} + int packet_sequencer_insert(packet_sequencer_t *ps, seq_packet_t *p) { int ret = 0; diff --git a/lib/codeclib.h b/lib/codeclib.h index 55bce19b8..87dc7fd1f 100644 --- a/lib/codeclib.h +++ b/lib/codeclib.h @@ -7,6 +7,14 @@ struct packet_sequencer_s; typedef struct codec_def_s codec_def_t; typedef struct packet_sequencer_s packet_sequencer_t; +enum media_type { + MT_UNKNOWN = 0, + MT_AUDIO, + MT_VIDEO, + MT_IMAGE, + MT_OTHER, +}; + #ifndef WITHOUT_CODECLIB @@ -47,14 +55,6 @@ typedef void set_dec_options_f(decoder_t *, const str *); -enum media_type { - MT_UNKNOWN = 0, - MT_AUDIO, - MT_VIDEO, - MT_IMAGE, - MT_OTHER, -}; - struct codec_type_s { void (*def_init)(codec_def_t *); @@ -226,6 +226,7 @@ void __packet_sequencer_init(packet_sequencer_t *ps, GDestroyNotify); INLINE void packet_sequencer_init(packet_sequencer_t *ps, GDestroyNotify); void packet_sequencer_destroy(packet_sequencer_t *ps); void *packet_sequencer_next_packet(packet_sequencer_t *ps); +int packet_sequencer_next_ok(packet_sequencer_t *ps); void *packet_sequencer_force_next_packet(packet_sequencer_t *ps); int packet_sequencer_insert(packet_sequencer_t *ps, seq_packet_t *); @@ -263,9 +264,6 @@ INLINE char *av_error(int no) { #else // stubs -enum media_type { - MT_INVALID = -1, -}; struct codec_def_s { int dtmf; }; diff --git a/perl/NGCP/Rtpengine/AutoTest.pm b/perl/NGCP/Rtpengine/AutoTest.pm index cdaec2349..17742c9b6 100644 --- a/perl/NGCP/Rtpengine/AutoTest.pm +++ b/perl/NGCP/Rtpengine/AutoTest.pm @@ -20,7 +20,7 @@ BEGIN { require Exporter; @ISA = qw(Exporter); our @EXPORT = qw(autotest_start new_call offer answer ft tt snd srtp_snd rtp rcv srtp_rcv - srtp_dec escape rtpm reverse_tags new_tt crlf sdp_split rtpe_req offer_answer); + srtp_dec escape rtpm rtpmre reverse_tags new_tt crlf sdp_split rtpe_req offer_answer); }; @@ -166,6 +166,9 @@ sub rcv { if ($cb) { $p = $cb->($hdr_mark, $pt, $seq, $ts, $ssrc, $payload, $p, $cb_arg); } + if ($p !~ $match) { + print(unpack('H*', $p) . "\n"); + } like $p, $match, 'received packet matches'; my @matches = $p =~ $match; for my $m (@matches) { @@ -196,22 +199,24 @@ sub srtp_dec { sub escape { return "\Q$_[0]\E"; } -sub rtpm { - my ($pt, $seq, $ts, $ssrc, $payload, $alt_payload) = @_; - print("rtp matcher $pt $seq $ts $ssrc " . unpack('H*', $payload) . "\n"); +sub rtpmre { + my ($pt, $seq, $ts, $ssrc, $xre) = @_; + #print("rtp matcher $pt $seq $ts $ssrc $xre\n"); my $re = ''; $re .= escape(pack('C', 0x80)); $re .= escape(pack('C', $pt)); $re .= $seq >= 0 ? escape(pack('n', $seq)) : '(..)'; $re .= $ts >= 0 ? escape(pack('N', $ts)) : '(....)'; $re .= $ssrc >= 0 ? escape(pack('N', $ssrc)) : '(....)'; + $re .= $xre; + return qr/^$re$/s; +} +sub rtpm { + my ($pt, $seq, $ts, $ssrc, $payload, $alt_payload) = @_; if (!$alt_payload) { - $re .= escape($payload); + return rtpmre($pt, $seq, $ts, $ssrc, escape($payload)); } - else { - $re .= '(' . escape($payload) . '|' . escape($alt_payload) . ')'; - } - return qr/^$re$/s; + return rtpmre($pt, $seq, $ts, $ssrc, '(' . escape($payload) . '|' . escape($alt_payload) . ')'); } sub ft { return $ft; } diff --git a/t/.gitignore b/t/.gitignore index d0321297f..74e4d8496 100644 --- a/t/.gitignore +++ b/t/.gitignore @@ -54,3 +54,8 @@ test-dtmf-detect dtmf_rx_fillin.h *-test.c jitter_buffer.c +t38.c +spandsp_recv_fax_pcm +spandsp_recv_fax_t38 +spandsp_send_fax_pcm +spandsp_send_fax_t38 diff --git a/t/Makefile b/t/Makefile index 518830fc1..027426676 100644 --- a/t/Makefile +++ b/t/Makefile @@ -63,6 +63,8 @@ HASHSRCS= ifeq ($(with_transcoding),yes) SRCS+= transcode-test.c test-dtmf-detect.c payload-tracker-test.c +SRCS+= spandsp_recv_fax_pcm.c spandsp_recv_fax_t38.c spandsp_send_fax_pcm.c \ + spandsp_send_fax_t38.c ifeq ($(with_amr_tests),yes) SRCS+= amr-decode-test.c amr-encode-test.c endif @@ -70,7 +72,7 @@ LIBSRCS+= codeclib.c resample.c socket.c streambuf.c dtmflib.c DAEMONSRCS+= codec.c call.c ice.c kernel.c media_socket.c stun.c bencode.c poller.c \ dtls.c recording.c statistics.c rtcp.c redis.c iptables.c graphite.c \ cookie_cache.c udp_listener.c homer.c load.c cdr.c dtmf.c timerthread.c \ - media_player.c jitter_buffer.c + media_player.c jitter_buffer.c t38.c HASHSRCS+= call_interfaces.c control_ng.c sdp.c endif @@ -117,6 +119,16 @@ daemon-tests: tests-preload.so bitstr-test: bitstr-test.o +spandsp_send_fax_pcm: spandsp_send_fax_pcm.o + +spandsp_recv_fax_pcm: spandsp_recv_fax_pcm.o + +spandsp_send_fax_t38: spandsp_send_fax_t38.o + +spandsp_recv_fax_t38: spandsp_recv_fax_t38.o + +spandsp_raw_fax_tests: spandsp_send_fax_pcm spandsp_recv_fax_pcm spandsp_send_fax_t38 spandsp_recv_fax_t38 + amr-decode-test: amr-decode-test.o $(COMMONOBJS) codeclib.o resample.o dtmflib.o amr-encode-test: amr-encode-test.o $(COMMONOBJS) codeclib.o resample.o dtmflib.o @@ -130,7 +142,7 @@ transcode-test: transcode-test.o $(COMMONOBJS) codeclib.o resample.o codec.o ssr rtcp.o redis.o iptables.o graphite.o call_interfaces.strhash.o sdp.strhash.o rtp.o crypto.o \ control_ng.strhash.o \ streambuf.o cookie_cache.o udp_listener.o homer.o load.o cdr.o dtmf.o timerthread.o \ - media_player.o jitter_buffer.o dtmflib.o + media_player.o jitter_buffer.o dtmflib.o t38.o payload-tracker-test: payload-tracker-test.o $(COMMONOBJS) ssrc.o aux.o auxlib.o rtp.o crypto.o codeclib.o \ resample.o dtmflib.o diff --git a/t/auto-daemon-tests-t38.pl b/t/auto-daemon-tests-t38.pl new file mode 100755 index 000000000..5fe59d34f --- /dev/null +++ b/t/auto-daemon-tests-t38.pl @@ -0,0 +1,686 @@ +#!/usr/bin/perl + +use strict; +use warnings; +use NGCP::Rtpengine::Test; +use NGCP::Rtpclient::SRTP; +use NGCP::Rtpengine::AutoTest; +use Test::More; +use IPC::Open3; + + +autotest_start(qw(--config-file=none -t -1 -i 203.0.113.1 -i 2001:db8:4321::1 + -n 2223 -c 12345 -f -L 7 -E -u 2222 --jitter-buffer=10)) + or die; + + +my ($sock_a, $sock_b, $port_a, $port_b, $ssrc, $resp, $srtp_ctx_a, $srtp_ctx_b, @ret1, @ret2); + + + +sub fec { + my ($seq_out, $num_ec, $span, $packets) = @_; + + my $ec_ents = 0; + my $ec_list = ''; + + for my $ec_pack (0 .. ($num_ec-1)) { + my $ec_seq = $seq_out - $num_ec * $span + $ec_pack; + last if $ec_seq < 0 || !exists($packets->[$ec_seq]); + my $xor = ''; + for my $fec_iter ((0 .. ($span-1))) { + my $fec_seq = $ec_seq + $fec_iter * $num_ec; + my $ecpkt = $packets->[$fec_seq]; + ok (defined $ecpkt, "FEC packet $fec_seq exists"); + ok (length($ecpkt) < 0x80, 'FEC packet short enough'); + $xor ^= $ecpkt; + } + $ec_list .= pack('Ca*', length($xor), $xor); + $ec_ents++; + } + + return ($ec_ents, $ec_list); +} + +sub t38_gw_test { + my ($testname, $pcm_cmd, $t38_cmd, %opts) = @_; + + my ($pcm_pid, $pcm_src, $pcm_sink); + ok($pcm_pid = open3($pcm_sink, $pcm_src, '>&STDERR', $pcm_cmd), + "$testname - spandsp_send_fax_pcm"); + + unlink('out.tif'); + ok (! -e 'out.tif', 'output file does not exists'); + + my ($t38_pid, $t38_src, $t38_sink); + ok($t38_pid = open3($t38_sink, $t38_src, '>&STDERR', $t38_cmd), + "$testname - spandsp_recv_fax_t38"); + + my ($buf, $rin); + my $seq = -1; + my $t38_pkt = ''; + my $udptl_seq = 0; + my @udptl_ec_in; + my @udptl_ec_out; + my $done = 0; + my $sqo = 1000; + my $tso = 3000; + my $ts = -1; + + my $rev = $opts{reverse} // 0; + my $pcm_sock = $rev ? $sock_a : $sock_b; + my $pcm_port = $rev ? $port_b : $port_a; + my $t38_sock = $rev ? $sock_b : $sock_a; + my $t38_port = $rev ? $port_a : $port_b; + + my $num_ec = $opts{num_ec} // 3; + my $span = $opts{span} // 1; + my $fec = $span > 1; + + # speed is controlled by the PCM generator + while (!$done && sysread($pcm_src, $buf = '', 160) == 160) { + # send generated PCM to rtpengine + snd($pcm_sock, $pcm_port, rtp(8, $sqo += 1, $tso += 160, 0x1234, $buf)); + # it will also have generated a block of PCM + if ($seq == -1) { + ($seq, $ts, $ssrc, $buf) = rcv($pcm_sock, $pcm_port, rtpmre(8 | 0x80, -1, -1, -1, '(' . ("." x 160) . ')')); + } + else { + ($buf) = rcv($pcm_sock, $pcm_port, rtpmre(8, $seq += 1, $ts += 160, $ssrc, '(' . ("." x 160) . ')')); + } + # write it back to our PCM endpoint + is length($buf), 160, 'buf length ok'; + ok (syswrite($pcm_sink, $buf), 'PCM writeback'); + + # read from our local T.38 producer? + $rin = ''; + vec($rin, fileno($t38_src), 1) = 1; + while (select(my $rout = $rin, undef, undef, 0) == 1) { + my $ret = sysread($t38_src, $buf = '', 1); + ok (defined($ret), 'T.38 read ok'); + + if ($ret == 0) { + # EOF + $done = 1; + ok (waitpid($t38_pid, 0), 'T.38 spandsp finished'); + undef($t38_pid); + last; + } + + $t38_pkt .= $buf; + # complete packet? + my ($seq_out, $len, $pkt) = unpack('SSa*', $t38_pkt); + next unless defined($pkt); # nope + next if length($pkt) < $len; # nope + + # extract... + substr($t38_pkt, 0, $len + 4) = ''; + substr($pkt, $len) = ''; + + ok ($len > 0 && $len < 0x80, "local packet $seq_out short enough"); + + # save for EC + $udptl_ec_out[$seq_out] = $pkt; + + # redundancy: + my $ec_method = 0x00; + my $ec_span = ''; + my $ec_ents = 0; + my $ec_list = ''; + if (!$fec) { + for my $ec_seq (reverse(($seq_out - $num_ec) .. ($seq_out - 1))) { + last if $ec_seq < 0 || !exists($udptl_ec_out[$ec_seq]); + my $ecpkt = $udptl_ec_out[$ec_seq]; + ok (length($ecpkt) < 0x80, 'EC packet short enough'); + $ec_list .= pack('Ca*', length($ecpkt), $ecpkt); + $ec_ents++; + } + } + else { + $ec_method = 0x80; + $ec_span = pack('CC', 1, $span); + ($ec_ents, $ec_list) = fec($seq_out, $num_ec, $span, \@udptl_ec_out); + } + + # pack into UDPTL with redundancy + my $udptl = pack('nCa*Ca*Ca*', $seq_out, length($pkt), $pkt, $ec_method, + $ec_span, $ec_ents, $ec_list); + + # send + snd($t38_sock, $t38_port, $udptl); + } + + # read from our UDPTL source? + $rin = ''; + vec($rin, fileno($t38_sock), 1) = 1; + while (select(my $rout = $rin, undef, undef, 0) == 1) { + my ($enc_seq, $len, $pkt) = rcv($t38_sock, $t38_port, qr/^(..)(.)(.*)$/s); + + # allow for duplicates, as they're generated in some cases + ok ($enc_seq == $udptl_seq || $enc_seq == $udptl_seq + 1, "UDPTL seq $enc_seq"); + $udptl_seq = $enc_seq; + + $len = ord($len); + ok ($len > 0 && $len < 0x80, 'remote packet short enough'); + + # extract... + my $ifp = substr($pkt, 0, $len, ''); + ok (length($ifp) == $len, 'length matches'); + + $udptl_ec_in[$udptl_seq] = $ifp; + + my $red = substr($pkt, 0, 1, ''); + ok ($red eq ($fec ? "\x80" : "\x00"), 'redundacy method'); + + if (!$fec) { + my $nec = substr($pkt, 0, 1, ''); + ok ($nec eq chr($udptl_seq > 3 ? 3 : $udptl_seq), "num EC packets " . ord($nec)); + $nec = ord($nec); + + # check EC packets + for my $ec_seq (reverse(($udptl_seq - $nec) .. ($udptl_seq - 1))) { + my $len = substr($pkt, 0, 1, ''); + $len = ord($len); + ok ($len > 0 && $len < 0x80, 'EC packet short enough'); + my $ec = substr($pkt, 0, $len, ''); + if ($ec_seq == 0 && !exists($udptl_ec_in[$ec_seq])) { + # this happens on T.38=force before the answer + # was seen. seq 0 is generated but not sent as + # we don't have an endpoint yet. + # XXX can this be fixed? queue packet? + ; + } + else { + ok ($ec eq $udptl_ec_in[$ec_seq], 'EC packet matches'); + } + } + } + else { + ok (substr($pkt, 0, 1, '') eq "\x01", 'FEC span header'); + my $nspan = substr($pkt, 0, 1, ''); + $nspan = ord($nspan); + ok ($nspan >= 1, 'FEC span min'); + my $expspan = $span; + my $expent = $num_ec; + while ($udptl_seq < $expspan * $expent) { + if ($expspan > 1) { + $expspan--; + next; + } + $expent--; + } + ok ($expspan == $nspan, "FEC span $expspan == $nspan"); + my $nec = ord(substr($pkt, 0, 1, '')); + ok ($expent == $nec, "FEC num entries $expent == $nec"); + # extract all entries and compare with self-generated list + my ($fec_entries, $fec_blob) = fec($udptl_seq, $nec, $nspan, \@udptl_ec_in); + my $recv_blob = ''; + for (1 .. $nec) { + my $len = substr($pkt, 0, 1, ''); + $len = ord($len); + ok ($len > 0 && $len < 0x80, 'FEC packet short enough'); + my $ec = substr($pkt, 0, $len, ''); + $recv_blob .= pack('Ca*', $len, $ec); + } + ok ($fec_entries == $nec, "num actual FEC entries $fec_entries == $nec"); + ok ($recv_blob eq $fec_blob, 'FEC blob matches'); + } + + # everything passed, write to T.38 end + ok (syswrite($t38_sink, pack('SSa*', $udptl_seq, length($ifp), $ifp)), 'T.38 writeback'); + } + } + + # delete to stop PCM player + rtpe_req('delete', "$testname delete", { 'from-tag' => ft() }); + + undef($t38_src); + undef($t38_sink); + undef($pcm_src); + undef($pcm_sink); + + if ($t38_pid) { + ok (waitpid($t38_pid, 0), 'T.38 spandsp finished'); + undef($t38_pid); + } + if ($pcm_pid) { + ok (waitpid($pcm_pid, 0), 'PCM spandsp finished'); + undef($pcm_pid); + } + + ok (-f 'out.tif', 'output file exists'); + ok (-s 'out.tif' > 10000, 'output file large enough'); + unlink('out.tif'); + +} + + + + +($sock_a, $sock_b) = new_call([qw(198.51.100.1 4020)], [qw(198.51.100.3 4022)]); + +($port_a) = offer('T.38 after re-invite', { ICE => 'remove', + }, < 'remove' }, < [ 'force' ], ICE => 'remove', + }, < 'remove' }, < 1); + + + + +done_testing(); +exit; + + + +($sock_a, $sock_b) = new_call([qw(198.51.100.1 4016)], [qw(198.51.100.3 4018)]); + +($port_a) = offer('plain T.38, reverse invite', { 'T.38' => [ 'force' ], ICE => 'remove', + }, < 'remove' }, < 1); + + + + +($sock_a, $sock_b) = new_call([qw(198.51.100.1 4000)], [qw(198.51.100.3 4002)]); + +($port_a) = offer('plain T.38, forward invite', { 'T.38' => [ 'decode' ], ICE => 'remove', + 'codec' => { 'transcode' => ['PCMA'] } }, < 'remove' }, < [ 'decode' ], ICE => 'remove', + 'codec' => { 'transcode' => ['PCMA'] } }, < 'remove' }, < [ 'decode' ], ICE => 'remove', + 'codec' => { 'transcode' => ['PCMA'] } }, < 'remove' }, < 3); + + + + + +($sock_a, $sock_b) = new_call([qw(198.51.100.1 4012)], [qw(198.51.100.3 4014)]); + +($port_a) = offer('FEC span 5', { 'T.38' => [ 'decode' ], ICE => 'remove', + 'codec' => { 'transcode' => ['PCMA'] } }, < 'remove' }, < 5); + + + + +# XXX packet loss tests +# XXX tests of different SDP options + + + + +done_testing(); diff --git a/t/auto-daemon-tests.pl b/t/auto-daemon-tests.pl index ab71d7176..abdce172e 100755 --- a/t/auto-daemon-tests.pl +++ b/t/auto-daemon-tests.pl @@ -18,6 +18,468 @@ my ($sock_a, $sock_b, $port_a, $port_b, $ssrc, $resp, $srtp_ctx_a, $srtp_ctx_b, +# T.38 signalling scenarios + +new_call(); + +offer('forward T.38 invite without codecs given', { 'T.38' => [ 'decode' ], ICE => 'remove', + }, < ft() }); + + + + + + +new_call(); + +offer('T.38 forward re-invite', { ICE => 'remove', + }, < 'remove', + }, < 'remove', 'T.38' => [ 'force' ], + }, < 'remove', + }, < ft() }); + + + + +new_call(); + +offer('T.38 reverse re-invite', { ICE => 'remove', + }, < 'remove', + }, < 'remove', 'T.38' => [ 'decode' ], + }, < 'remove' }, < ft() }); + + + + + + +new_call(); + +offer('T.38 forward re-invite w/ unsupported codec', { ICE => 'remove', + }, < 'remove', + }, < 'remove', 'T.38' => [ 'force' ], + }, < 'remove', + }, < ft() }); + + + + +new_call(); + +offer('T.38 reverse re-invite w/ unsupported codec', { ICE => 'remove', + }, < 'remove', + }, < 'remove', 'T.38' => [ 'decode' ], + }, < 'remove' }, < ft() }); + + + + + + # github issue 850 new_call; diff --git a/t/pcm_rtp_test.pl b/t/pcm_rtp_test.pl new file mode 100755 index 000000000..f03b7ca13 --- /dev/null +++ b/t/pcm_rtp_test.pl @@ -0,0 +1,59 @@ +#!/usr/bin/perl + +use strict; +use warnings; +use IPC::Open3; +use IO::Socket; +use IO::Socket::IP; + +my $laddr = shift or die; +my $lport = shift or die; +my $raddr = shift or die; +my $rport = shift or die; + +my $sock = IO::Socket::IP->new(Type => &SOCK_DGRAM, Proto => 'udp', + LocalHost => $laddr, LocalPort => $lport, + PeerHost => $raddr, PeerPort => $rport, + ) + or die; + +my ($src, $sink); +my $pid = open3($sink, $src, '>&STDERR', @ARGV) or die; + +my ($playsrc, $playsink); +open($playsrc, '|-', qw(play -q -c 1 -e a-law -r 8000 -t raw -)) or die; +open($playsink, '|-', qw(play -q -c 1 -e a-law -r 8000 -t raw -)) or die; + +my $lseq = rand(); +my $lssrc = rand(); +my $lts = rand(); +my $lpt = 8; # PCMA +my $lmark = 0x80; +my $rseq = -1; +my $rts = -1; + +while (1) { + my $buf; + + last unless sysread($src, $buf = '', 160); + syswrite($playsrc, $buf); + + my $rtp = pack('CCnNN a*', 0x80, $lpt | $lmark, $lseq, $lts, $lssrc, $buf); + last unless $sock->syswrite($rtp) or last; + $lseq++; + $lts += 160; + $lmark = 0x00; + + last unless $sock->sysread($buf = '', 0xffff); + + my ($ver, $rpt, $seq, $ts, $rssrc, $payload) = unpack('CCnNN a*', $buf); + die unless length($payload) == 160; + die unless ($rpt & 0x7f) == $lpt; + die unless ($rseq == -1 || (($rseq + 1) & 0xffff) == $seq); + die unless ($rts == -1 || (($rts + 160) & 0xffffffff) == $ts); + syswrite($playsink, $payload); + $rseq = $seq; + $rts = $ts; + + last unless syswrite($sink, $payload); +} diff --git a/t/spandsp_fax_pcm_test.pl b/t/spandsp_fax_pcm_test.pl new file mode 100755 index 000000000..690dc7ea4 --- /dev/null +++ b/t/spandsp_fax_pcm_test.pl @@ -0,0 +1,32 @@ +#!/usr/bin/perl + +use strict; +use warnings; +use IPC::Open2; +use POSIX ":sys_wait_h"; + +my ($send_src, $send_sink); +my $send_pid = open2($send_src, $send_sink, './spandsp_send_fax_pcm test.tif') or die; + +unlink('out.tif'); + +my ($recv_src, $recv_sink); +my $recv_pid = open2($recv_src, $recv_sink, './spandsp_recv_fax_pcm out.tif') or die; + +while ($send_pid && $recv_pid) { + + my $buf; + + if (sysread($send_src, $buf = '', 160)) { + syswrite($recv_sink, $buf) or die; + } + + if (sysread($recv_src, $buf = '', 160)) { + syswrite($send_sink, $buf) or die; + } + + undef($send_pid) if waitpid($send_pid, WNOHANG); + undef($recv_pid) if waitpid($recv_pid, WNOHANG); +} + +sleep(5); diff --git a/t/spandsp_fax_t38_test.pl b/t/spandsp_fax_t38_test.pl new file mode 100755 index 000000000..6e586945d --- /dev/null +++ b/t/spandsp_fax_t38_test.pl @@ -0,0 +1,38 @@ +#!/usr/bin/perl + +use strict; +use warnings; +use IPC::Open2; +use POSIX ":sys_wait_h"; + +my ($send_src, $send_sink); +my $send_pid = open2($send_src, $send_sink, './spandsp_send_fax_t38 test.tif') or die; + +unlink('out.tif'); + +my ($recv_src, $recv_sink); +my $recv_pid = open2($recv_src, $recv_sink, './spandsp_recv_fax_t38 out.tif') or die; + +while ($send_pid && $recv_pid) { + + my ($buf, $rin); + + $rin = ''; + vec($rin, fileno($send_src), 1) = 1; + while (select(my $rout = $rin, undef, undef, 0.02) == 1) { + sysread($send_src, $buf = '', 1); + syswrite($recv_sink, $buf) or last; + } + + $rin = ''; + vec($rin, fileno($recv_src), 1) = 1; + while (select(my $rout = $rin, undef, undef, 0.02) == 1) { + sysread($recv_src, $buf = '', 1); + syswrite($send_sink, $buf) or last; + } + + undef($send_pid) if waitpid($send_pid, WNOHANG); + undef($recv_pid) if waitpid($recv_pid, WNOHANG); +} + +sleep(5); diff --git a/t/spandsp_recv_fax_pcm.c b/t/spandsp_recv_fax_pcm.c new file mode 100644 index 000000000..dc09c5c00 --- /dev/null +++ b/t/spandsp_recv_fax_pcm.c @@ -0,0 +1,151 @@ +#undef NDEBUG +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + + +#define SAMPLES_PER_CHUNK 160 +#define MICROSECONDS_PER_CHUNK 20000 + + +// from ITU G.191 +void alaw_compress (size_t lseg, int16_t *linbuf, uint8_t *logbuf) { + short ix, iexp; + long n; + + for (n = 0; n < lseg; n++) { + ix = linbuf[n] < 0 /* 0 <= ix < 2048 */ + ? (~linbuf[n]) >> 4 /* 1's complement for negative values */ + : (linbuf[n]) >> 4; + + /* Do more, if exponent > 0 */ + if (ix > 15) { /* exponent=0 for ix <= 15 */ + iexp = 1; /* first step: */ + while (ix > 16 + 15) { /* find mantissa and exponent */ + ix >>= 1; + iexp++; + } + ix -= 16; /* second step: remove leading '1' */ + + ix += iexp << 4; /* now compute encoded value */ + } + if (linbuf[n] >= 0) + ix |= (0x0080); /* add sign bit */ + + logbuf[n] = ix ^ (0x0055); /* toggle even bits */ + } +} +void alaw_expand (size_t lseg, uint8_t *logbuf, int16_t *linbuf) { + short ix, mant, iexp; + long n; + + for (n = 0; n < lseg; n++) { + ix = logbuf[n] ^ (0x0055); /* re-toggle toggled bits */ + + ix &= (0x007F); /* remove sign bit */ + iexp = ix >> 4; /* extract exponent */ + mant = ix & (0x000F); /* now get mantissa */ + if (iexp > 0) + mant = mant + 16; /* add leading '1', if exponent > 0 */ + + mant = (mant << 4) + (0x0008); /* now mantissa left justified and */ + /* 1/2 quantization step added */ + if (iexp > 1) /* now left shift according exponent */ + mant = mant << (iexp - 1); + + linbuf[n] = logbuf[n] > 127 /* invert, if negative sample */ + ? mant : -mant; + } +} + + + +int done = 0; + +static void phase_e_handler(t30_state_t *s, void *user_data, int result) { + fprintf(stderr, "recv: phase E result %i\n", result); + assert(result == T30_ERR_OK); + done = 1; +} + + +int main(int argc, char **argv) { + assert(argc == 2); + const char *output_file_name = argv[1]; + + fax_state_t *fax = fax_init(NULL, FALSE); + assert(fax != NULL); + + int use_transmit_on_idle = 1; + int use_tep = 0; + int supported_modems = T30_SUPPORT_V27TER | T30_SUPPORT_V29 | T30_SUPPORT_V17; + int use_ecm = 0; + + // taken from t38_gateway_tests.c + t30_state_t *t30 = fax_get_t30_state(fax); + fax_set_transmit_on_idle(fax, use_transmit_on_idle); + fax_set_tep_mode(fax, use_tep); + t30_set_supported_modems(t30, supported_modems); + t30_set_tx_ident(t30, "11111111"); + t30_set_tx_nsf(t30, (const uint8_t *) "\x50\x00\x00\x00Spandsp\x00", 12); + t30_set_rx_file(t30, output_file_name, -1); + t30_set_phase_e_handler(t30, phase_e_handler, NULL); + t30_set_ecm_capability(t30, use_ecm); + if (use_ecm) + t30_set_supported_compressions(t30, T30_SUPPORT_T4_1D_COMPRESSION | T30_SUPPORT_T4_2D_COMPRESSION | T30_SUPPORT_T6_COMPRESSION); + t30_set_minimum_scan_line_time(t30, 40); + + struct timeval now, next; + + setbuf(stdout, NULL); + gettimeofday(&now, NULL); + + while (!done) { + next = now; + next.tv_usec += MICROSECONDS_PER_CHUNK; + while (next.tv_usec >= 1000000) { + next.tv_usec -= 1000000; + next.tv_sec++; + } + + int16_t samples[SAMPLES_PER_CHUNK]; + + int ret = fax_tx(fax, samples, SAMPLES_PER_CHUNK); + assert(ret == SAMPLES_PER_CHUNK); + + uint8_t alaw[SAMPLES_PER_CHUNK]; + alaw_compress(SAMPLES_PER_CHUNK, samples, alaw); + + ret = fwrite(alaw, SAMPLES_PER_CHUNK, 1, stdout); + assert(ret == 1); + + ret = fread(alaw, SAMPLES_PER_CHUNK, 1, stdin); + assert(ret == 1); + + alaw_expand(SAMPLES_PER_CHUNK, alaw, samples); + + ret = fax_rx(fax, samples, SAMPLES_PER_CHUNK); + assert(ret == 0); + + while (1) { + gettimeofday(&now, NULL); + long long diff = ((long long) next.tv_sec - now.tv_sec) * 1000000 + + ((long long) next.tv_usec - now.tv_usec); + if (diff <= 0) + break; + usleep(diff); + } + } + + return 0; +} diff --git a/t/spandsp_recv_fax_t38.c b/t/spandsp_recv_fax_t38.c new file mode 100644 index 000000000..709117f1e --- /dev/null +++ b/t/spandsp_recv_fax_t38.c @@ -0,0 +1,118 @@ +#undef NDEBUG +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + + +#define SAMPLES_PER_CHUNK 160 + + +static int packet_handler(t38_core_state_t *s, void *user_data, const uint8_t *buf, int len, int count) { + static uint16_t seq = 0; + + fprintf(stderr, "recv: writing %i bytes %i times\n", len, count); + + for (int i = 0; i < count; i++) { + uint16_t hdr[2] = {seq, len}; + int ret = write(1, hdr, sizeof(hdr)); + assert(ret == sizeof(hdr)); + ret = write(1, buf, len); + assert(ret == len); + } + + seq++; + + return 0; +} + +static void phase_e_handler(t30_state_t *s, void *user_data, int result) { + fprintf(stderr, "phase E result %i\n", result); + assert(result == T30_ERR_OK); +} + +static size_t nb_read(int fd, void *b, size_t len) { + size_t left = len; + while (left) { + ssize_t ret = read(fd, b + len - left, left); + if (ret > 0) { + left -= ret; + continue; + } + if (ret == 0) + return 0; + if (errno == EAGAIN && left != len) { + usleep(10000); + continue; + } + return -1; + } + return len; +} + +int main(int argc, char **argv) { + assert(argc == 2); + const char *output_file_name = argv[1]; + + t38_terminal_state_t *fax = t38_terminal_init(NULL, FALSE, packet_handler, NULL); + assert(fax != NULL); + + int use_tep = 0; + int supported_modems = T30_SUPPORT_V27TER | T30_SUPPORT_V29 | T30_SUPPORT_V17; + int use_ecm = 0; + int t38_version = 0; + int options = 0; + + + // taken from t38_terminal_tests.c + t30_state_t *t30 = t38_terminal_get_t30_state(fax); + t38_core_state_t *t38 = t38_terminal_get_t38_core_state(fax); + t38_set_t38_version(t38, t38_version); + t38_terminal_set_config(fax, options); + t38_terminal_set_tep_mode(fax, use_tep); + + t30_set_supported_modems(t30, supported_modems); + t30_set_tx_ident(t30, "22222222"); + t30_set_tx_nsf(t30, (const uint8_t *) "\x50\x00\x00\x00Spandsp\x00", 12); + t30_set_rx_file(t30, output_file_name, -1); + t30_set_ecm_capability(t30, use_ecm); + t30_set_phase_e_handler(t30, phase_e_handler, NULL); + + fcntl(0, F_SETFL, O_NONBLOCK); + + while (1) { + int done = t38_terminal_send_timeout(fax, SAMPLES_PER_CHUNK); + if (done) + break; + + uint16_t hdr[2]; + int ret = nb_read(0, hdr, sizeof(hdr)); + if (ret < 0 && errno == EAGAIN) { + usleep(20000); + continue; + } + assert(ret == sizeof(hdr)); + uint8_t buf[512]; + assert(hdr[1] <= sizeof(buf)); + do + ret = nb_read(0, buf, hdr[1]); + while (ret < 0 && errno == EAGAIN); + assert(ret == hdr[1]); + fprintf(stderr, "recv: processing %u bytes, seq %u\n", hdr[1], hdr[0]); + t38_core_rx_ifp_packet(t38, buf, hdr[1], hdr[0]); + } + + return 0; +} diff --git a/t/spandsp_send_fax_pcm.c b/t/spandsp_send_fax_pcm.c new file mode 100644 index 000000000..434916597 --- /dev/null +++ b/t/spandsp_send_fax_pcm.c @@ -0,0 +1,156 @@ +#undef NDEBUG +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + + +#define SAMPLES_PER_CHUNK 160 +#define MICROSECONDS_PER_CHUNK 20000 + + +// from ITU G.191 +void alaw_compress (size_t lseg, int16_t *linbuf, uint8_t *logbuf) { + short ix, iexp; + long n; + + for (n = 0; n < lseg; n++) { + ix = linbuf[n] < 0 /* 0 <= ix < 2048 */ + ? (~linbuf[n]) >> 4 /* 1's complement for negative values */ + : (linbuf[n]) >> 4; + + /* Do more, if exponent > 0 */ + if (ix > 15) { /* exponent=0 for ix <= 15 */ + iexp = 1; /* first step: */ + while (ix > 16 + 15) { /* find mantissa and exponent */ + ix >>= 1; + iexp++; + } + ix -= 16; /* second step: remove leading '1' */ + + ix += iexp << 4; /* now compute encoded value */ + } + if (linbuf[n] >= 0) + ix |= (0x0080); /* add sign bit */ + + logbuf[n] = ix ^ (0x0055); /* toggle even bits */ + } +} +void alaw_expand (size_t lseg, uint8_t *logbuf, int16_t *linbuf) { + short ix, mant, iexp; + long n; + + for (n = 0; n < lseg; n++) { + ix = logbuf[n] ^ (0x0055); /* re-toggle toggled bits */ + + ix &= (0x007F); /* remove sign bit */ + iexp = ix >> 4; /* extract exponent */ + mant = ix & (0x000F); /* now get mantissa */ + if (iexp > 0) + mant = mant + 16; /* add leading '1', if exponent > 0 */ + + mant = (mant << 4) + (0x0008); /* now mantissa left justified and */ + /* 1/2 quantization step added */ + if (iexp > 1) /* now left shift according exponent */ + mant = mant << (iexp - 1); + + linbuf[n] = logbuf[n] > 127 /* invert, if negative sample */ + ? mant : -mant; + } +} + + + +int done = 0; + +static void phase_e_handler(t30_state_t *s, void *user_data, int result) { + fprintf(stderr, "send: phase E result %i\n", result); + assert(result == T30_ERR_OK); + done = 1; +} + + +int main(int argc, char **argv) { + assert(argc == 2); + const char *input_file_name = argv[1]; + + fax_state_t *fax = fax_init(NULL, TRUE); + assert(fax != NULL); + + int use_transmit_on_idle = 1; + int use_tep = 0; + int supported_modems = T30_SUPPORT_V27TER | T30_SUPPORT_V29 | T30_SUPPORT_V17; + int use_ecm = 0; + + // taken from t38_gateway_tests.c + t30_state_t *t30 = fax_get_t30_state(fax); + fax_set_transmit_on_idle(fax, use_transmit_on_idle); + fax_set_tep_mode(fax, use_tep); + t30_set_supported_modems(t30, supported_modems); + t30_set_tx_ident(t30, "11111111"); + t30_set_tx_nsf(t30, (const uint8_t *) "\x50\x00\x00\x00Spandsp\x00", 12); + t30_set_tx_file(t30, input_file_name, -1, -1); + t30_set_phase_e_handler(t30, phase_e_handler, NULL); + t30_set_ecm_capability(t30, use_ecm); + if (use_ecm) + t30_set_supported_compressions(t30, T30_SUPPORT_T4_1D_COMPRESSION | T30_SUPPORT_T4_2D_COMPRESSION | T30_SUPPORT_T6_COMPRESSION); + t30_set_minimum_scan_line_time(t30, 40); + + struct timeval now, next; + + setbuf(stdout, NULL); + gettimeofday(&now, NULL); + + while (!done) { + next = now; + next.tv_usec += MICROSECONDS_PER_CHUNK; + while (next.tv_usec >= 1000000) { + next.tv_usec -= 1000000; + next.tv_sec++; + } + + int16_t samples[SAMPLES_PER_CHUNK]; + + int ret = fax_tx(fax, samples, SAMPLES_PER_CHUNK); + assert(ret == SAMPLES_PER_CHUNK); + + uint8_t alaw[SAMPLES_PER_CHUNK]; + alaw_compress(SAMPLES_PER_CHUNK, samples, alaw); + + ret = fwrite(alaw, SAMPLES_PER_CHUNK, 1, stdout); + if (ret < 1) + break; + + ret = fread(alaw, SAMPLES_PER_CHUNK, 1, stdin); + if (ret == 0) + break; + assert(ret == 1); + + alaw_expand(SAMPLES_PER_CHUNK, alaw, samples); + + ret = fax_rx(fax, samples, SAMPLES_PER_CHUNK); + assert(ret == 0); + + while (1) { + gettimeofday(&now, NULL); + long long diff = ((long long) next.tv_sec - now.tv_sec) * 1000000 + + ((long long) next.tv_usec - now.tv_usec); + if (diff <= 0) + break; + usleep(diff); + } + } + +// assert(done == 1); + + return 0; +} diff --git a/t/spandsp_send_fax_t38.c b/t/spandsp_send_fax_t38.c new file mode 100644 index 000000000..dad85bdbf --- /dev/null +++ b/t/spandsp_send_fax_t38.c @@ -0,0 +1,125 @@ +#undef NDEBUG +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + + +#define SAMPLES_PER_CHUNK 160 + + +static int packet_handler(t38_core_state_t *s, void *user_data, const uint8_t *buf, int len, int count) { + static uint16_t seq = 0; + + fprintf(stderr, "send: writing %i bytes %i times, seq %u\n", len, count, seq); + + for (int i = 0; i < count; i++) { + uint16_t hdr[2] = {seq, len}; + int ret = write(1, hdr, sizeof(hdr)); + assert(ret == sizeof(hdr)); + ret = write(1, buf, len); + assert(ret == len); + } + + seq++; + + return 0; +} + +int g_done = 0; + +static void phase_e_handler(t30_state_t *s, void *user_data, int result) { + fprintf(stderr, "phase E result %i\n", result); + assert(result == T30_ERR_OK); + g_done = 1; +} + +static size_t nb_read(int fd, void *b, size_t len) { + size_t left = len; + while (left) { + ssize_t ret = read(fd, b + len - left, left); + if (ret > 0) { + left -= ret; + continue; + } + if (ret == 0) + return 0; + if (errno == EAGAIN && left != len) { + usleep(10000); + continue; + } + return -1; + } + return len; +} + +int main(int argc, char **argv) { + assert(argc == 2); + const char *input_file_name = argv[1]; + + t38_terminal_state_t *fax = t38_terminal_init(NULL, TRUE, packet_handler, NULL); + assert(fax != NULL); + + int use_tep = 0; + int supported_modems = T30_SUPPORT_V27TER | T30_SUPPORT_V29 | T30_SUPPORT_V17; + int use_ecm = 0; + int t38_version = 0; + int options = 0; + + + // taken from t38_terminal_tests.c + t30_state_t *t30 = t38_terminal_get_t30_state(fax); + t38_core_state_t *t38 = t38_terminal_get_t38_core_state(fax); + t38_set_t38_version(t38, t38_version); + t38_terminal_set_config(fax, options); + t38_terminal_set_tep_mode(fax, use_tep); + + t30_set_supported_modems(t30, supported_modems); + t30_set_tx_ident(t30, "11111111"); + t30_set_tx_nsf(t30, (const uint8_t *) "\x50\x00\x00\x00Spandsp\x00", 12); + t30_set_tx_file(t30, input_file_name, -1, -1); + t30_set_ecm_capability(t30, use_ecm); + t30_set_phase_e_handler(t30, phase_e_handler, NULL); + + fcntl(0, F_SETFL, O_NONBLOCK); + + while (1) { + int done = t38_terminal_send_timeout(fax, SAMPLES_PER_CHUNK); + if (done) + break; + + uint16_t hdr[2]; + int ret = nb_read(0, hdr, sizeof(hdr)); + if (ret < 0 && errno == EAGAIN) { + usleep(20000); + continue; + } + if (ret == 0) + break; + assert(ret == sizeof(hdr)); + uint8_t buf[512]; + assert(hdr[1] <= sizeof(buf)); + do + ret = nb_read(0, buf, hdr[1]); + while (ret < 0 && errno == EAGAIN); + assert(ret == hdr[1]); + fprintf(stderr, "send: processing %u bytes, seq %u\n", hdr[1], hdr[0]); + t38_core_rx_ifp_packet(t38, buf, hdr[1], hdr[0]); + } + + //assert(g_done == 1); + + return 0; +} diff --git a/t/t38_udptl_test.pl b/t/t38_udptl_test.pl new file mode 100755 index 000000000..690fb6b99 --- /dev/null +++ b/t/t38_udptl_test.pl @@ -0,0 +1,63 @@ +#!/usr/bin/perl + +use strict; +use warnings; +use IPC::Open3; +use IO::Socket; +use IO::Socket::IP; + +my $laddr = shift or die; +my $lport = shift or die; +my $raddr = shift or die; +my $rport = shift or die; + +my $sock = IO::Socket::IP->new(Type => &SOCK_DGRAM, Proto => 'udp', + LocalHost => $laddr, LocalPort => $lport, + PeerHost => $raddr, PeerPort => $rport, + ) + or die; + +my $devnull; +die unless open($devnull, '>', '/dev/null'); + +my ($src, $sink); +my $pid = open3($sink, $src, ">&".fileno($devnull), @ARGV) or die; + +my $lseq = 0; +my $rseq = 0; +my $srcbuf = ''; + +local $| = 1; + +while (1) { + my $rin = ''; + vec($rin, fileno($src), 1) = 1; + while (select(my $rout = $rin, undef, undef, 0.01) == 1) { + my $ret = sysread($src, my $buf, 1); + last unless $ret; + $srcbuf .= $buf; + my ($seq_out, $len, $pkt) = unpack('SSa*', $srcbuf); + next unless defined($pkt); + next if length($pkt) < $len; + + substr($srcbuf, 0, $len + 4) = ''; + substr($pkt, $len) = ''; + + my $udptl = pack('nCa*Ca*Ca*', $seq_out, length($pkt), $pkt, 0x00, + '', 0, ''); + + print('!'); + last unless $sock->syswrite($udptl); + } + + $rin = ''; + vec($rin, fileno($sock), 1) = 1; + while (select(my $rout = $rin, undef, undef, 0.01) == 1) { + my $ret = $sock->sysread(my $buf, 0xffff); + my ($seq, $len, $pkt) = unpack('nCa*', $buf); + my $t38 = substr($pkt, 0, $len); + + print('.'); + last unless syswrite($sink, pack('SSa*', $seq, length($t38), $t38)); + } +} diff --git a/t/test.tif b/t/test.tif new file mode 100644 index 000000000..b97d712af Binary files /dev/null and b/t/test.tif differ diff --git a/t/transcode-test.c b/t/transcode-test.c index 6bd3c34d0..d5c2e66d5 100644 --- a/t/transcode-test.c +++ b/t/transcode-test.c @@ -108,7 +108,7 @@ static void __sdp_pt_fmt(int num, str codec, int clockrate, str full_codec, str static void offer(void) { printf("offer\n"); codec_rtp_payload_types(media_B, media_A, &rtp_types, &flags); - codec_handlers_update(media_B, media_A, &flags); + codec_handlers_update(media_B, media_A, &flags, NULL); g_queue_clear(&rtp_types); memset(&flags, 0, sizeof(flags)); } @@ -116,7 +116,7 @@ static void offer(void) { static void answer(void) { printf("answer\n"); codec_rtp_payload_types(media_A, media_B, &rtp_types, &flags); - codec_handlers_update(media_A, media_B, &flags); + codec_handlers_update(media_A, media_B, &flags, NULL); } #define expect(side, dir, codecs) \ diff --git a/utils/rtpengine-ng-client b/utils/rtpengine-ng-client index a9b1380a3..24dd10bb5 100755 --- a/utils/rtpengine-ng-client +++ b/utils/rtpengine-ng-client @@ -71,6 +71,7 @@ GetOptions( 'blob=s' => \$options{'blob'}, 'blob-file=s' => \$options{'blob-file'}, 'db-id=i' => \$options{'db-id'}, + 'T38=s@' => \$options{'T.38'}, ) or die; my $cmd = shift(@ARGV) or die; @@ -89,7 +90,7 @@ for my $x (split(/,/, 'trust address,symmetric,asymmetric,unidirectional,force,s for my $x (split(/,/, 'origin,session connection')) { defined($options{'replace-' . $x}) and push(@{$packet{replace}}, $x); } -for my $x (split(/,/, 'rtcp-mux,SDES,supports')) { +for my $x (split(/,/, 'rtcp-mux,SDES,supports,T.38')) { $packet{$x} = $options{$x} if defined($options{$x}) && ref($options{$x}) eq 'ARRAY'; }