+from urllib.parse import urlunsplit
+
from django.conf import settings
-from django.contrib.auth.models import User
-from django.contrib.auth.password_validation import validate_password
+from django.contrib.auth.forms import (
+ AuthenticationForm as DjangoAuthenticationForm, BaseUserCreationForm
+)
+from django.contrib.auth.tokens import default_token_generator
+from django.contrib.sites.shortcuts import get_current_site
from django.core.exceptions import ValidationError
-from django.forms import CharField, EmailField, ModelForm
-from django.forms.widgets import PasswordInput
+from django.db.models import Q
+from django.forms import EmailField, Form
+from django.forms.widgets import EmailInput
+from django.urls import reverse
+from django.utils.encoding import force_bytes
+from django.utils.http import urlsafe_base64_encode
+from django.utils.translation import gettext_lazy as _
-from chat.utils import build_url
+from .models import User
-class RegisterForm(ModelForm):
+class RegisterForm(BaseUserCreationForm):
email = EmailField(required=True)
- password = CharField(widget=PasswordInput, validators=[validate_password])
- password_again = CharField(widget=PasswordInput)
-
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
-
- def clean(self):
- cleaned_data = super().clean()
- if cleaned_data["password"] != cleaned_data["password_again"]:
- raise ValidationError("Passwords did not match!")
- return cleaned_data
def save(self, commit=True):
if not settings.TRUST_USER_REGISTRATIONS:
self.instance.is_active = False
self.instance.last_login = None
- self.instance.set_password(self.instance.password)
return super().save(commit)
class Meta:
model = User
- fields = ['email', 'username', 'first_name', 'last_name', 'password']
+ fields = ['email', 'username', 'first_name', 'last_name']
+
+
+class PasswordResetForm(Form):
+ email = EmailField(
+ label="Email",
+ max_length=254,
+ widget=EmailInput(attrs={"autocomplete": "email"}),
+ )
+
+ def save(self, request):
+ site = get_current_site(request)
+ users = User.objects.filter(
+ ~Q(password=""),
+ last_login__isnull=False,
+ email__iexact=self.cleaned_data["email"],
+ is_active=True,
+ )
+ for user in users:
+ url = urlunsplit(
+ (
+ "http" if not request.is_secure() else "https",
+ site.domain,
+ reverse(
+ "chat-reset-password-token",
+ kwargs={
+ "uidb64": urlsafe_base64_encode(force_bytes(user.pk)),
+ "token": default_token_generator.make_token(user),
+ },
+ ),
+ None,
+ None,
+ )
+ )
+ user.email_user(
+ f"Password reset on {site.name}",
+ (
+ "You're receiving this email because you requested a password "
+ f"reset for your user account at {site.name}.\n\n"
+ "Please go to the following page and choose a new password:\n\n"
+ f"{url}\n"
+ f"Your username, in case you've forgotten: {user}\n\n"
+ "Thanks for using our site!\n"
+ f"The {site.name} team"
+ )
+ )
+
+
+class AuthenticationForm(DjangoAuthenticationForm):
+ error_messages = {
+ **DjangoAuthenticationForm.error_messages,
+ "unconfirmed": _("This account's email address is not confirmed."),
+ }
+
+ def confirm_login_allowed(self, user):
+ super().confirm_login_allowed(user)
+ if user.last_login is None:
+ raise ValidationError(
+ self.error_messages["unconfirmed"],
+ code="unconfirmed",
+ )
--- /dev/null
+from asyncio import new_event_loop
+from logging import getLogger
+
+from django.core.management import BaseCommand
+from django.db import connection
+
+from chat.websocket import listen_notify_handler
+
+logger = getLogger(__name__)
+
+
+class Command(BaseCommand):
+ def handle(self, **options):
+ loop = new_event_loop()
+ connection.connect()
+ conn = connection.connection
+ with listen_notify_handler(conn, print, loop):
+ loop.run_forever()
import os
-from django.core.management import BaseCommand, call_command
+from django.core.management import BaseCommand
from uvicorn.main import main
class Command(BaseCommand):
def run_from_argv(self, argv):
- call_command("collectstatic", "--noinput", "-v0")
assert argv[1] == "runserver"
os.environ.setdefault("UVICORN_APP", "chat.asgi:get_asgi_application")
argv[:2] = (f"{argv[0]} {argv[1]}",)
-# Generated by Django 5.1.1 on 2024-09-11 10:09
+# Generated by Django 5.1.1 on 2024-09-30 09:06
+import django.contrib.auth.models
+import django.contrib.auth.validators
import django.db.models.deletion
+import django.utils.timezone
+import pgtrigger.compiler
+import pgtrigger.migrations
from django.conf import settings
from django.db import migrations, models
initial = True
dependencies = [
- migrations.swappable_dependency(settings.AUTH_USER_MODEL),
+ ('auth', '0012_alter_user_first_name_max_length'),
]
operations = [
name='Channel',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
- ('users', models.ManyToManyField(related_name='channels', to=settings.AUTH_USER_MODEL)),
+ ('name', models.CharField(max_length=256, unique=True)),
+ ],
+ ),
+ migrations.CreateModel(
+ name='User',
+ fields=[
+ ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
+ ('password', models.CharField(max_length=128, verbose_name='password')),
+ ('last_login', models.DateTimeField(blank=True, null=True, verbose_name='last login')),
+ ('is_superuser', models.BooleanField(default=False, help_text='Designates that this user has all permissions without explicitly assigning them.', verbose_name='superuser status')),
+ ('username', models.CharField(error_messages={'unique': 'A user with that username already exists.'}, help_text='Required. 150 characters or fewer. Letters, digits and @/./+/-/_ only.', max_length=150, unique=True, validators=[django.contrib.auth.validators.UnicodeUsernameValidator()], verbose_name='username')),
+ ('first_name', models.CharField(blank=True, max_length=150, verbose_name='first name')),
+ ('last_name', models.CharField(blank=True, max_length=150, verbose_name='last name')),
+ ('email', models.EmailField(blank=True, max_length=254, verbose_name='email address')),
+ ('is_staff', models.BooleanField(default=False, help_text='Designates whether the user can log into this admin site.', verbose_name='staff status')),
+ ('is_active', models.BooleanField(default=True, help_text='Designates whether this user should be treated as active. Unselect this instead of deleting accounts.', verbose_name='active')),
+ ('date_joined', models.DateTimeField(default=django.utils.timezone.now, verbose_name='date joined')),
+ ('groups', models.ManyToManyField(blank=True, help_text='The groups this user belongs to. A user will get all permissions granted to each of their groups.', related_name='user_set', related_query_name='user', to='auth.group', verbose_name='groups')),
+ ('user_permissions', models.ManyToManyField(blank=True, help_text='Specific permissions for this user.', related_name='user_set', related_query_name='user', to='auth.permission', verbose_name='user permissions')),
+ ],
+ managers=[
+ ('objects', django.contrib.auth.models.UserManager()),
],
),
migrations.CreateModel(
('ts', models.DateTimeField(auto_now=True)),
('text', models.TextField()),
('channel', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='chat.channel')),
+ ('user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL)),
],
),
+ migrations.CreateModel(
+ name='ChannelUser',
+ fields=[
+ ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
+ ('channel', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='chat.channel')),
+ ('user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL)),
+ ],
+ ),
+ migrations.AddField(
+ model_name='channel',
+ name='users',
+ field=models.ManyToManyField(related_name='channels', through='chat.ChannelUser', to=settings.AUTH_USER_MODEL),
+ ),
migrations.CreateModel(
name='PrivateMessage',
fields=[
('sender', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='pms_sent', to=settings.AUTH_USER_MODEL)),
],
),
+ pgtrigger.migrations.AddTrigger(
+ model_name='user',
+ trigger=pgtrigger.compiler.Trigger(name='pg_notify_user', sql=pgtrigger.compiler.UpsertTriggerSql(func='\n IF TG_OP = \'DELETE\' THEN\n PERFORM pg_notify(\n \'pg_notify\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","old":{"id":\' || OLD."id" || \'}}\'\n );\n ELSE\n PERFORM pg_notify(\n \'pg_notify\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","new":{"id":\' || NEW."id" || \'}}\'\n );\n END IF;\n RETURN NULL;\n ', hash='2f48ed2d25af6836e62e7506ebb833638d21f225', operation='DELETE OR INSERT OR UPDATE', pgid='pgtrigger_pg_notify_user_152e9', table='chat_user', when='AFTER')),
+ ),
+ pgtrigger.migrations.AddTrigger(
+ model_name='user',
+ trigger=pgtrigger.compiler.Trigger(name='pg_notify_user_truncate', sql=pgtrigger.compiler.UpsertTriggerSql(func='\n PERFORM pg_notify(\n \'pg_notify\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME || \'"}\'\n );\n RETURN NULL;\n ', hash='58944d2a6ef65e1fe00c504ca2488854a9ed2989', level='STATEMENT', operation='TRUNCATE', pgid='pgtrigger_pg_notify_user_truncate_b8c18', table='chat_user', when='AFTER')),
+ ),
+ pgtrigger.migrations.AddTrigger(
+ model_name='channelmessage',
+ trigger=pgtrigger.compiler.Trigger(name='pg_notify_channelmessage', sql=pgtrigger.compiler.UpsertTriggerSql(func='\n IF TG_OP = \'DELETE\' THEN\n PERFORM pg_notify(\n \'pg_notify\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","old":{"id":\' || OLD."id" || \',"user_id":\' || OLD."user_id" || \',"channel_id":\' || OLD."channel_id" || \'}}\'\n );\n ELSE\n PERFORM pg_notify(\n \'pg_notify\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","new":{"id":\' || NEW."id" || \',"user_id":\' || NEW."user_id" || \',"channel_id":\' || NEW."channel_id" || \'}}\'\n );\n END IF;\n RETURN NULL;\n ', hash='8839d39c99595353dad9ba1de4805be117880a3b', operation='DELETE OR INSERT OR UPDATE', pgid='pgtrigger_pg_notify_channelmessage_d2b2e', table='chat_channelmessage', when='AFTER')),
+ ),
+ pgtrigger.migrations.AddTrigger(
+ model_name='channelmessage',
+ trigger=pgtrigger.compiler.Trigger(name='pg_notify_channelmessage_truncate', sql=pgtrigger.compiler.UpsertTriggerSql(func='\n PERFORM pg_notify(\n \'pg_notify\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME || \'"}\'\n );\n RETURN NULL;\n ', hash='8f1c4f12257239f00ee73bfd21b226a25a2c6a16', level='STATEMENT', operation='TRUNCATE', pgid='pgtrigger_pg_notify_channelmessage_truncate_02c45', table='chat_channelmessage', when='AFTER')),
+ ),
+ pgtrigger.migrations.AddTrigger(
+ model_name='channeluser',
+ trigger=pgtrigger.compiler.Trigger(name='pg_notify_channeluser', sql=pgtrigger.compiler.UpsertTriggerSql(func='\n IF TG_OP = \'DELETE\' THEN\n PERFORM pg_notify(\n \'pg_notify\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","old":{"id":\' || OLD."id" || \',"user_id":\' || OLD."user_id" || \',"channel_id":\' || OLD."channel_id" || \'}}\'\n );\n ELSE\n PERFORM pg_notify(\n \'pg_notify\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","new":{"id":\' || NEW."id" || \',"user_id":\' || NEW."user_id" || \',"channel_id":\' || NEW."channel_id" || \'}}\'\n );\n END IF;\n RETURN NULL;\n ', hash='e919ee13216581d3895b820bfef92e17397b7c1a', operation='DELETE OR INSERT OR UPDATE', pgid='pgtrigger_pg_notify_channeluser_f01cc', table='chat_channeluser', when='AFTER')),
+ ),
+ pgtrigger.migrations.AddTrigger(
+ model_name='channeluser',
+ trigger=pgtrigger.compiler.Trigger(name='pg_notify_channeluser_truncate', sql=pgtrigger.compiler.UpsertTriggerSql(func='\n PERFORM pg_notify(\n \'pg_notify\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME || \'"}\'\n );\n RETURN NULL;\n ', hash='7eee59ce12c198a1144e9bd89914eb0845bee5f5', level='STATEMENT', operation='TRUNCATE', pgid='pgtrigger_pg_notify_channeluser_truncate_5ae88', table='chat_channeluser', when='AFTER')),
+ ),
+ pgtrigger.migrations.AddTrigger(
+ model_name='channel',
+ trigger=pgtrigger.compiler.Trigger(name='pg_notify_channel', sql=pgtrigger.compiler.UpsertTriggerSql(func='\n IF TG_OP = \'DELETE\' THEN\n PERFORM pg_notify(\n \'pg_notify\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","old":{"id":\' || OLD."id" || \'}}\'\n );\n ELSE\n PERFORM pg_notify(\n \'pg_notify\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","new":{"id":\' || NEW."id" || \'}}\'\n );\n END IF;\n RETURN NULL;\n ', hash='07e8b07cb0612a98869139e71cda37635e0d4fda', operation='DELETE OR INSERT OR UPDATE', pgid='pgtrigger_pg_notify_channel_fab0c', table='chat_channel', when='AFTER')),
+ ),
+ pgtrigger.migrations.AddTrigger(
+ model_name='channel',
+ trigger=pgtrigger.compiler.Trigger(name='pg_notify_channel_truncate', sql=pgtrigger.compiler.UpsertTriggerSql(func='\n PERFORM pg_notify(\n \'pg_notify\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME || \'"}\'\n );\n RETURN NULL;\n ', hash='0f2df778b97a70d4e3e883b0c80430e8d4ef194e', level='STATEMENT', operation='TRUNCATE', pgid='pgtrigger_pg_notify_channel_truncate_bcd8a', table='chat_channel', when='AFTER')),
+ ),
+ pgtrigger.migrations.AddTrigger(
+ model_name='privatemessage',
+ trigger=pgtrigger.compiler.Trigger(name='pg_notify_privatemessage', sql=pgtrigger.compiler.UpsertTriggerSql(func='\n IF TG_OP = \'DELETE\' THEN\n PERFORM pg_notify(\n \'pg_notify\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","old":{"id":\' || OLD."id" || \',"sender_id":\' || OLD."sender_id" || \',"recipient_id":\' || OLD."recipient_id" || \'}}\'\n );\n ELSE\n PERFORM pg_notify(\n \'pg_notify\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","new":{"id":\' || NEW."id" || \',"sender_id":\' || NEW."sender_id" || \',"recipient_id":\' || NEW."recipient_id" || \'}}\'\n );\n END IF;\n RETURN NULL;\n ', hash='5b1db355e28bdcbe3440d36e8da1709c7cba8156', operation='DELETE OR INSERT OR UPDATE', pgid='pgtrigger_pg_notify_privatemessage_92534', table='chat_privatemessage', when='AFTER')),
+ ),
+ pgtrigger.migrations.AddTrigger(
+ model_name='privatemessage',
+ trigger=pgtrigger.compiler.Trigger(name='pg_notify_privatemessage_truncate', sql=pgtrigger.compiler.UpsertTriggerSql(func='\n PERFORM pg_notify(\n \'pg_notify\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME || \'"}\'\n );\n RETURN NULL;\n ', hash='edd1292c2830959599e3bec829029206726381d6', level='STATEMENT', operation='TRUNCATE', pgid='pgtrigger_pg_notify_privatemessage_truncate_e9186', table='chat_privatemessage', when='AFTER')),
+ ),
]
from django.db.models import (
- CASCADE, DateTimeField, ForeignKey, ManyToManyField, Model, TextField
+ CASCADE, CharField, DateTimeField, ForeignKey, ManyToManyField, Model, TextField
)
-from django.contrib.auth.models import User
+from django.conf import settings
+from django.contrib.auth.models import AbstractUser
+from .triggers import triggers_for_table
+
+
+class User(AbstractUser):
+ class Meta:
+ triggers = [*triggers_for_table(settings.PG_NOTIFY_CHANNEL, "User", ("id",))]
+
+ def is_privileged(self):
+ return self.is_staff or self.is_superuser
class PrivateMessage(Model):
ts = DateTimeField(auto_now=True)
text = TextField()
+ class Meta:
+ triggers = [
+ *triggers_for_table(
+ settings.PG_NOTIFY_CHANNEL,
+ "PrivateMessage",
+ ("id", "sender_id", "recipient_id"),
+ )
+ ]
+
class Channel(Model):
- users = ManyToManyField(User, related_name="channels")
+ name = CharField(max_length=256, unique=True)
+ users = ManyToManyField(User, related_name="channels", through="ChannelUser")
+
+ class Meta:
+ triggers = [
+ *triggers_for_table(settings.PG_NOTIFY_CHANNEL, "Channel", ("id",))
+ ]
+
+
+class ChannelUser(Model):
+ user = ForeignKey(User, on_delete=CASCADE)
+ channel = ForeignKey(Channel, on_delete=CASCADE)
+
+ class Meta:
+ triggers = [
+ *triggers_for_table(
+ settings.PG_NOTIFY_CHANNEL,
+ "ChannelUser",
+ ("id", "user_id", "channel_id"),
+ )
+ ]
class ChannelMessage(Model):
+ user = ForeignKey(User, on_delete=CASCADE)
channel = ForeignKey(Channel, on_delete=CASCADE)
ts = DateTimeField(auto_now=True)
text = TextField()
+
+ class Meta:
+ triggers = [
+ *triggers_for_table(
+ settings.PG_NOTIFY_CHANNEL,
+ "ChannelMessage",
+ ("id", "user_id", "channel_id"),
+ )
+ ]
--- /dev/null
+import json
+from typing import Type
+from urllib.parse import urlunsplit
+
+from django.conf import settings
+from django.contrib.auth.mixins import LoginRequiredMixin
+from django.db.models import Q, QuerySet
+from django.http import HttpResponse
+from django.urls import path, reverse
+from django.utils.decorators import method_decorator
+from django.views.decorators.csrf import csrf_exempt
+from django.views.generic import View
+
+from .models import Channel, ChannelMessage, PrivateMessage, User #, ChannelUser
+from .serializers import (
+ ChannelMessageSerializer,
+ ChannelSerializer,
+ # ChannelUserSerializer,
+ ModelSerializer,
+ PrivateMessageSerializer,
+ UserSerializer,
+)
+
+
+class ModelRestView(LoginRequiredMixin, View):
+ http_method_names = []
+
+ serializer: Type[ModelSerializer]
+ method_map = {
+ "list": {"get": "list", "post": "create"},
+ "detail": {"get": "detail", "put": "update", "delete": "delete"},
+ }
+
+ def get_queryset(self):
+ return QuerySet(self.serializer.model).all()
+
+ def paginate(self, queryset, to_json):
+ query = self.request.GET.copy()
+ full_path = self.request.get_full_path()
+ pos = full_path.find("?")
+ if pos >= 0:
+ full_path = full_path[:pos]
+ if "before" in query:
+ queryset = queryset.filter(pk__lt=int(query["before"]))
+ count = queryset.count()
+ queryset = queryset[max(count - settings.DEFAULT_PAGE_SIZE, 0):]
+ if count > settings.DEFAULT_PAGE_SIZE:
+ query["before"] = queryset[0].pk
+ prev_url = self.request.build_absolute_uri(
+ urlunsplit(("", "", full_path, query.urlencode(), ""))
+ )
+ else:
+ prev_url = None
+
+ return {
+ "previous": prev_url,
+ "result": [to_json(instance) for instance in queryset],
+ }
+
+ def get_object(self):
+ return self.get_queryset().get(pk=self.kwargs["id"])
+
+ @staticmethod
+ def get_json_dump_kwargs():
+ if settings.DEBUG:
+ return {"indent": 4}
+ return {}
+
+ def list(self):
+ serializer = self.serializer(self.request)
+ return HttpResponse(
+ json.dumps(
+ self.paginate(self.get_queryset(), serializer.to_json),
+ **self.get_json_dump_kwargs(),
+ ),
+ headers={"content-type": "application/json"},
+ )
+
+ def create(self):
+ return self.create_or_update(status=201)
+
+ def detail_response(self, instance, **kwargs):
+ return HttpResponse(
+ json.dumps(
+ self.serializer(self.request).to_json(instance),
+ **self.get_json_dump_kwargs(),
+ ),
+ headers={"content-type": "application/json"},
+ **kwargs,
+ )
+
+ def detail(self):
+ return self.detail_response(self.get_object())
+
+ def update(self):
+ return self.create_or_update(self.get_object())
+
+ def create_or_update(self, *args, **kwargs):
+ instance, m2m = self.serializer(self.request).from_json(
+ json.load(self.request), *args
+ )
+ instance.save()
+ for field_name, value in m2m.items():
+ getattr(instance, field_name).set(value)
+ return self.detail_response(instance, **kwargs)
+
+ def delete(self):
+ self.get_object().delete()
+ return HttpResponse(status=204)
+
+ @method_decorator(csrf_exempt)
+ def dispatch(self, request, *args, **kwargs):
+ if not request.user.is_authenticated:
+ self.handle_no_permission()
+ return getattr(self, self.method_map[request.method.lower()])()
+
+ @classmethod
+ def get_urls(cls, name):
+ return (
+ path(
+ f"api/{name}/",
+ cls.as_view(method_map=cls.method_map["list"]),
+ name=f"chat-{name}-list",
+ ),
+ path(
+ f"api/{name}/<int:id>/",
+ cls.as_view(method_map=cls.method_map["detail"]),
+ name=f"chat-{name}-detail",
+ ),
+ )
+
+
+class ListAllMixin:
+ def paginate(self, queryset, to_json):
+ return {"result": [to_json(instance) for instance in queryset]}
+
+
+class UserRestView(ListAllMixin, ModelRestView):
+ serializer = UserSerializer
+
+ def get_queryset(self):
+ return User.objects.filter(is_active=True, last_login__isnull=False)
+
+ def get_object(self):
+ if self.request.path == reverse("chat-user-current-detail"):
+ self.kwargs["id"] = self.request.user.id
+ return super().get_object()
+
+ def create(self):
+ if not self.request.user.is_privileged():
+ self.handle_no_permission()
+ return super().create()
+
+ def delete(self):
+ if not self.request.user.is_privileged():
+ self.handle_no_permission()
+ return super().delete()
+
+ def update(self):
+ if not (
+ self.request.user.is_privileged()
+ or self.kwargs["id"] == self.request.user.id
+ ):
+ self.handle_no_permission()
+ return super().update()
+
+ @classmethod
+ def get_urls(cls, name):
+ return (
+ *super().get_urls(name),
+ path(
+ f"api/{name}/current/",
+ cls.as_view(method_map=cls.method_map["detail"]),
+ name=f"chat-{name}-current-detail",
+ ),
+ )
+
+
+class PrivateMessageRestView(ModelRestView):
+ serializer = PrivateMessageSerializer
+
+ def get_queryset(self):
+ if "other" in self.request.GET:
+ queryset = PrivateMessage.objects.filter(
+ Q(sender=self.request.user, recipient_id=self.request.GET["other"]) | Q(
+ sender_id=self.request.GET["other"], recipient=self.request.user
+ )
+ )
+ else:
+ queryset = PrivateMessage.objects.filter(
+ Q(sender=self.request.user) | Q(recipient=self.request.user)
+ )
+ return queryset.order_by("ts")
+
+
+class ChannelRestView(ListAllMixin, ModelRestView):
+ serializer = ChannelSerializer
+
+ def get_queryset(self):
+ return Channel.objects.filter(users=self.request.user.pk)
+
+
+#class ChannelUserRestView(ListAllMixin, ModelRestView):
+# serializer = ChannelUserSerializer
+#
+# def get_queryset(self):
+# return ChannelUser.objects.filter(user=self.request.user.pk)
+
+
+class ChannelMessageRestView(ModelRestView):
+ serializer = ChannelMessageSerializer
+
+ def get_queryset(self):
+ if "channel" in self.request.GET:
+ queryset = ChannelMessage.objects.filter(
+ channel_id=self.request.GET["channel"]
+ )
+ else:
+ queryset = ChannelMessage.objects.filter(
+ channel__users=self.request.user.pk
+ )
+ return queryset.order_by("ts")
--- /dev/null
+from datetime import datetime
+from typing import Type
+
+from django.db.models import (
+ DateTimeField, ForeignKey, ManyToManyField, ManyToManyRel, Model
+)
+from django.urls import reverse
+from django.utils.timezone import now
+
+from .models import Channel, PrivateMessage, User, ChannelMessage # , ChannelUser
+
+
+class ModelSerializer:
+ model: Type[Model]
+ fields = ["id", "url"]
+
+ def __init__(self, request=None):
+ self.request = request
+
+ def to_json(self, instance):
+ return {key: self.field_to_json(key, instance) for key in self.fields}
+
+ def field_to_json(self, field_name, instance):
+ if field_name == "url":
+ name = self.model._meta.verbose_name.lower().replace(' ', '')
+ value = reverse(f"chat-{name}-detail", args=[instance.pk])
+ if self.request:
+ value = self.request.build_absolute_uri(value)
+ return value
+ field = self.model._meta.get_field(field_name)
+ value = getattr(instance, field_name)
+ if isinstance(field, DateTimeField):
+ value = value.isoformat(" ")
+ elif isinstance(field, ForeignKey):
+ value = value.pk
+ elif isinstance(field, (ManyToManyField, ManyToManyRel)):
+ value = [v.pk for v in value.all()]
+ return value
+
+ def from_json(self, data, instance=None):
+ if instance is None:
+ instance = self.model()
+ m2m = {}
+ for field_name, value in data.items():
+ if field_name not in self.fields or field_name in ("id", "url"):
+ continue
+ field = self.model._meta.get_field(field_name)
+ if isinstance(field, DateTimeField):
+ value = datetime.fromisoformat(value)
+ if isinstance(field, (ManyToManyField, ManyToManyRel)):
+ m2m[field_name] = value
+ elif isinstance(field, ForeignKey):
+ # disallow reassigning FKs on update
+ if instance.pk is None:
+ setattr(instance, f"{field_name}_id", value)
+ else:
+ setattr(instance, field_name, value)
+ return instance, m2m
+
+
+class UserSerializer(ModelSerializer):
+ model = User
+ fields = [
+ "id",
+ "url",
+ "username",
+ "email",
+ "first_name",
+ "last_name",
+ "date_joined",
+ "channels",
+ ]
+
+ def from_json(self, data, instance=None):
+ orig_fields = self.fields.copy()
+ if User.is_privileged(self.request.user):
+ for field in User._meta.fields:
+ if field.name not in self.fields:
+ self.fields.append(field.name)
+ result = super().from_json(data, instance)
+ self.fields.clear()
+ self.fields.extend(orig_fields)
+ return result
+
+ def to_json(self, instance):
+ result = super().to_json(instance)
+ result["is_current"] = result["id"] == self.request.user.pk
+ return result
+
+
+class PrivateMessageSerializer(ModelSerializer):
+ model = PrivateMessage
+ fields = ["id", "url", "sender", "recipient", "ts", "text"]
+
+ def from_json(self, data, instance=None):
+ if instance is None:
+ data["sender"] = self.request.user.pk
+ data["ts"] = now().isoformat(" ")
+ else:
+ data.pop("sender")
+ data.pop("recipient")
+ data.pop("ts")
+ return super().from_json(data, instance)
+
+
+class ChannelSerializer(ModelSerializer):
+ model = Channel
+ fields = ["id", "url", "name", "users"]
+
+ def from_json(self, data, instance=None):
+ instance, m2m = super().from_json(data, instance)
+ if instance is None:
+ if "users" not in m2m:
+ m2m["users"] = []
+ if self.request.user.pk not in m2m["users"]:
+ m2m["users"].append(self.request.user.pk)
+ return instance, m2m
+
+
+#class ChannelUserSerializer(ModelSerializer):
+# model = ChannelUser
+# fields = ["id", "url", "channel", "user"]
+
+
+class ChannelMessageSerializer(ModelSerializer):
+ model = ChannelMessage
+ fields = ["id", "url", "user", "channel", "ts", "text"]
+
+ def from_json(self, data, instance=None):
+ instance, m2m = super().from_json(data, instance)
+ if instance.pk is None:
+ instance.user = self.request.user
+ return instance, m2m
For the full list of settings and their values, see
https://docs.djangoproject.com/en/5.1/ref/settings/
"""
-
+import os
from pathlib import Path
-from django.conf.global_settings import LOGIN_REDIRECT_URL
-
-# Build paths inside the project like this: BASE_DIR / 'subdir'.
+# Build paths inside the project like this: BASE_DIR / "subdir".
BASE_DIR = Path(__file__).resolve().parent.parent
-DEBUG = True
+DEBUG = False
ALLOWED_HOSTS = []
INSTALLED_APPS = [
"chat",
- 'django.contrib.admin',
- 'django.contrib.auth',
- 'django.contrib.contenttypes',
- 'django.contrib.sessions',
- 'django.contrib.messages',
- 'django.contrib.staticfiles',
+ "django.contrib.admin",
+ "django.contrib.auth",
+ "django.contrib.contenttypes",
+ "django.contrib.sessions",
+ "django.contrib.messages",
+ "django.contrib.staticfiles",
+ "pgtrigger",
]
MIDDLEWARE = [
- 'django.middleware.security.SecurityMiddleware',
- 'django.contrib.sessions.middleware.SessionMiddleware',
- 'django.middleware.common.CommonMiddleware',
- 'django.middleware.csrf.CsrfViewMiddleware',
- 'django.contrib.auth.middleware.AuthenticationMiddleware',
- 'django.contrib.messages.middleware.MessageMiddleware',
- 'django.middleware.clickjacking.XFrameOptionsMiddleware',
+ "django.middleware.security.SecurityMiddleware",
+ "django.contrib.sessions.middleware.SessionMiddleware",
+ "django.middleware.common.CommonMiddleware",
+ "django.middleware.csrf.CsrfViewMiddleware",
+ "django.contrib.auth.middleware.AuthenticationMiddleware",
+ "django.contrib.messages.middleware.MessageMiddleware",
+ "django.middleware.clickjacking.XFrameOptionsMiddleware",
]
-ROOT_URLCONF = 'chat.urls'
+ROOT_URLCONF = "chat.urls"
TEMPLATES = [
{
- 'BACKEND': 'django.template.backends.django.DjangoTemplates',
- 'DIRS': [],
- 'APP_DIRS': True,
- 'OPTIONS': {
- 'context_processors': [
- 'django.template.context_processors.debug',
- 'django.template.context_processors.request',
- 'django.template.context_processors.static',
- 'django.contrib.auth.context_processors.auth',
- 'django.contrib.messages.context_processors.messages',
+ "BACKEND": "django.template.backends.django.DjangoTemplates",
+ "DIRS": [],
+ "APP_DIRS": True,
+ "OPTIONS": {
+ "context_processors": [
+ "django.template.context_processors.debug",
+ "django.template.context_processors.request",
+ "django.template.context_processors.static",
+ "django.contrib.auth.context_processors.auth",
+ "django.contrib.messages.context_processors.messages",
],
},
},
]
DATABASES = {
- 'default': {
- 'ENGINE': 'django.db.backends.sqlite3',
- 'NAME': BASE_DIR / 'db.sqlite3',
+ "default": {
+ "ENGINE": "django.db.backends.postgresql",
+ "NAME": os.environ.get("RDS_DB_NAME"),
+ "USER": os.environ.get("RDS_USERNAME"),
+ "PASSWORD": os.environ.get("RDS_PASSWORD"),
+ "HOST": os.environ.get("RDS_HOSTNAME"),
+ "PORT": os.environ.get("RDS_PORT"),
}
}
AUTH_PASSWORD_VALIDATORS = [
{
- 'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
- },
- {
- 'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
- },
- {
- 'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
- },
- {
- 'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
+ "NAME": (
+ "django.contrib.auth.password_validation.UserAttributeSimilarityValidator"
+ ),
},
+ {"NAME": "django.contrib.auth.password_validation.MinimumLengthValidator"},
+ {"NAME": "django.contrib.auth.password_validation.CommonPasswordValidator"},
+ {"NAME": "django.contrib.auth.password_validation.NumericPasswordValidator"},
]
-LANGUAGE_CODE = 'en-us'
-TIME_ZONE = 'UTC'
+AUTH_USER_MODEL = "chat.User"
+LANGUAGE_CODE = "en-us"
+TIME_ZONE = "UTC"
USE_I18N = True
USE_TZ = True
-STATIC_URL = 'static/'
+STATIC_URL = "static/"
STATIC_ROOT = BASE_DIR / "staticfiles"
-DEFAULT_AUTO_FIELD = 'django.db.models.BigAutoField'
-LOGIN_URL = "/login"
+DEFAULT_AUTO_FIELD = "django.db.models.BigAutoField"
+LOGIN_URL = "/login/"
LOGIN_REDIRECT_URL = "/"
TRUST_USER_REGISTRATIONS = False
-MAX_EMAIL_CONFIRMATION_AGE_SECONDS = 86400 * 14
+EMAIL_SUBJECT_PREFIX = "[ChatApp] "
+PG_NOTIFY_CHANNEL = "pg_notify"
+DEFAULT_PAGE_SIZE = 5
try:
from .settings_local import *
--- /dev/null
+main {
+ height: 100%;
+}
--- /dev/null
+(function () {
+ var current_channel = {};
+ var users_per_id = {}, current_user_id = 0;
+
+ function add_msg(sender, msg) {
+ var p = document.createElement("P");
+ p.appendChild(document.createTextNode(sender));
+ p.appendChild(document.createTextNode(": "));
+ p.appendChild(document.createTextNode(msg));
+ document.getElementsByClassName("messages")[0].appendChild(p);
+ }
+
+ function ws_receive(msg) {
+ add_msg("ws", msg.data);
+ }
+
+ function xhr_error(msg) {
+ console.log(msg);
+ add_msg("xhr_error", msg);
+ }
+
+ function fetch_data(url, callback) {
+ var xhr = new XMLHttpRequest();
+ xhr.addEventListener("load", callback);
+ xhr.addEventListener("error", xhr_error);
+ xhr.open("get", url);
+ xhr.send();
+ }
+
+ function post_data(url, data, callback) {
+ var xhr = new XMLHttpRequest();
+ xhr.addEventListener("load", callback);
+ xhr.addEventListener("error", xhr_error);
+ xhr.open("post", url);
+ xhr.send(data);
+ }
+
+ function add_privatemessages() {
+ var msgs = JSON.parse(this.responseText), i;
+ for (i = 0; i < msgs.result.length; i += 1) {
+ add_msg(users_per_id[msgs.result[i].sender].username, msgs.result[i].text);
+ }
+ }
+
+ function set_privatemessage_channel(user_id) {
+ var messages = document.getElementsByClassName("messages")[0];
+ current_channel = {
+ "url": "/api/privatemessage/",
+ "data": {"recipient": user_id},
+ }
+ while(messages.firstChild) {
+ messages.removeChild(messages.firstChild);
+ }
+ fetch_data(
+ "/api/privatemessage/?other=" + user_id.toString(), add_privatemessages
+ );
+ }
+
+ function setup_user_callback() {
+ var data = JSON.parse(this.responseText), ul, li, a;
+ ul = document.getElementsByClassName("users")[0].children[0];
+ for (i = 0; i < data.result.length; i += 1) {
+ users_per_id[data.result[i].id] = data.result[i];
+ if (data.result[i].is_current) {
+ current_user_id = data.result[i].id;
+ }
+ li = document.createElement("li");
+ a = document.createElement("a");
+ a.setAttribute("href", "#");
+ a.appendChild(
+ document.createTextNode(
+ data.result[i].username + "\xa0(" + data.result[i].email + ")"
+ )
+ );
+ a.addEventListener(
+ "click",
+ (function (user_id) {
+ return function () { set_privatemessage_channel(user_id); };
+ }(data.result[i].id))
+ );
+ li.appendChild(a);
+ ul.appendChild(li);
+ }
+ }
+
+ function add_channelmessages() {
+ var msgs = JSON.parse(this.responseText), i;
+ for (i = 0; i < msgs.result.length; i += 1) {
+ add_msg(users_per_id[msgs.result[i].user].username, msgs.result[i].text);
+ }
+ }
+
+ function set_channel(channel_id) {
+ var messages = document.getElementsByClassName("messages")[0];
+ current_channel = {
+ "url": "/api/channelmessage/",
+ "data": {"channel": channel_id},
+ }
+ while(messages.firstChild) {
+ messages.removeChild(messages.firstChild);
+ }
+ fetch_data(
+ "/api/channelmessage/?channel=" + channel_id.toString(), add_channelmessages
+ );
+ }
+
+ function setup_channel_callback() {
+ var data = JSON.parse(this.responseText), ul, li, a;
+ ul = document.getElementsByClassName("channels")[0].children[0];
+ for (i = 0; i < data.result.length; i += 1) {
+ li = document.createElement("li");
+ a = document.createElement("a");
+ a.setAttribute("href", "#");
+ a.appendChild(document.createTextNode(data.result[i].name));
+ a.addEventListener(
+ "click",
+ (function (channel_id) {
+ return function () { set_channel(channel_id); };
+ }(data.result[i].id))
+ );
+ li.appendChild(a);
+ ul.appendChild(li);
+ }
+ }
+
+ function send() {
+ var ta = document.getElementsByClassName("input")[0].children[0], data, name;
+ data = {"text": ta.value};
+ for (name in current_channel.data) {
+ if (current_channel.data.hasOwnProperty(name)) {
+ data[name] = current_channel.data[name];
+ }
+ }
+ post_data(
+ current_channel.url,
+ JSON.stringify(data), function () { add_msg("send", "success"); }
+ );
+ ta.value = "";
+ }
+
+ window.addEventListener("load", function(e) {
+ var ws, schema;
+ e = e || window.event;
+ if (e.target.readyState !== "complete") {
+ return;
+ }
+ // setup ws
+ schema = {"http:": "ws:", "https:": "wss:"}[window.location.protocol];
+ ws = new WebSocket(schema + "//" + window.location.host + "/");
+ ws.addEventListener("message", ws_receive);
+ fetch_data("/api/user/", setup_user_callback);
+ fetch_data("/api/channel/", setup_channel_callback);
+ document.getElementsByClassName("input")[0].children[1].addEventListener(
+ "click", send
+ );
+ });
+}());
+++ /dev/null
-(function () {
-}());
\ No newline at end of file
color: white;
}
+html, body, nav, .messages, .channels, .users {
+ height: 100%;
+}
+
+body, nav, main {
+ display: flex;
+}
+
nav {
width: 10em;
- height: 100%;
+ overflow: auto;
+}
+
+main, .messages, .input > textarea, .user, .users, .channels {
+ width: 100%;
+}
+
+nav, main {
+ flex-direction: column;
+}
+
+nav, .input {
+ display:flex;
+}
+
+.user, .users, .channels {
+ overflow-x: hidden;
+}
+
+.user {
+ flex-shrink: 0;
+}
+
+nav > div {
+ border-bottom: 1px solid;
}
{% extends "chat/base.html" %}
{% load static %}
+{% block head %}
+<link rel="stylesheet" href="{% static 'chat/chat.css' %}">
+{% endblock head %}
+
{% block header %}
<nav>
<div class="user">
- {% block user_properties %}{{ request.user }}{% endblock user_properties %}
</div>
<div class="channels">
+ Channels
<ul>
- {% for channel in request.user.channels.all %}
- <li>
- {% block channel_link %}
- {{ channel }}
- {% endblock channel_link %}
- </li>
- {% endfor %}
</ul>
</div>
<div class="users">
+ Users
<ul>
- {% for user in User.objects.all %}
- <li>
- {% block user_pms_links %}
- {{ user }}
- {% endblock user_pms_links %}
- </li>
- {% endfor %}
</ul>
</div>
</nav>
{% block main %}
<div class="messages">
- {% block messages %}
- {% endblock messages %}
+ No channel selected!
</div>
<div class="input">
- {% block input %}<textarea></textarea>{% endblock input %}
+ <textarea></textarea>
+ <button>Send</button>
</div>
{% endblock main %}
{% block footer %}
- <script src="{% static 'chat/index.js' %}"></script>
+ <script src="{% static 'chat/chat.js' %}"></script>
{% endblock footer %}
{% extends "chat/base.html" %}
-{% load static i18n %}
+{% load static %}
{% block head %}
<link rel="stylesheet" href="{% static 'chat/login.css' %}">
{% endblock head %}
{% block main %}
- <h1>{% translate view.title %}</h1>
+ <h1>{{ view.title }}</h1>
<form method="post">
{% csrf_token %}
{{ form.as_p }}
{% extends "chat/base.html" %}
-{% load static i18n %}
+{% load static %}
{% block head %}
<link rel="stylesheet" href="{% static 'chat/login.css' %}">
{% endblock head %}
{% block main %}
- <h1>{% translate view.title %}</h1>
+ <h1>{{ view.title }}</h1>
<form>
<p>{{ msg }}</p>
<div class="controls">
+import json
+import sys
from html.parser import HTMLParser
-from unittest.mock import patch
+from importlib import import_module, reload
+from string import hexdigits
+from urllib.parse import urlencode
from django.conf import settings
-from django.contrib.auth.models import User
+from django.contrib.auth.forms import SetPasswordForm, AuthenticationForm
from django.core import mail
-from django.core.handlers.wsgi import WSGIRequest
+from django.db.models import Max
+from django.http import HttpResponse
from django.test import TestCase, override_settings
+from django.urls import reverse
+from django.utils.encoding import force_bytes
+from django.utils.http import urlsafe_base64_encode
+from django.utils.timezone import now
-from chat.utils import build_url
+from .forms import RegisterForm
+from .models import Channel, User
class FormExtractor(HTMLParser):
IGNORE_TAGS = (
"a",
"body",
+ "br",
"div",
"h1",
"head",
"html",
"label",
+ "li",
"link",
"main",
"meta",
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.forms = []
+ self.errorlist_stack = None
+ self.errorlist = []
@staticmethod
def get_attr(attr_key, attrs):
def handle_starttag(self, tag, attrs):
tag = tag.lower()
+ if self.errorlist_stack is not None:
+ self.errorlist_stack.append(tag)
if tag in self.IGNORE_TAGS:
return
elif tag == "form":
elif input_type in "submit":
pass
else:
- raise ValueError(f"unhandled input type: {repr(input_type)}")
+ raise ValueError(
+ f"unhandled input type: {repr(input_type)}"
+ ) # pragma: no cover
+ elif tag == "ul":
+ if "errorlist" in (self.get_attr("class", attrs) or "").split():
+ self.errorlist_stack = ["ul"]
else:
- raise ValueError(f"unhandled tag: {repr(tag)}")
+ raise ValueError(f"unhandled tag: {repr(tag)}") # pragma: no cover
+ def handle_data(self, data):
+ if self.errorlist_stack is None:
+ return
+ data = data.strip()
+ if data:
+ self.errorlist.append(data)
-class ChatTest(TestCase):
- def do_registration(self):
- response = self.client.get("/register/")
+ def handle_endtag(self, tag):
+ if self.errorlist_stack is None:
+ return
+ tag = tag.lower()
+ if len(self.errorlist_stack) == 0:
+ raise ValueError(f"Unexpected closing tag {tag}") # pragma: no cover
+ while len(self.errorlist_stack) > 0:
+ if self.errorlist_stack.pop() == tag:
+ break
+ else:
+ raise ValueError(f"Unexpected closing tag {tag}") # pragma: no cover
+ if len(self.errorlist_stack) == 0:
+ self.errorlist_stack = None
+
+
+class ChatTestMixin:
+ def login_user(self, user, password):
+ response = self.client.post(
+ reverse("chat-login"),
+ {"username": user.username, "password": password},
+ )
+ return ";".join(c.output(header="") for c in response.cookies.values()).strip()
+
+
+class ChatSignUpLoginTest(ChatTestMixin, TestCase):
+ def get_empty_confirm_url(self):
+ token = "EXAMPLE_TOKEN"
+ empty_confirm_url = reverse("chat-confirm-email", kwargs={"token": token})
+ pos = empty_confirm_url.find(token)
+ assert pos >= 0
+ return empty_confirm_url[:pos]
+
+ def get_form_fields_and_csrf_token(self, content):
fe = FormExtractor()
- fe.feed(response.content.decode())
+ fe.feed(content)
self.assertEqual(len(fe.forms), 1)
form_attrs, fields = fe.forms[0]
assert FormExtractor.get_attr("method", form_attrs).lower() == "post"
if field_name == "csrfmiddlewaretoken":
assert csrf_token is None and field_type == "hidden"
csrf_token = field_value
+ return fields, csrf_token
+
+ def do_registration(self):
+ register_url = reverse("chat-register")
+ response = self.client.get(register_url)
+ fields, csrf_token = self.get_form_fields_and_csrf_token(
+ response.content.decode()
+ )
payload = {
"csrfmiddlewaretoken": csrf_token,
"email": "new_user@example.com",
"username": "new_user_username",
"first_name": "John",
"last_name": "Doe",
- "password": "EhMacarena",
- "password_again": "EhMacarena",
+ "password1": "EhMacarena",
+ "password2": "EhMacarena",
}
+ self.assertEqual(set(payload), set(field[0] for field in fields))
+ self.assertRaises(User.DoesNotExist, User.objects.get, email=payload["email"])
+ response = self.client.post(register_url, {**payload, "password2": "Derp"})
+ fe = FormExtractor()
+ fe.feed(response.content.decode())
+ self.assertEqual(
+ [RegisterForm.error_messages["password_mismatch"]],
+ fe.errorlist,
+ )
self.assertRaises(User.DoesNotExist, User.objects.get, email=payload["email"])
- response = self.client.post("/register/", payload)
+ response = self.client.post(register_url, payload)
self.assertEqual(response.status_code, 302)
- request = WSGIRequest(self.client._base_environ())
self.assertEqual(
response.headers["Location"],
- build_url(request, "chat-register", params={"success": "1"}),
+ reverse("chat-register-success"),
)
+ response = self.client.get(response.headers["Location"])
+ confirm_msg = self.extract_tag_data("p", response.content.decode())
user = User.objects.get(email=payload["email"])
- user.check_password(payload["password"])
- self.assertEqual(len(mail.outbox), 1)
- confirmation_email = mail.outbox[0]
+ user.check_password(payload["password1"])
+ confirm_email = mail.outbox[0]
self.assertDictEqual(
{
k: v
- for k, v in confirmation_email.__dict__.items()
+ for k, v in confirm_email.__dict__.items()
if k not in ("body", "connection")
},
{
"bcc": [],
"reply_to": [],
"from_email": settings.DEFAULT_FROM_EMAIL,
- "subject": "[Chat-App] Confirm your email address!",
+ "subject": "[ChatApp] Confirm your email address!",
"attachments": [],
"extra_headers": {},
"alternatives": [],
}
)
- confirm_url = confirmation_email.body
- pos = confirm_url.find(build_url(request, "chat-confirm-email", kwargs={"token": "_"})[:-1])
+ confirm_url = confirm_email.body
+ pos = confirm_url.find(self.get_empty_confirm_url())
self.assertGreaterEqual(pos, 0)
- confirm_url = confirm_url[pos:]
- return user, payload, confirm_url
+ return user, payload, confirm_url[pos:], confirm_msg
+
+ @staticmethod
+ def extract_tag_data(tag_name, content):
+ pos = content.find(f"<{tag_name} ")
+ if pos == -1:
+ pos = content.find(f"<{tag_name}>")
+ assert pos >= 0
+ pos = content.find(">", pos)
+ assert pos >= 0
+ end_pos = content.find(f"</{tag_name}>")
+ assert end_pos >= 0
+ return content[pos + 1:end_pos]
- def confirm_email(self, email, confirm_url):
- self.assertIsNone(User.objects.get(email=email).last_login)
- self.client.get(confirm_url)
- self.assertIsNotNone(User.objects.get(email=email).last_login)
+ def reload_urls(self):
+ if settings.ROOT_URLCONF in sys.modules:
+ reload(sys.modules[settings.ROOT_URLCONF])
+ import_module(settings.ROOT_URLCONF)
+ from django.urls import resolvers
+ resolvers._get_cached_resolver.cache_clear()
@override_settings(TRUST_USER_REGISTRATIONS=True)
def test_registration_with_trust_user_registrations(self):
- user, payload, confirm_url = self.do_registration()
+ self.reload_urls()
+ user, payload, confirm_url, confirm_msg = self.do_registration()
+ self.assertEqual(len(mail.outbox), 1)
+ self.assertEqual(confirm_msg, "Registration complete!")
self.assertTrue(user.is_active)
- self.confirm_email(payload["email"], confirm_url)
+ self.assertIsNone(User.objects.get(email=payload["email"]).last_login)
+ content = self.client.get(confirm_url).content.decode()
+ self.assertEqual(self.extract_tag_data("p", content), "You are ready to go!")
+ self.assertIsNotNone(User.objects.get(email=payload["email"]).last_login)
+ content = self.client.get(confirm_url).content.decode()
+ self.assertEqual(
+ self.extract_tag_data("p", content),
+ "Email has already been confirmed!"
+ )
- @override_settings(TRUST_USER_REGISTRATIONS=False)
+ @override_settings(
+ TRUST_USER_REGISTRATIONS=False,
+ MAIL_ADMINS=[("Admin", "admin@example.com")],
+ )
def test_registration_without_trust_user_registrations(self):
- user, payload, confirm_url = self.do_registration()
+ self.reload_urls()
+ user, payload, confirm_url, confirm_msg = self.do_registration()
+ self.assertEqual(len(mail.outbox), 2)
+ msg = "Your user account will be activated shortly!"
+ self.assertEqual(confirm_msg, f"Registration complete!<br />{msg}")
self.assertFalse(user.is_active)
- self.confirm_email(payload["email"], confirm_url)
+ self.assertIsNone(User.objects.get(email=payload["email"]).last_login)
+ content = self.client.get(confirm_url).content.decode()
+ self.assertEqual(self.extract_tag_data("p", content), msg)
+ self.assertIsNotNone(User.objects.get(email=payload["email"]).last_login)
+ content = self.client.get(confirm_url).content.decode()
+ self.assertEqual(
+ self.extract_tag_data("p", content),
+ f"Email has already been confirmed!<br />{msg}"
+ )
+ user.is_active = True
+ user.save(update_fields=["is_active"])
+ content = self.client.get(confirm_url).content.decode()
+ self.assertEqual(
+ self.extract_tag_data("p", content),
+ f"Email has already been confirmed!<br />You are ready to go!"
+ )
+ admin_msg = mail.outbox[1]
+ self.assertEqual(
+ admin_msg.subject, f"[ChatApp] User registered: {payload['username']}"
+ )
+ self.assertEqual(
+ admin_msg.body,
+ (
+ "Hello admin,\n"
+ f"User '{payload['username']}' just registered.\n"
+ f"full name: '{user.get_full_name()}'\n"
+ f"email: '{payload['email']}'\n\n"
+ "The user is currently inactive and needs 'is_active' set to true "
+ "to do anything with the account. Do we trust that?"
+ )
+ )
+
+ def test_reset_password(self):
+ user = User(
+ email="user@example.com",
+ username="example_user",
+ is_active=True,
+ last_login=now(),
+ )
+ user.set_password("Uabc123...")
+ user.save()
+ reset_password_url = reverse("chat-reset-password")
+ response = self.client.get(reset_password_url)
+ fields, csrf_token = self.get_form_fields_and_csrf_token(
+ response.content.decode()
+ )
+ payload = {"csrfmiddlewaretoken": csrf_token, "email": "User@example.com"}
+ self.assertEqual(set(payload), set(field[0] for field in fields))
+ response = self.client.post(reset_password_url, payload)
+ self.assertEqual(response.status_code, 302)
+ self.assertEqual(
+ response.headers["Location"], reverse("chat-reset-password-success")
+ )
+ content = self.client.get(response.headers["Location"]).content.decode()
+ self.assertEqual(self.extract_tag_data("p", content), "Password reset sent")
+ self.assertEqual(len(mail.outbox), 1)
+ reset_email = mail.outbox[0]
+ self.assertEqual(reset_email.subject, "Password reset on testserver")
+ self.assertEqual(reset_email.to, [payload["email"].lower()])
+ url_pos = reset_email.body.find("http://")
+ after_url_pos = reset_email.body.find("\n", url_pos)
+ self.assertEqual(
+ reset_email.body[:url_pos],
+ (
+ "You're receiving this email because you requested a password reset "
+ "for your user account at testserver.\n\nPlease go to the following "
+ "page and choose a new password:\n\n"
+ )
+ )
+ self.assertEqual(
+ reset_email.body[after_url_pos:],
+ (
+ "\nYour username, in case you've forgotten: example_user\n\n"
+ "Thanks for using our site!\n"
+ "The testserver team"
+ )
+ )
+ url = reset_email.body[url_pos:after_url_pos]
+ fields, csrf_token = self.get_form_fields_and_csrf_token(
+ self.client.get(url).content.decode()
+ )
+ payload = {
+ "csrfmiddlewaretoken": csrf_token,
+ "new_password1": "SillyNewPw1234!",
+ "new_password2": "SillyNewPw1234!",
+ }
+ self.assertEqual(set(payload), set(field[0] for field in fields))
+ response = self.client.post(url, {**payload, "new_password2": "Derp"})
+ fe = FormExtractor()
+ fe.feed(response.content.decode())
+ self.assertEqual(
+ fe.errorlist, [SetPasswordForm.error_messages["password_mismatch"]]
+ )
+ response = self.client.post(url, payload)
+ self.assertEqual(response.status_code, 302)
+ self.assertEqual(
+ response.headers["Location"], reverse("chat-reset-password-token-success")
+ )
+ content = self.client.get(response.headers["Location"]).content.decode()
+ self.assertEqual(
+ self.extract_tag_data("p", content), "Password reset successful"
+ )
+ user.refresh_from_db()
+ user.check_password("SillyNewPw1234!")
+ slash_pos = url.rfind("/", 0, -1)
+ self.assertGreaterEqual(slash_pos, 0)
+ dash_pos = url.find("-", slash_pos)
+ self.assertGreaterEqual(dash_pos, 0)
+ c = hexdigits[(hexdigits.lower().index(url[dash_pos + 1].lower()) + 1) % 16]
+ other_slash_pos = url.rfind("/", 0, slash_pos)
+ self.assertGreaterEqual(other_slash_pos, 0)
+ fake_uid = urlsafe_base64_encode(
+ force_bytes(User.objects.aggregate(Max('pk'))["pk__max"] + 1)
+ )
+ url_modifieds = (
+ f"{url[:dash_pos + 1]}{c}{url[dash_pos + 2:]}",
+ f"{url[:other_slash_pos + 1]}{fake_uid}{url[slash_pos:]}"
+ )
+ for url_modified in url_modifieds:
+ response = self.client.get(url_modified)
+ self.assertEqual(response.status_code, 404)
+
+ def test_login_logout(self):
+ user = User(
+ email="user@example.com",
+ username="example_user",
+ )
+ password = "Uabc123..."
+ user.set_password(password)
+ user.save()
+
+ login_url = reverse("chat-login")
+ updates = (
+ (
+ {"is_active": False},
+ [
+ AuthenticationForm.error_messages["invalid_login"] % {
+ "username": "username"
+ }
+ ]
+ ),
+ ({"is_active": True}, ["This account's email address is not confirmed."]),
+ ({"last_login": now()}, None),
+ )
+ payload = {
+ "csrfmiddlewaretoken": "",
+ "username": user.username,
+ "password": password,
+ }
+ response: HttpResponse = HttpResponse()
+ for update, errorlist in updates:
+ user.__dict__.update(update)
+ user.save(update_fields=update.keys())
+ fields, csrf_token = self.get_form_fields_and_csrf_token(
+ self.client.get(login_url).content.decode()
+ )
+ payload["csrfmiddlewaretoken"] = csrf_token
+ self.assertEqual(set(payload), set(field[0] for field in fields))
+ response = self.client.post(login_url, payload)
+ if errorlist:
+ fe = FormExtractor()
+ fe.feed(response.content.decode())
+ self.assertEqual(fe.errorlist, errorlist)
+
+ self.assertEqual(response.status_code, 302)
+ cookies = ";".join(
+ c.output(header="") for c in response.cookies.values()
+ ).strip()
+ main_url = reverse("chat-main")
+ response = self.client.get(main_url, headers={"cookie": cookies})
+ self.assertEqual(response.status_code, 200)
+ response = self.client.get(reverse("chat-logout"), headers={"cookie": cookies})
+ self.assertEqual(response.status_code, 200)
+ logout_msg = self.extract_tag_data("p", response.content.decode())
+ self.assertEqual(logout_msg, "Logged out")
+ response = self.client.get(main_url, headers={"cookie": cookies})
+ self.assertEqual(response.status_code, 302)
+ query = urlencode({"next": main_url}, safe="/")
+ self.assertEqual(response.headers["Location"], f"{login_url}?{query}")
+
+
+class ChatTest(ChatTestMixin, TestCase):
+ def setup_users(self):
+ user1 = User(
+ email="u1@example.com", username="u1", is_active=True, last_login=now()
+ )
+ user1.set_password("UserOnePassword123")
+ user1.save()
+ user2 = User(
+ email="u2@example.com", username="u2", is_active=True, last_login=now()
+ )
+ user2_password = "UserTwoPassword123"
+ user2.set_password(user2_password)
+ user2.save()
+ channel = Channel.objects.create(name="test channel")
+ channel.users.set((user1, user2))
+ return user1, user2, user2_password, channel
+
+ def test_chat_user_api(self):
+ user1, user2, user2_password, channel = self.setup_users()
+ channel.users.remove(user2)
+ user2.channels.add(channel)
+
+ user1_url = reverse('chat-user-detail', args=[user1.pk])
+ user2_url = reverse("chat-user-detail", args=[user2.pk])
+
+ cookies = self.login_user(user2, user2_password)
+ response = self.client.get(user2_url, headers={"cookie": cookies})
+ self.assertEqual(
+ json.loads(response.content),
+ {
+ "id": user2.id,
+ "url": f"http://testserver{user2_url}",
+ "username": user2.username,
+ "email": user2.email,
+ "first_name": user2.first_name,
+ "last_name": user2.last_name,
+ "date_joined": user2.date_joined.isoformat(" "),
+ "channels": [channel.pk],
+ }
+ )
+ response = self.client.get(
+ reverse("chat-user-list"), headers={"cookie": cookies}
+ )
+ self.assertEqual(
+ json.loads(response.content),
+ {
+ "result": [
+ {
+ "id": user1.id,
+ "url": f"http://testserver{user1_url}",
+ "username": user1.username,
+ "email": user1.email,
+ "first_name": user1.first_name,
+ "last_name": user1.last_name,
+ "date_joined": user1.date_joined.isoformat(" "),
+ "channels": [channel.pk],
+ },
+ {
+ "id": user2.id,
+ "url": f"http://testserver{user2_url}",
+ "username": user2.username,
+ "email": user2.email,
+ "first_name": user2.first_name,
+ "last_name": user2.last_name,
+ "date_joined": user2.date_joined.isoformat(" "),
+ "channels": [channel.pk],
+ },
+ ]
+ }
+ )
+ self.client.get(reverse("chat-logout"), headers={"cookie": cookies})
+
+ user3 = User(
+ email="u3@example.com",
+ username="u3",
+ is_active=True,
+ last_login=now(),
+ is_staff=True,
+ )
+ user3_password = "UserThreePassword123"
+ user3.set_password(user3_password)
+ user3.save()
+ cookies = self.login_user(user3, user3_password)
+ response = self.client.post(
+ reverse("chat-user-list"),
+ json.dumps(
+ {
+ "username": "u4",
+ "email": "u4@example.com",
+ "is_active": True,
+ "last_login": now().isoformat(" "),
+ "channels": [channel.pk]
+ }
+ ).encode(),
+ content_type="application/json",
+ headers={"cookie": cookies},
+ )
+ data = json.loads(response.content)
+ user4 = User.objects.get(pk=data["id"])
+ self.assertTrue(user4.is_active)
+ self.assertEqual(
+ data,
+ {
+ "id": user4.id,
+ "url": (
+ f"http://testserver{reverse('chat-user-detail', args=[data['id']])}"
+ ),
+ "email": "u4@example.com",
+ "username": "u4",
+ "first_name": "",
+ "last_name": "",
+ "date_joined": data["date_joined"],
+ "channels": [channel.pk],
+ },
+ )
+ response = self.client.put(
+ data["url"],
+ json.dumps(
+ {
+ "username": "user4",
+ "is_active": False,
+ "last_name": "Nichols",
+ }
+ ).encode(),
+ content_type="application/json",
+ headers={"cookie": cookies},
+ )
+ user4_data = json.loads(response.content)
+ user4_url = reverse('chat-user-detail', args=[user4_data['id']])
+ self.assertEqual(
+ user4_data,
+ {
+ "id": user4.id,
+ "url": f"http://testserver{user4_url}",
+ "email": "u4@example.com",
+ "username": "user4",
+ "first_name": "",
+ "last_name": "Nichols",
+ "date_joined": user4_data["date_joined"],
+ "channels": [channel.pk],
+ },
+ )
+ user4.refresh_from_db()
+ self.assertEqual(user4.username, "user4")
+ self.assertFalse(user4.is_active)
+ self.assertEqual(user4.last_name, "Nichols")
+ user3.is_staff = False
+ user3.save(update_fields=["is_staff"])
+ self.assertFalse(user3.is_privileged())
+ response = self.client.put(
+ user4_data["url"],
+ json.dumps({"username": "user4_modified"}).encode(),
+ content_type="application/json",
+ headers={"cookie": cookies},
+ )
+ self.assertEqual(response.status_code, 403)
+ response = self.client.put(
+ reverse("chat-user-detail", args=[user3.pk]),
+ json.dumps({"username": "user3"}).encode(),
+ content_type="application/json",
+ headers={"cookie": cookies},
+ )
+ self.assertEqual(json.loads(response.content)["username"], "user3")
+ user3.refresh_from_db()
+ self.assertEqual(user3.username, "user3")
+ user3.is_staff = True
+ user3.save(update_fields=["is_staff"])
+ user4.is_active = True
+ user4.save(update_fields=["is_active"])
+ self.client.delete(user4_data["url"], headers={"cookie": cookies})
+ self.assertFalse(User.objects.filter(pk=user4_data["id"]).exists())
- def test_expired_email_confirmation(self):
- ...
+ def test_channel_api(self):
+ user1, user2, user2_password, channel = self.setup_users()
+ cookies = self.login_user(user2, user2_password)
+ channel_url = reverse("chat-channel-detail", args=[channel.pk])
+ response = self.client.get(channel_url, headers={"cookie": cookies})
+ self.assertEqual(
+ json.loads(response.content),
+ {
+ "id": channel.pk,
+ "url": f"http://testserver{channel_url}",
+ "name": "test channel",
+ "users": [user1.pk, user2.pk],
+ }
+ )
--- /dev/null
+from pgtrigger import After, Delete, Insert, Row, Statement, Trigger, Truncate, Update
+
+
+def fields_to_json(t, fields):
+ return ",".join(f"\"{field}\":' || {t}.\"{field}\" || '" for field in fields)
+
+
+def triggers_for_table(channel_name, model_name, fields):
+ return [
+ Trigger(
+ name=f"{channel_name}_{model_name.lower()}",
+ level=Row,
+ operation=Delete | Insert | Update,
+ when=After,
+ func=f"""
+ IF TG_OP = 'DELETE' THEN
+ PERFORM pg_notify(
+ '{channel_name}',
+ '{{"op":"' || TG_OP || '","table":"' || TG_TABLE_NAME ||
+ '","old":{{{fields_to_json('OLD', fields)}}}}}'
+ );
+ ELSE
+ PERFORM pg_notify(
+ '{channel_name}',
+ '{{"op":"' || TG_OP || '","table":"' || TG_TABLE_NAME ||
+ '","new":{{{fields_to_json('NEW', fields)}}}}}'
+ );
+ END IF;
+ RETURN NULL;
+ """,
+ ),
+ Trigger(
+ name=f"{channel_name}_{model_name.lower()}_truncate",
+ level=Statement,
+ operation=Truncate,
+ when=After,
+ func=f"""
+ PERFORM pg_notify(
+ '{channel_name}',
+ '{{"op":"' || TG_OP || '","table":"' || TG_TABLE_NAME || '"}}'
+ );
+ RETURN NULL;
+ """,
+ ),
+ ]
2. Add a URL to urlpatterns: path('blog/', include('blog.urls'))
"""
from django.conf import settings
-from django.conf.urls.static import static
+from django.contrib.staticfiles.views import serve as serve_staticfiles
from django.contrib import admin
-from django.urls import path
+from django.urls import path, re_path
+from django.utils.safestring import mark_safe
+
+from .rest_views import (
+ ChannelMessageRestView, ChannelRestView, PrivateMessageRestView, UserRestView
+)
from .views import (
- ChatView, ConfirmEmailView, LoginView, LogoutView, PasswordResetView, RegisterView,
+ ChatView,
+ ConfirmEmailView,
+ LoginView,
+ LogoutView,
+ SuccessView,
+ PasswordResetTokenView,
+ PasswordResetView,
+ RegisterView,
)
from .websocket import handle_websocket
urlpatterns = [
- path("admin/", admin.site.urls),
- *static(settings.STATIC_URL, document_root=settings.STATIC_ROOT),
+ path("register/", RegisterView.as_view(), name="chat-register"),
+ path(
+ "register-success/",
+ SuccessView.as_view(
+ title="Register",
+ extra_context={
+ "msg": (
+ "Registration complete!"
+ if settings.TRUST_USER_REGISTRATIONS
+ else mark_safe(
+ "Registration complete!<br />"
+ "Your user account will be activated shortly!"
+ )
+ )
+ }
+ ),
+ name="chat-register-success",
+ ),
+ path("reset-password/", PasswordResetView.as_view(), name="chat-reset-password"),
+ path(
+ "reset-password-success/",
+ SuccessView.as_view(
+ title="Reset Password",
+ extra_context={"msg": "Password reset sent"},
+ ),
+ name="chat-reset-password-success",
+ ),
+ path(
+ "reset-password/<uidb64>/<token>/",
+ PasswordResetTokenView.as_view(),
+ name="chat-reset-password-token",
+ ),
+ path(
+ "reset-password-token-success/",
+ SuccessView.as_view(
+ title="Reset Password",
+ extra_context={"msg": "Password reset successful"},
+ ),
+ name="chat-reset-password-token-success",
+ ),
+
path("", ChatView.as_view(), name="chat-main"),
path("login/", LoginView.as_view(), name="chat-login"),
- path("reset-password/", PasswordResetView.as_view(), name="chat-reset-password"),
- path("register/", RegisterView.as_view(), name="chat-register"),
path(
"confirm-email/<token>",
ConfirmEmailView.as_view(),
name="chat-confirm-email",
),
path("logout/", LogoutView.as_view(), name="chat-logout"),
+
+ path("admin/", admin.site.urls),
+ *UserRestView.get_urls("user"),
+ *PrivateMessageRestView.get_urls("privatemessage"),
+ *ChannelRestView.get_urls("channel"),
+ *ChannelMessageRestView.get_urls("channelmessage"),
]
+if settings.DEBUG:
+ urlpatterns.append(
+ re_path(f"{settings.STATIC_URL.lstrip('/')}(?P<path>.*)$", serve_staticfiles)
+ )
+
websocket_urls = {"/": handle_websocket}
-from django.urls import reverse
-from django.utils.http import urlencode
from django.utils.module_loading import module_has_submodule
or module_has_submodule(app_config.module, required_submodule)
):
yield app_config
-
-
-def build_url(request, *args, **kwargs):
- params = kwargs.pop("params", None)
- url = reverse(*args, **kwargs)
- if request is not None:
- url = request.build_absolute_uri(url)
- if params:
- return f"{url}?{urlencode(params)}"
- return url
-from base64 import b64decode, b64encode
from django.conf import settings
-from django.contrib.auth.forms import AuthenticationForm, PasswordResetForm
+from django.contrib.auth.forms import SetPasswordForm
from django.contrib.auth.mixins import LoginRequiredMixin
-from django.contrib.auth.models import User, update_last_login
-from django.contrib.auth.views import LoginView as DjangoLoginView, LogoutView as DjangoLogoutView, PasswordResetView as DjangoPasswordResetView
-from django.core.signing import TimestampSigner
+from django.contrib.auth.models import update_last_login
+from django.contrib.auth.tokens import default_token_generator
+from django.contrib.auth.views import (
+ LoginView as DjangoLoginView,
+ LogoutView as DjangoLogoutView,
+)
+from django.core.mail import mail_admins
+from django.core.signing import Signer
+from django.http import Http404
+from django.urls import reverse, reverse_lazy
+from django.utils.http import urlsafe_base64_decode, urlsafe_base64_encode
from django.utils.safestring import mark_safe
from django.views.generic import TemplateView
from django.views.generic.detail import SingleObjectTemplateResponseMixin
-from django.views.generic.edit import BaseCreateView
+from django.views.generic.edit import BaseCreateView, FormView, UpdateView
-from .forms import RegisterForm
-from .utils import build_url
+from .forms import AuthenticationForm, RegisterForm, PasswordResetForm
+from .models import User
class ChatView(LoginRequiredMixin, TemplateView):
template_name = "chat/login.html"
title = "Login"
- def get_success_url(self):
- return build_url(self.request, "chat-main")
-
class LogoutView(DjangoLogoutView):
http_method_names = ["post", "options", "get"]
return context
-class PasswordResetView(DjangoPasswordResetView):
+class PasswordResetView(FormView):
form_class = PasswordResetForm
template_name = "chat/login.html"
title = "Reset Password"
+ success_url = reverse_lazy("chat-reset-password-success")
+ def form_valid(self, form):
+ """If the form is valid, save the associated model."""
+ form.save(self.request)
+ return super().form_valid(form)
-class RegisterView(SingleObjectTemplateResponseMixin, BaseCreateView):
- form_class = RegisterForm
- title = "Register"
- def get_template_names(self):
- if self.request.GET.get("success") == "1":
- return ["chat/success.html"]
- return ["chat/login.html"]
+class SuccessView(TemplateView):
+ template_name = "chat/success.html"
+ title = "Success"
- def get_context_data(self, **kwargs):
- context = super().get_context_data(**kwargs)
- if self.request.GET.get("success") == "1":
- if settings.TRUST_USER_REGISTRATIONS:
- context["msg"] = mark_safe("Registration complete!")
- else:
- context["msg"] = mark_safe(
- "Registration complete.<br />"
- "Your user account will be activated shortly."
- )
- return context
- def get_success_url(self):
- return build_url(self.request, "chat-register", params={"success": "1"})
+class PasswordResetTokenView(UpdateView):
+ form_class = SetPasswordForm
+ template_name = "chat/login.html"
+ title = "Reset Password"
+ success_url = reverse_lazy("chat-reset-password-token-success")
+ model = User
+
+ def get_object(self, queryset=None):
+ if queryset is None:
+ queryset = User.objects.filter(is_active=True, last_login__isnull=False)
+ try:
+ user = queryset.get(pk=urlsafe_base64_decode(self.kwargs["uidb64"]))
+ except User.DoesNotExist:
+ pass
+ else:
+ if default_token_generator.check_token(user, self.kwargs["token"]):
+ return user
+ raise Http404(
+ "No %(verbose_name)s found matching the query"
+ % {"verbose_name": User._meta.verbose_name}
+ )
+
+ def get_form_kwargs(self):
+ kwargs = super().get_form_kwargs()
+ kwargs["user"] = kwargs.pop("instance")
+ return kwargs
+
+
+class RegisterView(SingleObjectTemplateResponseMixin, BaseCreateView):
+ form_class = RegisterForm
+ template_name = "chat/login.html"
+ title = "Register"
+ success_url = reverse_lazy("chat-register-success")
def form_valid(self, form):
response = super().form_valid(form)
- # instance = self.object
- # here, email the user to confirm their email address
- url = build_url(
- self.request,
- "chat-confirm-email",
- args=[
- b64encode(
- TimestampSigner().sign(self.object.email).encode()
- ).decode().rstrip("=")
- ]
+ user = self.object
+ # email confirmation can not expire, since it cannot be re-requested
+ token = urlsafe_base64_encode(Signer().sign(user.pk).encode())
+ url = self.request.build_absolute_uri(
+ reverse("chat-confirm-email", args=[token])
)
- self.object.email_user(
- "[Chat-App] Confirm your email address!",
- f"Hello {str(self.object)}\nClick here to confirm your email address:\n{url}"
+ user.email_user(
+ f"{settings.EMAIL_SUBJECT_PREFIX}Confirm your email address!",
+ f"Hello {str(user)}\nClick here to confirm your email address:\n{url}"
)
if not settings.TRUST_USER_REGISTRATIONS:
- # here, email admin to activate the new user
- pass
+ mail_admins(
+ f"User registered: {str(user)}",
+ (
+ f"Hello admin,\n"
+ f"User '{user.username}' just registered.\n"
+ f"full name: '{user.get_full_name()}'\n"
+ f"email: '{user.email}'\n\n"
+ "The user is currently inactive and needs 'is_active' set to true "
+ "to do anything with the account. Do we trust that?"
+ ),
+ )
return response
class ConfirmEmailView(TemplateView):
template_name = "chat/success.html"
title = "Email confirmed"
-
- def __init__(self, **kwargs):
- super().__init__(**kwargs)
- self.msg = None
-
- def get(self, request, *args, **kwargs):
- token = f"{kwargs['token']}{'=' * (-len(kwargs['token']) % 3)}"
- email = TimestampSigner().unsign(
- b64decode(token.encode()).decode(),
- max_age=settings.MAX_EMAIL_CONFIRMATION_AGE_SECONDS,
- )
- user = User.objects.get(email=email)
- if settings.TRUST_USER_REGISTRATIONS or user.is_active:
- self.msg = "You are ready to go!"
- else:
- self.msg = "Your user account will be activated shortly."
- if user.last_login:
- if self.msg:
- self.msg = f"Email has already been confirmed! {self.msg}"
- else:
- self.msg = "Email has already been confirmed!"
- update_last_login(None, user)
- return super().get(request, *args, **kwargs)
+ MSG_PARTS = (
+ "Your user account will be activated shortly!",
+ "You are ready to go!",
+ "Email has already been confirmed!",
+ )
+
+ @classmethod
+ def get_msg(cls, user):
+ if settings.TRUST_USER_REGISTRATIONS:
+ return cls.MSG_PARTS[1 + (user.last_login is not None)]
+ msg = cls.MSG_PARTS[user.is_active]
+ if user.last_login is None:
+ return msg
+ return mark_safe("<br />".join((cls.MSG_PARTS[2], msg)))
def get_context_data(self, **kwargs):
+ token = f"{self.kwargs['token']}{'=' * (-len(self.kwargs['token']) & 3)}"
context = super().get_context_data(**kwargs)
- if self.msg:
- context["msg"] = self.msg
+ user = User.objects.get(
+ pk=Signer().unsign(urlsafe_base64_decode(token).decode())
+ )
+ context["msg"] = self.get_msg(user)
+ update_last_login(None, user)
return context
-async def handle_websocket(scope, receive, send):
+import sys
+from asyncio import CancelledError, ensure_future
+from contextlib import contextmanager
+from functools import partial
+from io import BytesIO
+from traceback import print_exception
+
+from asgiref.sync import sync_to_async
+from django.conf import settings
+from django.contrib.auth import aget_user
+from django.contrib.sessions.middleware import SessionMiddleware
+from django.core.handlers.asgi import ASGIRequest
+from django.db import connection
+
+
+async def process_ws(receive, send):
while True:
event = await receive()
-
if event['type'] == 'websocket.connect':
await send({'type': 'websocket.accept'})
-
- if event['type'] == 'websocket.disconnect':
- break
-
- if event['type'] == 'websocket.receive':
+ elif event['type'] == 'websocket.disconnect':
+ return
+ elif event['type'] == 'websocket.receive':
if event['text'] == 'ping':
await send({
'type': 'websocket.send',
- 'text': 'pong!'
+ 'text': "pong",
})
+
+
+@contextmanager
+def listen_notify_handler(conn, callback, loop):
+ loop.add_reader(conn.fileno(), partial(conn.execute, "SELECT 1"))
+ conn.add_notify_handler(callback)
+ conn.execute(f"LISTEN {settings.PG_NOTIFY_CHANNEL}")
+ try:
+ yield
+ finally:
+ conn.execute(f"UNLISTEN {settings.PG_NOTIFY_CHANNEL}")
+ conn.remove_notify_handler(callback)
+ loop.remove_reader(conn.fileno())
+
+
+def process_triggers(send, notification):
+ ensure_future(send({"type": "websocket.send", "text": notification.payload}))
+
+
+async def handle_websocket(scope, receive, send):
+ request = ASGIRequest({**scope, "method": f"_ws"}, BytesIO())
+ SessionMiddleware(lambda x: None).process_request(request)
+ request.user = await aget_user(request)
+
+ if not request.user.is_authenticated:
+ event = await receive()
+ if event['type'] == 'websocket.connect':
+ await send({'type': 'websocket.accept'})
+ await send({"type": "websocket.close"})
+ return
+
+ await sync_to_async(connection.connect)()
+ with listen_notify_handler(
+ connection.connection, partial(process_triggers, send), receive.__self__.loop
+ ):
+ try:
+ await process_ws(receive, send)
+ except CancelledError:
+ pass
+ except Exception:
+ print_exception(*sys.exc_info())
"${PYTHON}" -m venv venv
. venv/bin/activate
"${PYTHON}" -m pip install -qU pip
-"${PYTHON}" -m pip install -qU django 'uvicorn[standard]'
+"${PYTHON}" -m pip install -qU django 'uvicorn[standard]' 'psycopg[c]' django-pgtrigger
-[ ] login tests
- last_login is None is used as a sentinel state
- for unconfirmed email user accounts.
- - login is impossible if email address is unconfirmed
- (which is the same as last_login is None)
-[ ] passsword reset
- - email
- - tests
+[ ] finish tests for api endpoints
+
+[ ] tests for pg-trigger→notify websocket
+ - somehow test each trigger statement individually, that is 4 models * 5 endpoints
+[ ] simple postgres→json/json→postgres rest backend
+[ ] JS
+ - ws client infrastructure: auto-reconnect, notify-throttle
+ - xhr to fetch new data
[ ] css for chat UI
-[ ] JS: ws and polling boilerplate
[ ] ws and chat tests?
-[ ] hook up ws to a pg queue (or comparable)
-[ ] user settings, change password
[ ] channel management: channel admins?
[ ] file uploads
[ ] media uploads: images, gifs, even video files?
-[ ] moderation functions: report messages and hardcoded complaints channel
+[ ] moderation functions: report messages to MANAGERS, including private ones