Browse Source

make the callmaster struct private - also requires update to redis module

git.mgm/mediaproxy-ng/2.1
Richard Fuchs 14 years ago
parent
commit
900f76a17e
5 changed files with 98 additions and 84 deletions
  1. +66
    -39
      daemon/call.c
  2. +4
    -20
      daemon/call.h
  3. +19
    -17
      daemon/main.c
  4. +4
    -4
      daemon/redis.c
  5. +5
    -4
      daemon/redis.h

+ 66
- 39
daemon/call.c View File

@ -38,6 +38,28 @@
struct callmaster {
struct obj obj;
rwlock_t hashlock;
GHashTable *callhash;
mutex_t portlock;
u_int16_t lastport;
BIT_ARRAY_DECLARE(ports_used, 0x10000);
struct stats statsps;
struct stats stats;
struct poller *poller;
pcre *info_re;
pcre_extra *info_ree;
pcre *streams_re;
pcre_extra *streams_ree;
struct callmaster_config conf;
};
static char *rtp_codecs[] = { static char *rtp_codecs[] = {
[0] = "G711u", [0] = "G711u",
[1] = "1016", [1] = "1016",
@ -102,7 +124,7 @@ void kernelize(struct callstream *c) {
struct kernel_stream ks; struct kernel_stream ks;
struct callmaster *cm = c->call->callmaster; struct callmaster *cm = c->call->callmaster;
if (cm->kernelfd < 0 || cm->kernelid == -1)
if (cm->conf.kernelfd < 0 || cm->conf.kernelid == -1)
return; return;
mylog(LOG_DEBUG, LOG_PREFIX_C "Kernelizing RTP streams", LOG_PARAMS_C(c->call)); mylog(LOG_DEBUG, LOG_PREFIX_C "Kernelizing RTP streams", LOG_PARAMS_C(c->call));
@ -124,26 +146,26 @@ void kernelize(struct callstream *c) {
continue; continue;
ks.local_port = r->localport; ks.local_port = r->localport;
ks.tos = cm->tos;
ks.tos = cm->conf.tos;
ks.src.port = rp->localport; ks.src.port = rp->localport;
ks.dest.port = r->peer.port; ks.dest.port = r->peer.port;
if (IN6_IS_ADDR_V4MAPPED(&r->peer.ip46)) { if (IN6_IS_ADDR_V4MAPPED(&r->peer.ip46)) {
ks.src.family = AF_INET; ks.src.family = AF_INET;
ks.src.ipv4 = cm->ipv4;
ks.src.ipv4 = cm->conf.ipv4;
ks.dest.family = AF_INET; ks.dest.family = AF_INET;
ks.dest.ipv4 = r->peer.ip46.s6_addr32[3]; ks.dest.ipv4 = r->peer.ip46.s6_addr32[3];
} }
else { else {
ks.src.family = AF_INET6; ks.src.family = AF_INET6;
ks.src.ipv6 = cm->ipv6;
ks.src.ipv6 = cm->conf.ipv6;
ks.dest.family = AF_INET6; ks.dest.family = AF_INET6;
ks.dest.ipv6 = r->peer.ip46; ks.dest.ipv6 = r->peer.ip46;
} }
ZERO(r->kstats); ZERO(r->kstats);
kernel_add_stream(cm->kernelfd, &ks, 0);
kernel_add_stream(cm->conf.kernelfd, &ks, 0);
} }
p->kernelized = 1; p->kernelized = 1;
@ -218,7 +240,7 @@ peerinfo:
kernelize(cs); kernelize(cs);
if (redis_update) if (redis_update)
redis_update(c);
redis_update(c, m->conf.redis);
forward: forward:
if (IN6_IS_ADDR_UNSPECIFIED(&r->peer.ip46) || !r->peer.port || !r->fd_family) if (IN6_IS_ADDR_UNSPECIFIED(&r->peer.ip46) || !r->peer.port || !r->fd_family)
@ -246,7 +268,7 @@ forward:
pi = (void *) CMSG_DATA(ch); pi = (void *) CMSG_DATA(ch);
ZERO(*pi); ZERO(*pi);
pi->ipi_spec_dst.s_addr = m->ipv4;
pi->ipi_spec_dst.s_addr = m->conf.ipv4;
mh.msg_controllen = CMSG_SPACE(sizeof(*pi)); mh.msg_controllen = CMSG_SPACE(sizeof(*pi));
@ -266,7 +288,7 @@ forward:
pi6 = (void *) CMSG_DATA(ch); pi6 = (void *) CMSG_DATA(ch);
ZERO(*pi6); ZERO(*pi6);
pi6->ipi6_addr = m->ipv6;
pi6->ipi6_addr = m->conf.ipv6;
mh.msg_controllen = CMSG_SPACE(sizeof(*pi6)); mh.msg_controllen = CMSG_SPACE(sizeof(*pi6));
@ -459,11 +481,11 @@ static void call_timer_iterator(void *key, void *val, void *ptr) {
sr = &p->rtps[j]; sr = &p->rtps[j];
hlp->ports[sr->localport] = sr; hlp->ports[sr->localport] = sr;
check = cm->timeout;
check = cm->conf.timeout;
if (!sr->peer.port) if (!sr->peer.port)
check = cm->silent_timeout;
check = cm->conf.silent_timeout;
else if (IN6_IS_ADDR_UNSPECIFIED(&sr->peer.ip46)) else if (IN6_IS_ADDR_UNSPECIFIED(&sr->peer.ip46))
check = cm->silent_timeout;
check = cm->conf.silent_timeout;
if (poller_now - sr->last < check) if (poller_now - sr->last < check)
goto good; goto good;
@ -553,7 +575,7 @@ static void callmaster_timer(void *ptr) {
memcpy(&m->stats, &m->statsps, sizeof(m->stats)); memcpy(&m->stats, &m->statsps, sizeof(m->stats));
ZERO(m->statsps); ZERO(m->statsps);
i = (m->kernelid != -1) ? kernel_list(m->kernelid) : NULL;
i = (m->conf.kernelid != -1) ? kernel_list(m->conf.kernelid) : NULL;
while (i) { while (i) {
ke = i->data; ke = i->data;
@ -577,8 +599,8 @@ next:
i = g_list_delete_link(i, i); i = g_list_delete_link(i, i);
} }
if (m->b2b_url)
xmlrpc_kill_calls(hlp.del, m->b2b_url);
if (m->conf.b2b_url)
xmlrpc_kill_calls(hlp.del, m->conf.b2b_url);
for (i = hlp.del; i; i = n) { for (i = hlp.del; i; i = n) {
n = i->next; n = i->next;
@ -636,8 +658,8 @@ static int get_port4(struct streamrelay *r, u_int16_t p) {
nonblock(fd); nonblock(fd);
reuseaddr(fd); reuseaddr(fd);
if (m->tos)
setsockopt(fd, IPPROTO_IP, IP_TOS, &m->tos, sizeof(m->tos));
if (m->conf.tos)
setsockopt(fd, IPPROTO_IP, IP_TOS, &m->conf.tos, sizeof(m->conf.tos));
ZERO(sin); ZERO(sin);
sin.sin_family = AF_INET; sin.sin_family = AF_INET;
@ -667,7 +689,7 @@ static int get_port6(struct streamrelay *r, u_int16_t p) {
nonblock(fd); nonblock(fd);
reuseaddr(fd); reuseaddr(fd);
tos = m->tos;
tos = m->conf.tos;
#ifdef IPV6_TCLASS #ifdef IPV6_TCLASS
if (tos) if (tos)
setsockopt(fd, IPPROTO_IPV6, IPV6_TCLASS, &tos, sizeof(tos)); setsockopt(fd, IPPROTO_IPV6, IPV6_TCLASS, &tos, sizeof(tos));
@ -704,7 +726,7 @@ static int get_port(struct streamrelay *r, u_int16_t p) {
bit_array_set(m->ports_used, p); bit_array_set(m->ports_used, p);
mutex_unlock(&m->portlock); mutex_unlock(&m->portlock);
if (IN6_IS_ADDR_UNSPECIFIED(&m->ipv6))
if (IN6_IS_ADDR_UNSPECIFIED(&m->conf.ipv6))
ret = get_port4(r, p); ret = get_port4(r, p);
else else
ret = get_port6(r, p); ret = get_port6(r, p);
@ -757,8 +779,8 @@ static void get_port_pair(struct peer *p, int wanted_port) {
goto done; goto done;
} }
min = (m->port_min > 0 && m->port_min < 0xfff0) ? m->port_min : 1024;
max = (m->port_max > 0 && m->port_max > min && m->port_max < 0xfff0) ? m->port_max : 0;
min = (m->conf.port_min > 0 && m->conf.port_min < 0xfff0) ? m->conf.port_min : 1024;
max = (m->conf.port_max > 0 && m->conf.port_max > min && m->conf.port_max < 0xfff0) ? m->conf.port_max : 0;
mutex_lock(&m->portlock); mutex_lock(&m->portlock);
if (!m->lastport) if (!m->lastport)
@ -1178,7 +1200,7 @@ static void unkernelize(struct peer *p) {
for (i = 0; i < 2; i++) { for (i = 0; i < 2; i++) {
r = &p->rtps[i]; r = &p->rtps[i];
kernel_del_stream(p->up->call->callmaster->kernelfd, r->localport);
kernel_del_stream(p->up->call->callmaster->conf.kernelfd, r->localport);
} }
@ -1215,7 +1237,7 @@ static void call_destroy(struct call *c) {
rwlock_unlock_w(&m->hashlock); rwlock_unlock_w(&m->hashlock);
if (redis_delete) if (redis_delete)
redis_delete(c);
redis_delete(c, m->conf.redis);
mylog(LOG_INFO, LOG_PREFIX_C "Final packet stats:", c->callid); mylog(LOG_INFO, LOG_PREFIX_C "Final packet stats:", c->callid);
while (c->callstreams->head) { while (c->callstreams->head) {
@ -1270,24 +1292,24 @@ static char *streams_print(GQueue *s, unsigned int num, unsigned int off, const
if (t->peers[other_off].desired_family == AF_INET if (t->peers[other_off].desired_family == AF_INET
|| (t->peers[other_off].desired_family == 0 || (t->peers[other_off].desired_family == 0
&& IN6_IS_ADDR_V4MAPPED(&t->peers[other_off].rtps[0].peer.ip46)) && IN6_IS_ADDR_V4MAPPED(&t->peers[other_off].rtps[0].peer.ip46))
|| IN6_IS_ADDR_UNSPECIFIED(&t->call->callmaster->ipv6)) {
|| IN6_IS_ADDR_UNSPECIFIED(&t->call->callmaster->conf.ipv6)) {
ip4 = t->peers[off].rtps[0].peer.ip46.s6_addr32[3]; ip4 = t->peers[off].rtps[0].peer.ip46.s6_addr32[3];
if (!ip4) if (!ip4)
strcpy(ips, "0.0.0.0"); strcpy(ips, "0.0.0.0");
else if (t->call->callmaster->adv_ipv4)
sprintf(ips, IPF, IPP(t->call->callmaster->adv_ipv4));
else if (t->call->callmaster->conf.adv_ipv4)
sprintf(ips, IPF, IPP(t->call->callmaster->conf.adv_ipv4));
else else
sprintf(ips, IPF, IPP(t->call->callmaster->ipv4));
sprintf(ips, IPF, IPP(t->call->callmaster->conf.ipv4));
af = '4'; af = '4';
} }
else { else {
if (IN6_IS_ADDR_UNSPECIFIED(&t->peers[off].rtps[0].peer.ip46)) if (IN6_IS_ADDR_UNSPECIFIED(&t->peers[off].rtps[0].peer.ip46))
strcpy(ips, "::"); strcpy(ips, "::");
else if (!IN6_IS_ADDR_UNSPECIFIED(&t->call->callmaster->adv_ipv6))
inet_ntop(AF_INET6, &t->call->callmaster->adv_ipv6, ips, sizeof(ips));
else if (!IN6_IS_ADDR_UNSPECIFIED(&t->call->callmaster->conf.adv_ipv6))
inet_ntop(AF_INET6, &t->call->callmaster->conf.adv_ipv6, ips, sizeof(ips));
else else
inet_ntop(AF_INET6, &t->call->callmaster->ipv6, ips, sizeof(ips));
inet_ntop(AF_INET6, &t->call->callmaster->conf.ipv6, ips, sizeof(ips));
af = '6'; af = '6';
} }
@ -1452,7 +1474,7 @@ char *call_update_udp(const char **out, struct callmaster *m) {
g_queue_clear(&q); g_queue_clear(&q);
if (redis_update) if (redis_update)
redis_update(c);
redis_update(c, m->conf.redis);
ret = streams_print(c->callstreams, 1, (num >= 0) ? 0 : 1, out[RE_UDP_COOKIE], 1); ret = streams_print(c->callstreams, 1, (num >= 0) ? 0 : 1, out[RE_UDP_COOKIE], 1);
mylog(LOG_INFO, LOG_PREFIX_CI "Returning to SIP proxy: %s", LOG_PARAMS_CI(c), ret); mylog(LOG_INFO, LOG_PREFIX_CI "Returning to SIP proxy: %s", LOG_PARAMS_CI(c), ret);
@ -1481,7 +1503,7 @@ char *call_lookup_udp(const char **out, struct callmaster *m) {
rwlock_unlock_r(&m->hashlock); rwlock_unlock_r(&m->hashlock);
mylog(LOG_WARNING, LOG_PREFIX_CI "Got UDP LOOKUP for unknown call-id or unknown via-branch", mylog(LOG_WARNING, LOG_PREFIX_CI "Got UDP LOOKUP for unknown call-id or unknown via-branch",
out[RE_UDP_UL_CALLID], out[RE_UDP_UL_VIABRANCH]); out[RE_UDP_UL_CALLID], out[RE_UDP_UL_VIABRANCH]);
asprintf(&ret, "%s 0 " IPF "\n", out[RE_UDP_COOKIE], IPP(m->ipv4));
asprintf(&ret, "%s 0 " IPF "\n", out[RE_UDP_COOKIE], IPP(m->conf.ipv4));
return ret; return ret;
} }
obj_hold(c); obj_hold(c);
@ -1499,7 +1521,7 @@ char *call_lookup_udp(const char **out, struct callmaster *m) {
g_queue_clear(&q); g_queue_clear(&q);
if (redis_update) if (redis_update)
redis_update(c);
redis_update(c, m->conf.redis);
ret = streams_print(c->callstreams, 1, (num >= 0) ? 1 : 0, out[RE_UDP_COOKIE], 1); ret = streams_print(c->callstreams, 1, (num >= 0) ? 1 : 0, out[RE_UDP_COOKIE], 1);
mylog(LOG_INFO, LOG_PREFIX_CI "Returning to SIP proxy: %s", LOG_PARAMS_CI(c), ret); mylog(LOG_INFO, LOG_PREFIX_CI "Returning to SIP proxy: %s", LOG_PARAMS_CI(c), ret);
@ -1530,7 +1552,7 @@ char *call_request(const char **out, struct callmaster *m) {
streams_free(s); streams_free(s);
if (redis_update) if (redis_update)
redis_update(c);
redis_update(c, m->conf.redis);
ret = streams_print(c->callstreams, abs(num), (num >= 0) ? 0 : 1, NULL, 0); ret = streams_print(c->callstreams, abs(num), (num >= 0) ? 0 : 1, NULL, 0);
mylog(LOG_INFO, LOG_PREFIX_CI "Returning to SIP proxy: %s", LOG_PARAMS_CI(c), ret); mylog(LOG_INFO, LOG_PREFIX_CI "Returning to SIP proxy: %s", LOG_PARAMS_CI(c), ret);
@ -1561,7 +1583,7 @@ char *call_lookup(const char **out, struct callmaster *m) {
streams_free(s); streams_free(s);
if (redis_update) if (redis_update)
redis_update(c);
redis_update(c, m->conf.redis);
ret = streams_print(c->callstreams, abs(num), (num >= 0) ? 1 : 0, NULL, 0); ret = streams_print(c->callstreams, abs(num), (num >= 0) ? 1 : 0, NULL, 0);
mylog(LOG_INFO, LOG_PREFIX_CI "Returning to SIP proxy: %s", LOG_PARAMS_CI(c), ret); mylog(LOG_INFO, LOG_PREFIX_CI "Returning to SIP proxy: %s", LOG_PARAMS_CI(c), ret);
@ -1709,9 +1731,9 @@ static void call_status_iterator(void *key, void *val, void *ptr) {
smart_ntop_p(addr1, &r1->peer.ip46, sizeof(addr1)); smart_ntop_p(addr1, &r1->peer.ip46, sizeof(addr1));
smart_ntop_p(addr2, &r2->peer.ip46, sizeof(addr2)); smart_ntop_p(addr2, &r2->peer.ip46, sizeof(addr2));
if (IN6_IS_ADDR_V4MAPPED(&r1->peer.ip46)) if (IN6_IS_ADDR_V4MAPPED(&r1->peer.ip46))
inet_ntop(AF_INET, &m->ipv4, addr3, sizeof(addr3));
inet_ntop(AF_INET, &m->conf.ipv4, addr3, sizeof(addr3));
else else
smart_ntop_p(addr3, &m->ipv6, sizeof(addr3));
smart_ntop_p(addr3, &m->conf.ipv6, sizeof(addr3));
control_stream_printf(s, "stream %s:%u %s:%u %s:%u %llu/%llu/%llu %s %s %s %i\n", control_stream_printf(s, "stream %s:%u %s:%u %s:%u %llu/%llu/%llu %s %s %s %i\n",
addr1, r1->peer.port, addr1, r1->peer.port,
@ -1744,17 +1766,22 @@ void calls_status(struct callmaster *m, struct control_stream *s) {
static void calls_dump_iterator(void *key, void *val, void *ptr) { static void calls_dump_iterator(void *key, void *val, void *ptr) {
struct call *c = val; struct call *c = val;
struct callmaster *m = c->callmaster;
if (redis_update) if (redis_update)
redis_update(c);
redis_update(c, m->conf.redis);
} }
void calls_dump_redis(struct callmaster *m) { void calls_dump_redis(struct callmaster *m) {
if (!m->redis)
if (!m->conf.redis)
return; return;
mylog(LOG_DEBUG, "Start dumping all call data to Redis...\n"); mylog(LOG_DEBUG, "Start dumping all call data to Redis...\n");
redis_wipe(m);
redis_wipe(m->conf.redis);
g_hash_table_foreach(m->callhash, calls_dump_iterator, NULL); g_hash_table_foreach(m->callhash, calls_dump_iterator, NULL);
mylog(LOG_DEBUG, "Finished dumping all call data to Redis\n"); mylog(LOG_DEBUG, "Finished dumping all call data to Redis\n");
} }
void callmaster_config(struct callmaster *m, struct callmaster_config *c) {
m->conf = *c;
}

+ 4
- 20
daemon/call.h View File

@ -95,21 +95,7 @@ struct call {
const char *log_info; /* branch */ const char *log_info; /* branch */
}; };
struct callmaster {
struct obj obj;
rwlock_t hashlock;
GHashTable *callhash;
mutex_t portlock;
u_int16_t lastport;
BIT_ARRAY_DECLARE(ports_used, 0x10000);
struct stats statsps;
struct stats stats;
struct poller *poller;
struct redis *redis;
struct callmaster_config {
int kernelfd; int kernelfd;
unsigned int kernelid; unsigned int kernelid;
u_int32_t ipv4; u_int32_t ipv4;
@ -118,21 +104,19 @@ struct callmaster {
struct in6_addr adv_ipv6; struct in6_addr adv_ipv6;
int port_min; int port_min;
int port_max; int port_max;
pcre *info_re;
pcre_extra *info_ree;
pcre *streams_re;
pcre_extra *streams_ree;
unsigned int timeout; unsigned int timeout;
unsigned int silent_timeout; unsigned int silent_timeout;
struct redis *redis;
char *b2b_url; char *b2b_url;
unsigned char tos; unsigned char tos;
}; };
struct callmaster;
struct callmaster *callmaster_new(struct poller *); struct callmaster *callmaster_new(struct poller *);
void callmaster_config(struct callmaster *m, struct callmaster_config *c);
char *call_request(const char **, struct callmaster *); char *call_request(const char **, struct callmaster *);


+ 19
- 17
daemon/main.c View File

@ -266,6 +266,7 @@ static void wpidfile(void) {
int main(int argc, char **argv) { int main(int argc, char **argv) {
struct poller *p; struct poller *p;
struct callmaster *m; struct callmaster *m;
struct callmaster_config mc;
struct control *c; struct control *c;
struct control_udp *cu; struct control_udp *cu;
int kfd = -1; int kfd = -1;
@ -304,18 +305,18 @@ int main(int argc, char **argv) {
m = callmaster_new(p); m = callmaster_new(p);
if (!m) if (!m)
return -1; return -1;
m->kernelfd = kfd;
m->kernelid = table;
m->ipv4 = ipv4;
m->adv_ipv4 = adv_ipv4;
m->ipv6 = ipv6;
m->adv_ipv6 = adv_ipv6;
m->port_min = port_min;
m->port_max = port_max;
m->timeout = timeout;
m->silent_timeout = silent_timeout;
m->tos = tos;
m->b2b_url = b2b_url;
mc.kernelfd = kfd;
mc.kernelid = table;
mc.ipv4 = ipv4;
mc.adv_ipv4 = adv_ipv4;
mc.ipv6 = ipv6;
mc.adv_ipv6 = adv_ipv6;
mc.port_min = port_min;
mc.port_max = port_max;
mc.timeout = timeout;
mc.silent_timeout = silent_timeout;
mc.tos = tos;
mc.b2b_url = b2b_url;
c = NULL; c = NULL;
if (listenport) { if (listenport) {
@ -336,7 +337,7 @@ int main(int argc, char **argv) {
if (!dlh) if (!dlh)
die("Failed to open redis plugin, aborting (%s)\n", dlerror()); die("Failed to open redis plugin, aborting (%s)\n", dlerror());
strp = dlsym(dlh, "__module_version"); strp = dlsym(dlh, "__module_version");
if (!strp || !*strp || strcmp(*strp, "redis/1.0.0"))
if (!strp || !*strp || strcmp(*strp, "redis/1.0.1"))
die("Incorrect redis module version: %s\n", *strp); die("Incorrect redis module version: %s\n", *strp);
dlresolve(dlh, redis_new); dlresolve(dlh, redis_new);
@ -345,19 +346,20 @@ int main(int argc, char **argv) {
dlresolve(dlh, redis_delete); dlresolve(dlh, redis_delete);
dlresolve(dlh, redis_wipe); dlresolve(dlh, redis_wipe);
m->redis = redis_new(redis_ip, redis_port, redis_db);
if (!m->redis)
mc.redis = redis_new(redis_ip, redis_port, redis_db);
if (!mc.redis)
die("Cannot start up without Redis database\n"); die("Cannot start up without Redis database\n");
} }
callmaster_config(m, &mc);
mylog(LOG_INFO, "Startup complete, version %s", MEDIAPROXY_VERSION); mylog(LOG_INFO, "Startup complete, version %s", MEDIAPROXY_VERSION);
if (!foreground) if (!foreground)
daemonize(); daemonize();
wpidfile(); wpidfile();
if (m->redis) {
if (redis_restore(m))
if (mc.redis) {
if (redis_restore(m, mc.redis))
die("Refusing to continue without working Redis database\n"); die("Refusing to continue without working Redis database\n");
} }


+ 4
- 4
daemon/redis.c View File

@ -1,7 +1,7 @@
#include "redis.h" #include "redis.h"
struct redis *(*redis_new)(u_int32_t, u_int16_t, int); struct redis *(*redis_new)(u_int32_t, u_int16_t, int);
int (*redis_restore)(struct callmaster *);
void (*redis_update)(struct call *);
void (*redis_delete)(struct call *);
void (*redis_wipe)(struct callmaster *);
int (*redis_restore)(struct callmaster *, struct redis *);
void (*redis_update)(struct call *, struct redis *);
void (*redis_delete)(struct call *, struct redis *);
void (*redis_wipe)(struct redis *);

+ 5
- 4
daemon/redis.h View File

@ -11,14 +11,15 @@
struct callmaster; struct callmaster;
struct call; struct call;
struct redis;
extern struct redis *(*redis_new)(u_int32_t, u_int16_t, int); extern struct redis *(*redis_new)(u_int32_t, u_int16_t, int);
extern int (*redis_restore)(struct callmaster *);
extern void (*redis_update)(struct call *);
extern void (*redis_delete)(struct call *);
extern void (*redis_wipe)(struct callmaster *);
extern int (*redis_restore)(struct callmaster *, struct redis *);
extern void (*redis_update)(struct call *, struct redis *);
extern void (*redis_delete)(struct call *, struct redis *);
extern void (*redis_wipe)(struct redis *);


Loading…
Cancel
Save