数据清洗引擎完成
This commit is contained in:
@@ -0,0 +1,3 @@
|
||||
|
||||
|
||||
|
||||
|
||||
+54
-5
@@ -1,14 +1,63 @@
|
||||
import pandas as pd
|
||||
|
||||
from utils.MODEL_DICT import CLEAN_DICT
|
||||
from utils.mongodb_util import MongoUtil
|
||||
|
||||
|
||||
class DataCleaningEngine:
    """Run configured cleaning rules on a stored dataset and save the result.

    Datasets are read and written through ``MongoUtil``; the cleaning rules
    are statement strings looked up in ``CLEAN_DICT``.
    """

    def __init__(self):
        self.db = MongoUtil()

    # TODO: validate the cleaning rules themselves before executing them.
    def check_clean_condition(self, user_name, dataset_name, conditions):
        """Load a dataset and apply every cleaning condition to it.

        :param user_name: owner of the dataset
        :param dataset_name: name of the dataset to load
        :param conditions: iterable of dicts with the keys ``columns``,
            ``clean_method`` and, for methods that have variants,
            ``sub_method``
        :return: the cleaned DataFrame, or ``{}`` when the stored dataset
            is empty
        :raises Exception: when a condition cannot be applied; the message
            names the columns, the method and the underlying error
        """
        dataset = self.db.find_dataset(user_name, dataset_name)
        if not dataset:
            return {}

        df = pd.DataFrame(dataset)
        print("缺失值统计:", df.isnull().sum())

        for condition in conditions:
            # Bound before the try block so the error message below can
            # always be built, even if reading the condition itself fails.
            cols, clean_method = [], ""
            try:
                # NOTE: ``df`` and ``cols`` must keep these exact names; the
                # statements stored in CLEAN_DICT refer to them.
                cols = condition.get("columns", [])
                clean_method = condition.get("clean_method", "")
                clean_expression = CLEAN_DICT.get(clean_method, "")

                # A non-empty dict means the method has sub-methods; pick
                # the statement that belongs to the requested one.
                if clean_expression and isinstance(clean_expression, dict):
                    sub_method = condition.get("sub_method", "")
                    if sub_method not in clean_expression:
                        raise ValueError("未知的子方法: {}".format(sub_method))
                    clean_expression = clean_expression[sub_method]

                # Empty statements (rules without an implementation) are
                # skipped.
                if clean_expression and isinstance(clean_expression, str):
                    # NOTE(review): exec() is only acceptable here because
                    # the statement text comes from the server-side
                    # CLEAN_DICT; request data is reached through the
                    # ``cols`` variable and never spliced into the code.
                    exec(clean_expression)
            except Exception as e:
                if isinstance(cols, (list, tuple)):
                    cols_text = ",".join(map(str, cols))
                else:
                    cols_text = str(cols)
                message = "对{}列进行{}失败!失败原因:{}".format(cols_text, clean_method, str(e))
                raise Exception(message) from e

        return df

    def save_clean_data(self, user_name, dataset_name, new_data):
        """Store a cleaned dataset under the name ``clean_<dataset_name>``.

        :param user_name: owner of the dataset
        :param dataset_name: name of the original dataset
        :param new_data: cleaned DataFrame to store
        :return: ``True`` once the upload call has returned
        """
        new_name = "clean_{}".format(dataset_name)
        self.db.upload_dataset(user_name, new_name, new_data)
        return True
|
||||
|
||||
|
||||
class DataMiningEngine:
|
||||
|
||||
@@ -1,8 +1,15 @@
|
||||
from django.urls import path
|
||||
from .engine_view import *
|
||||
from utils.views2urls import views2urls
|
||||
|
||||
app_name = 'engine'
|
||||
|
||||
|
||||
# urlpatterns = views2urls("Engines.engine_view")
|
||||
# Routes of the engine app; both views are pulled in by the star import
# from .engine_view at the top of this module.
urlpatterns = [
    path('check_clean_condition', check_clean_condition),  # validate cleaning rules
    path('save_clean_data', save_clean_data),  # persist the cleaned dataset
]
|
||||
|
||||
|
||||
+50
-7
@@ -1,15 +1,58 @@
|
||||
import json
|
||||
import os
|
||||
import traceback
|
||||
|
||||
import pandas as pd
|
||||
|
||||
from django.http import JsonResponse, HttpResponse
|
||||
|
||||
from django.http import JsonResponse
|
||||
from django.views.decorators.http import require_http_methods
|
||||
|
||||
from .engine_model import DataCleaningEngine
|
||||
|
||||
clean_engine = DataCleaningEngine()
|
||||
|
||||
|
||||
@require_http_methods(['POST'])
def check_clean_condition(request):
    """Validate cleaning conditions by running them through the engine.

    The JSON request body supplies ``dataset``, ``user_name`` and
    ``conditions``.

    :param request: POST request carrying the JSON body
    :return: JsonResponse with ``code`` (200, or 500 on failure), ``msg``
        (success text or the error message) and ``data`` (always ``None``)
    """
    response = {"code": 200, "msg": None, "data": None}
    try:
        payload = json.loads(request.body)
        dataset_name = payload.pop("dataset", "")
        user_name = payload.pop("user_name", "")
        conditions = payload.pop("conditions", {})
        clean_engine.check_clean_condition(user_name, dataset_name, conditions)
    except Exception as err:
        # Any failure is reported to the client and traced on the server.
        response["msg"] = str(err)
        response["code"] = 500
        traceback.print_exc()
    else:
        response["msg"] = "校验通过!"

    return JsonResponse(response)
|
||||
|
||||
|
||||
@require_http_methods(['POST'])
def save_clean_data(request):
    """Clean a dataset with the posted conditions and store the result.

    The JSON request body supplies ``dataset``, ``user_name`` and
    ``conditions``. The conditions are applied by the cleaning engine and
    the resulting DataFrame is saved through it.

    :param request: POST request carrying the JSON body
    :return: JsonResponse with ``code`` (200, or 500 on failure), ``msg``
        and ``data`` (always ``None``)
    """
    data, code, msg = None, 200, None
    try:
        post_body = json.loads(request.body)
        dataset_name = post_body.pop("dataset", "")
        user_name = post_body.pop("user_name", "")
        conditions = post_body.pop("conditions", {})
        new_data = clean_engine.check_clean_condition(user_name, dataset_name, conditions)
        if isinstance(new_data, pd.DataFrame):
            if clean_engine.save_clean_data(user_name, dataset_name, new_data):
                msg = "保存成功"
        else:
            # The engine produced no DataFrame, so nothing was stored.
            # Report that instead of answering 200 with an empty message.
            code = 500
            msg = "数据集不存在或为空,保存失败"
    except Exception as e:
        msg = str(e)
        code = 500
        traceback.print_exc()

    return JsonResponse({"code": code, "msg": msg, "data": data})
|
||||
|
||||
@@ -17,7 +17,7 @@ import os
|
||||
|
||||
- 主函数预先定义一个代码文件,相关参数通过占位符填充,填充的参数来源于前端输入,主要包括:特征列、目标列、文件名
|
||||
"""
|
||||
from code_templates.MODEL_DICT import MODEL_DICT
|
||||
from utils.MODEL_DICT import MODEL_DICT
|
||||
|
||||
|
||||
class SetModel():
|
||||
|
||||
@@ -5,8 +5,6 @@ from .views import *
|
||||
|
||||
app_name = 'model_selection'
|
||||
urlpatterns = [
|
||||
path('', TemplateView.as_view(template_name='index.html')),
|
||||
path('index/', TemplateView.as_view(template_name='index.html')),
|
||||
path('upload_dataset', upload_dataset), # 上传数据集文件
|
||||
path('get_data_list', get_data_list), # 获取数据集列表
|
||||
path('show_dataset', show_dataset), # 预览数据集
|
||||
|
||||
@@ -7,7 +7,7 @@ from django.http import JsonResponse, HttpResponse
|
||||
from django.shortcuts import render
|
||||
from django.views.decorators.http import require_http_methods
|
||||
|
||||
from code_templates.MODEL_DICT import CLEAN_DICT
|
||||
from utils.MODEL_DICT import CLEAN_DICT
|
||||
from ModelSelection.dataset_process_model import DatasetProcess
|
||||
from ModelSelection.model_process_model import SetModel
|
||||
|
||||
|
||||
@@ -34,7 +34,7 @@ CLEAN_DICT = {
|
||||
"使用下一个数据填充": "df[cols]=df[cols].fillna(method='bfill')",
|
||||
"插值法填充": "df[cols].interpolate()"
|
||||
},
|
||||
"重复项删除":"",
|
||||
"重复项删除":"df[cols].dropna()",
|
||||
"排序": "df.sort_values(by=cols, inplace= True)",
|
||||
"筛选": {
|
||||
"大于": "",
|
||||
+26
-2
@@ -1,3 +1,4 @@
|
||||
import time
|
||||
from pymongo import MongoClient
|
||||
|
||||
|
||||
@@ -6,10 +7,33 @@ class MongoUtil:
|
||||
self.client = MongoClient(host="localhost", port=27017)
|
||||
self.mydb = self.client[database]
|
||||
|
||||
def find_dataset(self, user_name, dataset_name):
    """Return the stored data of one dataset.

    :param user_name: value matched against the ``username`` field
    :param dataset_name: value matched against the ``dataset_name`` field
    :return: the document's ``data`` value, or ``{}`` when no document
        matches or the document has no ``data`` field
    """
    collection = self.mydb["dataset_model"]
    dataset = collection.find_one({"username": user_name, "dataset_name": dataset_name})
    # find_one() yields None when nothing matches; treat that as "no data"
    # instead of failing on None.get().
    if dataset is None:
        return {}
    return dataset.get("data", {})
|
||||
|
||||
def upload_dataset(self, user_name, dataset_name, new_data):
    """Store a DataFrame as a dataset and register it on the user.

    Inserts one document into ``dataset_model`` and pushes a
    ``{name, upload_time}`` entry onto the ``dataset`` array of every
    ``user_model`` document whose ``username`` matches.

    :param user_name: owner of the dataset
    :param dataset_name: name to store the dataset under
    :param new_data: DataFrame holding the dataset
    :return: None
    """
    upload_time = time.time()
    # A pandas Index cannot be encoded as BSON; store a plain list.
    columns = list(new_data.columns)
    data = new_data.to_dict(orient="list")

    # Insert into the dataset collection. The key must be "dataset_name",
    # the field find_dataset() queries on.
    self.mydb["dataset_model"].insert_one({
        "username": user_name,
        "dataset_name": dataset_name,
        "columns": columns,
        "data": data,
    })

    # Register the new dataset on the user collection.
    self.mydb["user_model"].update_many(
        {"username": user_name},
        {"$push": {"dataset": {"name": dataset_name, "upload_time": upload_time}}},
    )
|
||||
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
@@ -0,0 +1,32 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# @Time :2021/4/2 17:35
|
||||
# @Author :lzh
|
||||
# @File : views2urls.py
|
||||
# @Software: PyCharm
|
||||
|
||||
|
||||
import inspect
|
||||
import types
|
||||
from django.urls import path
|
||||
from django.conf.urls import url
|
||||
|
||||
def views2urls(module_name, view_names=("check_clean_condition", "save_clean_data")):
    """Build URL patterns for selected view functions of a module.

    :param module_name: dotted import path of the views module
    :param view_names: names of the functions to expose; defaults to the
        two cleaning-engine views
    :return: list of ``path(name, view)`` entries, one per function in the
        module whose name is in ``view_names``, ordered by name
    """
    # Imported locally so the module's import header does not need to change.
    import importlib

    view_module = importlib.import_module(module_name)
    wanted = set(view_names)
    return [
        path(name, value)
        for name, value in inspect.getmembers(view_module, inspect.isfunction)
        if name in wanted
    ]
|
||||
|
||||
if __name__ == '__main__':
|
||||
print(views2urls("Engines.engine_view"))
|
||||
Reference in New Issue
Block a user