settings.py
# Global Django REST framework configuration.
REST_FRAMEWORK = {
    # Read the API version from a named group in the URL path.
    "DEFAULT_VERSIONING_CLASS": "rest_framework.versioning.URLPathVersioning",
    # Version assumed when the URL does not carry one.
    "DEFAULT_VERSION": "v1",
    # Requests for any other version are rejected by the versioning class.
    "ALLOWED_VERSIONS": ["v1", "v2"],
    # Name of the URL keyword argument that carries the version.
    "VERSION_PARAM": "version",
    # Uncomment to register MyAuth for every view instead of per view:
    # "DEFAULT_AUTHENTICATION_CLASSES": ["utils.Auth.MyAuth"],
    # Rates looked up by throttle classes through their ``scope`` attribute.
    "DEFAULT_THROTTLE_RATES": {
        "WD": "3/m",
    },
}
urls.py
from django.conf.urls import url
from django.contrib import admin

# from serializers.views import StudentAPIView, StudentEditView, BooksAPIView
from serializers.views import (
    StudentView,
    VersionView,
    UserView,
    TestAuthView,
    BookPageView,
)

# Named groups are handed to the view methods as keyword arguments, so the
# group names below must match the parameter names the views declare
# ("version" and "id"). The version group is an alternation of the allowed
# versions; a character class such as [v1|v2]+ would also match "vvv" or "|".
urlpatterns = [
    url(r'^admin/', admin.site.urls),

    # Earlier iteration: separate list and detail views.
    # url(r'^api/students', StudentAPIView.as_view()),  # GET and POST
    # url(r'^api/student/(?P<id>\d+)', StudentEditView.as_view()),  # GET, PATCH and DELETE with an id

    # Earlier iteration: hand-written ModelViewSet, with ViewSetMixin
    # dispatching each HTTP method to an action.
    # url(r'^api/students', BooksAPIView.as_view({"get": "list", "post": "create"})),
    # url(r'^api/student/(?P<id>\d+)', BooksAPIView.as_view({"get": "retrieve", "patch": "update", "delete": "destroy"})),

    # Earlier iteration: rest_framework's own ModelViewSet. Note that the
    # named URL argument has to be called "pk" for it.
    # url(r'^api/students', BooksAPIView.as_view({"get": "list", "post": "create"})),
    # url(r'^api/student/(?P<pk>\d+)', BooksAPIView.as_view({"get": "retrieve", "patch": "update", "delete": "destroy"})),

    # Earlier iteration: second hand-written, reusable version.
    # url(r'^api/student$', StudentView.as_view()),
    # url(r'^api/student/(?P<id>\d+)', StudentView.as_view()),

    # Versioned student API.
    url(r'^(?P<version>v1|v2)/api/student$', StudentView.as_view()),  # http://127.0.0.1:8000/v1/api/student
    url(r'^(?P<version>v1|v2)/api/student/(?P<id>\d+)', StudentView.as_view()),

    # Versioning test.
    url(r'^(?P<version>v1|v2)/book$', VersionView.as_view()),

    # Authentication, permission and throttling tests.
    url(r'^user$', UserView.as_view()),
    url(r'^test', TestAuthView.as_view()),  # http://127.0.0.1:8000/test?token=20609ddd01fe4faeb0ffe7d8d8c39881

    # Pagination test.
    url(r'^book_page', BookPageView.as_view()),
    # http://127.0.0.1:8000/book_page?page=1&size=1
    # http://127.0.0.1:8000/book_page?limit=2&offset=0
    # http://127.0.0.1:8000/book_page
]
utils文件夹下的文件:Auth.py(认证)、pagenation.py(分页)、permissions.py(权限)、throttle.py(频率)
# Auth.py
import uuid

from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed
from rest_framework.response import Response

from serializers.models import UserInfo


class MyAuth(BaseAuthentication):
    """Authenticate a request by the ``token`` query-string parameter."""

    def authenticate(self, request):
        """Return ``(user, token)`` for the user that owns the given token.

        Args:
            request: The DRF request; the token is read from
                ``request.query_params["token"]``.

        Returns:
            A ``(UserInfo instance, token string)`` tuple.

        Raises:
            AuthenticationFailed: If the token is missing, is not a valid
                UUID, or does not belong to any user.
        """
        # Step 1: take the token sent by the client. A missing parameter
        # is an authentication failure, not a server error.
        token = request.query_params.get("token")
        if not token:
            raise AuthenticationFailed("认证失败")

        # UserInfo.token is a UUIDField, so reject malformed values here
        # instead of letting the database lookup fail on them.
        try:
            token_value = uuid.UUID(token)
        except ValueError:
            raise AuthenticationFailed("认证失败")

        # Step 2: check that the token belongs to a user.
        user_obj = UserInfo.objects.filter(token=token_value).first()
        if user_obj is None:
            raise AuthenticationFailed("认证失败")
        return (user_obj, token)


# pagenation.py
from rest_framework import pagination  # pagination


class MyPagenation(pagination.PageNumberPagination):
    """Page-number pagination: ``?page=<n>&size=<m>``."""

    page_size = 2
    page_query_param = 'page'
    page_size_query_param = "size"
    max_page_size = 3


class MyLimitPage(pagination.LimitOffsetPagination):
    """Limit/offset pagination: ``?limit=<n>&offset=<m>``."""

    default_limit = 1
    limit_query_param = 'limit'  # limit caps the number of items per page
    offset_query_param = 'offset'  # offset relative to the first item
    max_limit = 2


class MyCursorPage(pagination.CursorPagination):
    """Cursor pagination over records ordered by descending id."""

    cursor_query_param = 'cursor'
    page_size = 2
    ordering = '-id'


# permissions.py
from rest_framework.permissions import BasePermission


class MyPermission(BasePermission):
    """Allow access only to users whose ``type`` is 1."""

    message = "您没有权限,请充值"

    def has_permission(self, request, view):
        """Return True when ``request.user.type == 1``, otherwise False.

        Authentication runs before permission checks, so ``request.user``
        has already been set by then. A user object without a ``type``
        attribute is denied rather than raising ``AttributeError``.
        """
        return getattr(request.user, "type", None) == 1


# throttle.py
import time

from rest_framework import throttling  # throttling

# Accepted request timestamps per client IP, newest first. This is a plain
# module-level dict, so it only covers the current process.
VISIT_RECORD = {}


class MyThrottle(throttling.BaseThrottle):
    """Hand-written throttle: 3 requests per 60 seconds for each client IP."""

    max_requests = 3     # accepted requests allowed inside one window
    window_seconds = 60  # length of the sliding window, in seconds

    def __init__(self):
        self.history = None

    def allow_request(self, request, view):
        """Decide whether the request may proceed.

        Only accepted requests are recorded, and each one is recorded
        once, so a client gets exactly ``max_requests`` requests per
        window and is not penalised further for requests that were
        rejected.

        :param request: the incoming request; the client IP is read from
            ``request.META["REMOTE_ADDR"]``
        :param view: the view being accessed (unused)
        :return: True if the request is allowed, False if throttled
        """
        ip = request.META.get("REMOTE_ADDR")
        now = time.time()
        history = VISIT_RECORD.setdefault(ip, [])

        # The oldest timestamp is last; drop those outside the window.
        while history and now - history[-1] > self.window_seconds:
            history.pop()
        self.history = history

        if len(history) >= self.max_requests:
            return False

        # Put the current access time at the front of the list.
        history.insert(0, now)
        return True

    def wait(self):
        """Return how many seconds remain until the next request is allowed.

        :return: seconds until the oldest recorded request leaves the
            window, or None when no request has been recorded
        """
        if not self.history:
            return None
        now = time.time()
        return self.window_seconds - (now - self.history[-1])


class MyVisitThrottle(throttling.SimpleRateThrottle):
    """Throttle built on rest_framework's SimpleRateThrottle.

    Setup steps:
      1. The class defines ``scope``.
      2. settings defines ``DEFAULT_THROTTLE_RATES``.
      3. ``DEFAULT_THROTTLE_RATES`` maps the ``scope`` value to a rate.
      4. ``get_cache_key(self, request, view)`` is overridden.
    """

    scope = "WD"

    def get_cache_key(self, request, view):
        """Key the throttle on the client identity from ``get_ident``."""
        return self.get_ident(request)
views.py
import uuid

from django.shortcuts import render
from rest_framework import status
from rest_framework.negotiation import DefaultContentNegotiation
from rest_framework.response import Response
from rest_framework.views import APIView
from rest_framework.viewsets import ViewSetMixin, ModelViewSet

from .models import Student, UserInfo
from .serializers import StudentSerializer
from utils.Auth import MyAuth
from utils.pagenation import MyPagenation, MyLimitPage, MyCursorPage
from utils.permissions import MyPermission
from utils.throttle import MyThrottle, MyVisitThrottle

# Create your views here.

# The three string literals below are archived earlier iterations of the
# student views. They are never executed and are kept for reference only.
r"""
class GenericAPIView(APIView):
    queryset = None
    serializer_class = None

    def get_queryset(self):
        return self.queryset.all()

    def get_serializer(self, *args, **kwargs):
        return self.serializer_class(*args, **kwargs)


class ListModelMixin(object):
    def list(self, request):
        queryset = self.get_queryset()
        ser_obj = self.get_serializer(queryset, many=True)
        return Response(ser_obj.data)


class CreateModelMixin(object):
    def create(self, request):
        ser_obj = self.get_serializer(data=request.data)
        if ser_obj.is_valid():
            print(ser_obj.validated_data)
            ser_obj.save()
            return Response(ser_obj.validated_data)
        else:
            return Response(ser_obj.errors)


class All(GenericAPIView, ListModelMixin, CreateModelMixin):
    pass


class RetrieveModelMixin(object):
    def retrieve(self, request, id):
        student_obj = self.get_queryset().filter(id=id).first()
        ser_obj = self.get_serializer(student_obj)
        return Response(ser_obj.data)


class UpdateModelMixin(object):
    def update(self, request, id):
        student_obj = self.get_queryset().filter(id=id).first()
        ser_obj = self.get_serializer(instance=student_obj, data=request.data, partial=True)
        if ser_obj.is_valid():
            ser_obj.save()
            return Response(ser_obj.validated_data)
        else:
            return Response(ser_obj.errors)


class DestroyModelMixin(object):
    def destroy(self, request, id):
        student_obj = self.get_queryset().filter(id=id).first()
        if student_obj:
            student_obj.delete()
            return Response("")
        else:
            return Response("删除对象不存在")


class EditAll(GenericAPIView, RetrieveModelMixin, UpdateModelMixin, DestroyModelMixin):
    pass


class StudentAPIView(All):
    queryset = Student.objects.all()
    serializer_class = StudentSerializer

    def get(self, request):
        return self.list(request)

    def post(self, request):
        return self.create(request)


class StudentEditView(EditAll):
    queryset = Student.objects.all()
    serializer_class = StudentSerializer

    def get(self, request, id):
        # The parameter has to be named "id", the same as the (?P<id>\d+)
        # group in the URL, presumably because the captured value is
        # passed as a keyword argument.
        return self.retrieve(request, id)

    def patch(self, request, id):
        return self.update(request, id)

    def delete(self, request, id):
        return self.destroy(request, id)


# Uncomment to use this hand-written ModelViewSet; leave it commented out
# to use the ModelViewSet that rest_framework ships.
# class ModelViewSet(ViewSetMixin, All, EditAll):
#     pass


class BooksAPIView(ModelViewSet):
    queryset = Student.objects.all()
    serializer_class = StudentSerializer
"""
"""
# Own test version 1
class GenericAPIView(APIView):
    queryset = None
    serializer_class = None

    def get_queryset(self):
        return self.queryset.all()

    def get_serializer(self, *args, **kwargs):
        return self.serializer_class(*args, **kwargs)


class RetrieveModelMixin(object):
    def retrieve(self, request, id):
        if not id:
            queryset = self.get_queryset()
            ser_obj = self.get_serializer(queryset, many=True)
        else:
            student_obj = self.get_queryset().filter(id=id).first()
            ser_obj = self.get_serializer(student_obj)
        return Response(ser_obj.data)


class CreateModelMixin(object):
    def create(self, request):
        ser_obj = self.get_serializer(data=request.data)
        if ser_obj.is_valid():
            print(ser_obj.validated_data)
            ser_obj.save()
            return Response(ser_obj.validated_data)
        else:
            return Response(ser_obj.errors)


class UpdateModelMixin(object):
    def update(self, request, id):
        student_obj = self.get_queryset().filter(id=id).first()
        ser_obj = self.get_serializer(instance=student_obj, data=request.data, partial=True)
        if ser_obj.is_valid():
            ser_obj.save()
            return Response(ser_obj.validated_data)
        else:
            return Response(ser_obj.errors)


class DestroyModelMixin(object):
    def destroy(self, request, id):
        student_obj = self.get_queryset().filter(id=id).first()
        if student_obj:
            student_obj.delete()
            return Response("")
        else:
            return Response("删除对象不存在")


class All(GenericAPIView, RetrieveModelMixin, CreateModelMixin, UpdateModelMixin, DestroyModelMixin):
    pass


class StudentView(All):
    queryset = Student.objects.all()
    serializer_class = StudentSerializer

    def get(self, request, id=None):
        return self.retrieve(request, id)

    def post(self, request, id=None):
        return self.create(request)

    def patch(self, request, id=None):
        return self.update(request, id)

    def delete(self, request, id=None):
        return self.destroy(request, id)
"""
"""
# Own test version 2: better organised and reusable
class GenericAPIView(APIView):
    queryset = None
    serializer_class = None

    def get_queryset(self):
        return self.queryset.all()

    def get_serializer(self, *args, **kwargs):
        return self.serializer_class(*args, **kwargs)

    def retrieve(self, request, id):
        if not id:
            queryset = self.get_queryset()
            ser_obj = self.get_serializer(queryset, many=True)
        else:
            student_obj = self.get_queryset().filter(id=id).first()
            ser_obj = self.get_serializer(student_obj)
        return Response(ser_obj.data)

    def create(self, request):
        ser_obj = self.get_serializer(data=request.data)
        if ser_obj.is_valid():
            print(ser_obj.validated_data)
            ser_obj.save()
            return Response(ser_obj.validated_data)
        else:
            return Response(ser_obj.errors)

    def update(self, request, id):
        student_obj = self.get_queryset().filter(id=id).first()
        ser_obj = self.get_serializer(instance=student_obj, data=request.data, partial=True)
        if ser_obj.is_valid():
            ser_obj.save()
            return Response(ser_obj.validated_data)
        else:
            return Response(ser_obj.errors)

    def destroy(self, request, id):
        student_obj = self.get_queryset().filter(id=id).first()
        if student_obj:
            student_obj.delete()
            return Response("")
        else:
            return Response("删除对象不存在")


class StudentView(GenericAPIView):
    queryset = Student.objects.all()
    serializer_class = StudentSerializer

    def get(self, request, id=None):
        return self.retrieve(request, id)

    def post(self, request, id=None):
        return self.create(request)

    def patch(self, request, id=None):
        return self.update(request, id)

    def delete(self, request, id=None):
        return self.destroy(request, id)
"""


# URL with version validation
class GenericAPIView(APIView):
    """Base view providing list/retrieve/create/update/destroy helpers.

    Subclasses set ``queryset`` and ``serializer_class`` and route their
    HTTP method handlers to the helpers below.
    """

    queryset = None
    serializer_class = None

    def get_queryset(self):
        """Return a fresh copy of the configured queryset."""
        return self.queryset.all()

    def get_serializer(self, *args, **kwargs):
        """Instantiate the configured serializer with the given arguments."""
        return self.serializer_class(*args, **kwargs)

    def _get_object(self, id):
        """Return the object with this id, or None when there is none."""
        return self.get_queryset().filter(id=id).first()

    def retrieve(self, request, version, id):
        """Return every object when ``id`` is None, otherwise the one object.

        Responds with 404 when ``id`` is given but matches nothing.
        """
        if id is None:
            ser_obj = self.get_serializer(self.get_queryset(), many=True)
            return Response(ser_obj.data)

        student_obj = self._get_object(id)
        if student_obj is None:
            return Response("对象不存在", status=status.HTTP_404_NOT_FOUND)
        return Response(self.get_serializer(student_obj).data)

    def create(self, request):
        """Create an object from ``request.data``.

        Responds with the serialized object, or with the validation
        errors and status 400.
        """
        ser_obj = self.get_serializer(data=request.data)
        if not ser_obj.is_valid():
            return Response(ser_obj.errors, status=status.HTTP_400_BAD_REQUEST)
        ser_obj.save()
        # validated_data can hold model instances, which the JSON renderer
        # cannot encode, so return the serialized representation instead.
        return Response(ser_obj.data)

    def update(self, request, id):
        """Partially update the object with this id from ``request.data``.

        Responds with 404 when the object does not exist and with 400 on
        validation errors.
        """
        student_obj = self._get_object(id)
        if student_obj is None:
            # Saving a serializer that has no instance would create a new
            # row, so a missing object must stop here.
            return Response("对象不存在", status=status.HTTP_404_NOT_FOUND)

        ser_obj = self.get_serializer(instance=student_obj, data=request.data, partial=True)
        if not ser_obj.is_valid():
            return Response(ser_obj.errors, status=status.HTTP_400_BAD_REQUEST)
        ser_obj.save()
        return Response(ser_obj.data)

    def destroy(self, request, id):
        """Delete the object with this id; respond with 404 if it is missing."""
        student_obj = self._get_object(id)
        if student_obj is None:
            return Response("删除对象不存在", status=status.HTTP_404_NOT_FOUND)
        student_obj.delete()
        return Response("")


class StudentView(GenericAPIView):
    """Versioned student endpoint; ``version`` and ``id`` come from the URL."""

    queryset = Student.objects.all()
    serializer_class = StudentSerializer

    def get(self, request, version=None, id=None):
        """Return a version-specific response.

        Version-specific logic goes here, although it would be better
        placed in GenericAPIView.
        """
        # NOTE(review): settings.ALLOWED_VERSIONS only lists "v1" and "v2",
        # so the retrieve() call below looks unreachable with the current
        # configuration - confirm whether GET is meant to return data.
        if request.version == "v1":
            return Response("v1版本的返回")
        elif request.version == "v2":
            return Response("v2版本的返回")
        return self.retrieve(request, version, id)

    def post(self, request, version=None, id=None):
        """Create a student."""
        return self.create(request)

    def patch(self, request, version=None, id=None):
        """Partially update the student with this id."""
        return self.update(request, id)

    def delete(self, request, version=None, id=None):
        """Delete the student with this id."""
        return self.destroy(request, id)


class VersionView(APIView):
    """Versioning test: answers differently depending on ``request.version``."""

    def get(self, request, version):
        """Return the message for the requested version."""
        if request.version == "v1":
            return Response("v1版本的返回")
        elif request.version == "v2":
            return Response("v2版本的返回")
        return Response("版本不存在")


class UserView(APIView):
    """Registration: creates a user with a freshly generated token."""

    def post(self, request):
        """Create a UserInfo for ``request.data["username"]``.

        Responds with 400 when no username is supplied.
        """
        username = request.data.get("username")
        if not username:
            return Response(
                {"username": "username is required"},
                status=status.HTTP_400_BAD_REQUEST,
            )
        UserInfo.objects.create(username=username, token=uuid.uuid4())
        return Response("ok")


class TestAuthView(APIView):
    """Authentication, permission and throttling test endpoint."""

    # Registered for this view only; listing the class in settings would
    # register it globally.
    authentication_classes = [MyAuth, ]
    permission_classes = [MyPermission, ]
    throttle_classes = [MyThrottle, ]  # the hand-written throttle
    # throttle_classes = [MyVisitThrottle, ]  # the throttle built on rest_framework

    def get(self, request):
        """Respond once authentication, permission and throttle checks pass.

        MyAuth has set ``request.user`` and ``request.auth`` by this point.
        """
        return Response("认证测试")


class BookPageView(APIView):
    """Pagination test over the student table."""

    def get(self, request):
        """Return one page of students in the paginator's response format."""
        # Paginating needs a stable order, otherwise pages can overlap.
        queryset = Student.objects.all().order_by("id")
        my_pagenation = MyPagenation()  # page number / page size pagination
        # my_pagenation = MyLimitPage()  # limit / offset pagination
        # my_pagenation = MyCursorPage()  # cursor pagination
        page_queryset = my_pagenation.paginate_queryset(queryset, request, view=self)
        ser_obj = StudentSerializer(page_queryset, many=True)
        # return Response(ser_obj.data)
        return my_pagenation.get_paginated_response(ser_obj.data)

    def post(self, request):
        """Create a student; respond with 400 and the errors when invalid."""
        ser_obj = StudentSerializer(data=request.data)
        if not ser_obj.is_valid():
            return Response(ser_obj.errors, status=status.HTTP_400_BAD_REQUEST)
        ser_obj.save()
        # validated_data can hold model instances, which the JSON renderer
        # cannot encode, so return the serialized representation instead.
        return Response(ser_obj.data)
serializers.py
from rest_framework import serializers

from .models import Student


class ClassesSerializer(serializers.Serializer):
    """Plain serializer exposing a class's id and name."""

    id = serializers.IntegerField()
    name = serializers.CharField(max_length=32)


class CourseSerializer(serializers.Serializer):
    """Plain serializer exposing a course's id and name."""

    id = serializers.IntegerField()
    name = serializers.CharField(max_length=32)


class StudentSerializer(serializers.ModelSerializer):
    """Student serializer with read-only summaries of the related objects.

    The ``classes`` and ``courses`` relations are write-only; responses
    carry the computed ``classes_info`` and ``course_info`` fields instead.
    """

    classes_info = serializers.SerializerMethodField(read_only=True)
    course_info = serializers.SerializerMethodField(read_only=True)

    class Meta:
        model = Student
        fields = "__all__"
        # fields = ["id", "name", "classes_info", "course_info"]
        extra_kwargs = {
            "classes": {"write_only": True},
            "courses": {'write_only': True},
        }

    def get_classes_info(self, obj):
        """Return ``{"id", "name"}`` for the student's class."""
        student_class = obj.classes
        return {"id": student_class.id, "name": student_class.name}

    def get_course_info(self, obj):
        """Return a list of ``{"id", "name"}`` dicts, one per course."""
        return [
            {"id": item.id, 'name': item.name}
            for item in obj.courses.all()
        ]
models.py
from django.db import models

# Create your models here.

# Public models of this module. The app's admin.py registers every name
# listed here, so UserInfo is included to make it manageable there too.
__all__ = ['Student', 'Classes', 'Course', 'UserInfo']


class Student(models.Model):
    """A student who belongs to one class and takes any number of courses."""

    name = models.CharField(max_length=32, verbose_name='姓名')
    age = models.IntegerField()
    # on_delete is spelled out because Django 2.0+ requires it; CASCADE is
    # what older versions applied when it was omitted.
    classes = models.ForeignKey(to='Classes', on_delete=models.CASCADE)
    courses = models.ManyToManyField(to='Course')

    def __str__(self):
        return self.name

    class Meta:
        db_table = '01-学生表'
        verbose_name_plural = db_table


class Classes(models.Model):
    """A class that students are assigned to."""

    name = models.CharField(max_length=32, verbose_name='班级')

    def __str__(self):
        return self.name

    class Meta:
        db_table = '02-班级表'
        verbose_name_plural = db_table


class Course(models.Model):
    """A course that students can take."""

    name = models.CharField(max_length=32, verbose_name='课程')

    def __str__(self):
        return self.name

    class Meta:
        db_table = '03-课程表'
        verbose_name_plural = db_table


class UserInfo(models.Model):
    """An API user identified by a UUID token."""

    username = models.CharField(max_length=32)
    token = models.UUIDField()
    CHOICES = ((1, "vip"), (2, "svip"), (3, "普通用户"))
    type = models.IntegerField(choices=CHOICES, default=3)

    def __str__(self):
        return self.username
app下的admin.py
from django.contrib import admin

from . import models

# Register your models here.

# Register every model that the models module exports through __all__.
exported_names = models.__all__
for model_name in exported_names:
    model_cls = getattr(models, model_name)
    admin.site.register(model_cls)