-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathkibana-dev-tools-saver.py
executable file
·128 lines (114 loc) · 5.26 KB
/
kibana-dev-tools-saver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import shutil
import os
import plyvel
import json
import argparse
import logging
import time
# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Default values; each one can be overridden by the matching command-line option below.
KIBANA_URLS = [
    "https://kibana1-example.com",
    "https://kibana2-example.com",
]
SOURCE_DIR = "/mnt/c/Users/<user>/AppData/Local/Google/Chrome/User Data/Default/Local Storage/leveldb/"
TEMP_DIR = "/tmp/kibana-dev-tools-saver/"
SAVE_FOLDER = None
DEFAULT_PREFIX = ""
DEFAULT_TIME = None

# Argument parsing
parser = argparse.ArgumentParser(description="Extract and save Kibana's console data from Chrome's localStorage.")
parser.add_argument("-sd", "--source-dir", type=str, default=SOURCE_DIR, help="Source directory of Chrome's LevelDB.")
parser.add_argument("-td", "--temp-dir", type=str, default=TEMP_DIR, help="Temporary directory to copy LevelDB for processing.")
parser.add_argument("-sf", "--save-folder", type=str, default=SAVE_FOLDER, help="Target folder to save the console outputs.")
parser.add_argument("-p", "--prefix", type=str, default=DEFAULT_PREFIX, help="Prefix to prepend to saved filenames.")
parser.add_argument("-t", "--time", type=int, default=DEFAULT_TIME, help="Time in seconds to wait between reruns of the script.")
parser.add_argument("-q", "--quiet", action="store_true", help="Suppress output if saving to a file.")
parser.add_argument("-ku", "--kibana-urls", type=str, default=",".join(KIBANA_URLS), help="Comma-separated list of Kibana URLs to extract data from.")
args = parser.parse_args()

# Replace the defaults with the URLs given on the command line.
# Blank entries (from "", "a,,b" or a trailing comma) are dropped: an empty
# string is a substring of every key, so it would match every origin.
KIBANA_URLS = [
    url
    for url in (part.strip() for part in args.kibana_urls.split(","))
    if url
]
def copy_to_temp_dir():
    """Copy the source LevelDB directory into the temporary working directory.

    If the temporary directory already exists, the user is asked on stdin
    whether it may be deleted before the copy is made. The ``LOCK`` file is
    excluded from the copy.

    Raises:
        SystemExit: status 1 when the source directory does not exist or the
            answer to the prompt is not recognised; status 0 when the user
            declines to delete the existing temporary directory.
        Exception: any error raised while removing the old temporary
            directory or while copying is logged and re-raised.
    """
    if not os.path.exists(args.source_dir):
        logging.error(f"The specified source directory {args.source_dir} does not exist.")
        logging.info("Please specify the correct Chrome LevelDB directory using --source-dir argument.")
        # Raise SystemExit directly: the bare ``exit`` helper is injected by
        # the ``site`` module for interactive use and is not always present.
        raise SystemExit(1)

    if os.path.exists(args.temp_dir):
        prompt = f"Temporary directory {args.temp_dir} already exists. Do you want to delete it? (y/n): "
        # Strip surrounding whitespace so an answer such as "y " is accepted.
        user_input = input(prompt).strip().lower()
        if user_input in ('y', 'yes'):
            try:
                shutil.rmtree(args.temp_dir)
            except Exception as e:
                logging.error(f"Error removing existing temporary directory: {e}")
                raise
        elif user_input in ('n', 'no'):
            logging.info("Exiting without making changes.")
            raise SystemExit(0)
        else:
            logging.error("Invalid choice. Exiting.")
            raise SystemExit(1)

    try:
        # NOTE(review): LOCK is skipped presumably because the browser may
        # hold it while running - confirm.
        shutil.copytree(args.source_dir, args.temp_dir, ignore=shutil.ignore_patterns('LOCK'))
    except Exception as e:
        logging.error(f"Error copying LevelDB to temporary directory: {e}")
        raise
def delete_temp_dir():
    """Remove the temporary working directory when it is present.

    Nothing happens if the directory does not exist. A failure during
    removal is logged and then re-raised to the caller.
    """
    temp_path = args.temp_dir
    if not os.path.exists(temp_path):
        return

    try:
        shutil.rmtree(temp_path)
    except Exception as err:
        logging.error(f"Error deleting temporary directory: {err}")
        raise
def extract_console_data_from_leveldb():
    """Yield ``(origin, text)`` pairs for console entries in the copied LevelDB.

    Every key in the database at ``args.temp_dir`` is scanned. A key is
    relevant when it contains the console text-object marker and at least one
    of the configured ``KIBANA_URLS``. The stored value is parsed as JSON
    and its ``text`` field is yielded once for each matching origin, with
    line endings normalised to ``\\n``.

    Keys or values that cannot be decoded or parsed are logged as warnings
    and skipped. The database is closed when iteration finishes, when the
    generator is closed early, or when an error propagates.

    Yields:
        tuple[str, str]: the matching Kibana URL and the console text.
    """
    marker = "sense:console_local_text-object_"
    db = plyvel.DB(args.temp_dir, create_if_missing=False)
    try:
        for key, value in db.iterator():
            try:
                key_utf8 = key.decode('utf-8')
            except UnicodeDecodeError:
                logging.warning(f"Key cannot be decoded as UTF-8: {key}")
                continue

            if marker not in key_utf8:
                continue
            matching_origins = [origin for origin in KIBANA_URLS if origin in key_utf8]
            if not matching_origins:
                continue

            try:
                # Removing the first character which is a non-JSON character.
                # NOTE(review): that byte may be an encoding marker written by
                # the browser; values that are not UTF-8 text are skipped - confirm.
                clean_value = value[1:].decode('utf-8')
                json_data = json.loads(clean_value)
            except UnicodeDecodeError:
                logging.warning(f"Value cannot be decoded as UTF-8 for key {key}. Raw value: {value}")
                continue
            except json.JSONDecodeError:
                logging.warning(f"Invalid JSON for key {key}. Raw value: {value}")
                continue

            # fix windows like characters (^M)
            text_content = '\n'.join(json_data.get('text', '').splitlines())
            for origin in matching_origins:
                yield origin, text_content
    finally:
        # Runs on normal exhaustion, early generator close and on errors alike.
        db.close()
def main():
    """Run one extraction pass: copy the LevelDB, export console text, clean up.

    For every ``(origin, text)`` pair produced by the extractor the text is
    either written to ``<save_folder>/<prefix><origin>.console`` (when
    ``--save-folder`` is given) or printed to stdout. Progress messages are
    logged unless ``--quiet`` is set. The temporary directory is removed even
    when extraction or writing fails. A warning is logged when no entry
    matched any configured Kibana URL.
    """
    # Kept outside the try block: if the user declines to delete an existing
    # temporary directory, the cleanup below must not remove it.
    copy_to_temp_dir()

    data_found = False  # flag to check if any KIBANA_URL was found
    entries = extract_console_data_from_leveldb()
    try:
        for origin, text_content in entries:
            data_found = True
            if not args.quiet:
                logging.info(f"Kibana: \"{origin}\"")

            if not args.save_folder:
                print(text_content)
                continue

            os.makedirs(args.save_folder, exist_ok=True)
            # NOTE(review): the name depends only on prefix and origin, so a
            # later entry for the same origin overwrites an earlier one - confirm intended.
            file_name = f"{args.prefix}{origin.replace('https://', '').replace('/', '_')}.console"
            full_path = os.path.join(args.save_folder, file_name)
            # Explicit UTF-8 so the output does not depend on the locale.
            with open(full_path, 'w', encoding='utf-8') as f:
                f.write(text_content)
            if not args.quiet:
                logging.info(f"Saving {origin} to {full_path}...")
    finally:
        # Stop the extractor first so it can release the database before
        # the directory holding it is removed.
        entries.close()
        delete_temp_dir()

    if not data_found:  # if no relevant data was found
        logging.warning(f"No data found for the provided Kibana URLs: {', '.join(KIBANA_URLS)}")
if __name__ == "__main__":
    # Without --time the script performs a single pass; with it, passes are
    # repeated with a pause in between until the user interrupts.
    interval = args.time
    if not interval:
        main()
    else:
        try:
            while True:
                main()
                if not args.quiet:
                    logging.info(f"Sleeping for {interval} seconds...")
                time.sleep(interval)
        except KeyboardInterrupt:
            # Ctrl-C ends the loop; remove any leftover temporary copy.
            logging.info("Interrupted by user. Cleaning up...")
            delete_temp_dir()