Source code for slixmpp.plugins.xep_0077.register


# Slixmpp: The Slick XMPP Library
# Copyright (C) 2012 Nathanael C. Fritz, Lance J.T. Stout
# This file is part of Slixmpp.
# See the file LICENSE for copying permission.
import logging
import ssl

from slixmpp.stanza import StreamFeatures, Iq
from slixmpp.xmlstream import register_stanza_plugin, JID
from slixmpp.xmlstream.handler import CoroutineCallback
from slixmpp.xmlstream.matcher import StanzaPath
from slixmpp.plugins import BasePlugin
from slixmpp.plugins.xep_0077 import stanza, Register, RegisterFeature


log = logging.getLogger(__name__)


[docs] class XEP_0077(BasePlugin): """ XEP-0077: In-Band Registration Events: :: user_register -- After succesful validation and add to the user store in api["user_validate"] user_unregister -- After succesful user removal in api["user_remove"] Config: :: form_fields are only form_instructions are only used for component registration in case api["make_registration_form"] is not overriden. API: :: user_get(jid, node, ifrom, iq) Returns a dict-like object containing `form_fields` for this user or None user_remove(jid, node, ifrom, iq) Removes a user or raise KeyError in case the user is not found in the user store make_registration_form(self, jid, node, ifrom, iq) Returns an iq reply to a registration form request, pre-filled and with <registered/> in case the requesting entity is already registered to us user_validate((self, jid, node, ifrom, registration) Add the user to the user store or raise ValueError(msg) if any problem is encountered msg is sent back to the XMPP client as an error message. """ name = 'xep_0077' description = 'XEP-0077: In-Band Registration' dependencies = {'xep_0004', 'xep_0066'} stanza = stanza default_config = { 'create_account': True, 'force_registration': False, 'order': 50, "form_fields": {"username", "password"}, "form_instructions": "Enter your credentials", } def plugin_init(self): register_stanza_plugin(StreamFeatures, RegisterFeature) register_stanza_plugin(Iq, Register) if self.xmpp.is_component: self.xmpp["xep_0030"].add_feature("jabber:iq:register") self.xmpp.register_handler( CoroutineCallback( "registration", StanzaPath("/iq/register"), self._handle_registration, ) ) self._user_store = {} self.api.register(self._user_get, "user_get") self.api.register(self._user_remove, "user_remove") self.api.register(self._make_registration_form, "make_registration_form") self.api.register(self._user_validate, "user_validate") else: self.xmpp.register_feature( "register", self._handle_register_feature, restart=False, order=self.order, ) register_stanza_plugin(Register, self.xmpp['xep_0004'].stanza.Form) register_stanza_plugin(Register, self.xmpp['xep_0066'].stanza.OOB) self.xmpp.add_event_handler('connected', self._force_registration) def plugin_end(self): if not self.xmpp.is_component: self.xmpp.unregister_feature('register', self.order) def _user_get(self, jid, node, ifrom, iq): return self._user_store.get(iq["from"].bare) def _user_remove(self, jid, node, ifrom, iq): return self._user_store.pop(iq["from"].bare) async def _make_registration_form(self, jid, node, ifrom, iq: Iq): reg = iq["register"] user = await self.api["user_get"](None, None, iq['from'], iq) if user is None: user = {} else: reg["registered"] = True reg["instructions"] = self.form_instructions for field in self.form_fields: data = user.get(field, "") if data: reg[field] = data else: # Add a blank field reg.add_field(field) reply = iq.reply() reply.set_payload(reg.xml) return reply def _user_validate(self, jid, node, ifrom, registration): self._user_store[ifrom.bare] = {key: registration[key] for key in self.form_fields} async def _handle_registration(self, iq: Iq): if iq["type"] == "get": await self._send_form(iq) elif iq["type"] == "set": if iq["register"]["remove"]: try: await self.api["user_remove"](None, None, iq["from"], iq) except KeyError: _send_error( iq, "404", "cancel", "item-not-found", "User not found", ) else: reply = iq.reply() reply.send() self.xmpp.event("user_unregister", iq) return for field in self.form_fields: if not iq["register"][field]: # Incomplete Registration _send_error( iq, "406", "modify", "not-acceptable", "Please fill in all fields.", ) return try: await self.api["user_validate"](None, None, iq["from"], iq["register"]) except ValueError as e: _send_error( iq, "406", "modify", "not-acceptable", e.args, ) else: reply = iq.reply() reply.send() self.xmpp.event("user_register", iq) async def _send_form(self, iq): reply = await self.api["make_registration_form"](None, None, iq["from"], iq) reply.send() def _force_registration(self, event): if self.force_registration: self.xmpp.add_filter('in', self._force_stream_feature) def _force_stream_feature(self, stanza): if isinstance(stanza, StreamFeatures): if not self.xmpp.disable_starttls: if 'starttls' not in self.xmpp.features: return stanza elif not isinstance(self.xmpp.socket, ssl.SSLSocket): return stanza if 'mechanisms' not in self.xmpp.features: log.debug('Forced adding in-band registration stream feature') stanza.enable('register') self.xmpp.del_filter('in', self._force_stream_feature) return stanza async def _handle_register_feature(self, features): if 'mechanisms' in self.xmpp.features: # We have already logged in with an account return False if self.create_account and self.xmpp.event_handled('register'): form = await self.get_registration() await self.xmpp.event_async('register', form) return True return False def get_registration(self, jid=None, ifrom=None, timeout=None, callback=None): iq = self.xmpp.Iq() iq['type'] = 'get' iq['to'] = jid iq['from'] = ifrom iq.enable('register') return iq.send(timeout=timeout, callback=callback) def cancel_registration(self, jid=None, ifrom=None, timeout=None, callback=None): iq = self.xmpp.Iq() iq['type'] = 'set' iq['to'] = jid iq['from'] = ifrom iq['register']['remove'] = True return iq.send(timeout=timeout, callback=callback) def change_password(self, password, jid=None, ifrom=None, timeout=None, callback=None): iq = self.xmpp.Iq() iq['type'] = 'set' iq['to'] = jid iq['from'] = ifrom if self.xmpp.is_component: ifrom = JID(ifrom) iq['register']['username'] = ifrom.user else: iq['register']['username'] = self.xmpp.boundjid.user iq['register']['password'] = password return iq.send(timeout=timeout, callback=callback)
def _send_error(iq, code, error_type, name, text=""): # It would be nice to raise XMPPError but the iq payload # should include the register info reply = iq.reply() reply.set_payload(iq["register"].xml) reply.error() reply["error"]["code"] = code reply["error"]["type"] = error_type reply["error"]["condition"] = name reply["error"]["text"] = text reply.send()