from io import BytesIO
import sys
import textwrap
import email
import time
import random
import threading
import json
import asyncio
import ssl
from http.client import HTTPResponse

# Used for global variables
import stripe  # noqa: IMP101
from stripe import _util
from stripe._request_metrics import RequestMetrics
from stripe._error import APIConnectionError

from typing import (
    Any,
    Dict,
    Iterable,
    List,
    Mapping,
    MutableMapping,
    Optional,
    Tuple,
    ClassVar,
    Union,
    cast,
    overload,
    AsyncIterable,
)
from typing_extensions import (
    Literal,
    NoReturn,
    TypedDict,
    Awaitable,
    Never,
)


# The precedence of HTTP libraries is
# - Urlfetch (this is provided by Google App Engine, so if it's present you probably want it)
# - Requests (popular library, the top priority for all environments outside Google App Engine, but not always present)
# - Pycurl (another library, not always present, not as preferred as Requests but at least it verifies SSL certs)
# - urllib2 with a warning (basically always present, fallback if needed)
try:
    import urllib.request as urllibrequest
    import urllib.error as urlliberror
except ImportError:
    # urllib.request / urllib.error are optional here; don't sweat it if
    # they are not available.
    pass

try:
    import pycurl  # pyright: ignore
except ImportError:
    pycurl = None

try:
    import httpx
    import anyio
    from httpx import Timeout as HTTPXTimeout
    from httpx import Client as HTTPXClientType
except ImportError:
    httpx = None
    anyio = None

try:
    import aiohttp
    from aiohttp import ClientTimeout as AIOHTTPTimeout
    from aiohttp import StreamReader as AIOHTTPStreamReader
except ImportError:
    aiohttp = None

# Optional dependency: `requests`. When the import fails, or the installed
# version is older than 0.8.8, the module-level name `requests` is set to None.
try:
    import requests
    from requests import Session as RequestsSession
except ImportError:
    requests = None
else:
    try:
        # Require version 0.8.8, but don't want to depend on distutils
        version: str
        version = requests.__version__
        major: int
        minor: int
        patch: int
        # Only plain "X.Y.Z" version strings parse here; anything else raises
        # and is handled by the `except` below.
        major, minor, patch = [int(i) for i in version.split(".")]
    except Exception:
        # Probably some new-fangled version, so it should support verify
        pass
    else:
        if (major, minor, patch) < (0, 8, 8):
            # Warn on stderr, then disable `requests` for this module by
            # rebinding the name to None.
            sys.stderr.write(
                "Warning: the Stripe library requires that your Python "
                '"requests" library be newer than version 0.8.8, but your '
                '"requests" library is version %s. Stripe will fall back to '
                "an alternate HTTP library so everything should work. We "
                'recommend upgrading your "requests" library. If you have any '
                "questions, please contact support@stripe.com. (HINT: running "
                '"pip install -U requests" should upgrade your requests '
                "library to the latest version.)" % (version,)
            )
            requests = None

try:
    from google.appengine.api import urlfetch  # pyright: ignore
except ImportError:
    urlfetch = None

# proxy support for the pycurl client
from urllib.parse import urlparse, ParseResult


def _now_ms():
    """Return the current wall-clock time as integer milliseconds since the epoch."""
    millis = time.time() * 1000
    return int(round(millis))


def new_default_http_client(*args: Any, **kwargs: Any) -> "HTTPClient":
    """
    Build the preferred synchronous HTTP client for this environment.

    The first available library wins, in this order: urlfetch, requests,
    pycurl, and finally the urllib-based client. All positional and keyword
    arguments are forwarded unchanged to the chosen client's constructor.
    """
    if urlfetch:
        return UrlFetchClient(*args, **kwargs)
    if requests:
        return RequestsClient(*args, **kwargs)
    if pycurl:
        return PycurlClient(*args, **kwargs)
    return Urllib2Client(*args, **kwargs)


def new_http_client_async_fallback(*args: Any, **kwargs: Any) -> "HTTPClient":
    """
    Build the preferred asynchronous HTTP client for this environment.

    httpx is used when importable, then aiohttp; when neither is available a
    `NoImportFoundAsyncClient` is returned instead. All positional and keyword
    arguments are forwarded unchanged to the chosen client's constructor.
    """
    if httpx:
        return HTTPXClient(*args, **kwargs)
    if aiohttp:
        return AIOHTTPClient(*args, **kwargs)
    return NoImportFoundAsyncClient(*args, **kwargs)


class HTTPClient(object):
    """
    Base class for the HTTP transports in this module.

    Subclasses implement `request`, `request_stream` and `close` (and, for
    async support, the `*_async` counterparts). This class layers the retry
    loop, backoff computation and client telemetry on top of those
    primitives. If an `async_fallback_client` is supplied, the default
    `*_async` methods delegate to it.
    """

    name: ClassVar[str]

    # Proxy configuration keyed by URL scheme.
    class _Proxy(TypedDict):
        http: Optional[str]
        https: Optional[str]

    # Upper bound (seconds) for the exponential backoff delay.
    MAX_DELAY = 5
    # Backoff delay (seconds) before the first retry; also the minimum sleep.
    INITIAL_DELAY = 0.5
    # Largest `retry-after` value (seconds) that will be honoured.
    MAX_RETRY_AFTER = 60
    _proxy: Optional[_Proxy]
    _verify_ssl_certs: bool

    def __init__(
        self,
        verify_ssl_certs: bool = True,
        proxy: Optional[Union[str, _Proxy]] = None,
        async_fallback_client: Optional["HTTPClient"] = None,
    ):
        """
        :param verify_ssl_certs: stored for subclasses to decide whether to
            verify TLS certificates.
        :param proxy: either a single proxy URL (used for both http and
            https) or a dict with "http" and/or "https" keys.
        :param async_fallback_client: client that the default `*_async`
            methods delegate to when set.
        :raises ValueError: if `proxy` is truthy but neither a str nor a dict.
        """
        self._verify_ssl_certs = verify_ssl_certs
        if proxy:
            if isinstance(proxy, str):
                proxy = {"http": proxy, "https": proxy}
            if not isinstance(proxy, dict):  # pyright: ignore[reportUnnecessaryIsInstance]
                raise ValueError(
                    "Proxy(ies) must be specified as either a string "
                    "URL or a dict() with string URL under the"
                    " "
                    "https"
                    " and/or "
                    "http"
                    " keys."
                )
        # Copy so later mutation of the caller's dict does not affect us.
        self._proxy = proxy.copy() if proxy else None
        self._async_fallback_client = async_fallback_client

        # Holds per-thread state such as the last request metrics.
        self._thread_local = threading.local()

    def _should_retry(
        self,
        response: Optional[Tuple[Any, int, Optional[Mapping[str, str]]]],
        api_connection_error: Optional[APIConnectionError],
        num_retries: int,
        max_network_retries: Optional[int],
    ):
        """
        Decide whether the request that just finished should be retried.

        :param response: `(content, status_code, headers)` or None when the
            request raised an `APIConnectionError`.
        :param api_connection_error: the raised error when `response` is None.
        :param num_retries: number of retries already performed.
        :param max_network_retries: retry budget; None is treated as 0.
        :returns: True if another attempt should be made.
        """
        max_network_retries = (
            max_network_retries if max_network_retries is not None else 0
        )
        if num_retries >= max_network_retries:
            return False

        if response is None:
            # We generally want to retry on timeout and connection
            # exceptions, but defer this decision to underlying subclass
            # implementations. They should evaluate the driver-specific
            # errors worthy of retries, and set flag on the error returned.
            assert api_connection_error is not None
            return api_connection_error.should_retry

        _, status_code, rheaders = response

        # The API may ask us not to retry (eg; if doing so would be a no-op)
        # or advise us to retry (eg; in cases of lock timeouts); we defer to that.
        #
        # Note that we expect the headers object to be a CaseInsensitiveDict, as is the case with the requests library.
        if rheaders is not None and "stripe-should-retry" in rheaders:
            if rheaders["stripe-should-retry"] == "false":
                return False
            if rheaders["stripe-should-retry"] == "true":
                return True

        # Retry on conflict errors.
        if status_code == 409:
            return True

        # Retry on 500, 503, and other internal errors.
        #
        # Note that we expect the stripe-should-retry header to be false
        # in most cases when a 500 is returned, since our idempotency framework
        # would typically replay it anyway.
        if status_code >= 500:
            return True

        return False

    def _retry_after_header(
        self,
        response: Optional[Tuple[Any, Any, Optional[Mapping[str, str]]]] = None,
    ):
        """
        Return the `retry-after` response header as an int, or None.

        None is returned when there is no response, the response carries no
        headers, the header is absent, or its value cannot be converted to
        an int.
        """
        if response is None:
            return None
        _, _, rheaders = response

        # `_should_retry` accepts responses whose headers are None, so the
        # same tuple can reach this method; treat that as "no header".
        if rheaders is None:
            return None

        try:
            return int(rheaders["retry-after"])
        except (KeyError, ValueError, TypeError):
            return None

    def _sleep_time_seconds(
        self,
        num_retries: int,
        response: Optional[Tuple[Any, Any, Optional[Mapping[str, str]]]] = None,
    ) -> float:
        """
        Compute how long to sleep before retry number `num_retries`.

        Exponential backoff starting at `INITIAL_DELAY` and capped at
        `MAX_DELAY`, with jitter applied, never below `INITIAL_DELAY`, and
        never below the response's `retry-after` value as long as that value
        does not exceed `MAX_RETRY_AFTER`.
        """
        sleep_seconds = min(
            HTTPClient.INITIAL_DELAY * (2 ** (num_retries - 1)),
            HTTPClient.MAX_DELAY,
        )

        sleep_seconds = self._add_jitter_time(sleep_seconds)

        # But never sleep less than the base sleep seconds.
        sleep_seconds = max(HTTPClient.INITIAL_DELAY, sleep_seconds)

        # And never sleep less than the time the API asks us to wait, assuming it's a reasonable ask.
        retry_after = self._retry_after_header(response) or 0
        if retry_after <= HTTPClient.MAX_RETRY_AFTER:
            sleep_seconds = max(retry_after, sleep_seconds)

        return sleep_seconds

    def _add_jitter_time(self, sleep_seconds: float) -> float:
        """
        Randomize the value in `[(sleep_seconds/ 2) to (sleep_seconds)]`.
        Also separated method here to isolate randomness for tests
        """
        sleep_seconds *= 0.5 * (1 + random.uniform(0, 1))
        return sleep_seconds

    def _add_telemetry_header(
        self, headers: Mapping[str, str]
    ) -> Mapping[str, str]:
        """
        Return `headers` with an `X-Stripe-Client-Telemetry` entry added.

        The entry is only added when `stripe.enable_telemetry` is truthy and
        this thread has metrics recorded from a previous request; in that
        case a new dict is returned and the input is left untouched.
        Otherwise the input mapping is returned as-is.
        """
        last_request_metrics = getattr(
            self._thread_local, "last_request_metrics", None
        )
        if stripe.enable_telemetry and last_request_metrics:
            telemetry = {
                "last_request_metrics": last_request_metrics.payload()
            }
            ret = dict(headers)
            ret["X-Stripe-Client-Telemetry"] = json.dumps(telemetry)
            return ret
        return headers

    def _record_request_metrics(self, response, request_start, usage):
        """
        Store metrics for a completed request in thread-local state.

        :param response: `(content, status_code, headers)` tuple.
        :param request_start: start time in milliseconds, as from `_now_ms()`.
        :param usage: forwarded to `RequestMetrics` as `usage`.

        Nothing is recorded when the response has no headers, when the
        `Request-Id` header is missing, or when telemetry is disabled.
        """
        _, _, rheaders = response
        # Headers may legitimately be None; there is nothing to record then.
        if rheaders is None:
            return
        # NOTE(review): this lookup is case-sensitive unless the headers
        # object is case-insensitive - confirm for every client subclass.
        if "Request-Id" in rheaders and stripe.enable_telemetry:
            request_id = rheaders["Request-Id"]
            request_duration_ms = _now_ms() - request_start
            self._thread_local.last_request_metrics = RequestMetrics(
                request_id, request_duration_ms, usage=usage
            )

    def request_with_retries(
        self,
        method: str,
        url: str,
        headers: Mapping[str, str],
        post_data: Any = None,
        max_network_retries: Optional[int] = None,
        *,
        _usage: Optional[List[str]] = None,
    ) -> Tuple[str, int, Mapping[str, str]]:
        """Perform a non-streaming request via `request`, retrying as needed."""
        return self._request_with_retries_internal(
            method,
            url,
            headers,
            post_data,
            is_streaming=False,
            max_network_retries=max_network_retries,
            _usage=_usage,
        )

    def request_stream_with_retries(
        self,
        method: str,
        url: str,
        headers: Mapping[str, str],
        post_data=None,
        max_network_retries=None,
        *,
        _usage: Optional[List[str]] = None,
    ) -> Tuple[Any, int, Mapping[str, str]]:
        """Perform a streaming request via `request_stream`, retrying as needed."""
        return self._request_with_retries_internal(
            method,
            url,
            headers,
            post_data,
            is_streaming=True,
            max_network_retries=max_network_retries,
            _usage=_usage,
        )

    def _request_with_retries_internal(
        self,
        method: str,
        url: str,
        headers: Mapping[str, str],
        post_data: Any,
        is_streaming: bool,
        max_network_retries: Optional[int],
        *,
        _usage: Optional[List[str]] = None,
    ) -> Tuple[Any, int, Mapping[str, str]]:
        """
        Shared synchronous retry loop.

        Issues the request, and while `_should_retry` approves, sleeps for
        `_sleep_time_seconds` and tries again. On the final attempt the
        response is returned (after recording request metrics) or the
        captured `APIConnectionError` is re-raised.
        """
        # The telemetry header is computed once and reused for every attempt.
        headers = self._add_telemetry_header(headers)

        num_retries = 0

        while True:
            request_start = _now_ms()

            try:
                if is_streaming:
                    response = self.request_stream(
                        method, url, headers, post_data
                    )
                else:
                    response = self.request(method, url, headers, post_data)
                connection_error = None
            except APIConnectionError as e:
                connection_error = e
                response = None

            if self._should_retry(
                response, connection_error, num_retries, max_network_retries
            ):
                if connection_error:
                    _util.log_info(
                        "Encountered a retryable error %s"
                        % connection_error.user_message
                    )
                num_retries += 1
                sleep_time = self._sleep_time_seconds(num_retries, response)
                _util.log_info(
                    (
                        "Initiating retry %i for request %s %s after "
                        "sleeping %.2f seconds."
                        % (num_retries, method, url, sleep_time)
                    )
                )
                time.sleep(sleep_time)
            else:
                if response is not None:
                    self._record_request_metrics(
                        response, request_start, usage=_usage
                    )

                    return response
                else:
                    assert connection_error is not None
                    raise connection_error

    def request(
        self,
        method: str,
        url: str,
        headers: Optional[Mapping[str, str]],
        post_data: Any = None,
        *,
        _usage: Optional[List[str]] = None,
    ) -> Tuple[str, int, Mapping[str, str]]:
        """Perform one request; must be implemented by subclasses."""
        raise NotImplementedError(
            "HTTPClient subclasses must implement `request`"
        )

    def request_stream(
        self,
        method: str,
        url: str,
        headers: Optional[Mapping[str, str]],
        post_data: Any = None,
        *,
        _usage: Optional[List[str]] = None,
    ) -> Tuple[Any, int, Mapping[str, str]]:
        """Perform one streaming request; must be implemented by subclasses."""
        raise NotImplementedError(
            "HTTPClient subclasses must implement `request_stream`"
        )

    def close(self):
        """Release the client's resources; must be implemented by subclasses."""
        raise NotImplementedError(
            "HTTPClient subclasses must implement `close`"
        )

    async def request_with_retries_async(
        self,
        method: str,
        url: str,
        headers: Mapping[str, str],
        post_data=None,
        max_network_retries: Optional[int] = None,
        *,
        _usage: Optional[List[str]] = None,
    ) -> Tuple[Any, int, Any]:
        """Async variant of `request_with_retries`, built on `request_async`."""
        return await self._request_with_retries_internal_async(
            method,
            url,
            headers,
            post_data,
            is_streaming=False,
            max_network_retries=max_network_retries,
            _usage=_usage,
        )

    async def request_stream_with_retries_async(
        self,
        method: str,
        url: str,
        headers: Mapping[str, str],
        post_data=None,
        max_network_retries=None,
        *,
        _usage: Optional[List[str]] = None,
    ) -> Tuple[AsyncIterable[bytes], int, Any]:
        """Async variant of `request_stream_with_retries`, built on `request_stream_async`."""
        return await self._request_with_retries_internal_async(
            method,
            url,
            headers,
            post_data,
            is_streaming=True,
            max_network_retries=max_network_retries,
            _usage=_usage,
        )

    @overload
    async def _request_with_retries_internal_async(
        self,
        method: str,
        url: str,
        headers: Mapping[str, str],
        post_data,
        is_streaming: Literal[False],
        max_network_retries: Optional[int],
        *,
        _usage: Optional[List[str]] = None,
    ) -> Tuple[Any, int, Mapping[str, str]]: ...

    @overload
    async def _request_with_retries_internal_async(
        self,
        method: str,
        url: str,
        headers: Mapping[str, str],
        post_data,
        is_streaming: Literal[True],
        max_network_retries: Optional[int],
        *,
        _usage: Optional[List[str]] = None,
    ) -> Tuple[AsyncIterable[bytes], int, Mapping[str, str]]: ...

    async def _request_with_retries_internal_async(
        self,
        method: str,
        url: str,
        headers: Mapping[str, str],
        post_data,
        is_streaming: bool,
        max_network_retries: Optional[int],
        *,
        _usage: Optional[List[str]] = None,
    ) -> Tuple[Any, int, Mapping[str, str]]:
        """
        Shared asynchronous retry loop.

        Mirrors `_request_with_retries_internal`, but awaits
        `request_async` / `request_stream_async` and waits between attempts
        with `sleep_async` instead of `time.sleep`.
        """
        # The telemetry header is computed once and reused for every attempt.
        headers = self._add_telemetry_header(headers)

        num_retries = 0

        while True:
            request_start = _now_ms()

            try:
                if is_streaming:
                    response = await self.request_stream_async(
                        method, url, headers, post_data
                    )
                else:
                    response = await self.request_async(
                        method, url, headers, post_data
                    )
                connection_error = None
            except APIConnectionError as e:
                connection_error = e
                response = None

            if self._should_retry(
                response, connection_error, num_retries, max_network_retries
            ):
                if connection_error:
                    _util.log_info(
                        "Encountered a retryable error %s"
                        % connection_error.user_message
                    )
                num_retries += 1
                sleep_time = self._sleep_time_seconds(num_retries, response)
                _util.log_info(
                    (
                        "Initiating retry %i for request %s %s after "
                        "sleeping %.2f seconds."
                        % (num_retries, method, url, sleep_time)
                    )
                )
                await self.sleep_async(sleep_time)
            else:
                if response is not None:
                    self._record_request_metrics(
                        response, request_start, usage=_usage
                    )

                    return response
                else:
                    assert connection_error is not None
                    raise connection_error

    async def request_async(
        self, method: str, url: str, headers: Mapping[str, str], post_data=None
    ) -> Tuple[bytes, int, Mapping[str, str]]:
        """
        Perform one async request.

        Delegates to the async fallback client when one was configured;
        otherwise subclasses must override this method.
        """
        if self._async_fallback_client is not None:
            return await self._async_fallback_client.request_async(
                method, url, headers, post_data
            )
        raise NotImplementedError(
            "HTTPClient subclasses must implement `request_async`"
        )

    async def request_stream_async(
        self, method: str, url: str, headers: Mapping[str, str], post_data=None
    ) -> Tuple[AsyncIterable[bytes], int, Mapping[str, str]]:
        """
        Perform one async streaming request.

        Delegates to the async fallback client when one was configured;
        otherwise subclasses must override this method.
        """
        if self._async_fallback_client is not None:
            return await self._async_fallback_client.request_stream_async(
                method, url, headers, post_data
            )
        raise NotImplementedError(
            "HTTPClient subclasses must implement `request_stream_async`"
        )

    async def close_async(self):
        """
        Asynchronously release the client's resources.

        Delegates to the async fallback client when one was configured;
        otherwise subclasses must override this method.
        """
        if self._async_fallback_client is not None:
            return await self._async_fallback_client.close_async()
        raise NotImplementedError(
            "HTTPClient subclasses must implement `close_async`"
        )

    def sleep_async(self, secs: float) -> Awaitable[None]:
        """
        Return an awaitable that completes after `secs` seconds.

        Delegates to the async fallback client when one was configured;
        otherwise subclasses must override this method.
        """
        if self._async_fallback_client is not None:
            return self._async_fallback_client.sleep_async(secs)
        # Name the method that is actually missing (`sleep_async`, not `sleep`).
        raise NotImplementedError(
            "HTTPClient subclasses must implement `sleep_async`"
        )


class RequestsClient(HTTPClient):
    """
    HTTP client implemented on top of the `requests` library.

    A `requests` session is kept per thread: either the session passed to the
    constructor or a freshly created `requests.Session()`.
    """

    name = "requests"

    def __init__(
        self,
        timeout: int = 80,
        session: Optional["RequestsSession"] = None,
        verify_ssl_certs: bool = True,
        proxy: Optional[Union[str, HTTPClient._Proxy]] = None,
        async_fallback_client: Optional[HTTPClient] = None,
        **kwargs,
    ):
        """
        :param timeout: passed as `timeout` to every session request.
        :param session: optional pre-built session to use instead of creating one.
        :param verify_ssl_certs: forwarded to `HTTPClient.__init__`.
        :param proxy: forwarded to `HTTPClient.__init__`.
        :param async_fallback_client: forwarded to `HTTPClient.__init__`.
        :param kwargs: accepted but not used by this constructor.
        """
        super().__init__(
            verify_ssl_certs=verify_ssl_certs,
            proxy=proxy,
            async_fallback_client=async_fallback_client,
        )
        self._session = session
        self._timeout = timeout

        assert requests is not None
        self.requests = requests

    def request(
        self,
        method: str,
        url: str,
        headers: Optional[Mapping[str, str]],
        post_data=None,
    ) -> Tuple[bytes, int, Mapping[str, str]]:
        """Perform a request and return `(body bytes, status code, headers)`."""
        return self._request_internal(
            method, url, headers, post_data, is_streaming=False
        )

    def request_stream(
        self,
        method: str,
        url: str,
        headers: Optional[Mapping[str, str]],
        post_data=None,
    ) -> Tuple[Any, int, Mapping[str, str]]:
        """Perform a request and return `(raw stream, status code, headers)`."""
        return self._request_internal(
            method, url, headers, post_data, is_streaming=True
        )

    @overload
    def _request_internal(
        self,
        method: str,
        url: str,
        headers: Optional[Mapping[str, str]],
        post_data,
        is_streaming: Literal[True],
    ) -> Tuple[Any, int, Mapping[str, str]]: ...

    @overload
    def _request_internal(
        self,
        method: str,
        url: str,
        headers: Optional[Mapping[str, str]],
        post_data,
        is_streaming: Literal[False],
    ) -> Tuple[bytes, int, Mapping[str, str]]: ...

    def _request_internal(
        self,
        method: str,
        url: str,
        headers: Optional[Mapping[str, str]],
        post_data,
        is_streaming: bool,
    ) -> Tuple[Union[bytes, Any], int, Mapping[str, str]]:
        """
        Issue the request through this thread's session.

        Any exception raised while sending the request or reading the body is
        converted to `APIConnectionError` by `_handle_request_error`.
        """
        # Either verify against Stripe's CA bundle or disable verification.
        request_options: Dict[str, Any] = {
            "verify": (
                stripe.ca_bundle_path if self._verify_ssl_certs else False
            )
        }
        if self._proxy:
            request_options["proxies"] = self._proxy
        if is_streaming:
            request_options["stream"] = True

        # Lazily bind a session to the current thread.
        session = getattr(self._thread_local, "session", None)
        if session is None:
            session = self._session or self.requests.Session()
            self._thread_local.session = session

        try:
            try:
                result = cast("RequestsSession", session).request(
                    method,
                    url,
                    headers=headers,
                    data=post_data,
                    timeout=self._timeout,
                    **request_options,
                )
            except TypeError as e:
                raise TypeError(
                    "Warning: It looks like your installed version of the "
                    '"requests" library is not compatible with Stripe\'s '
                    "usage thereof. (HINT: The most likely cause is that "
                    'your "requests" library is out of date. You can fix '
                    'that by running "pip install -U requests".) The '
                    "underlying error was: %s" % (e,)
                )

            # Accessing `.content` causes the body to actually be read, which
            # could itself fail (e.g. with a socket timeout); the streaming
            # path hands back the raw stream instead.
            content = result.raw if is_streaming else result.content
            status_code = result.status_code
        except Exception as e:
            # Would catch just requests.exceptions.RequestException, but can
            # also raise ValueError, RuntimeError, etc.
            self._handle_request_error(e)

        return content, status_code, result.headers

    def _handle_request_error(self, e: Exception) -> NoReturn:
        """
        Translate `e` into an `APIConnectionError` and raise it.

        Only timeouts and connection errors (other than SSL errors) are
        marked as retryable.
        """
        requests_errors = self.requests.exceptions
        retryable_types = (
            requests_errors.Timeout,
            requests_errors.ConnectionError,
        )

        # SSLError is checked first: it is a ConnectionError, but we do not
        # want to retry it.
        if isinstance(e, requests_errors.SSLError):
            should_retry = False
            msg = (
                "Could not verify Stripe's SSL certificate.  Please make "
                "sure that your network is not intercepting certificates.  "
                "If this problem persists, let us know at "
                "support@stripe.com."
            )
            err = "%s: %s" % (type(e).__name__, str(e))
        elif isinstance(e, retryable_types):
            # Retry only timeout and connect errors; similar to urllib3 Retry
            should_retry = True
            msg = (
                "Unexpected error communicating with Stripe.  "
                "If this problem persists, let us know at "
                "support@stripe.com."
            )
            err = "%s: %s" % (type(e).__name__, str(e))
        elif isinstance(e, requests_errors.RequestException):
            # Any other error raised by the requests library.
            should_retry = False
            msg = (
                "Unexpected error communicating with Stripe.  "
                "If this problem persists, let us know at "
                "support@stripe.com."
            )
            err = "%s: %s" % (type(e).__name__, str(e))
        else:
            # Not a requests exception at all.
            should_retry = False
            msg = (
                "Unexpected error communicating with Stripe. "
                "It looks like there's probably a configuration "
                "issue locally.  If this problem persists, let us "
                "know at support@stripe.com."
            )
            detail = str(e)
            if detail:
                suffix = " with error message %s" % (detail,)
            else:
                suffix = " with no error message"
            err = ("A %s was raised" % (type(e).__name__,)) + suffix

        msg = textwrap.fill(msg) + "\n\n(Network error: %s)" % (err,)
        raise APIConnectionError(msg, should_retry=should_retry) from e

    def close(self):
        """Close the session bound to the current thread, if there is one."""
        session = getattr(self._thread_local, "session", None)
        if session is not None:
            session.close()


class UrlFetchClient(HTTPClient):
    """
    HTTP client implemented on top of Google App Engine's `urlfetch`.

    Proxies are not supported; passing one raises `ValueError`.
    """

    name = "urlfetch"

    def __init__(
        self,
        verify_ssl_certs: bool = True,
        proxy: Optional[HTTPClient._Proxy] = None,
        deadline: int = 55,
        async_fallback_client: Optional[HTTPClient] = None,
    ):
        """
        :param verify_ssl_certs: passed to urlfetch as `validate_certificate`.
        :param proxy: must be falsy; urlfetch has no proxy support.
        :param deadline: passed to urlfetch as `deadline` on every fetch.
        :param async_fallback_client: forwarded to `HTTPClient.__init__`.
        :raises ValueError: if a proxy is supplied.
        """
        super(UrlFetchClient, self).__init__(
            verify_ssl_certs=verify_ssl_certs,
            proxy=proxy,
            async_fallback_client=async_fallback_client,
        )

        # no proxy support in urlfetch. for a patch, see:
        # https://code.google.com/p/googleappengine/issues/detail?id=544
        if proxy:
            raise ValueError(
                "No proxy support in urlfetch library. "
                "Set stripe.default_http_client to either RequestsClient, "
                "PycurlClient, or Urllib2Client instance to use a proxy."
            )

        self._verify_ssl_certs = verify_ssl_certs
        # GAE requests time out after 60 seconds, so make sure to default
        # to 55 seconds to allow for a slow Stripe
        self._deadline = deadline

        assert urlfetch is not None
        self.urlfetch = urlfetch

    def request(
        self, method: str, url: str, headers: Mapping[str, str], post_data=None
    ) -> Tuple[str, int, Mapping[str, str]]:
        """Perform a request and return `(content, status code, headers)`."""
        return self._request_internal(
            method, url, headers, post_data, is_streaming=False
        )

    def request_stream(
        self, method: str, url: str, headers: Mapping[str, str], post_data=None
    ) -> Tuple[BytesIO, int, Mapping[str, str]]:
        """
        Perform a request and return `(BytesIO, status code, headers)`.

        The body is fetched in full and then wrapped in a `BytesIO`.
        """
        return self._request_internal(
            method, url, headers, post_data, is_streaming=True
        )

    @overload
    def _request_internal(
        self,
        method: str,
        url: str,
        headers: Mapping[str, str],
        post_data,
        is_streaming: Literal[True],
    ) -> Tuple[BytesIO, int, Any]: ...

    @overload
    def _request_internal(
        self,
        method: str,
        url: str,
        headers: Mapping[str, str],
        post_data,
        is_streaming: Literal[False],
    ) -> Tuple[str, int, Any]: ...

    def _request_internal(
        self,
        method: str,
        url: str,
        headers: Mapping[str, str],
        post_data,
        is_streaming,
    ):
        """
        Fetch `url` through urlfetch.

        `urlfetch.Error` exceptions are converted to `APIConnectionError` by
        `_handle_request_error`.
        """
        try:
            result = self.urlfetch.fetch(
                url=url,
                method=method,
                headers=headers,
                # Google App Engine doesn't let us specify our own cert bundle.
                # However, that's ok because the CA bundle they use recognizes
                # api.stripe.com.
                validate_certificate=self._verify_ssl_certs,
                deadline=self._deadline,
                payload=post_data,
            )
        except self.urlfetch.Error as e:
            self._handle_request_error(e, url)

        if is_streaming:
            # This doesn't really stream.
            body = result.content
            # `str.encode` only accepts `str`; a body that is already bytes
            # must be wrapped as-is instead of raising TypeError.
            if isinstance(body, str):
                body = body.encode()
            content = BytesIO(body)
        else:
            content = result.content

        return content, result.status_code, result.headers

    def _handle_request_error(self, e: Exception, url: str) -> NoReturn:
        """
        Translate a urlfetch error into an `APIConnectionError` and raise it.

        :param e: the exception raised by `urlfetch.fetch`.
        :param url: the URL that was being fetched (used in the message for
            invalid-URL errors).
        """
        if isinstance(e, self.urlfetch.InvalidURLError):
            msg = (
                "The Stripe library attempted to fetch an "
                "invalid URL (%r). This is likely due to a bug "
                "in the Stripe Python bindings. Please let us know "
                "at support@stripe.com." % (url,)
            )
        elif isinstance(e, self.urlfetch.DownloadError):
            msg = "There was a problem retrieving data from Stripe."
        elif isinstance(e, self.urlfetch.ResponseTooLargeError):
            msg = (
                "There was a problem receiving all of your data from "
                "Stripe.  This is likely due to a bug in Stripe. "
                "Please let us know at support@stripe.com."
            )
        else:
            msg = (
                "Unexpected error communicating with Stripe. If this "
                "problem persists, let us know at support@stripe.com."
            )

        msg = textwrap.fill(msg) + "\n\n(Network error: " + str(e) + ")"
        raise APIConnectionError(msg) from e

    def close(self):
        """No-op: this client keeps no resources that need closing."""
        pass


# Proxy mapping keyed by URL scheme ("http" / "https"); each value is a
# `urllib.parse.ParseResult` or None.
_Proxy = TypedDict(
    "_Proxy",
    {
        "http": Optional[ParseResult],
        "https": Optional[ParseResult],
    },
)


class PycurlClient(HTTPClient):
    """Synchronous HTTP client that sends requests through pycurl.

    A single ``pycurl.Curl`` handle is created per client and reused for
    every request so that established connections can be kept alive.
    """

    class _ParsedProxy(TypedDict, total=False):
        http: Optional[ParseResult]
        https: Optional[ParseResult]

    name = "pycurl"
    _parsed_proxy: Optional[_ParsedProxy]

    def __init__(
        self,
        verify_ssl_certs: bool = True,
        proxy: Optional[HTTPClient._Proxy] = None,
        async_fallback_client: Optional[HTTPClient] = None,
    ):
        """Create the curl handle and pre-parse the configured proxy URLs.

        All three arguments are forwarded unchanged to ``HTTPClient``.
        """
        super(PycurlClient, self).__init__(
            verify_ssl_certs=verify_ssl_certs,
            proxy=proxy,
            async_fallback_client=async_fallback_client,
        )

        assert pycurl is not None
        self.pycurl = pycurl
        # Initialize this within the object so that we can reuse connections.
        self._curl = pycurl.Curl()

        self._parsed_proxy = {}
        # need to urlparse the proxy, since PyCurl
        # consumes the proxy url in small pieces
        if self._proxy:
            proxy_ = self._proxy
            for scheme, value in proxy_.items():
                # In general, TypedDict.items() gives you (key: str, value: object)
                # but we know value to be a string because all the value types on Proxy_ are strings.
                self._parsed_proxy[scheme] = urlparse(cast(str, value))

    def parse_headers(self, data):
        """Parse a raw response head into a dict keyed by lower-cased name.

        Everything up to and including the first CRLF (the status line) is
        discarded; the remainder is parsed as RFC 822 style headers. Returns
        an empty dict when ``data`` contains no CRLF at all.
        """
        if "\r\n" not in data:
            return {}
        raw_headers = data.split("\r\n", 1)[1]
        headers = email.message_from_string(raw_headers)
        return dict((k.lower(), v) for k, v in dict(headers).items())

    def request(
        self, method, url, headers: Mapping[str, str], post_data=None
    ) -> Tuple[str, int, Mapping[str, str]]:
        """Perform a request and return ``(body_text, status, headers)``."""
        return self._request_internal(
            method, url, headers, post_data, is_streaming=False
        )

    def request_stream(
        self, method, url, headers: Mapping[str, str], post_data=None
    ) -> Tuple[BytesIO, int, Mapping[str, str]]:
        """Perform a request and return ``(body_buffer, status, headers)``.

        The body is fully received into an in-memory buffer that is rewound
        to its start before being returned.
        """
        return self._request_internal(
            method, url, headers, post_data, is_streaming=True
        )

    @overload
    def _request_internal(
        self,
        method: str,
        url: str,
        headers: Mapping[str, str],
        post_data,
        is_streaming: Literal[True],
    ) -> Tuple[BytesIO, int, Any]: ...

    @overload
    def _request_internal(
        self,
        method: str,
        url: str,
        headers: Mapping[str, str],
        post_data,
        is_streaming: Literal[False],
    ) -> Tuple[str, int, Mapping[str, str]]: ...

    def _request_internal(
        self,
        method: str,
        url: str,
        headers: Mapping[str, str],
        post_data,
        is_streaming,
    ) -> Tuple[Union[str, BytesIO], int, Mapping[str, str]]:
        """Configure the shared curl handle, run the transfer, collect output.

        Returns ``(content, status_code, headers)`` where ``content`` is the
        rewound body buffer when ``is_streaming`` is true and the body
        decoded as UTF-8 otherwise. ``headers`` is the result of
        ``parse_headers`` on the received response head.

        Raises ``APIConnectionError`` (via ``_handle_request_error``) when
        pycurl reports a transfer error.
        """
        b = _util.io.BytesIO()
        rheaders = _util.io.BytesIO()

        # Pycurl's design is a little weird: although we set per-request
        # options on this object, it's also capable of maintaining established
        # connections. Here we call reset() between uses to make sure it's in a
        # pristine state, but notably reset() doesn't reset connections, so we
        # still get to take advantage of those by virtue of re-using the same
        # object.
        self._curl.reset()

        proxy = self._get_proxy(url)
        if proxy:
            if proxy.hostname:
                self._curl.setopt(self.pycurl.PROXY, proxy.hostname)
            if proxy.port:
                self._curl.setopt(self.pycurl.PROXYPORT, proxy.port)
            if proxy.username or proxy.password:
                # Either part may be absent from the proxy URL; substitute an
                # empty string so the literal text "None" is never sent as a
                # credential.
                self._curl.setopt(
                    self.pycurl.PROXYUSERPWD,
                    "%s:%s" % (proxy.username or "", proxy.password or ""),
                )

        if method == "get":
            self._curl.setopt(self.pycurl.HTTPGET, 1)
        elif method == "post":
            self._curl.setopt(self.pycurl.POST, 1)
            self._curl.setopt(self.pycurl.POSTFIELDS, post_data)
        else:
            self._curl.setopt(self.pycurl.CUSTOMREQUEST, method.upper())

        # pycurl doesn't like unicode URLs
        self._curl.setopt(self.pycurl.URL, url)

        # Body and header bytes are accumulated in the two in-memory buffers.
        self._curl.setopt(self.pycurl.WRITEFUNCTION, b.write)
        self._curl.setopt(self.pycurl.HEADERFUNCTION, rheaders.write)
        self._curl.setopt(self.pycurl.NOSIGNAL, 1)
        self._curl.setopt(self.pycurl.CONNECTTIMEOUT, 30)
        self._curl.setopt(self.pycurl.TIMEOUT, 80)
        self._curl.setopt(
            self.pycurl.HTTPHEADER,
            ["%s: %s" % (k, v) for k, v in dict(headers).items()],
        )
        if self._verify_ssl_certs:
            self._curl.setopt(self.pycurl.CAINFO, stripe.ca_bundle_path)
        else:
            # libcurl checks the certificate chain (VERIFYPEER) and the host
            # name (VERIFYHOST) independently, so both have to be switched
            # off for verification to really be disabled.
            self._curl.setopt(self.pycurl.SSL_VERIFYPEER, False)
            self._curl.setopt(self.pycurl.SSL_VERIFYHOST, False)

        try:
            self._curl.perform()
        except self.pycurl.error as e:
            self._handle_request_error(e)

        if is_streaming:
            b.seek(0)
            rcontent = b
        else:
            rcontent = b.getvalue().decode("utf-8")

        rcode = self._curl.getinfo(self.pycurl.RESPONSE_CODE)
        headers = self.parse_headers(rheaders.getvalue().decode("utf-8"))

        return rcontent, rcode, headers

    def _handle_request_error(self, e: Exception) -> NoReturn:
        """Translate a pycurl error into an ``APIConnectionError``.

        ``e.args[0]`` is compared against pycurl error codes to choose the
        message: connection, DNS and timeout failures are flagged as
        retryable; certificate failures and everything else are not.
        ``e.args[1]`` is appended to the message. Never returns.
        """
        if e.args[0] in [
            self.pycurl.E_COULDNT_CONNECT,
            self.pycurl.E_COULDNT_RESOLVE_HOST,
            self.pycurl.E_OPERATION_TIMEOUTED,
        ]:
            msg = (
                "Could not connect to Stripe.  Please check your "
                "internet connection and try again.  If this problem "
                "persists, you should check Stripe's service status at "
                "https://twitter.com/stripestatus, or let us know at "
                "support@stripe.com."
            )
            should_retry = True
        elif e.args[0] in [
            self.pycurl.E_SSL_CACERT,
            self.pycurl.E_SSL_PEER_CERTIFICATE,
        ]:
            msg = (
                "Could not verify Stripe's SSL certificate.  Please make "
                "sure that your network is not intercepting certificates.  "
                "If this problem persists, let us know at "
                "support@stripe.com."
            )
            should_retry = False
        else:
            msg = (
                "Unexpected error communicating with Stripe. If this "
                "problem persists, let us know at support@stripe.com."
            )
            should_retry = False

        msg = textwrap.fill(msg) + "\n\n(Network error: " + e.args[1] + ")"
        raise APIConnectionError(msg, should_retry=should_retry) from e

    def _get_proxy(self, url) -> Optional[ParseResult]:
        """Return the parsed proxy to use for ``url``, or ``None``.

        The scheme is the text before the first ``:`` in ``url``. If no
        proxy is registered under that exact scheme, the scheme with its
        last character removed is tried (so ``https`` falls back to
        ``http``).
        """
        if self._parsed_proxy:
            proxy = self._parsed_proxy
            scheme = url.split(":")[0] if url else None
            if scheme:
                return proxy.get(scheme, proxy.get(scheme[0:-1]))
        return None

    def close(self):
        """Do nothing; the curl handle is left open."""
        pass


class Urllib2Client(HTTPClient):
    """Synchronous HTTP client built on the standard library's urllib."""

    name = "urllib.request"

    def __init__(
        self,
        verify_ssl_certs: bool = True,
        proxy: Optional[HTTPClient._Proxy] = None,
        async_fallback_client: Optional[HTTPClient] = None,
    ):
        """Forward the arguments to ``HTTPClient`` and cache a proxy opener."""
        super().__init__(
            verify_ssl_certs=verify_ssl_certs,
            proxy=proxy,
            async_fallback_client=async_fallback_client,
        )
        # Build the proxy-aware opener once so every request can share it;
        # without a proxy the module-level urlopen() is used instead.
        if self._proxy:
            # The cast is needed because pyright cannot tell that every
            # value in _Proxy is a str.
            handler = urllibrequest.ProxyHandler(
                cast(Dict[str, str], self._proxy)
            )
            self._opener = urllibrequest.build_opener(handler)
        else:
            self._opener = None

    def request(
        self, method: str, url: str, headers: Mapping[str, str], post_data=None
    ) -> Tuple[str, int, Mapping[str, str]]:
        """Perform a request and return ``(body, status, headers)``."""
        return self._request_internal(
            method, url, headers, post_data, is_streaming=False
        )

    def request_stream(
        self, method: str, url: str, headers: Mapping[str, str], post_data=None
    ) -> Tuple[HTTPResponse, int, Mapping[str, str]]:
        """Perform a request and return ``(response, status, headers)``.

        On success the unread response object itself is returned as the
        first element.
        """
        return self._request_internal(
            method, url, headers, post_data, is_streaming=True
        )

    @overload
    def _request_internal(
        self,
        method: str,
        url: str,
        headers: Mapping[str, str],
        post_data,
        is_streaming: Literal[False],
    ) -> Tuple[str, int, Any]: ...

    @overload
    def _request_internal(
        self,
        method: str,
        url: str,
        headers: Mapping[str, str],
        post_data,
        is_streaming: Literal[True],
    ) -> Tuple[HTTPResponse, int, Any]: ...

    def _request_internal(
        self,
        method: str,
        url: str,
        headers: Mapping[str, str],
        post_data,
        is_streaming,
    ):
        """Send one request through urllib.

        Returns ``(content, status_code, headers)`` with header names
        lower-cased. An ``HTTPError`` is not treated as a failure: its
        code, body and headers are returned like a normal response.
        ``URLError`` and ``ValueError`` are converted by
        ``_handle_request_error``.
        """
        # String payloads are sent as UTF-8 bytes; anything else is passed
        # through untouched.
        body = (
            post_data.encode("utf-8")
            if isinstance(post_data, str)
            else post_data
        )
        req = urllibrequest.Request(
            url, body, cast(MutableMapping[str, str], headers)
        )

        # Override the verb urllib would otherwise infer for anything other
        # than get/post.
        if method != "get" and method != "post":
            req.get_method = lambda: method.upper()

        try:
            # Prefer the cached proxy opener when one was configured.
            opener = self._opener
            response = (
                opener.open(req) if opener else urllibrequest.urlopen(req)
            )
            rcontent = response if is_streaming else response.read()
            rcode = response.code
            raw_headers = dict(response.info())
        except urlliberror.HTTPError as http_err:
            rcode = http_err.code
            rcontent = http_err.read()
            raw_headers = dict(http_err.info())
        except (urlliberror.URLError, ValueError) as conn_err:
            self._handle_request_error(conn_err)

        return rcontent, rcode, {k.lower(): v for k, v in raw_headers.items()}

    def _handle_request_error(self, e: Exception) -> NoReturn:
        """Raise an ``APIConnectionError`` describing ``e``; never returns."""
        summary = textwrap.fill(
            "Unexpected error communicating with Stripe. "
            "If this problem persists, let us know at support@stripe.com."
        )
        raise APIConnectionError(
            summary + "\n\n(Network error: " + str(e) + ")"
        ) from e

    def close(self):
        """Do nothing; this client has no explicit cleanup step."""
        return None


class HTTPXClient(HTTPClient):
    """HTTP client backed by httpx.

    An ``httpx.AsyncClient`` is always created. A synchronous
    ``httpx.Client`` is created only when ``allow_sync_methods`` is true;
    without it, the synchronous ``request``/``request_stream`` methods
    raise ``RuntimeError``.
    """

    name = "httpx"

    _client: Optional["HTTPXClientType"]

    def __init__(
        self,
        timeout: Optional[Union[float, "HTTPXTimeout"]] = 80,
        allow_sync_methods=False,
        **kwargs,
    ):
        """Build the underlying httpx client(s).

        ``timeout`` is passed with every request when truthy.
        ``allow_sync_methods`` controls whether a synchronous client is
        created. Remaining keyword arguments go to ``HTTPClient``.

        Raises ``ImportError`` if httpx or anyio could not be imported.
        """
        super(HTTPXClient, self).__init__(**kwargs)

        if httpx is None:
            raise ImportError(
                "Unexpected: tried to initialize HTTPXClient but the httpx module is not present."
            )

        if anyio is None:
            raise ImportError(
                "Unexpected: tried to initialize HTTPXClient but the anyio module is not present."
            )

        self.httpx = httpx
        self.anyio = anyio

        # From here on ``kwargs`` holds the httpx client options, not the
        # HTTPClient arguments consumed above.
        kwargs = {}
        if self._verify_ssl_certs:
            kwargs["verify"] = ssl.create_default_context(
                cafile=stripe.ca_bundle_path
            )
        else:
            kwargs["verify"] = False

        self._client_async = httpx.AsyncClient(**kwargs)
        self._client = None
        if allow_sync_methods:
            self._client = httpx.Client(**kwargs)
        self._timeout = timeout

    def sleep_async(self, secs):
        """Return an awaitable that sleeps for ``secs`` seconds via anyio."""
        return self.anyio.sleep(secs)

    def _get_request_args_kwargs(
        self, method: str, url: str, headers: Mapping[str, str], post_data
    ):
        """Return ``[(method, url), keyword_args]`` for an httpx request.

        ``data`` falls back to an empty dict when ``post_data`` is falsy.
        """
        kwargs = {}

        # NOTE(review): httpx documents proxy configuration as a client-level
        # option; confirm that the supported httpx versions accept a
        # per-request ``proxies`` keyword.
        if self._proxy:
            kwargs["proxies"] = self._proxy

        if self._timeout:
            kwargs["timeout"] = self._timeout
        return [
            (method, url),
            {"headers": headers, "data": post_data or {}, **kwargs},
        ]

    def request(
        self,
        method: str,
        url: str,
        headers: Mapping[str, str],
        post_data=None,
    ) -> Tuple[bytes, int, Mapping[str, str]]:
        """Synchronously perform a request.

        Returns ``(content, status_code, headers)``. Raises ``RuntimeError``
        when no synchronous client was created, and ``APIConnectionError``
        when the request itself raises.
        """
        if self._client is None:
            raise RuntimeError(
                "Stripe: HTTPXClient was initialized with allow_sync_methods=False, "
                "so it cannot be used for synchronous requests."
            )
        args, kwargs = self._get_request_args_kwargs(
            method, url, headers, post_data
        )
        try:
            response = self._client.request(*args, **kwargs)
        except Exception as e:
            self._handle_request_error(e)

        content = response.content
        status_code = response.status_code
        response_headers = response.headers
        return content, status_code, response_headers

    async def request_async(
        self,
        method: str,
        url: str,
        headers: Mapping[str, str],
        post_data=None,
    ) -> Tuple[bytes, int, Mapping[str, str]]:
        """Asynchronously perform a request.

        Returns ``(content, status_code, headers)``. Raises
        ``APIConnectionError`` when the request itself raises.
        """
        args, kwargs = self._get_request_args_kwargs(
            method, url, headers, post_data
        )
        try:
            response = await self._client_async.request(*args, **kwargs)
        except Exception as e:
            self._handle_request_error(e)

        content = response.content
        status_code = response.status_code
        response_headers = response.headers
        return content, status_code, response_headers

    def _handle_request_error(self, e: Exception) -> NoReturn:
        """Wrap ``e`` in a retryable ``APIConnectionError``; never returns.

        Only the exception's type name is included in the message.
        """
        msg = (
            "Unexpected error communicating with Stripe. If this "
            "problem persists, let us know at support@stripe.com."
        )
        err = "A %s was raised" % (type(e).__name__,)
        should_retry = True

        msg = textwrap.fill(msg) + "\n\n(Network error: %s)" % (err,)
        raise APIConnectionError(msg, should_retry=should_retry) from e

    def request_stream(
        self, method: str, url: str, headers: Mapping[str, str], post_data=None
    ) -> Tuple[Iterable[bytes], int, Mapping[str, str]]:
        """Synchronously perform a request and stream the response body.

        Returns ``(byte_iterator, status_code, headers)``. Raises
        ``RuntimeError`` when no synchronous client was created, and
        ``APIConnectionError`` when sending the request raises.
        """
        if self._client is None:
            raise RuntimeError(
                "Stripe: HTTPXClient was not initialized with allow_sync_methods=True, "
                "so it cannot be used for synchronous requests."
            )
        args, kwargs = self._get_request_args_kwargs(
            method, url, headers, post_data
        )
        try:
            # Build the request with the same (synchronous) client that sends
            # it, rather than borrowing the async client's builder.
            response = self._client.send(
                request=self._client.build_request(*args, **kwargs),
                stream=True,
            )
        except Exception as e:
            self._handle_request_error(e)
        content = response.iter_bytes()
        status_code = response.status_code
        headers = response.headers

        return content, status_code, headers

    async def request_stream_async(
        self, method: str, url: str, headers: Mapping[str, str], post_data=None
    ) -> Tuple[AsyncIterable[bytes], int, Mapping[str, str]]:
        """Asynchronously perform a request and stream the response body.

        Returns ``(async_byte_iterator, status_code, headers)``. Raises
        ``APIConnectionError`` when sending the request raises.
        """
        args, kwargs = self._get_request_args_kwargs(
            method, url, headers, post_data
        )
        try:
            response = await self._client_async.send(
                request=self._client_async.build_request(*args, **kwargs),
                stream=True,
            )
        except Exception as e:
            self._handle_request_error(e)
        content = response.aiter_bytes()
        status_code = response.status_code
        headers = response.headers

        return content, status_code, headers

    def close(self):
        """Close the synchronous client, if one was created."""
        if self._client is not None:
            self._client.close()

    async def close_async(self):
        """Close the asynchronous client."""
        await self._client_async.aclose()


class AIOHTTPClient(HTTPClient):
    """Async-only HTTP client backed by aiohttp.

    The ``aiohttp.ClientSession`` is created lazily on first use. The
    synchronous ``request``/``request_stream`` methods always raise
    ``NotImplementedError``.
    """

    name = "aiohttp"

    def __init__(
        self, timeout: Optional[Union[float, "AIOHTTPTimeout"]] = 80, **kwargs
    ):
        """Store the timeout; remaining keyword arguments go to ``HTTPClient``.

        Raises ``ImportError`` if aiohttp could not be imported.
        """
        super(AIOHTTPClient, self).__init__(**kwargs)

        if aiohttp is None:
            raise ImportError(
                "Unexpected: tried to initialize AIOHTTPClient but the aiohttp module is not present."
            )

        self._timeout = timeout
        self._cached_session = None

    @property
    def _session(self):
        """Return the shared ``ClientSession``, creating it on first access."""
        assert aiohttp is not None

        if self._cached_session is None:
            kwargs = {}
            if self._verify_ssl_certs:
                ssl_context = ssl.create_default_context(
                    cafile=stripe.ca_bundle_path
                )
                kwargs["connector"] = aiohttp.TCPConnector(ssl=ssl_context)
            else:
                # ``ssl=False`` is aiohttp's replacement for the deprecated
                # ``verify_ssl=False`` keyword.
                kwargs["connector"] = aiohttp.TCPConnector(ssl=False)

            self._cached_session = aiohttp.ClientSession(**kwargs)

        return self._cached_session

    def sleep_async(self, secs):
        """Return an awaitable that sleeps for ``secs`` seconds via asyncio."""
        return asyncio.sleep(secs)

    def request(
        self, *args: Any, **kwargs: Any
    ) -> Tuple[bytes, int, Mapping[str, str]]:
        """Always raise ``NotImplementedError``.

        Arguments are accepted and ignored so that a call made with the
        usual request arguments fails with ``NotImplementedError`` rather
        than a ``TypeError`` about unexpected arguments.
        """
        raise NotImplementedError(
            "AIOHTTPClient does not support synchronous requests."
        )

    def _get_request_args_kwargs(
        self, method: str, url: str, headers: Mapping[str, str], post_data
    ):
        """Return ``((method, url), keyword_args)`` for a session request.

        Raises ``ValueError`` when the configured http and https proxies
        differ, since only one proxy is passed per request.
        """
        args = (method, url)
        kwargs = {}
        if self._proxy:
            if self._proxy["http"] != self._proxy["https"]:
                raise ValueError(
                    "AIOHTTPClient does not support different proxies for HTTP and HTTPS."
                )
            kwargs["proxy"] = self._proxy["https"]
        if self._timeout:
            kwargs["timeout"] = self._timeout

        kwargs["headers"] = headers
        kwargs["data"] = post_data
        return args, kwargs

    async def request_async(
        self,
        method: str,
        url: str,
        headers: Mapping[str, str],
        post_data=None,
    ) -> Tuple[bytes, int, Mapping[str, str]]:
        """Perform a request and return ``(body_bytes, status, headers)``.

        Implemented on top of ``request_stream_async`` by reading the
        returned stream to the end.
        """
        (
            content,
            status_code,
            response_headers,
        ) = await self.request_stream_async(
            method, url, headers, post_data=post_data
        )

        return (await content.read()), status_code, response_headers

    def _handle_request_error(self, e: Exception) -> NoReturn:
        """Wrap ``e`` in a retryable ``APIConnectionError``; never returns.

        Only the exception's type name is included in the message.
        """
        msg = (
            "Unexpected error communicating with Stripe. If this "
            "problem persists, let us know at support@stripe.com."
        )
        err = "A %s was raised" % (type(e).__name__,)
        should_retry = True

        msg = textwrap.fill(msg) + "\n\n(Network error: %s)" % (err,)
        raise APIConnectionError(msg, should_retry=should_retry) from e

    def request_stream(
        self, *args: Any, **kwargs: Any
    ) -> Tuple[Iterable[bytes], int, Mapping[str, str]]:
        """Always raise ``NotImplementedError``.

        Arguments are accepted and ignored for the same reason as in
        ``request``.
        """
        raise NotImplementedError(
            "AIOHTTPClient does not support synchronous requests."
        )

    async def request_stream_async(
        self, method: str, url: str, headers: Mapping[str, str], post_data=None
    ) -> Tuple["AIOHTTPStreamReader", int, Mapping[str, str]]:
        """Perform a request and return ``(stream_reader, status, headers)``.

        Raises ``APIConnectionError`` when the session request raises.
        """
        args, kwargs = self._get_request_args_kwargs(
            method, url, headers, post_data
        )
        try:
            response = await self._session.request(*args, **kwargs)
        except Exception as e:
            self._handle_request_error(e)

        content = response.content
        status_code = response.status
        response_headers = response.headers
        return content, status_code, response_headers

    def close(self):
        """Do nothing; the session is closed by ``close_async``."""
        pass

    async def close_async(self):
        """Close the session if one has been created."""
        # Read the cached attribute directly: going through the ``_session``
        # property would create a brand-new session just to close it.
        session = self._cached_session
        if session is not None:
            await session.close()


class NoImportFoundAsyncClient(HTTPClient):
    """Placeholder async client whose async methods all raise ``ImportError``."""

    def __init__(self, **kwargs):
        """Forward all keyword arguments to ``HTTPClient``."""
        super(NoImportFoundAsyncClient, self).__init__(**kwargs)

    @staticmethod
    def raise_async_client_import_error() -> Never:
        """Raise the ``ImportError`` explaining how to enable async requests."""
        # Each fragment ends with a space: adjacent string literals are
        # concatenated with no separator of their own.
        raise ImportError(
            (
                "Import httpx not found. To make async http requests, "
                "you must either install httpx or define your own "
                "async http client by subclassing stripe.HTTPClient "
                "and setting stripe.default_http_client to an instance of it."
            )
        )

    async def request_async(
        self, method: str, url: str, headers: Mapping[str, str], post_data=None
    ) -> Tuple[bytes, int, Mapping[str, str]]:
        """Always raise ``ImportError``; no request is made."""
        self.raise_async_client_import_error()

    async def request_stream_async(
        self, method: str, url: str, headers: Mapping[str, str], post_data=None
    ):
        """Always raise ``ImportError``; no request is made."""
        self.raise_async_client_import_error()

    async def close_async(self):
        """Always raise ``ImportError``."""
        self.raise_async_client_import_error()
