开发者

i am getting error at pushing data to a pgSQL server in Luigi even-thou if i run the python script it works completely fine

I was building a data pipeline that gets json data from a url and changes it into csv format than i clean the data a bit and then push data to a sql server Everything works of except for last task i am getting this error RuntimeError: Unfulfilled dependency at run time: dataclean__99914b932b

import requests
import luigi
from bs4 import BeautifulSoup
import json
import csv
import pandas as pd
import psycopg2


class scrapper(luigi.Task):
    """
    Get data from nycstationinfo.com 
    """

    def output(self):
        return luigi.LocalTarget开发者_运维问答("nycbikedata.json")

    def run(self):
        headers = {
        'Access-Control-Allow-Origin': '*',
        'Access-Control-Allow-Methods': 'GET',
        'Access-Control-Allow-Headers': 'Content-Type',
        'Access-Control-Max-Age': '3600',
        'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:52.0) Gecko/20100101 Firefox/52.0'
        }

        url = "https://gbfs.citibikenyc.com/gbfs/en/station_information.json"
        req = requests.get(url, headers)
        soup = BeautifulSoup(req.content, 'html.parser')
        #get the whole dump of the webpage in pretified form
        data = soup.prettify()
        #removing back slashes due to wraping
        json_string = json.dumps(data)
        mod_json_strings = json.loads(json_string)
        #writing the formated data in a file
        with self.output().open('w') as outfile:
            outfile.write(mod_json_strings)
class converter(luigi.Task):
    def requires(self):
        return scrapper()


    def output(self):
            return luigi.LocalTarget("nycbikedata.csv")

            
    def run(self):

        #opening json file with data
        with self.input().open('r') as json_file:
            data = json.load(json_file)
        #all the data is in station object data.json file
        station = data['data']['stations']
        data_file = self.output().open('w')
        csv_writer = csv.writer(data_file)
        count = 0

        for x in station:
            if count == 0:
                # Writing headers of CSV file
                header = x.keys()
                csv_writer.writerow(header)
                count += 1

            # Writing data of CSV file
            csv_writer.writerow(x.values())

        data_file.close()
        
class dataclean(luigi.Task):
    def requires(self):
        return converter()
    def run(self):
        df = pd.read_csv(self.input().open('r'))
        x = df.drop(df.columns[6:], axis = 1 )
        x = x[:1001]
        x.to_csv('formated.csv', index=False)

class datapush(luigi.Task):
    def requires(self):
        return dataclean()
    def run(self):
        hostname = 'localhost'
        database = 'mydb2'
        username = 'bob'
        pwd      = 'admin'
        port_id  =  5432
        try:
            conn = psycopg2.connect(
                        host = hostname,
                        dbname = database,
                        user = username,
                        password = pwd,
                        port = port_id)

            cur = conn.cursor()
        
            create_script = '''CREATE TABLE IF NOT EXISTS nyc_station( 
                                    name        varchar(240),
                                    lon         float,
                                    lat         float,
                                    region_id   int,
                                    rental_methods varchar(140),
                                    station_id  int); '''
            cur.execute(create_script)

            sql2 = '''COPY nyc_station(name,lon,lat,region_id,rental_methods,station_id)
                    FROM '/home/wassey/luigi-demo/formated.csv'
                    DELIMITER ','
                    CSV HEADER;''' 
 

            cur.execute(sql2)   
            conn.commit()
        except Exception as error:
            print('error')
        finally:
            if cur is not None:
                cur.close()
            if conn is not None:
                conn.close()


if __name__ == "__main__":
    luigi.run()

All the tasks work fine except for the datapush but i seprately run the script it works completly fine and pushes the data

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜