Browse Source

private structs make for safer threading

git.mgm/mediaproxy-ng/2.1
Richard Fuchs 14 years ago
parent
commit
5f5e158d76
5 changed files with 69 additions and 54 deletions
  1. +3
    -5
      daemon/call.c
  2. +49
    -11
      daemon/control.c
  3. +3
    -31
      daemon/control.h
  4. +10
    -5
      daemon/streambuf.c
  5. +4
    -2
      daemon/streambuf.h

+ 3
- 5
daemon/call.c View File

@ -1663,7 +1663,7 @@ static void call_status_iterator(void *key, void *val, void *ptr) {
/* TODO: only called for tcp controller, so no linked list of calls? */ /* TODO: only called for tcp controller, so no linked list of calls? */
streambuf_printf(s->outbuf, "session %s %s %s %s %s %i\n",
control_stream_printf(s, "session %s %s %s %s %s %i\n",
c->callid, c->callid,
(char *) g_hash_table_lookup(c->infohash, "from"), (char *) g_hash_table_lookup(c->infohash, "from"),
(char *) g_hash_table_lookup(c->infohash, "to"), (char *) g_hash_table_lookup(c->infohash, "to"),
@ -1689,7 +1689,7 @@ static void call_status_iterator(void *key, void *val, void *ptr) {
else else
smart_ntop_p(addr3, &m->ipv6, sizeof(addr3)); smart_ntop_p(addr3, &m->ipv6, sizeof(addr3));
streambuf_printf(s->outbuf, "stream %s:%u %s:%u %s:%u %llu/%llu/%llu %s %s %s %i\n",
control_stream_printf(s, "stream %s:%u %s:%u %s:%u %llu/%llu/%llu %s %s %s %i\n",
addr1, r1->peer.port, addr1, r1->peer.port,
addr2, r2->peer.port, addr2, r2->peer.port,
addr3, r1->localport, addr3, r1->localport,
@ -1705,15 +1705,13 @@ static void call_status_iterator(void *key, void *val, void *ptr) {
void calls_status(struct callmaster *m, struct control_stream *s) { void calls_status(struct callmaster *m, struct control_stream *s) {
rwlock_lock_r(&m->lock); rwlock_lock_r(&m->lock);
mutex_lock(&s->lock);
streambuf_printf(s->outbuf, "proxy %u %llu/%llu/%llu\n",
control_stream_printf(s, "proxy %u %llu/%llu/%llu\n",
g_hash_table_size(m->callhash), g_hash_table_size(m->callhash),
(long long unsigned int) m->stats.bytes, (long long unsigned int) m->stats.bytes,
(long long unsigned int) m->stats.bytes - m->stats.errors, (long long unsigned int) m->stats.bytes - m->stats.errors,
(long long unsigned int) m->stats.bytes * 2 - m->stats.errors); (long long unsigned int) m->stats.bytes * 2 - m->stats.errors);
g_hash_table_foreach(m->callhash, call_status_iterator, s); g_hash_table_foreach(m->callhash, call_status_iterator, s);
mutex_unlock(&s->lock);
rwlock_unlock_r(&m->lock); rwlock_unlock_r(&m->lock);
} }


+ 49
- 11
daemon/control.c View File

@ -5,6 +5,7 @@
#include <stdio.h> #include <stdio.h>
#include <pcre.h> #include <pcre.h>
#include <glib.h> #include <glib.h>
#include <stdarg.h>
#include "control.h" #include "control.h"
#include "poller.h" #include "poller.h"
@ -16,8 +17,36 @@
static pcre *parse_re;
static pcre_extra *parse_ree;
struct control_stream {
struct obj obj;
int fd;
mutex_t lock;
struct streambuf *inbuf;
struct streambuf *outbuf;
struct sockaddr_in inaddr;
struct control *control;
struct poller *poller;
};
struct control {
struct obj obj;
int fd;
pcre *parse_re;
pcre_extra *parse_ree;
mutex_t lock;
GList *streams;
struct poller *poller;
struct callmaster *callmaster;
};
static void control_stream_closed(int fd, void *p, uintptr_t u) { static void control_stream_closed(int fd, void *p, uintptr_t u) {
struct control_stream *s = p; struct control_stream *s = p;
@ -63,7 +92,7 @@ static int control_stream_parse(struct control_stream *s, char *line) {
struct control *c = s->control; struct control *c = s->control;
char *output = NULL; char *output = NULL;
ret = pcre_exec(parse_re, parse_ree, line, strlen(line), 0, 0, ovec, G_N_ELEMENTS(ovec));
ret = pcre_exec(c->parse_re, c->parse_ree, line, strlen(line), 0, 0, ovec, G_N_ELEMENTS(ovec));
if (ret <= 0) { if (ret <= 0) {
mylog(LOG_WARNING, "Unable to parse command line from " DF ": %s", DP(s->inaddr), line); mylog(LOG_WARNING, "Unable to parse command line from " DF ": %s", DP(s->inaddr), line);
return -1; return -1;
@ -230,14 +259,6 @@ struct control *control_new(struct poller *p, u_int32_t ip, u_int16_t port, stru
if (!m) if (!m)
return NULL; return NULL;
if (!parse_re) {
parse_re = pcre_compile(
/* reqtype callid streams ip fromdom fromtype todom totype agent info |reqtype callid info | reqtype */
"^(?:(request|lookup)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+info=(\\S*)|(delete)\\s+(\\S+)\\s+info=(\\S*)|(build|version|controls|quit|exit|status))$",
PCRE_DOLLAR_ENDONLY | PCRE_DOTALL, &errptr, &erroff, NULL);
parse_ree = pcre_study(parse_re, 0, &errptr);
}
fd = socket(AF_INET, SOCK_STREAM, 0); fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd == -1) if (fd == -1)
return NULL; return NULL;
@ -258,6 +279,12 @@ struct control *control_new(struct poller *p, u_int32_t ip, u_int16_t port, stru
c = obj_alloc0("control", sizeof(*c), NULL); c = obj_alloc0("control", sizeof(*c), NULL);
c->parse_re = pcre_compile(
/* reqtype callid streams ip fromdom fromtype todom totype agent info |reqtype callid info | reqtype */
"^(?:(request|lookup)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+info=(\\S*)|(delete)\\s+(\\S+)\\s+info=(\\S*)|(build|version|controls|quit|exit|status))$",
PCRE_DOLLAR_ENDONLY | PCRE_DOTALL, &errptr, &erroff, NULL);
c->parse_ree = pcre_study(c->parse_re, 0, &errptr);
c->fd = fd; c->fd = fd;
c->poller = p; c->poller = p;
c->callmaster = m; c->callmaster = m;
@ -280,3 +307,14 @@ fail:
close(fd); close(fd);
return NULL; return NULL;
} }
void control_stream_printf(struct control_stream *s, const char *f, ...) {
va_list va;
va_start(va, f);
mutex_lock(&s->lock);
streambuf_vprintf(s->outbuf, f, va);
mutex_unlock(&s->lock);
va_end(va);
}

+ 3
- 31
daemon/control.h View File

@ -29,42 +29,14 @@
#define RE_TCP_DIV_CMD 14 #define RE_TCP_DIV_CMD 14
struct poller; struct poller;
struct control;
struct streambuf;
struct callmaster; struct callmaster;
struct control_stream {
struct obj obj;
int fd;
mutex_t lock;
struct streambuf *inbuf;
struct streambuf *outbuf;
struct sockaddr_in inaddr;
struct control *control;
struct poller *poller;
};
struct control {
struct obj obj;
int fd;
mutex_t lock;
GList *streams;
struct poller *poller;
struct callmaster *callmaster;
};
struct control;
struct control_stream;
struct control *control_new(struct poller *, u_int32_t, u_int16_t, struct callmaster *); struct control *control_new(struct poller *, u_int32_t, u_int16_t, struct callmaster *);
void control_stream_printf(struct control_stream *, const char *, ...) __attribute__ ((format (printf, 2, 3)));


+ 10
- 5
daemon/streambuf.c View File

@ -128,20 +128,25 @@ unsigned int streambuf_bufsize(struct streambuf *b) {
} }
void streambuf_printf(struct streambuf *b, char *f, ...) {
va_list va;
void streambuf_vprintf(struct streambuf *b, const char *f, va_list va) {
GString *gs; GString *gs;
va_start(va, f);
gs = g_string_new(""); gs = g_string_new("");
g_string_vprintf(gs, f, va); g_string_vprintf(gs, f, va);
va_end(va);
streambuf_write(b, gs->str, gs->len); streambuf_write(b, gs->str, gs->len);
g_string_free(gs, TRUE); g_string_free(gs, TRUE);
} }
void streambuf_write(struct streambuf *b, char *s, unsigned int len) {
void streambuf_printf(struct streambuf *b, const char *f, ...) {
va_list va;
va_start(va, f);
streambuf_vprintf(b, f, va);
va_end(va);
}
void streambuf_write(struct streambuf *b, const char *s, unsigned int len) {
unsigned int out; unsigned int out;
int ret; int ret;


+ 4
- 2
daemon/streambuf.h View File

@ -6,6 +6,7 @@
#include <sys/types.h> #include <sys/types.h>
#include <time.h> #include <time.h>
#include <glib.h> #include <glib.h>
#include <stdarg.h>
@ -28,8 +29,9 @@ int streambuf_writeable(struct streambuf *);
int streambuf_readable(struct streambuf *); int streambuf_readable(struct streambuf *);
char *streambuf_getline(struct streambuf *); char *streambuf_getline(struct streambuf *);
unsigned int streambuf_bufsize(struct streambuf *); unsigned int streambuf_bufsize(struct streambuf *);
void streambuf_printf(struct streambuf *, char *, ...) __attribute__ ((format (printf, 2, 3)));
void streambuf_write(struct streambuf *, char *, unsigned int);
void streambuf_printf(struct streambuf *, const char *, ...) __attribute__ ((format (printf, 2, 3)));
void streambuf_vprintf(struct streambuf *, const char *, va_list);
void streambuf_write(struct streambuf *, const char *, unsigned int);
#endif #endif

Loading…
Cancel
Save