--- /dev/null
+# See https://pre-commit.com for more information
+# See https://pre-commit.com/hooks.html for more hooks
+repos:
+- repo: https://github.com/pre-commit/pre-commit-hooks
+ rev: v5.0.0
+ hooks:
+ - id: trailing-whitespace
+ - id: end-of-file-fixer
+ - id: check-yaml
+ - id: check-added-large-files
+- repo: https://github.com/astral-sh/ruff-pre-commit
+ rev: v0.7.0
+ hooks:
+ - id: ruff
+ args: [ --fix, --extend-select, I ]
+ - id: ruff-format
-from django.contrib import admin
-
# Register your models here.
class ChatConfig(AppConfig):
- default_auto_field = 'django.db.models.BigAutoField'
- name = 'chat'
+ default_auto_field = "django.db.models.BigAutoField"
@staticmethod
def autodiscover_models():
from django.contrib.admin import site
+
for app_config in get_app_configs():
for model in app_config.get_models():
site.register(model, ModelAdmin)
-"""
-ASGI config for chat project.
+"""ASGI config for chat project.
It exposes the ASGI callable as a module-level variable named ``application``.
import django
from django.core.handlers.asgi import ASGIHandler
-os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'chat.settings')
+os.environ.setdefault("DJANGO_SETTINGS_MODULE", "chat.settings")
class ASGIHandlerWithWebsocket(ASGIHandler):
async def __call__(self, scope, receive, send):
from .urls import websocket_urls
+
if scope["type"] == "websocket":
await websocket_urls.get(scope["path"])(scope, receive, send)
else:
await super().__call__(scope, receive, send)
-
def get_asgi_application():
django.setup(set_prefix=False)
return ASGIHandlerWithWebsocket()
from urllib.parse import urlunsplit
from django.conf import settings
-from django.contrib.auth.forms import (
- AuthenticationForm as DjangoAuthenticationForm, BaseUserCreationForm
-)
+from django.contrib.auth.forms import AuthenticationForm as DjangoAuthenticationForm
+from django.contrib.auth.forms import BaseUserCreationForm
from django.contrib.auth.tokens import default_token_generator
from django.contrib.sites.shortcuts import get_current_site
from django.core.exceptions import ValidationError
class Meta:
model = User
- fields = ['email', 'username', 'first_name', 'last_name']
+ fields = ["email", "username", "first_name", "last_name"]
class PasswordResetForm(Form):
),
None,
None,
- )
+ ),
)
user.email_user(
f"Password reset on {site.name}",
f"Your username, in case you've forgotten: {user}\n\n"
"Thanks for using our site!\n"
f"The {site.name} team"
- )
+ ),
)
class Migration(migrations.Migration):
-
initial = True
dependencies = [
- ('auth', '0012_alter_user_first_name_max_length'),
+ ("auth", "0012_alter_user_first_name_max_length"),
]
operations = [
migrations.CreateModel(
- name='Channel',
+ name="Channel",
fields=[
- ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
- ('name', models.CharField(max_length=256, unique=True)),
+ (
+ "id",
+ models.BigAutoField(
+ auto_created=True,
+ primary_key=True,
+ serialize=False,
+ verbose_name="ID",
+ ),
+ ),
+ ("name", models.CharField(max_length=256, unique=True)),
],
),
migrations.CreateModel(
- name='User',
+ name="User",
fields=[
- ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
- ('password', models.CharField(max_length=128, verbose_name='password')),
- ('last_login', models.DateTimeField(blank=True, null=True, verbose_name='last login')),
- ('is_superuser', models.BooleanField(default=False, help_text='Designates that this user has all permissions without explicitly assigning them.', verbose_name='superuser status')),
- ('username', models.CharField(error_messages={'unique': 'A user with that username already exists.'}, help_text='Required. 150 characters or fewer. Letters, digits and @/./+/-/_ only.', max_length=150, unique=True, validators=[django.contrib.auth.validators.UnicodeUsernameValidator()], verbose_name='username')),
- ('first_name', models.CharField(blank=True, max_length=150, verbose_name='first name')),
- ('last_name', models.CharField(blank=True, max_length=150, verbose_name='last name')),
- ('email', models.EmailField(blank=True, max_length=254, verbose_name='email address')),
- ('is_staff', models.BooleanField(default=False, help_text='Designates whether the user can log into this admin site.', verbose_name='staff status')),
- ('is_active', models.BooleanField(default=True, help_text='Designates whether this user should be treated as active. Unselect this instead of deleting accounts.', verbose_name='active')),
- ('date_joined', models.DateTimeField(default=django.utils.timezone.now, verbose_name='date joined')),
- ('groups', models.ManyToManyField(blank=True, help_text='The groups this user belongs to. A user will get all permissions granted to each of their groups.', related_name='user_set', related_query_name='user', to='auth.group', verbose_name='groups')),
- ('user_permissions', models.ManyToManyField(blank=True, help_text='Specific permissions for this user.', related_name='user_set', related_query_name='user', to='auth.permission', verbose_name='user permissions')),
+ (
+ "id",
+ models.BigAutoField(
+ auto_created=True,
+ primary_key=True,
+ serialize=False,
+ verbose_name="ID",
+ ),
+ ),
+ ("password", models.CharField(max_length=128, verbose_name="password")),
+ (
+ "last_login",
+ models.DateTimeField(
+ blank=True,
+ null=True,
+ verbose_name="last login",
+ ),
+ ),
+ (
+ "is_superuser",
+ models.BooleanField(
+ default=False,
+ help_text="Designates that this user has all permissions without explicitly assigning them.",
+ verbose_name="superuser status",
+ ),
+ ),
+ (
+ "username",
+ models.CharField(
+ error_messages={
+ "unique": "A user with that username already exists.",
+ },
+ help_text="Required. 150 characters or fewer. Letters, digits and @/./+/-/_ only.",
+ max_length=150,
+ unique=True,
+ validators=[
+ django.contrib.auth.validators.UnicodeUsernameValidator(),
+ ],
+ verbose_name="username",
+ ),
+ ),
+ (
+ "first_name",
+ models.CharField(
+ blank=True,
+ max_length=150,
+ verbose_name="first name",
+ ),
+ ),
+ (
+ "last_name",
+ models.CharField(
+ blank=True,
+ max_length=150,
+ verbose_name="last name",
+ ),
+ ),
+ (
+ "email",
+ models.EmailField(
+ blank=True,
+ max_length=254,
+ verbose_name="email address",
+ ),
+ ),
+ (
+ "is_staff",
+ models.BooleanField(
+ default=False,
+ help_text="Designates whether the user can log into this admin site.",
+ verbose_name="staff status",
+ ),
+ ),
+ (
+ "is_active",
+ models.BooleanField(
+ default=True,
+ help_text="Designates whether this user should be treated as active. Unselect this instead of deleting accounts.",
+ verbose_name="active",
+ ),
+ ),
+ (
+ "date_joined",
+ models.DateTimeField(
+ default=django.utils.timezone.now,
+ verbose_name="date joined",
+ ),
+ ),
+ (
+ "groups",
+ models.ManyToManyField(
+ blank=True,
+ help_text="The groups this user belongs to. A user will get all permissions granted to each of their groups.",
+ related_name="user_set",
+ related_query_name="user",
+ to="auth.group",
+ verbose_name="groups",
+ ),
+ ),
+ (
+ "user_permissions",
+ models.ManyToManyField(
+ blank=True,
+ help_text="Specific permissions for this user.",
+ related_name="user_set",
+ related_query_name="user",
+ to="auth.permission",
+ verbose_name="user permissions",
+ ),
+ ),
],
managers=[
- ('objects', django.contrib.auth.models.UserManager()),
+ ("objects", django.contrib.auth.models.UserManager()),
],
),
migrations.CreateModel(
- name='ChannelMessage',
+ name="ChannelMessage",
fields=[
- ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
- ('ts', models.DateTimeField(auto_now_add=True)),
- ('text', models.TextField()),
- ('channel', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='chat.channel')),
- ('user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL)),
+ (
+ "id",
+ models.BigAutoField(
+ auto_created=True,
+ primary_key=True,
+ serialize=False,
+ verbose_name="ID",
+ ),
+ ),
+ ("ts", models.DateTimeField(auto_now_add=True)),
+ ("text", models.TextField()),
+ (
+ "channel",
+ models.ForeignKey(
+ on_delete=django.db.models.deletion.CASCADE,
+ to="chat.channel",
+ ),
+ ),
+ (
+ "user",
+ models.ForeignKey(
+ on_delete=django.db.models.deletion.CASCADE,
+ to=settings.AUTH_USER_MODEL,
+ ),
+ ),
],
),
migrations.CreateModel(
- name='ChannelUser',
+ name="ChannelUser",
fields=[
- ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
- ('channel', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='chat.channel')),
- ('user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL)),
+ (
+ "id",
+ models.BigAutoField(
+ auto_created=True,
+ primary_key=True,
+ serialize=False,
+ verbose_name="ID",
+ ),
+ ),
+ (
+ "channel",
+ models.ForeignKey(
+ on_delete=django.db.models.deletion.CASCADE,
+ to="chat.channel",
+ ),
+ ),
+ (
+ "user",
+ models.ForeignKey(
+ on_delete=django.db.models.deletion.CASCADE,
+ to=settings.AUTH_USER_MODEL,
+ ),
+ ),
],
),
migrations.AddField(
- model_name='channel',
- name='users',
- field=models.ManyToManyField(related_name='channels', through='chat.ChannelUser', to=settings.AUTH_USER_MODEL),
+ model_name="channel",
+ name="users",
+ field=models.ManyToManyField(
+ related_name="channels",
+ through="chat.ChannelUser",
+ to=settings.AUTH_USER_MODEL,
+ ),
),
migrations.CreateModel(
- name='PrivateMessage',
+ name="PrivateMessage",
fields=[
- ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
- ('ts', models.DateTimeField(auto_now_add=True)),
- ('text', models.TextField()),
- ('recipient', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='pns_recieved', to=settings.AUTH_USER_MODEL)),
- ('sender', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='pms_sent', to=settings.AUTH_USER_MODEL)),
+ (
+ "id",
+ models.BigAutoField(
+ auto_created=True,
+ primary_key=True,
+ serialize=False,
+ verbose_name="ID",
+ ),
+ ),
+ ("ts", models.DateTimeField(auto_now_add=True)),
+ ("text", models.TextField()),
+ (
+ "recipient",
+ models.ForeignKey(
+ on_delete=django.db.models.deletion.CASCADE,
+ related_name="pns_recieved",
+ to=settings.AUTH_USER_MODEL,
+ ),
+ ),
+ (
+ "sender",
+ models.ForeignKey(
+ on_delete=django.db.models.deletion.CASCADE,
+ related_name="pms_sent",
+ to=settings.AUTH_USER_MODEL,
+ ),
+ ),
],
),
pgtrigger.migrations.AddTrigger(
- model_name='user',
- trigger=pgtrigger.compiler.Trigger(name='pg_notify_user', sql=pgtrigger.compiler.UpsertTriggerSql(func='\n IF TG_OP = \'DELETE\' THEN\n PERFORM pg_notify(\n \'pg_notify\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","obj":{"id":\' || OLD."id" || \'}}\'\n );\n ELSE\n PERFORM pg_notify(\n \'pg_notify\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","obj":{"id":\' || NEW."id" || \'}}\'\n );\n END IF;\n RETURN NULL;\n ', hash='7f6e8eba01193825febb722ab5558a49773b1c1a', operation='DELETE OR INSERT OR UPDATE', pgid='pgtrigger_pg_notify_user_152e9', table='chat_user', when='AFTER')),
+ model_name="user",
+ trigger=pgtrigger.compiler.Trigger(
+ name="pg_notify_user",
+ sql=pgtrigger.compiler.UpsertTriggerSql(
+ func='\n IF TG_OP = \'DELETE\' THEN\n PERFORM pg_notify(\n \'pg_notify\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","obj":{"id":\' || OLD."id" || \'}}\'\n );\n ELSE\n PERFORM pg_notify(\n \'pg_notify\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","obj":{"id":\' || NEW."id" || \'}}\'\n );\n END IF;\n RETURN NULL;\n ',
+ hash="7f6e8eba01193825febb722ab5558a49773b1c1a",
+ operation="DELETE OR INSERT OR UPDATE",
+ pgid="pgtrigger_pg_notify_user_152e9",
+ table="chat_user",
+ when="AFTER",
+ ),
+ ),
),
pgtrigger.migrations.AddTrigger(
- model_name='user',
- trigger=pgtrigger.compiler.Trigger(name='pg_notify_user_truncate', sql=pgtrigger.compiler.UpsertTriggerSql(func='\n PERFORM pg_notify(\n \'pg_notify\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME || \'"}\'\n );\n RETURN NULL;\n ', hash='58944d2a6ef65e1fe00c504ca2488854a9ed2989', level='STATEMENT', operation='TRUNCATE', pgid='pgtrigger_pg_notify_user_truncate_b8c18', table='chat_user', when='AFTER')),
+ model_name="user",
+ trigger=pgtrigger.compiler.Trigger(
+ name="pg_notify_user_truncate",
+ sql=pgtrigger.compiler.UpsertTriggerSql(
+ func="\n PERFORM pg_notify(\n 'pg_notify',\n '{\"op\":\"' || TG_OP || '\",\"table\":\"' || TG_TABLE_NAME || '\"}'\n );\n RETURN NULL;\n ",
+ hash="58944d2a6ef65e1fe00c504ca2488854a9ed2989",
+ level="STATEMENT",
+ operation="TRUNCATE",
+ pgid="pgtrigger_pg_notify_user_truncate_b8c18",
+ table="chat_user",
+ when="AFTER",
+ ),
+ ),
),
pgtrigger.migrations.AddTrigger(
- model_name='channelmessage',
- trigger=pgtrigger.compiler.Trigger(name='pg_notify_channelmessage', sql=pgtrigger.compiler.UpsertTriggerSql(func='\n IF TG_OP = \'DELETE\' THEN\n PERFORM pg_notify(\n \'pg_notify\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","obj":{"id":\' || OLD."id" || \',"user_id":\' || OLD."user_id" || \',"channel_id":\' || OLD."channel_id" || \'}}\'\n );\n ELSE\n PERFORM pg_notify(\n \'pg_notify\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","obj":{"id":\' || NEW."id" || \',"user_id":\' || NEW."user_id" || \',"channel_id":\' || NEW."channel_id" || \'}}\'\n );\n END IF;\n RETURN NULL;\n ', hash='b6e672dd9e454c168ef544260779943b1e40fe6d', operation='DELETE OR INSERT OR UPDATE', pgid='pgtrigger_pg_notify_channelmessage_d2b2e', table='chat_channelmessage', when='AFTER')),
+ model_name="channelmessage",
+ trigger=pgtrigger.compiler.Trigger(
+ name="pg_notify_channelmessage",
+ sql=pgtrigger.compiler.UpsertTriggerSql(
+ func='\n IF TG_OP = \'DELETE\' THEN\n PERFORM pg_notify(\n \'pg_notify\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","obj":{"id":\' || OLD."id" || \',"user_id":\' || OLD."user_id" || \',"channel_id":\' || OLD."channel_id" || \'}}\'\n );\n ELSE\n PERFORM pg_notify(\n \'pg_notify\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","obj":{"id":\' || NEW."id" || \',"user_id":\' || NEW."user_id" || \',"channel_id":\' || NEW."channel_id" || \'}}\'\n );\n END IF;\n RETURN NULL;\n ',
+ hash="b6e672dd9e454c168ef544260779943b1e40fe6d",
+ operation="DELETE OR INSERT OR UPDATE",
+ pgid="pgtrigger_pg_notify_channelmessage_d2b2e",
+ table="chat_channelmessage",
+ when="AFTER",
+ ),
+ ),
),
pgtrigger.migrations.AddTrigger(
- model_name='channelmessage',
- trigger=pgtrigger.compiler.Trigger(name='pg_notify_channelmessage_truncate', sql=pgtrigger.compiler.UpsertTriggerSql(func='\n PERFORM pg_notify(\n \'pg_notify\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME || \'"}\'\n );\n RETURN NULL;\n ', hash='8f1c4f12257239f00ee73bfd21b226a25a2c6a16', level='STATEMENT', operation='TRUNCATE', pgid='pgtrigger_pg_notify_channelmessage_truncate_02c45', table='chat_channelmessage', when='AFTER')),
+ model_name="channelmessage",
+ trigger=pgtrigger.compiler.Trigger(
+ name="pg_notify_channelmessage_truncate",
+ sql=pgtrigger.compiler.UpsertTriggerSql(
+ func="\n PERFORM pg_notify(\n 'pg_notify',\n '{\"op\":\"' || TG_OP || '\",\"table\":\"' || TG_TABLE_NAME || '\"}'\n );\n RETURN NULL;\n ",
+ hash="8f1c4f12257239f00ee73bfd21b226a25a2c6a16",
+ level="STATEMENT",
+ operation="TRUNCATE",
+ pgid="pgtrigger_pg_notify_channelmessage_truncate_02c45",
+ table="chat_channelmessage",
+ when="AFTER",
+ ),
+ ),
),
pgtrigger.migrations.AddTrigger(
- model_name='channeluser',
- trigger=pgtrigger.compiler.Trigger(name='pg_notify_channeluser', sql=pgtrigger.compiler.UpsertTriggerSql(func='\n IF TG_OP = \'DELETE\' THEN\n PERFORM pg_notify(\n \'pg_notify\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","obj":{"id":\' || OLD."id" || \',"user_id":\' || OLD."user_id" || \',"channel_id":\' || OLD."channel_id" || \'}}\'\n );\n ELSE\n PERFORM pg_notify(\n \'pg_notify\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","obj":{"id":\' || NEW."id" || \',"user_id":\' || NEW."user_id" || \',"channel_id":\' || NEW."channel_id" || \'}}\'\n );\n END IF;\n RETURN NULL;\n ', hash='6d4422d5ba8d654603affe34f0cabebe53fe23b0', operation='DELETE OR INSERT OR UPDATE', pgid='pgtrigger_pg_notify_channeluser_f01cc', table='chat_channeluser', when='AFTER')),
+ model_name="channeluser",
+ trigger=pgtrigger.compiler.Trigger(
+ name="pg_notify_channeluser",
+ sql=pgtrigger.compiler.UpsertTriggerSql(
+ func='\n IF TG_OP = \'DELETE\' THEN\n PERFORM pg_notify(\n \'pg_notify\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","obj":{"id":\' || OLD."id" || \',"user_id":\' || OLD."user_id" || \',"channel_id":\' || OLD."channel_id" || \'}}\'\n );\n ELSE\n PERFORM pg_notify(\n \'pg_notify\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","obj":{"id":\' || NEW."id" || \',"user_id":\' || NEW."user_id" || \',"channel_id":\' || NEW."channel_id" || \'}}\'\n );\n END IF;\n RETURN NULL;\n ',
+ hash="6d4422d5ba8d654603affe34f0cabebe53fe23b0",
+ operation="DELETE OR INSERT OR UPDATE",
+ pgid="pgtrigger_pg_notify_channeluser_f01cc",
+ table="chat_channeluser",
+ when="AFTER",
+ ),
+ ),
),
pgtrigger.migrations.AddTrigger(
- model_name='channeluser',
- trigger=pgtrigger.compiler.Trigger(name='pg_notify_channeluser_truncate', sql=pgtrigger.compiler.UpsertTriggerSql(func='\n PERFORM pg_notify(\n \'pg_notify\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME || \'"}\'\n );\n RETURN NULL;\n ', hash='7eee59ce12c198a1144e9bd89914eb0845bee5f5', level='STATEMENT', operation='TRUNCATE', pgid='pgtrigger_pg_notify_channeluser_truncate_5ae88', table='chat_channeluser', when='AFTER')),
+ model_name="channeluser",
+ trigger=pgtrigger.compiler.Trigger(
+ name="pg_notify_channeluser_truncate",
+ sql=pgtrigger.compiler.UpsertTriggerSql(
+ func="\n PERFORM pg_notify(\n 'pg_notify',\n '{\"op\":\"' || TG_OP || '\",\"table\":\"' || TG_TABLE_NAME || '\"}'\n );\n RETURN NULL;\n ",
+ hash="7eee59ce12c198a1144e9bd89914eb0845bee5f5",
+ level="STATEMENT",
+ operation="TRUNCATE",
+ pgid="pgtrigger_pg_notify_channeluser_truncate_5ae88",
+ table="chat_channeluser",
+ when="AFTER",
+ ),
+ ),
),
pgtrigger.migrations.AddTrigger(
- model_name='channel',
- trigger=pgtrigger.compiler.Trigger(name='pg_notify_channel', sql=pgtrigger.compiler.UpsertTriggerSql(func='\n IF TG_OP = \'DELETE\' THEN\n PERFORM pg_notify(\n \'pg_notify\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","obj":{"id":\' || OLD."id" || \'}}\'\n );\n ELSE\n PERFORM pg_notify(\n \'pg_notify\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","obj":{"id":\' || NEW."id" || \'}}\'\n );\n END IF;\n RETURN NULL;\n ', hash='32baa6dfc09e87012fc270e79ff8a074b5c9ffcd', operation='DELETE OR INSERT OR UPDATE', pgid='pgtrigger_pg_notify_channel_fab0c', table='chat_channel', when='AFTER')),
+ model_name="channel",
+ trigger=pgtrigger.compiler.Trigger(
+ name="pg_notify_channel",
+ sql=pgtrigger.compiler.UpsertTriggerSql(
+ func='\n IF TG_OP = \'DELETE\' THEN\n PERFORM pg_notify(\n \'pg_notify\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","obj":{"id":\' || OLD."id" || \'}}\'\n );\n ELSE\n PERFORM pg_notify(\n \'pg_notify\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","obj":{"id":\' || NEW."id" || \'}}\'\n );\n END IF;\n RETURN NULL;\n ',
+ hash="32baa6dfc09e87012fc270e79ff8a074b5c9ffcd",
+ operation="DELETE OR INSERT OR UPDATE",
+ pgid="pgtrigger_pg_notify_channel_fab0c",
+ table="chat_channel",
+ when="AFTER",
+ ),
+ ),
),
pgtrigger.migrations.AddTrigger(
- model_name='channel',
- trigger=pgtrigger.compiler.Trigger(name='pg_notify_channel_truncate', sql=pgtrigger.compiler.UpsertTriggerSql(func='\n PERFORM pg_notify(\n \'pg_notify\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME || \'"}\'\n );\n RETURN NULL;\n ', hash='0f2df778b97a70d4e3e883b0c80430e8d4ef194e', level='STATEMENT', operation='TRUNCATE', pgid='pgtrigger_pg_notify_channel_truncate_bcd8a', table='chat_channel', when='AFTER')),
+ model_name="channel",
+ trigger=pgtrigger.compiler.Trigger(
+ name="pg_notify_channel_truncate",
+ sql=pgtrigger.compiler.UpsertTriggerSql(
+ func="\n PERFORM pg_notify(\n 'pg_notify',\n '{\"op\":\"' || TG_OP || '\",\"table\":\"' || TG_TABLE_NAME || '\"}'\n );\n RETURN NULL;\n ",
+ hash="0f2df778b97a70d4e3e883b0c80430e8d4ef194e",
+ level="STATEMENT",
+ operation="TRUNCATE",
+ pgid="pgtrigger_pg_notify_channel_truncate_bcd8a",
+ table="chat_channel",
+ when="AFTER",
+ ),
+ ),
),
pgtrigger.migrations.AddTrigger(
- model_name='privatemessage',
- trigger=pgtrigger.compiler.Trigger(name='pg_notify_privatemessage', sql=pgtrigger.compiler.UpsertTriggerSql(func='\n IF TG_OP = \'DELETE\' THEN\n PERFORM pg_notify(\n \'pg_notify\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","obj":{"id":\' || OLD."id" || \',"sender_id":\' || OLD."sender_id" || \',"recipient_id":\' || OLD."recipient_id" || \'}}\'\n );\n ELSE\n PERFORM pg_notify(\n \'pg_notify\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","obj":{"id":\' || NEW."id" || \',"sender_id":\' || NEW."sender_id" || \',"recipient_id":\' || NEW."recipient_id" || \'}}\'\n );\n END IF;\n RETURN NULL;\n ', hash='03ce1756ef8739ddbaac8c46c954bb7f29cfb352', operation='DELETE OR INSERT OR UPDATE', pgid='pgtrigger_pg_notify_privatemessage_92534', table='chat_privatemessage', when='AFTER')),
+ model_name="privatemessage",
+ trigger=pgtrigger.compiler.Trigger(
+ name="pg_notify_privatemessage",
+ sql=pgtrigger.compiler.UpsertTriggerSql(
+ func='\n IF TG_OP = \'DELETE\' THEN\n PERFORM pg_notify(\n \'pg_notify\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","obj":{"id":\' || OLD."id" || \',"sender_id":\' || OLD."sender_id" || \',"recipient_id":\' || OLD."recipient_id" || \'}}\'\n );\n ELSE\n PERFORM pg_notify(\n \'pg_notify\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","obj":{"id":\' || NEW."id" || \',"sender_id":\' || NEW."sender_id" || \',"recipient_id":\' || NEW."recipient_id" || \'}}\'\n );\n END IF;\n RETURN NULL;\n ',
+ hash="03ce1756ef8739ddbaac8c46c954bb7f29cfb352",
+ operation="DELETE OR INSERT OR UPDATE",
+ pgid="pgtrigger_pg_notify_privatemessage_92534",
+ table="chat_privatemessage",
+ when="AFTER",
+ ),
+ ),
),
pgtrigger.migrations.AddTrigger(
- model_name='privatemessage',
- trigger=pgtrigger.compiler.Trigger(name='pg_notify_privatemessage_truncate', sql=pgtrigger.compiler.UpsertTriggerSql(func='\n PERFORM pg_notify(\n \'pg_notify\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME || \'"}\'\n );\n RETURN NULL;\n ', hash='edd1292c2830959599e3bec829029206726381d6', level='STATEMENT', operation='TRUNCATE', pgid='pgtrigger_pg_notify_privatemessage_truncate_e9186', table='chat_privatemessage', when='AFTER')),
+ model_name="privatemessage",
+ trigger=pgtrigger.compiler.Trigger(
+ name="pg_notify_privatemessage_truncate",
+ sql=pgtrigger.compiler.UpsertTriggerSql(
+ func="\n PERFORM pg_notify(\n 'pg_notify',\n '{\"op\":\"' || TG_OP || '\",\"table\":\"' || TG_TABLE_NAME || '\"}'\n );\n RETURN NULL;\n ",
+ hash="edd1292c2830959599e3bec829029206726381d6",
+ level="STATEMENT",
+ operation="TRUNCATE",
+ pgid="pgtrigger_pg_notify_privatemessage_truncate_e9186",
+ table="chat_privatemessage",
+ when="AFTER",
+ ),
+ ),
),
]
-from django.db.models import (
- CASCADE, CharField, DateTimeField, ForeignKey, ManyToManyField, Model, TextField
-)
from django.conf import settings
from django.contrib.auth.models import AbstractUser
+from django.db.models import (
+ CASCADE,
+ CharField,
+ DateTimeField,
+ ForeignKey,
+ ManyToManyField,
+ Model,
+ TextField,
+)
+
from .triggers import triggers_for_table
settings.PG_NOTIFY_CHANNEL,
"PrivateMessage",
("id", "sender_id", "recipient_id"),
- )
+ ),
]
users = ManyToManyField(User, related_name="channels", through="ChannelUser")
class Meta:
- triggers = [
- *triggers_for_table(settings.PG_NOTIFY_CHANNEL, "Channel", ("id",))
- ]
+ triggers = [*triggers_for_table(settings.PG_NOTIFY_CHANNEL, "Channel", ("id",))]
class ChannelUser(Model):
settings.PG_NOTIFY_CHANNEL,
"ChannelUser",
("id", "user_id", "channel_id"),
- )
+ ),
]
settings.PG_NOTIFY_CHANNEL,
"ChannelMessage",
("id", "user_id", "channel_id"),
- )
+ ),
]
if "before" in query:
queryset = queryset.filter(pk__lt=int(query["before"]))
count = queryset.count()
- queryset = queryset[max(count - settings.DEFAULT_PAGE_SIZE, 0):]
+ queryset = queryset[max(count - settings.DEFAULT_PAGE_SIZE, 0) :]
if count > settings.DEFAULT_PAGE_SIZE:
query["before"] = queryset[0].pk
prev_url = self.request.build_absolute_uri(
- urlunsplit(("", "", full_path, query.urlencode(), ""))
+ urlunsplit(("", "", full_path, query.urlencode(), "")),
)
else:
prev_url = None
self.paginate(self.get_queryset(), serializer.to_json),
**self.get_json_dump_kwargs(),
),
- content_type="application/json"
+ content_type="application/json",
)
def create(self):
def create_or_update(self, *args, **kwargs):
serializer = self.serializer(self.request)
- instance, m2m = serializer.from_json(
- json.load(self.request), *args
- )
+ instance, m2m = serializer.from_json(json.load(self.request), *args)
instance.save()
for field_name, value in m2m.items():
getattr(instance, field_name).set(value)
if recipient_id is not None:
queryset = queryset.filter(
Q(sender=self.request.user, recipient_id=recipient_id)
- | Q(sender_id=recipient_id, recipient=self.request.user)
+ | Q(sender_id=recipient_id, recipient=self.request.user),
)
else:
queryset = queryset.filter(
- Q(sender=self.request.user) | Q(recipient=self.request.user)
+ Q(sender=self.request.user) | Q(recipient=self.request.user),
)
return queryset.order_by("ts")
return super().get_queryset().filter(users=self.request.user.pk)
-#class ChannelUserRestView(ListAllMixin, ModelRestView):
+# class ChannelUserRestView(ListAllMixin, ModelRestView):
# serializer = ChannelUserSerializer
#
# def get_queryset(self):
serializer = ChannelMessageSerializer
def get_queryset(self):
- queryset = super().get_queryset().filter(
- channel__users=self.request.user.pk
- )
+ queryset = super().get_queryset().filter(channel__users=self.request.user.pk)
if "channel_id" in self.request.GET:
queryset = queryset.filter(channel_id=self.request.GET["channel_id"])
return queryset.order_by("ts")
from typing import Type
from django.db.models import (
- DateTimeField, ForeignKey, ManyToManyField, ManyToManyRel, Model
+ DateTimeField,
+ ForeignKey,
+ ManyToManyField,
+ ManyToManyRel,
+ Model,
)
from django.urls import reverse
from django.utils.timezone import now
-from .models import Channel, PrivateMessage, User, ChannelMessage # , ChannelUser
+from .models import Channel, ChannelMessage, PrivateMessage, User # , ChannelUser
class ModelSerializer:
def field_to_json(self, field_name, instance):
if field_name == "url":
- name = self.model._meta.verbose_name.lower().replace(' ', '')
+ name = self.model._meta.verbose_name.lower().replace(" ", "")
value = reverse(f"chat-{name}-detail", args=[instance.pk])
if self.request:
value = self.request.build_absolute_uri(value)
def from_json(self, data, instance=None):
if instance is None:
- data.update(
- {
- "sender_id": self.request.user.pk,
- "ts": now().isoformat(" ")
- }
- )
+ data.update({"sender_id": self.request.user.pk, "ts": now().isoformat(" ")})
else:
data.pop("ts", None)
return super().from_json(data, instance)
return instance, m2m
-#class ChannelUserSerializer(ModelSerializer):
+# class ChannelUserSerializer(ModelSerializer):
# model = ChannelUser
# fields = ["id", "url", "channel", "user"]
-"""
-Django settings for chat project.
+"""Django settings for chat project.
Generated by 'django-admin startproject' using Django 5.1.1.
For the full list of settings and their values, see
https://docs.djangoproject.com/en/5.1/ref/settings/
"""
+
import os
from pathlib import Path
"PASSWORD": os.environ.get("RDS_PASSWORD"),
"HOST": os.environ.get("RDS_HOSTNAME"),
"PORT": os.environ.get("RDS_PORT"),
- }
+ },
}
AUTH_PASSWORD_VALIDATORS = [
SECRET_KEY = os.environ.get("SECRET_KEY")
try:
- from .settings_local import *
+ from .settings_local import * # noqa: F403
except ImportError: # pragma: no cover
pass
import sys
from html.parser import HTMLParser
from importlib import import_module, reload
-from string import hexdigits
from secrets import token_urlsafe
+from string import hexdigits
from urllib.parse import urlencode
from django.conf import settings
-from django.contrib.auth.forms import SetPasswordForm, AuthenticationForm
+from django.contrib.auth.forms import AuthenticationForm, SetPasswordForm
from django.core import mail
from django.db.models import Max
from django.http import HttpResponse
self.errorlist_stack.append(tag)
if tag in self.IGNORE_TAGS:
return
- elif tag == "form":
+ if tag == "form":
self.forms.append((attrs, []))
elif tag == "input":
input_name = self.get_attr("name", attrs)
pass
else:
raise ValueError(
- f"unhandled input type: {repr(input_type)}"
+ f"unhandled input type: {input_type!r}",
) # pragma: no cover
elif tag == "ul":
if "errorlist" in (self.get_attr("class", attrs) or "").split():
self.errorlist_stack = ["ul"]
else:
- raise ValueError(f"unhandled tag: {repr(tag)}") # pragma: no cover
+ raise ValueError(f"unhandled tag: {tag!r}") # pragma: no cover
def handle_data(self, data):
if self.errorlist_stack is None:
register_url = reverse("chat-register")
response = self.client.get(register_url)
fields, csrf_token = self.get_form_fields_and_csrf_token(
- response.content.decode()
+ response.content.decode(),
)
payload = {
"csrfmiddlewaretoken": csrf_token,
"attachments": [],
"extra_headers": {},
"alternatives": [],
- }
+ },
)
confirm_url = confirm_email.body
pos = confirm_url.find(self.get_empty_confirm_url())
assert pos >= 0
end_pos = content.find(f"</{tag_name}>")
assert end_pos >= 0
- return content[pos + 1:end_pos]
+ return content[pos + 1 : end_pos]
def reload_urls(self):
if settings.ROOT_URLCONF in sys.modules:
reload(sys.modules[settings.ROOT_URLCONF])
import_module(settings.ROOT_URLCONF)
from django.urls import resolvers
+
resolvers._get_cached_resolver.cache_clear()
@override_settings(TRUST_USER_REGISTRATIONS=True)
content = self.client.get(confirm_url).content.decode()
self.assertEqual(
self.extract_tag_data("p", content),
- "Email has already been confirmed!"
+ "Email has already been confirmed!",
)
@override_settings(
content = self.client.get(confirm_url).content.decode()
self.assertEqual(
self.extract_tag_data("p", content),
- f"Email has already been confirmed!<br />{msg}"
+ f"Email has already been confirmed!<br />{msg}",
)
user.is_active = True
user.save(update_fields=["is_active"])
content = self.client.get(confirm_url).content.decode()
self.assertEqual(
self.extract_tag_data("p", content),
- f"Email has already been confirmed!<br />You are ready to go!"
+ "Email has already been confirmed!<br />You are ready to go!",
)
admin_msg = mail.outbox[1]
self.assertEqual(
- admin_msg.subject, f"[ChatApp] User registered: {payload['username']}"
+ admin_msg.subject,
+ f"[ChatApp] User registered: {payload['username']}",
)
self.assertEqual(
admin_msg.body,
f"email: '{payload['email']}'\n\n"
"The user is currently inactive and needs 'is_active' set to true "
"to do anything with the account. Do we trust that?"
- )
+ ),
)
def test_reset_password(self):
reset_password_url = reverse("chat-reset-password")
response = self.client.get(reset_password_url)
fields, csrf_token = self.get_form_fields_and_csrf_token(
- response.content.decode()
+ response.content.decode(),
)
payload = {"csrfmiddlewaretoken": csrf_token, "email": "User@example.com"}
self.assertEqual(set(payload), set(field[0] for field in fields))
response = self.client.post(reset_password_url, payload)
self.assertEqual(response.status_code, 302)
self.assertEqual(
- response.headers["Location"], reverse("chat-reset-password-success")
+ response.headers["Location"],
+ reverse("chat-reset-password-success"),
)
content = self.client.get(response.headers["Location"]).content.decode()
self.assertEqual(self.extract_tag_data("p", content), "Password reset sent")
"You're receiving this email because you requested a password reset "
"for your user account at testserver.\n\nPlease go to the following "
"page and choose a new password:\n\n"
- )
+ ),
)
self.assertEqual(
reset_email.body[after_url_pos:],
"\nYour username, in case you've forgotten: example_user\n\n"
"Thanks for using our site!\n"
"The testserver team"
- )
+ ),
)
url = reset_email.body[url_pos:after_url_pos]
fields, csrf_token = self.get_form_fields_and_csrf_token(
- self.client.get(url).content.decode()
+ self.client.get(url).content.decode(),
)
payload = {
"csrfmiddlewaretoken": csrf_token,
fe = FormExtractor()
fe.feed(response.content.decode())
self.assertEqual(
- fe.errorlist, [SetPasswordForm.error_messages["password_mismatch"]]
+ fe.errorlist,
+ [SetPasswordForm.error_messages["password_mismatch"]],
)
response = self.client.post(url, payload)
self.assertEqual(response.status_code, 302)
self.assertEqual(
- response.headers["Location"], reverse("chat-reset-password-token-success")
+ response.headers["Location"],
+ reverse("chat-reset-password-token-success"),
)
content = self.client.get(response.headers["Location"]).content.decode()
self.assertEqual(
- self.extract_tag_data("p", content), "Password reset successful"
+ self.extract_tag_data("p", content),
+ "Password reset successful",
)
user.refresh_from_db()
user.check_password("SillyNewPw1234!")
other_slash_pos = url.rfind("/", 0, slash_pos)
self.assertGreaterEqual(other_slash_pos, 0)
fake_uid = urlsafe_base64_encode(
- force_bytes(User.objects.aggregate(Max('pk'))["pk__max"] + 1)
+ force_bytes(User.objects.aggregate(Max("pk"))["pk__max"] + 1),
)
url_modifieds = (
f"{url[:dash_pos + 1]}{c}{url[dash_pos + 2:]}",
- f"{url[:other_slash_pos + 1]}{fake_uid}{url[slash_pos:]}"
+ f"{url[:other_slash_pos + 1]}{fake_uid}{url[slash_pos:]}",
)
for url_modified in url_modifieds:
response = self.client.get(url_modified)
(
{"is_active": False},
[
- AuthenticationForm.error_messages["invalid_login"] % {
- "username": "username"
- }
- ]
+ AuthenticationForm.error_messages["invalid_login"]
+ % {"username": "username"},
+ ],
),
({"is_active": True}, ["This account's email address is not confirmed."]),
({"last_login": now()}, None),
user.__dict__.update(update)
user.save(update_fields=update.keys())
fields, csrf_token = self.get_form_fields_and_csrf_token(
- self.client.get(login_url).content.decode()
+ self.client.get(login_url).content.decode(),
)
payload["csrfmiddlewaretoken"] = csrf_token
self.assertEqual(set(payload), set(field[0] for field in fields))
class ChatTest(ChatTestMixin, TestCase):
def setup_users(self):
user1 = User(
- email="u1@example.com", username="u1", is_active=True, last_login=now()
+ email="u1@example.com",
+ username="u1",
+ is_active=True,
+ last_login=now(),
)
user1.set_password("UserOnePassword123")
user1.save()
user2 = User(
- email="u2@example.com", username="u2", is_active=True, last_login=now()
+ email="u2@example.com",
+ username="u2",
+ is_active=True,
+ last_login=now(),
)
user2_password = "UserTwoPassword123"
user2.set_password(user2_password)
channel.users.remove(user2)
user2.channels.add(channel)
- user1_url = reverse('chat-user-detail', args=[user1.pk])
+ user1_url = reverse("chat-user-detail", args=[user1.pk])
user2_url = reverse("chat-user-detail", args=[user2.pk])
response = self.client.get(user2_url)
"date_joined": user2.date_joined.isoformat(" "),
"channels": [channel.pk],
"is_authenticated": True,
- }
+ },
)
response = self.client.get(
- reverse("chat-user-current-detail"), headers={"cookie": cookies}
+ reverse("chat-user-current-detail"),
+ headers={"cookie": cookies},
)
self.assertEqual(data, json.loads(response.content))
response = self.client.get(
- reverse("chat-user-list"), headers={"cookie": cookies}
+ reverse("chat-user-list"),
+ headers={"cookie": cookies},
)
self.assertEqual(
json.loads(response.content),
"channels": [channel.pk],
"is_authenticated": True,
},
- ]
- }
+ ],
+ },
)
self.client.get(reverse("chat-logout"), headers={"cookie": cookies})
"email": "u4@example.com",
"is_active": True,
"last_login": now().isoformat(" "),
- "channels": [channel.pk]
- }
+ "channels": [channel.pk],
+ },
).encode(),
content_type="application/json",
headers={"cookie": cookies},
"username": "user4",
"is_active": False,
"last_name": "Nichols",
- }
+ },
).encode(),
content_type="application/json",
headers={"cookie": cookies},
)
user4_data = json.loads(response.content)
- user4_url = reverse('chat-user-detail', args=[user4_data['id']])
+ user4_url = reverse("chat-user-detail", args=[user4_data["id"]])
self.assertEqual(
user4_data,
{
"url": f"http://testserver{channel_url}",
"name": "test channel",
"users": [user1.pk, user2.pk],
- }
+ },
)
response = self.client.post(
reverse("chat-channel-list"),
"id": None,
"recipient_id": user1.pk,
"text": "hello pms world",
- }
+ },
),
content_type="application/json",
headers={"cookie": cookies},
"sender_id": user2.pk,
"recipient_id": user1.pk,
"text": "hello pms world",
- }
+ },
)
- user3 = User.objects.create(
+ User.objects.create(
username="additional user",
email="additional@user.com",
last_login=now(),
data["url"],
json.dumps(
{
- #"sender_id": user3.pk,
- #"recipient_id": user3.pk,
+ # "sender_id": user3.pk,
+ # "recipient_id": user3.pk,
"ts": "2022-02-22 22:22:22.363636+00:00",
"text": "hello updated pms world",
- }
+ },
),
content_type="application/json",
headers={"cookie": cookies},
"sender_id": user2.pk,
"recipient_id": user1.pk,
"text": "hello updated pms world",
- }
+ },
)
response = self.client.get(
f"{reverse('chat-privatemessage-list')}?other={user1.pk}",
or (m["sender_id"] == user1.pk and m["recipient_id"] == user2.pk)
)
for m in data["result"]
- )
+ ),
)
self.assertEqual(
data,
"recipient_id": user1.pk,
"text": "hello updated pms world",
},
- ]
- }
+ ],
+ },
)
-
def test_chat_channelmessage_api(self):
user1, user2, user2_password, channel = self.setup_users()
cookies = self.login_user(user2, user2_password)
{
"channel_id": channel.pk,
"text": "hello world",
- }
+ },
),
content_type="application/json",
headers={"cookie": cookies},
"channel_id": channel.pk,
"text": "hello world",
"user_id": user2.pk,
- }
+ },
)
for x in range(10):
ChannelMessage.objects.create(
def fields_to_json(t, fields):
- return ",".join(f"\"{field}\":' || {t}.\"{field}\" || '" for field in fields)
+ return ",".join(f'"{field}":\' || {t}."{field}" || \'' for field in fields)
def triggers_for_table(channel_name, model_name, fields):
-"""
-URL configuration for chat project.
+"""URL configuration for chat project.
The `urlpatterns` list routes URLs to views. For more information please see:
https://docs.djangoproject.com/en/5.1/topics/http/urls/
+
Examples:
Function views
1. Add an import: from my_app import views
Including another URLconf
1. Import the include() function: from django.urls import include, path
2. Add a URL to urlpatterns: path('blog/', include('blog.urls'))
+
"""
+
from django.conf import settings
-from django.contrib.staticfiles.views import serve as serve_staticfiles
from django.contrib import admin
+from django.contrib.staticfiles.views import serve as serve_staticfiles
from django.urls import path, re_path
from django.utils.safestring import mark_safe
from .rest_views import (
- ChannelMessageRestView, ChannelRestView, PrivateMessageRestView, UserRestView
+ ChannelMessageRestView,
+ ChannelRestView,
+ PrivateMessageRestView,
+ UserRestView,
)
from .views import (
ChatView,
ConfirmEmailView,
LoginView,
LogoutView,
- SuccessView,
PasswordResetTokenView,
PasswordResetView,
RegisterView,
+ SuccessView,
)
from .websocket import handle_websocket
if settings.TRUST_USER_REGISTRATIONS
else mark_safe(
"Registration complete!<br />"
- "Your user account will be activated shortly!"
+ "Your user account will be activated shortly!",
)
- )
- }
+ ),
+ },
),
name="chat-register-success",
),
),
name="chat-reset-password-token-success",
),
-
path("", ChatView.as_view(), name="chat-main"),
path("login/", LoginView.as_view(), name="chat-login"),
path(
name="chat-confirm-email",
),
path("logout/", LogoutView.as_view(), name="chat-logout"),
-
path("admin/", admin.site.urls),
*UserRestView.get_urls("user"),
*PrivateMessageRestView.get_urls("privatemessage"),
if settings.DEBUG: # pragma: no cover
urlpatterns.append(
- re_path(f"{settings.STATIC_URL.lstrip('/')}(?P<path>.*)$", serve_staticfiles)
+ re_path(f"{settings.STATIC_URL.lstrip('/')}(?P<path>.*)$", serve_staticfiles),
)
websocket_urls = {"/": handle_websocket}
def get_app_configs(required_submodule=None):
from django.apps import apps
+
for app_config in apps.get_app_configs():
if "/site-packages/" not in app_config.path and (
required_submodule is None
-from asgiref.sync import sync_to_async
-
from django.conf import settings
from django.contrib.auth.forms import SetPasswordForm
from django.contrib.auth.models import update_last_login
from django.contrib.auth.tokens import default_token_generator
from django.contrib.auth.views import (
LoginView as DjangoLoginView,
+)
+from django.contrib.auth.views import (
LogoutView as DjangoLogoutView,
)
from django.contrib.staticfiles import finders
from django.core.mail import mail_admins
from django.core.signing import Signer
-from django.http import Http404, FileResponse
+from django.http import FileResponse, Http404
from django.urls import reverse, reverse_lazy
from django.utils.http import urlsafe_base64_decode, urlsafe_base64_encode
from django.utils.safestring import mark_safe
from django.views.generic.detail import SingleObjectTemplateResponseMixin
from django.views.generic.edit import BaseCreateView, FormView, UpdateView
-from .forms import AuthenticationForm, RegisterForm, PasswordResetForm
+from .forms import AuthenticationForm, PasswordResetForm, RegisterForm
from .models import User
return user
raise Http404(
"No %(verbose_name)s found matching the query"
- % {"verbose_name": User._meta.verbose_name}
+ % {"verbose_name": User._meta.verbose_name},
)
def get_form_kwargs(self):
# email confirmation can not expire, since it cannot be re-requested
token = urlsafe_base64_encode(Signer().sign(user.pk).encode())
url = self.request.build_absolute_uri(
- reverse("chat-confirm-email", args=[token])
+ reverse("chat-confirm-email", args=[token]),
)
user.email_user(
f"{settings.EMAIL_SUBJECT_PREFIX}Confirm your email address!",
- f"Hello {str(user)}\nClick here to confirm your email address:\n{url}"
+ f"Hello {user!s}\nClick here to confirm your email address:\n{url}",
)
if not settings.TRUST_USER_REGISTRATIONS:
mail_admins(
- f"User registered: {str(user)}",
+ f"User registered: {user!s}",
(
f"Hello admin,\n"
f"User '{user.username}' just registered.\n"
token = f"{self.kwargs['token']}{'=' * (-len(self.kwargs['token']) & 3)}"
context = super().get_context_data(**kwargs)
user = User.objects.get(
- pk=Signer().unsign(urlsafe_base64_decode(token).decode())
+ pk=Signer().unsign(urlsafe_base64_decode(token).decode()),
)
context["msg"] = self.get_msg(user)
update_last_login(None, user)
import json
import sys
from asyncio import (
- CancelledError, ensure_future, get_running_loop, run_coroutine_threadsafe
+ CancelledError,
+ ensure_future,
+ get_running_loop,
+ run_coroutine_threadsafe,
)
from contextlib import contextmanager
from functools import partial
async def process_ws(receive, send):
while True:
event = await receive()
- if event['type'] == 'websocket.connect':
- await send({'type': 'websocket.accept'})
- elif event['type'] == 'websocket.disconnect':
+ if event["type"] == "websocket.connect":
+ await send({"type": "websocket.accept"})
+ elif event["type"] == "websocket.disconnect":
return
- elif event['type'] == 'websocket.receive':
+ elif event["type"] == "websocket.receive":
# ...maybe make it possible to request data through the ws?
- if event['text'] == 'ping':
- await send({
- 'type': 'websocket.send',
- 'text': "pong",
- })
+ if event["text"] == "ping":
+ await send(
+ {
+ "type": "websocket.send",
+ "text": "pong",
+ },
+ )
@contextmanager
return
loop = get_running_loop()
run_coroutine_threadsafe(
- sync_to_async(update_user_channels)(coro, loop, user, user_channels), loop
+ sync_to_async(update_user_channels)(coro, loop, user, user_channels),
+ loop,
)
async def handle_websocket(scope, receive, send):
- request = ASGIRequest({**scope, "method": f"_ws"}, BytesIO())
+ request = ASGIRequest({**scope, "method": "_ws"}, BytesIO())
SessionMiddleware(lambda x: None).process_request(request)
request.user = await aget_user(request)
if not request.user.is_authenticated:
event = await receive()
- if event['type'] == 'websocket.connect':
- await send({'type': 'websocket.accept'})
+ if event["type"] == "websocket.connect":
+ await send({"type": "websocket.accept"})
await send({"type": "websocket.close"})
return
#!/usr/bin/env venv/bin/python
"""Django's command-line utility for administrative tasks."""
+
import os
import subprocess
import sys
def setup_virtual_env(): # pragma: no cover
venv_dir = Path(__file__).parent / "venv"
- subprocess.run(["bash", str(venv_dir.parent / "setup_venv.sh")])
+ subprocess.run(["bash", str(venv_dir.parent / "setup_venv.sh")], check=False)
os.environ.setdefault("VIRTUAL_ENV", str(venv_dir))
venv_bin = str(venv_dir / "bin")
if venv_bin not in os.environ["PATH"].split(":"):
os.environ["PATH"] = f"{venv_bin}:{os.environ['PATH']}"
os.environ.pop("PYTHONHOME", None)
+
def main():
"""Run administrative tasks."""
if "VIRTUAL_ENV" not in os.environ:
raise ImportError(
"Couldn't import Django. Are you sure it's installed and "
"available on your PYTHONPATH environment variable? Did you "
- "forget to activate a virtual environment?"
+ "forget to activate a virtual environment?",
) from exc
execute_from_command_line(sys.argv)
-if __name__ == '__main__':
+if __name__ == "__main__":
main()
"${PYTHON}" -m venv venv
. venv/bin/activate
"${PYTHON}" -m pip install -qU pip
-"${PYTHON}" -m pip install -qU django 'uvicorn[standard]' 'psycopg[c]' django-pgtrigger
+"${PYTHON}" -m pip install -qU \
+ django 'uvicorn[standard]' 'psycopg[c]' django-pgtrigger pre-commit ruff