news 2026/6/12 6:26:04

社区公告推送小程序,输入公告信息,自动推送给小区用户,支持点赞和评论,解决社区公告传达不及时的问题。

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
社区公告推送小程序,输入公告信息,自动推送给小区用户,支持点赞和评论,解决社区公告传达不及时的问题。

我将为您创建一个完整的社区公告推送小程序。以下是完整的项目结构:

项目结构

community_announcement_system/

├── main.py # 主程序入口

├── models.py # 数据模型模块

├── announcement_manager.py # 公告管理核心模块

├── user_manager.py # 用户管理模块

├── notification_service.py # 推送通知服务模块

├── database.py # 数据库操作模块

├── utils.py # 工具函数模块

├── config.py # 配置文件

├── templates.py # HTML模板模块

├── static/

│ ├── style.css # 样式文件

│ └── script.js # JavaScript文件

├── data/

│ └── community_data.json # 数据存储文件

├── README.md # 使用说明

├── requirements.txt # 依赖包列表

└── demo.py # 演示程序

1. requirements.txt

flask==2.3.3

flask-sqlalchemy==3.0.5

flask-login==0.6.3

werkzeug==2.3.7

jinja2==3.1.2

wtforms==3.0.1

email-validator==2.0.0

python-dateutil==2.8.2

sqlite3

2. config.py

"""

配置文件 - 存储应用程序的配置参数

基于创新创业理论:标准化配置提升系统稳定性

"""

import os

from datetime import timedelta

class Config:

# 基础配置

SECRET_KEY = os.environ.get('SECRET_KEY') or 'dev-secret-key-change-in-production'

# 数据库配置

DATABASE_URL = os.environ.get('DATABASE_URL') or 'sqlite:///community.db'

# 应用配置

APP_NAME = "智慧社区公告系统"

APP_DESCRIPTION = "高效、便捷的社区信息推送平台"

# 推送配置

NOTIFICATION_ENABLED = True

PUSH_DELAY_SECONDS = 2 # 推送延迟时间

# 分页配置

ANNOUNCEMENTS_PER_PAGE = 10

COMMENTS_PER_PAGE = 5

# 权限配置

MAX_TITLE_LENGTH = 100

MAX_CONTENT_LENGTH = 2000

MAX_COMMENT_LENGTH = 500

# 时间配置

ANNOUNCEMENT_EXPIRY_DAYS = 30 # 公告过期天数

SESSION_TIMEOUT = timedelta(hours=24)

# 用户角色

ROLES = {

'admin': '管理员',

'resident': '居民',

'committee': '业委会成员'

}

# 公告类型

ANNOUNCEMENT_TYPES = {

'notice': '通知公告',

'activity': '活动通知',

'emergency': '紧急通知',

'service': '物业服务',

'other': '其他'

}

# 优先级等级

PRIORITY_LEVELS = {

'low': '普通',

'medium': '重要',

'high': '紧急',

'urgent': '特急'

}

class DevelopmentConfig(Config):

DEBUG = True

TESTING = True

class ProductionConfig(Config):

DEBUG = False

TESTING = False

# 配置映射

config = {

'development': DevelopmentConfig,

'production': ProductionConfig,

'default': DevelopmentConfig

}

3. models.py

"""

数据模型模块 - 定义系统的核心数据结构和关系

基于创新创业理论:数据模型是产品的骨架

"""

from flask_sqlalchemy import SQLAlchemy

from flask_login import UserMixin

from werkzeug.security import generate_password_hash, check_password_hash

from datetime import datetime

import json

db = SQLAlchemy()

class User(UserMixin, db.Model):

"""用户模型 - 存储社区居民信息"""

__tablename__ = 'users'

id = db.Column(db.Integer, primary_key=True)

username = db.Column(db.String(80), unique=True, nullable=False, comment='用户名')

email = db.Column(db.String(120), unique=True, nullable=False, comment='邮箱')

password_hash = db.Column(db.String(128), nullable=False, comment='密码哈希')

real_name = db.Column(db.String(50), nullable=False, comment='真实姓名')

phone = db.Column(db.String(20), comment='联系电话')

room_number = db.Column(db.String(20), comment='房间号')

role = db.Column(db.String(20), default='resident', comment='用户角色')

# 关联关系

announcements = db.relationship('Announcement', backref='author', lazy=True)

comments = db.relationship('Comment', backref='user', lazy=True)

likes = db.relationship('Like', backref='user', lazy=True)

# 时间戳

created_at = db.Column(db.DateTime, default=datetime.utcnow, comment='创建时间')

updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, comment='更新时间')

def set_password(self, password):

"""设置密码"""

self.password_hash = generate_password_hash(password)

def check_password(self, password):

"""验证密码"""

return check_password_hash(self.password_hash, password)

def get_avatar_url(self):

"""获取头像URL"""

return f"/static/avatars/{self.role}.png"

def to_dict(self, include_private=False):

"""转换为字典格式"""

data = {

'id': self.id,

'username': self.username,

'real_name': self.real_name,

'role': self.role,

'room_number': self.room_number,

'created_at': self.created_at.isoformat() if self.created_at else None

}

if include_private:

data.update({

'email': self.email,

'phone': self.phone

})

return data

def __repr__(self):

return f'<User {self.username}>'

class Announcement(db.Model):

"""公告模型 - 存储社区公告信息"""

__tablename__ = 'announcements'

id = db.Column(db.Integer, primary_key=True)

title = db.Column(db.String(200), nullable=False, comment='标题')

content = db.Column(db.Text, nullable=False, comment='内容')

type = db.Column(db.String(20), default='notice', comment='类型')

priority = db.Column(db.String(20), default='medium', comment='优先级')

# 作者信息

author_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)

# 推送状态

is_published = db.Column(db.Boolean, default=False, comment='是否已发布')

is_pinned = db.Column(db.Boolean, default=False, comment='是否置顶')

push_status = db.Column(db.String(20), default='draft', comment='推送状态')

# 统计数据

view_count = db.Column(db.Integer, default=0, comment='浏览次数')

like_count = db.Column(db.Integer, default=0, comment='点赞数')

comment_count = db.Column(db.Integer, default=0, comment='评论数')

# 时间信息

publish_time = db.Column(db.DateTime, comment='发布时间')

expiry_date = db.Column(db.DateTime, comment='过期时间')

created_at = db.Column(db.DateTime, default=datetime.utcnow, comment='创建时间')

updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, comment='更新时间')

# 关联关系

comments = db.relationship('Comment', backref='announcement', lazy=True, cascade='all, delete-orphan')

likes = db.relationship('Like', backref='announcement', lazy=True, cascade='all, delete-orphan')

def get_type_display(self):

"""获取类型显示名称"""

from config import Config

return Config.ANNOUNCEMENT_TYPES.get(self.type, '未知')

def get_priority_display(self):

"""获取优先级显示名称"""

from config import Config

return Config.PRIORITY_LEVELS.get(self.priority, '普通')

def get_author_name(self):

"""获取作者姓名"""

return self.author.real_name if self.author else '未知'

def is_expired(self):

"""判断是否过期"""

if not self.expiry_date:

return False

return datetime.utcnow() > self.expiry_date

def increment_view_count(self):

"""增加浏览次数"""

self.view_count += 1

db.session.commit()

def to_dict(self, include_content=True):

"""转换为字典格式"""

data = {

'id': self.id,

'title': self.title,

'type': self.type,

'type_display': self.get_type_display(),

'priority': self.priority,

'priority_display': self.get_priority_display(),

'author_name': self.get_author_name(),

'is_published': self.is_published,

'is_pinned': self.is_pinned,

'push_status': self.push_status,

'view_count': self.view_count,

'like_count': self.like_count,

'comment_count': self.comment_count,

'publish_time': self.publish_time.isoformat() if self.publish_time else None,

'expiry_date': self.expiry_date.isoformat() if self.expiry_date else None,

'created_at': self.created_at.isoformat() if self.created_at else None

}

if include_content:

data['content'] = self.content

return data

def __repr__(self):

return f'<Announcement {self.title}>'

class Comment(db.Model):

"""评论模型 - 存储用户对公告的评论"""

__tablename__ = 'comments'

id = db.Column(db.Integer, primary_key=True)

content = db.Column(db.Text, nullable=False, comment='评论内容')

is_reply = db.Column(db.Boolean, default=False, comment='是否为回复')

parent_id = db.Column(db.Integer, db.ForeignKey('comments.id'), comment='父评论ID')

# 关联信息

announcement_id = db.Column(db.Integer, db.ForeignKey('announcements.id'), nullable=False)

user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)

# 时间信息

created_at = db.Column(db.DateTime, default=datetime.utcnow, comment='创建时间')

updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, comment='更新时间')

# 自关联关系

replies = db.relationship('Comment', backref=db.backref('parent', remote_side=[id]), lazy=True)

def get_user_name(self):

"""获取用户姓名"""

return self.user.real_name if self.user else '匿名用户'

def to_dict(self):

"""转换为字典格式"""

return {

'id': self.id,

'content': self.content,

'is_reply': self.is_reply,

'parent_id': self.parent_id,

'user_name': self.get_user_name(),

'user_id': self.user_id,

'announcement_id': self.announcement_id,

'created_at': self.created_at.isoformat() if self.created_at else None,

'replies': [reply.to_dict() for reply in self.replies] if self.replies else []

}

def __repr__(self):

return f'<Comment {self.content[:20]}>'

class Like(db.Model):

"""点赞模型 - 存储用户对公告的点赞记录"""

__tablename__ = 'likes'

id = db.Column(db.Integer, primary_key=True)

user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)

announcement_id = db.Column(db.Integer, db.ForeignKey('announcements.id'), nullable=False)

# 时间信息

created_at = db.Column(db.DateTime, default=datetime.utcnow, comment='创建时间')

# 唯一约束

__table_args__ = (db.UniqueConstraint('user_id', 'announcement_id', name='unique_user_announcement_like'),)

def to_dict(self):

"""转换为字典格式"""

return {

'id': self.id,

'user_id': self.user_id,

'announcement_id': self.announcement_id,

'created_at': self.created_at.isoformat() if self.created_at else None

}

def __repr__(self):

return f'<Like user:{self.user_id} announcement:{self.announcement_id}>'

class PushRecord(db.Model):

"""推送记录模型 - 记录公告推送历史"""

__tablename__ = 'push_records'

id = db.Column(db.Integer, primary_key=True)

announcement_id = db.Column(db.Integer, db.ForeignKey('announcements.id'), nullable=False)

recipient_count = db.Column(db.Integer, default=0, comment='接收人数')

success_count = db.Column(db.Integer, default=0, comment='成功推送数')

failure_count = db.Column(db.Integer, default=0, comment='推送失败数')

# 时间信息

push_time = db.Column(db.DateTime, default=datetime.utcnow, comment='推送时间')

created_at = db.Column(db.DateTime, default=datetime.utcnow, comment='创建时间')

# 关联关系

announcement = db.relationship('Announcement', backref='push_records')

def to_dict(self):

"""转换为字典格式"""

return {

'id': self.id,

'announcement_id': self.announcement_id,

'recipient_count': self.recipient_count,

'success_count': self.success_count,

'failure_count': self.failure_count,

'push_time': self.push_time.isoformat() if self.push_time else None,

'created_at': self.created_at.isoformat() if self.created_at else None

}

def __repr__(self):

return f'<PushRecord announcement:{self.announcement_id}>'

4. database.py

"""

数据库操作模块 - 封装所有数据库相关的操作

基于创新创业理论:数据层抽象提升系统可维护性

"""

from sqlalchemy import create_engine, desc, asc, func, and_, or_

from sqlalchemy.orm import sessionmaker, scoped_session

from sqlalchemy.exc import SQLAlchemyError

import logging

from datetime import datetime, timedelta

import json

from models import db, User, Announcement, Comment, Like, PushRecord

from config import Config

class DatabaseManager:

"""数据库管理器 - 统一管理所有数据库操作"""

def __init__(self, app=None):

self.app = app

self.engine = None

self.Session = None

if app:

self.init_app(app)

def init_app(self, app):

"""初始化数据库连接"""

self.app = app

self.engine = create_engine(

app.config['SQLALCHEMY_DATABASE_URI'],

echo=app.config.get('SQLALCHEMY_ECHO', False)

)

self.Session = scoped_session(sessionmaker(bind=self.engine))

db.init_app(app)

def get_session(self):

"""获取数据库会话"""

return self.Session()

def close_session(self, session):

"""关闭数据库会话"""

if session:

session.close()

# 用户相关操作

def create_user(self, user_data):

"""创建新用户"""

session = self.get_session()

try:

# 检查用户名和邮箱是否已存在

existing_user = session.query(User).filter(

or_(

User.username == user_data['username'],

User.email == user_data['email']

)

).first()

if existing_user:

return None, "用户名或邮箱已存在"

user = User(

username=user_data['username'],

email=user_data['email'],

real_name=user_data['real_name'],

phone=user_data.get('phone', ''),

room_number=user_data.get('room_number', ''),

role=user_data.get('role', 'resident')

)

user.set_password(user_data['password'])

session.add(user)

session.commit()

return user, "用户创建成功"

except SQLAlchemyError as e:

session.rollback()

logging.error(f"创建用户失败: {e}")

return None, f"数据库错误: {str(e)}"

finally:

self.close_session(session)

def get_user_by_username(self, username):

"""根据用户名获取用户"""

session = self.get_session()

try:

return session.query(User).filter(User.username == username).first()

except SQLAlchemyError as e:

logging.error(f"查询用户失败: {e}")

return None

finally:

self.close_session(session)

def get_user_by_id(self, user_id):

"""根据用户ID获取用户"""

session = self.get_session()

try:

return session.query(User).filter(User.id == user_id).first()

except SQLAlchemyError as e:

logging.error(f"查询用户失败: {e}")

return None

finally:

self.close_session(session)

def get_users_paginated(self, page=1, per_page=20, filters=None):

"""分页获取用户列表"""

session = self.get_session()

try:

query = session.query(User)

# 应用过滤条件

if filters:

if filters.get('role'):

query = query.filter(User.role == filters['role'])

if filters.get('keyword'):

keyword = f"%{filters['keyword']}%"

query = query.filter(

or_(

User.username.like(keyword),

User.real_name.like(keyword),

User.room_number.like(keyword)

)

)

# 排序和分页

users = query.order_by(desc(User.created_at)).paginate(

page=page, per_page=per_page, error_out=False

)

return users

except SQLAlchemyError as e:

logging.error(f"获取用户列表失败: {e}")

return None

finally:

self.close_session(session)

# 公告相关操作

def create_announcement(self, announcement_data, author_id):

"""创建新公告"""

session = self.get_session()

try:

# 计算过期时间

expiry_days = announcement_data.get('expiry_days', 30)

expiry_date = datetime.utcnow() + timedelta(days=expiry_days)

announcement = Announcement(

title=announcement_data['title'],

content=announcement_data['content'],

type=announcement_data.get('type', 'notice'),

priority=announcement_data.get('priority', 'medium'),

author_id=author_id,

is_pinned=announcement_data.get('is_pinned', False),

expiry_date=expiry_date

)

session.add(announcement)

session.commit()

return announcement, "公告创建成功"

except SQLAlchemyError as e:

session.rollback()

logging.error(f"创建公告失败: {e}")

return None, f"数据库错误: {str(e)}"

finally:

self.close_session(session)

def get_announcement_by_id(self, announcement_id):

"""根据ID获取公告"""

session = self.get_session()

try:

return session.query(Announcement).filter(

Announcement.id == announcement_id

).first()

except SQLAlchemyError as e:

logging.error(f"查询公告失败: {e}")

return None

finally:

self.close_session(session)

def get_announcements_paginated(self, page=1, per_page=10, filters=None, user_id=None):

"""分页获取公告列表"""

session = self.get_session()

try:

query = session.query(Announcement).filter(

Announcement.is_published == True

)

# 排除过期的公告

query = query.filter(

or_(

Announcement.expiry_date.is_(None),

Announcement.expiry_date > datetime.utcnow()

)

)

# 应用过滤条件

if filters:

if filters.get('type'):

query = query.filter(Announcement.type == filters['type'])

if filters.get('priority'):

query = query.filter(Announcement.priority == filters['priority'])

if filters.get('author_id'):

query = query.filter(Announcement.author_id == filters['author_id'])

if filters.get('keyword'):

keyword = f"%{filters['keyword']}%"

query = query.filter(

or_(

Announcement.title.like(keyword),

Announcement.content.like(keyword)

)

)

# 排序:置顶的在前,然后按发布时间倒序

query = query.order_by(

desc(Announcement.is_pinned),

desc(Announcement.publish_time)

)

announcements = query.paginate(

page=page, per_page=per_page, error_out=False

)

# 如果用户已登录,标记已读状态

if user_id:

for announcement in announcements.items:

# 这里可以添加已读状态的逻辑

pass

return announcements

except SQLAlchemyError as e:

logging.error(f"获取公告列表失败: {e}")

return None

finally:

self.close_session(session)

def publish_announcement(self, announcement_id):

"""发布公告"""

session = self.get_session()

try:

announcement = session.query(Announcement).filter(

Announcement.id == announcement_id

).first()

if not announcement:

return None, "公告不存在"

announcement.is_published = True

announcement.publish_time = datetime.utcnow()

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/5 7:07:42

解锁 LLM 中 AI Agent 的效率密码,掌握实用优化技巧【线上直播】

在 AI Agent 飞速发展的当下&#xff0c;如何提升提示词效率、让模型响应更精准高效&#xff0c;成为许多开发者关注的核心问题。无论是刚接触 LLM 与 AI Agent 的新手&#xff0c;还是正在实操中遭遇效率瓶颈的从业者&#xff0c;都可能被这些问题困扰&#xff1a;MCP Service…

作者头像 李华
网站建设 2026/6/10 1:10:25

【AI×实时Linux:极速实战宝典】显存池 - 编写自定义 Allocator 预分配全量显存,杜绝运行时的 cudaMalloc 开销

简介在高性能计算和人工智能应用中&#xff0c;显存管理是影响程序性能的关键因素之一。传统的显存分配方式&#xff08;如使用 cudaMalloc 动态分配显存&#xff09;可能会导致运行时的随机延迟&#xff0c;尤其是在频繁分配和释放显存的场景下。为了优化显存管理&#xff0c;…

作者头像 李华
网站建设 2026/6/5 13:36:41

Vue2 中 Options API:组织组件逻辑的主要方式

Vue2 Options API 是组织组件逻辑的主要方式&#xff0c;通过 data、methods、computed、watch 等选项定义组件。其优势在于结构清晰、学习成本低&#xff0c;适合中小型组件。但随着复杂度增加&#xff0c;会出现逻辑分散、复用困难等问题。Vue3 的 Composition API 通过逻辑组…

作者头像 李华
网站建设 2026/6/10 12:41:55

LLM实时调校心电图设备,精度翻倍

&#x1f4dd; 博客主页&#xff1a;Jax的CSDN主页 AI赋能中医心身医学&#xff1a;破解精神障碍诊疗的“证型-方药”匹配困局目录AI赋能中医心身医学&#xff1a;破解精神障碍诊疗的“证型-方药”匹配困局 引言&#xff1a;被遗忘的诊疗断层 一、技术破局&#xff1a;LLM如何重…

作者头像 李华
网站建设 2026/6/8 21:45:32

为什么“容器化“技术很重要?——从虚拟机到 Docker

&#x1f4e6; 为什么"容器化"技术很重要&#xff1f;——从虚拟机到 Docker &#x1f680;大家好&#xff0c;我是无限大&#xff0c;欢迎收看十万个为什么系列文章 希望今天的内容能对大家有所帮助今天咱们来聊聊容器化这个"软件界的集装箱革命"&#xf…

作者头像 李华