-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.py
119 lines (104 loc) · 4.34 KB
/
index.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import json
from datetime import datetime, timedelta
import boto3
import time
import io
import pandas as pd
# Module-level S3 client, shared by the save_parquet/save_csv helpers below.
s3_client = boto3.client('s3')
def unixtime(dt):
    """Convert a datetime to epoch milliseconds.

    time.mktime interprets dt as local time; the result is truncated to
    whole seconds before scaling to milliseconds.
    """
    seconds = time.mktime(dt.timetuple())
    return int(seconds) * 1000
def get_as_run(region, start, end):
    """Return a CloudWatch Logs paginator over MediaLive input-switch events.

    Filters the 'ElementalMediaLive' log group for messages containing
    "input_switch_initiated" between start and end (converted to epoch ms).
    """
    logs = boto3.client('logs', region_name=region)
    paginator = logs.get_paginator('filter_log_events')
    return paginator.paginate(
        logGroupName='ElementalMediaLive',
        startTime=unixtime(start),
        endTime=unixtime(end),
        filterPattern='"input_switch_initiated"',
        PaginationConfig={
            'MaxItems': 10000,
            'PageSize': 100,
        },
    )
def gen(response_iterator):
    """Yield each log event's message string parsed as JSON."""
    for page in response_iterator:
        for event in page['events']:
            yield json.loads(event['message'])
def cmap(c):
    """Flatten a MediaLive channel description into one dict.

    Takes the second word of the channel Name, keeps the Arn, and merges in
    all channel Tags (tags can override the 'name'/'arn' keys).
    """
    result = {
        'name': c['Name'].split(' ')[1],
        'arn': c['Arn'],
    }
    result.update(c['Tags'])
    return result
def get_channels(region):
    """List MediaLive channels in region, keeping only those tagged 'vpid'."""
    ml = boto3.client('medialive', region_name=region)
    mapped = [cmap(channel) for channel in ml.list_channels()['Channels']]
    return [channel for channel in mapped if 'vpid' in channel]
def get_simple_as_run_report(region, channels, start, end):
    """Yield one simplified as-run record per pipeline-0 input switch.

    Matches each switch event's input_id against the channel names and emits
    {'name', 'start', 'duration', 'channel_vpid', 'item_vpid'} dicts.
    """
    response_iterator = get_as_run(region, start, end)
    for event in gen(response_iterator):
        # Only pipeline 0 is reported; other pipelines are skipped.
        if event.get('encoder_pipeline', None) != 0:
            continue
        # NOTE(review): gen() already parsed the outer log line; the payload
        # appears to carry a nested JSON 'message' field parsed here again —
        # confirm against the actual MediaLive log format.
        message = json.loads(event['message'])
        input_id = message['input_id']
        action = message['action_name']
        # CloudWatch timestamp is epoch milliseconds; 'Z' suffix marks UTC
        # for the later pandas datetime conversion in report().
        ts = f"{event['timestamp']}Z"
        # action_name is expected to be '<planned> <duration> <source> <vpid>'
        planned, duration, source, vpid = action.split(' ')
        for channel in channels:
            name = channel['name']
            if input_id.startswith(name):
                yield {
                    'name': name,
                    'start': ts,
                    'duration': duration,
                    'channel_vpid': channel['vpid'],
                    'item_vpid': vpid,
                }
def save_parquet(df, key):
    """Serialize df as gzip-compressed parquet and upload to the asruns bucket."""
    buffer = io.BytesIO()
    df.to_parquet(buffer, index=False, compression='gzip')
    payload = buffer.getvalue()
    s3_client.put_object(Bucket='iplayer-dazzler-asruns', Key=f'{key}.gz', Body=payload)
def save_csv(df, key):
    """Serialize df as gzip-compressed CSV and upload to the asruns bucket."""
    buffer = io.BytesIO()
    df.to_csv(buffer, index=False, compression='gzip')
    payload = buffer.getvalue()
    s3_client.put_object(Bucket='iplayer-dazzler-asruns', Key=f'{key}.csv.gz', Body=payload)
def make_one_second_data_for_channel(df, start, end):
    """Expand one channel's as-run rows into one row per second.

    df must have tz-aware 'start' timestamps plus 'channel_vpid'/'item_vpid'
    columns; start/end bound the per-second range (tz-aware). Each second is
    forward-filled with the most recent switch's values and gets duration=1.
    """
    # .copy() so set_index(inplace=True) mutates an independent frame rather
    # than a slice of the caller's df (avoids SettingWithCopyWarning).
    cp = df[['start', 'channel_vpid', 'item_vpid']].copy()
    cp.set_index('start', inplace=True)
    local = cp.tz_convert('Europe/London')
    # One timestamp per second across the reporting window. Named `seconds`
    # rather than `range` to avoid shadowing the builtin.
    seconds = pd.date_range(start, end, freq='s', tz='Europe/London')
    # method='pad' forward-fills: each second carries the latest switch seen.
    car = local.reindex(seconds, method='pad')
    car['duration'] = 1
    return car
def make_one_second_data(channels, start, end, df):
    """Build the combined per-second table across every channel.

    Concatenates each channel's per-second expansion, drops seconds before
    the first switch (NaN rows), and returns the result with a naive
    Europe/London 'start' column.
    """
    per_channel = [
        make_one_second_data_for_channel(
            df.loc[df['channel_vpid'] == channel['vpid']], start, end
        )
        for channel in channels
    ]
    combined = pd.concat(per_channel).dropna()
    combined = combined.tz_convert('Europe/London')
    combined = combined.tz_localize(tz=None)
    return combined.reset_index(names=['start'])
def save_one_second_data(region, channels, start, end, df2):
    """Write the per-second parquet and CSV reports for one day/region.

    start/end bound the reporting window; start also names the output keys.
    """
    def _to_london(dt):
        # tz_convert raises TypeError on tz-naive timestamps (main() produces
        # naive datetimes when the event time string has no UTC offset), so
        # localize naive values instead of converting.
        # NOTE(review): naive inputs are treated as Europe/London wall time —
        # confirm the scheduler supplies offset-aware ISO strings or London time.
        ts = pd.to_datetime(dt)
        if ts.tz is None:
            return ts.tz_localize('Europe/London')
        return ts.tz_convert('Europe/London')

    s = _to_london(start)
    e = _to_london(end)
    df3 = make_one_second_data(channels, s, e, df2)
    day = start.date().isoformat()
    save_parquet(df3, f'by_the_second/parquet/{region}_{day}')
    save_csv(df3, f'by_the_second/csv/{region}_{day}')
def report(region, channels, start, end, df):
    """Persist the daily as-run report (parquet + CSV) and its per-second view."""
    schema = {
        'name': 'string',
        'start': 'datetime64[ns, UTC]',
        'duration': 'timedelta64[ns]',
        'channel_vpid': 'string',
        'item_vpid': 'string',
    }
    df2 = df.astype(schema)
    day = start.date().isoformat()
    save_parquet(df2, f'daily/{region}_{day}')
    save_csv(df2, f'daily_csv/{region}_{day}')
    save_one_second_data(region, channels, start, end, df2)
def main(time, region):
    """Generate the previous day's as-run report for one region.

    time: ISO-8601 datetime string; the window is the 24 hours ending at
        midnight of that day. (The parameter name shadows the `time` module,
        which is not used here; kept unchanged for caller compatibility.)
    region: AWS region name.
    """
    dt = datetime.fromisoformat(time)
    # Midnight at the start of `time`'s day; report the preceding 24 hours.
    end = dt.replace(hour=0, minute=0, second=0, microsecond=0)
    start = end - timedelta(hours=24)
    channels = get_channels(region)
    logs = list(get_simple_as_run_report(region, channels, start, end))
    if logs:  # skip the write entirely when no input switches were logged
        report(region, channels, start, end, pd.DataFrame(logs))
def lambda_handler(event, context):
    """AWS Lambda entry point: run the daily report for event['time']/['region']."""
    main(event["time"], event['region'])
    body = json.dumps('wrote daily as runs')
    return {'statusCode': 200, 'body': body}