# asyncurl.py - Wrapper class for using pycurl objects in asyncore
#
# Copyright (c) 2009, 2010 Antti Ajanki
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""This is a wrapper for using pycurl objects in asyncore.

Start a transfer by creating an async_curl_dispatcher, and call
asyncurl.loop() instead of asyncore.loop().
"""

import asyncore
import pycurl
import traceback
import os
import select
import time
import cStringIO
from errno import EINTR

SOCKET_TIMEOUT = pycurl.SOCKET_TIMEOUT
CSELECT_IN = pycurl.CSELECT_IN
CSELECT_OUT = pycurl.CSELECT_OUT
CSELECT_ERR = pycurl.CSELECT_ERR


def poll(timeout=0.0, map=None, mdisp=None):
    """Run one select() round over map and feed the results to curl.

    timeout is the maximum wait in seconds. It is shortened to the
    timeout most recently requested by libcurl through mdisp's timer
    callback, unless that value is negative (libcurl asked for its
    timer to be deleted). If no descriptor is ready when the wait ends,
    curl's timeout action is run and finished transfers are collected.

    map defaults to asyncore.socket_map, mdisp to the module-level
    multi_dispatcher.
    """
    if map is None:
        map = asyncore.socket_map
    if mdisp is None:
        mdisp = multi_dispatcher
    if not map:
        return

    # libcurl reports -1 when it wants its timer removed; clamping to a
    # negative value would make select()/time.sleep() raise.
    if mdisp.timeout >= 0:
        timeout = min(timeout, mdisp.timeout / 1000.0)

    r = []
    w = []
    e = []
    for fd, obj in map.items():
        is_r = obj.readable()
        is_w = obj.writable()
        if is_r:
            r.append(fd)
        if is_w:
            w.append(fd)
        if is_r or is_w:
            e.append(fd)

    if [] == r == w == e:
        # Nothing to wait on; just let the timeout elapse.
        time.sleep(timeout)
    else:
        try:
            r, w, e = select.select(r, w, e, timeout)
        except select.error as err:
            if err.args[0] != EINTR:
                raise
            return

    if [] == r == w == e:
        mdisp.socket_action(SOCKET_TIMEOUT, 0)
        # A timer-driven action can finish (or fail) a transfer, so
        # collect finished handles now; otherwise their completion
        # callbacks would wait for some unrelated later socket event.
        mdisp.check_completed(True)
        return

    for fd in r:
        obj = map.get(fd)
        if obj is None:
            continue
        asyncore.read(obj)
    for fd in w:
        obj = map.get(fd)
        if obj is None:
            continue
        asyncore.write(obj)
    for fd in e:
        obj = map.get(fd)
        if obj is None:
            continue
        asyncore._exception(obj)


def loop(timeout=30.0, use_poll=False, map=None, count=None, mdisp=None):
    """Replacement for asyncore.loop() that also drives curl transfers.

    Calls poll() until map is empty, or at most count times when count
    is not None. use_poll is accepted for signature compatibility with
    asyncore.loop(), but only the select()-based poll() is implemented.
    """
    if map is None:
        map = asyncore.socket_map
    if mdisp is None:
        mdisp = multi_dispatcher
    if use_poll and hasattr(select, 'poll'):
        print('poll2 not implemented')
    poll_fun = poll
    if count is None:
        while map:
            poll_fun(timeout, map, mdisp)
    else:
        while map and count > 0:
            poll_fun(timeout, map, mdisp)
            count = count - 1


def noop_callback(s):
    """A WRITEFUNCTION that discards everything passed to it."""
    pass


class curl_multi_dispatcher:
    """A dispatcher for pycurl.CurlMulti() objects.

    An instance of this class is created automatically. There is
    usually no need to construct one manually."""

    def __init__(self, socket_map=None):
        if socket_map is None:
            self._map = asyncore.socket_map
        else:
            self._map = socket_map
        # curl handle -> async_curl_dispatcher that owns it
        self.dispatchers = {}
        # Last timeout (milliseconds) reported by libcurl's timer callback.
        self.timeout = 1000
        # Set when libcurl stops monitoring a socket; used by
        # check_completed() as a hint that a transfer may have ended.
        self._sockets_removed = False
        self._curlm = pycurl.CurlMulti()
        self._curlm.setopt(pycurl.M_SOCKETFUNCTION, self.socket_callback)
        self._curlm.setopt(pycurl.M_TIMERFUNCTION, self.timeout_callback)

    def socket_callback(self, action, socket, user_data, socket_data):
        """M_SOCKETFUNCTION: mirror libcurl's socket interest in the map.

        A dispatcher_wrapper is created for each new socket and its
        readable/writable flags follow the requested POLL_* action.
        POLL_REMOVE drops the socket from the map.
        """
        if action == pycurl.POLL_NONE:
            return
        if action == pycurl.POLL_REMOVE:
            if socket in self._map:
                del self._map[socket]
            self._sockets_removed = True
            return

        obj = self._map.get(socket)
        if obj is None:
            obj = dispatcher_wrapper(socket, self)
            self._map[socket] = obj

        if action == pycurl.POLL_IN:
            obj.set_readable(True)
            obj.set_writable(False)
        elif action == pycurl.POLL_OUT:
            obj.set_readable(False)
            obj.set_writable(True)
        elif action == pycurl.POLL_INOUT:
            obj.set_readable(True)
            obj.set_writable(True)

    def timeout_callback(self, msec):
        """M_TIMERFUNCTION: remember the timeout libcurl asked for.

        msec may be -1, which libcurl uses to request that the timer
        be deleted; poll() ignores negative values.
        """
        self.timeout = msec

    def attach(self, curldisp):
        """Starts a transfer on curl handle by attaching it to this
        multihandle."""
        self.dispatchers[curldisp.curl] = curldisp
        try:
            self._curlm.add_handle(curldisp.curl)
        except pycurl.error:
            # the curl object is already on this multi-stack
            pass
        # Kick off the transfer; repeat while libcurl asks to be called
        # again immediately.
        while self._curlm.socket_all()[0] == pycurl.E_CALL_MULTI_PERFORM:
            pass
        self.check_completed(True)

    def detach(self, curldisp):
        """Remove curldisp's curl handle from this multihandle and close
        it. The completion callback is NOT fired."""
        self.del_curl(curldisp.curl)

        # libcurl does not send POLL_REMOVE when a handle is aborted
        # NOTE(review): the map holds dispatcher_wrapper objects, not
        # async_curl_dispatchers, so this comparison appears never to
        # match; stale wrappers are not actually removed here.
        for socket, curlobj in self._map.items():
            if curlobj == curldisp:
                print('handle stopped but socket in map')
                del self._map[socket]
                break

    def del_curl(self, curl):
        """Remove curl from the multi stack, forget it, and close it."""
        try:
            self._curlm.remove_handle(curl)
        except pycurl.error:
            # the curl object is not on this multi-stack
            pass
        if curl in self.dispatchers:
            del self.dispatchers[curl]
        curl.close()

    def socket_action(self, fd, evbitmask):
        """Tell libcurl about activity on fd (or SOCKET_TIMEOUT).

        Returns the value of CurlMulti.socket_action(), or -1 if it
        failed with an error other than CALL_MULTI_PERFORM.
        """
        while True:
            try:
                return self._curlm.socket_action(fd, evbitmask)
            except pycurl.error as err:
                # Older libcurls may return CURLM_CALL_MULTI_PERFORM,
                # which pycurl (at least 7.19.0) converts to an
                # exception. If that happens, call socket_action
                # again. Any other error (for example CURLM_BAD_SOCKET)
                # would fail identically on every retry, so report it
                # instead of spinning forever.
                if err.args and err.args[0] == pycurl.E_CALL_MULTI_PERFORM:
                    continue
                self.handle_error()
                return -1

    def check_completed(self, force):
        """Fire completion callbacks for all finished transfers.

        Unless force is true, nothing is done when no socket has been
        removed since the previous check.
        """
        if not force and not self._sockets_removed:
            return
        self._sockets_removed = False

        while True:
            nmsg, success, failed = self._curlm.info_read()
            for handle in success:
                self._complete(handle, 0, None)
            for handle, err, errmsg in failed:
                self._complete(handle, err, errmsg)
            # nmsg is the number of messages still left in the queue;
            # keep reading until it is drained.
            if nmsg == 0 or not (success or failed):
                break

    def _complete(self, handle, err, errmsg):
        """Run handle's completion callback (if known) and release it."""
        disp = self.dispatchers.get(handle)
        if disp is not None:
            try:
                disp.handle_completed(err, errmsg)
            except Exception:
                self.handle_error()
        self.del_curl(handle)

    def handle_error(self):
        print('Exception occurred in multicurl processing')
        print(traceback.format_exc())


class dispatcher_wrapper:
    """An internal helper class that connects a file descriptor in the
    asyncore.socket_map to a curl_multi_dispatcher."""

    def __init__(self, fd, multicurl):
        self.fd = fd
        self.multicurl = multicurl
        self.read_flag = False
        self.write_flag = False

    def readable(self):
        return self.read_flag

    def writable(self):
        return self.write_flag

    def set_readable(self, x):
        self.read_flag = x

    def set_writable(self, x):
        self.write_flag = x

    def handle_read_event(self):
        self.multicurl.socket_action(self.fd, CSELECT_IN)
        self.multicurl.check_completed(False)

    def handle_write_event(self):
        self.multicurl.socket_action(self.fd, CSELECT_OUT)
        self.multicurl.check_completed(False)

    def handle_expt_event(self):
        self.multicurl.socket_action(self.fd, CSELECT_ERR)
        self.multicurl.check_completed(False)

    def handle_error(self):
        print('Exception occurred during processing of a curl request')
        print(traceback.format_exc())


class async_curl_dispatcher:
    """A dispatcher class for pycurl transfers."""

    def __init__(self, url, auto_start=True):
        """Initializes a pycurl object self.curl.

        The default is to download url to an internal buffer whose
        content can be read with self.recv(). If auto_start is False,
        the transfer is not started before a call to add_channel().
        """
        self.url = url
        self.socket = None
        # Multi dispatcher this transfer was attached to (see add_channel).
        self._multidisp = None
        self.buffer = cStringIO.StringIO()
        self.curl = pycurl.Curl()
        self.curl.setopt(pycurl.URL, self.url)
        self.curl.setopt(pycurl.FOLLOWLOCATION, 1)
        self.curl.setopt(pycurl.AUTOREFERER, 1)
        self.curl.setopt(pycurl.MAXREDIRS, 10)
        self.curl.setopt(pycurl.FAILONERROR, 1)
        self.curl.setopt(pycurl.WRITEFUNCTION, self.write_to_buf)
        if auto_start:
            self.add_channel()

    def write_to_buf(self, msg):
        """WRITEFUNCTION: buffer received data and call handle_read()."""
        self.buffer.write(msg)
        self.handle_read()

    def send(self, data):
        raise NotImplementedError

    def recv(self, buffer_size):
        """Return and clear everything buffered so far.

        buffer_size is ignored; the whole buffer is always returned.
        """
        ret = self.buffer.getvalue()
        self.buffer.seek(0)
        self.buffer.truncate()
        return ret

    def add_channel(self, multidisp=None):
        """Start the transfer on multidisp (default: multi_dispatcher)."""
        if multidisp is None:
            multidisp = multi_dispatcher
        self._multidisp = multidisp
        multidisp.attach(self)

    def del_channel(self, multidisp=None):
        """Stop the transfer.

        multidisp defaults to the dispatcher passed to add_channel(),
        falling back to the module-level multi_dispatcher.
        """
        if multidisp is None:
            multidisp = getattr(self, '_multidisp', None)
        if multidisp is None:
            multidisp = multi_dispatcher
        multidisp.detach(self)

    def close(self):
        self.del_channel()

    def log_info(self, message, type='info'):
        if type != 'info':
            print('%s: %s' % (type, message))

    def handle_error(self):
        print('Exception occurred during processing of a curl request')
        print(traceback.format_exc())
        self.close()

    def handle_read(self):
        self.log_info('unhandled read event', 'warning')

    def handle_write(self):
        self.log_info('unhandled write event', 'warning')

    def handle_completed(self, err, errmsg):
        """Called when the download has finished.

        err is a numeric error code (or 0 if the download was
        successful) and errmsg is a curl error message as a string."""
        # It seems that a reference to self.write_to_buf forbids
        # garbage collection from deleting this object. unsetopt() or
        # setting the callback to None are not allowed. Is there a
        # better way?
        self.curl.setopt(pycurl.WRITEFUNCTION, noop_callback)
        self.close()


def test():
    """Download two pages concurrently into python.out and wikipedia.out."""

    class curl_request(async_curl_dispatcher):
        def __init__(self, url, outfile, i):
            async_curl_dispatcher.__init__(self, url, False)
            self.id = i
            self.outfile = outfile
            self.add_channel()

        def handle_read(self):
            buf = self.recv(4096)
            print('%s: writing %d bytes' % (self.id, len(buf)))
            self.outfile.write(buf)

        def handle_completed(self, err, errmsg):
            if err != 0:
                print('%s: error: %d %s' % (self.id, err, errmsg))
            else:
                print('%s: completed' % self.id)
            # Flush and release the output file once the transfer ends.
            self.outfile.close()

    curl_request('http://www.python.org', open('python.out', 'w'), 1)
    curl_request('http://en.wikipedia.org/wiki/Main_Page',
                 open('wikipedia.out', 'w'), 2)
    loop(timeout=5.0)


pycurl.global_init(pycurl.GLOBAL_DEFAULT)

# Create the shared multi dispatcher only if it does not exist yet, so
# that re-executing this module (e.g. reload()) keeps the live instance.
try:
    multi_dispatcher
except NameError:
    multi_dispatcher = curl_multi_dispatcher()

if __name__ == '__main__':
    test()