Browse Source

update kernel module for multiple local addresses

pull/163/head
Richard Fuchs 11 years ago
parent
commit
231c096420
6 changed files with 306 additions and 100 deletions
  1. +2
    -2
      daemon/call.c
  2. +2
    -2
      daemon/kernel.c
  3. +2
    -1
      daemon/kernel.h
  4. +7
    -3
      daemon/media_socket.c
  5. +292
    -91
      kernel-module/xt_RTPENGINE.c
  6. +1
    -1
      kernel-module/xt_RTPENGINE.h

+ 2
- 2
daemon/call.c View File

@ -488,7 +488,7 @@ static void callmaster_timer(void *ptr) {
while (i) {
ke = i->data;
sfd = hlp.ports[ke->target.target_port];
sfd = hlp.ports[ke->target.local.port]; // XXX fix for multiple addresses
if (!sfd)
goto next;
@ -569,7 +569,7 @@ static void callmaster_timer(void *ptr) {
redis_update(ps->call, m->conf.redis);
next:
hlp.ports[ke->target.target_port] = NULL;
hlp.ports[ke->target.local.port] = NULL;
g_slice_free1(sizeof(*ke), ke);
i = g_list_delete_link(i, i);
if (sfd)


+ 2
- 2
daemon/kernel.c View File

@ -86,13 +86,13 @@ int kernel_add_stream(int fd, struct rtpengine_target_info *mti, int update) {
}
int kernel_del_stream(int fd, u_int16_t p) {
int kernel_del_stream(int fd, const struct re_address *a) {
struct rtpengine_message msg;
int ret;
ZERO(msg);
msg.cmd = MMG_DEL;
msg.target.target_port = p;
msg.target.local = *a;
ret = write(fd, &msg, sizeof(msg));
if (ret > 0)


+ 2
- 1
daemon/kernel.h View File

@ -11,6 +11,7 @@
struct rtpengine_target_info;
struct re_address;
@ -18,7 +19,7 @@ int kernel_create_table(unsigned int);
int kernel_open_table(unsigned int);
int kernel_add_stream(int, struct rtpengine_target_info *, int);
int kernel_del_stream(int, u_int16_t);
int kernel_del_stream(int, const struct re_address *);
GList *kernel_list(unsigned int);


+ 7
- 3
daemon/media_socket.c View File

@ -689,7 +689,7 @@ void kernelize(struct packet_stream *stream) {
mutex_lock(&sink->out_lock);
reti.target_port = stream->selected_sfd->socket.local.port;
__re_address_translate_ep(&reti.local, &stream->selected_sfd->socket.local);
reti.tos = call->tos;
reti.rtcp_mux = MEDIA_ISSET(stream->media, RTCP_MUX);
reti.dtls = MEDIA_ISSET(stream->media, DTLS);
@ -747,13 +747,17 @@ no_kernel:
/* must be called with in_lock held or call->master_lock held in W */
void __unkernelize(struct packet_stream *p) {
struct re_address rea;
if (!PS_ISSET(p, KERNELIZED))
return;
if (PS_ISSET(p, NO_KERNEL_SUPPORT))
return;
if (p->call->callmaster->conf.kernelfd >= 0)
kernel_del_stream(p->call->callmaster->conf.kernelfd, p->selected_sfd->socket.local.port);
if (p->call->callmaster->conf.kernelfd >= 0) {
__re_address_translate_ep(&rea, &p->selected_sfd->socket.local);
kernel_del_stream(p->call->callmaster->conf.kernelfd, &rea);
}
PS_CLEAR(p, KERNELIZED);
}


+ 292
- 91
kernel-module/xt_RTPENGINE.c View File

@ -120,7 +120,8 @@ static void *proc_main_list_next(struct seq_file *, void *, loff_t *);
static int proc_main_list_show(struct seq_file *, void *);
static void table_push(struct rtpengine_table *);
static struct rtpengine_target *get_target(struct rtpengine_table *, u_int16_t);
static struct rtpengine_target *get_target(struct rtpengine_table *, const struct re_address *);
static int is_valid_address(const struct re_address *rea);
static int aes_f8_session_key_init(struct re_crypto_context *, struct rtpengine_srtp *);
static int srtp_encrypt_aes_cm(struct re_crypto_context *, struct rtpengine_srtp *,
@ -176,8 +177,19 @@ struct re_bitfield {
};
struct re_bucket {
struct re_bitfield targets;
struct rtpengine_target *target[256];
struct re_bitfield ports_lo_bf;
struct rtpengine_target *ports_lo[256];
};
struct re_dest_addr {
struct re_address destination;
struct re_bitfield ports_hi_bf;
struct re_bucket *ports_hi[256];
};
struct re_dest_addr_hash {
struct re_bitfield addrs_bf;
struct re_dest_addr *addrs[256];
};
struct rtpengine_table {
@ -192,10 +204,9 @@ struct rtpengine_table {
struct proc_dir_entry *list;
struct proc_dir_entry *blist;
struct re_bitfield buckets;
struct re_bucket *bucket[256];
struct re_dest_addr_hash dest_addr_hash;
unsigned int targets;
unsigned int num_targets;
};
struct re_cipher {
@ -514,7 +525,8 @@ static void clear_proc(struct proc_dir_entry **e) {
static void table_push(struct rtpengine_table *t) {
int i, j;
int i, j, k;
struct re_dest_addr *rda;
struct re_bucket *b;
if (!t)
@ -525,23 +537,33 @@ static void table_push(struct rtpengine_table *t) {
DBG("Freeing table\n");
for (i = 0; i < 256; i++) {
b = t->bucket[i];
if (!b)
for (k = 0; k < 256; k++) {
rda = t->dest_addr_hash.addrs[i];
if (!rda)
continue;
for (j = 0; j < 256; j++) {
if (!b->target[j])
for (i = 0; i < 256; i++) {
b = rda->ports_hi[i];
if (!b)
continue;
b->target[j]->table = -1;
target_push(b->target[j]);
b->target[j] = NULL;
for (j = 0; j < 256; j++) {
if (!b->ports_lo[j])
continue;
b->ports_lo[j]->table = -1;
target_push(b->ports_lo[j]);
b->ports_lo[j] = NULL;
}
kfree(b);
rda->ports_hi[i] = NULL;
}
kfree(b);
t->bucket[i] = NULL;
kfree(rda);
t->dest_addr_hash.addrs[i] = NULL;
}
clear_proc(&t->status);
clear_proc(&t->control);
clear_proc(&t->list);
@ -634,8 +656,7 @@ static ssize_t proc_status(struct file *f, char __user *b, size_t l, loff_t *o)
read_lock_irqsave(&t->target_lock, flags);
len += sprintf(buf + len, "Refcount: %u\n", atomic_read(&t->refcnt) - 1);
len += sprintf(buf + len, "Control PID: %u\n", t->pid);
len += sprintf(buf + len, "Targets: %u\n", t->targets);
len += sprintf(buf + len, "Buckets: %u\n", t->buckets.used);
len += sprintf(buf + len, "Targets: %u\n", t->num_targets);
read_unlock_irqrestore(&t->target_lock, flags);
table_push(t);
@ -736,13 +757,19 @@ static inline void bitfield_clear(struct re_bitfield *bf, unsigned char i) {
bf->b[b] &= ~k;
bf->used--;
}
static inline struct rtpengine_target *find_next_target(struct rtpengine_table *t, int *port) {
static inline struct rtpengine_target *find_next_target(struct rtpengine_table *t, int *addr_bucket,
int *port)
{
unsigned long flags;
struct re_dest_addr *rda;
struct re_bucket *b;
unsigned char hi, lo;
unsigned int hi_b, lo_b;
unsigned int rda_b, hi_b, lo_b;
struct rtpengine_target *g;
if (*addr_bucket < 0 || *addr_bucket > 255)
return NULL;
if (*port < 0 || *port > 0xffff)
return NULL;
@ -752,27 +779,39 @@ static inline struct rtpengine_target *find_next_target(struct rtpengine_table *
read_lock_irqsave(&t->target_lock, flags);
for (;;) {
rda_b = bitfield_slot(*addr_bucket);
if (!t->dest_addr_hash.addrs_bf.b[rda_b]) {
*addr_bucket = bitfield_next_slot(rda_b);
goto next_rda;
}
rda = t->dest_addr_hash.addrs[*addr_bucket];
if (!rda) {
(*addr_bucket)++;
goto next_rda;
}
hi_b = bitfield_slot(hi);
if (!t->buckets.b[hi_b]) {
if (!rda->ports_hi_bf.b[hi_b]) {
hi = bitfield_next_slot(hi_b);
lo = 0;
goto next;
goto next_hi;
}
b = t->bucket[hi];
b = rda->ports_hi[hi];
if (!b) {
hi++;
lo = 0;
goto next;
goto next_hi;
}
lo_b = bitfield_slot(lo);
if (!b->targets.b[lo_b]) {
if (!b->ports_lo_bf.b[lo_b]) {
lo = bitfield_next_slot(lo_b);
goto next_lo;
}
g = b->target[lo];
g = b->ports_lo[lo];
if (!g) {
lo++;
goto next_lo;
@ -784,8 +823,11 @@ static inline struct rtpengine_target *find_next_target(struct rtpengine_table *
next_lo:
if (!lo)
hi++;
next:
next_hi:
if (!hi && !lo)
(*addr_bucket)++;
next_rda:
if (!*addr_bucket && !hi && !lo)
break;
}
@ -832,7 +874,7 @@ static ssize_t proc_blist_read(struct file *f, char __user *b, size_t l, loff_t
u_int32_t id;
struct rtpengine_table *t;
struct rtpengine_list_entry op;
int err, port, i;
int err, port, addr_bucket, i;
struct rtpengine_target *g;
unsigned long flags;
@ -847,9 +889,10 @@ static ssize_t proc_blist_read(struct file *f, char __user *b, size_t l, loff_t
if (!t)
return -ENOENT;
port = (int) *o;
g = find_next_target(t, &port);
*o = port;
addr_bucket = ((int) *o) >> 16;
port = ((int) *o) & 0xffff;
g = find_next_target(t, &addr_bucket, &port);
*o = (addr_bucket << 16) | port;
err = 0;
if (!g)
goto err;
@ -928,34 +971,34 @@ static void *proc_list_next(struct seq_file *f, void *v, loff_t *o) { /* v is in
u_int32_t id = (u_int32_t) (unsigned long) f->private;
struct rtpengine_table *t;
struct rtpengine_target *g;
int port;
int port, addr_bucket;
port = (int) *o;
addr_bucket = ((int) *o) >> 16;
port = ((int) *o) & 0xffff;
t = get_table(id);
if (!t)
return NULL;
g = find_next_target(t, &port);
g = find_next_target(t, &addr_bucket, &port);
*o = port;
*o = (addr_bucket << 16) | port;
table_push(t);
return g;
}
static void proc_list_addr_print(struct seq_file *f, const char *s, const struct re_address *a) {
static void seq_addr_print(struct seq_file *f, const struct re_address *a) {
if (!a->family)
return;
seq_printf(f, " %6s ", s);
switch (a->family) {
case AF_INET:
seq_printf(f, "inet4 %u.%u.%u.%u:%u\n", a->u.u8[0], a->u.u8[1], a->u.u8[2],
seq_printf(f, "inet4 %u.%u.%u.%u:%u", a->u.u8[0], a->u.u8[1], a->u.u8[2],
a->u.u8[3], a->port);
break;
case AF_INET6:
seq_printf(f, "inet6 [%x:%x:%x:%x:%x:%x:%x:%x]:%u\n",
seq_printf(f, "inet6 [%x:%x:%x:%x:%x:%x:%x:%x]:%u",
htons(a->u.u16[0]), htons(a->u.u16[1]),
htons(a->u.u16[2]), htons(a->u.u16[3]), htons(a->u.u16[4]), htons(a->u.u16[5]),
htons(a->u.u16[6]), htons(a->u.u16[7]), a->port);
@ -966,6 +1009,15 @@ static void proc_list_addr_print(struct seq_file *f, const char *s, const struct
}
}
static void proc_list_addr_print(struct seq_file *f, const char *s, const struct re_address *a) {
if (!a->family)
return;
seq_printf(f, " %6s ", s);
seq_addr_print(f, a);
seq_printf(f, "\n");
}
static void proc_list_crypto_print(struct seq_file *f, struct re_crypto_context *c,
struct rtpengine_srtp *s, const char *label)
{
@ -993,7 +1045,9 @@ static int proc_list_show(struct seq_file *f, void *v) {
struct rtpengine_target *g = v;
int i;
seq_printf(f, "port %5u:\n", g->target.target_port);
seq_printf(f, "local ");
seq_addr_print(f, &g->target.local);
seq_printf(f, "\n");
proc_list_addr_print(f, "src", &g->target.src_addr);
proc_list_addr_print(f, "dst", &g->target.dst_addr);
proc_list_addr_print(f, "mirror", &g->target.mirror_addr);
@ -1024,37 +1078,119 @@ static int proc_list_show(struct seq_file *f, void *v) {
static unsigned int re_address_hash(const struct re_address *a) {
u_int32_t ret = 0;
static int table_del_target(struct rtpengine_table *t, u_int16_t port) {
if (!a)
goto out;
ret += a->family;
switch (a->family) {
case AF_INET:
ret += a->u.ipv4;
break;
case AF_INET6:
ret += a->u.u32[0];
ret += a->u.u32[1];
ret += a->u.u32[2];
ret += a->u.u32[3];
break;
default:
goto out;
}
ret = (ret & 0xffff) ^ ((ret & 0xffff0000) >> 16);
ret = (ret & 0xff) ^ ((ret & 0xff00) >> 8);
out:
return ret;
}
static int re_address_match(const struct re_address *a, const struct re_address *b) {
if (!a || !b)
return 0;
if (a->family != b->family)
return 0;
switch (a->family) {
case AF_INET:
if (a->u.ipv4 == b->u.ipv4)
return 1;
break;
case AF_INET6:
if (!memcmp(a->u.ipv6, b->u.ipv6, sizeof(a->u.ipv6)))
return 1;
break;
default:
return 0;
}
return 0;
}
static struct re_dest_addr *find_dest_addr(const struct re_dest_addr_hash *h, const struct re_address *local) {
unsigned int rda_hash, i;
struct re_dest_addr *rda;
i = rda_hash = re_address_hash(local);
while (1) {
rda = h->addrs[i];
if (!rda)
return NULL;
if (re_address_match(local, &rda->destination))
return rda;
i++;
if (i >= 256)
i = 0;
if (i == rda_hash)
return NULL;
}
}
static int table_del_target(struct rtpengine_table *t, const struct re_address *local) {
unsigned char hi, lo;
struct re_dest_addr *rda;
struct re_bucket *b;
struct rtpengine_target *g = NULL;
unsigned long flags;
if (!port)
if (!local || !is_valid_address(local))
return -EINVAL;
hi = (port & 0xff00) >> 8;
lo = port & 0xff;
hi = (local->port & 0xff00) >> 8;
lo = local->port & 0xff;
write_lock_irqsave(&t->target_lock, flags);
b = t->bucket[hi];
rda = find_dest_addr(&t->dest_addr_hash, local);
if (!rda)
goto out;
b = rda->ports_hi[hi];
if (!b)
goto out;
g = b->target[lo];
g = b->ports_lo[lo];
if (!g)
goto out;
b->target[lo] = NULL;
bitfield_clear(&b->targets, lo);
t->targets--;
if (!b->targets.used) {
t->bucket[hi] = NULL;
bitfield_clear(&t->buckets, hi);
b->ports_lo[lo] = NULL;
bitfield_clear(&b->ports_lo_bf, lo);
t->num_targets--;
if (!b->ports_lo_bf.used) {
rda->ports_hi[hi] = NULL;
bitfield_clear(&rda->ports_hi_bf, hi);
}
else
b = NULL;
/* not freeing or NULLing the re_dest_addr due to hash collision logic */
out:
write_unlock_irqrestore(&t->target_lock, flags);
@ -1071,7 +1207,7 @@ out:
static int is_valid_address(struct re_address *rea) {
static int is_valid_address(const struct re_address *rea) {
switch (rea->family) {
case AF_INET:
if (!rea->u.ipv4)
@ -1396,13 +1532,17 @@ static void crypto_context_init(struct re_crypto_context *c, struct rtpengine_sr
static int table_new_target(struct rtpengine_table *t, struct rtpengine_target_info *i, int update) {
unsigned char hi, lo;
unsigned int rda_hash, rh_it;
struct rtpengine_target *g;
struct re_dest_addr *rda;
struct re_bucket *b, *ba = NULL;
struct rtpengine_target *og = NULL;
int err, j;
unsigned long flags;
if (!i->target_port)
/* validation */
if (!is_valid_address(&i->local))
return -EINVAL;
if (!is_valid_address(&i->src_addr))
return -EINVAL;
@ -1423,6 +1563,8 @@ static int table_new_target(struct rtpengine_table *t, struct rtpengine_target_i
DBG("Creating new target\n");
/* initializing */
err = -ENOMEM;
g = kmalloc(sizeof(*g), GFP_KERNEL);
if (!g)
@ -1443,37 +1585,86 @@ static int table_new_target(struct rtpengine_table *t, struct rtpengine_target_i
if (err)
goto fail2;
hi = (i->target_port & 0xff00) >> 8;
lo = i->target_port & 0xff;
/* find or allocate re_dest_addr */
rda_hash = re_address_hash(&i->local);
hi = (i->local.port & 0xff00) >> 8;
lo = i->local.port & 0xff;
retry:
rh_it = rda_hash;
write_lock_irqsave(&t->target_lock, flags);
if (!(b = t->bucket[hi])) {
err = -ENOENT;
if (update)
rda = t->dest_addr_hash.addrs[rh_it];
while (rda) {
if (re_address_match(&rda->destination, &i->local))
goto got_rda;
rh_it++;
if (rh_it >= 256)
rh_it = 0;
err = -ENXIO;
if (rh_it == rda_hash)
goto fail4;
rda = t->dest_addr_hash.addrs[rh_it];
}
err = -ENOENT;
if (update)
goto fail4;
write_unlock_irqrestore(&t->target_lock, flags);
rda = kmalloc(sizeof(*rda), GFP_KERNEL);
err = -ENOMEM;
if (!rda)
goto fail2;
memset(rda, 0, sizeof(*rda));
memcpy(&rda->destination, &i->local, sizeof(rda->destination));
write_lock_irqsave(&t->target_lock, flags);
if (t->dest_addr_hash.addrs[rh_it]) {
write_unlock_irqrestore(&t->target_lock, flags);
kfree(rda);
goto retry;
}
t->dest_addr_hash.addrs[rh_it] = rda;
bitfield_set(&t->dest_addr_hash.addrs_bf, rh_it);
b = kmalloc(sizeof(*b), GFP_KERNEL);
err = -ENOMEM;
if (!b)
goto fail2;
memset(b, 0, sizeof(*b));
got_rda:
/* find or allocate re_bucket */
write_lock_irqsave(&t->target_lock, flags);
if ((b = rda->ports_hi[hi]))
goto got_bucket;
if (!t->bucket[hi]) {
t->bucket[hi] = b;
bitfield_set(&t->buckets, hi);
}
else {
ba = b;
b = t->bucket[hi];
}
err = -ENOENT;
if (update)
goto fail4;
write_unlock_irqrestore(&t->target_lock, flags);
b = kmalloc(sizeof(*b), GFP_KERNEL);
err = -ENOMEM;
if (!b)
goto fail2;
memset(b, 0, sizeof(*b));
write_lock_irqsave(&t->target_lock, flags);
if (!rda->ports_hi[hi]) {
rda->ports_hi[hi] = b;
bitfield_set(&rda->ports_hi_bf, hi);
}
else {
ba = b;
b = rda->ports_hi[hi];
}
got_bucket:
if (update) {
err = -ENOENT;
og = b->target[lo];
og = b->ports_lo[lo];
if (!og)
goto fail4;
@ -1492,13 +1683,13 @@ static int table_new_target(struct rtpengine_table *t, struct rtpengine_target_i
}
else {
err = -EEXIST;
if (b->target[lo])
if (b->ports_lo[lo])
goto fail4;
bitfield_set(&b->targets, lo);
t->targets++;
bitfield_set(&b->ports_lo_bf, lo);
t->num_targets++;
}
b->target[lo] = g;
b->ports_lo[lo] = g;
g = NULL;
write_unlock_irqrestore(&t->target_lock, flags);
@ -1523,21 +1714,24 @@ fail1:
static struct rtpengine_target *get_target(struct rtpengine_table *t, u_int16_t port) {
static struct rtpengine_target *get_target(struct rtpengine_table *t, const struct re_address *local) {
unsigned char hi, lo;
struct re_dest_addr *rda;
struct rtpengine_target *r;
unsigned long flags;
if (!t)
return NULL;
if (!port)
if (!local)
return NULL;
hi = (port & 0xff00) >> 8;
lo = port & 0xff;
hi = (local->port & 0xff00) >> 8;
lo = local->port & 0xff;
read_lock_irqsave(&t->target_lock, flags);
r = t->bucket[hi] ? t->bucket[hi]->target[lo] : NULL;
rda = find_dest_addr(&t->dest_addr_hash, local);
r = rda ? (rda->ports_hi[hi] ? rda->ports_hi[hi]->ports_lo[lo] : NULL) : NULL;
if (r)
target_hold(r);
read_unlock_irqrestore(&t->target_lock, flags);
@ -1684,7 +1878,7 @@ static ssize_t proc_control_write(struct file *file, const char __user *buf, siz
break;
case MMG_DEL:
err = table_del_target(t, msg.target.target_port);
err = table_del_target(t, &msg.target.local);
if (err)
goto err;
break;
@ -2201,7 +2395,9 @@ static inline int rtp_payload_type(const struct rtp_header *hdr, const struct rt
return match - tg->payload_types;
}
static unsigned int rtpengine46(struct sk_buff *skb, struct rtpengine_table *t, struct re_address *src, u_int8_t in_tos) {
static unsigned int rtpengine46(struct sk_buff *skb, struct rtpengine_table *t, struct re_address *src,
struct re_address *dst, u_int8_t in_tos)
{
struct udphdr *uh;
struct rtpengine_target *g;
struct sk_buff *skb2;
@ -2227,8 +2423,9 @@ static unsigned int rtpengine46(struct sk_buff *skb, struct rtpengine_table *t,
skb_trim(skb, datalen);
src->port = ntohs(uh->source);
dst->port = ntohs(uh->dest);
g = get_target(t, ntohs(uh->dest));
g = get_target(t, dst);
if (!g)
goto skip2;
@ -2396,7 +2593,7 @@ static unsigned int rtpengine4(struct sk_buff *oskb, const struct xt_action_para
struct sk_buff *skb;
struct iphdr *ih;
struct rtpengine_table *t;
struct re_address src;
struct re_address src, dst;
t = get_table(pinfo->id);
if (!t)
@ -2415,8 +2612,10 @@ static unsigned int rtpengine4(struct sk_buff *oskb, const struct xt_action_para
memset(&src, 0, sizeof(src));
src.family = AF_INET;
src.u.ipv4 = ih->saddr;
dst.family = AF_INET;
dst.u.ipv4 = ih->daddr;
return rtpengine46(skb, t, &src, (u_int8_t)ih->tos);
return rtpengine46(skb, t, &src, &dst, (u_int8_t)ih->tos);
skip2:
kfree_skb(skb);
@ -2438,7 +2637,7 @@ static unsigned int rtpengine6(struct sk_buff *oskb, const struct xt_action_para
struct sk_buff *skb;
struct ipv6hdr *ih;
struct rtpengine_table *t;
struct re_address src;
struct re_address src, dst;
t = get_table(pinfo->id);
if (!t)
@ -2458,8 +2657,10 @@ static unsigned int rtpengine6(struct sk_buff *oskb, const struct xt_action_para
memset(&src, 0, sizeof(src));
src.family = AF_INET6;
memcpy(&src.u.ipv6, &ih->saddr, sizeof(src.u.ipv6));
dst.family = AF_INET6;
memcpy(&dst.u.ipv6, &ih->daddr, sizeof(dst.u.ipv6));
return rtpengine46(skb, t, &src, ipv6_get_dsfield(ih));
return rtpengine46(skb, t, &src, &dst, ipv6_get_dsfield(ih));
skip2:
kfree_skb(skb);


+ 1
- 1
kernel-module/xt_RTPENGINE.h View File

@ -75,7 +75,7 @@ enum rtpengine_src_mismatch {
};
struct rtpengine_target_info {
u_int16_t target_port;
struct re_address local;
struct re_address expected_src; /* for incoming packets */
enum rtpengine_src_mismatch src_mismatch;


Loading…
Cancel
Save