storage

This library implements various methods for working with the Google Cloud Storage (GCS) APIs.

Installation

$ pip install --upgrade gcloud-aio-storage

Usage

To upload a file, you might do something like the following:

import aiofiles
import aiohttp
from gcloud.aio.storage import Storage


async with aiohttp.ClientSession() as session:
    client = Storage(session=session)

    async with aiofiles.open('/path/to/my/file', mode="r") as f:
        output = await f.read()
        status = await client.upload(
            'my-bucket-name',
            'path/to/gcs/folder',
            output,
        )
        print(status)

Note that there are multiple ways to accomplish the above, for example by making use of the Bucket and Blob convenience classes if that better fits your use-case.

Of course, the major benefit of using an async library is being able to parallelize operations like this. Since gcloud-aio-storage is fully asyncio-compatible, you can use any of the built-in asyncio methods to perform more complicated operations:

import asyncio

import aiofiles
from gcloud.aio.storage import Storage

my_files = {
    '/local/path/to/file.1': 'path/in/gcs.1',
    '/local/path/to/file.2': 'path/in/gcs.2',
    '/local/path/to/file.3': 'different/gcs/path/filename.3',
}

async with Storage() as client:
    # Prepare all our upload data
    uploads = []
    for local_name, gcs_name in my_files.items():
        async with aiofiles.open(local_name, mode="r") as f:
            contents = await f.read()
            uploads.append((gcs_name, contents))

    # Simultaneously upload all files
    await asyncio.gather(
        *[
            client.upload('my-bucket-name', path, file_)
            for path, file_ in uploads
        ]
    )
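If you have many files, you may not want every upload in flight at once. The following is a minimal sketch of one way to cap concurrency using only the standard library; gather_with_limit is a hypothetical helper name, not part of gcloud-aio-storage, and the default limit of 8 is an arbitrary assumption.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, List, TypeVar

T = TypeVar('T')


async def gather_with_limit(
    factories: Iterable[Callable[[], Awaitable[T]]],
    limit: int = 8,
) -> List[T]:
    """Run the given coroutine factories with at most `limit` in flight."""
    semaphore = asyncio.Semaphore(limit)

    async def run(factory: Callable[[], Awaitable[T]]) -> T:
        # Each task waits here until one of the `limit` slots is free.
        async with semaphore:
            return await factory()

    # gather() returns results in the same order as its inputs.
    return await asyncio.gather(*(run(f) for f in factories))
```

Each factory is a zero-argument callable that returns an awaitable, so something like functools.partial(client.upload, 'my-bucket-name', path, file_) should fit for each (path, file_) pair.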

You can also refer to the smoke test for more info and examples.

Note that you can also let gcloud-aio-storage do its own session management, so long as you give us a hint when to close that session:

async with Storage() as client:
    # closes the client.session on leaving the context manager
    ...  # do stuff

# OR

client = Storage()
# do stuff
await client.close()  # close the session explicitly

File Encodings

In some cases, aiohttp needs to transform the objects returned from GCS into strings, e.g. for debug logging and other such issues. The built-in await response.text() operation relies on chardet for guessing the character encoding in any cases where it cannot be determined based on the file metadata.

Unfortunately, this operation can be extremely slow, especially in cases where you might be working with particularly large files. If you notice odd latency issues when reading your results, you may want to set your character encoding more explicitly within GCS, e.g. by ensuring you set the contentType of the relevant objects to something suffixed with ; charset=utf-8. For example, in the case of contentType='application/x-netcdf' files exhibiting latency, you could instead set contentType='application/x-netcdf; charset=utf-8'. See Issue #172 for more info!
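As a rough illustration of the suffixing described above, a small helper along these lines could build the content type before you upload. The name with_charset is hypothetical and not part of the library.

```python
def with_charset(content_type: str, charset: str = 'utf-8') -> str:
    """Return content_type with a charset parameter, unless one is present."""
    # Everything after the first ';' is a list of parameters.
    params = [p.strip() for p in content_type.split(';')[1:]]
    if any(p.lower().startswith('charset=') for p in params):
        return content_type  # leave an explicit charset untouched
    return f'{content_type.strip()}; charset={charset}'
```

The result can then be passed as the content_type argument of upload.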

Emulators

For testing purposes, you may want to use gcloud-aio-storage along with a local GCS emulator. Setting the $STORAGE_EMULATOR_HOST environment variable to the address of your emulator should be enough to do the trick.

For example, using fsouza/fake-gcs-server, you can do:

docker run -d -p 4443:4443 -v $PWD/my-sample-data:/data fsouza/fake-gcs-server
export STORAGE_EMULATOR_HOST='http://0.0.0.0:4443'

Any gcloud-aio-storage requests made with that environment variable set will query fake-gcs-server instead of the official GCS API.

Note that some emulation systems require disabling SSL – if you’re using a custom HTTP session, you may need to disable SSL verification.
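In a test suite you may prefer to set the variable from Python rather than the shell. The sketch below is one way to do that with a context manager that restores the previous value afterwards. The helper name storage_emulator is hypothetical, and it assumes the variable is read when the Storage client is created, so the client should be constructed inside the with block.

```python
import contextlib
import os
from typing import Iterator, Optional


@contextlib.contextmanager
def storage_emulator(host: str) -> Iterator[None]:
    """Temporarily point STORAGE_EMULATOR_HOST at `host`."""
    previous: Optional[str] = os.environ.get('STORAGE_EMULATOR_HOST')
    os.environ['STORAGE_EMULATOR_HOST'] = host
    try:
        yield
    finally:
        # Restore whatever was set before, so other tests are unaffected.
        if previous is None:
            os.environ.pop('STORAGE_EMULATOR_HOST', None)
        else:
            os.environ['STORAGE_EMULATOR_HOST'] = previous
```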

Customization

This library mostly tries to stay agnostic of potential use-cases; as such, we do not implement any sort of retrying or other policies under the assumption that we wouldn’t get things right for every user’s situation.

Instead, we recommend configuring your own policies on an as-needed basis. The backoff library can make this quite straightforward! For example, you may find it useful to configure something like:

from typing import Any

import aiohttp
import backoff
import gcloud.aio.storage


class StorageWithBackoff(gcloud.aio.storage.Storage):
    @backoff.on_exception(backoff.expo, aiohttp.ClientResponseError,
                          max_tries=5, jitter=backoff.full_jitter)
    async def copy(self, *args: Any, **kwargs: Any):
        return await super().copy(*args, **kwargs)

    @backoff.on_exception(backoff.expo, aiohttp.ClientResponseError,
                          max_tries=10, jitter=backoff.full_jitter)
    async def download(self, *args: Any, **kwargs: Any):
        return await super().download(*args, **kwargs)
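If you would rather not add a dependency, the same idea can be hand-rolled. The following is a standard-library sketch of exponential backoff with full jitter; it is not the backoff library's implementation, and retry_with_backoff, base_delay, and max_delay are names and defaults chosen here for illustration.

```python
import asyncio
import random
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar('T')


async def retry_with_backoff(
    operation: Callable[[], Awaitable[T]],
    retry_on: Tuple[Type[BaseException], ...],
    max_tries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
) -> T:
    """Await operation(), retrying on `retry_on` with exponential backoff."""
    if max_tries < 1:
        raise ValueError('max_tries must be at least 1')
    for attempt in range(1, max_tries + 1):
        try:
            return await operation()
        except retry_on:
            if attempt == max_tries:
                raise  # out of attempts: surface the last error
            # Full jitter: sleep a random time up to the exponential cap.
            cap = min(max_delay, base_delay * 2 ** (attempt - 1))
            await asyncio.sleep(random.uniform(0, cap))
    raise AssertionError('unreachable')  # the loop always returns or raises
```

Here operation is a zero-argument callable, so a call such as client.download(bucket, object_name) would be wrapped in a lambda or functools.partial.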

Submodules

Attributes

SCOPES

__version__

Classes

Blob

Bucket

Storage

StreamResponse

This class provides an abstraction over the slightly different recommended streaming implementations of requests and aiohttp.

Package Contents

class storage.Blob(bucket, name, metadata)
Parameters:
bucket
name
size: int
property chunk_size: int
Return type:

int

async download(timeout=DEFAULT_TIMEOUT, session=None, auto_decompress=True)
Parameters:
  • timeout (int)

  • session (Optional[requests.Session])

  • auto_decompress (bool)

Return type:

Any

async upload(data, content_type=None, session=None)
Parameters:
  • data (Any)

  • content_type (Optional[str])

  • session (Optional[requests.Session])

Return type:

Dict[str, Any]

async get_signed_url(expiration, headers=None, query_params=None, http_method='GET', iam_client=None, service_account_email=None, token=None, session=None)

Create a temporary access URL for a Storage Blob, accessible by anyone with the link.

Adapted from Google Documentation: https://cloud.google.com/storage/docs/access-control/signing-urls-manually#python-sample

Parameters:
  • expiration (int)

  • headers (Optional[Dict[str, str]])

  • query_params (Optional[Dict[str, Any]])

  • http_method (str)

  • iam_client (Optional[gcloud.aio.auth.IamClient])

  • service_account_email (Optional[str])

  • token (Optional[gcloud.aio.auth.Token])

  • session (Optional[requests.Session])

Return type:

str

static get_pem_signature(str_to_sign, private_key)
Parameters:
  • str_to_sign (str)

  • private_key (str)

Return type:

bytes

static get_iam_api_signature(str_to_sign, iam_client, service_account_email, session)
Async:

Parameters:
  • str_to_sign (str)

  • iam_client (gcloud.aio.auth.IamClient)

  • service_account_email (Optional[str])

  • session (Optional[requests.Session])

Return type:

bytes

class storage.Bucket(storage, name)
Parameters:
storage
name
async get_blob(blob_name, timeout=DEFAULT_TIMEOUT, session=None)
Parameters:
  • blob_name (str)

  • timeout (int)

  • session (Optional[requests.Session])

Return type:

storage.blob.Blob

async blob_exists(blob_name, session=None)
Parameters:
  • blob_name (str)

  • session (Optional[requests.Session])

Return type:

bool

async list_blobs(prefix='', match_glob='', session=None)
Parameters:
  • prefix (str)

  • match_glob (str)

  • session (Optional[requests.Session])

Return type:

List[str]

new_blob(blob_name)
Parameters:

blob_name (str)

Return type:

storage.blob.Blob

async get_metadata(params=None, session=None)
Parameters:
  • params (Optional[Dict[str, Any]])

  • session (Optional[requests.Session])

Return type:

Dict[str, Any]

storage.SCOPES = ['https://www.googleapis.com/auth/devstorage.read_write']
class storage.Storage(*, service_file=None, token=None, session=None, api_root=None)
Parameters:
  • service_file (Optional[Union[str, IO[AnyStr]]])

  • token (Optional[gcloud.aio.auth.Token])

  • session (Optional[requests.Session])

  • api_root (Optional[str])

_api_root: str
_api_is_dev: bool
_api_root_read: str
_api_root_write: str
session
token
async _headers()
Return type:

Dict[str, str]

async list_buckets(project, *, params=None, headers=None, session=None, timeout=DEFAULT_TIMEOUT)
Parameters:
  • project (str)

  • params (Optional[Dict[str, str]])

  • headers (Optional[Dict[str, Any]])

  • session (Optional[requests.Session])

  • timeout (int)

Return type:

List[storage.bucket.Bucket]

get_bucket(bucket_name)
Parameters:

bucket_name (str)

Return type:

storage.bucket.Bucket

async copy(bucket, object_name, destination_bucket, *, new_name=None, metadata=None, params=None, headers=None, timeout=DEFAULT_TIMEOUT, session=None)

When files are too large, multiple calls to rewriteTo are made. We refer to the same copy job by using the rewriteToken from the previous return payload in subsequent rewriteTo calls.

Using the rewriteTo GCS API is preferred in part because it is able to make multiple calls to fully copy an object whereas the copyTo GCS API only calls rewriteTo once under the hood, and thus may fail if files are large.

In the rare case you need to resume a copy operation, include the rewriteToken in the params dictionary. Once you begin a multi-part copy operation, you then have 1 week to complete the copy job.

See https://cloud.google.com/storage/docs/json_api/v1/objects/rewrite

Parameters:
  • bucket (str)

  • object_name (str)

  • destination_bucket (str)

  • new_name (Optional[str])

  • metadata (Optional[Dict[str, Any]])

  • params (Optional[Dict[str, str]])

  • headers (Optional[Dict[str, str]])

  • timeout (int)

  • session (Optional[requests.Session])

Return type:

Dict[str, Any]
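The token-passing loop described for copy can be sketched as follows. This is an illustration of the pattern rather than the library's code: rewrite_until_done and fake_rewrite are hypothetical names, and the done and rewriteToken response fields are taken from the GCS rewrite API documentation linked above.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Optional

RewriteCall = Callable[[Optional[str]], Awaitable[Dict[str, Any]]]


async def rewrite_until_done(rewrite_once: RewriteCall) -> Dict[str, Any]:
    """Call rewrite_once(token) until the response reports done=True."""
    token: Optional[str] = None
    while True:
        response = await rewrite_once(token)
        if response.get('done'):
            return response
        # Not finished yet: carry the token into the next call.
        token = response['rewriteToken']


async def fake_rewrite(token: Optional[str]) -> Dict[str, Any]:
    """Stand-in for one rewrite request; finishes on the third call."""
    step = 0 if token is None else int(token)
    if step < 2:
        return {'done': False, 'rewriteToken': str(step + 1)}
    return {'done': True, 'resource': {'name': 'copied-object'}}
```

Resuming an interrupted copy corresponds to starting this loop with a saved token instead of None.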

async delete(bucket, object_name, *, timeout=DEFAULT_TIMEOUT, params=None, headers=None, session=None)
Parameters:
  • bucket (str)

  • object_name (str)

  • timeout (int)

  • params (Optional[Dict[str, str]])

  • headers (Optional[Dict[str, str]])

  • session (Optional[requests.Session])

Return type:

str

async download(bucket, object_name, *, headers=None, timeout=DEFAULT_TIMEOUT, session=None)
Parameters:
  • bucket (str)

  • object_name (str)

  • headers (Optional[Dict[str, Any]])

  • timeout (int)

  • session (Optional[requests.Session])

Return type:

bytes

async download_to_filename(bucket, object_name, filename, **kwargs)
Parameters:
  • bucket (str)

  • object_name (str)

  • filename (str)

  • kwargs (Any)

Return type:

None

async download_metadata(bucket, object_name, *, headers=None, session=None, timeout=DEFAULT_TIMEOUT)
Parameters:
  • bucket (str)

  • object_name (str)

  • headers (Optional[Dict[str, Any]])

  • session (Optional[requests.Session])

  • timeout (int)

Return type:

Dict[str, Any]

async download_stream(bucket, object_name, *, headers=None, timeout=DEFAULT_TIMEOUT, session=None)

Download a GCS object in a buffered stream.

Parameters:
  • bucket (str) – The bucket from which to download.

  • object_name (str) – The object within the bucket to download.

  • headers (Optional[Dict[str, Any]]) – Custom header values for the request, such as range.

  • timeout (int) – Timeout, in seconds, for the request. Note that with this function, this is the time to the beginning of the response data (TTFB).

  • session (Optional[requests.Session]) – A specific session to (re)use.

Returns:

An object encapsulating the stream, similar to io.BufferedIOBase, but it only supports the read() function.

Return type:

StreamResponse
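Since the returned object only supports read(), consuming it usually means a read loop. Below is a minimal sketch, assuming read(size) returns empty bytes at the end of the stream (as io.BufferedIOBase does). drain_stream and FakeStream are hypothetical names, and FakeStream only exists so the sketch can run without a network connection.

```python
import asyncio
import io
from typing import Any, BinaryIO


async def drain_stream(stream: Any, sink: BinaryIO,
                       chunk_size: int = 64 * 1024) -> int:
    """Copy `stream` into `sink` chunk by chunk; return bytes written."""
    total = 0
    while True:
        chunk = await stream.read(chunk_size)
        if not chunk:  # empty bytes is assumed to signal end of stream
            return total
        sink.write(chunk)
        total += len(chunk)


class FakeStream:
    """Hypothetical stand-in exposing only an async read(), for testing."""

    def __init__(self, data: bytes) -> None:
        self._buffer = io.BytesIO(data)

    async def read(self, size: int = -1) -> bytes:
        return self._buffer.read(size)
```

With a real client, the object returned by download_stream would take the place of FakeStream.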

async list_objects(bucket, *, params=None, headers=None, session=None, timeout=DEFAULT_TIMEOUT)
Parameters:
  • bucket (str)

  • params (Optional[Dict[str, str]])

  • headers (Optional[Dict[str, Any]])

  • session (Optional[requests.Session])

  • timeout (int)

Return type:

Dict[str, Any]

async upload(bucket, object_name, file_data, *, content_type=None, parameters=None, headers=None, metadata=None, session=None, force_resumable_upload=None, zipped=False, timeout=30)
Parameters:
  • bucket (str)

  • object_name (str)

  • file_data (Any)

  • content_type (Optional[str])

  • parameters (Optional[Dict[str, str]])

  • headers (Optional[Dict[str, str]])

  • metadata (Optional[Dict[str, Any]])

  • session (Optional[requests.Session])

  • force_resumable_upload (Optional[bool])

  • zipped (bool)

  • timeout (int)

Return type:

Dict[str, Any]

async upload_from_filename(bucket, object_name, filename, **kwargs)
Parameters:
  • bucket (str)

  • object_name (str)

  • filename (str)

  • kwargs (Any)

Return type:

Dict[str, Any]

static _get_stream_len(stream)
Parameters:

stream (IO[AnyStr])

Return type:

int

static _preprocess_data(data)
Parameters:

data (Any)

Return type:

IO[Any]

static _compress_file_in_chunks(input_stream, chunk_size=8192)

Reads the contents of input_stream and writes it gzip-compressed to output_stream in chunks. The chunk size is 8 KiB (8192 bytes) by default, which is a standard filesystem block size.

Parameters:
  • input_stream (IO[AnyStr])

  • chunk_size (int)

Return type:

IO[bytes]
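For readers curious what chunked gzip compression looks like, here is a standard-library sketch of the technique. It is not the library's implementation: gzip_in_chunks is a hypothetical name, and it handles binary streams only.

```python
import gzip
import io
from typing import BinaryIO


def gzip_in_chunks(input_stream: BinaryIO, chunk_size: int = 8192) -> io.BytesIO:
    """Return a rewound in-memory stream holding input_stream, gzipped."""
    output_stream = io.BytesIO()
    with gzip.GzipFile(fileobj=output_stream, mode='wb') as compressor:
        # iter() with a sentinel keeps calling read() until it returns b''.
        for chunk in iter(lambda: input_stream.read(chunk_size), b''):
            compressor.write(chunk)
    output_stream.seek(0)  # rewind so callers can read from the start
    return output_stream
```

Reading in fixed-size chunks keeps at most chunk_size bytes of uncompressed input in memory at a time, although the compressed output here is still buffered in memory.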

static _decide_upload_type(force_resumable_upload, content_length)
Parameters:
  • force_resumable_upload (Optional[bool])

  • content_length (int)

Return type:

UploadType

static _split_content_type(content_type)
Parameters:

content_type (str)

Return type:

Tuple[str, Optional[str]]

static _format_metadata_key(key)

Formats the fixed-key metadata keys as wanted by the multipart API.

Ex: Content-Disposition -> contentDisposition

Parameters:

key (str)

Return type:

str
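The conversion in the example above amounts to turning a hyphenated header name into lowerCamelCase. A minimal sketch of that transformation, under the assumption that keys are hyphen-separated words (format_metadata_key is a hypothetical standalone name, not the private method itself):

```python
def format_metadata_key(key: str) -> str:
    """Convert a hyphenated header name to lowerCamelCase."""
    first, *rest = key.split('-')
    # The first word is lowercased; later words get a leading capital.
    return first.lower() + ''.join(part.capitalize() for part in rest)
```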

async _download(bucket, object_name, *, params=None, headers=None, timeout=DEFAULT_TIMEOUT, session=None)
Parameters:
  • bucket (str)

  • object_name (str)

  • params (Optional[Dict[str, str]])

  • headers (Optional[Dict[str, str]])

  • timeout (int)

  • session (Optional[requests.Session])

Return type:

bytes

async _download_stream(bucket, object_name, *, params=None, headers=None, timeout=DEFAULT_TIMEOUT, session=None)
Parameters:
  • bucket (str)

  • object_name (str)

  • params (Optional[Dict[str, str]])

  • headers (Optional[Dict[str, str]])

  • timeout (int)

  • session (Optional[requests.Session])

Return type:

StreamResponse

async _upload_simple(url, object_name, stream, params, headers, *, session=None, timeout=30)
Parameters:
  • url (str)

  • object_name (str)

  • stream (IO[AnyStr])

  • params (Dict[str, str])

  • headers (Dict[str, str])

  • session (Optional[requests.Session])

  • timeout (int)

Return type:

Dict[str, Any]

async _upload_multipart(url, object_name, stream, params, headers, metadata, *, session=None, timeout=30)
Parameters:
  • url (str)

  • object_name (str)

  • stream (IO[AnyStr])

  • params (Dict[str, str])

  • headers (Dict[str, str])

  • metadata (Dict[str, Any])

  • session (Optional[requests.Session])

  • timeout (int)

Return type:

Dict[str, Any]

async _upload_resumable(url, object_name, stream, params, headers, *, metadata=None, session=None, timeout=30)
Parameters:
  • url (str)

  • object_name (str)

  • stream (IO[AnyStr])

  • params (Dict[str, str])

  • headers (Dict[str, str])

  • metadata (Optional[Dict[str, Any]])

  • session (Optional[requests.Session])

  • timeout (int)

Return type:

Dict[str, Any]

async _initiate_upload(url, object_name, params, headers, *, metadata=None, timeout=DEFAULT_TIMEOUT, session=None)
Parameters:
  • url (str)

  • object_name (str)

  • params (Dict[str, str])

  • headers (Dict[str, str])

  • metadata (Optional[Dict[str, Any]])

  • timeout (int)

  • session (Optional[requests.Session])

Return type:

str

async _do_upload(session_uri, stream, headers, *, retries=5, session=None, timeout=30)
Parameters:
  • session_uri (str)

  • stream (IO[AnyStr])

  • headers (Dict[str, str])

  • retries (int)

  • session (Optional[requests.Session])

  • timeout (int)

Return type:

Dict[str, Any]

async patch_metadata(bucket, object_name, metadata, *, params=None, headers=None, session=None, timeout=DEFAULT_TIMEOUT)
Parameters:
  • bucket (str)

  • object_name (str)

  • metadata (Dict[str, Any])

  • params (Optional[Dict[str, str]])

  • headers (Optional[Dict[str, str]])

  • session (Optional[requests.Session])

  • timeout (int)

Return type:

Dict[str, Any]

async get_bucket_metadata(bucket, *, params=None, headers=None, session=None, timeout=DEFAULT_TIMEOUT)
Parameters:
  • bucket (str)

  • params (Optional[Dict[str, str]])

  • headers (Optional[Dict[str, str]])

  • session (Optional[requests.Session])

  • timeout (int)

Return type:

Dict[str, Any]

async close()
Return type:

None

async __aenter__()
Return type:

Storage

async __aexit__(*args)
Parameters:

args (Any)

Return type:

None

class storage.StreamResponse(response)

This class provides an abstraction over the slightly different recommended streaming implementations of requests and aiohttp.

Parameters:

response (Any)

_response
_iter: Iterator[bytes] | None = None
property content_length: int
Return type:

int

async read(size=-1)
Parameters:

size (int)

Return type:

bytes

async __aenter__()
Return type:

Any

async __aexit__(*exc_info)
Parameters:

exc_info (Any)

Return type:

None

storage.__version__