i am getting error at pushing data to a pgSQL server in Luigi even-thou if i run the python script it works completely fine
I was building a data pipeline that gets json data from a url and changes it into csv format than i clean the data a bit and then push data to a sql server
Everything works of except for last task i am getting this error
RuntimeError: Unfulfilled dependency at run time: dataclean__99914b932b
import requests
import luigi
from bs4 import BeautifulSoup
import json
import csv
import pandas as pd
import psycopg2
class scrapper(luigi.Task):
"""
Get data from nycstationinfo.com
"""
def output(self):
return luigi.LocalTarget开发者_运维问答("nycbikedata.json")
def run(self):
headers = {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Methods': 'GET',
'Access-Control-Allow-Headers': 'Content-Type',
'Access-Control-Max-Age': '3600',
'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:52.0) Gecko/20100101 Firefox/52.0'
}
url = "https://gbfs.citibikenyc.com/gbfs/en/station_information.json"
req = requests.get(url, headers)
soup = BeautifulSoup(req.content, 'html.parser')
#get the whole dump of the webpage in pretified form
data = soup.prettify()
#removing back slashes due to wraping
json_string = json.dumps(data)
mod_json_strings = json.loads(json_string)
#writing the formated data in a file
with self.output().open('w') as outfile:
outfile.write(mod_json_strings)
class converter(luigi.Task):
def requires(self):
return scrapper()
def output(self):
return luigi.LocalTarget("nycbikedata.csv")
def run(self):
#opening json file with data
with self.input().open('r') as json_file:
data = json.load(json_file)
#all the data is in station object data.json file
station = data['data']['stations']
data_file = self.output().open('w')
csv_writer = csv.writer(data_file)
count = 0
for x in station:
if count == 0:
# Writing headers of CSV file
header = x.keys()
csv_writer.writerow(header)
count += 1
# Writing data of CSV file
csv_writer.writerow(x.values())
data_file.close()
class dataclean(luigi.Task):
def requires(self):
return converter()
def run(self):
df = pd.read_csv(self.input().open('r'))
x = df.drop(df.columns[6:], axis = 1 )
x = x[:1001]
x.to_csv('formated.csv', index=False)
class datapush(luigi.Task):
def requires(self):
return dataclean()
def run(self):
hostname = 'localhost'
database = 'mydb2'
username = 'bob'
pwd = 'admin'
port_id = 5432
try:
conn = psycopg2.connect(
host = hostname,
dbname = database,
user = username,
password = pwd,
port = port_id)
cur = conn.cursor()
create_script = '''CREATE TABLE IF NOT EXISTS nyc_station(
name varchar(240),
lon float,
lat float,
region_id int,
rental_methods varchar(140),
station_id int); '''
cur.execute(create_script)
sql2 = '''COPY nyc_station(name,lon,lat,region_id,rental_methods,station_id)
FROM '/home/wassey/luigi-demo/formated.csv'
DELIMITER ','
CSV HEADER;'''
cur.execute(sql2)
conn.commit()
except Exception as error:
print('error')
finally:
if cur is not None:
cur.close()
if conn is not None:
conn.close()
if __name__ == "__main__":
luigi.run()
All the tasks work fine except for the datapush but i seprately run the script it works completly fine and pushes the data
精彩评论