diff options
Diffstat (limited to 'src/libwebvi/webvi/asyncurl.py')
-rw-r--r-- | src/libwebvi/webvi/asyncurl.py | 389 |
1 files changed, 389 insertions, 0 deletions
diff --git a/src/libwebvi/webvi/asyncurl.py b/src/libwebvi/webvi/asyncurl.py new file mode 100644 index 0000000..afc575a --- /dev/null +++ b/src/libwebvi/webvi/asyncurl.py @@ -0,0 +1,389 @@ +# asyncurl.py - Wrapper class for using pycurl objects in asyncore +# +# Copyright (c) 2009, 2010 Antti Ajanki <antti.ajanki@iki.fi> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +"""This is a wrapper for using pycurl objects in asyncore. + +Start a transfer by creating an async_curl_dispatch, and call +asyncurl.loop() instead of asyncore.loop(). +""" + +import asyncore +import pycurl +import traceback +import os +import select +import time +import cStringIO +from errno import EINTR + +SOCKET_TIMEOUT = pycurl.SOCKET_TIMEOUT +CSELECT_IN = pycurl.CSELECT_IN +CSELECT_OUT = pycurl.CSELECT_OUT +CSELECT_ERR = pycurl.CSELECT_ERR + +def poll(timeout=0.0, map=None, mdisp=None): + if map is None: + map = asyncore.socket_map + if mdisp is None: + mdisp = multi_dispatcher + if map: + timeout = min(timeout, mdisp.timeout/1000.0) + + r = []; w = []; e = [] + for fd, obj in map.items(): + is_r = obj.readable() + is_w = obj.writable() + if is_r: + r.append(fd) + if is_w: + w.append(fd) + if is_r or is_w: + e.append(fd) + if [] == r == w == e: + time.sleep(timeout) + else: + try: + r, w, e = select.select(r, w, e, timeout) + except select.error, err: + if err[0] != EINTR: + raise + else: + return + + if [] == r == w == e: + mdisp.socket_action(SOCKET_TIMEOUT, 0) + return + + for fd in r: + obj = map.get(fd) + if obj is None: + continue + asyncore.read(obj) + + for fd in w: + obj = map.get(fd) + if obj is None: + continue + asyncore.write(obj) + + for fd in e: + obj = map.get(fd) + if obj is None: + continue + asyncore._exception(obj) + +def loop(timeout=30.0, use_poll=False, map=None, count=None, mdisp=None): + if map is None: + map = asyncore.socket_map + if mdisp is None: + mdisp = multi_dispatcher + + if use_poll and hasattr(select, 'poll'): + print 'poll2 not implemented' + poll_fun = poll + + if count is None: + while map: + poll_fun(timeout, map, mdisp) + + else: + while map and count > 0: + poll_fun(timeout, map, mdisp) + count = count - 1 + +def noop_callback(s): + pass + + +class curl_multi_dispatcher: + """A dispatcher for pycurl.CurlMulti() objects. An instance of + this class is created automatically. There is usually no need to + construct one manually.""" + def __init__(self, socket_map=None): + if socket_map is None: + self._map = asyncore.socket_map + else: + self._map = socket_map + self.dispatchers = {} + self.timeout = 1000 + self._sockets_removed = False + self._curlm = pycurl.CurlMulti() + self._curlm.setopt(pycurl.M_SOCKETFUNCTION, self.socket_callback) + self._curlm.setopt(pycurl.M_TIMERFUNCTION, self.timeout_callback) + + def socket_callback(self, action, socket, user_data, socket_data): +# print 'socket callback: %d, %s' % \ +# (socket, {pycurl.POLL_NONE: "NONE", +# pycurl.POLL_IN: "IN", +# pycurl.POLL_OUT: "OUT", +# pycurl.POLL_INOUT: "INOUT", +# pycurl.POLL_REMOVE: "REMOVE"}[action]) + + if action == pycurl.POLL_NONE: + return + elif action == pycurl.POLL_REMOVE: + if socket in self._map: + del self._map[socket] + self._sockets_removed = True + return + + obj = self._map.get(socket) + if obj is None: + obj = dispatcher_wrapper(socket, self) + self._map[socket] = obj + + if action == pycurl.POLL_IN: + obj.set_readable(True) + obj.set_writable(False) + elif action == pycurl.POLL_OUT: + obj.set_readable(False) + obj.set_writable(True) + elif action == pycurl.POLL_INOUT: + obj.set_readable(True) + obj.set_writable(True) + + def timeout_callback(self, msec): + self.timeout = msec + + def attach(self, curldisp): + """Starts a transfer on curl handle by attaching it to this + multihandle.""" + self.dispatchers[curldisp.curl] = curldisp + try: + self._curlm.add_handle(curldisp.curl) + except pycurl.error: + # the curl object is already on this multi-stack + pass + + while self._curlm.socket_all()[0] == pycurl.E_CALL_MULTI_PERFORM: + pass + + self.check_completed(True) + + def detach(self, curldisp): + """Removes curl handle from this multihandle, and fire its + completion callback function.""" + self.del_curl(curldisp.curl) + + # libcurl does not send POLL_REMOVE when a handle is aborted + for socket, curlobj in self._map.items(): + if curlobj == curldisp: + + print 'handle stopped but socket in map' + + del self._map[socket] + break + + def del_curl(self, curl): + try: + self._curlm.remove_handle(curl) + except pycurl.error: + # the curl object is not on this multi-stack + pass + if curl in self.dispatchers: + del self.dispatchers[curl] + curl.close() + + def socket_action(self, fd, evbitmask): + res = -1 + OK = False + while not OK: + try: + res = self._curlm.socket_action(fd, evbitmask) + OK = True + except pycurl.error: + # Older libcurls may return CURLM_CALL_MULTI_PERFORM, + # which pycurl (at least 7.19.0) converts to an + # exception. If that happens, call socket_action + # again. + pass + return res + + def check_completed(self, force): + if not force and not self._sockets_removed: + return + self._sockets_removed = False + + nmsg, success, failed = self._curlm.info_read() + for handle in success: + disp = self.dispatchers.get(handle) + if disp is not None: + try: + disp.handle_completed(0, None) + except: + self.handle_error() + self.del_curl(handle) + for handle, err, errmsg in failed: + disp = self.dispatchers.get(handle) + if disp is not None: + try: + disp.handle_completed(err, errmsg) + except: + self.handle_error() + self.del_curl(handle) + + def handle_error(self): + print 'Exception occurred in multicurl processing' + print traceback.format_exc() + + +class dispatcher_wrapper: + """An internal helper class that connects a file descriptor in the + asyncore.socket_map to a curl_multi_dispatcher.""" + def __init__(self, fd, multicurl): + self.fd = fd + self.multicurl = multicurl + self.read_flag = False + self.write_flag = False + + def readable(self): + return self.read_flag + + def writable(self): + return self.write_flag + + def set_readable(self, x): + self.read_flag = x + + def set_writable(self, x): + self.write_flag = x + + def handle_read_event(self): + self.multicurl.socket_action(self.fd, CSELECT_IN) + self.multicurl.check_completed(False) + + def handle_write_event(self): + self.multicurl.socket_action(self.fd, CSELECT_OUT) + self.multicurl.check_completed(False) + + def handle_expt_event(self): + self.multicurl.socket_action(self.fd, CSELECT_ERR) + self.multicurl.check_completed(False) + + def handle_error(self): + print 'Exception occurred during processing of a curl request' + print traceback.format_exc() + + +class async_curl_dispatcher: + """A dispatcher class for pycurl transfers.""" + def __init__(self, url, auto_start=True): + """Initializes a pycurl object self.curl. The default is to + download url to an internal buffer whose content can be read + with self.recv(). If auto_start is False, the transfer is not + started before a call to add_channel(). + """ + self.url = url + self.socket = None + self.buffer = cStringIO.StringIO() + self.curl = pycurl.Curl() + self.curl.setopt(pycurl.URL, self.url) + self.curl.setopt(pycurl.FOLLOWLOCATION, 1) + self.curl.setopt(pycurl.AUTOREFERER, 1) + self.curl.setopt(pycurl.MAXREDIRS, 10) + self.curl.setopt(pycurl.FAILONERROR, 1) + self.curl.setopt(pycurl.WRITEFUNCTION, self.write_to_buf) + if auto_start: + self.add_channel() + + def write_to_buf(self, msg): + self.buffer.write(msg) + self.handle_read() + + def send(self, data): + raise NotImplementedError + + def recv(self, buffer_size): + # buffer_size is ignored + ret = self.buffer.getvalue() + self.buffer.reset() + self.buffer.truncate() + return ret + + def add_channel(self, multidisp=None): + if multidisp is None: + multidisp = multi_dispatcher + multidisp.attach(self) + + def del_channel(self, multidisp=None): + if multidisp is None: + multidisp = multi_dispatcher + multidisp.detach(self) + + def close(self): + self.del_channel() + + def log_info(self, message, type='info'): + if type != 'info': + print '%s: %s' % (type, message) + + def handle_error(self): + print 'Exception occurred during processing of a curl request' + print traceback.format_exc() + self.close() + + def handle_read(self): + self.log_info('unhandled read event', 'warning') + + def handle_write(self): + self.log_info('unhandled write event', 'warning') + + def handle_completed(self, err, errmsg): + """Called when the download has finished. err is a numeric + error code (or 0 if the download was successfull) and errmsg + is a curl error message as a string.""" + # It seems that a reference to self.write_to_buf forbids + # garbage collection from deleting this object. unsetopt() or + # setting the callback to None are not allowed. Is there a + # better way? + self.curl.setopt(pycurl.WRITEFUNCTION, noop_callback) + self.close() + + +def test(): + + class curl_request(async_curl_dispatcher): + def __init__(self, url, outfile, i): + async_curl_dispatcher.__init__(self, url, False) + self.id = i + self.outfile = outfile + self.add_channel() + + def handle_read(self): + buf = self.recv(4096) + print '%s: writing %d bytes' % (self.id, len(buf)) + self.outfile.write(buf) + + def handle_completed(self, err, errmsg): + if err != 0: + print '%s: error: %d %s' % (self.id, err, errmsg) + else: + print '%s: completed' % self.id + + curl_request('http://www.python.org', open('python.out', 'w'), 1) + curl_request('http://en.wikipedia.org/wiki/Main_Page', open('wikipedia.out', 'w'), 2) + loop(timeout=5.0) + + +pycurl.global_init(pycurl.GLOBAL_DEFAULT) +try: + multi_dispatcher +except NameError: + multi_dispatcher = curl_multi_dispatcher() + +if __name__ == '__main__': + test() |