diff --git a/daemon/call.c b/daemon/call.c index 15a2043e5..70c510183 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -488,7 +488,7 @@ static void callmaster_timer(void *ptr) { while (i) { ke = i->data; - sfd = hlp.ports[ke->target.target_port]; + sfd = hlp.ports[ke->target.local.port]; // XXX fix for multiple addresses if (!sfd) goto next; @@ -569,7 +569,7 @@ static void callmaster_timer(void *ptr) { redis_update(ps->call, m->conf.redis); next: - hlp.ports[ke->target.target_port] = NULL; + hlp.ports[ke->target.local.port] = NULL; g_slice_free1(sizeof(*ke), ke); i = g_list_delete_link(i, i); if (sfd) diff --git a/daemon/kernel.c b/daemon/kernel.c index e6cbf9219..407ea88ba 100644 --- a/daemon/kernel.c +++ b/daemon/kernel.c @@ -86,13 +86,13 @@ int kernel_add_stream(int fd, struct rtpengine_target_info *mti, int update) { } -int kernel_del_stream(int fd, u_int16_t p) { +int kernel_del_stream(int fd, const struct re_address *a) { struct rtpengine_message msg; int ret; ZERO(msg); msg.cmd = MMG_DEL; - msg.target.target_port = p; + msg.target.local = *a; ret = write(fd, &msg, sizeof(msg)); if (ret > 0) diff --git a/daemon/kernel.h b/daemon/kernel.h index 9271ef0f4..bc93cac58 100644 --- a/daemon/kernel.h +++ b/daemon/kernel.h @@ -11,6 +11,7 @@ struct rtpengine_target_info; +struct re_address; @@ -18,7 +19,7 @@ int kernel_create_table(unsigned int); int kernel_open_table(unsigned int); int kernel_add_stream(int, struct rtpengine_target_info *, int); -int kernel_del_stream(int, u_int16_t); +int kernel_del_stream(int, const struct re_address *); GList *kernel_list(unsigned int); diff --git a/daemon/media_socket.c b/daemon/media_socket.c index 146c671a3..cc20eddb6 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -689,7 +689,7 @@ void kernelize(struct packet_stream *stream) { mutex_lock(&sink->out_lock); - reti.target_port = stream->selected_sfd->socket.local.port; + __re_address_translate_ep(&reti.local, &stream->selected_sfd->socket.local); reti.tos = call->tos; reti.rtcp_mux = MEDIA_ISSET(stream->media, RTCP_MUX); reti.dtls = MEDIA_ISSET(stream->media, DTLS); @@ -747,13 +747,17 @@ no_kernel: /* must be called with in_lock held or call->master_lock held in W */ void __unkernelize(struct packet_stream *p) { + struct re_address rea; + if (!PS_ISSET(p, KERNELIZED)) return; if (PS_ISSET(p, NO_KERNEL_SUPPORT)) return; - if (p->call->callmaster->conf.kernelfd >= 0) - kernel_del_stream(p->call->callmaster->conf.kernelfd, p->selected_sfd->socket.local.port); + if (p->call->callmaster->conf.kernelfd >= 0) { + __re_address_translate_ep(&rea, &p->selected_sfd->socket.local); + kernel_del_stream(p->call->callmaster->conf.kernelfd, &rea); + } PS_CLEAR(p, KERNELIZED); } diff --git a/kernel-module/xt_RTPENGINE.c b/kernel-module/xt_RTPENGINE.c index f57c7f620..740355637 100644 --- a/kernel-module/xt_RTPENGINE.c +++ b/kernel-module/xt_RTPENGINE.c @@ -120,7 +120,8 @@ static void *proc_main_list_next(struct seq_file *, void *, loff_t *); static int proc_main_list_show(struct seq_file *, void *); static void table_push(struct rtpengine_table *); -static struct rtpengine_target *get_target(struct rtpengine_table *, u_int16_t); +static struct rtpengine_target *get_target(struct rtpengine_table *, const struct re_address *); +static int is_valid_address(const struct re_address *rea); static int aes_f8_session_key_init(struct re_crypto_context *, struct rtpengine_srtp *); static int srtp_encrypt_aes_cm(struct re_crypto_context *, struct rtpengine_srtp *, @@ -176,8 +177,19 @@ struct re_bitfield { }; struct re_bucket { - struct re_bitfield targets; - struct rtpengine_target *target[256]; + struct re_bitfield ports_lo_bf; + struct rtpengine_target *ports_lo[256]; +}; + +struct re_dest_addr { + struct re_address destination; + struct re_bitfield ports_hi_bf; + struct re_bucket *ports_hi[256]; +}; + +struct re_dest_addr_hash { + struct re_bitfield addrs_bf; + struct re_dest_addr *addrs[256]; }; struct rtpengine_table { @@ -192,10 +204,9 @@ struct rtpengine_table { struct proc_dir_entry *list; struct proc_dir_entry *blist; - struct re_bitfield buckets; - struct re_bucket *bucket[256]; + struct re_dest_addr_hash dest_addr_hash; - unsigned int targets; + unsigned int num_targets; }; struct re_cipher { @@ -514,7 +525,8 @@ static void clear_proc(struct proc_dir_entry **e) { static void table_push(struct rtpengine_table *t) { - int i, j; + int i, j, k; + struct re_dest_addr *rda; struct re_bucket *b; if (!t) @@ -525,23 +537,33 @@ static void table_push(struct rtpengine_table *t) { DBG("Freeing table\n"); - for (i = 0; i < 256; i++) { - b = t->bucket[i]; - if (!b) + for (k = 0; k < 256; k++) { + rda = t->dest_addr_hash.addrs[i]; + if (!rda) continue; - for (j = 0; j < 256; j++) { - if (!b->target[j]) + for (i = 0; i < 256; i++) { + b = rda->ports_hi[i]; + if (!b) continue; - b->target[j]->table = -1; - target_push(b->target[j]); - b->target[j] = NULL; + + for (j = 0; j < 256; j++) { + if (!b->ports_lo[j]) + continue; + b->ports_lo[j]->table = -1; + target_push(b->ports_lo[j]); + b->ports_lo[j] = NULL; + } + + kfree(b); + rda->ports_hi[i] = NULL; } - kfree(b); - t->bucket[i] = NULL; + kfree(rda); + t->dest_addr_hash.addrs[i] = NULL; } + clear_proc(&t->status); clear_proc(&t->control); clear_proc(&t->list); @@ -634,8 +656,7 @@ static ssize_t proc_status(struct file *f, char __user *b, size_t l, loff_t *o) read_lock_irqsave(&t->target_lock, flags); len += sprintf(buf + len, "Refcount: %u\n", atomic_read(&t->refcnt) - 1); len += sprintf(buf + len, "Control PID: %u\n", t->pid); - len += sprintf(buf + len, "Targets: %u\n", t->targets); - len += sprintf(buf + len, "Buckets: %u\n", t->buckets.used); + len += sprintf(buf + len, "Targets: %u\n", t->num_targets); read_unlock_irqrestore(&t->target_lock, flags); table_push(t); @@ -736,13 +757,19 @@ static inline void bitfield_clear(struct re_bitfield *bf, unsigned char i) { bf->b[b] &= ~k; bf->used--; } -static inline struct rtpengine_target *find_next_target(struct rtpengine_table *t, int *port) { +static inline struct rtpengine_target *find_next_target(struct rtpengine_table *t, int *addr_bucket, + int *port) +{ unsigned long flags; + struct re_dest_addr *rda; struct re_bucket *b; unsigned char hi, lo; - unsigned int hi_b, lo_b; + unsigned int rda_b, hi_b, lo_b; struct rtpengine_target *g; + if (*addr_bucket < 0 || *addr_bucket > 255) + return NULL; + if (*port < 0 || *port > 0xffff) return NULL; @@ -752,27 +779,39 @@ static inline struct rtpengine_target *find_next_target(struct rtpengine_table * read_lock_irqsave(&t->target_lock, flags); for (;;) { + rda_b = bitfield_slot(*addr_bucket); + if (!t->dest_addr_hash.addrs_bf.b[rda_b]) { + *addr_bucket = bitfield_next_slot(rda_b); + goto next_rda; + } + + rda = t->dest_addr_hash.addrs[*addr_bucket]; + if (!rda) { + (*addr_bucket)++; + goto next_rda; + } + hi_b = bitfield_slot(hi); - if (!t->buckets.b[hi_b]) { + if (!rda->ports_hi_bf.b[hi_b]) { hi = bitfield_next_slot(hi_b); lo = 0; - goto next; + goto next_hi; } - b = t->bucket[hi]; + b = rda->ports_hi[hi]; if (!b) { hi++; lo = 0; - goto next; + goto next_hi; } lo_b = bitfield_slot(lo); - if (!b->targets.b[lo_b]) { + if (!b->ports_lo_bf.b[lo_b]) { lo = bitfield_next_slot(lo_b); goto next_lo; } - g = b->target[lo]; + g = b->ports_lo[lo]; if (!g) { lo++; goto next_lo; @@ -784,8 +823,11 @@ static inline struct rtpengine_target *find_next_target(struct rtpengine_table * next_lo: if (!lo) hi++; -next: +next_hi: if (!hi && !lo) + (*addr_bucket)++; +next_rda: + if (!*addr_bucket && !hi && !lo) break; } @@ -832,7 +874,7 @@ static ssize_t proc_blist_read(struct file *f, char __user *b, size_t l, loff_t u_int32_t id; struct rtpengine_table *t; struct rtpengine_list_entry op; - int err, port, i; + int err, port, addr_bucket, i; struct rtpengine_target *g; unsigned long flags; @@ -847,9 +889,10 @@ static ssize_t proc_blist_read(struct file *f, char __user *b, size_t l, loff_t if (!t) return -ENOENT; - port = (int) *o; - g = find_next_target(t, &port); - *o = port; + addr_bucket = ((int) *o) >> 16; + port = ((int) *o) & 0xffff; + g = find_next_target(t, &addr_bucket, &port); + *o = (addr_bucket << 16) | port; err = 0; if (!g) goto err; @@ -928,34 +971,34 @@ static void *proc_list_next(struct seq_file *f, void *v, loff_t *o) { /* v is in u_int32_t id = (u_int32_t) (unsigned long) f->private; struct rtpengine_table *t; struct rtpengine_target *g; - int port; + int port, addr_bucket; - port = (int) *o; + addr_bucket = ((int) *o) >> 16; + port = ((int) *o) & 0xffff; t = get_table(id); if (!t) return NULL; - g = find_next_target(t, &port); + g = find_next_target(t, &addr_bucket, &port); - *o = port; + *o = (addr_bucket << 16) | port; table_push(t); return g; } -static void proc_list_addr_print(struct seq_file *f, const char *s, const struct re_address *a) { +static void seq_addr_print(struct seq_file *f, const struct re_address *a) { if (!a->family) return; - seq_printf(f, " %6s ", s); switch (a->family) { case AF_INET: - seq_printf(f, "inet4 %u.%u.%u.%u:%u\n", a->u.u8[0], a->u.u8[1], a->u.u8[2], + seq_printf(f, "inet4 %u.%u.%u.%u:%u", a->u.u8[0], a->u.u8[1], a->u.u8[2], a->u.u8[3], a->port); break; case AF_INET6: - seq_printf(f, "inet6 [%x:%x:%x:%x:%x:%x:%x:%x]:%u\n", + seq_printf(f, "inet6 [%x:%x:%x:%x:%x:%x:%x:%x]:%u", htons(a->u.u16[0]), htons(a->u.u16[1]), htons(a->u.u16[2]), htons(a->u.u16[3]), htons(a->u.u16[4]), htons(a->u.u16[5]), htons(a->u.u16[6]), htons(a->u.u16[7]), a->port); @@ -966,6 +1009,15 @@ static void proc_list_addr_print(struct seq_file *f, const char *s, const struct } } +static void proc_list_addr_print(struct seq_file *f, const char *s, const struct re_address *a) { + if (!a->family) + return; + + seq_printf(f, " %6s ", s); + seq_addr_print(f, a); + seq_printf(f, "\n"); +} + static void proc_list_crypto_print(struct seq_file *f, struct re_crypto_context *c, struct rtpengine_srtp *s, const char *label) { @@ -993,7 +1045,9 @@ static int proc_list_show(struct seq_file *f, void *v) { struct rtpengine_target *g = v; int i; - seq_printf(f, "port %5u:\n", g->target.target_port); + seq_printf(f, "local "); + seq_addr_print(f, &g->target.local); + seq_printf(f, "\n"); proc_list_addr_print(f, "src", &g->target.src_addr); proc_list_addr_print(f, "dst", &g->target.dst_addr); proc_list_addr_print(f, "mirror", &g->target.mirror_addr); @@ -1024,37 +1078,119 @@ static int proc_list_show(struct seq_file *f, void *v) { +static unsigned int re_address_hash(const struct re_address *a) { + u_int32_t ret = 0; -static int table_del_target(struct rtpengine_table *t, u_int16_t port) { + if (!a) + goto out; + + ret += a->family; + + switch (a->family) { + case AF_INET: + ret += a->u.ipv4; + break; + case AF_INET6: + ret += a->u.u32[0]; + ret += a->u.u32[1]; + ret += a->u.u32[2]; + ret += a->u.u32[3]; + break; + default: + goto out; + } + + ret = (ret & 0xffff) ^ ((ret & 0xffff0000) >> 16); + ret = (ret & 0xff) ^ ((ret & 0xff00) >> 8); + +out: + return ret; +} + +static int re_address_match(const struct re_address *a, const struct re_address *b) { + if (!a || !b) + return 0; + if (a->family != b->family) + return 0; + + switch (a->family) { + case AF_INET: + if (a->u.ipv4 == b->u.ipv4) + return 1; + break; + case AF_INET6: + if (!memcmp(a->u.ipv6, b->u.ipv6, sizeof(a->u.ipv6))) + return 1; + break; + default: + return 0; + } + + return 0; +} + +static struct re_dest_addr *find_dest_addr(const struct re_dest_addr_hash *h, const struct re_address *local) { + unsigned int rda_hash, i; + struct re_dest_addr *rda; + + i = rda_hash = re_address_hash(local); + + while (1) { + rda = h->addrs[i]; + if (!rda) + return NULL; + if (re_address_match(local, &rda->destination)) + return rda; + + i++; + if (i >= 256) + i = 0; + if (i == rda_hash) + return NULL; + } +} + + + + + +static int table_del_target(struct rtpengine_table *t, const struct re_address *local) { unsigned char hi, lo; + struct re_dest_addr *rda; struct re_bucket *b; struct rtpengine_target *g = NULL; unsigned long flags; - if (!port) + if (!local || !is_valid_address(local)) return -EINVAL; - hi = (port & 0xff00) >> 8; - lo = port & 0xff; + hi = (local->port & 0xff00) >> 8; + lo = local->port & 0xff; write_lock_irqsave(&t->target_lock, flags); - b = t->bucket[hi]; + + rda = find_dest_addr(&t->dest_addr_hash, local); + if (!rda) + goto out; + b = rda->ports_hi[hi]; if (!b) goto out; - g = b->target[lo]; + g = b->ports_lo[lo]; if (!g) goto out; - b->target[lo] = NULL; - bitfield_clear(&b->targets, lo); - t->targets--; - if (!b->targets.used) { - t->bucket[hi] = NULL; - bitfield_clear(&t->buckets, hi); + b->ports_lo[lo] = NULL; + bitfield_clear(&b->ports_lo_bf, lo); + t->num_targets--; + if (!b->ports_lo_bf.used) { + rda->ports_hi[hi] = NULL; + bitfield_clear(&rda->ports_hi_bf, hi); } else b = NULL; + /* not freeing or NULLing the re_dest_addr due to hash collision logic */ + out: write_unlock_irqrestore(&t->target_lock, flags); @@ -1071,7 +1207,7 @@ out: -static int is_valid_address(struct re_address *rea) { +static int is_valid_address(const struct re_address *rea) { switch (rea->family) { case AF_INET: if (!rea->u.ipv4) @@ -1396,13 +1532,17 @@ static void crypto_context_init(struct re_crypto_context *c, struct rtpengine_sr static int table_new_target(struct rtpengine_table *t, struct rtpengine_target_info *i, int update) { unsigned char hi, lo; + unsigned int rda_hash, rh_it; struct rtpengine_target *g; + struct re_dest_addr *rda; struct re_bucket *b, *ba = NULL; struct rtpengine_target *og = NULL; int err, j; unsigned long flags; - if (!i->target_port) + /* validation */ + + if (!is_valid_address(&i->local)) return -EINVAL; if (!is_valid_address(&i->src_addr)) return -EINVAL; @@ -1423,6 +1563,8 @@ static int table_new_target(struct rtpengine_table *t, struct rtpengine_target_i DBG("Creating new target\n"); + /* initializing */ + err = -ENOMEM; g = kmalloc(sizeof(*g), GFP_KERNEL); if (!g) @@ -1443,37 +1585,86 @@ static int table_new_target(struct rtpengine_table *t, struct rtpengine_target_i if (err) goto fail2; - hi = (i->target_port & 0xff00) >> 8; - lo = i->target_port & 0xff; + /* find or allocate re_dest_addr */ + + rda_hash = re_address_hash(&i->local); + hi = (i->local.port & 0xff00) >> 8; + lo = i->local.port & 0xff; +retry: + rh_it = rda_hash; write_lock_irqsave(&t->target_lock, flags); - if (!(b = t->bucket[hi])) { - err = -ENOENT; - if (update) + + rda = t->dest_addr_hash.addrs[rh_it]; + while (rda) { + if (re_address_match(&rda->destination, &i->local)) + goto got_rda; + rh_it++; + if (rh_it >= 256) + rh_it = 0; + err = -ENXIO; + if (rh_it == rda_hash) goto fail4; + rda = t->dest_addr_hash.addrs[rh_it]; + } + + err = -ENOENT; + if (update) + goto fail4; + write_unlock_irqrestore(&t->target_lock, flags); + + rda = kmalloc(sizeof(*rda), GFP_KERNEL); + err = -ENOMEM; + if (!rda) + goto fail2; + memset(rda, 0, sizeof(*rda)); + memcpy(&rda->destination, &i->local, sizeof(rda->destination)); + + write_lock_irqsave(&t->target_lock, flags); + + if (t->dest_addr_hash.addrs[rh_it]) { write_unlock_irqrestore(&t->target_lock, flags); + kfree(rda); + goto retry; + } + + t->dest_addr_hash.addrs[rh_it] = rda; + bitfield_set(&t->dest_addr_hash.addrs_bf, rh_it); - b = kmalloc(sizeof(*b), GFP_KERNEL); - err = -ENOMEM; - if (!b) - goto fail2; - memset(b, 0, sizeof(*b)); +got_rda: + /* find or allocate re_bucket */ - write_lock_irqsave(&t->target_lock, flags); + if ((b = rda->ports_hi[hi])) + goto got_bucket; - if (!t->bucket[hi]) { - t->bucket[hi] = b; - bitfield_set(&t->buckets, hi); - } - else { - ba = b; - b = t->bucket[hi]; - } + err = -ENOENT; + if (update) + goto fail4; + + write_unlock_irqrestore(&t->target_lock, flags); + + b = kmalloc(sizeof(*b), GFP_KERNEL); + err = -ENOMEM; + if (!b) + goto fail2; + memset(b, 0, sizeof(*b)); + + write_lock_irqsave(&t->target_lock, flags); + + if (!rda->ports_hi[hi]) { + rda->ports_hi[hi] = b; + bitfield_set(&rda->ports_hi_bf, hi); + } + else { + ba = b; + b = rda->ports_hi[hi]; } + +got_bucket: if (update) { err = -ENOENT; - og = b->target[lo]; + og = b->ports_lo[lo]; if (!og) goto fail4; @@ -1492,13 +1683,13 @@ static int table_new_target(struct rtpengine_table *t, struct rtpengine_target_i } else { err = -EEXIST; - if (b->target[lo]) + if (b->ports_lo[lo]) goto fail4; - bitfield_set(&b->targets, lo); - t->targets++; + bitfield_set(&b->ports_lo_bf, lo); + t->num_targets++; } - b->target[lo] = g; + b->ports_lo[lo] = g; g = NULL; write_unlock_irqrestore(&t->target_lock, flags); @@ -1523,21 +1714,24 @@ fail1: -static struct rtpengine_target *get_target(struct rtpengine_table *t, u_int16_t port) { +static struct rtpengine_target *get_target(struct rtpengine_table *t, const struct re_address *local) { unsigned char hi, lo; + struct re_dest_addr *rda; struct rtpengine_target *r; unsigned long flags; if (!t) return NULL; - if (!port) + if (!local) return NULL; - hi = (port & 0xff00) >> 8; - lo = port & 0xff; + hi = (local->port & 0xff00) >> 8; + lo = local->port & 0xff; read_lock_irqsave(&t->target_lock, flags); - r = t->bucket[hi] ? t->bucket[hi]->target[lo] : NULL; + + rda = find_dest_addr(&t->dest_addr_hash, local); + r = rda ? (rda->ports_hi[hi] ? rda->ports_hi[hi]->ports_lo[lo] : NULL) : NULL; if (r) target_hold(r); read_unlock_irqrestore(&t->target_lock, flags); @@ -1684,7 +1878,7 @@ static ssize_t proc_control_write(struct file *file, const char __user *buf, siz break; case MMG_DEL: - err = table_del_target(t, msg.target.target_port); + err = table_del_target(t, &msg.target.local); if (err) goto err; break; @@ -2201,7 +2395,9 @@ static inline int rtp_payload_type(const struct rtp_header *hdr, const struct rt return match - tg->payload_types; } -static unsigned int rtpengine46(struct sk_buff *skb, struct rtpengine_table *t, struct re_address *src, u_int8_t in_tos) { +static unsigned int rtpengine46(struct sk_buff *skb, struct rtpengine_table *t, struct re_address *src, + struct re_address *dst, u_int8_t in_tos) +{ struct udphdr *uh; struct rtpengine_target *g; struct sk_buff *skb2; @@ -2227,8 +2423,9 @@ static unsigned int rtpengine46(struct sk_buff *skb, struct rtpengine_table *t, skb_trim(skb, datalen); src->port = ntohs(uh->source); + dst->port = ntohs(uh->dest); - g = get_target(t, ntohs(uh->dest)); + g = get_target(t, dst); if (!g) goto skip2; @@ -2396,7 +2593,7 @@ static unsigned int rtpengine4(struct sk_buff *oskb, const struct xt_action_para struct sk_buff *skb; struct iphdr *ih; struct rtpengine_table *t; - struct re_address src; + struct re_address src, dst; t = get_table(pinfo->id); if (!t) @@ -2415,8 +2612,10 @@ static unsigned int rtpengine4(struct sk_buff *oskb, const struct xt_action_para memset(&src, 0, sizeof(src)); src.family = AF_INET; src.u.ipv4 = ih->saddr; + dst.family = AF_INET; + dst.u.ipv4 = ih->daddr; - return rtpengine46(skb, t, &src, (u_int8_t)ih->tos); + return rtpengine46(skb, t, &src, &dst, (u_int8_t)ih->tos); skip2: kfree_skb(skb); @@ -2438,7 +2637,7 @@ static unsigned int rtpengine6(struct sk_buff *oskb, const struct xt_action_para struct sk_buff *skb; struct ipv6hdr *ih; struct rtpengine_table *t; - struct re_address src; + struct re_address src, dst; t = get_table(pinfo->id); if (!t) @@ -2458,8 +2657,10 @@ static unsigned int rtpengine6(struct sk_buff *oskb, const struct xt_action_para memset(&src, 0, sizeof(src)); src.family = AF_INET6; memcpy(&src.u.ipv6, &ih->saddr, sizeof(src.u.ipv6)); + dst.family = AF_INET6; + memcpy(&dst.u.ipv6, &ih->daddr, sizeof(dst.u.ipv6)); - return rtpengine46(skb, t, &src, ipv6_get_dsfield(ih)); + return rtpengine46(skb, t, &src, &dst, ipv6_get_dsfield(ih)); skip2: kfree_skb(skb); diff --git a/kernel-module/xt_RTPENGINE.h b/kernel-module/xt_RTPENGINE.h index f30c5320b..70367bdd8 100644 --- a/kernel-module/xt_RTPENGINE.h +++ b/kernel-module/xt_RTPENGINE.h @@ -75,7 +75,7 @@ enum rtpengine_src_mismatch { }; struct rtpengine_target_info { - u_int16_t target_port; + struct re_address local; struct re_address expected_src; /* for incoming packets */ enum rtpengine_src_mismatch src_mismatch;