| import argparse |
| import csv |
| import os |
| import numpy as np |
| import random |
| from process_pretrain_data import get_kmer_sentence |
|
|
|
|
|
|
| def Process(args): |
| path = args.file_path |
| all_folders = os.listdir(path) |
| |
| count = 0 |
|
|
| for folder in all_folders: |
| |
| train_seq_path = os.path.join(args.file_path, folder, "train", "sequences_alph.npy") |
| test_seq_path = os.path.join(args.file_path, folder, "test", "sequences_alph.npy") |
| train_lab_path = os.path.join(args.file_path, folder, "train", "targets.npy") |
| test_lab_path = os.path.join(args.file_path, folder, "test", "targets.npy") |
| train_sequences = np.load(train_seq_path) |
| test_sequences = np.load(test_seq_path) |
| train_labels = np.load(train_lab_path) |
| test_labels = np.load(test_lab_path) |
|
|
| train_sequences = train_sequences.reshape(train_sequences.shape[0],1) |
| test_sequences = test_sequences.reshape(test_sequences.shape[0],1) |
| train_labels = train_labels.reshape(train_labels.shape[0],1) |
| test_labels = test_labels.reshape(test_labels.shape[0],1) |
|
|
| |
| trains = list(np.concatenate((train_sequences, train_labels), axis=1)) |
| tests = list(np.concatenate((test_sequences, test_labels), axis=1)) |
|
|
| random.seed(24) |
| random.shuffle(trains) |
| random.shuffle(trains) |
| random.shuffle(tests) |
| random.shuffle(tests) |
|
|
|
|
| |
| output_path = os.path.join(args.output_path, str(args.kmer), folder) |
| if not os.path.exists(output_path): |
| os.makedirs(output_path) |
|
|
| |
|
|
| |
| f_train = open(os.path.join(output_path, "train.tsv"), 'wt') |
| tsv_train = csv.writer(f_train, delimiter='\t') |
| tsv_train.writerow(["sequence", "label"]) |
| for i in range(len(trains)): |
| sentence = get_kmer_sentence(trains[i][0].decode("utf-8"), args.kmer) |
| tsv_train.writerow([sentence, int(trains[i][1])]) |
|
|
| f_dev = open(os.path.join(output_path, "dev.tsv"), 'wt') |
| tsv_dev = csv.writer(f_dev, delimiter='\t') |
| tsv_dev.writerow(["sequence", "label"]) |
| for i in range(len(tests)): |
| sentence = get_kmer_sentence(tests[i][0].decode("utf-8"), args.kmer) |
| tsv_dev.writerow([sentence, int(tests[i][1])]) |
| |
|
|
| count += 1 |
| print("Finish %s folders" % (count)) |
|
|
|
|
|
|
|
|
| |
|
|
| def main(): |
| parser = argparse.ArgumentParser() |
| parser.add_argument( |
| "--kmer", |
| default=1, |
| type=int, |
| help="K-mer", |
| ) |
| parser.add_argument( |
| "--file_path", |
| default=None, |
| type=str, |
| help="The path of the file to be processed", |
| ) |
| parser.add_argument( |
| "--output_path", |
| default=None, |
| type=str, |
| help="The path of the processed data", |
| ) |
| args = parser.parse_args() |
|
|
| Process(args) |
| |
|
|
| |
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|