# Slixmpp: The Slick XMPP Library
# Copyright (C) 2012 Nathanael C. Fritz, Lance J.T. Stout
# This file is part of Slixmpp.
# See the file LICENSE for copying permission.
import asyncio
import logging
import collections
from slixmpp.stanza import Message, Presence, Iq, StreamFeatures
from slixmpp.xmlstream import register_stanza_plugin
from slixmpp.xmlstream.handler import Callback, Waiter
from slixmpp.xmlstream.matcher import MatchXPath, MatchMany
from slixmpp.plugins.base import BasePlugin
from slixmpp.plugins.xep_0198 import stanza
log = logging.getLogger(__name__)
MAX_SEQ = 2 ** 32
[docs]
class XEP_0198(BasePlugin):
"""
XEP-0198: Stream Management
"""
name = 'xep_0198'
description = 'XEP-0198: Stream Management'
dependencies = set()
stanza = stanza
default_config = {
#: The last ack number received from the server.
'last_ack': 0,
#: The number of stanzas to wait between sending ack requests to
#: the server. Setting this to ``1`` will send an ack request after
#: every sent stanza. Defaults to ``5``.
'window': 5,
#: The stream management ID for the stream. Knowing this value is
#: required in order to do stream resumption.
'sm_id': None,
#: A counter of handled incoming stanzas, mod 2^32.
'handled': 0,
#: A counter of unacked outgoing stanzas, mod 2^32.
'seq': 0,
#: Control whether or not the ability to resume the stream will be
#: requested when enabling stream management. Defaults to ``True``.
'allow_resume': True,
'order': 10100,
'resume_order': 9000
}
def plugin_init(self):
"""Start the XEP-0198 plugin."""
# Only enable stream management for non-components,
# since components do not yet perform feature negotiation.
if self.xmpp.is_component:
return
self.window_counter = self.window
self.enabled_in = False
self.enabled_out = False
self.unacked_queue = collections.deque()
register_stanza_plugin(StreamFeatures, stanza.StreamManagement)
self.xmpp.register_stanza(stanza.Enable)
self.xmpp.register_stanza(stanza.Enabled)
self.xmpp.register_stanza(stanza.Resume)
self.xmpp.register_stanza(stanza.Resumed)
self.xmpp.register_stanza(stanza.Ack)
self.xmpp.register_stanza(stanza.RequestAck)
# Register the feature twice because it may be ordered two
# different ways: enabling after binding and resumption
# before binding.
self.xmpp.register_feature('sm',
self._handle_sm_feature,
restart=True,
order=self.order)
self.xmpp.register_feature('sm',
self._handle_sm_feature,
restart=True,
order=self.resume_order)
self.xmpp.register_handler(
Callback('Stream Management Enabled',
MatchXPath(stanza.Enabled.tag_name()),
self._handle_enabled,
instream=True))
self.xmpp.register_handler(
Callback('Stream Management Resumed',
MatchXPath(stanza.Resumed.tag_name()),
self._handle_resumed,
instream=True))
self.xmpp.register_handler(
Callback('Stream Management Failed',
MatchXPath(stanza.Failed.tag_name()),
self._handle_failed,
instream=True))
self.xmpp.register_handler(
Callback('Stream Management Ack',
MatchXPath(stanza.Ack.tag_name()),
self._handle_ack,
instream=True))
self.xmpp.register_handler(
Callback('Stream Management Request Ack',
MatchXPath(stanza.RequestAck.tag_name()),
self._handle_request_ack,
instream=True))
self.xmpp.add_filter('in', self._handle_incoming)
self.xmpp.add_filter('out_sync', self._handle_outgoing)
self.xmpp.add_event_handler('disconnected', self.disconnected)
self.xmpp.add_event_handler('session_end', self.session_end)
def plugin_end(self):
if self.xmpp.is_component:
return
self.xmpp.unregister_feature('sm', self.order)
self.xmpp.unregister_feature('sm', self.resume_order)
self.xmpp.del_event_handler('disconnected', self.disconnected)
self.xmpp.del_event_handler('session_end', self.session_end)
self.xmpp.del_filter('in', self._handle_incoming)
self.xmpp.del_filter('out_sync', self._handle_outgoing)
self.xmpp.remove_handler('Stream Management Enabled')
self.xmpp.remove_handler('Stream Management Resumed')
self.xmpp.remove_handler('Stream Management Failed')
self.xmpp.remove_handler('Stream Management Ack')
self.xmpp.remove_handler('Stream Management Request Ack')
self.xmpp.remove_stanza(stanza.Enable)
self.xmpp.remove_stanza(stanza.Enabled)
self.xmpp.remove_stanza(stanza.Resume)
self.xmpp.remove_stanza(stanza.Resumed)
self.xmpp.remove_stanza(stanza.Ack)
self.xmpp.remove_stanza(stanza.RequestAck)
[docs]
def disconnected(self, event):
"""Reset enabled state until we can resume/reenable."""
log.debug("disconnected, disabling SM")
self.xmpp.event('sm_disabled', event)
self.enabled_in = False
self.enabled_out = False
[docs]
def session_end(self, event):
"""Reset stream management state."""
log.debug("session_end, disabling SM")
self.xmpp.event('sm_disabled', event)
self.enabled_in = False
self.enabled_out = False
self.unacked_queue.clear()
self.sm_id = None
self.handled = 0
self.seq = 0
self.last_ack = 0
[docs]
def send_ack(self):
"""Send the current ack count to the server."""
if not self.xmpp.transport:
log.debug('Disconnected: not sending ack')
return
ack = stanza.Ack(self.xmpp)
ack['h'] = self.handled
self.xmpp.send_raw(str(ack))
[docs]
def request_ack(self, e=None):
"""Request an ack from the server."""
log.debug("requesting ack")
req = stanza.RequestAck(self.xmpp)
self.xmpp.send_raw(str(req))
async def _handle_sm_feature(self, features):
"""
Enable or resume stream management.
If no SM-ID is stored, and resource binding has taken place,
stream management will be enabled.
If an SM-ID is known, and the server allows resumption, the
previous stream will be resumed.
"""
if 'stream_management' in self.xmpp.features:
# We've already negotiated stream management,
# so no need to do it again.
return False
if self.sm_id and self.allow_resume and 'bind' not in self.xmpp.features:
resume = stanza.Resume(self.xmpp)
resume['h'] = self.handled
resume['previd'] = self.sm_id
resume.send()
log.debug("resuming SM")
# Wait for a response before allowing stream feature processing
# to continue. The actual result processing will be done in the
# _handle_resumed() or _handle_failed() methods.
waiter = Waiter('resumed_or_failed',
MatchMany([
MatchXPath(stanza.Resumed.tag_name()),
MatchXPath(stanza.Failed.tag_name())]))
self.xmpp.register_handler(waiter)
result = await waiter.wait()
if result is not None and result.name == 'resumed':
return True
self.xmpp.event("session_end")
if 'bind' in self.xmpp.features:
enable = stanza.Enable(self.xmpp)
enable['resume'] = self.allow_resume
enable.send()
log.debug("enabling SM")
waiter = Waiter('enabled_or_failed',
MatchMany([
MatchXPath(stanza.Enabled.tag_name()),
MatchXPath(stanza.Failed.tag_name())]))
self.xmpp.register_handler(waiter)
result = await waiter.wait()
return False
def _handle_enabled(self, stanza):
"""Save the SM-ID, if provided.
Raises an :term:`sm_enabled` event.
"""
self.xmpp.features.add('stream_management')
if stanza['id']:
self.sm_id = stanza['id']
self.enabled_in = True
self.handled = 0
self.xmpp.event('sm_enabled', stanza)
self.xmpp.end_session_on_disconnect = False
def _handle_resumed(self, stanza):
"""Finish resuming a stream by resending unacked stanzas.
Raises a :term:`session_resumed` event.
"""
self.xmpp.features.add('stream_management')
self.enabled_in = True
self._handle_ack(stanza)
for id, stanza in self.unacked_queue:
self.xmpp.send(stanza, use_filters=False)
self.xmpp.event('session_resumed', stanza)
self.xmpp.end_session_on_disconnect = False
def _handle_failed(self, stanza):
"""
Disable and reset any features used since stream management was
requested (tracked stanzas may have been sent during the interval
between the enable request and the enabled response).
Raises an :term:`sm_failed` event.
"""
self.enabled_in = False
self.enabled_out = False
self.unacked_queue.clear()
self.xmpp.event('sm_failed', stanza)
def _handle_ack(self, ack):
"""Process a server ack by freeing acked stanzas from the queue.
Raises a :term:`stanza_acked` event for each acked stanza.
"""
if ack['h'] == self.last_ack:
return
num_acked = (ack['h'] - self.last_ack) % MAX_SEQ
num_unacked = len(self.unacked_queue)
log.debug("Ack: %s, Last Ack: %s, " + \
"Unacked: %s, Num Acked: %s, " + \
"Remaining: %s",
ack['h'],
self.last_ack,
num_unacked,
num_acked,
num_unacked - num_acked)
if num_acked > len(self.unacked_queue) or num_acked < 0:
log.error('Inconsistent sequence numbers from the server,'
' ignoring and replacing ours with them.')
num_acked = len(self.unacked_queue)
for x in range(num_acked):
seq, stanza = self.unacked_queue.popleft()
self.xmpp.event('stanza_acked', stanza)
self.last_ack = ack['h']
def _handle_request_ack(self, req):
"""Handle an ack request by sending an ack."""
self.send_ack()
def _handle_incoming(self, stanza):
"""Increment the handled counter for each inbound stanza."""
if not self.enabled_in:
return stanza
if isinstance(stanza, (Message, Presence, Iq)):
# Sequence numbers are mod 2^32
self.handled = (self.handled + 1) % MAX_SEQ
return stanza
def _handle_outgoing(self, stanza):
"""Store outgoing stanzas in a queue to be acked."""
from slixmpp.plugins.xep_0198 import stanza as st
if isinstance(stanza, (st.Enable, st.Resume)):
self.enabled_out = True
self.unacked_queue.clear()
log.debug("enabling outgoing SM: %s" % stanza)
if not self.enabled_out:
return stanza
if isinstance(stanza, (Message, Presence, Iq)):
seq = None
# Sequence numbers are mod 2^32
self.seq = (self.seq + 1) % MAX_SEQ
seq = self.seq
self.unacked_queue.append((seq, stanza))
self.window_counter -= 1
if self.window_counter == 0:
self.window_counter = self.window
self.request_ack()
return stanza