{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "3e7c79bb",
   "metadata": {},
   "source": [
    "# MiniCoderX Project - Full Pipeline Notebook\n",
    "\n",
    "End-to-end walkthrough: train a BPE tokenizer, load the MiniCoderX seq2seq model, generate and evaluate code, check the output, then set up fine-tuning and a FastAPI endpoint."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "82aa402a",
   "metadata": {},
   "source": [
    "## Step 0: Environment Setup"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "fe661c57",
   "metadata": {},
   "outputs": [],
   "source": [
    "# %pip (rather than bare pip) installs into the environment of the running kernel.\n",
    "# langchain is imported directly in Step 4; torch is needed for return_tensors=\"pt\".\n",
    "%pip install -q tokenizers transformers datasets sentencepiece langchain langchain_community ollama networkx evaluate rouge_score matplotlib seaborn lark fastapi uvicorn torch"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "7313bed0",
   "metadata": {},
   "source": [
    "## Step 1: Train the Tokenizer and Load the Model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "2e28b42b",
   "metadata": {},
   "outputs": [],
   "source": [
    "from tokenizers import Tokenizer, models, trainers, pre_tokenizers\n",
    "from tokenizers.normalizers import Sequence, Lowercase, NFD, StripAccents\n",
    "from tokenizers.pre_tokenizers import Whitespace\n",
    "from tokenizers.processors import TemplateProcessing\n",
    "from transformers import PreTrainedTokenizerFast\n",
    "import os\n",
    "\n",
    "# Special tokens shared by the BPE trainer, the post-processor and the\n",
    "# HuggingFace wrapper in the next cell. They used to be five empty strings,\n",
    "# which collapses them into a single unusable token.\n",
    "# NOTE(review): these are conventional names -- confirm they match the tokens\n",
    "# the \"minicoderx-model\" checkpoint expects.\n",
    "PAD_TOKEN = \"<pad>\"\n",
    "UNK_TOKEN = \"<unk>\"\n",
    "BOS_TOKEN = \"<s>\"\n",
    "EOS_TOKEN = \"</s>\"\n",
    "MASK_TOKEN = \"<mask>\"\n",
    "SPECIAL_TOKENS = [PAD_TOKEN, UNK_TOKEN, BOS_TOKEN, EOS_TOKEN, MASK_TOKEN]\n",
    "\n",
    "# Named bpe_* so they are not silently replaced by the `tokenizer` loaded\n",
    "# with the model and the `trainer` created in the fine-tuning step.\n",
    "bpe_tokenizer = Tokenizer(models.BPE(unk_token=UNK_TOKEN))\n",
    "# NOTE(review): Lowercase()/StripAccents() and the Whitespace pre-tokenizer drop\n",
    "# letter case and indentation, both significant in Python source -- confirm this is intended.\n",
    "bpe_tokenizer.normalizer = Sequence([NFD(), Lowercase(), StripAccents()])\n",
    "bpe_tokenizer.pre_tokenizer = Whitespace()\n",
    "\n",
    "bpe_trainer = trainers.BpeTrainer(\n",
    "    vocab_size=32000,\n",
    "    special_tokens=SPECIAL_TOKENS,\n",
    ")\n",
    "\n",
    "data_path = \"data/code_corpus.txt\"\n",
    "\n",
    "if not os.path.exists(data_path):\n",
    "    raise FileNotFoundError(f\"Dataset not found at: {data_path}\")\n",
    "print(\"Dataset found:\", data_path)\n",
    "\n",
    "bpe_tokenizer.train([data_path], bpe_trainer)\n",
    "\n",
    "# Wrap every encoded sequence in BOS/EOS; the ids are only known after training.\n",
    "bpe_tokenizer.post_processor = TemplateProcessing(\n",
    "    single=f\"{BOS_TOKEN} $A {EOS_TOKEN}\",\n",
    "    pair=f\"{BOS_TOKEN} $A {EOS_TOKEN} $B {EOS_TOKEN}\",\n",
    "    special_tokens=[\n",
    "        (BOS_TOKEN, bpe_tokenizer.token_to_id(BOS_TOKEN)),\n",
    "        (EOS_TOKEN, bpe_tokenizer.token_to_id(EOS_TOKEN)),\n",
    "    ],\n",
    ")\n",
    "\n",
    "tokenizer_path = \"minicoderx-tokenizer\"\n",
    "os.makedirs(tokenizer_path, exist_ok=True)\n",
    "bpe_tokenizer.save(f\"{tokenizer_path}/tokenizer.json\")\n",
    "print(\"Tokenizer saved to:\", tokenizer_path)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "d1ab6421",
   "metadata": {},
   "outputs": [],
   "source": [
    "from transformers import PreTrainedTokenizerFast\n",
    "\n",
    "# Wrap the raw tokenizer file so it can be loaded through the transformers API.\n",
    "hf_tokenizer = PreTrainedTokenizerFast(\n",
    "    tokenizer_file=f\"{tokenizer_path}/tokenizer.json\",\n",
    "    unk_token=UNK_TOKEN,\n",
    "    pad_token=PAD_TOKEN,\n",
    "    cls_token=BOS_TOKEN,\n",
    "    sep_token=EOS_TOKEN,\n",
    "    mask_token=MASK_TOKEN,\n",
    ")\n",
    "\n",
    "hf_tokenizer.save_pretrained(tokenizer_path)\n",
    "print(\"HuggingFace tokenizer saved and ready.\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "ba28e05f",
   "metadata": {},
   "outputs": [],
   "source": [
    "from transformers import AutoTokenizer, AutoModelForSeq2SeqLM\n",
    "\n",
    "# Load the trained model and the tokenizer stored alongside it.\n",
    "# NOTE(review): this reads the tokenizer from \"minicoderx-model\", not from the\n",
    "# \"minicoderx-tokenizer\" directory written above -- confirm that is intended.\n",
    "tokenizer = AutoTokenizer.from_pretrained(\"minicoderx-model\")\n",
    "model = AutoModelForSeq2SeqLM.from_pretrained(\"minicoderx-model\")\n",
    "\n",
    "print(\"Model and tokenizer loaded.\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "852b82c3",
   "metadata": {},
   "source": [
    "## Step 2: Inference - Code Generation"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "6ab29f13",
   "metadata": {},
   "outputs": [],
   "source": [
    "def generate_from_instruction(instruction, max_length=128):\n",
    "    \"\"\"Generate text from the loaded seq2seq model for one instruction.\n",
    "\n",
    "    Args:\n",
    "        instruction: Prompt string passed to the tokenizer unchanged.\n",
    "        max_length: Limit forwarded to ``model.generate``.\n",
    "\n",
    "    Returns:\n",
    "        The first generated sequence, decoded with special tokens skipped.\n",
    "    \"\"\"\n",
    "    encoded = tokenizer(instruction, return_tensors=\"pt\")\n",
    "    generated_ids = model.generate(**encoded, max_length=max_length)\n",
    "    return tokenizer.decode(generated_ids[0], skip_special_tokens=True)\n",
    "\n",
    "\n",
    "input_text = \"Write a Python function to compute factorial\"\n",
    "# Kept in its own variable: Step 6 runs a factorial test against this output.\n",
    "factorial_code = generate_from_instruction(input_text)\n",
    "print(\"\\nGenerated Code:\\n\")\n",
    "print(factorial_code)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "e2e495b0",
   "metadata": {},
   "source": [
    "## Step 3: Structure-Aware Encoding with AST"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "c5337fe3",
   "metadata": {},
   "outputs": [],
   "source": [
    "import ast, networkx as nx, matplotlib.pyplot as plt, seaborn as sns\n",
    "\n",
    "def build_ast_graph_with_metadata(node, graph, parent=None):\n",
    "    \"\"\"Recursively add an AST node and its descendants to ``graph``.\n",
    "\n",
    "    Each node is keyed by ``str(id(node))`` and labelled with its AST class\n",
    "    name; an edge is added from ``parent`` to the node when a parent is given.\n",
    "\n",
    "    Args:\n",
    "        node: The ``ast`` node to add.\n",
    "        graph: Graph object exposing ``add_node`` / ``add_edge``; mutated in place.\n",
    "        parent: Graph key of the parent node, or ``None`` for the root.\n",
    "\n",
    "    Returns:\n",
    "        The same ``graph`` object, for convenience.\n",
    "    \"\"\"\n",
    "    node_id = str(id(node))\n",
    "    graph.add_node(node_id, label=type(node).__name__)\n",
    "    if parent is not None:\n",
    "        graph.add_edge(parent, node_id)\n",
    "    for child in ast.iter_child_nodes(node):\n",
    "        build_ast_graph_with_metadata(child, graph, node_id)\n",
    "    return graph\n",
    "\n",
    "code_sample = \"\"\"\n",
    "def add(a, b):\n",
    "    return a + b\n",
    "\"\"\"\n",
    "tree = ast.parse(code_sample)\n",
    "G = nx.DiGraph()\n",
    "build_ast_graph_with_metadata(tree, G)\n",
    "# Fixed seed so the layout is the same on every run.\n",
    "pos = nx.spring_layout(G, seed=42)\n",
    "labels = nx.get_node_attributes(G, 'label')\n",
    "nx.draw(G, pos, labels=labels, with_labels=True, node_size=1200, node_color='lightblue')\n",
    "plt.title(\"AST Visualization\")\n",
    "plt.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "f732a8cf",
   "metadata": {},
   "source": [
    "## Step 4: LangChain + Ollama Integration"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "0b2c013c",
   "metadata": {},
   "outputs": [],
   "source": [
    "from langchain_community.llms import Ollama\n",
    "from langchain.chains import LLMChain\n",
    "from langchain.prompts import PromptTemplate\n",
    "\n",
    "# NOTE(review): LLMChain and chain.run are deprecated in recent LangChain\n",
    "# releases -- pin the langchain version or migrate; confirm against the installed version.\n",
    "llm = Ollama(model=\"minicoderx\")\n",
    "prompt = PromptTemplate(input_variables=[\"instruction\"], template=\"Generate Python code for the task: {instruction}\")\n",
    "chain = LLMChain(llm=llm, prompt=prompt)\n",
    "print(\"\\nLangChain-Ollama Output:\")\n",
    "print(chain.run(\"Create a function to reverse a string\"))"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "6ded4c5e",
   "metadata": {},
   "source": [
    "## Step 5: Evaluation (MBPP)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "37f133a4",
   "metadata": {},
   "outputs": [],
   "source": [
    "from datasets import load_dataset\n",
    "import evaluate\n",
    "\n",
    "dataset = load_dataset(\"mbpp\")\n",
    "eval_bleu = evaluate.load(\"bleu\")\n",
    "eval_rouge = evaluate.load(\"rouge\")\n",
    "\n",
    "# Score a single test example against its reference solution.\n",
    "sample = dataset['test'][0]\n",
    "input_text = f\"Write a Python function: {sample['text']}\"\n",
    "generated_code = generate_from_instruction(input_text)\n",
    "\n",
    "print(\"\\nEvaluation Sample Output:\\n\", generated_code)\n",
    "print(\"BLEU:\", eval_bleu.compute(predictions=[generated_code], references=[sample['code']]))\n",
    "print(\"ROUGE:\", eval_rouge.compute(predictions=[generated_code], references=[sample['code']]))"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "2b00a47c",
   "metadata": {},
   "source": [
    "## Step 6: Testing, Verification, and Unit Test Gen"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a9a8ef01",
   "metadata": {},
   "outputs": [],
   "source": [
    "import tempfile, subprocess\n",
    "import os, sys\n",
    "\n",
    "def run_code(code, test_case, timeout=10):\n",
    "    \"\"\"Run ``code`` followed by ``test_case`` in a separate Python process.\n",
    "\n",
    "    The two strings are written to a temporary ``.py`` file that is removed\n",
    "    afterwards. The code runs unsandboxed with the kernel's interpreter, so\n",
    "    only pass code you are willing to execute on this machine.\n",
    "\n",
    "    Args:\n",
    "        code: Python source to execute.\n",
    "        test_case: Python source appended after ``code``.\n",
    "        timeout: Seconds to wait before the child process is killed.\n",
    "\n",
    "    Returns:\n",
    "        The ``subprocess.CompletedProcess``, or ``None`` if the run timed out.\n",
    "    \"\"\"\n",
    "    with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as tmp:\n",
    "        tmp.write(code + '\\n' + test_case)\n",
    "        script_path = tmp.name\n",
    "    # The file is closed before the child starts so it can be reopened on every platform.\n",
    "    try:\n",
    "        result = subprocess.run(\n",
    "            [sys.executable, script_path],\n",
    "            capture_output=True,\n",
    "            text=True,\n",
    "            timeout=timeout,\n",
    "        )\n",
    "    except subprocess.TimeoutExpired:\n",
    "        print(f\"Execution timed out after {timeout} seconds\")\n",
    "        return None\n",
    "    finally:\n",
    "        os.unlink(script_path)\n",
    "    print(\"Output:\\n\", result.stdout)\n",
    "    if result.stderr:\n",
    "        print(\"Errors:\\n\", result.stderr)\n",
    "    return result\n",
    "\n",
    "test_case = \"print(factorial(5))  # Expected: 120\"\n",
    "# The test calls factorial(), so it must run against the Step 2 output,\n",
    "# not against `generated_code`, which holds the MBPP sample from Step 5.\n",
    "run_code(factorial_code, test_case)\n",
    "\n",
    "unit_prompt = PromptTemplate(input_variables=[\"code\"], template=\"Write a unittest in Python for the following function:\\n\\n{code}\")\n",
    "unit_chain = LLMChain(llm=llm, prompt=unit_prompt)\n",
    "print(\"\\nGenerated Unit Test:\\n\", unit_chain.run(code=generated_code))"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "9b9fcc1e",
   "metadata": {},
   "source": [
    "## Step 7: Safety and Grammar Constraints"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "5e3dd5ee",
   "metadata": {},
   "outputs": [],
   "source": [
    "from lark import Lark, UnexpectedInput\n",
    "from lark.indenter import Indenter, DedentError\n",
    "\n",
    "\n",
    "class PythonSubsetIndenter(Indenter):\n",
    "    \"\"\"Post-lexer that converts leading whitespace into _INDENT/_DEDENT tokens.\"\"\"\n",
    "    NL_type = \"_NL\"\n",
    "    OPEN_PAREN_types = []\n",
    "    CLOSE_PAREN_types = []\n",
    "    INDENT_type = \"_INDENT\"\n",
    "    DEDENT_type = \"_DEDENT\"\n",
    "    tab_len = 4\n",
    "\n",
    "\n",
    "# Deliberately tiny subset of Python: function definitions whose bodies hold\n",
    "# assignments, return statements and nested definitions over + - * /.\n",
    "# INDENT/DEDENT are not provided by lark's `common` grammar; they are declared\n",
    "# here and produced by the indenter, which needs _NL to capture the indentation.\n",
    "python_grammar = r\"\"\"\n",
    "start: _NL* stmt+\n",
    "stmt: funcdef | simple_stmt\n",
    "funcdef: \"def\" NAME \"(\" [params] \")\" \":\" suite\n",
    "params: NAME (\",\" NAME)*\n",
    "suite: _NL _INDENT stmt+ _DEDENT | simple_stmt\n",
    "simple_stmt: (assign_stmt | return_stmt) _NL\n",
    "assign_stmt: NAME \"=\" expr\n",
    "return_stmt: \"return\" expr\n",
    "expr: atom (operator atom)*\n",
    "atom: NAME | NUMBER\n",
    "operator: \"+\" | \"-\" | \"*\" | \"/\"\n",
    "\n",
    "_NL: /(\\r?\\n[\\t ]*)+/\n",
    "\n",
    "%import common.CNAME -> NAME\n",
    "%import common.NUMBER\n",
    "%import common.WS_INLINE\n",
    "%declare _INDENT _DEDENT\n",
    "%ignore WS_INLINE\n",
    "\"\"\"\n",
    "\n",
    "parser = Lark(python_grammar, parser=\"lalr\", postlex=PythonSubsetIndenter())\n",
    "\n",
    "# Plain substring match: cheap, but it also flags harmless text such as \"evaluate\".\n",
    "unsafe_keywords = [\"os.system\", \"subprocess\", \"eval\", \"exec\", \"open(\", \"import socket\"]\n",
    "print(\"\\nSafety Check:\")\n",
    "print(\"Unsafe pattern found\" if any(k in generated_code for k in unsafe_keywords) else \"Code is safe\")\n",
    "\n",
    "print(\"\\nGrammar Check:\")\n",
    "try:\n",
    "    # Every statement in the grammar ends with a newline, so make sure the last one does too.\n",
    "    parser.parse(generated_code + \"\\n\")\n",
    "    print(\"Code grammar is valid.\")\n",
    "except (UnexpectedInput, DedentError) as e:\n",
    "    print(\"Grammar error:\", e)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "8b1e2b86",
   "metadata": {},
   "source": [
    "## Step 8: Multi-Task Preprocessing (gen, sum, trans)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "09a12f1d",
   "metadata": {},
   "outputs": [],
   "source": [
    "def preprocess_multitask(example):\n",
    "    \"\"\"Turn one dataset row into tokenized model inputs with labels.\n",
    "\n",
    "    The optional ``task`` field selects the prompt: ``gen`` (text -> code),\n",
    "    ``sum`` (code -> text) or ``trans`` (java -> python). Rows with any other\n",
    "    value, or with no ``task`` field at all, map ``text`` to ``code`` without\n",
    "    a prompt prefix.\n",
    "\n",
    "    Args:\n",
    "        example: Mapping holding the fields required by the selected task.\n",
    "\n",
    "    Returns:\n",
    "        The tokenizer output for the input text, with the tokenized target\n",
    "        stored under ``labels``; both are truncated to 128 tokens.\n",
    "    \"\"\"\n",
    "    # .get(): rows without a `task` column fall through to the default branch\n",
    "    # instead of raising KeyError.\n",
    "    task = example.get('task')\n",
    "    if task == 'gen':\n",
    "        input_text = f\"Write code: {example['text']}\"\n",
    "        output_text = example['code']\n",
    "    elif task == 'sum':\n",
    "        input_text = f\"Summarize this code: {example['code']}\"\n",
    "        output_text = example['text']\n",
    "    elif task == 'trans':\n",
    "        input_text = f\"Translate Java to Python: {example['java']}\"\n",
    "        output_text = example['python']\n",
    "    else:\n",
    "        input_text, output_text = example['text'], example['code']\n",
    "    # text_target tokenizes the target and stores it under `labels`; it replaces\n",
    "    # the deprecated as_target_tokenizer() context manager.\n",
    "    model_input = tokenizer(input_text, text_target=output_text, max_length=128, truncation=True)\n",
    "    return model_input"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "6018db4c",
   "metadata": {},
   "source": [
    "## Step 9: Fine-Tuning Setup"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "c15ad38d",
   "metadata": {},
   "outputs": [],
   "source": [
    "import inspect\n",
    "\n",
    "from transformers import Seq2SeqTrainer, Seq2SeqTrainingArguments, DataCollatorForSeq2Seq\n",
    "\n",
    "train_dataset = dataset[\"train\"].map(preprocess_multitask, remove_columns=dataset[\"train\"].column_names)\n",
    "val_dataset = dataset[\"validation\"].map(preprocess_multitask, remove_columns=dataset[\"validation\"].column_names)\n",
    "\n",
    "# Newer transformers releases renamed `evaluation_strategy` to `eval_strategy`\n",
    "# and the trainer's `tokenizer` argument to `processing_class`. The install in\n",
    "# Step 0 is unpinned, so use whichever keyword the installed version accepts.\n",
    "args_params = inspect.signature(Seq2SeqTrainingArguments.__init__).parameters\n",
    "eval_strategy_key = \"eval_strategy\" if \"eval_strategy\" in args_params else \"evaluation_strategy\"\n",
    "trainer_params = inspect.signature(Seq2SeqTrainer.__init__).parameters\n",
    "tokenizer_key = \"processing_class\" if \"processing_class\" in trainer_params else \"tokenizer\"\n",
    "\n",
    "training_args = Seq2SeqTrainingArguments(\n",
    "    output_dir=\"./minicoderx-finetuned\",\n",
    "    learning_rate=5e-5,\n",
    "    per_device_train_batch_size=8,\n",
    "    per_device_eval_batch_size=8,\n",
    "    weight_decay=0.01,\n",
    "    save_total_limit=2,\n",
    "    num_train_epochs=3,\n",
    "    predict_with_generate=True,\n",
    "    logging_dir=\"./logs\",\n",
    "    logging_steps=10,\n",
    "    **{eval_strategy_key: \"epoch\"},\n",
    ")\n",
    "\n",
    "data_collator = DataCollatorForSeq2Seq(tokenizer, model=model)\n",
    "trainer = Seq2SeqTrainer(\n",
    "    model=model,\n",
    "    args=training_args,\n",
    "    train_dataset=train_dataset,\n",
    "    eval_dataset=val_dataset,\n",
    "    data_collator=data_collator,\n",
    "    **{tokenizer_key: tokenizer},\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "28d3dcb1",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Uncomment to run training\n",
    "# trainer.train()\n",
    "# trainer.save_model(\"./minicoderx-finetuned\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "8c4d0d79",
   "metadata": {},
   "source": [
    "## Step 10: Deploy with FastAPI"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "3f239ed4",
   "metadata": {},
   "outputs": [],
   "source": [
    "from fastapi import FastAPI\n",
    "from pydantic import BaseModel\n",
    "import uvicorn\n",
    "\n",
    "app = FastAPI()\n",
    "\n",
    "class CodeRequest(BaseModel):\n",
    "    \"\"\"Request body for POST /generate.\"\"\"\n",
    "    instruction: str\n",
    "\n",
    "@app.post(\"/generate\")\n",
    "def generate_code(req: CodeRequest):\n",
    "    \"\"\"Generate code for the instruction in the request body.\n",
    "\n",
    "    Returns:\n",
    "        A dict with the decoded model output under ``code``.\n",
    "    \"\"\"\n",
    "    return {\"code\": generate_from_instruction(req.instruction)}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "33ec10a2",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Uncomment to run API\n",
    "# uvicorn.run(app, host=\"0.0.0.0\", port=8000)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "myenv",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.16"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}