Source code for curl_cffi.curl

from __future__ import annotations

import re
import struct
import warnings
from http.cookies import SimpleCookie
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal, Optional, Union, cast

import certifi

from ._wrapper import ffi, lib
from .const import CurlECode, CurlHttpVersion, CurlInfo, CurlOpt, CurlWsFlag
from .utils import CurlCffiWarning

DEFAULT_CACERT = certifi.where()
REASON_PHRASE_RE = re.compile(rb"HTTP/\d\.\d [0-9]{3} (.*)")
STATUS_LINE_RE = re.compile(rb"HTTP/(\d\.\d) ([0-9]{3}) (.*)")

if TYPE_CHECKING:

    class CurlWsFrame:
        age: int
        flags: int
        offset: int
        bytesleft: int
        len: int


class CurlError(Exception):
    """Base exception for curl_cffi package"""

    def __init__(self, msg, code: Union[CurlECode, Literal[0]] = 0, *args, **kwargs):
        super().__init__(msg, *args, **kwargs)
        self.code: Union[CurlECode, Literal[0]] = code


CURLINFO_TEXT = 0
CURLINFO_HEADER_IN = 1
CURLINFO_HEADER_OUT = 2
CURLINFO_DATA_IN = 3
CURLINFO_DATA_OUT = 4
CURLINFO_SSL_DATA_IN = 5
CURLINFO_SSL_DATA_OUT = 6

CURL_WRITEFUNC_PAUSE = 0x10000001
CURL_WRITEFUNC_ERROR = 0xFFFFFFFF


@ffi.def_extern()
def debug_function(curl, type_: int, data, size: int, clientp) -> int:
    """ffi callback for curl debug info"""
    callback = ffi.from_handle(clientp)
    text = ffi.buffer(data, size)[:]
    callback(type_, text)
    return 0

def debug_function_default(type_: int, text: bytes) -> None:
    if type_ in (CURLINFO_SSL_DATA_IN, CURLINFO_SSL_DATA_OUT):
        print("SSL OUT", text)
    elif type_ in (CURLINFO_DATA_IN, CURLINFO_DATA_OUT):
        print(text.decode("utf-8", "replace"))
    else:
        print(text.decode(), end="")


@ffi.def_extern()
def buffer_callback(ptr, size, nmemb, userdata):
    """ffi callback for curl write function, directly writes to a buffer"""
    # assert size == 1
    buffer = ffi.from_handle(userdata)
    buffer.write(ffi.buffer(ptr, nmemb)[:])
    return nmemb * size


def ensure_int(s):
    if not s:
        return 0
    return int(s)


@ffi.def_extern()
def write_callback(ptr, size, nmemb, userdata):
    """ffi callback for curl write function, calls the callback python function"""
    # although similar enough to the function above, kept here for performance reasons
    callback = ffi.from_handle(userdata)
    wrote = callback(ffi.buffer(ptr, nmemb)[:])
    wrote = ensure_int(wrote)
    if wrote == CURL_WRITEFUNC_PAUSE or wrote == CURL_WRITEFUNC_ERROR:  # noqa: SIM109
        return wrote
    # should make this an exception in future versions
    if wrote != nmemb * size:
        warnings.warn("Wrote bytes != received bytes.", CurlCffiWarning, stacklevel=2)
    return nmemb * size


# Credits: @alexio777 on https://github.com/lexiforest/curl_cffi/issues/4
def slist_to_list(head) -> list[bytes]:
    """Converts curl slist to a python list."""
    result = []
    ptr = head
    while ptr:
        result.append(ffi.string(ptr.data))
        ptr = ptr.next
    lib.curl_slist_free_all(head)
    return result


[docs] class Curl: """ Wrapper for ``curl_easy_*`` functions of libcurl. """
[docs] def __init__(self, cacert: str = "", debug: bool = False, handle=None) -> None: """ Parameters: cacert: CA cert path to use, by default, certs from ``certifi`` are used. debug: whether to show curl debug messages. handle: a curl handle instance from ``curl_easy_init``. """ self._curl = handle if handle else lib.curl_easy_init() self._headers = ffi.NULL self._proxy_headers = ffi.NULL self._resolve = ffi.NULL self._cacert = cacert or DEFAULT_CACERT self._is_cert_set = False self._write_handle = None self._header_handle = None self._debug_handle = None self._body_handle = None # TODO: use CURL_ERROR_SIZE self._error_buffer = ffi.new("char[]", 256) self._debug = debug self._set_error_buffer()
def _set_error_buffer(self) -> None: ret = lib._curl_easy_setopt(self._curl, CurlOpt.ERRORBUFFER, self._error_buffer) if ret != 0: warnings.warn("Failed to set error buffer", CurlCffiWarning, stacklevel=2) if self._debug: self.debug()
[docs] def debug(self) -> None: """Set debug to True""" self.setopt(CurlOpt.VERBOSE, 1) self.setopt(CurlOpt.DEBUGFUNCTION, True)
def __del__(self) -> None: self.close() def _check_error(self, errcode: int, *args: Any) -> None: error = self._get_error(errcode, *args) if error is not None: raise error def _get_error(self, errcode: int, *args: Any): if errcode != 0: errmsg = ffi.string(self._error_buffer).decode(errors="backslashreplace") action = " ".join([str(a) for a in args]) return CurlError( f"Failed to {action}, curl: ({errcode}) {errmsg}. " "See https://curl.se/libcurl/c/libcurl-errors.html first for more " "details.", code=cast(CurlECode, errcode), )
[docs] def setopt(self, option: CurlOpt, value: Any) -> int: """Wrapper for ``curl_easy_setopt``. Parameters: option: option to set, using constants from CurlOpt enum value: value to set, strings will be handled automatically Returns: 0 if no error, see ``CurlECode``. """ input_option = { # this should be int in curl, but cffi requires pointer for void* # it will be convert back in the glue c code. 0: "long*", 10000: "char*", 20000: "void*", 30000: "int64_t*", # offset type 40000: "void*", # blob type } # print("option", option, "value", value) # Convert value value_type = input_option.get((option // 10000) * 10000) if value_type == "long*" or value_type == "int64_t*": c_value = ffi.new(value_type, value) elif option == CurlOpt.WRITEDATA: c_value = ffi.new_handle(value) self._write_handle = c_value lib._curl_easy_setopt( self._curl, CurlOpt.WRITEFUNCTION, lib.buffer_callback ) elif option == CurlOpt.HEADERDATA: c_value = ffi.new_handle(value) self._header_handle = c_value lib._curl_easy_setopt( self._curl, CurlOpt.HEADERFUNCTION, lib.buffer_callback ) elif option == CurlOpt.WRITEFUNCTION: c_value = ffi.new_handle(value) self._write_handle = c_value lib._curl_easy_setopt(self._curl, CurlOpt.WRITEFUNCTION, lib.write_callback) option = CurlOpt.WRITEDATA elif option == CurlOpt.HEADERFUNCTION: c_value = ffi.new_handle(value) self._header_handle = c_value lib._curl_easy_setopt(self._curl, CurlOpt.HEADERFUNCTION, lib.write_callback) option = CurlOpt.HEADERDATA elif option == CurlOpt.DEBUGFUNCTION: if value is True: value = debug_function_default c_value = ffi.new_handle(value) self._debug_handle = c_value lib._curl_easy_setopt(self._curl, CurlOpt.DEBUGFUNCTION, lib.debug_function) option = CurlOpt.DEBUGDATA elif value_type == "char*": c_value = value.encode() if isinstance(value, str) else value # Must keep a reference, otherwise may be GCed. if option == CurlOpt.POSTFIELDS: self._body_handle = c_value else: raise NotImplementedError(f"Option unsupported: {option}") if option == CurlOpt.HTTPHEADER: for header in value: self._headers = lib.curl_slist_append(self._headers, header) ret = lib._curl_easy_setopt(self._curl, option, self._headers) elif option == CurlOpt.PROXYHEADER: for proxy_header in value: self._proxy_headers = lib.curl_slist_append( self._proxy_headers, proxy_header ) ret = lib._curl_easy_setopt(self._curl, option, self._proxy_headers) elif option == CurlOpt.RESOLVE: for resolve in value: if isinstance(resolve, str): resolve = resolve.encode() self._resolve = lib.curl_slist_append(self._resolve, resolve) ret = lib._curl_easy_setopt(self._curl, option, self._resolve) else: ret = lib._curl_easy_setopt(self._curl, option, c_value) self._check_error(ret, "setopt", option, value) if option == CurlOpt.CAINFO: self._is_cert_set = True return ret
[docs] def getinfo(self, option: CurlInfo) -> Union[bytes, int, float, list]: """Wrapper for ``curl_easy_getinfo``. Gets information in response after curl.perform. Parameters: option: option to get info of, using constants from ``CurlInfo`` enum Returns: value retrieved from last perform. """ ret_option = { 0x100000: "char**", 0x200000: "long*", 0x300000: "double*", 0x400000: "struct curl_slist **", 0x500000: "long*", 0x600000: "int64_t*", } ret_cast_option = { 0x100000: ffi.string, 0x200000: int, 0x300000: float, 0x500000: int, 0x600000: int, } c_value = ffi.new(ret_option[option & 0xF00000]) ret = lib.curl_easy_getinfo(self._curl, option, c_value) self._check_error(ret, "getinfo", option) # cookielist and ssl_engines starts with 0x400000, see also: const.py if option & 0xF00000 == 0x400000: return slist_to_list(c_value[0]) if c_value[0] == ffi.NULL: return b"" return ret_cast_option[option & 0xF00000](c_value[0])
[docs] def version(self) -> bytes: """Get the underlying libcurl version.""" return ffi.string(lib.curl_version())
[docs] def impersonate(self, target: str, default_headers: bool = True) -> int: """Set the browser type to impersonate. Parameters: target: browser to impersonate. default_headers: whether to add default headers, like User-Agent. Returns: 0 if no error. """ return lib.curl_easy_impersonate( self._curl, target.encode(), int(default_headers) )
def _ensure_cacert(self) -> None: if not self._is_cert_set: ret = self.setopt(CurlOpt.CAINFO, self._cacert) self._check_error(ret, "set cacert") ret = self.setopt(CurlOpt.PROXY_CAINFO, self._cacert) self._check_error(ret, "set proxy cacert")
[docs] def perform(self, clear_headers: bool = True) -> None: """Wrapper for ``curl_easy_perform``, performs a curl request. Parameters: clear_headers: clear header slist used in this perform Raises: CurlError: if the perform was not successful. """ # make sure we set a cacert store self._ensure_cacert() # here we go ret = lib.curl_easy_perform(self._curl) try: self._check_error(ret, "perform") finally: # cleaning self.clean_after_perform(clear_headers)
def upkeep(self) -> int: return lib.curl_easy_upkeep(self._curl) def clean_after_perform(self, clear_headers: bool = True) -> None: """Clean up handles and buffers after ``perform``, called at the end of ``perform``.""" self._write_handle = None self._header_handle = None self._debug_handle = None self._body_handle = None if clear_headers: if self._headers != ffi.NULL: lib.curl_slist_free_all(self._headers) self._headers = ffi.NULL if self._proxy_headers != ffi.NULL: lib.curl_slist_free_all(self._proxy_headers) self._proxy_headers = ffi.NULL
[docs] def duphandle(self) -> Curl: """Wrapper for ``curl_easy_duphandle``. This is not a full copy of entire curl object in python. For example, headers handle is not copied, you have to set them again.""" new_handle = lib.curl_easy_duphandle(self._curl) c = Curl(cacert=self._cacert, debug=self._debug, handle=new_handle) return c
[docs] def reset(self) -> None: """Reset all curl options, wrapper for ``curl_easy_reset``.""" self._is_cert_set = False if self._curl is not None: lib.curl_easy_reset(self._curl) self._set_error_buffer() self._resolve = ffi.NULL
[docs] @staticmethod def get_reason_phrase(status_line: bytes) -> bytes: """Extract reason phrase, like ``OK``, ``Not Found`` from response status line.""" m = REASON_PHRASE_RE.match(status_line) return m.group(1) if m else b""
[docs] @staticmethod def parse_status_line(status_line: bytes) -> tuple[CurlHttpVersion, int, bytes]: """Parse status line. Returns: http_version, status_code, and reason phrase """ m = STATUS_LINE_RE.match(status_line) if not m: return CurlHttpVersion.V1_0, 0, b"" if m.group(1) == "2.0": http_version = CurlHttpVersion.V2_0 elif m.group(1) == "1.1": http_version = CurlHttpVersion.V1_1 elif m.group(1) == "1.0": http_version = CurlHttpVersion.V1_0 else: http_version = CurlHttpVersion.NONE status_code = int(m.group(2)) reason = m.group(3) return http_version, status_code, reason
[docs] def close(self) -> None: """Close and cleanup curl handle, wrapper for ``curl_easy_cleanup``.""" if self._curl: lib.curl_easy_cleanup(self._curl) self._curl = None ffi.release(self._error_buffer) self._resolve = ffi.NULL
[docs] def ws_recv(self, n: int = 1024) -> tuple[bytes, CurlWsFrame]: """Receive a frame from a websocket connection. Args: n: maximum data to receive. Returns: a tuple of frame content and curl frame meta struct. Raises: CurlError: if failed. """ buffer = ffi.new("char[]", n) n_recv = ffi.new("int *") p_frame = ffi.new("struct curl_ws_frame **") ret = lib.curl_ws_recv(self._curl, buffer, n, n_recv, p_frame) self._check_error(ret, "WS_RECV") # Frame meta explained: https://curl.se/libcurl/c/curl_ws_meta.html frame = p_frame[0] return ffi.buffer(buffer)[: n_recv[0]], frame
[docs] def ws_send(self, payload: bytes, flags: CurlWsFlag = CurlWsFlag.BINARY) -> int: """Send data to a websocket connection. Args: payload: content to send. flags: websocket flag to set for the frame, default: binary. Returns: 0 if no error. Raises: CurlError: if failed. """ n_sent = ffi.new("int *") buffer = ffi.from_buffer(payload) ret = lib.curl_ws_send(self._curl, buffer, len(buffer), n_sent, 0, flags) self._check_error(ret, "WS_SEND") return n_sent[0]
[docs] def ws_close(self, code: int = 1000, message: bytes = b"") -> int: """Close a websocket connection. Shorthand for :meth:`ws_send` with close code and message. Note that to completely close the connection, you must close the curl handle after this call with :meth:`close`. Args: code: close code. message: close message. Returns: 0 if no error. Raises: CurlError: if failed. """ return self.ws_send(struct.pack("!H", code) + message)
[docs] class CurlMime: """Wrapper for the ``curl_mime_`` API."""
[docs] def __init__(self, curl: Optional[Curl] = None): """ Args: curl: Curl instance to use. """ self._curl = curl if curl else Curl() self._form = lib.curl_mime_init(self._curl._curl)
[docs] def addpart( self, name: str, *, content_type: Optional[str] = None, filename: Optional[str] = None, local_path: Optional[Union[str, bytes, Path]] = None, data: Optional[bytes] = None, ) -> None: """Add a mime part for a mutlipart html form. Note: You can only use either local_path or data, not both. Args: name: name of the field. content_type: content_type for the field. for example: ``image/png``. filename: filename for the server. local_path: file to upload on local disk. data: file content to upload. """ part = lib.curl_mime_addpart(self._form) ret = lib.curl_mime_name(part, name.encode()) if ret != 0: raise CurlError("Add field failed.") # mime type if content_type is not None: ret = lib.curl_mime_type(part, content_type.encode()) if ret != 0: raise CurlError("Add field failed.") # remote file name if filename is not None: ret = lib.curl_mime_filename(part, filename.encode()) if ret != 0: raise CurlError("Add field failed.") if local_path and data: raise CurlError("Can not use local_path and data at the same time.") # this is a filename if local_path is not None: if isinstance(local_path, Path): local_path_str = str(local_path) elif isinstance(local_path, bytes): local_path_str = local_path.decode() else: local_path_str = local_path if not Path(local_path_str).exists(): raise FileNotFoundError(f"File not found at {local_path_str}") ret = lib.curl_mime_filedata(part, local_path_str.encode()) if ret != 0: raise CurlError("Add field failed.") if data is not None: if not isinstance(data, bytes): data = str(data).encode() ret = lib.curl_mime_data(part, data, len(data))
[docs] @classmethod def from_list(cls, files: list[dict]): """Create a multipart instance from a list of dict, for keys, see ``addpart``""" form = cls() for file in files: form.addpart(**file) return form
[docs] def attach(self, curl: Optional[Curl] = None) -> None: """Attach the mime instance to a curl instance.""" c = curl if curl else self._curl c.setopt(CurlOpt.MIMEPOST, self._form)
[docs] def close(self) -> None: """Close the mime instance and underlying files. This method must be called after ``perform`` or ``request``.""" lib.curl_mime_free(self._form) self._form = ffi.NULL
def __del__(self) -> None: self.close()