|
|
|
@ -12,6 +12,7 @@ |
|
|
|
#include "log.h" |
|
|
|
#include "helpers.h" |
|
|
|
#include "str.h" |
|
|
|
#include "types.h" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -21,6 +22,8 @@ |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
TYPED_GQUEUE(gstring, GString) |
|
|
|
|
|
|
|
struct homer_sender { |
|
|
|
mutex_t lock; |
|
|
|
|
|
|
|
@ -30,7 +33,7 @@ struct homer_sender { |
|
|
|
socket_t socket; |
|
|
|
time_t retry; |
|
|
|
|
|
|
|
GQueue send_queue; |
|
|
|
gstring_q send_queue; |
|
|
|
GString *partial; |
|
|
|
|
|
|
|
int (*state)(struct homer_sender *); |
|
|
|
@ -120,7 +123,7 @@ static int __established(struct homer_sender *hs) { |
|
|
|
} |
|
|
|
|
|
|
|
// unqueue as much as we can |
|
|
|
while ((gs = g_queue_pop_head(&hs->send_queue))) { |
|
|
|
while ((gs = t_queue_pop_head(&hs->send_queue))) { |
|
|
|
ilog(LOG_DEBUG, "dequeue send queue to Homer"); |
|
|
|
ret = __attempt_send(hs, gs); |
|
|
|
if (ret == 0) // everything sent OK |
|
|
|
@ -129,7 +132,7 @@ static int __established(struct homer_sender *hs) { |
|
|
|
hs->partial = gs; |
|
|
|
return 0; |
|
|
|
} |
|
|
|
g_queue_push_head(&hs->send_queue, gs); |
|
|
|
t_queue_push_head(&hs->send_queue, gs); |
|
|
|
if (ret == 1) // write error |
|
|
|
return -1; |
|
|
|
// ret == 2 -> blocked |
|
|
|
@ -222,7 +225,7 @@ int homer_send(GString *s, const str *id, const endpoint_t *src, |
|
|
|
|
|
|
|
mutex_lock(&main_homer_sender->lock); |
|
|
|
if (main_homer_sender->send_queue.length < SEND_QUEUE_LIMIT) { |
|
|
|
g_queue_push_tail(&main_homer_sender->send_queue, s); |
|
|
|
t_queue_push_tail(&main_homer_sender->send_queue, s); |
|
|
|
s = NULL; |
|
|
|
} |
|
|
|
else |
|
|
|
|