Source code for slixmpp.plugins.xep_0060.stanza.pubsub_event
"""
Slixmpp: The Slick XMPP Library
Copyright (C) 2011 Nathanael C. Fritz
This file is part of Slixmpp.
See the file LICENSE for copying permission.
"""
import datetime as dt
from slixmpp import Message
from slixmpp.xmlstream import register_stanza_plugin, ElementBase, ET, JID
from slixmpp.plugins.xep_0004 import Form
from slixmpp.plugins import xep_0082
[docs]class Event(ElementBase):
namespace = 'http://jabber.org/protocol/pubsub#event'
name = 'event'
plugin_attrib = 'pubsub_event'
interfaces = set()
[docs]class EventItem(ElementBase):
namespace = 'http://jabber.org/protocol/pubsub#event'
name = 'item'
plugin_attrib = name
interfaces = {'id', 'payload', 'node', 'publisher'}
[docs] def set_payload(self, value):
self.xml.append(value)
[docs] def get_payload(self):
children = list(self.xml)
if len(children) > 0:
return children[0]
[docs] def del_payload(self):
for child in self.xml:
self.xml.remove(child)
[docs]class EventRetract(ElementBase):
namespace = 'http://jabber.org/protocol/pubsub#event'
name = 'retract'
plugin_attrib = name
interfaces = {'id'}
[docs]class EventItems(ElementBase):
namespace = 'http://jabber.org/protocol/pubsub#event'
name = 'items'
plugin_attrib = name
interfaces = {'node'}
[docs]class EventCollection(ElementBase):
namespace = 'http://jabber.org/protocol/pubsub#event'
name = 'collection'
plugin_attrib = name
interfaces = {'node'}
[docs]class EventAssociate(ElementBase):
namespace = 'http://jabber.org/protocol/pubsub#event'
name = 'associate'
plugin_attrib = name
interfaces = {'node'}
[docs]class EventDisassociate(ElementBase):
namespace = 'http://jabber.org/protocol/pubsub#event'
name = 'disassociate'
plugin_attrib = name
interfaces = {'node'}
[docs]class EventConfiguration(ElementBase):
namespace = 'http://jabber.org/protocol/pubsub#event'
name = 'configuration'
plugin_attrib = name
interfaces = {'node'}
[docs]class EventPurge(ElementBase):
namespace = 'http://jabber.org/protocol/pubsub#event'
name = 'purge'
plugin_attrib = name
interfaces = {'node'}
[docs]class EventDelete(ElementBase):
namespace = 'http://jabber.org/protocol/pubsub#event'
name = 'delete'
plugin_attrib = name
interfaces = {'node', 'redirect'}
[docs] def set_redirect(self, uri):
del self['redirect']
redirect = ET.Element('{%s}redirect' % self.namespace)
redirect.attrib['uri'] = uri
self.xml.append(redirect)
[docs] def get_redirect(self):
redirect = self.xml.find('{%s}redirect' % self.namespace)
if redirect is not None:
return redirect.attrib.get('uri', '')
return ''
[docs] def del_redirect(self):
redirect = self.xml.find('{%s}redirect' % self.namespace)
if redirect is not None:
self.xml.remove(redirect)
[docs]class EventSubscription(ElementBase):
namespace = 'http://jabber.org/protocol/pubsub#event'
name = 'subscription'
plugin_attrib = name
interfaces = {'node', 'expiry', 'jid', 'subid', 'subscription'}
[docs] def get_expiry(self):
expiry = self._get_attr('expiry')
if expiry.lower() == 'presence':
return expiry
return xep_0082.parse(expiry)
[docs] def set_expiry(self, value):
if isinstance(value, dt.datetime):
value = xep_0082.format_datetime(value)
self._set_attr('expiry', value)
[docs] def set_jid(self, value):
self._set_attr('jid', str(value))
[docs] def get_jid(self):
return JID(self._get_attr('jid'))
register_stanza_plugin(Message, Event)
register_stanza_plugin(Event, EventCollection)
register_stanza_plugin(Event, EventConfiguration)
register_stanza_plugin(Event, EventPurge)
register_stanza_plugin(Event, EventDelete)
register_stanza_plugin(Event, EventItems)
register_stanza_plugin(Event, EventSubscription)
register_stanza_plugin(EventCollection, EventAssociate)
register_stanza_plugin(EventCollection, EventDisassociate)
register_stanza_plugin(EventConfiguration, Form)
register_stanza_plugin(EventItems, EventItem, iterable=True)
register_stanza_plugin(EventItems, EventRetract, iterable=True)