slixmpp.plugins.xep_0198.stream_management

1.6 Documentation

Contents

Source code for slixmpp.plugins.xep_0198.stream_management

"""
    Slixmpp: The Slick XMPP Library
    Copyright (C) 2012 Nathanael C. Fritz, Lance J.T. Stout
    This file is part of Slixmpp.

    See the file LICENSE for copying permission.
"""

import asyncio
import logging
import collections

from slixmpp.stanza import Message, Presence, Iq, StreamFeatures
from slixmpp.xmlstream import register_stanza_plugin
from slixmpp.xmlstream.handler import Callback, Waiter
from slixmpp.xmlstream.matcher import MatchXPath, MatchMany
from slixmpp.plugins.base import BasePlugin
from slixmpp.plugins.xep_0198 import stanza


log = logging.getLogger(__name__)


MAX_SEQ = 2 ** 32


[docs]class XEP_0198(BasePlugin): """ XEP-0198: Stream Management """ name = 'xep_0198' description = 'XEP-0198: Stream Management' dependencies = set() stanza = stanza default_config = { #: The last ack number received from the server. 'last_ack': 0, #: The number of stanzas to wait between sending ack requests to #: the server. Setting this to ``1`` will send an ack request after #: every sent stanza. Defaults to ``5``. 'window': 5, #: The stream management ID for the stream. Knowing this value is #: required in order to do stream resumption. 'sm_id': None, #: A counter of handled incoming stanzas, mod 2^32. 'handled': 0, #: A counter of unacked outgoing stanzas, mod 2^32. 'seq': 0, #: Control whether or not the ability to resume the stream will be #: requested when enabling stream management. Defaults to ``True``. 'allow_resume': True, 'order': 10100, 'resume_order': 9000 } def plugin_init(self): """Start the XEP-0198 plugin.""" # Only enable stream management for non-components, # since components do not yet perform feature negotiation. if self.xmpp.is_component: return self.window_counter = self.window self.enabled_in = False self.enabled_out = False self.unacked_queue = collections.deque() register_stanza_plugin(StreamFeatures, stanza.StreamManagement) self.xmpp.register_stanza(stanza.Enable) self.xmpp.register_stanza(stanza.Enabled) self.xmpp.register_stanza(stanza.Resume) self.xmpp.register_stanza(stanza.Resumed) self.xmpp.register_stanza(stanza.Ack) self.xmpp.register_stanza(stanza.RequestAck) # Register the feature twice because it may be ordered two # different ways: enabling after binding and resumption # before binding. self.xmpp.register_feature('sm', self._handle_sm_feature, restart=True, order=self.order) self.xmpp.register_feature('sm', self._handle_sm_feature, restart=True, order=self.resume_order) self.xmpp.register_handler( Callback('Stream Management Enabled', MatchXPath(stanza.Enabled.tag_name()), self._handle_enabled, instream=True)) self.xmpp.register_handler( Callback('Stream Management Resumed', MatchXPath(stanza.Resumed.tag_name()), self._handle_resumed, instream=True)) self.xmpp.register_handler( Callback('Stream Management Failed', MatchXPath(stanza.Failed.tag_name()), self._handle_failed, instream=True)) self.xmpp.register_handler( Callback('Stream Management Ack', MatchXPath(stanza.Ack.tag_name()), self._handle_ack, instream=True)) self.xmpp.register_handler( Callback('Stream Management Request Ack', MatchXPath(stanza.RequestAck.tag_name()), self._handle_request_ack, instream=True)) self.xmpp.add_filter('in', self._handle_incoming) self.xmpp.add_filter('out_sync', self._handle_outgoing) self.xmpp.add_event_handler('disconnected', self.disconnected) self.xmpp.add_event_handler('session_end', self.session_end) def plugin_end(self): if self.xmpp.is_component: return self.xmpp.unregister_feature('sm', self.order) self.xmpp.unregister_feature('sm', self.resume_order) self.xmpp.del_event_handler('disconnected', self.disconnected) self.xmpp.del_event_handler('session_end', self.session_end) self.xmpp.del_filter('in', self._handle_incoming) self.xmpp.del_filter('out_sync', self._handle_outgoing) self.xmpp.remove_handler('Stream Management Enabled') self.xmpp.remove_handler('Stream Management Resumed') self.xmpp.remove_handler('Stream Management Failed') self.xmpp.remove_handler('Stream Management Ack') self.xmpp.remove_handler('Stream Management Request Ack') self.xmpp.remove_stanza(stanza.Enable) self.xmpp.remove_stanza(stanza.Enabled) self.xmpp.remove_stanza(stanza.Resume) self.xmpp.remove_stanza(stanza.Resumed) self.xmpp.remove_stanza(stanza.Ack) self.xmpp.remove_stanza(stanza.RequestAck)
[docs] def disconnected(self, event): """Reset enabled state until we can resume/reenable.""" log.debug("disconnected, disabling SM") self.xmpp.event('sm_disabled', event) self.enabled_in = False self.enabled_out = False
[docs] def session_end(self, event): """Reset stream management state.""" log.debug("session_end, disabling SM") self.xmpp.event('sm_disabled', event) self.enabled_in = False self.enabled_out = False self.unacked_queue.clear() self.sm_id = None self.handled = 0 self.seq = 0 self.last_ack = 0
[docs] def send_ack(self): """Send the current ack count to the server.""" ack = stanza.Ack(self.xmpp) ack['h'] = self.handled self.xmpp.send_raw(str(ack))
[docs] def request_ack(self, e=None): """Request an ack from the server.""" log.debug("requesting ack") req = stanza.RequestAck(self.xmpp) self.xmpp.send_raw(str(req))
async def _handle_sm_feature(self, features): """ Enable or resume stream management. If no SM-ID is stored, and resource binding has taken place, stream management will be enabled. If an SM-ID is known, and the server allows resumption, the previous stream will be resumed. """ if 'stream_management' in self.xmpp.features: # We've already negotiated stream management, # so no need to do it again. return False if not self.sm_id: if 'bind' in self.xmpp.features: enable = stanza.Enable(self.xmpp) enable['resume'] = self.allow_resume enable.send() log.debug("enabling SM") waiter = Waiter('enabled_or_failed', MatchMany([ MatchXPath(stanza.Enabled.tag_name()), MatchXPath(stanza.Failed.tag_name())])) self.xmpp.register_handler(waiter) result = await waiter.wait() elif self.sm_id and self.allow_resume and 'bind' not in self.xmpp.features: resume = stanza.Resume(self.xmpp) resume['h'] = self.handled resume['previd'] = self.sm_id resume.send() log.debug("resuming SM") # Wait for a response before allowing stream feature processing # to continue. The actual result processing will be done in the # _handle_resumed() or _handle_failed() methods. waiter = Waiter('resumed_or_failed', MatchMany([ MatchXPath(stanza.Resumed.tag_name()), MatchXPath(stanza.Failed.tag_name())])) self.xmpp.register_handler(waiter) result = await waiter.wait() if result is not None and result.name == 'resumed': return True return False def _handle_enabled(self, stanza): """Save the SM-ID, if provided. Raises an :term:`sm_enabled` event. """ self.xmpp.features.add('stream_management') if stanza['id']: self.sm_id = stanza['id'] self.enabled_in = True self.handled = 0 self.xmpp.event('sm_enabled', stanza) self.xmpp.end_session_on_disconnect = False def _handle_resumed(self, stanza): """Finish resuming a stream by resending unacked stanzas. Raises a :term:`session_resumed` event. """ self.xmpp.features.add('stream_management') self.enabled_in = True self._handle_ack(stanza) for id, stanza in self.unacked_queue: self.xmpp.send(stanza, use_filters=False) self.xmpp.event('session_resumed', stanza) self.xmpp.end_session_on_disconnect = False def _handle_failed(self, stanza): """ Disable and reset any features used since stream management was requested (tracked stanzas may have been sent during the interval between the enable request and the enabled response). Raises an :term:`sm_failed` event. """ self.enabled_in = False self.enabled_out = False self.unacked_queue.clear() self.xmpp.event('sm_failed', stanza) def _handle_ack(self, ack): """Process a server ack by freeing acked stanzas from the queue. Raises a :term:`stanza_acked` event for each acked stanza. """ if ack['h'] == self.last_ack: return num_acked = (ack['h'] - self.last_ack) % MAX_SEQ num_unacked = len(self.unacked_queue) log.debug("Ack: %s, Last Ack: %s, " + \ "Unacked: %s, Num Acked: %s, " + \ "Remaining: %s", ack['h'], self.last_ack, num_unacked, num_acked, num_unacked - num_acked) if num_acked > len(self.unacked_queue) or num_acked < 0: log.error('Inconsistent sequence numbers from the server,' ' ignoring and replacing ours with them.') num_acked = len(self.unacked_queue) for x in range(num_acked): seq, stanza = self.unacked_queue.popleft() self.xmpp.event('stanza_acked', stanza) self.last_ack = ack['h'] def _handle_request_ack(self, req): """Handle an ack request by sending an ack.""" self.send_ack() def _handle_incoming(self, stanza): """Increment the handled counter for each inbound stanza.""" if not self.enabled_in: return stanza if isinstance(stanza, (Message, Presence, Iq)): # Sequence numbers are mod 2^32 self.handled = (self.handled + 1) % MAX_SEQ return stanza def _handle_outgoing(self, stanza): """Store outgoing stanzas in a queue to be acked.""" from slixmpp.plugins.xep_0198 import stanza as st if isinstance(stanza, (st.Enable, st.Resume)): self.enabled_out = True self.unacked_queue.clear() log.debug("enabling outgoing SM: %s" % stanza) if not self.enabled_out: return stanza if isinstance(stanza, (Message, Presence, Iq)): seq = None # Sequence numbers are mod 2^32 self.seq = (self.seq + 1) % MAX_SEQ seq = self.seq self.unacked_queue.append((seq, stanza)) self.window_counter -= 1 if self.window_counter == 0: self.window_counter = self.window self.request_ack() return stanza

Contents