# Source code for curl_cffi.requests.session

from __future__ import annotations

import asyncio
import queue
import sys
import threading
import warnings
from concurrent.futures import ThreadPoolExecutor
from contextlib import asynccontextmanager, contextmanager, suppress
from io import BytesIO
from typing import (
    TYPE_CHECKING,
    Callable,
    Generic,
    Literal,
    Optional,
    TypedDict,
    TypeVar,
    Union,
    cast,
)
from urllib.parse import urlparse

from ..aio import AsyncCurl
from ..const import CurlHttpVersion, CurlInfo, CurlOpt
from ..curl import Curl, CurlError, CurlMime
from ..utils import CurlCffiWarning
from .cookies import Cookies, CookieTypes, CurlMorsel
from .exceptions import RequestException, SessionClosed, code2error
from .headers import Headers, HeaderTypes
from .impersonate import BrowserTypeLiteral, ExtraFingerprints, ExtraFpDict
from .models import STREAM_END, Response
from .utils import not_set, set_curl_options
from .websockets import AsyncWebSocket, WebSocket

with suppress(ImportError):
    import gevent

with suppress(ImportError):
    import eventlet.tpool

# Type variable for the response class a session produces; it is bound to
# ``Response`` so only ``Response`` or its subclasses are accepted.
# The ``default=`` argument of ``TypeVar`` was added in 3.13:
# https://docs.python.org/3/library/typing.html#typing.TypeVar.__default__
if sys.version_info >= (3, 13):
    # Unparameterized sessions default to the plain ``Response`` type.
    R = TypeVar("R", bound=Response, default=Response)
else:
    # Older interpreters: same bound, but no default is available.
    R = TypeVar("R", bound=Response)

# Typing scaffold for the ``**kwargs`` of the session classes.
# Under a type checker the real TypedDicts below are used; at runtime the
# ``else`` branch provides lightweight placeholders with the same names.
if TYPE_CHECKING:
    from typing_extensions import Unpack

    # Mapping of proxy selector ("all", "http", "https", "ws", "wss") to proxy URL.
    class ProxySpec(TypedDict, total=False):
        all: str
        http: str
        https: str
        ws: str
        wss: str

    # Keyword arguments accepted by the session constructors; the keys mirror
    # the keyword-only parameters of ``BaseSession.__init__``.
    class BaseSessionParams(Generic[R], TypedDict, total=False):
        headers: Optional[HeaderTypes]
        cookies: Optional[CookieTypes]
        auth: Optional[tuple[str, str]]
        proxies: Optional[ProxySpec]
        proxy: Optional[str]
        proxy_auth: Optional[tuple[str, str]]
        base_url: Optional[str]
        params: Optional[dict]
        verify: bool
        timeout: Union[float, tuple[float, float]]
        trust_env: bool
        allow_redirects: bool
        max_redirects: int
        impersonate: Optional[BrowserTypeLiteral]
        ja3: Optional[str]
        akamai: Optional[str]
        extra_fp: Optional[Union[ExtraFingerprints, ExtraFpDict]]
        default_headers: bool
        default_encoding: Union[str, Callable[[bytes], str]]
        curl_options: Optional[dict]
        curl_infos: Optional[list]
        http_version: Optional[CurlHttpVersion]
        debug: bool
        interface: Optional[str]
        cert: Optional[Union[str, tuple[str, str]]]
        response_class: Optional[type[R]]

    # Keyword arguments accepted by ``stream()``; the keys mirror the optional
    # parameters of ``request()`` except ``stream`` itself.
    class StreamRequestParams(TypedDict, total=False):
        params: Optional[Union[dict, list, tuple]]
        data: Optional[Union[dict[str, str], list[tuple], str, BytesIO, bytes]]
        json: Optional[dict | list]
        headers: Optional[HeaderTypes]
        cookies: Optional[CookieTypes]
        files: Optional[dict]
        auth: Optional[tuple[str, str]]
        timeout: Optional[Union[float, tuple[float, float], object]]
        allow_redirects: Optional[bool]
        max_redirects: Optional[int]
        proxies: Optional[ProxySpec]
        proxy: Optional[str]
        proxy_auth: Optional[tuple[str, str]]
        verify: Optional[bool]
        referer: Optional[str]
        accept_encoding: Optional[str]
        content_callback: Optional[Callable]
        impersonate: Optional[BrowserTypeLiteral]
        ja3: Optional[str]
        akamai: Optional[str]
        extra_fp: Optional[Union[ExtraFingerprints, ExtraFpDict]]
        default_headers: Optional[bool]
        default_encoding: Union[str, Callable[[bytes], str]]
        quote: Union[str, Literal[False]]
        http_version: Optional[CurlHttpVersion]
        interface: Optional[str]
        cert: Optional[Union[str, tuple[str, str]]]
        max_recv_speed: int
        multipart: Optional[CurlMime]

    # Keyword arguments of the verb helpers (``get``, ``post``, ...): the
    # streaming set plus the ``stream`` flag.
    class RequestParams(StreamRequestParams, total=False):
        stream: Optional[bool]

else:

    # Runtime stand-in for ``typing_extensions.Unpack``: subscripting the
    # instance is accepted and simply yields ``None``.
    class _Unpack:
        @staticmethod
        def __getitem__(*args, **kwargs):
            pass

    Unpack = _Unpack()

    # Runtime placeholders so the names exist when the module is imported.
    # Annotations are not evaluated at runtime because of
    # ``from __future__ import annotations`` at the top of the file.
    ProxySpec = dict[str, str]
    BaseSessionParams = TypedDict
    StreamRequestParams, RequestParams = TypedDict, TypedDict

# Names accepted by the ``thread`` argument of ``Session``.
ThreadType = Literal["eventlet", "gevent"]
# HTTP method names accepted by ``request()`` / ``stream()``.
HttpMethod = Literal[
    "GET", "POST", "PUT", "DELETE", "OPTIONS", "HEAD", "TRACE", "PATCH", "QUERY"
]


def _is_absolute_url(url: str) -> bool:
    """Return True when ``url`` has both a scheme and a hostname."""
    parts = urlparse(url)
    if not parts.scheme:
        return False
    return bool(parts.hostname)


def _peek_queue(q: queue.Queue, default=None):
    """Return the first pending item of ``q`` without removing it.

    ``default`` is returned when the queue holds no items.
    """
    pending = q.queue
    try:
        head = pending[0]
    except IndexError:
        return default
    return head


def _peek_aio_queue(q: asyncio.Queue, default=None):
    """Return the first pending item of an asyncio queue without removing it.

    ``default`` is returned when the queue holds no items.
    """
    pending = q._queue  # type: ignore
    try:
        head = pending[0]
    except IndexError:
        return default
    return head


class BaseSession(Generic[R]):
    """Provide common methods for setting curl options and reading info in sessions."""

    def __init__(
        self,
        *,
        headers: Optional[HeaderTypes] = None,
        cookies: Optional[CookieTypes] = None,
        auth: Optional[tuple[str, str]] = None,
        proxies: Optional[ProxySpec] = None,
        proxy: Optional[str] = None,
        proxy_auth: Optional[tuple[str, str]] = None,
        base_url: Optional[str] = None,
        params: Optional[dict] = None,
        verify: bool = True,
        timeout: Union[float, tuple[float, float]] = 30,
        trust_env: bool = True,
        allow_redirects: bool = True,
        max_redirects: int = 30,
        impersonate: Optional[BrowserTypeLiteral] = None,
        ja3: Optional[str] = None,
        akamai: Optional[str] = None,
        extra_fp: Optional[Union[ExtraFingerprints, ExtraFpDict]] = None,
        default_headers: bool = True,
        default_encoding: Union[str, Callable[[bytes], str]] = "utf-8",
        curl_options: Optional[dict] = None,
        curl_infos: Optional[list] = None,
        http_version: Optional[CurlHttpVersion] = None,
        debug: bool = False,
        interface: Optional[str] = None,
        cert: Optional[Union[str, tuple[str, str]]] = None,
        response_class: Optional[type[R]] = None,
    ):
        """Store the session-wide defaults.

        Every argument is kept on the instance under the same name, except that
        ``headers`` is wrapped in ``Headers``, ``cookies`` in ``Cookies``,
        ``curl_options``/``curl_infos`` fall back to an empty dict/list, and a
        single ``proxy`` is stored as ``{"all": proxy}`` in ``self.proxies``.

        Raises:
            TypeError: if ``response_class`` is not a subclass of ``Response``,
                or if both ``proxy`` and ``proxies`` are given.
            ValueError: if ``base_url`` is given but is not an absolute url.
        """
        self.headers = Headers(headers)
        self._cookies = Cookies(cookies)  # guarded by @property
        self.auth = auth
        self.base_url = base_url
        self.params = params
        self.verify = verify
        self.timeout = timeout
        self.trust_env = trust_env
        self.allow_redirects = allow_redirects
        self.max_redirects = max_redirects
        self.impersonate = impersonate
        self.ja3 = ja3
        self.akamai = akamai
        self.extra_fp = extra_fp
        self.default_headers = default_headers
        self.default_encoding = default_encoding
        self.curl_options = curl_options or {}
        self.curl_infos = curl_infos or []
        self.http_version = http_version
        self.debug = debug
        self.interface = interface
        self.cert = cert

        if response_class is not None and not issubclass(response_class, Response):
            raise TypeError(
                "`response_class` must be a subclass of "
                "`curl_cffi.requests.models.Response`, "
                f"not of type `{response_class}`"
            )
        self.response_class = response_class or Response

        if proxy and proxies:
            raise TypeError("Cannot specify both 'proxy' and 'proxies'")
        if proxy:
            proxies = {"all": proxy}
        self.proxies: ProxySpec = proxies or {}
        self.proxy_auth = proxy_auth

        if self.base_url and not _is_absolute_url(self.base_url):
            raise ValueError("You need to provide an absolute url for 'base_url'")

        self._closed = False

    def _parse_response(self, curl, buffer, header_buffer, default_encoding) -> R:
        """Build a response object from a curl handle and its output buffers.

        Args:
            curl: the curl handle the transfer ran on; transfer details are read
                from it with ``getinfo``.
            buffer: body buffer; when truthy its value becomes ``rsp.content``.
            header_buffer: buffer holding the raw header bytes of the transfer.
            default_encoding: stored on the response as ``default_encoding``.

        Returns:
            An instance of ``self.response_class`` populated from the handle.
            Cookies reported by curl are merged into the session cookies, and
            the response shares that same ``Cookies`` object.
        """
        c = curl
        rsp = cast(R, self.response_class(c))
        rsp.url = cast(bytes, c.getinfo(CurlInfo.EFFECTIVE_URL)).decode()
        if buffer:
            rsp.content = buffer.getvalue()
        rsp.http_version = cast(int, c.getinfo(CurlInfo.HTTP_VERSION))
        rsp.status_code = cast(int, c.getinfo(CurlInfo.RESPONSE_CODE))
        rsp.ok = 200 <= rsp.status_code < 400

        # TODO: history urls
        header_list: list[bytes] = []
        for header_line in header_buffer.getvalue().splitlines():
            if not header_line.strip():
                continue
            if header_line.startswith(b"HTTP/"):
                # A status line starts a new response (e.g. after a redirect):
                # remember its reason phrase and drop the headers collected so
                # far, so only the headers of the last response are kept.
                rsp.reason = c.get_reason_phrase(header_line).decode()
                header_list = []
                continue
            if header_line.startswith((b" ", b"\t")):
                # Folded continuation line: append it to the previous header.
                # A continuation with no preceding header has nothing to attach
                # to, so it is skipped instead of raising IndexError.
                if header_list:
                    header_list[-1] += header_line
                continue
            header_list.append(header_line)
        rsp.headers = Headers(header_list)

        # cookies
        morsels = [
            CurlMorsel.from_curl_format(cookie_line)
            for cookie_line in c.getinfo(CurlInfo.COOKIELIST)
        ]
        self._cookies.update_cookies_from_curl(morsels)
        rsp.cookies = self._cookies

        rsp.primary_ip = cast(bytes, c.getinfo(CurlInfo.PRIMARY_IP)).decode()
        rsp.primary_port = cast(int, c.getinfo(CurlInfo.PRIMARY_PORT))
        rsp.local_ip = cast(bytes, c.getinfo(CurlInfo.LOCAL_IP)).decode()
        rsp.local_port = cast(int, c.getinfo(CurlInfo.LOCAL_PORT))
        rsp.default_encoding = default_encoding
        rsp.elapsed = cast(float, c.getinfo(CurlInfo.TOTAL_TIME))
        rsp.redirect_count = cast(int, c.getinfo(CurlInfo.REDIRECT_COUNT))
        rsp.redirect_url = cast(bytes, c.getinfo(CurlInfo.REDIRECT_URL)).decode()

        # custom info options requested through ``curl_infos``
        for info in self.curl_infos:
            rsp.infos[info] = c.getinfo(info)

        return rsp

    def _check_session_closed(self):
        """Raise ``SessionClosed`` if the session has been closed."""
        if self._closed:
            raise SessionClosed("Session is closed, cannot send request.")

    @property
    def cookies(self) -> Cookies:
        """The session cookies."""
        return self._cookies

    @cookies.setter
    def cookies(self, cookies: CookieTypes) -> None:
        # This ensures that the cookies property is always converted to Cookies.
        self._cookies = Cookies(cookies)


class Session(BaseSession[R]):
    """A request session, cookies and connections will be reused. This object is
    thread-safe, but it's recommended to use a separate session for each thread."""

    def __init__(
        self,
        curl: Optional[Curl] = None,
        thread: Optional[ThreadType] = None,
        use_thread_local_curl: bool = True,
        **kwargs: Unpack[BaseSessionParams[R]],
    ):
        """
        Parameters set in the ``__init__`` method will be overridden by the same
        parameter in request method.

        Args:
            curl: curl object to use in the session. If not provided, a new one will
                be created. Also, a fresh curl object will always be created when
                accessed from another thread.
            thread: thread engine to use for working with other thread
                implementations. choices: eventlet, gevent.
            use_thread_local_curl: keep one curl handle per thread (the default).
                When False, a single handle is used for the whole session.
            headers: headers to use in the session.
            cookies: cookies to add in the session.
            auth: HTTP basic auth, a tuple of (username, password), only basic auth
                is supported.
            proxies: dict of proxies to use, prefer to use proxy if they are the
                same. format: ``{"http": proxy_url, "https": proxy_url}``.
            proxy: proxy to use, format: "http://proxy_url".
                Cannot be used with the above parameter.
            proxy_auth: HTTP basic auth for proxy, a tuple of (username, password).
            base_url: absolute url to use as base for relative urls.
            params: query string for the session.
            verify: whether to verify https certs.
            timeout: how many seconds to wait before giving up.
            trust_env: use http_proxy/https_proxy and other environments, default
                True.
            allow_redirects: whether to allow redirection.
            max_redirects: max redirect counts, default 30, use -1 for unlimited.
            impersonate: which browser version to impersonate in the session.
            ja3: ja3 string to impersonate in the session.
            akamai: akamai string to impersonate in the session.
            extra_fp: extra fingerprints options, in complement to ja3 and akamai
                str.
            interface: which interface use.
            default_encoding: encoding for decoding response content if charset is
                not found in headers. Defaults to "utf-8". Can be set to a callable
                for automatic detection.
            cert: a tuple of (cert, key) filenames for client cert.
            response_class: A customized subtype of ``Response`` to use.

        Notes:
            This class can be used as a context manager.

        .. code-block:: python

            from curl_cffi.requests import Session

            with Session() as s:
                r = s.get("https://example.com")
        """
        super().__init__(**kwargs)
        self._thread = thread
        self._use_thread_local_curl = use_thread_local_curl
        self._queue = None
        self._executor = None

        if use_thread_local_curl:
            self._local = threading.local()
            if curl:
                self._is_customized_curl = True
                self._local.curl = curl
            else:
                self._is_customized_curl = False
                self._local.curl = Curl(debug=self.debug)
        else:
            self._curl = curl if curl else Curl(debug=self.debug)

    @property
    def curl(self):
        """The curl handle for the calling thread.

        With thread-local handles enabled, a thread that has no handle yet gets
        a fresh one. If the session was built around a user-supplied handle, a
        ``CurlCffiWarning`` is emitted at that moment, because the fresh handle
        is not the one the user supplied.
        """
        if not self._use_thread_local_curl:
            return self._curl
        if not getattr(self._local, "curl", None):
            # Warn only when a fresh handle is really being created, not on
            # every access from the thread that already owns the custom handle.
            if self._is_customized_curl:
                warnings.warn(
                    "Creating fresh curl handle in different thread.",
                    CurlCffiWarning,
                    stacklevel=2,
                )
            self._local.curl = Curl(debug=self.debug)
        return self._local.curl

    @property
    def executor(self):
        """Thread pool used for streaming requests, created on first use."""
        if self._executor is None:
            self._executor = ThreadPoolExecutor()
        return self._executor

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    def close(self) -> None:
        """Close the session."""
        self._closed = True
        self.curl.close()
        if self._executor is not None:
            # Release the pool's worker threads; wait=False so that closing the
            # session does not block on a stream that is still being read.
            self._executor.shutdown(wait=False)
            self._executor = None

    @contextmanager
    def stream(
        self,
        method: HttpMethod,
        url: str,
        **kwargs: Unpack[StreamRequestParams],
    ):
        """Equivalent to ``with request(..., stream=True) as r:``"""
        rsp = self.request(method=method, url=url, **kwargs, stream=True)
        try:
            yield rsp
        finally:
            rsp.close()

    def ws_connect(
        self, url, on_message=None, on_error=None, on_open=None, on_close=None, **kwargs
    ) -> WebSocket:
        """Connects to a websocket url.

        Note: This method is deprecated, use WebSocket instead.

        Args:
            url: the ws url to connect.
            on_message: message callback, ``def on_message(ws, str)``
            on_error: error callback, ``def on_error(ws, error)``
            on_open: open callback, ``def on_open(ws)``
            on_close: close callback, ``def on_close(ws)``

        Other parameters are the same as ``.request``

        Returns:
            a WebSocket instance to communicate with the server.
        """
        self._check_session_closed()

        # The websocket gets its own duplicate of the session handle.
        curl = self.curl.duphandle()
        self.curl.reset()

        ws = WebSocket(
            curl=curl,
            on_message=on_message,
            on_error=on_error,
            on_open=on_open,
            on_close=on_close,
        )
        ws.connect(url, **kwargs)
        return ws

    def upkeep(self) -> int:
        """Call ``upkeep()`` on the current curl handle and return its result."""
        return self.curl.upkeep()

    def request(
        self,
        method: HttpMethod,
        url: str,
        params: Optional[Union[dict, list, tuple]] = None,
        data: Optional[Union[dict[str, str], list[tuple], str, BytesIO, bytes]] = None,
        json: Optional[dict | list] = None,
        headers: Optional[HeaderTypes] = None,
        cookies: Optional[CookieTypes] = None,
        files: Optional[dict] = None,
        auth: Optional[tuple[str, str]] = None,
        timeout: Optional[Union[float, tuple[float, float], object]] = not_set,
        allow_redirects: Optional[bool] = None,
        max_redirects: Optional[int] = None,
        proxies: Optional[ProxySpec] = None,
        proxy: Optional[str] = None,
        proxy_auth: Optional[tuple[str, str]] = None,
        verify: Optional[bool] = None,
        referer: Optional[str] = None,
        accept_encoding: Optional[str] = "gzip, deflate, br",
        content_callback: Optional[Callable] = None,
        impersonate: Optional[BrowserTypeLiteral] = None,
        ja3: Optional[str] = None,
        akamai: Optional[str] = None,
        extra_fp: Optional[Union[ExtraFingerprints, ExtraFpDict]] = None,
        default_headers: Optional[bool] = None,
        default_encoding: Union[str, Callable[[bytes], str]] = "utf-8",
        quote: Union[str, Literal[False]] = "",
        http_version: Optional[CurlHttpVersion] = None,
        interface: Optional[str] = None,
        cert: Optional[Union[str, tuple[str, str]]] = None,
        stream: Optional[bool] = None,
        max_recv_speed: int = 0,
        multipart: Optional[CurlMime] = None,
    ):
        """Send the request, see ``requests.request`` for details on parameters.

        Raises:
            SessionClosed: if the session has already been closed.
            RequestException: a subclass chosen by ``code2error`` when the
                transfer fails; for streaming requests a ``RequestException``
                when the failure happens before the header is received.
        """
        # NOTE(review): ``default_encoding`` here defaults to "utf-8" and the
        # session-level ``self.default_encoding`` is never consulted, so a value
        # given to the constructor appears to have no effect - confirm intent.
        self._check_session_closed()

        # clone a new curl instance for streaming response
        if stream:
            c = self.curl.duphandle()
            self.curl.reset()
        else:
            c = self.curl

        req, buffer, header_buffer, q, header_recved, quit_now = set_curl_options(
            c,
            method=method,
            url=url,
            params_list=[self.params, params],
            base_url=self.base_url,
            data=data,
            json=json,
            headers_list=[self.headers, headers],
            cookies_list=[self._cookies, cookies],
            files=files,
            auth=auth or self.auth,
            timeout=self.timeout if timeout is not_set else timeout,
            allow_redirects=self.allow_redirects
            if allow_redirects is None
            else allow_redirects,
            max_redirects=self.max_redirects
            if max_redirects is None
            else max_redirects,
            proxies_list=[self.proxies, proxies],
            proxy=proxy,
            proxy_auth=proxy_auth or self.proxy_auth,
            verify_list=[self.verify, verify],
            referer=referer,
            accept_encoding=accept_encoding,
            content_callback=content_callback,
            impersonate=impersonate or self.impersonate,
            ja3=ja3 or self.ja3,
            akamai=akamai or self.akamai,
            extra_fp=extra_fp or self.extra_fp,
            default_headers=self.default_headers
            if default_headers is None
            else default_headers,
            quote=quote,
            http_version=http_version or self.http_version,
            interface=interface or self.interface,
            stream=stream,
            max_recv_speed=max_recv_speed,
            multipart=multipart,
            cert=cert or self.cert,
            curl_options=self.curl_options,
            queue_class=queue.Queue,
            event_class=threading.Event,
        )

        if stream:
            # Set once the foreground has finished reading the handle, so the
            # background cleanup does not reset it too early.
            header_parsed = threading.Event()

            def perform():
                try:
                    c.perform()
                except CurlError as e:
                    rsp = self._parse_response(
                        c, buffer, header_buffer, default_encoding
                    )
                    rsp.request = req
                    q.put_nowait(RequestException(str(e), e.code, rsp))  # type: ignore
                finally:
                    # Always wake the foreground, even when no header arrived.
                    if not cast(threading.Event, header_recved).is_set():
                        cast(threading.Event, header_recved).set()
                    q.put(STREAM_END)  # type: ignore

            def cleanup(fut):
                header_parsed.wait()
                c.reset()

            stream_task = self.executor.submit(perform)
            stream_task.add_done_callback(cleanup)

            # Wait for the first chunk
            header_recved.wait()  # type: ignore
            rsp = self._parse_response(c, buffer, header_buffer, default_encoding)
            header_parsed.set()

            # Raise the exception if something wrong happens when receiving the
            # header.
            first_element = _peek_queue(q)  # type: ignore
            if isinstance(first_element, RequestException):
                c.reset()
                raise first_element

            rsp.request = req
            rsp.stream_task = stream_task
            rsp.quit_now = quit_now
            rsp.queue = q
            return rsp
        else:
            try:
                if self._thread == "eventlet":
                    # see: https://eventlet.net/doc/threading.html
                    eventlet.tpool.execute(c.perform)  # type: ignore
                elif self._thread == "gevent":
                    # see: https://www.gevent.org/api/gevent.threadpool.html
                    gevent.get_hub().threadpool.spawn(c.perform).get()  # type: ignore
                else:
                    c.perform()
            except CurlError as e:
                rsp = self._parse_response(c, buffer, header_buffer, default_encoding)
                rsp.request = req
                error = code2error(e.code, str(e))
                raise error(str(e), e.code, rsp) from e
            else:
                rsp = self._parse_response(c, buffer, header_buffer, default_encoding)
                rsp.request = req
                return rsp
            finally:
                c.reset()

    def head(self, url: str, **kwargs: Unpack[RequestParams]):
        return self.request(method="HEAD", url=url, **kwargs)

    def get(self, url: str, **kwargs: Unpack[RequestParams]):
        return self.request(method="GET", url=url, **kwargs)

    def post(self, url: str, **kwargs: Unpack[RequestParams]):
        return self.request(method="POST", url=url, **kwargs)

    def put(self, url: str, **kwargs: Unpack[RequestParams]):
        return self.request(method="PUT", url=url, **kwargs)

    def patch(self, url: str, **kwargs: Unpack[RequestParams]):
        return self.request(method="PATCH", url=url, **kwargs)

    def delete(self, url: str, **kwargs: Unpack[RequestParams]):
        return self.request(method="DELETE", url=url, **kwargs)

    def options(self, url: str, **kwargs: Unpack[RequestParams]):
        return self.request(method="OPTIONS", url=url, **kwargs)

    def trace(self, url: str, **kwargs: Unpack[RequestParams]):
        return self.request(method="TRACE", url=url, **kwargs)

    def query(self, url: str, **kwargs: Unpack[RequestParams]):
        return self.request(method="QUERY", url=url, **kwargs)
class AsyncSession(BaseSession[R]):
    """An async request session, cookies and connections will be reused."""

    def __init__(
        self,
        *,
        loop=None,
        async_curl: Optional[AsyncCurl] = None,
        max_clients: int = 10,
        **kwargs: Unpack[BaseSessionParams[R]],
    ):
        """
        Parameters set in the ``__init__`` method will be overridden by the same
        parameter in request method.

        Parameters:
            loop: loop to use, if not provided, the running loop will be used.
            async_curl: [AsyncCurl](/api/curl_cffi#curl_cffi.AsyncCurl) object to
                use.
            max_clients: maximum curl handle to use in the session,
                this will affect the concurrency ratio. Must be at least 1.
            headers: headers to use in the session.
            cookies: cookies to add in the session.
            auth: HTTP basic auth, a tuple of (username, password), only basic auth
                is supported.
            proxies: dict of proxies to use, prefer to use ``proxy`` if they are the
                same. format: ``{"http": proxy_url, "https": proxy_url}``.
            proxy: proxy to use, format: "http://proxy_url".
                Cannot be used with the above parameter.
            proxy_auth: HTTP basic auth for proxy, a tuple of (username, password).
            base_url: absolute url to use for relative urls.
            params: query string for the session.
            verify: whether to verify https certs.
            timeout: how many seconds to wait before giving up.
            trust_env: use http_proxy/https_proxy and other environments, default
                True.
            allow_redirects: whether to allow redirection.
            max_redirects: max redirect counts, default 30, use -1 for unlimited.
            impersonate: which browser version to impersonate in the session.
            ja3: ja3 string to impersonate in the session.
            akamai: akamai string to impersonate in the session.
            extra_fp: extra fingerprints options, in complement to ja3 and akamai
                str.
            default_encoding: encoding for decoding response content if charset is
                not found in headers. Defaults to "utf-8". Can be set to a callable
                for automatic detection.
            cert: a tuple of (cert, key) filenames for client cert.
            response_class: A customized subtype of ``Response`` to use.

        Notes:
            This class can be used as a context manager, and it's recommended to use
            via ``async with``.
            However, unlike aiohttp, it is not required to use ``with``.

        .. code-block:: python

            from curl_cffi.requests import AsyncSession

            # recommended.
            async with AsyncSession() as s:
                r = await s.get("https://example.com")

            s = AsyncSession()  # it also works.
        """
        super().__init__(**kwargs)
        self._loop = loop
        self._acurl = async_curl
        self.max_clients = max_clients
        self.init_pool()

    @property
    def loop(self):
        """The event loop; resolved to the running loop on first access."""
        if self._loop is None:
            self._loop = asyncio.get_running_loop()
        return self._loop

    @property
    def acurl(self):
        """The ``AsyncCurl`` object, created on first access if none was given."""
        if self._acurl is None:
            self._acurl = AsyncCurl(loop=self.loop)
        return self._acurl

    def init_pool(self):
        """Create the handle pool and fill it with ``max_clients`` empty slots.

        Slots start as ``None``; ``pop_curl`` creates the real handle lazily.

        Raises:
            ValueError: if ``max_clients`` is less than 1. A non-positive size
                makes the queue unbounded, so filling it would never finish.
        """
        if self.max_clients < 1:
            raise ValueError("'max_clients' must be a positive integer")
        self.pool = asyncio.LifoQueue(self.max_clients)
        while True:
            try:
                self.pool.put_nowait(None)
            except asyncio.QueueFull:
                break

    async def pop_curl(self):
        """Take a handle from the pool, waiting until a slot is available."""
        curl = await self.pool.get()
        if curl is None:
            curl = Curl(debug=self.debug)
        # XXX: This may be related to proxy rotation
        # curl.setopt(CurlOpt.FRESH_CONNECT, 1)
        # curl.setopt(CurlOpt.FORBID_REUSE, 1)
        return curl

    def push_curl(self, curl):
        """Put a handle (or ``None`` for an empty slot) back into the pool.

        A full pool is silently ignored.
        """
        with suppress(asyncio.QueueFull):
            self.pool.put_nowait(curl)

    async def __aenter__(self):
        return self

    async def __aexit__(self, *args):
        await self.close()
        return None

    async def close(self) -> None:
        """Close the session."""
        await self.acurl.close()
        self._closed = True
        # Drain the pool and close every handle that was actually created.
        while True:
            try:
                curl = self.pool.get_nowait()
                if curl:
                    curl.close()
            except asyncio.QueueEmpty:
                break

    def release_curl(self, curl):
        """Detach a handle after a transfer and return it to the pool.

        If the session has been closed in the meantime the handle is closed
        instead of being pooled.
        """
        curl.clean_after_perform()
        if not self._closed:
            self.acurl.remove_handle(curl)
            curl.reset()
            # curl.setopt(CurlOpt.PIPEWAIT, 1)
            self.push_curl(curl)
        else:
            curl.close()

    @asynccontextmanager
    async def stream(
        self,
        method: HttpMethod,
        url: str,
        **kwargs: Unpack[StreamRequestParams],
    ):
        """Equivalent to ``async with request(..., stream=True) as r:``"""
        rsp = await self.request(method=method, url=url, **kwargs, stream=True)
        try:
            yield rsp
        finally:
            await rsp.aclose()

    async def ws_connect(
        self,
        url: str,
        autoclose: bool = True,
        params: Optional[Union[dict, list, tuple]] = None,
        headers: Optional[HeaderTypes] = None,
        cookies: Optional[CookieTypes] = None,
        auth: Optional[tuple[str, str]] = None,
        timeout: Optional[Union[float, tuple[float, float], object]] = not_set,
        allow_redirects: Optional[bool] = None,
        max_redirects: Optional[int] = None,
        proxies: Optional[ProxySpec] = None,
        proxy: Optional[str] = None,
        proxy_auth: Optional[tuple[str, str]] = None,
        verify: Optional[bool] = None,
        referer: Optional[str] = None,
        accept_encoding: Optional[str] = "gzip, deflate, br",
        impersonate: Optional[BrowserTypeLiteral] = None,
        ja3: Optional[str] = None,
        akamai: Optional[str] = None,
        extra_fp: Optional[Union[ExtraFingerprints, ExtraFpDict]] = None,
        default_headers: Optional[bool] = None,
        quote: Union[str, Literal[False]] = "",
        http_version: Optional[CurlHttpVersion] = None,
        interface: Optional[str] = None,
        cert: Optional[Union[str, tuple[str, str]]] = None,
        max_recv_speed: int = 0,
    ) -> AsyncWebSocket:
        """Connects to a WebSocket.

        Args:
            url: url for the requests.
            autoclose: whether to close the WebSocket after receiving a close frame.
            params: query string for the requests.
            headers: headers to send.
            cookies: cookies to use.
            auth: HTTP basic auth, a tuple of (username, password), only basic auth
                is supported.
            timeout: how many seconds to wait before giving up.
            allow_redirects: whether to allow redirection.
            max_redirects: max redirect counts, default 30, use -1 for unlimited.
            proxies: dict of proxies to use, prefer to use ``proxy`` if they are the
                same. format: ``{"http": proxy_url, "https": proxy_url}``.
            proxy: proxy to use, format: "http://user:pass@proxy_url".
                Can't be used with `proxies` parameter.
            proxy_auth: HTTP basic auth for proxy, a tuple of (username, password).
            verify: whether to verify https certs.
            referer: shortcut for setting referer header.
            accept_encoding: shortcut for setting accept-encoding header.
            impersonate: which browser version to impersonate.
            ja3: ja3 string to impersonate.
            akamai: akamai string to impersonate.
            extra_fp: extra fingerprints options, in complement to ja3 and akamai
                str.
            default_headers: whether to set default browser headers.
            quote: Set characters to be quoted, i.e. percent-encoded. Default safe
                string is ``!#$%&'()*+,/:;=?@[]~``. If set to a string, the
                character will be removed from the safe string, thus quoted. If set
                to False, the url will be kept as is, without any automatic
                percent-encoding, you must encode the URL yourself.
            http_version: limiting http version, defaults to http2.
            interface: which interface to use.
            cert: a tuple of (cert, key) filenames for client cert.
            max_recv_speed: maximum receive speed, bytes per second.

        Raises:
            SessionClosed: if the session has already been closed.
        """
        self._check_session_closed()

        curl = await self.pop_curl()
        try:
            set_curl_options(
                curl=curl,
                method="GET",
                url=url,
                base_url=self.base_url,
                params_list=[self.params, params],
                headers_list=[self.headers, headers],
                cookies_list=[self.cookies, cookies],
                auth=auth or self.auth,
                timeout=self.timeout if timeout is not_set else timeout,
                allow_redirects=self.allow_redirects
                if allow_redirects is None
                else allow_redirects,
                max_redirects=self.max_redirects
                if max_redirects is None
                else max_redirects,
                proxies_list=[self.proxies, proxies],
                proxy=proxy,
                proxy_auth=proxy_auth or self.proxy_auth,
                verify_list=[self.verify, verify],
                referer=referer,
                accept_encoding=accept_encoding,
                impersonate=impersonate or self.impersonate,
                ja3=ja3 or self.ja3,
                akamai=akamai or self.akamai,
                extra_fp=extra_fp or self.extra_fp,
                default_headers=self.default_headers
                if default_headers is None
                else default_headers,
                quote=quote,
                http_version=http_version or self.http_version,
                interface=interface or self.interface,
                max_recv_speed=max_recv_speed,
                cert=cert or self.cert,
                queue_class=asyncio.Queue,
                event_class=asyncio.Event,
            )
            curl.setopt(CurlOpt.CONNECT_ONLY, 2)  # https://curl.se/docs/websocket.html

            await self.loop.run_in_executor(None, curl.perform)
        except Exception:
            # Setup or the handshake failed: discard the handle and give the
            # pool an empty slot back, otherwise every failure would shrink the
            # pool until ``pop_curl`` blocks forever. Cancellation is not
            # handled here on purpose: the executor thread may still be using
            # the handle, so it must not be closed from this side.
            curl.close()
            self.push_curl(None)
            raise

        return AsyncWebSocket(
            cast(AsyncSession[Response], self),
            curl,
            autoclose=autoclose,
        )

    async def request(
        self,
        method: HttpMethod,
        url: str,
        params: Optional[Union[dict, list, tuple]] = None,
        data: Optional[Union[dict[str, str], list[tuple], str, BytesIO, bytes]] = None,
        json: Optional[dict | list] = None,
        headers: Optional[HeaderTypes] = None,
        cookies: Optional[CookieTypes] = None,
        files: Optional[dict] = None,
        auth: Optional[tuple[str, str]] = None,
        timeout: Optional[Union[float, tuple[float, float], object]] = not_set,
        allow_redirects: Optional[bool] = None,
        max_redirects: Optional[int] = None,
        proxies: Optional[ProxySpec] = None,
        proxy: Optional[str] = None,
        proxy_auth: Optional[tuple[str, str]] = None,
        verify: Optional[bool] = None,
        referer: Optional[str] = None,
        accept_encoding: Optional[str] = "gzip, deflate, br",
        content_callback: Optional[Callable] = None,
        impersonate: Optional[BrowserTypeLiteral] = None,
        ja3: Optional[str] = None,
        akamai: Optional[str] = None,
        extra_fp: Optional[Union[ExtraFingerprints, ExtraFpDict]] = None,
        default_headers: Optional[bool] = None,
        default_encoding: Union[str, Callable[[bytes], str]] = "utf-8",
        quote: Union[str, Literal[False]] = "",
        http_version: Optional[CurlHttpVersion] = None,
        interface: Optional[str] = None,
        cert: Optional[Union[str, tuple[str, str]]] = None,
        stream: Optional[bool] = None,
        max_recv_speed: int = 0,
        multipart: Optional[CurlMime] = None,
    ):
        """Send the request, see ``curl_cffi.requests.request`` for details on args.

        Raises:
            SessionClosed: if the session has already been closed.
            RequestException: a subclass chosen by ``code2error`` when the
                transfer fails; for streaming requests a ``RequestException``
                when the failure happens before the header is received.
        """
        # NOTE(review): ``default_encoding`` here defaults to "utf-8" and the
        # session-level ``self.default_encoding`` is never consulted, so a value
        # given to the constructor appears to have no effect - confirm intent.
        self._check_session_closed()

        curl = await self.pop_curl()
        try:
            req, buffer, header_buffer, q, header_recved, quit_now = set_curl_options(
                curl=curl,
                method=method,
                url=url,
                params_list=[self.params, params],
                base_url=self.base_url,
                data=data,
                json=json,
                headers_list=[self.headers, headers],
                cookies_list=[self.cookies, cookies],
                files=files,
                auth=auth or self.auth,
                timeout=self.timeout if timeout is not_set else timeout,
                allow_redirects=self.allow_redirects
                if allow_redirects is None
                else allow_redirects,
                max_redirects=self.max_redirects
                if max_redirects is None
                else max_redirects,
                proxies_list=[self.proxies, proxies],
                proxy=proxy,
                proxy_auth=proxy_auth or self.proxy_auth,
                verify_list=[self.verify, verify],
                referer=referer,
                accept_encoding=accept_encoding,
                content_callback=content_callback,
                impersonate=impersonate or self.impersonate,
                ja3=ja3 or self.ja3,
                akamai=akamai or self.akamai,
                extra_fp=extra_fp or self.extra_fp,
                default_headers=self.default_headers
                if default_headers is None
                else default_headers,
                quote=quote,
                http_version=http_version or self.http_version,
                interface=interface or self.interface,
                stream=stream,
                max_recv_speed=max_recv_speed,
                multipart=multipart,
                cert=cert or self.cert,
                curl_options=self.curl_options,
                queue_class=asyncio.Queue,
                event_class=asyncio.Event,
            )
        except Exception:
            # The handle was never handed to ``acurl``; clear whatever options
            # were applied and return it, so a bad argument does not cost the
            # pool a slot.
            curl.reset()
            self.push_curl(curl)
            raise

        if stream:
            task = self.acurl.add_handle(curl)

            async def perform():
                try:
                    await task
                except CurlError as e:
                    rsp = self._parse_response(
                        curl, buffer, header_buffer, default_encoding
                    )
                    rsp.request = req
                    q.put_nowait(RequestException(str(e), e.code, rsp))  # type: ignore
                finally:
                    # Always wake the foreground, even when no header arrived.
                    if not cast(asyncio.Event, header_recved).is_set():
                        cast(asyncio.Event, header_recved).set()
                    await q.put(STREAM_END)  # type: ignore

            def cleanup(fut):
                self.release_curl(curl)

            stream_task = asyncio.create_task(perform())
            stream_task.add_done_callback(cleanup)

            await cast(asyncio.Event, header_recved).wait()

            # Unlike threads, coroutines does not use preemptive scheduling.
            # For asyncio, there is no need for a header_parsed event, the
            # _parse_response will execute in the foreground, no background tasks
            # running.
            rsp = self._parse_response(curl, buffer, header_buffer, default_encoding)

            first_element = _peek_aio_queue(q)  # type: ignore
            if isinstance(first_element, RequestException):
                # NOTE(review): the ``cleanup`` done-callback also releases this
                # handle, so it looks like it is released twice on this path -
                # confirm whether that is intended.
                self.release_curl(curl)
                raise first_element

            rsp.request = req
            rsp.astream_task = stream_task
            rsp.quit_now = quit_now
            rsp.queue = q
            return rsp
        else:
            try:
                task = self.acurl.add_handle(curl)
                await task
            except CurlError as e:
                rsp = self._parse_response(
                    curl, buffer, header_buffer, default_encoding
                )
                rsp.request = req
                error = code2error(e.code, str(e))
                raise error(str(e), e.code, rsp) from e
            else:
                rsp = self._parse_response(
                    curl, buffer, header_buffer, default_encoding
                )
                rsp.request = req
                return rsp
            finally:
                self.release_curl(curl)

    def head(self, url: str, **kwargs: Unpack[RequestParams]):
        return self.request(method="HEAD", url=url, **kwargs)

    def get(self, url: str, **kwargs: Unpack[RequestParams]):
        return self.request(method="GET", url=url, **kwargs)

    def post(self, url: str, **kwargs: Unpack[RequestParams]):
        return self.request(method="POST", url=url, **kwargs)

    def put(self, url: str, **kwargs: Unpack[RequestParams]):
        return self.request(method="PUT", url=url, **kwargs)

    def patch(self, url: str, **kwargs: Unpack[RequestParams]):
        return self.request(method="PATCH", url=url, **kwargs)

    def delete(self, url: str, **kwargs: Unpack[RequestParams]):
        return self.request(method="DELETE", url=url, **kwargs)

    def options(self, url: str, **kwargs: Unpack[RequestParams]):
        return self.request(method="OPTIONS", url=url, **kwargs)

    def trace(self, url: str, **kwargs: Unpack[RequestParams]):
        return self.request(method="TRACE", url=url, **kwargs)

    def query(self, url: str, **kwargs: Unpack[RequestParams]):
        return self.request(method="QUERY", url=url, **kwargs)