ROOTPLOIT
Server: Apache
System: Linux node6122.myfcloud.com 6.14.3-x86_64-linode168 #1 SMP PREEMPT_DYNAMIC Mon Apr 21 19:47:55 EDT 2025 x86_64
User: bashacomputer (1004)
PHP: 7.4.33
Disabled: exec,passthru,shell_exec,system
Upload Files
File: //proc/thread-self/root/lib/python3.6/site-packages/httpie/uploads.py
import zlib
from typing import Callable, IO, Iterable, Tuple, Union
from urllib.parse import urlencode

import requests
from requests.utils import super_len
from requests_toolbelt import MultipartEncoder

from .cli.dicts import MultipartRequestDataDict, RequestDataDict


class ChunkedUploadStream:
    """Iterable wrapper that reports every chunk to a callback.

    Iterating over an instance walks the wrapped ``stream``; each chunk is
    first handed to ``callback`` and then yielded unchanged.
    """

    def __init__(self, stream: Iterable, callback: Callable):
        self.stream = stream
        self.callback = callback

    def _report(self, piece):
        # Notify the observer, then pass the chunk through untouched.
        self.callback(piece)
        return piece

    def __iter__(self) -> Iterable[Union[str, bytes]]:
        # `map` is lazy, so the callback still fires right before each yield.
        for piece in map(self._report, self.stream):
            yield piece


class ChunkedMultipartUploadStream:
    """Iterable that drains an encoder in pieces of at most ``chunk_size``.

    Iteration repeatedly calls ``encoder.read(chunk_size)`` and stops at the
    first falsy (empty) result.
    """

    # Maximum number of bytes requested from the encoder per read.
    chunk_size = 100 * 1024

    def __init__(self, encoder: MultipartEncoder):
        self.encoder = encoder

    def __iter__(self) -> Iterable[Union[str, bytes]]:
        piece = self.encoder.read(self.chunk_size)
        while piece:
            yield piece
            piece = self.encoder.read(self.chunk_size)


def prepare_request_body(
    body: Union[str, bytes, IO, MultipartEncoder, RequestDataDict],
    body_read_callback: Callable[[bytes], bytes],
    content_length_header_value: int = None,
    chunked=False,
    offline=False,
) -> Union[
    str,
    bytes,
    IO,
    MultipartEncoder,
    ChunkedUploadStream,
    ChunkedMultipartUploadStream,
]:
    """Turn the request data into the object that is sent as the body.

    :param body: the request data; a ``RequestDataDict`` is URL-encoded,
        anything with a ``read`` attribute is treated as file-like.
    :param body_read_callback: called with every chunk that is read from
        (or iterated out of) the body.
    :param content_length_header_value: an explicitly supplied
        ``Content-Length``; when given, a zero-length file-like body is
        not read eagerly.
    :param chunked: wrap the body in a chunk-yielding stream.
    :param offline: return the body contents directly without wrapping.
    :return: the (possibly wrapped or fully read) body.
    """
    is_file_like = hasattr(body, 'read')

    if isinstance(body, RequestDataDict):
        body = urlencode(body, doseq=True)

    if offline:
        if is_file_like:
            return body.read()
        return body

    if not is_file_like:
        if chunked:
            body = ChunkedUploadStream(
                # Pass the entire body as one chunk. Only `str` needs
                # encoding; `bytes` has no `.encode()` and is passed as-is.
                stream=(
                    chunk.encode() if isinstance(chunk, str) else chunk
                    for chunk in [body]
                ),
                callback=body_read_callback,
            )
        return body

    # File-like object.

    if not super_len(body):
        # Zero-length -> assume stdin.
        if content_length_header_value is None and not chunked:
            #
            # Read the whole stdin to determine `Content-Length`.
            #
            # TODO: Instead of opt-in --chunked, consider making
            #   `Transfer-Encoding: chunked` for STDIN opt-out via
            #   something like --no-chunked.
            #   This would be backwards-incompatible so wait until v3.0.0.
            #
            body = body.read()
    else:
        # Known length: keep the object, but intercept `read()` so that
        # every chunk pulled from it is reported to the callback.
        orig_read = body.read

        def new_read(*args):
            chunk = orig_read(*args)
            body_read_callback(chunk)
            return chunk

        body.read = new_read

    if chunked:
        if isinstance(body, MultipartEncoder):
            # The patched `read()` above already reports to the callback.
            body = ChunkedMultipartUploadStream(
                encoder=body,
            )
        else:
            body = ChunkedUploadStream(
                stream=body,
                callback=body_read_callback,
            )

    return body


def get_multipart_data_and_content_type(
    data: MultipartRequestDataDict,
    boundary: str = None,
    content_type: str = None,
) -> Tuple[MultipartEncoder, str]:
    """Build a multipart encoder for ``data`` and its ``Content-Type`` value.

    When ``content_type`` is falsy, the encoder's own content type is used.
    Otherwise the supplied value is stripped and, if it does not already
    contain ``boundary=``, the encoder's boundary value is appended.
    """
    multipart = MultipartEncoder(fields=data.items(), boundary=boundary)

    if not content_type:
        return multipart, multipart.content_type

    header_value = content_type.strip()
    if 'boundary=' in header_value:
        # The caller already chose a boundary; keep the header verbatim.
        return multipart, header_value

    return multipart, f'{header_value}; boundary={multipart.boundary_value}'


def compress_request(
    request: requests.PreparedRequest,
    always: bool,
):
    """Deflate-compress the body of ``request`` in place.

    The body is replaced by its deflated form, and the ``Content-Encoding``
    and ``Content-Length`` headers are updated, when the compressed data is
    smaller than the original or when ``always`` is true.

    :param request: the request whose ``body`` and ``headers`` are updated.
    :param always: compress even if the result is not smaller.
    """
    # Local import: only needed to restore a stream that had to be consumed.
    import io

    body = request.body
    if body is None:
        # Nothing to compress; `deflater.compress(None)` would raise.
        return

    was_stream = hasattr(body, 'read')
    if isinstance(body, str):
        body_bytes = body.encode()
    elif was_stream:
        body_bytes = body.read()
    else:
        body_bytes = body

    deflater = zlib.compressobj()
    deflated_data = deflater.compress(body_bytes)
    deflated_data += deflater.flush()
    is_economical = len(deflated_data) < len(body_bytes)
    if is_economical or always:
        request.body = deflated_data
        request.headers['Content-Encoding'] = 'deflate'
        request.headers['Content-Length'] = str(len(deflated_data))
    elif was_stream:
        # Reading drained the original stream. Put the data back in a
        # fresh stream so the uncompressed request still has its payload.
        request.body = io.BytesIO(body_bytes)