Threaded MySQL

Submit modules/packages for inclusion with Source.Python.
velocity
Member
Posts: 52
Joined: Sat May 10, 2014 6:17 pm

Threaded MySQL

Postby velocity » Wed Feb 08, 2017 12:13 am

Threaded MySQL

**UPDATE 15-02-2017**
Now uses GameThread instead of Repeat and Delay.
Delaying the queue is still possible, though, via:

Syntax: Select all

TSQL.wait(5) # Seconds


If your server requires a remote connection to a database, stacked-up queries can cause noticeable lag on your game server (freezing, players twitching), because each query blocks the server until the database has responded.

I've made a library to fix this problem. It queues up the queries and executes them on a dispatched GameThread. It's inspired by SourceMod's threaded SQL: https://wiki.alliedmods.net/SQL_(SourceMod_Scripting)
Remember that any query whose result you need now requires a callback, since the queries are truly dispatched to another thread.
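
To illustrate the idea of queueing work and running it off the calling thread, here is a minimal sketch. It is not the library's code: it uses a plain threading.Thread as a stand-in for GameThread, and run_queries and fake_execute are hypothetical names invented for this example.

```python
import queue
import threading


def run_queries(jobs, execute):
    """Run each (query, callback) pair on a worker thread, in order."""
    pending = queue.Queue()
    for job in jobs:
        pending.put(job)
    pending.put(None)  # sentinel: tells the worker to stop

    def worker():
        while True:
            job = pending.get()
            if job is None:
                break
            query, callback = job
            # The blocking call happens here, off the calling thread.
            callback(execute(query))

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    return thread


# Stand-in for a real database call (hypothetical, for illustration only).
def fake_execute(query):
    return query.upper()


results = []
thread = run_queries(
    [('select 1', results.append), ('select 2', results.append)],
    fake_execute,
)
thread.join()  # only so the example can inspect the results
```

The caller gets control back as soon as the worker has started; results only become available through the callbacks, which is why the library needs them.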


The library:
See the bottom of this post for download instructions.
threaded_mysql/__init__.py

Syntax: Select all

"""
Created By Velocity-plus
Github: https://github.com/Velocity-plus/
"""
from listeners.tick import GameThread
from core import echo_console
from traceback import format_exc as traceback_format_exc
from logging import error as logging_error
from sys import exc_info
from queue import Queue
from time import time as timestamp, sleep
import pymysql.cursors


class ThreadedMySQL:

    def __init__(self):
        # Is the thread running?
        self.thread_status = False
        # Regular queue
        self._r_queue = Queue()
        # Prioritized queue
        self._p_queue = Queue()

        self.connection_method = 0

        # If print() causes issues on Windows, set this to 'echo_console'
        self.error_handling = 'print'
        # Show print messages?
        self._debug = True

        # Delay in seconds before each queued query, set through wait().
        # Stored as _wait so the attribute does not shadow the wait() method.
        self._wait = 0

    def wait(self, delay):
        """
        If you for some reason want to delay the queue
        :param delay: The delay in seconds
        :return:
        """
        self._wait = delay

    def execute(self, query, args=None, callback=None, data_pack=None, prioritize=False, get_info=False):
        """
        This function cannot pass fetched data to the callback!
        :param query: The SQL query that you want to execute
        :param args: If the SQL query has any args
        :param callback: The callback for the query
        :param data_pack: If you want to pass special information to the callback
        :param prioritize: If you have large queues, prioritizing the query can make it skip the queue
          before the rest of the queue is finished
        :param get_info: If you want information about the query passed to the callback
          (such as timestamp, query and prioritized)
        :return:
        """
        # We need this later
        query_type = 0

        # If callback = None, we assume no returned data is needed
        if get_info:
            get_info = {'query': query, 'time': timestamp(), 'prioritized': prioritize}

        if not prioritize:
            self._r_queue.put([query, args, callback, data_pack, get_info, query_type])
        else:
            self._p_queue.put([query, args, callback, data_pack, get_info, query_type])

    def fetchone(self, query, args=None, callback=None, data_pack=None, prioritize=False, get_info=False):
        """
        This function both executes and fetches data, no need to execute before using this!
        :param query: The SQL query that you want to execute
        :param args: If the SQL query has any args
        :param callback: The callback for the query
        :param data_pack: If you want to pass special information to the callback
        :param prioritize: If you have large queues, prioritizing the query can make it skip the queue
          before the rest of the queue is finished
        :param get_info: If you want information about the query passed to the callback
          (such as timestamp, query and prioritized)
        :return:
        """
        query_type = 1
        if get_info:
            get_info = {'query': query, 'time': timestamp(), 'prioritized': prioritize}

        # If callback = None, we assume no returned data is needed
        if not prioritize:
            self._r_queue.put([query, args, callback, data_pack, get_info, query_type])
        else:
            self._p_queue.put([query, args, callback, data_pack, get_info, query_type])

    def fetchall(self, query, args=None, callback=None, data_pack=None, prioritize=False, get_info=False):
        """
        This function both executes and fetches data, no need to execute before using this!
        :param query: The SQL query that you want to execute
        :param args: If the SQL query has any args
        :param callback: The callback for the query
        :param data_pack: If you want to pass special information to the callback
        :param prioritize: If you have large queues, prioritizing the query can make it skip the queue
          before the rest of the queue is finished
        :param get_info: If you want information about the query passed to the callback
          (such as timestamp, query and prioritized)
        :return:
        """
        query_type = 2

        if get_info:
            get_info = {'query': query, 'time': timestamp(), 'prioritized': prioritize}

        # If callback = None, we assume no returned data is needed
        if not prioritize:
            self._r_queue.put([query, args, callback, data_pack, get_info, query_type])
        else:
            self._p_queue.put([query, args, callback, data_pack, get_info, query_type])

    def complete_task(self, worker, prio=None):
        query, args, callback, data_pack, get_info, query_type = worker
        try:
            if get_info:
                # Time elapsed since the query was put in the queue
                get_info['time'] = timestamp() - get_info['time']

            if args:
                self.cursor.execute(query, args)
            else:
                self.cursor.execute(query)

            data = None
            if query_type == 1:
                data = self.cursor.fetchone()
            elif query_type == 2:
                data = self.cursor.fetchall()

            if callback:
                # Fetched data (fetchone/fetchall only) comes first,
                # then data_pack and get_info when they were given
                callback_args = [] if query_type == 0 else [data]
                if data_pack:
                    callback_args.append(data_pack)
                if get_info:
                    callback_args.append(get_info)
                callback(*callback_args)

            if prio:
                self._p_queue.task_done()
            else:
                self._r_queue.task_done()

        except Exception as SQL_ERROR:
            # Possible errors
            class_error, actual_error, traceback = exc_info()

            format_error = '-' * 64 + '\nExceptions probable cause (SQL Query: {0})\n{1}\nActual Error:\n{2}'.format(
                query, class_error, SQL_ERROR)

            if self.error_handling == 'print':
                print(format_error)
                print('-' * 64)
            else:
                echo_console(format_error)
                echo_console('-' * 64)

            logging_error(traceback_format_exc())

    def _threader(self):
        while self.thread_status:
            if self._wait:
                sleep(self._wait)

            if self._p_queue.empty():
                worker = self._r_queue.get()
                self.complete_task(worker, prio=False)
            else:
                worker = self._p_queue.get()
                self.complete_task(worker, prio=True)

    def _start_thread(self):
        # Creates the thread
        self.t = GameThread(target=self._threader)
        self.t.daemon = True
        self.t.start()

    def handlequeue_start(self):
        """
        This handles the queue, should be stopped on unload
        :return:
        """
        # Starts the queue
        self.thread_status = True  # This must be true before the thread can loop
        self._start_thread()

    def handlequeue_stop(self):
        """
        This stops the queue from being processed. While a connection still might be open,
        no queries can be executed.
        :return:
        """
        self.thread_status = False

    def queue_size(self):
        """
        :return: Returns the size of the queue
        """
        return self._r_queue.qsize() + self._p_queue.qsize()

    def connect(self, host, user, password, db, charset, cursorclass=pymysql.cursors.DictCursor):
        """
        Check out the PyMySQL documentation for a complete walkthrough
        """
        try:
            self.connection = pymysql.connect(host=host,
                                              user=user,
                                              password=password,
                                              db=db,
                                              charset=charset,
                                              cursorclass=cursorclass)
            self.cursor = self.connection.cursor()
            if self._debug:
                if self.error_handling == 'print':
                    print('threaded_mysql: [SUCCESS] connection was successfully established.')
                else:
                    echo_console('threaded_mysql: [SUCCESS] connection was successfully established.')

            self.connection_method = 1
        except Exception:
            if self._debug:
                if self.error_handling == 'print':
                    print('threaded_mysql: [ERROR] Not possible to make a connection.')
                else:
                    echo_console('threaded_mysql: [ERROR] Not possible to make a connection.')

    def connect_use(self, connection):
        """
        If you created your connection elsewhere in your code, you can pass it to Threaded MySQL
        :param connection: Your connection socket
        :return:
        """
        try:
            self.connection = connection
            self.cursor = self.connection.cursor()
            if self._debug:
                if self.error_handling == 'print':
                    print('threaded_mysql: [SUCCESS] Cursor created successfully for your connection.')
                else:
                    echo_console('threaded_mysql: [SUCCESS] Cursor created successfully for your connection.')
            self.connection_method = 2
        except Exception:
            if self._debug:
                if self.error_handling == 'print':
                    print('threaded_mysql: [ERROR] Not possible to create cursor.')
                else:
                    echo_console('threaded_mysql: [ERROR] Not possible to create cursor.')

    def commit(self):
        """
        Regular pymysql commit
        :return:
        """
        self.connection.commit()

    def close(self, finish_queue_before_close=False):
        """
        Closes the mysql connection
        :param finish_queue_before_close: Finishes the queue before it terminates the connection
        :return:
        """
        if finish_queue_before_close:
            # Wait for the worker thread to empty both queues
            while self.queue_size() > 0:
                sleep(0.1)
        self.connection.close()


Documentation
The library works as an extension of PyMySQL, so, as with any other MySQL script, a connection to a database must be established. Before we can do that, let us initialize the class for Threaded MySQL.

Syntax: Select all

from threaded_mysql import ThreadedMySQL

TSQL = ThreadedMySQL()


After we have initialized our class, we can connect to our MySQL database. You can use Threaded MySQL itself to make the connection.

Syntax: Select all

# Available parameters (host, user, password, db, charset, cursorclass)
TSQL.connect(host='localhost', user='root', password='123', db='my_database', charset='utf8')


If you don't want to connect with Threaded MySQL, you can make your connection elsewhere and pass it to Threaded MySQL, as shown below with PyMySQL:

Syntax: Select all

import pymysql.cursors

connection = pymysql.connect(host="localhost",
                             user="root",
                             password="123",
                             db="test",
                             charset="utf8",
                             cursorclass=pymysql.cursors.DictCursor)

TSQL.connect_use(connection)


Now that our connection has been made, we need to start the GameThread that handles the queue of queries, as seen below.

Syntax: Select all

TSQL.handlequeue_start()


Finally, we can make use of it. The available functions are listed below:

Syntax: Select all

# Different types of queries
TSQL.execute(query, args=None, callback=None, data_pack=None, prioritize=False, get_info=False)
TSQL.fetchone(query, args=None, callback=None, data_pack=None, prioritize=False, get_info=False)
TSQL.fetchall(query, args=None, callback=None, data_pack=None, prioritize=False, get_info=False)

# Returns the size of the queue
TSQL.queue_size()

# Delays the queue by the given number of seconds (1 = 1 second)
TSQL.wait(delay)

# Commits the current transaction (regular pymysql commit)
TSQL.commit()

# Closes the MySQL connection
TSQL.close()


It's important to note that fetchone and fetchall execute the query BEFORE fetching, so there is no need to call TSQL.execute first when you want to fetch something.
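
The arguments a callback receives depend on which options were passed with the query. The sketch below mirrors the order used in complete_task of the library above (fetched data, then data_pack, then get_info). The helper name build_callback_args and the string query types are made up for illustration and are not part of the library.

```python
def build_callback_args(query_type, data=None, data_pack=None, get_info=None):
    """Return the positional arguments a callback would receive."""
    args = []
    if query_type in ('fetchone', 'fetchall'):
        args.append(data)       # fetched row(s) always come first
    if data_pack:
        args.append(data_pack)  # only passed when it is truthy
    if get_info:
        args.append(get_info)   # only passed when get_info=True was requested
    return args


# execute() without extras: the callback takes no arguments
no_args = build_callback_args('execute')

# fetchone() with a data_pack: callback(data, data_pack)
two_args = build_callback_args('fetchone',
                               data={'name': 'John'},
                               data_pack={'text': '!fetchone'})
```

So a callback for fetchone with a data_pack should be declared as def callback(data, data_pack), while a callback for a plain execute takes no parameters at all.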

If you want to grab the data from fetchone or fetchall, a callback is necessary. The code examples below demonstrate this:

Code examples (Updated)

Syntax: Select all

from messages import SayText2
from events import Event
from threaded_mysql import ThreadedMySQL

# Initializes the class
TSQL = ThreadedMySQL()

# Connects to a MySQL database
TSQL.connect('localhost', 'root', '123', 'my_database', 'utf8')

# Starts the queue handler (should only be called once)
TSQL.handlequeue_start()


# The callback from !fetchone
def sql_callback(data):
    name = data['name']
    SayText2(name).send()


# The callback from !fetchall
def sql_callback_2(data, data_pack):
    text = data_pack['text']
    SayText2("You wrote {}".format(text)).send()
    for x in data:
        name = x['name']
        SayText2('Name: {}'.format(name)).send()


# The callback from !info
def sql_callback_3(get_info):
    """
    get_info includes 'query', 'time', 'prioritized'
    """
    query = get_info['query']
    time = get_info['time']
    prio = get_info['prioritized']
    SayText2('Query: {0}\nTime: {1} seconds\nPrioritized: {2}'.format(query, time, prio)).send()


@Event('player_say')
def on_player_say(game_event):
    # What did the player write
    text = game_event['text']

    if text == '!fetchone':
        # Fetches one name
        TSQL.fetchone('SELECT name FROM my_database', callback=sql_callback)

    if text == '!fetchall':
        # Let's pass some extra things...
        data_pack = {'text': text}
        # Fetches all the names
        TSQL.fetchall('SELECT name FROM my_database', callback=sql_callback_2, data_pack=data_pack)

    if text == '!info':
        # Inserts a name and passes information about the query to the callback
        TSQL.execute("INSERT INTO my_database (name) VALUES('John')", callback=sql_callback_3, get_info=True)

Output !fetchall
=> You wrote !fetchall
=> Name: <name>
=> Name: <name>
=> (...)

Output !fetchone
=> <name>

Output !info
=> Query: INSERT INTO my_database (name) VALUES('John')
=> Time: 0.014952421188354492 seconds
=> Prioritized: False


Stress test
Set the variable use_threaded_mysql to 1 or 0 to see the difference.

Syntax: Select all

from messages import SayText2
from threaded_mysql import ThreadedMySQL
import pymysql.cursors

# ON = No lag | OFF = Server freeze
use_threaded_mysql = 1

connection = pymysql.connect(host="localhost",
                             user="root",
                             password="123",
                             db="my_database",
                             charset="utf8",
                             cursorclass=pymysql.cursors.DictCursor)
cursor = connection.cursor()


def load():
    # Set use_threaded_mysql = 0 above to run this blocking version
    if not use_threaded_mysql:

        # Executes the query 1000 times
        for x in range(1000):
            cursor.execute('SELECT name FROM my_database')
            data = cursor.fetchone()
            # Prints it out (not necessary, though)
            SayText2('Name: {}'.format(data['name'])).send()
    else:
        # Class
        TSQL = ThreadedMySQL()
        # Use the connection already created
        TSQL.connect_use(connection)
        # Starts the queue handler
        TSQL.handlequeue_start()

        for x in range(1000):
            TSQL.fetchone('SELECT name FROM my_database', callback=test)


def test(data):
    SayText2('Name: {}'.format(data['name'])).send()




Download
The latest release and notes are on my GitHub:
https://github.com/Velocity-plus/threaded_mysql

You can even create a tick listener and spam queries without any lag at all.

Enjoy :)
Last edited by velocity on Fri Feb 17, 2017 9:03 pm, edited 32 times in total.
decompile
Senior Member
Posts: 267
Joined: Sat Oct 10, 2015 10:37 am
Location: Germany

Re: Threaded MySQL

Postby decompile » Wed Feb 08, 2017 12:27 am

Thank you so much!!!!!
Ayuto
Project Leader
Posts: 1543
Joined: Sat Jul 07, 2012 8:17 am
Location: Germany

Re: Threaded MySQL

Postby Ayuto » Wed Feb 08, 2017 1:01 pm

Actually, this isn't really threaded as Delay and Repeat are used, which are executed in the same thread. So, this will probably just delay the lag. You need to utilize GameThread to create real threads.
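
A small sketch of the difference described here, using only the standard library. slow_query is a hypothetical stand-in for a blocking database round trip, and threading.Thread stands in for GameThread; none of this is Source.Python code.

```python
import threading
import time


def slow_query():
    # Stand-in for a blocking database round trip (hypothetical).
    time.sleep(0.2)
    return 'done'


# Same thread: the caller is blocked for the whole round trip,
# no matter when the call is scheduled.
start = time.monotonic()
slow_query()
blocked_for = time.monotonic() - start

# Separate thread: starting the work returns almost immediately.
result = []
start = time.monotonic()
worker = threading.Thread(target=lambda: result.append(slow_query()))
worker.start()
returned_after = time.monotonic() - start

worker.join()  # only so the example can inspect the result
```

Delaying a blocking call only moves the moment at which the calling thread stalls; handing it to another thread is what keeps the caller responsive.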
velocity
Member
Posts: 52
Joined: Sat May 10, 2014 6:17 pm

Re: Threaded MySQL

Postby velocity » Wed Feb 08, 2017 2:21 pm

Ayuto wrote:Actually, this isn't really threaded as Delay and Repeat are used, which are executed in the same thread. So, this will probably just delay the lag. You need to utilize GameThread to create real threads.


First of all, it will prevent lag 100%. Delaying every single query is a good solution because every query for itself does not cause lag, the lag occurs when you have multiple queries right after each other. It's also the same way sourcemod is doing it.
I'm posting this because it helped me with my timer plugin since I have a lot of queries going on all the time and this definitely made everything run smoothly. Anyways, I will look into it.
Doldol
Senior Member
Posts: 155
Joined: Sat Jul 07, 2012 7:09 pm
Location: Belgium

Re: Threaded MySQL

Postby Doldol » Tue Feb 14, 2017 11:59 pm

velocity wrote:
Ayuto wrote:Actually, this isn't really threaded as Delay and Repeat are used, which are executed in the same thread. So, this will probably just delay the lag. You need to utilize GameThread to create real threads.


First of all, it will prevent lag 100%. Delaying every single query is a good solution because every query for itself does not cause lag, the lag occurs when you have multiple queries right after each other. It's also the same way sourcemod is doing it.
I'm posting this because it helped me with my timer plugin since I have a lot of queries going on all the time and this definitely made everything run smoothly. Anyways, I will look into it.


It's a horrible implementation: your server will lag for however long it takes for your query to be sent and the results received, which can be seconds! Like Ayuto said, the proper way is to use actual Python threads, which GameThread implements.
velocity
Member
Posts: 52
Joined: Sat May 10, 2014 6:17 pm

Re: Threaded MySQL

Postby velocity » Wed Feb 15, 2017 11:20 am

Alright!!!!!!!! I will be re-coding it now, but in my defense IT REALLY doesn't lag.
satoon101
Project Leader
Posts: 2292
Joined: Sat Jul 07, 2012 1:59 am

Re: Threaded MySQL

Postby satoon101 » Wed Feb 15, 2017 1:52 pm

Since you posted this in the Module/Package Submissions forum, which is designed for posting modules/packages to include with SP, you should definitely take Ayuto's advice if it's going to be considered.

If you did not intend for this, and instead just wanted to post a custom package that people can download and use, that is what the Custom Packages forum is designed for. Do not start another topic there, though, just let us know if that is your intention and we can move this thread.
velocity
Member
Posts: 52
Joined: Sat May 10, 2014 6:17 pm

Re: Threaded MySQL

Postby velocity » Wed Feb 15, 2017 1:54 pm

Yes, sir, I will soon be finished with the rework. I'm totally listening to all of you guys. I will post an update here later today, once I have implemented GameThread.
velocity
Member
Posts: 52
Joined: Sat May 10, 2014 6:17 pm

Re: Threaded MySQL 2.0.0 (Updated)

Postby velocity » Wed Feb 15, 2017 4:25 pm

It should be better now.
decompile
Senior Member
Posts: 267
Joined: Sat Oct 10, 2015 10:37 am
Location: Germany

Re: Threaded MySQL 2.0.0 (Updated)

Postby decompile » Thu Feb 16, 2017 12:20 am

Hey, I decided to give your package a try. I'm currently building it up right now, but I'm wondering what 'commit_before_save' on close() means.
Will it execute all "queries" in the queue before closing the connection or something else?
velocity
Member
Posts: 52
Joined: Sat May 10, 2014 6:17 pm

Re: Threaded MySQL 2.0.0 (Updated)

Postby velocity » Thu Feb 16, 2017 12:48 am

decompile wrote:Hey, I decided to give your package a try. I'm currently building it up right now, but I'm wondering what 'commit_before_save' on close() means.
Will it execute all "queries" in the queue before closing the connection or something else?


Oh yes.. You are exactly right. What you are seeing is actually a mistake on my end. That has been corrected now.
decompile
Senior Member
Posts: 267
Joined: Sat Oct 10, 2015 10:37 am
Location: Germany

Re: Threaded MySQL 2.0.0 (Updated)

Postby decompile » Thu Feb 16, 2017 1:07 am

Thanks for clarifying and keep up the good work :)
decompile
Senior Member
Posts: 267
Joined: Sat Oct 10, 2015 10:37 am
Location: Germany

Re: Threaded MySQL 2.0.0 (Updated)

Postby decompile » Thu Feb 16, 2017 3:04 am

But it looks like it's currently not possible for it to return the dict directly instead of calling a callback?

That's what I really need.

Greetings
velocity
Member
Posts: 52
Joined: Sat May 10, 2014 6:17 pm

Re: Threaded MySQL 2.0.0 (Updated)

Postby velocity » Thu Feb 16, 2017 9:56 am

decompile wrote:But it looks like it's currently not possible for it to return the dict directly instead of calling a callback?

That's what I really need.

Greetings


I'm not sure what you mean, but you must have a callback. Since it's threaded, it cannot return directly to the place in your code where you called the function. Can you maybe show an example of what you are trying to do?
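
One way to get several values without a direct return is to chain callbacks and carry intermediate results along in data_pack. The sketch below is only an illustration: FakeTSQL is a hypothetical stand-in that answers from a dict and calls the callback immediately so the example runs without a database, and the table and column names are invented.

```python
# Hypothetical stand-in for TSQL so the sketch runs without a database.
class FakeTSQL:
    rows = {
        'SELECT name FROM users WHERE steamid=%s': {'name': 'John'},
        'SELECT time FROM times WHERE steamid=%s': {'time': 42.5},
    }

    def fetchone(self, query, args=None, callback=None, data_pack=None):
        data = self.rows.get(query)  # None when nothing matches
        if data_pack:
            callback(data, data_pack)
        else:
            callback(data)


TSQL = FakeTSQL()
loaded = {}


def on_name(data, data_pack):
    # First result arrived: carry it forward and request the second one.
    data_pack['name'] = data['name'] if data else None
    TSQL.fetchone('SELECT time FROM times WHERE steamid=%s',
                  (data_pack['steamid'],), callback=on_time, data_pack=data_pack)


def on_time(data, data_pack):
    # Second result arrived: both values are now available together.
    data_pack['time'] = data['time'] if data else None
    loaded[data_pack['steamid']] = (data_pack['name'], data_pack['time'])


def load_player(steamid):
    TSQL.fetchone('SELECT name FROM users WHERE steamid=%s',
                  (steamid,), callback=on_name, data_pack={'steamid': steamid})


load_player('STEAM_0:0:1')
```

With the real library the callbacks would run later, on the worker thread, so any code that needs both values has to live in (or be called from) the last callback.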
decompile
Senior Member
Posts: 267
Joined: Sat Oct 10, 2015 10:37 am
Location: Germany

Re: Threaded MySQL 2.0.0 (Updated)

Postby decompile » Thu Feb 16, 2017 4:00 pm

pretty much like:

playerName = TSQL.fetchone("SELECT name from users WHERE steamid=%s", (playerSteamid,))
playerTime = TSQL.fetchone("SELECT time from times WHERE steamid=%s", (playerSteamid,))

(just completely random ones)

So it either returns the dict back or None if it got no results.
velocity
Member
Posts: 52
Joined: Sat May 10, 2014 6:17 pm

Re: Threaded MySQL 2.0.0 (Updated)

Postby velocity » Thu Feb 16, 2017 6:14 pm

