{ "cells": [ { "cell_type": "code", "execution_count": null, "id": "4b15d9c3", "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "import matplotlib.pyplot as plt" ] }, { "cell_type": "code", "execution_count": null, "id": "4e9129a5", "metadata": {}, "outputs": [], "source": [ "# Do not try to mask unused channels to optimize the code: we have tried it and it was in fact COUNTER-PRODUCTIVE.\n", "# Python is the bottleneck with 8 channels, not numpy, and it does not matter whether we use all 8 or 0 channels.\n", "\n", "def shift_numpy(arr, num, fill_value=np.nan):\n", " result = np.empty_like(arr)\n", " if num > 0:\n", " result[:num] = fill_value\n", " result[num:] = arr[:-num]\n", " elif num < 0:\n", " result[num:] = fill_value\n", " result[:num] = arr[-num:]\n", " else:\n", " result[:] = arr\n", " return result\n", "\n", "\n", "class FIR:\n", " def __init__(self, nb_channels, coefficients, buffer=None):\n", " \n", " self.coefficients = np.expand_dims(np.array(coefficients), axis=1)\n", " self.taps = len(self.coefficients)\n", " self.nb_channels = nb_channels\n", " self.buffer = np.array(z) if buffer is not None else np.zeros((self.taps, self.nb_channels))\n", " \n", " def filter(self, x):\n", " self.buffer = shift_numpy(self.buffer, 1, x)\n", " filtered = np.sum(self.buffer * self.coefficients, axis=0)\n", " return filtered\n", "\n", " \n", "class FilterPipeline:\n", " def __init__(self, nb_channels, power_line_fq=60):\n", " self.nb_channels = nb_channels\n", " assert power_line_fq in [50, 60], f\"The only supported power line frequencies are 50Hz and 60Hz\"\n", " if power_line_fq == 60:\n", " self.notch_coeff1 = -0.12478308884588535\n", " self.notch_coeff2 = 0.98729186796473023\n", " self.notch_coeff3 = 0.99364593398236511\n", " self.notch_coeff4 = -0.12478308884588535\n", " self.notch_coeff5 = 0.99364593398236511\n", " else:\n", " self.notch_coeff1 = -0.61410695998423581\n", " self.notch_coeff2 = 0.98729186796473023\n", " self.notch_coeff3 = 0.99364593398236511\n", " self.notch_coeff4 = -0.61410695998423581\n", " self.notch_coeff5 = 0.99364593398236511\n", " self.dfs = [np.zeros(self.nb_channels), np.zeros(self.nb_channels)]\n", " \n", " self.moving_average = None\n", " self.moving_variance = np.zeros(self.nb_channels)\n", " self.ALPHA_AVG = 0.1\n", " self.ALPHA_STD = 0.001\n", " self.EPSILON = 0.000001\n", " \n", " self.fir_30_coef = [\n", " 0.001623780150148094927192721215192250384,\n", " 0.014988684599373741992978104065059596905,\n", " 0.021287595318265635502275046064823982306,\n", " 0.007349500393709578957568417933998716762,\n", " -0.025127515717112181709014251396183681209,\n", " -0.052210507359822452833064687638398027048,\n", " -0.039273839505489904766477593511808663607,\n", " 0.033021568427940004020193498490698402748,\n", " 0.147606943281569008563636202779889572412,\n", " 0.254000252034505602516389899392379447818,\n", " 0.297330876398883392486283128164359368384,\n", " 0.254000252034505602516389899392379447818,\n", " 0.147606943281569008563636202779889572412,\n", " 0.033021568427940004020193498490698402748,\n", " -0.039273839505489904766477593511808663607,\n", " -0.052210507359822452833064687638398027048,\n", " -0.025127515717112181709014251396183681209,\n", " 0.007349500393709578957568417933998716762,\n", " 0.021287595318265635502275046064823982306,\n", " 0.014988684599373741992978104065059596905,\n", " 0.001623780150148094927192721215192250384]\n", " self.fir = FIR(self.nb_channels, self.fir_30_coef)\n", " \n", " def filter(self, value):\n", " \"\"\"\n", " value: a numpy array of shape (data series, channels)\n", " \"\"\"\n", " for i, x in enumerate(value): # loop over the data series\n", " # FIR:\n", " x = self.fir.filter(x)\n", " # notch:\n", " denAccum = (x - self.notch_coeff1 * self.dfs[0]) - self.notch_coeff2 * self.dfs[1]\n", " x = (self.notch_coeff3 * denAccum + self.notch_coeff4 * self.dfs[0]) + self.notch_coeff5 * self.dfs[1]\n", " self.dfs[1] = self.dfs[0]\n", " self.dfs[0] = denAccum\n", " # standardization:\n", " if self.moving_average is not None:\n", " delta = x - self.moving_average\n", " self.moving_average = self.moving_average + self.ALPHA_AVG * delta\n", " self.moving_variance = (1 - self.ALPHA_STD) * (self.moving_variance + self.ALPHA_STD * delta**2)\n", " moving_std = np.sqrt(self.moving_variance)\n", " x = (x - self.moving_average) / (moving_std + self.EPSILON)\n", " else:\n", " self.moving_average = x\n", " value[i] = x\n", " return value" ] }, { "cell_type": "code", "execution_count": null, "id": "80fc186e", "metadata": {}, "outputs": [], "source": [ "duration = 1\n", "fsample = 250\n", "f1 = 15\n", "f2 = 50\n", "f3 = 60\n", "f4 = 100\n", "f5 = 70\n", "f6 = 80\n", "f7 = 90\n", "scale = 4.0e-5\n", "\n", "w1 = 2*np.pi*f1\n", "w2 = 2*np.pi*f2\n", "w3 = 2*np.pi*f3\n", "w4 = 2*np.pi*f4\n", "w5 = 2*np.pi*f5\n", "w6 = 2*np.pi*f6\n", "w7 = 2*np.pi*f7\n", "nb_samples = int(duration*fsample)\n", "\n", "sig1 = np.array([np.sin(w1*i/fsample) for i in range(nb_samples)])\n", "sig2 = np.array([np.sin(w2*i/fsample) for i in range(nb_samples)])\n", "sig3 = np.array([np.sin(w3*i/fsample) for i in range(nb_samples)])\n", "sig4 = np.array([np.sin(w4*i/fsample) for i in range(nb_samples)])\n", "sig5 = np.array([np.sin(w5*i/fsample) for i in range(nb_samples)])\n", "sig6 = np.array([np.sin(w6*i/fsample) for i in range(nb_samples)])\n", "sig7 = np.array([np.sin(w7*i/fsample) for i in range(nb_samples)])\n", "sig8 = sig1 + sig2 + sig3 + sig4 + sig5 + sig6 + sig7\n", "\n", "v = np.array([sig1, sig2, sig3, sig4, sig5, sig6, sig7, sig8]).T * scale\n", "\n", "mask = [0,0,0,0,0,0,0,1]\n", "\n", "v.shape" ] }, { "cell_type": "code", "execution_count": null, "id": "b974a851", "metadata": {}, "outputs": [], "source": [ "import matplotlib.pyplot as plt\n", "\n", "plt.figure(figsize=(20,5))\n", "plt.plot(v[:, 7])" ] }, { "cell_type": "code", "execution_count": null, "id": "d2b7145c", "metadata": {}, "outputs": [], "source": [ "import time\n", "print(mask)\n", "fp = FilterPipeline(nb_channels=8, power_line_fq=60)\n", "\n", "ts = time.time()\n", "v = fp.filter(v)\n", "print(time.time() - ts)" ] }, { "cell_type": "code", "execution_count": null, "id": "70235c0a", "metadata": {}, "outputs": [], "source": [ "plt.figure(figsize=(20,10))\n", "plt.plot(v[:, 7])" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.3" } }, "nbformat": 4, "nbformat_minor": 5 }