:py:mod:`pubsub`
================

.. py:module:: pubsub


.. autoapi-nested-parse::

   This library implements various methods for working with the Google Pubsub APIs.

   Installation
   ------------

   .. code-block:: console

      $ pip install --upgrade gcloud-aio-pubsub

   Usage
   -----

   Subscriber
   ~~~~~~~~~~

   ``gcloud-aio-pubsub`` provides ``SubscriberClient`` as an interface to call
   pubsub's HTTP API:

   .. code-block:: python

      import asyncio
      from typing import List

      from gcloud.aio.pubsub import SubscriberClient
      from gcloud.aio.pubsub import SubscriberMessage


      async def main() -> None:
          client = SubscriberClient()

          # create subscription
          await client.create_subscription(
              'projects//subscriptions/',
              'projects//topics/')

          # pull messages
          messages: List[SubscriberMessage] = await client.pull(
              'projects//subscriptions/',
              max_messages=10)

          await client.close()


      asyncio.run(main())

   There's also a ``gcloud.aio.pubsub.subscribe`` helper function you can use to
   set up a pubsub processing pipeline. It is built with ``asyncio`` and thus is
   only available in the ``gcloud-aio-pubsub`` package. The usage is fairly
   simple:

   .. code-block:: python

      import asyncio

      from gcloud.aio.pubsub import SubscriberClient
      from gcloud.aio.pubsub import SubscriberMessage
      from gcloud.aio.pubsub import subscribe


      async def handler(message: SubscriberMessage) -> None:
          # returning None (without raising) means the message gets acked
          return


      async def main() -> None:
          subscriber_client = SubscriberClient()
          await subscribe(
              'projects//subscriptions/',
              handler,
              subscriber_client,
              num_producers=1,
              max_messages_per_producer=100,
              ack_window=0.3,
              num_tasks_per_consumer=1,
              enable_nack=True,
              nack_window=0.3,
          )


      asyncio.run(main())

   While the defaults are somewhat sensible, it is highly recommended to
   performance-test your application and tweak the function parameters to your
   specific needs. Here are a few hints:

   - ``handler``: An async function that will be called for each message. It
     should accept an instance of ``SubscriberMessage`` as its only argument and
     return ``None`` if the message should be acked. An exception raised within
     the handler will result in the message being left to expire, and thus it
     will be redelivered according to your subscription's ack deadline, unless
     ``enable_nack`` is set (see below).
   - ``num_producers``: Number of workers that will be making ``pull`` requests
     to pubsub.
     Please note that a worker will only fetch a new batch once the ``handler``
     has been called for each message from the previous batch. This means that
     running only a single worker will most likely make your application IO
     bound. If you notice this being an issue, don't hesitate to bump this
     parameter.
   - ``max_messages_per_producer``: Number of pubsub messages a worker will try
     to fetch in a single batch. This value is passed to the ``pull``
     `endpoint`_ as the ``maxMessages`` parameter. As a rule of thumb, the faster
     your handler is, the bigger this value should be.
   - ``ack_window``: Ack requests are handled separately and are done in
     batches. This parameter specifies how often ack requests will be made.
     Setting it to ``0.0`` will effectively disable batching.
   - ``num_tasks_per_consumer``: How many ``handler`` calls a worker can make
     before it blocks to wait for them to return. If you process messages
     independently of each other, you should be fine with the default value of
     ``1``. If you do something fancy (e.g. aggregate messages before processing
     them), you'll want a larger pool here. You can think of
     ``num_producers x num_tasks_per_consumer`` as an upper limit on how many
     messages can be within your application state at any given moment.
   - ``enable_nack``: If enabled, messages for which ``handler`` raised an
     exception will be explicitly nacked using the ``modifyAckDeadline``
     endpoint so they can be retried immediately.
   - ``nack_window``: Same as ``ack_window`` but for nack requests.

   Note that this method was built under the assumption that it is the main
   thread of your application. It may work just fine otherwise, but be aware
   that the use case of running it in a background thread has not been
   extensively tested.

   As it is generally assumed to run in the foreground, it relies on task
   cancellation to shut itself down (e.g. caused by process termination). To
   stop it yourself, call ``Task.cancel()`` on the task running it, which raises
   ``asyncio.CancelledError`` inside ``subscribe``. asyncio tasks are not
   thread-safe, so when cancelling from another thread, schedule the call on the
   event loop instead, e.g. ``loop.call_soon_threadsafe(subscribe_task.cancel)``:

   .. code-block:: python

      import asyncio
      import gcloud.aio.pubsub

      subscribe_task = asyncio.create_task(gcloud.aio.pubsub.subscribe(...))
      # snip
      subscribe_task.cancel()

   Prometheus Metrics
   ~~~~~~~~~~~~~~~~~~

   If you prefer pull-based metrics such as Prometheus, you will be pleased to
   know that the subscriber records Prometheus metrics whose names are prefixed
   with ``gcloud_aio_pubsub_``. These have no effect unless you use Prometheus to
   scrape app metrics:

   - ``subscriber_batch_size`` - [histogram] how many messages were pulled from
     the subscription in a single batch
   - ``subscriber_consume`` (labels: ``outcome = {'succeeded', 'cancelled', 'failed', 'failfast'}``) - [counter] a consume
     operation has completed with a given outcome
   - ``subscriber_consume_latency_seconds`` (labels: ``phase = {'receive', 'queueing', 'runtime'}``) - [histogram] how many
     seconds it took to receive a message, how long it waited to be processed,
     or how long the handler took to complete
   - ``subscriber_batch_status`` (labels: ``component = {'acker', 'nacker'}, outcome = {'succeeded', 'failed'}``) - [counter]
     a batch has succeeded or failed to be acked or nacked
   - ``subscriber_messages_processed`` (labels: ``component = {'acker', 'nacker'}``) - [counter] the number of messages that
     were processed, either by being acked or nacked
   - ``subscriber_messages_received`` - [counter] the number of messages pulled
     from pubsub

   Metrics Agent (Deprecated)
   ~~~~~~~~~~~~~~~~~~~~~~~~~~

   ``subscribe`` also has an optional ``metrics_client`` argument, which will be
   removed in a future release. You can provide any metrics agent that
   implements the same interface as ``MetricsAgent`` (a Datadog client will do ;) )
   and get the following metrics:

   - ``pubsub.producer.batch`` - [histogram] actual size of a batch retrieved
     from pubsub.
   - ``pubsub.consumer.failfast`` - [increment] a message was dropped due to its
     lease being expired.
   - ``pubsub.consumer.latency.receive`` - [histogram] how many seconds it took
     for a message to reach the handler after it was published.
   - ``pubsub.consumer.succeeded`` - [increment] ``handler`` call was
     successful.
   - ``pubsub.consumer.failed`` - [increment] ``handler`` call raised an
     exception.
   - ``pubsub.consumer.latency.runtime`` - [histogram] ``handler`` execution
     time in seconds.
   - ``pubsub.acker.batch.failed`` - [increment] ack request failed.
   - ``pubsub.acker.batch`` - [histogram] actual number of messages that were
     acked in a single request.

   Publisher
   ---------

   The ``PublisherClient`` is a dead-simple alternative to the official Google
   Cloud Pub/Sub publisher client. The main design goal was to eliminate all the
   additional gRPC overhead implemented by the upstream client.

   If migrating between this library and the official one, the main difference
   is this: the ``gcloud-{aio,rest}-pubsub`` publisher's ``.publish()`` method
   *immediately* publishes the messages you've provided, rather than
   maintaining its own publishing queue, implementing batching and flow
   control, etc. If you're looking for a full-featured publishing library with
   all the bells and whistles built in, you may be interested in the upstream
   provider. If you're looking to manage your own batching / timeouts / retry /
   threads / etc, this library should be a bit easier to work with.

   .. code-block:: python

      import asyncio

      import aiohttp
      from gcloud.aio.pubsub import PubsubMessage
      from gcloud.aio.pubsub import PublisherClient


      async def main() -> None:
          async with aiohttp.ClientSession() as session:
              client = PublisherClient(session=session)

              topic = client.topic_path('my-gcp-project', 'my-topic-name')

              messages = [
                  PubsubMessage(b'payload', attribute='value'),
                  PubsubMessage(b'other payload', other_attribute='whatever',
                                more_attributes='something else'),
              ]
              response = await client.publish(topic, messages)
              # response == {'messageIds': ['1', '2']}


      asyncio.run(main())

   Emulators
   ---------

   For testing purposes, you may want to use ``gcloud-aio-pubsub`` along with a
   local Pubsub emulator. Setting the ``$PUBSUB_EMULATOR_HOST`` environment
   variable to the local address of your emulator should be enough to do the
   trick.
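
   If you would rather point a test suite at the emulator from Python (for
   example inside a fixture), the same variable can be set on ``os.environ``
   before any client is created. The sketch below is a hypothetical helper:
   ``use_pubsub_emulator`` is not part of ``gcloud-aio-pubsub``, and it assumes
   the clients read ``$PUBSUB_EMULATOR_HOST`` when they are constructed. It
   restores the previous value on exit so the override does not leak into
   unrelated code:

   .. code-block:: python

      import contextlib
      import os
      from typing import Iterator, Optional

      ENV_VAR = 'PUBSUB_EMULATOR_HOST'


      @contextlib.contextmanager
      def use_pubsub_emulator(host: str = '0.0.0.0:8681') -> Iterator[None]:
          # Hypothetical helper: it only sets and restores the environment variable.
          # Assumption: clients created inside the block will target the emulator.
          previous: Optional[str] = os.environ.get(ENV_VAR)
          os.environ[ENV_VAR] = host
          try:
              yield
          finally:
              # Put the environment back exactly as we found it, even on errors.
              if previous is None:
                  os.environ.pop(ENV_VAR, None)
              else:
                  os.environ[ENV_VAR] = previous

   Under that assumption, a client constructed inside
   ``with use_pubsub_emulator():`` would talk to the emulator, while code outside
   the block keeps its original configuration.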
   To try this with the official Google Pubsub emulator, start it (the command
   keeps running in the foreground), then export the variable in the shell that
   runs your application:

   .. code-block:: console

      gcloud beta emulators pubsub start --host-port=0.0.0.0:8681
      export PUBSUB_EMULATOR_HOST='0.0.0.0:8681'

   Any ``gcloud-aio-pubsub`` requests made with that environment variable set
   will query the emulator instead of the official Pub/Sub API.

   For easier ergonomics, you may be interested in
   `thekevjames/gcloud-pubsub-emulator`_.

   Customization
   -------------

   This library mostly tries to stay agnostic of potential use-cases; as such,
   we do not implement any sort of retrying or other policies, under the
   assumption that we wouldn't get things right for every user's situation.

   Instead, we recommend configuring your own policies on an as-needed basis.
   The `backoff`_ library can make this quite straightforward! For example, you
   may find it useful to configure something like:

   .. code-block:: python

      from typing import Any

      import aiohttp
      import backoff
      from gcloud.aio.pubsub import SubscriberClient


      class SubscriberClientWithBackoff(SubscriberClient):
          @backoff.on_exception(backoff.expo, aiohttp.ClientResponseError,
                                max_tries=5, jitter=backoff.full_jitter)
          async def pull(self, *args: Any, **kwargs: Any):
              return await super().pull(*args, **kwargs)

   .. _backoff: https://pypi.org/project/backoff/
   .. _thekevjames/gcloud-pubsub-emulator: https://github.com/TheKevJames/tools/tree/master/docker-gcloud-pubsub-emulator
   .. _endpoint: https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions/pull#request-body


Submodules
----------
.. toctree::
   :titlesonly:
   :maxdepth: 1

   metrics/index.rst
   metrics_agent/index.rst
   publisher_client/index.rst
   subscriber/index.rst
   subscriber_client/index.rst
   subscriber_message/index.rst
   utils/index.rst


Package Contents
----------------

Classes
~~~~~~~

.. autoapisummary::

   pubsub.PublisherClient
   pubsub.SubscriberClient
   pubsub.SubscriberMessage
   pubsub.PubsubMessage


Attributes
~~~~~~~~~~

.. autoapisummary::

   pubsub.__version__


.. py:class:: PublisherClient(*, service_file = None, session = None, token = None, api_root = None)

   .. py:attribute:: _api_root
      :type: str

   .. py:attribute:: _api_is_dev
      :type: bool

   .. py:method:: project_path(project)
      :staticmethod:

   .. py:method:: subscription_path(project, subscription)
      :classmethod:

   .. py:method:: topic_path(project, topic)
      :classmethod:

   .. py:method:: _headers()
      :async:

   .. py:method:: list_topics(project, query_params = None, *, session = None, timeout = 10)
      :async:

      List topics.

   .. py:method:: create_topic(topic, body = None, *, session = None, timeout = 10)
      :async:

      Create topic.

   .. py:method:: delete_topic(topic, *, session = None, timeout = 10)
      :async:

      Delete topic.

   .. py:method:: publish(topic, messages, session = None, timeout = 10)
      :async:

   .. py:method:: close()
      :async:

   .. py:method:: __aenter__()
      :async:

   .. py:method:: __aexit__(*args)
      :async:


.. py:class:: SubscriberClient(*, service_file = None, token = None, session = None, api_root = None)

   .. py:attribute:: _api_root
      :type: str

   .. py:attribute:: _api_is_dev
      :type: bool

   .. py:method:: project_path(project)
      :staticmethod:

   .. py:method:: subscription_path(project, subscription)
      :classmethod:

   .. py:method:: topic_path(project, topic)
      :classmethod:

   .. py:method:: _headers()
      :async:

   .. py:method:: create_subscription(subscription, topic, body = None, *, session = None, timeout = 10)
      :async:

      Create subscription.

   .. py:method:: patch_subscription(subscription, body, *, session = None, timeout = 10)
      :async:

   .. py:method:: delete_subscription(subscription, *, session = None, timeout = 10)
      :async:

      Delete subscription.

   .. py:method:: pull(subscription, max_messages, *, session = None, timeout = 30)
      :async:

      Pull messages from subscription.

   .. py:method:: acknowledge(subscription, ack_ids, *, session = None, timeout = 10)
      :async:

      Acknowledge messages by ackIds.

   .. py:method:: modify_ack_deadline(subscription, ack_ids, ack_deadline_seconds, *, session = None, timeout = 10)
      :async:

      Modify messages' ack deadline.
      Set ack deadline to 0 to nack messages.

   .. py:method:: get_subscription(subscription, *, session = None, timeout = 10)
      :async:

      Get subscription.

   .. py:method:: list_subscriptions(project, query_params = None, *, session = None, timeout = 10)
      :async:

      List subscriptions.

   .. py:method:: close()
      :async:

   .. py:method:: __aenter__()
      :async:

   .. py:method:: __aexit__(*args)
      :async:


.. py:class:: SubscriberMessage(ack_id, message_id, publish_time, data, attributes, delivery_attempt = None)

   .. py:method:: from_repr(received_message)
      :staticmethod:

   .. py:method:: to_repr()


.. py:class:: PubsubMessage(data, ordering_key = '', **kwargs)

   .. py:method:: __repr__()

      Return repr(self).

   .. py:method:: to_repr()


.. py:data:: __version__
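
The ``PubsubMessage(data, ordering_key = '', **kwargs)`` signature above lines
up with the ``PubsubMessage`` object of the Pub/Sub REST API: the payload is
sent base64-encoded in ``data``, keyword arguments end up as string
``attributes`` (as in the publisher example's ``attribute='value'``), and the
ordering key is sent as ``orderingKey``. The sketch below builds such a request
body by hand purely to illustrate that mapping. ``to_rest_message`` and
``publish_body`` are hypothetical helpers, and this is not claimed to be
byte-for-byte what ``PubsubMessage.to_repr()`` returns:

.. code-block:: python

   import base64
   import json
   from typing import Any, Dict, List


   def to_rest_message(data: bytes, ordering_key: str = '',
                       **attributes: str) -> Dict[str, Any]:
       # Hypothetical helper mirroring PubsubMessage(data, ordering_key, **kwargs).
       message: Dict[str, Any] = {
           # The REST API carries payloads as base64-encoded strings.
           'data': base64.b64encode(data).decode('ascii'),
       }
       if attributes:
           # Extra keyword arguments become the message's string attributes.
           message['attributes'] = dict(attributes)
       if ordering_key:
           # Only included when set; the field name is camelCase on the wire.
           message['orderingKey'] = ordering_key
       return message


   def publish_body(messages: List[Dict[str, Any]]) -> str:
       # JSON body for POST https://pubsub.googleapis.com/v1/{topic}:publish
       return json.dumps({'messages': messages})

For instance, ``to_rest_message(b'payload', attribute='value')`` returns
``{'data': 'cGF5bG9hZA==', 'attributes': {'attribute': 'value'}}``; a successful
publish call answers with a ``{'messageIds': [...]}`` object, matching the
response shown in the publisher example.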