#Builtin imports
import logging
import json
import core
import uuid
import time
import base64
try:
import queue as Queue
except ImportError:
import Queue
import datetime
import string
# Whitelist of characters accepted by check_string: ASCII letters, digits and
# a fixed set of punctuation (quotes, semicolon and backslash are excluded).
valid_chars = (
    set(string.ascii_letters)
    | set(string.digits)
    | set('{}|~^<>!#$%()+,-.@_[] ')
)

# Root logger shared by every function in this module.
log = logging.getLogger()
log.debug("Valid SQL characters are {0}".format(valid_chars))

# Module-level counters mutated by get_session_id / get_command_id.
session_nums = 0
command_nums = {}

# Maps a human readable event type to the prefix used by get_event_uid.
event_types = dict(
    notification="NOT",
    url="URL",
    function="FUN",
)
def set_response(session_id, command_id, event, response_function):
    """
    Set a response listener in the session object

    Looks up the session in ``core.sessions``, finds the command whose ``"id"``
    equals ``command_id`` and stores ``event`` and ``response_function`` on it
    under the ``"event"`` and ``"function"`` keys. If no command matches, a
    warning is logged and nothing is modified.

    :param session_id: key of the session inside ``core.sessions``
    :param command_id: ``"id"`` value of the command that should be updated
    :param event: value stored under the command's ``"event"`` key
    :param response_function: value stored under the command's ``"function"`` key
    """
    commands_container = core.sessions[session_id]["commands"]
    listener = {
        "event": event,
        "function": response_function
    }
    matched = False
    for command in commands_container:
        if command["id"] == command_id:
            command.update(listener)
            matched = True
    if not matched:
        # Previously a miss left the target command unbound; report it
        # explicitly instead of failing on an undefined local.
        log.warning(":{0}:No command with id {1} found, response listener not set".format(
            session_id, command_id
        ))
def gen_session(username, client_type, db):
    """
    Create a new session and register it in ``core.sessions``

    :param username: stored under the session's ``"username"`` key
    :param client_type: stored under the session's ``"client"`` key
    :param db: database handle, passed through to ``get_session_id``
    :return: session_id
    """
    session_id = get_session_id(db)
    # Fresh session record: no commands yet and an empty update queue
    session_data = {
        "username": username,
        "commands": [],
        "created": datetime.datetime.now(),
        "updates": Queue.Queue(),
        "id": session_id,
        "client": client_type
    }
    # Register the record under its session id
    core.sessions.update({session_id: session_data})
    return session_id
def gen_command_uid():
    """
    Generate a 22 character url safe base64 string

    The 16 bytes of a ``uuid.uuid1()`` are encoded with the URL-safe base64
    alphabet (``A-Z``, ``a-z``, ``0-9``, ``-`` and ``_``), which gives 24
    characters including two ``=`` padding characters; the padding is stripped.

    :return urlsafe base64 string:
    """
    encoded = base64.urlsafe_b64encode(uuid.uuid1().bytes).decode("utf-8")
    # The URL-safe alphabet never contains '/' or a newline, so stripping the
    # '=' padding is the only post-processing required.
    return encoded.rstrip("=")
def create_command_obj(session_id, command):
    """
    Build a command object and attach it to its session

    The command id is ``<session_id>_<gen_command_uid()>``. The new object is
    appended to the ``"commands"`` list of the session in ``core.sessions``.

    :param session_id: id of the session the command belongs to
    :param command: value stored under the object's ``"command"`` key
    :return command object: dict with the keys ``"command"`` and ``"id"``
    """
    uid = "{0}_{1}".format(session_id, gen_command_uid())
    log.debug(":{0}:Generating a new command object with command id {1}".format(session_id, uid))
    new_command = {"command": command, "id": uid}
    # Add the command to the session
    session_commands = core.sessions[session_id]["commands"]
    session_commands.append(new_command)
    return new_command
def get_event_uid(type):
    """
    Get an event uid using the event type

    The uid has the form ``<prefix>:<uuid1>`` where the prefix is looked up
    in the module level ``event_types`` mapping.

    :param type: key of ``event_types``
    :return: Event uid string
    """
    prefix = event_types[type]
    unique_part = str(uuid.uuid1())
    return "{0}:{1}".format(prefix, unique_part)
def dump_events(events, db):
    """
    Dump events

    Removes expired rows from the ``events`` table and then upserts every
    event that is not of type ``"function"``, keyed on ``uid``.

    :param events: iterable of event dicts, each with at least a ``"type"`` key
    :param db: database handle; ``db['events']`` must provide ``delete`` and
        ``upsert`` (NOTE(review): presumably a ``dataset`` database - confirm)
    """
    events_table = db['events']
    # Delete all events from db that should be finished.
    # The filter has to be a column clause: the previous expression compared
    # the ``time`` module itself with a float instead of the ``time`` column.
    # NOTE(review): assumes the events table has a ``time`` column holding a
    # Unix timestamp - confirm against the schema.
    expired = events_table.table.columns.time < time.time()
    events_table.delete(expired)
    for event in events:
        # Remove one time events that had functions in them
        if event["type"] != "function":
            events_table.upsert(event, ['uid'])
def load_key(key_type, db, load_url=False):
    """
    Load a key from the database and implement the cycler

    Selects the rows of the ``keys`` table with the requested type whose
    ``uses`` is not above ``max_uses``, picks the one with the lowest ``num``
    and increments its ``uses`` counter.

    :param key_type: value matched against the ``type`` column
    :param db: database handle providing ``query`` and table access
        (NOTE(review): presumably a ``dataset`` database - confirm)
    :param load_url: if True, return a ``(value, url)`` tuple instead of the value
    :return api key: the key value, or ``(value, url)`` when ``load_url`` is set
    :raises IndexError: if no usable key of that type exists
    """
    # Bind the key type as a parameter instead of formatting it into the SQL
    working_keys = list(db.query(
        'SELECT * FROM `keys` WHERE type=:key_type and uses <= max_uses',
        key_type=key_type
    ))
    if not working_keys:
        # Same exception type as indexing an empty list, but with a usable message
        raise IndexError("No usable key of type {0!r} found".format(key_type))
    correct_key = min(working_keys, key=lambda row: row["num"])
    key_value = correct_key["value"]
    updated_uses = correct_key["uses"] + 1
    #Assume that keys reset monthly
    db['keys'].update(dict(type=key_type, num=correct_key['num'], uses=updated_uses), ['type', 'num'])
    if load_url:
        return (key_value, correct_key["url"])
    return key_value
def initialize_session_tracking(db):
    """
    Deprecated and out of use

    Reads one row from the ``vars`` table and stores it in the module level
    ``session_nums``.

    :param db: database handle providing the ``vars`` table
    """
    global session_nums
    vars_table = db["vars"]
    # NOTE(review): the lookup name is spelled "session_incremnet" and
    # get_session_id writes the name "session_id" - confirm which is intended.
    stored_increment = vars_table.find_one(name="session_incremnet")
    log.debug("Found session increment {0} from server".format(stored_increment))
    # NOTE(review): this stores whatever find_one returned, not a field of it.
    session_nums = stored_increment
def get_session_id(db):
    """
    Generate a new session id and bump the session counter

    The returned id is a ``uuid.uuid1()`` string. The module level
    ``session_nums`` counter is incremented and written to the ``vars`` table
    under the name ``"session_id"``.

    :param db: database handle providing the ``vars`` table
    :return: session id string
    """
    global session_nums
    session_nums += 1
    session_str = str(uuid.uuid1())
    log.debug("Generated session_id {0}".format(session_str))
    log.debug("Updating session increment in db")
    counter_row = {"name": "session_id", "value": session_nums}
    db['vars'].update(counter_row, ['name'])
    return session_str
def get_command_id(session_id):
    """
    Incrementing command ids based on the session id

    Bumps the per-session counter kept in the module level ``command_nums``
    dict and returns a new id of the form ``<session_id>_<gen_command_uid()>``.

    :param session_id: id of the session the command belongs to (used as dict key)
    :return command_id:
    """
    # command_nums is a dict, so the counter is kept per session id;
    # adding an int to the dict itself raises TypeError.
    command_nums[session_id] = command_nums.get(session_id, 0) + 1
    command_id = "{0}_{1}".format(
        session_id, gen_command_uid()
    )
    log.debug("Generated command id {0}".format(command_id))
    return command_id
def get_user_token(username):
    """
    Get a customized user token to store encrypted in the cookies

    The token is ``<uuid1>:u:<uuid3>``: the first part is freshly generated on
    every call, the second part is derived from ``str(username)`` in the DNS
    namespace and is therefore the same for the same username.

    :param username: converted with ``str`` before hashing
    :return user_token:
    """
    user_part = uuid.uuid3(uuid.NAMESPACE_DNS, str(username))
    fresh_part = uuid.uuid1()
    return "{0}:u:{1}".format(fresh_part, user_part)
def return_json(response):
    """
    Render a response object as json, check that it has all the correct keys, and return it

    A valid response is a dict containing the keys ``"type"``, ``"data"`` and
    ``"text"``. Anything else is logged and replaced by an error response, so
    the function returns a JSON string in both cases.

    :param response: response dict to render
    :return json string:
    """
    required_keys = ("type", "data", "text")
    # Explicit checks instead of assert: asserts are stripped when Python
    # runs with -O, which would disable the validation entirely.
    if isinstance(response, dict):
        missing = [key for key in required_keys if key not in response]
    else:
        missing = list(required_keys)
    if not missing:
        log.debug("Returning response {0}".format(response))
        return json.dumps(response)
    log.error("Malformed response {0}: missing keys {1}".format(response, missing))
    # Serialize the fallback too, so callers always receive a JSON string
    return json.dumps({
        "type": "error",
        "data": None,
        "text": "Server returned malformed response {0}".format(response)
    })
def fold(string, line_length=120, indent=0, indent_first_line=False, _runs=0):
    """Fold a string into multiple Lines.

    Fold function by Max Ertl (https://github.com/Sirs0ri)

    Each line is cut at the last space found in the final 12 characters of
    the line (the space is dropped); if there is none, the line is cut hard
    at the maximum length. Continuation lines are prefixed with ``indent``
    spaces. The first line is never prefixed by this function.

    :param string: The string you want to fold.
    :param line_length: The desired max line length (int), at least 1
    :param indent: if you want lines to be indented, you can specify the number of
                   spaces here. Ignored if it is not smaller than ``line_length``.
    :param indent_first_line: if this is False (default) the first line may use
                   the full ``line_length``; otherwise the first line is limited
                   to ``line_length - indent`` like every other line.
    :param _runs: internal counter of lines already produced; a non-zero
                   value treats the first line like a continuation line.
    :return formatted string:
    :raises ValueError: if ``line_length`` is smaller than 1
    """
    if line_length < 1:
        # A non-positive length can never make progress
        raise ValueError("line_length must be at least 1")
    if indent >= line_length:
        log.debug("The indentation is higher than the desired line-length and will "
                  "therefore be ignored.")
        # Actually ignore it: otherwise the usable length is <= 0 and the
        # folding never terminates.
        indent = 0

    separator = "\n" + " " * indent
    lines = []
    remaining = string
    run = _runs
    # Iterative instead of one recursion level per line, so long texts do not
    # hit the interpreter's recursion limit.
    while True:
        # Set up the actual line length
        if indent_first_line is False and run == 0:
            length = line_length
        else:
            length = line_length - indent
        if len(remaining) <= length:
            # Fits on one line (including an exact fit), no need to fold
            lines.append(remaining)
            break
        # Find the last space that would be in the last 12 chars of the new line
        # The text will be folded here, 12 proved to be a good value in the
        # original author's tests
        cut = length
        resume = length
        for offset, char in enumerate(remaining[length:length - 12:-1]):
            if char == " ":
                # Space found, fold here and remove the space
                cut = length - offset
                resume = cut + 1
                break
        # Without a space in range, cut == resume == length (hard cut)
        lines.append(remaining[:cut])
        remaining = remaining[resume:]
        run += 1
    return separator.join(lines)
def check_string(in_str):
    """
    Sanitize data

    A string is valid when it is not blank and every character is in the
    module level ``valid_chars`` whitelist. A list (or tuple) is valid when
    every element is valid; an empty list is valid.

    :param in_str: List or string of strings to be validated
    :return boolean:
    """
    if isinstance(in_str, (list, tuple)):
        return all(check_string(item) for item in in_str)
    # bool() so that blank input yields False instead of an empty string
    return bool(in_str.strip()) and all(char in valid_chars for char in in_str)