Code source de crocrodile.nn.basics_train

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Crocrodile Training.

Back to basics.

:author: Virinas-code and ZeBox
"""
from __future__ import annotations

import csv
import datetime
import json
import random
import sys

import chess
import crocrodile
import crocrodile.nn
import matplotlib.pyplot as plt
import numpy
from crocrodile.cli import Progress

NoneType = type(None)

LAYERS_COUNT = 31
MAX_ITERS = 50000
SYMETRY_MATRIX = numpy.array(
    [
        [
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            1.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
        ],
        [
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            1.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
        ],
        [
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            1.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
        ],
        [
            0.0,
            0.0,
            0.0,
            0.0,
            1.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
        ],
        [
            0.0,
            0.0,
            0.0,
            1.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
        ],
        [
            0.0,
            0.0,
            1.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
        ],
        [
            0.0,
            1.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
        ],
        [
            1.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
        ],
        [
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            1.0,
        ],
        [
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            1.0,
            0.0,
        ],
        [
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            1.0,
            0.0,
            0.0,
        ],
        [
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            1.0,
            0.0,
            0.0,
            0.0,
        ],
        [
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            1.0,
            0.0,
            0.0,
            0.0,
            0.0,
        ],
        [
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            1.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
        ],
        [
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            1.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
        ],
        [
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            1.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
        ],
    ]
)
LAYERS = 5


[docs]class BasicsTrain: """ Basics train - class for training Crocrodile. :author: @ZeBox and Virinas-code """ def __init__(self): """ Initialize training. :param self: Current BasicsTrain object. """ self.config: dict = json.loads(open("basics_train.json").read()) self.neural_networks: list[crocrodile.nn.NeuralNetwork] = []
[docs] @staticmethod def array_to_csv(array, csv_path): """Write array in csv_path CSV file.""" array = list(array) with open(csv_path, "w", newline="") as file: writer = csv.writer(file) writer.writerows(array) file.close() return 0
[docs] def ask(self) -> dict: """ Ask for inputs. :param self: Current BasicsTrain object. :type self: BasicsTrain :return: Good moves file. :rtype: str """ if "-i" in sys.argv or "--input" in sys.argv: good_moves_file: str = input("Good moves file : ") mutation_rate: float = float(input("Mutation rate : ")) mutation_change: float = float(input("Mutation change : ")) min_bad_moves: int = int(input("Minimum performance on bad moves : ")) self.config["good_moves"] = good_moves_file self.config["mutation_rate"] = mutation_rate self.config["mutation_change"] = mutation_change self.config["min_bad_moves"] = min_bad_moves open("basics_train.json", "w").write(json.dumps(self.config)) return self.config
[docs] @staticmethod def parse_good_moves(good_moves_file: str) -> list: """ Parse good moves in good_moves_file. good_moves_file is only a file path. :param good_moves_file: Path to the good moves file. :type good_moves_file: str :return: The list of FENs + good move. :rtype: list """ good_moves_content = open(good_moves_file).read().split("\n\n") # Remove \n at the end good_moves_content[-1] = good_moves_content[-1][:-1] good_moves_list = list() for move in good_moves_content: if move in good_moves_list: continue good_moves_list.append(move) return good_moves_list
[docs] @staticmethod def generate_bad_moves(good_move_pos: str, good_moves_list, bad_moves_list): """ Generate bad moves for position. :param good_move_pos: Good move in position (FEN + good move) :type good_move_pos: str """ result = list() position = chess.Board(good_move_pos.split("\n")[0]) for move in position.legal_moves: generated_position = position.fen() + "\n" + move.uci() if ( generated_position not in good_moves_list and generated_position not in bad_moves_list ): result.append(generated_position) return result
[docs] def generate(self) -> None: """ Generate empty networks and save them. :return: None :rtype: None """ self.neural_networks.clear() number = int(input("Population : ")) open("nns/population.dat", "w").write(str(number)) progress = Progress() progress.total = number progress.text = "Creating networks" for loop in range(number): progress.update(loop) self.neural_networks.append(crocrodile.nn.NeuralNetwork()) progress.done() progress.text = "Generating empty matrixes" for loop in range(number): progress.update(loop) self.neural_networks[loop].generate() progress.done() self.config["iterations_done"] = 0 open("basics_train.json", "w").write(json.dumps(self.config)) self.save()
[docs] def save(self) -> None: """ Save neural networks to nns/ folder. :return: None """ progress = Progress() progress.total = len(self.neural_networks) progress.text = "Saving networks" for loop in range(len(self.neural_networks)): progress.update(loop) self.neural_networks[loop].save(loop) progress.done()
[docs] def load(self) -> None: """ Load neural networks from nns/ folder. :return: None """ self.neural_networks.clear() progress = Progress() progress.text = "Loading population" number = int(open("nns/population.dat", "r").read()) progress.text = "Creating network objects" progress.total = number for loop in range(number): progress.update(loop) self.neural_networks.append(crocrodile.nn.NeuralNetwork()) progress.done() progress.text = "Loading networks" for loop in range(number): progress.update(loop) self.neural_networks[loop].load_layers(loop) progress.done() for indice in range(len(self.neural_networks)): self.neural_networks[indice].indice = indice
[docs] def couple_pawns( self, matrix1: numpy.ndarray, matrix2: numpy.ndarray ) -> numpy.ndarray: """ Couple two pawn matrixes. :param numpy.ndarray matrix1: First matrix to couple. :param numpy.ndarray matrix2: Second matrix to couple. :return: A new matrix. :rtype: numpy.ndarray """ mutation_change = self.config["mutation_change"] inverse_rate = 100 / self.config["mutation_rate"] choose_matrix: numpy.ndarray = numpy.zeros(matrix1.shape) direction: bool = bool(random.getrandbits(1)) choose: bool = bool(random.getrandbits(1)) try: len2 = len(choose_matrix[0]) except TypeError: matrix1 = numpy.array([matrix1]) matrix2 = numpy.array([matrix2]) choose_matrix: numpy.ndarray = numpy.zeros(matrix1.shape) len2 = len(choose_matrix[0]) except IndexError: matrix1 = numpy.array([[matrix1]]) matrix2 = numpy.array([[matrix2]]) choose_matrix: numpy.ndarray = numpy.zeros(matrix1.shape) len2 = len(choose_matrix[0]) for line in range(len(choose_matrix)): for column in range(len2): if random.random() < 0.001: choose: bool = not choose fill: int = int(choose) if direction: choose_matrix[line][column] = fill else: try: choose_matrix[column][line] = fill except IndexError: choose_matrix[line][column] = fill result = (matrix1 * choose_matrix + matrix2 * (1 - choose_matrix)) + ( numpy.random.rand(*matrix1.shape) * (2 * mutation_change) - mutation_change ) * numpy.heaviside( numpy.random.rand(*matrix1.shape) * inverse_rate + (1 - inverse_rate), 0 ) return 0.5 * (result + result @ SYMETRY_MATRIX)
[docs] def couple_pieces( self, matrix1: numpy.ndarray, matrix2: numpy.ndarray ) -> numpy.ndarray: """ Couple two pieces matrixes. :param numpy.ndarray matrix1: First matrix to couple. :param numpy.ndarray matrix2: Second matrix to couple. :return: A new matrix. :rtype: numpy.ndarray """ mutation_change = self.config["mutation_change"] inverse_rate = 100 / self.config["mutation_rate"] choose_matrix: numpy.ndarray = numpy.zeros(matrix1.shape) direction: bool = bool(random.getrandbits(1)) choose: bool = bool(random.getrandbits(1)) try: len2 = len(choose_matrix[0]) except TypeError: matrix1 = numpy.array([matrix1]) matrix2 = numpy.array([matrix2]) choose_matrix: numpy.ndarray = numpy.zeros(matrix1.shape) len2 = len(choose_matrix[0]) except IndexError: matrix1 = numpy.array([[matrix1]]) matrix2 = numpy.array([[matrix2]]) choose_matrix: numpy.ndarray = numpy.zeros(matrix1.shape) len2 = len(choose_matrix[0]) for line in range(len(choose_matrix)): for column in range(len2): if random.random() < 0.001: choose: bool = not choose fill: int = int(choose) if direction: choose_matrix[line][column] = fill else: try: choose_matrix[column][line] = fill except IndexError: choose_matrix[line][column] = fill result = (matrix1 * choose_matrix + matrix2 * (1 - choose_matrix)) + ( numpy.random.rand(*matrix1.shape) * (2 * mutation_change) - mutation_change ) * numpy.heaviside( numpy.random.rand(*matrix1.shape) * inverse_rate + (1 - inverse_rate), 0 ) return 0.25 * ( result + result @ SYMETRY_MATRIX + SYMETRY_MATRIX @ result + SYMETRY_MATRIX @ result @ SYMETRY_MATRIX )
[docs] def couple(self, matrix1: numpy.ndarray, matrix2: numpy.ndarray) -> numpy.ndarray: """ Couple two matrixes. :param numpy.ndarray matrix1: First matrix to couple. :param numpy.ndarray matrix2: Second matrix to couple. :return: A new matrix. :rtype: numpy.ndarray """ mutation_change = self.config["mutation_change"] inverse_rate = 100 / self.config["mutation_rate"] choose_matrix: numpy.ndarray = numpy.zeros(matrix1.shape) direction: bool = bool(random.getrandbits(1)) choose: bool = bool(random.getrandbits(1)) try: len2 = len(choose_matrix[0]) except TypeError: matrix1 = numpy.array([matrix1]) matrix2 = numpy.array([matrix2]) choose_matrix: numpy.ndarray = numpy.zeros(matrix1.shape) len2 = len(choose_matrix[0]) except IndexError: matrix1 = numpy.array([[matrix1]]) matrix2 = numpy.array([[matrix2]]) choose_matrix: numpy.ndarray = numpy.zeros(matrix1.shape) len2 = len(choose_matrix[0]) for line in range(len(choose_matrix)): for column in range(len2): if random.random() < 0.001: choose: bool = not choose fill: int = int(choose) if direction: choose_matrix[line][column] = fill else: try: choose_matrix[column][line] = fill except IndexError: choose_matrix[line][column] = fill return (matrix1 * choose_matrix + matrix2 * (1 - choose_matrix)) + ( numpy.random.rand(*matrix1.shape) * (2 * mutation_change) - mutation_change ) * numpy.heaviside( numpy.random.rand(*matrix1.shape) * inverse_rate + (1 - inverse_rate), 0 )
[docs] def couple_networks(self, worst_network: int, network1: int, network2: int) -> None: """ Couple two networks. :param int network1: First network indice :param int network2: Second network indice. :return: Nothing. :rtype: None. """ for layer_indice in range(LAYERS): self.neural_networks[worst_network].w_pawns[ layer_indice ] = self.couple_pawns( self.neural_networks[network1].w_pawns[layer_indice], self.neural_networks[network2].w_pawns[layer_indice], ) self.neural_networks[worst_network].b_pawns[ layer_indice ] = self.couple_pawns( self.neural_networks[network1].b_pawns[layer_indice], self.neural_networks[network2].b_pawns[layer_indice], ) self.neural_networks[worst_network].w_pieces[ layer_indice ] = self.couple_pieces( self.neural_networks[network1].w_pieces[layer_indice], self.neural_networks[network2].w_pieces[layer_indice], ) self.neural_networks[worst_network].b_pieces[ layer_indice ] = self.couple_pieces( self.neural_networks[network1].b_pieces[layer_indice], self.neural_networks[network2].b_pieces[layer_indice], ) self.neural_networks[worst_network].w_pawns[-1] = self.couple( self.neural_networks[network1].w_pawns[-1], self.neural_networks[network2].w_pawns[-1], ) self.neural_networks[worst_network].b_pawns[-1] = self.couple( self.neural_networks[network1].b_pawns[-1], self.neural_networks[network2].b_pawns[-1], ) self.neural_networks[worst_network].w_last = self.couple( self.neural_networks[network1].w_last, self.neural_networks[network2].w_last ) self.neural_networks[worst_network].b_last = self.couple( self.neural_networks[network1].b_last, self.neural_networks[network2].b_last )
[docs] def train( self, new_good_move: str, new_bad_moves: str, param_good_moves: list, param_bad_moves: list, ) -> float: """ Train neural networks. :return: Mean performance at end. :rtype: float """ perf_graph = [] dist_graph = [] best_graph = [] worst_graph = [] def sprint(value): centered = value.center(18) print("********** {0} **********".format(centered)) inverse_rate = 100 / self.config["mutation_rate"] mutation_change = self.config["mutation_change"] sprint("Initialize") iters = 0 tests_results = list() population = len(self.neural_networks) # First original testing. for loop in range(population): print(f"Testing networks... ({loop}/{population})", end="\r", flush=True) on_good_moves, on_bad_moves = self.neural_networks[ loop ].test_full_multiprocesses(param_good_moves, param_bad_moves) success = ( ( (on_good_moves / len(param_good_moves)) - (1 - (on_bad_moves / len(param_bad_moves))) ) * (on_good_moves / len(param_good_moves)) * 100 ) self.neural_networks[loop].result = success self.neural_networks[loop].perfs = (on_good_moves, on_bad_moves) print("Testing networks... Done. ") # https://stackoverflow.com/questions/16225677/get-the-second-largest-number-in-a-list-in-linear-time while True: # input("Press Enter to continue...") iters += 1 print(f"########## Session #{len(param_good_moves)} ##########") print( f"Bad moves: {len(param_bad_moves)} / Good moves: {len(param_good_moves)}" ) sprint("Training #{0}".format(iters)) print("Selecting best networks...", end=" ", flush=True) self.neural_networks: list[crocrodile.nn.NeuralNetwork] = sorted( self.neural_networks, key=lambda sub: sub.result, reverse=True ) for indice in range(len(self.neural_networks)): self.neural_networks[indice].indice = indice maxis_indices = [0, 1, 2, 3] minis_indices = [ population - 1, population - 2, population - 3, population - 4, ] print("Done.") print( f"Worst networks : {', '.join(repr(nn) for nn in self.neural_networks[-4:])}" ) print( f"Best networks : {', '.join(repr(nn) for nn in self.neural_networks[:4])}" ) for network_indice in range(1): print( f"Coupling network #{network_indice + 1}... (selecting second network)", end="\r", flush=True, ) rand = random.randint(2, population - 3) second_network = rand # print(f"Coupling network {self.neural_networks[maxis_indices[network_indice]]} with {self.neural_networks[second_network]} to {self.neural_networks[minis_indices[network_indice]]}") print( f"Coupling network #{network_indice + 1}... (generating coupling matrixes)", end="\r", flush=True, ) self.couple_networks( minis_indices[network_indice], second_network, maxis_indices[network_indice], ) print( f"Coupling network #{network_indice + 1}... (testing) ", end="\r", flush=True, ) on_good_moves, on_bad_moves = self.neural_networks[ minis_indices[network_indice] ].test_full(param_good_moves, param_bad_moves) success = ( ( (on_good_moves / len(param_good_moves)) - (1 - (on_bad_moves / len(param_bad_moves))) ) * (on_good_moves / len(param_good_moves)) * 100 ) self.neural_networks[loop].result = success self.neural_networks[loop].perfs = (on_good_moves, on_bad_moves) # print(f"New result: {self.neural_networks[minis_indices[network_indice]]}") print( f"Coupling network #{network_indice + 1}... Done. ", end="\r", flush=True, ) print("Coupling networks... Done. ") perf_sum = 0 for loop in range(population): perf_sum += self.neural_networks[loop].result print(f"Mean performance : {(perf_sum / population)}") self.neural_networks: list[crocrodile.nn.NeuralNetwork] = sorted( self.neural_networks, key=lambda sub: sub.result, reverse=True ) for indice in range(len(self.neural_networks)): self.neural_networks[indice].indice = indice print(f"perf: {self.neural_networks[4].perfs}") # patch-003 print(f"gdmvs: {self.neural_networks[4].perfs[0] / len(param_good_moves)}") print( f"bdmvs: {(self.neural_networks[4].perfs[1] / len(param_bad_moves)) * 100}" ) # patch-003 if ( self.neural_networks[4].perfs[0] / len(param_good_moves) >= 1 and (self.neural_networks[4].perfs[1] / len(param_bad_moves)) * 100 > self.config["min_bad_moves"] ): # patch-003 break # :) if iters >= MAX_ITERS: break # Prevent complex moves if iters % 10 == 0: self.save() best_graph.append(self.neural_networks[0].result) worst_graph.append(self.neural_networks[-2].result) perf_graph.append(perf_sum / population) dist_graph.append( self.neural_networks[0].result - self.neural_networks[-2].result ) l = range(iters) fig, axs = plt.subplots(2, 2) axs[0, 0].plot(l, perf_graph) axs[0, 0].set_title("Mean performance") axs[0, 1].plot(l, best_graph) axs[0, 1].set_title("Best network") axs[1, 0].plot(l, worst_graph) axs[1, 0].set_title("Worst network") axs[1, 1].plot(l, dist_graph) axs[1, 1].set_title("Difference") for ax in axs.flat: ax.set(xlabel="Iteration", ylabel="Performance") # Hide x labels and tick labels for top plots and y ticks for right plots. for ax in axs.flat: ax.label_outer() fig.savefig("nns/graph.png") del fig del axs print("Saving tests result...", end=" ", flush=True) saved_results = list() for element in tests_results: saved_results.append([float(element)]) self.array_to_csv(saved_results, "nns/results.csv") print("Done.") perf_sum = 0 for loop in range(population): perf_sum += self.neural_networks[loop].result best_graph.close() worst_graph.close() perf_graph.close() dist_graph.close() return perf_sum / population
[docs] def main(self, argv): """Start training.""" self.ask() performance_output_file = ("nns/log/" + str(datetime.datetime.now()) + ".log").replace(":", ".") with open(performance_output_file, "w") as file: file.write("") good_moves_file = self.config["good_moves"] good_moves_list = self.parse_good_moves(good_moves_file) good_moves_train = list() bad_moves_list = list() bad_moves_train: list = [] if "-n" in argv or "--new-networks" in argv: self.generate() sys.exit(0) else: self.load() first_train = True for good_move in good_moves_list: good_moves_train.append(good_move) print(f"########## Session #{len(good_moves_train)} ##########") new_bad_moves = self.generate_bad_moves( good_move, good_moves_list, bad_moves_list ) bad_moves_list.extend(new_bad_moves) bad_moves_train.extend(new_bad_moves) print( f"Bad moves: {len(bad_moves_train)} / Good moves: {len(good_moves_train)}" ) print("Training...", end="\r") if len(good_moves_train) > self.config["iterations_done"]: if first_train: random.shuffle(bad_moves_train) while len(bad_moves_train) > self.config["max_bad_moves"]: bad_moves_train.pop() random.shuffle(good_moves_train) while len(good_moves_train) > self.config["max_good_moves"]: good_moves_train.pop() """progress = Progress() progress.text = "Testing networks" progress.total = len(self.neural_networks) for network_indice in range(len(self.neural_networks)): progress.update(network_indice) self.neural_networks[network_indice].test_full(good_moves_train, bad_moves_train) progress.done()""" first_train = False with open(performance_output_file, "a") as file: file.write( str( self.train( good_move, new_bad_moves, good_moves_train, bad_moves_train, ) ) + "\n" ) if len(good_moves_train) % 10 == 0: self.save() self.config["iterations_done"] = len(good_moves_train) open("basics_train.json", "w").write(json.dumps(self.config))
[docs]def main(argv): """ Start function called in init. :param argv: sys.argv :type argv: list :return: None :rtype: None """ trainer = BasicsTrain() # Create trainer object trainer.main(argv) # Start trainer
if __name__ == "__main__": main(sys.argv)