Python MongoDB 合表

一、原始表结构

1、imsi表

MongoDB Enterprise > db.trs_action_dzwl_zm.findOne()
{
        "_id" : {
                "imsi" : "460029380018855",
                "start_time" : "2019-03-13 15:37:07"
        },
        "site_address" : "织里-大港路与G318交叉口",
        "xnetbar_wacode" : "EG-MIX-WL-4C-006",
        "imei" : "000000052052052",
        "device_longitude" : "120.275424",
        "device_latitude" : "30.838656",
        "tmsi" : "1552462627",
        "rssi" : "140",
        "band" : "40",
        "plmn" : "46000",
        "tel_number" : "1595028",
        "device_name" : "织里-大港路与G318交叉口-4G",
        "vendor_name" : "南京森根",
        "province" : "江苏省",
        "city" : "盐城市"
}

2、car表

MongoDB Enterprise > db.trs_action_car_info.findOne()
{
        "_id" : {
                "license_number" : "苏A39NX7",
                "start_time" : "2019-05-16 23:03:13"
        },
        "site_address" : "湖织大道-香圩桥东侧",
        "site_location_id" : "",
        "unlawful_act" : "",
        "driving_direct" : "其它",
        "lane_id" : "001",
        "netbar_wacode" : "904",
        "license_color" : "002",
        "photo_cnt" : "",
        "monitor_type" : "卡口式监控",
        "photo_path" : "/pic?did=12ffaa00-78a3-1037-921c-54c4150760be&bid=486472&pid=4294966623&ptime=1558018994",
        "speed" : "0",
        "stat" : "0",
        "vehicle_brand1" : "0",
        "vehicle_brand2" : "0",
        "car_length" : "",
        "car_color" : "其它颜色",
        "shade" : "000",
        "car_type" : "轿车",
        "license_type" : "92式民用车",
        "vehicle_feature_path" : "",
        "device_name" : "湖织大道-香圩桥东侧",
        "monitor_direct" : "未知",
        "lane" : "001",
        "device_longitude" : "120.308512",
        "device_latitude" : "30.881026",
        "site_name" : "湖织大道-香圩桥东侧",
        "road_segment_direct" : "未知",
        "site_longitude" : "120.308512",
        "site_latitude" : "30.881026"
}

3、face表

MongoDB Enterprise > db.trs_action_face_info.findOne()
{
        "_id" : {
                "pid" : "0120_1561570383884_d61beb5b9e644ed081f4ffc5e362ece7",
                "start_time" : "2019-06-13 12:32:59"
        },
        "site_address" : "融泰宾馆",
        "img_mode" : "",
        "obj_img_url" : "/pic?=d4=i778z096as091-706105m6ep=t1i5i*d1=*ipd7=*9s8=42b8i2d05*717540c14-a563e27-1579*d-d0i806d8e42",
        "quality_score" : "0.883593",
        "netbar_wacode" : "33052802001310942740",
        "device_name" : "融泰宾馆",
        "device_longitude" : "120.262211",
        "device_latitude" : "30.841749",
        "age" : "",
        "gender" : "1",
        "race" : "",
        "beard" : "",
        "eye_open" : "",
        "eye_glass" : "",
        "sun_glass" : "1",
        "mask" : "",
        "mouth_open" : "",
        "smile" : "1",
        "similarity" : "0.97059",
        "image_id" : "0120_1561570383884_d61beb5b9e644ed081f4ffc5e362ece7",
        "bkg_url" : "/pic?=d4=i778z096as091-706105m6ep=t1i5i*d1=*ipd7=*9s8=42b8i2d05*717540c14-a563e27-1579*d-d0i806d8e42"
}

4、MAC表(本次暂不合并,表结构略)

二、合表后集合(collecsites)的结构

要求:从 imsi、car、face、MAC 四张表(MAC 暂时不合并)中提取关键字段,按以下规则合并:

1)按站点(site_address)分组;

2)以两分钟为一个时间间隔;

3)每个 document 中,每类数据在两分钟内最多只存 200 条关键数据,超出部分存入同一站点、同一时间段的下一个 document。

MongoDB Enterprise > db.collecsites.findOne()
{
        "_id" : ObjectId("5e159ef831d840f9482b2adc"),
        "timeline" : "2019-03-13 15:34:00",
        "site" : "织里-大港路与G318交叉口",
        "face" : [ ],
        "lpn" : [ ],
        "mac" : [ ],
        "nsamples" : 200,
        "imsi" : [
                {
                        "start_time" : "2019-03-13 15:35:56",
                        "imsi" : "460078995442766"
                },
                {
                        "start_time" : "2019-03-13 15:35:56",
                        "imsi" : "460006254007976"
                }
        ]
}

三、开发脚本

1、使用到python模块

from multiprocessing import Pool —— 进程池

from pymongo import MongoClient —— Python 连接 MongoDB 的驱动

import pandas as pd —— 用于将一段时间划分为多个时间段(本例以 2 分钟为一个时间段)

2、脚本

1)连接mongodb的脚本

[root@localhost python3]# cat mongodbclient.py
#coding=utf-8

from multiprocessing import Pool
import os, time, random
import json
from datetime import datetime
from pymongo import MongoClient
import sys
import datetime

class Database(object):
    """Small wrapper that owns one MongoClient and one database handle.

    Args:
        address: MongoDB host name or IP address.
        port: MongoDB port.
        database: name of the database every method operates on.

    Each data method first checks :meth:`get_state` and returns a neutral
    value ("" / 0 / None) instead of touching the server when it reports no
    connection.  The instance can also be used as a context manager so the
    client is closed even when an operation raises::

        with Database(host, 27017, "idpad_zl") as db:
            db.insert_one("coll", {...})
    """

    def __init__(self, address, port, database):
        self.conn = MongoClient(host=address, port=port)
        self.db = self.conn[database]

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close_connect()
        return False  # never suppress the caller's exception

    def get_state(self):
        """Return True when both the client and the database handle are set."""
        return self.conn is not None and self.db is not None

    def insert_one(self, collection, data):
        """Insert *data* into *collection*; return its ``_id`` ("" if disconnected)."""
        if self.get_state():
            ret = self.db[collection].insert_one(data)
            return ret.inserted_id
        return ""

    def insert_many(self, collection, data):
        """Insert the documents in *data*; return their ``_id`` list ("" if disconnected)."""
        if self.get_state():
            ret = self.db[collection].insert_many(data)
            # InsertManyResult only has ``inserted_ids``; reading
            # ``inserted_id`` raised AttributeError after the write succeeded.
            return ret.inserted_ids
        return ""

    def update(self, collection, data):
        """``$set`` new values on every document that matches the old ones.

        *data* maps each field to ``[old_value, new_value]``: the old values
        together form the filter, the new values are written with ``$set``.
        Returns the number of modified documents (0 if disconnected).
        """
        data_filter = {}
        data_revised = {}
        for key in data:
            data_filter[key] = data[key][0]
            data_revised[key] = data[key][1]
        if self.get_state():
            return self.db[collection].update_many(data_filter, {"$set": data_revised}).modified_count
        return 0

    def updateOne(self, collection, data_filter, data_revised):
        """Upsert one document: update the first match, or insert if none match.

        *data_revised* is either an update-operator document such as
        ``{"$set": {...}}`` or a full replacement document, as the old
        ``Collection.update`` accepted.  Returns the server's raw reply
        (a dict), or 0 if disconnected.
        """
        if self.get_state():
            coll = self.db[collection]
            first_key = next(iter(data_revised), "")
            # Collection.update() was removed in PyMongo 4.  Its third
            # positional argument was ``upsert``; with operator keys it acted
            # like update_one, otherwise like replace_one.
            if str(first_key).startswith("$"):
                result = coll.update_one(data_filter, data_revised, upsert=True)
            else:
                result = coll.replace_one(data_filter, data_revised, upsert=True)
            return result.raw_result
        return 0

    def find(self, col, condition, column=None):
        """Return a cursor over *col* matching *condition* (None if disconnected).

        *column* is an optional projection passed straight to ``find``.
        """
        if self.get_state():
            if column is None:
                return self.db[col].find(condition)
            return self.db[col].find(condition, column)
        return None

    def aggregate(self, col, condition):
        """Run the pipeline *condition* on *col* with allowDiskUse enabled."""
        if self.get_state():
            options = {'allowDiskUse': True}
            return self.db[col].aggregate(condition, **options)
        return None

    def delete(self, col, condition):
        """Delete every document matching *condition*; return how many were removed."""
        if self.get_state():
            return self.db[col].delete_many(filter=condition).deleted_count
        return 0

    def close_connect(self):
        """Close the underlying MongoClient (does nothing if there is none)."""
        if self.conn is not None:
            self.conn.close()

2)对mongodb中collection做实际操作的脚本

[root@localhost python3]# cat collection_curd.py
#coding:utf-8

from multiprocessing import Pool
import os, time, random
import json
from datetime import datetime
from pymongo import MongoClient
import sys
import datetime
import mongodbclient
import pandas as pd

def max_number(num1,num2,num3):
    """Return the largest of the three arguments (the first one on ties)."""
    largest = num1
    for candidate in (num2, num3):
        if candidate > largest:
            largest = candidate
    return largest

def site_cursor_to_list(myresult,colum):
    """Collect ``document[colum]`` from every document yielded by *myresult*.

    Accepts a PyMongo cursor or any iterable of mappings and returns a plain
    Python list in iteration order.
    """
    return [document[colum] for document in myresult]

def list_Duplicate_removal(inlist):
    """Return the distinct items of *inlist*, keeping first-occurrence order.

    ``list(set(inlist))`` returned the items in an arbitrary order (for
    strings it even changes between runs and worker processes because of hash
    randomization).  ``dict.fromkeys`` deduplicates the same way (items must
    be hashable) but deterministically.
    """
    return list(dict.fromkeys(inlist))

def get_time_interval(str_start_time,str_end_time,freq='2min'):
    """Split [str_start_time, str_end_time] into boundaries *freq* apart.

    Args:
        str_start_time: start of the range, any string pandas can parse
            (e.g. '20190310').
        str_end_time: end of the range.
        freq: pandas offset alias for the step; defaults to two minutes.

    Returns:
        A pandas DatetimeIndex; consecutive entries delimit one time slice.
    """
    # The original used typographic quotes around the frequency (a
    # SyntaxError); 'min' is the canonical pandas alias for minutes.
    return pd.date_range(str_start_time, str_end_time, freq=freq)

def get_site(collection_name,str_start_time,str_end_time):
    """Return the site_address of every record in [str_start_time, str_end_time).

    Args:
        collection_name: source collection (imsi / car / face).
        str_start_time: inclusive lower bound on ``_id.start_time``.
        str_end_time: exclusive upper bound on ``_id.start_time``.

    Returns:
        A list of site names with duplicates kept ([] if not connected).
    """
    db = mongodbclient.Database("172.16.102.15", 27017, "idpad_zl")
    try:
        myresult = db.find(
            collection_name,
            {"_id.start_time": {"$gte": str_start_time, "$lt": str_end_time}},
            {"site_address": 1},  # only this field is read below
        )
        if myresult is None:
            return []
        # find() returns a lazy cursor.  It must be drained while the client
        # is still open; the original closed the client first and only then
        # iterated the cursor.
        return site_cursor_to_list(myresult, "site_address")
    finally:
        db.close_connect()

def get_site_data(collection_name,str_start_time,str_end_time,site,colums):
    """Return the records of *site* in [str_start_time, str_end_time).

    Args:
        collection_name: source collection (imsi / car / face).
        str_start_time: inclusive lower bound on ``_id.start_time``.
        str_end_time: exclusive upper bound on ``_id.start_time``.
        site: value matched against ``site_address``.
        colums: projection passed to find(), e.g. {"_id": 1}.

    Returns:
        A list of the matching documents ([] if not connected).  The original
        returned a live cursor whose client had already been closed.
    """
    db = mongodbclient.Database("172.16.102.15", 27017, "idpad_zl")
    try:
        myresult = db.find(
            collection_name,
            {"_id.start_time": {"$gte": str_start_time, "$lt": str_end_time},
             "site_address": site},
            colums,
        )
        # Materialize before the client is closed in ``finally``.
        return [] if myresult is None else list(myresult)
    finally:
        db.close_connect()

def sitetime_insert(collection_name,site,str_start_time,imsi_sitetime,face_sitetime,car_sitetime,mac_sitetime):
    """Insert one merged site/time document and return its ``_id``.

    The document always carries ``"nsamples": 200``, which sitetime_updateOne
    filters on.  Car records are stored under the ``lpn`` key.
    """
    db = mongodbclient.Database("172.16.102.15", 27017, "idpad_zl")
    try:
        return db.insert_one(collection_name, {
            "site": site,
            "timeline": str_start_time,
            "nsamples": 200,
            "imsi": imsi_sitetime,
            "face": face_sitetime,
            "lpn": car_sitetime,
            "mac": mac_sitetime,
        })
    finally:
        # Close even when the insert raises; the original leaked the client then.
        db.close_connect()

def sitetime_updateOne(collection_name,site,str_start_time,key,value):
    """Set *key* to *value* on one site/time document whose *key* is still ``[]``.

    The filter selects documents for the same site and timeline (with
    ``nsamples`` 200) whose *key* array is empty.  Database.updateOne upserts,
    so a new document is created when none matches.  Returns what
    Database.updateOne returns.
    """
    db = mongodbclient.Database("172.16.102.15", 27017, "idpad_zl")
    try:
        return db.updateOne(
            collection_name,
            {"site": site, "timeline": str_start_time, "nsamples": 200, key: []},
            {"$set": {key: value}},
        )
    finally:
        # Close even when the update raises; the original leaked the client then.
        db.close_connect()

#def sit_colse():
#    db.close_connect()

3)操作脚本

[root@localhost python3]# cat collection_insert.py
#coding:utf-8

from multiprocessing import Pool
import os, time, random
import json
from datetime import datetime
from pymongo import MongoClient
import sys
import datetime
import mongodbclient
import pandas as pd
import collection_curd as curd
from multiprocessing import Pool

#update_exec(imsi_outlen_flo,"collecsites",‘imsi‘,imsidata,imsi_outlen_int,imsi_max_len)
def update_exec(type_outlen_flo,collectionname,site,str_start_time,typelist,datalist,type_outlen_int,type_max_len):
    """Store *datalist* in the site/time documents, at most 200 items per document.

    Each chunk goes through curd.sitetime_updateOne.  That call fills one
    document of (*site*, *str_start_time*) whose *typelist* field is still ``[]``.

    Args:
        type_outlen_flo: ``len(datalist) / 200.0``; kept only for call
            compatibility.
        collectionname: target collection, e.g. "collecsites".
        site: site name shared by the documents.
        str_start_time: slice start, stored as the documents' ``timeline``.
        typelist: field to fill: "imsi", "lpn" or "face".
        datalist: records to store.
        type_outlen_int: ``len(datalist) // 200``; kept only for call
            compatibility.
        type_max_len: ``len(datalist)``; kept only for call compatibility.

    Chunk boundaries are derived from *datalist* itself.  An empty list
    issues no update at all.
    """
    batch = 200  # must agree with the "nsamples": 200 documents created per slice
    # Stepping through range() yields only non-empty chunks.  The old loop
    # ran type_outlen_int + 1 times, so an exact multiple of 200 sent a
    # trailing update with [] that, finding no empty document left, upserted
    # a junk one.  It also needed integer counts, which "/" does not give on
    # Python 3 (range(float) raised TypeError).
    for offset in range(0, len(datalist), batch):
        curd.sitetime_updateOne(collectionname, site, str_start_time, typelist,
                                datalist[offset:offset + batch])

def data_exec(nums,time_interval):
    """Merge the records of one 2-minute slice into the "collecsites" collection.

    Args:
        nums: index of the slice start; the slice is
            [time_interval[nums], time_interval[nums + 1]).
        time_interval: ordered datetime-like boundaries, e.g. the
            DatetimeIndex from curd.get_time_interval.  Any sequence whose
            items have ``strftime`` works, including a 2-item slice of it.

    For every site with imsi, car or face records in the slice, the function
    first inserts enough empty documents to hold the longest of the three
    lists in chunks of 200.  update_exec then fills the "imsi", "lpn" and
    "face" arrays.
    """
    batch = 200  # matches the nsamples value written by curd.sitetime_insert
    fmt = '%Y-%m-%d %H:%M:%S'
    str_start_time = time_interval[nums].strftime(fmt)      # slice start (inclusive)
    str_end_time = time_interval[nums + 1].strftime(fmt)    # slice end (exclusive)

    # (source collection, field name in the merged document)
    sources = (
        ("trs_action_dzwl_zm", "imsi"),
        ("trs_action_car_info", "lpn"),
        ("trs_action_face_info", "face"),
    )

    sites = []
    for collection_name, _field in sources:
        sites += curd.get_site(collection_name, str_start_time, str_end_time)

    for site in curd.list_Duplicate_removal(sites):
        records = {}
        for collection_name, field in sources:
            found = curd.get_site_data(collection_name, str_start_time,
                                       str_end_time, site, {"_id": 1})
            records[field] = curd.site_cursor_to_list(found, "_id")

        # One document per started chunk of the longest list.  The old code
        # used "/" (a float on Python 3, so range() raised TypeError).  It also
        # created one surplus empty document whenever any list had a
        # remainder, even if the longest list did not.
        ndocs = max((len(data) + batch - 1) // batch for data in records.values())
        for _ in range(ndocs):
            curd.sitetime_insert("collecsites", site, str_start_time, [], [], [], [])

        for field, data in records.items():
            count = len(data)
            update_exec(count / float(batch), "collecsites", site, str_start_time,
                        field, data, count // batch, count)

if __name__ == '__main__':
    start = time.time()
    p = Pool(30)
    time_interval = curd.get_time_interval('20190310', '20191230')
    results = []
    for i in range(len(time_interval) - 1):  # one task per 2-minute slice
        # Ship only the two boundaries a task needs.  Sending the whole
        # DatetimeIndex (over 200k timestamps for this range) with each of
        # the ~200k tasks made dispatch quadratic.  data_exec(0, pair) reads
        # pair[0] and pair[1], the same slice as data_exec(i, time_interval).
        results.append(p.apply_async(data_exec, args=(0, time_interval[i:i + 2])))
    p.close()
    p.join()
    # apply_async stores a worker's exception in its AsyncResult; without
    # get() every failed slice was silently lost.
    failed = 0
    for i, result in enumerate(results):
        try:
            result.get()
        except Exception as exc:  # top-level report; keep checking the rest
            failed += 1
            print("slice %d failed: %r" % (i, exc))
    if failed:
        print("failed slices : ", failed)
    end = time.time()
    print("end_time : ", end)
    print('ALL Insert Task runs %s(ms).' % ((end - start) * 1000))

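四、开发中遇到的问题

1、如何将一段时间按照两分钟进行划分:使用 pandas.date_range(start, end, freq='2min') 生成时间边界,相邻两个边界构成一个 [开始, 结束) 时间段。

2、实例化 mongodb 连接后,在脚本运行中,连接如何 close:每个辅助函数各自创建 Database 实例,用完后调用 close_connect();注意 find() 返回的是惰性游标,必须在关闭连接之前把结果读完。

3、进程池的使用:用 Pool(30) 的 apply_async 按时间段并行执行 data_exec,最后依次调用 close()、join() 等待所有任务结束;工作进程中的异常只有调用 AsyncResult.get() 才会抛出。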

原文地址:https://www.cnblogs.com/xibuhaohao/p/12167940.html

时间: 2024-07-30 23:29:48

Python MongoDB 合表的相关文章

Python mongoDB 的简单操作

#!/usr/bin/env python # coding:utf-8 # Filename:mongodb.py from pymongo import MongoClient,ASCENDING,DESCENDING import datetime # connection with mongoclient client=MongoClient() # getting a database db=client.test # getting a collection collection=d

MongoDB数据表基本操作

MongoDB数据表基本操作 查看全部数据表 > use ChatRoom switched to db ChatRoom > show collections Account Chat system.indexes system.users 创建数据表 > db.createCollection("Account") {"ok":1} > db.createCollection("Test",{capped:true,

Python——正则表达式(2)

本文译自官方文档:Regular Expression HOWTO 参考文章:Python--正则表达式(1) 全文下载 :Python正则表达式基础 ====================================================================================== 3.使用正则表达式 如今,我们已经学习了一些简单的正则表达式,但我们应该怎么在Python中使用它们呢?re模块提供了一个连接正则表达式引擎的接口,允许你将RE编译成对象并利

python 99乘法表

九九乘法表, for a in range(1,10):     for b in range(1,a+1):         print "%sx%s=%s" %(a,b,a*b),     print "\n" python 99乘法表

转 python操作注册表模块_winreg

python操作注册表模块_winreg 2009-03-19 14:19:00 分类: WINDOWS 基本概念:KEY 键Value 值 函数和作用:CloseKey() - 关闭一个KeyConnectRegistry() - 链接到其他机器的注册表CreateKey() - 创建一个KeyDeleteKey() - 删除一个KeyDeleteValue() - 删除一个Key里面的值(value)EnumKey() - 为已经打开的Key里面的子键建立索引EnumValue() - 为打

Python正则表达式小结(1)

学习一段python正则表达式了, 对match、search、findall、finditer等函数作一小结  以下以一段网页为例,用python正则表达式作一个范例: strHtml = '''<div> <a href="/user/student/" class="user-t"><img src="/uploads/avatar/2015/06/082e408c-14fc-11e5-a98d-00163e02100b

UNION ALL合表查询

有时候需要连表查询数据,可以使用union all来做合表. 语法: SELECT column_name FROM table1UNION ALLSELECT column_name FROM table2 例如: $sql = "select rg.* from ( SELECT g.goods_id, g.sort_order, g.goods_name, g.goods_number, g.suppliers_id, g.goods_name_style, g.market_price,

Python Mongodb接口

Python Mongodb接口 MongoDB 是一个基于分布式文件存储的数据库.由 C++ 语言编写.旨在为 WEB 应用提供可扩展的高性能数据存储解决方案. 同时,MongoDB 是一个介于关系数据库和非关系数据库之间的产品,是NoSQL的优秀实现. 本文记录使用PyMongo模块,用Python调用MongoDB 工具类实现 from pymongo import MongoClient mongodb_name = 'dev_map' client = MongoClient("mon

Python MySQL 插入表

章节 Python MySQL 入门 Python MySQL 创建数据库 Python MySQL 创建表 Python MySQL 插入表 Python MySQL Select Python MySQL Where Python MySQL Order By Python MySQL Delete Python MySQL 删除表 Python MySQL Update Python MySQL Limit Python MySQL Join 插入表 要把记录插入到MySQL中的表中,使用