-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtap_jsonfeed.py
111 lines (83 loc) · 2.81 KB
/
tap_jsonfeed.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import argparse
import requests
import singer
import json
import os
import sys
import singer.stats
# Shared HTTP session so TCP connections (and any headers merged in by
# authed_get) are reused across all requests made by this tap.
session = requests.Session()
# Module-level Singer logger used for all tap output.
logger = singer.get_logger()
def authed_get(source, url, headers=None):
    """Issue a single GET request to *url*, timing it with singer stats.

    Args:
        source: Stream name used to tag the singer ``Timer`` stats.
        url: Absolute URL to fetch.
        headers: Optional mapping of extra HTTP headers. Note these are
            merged into the shared session, so they persist for
            subsequent requests (matches original behavior).

    Returns:
        The ``requests.Response`` (HTTP errors are not raised here; the
        status code is recorded on the stats object instead).
    """
    # ``headers=None`` instead of the mutable-default ``{}`` pitfall: a
    # shared dict default could be mutated by a caller and leak state
    # between calls.
    with singer.stats.Timer(source=source) as stats:
        if headers:
            session.headers.update(headers)
        resp = session.request(method='get', url=url)
        stats.http_status_code = resp.status_code
        return resp
def authed_get_all_pages(source, url, headers=None):
    """Yield responses for *url* and every subsequent page advertised
    via the ``next`` rel of the HTTP ``Link`` header (RFC 5988 style
    pagination, as exposed by ``requests``' ``Response.links``).

    Args:
        source: Stream name forwarded to ``authed_get`` for stats.
        url: URL of the first page.
        headers: Optional extra HTTP headers applied to the requests.

    Yields:
        One ``requests.Response`` per page, in order.
    """
    # ``headers=None`` avoids the shared mutable-default-dict pitfall;
    # ``headers or {}`` keeps the call compatible with authed_get.
    while True:
        resp = authed_get(source, url, headers or {})
        yield resp
        # Stop when the server no longer advertises a next page.
        if 'next' not in resp.links:
            break
        url = resp.links['next']['url']
def get_abs_path(path):
    """Resolve *path* relative to the directory containing this module."""
    here = os.path.dirname(os.path.realpath(__file__))
    return os.path.join(here, path)
def load_schemas():
    """Load the JSON schema for the ``feed`` stream from disk.

    Returns:
        Dict mapping stream name (``'feed'``) to its parsed JSON schema.
    """
    schemas = {}
    # Explicit encoding: schema files are UTF-8 JSON; relying on the
    # platform default encoding can break on non-UTF-8 locales.
    with open(get_abs_path('tap_jsonfeed/feed.json'), encoding='utf-8') as file:
        schemas['feed'] = json.load(file)
    return schemas
def get_feed(feed_url, state):
    """Fetch every page of the feed and emit its items as Singer records.

    Args:
        feed_url: URL of the JSON Feed to replicate.
        state: Incoming state; currently unused — incremental
            replication is not implemented here.

    Returns:
        0 — NOTE(review): the caller (do_sync) writes this return value
        out as the new state, so the emitted state is always 0.
    """
    with singer.stats.Counter(source='posts') as stats:
        for page in authed_get_all_pages('feed', feed_url):
            body = page.json()
            items = body['items']
            # Count each post individually in the stats counter.
            for _ in items:
                stats.add(record_count=1)
            singer.write_records('feed', items)
    return 0
def do_sync(config, state):
    """Run a replication of the feed stream.

    Emits the stream schema, streams all records via ``get_feed``, then
    writes out the value ``get_feed`` returned as the new state.
    """
    feed_url = config['feed_url']
    schemas = load_schemas()
    # Log whether this run has prior state or is a full replication.
    if state:
        logger.info('Replicating posts since %s from %s', state, feed_url)
    else:
        logger.info('Replicating all posts from %s', feed_url)
    singer.write_schema('feed', schemas['feed'], 'id')
    singer.write_state(get_feed(feed_url, state))
def do_discover():
    """Emit the catalog of available streams as JSON on stdout.

    Output follows the Singer discover format: a top-level ``streams``
    list where each entry carries that stream's JSON schema.
    """
    logger.info("Starting discover")
    streams = []
    # Bug fix: previously the entire {'feed': schema} dict returned by
    # load_schemas() was emitted as the stream's schema; the catalog
    # entry must hold the feed schema object itself.
    streams.append({'stream': 'feed',
                    'tap_stream_id': 'feed',
                    'schema': load_schemas()['feed']})
    json.dump({'streams': streams}, sys.stdout, indent=2)
    logger.info("Finished discover")
def main():
    """CLI entry point: parse arguments, validate config, then run
    discover mode or a sync depending on the flags."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-c', '--config', help='Config file', required=True)
    parser.add_argument(
        '-s', '--state', help='State file')
    parser.add_argument(
        '-d', '--discover', action='store_true', help='Discover schemas')
    args, _ = parser.parse_known_args()

    with open(args.config) as config_file:
        config = json.load(config_file)

    # Fail fast when required configuration keys are absent.
    missing_keys = [key for key in ['feed_url'] if key not in config]
    if missing_keys:
        logger.fatal("Missing required configuration keys: {}".format(missing_keys))
        # sys.exit instead of the builtin exit(): the builtin is only
        # guaranteed in interactive sessions (added by the site module).
        sys.exit(1)

    if args.discover:
        do_discover()
    else:
        state = {}
        if args.state:
            # The state file may contain one JSON document per line;
            # the last line wins (preserves original behavior).
            with open(args.state, 'r') as file:
                for line in file:
                    state = json.loads(line.strip())
        do_sync(config, state)
# Allow running this module directly as a script.
if __name__ == '__main__':
    main()