webhooks: Implement a send buffer for socket writes
This prevents ClientConnection.send() from blocking, removing the possibility that callers become reentrant. Signed-off-by: Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
parent
4dcf494b97
commit
bf221d5e26
|
@ -158,8 +158,8 @@ class ClientConnection:
|
||||||
self.sock = sock
|
self.sock = sock
|
||||||
self.fd_handle = self.reactor.register_fd(
|
self.fd_handle = self.reactor.register_fd(
|
||||||
self.sock.fileno(), self.process_received)
|
self.sock.fileno(), self.process_received)
|
||||||
self.partial_data = ""
|
self.partial_data = self.send_buffer = ""
|
||||||
self.mutex = self.reactor.mutex()
|
self.is_sending_data = False
|
||||||
logging.info(
|
logging.info(
|
||||||
"webhooks: New connection established")
|
"webhooks: New connection established")
|
||||||
|
|
||||||
|
@ -222,30 +222,34 @@ class ClientConnection:
|
||||||
self.send({'method': "response", 'params': result})
|
self.send({'method': "response", 'params': result})
|
||||||
|
|
||||||
def send(self, data):
|
def send(self, data):
|
||||||
with self.mutex:
|
self.send_buffer += json.dumps(data) + "\x03"
|
||||||
retries = 10
|
if not self.is_sending_data:
|
||||||
data = json.dumps(data) + "\x03"
|
self.is_sending_data = True
|
||||||
while data:
|
self.reactor.register_callback(self._do_send)
|
||||||
try:
|
|
||||||
sent = self.sock.send(data)
|
def _do_send(self, eventtime):
|
||||||
except socket.error as e:
|
retries = 10
|
||||||
if e.errno == errno.EBADF or e.errno == errno.EPIPE \
|
while self.send_buffer:
|
||||||
or not retries:
|
try:
|
||||||
sent = 0
|
sent = self.sock.send(self.send_buffer)
|
||||||
else:
|
except socket.error as e:
|
||||||
retries -= 1
|
if e.errno == errno.EBADF or e.errno == errno.EPIPE \
|
||||||
waketime = self.reactor.monotonic() + .001
|
or not retries:
|
||||||
self.reactor.pause(waketime)
|
sent = 0
|
||||||
continue
|
|
||||||
retries = 10
|
|
||||||
if sent > 0:
|
|
||||||
data = data[sent:]
|
|
||||||
else:
|
else:
|
||||||
logging.info(
|
retries -= 1
|
||||||
"webhooks: Error sending server data,"
|
waketime = self.reactor.monotonic() + .001
|
||||||
" closing socket")
|
self.reactor.pause(waketime)
|
||||||
self.close()
|
continue
|
||||||
break
|
retries = 10
|
||||||
|
if sent > 0:
|
||||||
|
self.send_buffer = self.send_buffer[sent:]
|
||||||
|
else:
|
||||||
|
logging.info(
|
||||||
|
"webhooks: Error sending server data, closing socket")
|
||||||
|
self.close()
|
||||||
|
break
|
||||||
|
self.is_sending_data = False
|
||||||
|
|
||||||
class WebHooks:
|
class WebHooks:
|
||||||
def __init__(self, printer):
|
def __init__(self, printer):
|
||||||
|
|
Loading…
Reference in New Issue