# Source code for slixmpp.plugins.xep_0060.stanza.pubsub_event


# Slixmpp: The Slick XMPP Library
# Copyright (C) 2011  Nathanael C. Fritz
# This file is part of Slixmpp.
# See the file LICENSE for copying permission.
import datetime as dt

from slixmpp import Message
from slixmpp.xmlstream import register_stanza_plugin, ElementBase, ET, JID
from slixmpp.plugins.xep_0004 import Form
from slixmpp.plugins import xep_0082


class Event(ElementBase):
    """The ``<event/>`` element of the pubsub#event namespace.

    Declares no interfaces of its own; it is exposed on its parent
    stanza under the ``pubsub_event`` plugin attribute.
    """

    name = 'event'
    namespace = 'http://jabber.org/protocol/pubsub#event'
    plugin_attrib = 'pubsub_event'
    interfaces = set()
class EventItem(ElementBase):
    """An ``<item/>`` element in the pubsub#event namespace.

    Declares the ``id``, ``node`` and ``publisher`` interfaces, plus a
    ``payload`` interface that is backed by the child elements of this
    item's XML.
    """

    namespace = 'http://jabber.org/protocol/pubsub#event'
    name = 'item'
    plugin_attrib = name
    interfaces = {'id', 'payload', 'node', 'publisher'}

    def set_payload(self, value):
        """Append ``value`` as a child of this item's XML element.

        Existing children are left in place.
        """
        self.xml.append(value)

    def get_payload(self):
        """Return the first child element, or ``None`` if there is none."""
        children = list(self.xml)
        if children:
            return children[0]
        return None

    def del_payload(self):
        """Remove every child element from this item's XML."""
        # Iterate over a snapshot: removing from ``self.xml`` while
        # iterating it directly skips every other child, which would
        # leave part of the payload behind.
        for child in list(self.xml):
            self.xml.remove(child)
class EventRetract(ElementBase):
    """A ``<retract/>`` element in the pubsub#event namespace.

    Declares a single ``id`` interface.
    """

    name = 'retract'
    namespace = 'http://jabber.org/protocol/pubsub#event'
    plugin_attrib = name
    interfaces = {'id'}
class EventItems(ElementBase):
    """An ``<items/>`` element in the pubsub#event namespace.

    Declares a single ``node`` interface.
    """

    name = 'items'
    namespace = 'http://jabber.org/protocol/pubsub#event'
    plugin_attrib = name
    interfaces = {'node'}
class EventCollection(ElementBase):
    """A ``<collection/>`` element in the pubsub#event namespace.

    Declares a single ``node`` interface.
    """

    name = 'collection'
    namespace = 'http://jabber.org/protocol/pubsub#event'
    plugin_attrib = name
    interfaces = {'node'}
class EventAssociate(ElementBase):
    """An ``<associate/>`` element in the pubsub#event namespace.

    Declares a single ``node`` interface.
    """

    name = 'associate'
    namespace = 'http://jabber.org/protocol/pubsub#event'
    plugin_attrib = name
    interfaces = {'node'}
class EventDisassociate(ElementBase):
    """A ``<disassociate/>`` element in the pubsub#event namespace.

    Declares a single ``node`` interface.
    """

    name = 'disassociate'
    namespace = 'http://jabber.org/protocol/pubsub#event'
    plugin_attrib = name
    interfaces = {'node'}
class EventConfiguration(ElementBase):
    """A ``<configuration/>`` element in the pubsub#event namespace.

    Declares a single ``node`` interface.
    """

    name = 'configuration'
    namespace = 'http://jabber.org/protocol/pubsub#event'
    plugin_attrib = name
    interfaces = {'node'}
class EventPurge(ElementBase):
    """A ``<purge/>`` element in the pubsub#event namespace.

    Declares a single ``node`` interface.
    """

    name = 'purge'
    namespace = 'http://jabber.org/protocol/pubsub#event'
    plugin_attrib = name
    interfaces = {'node'}
class EventDelete(ElementBase):
    """A ``<delete/>`` element in the pubsub#event namespace.

    Declares a ``node`` interface and a ``redirect`` interface. The
    latter is stored as the ``uri`` attribute of a ``<redirect/>``
    child element in the same namespace.
    """

    name = 'delete'
    namespace = 'http://jabber.org/protocol/pubsub#event'
    plugin_attrib = name
    interfaces = {'node', 'redirect'}

    def set_redirect(self, uri):
        """Replace any existing ``<redirect/>`` child with one for ``uri``."""
        # Drop the current redirect first so at most one is present.
        del self['redirect']
        tag = '{%s}redirect' % self.namespace
        element = ET.Element(tag)
        element.set('uri', uri)
        self.xml.append(element)

    def get_redirect(self):
        """Return the redirect URI, or ``''`` when none is set."""
        tag = '{%s}redirect' % self.namespace
        element = self.xml.find(tag)
        if element is None:
            return ''
        return element.get('uri', '')

    def del_redirect(self):
        """Remove the ``<redirect/>`` child element if one exists."""
        tag = '{%s}redirect' % self.namespace
        element = self.xml.find(tag)
        if element is None:
            return
        self.xml.remove(element)
class EventSubscription(ElementBase):
    """A ``<subscription/>`` element in the pubsub#event namespace.

    Declares the ``node``, ``expiry``, ``jid``, ``subid`` and
    ``subscription`` interfaces. ``expiry`` and ``jid`` have custom
    accessors that convert between attribute text and richer values.
    """

    namespace = 'http://jabber.org/protocol/pubsub#event'
    name = 'subscription'
    plugin_attrib = name
    interfaces = {'node', 'expiry', 'jid', 'subid', 'subscription'}

    def get_expiry(self):
        """Return the ``expiry`` attribute in its most useful form.

        Returns the raw attribute text when it is empty or equals
        ``presence`` (case-insensitively); otherwise returns whatever
        ``xep_0082.parse`` produces for the attribute text.
        """
        expiry = self._get_attr('expiry')
        # An empty value is not a timestamp; hand it back untouched
        # rather than passing it to the timestamp parser.
        if not expiry or expiry.lower() == 'presence':
            return expiry
        return xep_0082.parse(expiry)

    def set_expiry(self, value):
        """Store ``value`` as the ``expiry`` attribute.

        ``datetime`` instances are first converted with
        ``xep_0082.format_datetime``; any other value is stored as given.
        """
        if isinstance(value, dt.datetime):
            value = xep_0082.format_datetime(value)
        self._set_attr('expiry', value)

    def set_jid(self, value):
        """Store the string form of ``value`` as the ``jid`` attribute."""
        self._set_attr('jid', str(value))

    def get_jid(self):
        """Return the ``jid`` attribute wrapped in a ``JID`` object."""
        return JID(self._get_attr('jid'))
# Register the event container as a plugin of the message stanza.
register_stanza_plugin(Message, Event)

# Register the elements that plug directly into the event container.
register_stanza_plugin(Event, EventCollection)
register_stanza_plugin(Event, EventConfiguration)
register_stanza_plugin(Event, EventPurge)
register_stanza_plugin(Event, EventDelete)
register_stanza_plugin(Event, EventItems)
register_stanza_plugin(Event, EventSubscription)

# Register the association changes as plugins of the collection element.
register_stanza_plugin(EventCollection, EventAssociate)
register_stanza_plugin(EventCollection, EventDisassociate)

# Register the data form as a plugin of the configuration element.
register_stanza_plugin(EventConfiguration, Form)

# Item and retract elements are registered as iterable plugins.
register_stanza_plugin(EventItems, EventItem, iterable=True)
register_stanza_plugin(EventItems, EventRetract, iterable=True)