storage
This library implements various methods for working with the Google Cloud Storage APIs.
Installation
$ pip install --upgrade gcloud-aio-storage
Usage
To upload a file, you might do something like the following:
import asyncio

import aiofiles
import aiohttp
from gcloud.aio.storage import Storage


async def main():
    async with aiohttp.ClientSession() as session:
        client = Storage(session=session)
        # Read the local file's contents
        async with aiofiles.open('/path/to/my/file', mode="r") as f:
            output = await f.read()
        # Upload the contents to the given object path in the bucket
        status = await client.upload(
            'my-bucket-name',
            'path/to/gcs/folder',
            output,
        )
        print(status)


asyncio.run(main())
Note that there are multiple ways to accomplish the above, e.g. by making use of the Bucket and Blob convenience classes, if that better fits your use-case.
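As a sketch of the convenience-class route, the helper below uploads through a Blob obtained from Storage.get_bucket() and Bucket.new_blob(), both listed in the package contents. It assumes `client` is a gcloud.aio.storage Storage instance; the helper name upload_via_blob is hypothetical.

```python
from typing import Any, Dict


async def upload_via_blob(client: Any, bucket_name: str,
                          object_name: str, data: Any) -> Dict[str, Any]:
    """Upload ``data`` using the Bucket/Blob convenience classes.

    ``client`` is assumed to be a gcloud.aio.storage.Storage instance.
    """
    # get_bucket() is synchronous: it only builds a Bucket handle
    bucket = client.get_bucket(bucket_name)
    # new_blob() is also synchronous and creates a local Blob handle
    blob = bucket.new_blob(object_name)
    # Blob.upload() is async and writes the data to this bucket/object path
    return await blob.upload(data)
```

Usage would look like `status = await upload_via_blob(client, 'my-bucket-name', 'path/in/gcs', contents)` inside an `async with Storage() as client:` block.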
Of course, the major benefit of using an async library is being able to parallelize operations like this. Since gcloud-aio-storage is fully asyncio-compatible, you can use any of the built-in asyncio methods to perform more complicated operations:
import asyncio

import aiofiles
from gcloud.aio.storage import Storage

my_files = {
    '/local/path/to/file.1': 'path/in/gcs.1',
    '/local/path/to/file.2': 'path/in/gcs.2',
    '/local/path/to/file.3': 'different/gcs/path/filename.3',
}


async def main():
    async with Storage() as client:
        # Prepare all our upload data
        uploads = []
        for local_name, gcs_name in my_files.items():
            async with aiofiles.open(local_name, mode="r") as f:
                contents = await f.read()
            uploads.append((gcs_name, contents))

        # Simultaneously upload all files
        await asyncio.gather(
            *[
                client.upload('my-bucket-name', path, file_)
                for path, file_ in uploads
            ]
        )


asyncio.run(main())
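asyncio.gather() starts every upload at once. With many files you may prefer to cap how many requests are in flight; one way to do that is an asyncio.Semaphore. This is a general asyncio pattern rather than a library feature, and the names upload_all and max_concurrency are hypothetical; `client` is assumed to be a Storage instance.

```python
import asyncio
from typing import Any, Dict, List, Tuple


async def upload_all(client: Any, bucket: str,
                     uploads: List[Tuple[str, Any]],
                     max_concurrency: int = 8) -> List[Dict[str, Any]]:
    """Upload (object_name, data) pairs with at most ``max_concurrency``
    requests in flight; results are returned in input order."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def upload_one(object_name: str, data: Any) -> Dict[str, Any]:
        # Wait for a free slot before starting this request
        async with semaphore:
            return await client.upload(bucket, object_name, data)

    # gather() returns results in the same order as its arguments
    return await asyncio.gather(
        *(upload_one(name, data) for name, data in uploads)
    )
```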
You can also refer to the smoke test for more info and examples.
Note that you can also let gcloud-aio-storage do its own session management, so long as you give us a hint when to close that session:
from gcloud.aio.storage import Storage


async def main():
    async with Storage() as client:
        ...  # closes the client.session on leaving the context manager

    # OR

    client = Storage()
    ...  # do stuff
    await client.close()  # close the session explicitly
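If you manage the client without a context manager, a try/finally block guarantees that close() runs even when an operation raises. This is a general Python pattern, not something the library requires; the helper run_with_client and its arguments are hypothetical, and make_client would typically be gcloud.aio.storage.Storage.

```python
from typing import Any, Awaitable, Callable, TypeVar

T = TypeVar('T')


async def run_with_client(make_client: Callable[[], Any],
                          work: Callable[[Any], Awaitable[T]]) -> T:
    """Create a client, run ``work(client)``, and always close the client."""
    client = make_client()
    try:
        return await work(client)
    finally:
        # Runs on success and on error, so the session is never leaked
        await client.close()
```

For example: `data = await run_with_client(Storage, lambda c: c.download('my-bucket-name', 'path/in/gcs'))`.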
File Encodings
In some cases, aiohttp needs to transform the objects returned from GCS into strings, e.g. for debug logging and other such issues. The built-in await response.text() operation relies on chardet for guessing the character encoding in any cases where it cannot be determined from the file metadata.
Unfortunately, this operation can be extremely slow, especially when you are working with particularly large files. If you notice odd latency issues when reading your results, you may want to set your character encoding more explicitly within GCS, e.g. by ensuring you set the contentType of the relevant objects to something suffixed with ; charset=utf-8. For example, for contentType='application/x-netcdf' files exhibiting latency, you could instead set contentType='application/x-netcdf; charset=utf-8'. See Issue #172 for more info!
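A minimal sketch of both routes: appending a charset to a content type before upload, and patching the contentType of an object that already exists. The helper names with_charset and fix_existing_charset are hypothetical; `client` is assumed to be a Storage instance, and the sketch assumes patch_metadata() sends the given dict as the object's metadata patch.

```python
from typing import Any, Dict


def with_charset(content_type: str, charset: str = 'utf-8') -> str:
    """Append '; charset=...' unless a charset parameter is already present."""
    params = [p.strip().lower() for p in content_type.split(';')[1:]]
    if any(p.startswith('charset=') for p in params):
        return content_type
    return f'{content_type}; charset={charset}'


async def fix_existing_charset(client: Any, bucket: str, object_name: str,
                               content_type: str) -> Dict[str, Any]:
    """Set an explicit charset on an object that is already in GCS."""
    return await client.patch_metadata(
        bucket, object_name, {'contentType': with_charset(content_type)})
```

For new objects, the same helper can feed the content_type argument of upload, e.g. `await client.upload('my-bucket-name', 'data.nc', data, content_type=with_charset('application/x-netcdf'))`.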
Emulators
For testing purposes, you may want to use gcloud-aio-storage along with a local GCS emulator. Setting the $STORAGE_EMULATOR_HOST environment variable to the address of your emulator should be enough to do the trick.
For example, using fsouza/fake-gcs-server, you can do:
docker run -d -p 4443:4443 -v $PWD/my-sample-data:/data fsouza/fake-gcs-server
export STORAGE_EMULATOR_HOST='http://0.0.0.0:4443'
Any gcloud-aio-storage requests made with that environment variable set will query fake-gcs-server instead of the official GCS API.
Note that some emulators require disabling SSL: if you're using a custom HTTP session, you may need to disable SSL verification on that session.
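In a test suite it can help to scope the variable to a single test. The stdlib-only sketch below assumes the client reads STORAGE_EMULATOR_HOST when it is constructed, so the Storage instance should be created inside the with block; the name emulator_host is hypothetical.

```python
import os
from contextlib import contextmanager
from typing import Iterator, Optional


@contextmanager
def emulator_host(address: str) -> Iterator[str]:
    """Temporarily point STORAGE_EMULATOR_HOST at ``address``."""
    previous: Optional[str] = os.environ.get('STORAGE_EMULATOR_HOST')
    os.environ['STORAGE_EMULATOR_HOST'] = address
    try:
        yield address
    finally:
        # Restore whatever was set before, or unset it entirely
        if previous is None:
            os.environ.pop('STORAGE_EMULATOR_HOST', None)
        else:
            os.environ['STORAGE_EMULATOR_HOST'] = previous
```

If your emulator also needs SSL verification disabled, one option with a custom session is `Storage(session=aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)))`.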
Customization
This library mostly tries to stay agnostic of potential use-cases; as such, we do not implement any sort of retrying or other policies, under the assumption that we wouldn’t get things right for every user’s situation.
Instead, we recommend configuring your own policies on an as-needed basis. The backoff library can make this quite straightforward! For example, you may find it useful to configure something like:
from typing import Any, Dict

import aiohttp
import backoff
import gcloud.aio.storage


class StorageWithBackoff(gcloud.aio.storage.Storage):
    @backoff.on_exception(backoff.expo, aiohttp.ClientResponseError,
                          max_tries=5, jitter=backoff.full_jitter)
    async def copy(self, *args: Any, **kwargs: Any) -> Dict[str, Any]:
        return await super().copy(*args, **kwargs)

    @backoff.on_exception(backoff.expo, aiohttp.ClientResponseError,
                          max_tries=10, jitter=backoff.full_jitter)
    async def download(self, *args: Any, **kwargs: Any) -> bytes:
        return await super().download(*args, **kwargs)
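Retrying every ClientResponseError also retries requests that can never succeed, such as a 404 for a missing object. backoff.on_exception accepts a giveup callable, and the predicate below is a sketch that treats only 408, 429 and 5xx responses as transient. The exact status set and the name is_permanent_error are assumptions to adapt to your workload.

```python
from typing import Set

# Statuses assumed to be worth retrying: timeouts, rate limiting, server errors
RETRYABLE_STATUSES: Set[int] = {408, 429, 500, 502, 503, 504}


def is_permanent_error(exc: Exception) -> bool:
    """Return True when ``exc`` should NOT be retried.

    aiohttp.ClientResponseError exposes the HTTP status as ``exc.status``;
    exceptions without a status attribute are treated as permanent.
    """
    status = getattr(exc, 'status', None)
    return status not in RETRYABLE_STATUSES
```

It plugs into the decorators above as an extra keyword, e.g. `@backoff.on_exception(backoff.expo, aiohttp.ClientResponseError, max_tries=5, jitter=backoff.full_jitter, giveup=is_permanent_error)`.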
Package Contents
- class storage.Blob(bucket, name, metadata)
- Parameters:
bucket (storage.bucket.Bucket)
name (str)
metadata (Dict[str, Any])
- bucket
- name
- size: int
- property chunk_size: int
- Return type:
int
- async download(timeout=DEFAULT_TIMEOUT, session=None, auto_decompress=True)
- Parameters:
timeout (int)
session (Optional[requests.Session])
auto_decompress (bool)
- Return type:
Any
- async upload(data, content_type=None, session=None)
- Parameters:
data (Any)
content_type (Optional[str])
session (Optional[requests.Session])
- Return type:
Dict[str, Any]
- async get_signed_url(expiration, headers=None, query_params=None, http_method='GET', iam_client=None, service_account_email=None, token=None, session=None)
Create a temporary access URL for a Storage Blob that is accessible to anyone with the link.
Adapted from Google Documentation: https://cloud.google.com/storage/docs/access-control/signing-urls-manually#python-sample
- Parameters:
expiration (int)
headers (Optional[Dict[str, str]])
query_params (Optional[Dict[str, Any]])
http_method (str)
iam_client (Optional[gcloud.aio.auth.IamClient])
service_account_email (Optional[str])
token (Optional[gcloud.aio.auth.Token])
session (Optional[requests.Session])
- Return type:
str
- static get_pem_signature(str_to_sign, private_key)
- Parameters:
str_to_sign (str)
private_key (str)
- Return type:
bytes
- static async get_iam_api_signature(str_to_sign, iam_client, service_account_email, session)
- Parameters:
str_to_sign (str)
iam_client (gcloud.aio.auth.IamClient)
service_account_email (Optional[str])
session (Optional[requests.Session])
- Return type:
bytes
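A minimal sketch of generating a signed download link through the convenience classes. It assumes expiration is given in seconds and enforces the 7-day (604800-second) maximum that GCS places on V4 signed URLs; the helper name signed_download_url is hypothetical, and `bucket` is assumed to be a storage.Bucket.

```python
from typing import Any

# GCS V4 signed URLs can be valid for at most 7 days
MAX_EXPIRATION_SECONDS = 7 * 24 * 60 * 60  # 604800


async def signed_download_url(bucket: Any, blob_name: str,
                              expiration: int = 3600) -> str:
    """Return a temporary GET URL for ``blob_name`` in ``bucket``."""
    if not 0 < expiration <= MAX_EXPIRATION_SECONDS:
        raise ValueError(
            f'expiration must be between 1 and {MAX_EXPIRATION_SECONDS} seconds')
    # Look up the blob (an async call), then sign a GET URL for it
    blob = await bucket.get_blob(blob_name)
    return await blob.get_signed_url(expiration, http_method='GET')
```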
- class storage.Bucket(storage, name)
- Parameters:
storage (storage.storage.Storage)
name (str)
- storage
- name
- async get_blob(blob_name, timeout=DEFAULT_TIMEOUT, session=None)
- Parameters:
blob_name (str)
timeout (int)
session (Optional[requests.Session])
- Return type:
storage.blob.Blob
- async blob_exists(blob_name, session=None)
- Parameters:
blob_name (str)
session (Optional[requests.Session])
- Return type:
bool
- async list_blobs(prefix='', match_glob='', session=None)
- Parameters:
prefix (str)
match_glob (str)
session (Optional[requests.Session])
- Return type:
List[str]
- new_blob(blob_name)
- Parameters:
blob_name (str)
- Return type:
storage.blob.Blob
- async get_metadata(params=None, session=None)
- Parameters:
params (Optional[Dict[str, Any]])
session (Optional[requests.Session])
- Return type:
Dict[str, Any]
- storage.SCOPES = ['https://www.googleapis.com/auth/devstorage.read_write']
- class storage.Storage(*, service_file=None, token=None, session=None, api_root=None)
- Parameters:
service_file (Optional[Union[str, IO[AnyStr]]])
token (Optional[gcloud.aio.auth.Token])
session (Optional[requests.Session])
api_root (Optional[str])
- _api_root: str
- _api_is_dev: bool
- _api_root_read: str
- _api_root_write: str
- session
- token
- async _headers()
- Return type:
Dict[str, str]
- async list_buckets(project, *, params=None, headers=None, session=None, timeout=DEFAULT_TIMEOUT)
- Parameters:
project (str)
params (Optional[Dict[str, str]])
headers (Optional[Dict[str, Any]])
session (Optional[requests.Session])
timeout (int)
- Return type:
List[storage.bucket.Bucket]
- get_bucket(bucket_name)
- Parameters:
bucket_name (str)
- Return type:
storage.bucket.Bucket
- async copy(bucket, object_name, destination_bucket, *, new_name=None, metadata=None, params=None, headers=None, timeout=DEFAULT_TIMEOUT, session=None)
When files are too large, multiple calls to rewriteTo are made. We refer to the same copy job by using the rewriteToken from the previous return payload in subsequent rewriteTo calls.
Using the rewriteTo GCS API is preferred in part because it is able to make multiple calls to fully copy an object, whereas the copyTo GCS API only calls rewriteTo once under the hood, and thus may fail if files are large.
In the rare case you need to resume a copy operation, include the rewriteToken in the params dictionary. Once you begin a multi-part copy operation, you then have 1 week to complete the copy job.
See https://cloud.google.com/storage/docs/json_api/v1/objects/rewrite
- Parameters:
bucket (str)
object_name (str)
destination_bucket (str)
new_name (Optional[str])
metadata (Optional[Dict[str, Any]])
params (Optional[Dict[str, str]])
headers (Optional[Dict[str, str]])
timeout (int)
session (Optional[requests.Session])
- Return type:
Dict[str, Any]
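The rewriteToken loop described for copy can be sketched independently of HTTP. Here rewrite_once is a hypothetical stand-in for a single objects.rewrite request: it takes the current token (or None) and returns the JSON payload. The field names done, rewriteToken and resource come from the GCS rewrite response; this illustrates the protocol and is not the library's implementation.

```python
from typing import Any, Awaitable, Callable, Dict, Optional

RewriteFn = Callable[[Optional[str]], Awaitable[Dict[str, Any]]]


async def rewrite_until_done(rewrite_once: RewriteFn,
                             token: Optional[str] = None) -> Dict[str, Any]:
    """Repeat rewrite calls until GCS reports that the copy is done.

    Pass a saved ``token`` to resume an interrupted copy job.
    """
    while True:
        payload = await rewrite_once(token)
        if payload.get('done'):
            # The final payload carries the new object's metadata
            return payload['resource']
        # Not finished yet: continue the same job with the returned token
        token = payload['rewriteToken']
```

With the library itself, resuming corresponds to passing the saved token through params, e.g. `await client.copy(bucket, object_name, destination_bucket, params={'rewriteToken': saved_token})`.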
- async delete(bucket, object_name, *, timeout=DEFAULT_TIMEOUT, params=None, headers=None, session=None)
- Parameters:
bucket (str)
object_name (str)
timeout (int)
params (Optional[Dict[str, str]])
headers (Optional[Dict[str, str]])
session (Optional[requests.Session])
- Return type:
str
- async download(bucket, object_name, *, headers=None, timeout=DEFAULT_TIMEOUT, session=None)
- Parameters:
bucket (str)
object_name (str)
headers (Optional[Dict[str, Any]])
timeout (int)
session (Optional[requests.Session])
- Return type:
bytes
- async download_to_filename(bucket, object_name, filename, **kwargs)
- Parameters:
bucket (str)
object_name (str)
filename (str)
kwargs (Any)
- Return type:
None
- async download_metadata(bucket, object_name, *, headers=None, session=None, timeout=DEFAULT_TIMEOUT)
- Parameters:
bucket (str)
object_name (str)
headers (Optional[Dict[str, Any]])
session (Optional[requests.Session])
timeout (int)
- Return type:
Dict[str, Any]
- async download_stream(bucket, object_name, *, headers=None, timeout=DEFAULT_TIMEOUT, session=None)
Download a GCS object in a buffered stream.
- Parameters:
bucket (str) – The bucket from which to download.
object_name (str) – The object within the bucket to download.
headers (Optional[Dict[str, Any]]) – Custom header values for the request, such as range.
timeout (int) – Timeout, in seconds, for the request. Note that with this function, this is the time to the beginning of the response data (TTFB).
session (Optional[requests.Session]) – A specific session to (re)use.
- Returns:
An object encapsulating the stream, similar to io.BufferedIOBase, but supporting only the read() function.
- Return type:
storage.StreamResponse
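Because the returned object only supports read(), consuming it means reading fixed-size chunks until an empty bytes object signals the end. A sketch, assuming that end-of-stream convention and writing to any synchronous binary file object; copy_stream and chunk_size are hypothetical names.

```python
from typing import Any, BinaryIO


async def copy_stream(stream: Any, out: BinaryIO,
                      chunk_size: int = 64 * 1024) -> int:
    """Copy an async ``read()``-able stream into ``out``; return bytes written."""
    total = 0
    while True:
        chunk = await stream.read(chunk_size)
        if not chunk:  # an empty read means the stream is exhausted
            return total
        out.write(chunk)
        total += len(chunk)
```

For example, `stream = await client.download_stream('my-bucket-name', 'path/in/gcs')` followed by `await copy_stream(stream, f)` with `f` opened in `'wb'` mode. The synchronous write briefly blocks the event loop per chunk; aiofiles could be swapped in if that matters.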
- async list_objects(bucket, *, params=None, headers=None, session=None, timeout=DEFAULT_TIMEOUT)
- Parameters:
bucket (str)
params (Optional[Dict[str, str]])
headers (Optional[Dict[str, Any]])
session (Optional[requests.Session])
timeout (int)
- Return type:
Dict[str, Any]
- async upload(bucket, object_name, file_data, *, content_type=None, parameters=None, headers=None, metadata=None, session=None, force_resumable_upload=None, zipped=False, timeout=30)
- Parameters:
bucket (str)
object_name (str)
file_data (Any)
content_type (Optional[str])
parameters (Optional[Dict[str, str]])
headers (Optional[Dict[str, str]])
metadata (Optional[Dict[str, Any]])
session (Optional[requests.Session])
force_resumable_upload (Optional[bool])
zipped (bool)
timeout (int)
- Return type:
Dict[str, Any]
- async upload_from_filename(bucket, object_name, filename, **kwargs)
- Parameters:
bucket (str)
object_name (str)
filename (str)
kwargs (Any)
- Return type:
Dict[str, Any]
- static _get_stream_len(stream)
- Parameters:
stream (IO[AnyStr])
- Return type:
int
- static _preprocess_data(data)
- Parameters:
data (Any)
- Return type:
IO[Any]
- static _compress_file_in_chunks(input_stream, chunk_size=8192)
Reads the contents of input_stream and writes them, gzip-compressed, to an output stream in chunks; that output stream is returned. The chunk size defaults to 8 KiB (8192 bytes), a standard filesystem block size.
- Parameters:
input_stream (IO[AnyStr])
chunk_size (int)
- Return type:
IO[bytes]
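The chunked compression described above can be sketched with the standard-library gzip module. This is an illustrative re-implementation for binary input only, not the library's code; using an in-memory io.BytesIO as the output stream is an assumption.

```python
import gzip
import io
from typing import IO


def compress_in_chunks(input_stream: IO[bytes],
                       chunk_size: int = 8192) -> IO[bytes]:
    """Gzip ``input_stream`` chunk by chunk into a new in-memory stream."""
    output_stream = io.BytesIO()
    # Closing the GzipFile flushes the gzip trailer but leaves output_stream open
    with gzip.GzipFile(fileobj=output_stream, mode='wb') as gz:
        while True:
            chunk = input_stream.read(chunk_size)
            if not chunk:
                break
            gz.write(chunk)
    # Rewind so callers can read the compressed data from the start
    output_stream.seek(0)
    return output_stream
```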
- static _decide_upload_type(force_resumable_upload, content_length)
- Parameters:
force_resumable_upload (Optional[bool])
content_length (int)
- Return type:
- static _split_content_type(content_type)
- Parameters:
content_type (str)
- Return type:
Tuple[str, Optional[str]]
- static _format_metadata_key(key)
Formats fixed-key metadata keys as expected by the multipart upload API, e.g. Content-Disposition -> contentDisposition.
- Parameters:
key (str)
- Return type:
str
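The conversion shown in the example can be sketched as lowercasing the first hyphen-separated word and capitalizing the rest. This is an illustrative version, not necessarily the library's exact implementation.

```python
def format_metadata_key(key: str) -> str:
    """Convert an HTTP-style header name to the JSON API's camelCase key."""
    first, *rest = key.split('-')
    return first.lower() + ''.join(part.capitalize() for part in rest)
```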
- async _download(bucket, object_name, *, params=None, headers=None, timeout=DEFAULT_TIMEOUT, session=None)
- Parameters:
bucket (str)
object_name (str)
params (Optional[Dict[str, str]])
headers (Optional[Dict[str, str]])
timeout (int)
session (Optional[requests.Session])
- Return type:
bytes
- async _download_stream(bucket, object_name, *, params=None, headers=None, timeout=DEFAULT_TIMEOUT, session=None)
- Parameters:
bucket (str)
object_name (str)
params (Optional[Dict[str, str]])
headers (Optional[Dict[str, str]])
timeout (int)
session (Optional[requests.Session])
- Return type:
storage.StreamResponse
- async _upload_simple(url, object_name, stream, params, headers, *, session=None, timeout=30)
- Parameters:
url (str)
object_name (str)
stream (IO[AnyStr])
params (Dict[str, str])
headers (Dict[str, str])
session (Optional[requests.Session])
timeout (int)
- Return type:
Dict[str, Any]
- async _upload_multipart(url, object_name, stream, params, headers, metadata, *, session=None, timeout=30)
- Parameters:
url (str)
object_name (str)
stream (IO[AnyStr])
params (Dict[str, str])
headers (Dict[str, str])
metadata (Dict[str, Any])
session (Optional[requests.Session])
timeout (int)
- Return type:
Dict[str, Any]
- async _upload_resumable(url, object_name, stream, params, headers, *, metadata=None, session=None, timeout=30)
- Parameters:
url (str)
object_name (str)
stream (IO[AnyStr])
params (Dict[str, str])
headers (Dict[str, str])
metadata (Optional[Dict[str, Any]])
session (Optional[requests.Session])
timeout (int)
- Return type:
Dict[str, Any]
- async _initiate_upload(url, object_name, params, headers, *, metadata=None, timeout=DEFAULT_TIMEOUT, session=None)
- Parameters:
url (str)
object_name (str)
params (Dict[str, str])
headers (Dict[str, str])
metadata (Optional[Dict[str, Any]])
timeout (int)
session (Optional[requests.Session])
- Return type:
str
- async _do_upload(session_uri, stream, headers, *, retries=5, session=None, timeout=30)
- Parameters:
session_uri (str)
stream (IO[AnyStr])
headers (Dict[str, str])
retries (int)
session (Optional[requests.Session])
timeout (int)
- Return type:
Dict[str, Any]
- async patch_metadata(bucket, object_name, metadata, *, params=None, headers=None, session=None, timeout=DEFAULT_TIMEOUT)
- Parameters:
bucket (str)
object_name (str)
metadata (Dict[str, Any])
params (Optional[Dict[str, str]])
headers (Optional[Dict[str, str]])
session (Optional[requests.Session])
timeout (int)
- Return type:
Dict[str, Any]
- async get_bucket_metadata(bucket, *, params=None, headers=None, session=None, timeout=DEFAULT_TIMEOUT)
- Parameters:
bucket (str)
params (Optional[Dict[str, str]])
headers (Optional[Dict[str, str]])
session (Optional[requests.Session])
timeout (int)
- Return type:
Dict[str, Any]
- async close()
- Return type:
None
- async __aexit__(*args)
- Parameters:
args (Any)
- Return type:
None
- class storage.StreamResponse(response)
This class provides an abstraction over the slightly different recommended streaming implementations of requests and aiohttp.
- Parameters:
response (Any)
- _response
- _iter: Iterator[bytes] | None = None
- property content_length: int
- Return type:
int
- async read(size=-1)
- Parameters:
size (int)
- Return type:
bytes
- async __aenter__()
- Return type:
Any
- async __aexit__(*exc_info)
- Parameters:
exc_info (Any)
- Return type:
None
- storage.__version__