:py:mod:`storage`
=================

.. py:module:: storage

.. autoapi-nested-parse::

   This library implements various methods for working with the Google
   Storage APIs.


Installation
------------

.. code-block:: console

    $ pip install --upgrade gcloud-aio-storage


Usage
-----

To upload a file, you might do something like the following:

.. code-block:: python

    import aiofiles
    import aiohttp
    from gcloud.aio.storage import Storage


    async with aiohttp.ClientSession() as session:
        client = Storage(session=session)

        async with aiofiles.open('/path/to/my/file', mode="r") as f:
            output = await f.read()
            status = await client.upload(
                'my-bucket-name',
                'path/to/gcs/folder',
                output,
            )
            print(status)

Note that there are multiple ways to accomplish the above, e.g. by making use
of the ``Bucket`` and ``Blob`` convenience classes if that better fits your
use-case.

Of course, the major benefit of using an async library is being able to
parallelize operations like this. Since ``gcloud-aio-storage`` is fully
asyncio-compatible, you can use any of the built-in asyncio methods to
perform more complicated operations:

.. code-block:: python

    import asyncio

    my_files = {
        '/local/path/to/file.1': 'path/in/gcs.1',
        '/local/path/to/file.2': 'path/in/gcs.2',
        '/local/path/to/file.3': 'different/gcs/path/filename.3',
    }

    async with Storage() as client:
        # Prepare all our upload data
        uploads = []
        for local_name, gcs_name in my_files.items():
            async with aiofiles.open(local_name, mode="r") as f:
                contents = await f.read()
                uploads.append((gcs_name, contents))

        # Simultaneously upload all files
        await asyncio.gather(
            *[
                client.upload('my-bucket-name', path, file_)
                for path, file_ in uploads
            ]
        )

You can also refer to the `smoke test`_ for more info and examples.

Note that you can also let ``gcloud-aio-storage`` do its own session
management, so long as you give us a hint when to close that session:

.. code-block:: python

    async with Storage() as client:
        # closes the client.session on leaving the context manager
        ...

    # OR

    client = Storage()
    # do stuff
    await client.close()  # close the session explicitly


File Encodings
--------------

In some cases, ``aiohttp`` needs to transform the objects returned from GCS
into strings, e.g. for debug logging and other such issues. The built-in
``await response.text()`` operation relies on `chardet`_ for guessing the
character encoding in any cases where it cannot be determined based on the
file metadata.

Unfortunately, this operation can be extremely slow, especially in cases
where you might be working with particularly large files. If you notice odd
latency issues when reading your results, you may want to set your character
encoding more explicitly within GCS, e.g. by ensuring you set the
``contentType`` of the relevant objects to something suffixed with
``; charset=utf-8``. For example, in the case of
``contentType='application/x-netcdf'`` files exhibiting latency, you could
instead set ``contentType='application/x-netcdf; charset=utf-8'``.

See `Issue #172`_ for more info!


Emulators
---------

For testing purposes, you may want to use ``gcloud-aio-storage`` along with a
local GCS emulator. Setting the ``$STORAGE_EMULATOR_HOST`` environment
variable to the address of your emulator should be enough to do the trick.

For example, using `fsouza/fake-gcs-server`_, you can do:

.. code-block:: console

    docker run -d -p 4443:4443 -v $PWD/my-sample-data:/data fsouza/fake-gcs-server
    export STORAGE_EMULATOR_HOST='http://0.0.0.0:4443'

Any ``gcloud-aio-storage`` requests made with that environment variable set
will query ``fake-gcs-server`` instead of the official GCS API.

Note that some emulation systems require disabling SSL -- if you're using a
custom http session, you may need to disable SSL verification.
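For instance, with ``aiohttp`` you can construct the session yourself with
certificate verification turned off and hand it to the client. A minimal
sketch, assuming an emulator configured as above (the bucket name is a
placeholder):

.. code-block:: python

    import aiohttp
    from gcloud.aio.storage import Storage


    async def list_emulated_objects():
        # ssl=False skips certificate verification, which emulators serving
        # self-signed certificates typically require
        connector = aiohttp.TCPConnector(ssl=False)
        async with aiohttp.ClientSession(connector=connector) as session:
            client = Storage(session=session)
            return await client.list_objects('my-bucket-name')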
Customization
-------------

This library mostly tries to stay agnostic of potential use-cases; as such,
we do not implement any sort of retrying or other policies under the
assumption that we wouldn't get things right for every user's situation.
Instead, we recommend configuring your own policies on an as-needed basis.
The `backoff`_ library can make this quite straightforward! For example, you
may find it useful to configure something like:

.. code-block:: python

    from typing import Any

    import aiohttp
    import backoff
    import gcloud.aio.storage


    class StorageWithBackoff(gcloud.aio.storage.Storage):
        @backoff.on_exception(backoff.expo,
                              aiohttp.ClientResponseError,
                              max_tries=5,
                              jitter=backoff.full_jitter)
        async def copy(self, *args: Any, **kwargs: Any):
            return await super().copy(*args, **kwargs)

        @backoff.on_exception(backoff.expo,
                              aiohttp.ClientResponseError,
                              max_tries=10,
                              jitter=backoff.full_jitter)
        async def download(self, *args: Any, **kwargs: Any):
            return await super().download(*args, **kwargs)
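A subclass like that can then be used anywhere you would otherwise construct
a plain ``Storage`` client. A quick sketch, with placeholder bucket and
object names:

.. code-block:: python

    async with StorageWithBackoff() as client:
        # download() now retries transient ClientResponseErrors with
        # exponential backoff before giving up
        data = await client.download('my-bucket-name', 'path/in/gcs.1')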
.. _Issue #172: https://github.com/talkiq/gcloud-aio/issues/172
.. _backoff: https://pypi.org/project/backoff/
.. _chardet: https://pypi.org/project/chardet/
.. _fsouza/fake-gcs-server: https://github.com/fsouza/fake-gcs-server
.. _smoke test: https://github.com/talkiq/gcloud-aio/blob/master/storage/tests/integration/smoke_test.py


Submodules
----------

.. toctree::
   :titlesonly:
   :maxdepth: 1

   blob/index.rst
   bucket/index.rst
   constants/index.rst
   storage/index.rst


Package Contents
----------------

Classes
~~~~~~~

.. autoapisummary::

   storage.Blob
   storage.Bucket
   storage.Storage
   storage.StreamResponse


Attributes
~~~~~~~~~~

.. autoapisummary::

   storage.SCOPES
   storage.__version__


.. py:class:: Blob(bucket, name, metadata)

   .. py:property:: chunk_size
      :type: int

   .. py:method:: download(timeout = DEFAULT_TIMEOUT, session = None, auto_decompress = True)
      :async:

   .. py:method:: upload(data, content_type = None, session = None)
      :async:

   .. py:method:: get_signed_url(expiration, headers = None, query_params = None, http_method = 'GET', iam_client = None, service_account_email = None, token = None, session = None)
      :async:

      Create a temporary access URL for Storage Blob accessible by anyone
      with the link.

      Adapted from Google Documentation:
      https://cloud.google.com/storage/docs/access-control/signing-urls-manually#python-sample

   .. py:method:: get_pem_signature(str_to_sign, private_key)
      :staticmethod:

   .. py:method:: get_iam_api_signature(str_to_sign, iam_client, service_account_email, session)
      :staticmethod:
      :async:


.. py:class:: Bucket(storage, name)

   .. py:method:: get_blob(blob_name, timeout = DEFAULT_TIMEOUT, session = None)
      :async:

   .. py:method:: blob_exists(blob_name, session = None)
      :async:

   .. py:method:: list_blobs(prefix = '', match_glob = '', session = None)
      :async:

   .. py:method:: new_blob(blob_name)

   .. py:method:: get_metadata(params = None, session = None)
      :async:
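As a rough sketch of how the ``Bucket`` and ``Blob`` convenience classes
mentioned in the Usage section can be combined (the bucket name and prefix
are placeholders, and error handling is omitted):

.. code-block:: python

    import aiohttp
    from gcloud.aio.storage import Storage


    async def download_all(bucket_name, prefix):
        async with aiohttp.ClientSession() as session:
            client = Storage(session=session)
            bucket = client.get_bucket(bucket_name)  # Bucket instance

            contents = {}
            for blob_name in await bucket.list_blobs(prefix=prefix):
                blob = await bucket.get_blob(blob_name)  # Blob instance
                contents[blob_name] = await blob.download()
            return contents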
.. py:data:: SCOPES
   :value: ['https://www.googleapis.com/auth/devstorage.read_write']


.. py:class:: Storage(*, service_file = None, token = None, session = None, api_root = None)

   .. py:attribute:: _api_root
      :type: str

   .. py:attribute:: _api_is_dev
      :type: bool

   .. py:attribute:: _api_root_read
      :type: str

   .. py:attribute:: _api_root_write
      :type: str

   .. py:method:: _headers()
      :async:

   .. py:method:: list_buckets(project, *, params = None, headers = None, session = None, timeout = DEFAULT_TIMEOUT)
      :async:

   .. py:method:: get_bucket(bucket_name)

   .. py:method:: copy(bucket, object_name, destination_bucket, *, new_name = None, metadata = None, params = None, headers = None, timeout = DEFAULT_TIMEOUT, session = None)
      :async:

      When files are too large, multiple calls to ``rewriteTo`` are made. We
      refer to the same copy job by using the ``rewriteToken`` from the
      previous return payload in subsequent ``rewriteTo`` calls.

      Using the ``rewriteTo`` GCS API is preferred in part because it is able
      to make multiple calls to fully copy an object whereas the ``copyTo``
      GCS API only calls ``rewriteTo`` once under the hood, and thus may fail
      if files are large.

      In the rare case you need to resume a copy operation, include the
      ``rewriteToken`` in the ``params`` dictionary. Once you begin a
      multi-part copy operation, you then have 1 week to complete the copy
      job.

      See https://cloud.google.com/storage/docs/json_api/v1/objects/rewrite

   .. py:method:: delete(bucket, object_name, *, timeout = DEFAULT_TIMEOUT, params = None, headers = None, session = None)
      :async:

   .. py:method:: download(bucket, object_name, *, headers = None, timeout = DEFAULT_TIMEOUT, session = None)
      :async:

   .. py:method:: download_to_filename(bucket, object_name, filename, **kwargs)
      :async:

   .. py:method:: download_metadata(bucket, object_name, *, headers = None, session = None, timeout = DEFAULT_TIMEOUT)
      :async:

   .. py:method:: download_stream(bucket, object_name, *, headers = None, timeout = DEFAULT_TIMEOUT, session = None)
      :async:

      Download a GCS object in a buffered stream.

      :param bucket: The bucket from which to download.
      :param object_name: The object within the bucket to download.
      :param headers: Custom header values for the request, such as range.
      :param timeout: Timeout, in seconds, for the request. Note that with
                      this function, this is the time to the beginning of
                      the response data (TTFB).
      :param session: A specific session to (re)use.
      :returns: An object encapsulating the stream, similar to
                io.BufferedIOBase, but it only supports the read() function.
      :rtype: StreamResponse

   .. py:method:: list_objects(bucket, *, params = None, headers = None, session = None, timeout = DEFAULT_TIMEOUT)
      :async:

   .. py:method:: upload(bucket, object_name, file_data, *, content_type = None, parameters = None, headers = None, metadata = None, session = None, force_resumable_upload = None, zipped = False, timeout = 30)
      :async:

   .. py:method:: upload_from_filename(bucket, object_name, filename, **kwargs)
      :async:

   .. py:method:: _get_stream_len(stream)
      :staticmethod:

   .. py:method:: _preprocess_data(data)
      :staticmethod:

   .. py:method:: _compress_file_in_chunks(input_stream, chunk_size = 8192)
      :staticmethod:

      Reads the contents of input_stream and writes it gzip-compressed to
      output_stream in chunks. The chunk size is 8 KiB by default, which is
      a standard filesystem block size.

   .. py:method:: _decide_upload_type(force_resumable_upload, content_length)
      :staticmethod:

   .. py:method:: _split_content_type(content_type)
      :staticmethod:

   .. py:method:: _format_metadata_key(key)
      :staticmethod:

      Formats the fixed-key metadata keys as wanted by the multipart API.

      Ex: Content-Disposition --> contentDisposition

   .. py:method:: _download(bucket, object_name, *, params = None, headers = None, timeout = DEFAULT_TIMEOUT, session = None)
      :async:

   .. py:method:: _download_stream(bucket, object_name, *, params = None, headers = None, timeout = DEFAULT_TIMEOUT, session = None)
      :async:

   .. py:method:: _upload_simple(url, object_name, stream, params, headers, *, session = None, timeout = 30)
      :async:

   .. py:method:: _upload_multipart(url, object_name, stream, params, headers, metadata, *, session = None, timeout = 30)
      :async:

   .. py:method:: _upload_resumable(url, object_name, stream, params, headers, *, metadata = None, session = None, timeout = 30)
      :async:

   .. py:method:: _initiate_upload(url, object_name, params, headers, *, metadata = None, timeout = DEFAULT_TIMEOUT, session = None)
      :async:

   .. py:method:: _do_upload(session_uri, stream, headers, *, retries = 5, session = None, timeout = 30)
      :async:

   .. py:method:: patch_metadata(bucket, object_name, metadata, *, params = None, headers = None, session = None, timeout = DEFAULT_TIMEOUT)
      :async:

   .. py:method:: get_bucket_metadata(bucket, *, params = None, headers = None, session = None, timeout = DEFAULT_TIMEOUT)
      :async:

   .. py:method:: close()
      :async:

   .. py:method:: __aenter__()
      :async:

   .. py:method:: __aexit__(*args)
      :async:


.. py:class:: StreamResponse(response)

   This class provides an abstraction over the slightly different
   recommended streaming implementations of requests and aiohttp.

   .. py:property:: content_length
      :type: int

   .. py:method:: read(size = -1)
      :async:

   .. py:method:: __aenter__()
      :async:

   .. py:method:: __aexit__(*exc_info)
      :async:


.. py:data:: __version__
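Tying the above together, ``download_stream`` and ``StreamResponse`` can be
used to stream a large object to disk without holding it all in memory. A
rough sketch, assuming ``read()`` returns an empty bytestring once the
stream is exhausted (the names and chunk size below are placeholders):

.. code-block:: python

    import aiofiles
    import aiohttp
    from gcloud.aio.storage import Storage


    async def stream_to_disk(bucket, object_name, filename):
        async with aiohttp.ClientSession() as session:
            client = Storage(session=session)
            stream = await client.download_stream(bucket, object_name)

            async with aiofiles.open(filename, mode='wb') as f:
                while True:
                    chunk = await stream.read(64 * 1024)  # 64 KiB at a time
                    if not chunk:
                        break
                    await f.write(chunk)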