1727 views|0 replies

6818

Posts

11

Resources
The OP
 

[Domestic RISC-V Linux Board Fang·Starlight VisionFive Trial Report] Environment Preparation + Server Construction [Copy link]

 This post was last edited by lugl4313820 on 2022-6-13 20:27

The database has been set up, so now we can build the Tornado server:

1. Create a new app folder:

2. Open the folder with PyCharm, create four sub-folders for the different functions (admin, api, models, and tools), and then create the following files:

The code of each file is as follows:

views_common.py

# -*- coding: utf-8 -*-
import json
import tornado.web
import datetime
from tornado.escape import utf8
from tornado.util import unicode_type

#from app.common.ip2Addr import ip2addr
#from werkzeug.datastructures import MultiDict


class CommonHandler(tornado.web.RequestHandler):
    """Base request handler with helpers shared by the project's views.

    It exposes the request payload in several shapes (JSON body, query/form
    arguments, headers), a formatted timestamp, and a template-rendering
    helper that injects the resources declared by UI modules.
    """

    @property
    def site_url(self):
        """Return the URL prefix of the site (a fixed local address)."""
        return 'http://127.0.0.1:9000'

    @property
    def md(self):
        """Return the ``md`` attribute of the running application.

        NOTE(review): the original comment describes this as the MongoDB
        connection session; the application object must define ``md`` for
        this property to work - confirm where it is assigned.
        """
        return self.application.md

    @property
    def params(self):
        """Return the request body parsed as a JSON object.

        The body arrives as bytes; it is decoded as UTF-8 and parsed with
        ``json.loads``. Per the original comments, the client (a mini
        program) submits its data as a JSON string.

        Returns:
            dict: the decoded JSON object, or an empty dict when the request
            carries no body (e.g. a plain GET), instead of failing to parse
            an empty string.

        Raises:
            UnicodeDecodeError: if the body is not valid UTF-8.
            json.JSONDecodeError: if the body is not valid JSON.
            AttributeError: if the JSON document is not an object.
        """
        body = self.request.body
        if not body or not body.strip():
            return {}
        payload = json.loads(body.decode('utf-8'))
        # Shallow copy of the top-level mapping; ``.items()`` deliberately
        # keeps rejecting JSON documents that are not objects.
        return dict(payload.items())

    @property
    def dt(self):
        """Return the current local time as ``YYYY-MM-DD HH:MM:SS``."""
        return datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    @property
    def common_params(self):
        """Return parameters common to every request.

        Only the request headers are included at present; the creation
        time, client IP and resolved address fields were disabled in the
        original code.

        Returns:
            dict: ``{'headers': <request headers as a plain dict>}``.
        """
        # The headers are no longer printed here: they can carry cookies and
        # credentials that must not end up on stdout.
        return {'headers': dict(self.request.headers)}

    @property
    def ajax_params(self):
        """Return the request arguments with every value decoded to ``str``.

        Returns:
            dict: argument name -> list of UTF-8 decoded values, built from
            ``self.request.arguments``.
        """
        return {
            name: [value.decode('utf-8') for value in values]
            for name, values in self.request.arguments.items()
        }

    @property
    def form_params(self):
        """Return ``ajax_params`` wrapped in a ``MultiDict``.

        NOTE(review): ``MultiDict`` is not imported in this module - the
        ``werkzeug.datastructures`` import at the top of the file is
        commented out - so accessing this property raises ``NameError``
        until that import is restored.
        """
        return MultiDict(self.ajax_params)

    def html(self, template_name, **kwargs):
        """Render a template and write the result to the response.

        The template is rendered with ``render_string``; the JavaScript, CSS,
        ``<head>`` and ``<body>`` fragments declared by the active UI modules
        are then spliced in before the closing ``</body>`` / ``</head>``
        tags. The page is passed to ``self.write``; this method does not
        finish the request itself.

        Args:
            template_name: name of the template given to ``render_string``.
            **kwargs: variables made available to the template.

        Returns:
            Whatever ``self.write`` returns.

        Raises:
            RuntimeError: if the request has already been finished.
            ValueError: if a fragment has to be inserted but the rendered
                page lacks the corresponding ``</head>`` or ``</body>`` tag.
        """
        if self._finished:
            raise RuntimeError("Cannot render() after finish()")
        html = self.render_string(template_name, **kwargs)

        # Insert the additional JS and CSS added by the modules on the page
        js_embed = []
        js_files = []
        css_embed = []
        css_files = []
        html_heads = []
        html_bodies = []
        for module in getattr(self, "_active_modules", {}).values():
            embed_part = module.embedded_javascript()
            if embed_part:
                js_embed.append(utf8(embed_part))
            file_part = module.javascript_files()
            if file_part:
                # A module may return one path or a collection of paths.
                if isinstance(file_part, (unicode_type, bytes)):
                    js_files.append(file_part)
                else:
                    js_files.extend(file_part)
            embed_part = module.embedded_css()
            if embed_part:
                css_embed.append(utf8(embed_part))
            file_part = module.css_files()
            if file_part:
                if isinstance(file_part, (unicode_type, bytes)):
                    css_files.append(file_part)
                else:
                    css_files.extend(file_part)
            head_part = module.html_head()
            if head_part:
                html_heads.append(utf8(head_part))
            body_part = module.html_body()
            if body_part:
                html_bodies.append(utf8(body_part))

        if js_files:
            # Maintain order of JavaScript files given by modules
            js = self.render_linked_js(js_files)
            sloc = html.rindex(b'</body>')
            html = html[:sloc] + utf8(js) + b'\n' + html[sloc:]
        if js_embed:
            js = self.render_embed_js(js_embed)
            sloc = html.rindex(b'</body>')
            html = html[:sloc] + js + b'\n' + html[sloc:]
        if css_files:
            css = self.render_linked_css(css_files)
            hloc = html.index(b'</head>')
            html = html[:hloc] + utf8(css) + b'\n' + html[hloc:]
        if css_embed:
            css = self.render_embed_css(css_embed)
            hloc = html.index(b'</head>')
            html = html[:hloc] + css + b'\n' + html[hloc:]
        if html_heads:
            hloc = html.index(b'</head>')
            html = html[:hloc] + b''.join(html_heads) + b'\n' + html[hloc:]
        if html_bodies:
            hloc = html.index(b'</body>')
            html = html[:hloc] + b''.join(html_bodies) + b'\n' + html[hloc:]
        return self.write(html)

views_index.py

# -*- coding: utf-8 -*-
import asyncio
from app.api.views_common import CommonHandler

class IndexHandler(CommonHandler):
    """Handler for the index route; answers GET with a fixed greeting."""

    async def get(self):
        """Write the greeting text to the response."""
        greeting = "hello world!"
        self.write(greeting)
app\__init__.py

# -*- coding: utf-8 -*-
import tornado.web #web框架
import tornado.ioloop #事件循环
import tornado.options #命令解析工具
import tornado.httpserver #http服务

from tornado.options import options, define
from app.urls import urls #导入路由配置
from app.configs import configs #导入服务配置


# Register the command-line option for the port the server starts on.
define('port', type=int, default=9000, help='运行端口')

class CustomApplication(tornado.web.Application):
    """Tornado application built from a route table and a settings mapping."""

    def __init__(self, urls, configs):
        """Initialise the application.

        Args:
            urls: route definitions, passed to the base class as ``handlers``.
            configs: mapping expanded into the base class's keyword settings.
        """
        super().__init__(handlers=urls, **configs)

def create_server():
    """Parse the command line, start the HTTP server and run the event loop.

    The server wraps a ``CustomApplication`` built from the imported ``urls``
    and ``configs`` and listens on the ``port`` option. The call blocks in
    the I/O loop's ``start()``.
    """
    tornado.options.parse_command_line()
    # xheaders=True: per the original note, this is needed when the user's
    # real IP address has to be obtained.
    http_server = tornado.httpserver.HTTPServer(
        CustomApplication(urls, configs),
        xheaders=True,
    )
    http_server.listen(options.port)
    # IOLoop.instance() is a deprecated alias of IOLoop.current().
    tornado.ioloop.IOLoop.current().start()

urls.py

# -*- coding: utf-8 -*-



#from app.admin.views_index import IndexHandler as admin_index
from app.api.views_index import IndexHandler as api_index

# Routes of the API.
api_urls = [
    (r'/', api_index),
]

# Routes of the admin back end; the '/admin/' index route is currently
# disabled, so no routes are registered.
admin_urls = []

# Combined route table: API routes first, then admin routes.
urls = [*api_urls, *admin_urls]

main.py

# -*- coding: utf-8 -*-
from app import create_server

# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    create_server()

Then run main.py

Type in your browser: 127.0.0.1:9000

The page returns: hello world!

The framework of this server has been basically built:

This post is from Domestic Chip Exchange
 
 

Just looking around
Find a datasheet?

EEWorld Datasheet Technical Support

EEWorld
subscription
account

EEWorld
service
account

Automotive
development
circle

Copyright © 2005-2024 EEWORLD.com.cn, Inc. All rights reserved 京B2-20211791 京ICP备10001474号-1 电信业务审批[2006]字第258号函 京公网安备 11010802033920号
快速回复 返回顶部 Return list