Processor

아파치 웹(Web)에서 소켓을 이용한 라즈베리파이 LED와 온습도 센서(DHT)

작성자 임베디드코리아 작성일26-06-04 22:08 조회63회 댓글0건
$ python -m pip config set global.break-system-packages true
$ pip install adafruit-circuitpython-dht

■ 아파치 서버
---<  index.html  >---------------------------------------------
<!DOCTYPE html>
<html>
<head>
 
  <title>LED 제어 및 온습도 모니터링</title>
  <script src="https://code.jquery.com/jquery-3.6.0.min.js"></script>
  <script src="https://cdn.socket.io/4.0.0/socket.io.min.js"></script>
</head>
<body>
  <h3>LED 제어</h3>
  <label><input type="radio" name="led" value="on"> ON</label>
  <label><input type="radio" name="led" value="off"> OFF</label>
  <p id="status">LED 상태: 알 수 없음</p>
  <h3>온습도 모니터링</h3>
  <p id="th">온도: --℃ / 습도: --%</p>
  <script>
    //const socket = io(location.protocol + '//' + location.hostname + ':5000');
    const socket = io("http://172.30.1.58:5000"); // Flask-SocketIO 서버 주소
    // 접속 시 LED 상태 요청 및 온습도 요청
    socket.on("connect", function() {
      socket.emit("get_led_status");
      socket.emit("get_temperature_humidity");
    });
    // LED 상태 수신
    socket.on("led_status", function(data) {
      console.log("LED 상태:", data);
      $("#status").text("LED 상태: " + data.state);
      $("input[name='led'][value='" + data.state + "']").prop("checked", true);
    });
    // LED 상태 변경 요청
    $("input[name='led']").change(function() {
      const state = $(this).val();
      socket.emit("led_control", { state: state });
    });
    // 온습도 요청
    setInterval(function(){
      socket.emit("get_temperature_humidity_status");
    }, 2000);
 
    // 온습도 수신
    socket.on("temperature_humidity_status", function(data) {
      console.log("온습도 수신:", data);
      $("#th").text("온도: " + data.temp + "℃ / 습도: " + data.hum + "%");
    });
  </script>
</body>
</html>
--------------------------------------------------------------------

■ 라즈베리파이 Python
---<  flask_Socket_DHT.py  >---------------------------------
import time
import threading
from flask import Flask, request
from flask_socketio import SocketIO
import RPi.GPIO as GPIO
import board
import adafruit_dht

# --- 글로벌 변수 ---
latest_temp = None
latest_hum = None

# --- GPIO 설정 ---
LED_PIN = 17
GPIO.setmode(GPIO.BCM)
GPIO.setup(LED_PIN, GPIO.OUT)

# --- Flask-SocketIO 설정 ---
app = Flask(__name__)
socketio = SocketIO(app, cors_allowed_origins="*", async_mode="eventlet")

# --- DHT22 센서 설정 ---
dht_device = adafruit_dht.DHT22(board.D18)

# --- LED 상태 반환 함수 ---
def get_led_state():
    return "on" if GPIO.input(LED_PIN) else "off"

# --- LED 제어 처리 ---
@socketio.on("led_control")
def control_led(data):
    state = data.get("state")
    if state == "on":
        GPIO.output(LED_PIN, GPIO.HIGH)
    elif state == "off":
        GPIO.output(LED_PIN, GPIO.LOW)
    state = get_led_state()
    print(f"led 상태 : {state}")
    socketio.emit("led_status", {"state": state})

# --- LED 상태 요청 처리 ---
@socketio.on("get_led_status")
def handle_status_request():
    state = get_led_state()
    print(f"led 상태 : {state}")
    socketio.emit("led_status", {"state": state}, room=request.sid)

# --- 온습도 상태 요청 처리 (유니캐스트) ---
@socketio.on("get_temperature_humidity_status")
def send_temperature_humidity_status():
    if latest_temp is not None and latest_hum is not None:
        socketio.emit("temperature_humidity_status", {
            "temp": latest_temp,
            "hum": latest_hum
        }, room=request.sid)
    else:
        socketio.emit("temperature_humidity_status", {
            "temp": "N/A",
            "hum": "N/A"
        }, room=request.sid)

# --- 센서 측정 스레드 ---
def temperature_monitor_thread():
    global latest_temp, latest_hum
    while True:
        try:
            temp = dht_device.temperature
            hum = dht_device.humidity
            if hum is not None and temp is not None:
                latest_temp = round(temp, 1)
                latest_hum = round(hum, 1)
                print(f"센서 측정: {latest_temp}℃ / {latest_hum}%")
            else:
                print("센서 데이터 없음")
        except RuntimeError as e:
            print("센서 에러:", e.args[0])
        except Exception as e:
            dht_device.exit()
            raise e
        time.sleep(2)

# --- 센서 스레드 시작 ---
def start_sensor_thread():
    t = threading.Thread(target=temperature_monitor_thread)
    t.daemon = True
    t.start()

# --- 메인 ---
if __name__ == "__main__":
    print("서버 시작")
    start_sensor_thread()
    socketio.run(app, host="172.30.1.58", port=5000)
-------------------------------------------------------------------------