name='ChannelMessage',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
- ('ts', models.DateTimeField(auto_now=True)),
+ ('ts', models.DateTimeField(auto_now_add=True)),
('text', models.TextField()),
('channel', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='chat.channel')),
('user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL)),
name='PrivateMessage',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
- ('ts', models.DateTimeField(auto_now=True)),
+ ('ts', models.DateTimeField(auto_now_add=True)),
('text', models.TextField()),
('recipient', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='pns_recieved', to=settings.AUTH_USER_MODEL)),
('sender', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='pms_sent', to=settings.AUTH_USER_MODEL)),
class PrivateMessage(Model):
sender = ForeignKey(User, related_name="pms_sent", on_delete=CASCADE)
recipient = ForeignKey(User, related_name="pns_recieved", on_delete=CASCADE)
- ts = DateTimeField(auto_now=True)
+ ts = DateTimeField(auto_now_add=True)
text = TextField()
class Meta:
class ChannelMessage(Model):
user = ForeignKey(User, on_delete=CASCADE)
channel = ForeignKey(Channel, on_delete=CASCADE)
- ts = DateTimeField(auto_now=True)
+ ts = DateTimeField(auto_now_add=True)
text = TextField()
class Meta:
from django.contrib.auth.mixins import LoginRequiredMixin
from django.db.models import Q, QuerySet
from django.http import HttpResponse
-from django.urls import path, reverse
+from django.urls import path
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt
from django.views.generic import View
-from .models import Channel, ChannelMessage, PrivateMessage, User #, ChannelUser
from .serializers import (
ChannelMessageSerializer,
ChannelSerializer,
@staticmethod
def get_json_dump_kwargs():
- if settings.DEBUG:
+ if settings.DEBUG: # pragma: no cover
return {"indent": 4}
return {}
self.paginate(self.get_queryset(), serializer.to_json),
**self.get_json_dump_kwargs(),
),
- headers={"content-type": "application/json"},
+ content_type="application/json"
)
def create(self):
return self.create_or_update(status=201)
- def detail_response(self, instance, **kwargs):
+ def detail(self):
return HttpResponse(
json.dumps(
- self.serializer(self.request).to_json(instance),
+ self.serializer(self.request).to_json(self.get_object()),
**self.get_json_dump_kwargs(),
),
- headers={"content-type": "application/json"},
- **kwargs,
+ content_type="application/json",
)
- def detail(self):
- return self.detail_response(self.get_object())
-
def update(self):
return self.create_or_update(self.get_object())
def create_or_update(self, *args, **kwargs):
- instance, m2m = self.serializer(self.request).from_json(
+ serializer = self.serializer(self.request)
+ instance, m2m = serializer.from_json(
json.load(self.request), *args
)
instance.save()
for field_name, value in m2m.items():
getattr(instance, field_name).set(value)
- return self.detail_response(instance, **kwargs)
+ if settings.REST_CREATE_UPDATE_RETURN_RESULT:
+ return HttpResponse(
+ json.dumps(serializer.to_json(instance)),
+ content_type="application/json",
+ **kwargs,
+ )
+ return HttpResponse(status=204) # pragma: no cover
def delete(self):
self.get_object().delete()
@method_decorator(csrf_exempt)
def dispatch(self, request, *args, **kwargs):
if not request.user.is_authenticated:
- self.handle_no_permission()
+ return self.handle_no_permission()
return getattr(self, self.method_map[request.method.lower()])()
@classmethod
serializer = UserSerializer
def get_queryset(self):
- return User.objects.filter(is_active=True, last_login__isnull=False)
+ return super().get_queryset().filter(is_active=True, last_login__isnull=False)
def get_object(self):
- if self.request.path == reverse("chat-user-current-detail"):
+ if self.request.resolver_match.view_name == "chat-user-current-detail":
self.kwargs["id"] = self.request.user.id
return super().get_object()
def create(self):
if not self.request.user.is_privileged():
- self.handle_no_permission()
+ return self.handle_no_permission()
return super().create()
def delete(self):
if not self.request.user.is_privileged():
- self.handle_no_permission()
+ return self.handle_no_permission()
return super().delete()
def update(self):
- if not (
- self.request.user.is_privileged()
- or self.kwargs["id"] == self.request.user.id
+ if (
+ not self.request.user.is_privileged()
+ and self.kwargs["id"] != self.request.user.id
):
- self.handle_no_permission()
+ return self.handle_no_permission()
return super().update()
@classmethod
serializer = PrivateMessageSerializer
def get_queryset(self):
+ queryset = super().get_queryset()
if "other" in self.request.GET:
- queryset = PrivateMessage.objects.filter(
+ queryset = queryset.filter(
Q(sender=self.request.user, recipient_id=self.request.GET["other"]) | Q(
sender_id=self.request.GET["other"], recipient=self.request.user
)
)
else:
- queryset = PrivateMessage.objects.filter(
+ queryset = queryset.filter(
Q(sender=self.request.user) | Q(recipient=self.request.user)
)
return queryset.order_by("ts")
serializer = ChannelSerializer
def get_queryset(self):
- return Channel.objects.filter(users=self.request.user.pk)
+ return super().get_queryset().filter(users=self.request.user.pk)
#class ChannelUserRestView(ListAllMixin, ModelRestView):
# serializer = ChannelUserSerializer
#
# def get_queryset(self):
-# return ChannelUser.objects.filter(user=self.request.user.pk)
+# return super().get_queryset().filter(user=self.request.user.pk)
class ChannelMessageRestView(ModelRestView):
serializer = ChannelMessageSerializer
def get_queryset(self):
+ queryset = super().get_queryset().filter(
+ channel__users=self.request.user.pk
+ )
if "channel" in self.request.GET:
- queryset = ChannelMessage.objects.filter(
- channel_id=self.request.GET["channel"]
- )
- else:
- queryset = ChannelMessage.objects.filter(
- channel__users=self.request.user.pk
- )
+ queryset = queryset.filter(channel_id=self.request.GET["channel"])
return queryset.order_by("ts")
def to_json(self, instance):
result = super().to_json(instance)
- result["is_current"] = result["id"] == self.request.user.pk
+ result["is_authenticated"] = result["id"] == self.request.user.pk
return result
def from_json(self, data, instance=None):
if instance is None:
- data["sender"] = self.request.user.pk
- data["ts"] = now().isoformat(" ")
+ data.update(
+ {
+ "sender": self.request.user.pk,
+ "ts": now().isoformat(" ")
+ }
+ )
else:
- data.pop("sender")
- data.pop("recipient")
- data.pop("ts")
+ for key in ("sender", "recipient", "ts"):
+ data.pop(key, None)
return super().from_json(data, instance)
def from_json(self, data, instance=None):
instance, m2m = super().from_json(data, instance)
- if instance is None:
+ if instance.pk is None:
if "users" not in m2m:
m2m["users"] = []
if self.request.user.pk not in m2m["users"]:
TRUST_USER_REGISTRATIONS = False
EMAIL_SUBJECT_PREFIX = "[ChatApp] "
PG_NOTIFY_CHANNEL = "pg_notify"
-DEFAULT_PAGE_SIZE = 5
+DEFAULT_PAGE_SIZE = 20
+REST_CREATE_UPDATE_RETURN_RESULT = False
+SECRET_KEY = os.environ.get("SECRET_KEY")
try:
from .settings_local import *
-except ImportError:
+except ImportError: # pragma: no cover
pass
users_per_id = {};
for (i = 0; i < data.result.length; i += 1) {
users_per_id[data.result[i].id] = data.result[i];
- if (data.result[i].is_current) {
+ if (data.result[i].is_authenticated) {
p = document.createElement("P");
p.setAttribute("data-id", data.result[i].id);
p.appendChild(document.createTextNode(data.result[i].username));
from html.parser import HTMLParser
from importlib import import_module, reload
from string import hexdigits
+from secrets import token_urlsafe
from urllib.parse import urlencode
from django.conf import settings
from django.utils.timezone import now
from .forms import RegisterForm
-from .models import Channel, User
+from .models import Channel, ChannelMessage, User
class FormExtractor(HTMLParser):
@override_settings(
TRUST_USER_REGISTRATIONS=False,
- MAIL_ADMINS=[("Admin", "admin@example.com")],
+ ADMINS=[("Admin", "admin@example.com")],
)
def test_registration_without_trust_user_registrations(self):
self.reload_urls()
self.assertEqual(response.headers["Location"], f"{login_url}?{query}")
+@override_settings(REST_CREATE_UPDATE_RETURN_RESULT=True, DEFAULT_PAGE_SIZE=5)
class ChatTest(ChatTestMixin, TestCase):
def setup_users(self):
user1 = User(
user1_url = reverse('chat-user-detail', args=[user1.pk])
user2_url = reverse("chat-user-detail", args=[user2.pk])
+ response = self.client.get(user2_url)
+ self.assertEqual(response.status_code, 302)
+ self.assertEqual(response.headers["Location"], f"/login/?next={user2_url}")
+
+ user2.is_
+
cookies = self.login_user(user2, user2_password)
response = self.client.get(user2_url, headers={"cookie": cookies})
+ data = json.loads(response.content)
self.assertEqual(
- json.loads(response.content),
+ data,
{
"id": user2.id,
"url": f"http://testserver{user2_url}",
"last_name": user2.last_name,
"date_joined": user2.date_joined.isoformat(" "),
"channels": [channel.pk],
+ "is_authenticated": True,
}
)
+ response = self.client.get(
+ reverse("chat-user-current-detail"), headers={"cookie": cookies}
+ )
+ self.assertEqual(data, json.loads(response.content))
response = self.client.get(
reverse("chat-user-list"), headers={"cookie": cookies}
)
"last_name": user1.last_name,
"date_joined": user1.date_joined.isoformat(" "),
"channels": [channel.pk],
+ "is_authenticated": False,
},
{
"id": user2.id,
"last_name": user2.last_name,
"date_joined": user2.date_joined.isoformat(" "),
"channels": [channel.pk],
+ "is_authenticated": True,
},
]
}
"last_name": "",
"date_joined": data["date_joined"],
"channels": [channel.pk],
+ "is_authenticated": False,
},
)
response = self.client.put(
"last_name": "Nichols",
"date_joined": user4_data["date_joined"],
"channels": [channel.pk],
+ "is_authenticated": False,
},
)
user4.refresh_from_db()
self.client.delete(user4_data["url"], headers={"cookie": cookies})
self.assertFalse(User.objects.filter(pk=user4_data["id"]).exists())
- def test_channel_api(self):
+ def test_chat_channel_api(self):
user1, user2, user2_password, channel = self.setup_users()
cookies = self.login_user(user2, user2_password)
channel_url = reverse("chat-channel-detail", args=[channel.pk])
"users": [user1.pk, user2.pk],
}
)
+ response = self.client.post(
+ reverse("chat-channel-list"),
+ {
+ "name": "new channel",
+ },
+ content_type="application/json",
+ headers={"cookie": cookies},
+ )
+ data = json.loads(response.content)
+ new_channel_url = reverse("chat-channel-detail", args=[data["id"]])
+ self.assertEqual(
+ data,
+ {
+ "id": data["id"],
+ "url": f"http://testserver{new_channel_url}",
+ "name": "new channel",
+ "users": [user2.pk],
+ },
+ )
+
+ def test_chat_privatemessage_api(self):
+ user1, user2, user2_password, channel = self.setup_users()
+ cookies = self.login_user(user2, user2_password)
+ response = self.client.post(
+ reverse("chat-privatemessage-list"),
+ json.dumps(
+ {
+ "id": None,
+ "recipient": user1.pk,
+ "text": "hello pms world",
+ }
+ ),
+ content_type="application/json",
+ headers={"cookie": cookies},
+ )
+ data = json.loads(response.content)
+ privatemessage_url = reverse("chat-privatemessage-detail", args=[data["id"]])
+ self.assertEqual(
+ data,
+ {
+ "id": data["id"],
+ "url": f"http://testserver{privatemessage_url}",
+ "ts": data["ts"],
+ "sender": user2.pk,
+ "recipient": user1.pk,
+ "text": "hello pms world",
+ }
+ )
+ user3 = User.objects.create(
+ username="additional user",
+ email="additional@user.com",
+ last_login=now(),
+ is_active=True,
+ )
+ response = self.client.put(
+ data["url"],
+ json.dumps(
+ {
+ "sender": user3.pk,
+ "recipient": user3.pk,
+ "ts": "2022-02-22 22:22:22.363636+00:00",
+ "text": "hello updated pms world",
+ }
+ ),
+ content_type="application/json",
+ headers={"cookie": cookies},
+ )
+ data_new = json.loads(response.content)
+ self.assertEqual(
+ data_new,
+ {
+ "id": data["id"],
+ "url": f"http://testserver{privatemessage_url}",
+ "ts": data["ts"],
+ "sender": user2.pk,
+ "recipient": user1.pk,
+ "text": "hello updated pms world",
+ }
+ )
+ response = self.client.get(
+ f"{reverse('chat-privatemessage-list')}?other={user1.pk}",
+ headers={"cookie": cookies},
+ )
+ data = json.loads(response.content)
+ self.assertTrue(
+ all(
+ (
+ (m["sender"] == user2.pk and m["recipient"] == user1.pk)
+ or (m["sender"] == user1.pk and m["recipient"] == user2.pk)
+ )
+ for m in data["result"]
+ )
+ )
+ self.assertEqual(
+ data,
+ {
+ "previous": None,
+ "result": [
+ {
+ "id": data["result"][0]["id"],
+ "url": f"http://testserver{privatemessage_url}",
+ "ts": data["result"][0]["ts"],
+ "sender": user2.pk,
+ "recipient": user1.pk,
+ "text": "hello updated pms world",
+ },
+ ]
+ }
+ )
+
+
+ def test_chat_channelmessage_api(self):
+ user1, user2, user2_password, channel = self.setup_users()
+ cookies = self.login_user(user2, user2_password)
+ response = self.client.post(
+ reverse("chat-channelmessage-list"),
+ json.dumps(
+ {
+ "channel": channel.pk,
+ "text": "hello world",
+ }
+ ),
+ content_type="application/json",
+ headers={"cookie": cookies},
+ )
+ data = json.loads(response.content)
+ channelmessage_url = reverse("chat-channelmessage-detail", args=[data["id"]])
+ self.assertEqual(
+ data,
+ {
+ "id": data["id"],
+ "url": f"http://testserver{channelmessage_url}",
+ "ts": data["ts"],
+ "channel": channel.pk,
+ "text": "hello world",
+ "user": user2.pk,
+ }
+ )
+ for x in range(10):
+ ChannelMessage.objects.create(
+ channel=channel,
+ user=user2,
+ text=token_urlsafe(),
+ )
+ list_url = f"{reverse('chat-channelmessage-list')}?channel={channel.pk}"
+ response = self.client.get(list_url)
+ data = json.loads(response.content)
+ first_id = data["result"][0]["id"]
+ list_url = f"{list_url}&before={first_id}"
+ self.assertEqual(data["previous"], f"http://testserver{list_url}")
+ response = self.client.get(data["previous"])
+ data = json.loads(response.content)
+ self.assertTrue(all(item["id"] < first_id for item in data["result"]))
*ChannelMessageRestView.get_urls("channelmessage"),
]
-if settings.DEBUG:
+if settings.DEBUG: # pragma: no cover
urlpatterns.append(
re_path(f"{settings.STATIC_URL.lstrip('/')}(?P<path>.*)$", serve_staticfiles)
)
from pathlib import Path
-def setup_virtual_env():
+def setup_virtual_env(): # pragma: no cover
venv_dir = Path(__file__).parent / "venv"
subprocess.run(["bash", str(venv_dir.parent / "setup_venv.sh")])
os.environ.setdefault("VIRTUAL_ENV", str(venv_dir))