|
- import csv
- import sqlite3
- import time
- import numpy as np
- import pandas as pd
- import tensorflow as tf
- import itertools
- import keras
- from numpy.random import seed
- from pandas import DataFrame
- from sklearn.model_selection import KFold
- from sklearn.decomposition import PCA
- from sklearn.metrics import auc
- from sklearn.metrics import roc_auc_score
- from sklearn.metrics import accuracy_score
- from sklearn.metrics import recall_score
- from sklearn.metrics import f1_score
- from sklearn.metrics import precision_score
- from sklearn.metrics import precision_recall_curve
- from sklearn.ensemble import RandomForestClassifier
- from sklearn.preprocessing import label_binarize
- from sklearn.svm import SVC
- from sklearn.linear_model import LogisticRegression
- from sklearn.neighbors import KNeighborsClassifier
- from sklearn.ensemble import GradientBoostingClassifier
- from keras.models import Model
- from keras.layers import Dense, Dropout, Input, Activation, Flatten, Conv2D, MaxPooling2D,BatchNormalization,add,ZeroPadding2D,AveragePooling2D,GlobalAveragePooling2D
- from keras.callbacks import EarlyStopping
- from keras.models import Sequential
- from keras.wrappers.scikit_learn import KerasClassifier, KerasRegressor
- from keras.utils import np_utils
- from keras.utils.vis_utils import plot_model
- from sklearn.model_selection import cross_val_score, train_test_split, GridSearchCV ,StratifiedKFold
- from sklearn.preprocessing import LabelEncoder
- from keras.models import model_from_json,Model
- from keras import regularizers
- import matplotlib.pyplot as plt
- from sklearn.metrics import confusion_matrix
- from sklearn import svm
- from keras.callbacks import Callback,History
-
# Fix NumPy's global RNG so runs are reproducible.
seed(1)
# Number of distinct drug-drug-interaction event classes (used as the
# classifier output width and by evaluate()/cross_validation()).
event_num = 99
# Dropout rate applied between residual stages in resnet().
droprate = 0.3
# Target dimensionality of each drug's PCA-reduced feature vector
# (see feature_vector()).
vector_size = 740
-
-
def resnet(width, height, channel, classes):
    """Build and compile a ResNet-style softmax classifier.

    width, height, channel: dimensions of the input tensor.
    classes: number of output classes (softmax units).

    Returns a compiled keras Model (adam optimizer, categorical
    cross-entropy loss, accuracy metric).
    Side effect: writes the architecture diagram to ./model_classifier.png.
    """
    inpt = Input(shape=(width, height, channel))
    x = ZeroPadding2D((2, 2))(inpt)

    # conv1: stem convolution + max-pooling
    x = Conv2d_BN(x, nb_filter=64, kernel_size=(2, 2), strides=(2, 2), padding='valid')
    x = MaxPooling2D(pool_size=(2, 2), strides=(2, 2), padding='same')(x)

    # conv2_x: three identity blocks at 64 filters
    for _ in range(3):
        x = identity_Block(x, nb_filter=64, kernel_size=(2, 2))
    x = Dropout(droprate)(x)

    # conv3_x: strided (downsampling) block, then three more at 128 filters
    x = identity_Block(x, nb_filter=128, kernel_size=(2, 2), strides=(2, 2), with_conv_shortcut=True)
    # NOTE(review): this second strided block has no conv shortcut, so the
    # residual add() pairs tensors of different spatial sizes for most input
    # shapes — confirm it was intended (a standard ResNet strides only the
    # first block of a stage).
    x = identity_Block(x, nb_filter=128, kernel_size=(2, 2), strides=(2, 2))
    for _ in range(2):
        x = identity_Block(x, nb_filter=128, kernel_size=(2, 2))
    x = Dropout(droprate)(x)

    # conv4_x: strided block, then five more at 256 filters
    x = identity_Block(x, nb_filter=256, kernel_size=(2, 2), strides=(2, 2), with_conv_shortcut=True)
    for _ in range(5):
        x = identity_Block(x, nb_filter=256, kernel_size=(2, 2))
    x = Dropout(droprate)(x)

    # conv5_x: strided block, then two more at 512 filters
    x = identity_Block(x, nb_filter=512, kernel_size=(2, 2), strides=(2, 2), with_conv_shortcut=True)
    for _ in range(2):
        x = identity_Block(x, nb_filter=512, kernel_size=(2, 2))

    x = Dropout(droprate)(x)
    # GlobalAveragePooling2D already yields a flat (batch, channels) tensor,
    # so the Flatten() that used to follow it was a no-op and was removed.
    x = GlobalAveragePooling2D()(x)
    x = Dense(classes, activation='softmax')(x)
    model = Model(inputs=inpt, outputs=x)
    plot_model(model, to_file='./model_classifier.png', show_shapes=True)
    # BUG FIX: model.summary() prints the summary itself and returns None, so
    # the old print(model.summary()) emitted a spurious "None" line.
    model.summary()
    model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
    return model
def identity_Block(inpt, nb_filter, kernel_size, strides=(1, 1), with_conv_shortcut=False):
    """Residual block: two Conv-BN layers plus a shortcut connection.

    When with_conv_shortcut is True the input is projected through its own
    Conv-BN (needed when strides/filters change the main path's shape);
    otherwise the raw input is added back unchanged.
    """
    out = Conv2d_BN(inpt, nb_filter=nb_filter, kernel_size=kernel_size, strides=strides, padding='same')
    out = Conv2d_BN(out, nb_filter=nb_filter, kernel_size=kernel_size, padding='same')
    if with_conv_shortcut:
        # Project the input so its shape matches the main path before adding.
        residual = Conv2d_BN(inpt, nb_filter=nb_filter, strides=strides, kernel_size=kernel_size)
    else:
        residual = inpt
    return add([out, residual])
-
def Conv2d_BN(x, nb_filter, kernel_size, strides=(1, 1), padding='same', name=None):
    """ReLU Conv2D followed by channel-axis BatchNormalization.

    If `name` is given, the layers are named '<name>_conv' / '<name>_bn';
    otherwise keras auto-names them.
    """
    if name is None:
        conv_name = bn_name = None
    else:
        conv_name, bn_name = name + '_conv', name + '_bn'

    out = Conv2D(nb_filter, kernel_size, padding=padding, strides=strides,
                 activation='relu', name=conv_name)(x)
    return BatchNormalization(axis=3, name=bn_name)(out)
-
-
-
-
def bottleneck_Block(inpt, nb_filters, strides=(1, 1), with_conv_shortcut=False):
    """Bottleneck residual block: 1x1 -> 3x3 -> 1x1 Conv-BN stack + shortcut.

    nb_filters: 3-tuple (k1, k2, k3) of filter counts for the three convs.
    with_conv_shortcut: project the input via a strided 1x1 Conv-BN instead
    of adding it back unchanged.
    """
    k1, k2, k3 = nb_filters
    out = Conv2d_BN(inpt, nb_filter=k1, kernel_size=1, strides=strides, padding='same')
    out = Conv2d_BN(out, nb_filter=k2, kernel_size=3, padding='same')
    out = Conv2d_BN(out, nb_filter=k3, kernel_size=1, padding='same')
    if with_conv_shortcut:
        residual = Conv2d_BN(inpt, nb_filter=k3, strides=strides, kernel_size=1)
    else:
        residual = inpt
    return add([out, residual])
-
-
# Convolutional-network visualization (originally commented in Chinese).
def visual(model, data, num_layer=1):
    """Plot every channel of the activations produced by one model layer.

    data: image array fed to the model's input.
    num_layer: index of the layer whose output is visualized.
    """
    # Backend function mapping the model's input to the chosen layer's output.
    layer = keras.backend.function([model.layers[0].input], [model.layers[num_layer].output])
    f1 = layer([data])[0]
    print(f1.shape)
    # Number of feature-map channels (last axis).
    num = f1.shape[-1]
    print(num)
    plt.figure(figsize=(8, 8))
    for i in range(num):
        # Arrange all channels on a near-square subplot grid.
        plt.subplot(int(np.ceil(np.sqrt(num))), int(np.ceil(np.sqrt(num))), i+1)
        # NOTE(review): a Conv2D layer's output is usually 4-D
        # (batch, h, w, channels); f1[:, :, i] would then slice the wrong
        # axes — confirm the expected shape of `data` before relying on this.
        plt.imshow(f1[:, :, i] * 255, cmap='gray')
        plt.axis('off')
    plt.show()
-
-
# Confusion-matrix plotting helper (originally commented in Chinese).
def plot_confusion_matrix(cm, classes, title='Confusion matrix', cmap=plt.cm.jet):
    """Render a row-normalized confusion matrix and save it to test_xx.png.

    cm: raw-count confusion matrix, shape (n, n).
    classes: sequence of class labels used for the axis tick labels.
    title: currently unused; kept for interface compatibility.
    cmap: matplotlib colormap for the heat map.
    """
    # Normalize each row so cells show per-true-class fractions.
    cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.rcParams['font.sans-serif'] = ['SimHei']  # render CJK axis labels correctly
    plt.rcParams['axes.unicode_minus'] = False    # render minus signs correctly
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    # BUG FIX: the tick labels were hard-coded to ten percentage strings,
    # which mismatches the len(classes) tick positions (and crashes) whenever
    # there are not exactly 10 classes; use the supplied `classes` instead.
    plt.xticks(tick_marks, classes)
    plt.yticks(tick_marks, classes)
    thresh = cm.max() / 2.
    # Print each cell's value, white on dark cells and black on light ones.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, '{:.2f}'.format(cm[i, j]), horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")
    plt.tight_layout()
    plt.ylabel('真实类别')
    plt.xlabel('预测类别')
    plt.savefig('test_xx.png', dpi=200, bbox_inches='tight', transparent=False)
    plt.show()
-
-
-
-
def prepare(df_drug, feature_list, vector_size, mechanism, action, drugA, drugB):
    """Build the (feature, label) arrays for DDI-event classification.

    df_drug: DataFrame of drugs; must contain a 'name' column plus one
        column per entry of feature_list.
    feature_list: feature column names to encode via feature_vector().
    vector_size: PCA dimensionality passed through to feature_vector().
    mechanism, action: per-interaction event descriptors; the event label is
        the string "mechanism action".
    drugA, drugB: per-interaction drug names (keys into df_drug['name']).

    Returns (new_feature, new_label, event_num) where new_feature[i] is the
    2 x (len(feature_list) * vector_size) stack of the pair's drug vectors
    and event_num is the module-level constant.
    """
    # Event string = "mechanism action"; events are numbered by descending
    # frequency so label 0 is the most common interaction type.
    d_event = [mechanism[i] + " " + action[i] for i in range(len(mechanism))]
    count = {}
    for event in d_event:
        count[event] = count.get(event, 0) + 1
    d_label = {}
    for rank, (event, _) in enumerate(sorted(count.items(), key=lambda x: x[1], reverse=True)):
        d_label[event] = rank

    # PERF FIX: the drug-name list used to be rebuilt from the DataFrame on
    # every loop iteration (O(n^2) conversions); compute it once.
    drug_names = np.array(df_drug['name']).tolist()

    # Concatenate one PCA-reduced similarity block per requested feature.
    vector = np.zeros((len(drug_names), 0), dtype=float)
    for feature in feature_list:
        vector = np.hstack((vector, feature_vector(feature, df_drug, vector_size)))

    # Map drug name -> feature vector.
    d_feature = {}
    for i, name in enumerate(drug_names):
        d_feature[name] = vector[i]

    # One sample per interaction: stack the two drugs' vectors (2 x dim).
    new_feature = []
    new_label = []
    for i in range(len(d_event)):
        new_feature.append(np.vstack((d_feature[drugA[i]], d_feature[drugB[i]])))
        new_label.append(d_label[d_event[i]])

    new_feature = np.array(new_feature)
    new_label = np.array(new_label)
    print('new_feature', new_feature.shape)
    # event_num is the module-level constant (99), not derived from the data.
    return (new_feature, new_label, event_num)
-
-
def feature_vector(feature_name, df, vector_size):
    """Encode one drug feature column as a PCA-reduced Jaccard similarity matrix.

    feature_name: column of `df` holding '|'-separated feature tokens per drug
        (e.g. target IDs "P30556|P05412").
    df: drug DataFrame.
    vector_size: PCA output dimensionality.

    Returns an (n_drugs, vector_size) array: each drug's pairwise-Jaccard
    similarity profile projected onto the top PCA components.
    """
    def Jaccard(matrix):
        # Pairwise Jaccard similarity between binary rows:
        # |A ∩ B| / (|A| + |B| - |A ∩ B|).
        # Uses plain ndarrays; the original np.mat is deprecated.
        matrix = np.asarray(matrix, dtype=float)
        numerator = matrix @ matrix.T
        denominator = (np.ones(matrix.shape) @ matrix.T
                       + matrix @ np.ones(matrix.T.shape)
                       - numerator)
        return numerator / denominator

    drug_list = np.array(df[feature_name]).tolist()
    # Union of feature tokens across all drugs, in first-seen order.
    all_feature = []
    for entry in drug_list:
        for each_feature in entry.split('|'):
            if each_feature not in all_feature:
                all_feature.append(each_feature)

    # Binary drug-by-feature incidence matrix, filled directly on the ndarray.
    # (The original wrote through a DataFrame with chained indexing
    # df[col].iloc[i] = 1, which is fragile under modern pandas copy-on-write.)
    col_index = {f: j for j, f in enumerate(all_feature)}
    feature_matrix = np.zeros((len(drug_list), len(all_feature)), dtype=float)
    for i, entry in enumerate(drug_list):
        for each_feature in entry.split('|'):
            feature_matrix[i, col_index[each_feature]] = 1

    sim_matrix = Jaccard(feature_matrix)
    print(feature_matrix.shape)
    # Reduce each drug's similarity profile to vector_size dimensions.
    pca = PCA(n_components=vector_size)
    pca.fit(sim_matrix)
    return pca.transform(sim_matrix)
-
-
def get_index(label_matrix, event_num, seed, CV):
    """Assign every sample a fold id in [0, CV), stratified per event class.

    For each class, the samples of that class are split into CV shuffled
    folds, so each fold contains a proportional share of every class.
    """
    index_all_class = np.zeros(len(label_matrix))
    for cls in range(event_num):
        cls_positions = np.where(label_matrix == cls)
        splitter = KFold(n_splits=CV, shuffle=True, random_state=seed)
        for fold, (_, test_idx) in enumerate(splitter.split(range(len(cls_positions[0])))):
            index_all_class[cls_positions[0][test_idx]] = fold
    return index_all_class
-
-
def cross_validation(feature_matrix, label_matrix, clf_type, event_num, seed, CV, set_name):
    """Run CV-fold cross-validation for one classifier type.

    feature_matrix: one feature array or a list of them (predictions are
        averaged across the list).
    label_matrix: integer class labels.
    clf_type: 'TMDII' (resnet DNN), 'RF', 'GBDT', 'SVM', 'FM', 'KNN',
        anything else -> LogisticRegression.
    set_name: forwarded to evaluate() (unused there; kept for compatibility).

    Returns (result_all, result_eve) from evaluate() over the pooled
    out-of-fold predictions. Side effect: saves the DNN weights per fold
    when clf_type == 'TMDII'.
    """
    y_true = np.array([])
    y_pred = np.array([])
    y_score = np.zeros((0, event_num), dtype=float)
    index_all_class = get_index(label_matrix, event_num, seed, CV)
    # BUG FIX: normalize to a list of feature views. The original built an
    # empty `matrix` list and unconditionally rebound feature_matrix to it,
    # so passing a list (as main() does) left feature_matrix EMPTY and the
    # inner loop never ran.
    if type(feature_matrix) != list:
        feature_matrix = [feature_matrix]
    for k in range(CV):
        train_index = np.where(index_all_class != k)
        test_index = np.where(index_all_class == k)
        pred = np.zeros((len(test_index[0]), event_num), dtype=float)
        dnn = None  # set only on the 'TMDII' path
        for i in range(len(feature_matrix)):
            x_train = feature_matrix[i][train_index]
            x_test = feature_matrix[i][test_index]
            y_train = label_matrix[train_index]
            # One-hot encode the integer labels.
            y_train_one_hot = (np.arange(y_train.max() + 1) == y_train[:, None]).astype(dtype='float32')
            y_test = label_matrix[test_index]
            y_test_one_hot = (np.arange(y_test.max() + 1) == y_test[:, None]).astype(dtype='float32')
            if clf_type == 'TMDII':
                dnn = resnet(2, vector_size, 1, event_num)
                early_stopping = EarlyStopping(monitor='val_loss', patience=10, verbose=0, mode='auto')
                dnn.fit(x_train, y_train_one_hot, batch_size=128, epochs=100,
                        validation_data=(x_test, y_test_one_hot), callbacks=[early_stopping])
                pred += dnn.predict(x_test)
                np.set_printoptions(suppress=False)
                print("预测值:", pred)
                continue
            elif clf_type == 'RF':
                clf = RandomForestClassifier(n_estimators=100)
            elif clf_type == 'GBDT':
                clf = GradientBoostingClassifier()
            elif clf_type == 'SVM':
                clf = SVC(probability=True)
            elif clf_type == 'FM':
                clf = GradientBoostingClassifier()
            elif clf_type == 'KNN':
                clf = KNeighborsClassifier(n_neighbors=4)
            else:
                clf = LogisticRegression()
            clf.fit(x_train, y_train)
            pred += clf.predict_proba(x_test)
        # Average the per-view predictions and pool this fold's results.
        pred_score = pred / len(feature_matrix)
        pred_type = np.argmax(pred_score, axis=1)
        y_true = np.hstack((y_true, y_test))
        y_pred = np.hstack((y_pred, pred_type))
        y_score = np.row_stack((y_score, pred_score))
        # BUG FIX: dnn.save() used to run unconditionally and raised
        # NameError for every non-'TMDII' classifier.
        if dnn is not None:
            dnn.save('enzyme_model_weight_{}.h5'.format(k))
    result_all, result_eve = evaluate(y_pred, y_score, y_true, event_num, set_name)
    return result_all, result_eve
-
-
def evaluate(pred_type, pred_score, y_test, event_num, set_name):
    """Compute overall and per-event classification metrics.

    pred_type: predicted class ids, shape (n,).
    pred_score: predicted class probabilities, shape (n, event_num).
    y_test: true class ids, shape (n,).
    set_name: unused; kept for interface compatibility.

    Returns [result_all, result_eve]:
      result_all (11, 1): accuracy, micro/macro AUPR, micro/macro AUC,
        micro/macro F1, micro/macro precision, micro/macro recall.
      result_eve (event_num, 6): per-event accuracy, AUPR, AUC, F1,
        precision, recall (one-vs-rest).
    """
    all_eval_type = 11
    result_all = np.zeros((all_eval_type, 1), dtype=float)
    each_eval_type = 6
    result_eve = np.zeros((event_num, each_eval_type), dtype=float)
    y_one_hot = label_binarize(y_test, classes=np.arange(event_num))
    pred_one_hot = label_binarize(pred_type, classes=np.arange(event_num))

    # (Dead code removed: a call to multiclass_precision_recall_curve whose
    # result was never used.)

    result_all[0] = accuracy_score(y_test, pred_type)
    result_all[1] = roc_aupr_score(y_one_hot, pred_score, average='micro')
    result_all[2] = roc_aupr_score(y_one_hot, pred_score, average='macro')
    result_all[3] = roc_auc_score(y_one_hot, pred_score, average='micro')
    result_all[4] = roc_auc_score(y_one_hot, pred_score, average='macro')
    result_all[5] = f1_score(y_test, pred_type, average='micro')
    result_all[6] = f1_score(y_test, pred_type, average='macro')
    result_all[7] = precision_score(y_test, pred_type, average='micro')
    result_all[8] = precision_score(y_test, pred_type, average='macro')
    result_all[9] = recall_score(y_test, pred_type, average='micro')
    result_all[10] = recall_score(y_test, pred_type, average='macro')
    for i in range(event_num):
        # Hoist the repeated column extraction (was done 12 times per event).
        true_col = y_one_hot.take([i], axis=1).ravel()
        pred_col = pred_one_hot.take([i], axis=1).ravel()
        result_eve[i, 0] = accuracy_score(true_col, pred_col)
        # NOTE(review): per-event AUPR/AUC are computed from hard 0/1
        # predictions rather than probabilities — confirm this is intended.
        result_eve[i, 1] = roc_aupr_score(true_col, pred_col, average=None)
        result_eve[i, 2] = roc_auc_score(true_col, pred_col, average=None)
        result_eve[i, 3] = f1_score(true_col, pred_col, average='binary')
        result_eve[i, 4] = precision_score(true_col, pred_col, average='binary')
        result_eve[i, 5] = recall_score(true_col, pred_col, average='binary')
    return [result_all, result_eve]
-
-
def self_metric_calculate(y_true, pred_type):
    """Return (precision, recall) for column 0 of the binary label arrays.

    Both inputs are flattened, then reshaped to 2-D and only the first
    column is scored. Prints the TP/FN/FP/TN counts as a side effect.
    Raises ZeroDivisionError when there are no positive predictions or no
    positive labels in the scored column.
    """
    y_true = y_true.ravel()
    y_pred = pred_type.ravel()
    if y_true.ndim == 1:
        y_true = y_true.reshape((-1, 1))
    if y_pred.ndim == 1:
        y_pred = y_pred.reshape((-1, 1))
    y_true_c = y_true.take([0], axis=1).ravel()
    y_pred_c = y_pred.take([0], axis=1).ravel()
    TP = FN = FP = TN = 0
    for truth, guess in zip(y_true_c, y_pred_c):
        if truth == 1 and guess == 1:
            TP += 1
        elif truth == 1 and guess == 0:
            FN += 1
        elif truth == 0 and guess == 1:
            FP += 1
        elif truth == 0 and guess == 0:
            TN += 1
    print("TP=", TP, "FN=", FN, "FP=", FP, "TN=", TN)
    return (TP / (TP + FP), TP / (TP + FN))
-
-
def multiclass_precision_recall_curve(y_true, y_score):
    """Precision-recall curve over the flattened label/score matrices.

    Both inputs are raveled before the "column 0" selection, so for
    multi-class one-hot input this is effectively a micro-averaged curve
    computed over every (label, score) pair.

    Returns (precision, recall, thresholds) as given by
    sklearn.metrics.precision_recall_curve.
    """
    y_true = y_true.ravel()
    y_score = y_score.ravel()
    if y_true.ndim == 1:
        y_true = y_true.reshape((-1, 1))
    if y_score.ndim == 1:
        y_score = y_score.reshape((-1, 1))
    flat_true = y_true.take([0], axis=1).ravel()
    flat_score = y_score.take([0], axis=1).ravel()
    return precision_recall_curve(flat_true, flat_score)
-
-
def roc_aupr_score(y_true, y_score, average="macro"):
    """Area under the precision-recall curve (AUPR).

    average: "binary" scores the inputs directly; "micro" flattens labels
    and scores before per-column scoring; "macro" (default, and any other
    value) averages the per-class AUPRs column by column.
    """
    def _binary_aupr(labels, scores):
        precision, recall, _ = precision_recall_curve(labels, scores)
        return auc(recall, precision)

    if average == "binary":
        return _binary_aupr(y_true, y_score)
    if average == "micro":
        y_true = y_true.ravel()
        y_score = y_score.ravel()
    if y_true.ndim == 1:
        y_true = y_true.reshape((-1, 1))
    if y_score.ndim == 1:
        y_score = y_score.reshape((-1, 1))
    n_classes = y_score.shape[1]
    per_class = np.zeros((n_classes,))
    for c in range(n_classes):
        per_class[c] = _binary_aupr(y_true.take([c], axis=1).ravel(),
                                    y_score.take([c], axis=1).ravel())
    return np.average(per_class)
-
-
def drawing(d_result, contrast_list, info_list):
    """Box-plot one metric column of each method's result matrix.

    d_result: dict mapping method name -> result matrix (rows = events).
    contrast_list: method names to plot (one box per method).
    info_list: its last element selects the column — 'aupr' plots column 1,
        anything else plots column 2 (AUC).
    Returns 0.
    """
    df = pd.DataFrame(columns=list(contrast_list))
    metric_col = 1 if info_list[-1] == 'aupr' else 2
    for method in contrast_list:
        df[method] = d_result[method][:, metric_col]
    df = df.astype('float')
    color = dict(boxes='DarkGreen', whiskers='DarkOrange', medians='DarkBlue', caps='Gray')
    df.plot.box(ylim=[0, 1.0], grid=True, color=color)
    return 0
-
-
- def save_result(feature_name, result_type, clf_type, result):
- with open(feature_name + '_' + result_type + '_' + clf_type+ '.csv', "w", newline='') as csvfile:
- writer = csv.writer(csvfile)
- for i in result:
- writer.writerow(i)
- return 0
-
-
def main(args):
    """Load the drug data, build features, and run cross-validation.

    args: dict with keys 'featureList' (feature column names),
    'classifier' (classifier names for cross_validation) and 'NLPProcess'
    (currently unused here).
    """
    seed = 0
    CV = 5
    df_drug = pd.read_csv('/content/drive/MyDrive/drugdataset/test740/approved_druglist_740.csv')

    feature_list = args['featureList']
    # NOTE(review): this hard-coded override discards the CLI -f argument;
    # kept as-is because removing it would change current behavior — confirm
    # whether it is a leftover debug setting.
    feature_list = ['enzyme']
    featureName = "+".join(feature_list)
    clf_list = args['classifier']
    for feature in feature_list:
        set_name = feature + '+'
        set_name = set_name[:-1]
        result_all = {}
        result_eve = {}
        all_matrix = []

        extraction = pd.read_csv('/content/drive/MyDrive/drugdataset/test740/approved_druglist_740_interaction.csv')
        mechanism = extraction['mechanism']
        action = extraction['action']
        drugA = extraction['drugA']
        drugB = extraction['drugB']

        # Build one feature matrix per requested feature column.
        for feature in feature_list:
            print(feature)
            new_feature, new_label, event_num = prepare(df_drug, [feature], vector_size, mechanism, action, drugA, drugB)
            all_matrix.append(new_feature)

        # BUG FIX: time.clock() was removed in Python 3.8; perf_counter()
        # is the documented replacement for wall-clock interval timing.
        start = time.perf_counter()
        for clf in clf_list:
            print(clf)
            all_result, each_result = cross_validation(all_matrix, new_label, clf, event_num, seed, CV,
                                                       set_name)
            save_result(featureName, 'all', clf, all_result)
            save_result(featureName, 'each', clf, each_result)
            result_all[clf] = all_result
            result_eve[clf] = each_result
        print("time used:", time.perf_counter() - start)
-
- if __name__ == "__main__":
- import argparse
- parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
- parser.add_argument("-f","--featureList",default=["smile"],help="features to use",nargs="+")
- parser.add_argument("-c","--classifier",choices=["TMDDI","RF","KNN","LR"],default=["TMDII"],help="classifiers to use",nargs="+")
- parser.add_argument("-p","--NLPProcess",choices=["read","process"],default="read",help="Read the NLP extraction result directly or process the events again")
- args=vars(parser.parse_args())
- print(args)
- main(args)
-
|