AiPE

[R&E] 5. PLX-DAQ 대체 소프트웨어 개발 : Python으로 아두이노 Serial 통신 값 받아 Excel에 작성하기 본문

[XiBBaL] Development Project/인터넷 강의 배속 시 집중력 곡선의 분석 및 해석

[R&E] 5. PLX-DAQ 대체 소프트웨어 개발 : Python으로 아두이노 Serial 통신 값 받아 Excel에 작성하기

Oshimaker XiBBaL 2023. 2. 8. 23:31
반응형

본 연구에서 사용하는 기능에 한해 PLX-DAQ보다 안정적이고 빠른 소프트웨어를 개발해 PLX-DAQ를 완전히 대체해야 한다. 앞선 편에 언급했듯이 연구 마감 때문에 3일 내로 개발해야 한다.

 

지금 생각하면 3일 내로 만드는건 좀 미친짓이었지만 그때는 하도 급해서 그냥 하겠다고 했다. 말 그대로 밥먹으면서도 코딩했다. 그 때가 학교 성적 미달로 기숙사 선발에서 탈락한 때라 지하철을 타고 통학했는데, 아침7시, 밤10시에 지하철을 타면서도 코딩을 했다.

 

나도 이렇게 급하게 하고 싶지 않았는데 .. 이 소프트웨어를 개발하고자 마음을 먹기까지의 과정은 아래 포스팅을 참고하자. 이 포스팅의 전 편이다.

https://xibbal-lab.tistory.com/47

 

[R&E] 4. Serial 통신으로 받은 값 PLX-DAQ로 엑셀에 저장

대망의 PLX-DAQ 편이다. 결론부터 말하자면 대차게 망했는데, 어떻게 디버그 하려고 노력했으며 왜 PLX-DAQ를 경유한 방식을 버렸는지에 초점을 맞추고 작성해보겠다. 미리 말하지만 PLX-DAQ 프로그램

xibbal-lab.tistory.com

 

 

 

1. 소프트웨어 구상

본 연구에서 사용하는 기능은 아래와 같이 요약할 수 있다.

  1. 아두이노 UNO 보드에서 출력되는 Serial 규격의 데이터를 받아 시간과 함께 Excel 매크로 문서에 작성 (데이터 형태 변환 과정 필요)
  2. 실시간으로 심전도의 극댓값 (\(R\)값)의 누적 개수를 연산해 터미널에 표시
  3. 아두이노에서 잘못된 자료형 (String 등)이 출력되었을 경우에 예외처리

위 기능을 모두 포함하는 Python 소프트웨어를 "최대한 사양이 낮은 컴퓨터에서도 실행 가능하게" 최적화해 만들어보려고 한다.

 

우리 팀이 가지고 있는 컴퓨터가 고사양이 아닐 뿐더러, 초당 측정하는 ECG의 개수가 많을수록 실험의 정확도가 올라가기 때문에 코드를 최대한 간단하게 짜 초당 많은 양을 찍어내도록 최적화하는 것이 관건이다.

 

 

 

2. 코딩

일단 아래와 같은 라이브러리들을 사용하겠다.

  • Numpy : Python이 계산과학 및 수학 분야에서 이용될 때 핵심 역할을 하는 라이브러리. 일반적으로 대규모 다차원 배열을 쉽게 처리할 수 있도록 지원하는 역할을 수행한다.
  • Pandas : Numpy와 연계하여 데이터를 효과적으로 처리할 수 있도록 도와주는 라이브러리. 인덱스에 따라 데이터를 나열하므로 Dictionary 자료형에 가까운 Series를 기본 자료형으로 사용함.
  • Serial : Python이 Serial 직렬통신을 받을 수 있도록 하는 역할.
  • Time : Python에서 시간 연산을 처리.
  • Re : Python에서 일치하는 문자열 집합을 지정하는 역할.
  • Threading : 저수준 Thread 모듈 위에 고수준 Threading 인터페이스를 구축하는 기능 제공. 병목 작업을 동시에 실행하는 경우 사용.
  • Openpyxl : Python에서 Excel문서를 작성하고 수정하며 값을 가져올 수 있도록 구현된 라이브러리.

 

 

 

기본적인 알고리즘은 아래와 같다.

간단한 프로그램 같아도 생각보다 구조가 복잡하다

단순히 엑셀에 DATA를 저장하는 기능만 가지고 있는 것이 아니라, 실시간으로 "R 값"을 추출해 개수를 세는 기능이 함께 들어가 있다. 따라서 순서도도 약간 복잡해졌다.

 

맨 처음에는 프로그램을 가볍게 하기 위해 Real_Excel 변수에 모든 "Excel 기록 대기중인 데이터"를 때려넣고 피실험자의 측정이 종료된 후 한꺼번에 저장하려고 했다.

하지만 피실험자의 상태 등 여러 요인으로 인해 실험에 소요될 시간을 정확히 예측하기가 어렵다는 문제가 있었다. 방금 말 한대로 하면 중간에 파이썬이 중지되었을 때, 데이터가 모두 날아가게 되는데, 프로그램이 끊겨도 끊기기 직전까지의 데이터를 살려둘 필요가 있었다. 따라서 코드를 중간에 종료해도 데이터가 날아가지 않도록 "매번 데이터가 들어올 때 마다 Excel에 직접 기록하기"로 결정했다.

 

따라서 "데이터 변환", "R값 세서 터미널에 출력", "데이터를 Excel에 작성"하는 과정이 모두 실시간으로 일어나게 된다.

 

 

 

1차로 완성된 코드는 다음과 같다.

(PC의 경우 만약 코드블럭 안의 내용이 까만 글씨로 보인다면 화면 우측 하단의 달 모양 버튼을 눌러 다크모드를 종료해주자. 코드블럭 안의 글씨가 다크/화이트 모드에 따라 자동으로 바뀌지 않는 오류가 있다. 조만간 고쳐보겠다.)

 

import numpy as np
import pandas as pd
import serial
import time 
import re 
import threading 
import openpyxl

ECG = serial.Serial("COM6", 9600)

def ECGAlgorithm(a, b):
              
     if a - b > 70 and a - b < 400: 
        return 1
     else:
          return 0
start_time = time.time() 
val_time_arr = [] 
val_ECG_arr = [] 

i = 0
while (i <=100000):   
    if ECG.readable():
        ECG_analog = str(ECG.readline())
        
        data_ECG = re.findall("\d+", ECG_analog)
        
        val_ECG = float('{0}'.format(data_ECG[0]))
        val_ECG_arr.insert(i, val_ECG) 

        now_time = time.time()

        val_time_arr.insert(i, now_time - start_time)
        i += 1

        COUNT=0
        #print(f'ECG = {val_ECG},time={now_time-start_time}')
        for i in range(0, len(val_ECG_arr)-1):
            COUNT+=ECGAlgorithm(val_ECG_arr[i], val_ECG_arr[i+1])
           
        print  (COUNT)   
        
Real_excel={'심전도':val_ECG_arr,
                '시간': val_time_arr}
Real_excel=pd.DataFrame(Real_excel)
Real_excel.to_excel(excel_writer='결과물 이름 입력.xlsx')

 

이번 R&E 프로젝트가 난생 처음 Python으로 코딩을 다뤄본 프로젝트라 (230212 현재로부터 약 1년반 전) 코드가 깔끔하지가 않다. 내 실력이 늘었다는 것을 체감할 수 있는 부분이었다.

 

 

코드에 대한 자세한 설명은 어차피 앞의 순서도에서 다 했으니 생략하도록 하겠다.

원래 <코딩 중간 보고서>에서는 위 순서도 대신 줄글로 설명했는데, 이 포스팅을 쓰면서 순서도로 정리해놓고 나니 딱히 줄글이 필요없어졌다.

 

줄글을 보고싶으면 이 포스팅 시리즈의 11편에 가면 <코딩 중간 보고서>를 통째로 올려둔 파일이 있는데, 그 파일을 참고하면 될 것 같다.

 

 

 

 

3. 디버그

3.1. 초당 측정되는 심전도 개수가 줄어드는 문제

앞에서 말했듯이 초당 심전도값을 최대한 많이 찍어낼수록 실험의 정확도가 올라간다. 또한, 일정 개수 이하로 찍어낸 심전도 값들은 "\(R\)값"이 명확하게 표현되지 않아 심각한 실험상의 오류를 야기할 수 있다.

 

예를 들어, 1초에 50번을 심전도를 측정하면 정의역의 개수가 충분해 \(R\)값까지 그래프가 올라갔다가 떨어지는 파형을 잘 나타낼 수 있지만, 정의역이 띄엄띄엄해지다보면 실제 \(R\)값이 그래프상에 표현되지 않아 Return 1이 실행되지 않아 Count값이 올라가지 않을 수 있다.

 

테스트 결과, Python 코드를 중간에 멈춰도 기록이 되도록 Real_excel 이하의 코드를 while문 안에 넣으면 Excel에 적힌 (엄밀히 말하면 Excel로 송출되는) 심전도 값의 초당 개수가 지속적으로 감소하는 버그가 발생한다. 이는 Python 언어의 설계 구조 때문에 발생하는 버그인데, C나 Java등 다른 언어를 사용하는 것 외에는 해결할 방법이 없다고 판단했다.

 

Python은 기본적으로 인터프리터 언어이므로 명령어를 한 줄 한줄 코드 실행 중간에 읽어 번역하여 사용한다. 초당 100개의 데이터를 쓰는 작업에서 이러한 구조는 비효율적이며, Excel에 데이터를 쓰는 작업은 Python의 처리 속도를 느리게 만든다. 따라서 Python에서는 Real_excel 이하의 구문을 while문 바깥으로 빼야 한다.

 

 

*이 부분이 당시에 코드를 짜면서도 상당히 많이 고민했던 부분인데, 작성 코드를 while문 안에 넣자니 전반적인 속도가 느려지고, 작성 코드를 while문 바깥에 넣자니 중간에 프로그램을 종료할 수 없다는 불편함이 발생했기 때문이다.

 

지금 생각하면 다른 해결방법이 있었다. 당시에는 코딩 초보라 다른 방법을 생각해내질 못했었다.

 

  • 파이썬 실행 중 터미널에 "Stop and Save"를 입력하면 현재까지의 결과를 Excel에 저장하고 종료
  • Excel에 작성하는 것이 부담이라면, 메모장에 .txt파일로 일단 작성하고 \(i=n\)일 때까지 실행 후 한꺼번에 Excel에 저장. (txt파일을 버퍼로 사용한다는 개념) 

 

이정도 방법이 있었을 것 같다. 아마 실제 코드에 적용하게 될 방법은 "Stop and Save" 구문을 미리 설정해놓는 방식일 듯 하다.

 

 

 

3.2. Arduino에서 float이 아닌 값이 송출되는 현상

앞에서 서술했듯이 Arduino는 불안정하다. 아두이노는 숫자열만을 출력하도록 코딩되어있지만, 아두이노 및 AD8232의 불안정성과 정보 전송의 부정확으로 인해 낮은 확률로 문자열이 출력되는 경우가 있다.

 

이 경우, val_ECG = float('{0}'.format(data_ECG[0]))함수가 이 값을 처리하지 못하므로 “index out of range” 오류가 발생하게 된다.

 

이를 해결하기 위하여 받아오는 값이 숫자열인 경우 위의 코드를 실행하고, 문자열이거나 문자열과 숫자열이 섞여 있는 경우 해당 데이터를 continue 처리(해당 데이터를 삭제 처리하고 다음 데이터로 넘어간다는 의미) 하는 방식으로 코드를 수정했다.

 

 

 

3.3. Arduino 코드 최적화

아두이노에 업로드했던 코드 중에 버그가 발생할 수 있을 만한 부분은 모두 삭제하고 최대한 깔끔하고 간단하게 수정했다. 물론 이렇게 수정했다고 해서 3.1.과 3.2.의 문제가 해결되지는 않지만, 그래도 약간의 성능 향상과 예기치 못한 버그를 방지하기 위해서.. 

int sensorA = A0;    
int sensorA_value = 0;

void setup() {
  Serial.begin(9600);
  pinMode(10, INPUT); 
  pinMode(11, INPUT);
}

void loop() {

   sensorA_value = analogRead(sensorA);                        

  Serial.println(sensorA_value); 

  delay(10);
}

 

 

 

 

 

3.1.과 3.2의 두 가지 문제를 수정한 파이썬 코드는은 다음과 같다.

import numpy as np
import pandas as pd
import serial
import time 
import re 
import threading 
import openpyxl

ECG = serial.Serial("COM6", 9600)

def ECGAlgorithm(a, b):
              
     if a - b > 70 and a - b < 400: 
        return 1
     else:
          return 0
start_time = time.time() 
val_time_arr = [] 
val_ECG_arr = [] 

i = 0
while (i <=100000):   
    if ECG.readable():
        ECG_analog = str(ECG.readline())
        
        data_ECG = re.findall("\d+", ECG_analog)
        val_ECG = data_ECG[0]
        
    # if str(type(val_ECG)) == "<class 'int'>":
            
        val_ECG_arr.insert(i, val_ECG)

        now_time = time.time()

        val_time_arr.insert(i, now_time - start_time)
        i += 1
        COUNT=0
            #print(f'ECG = {val_ECG},time={now_time-start_time}')
        for i in range(0, len(val_ECG_arr)-1):
                COUNT+=ECGAlgorithm(int(val_ECG_arr[i]), int(val_ECG_arr[i+1]))
           
        print  (COUNT)

    

Real_excel={'심전도':val_ECG_arr,
                '시간': val_time_arr}
Real_excel=pd.DataFrame(Real_excel)
Real_excel.to_excel(excel_writer='결과물 이름 입력.xlsx')
 

위의 일련의 과정을 거치면 실험 대상자의 심전도 log가 생성된다.

 

위 log는 나중에 단순선형회귀분석을 통해 분석하게 된다. 단순선형회귀분석(Simple Linear Regression)을 실행하기 위해서는 log의 값 중 배속에 따른 "R값"만을 추출해야 한다. 데이터의 양이 몇십만개에 육박하므로 Python으로 "R값"만을 추출해주는 프로그램을 따로 개발하였다.

 

이 프로그램의 개발은 다음 편에 계속 써보도록 하겠다.

반응형