Source code for pankus.taurus.importer
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
__author__ = 'Maciej Kamiński Politechnika Wrocławska'
import json,pdb
from .utils import TaurusLongTask
from .data_journal import DataJournal
[docs]class Importer(DataJournal):
def __init__(self,**kwargs):
super().__init__(**kwargs)
self.kwargs=kwargs
self.network_filename=kwargs.get('network_filename','net.geojson')
self.od_filename=kwargs.get('od_filename','od.geojson')
self.od_id_name=kwargs.get('od_id_name','od_id')
[docs] def import_network_geojson(self,make_two_side=False):
self.do('initial/create_network')
with open(self.network_filename,'rb') as net:
net_data=json.load(net)
# print(net_data.keys())
geometry_to_insert=[]
data_to_insert=[]
for feature in TaurusLongTask(net_data['features'],**self.kwargs):
assert 'LineString' == feature['geometry']['type']
linestring=json.dumps(feature['geometry']['coordinates'])
start=json.dumps(feature['geometry']['coordinates'][0])
end=json.dumps(feature['geometry']['coordinates'][-1])
geometry_to_insert.append({
'start':str(start),
'end':str(end),
'linestring':str(linestring)
})
if make_two_side:
linestring=json.dumps(list(reversed(feature['geometry']['coordinates'])))
geometry_to_insert.append({
'start':str(end),
'end':str(start),
'linestring':str(linestring)
})
for key in feature['properties']:
name=key
value=feature['properties'][key]
data_to_insert.append({
'start':str(start),
'end':str(end),
'name':str(name),
'value':str(value)
})
if make_two_side:
data_to_insert.append({
'start':str(end),
'end':str(start),
'name':str(name),
'value':str(value)
})
self.transaction('initial/import_network_geometry',geometry_to_insert)
self.transaction('initial/import_network_properties',data_to_insert)
if self.table_exists('od_geometry'):
self.point_from_network_od()
self.check_geometry()
[docs] def import_od_geojson(self):
self.do('initial/create_od')
with open(self.od_filename,'rb') as od:
od_data=json.load(od)
geometry_to_insert=[]
data_to_insert=[]
for feature in TaurusLongTask(od_data['features'],**self.kwargs):
assert 'Point' == feature['geometry']['type']
assert self.od_id_name in feature['properties']
od_id=feature['properties'][self.od_id_name]
geometry=json.dumps(feature['geometry']['coordinates'])
geometry_to_insert.append({
'od_id':int(od_id),
'point':str(geometry)
})
for key in feature['properties']:
name=key
value=feature['properties'][key]
data_to_insert.append({
'od_id':int(od_id),
'name':str(name),
'value':str(value)
})
self.transaction('initial/import_od_geometry',geometry_to_insert)
self.transaction('initial/import_od_properties',data_to_insert)
if self.table_exists('network_geometry'):
self.point_from_network_od()
self.check_geometry()
[docs] def point_from_network_od(self):
self.do('initial/create_point')
self.do('initial/insert_point')
[docs] def point_from_od(self):
self.do('initial/create_point')
self.do('initial/insert_point_from_od')
[docs] def check_geometry(self):
for point, in self.do('initial/check_geometry'):
print("problem with geometry at:", point)
if not list(self.do('initial/check_geometry')):
print("No geometry problems")
[docs] def fix_geometry(self,range):
self.do('initial/fix_geometry',{'range':range})
self.point_from_network_od()
self.check_geometry()