基础工具之消息队列、线程池、缓冲区抽象、事件循环和日志实现

正所谓“工欲善其事,必先利其器”,我们在实现通信设计任务的过程中需要一些基础工具来帮助我们搭建部分基础组件,这些基础工具包括消息队列,线程池,缓冲区抽象,事件循环和日志工具。接下来对这部分基础工具进

大家好,又见面了,我是全栈君,今天给大家准备了Idea注册码。

  正所谓“工欲善其事,必先利其器”, 我们在实现通信设计任务的过程中需要一些基础工具来帮助我们搭建部分基础组件,这些基础工具包括消息队列,线程池,缓冲区抽象,事件循环和日志工具。接下来对这部分基础工具进行描述和实现。

1. 消息队列

1.1 linux消息队列应用

(1)成员函数和数据结构  

struct msgbuf{
    long mtype;
    char mtext[1]; 
}

// msgid_ds内核数据结构
struct msgid_ds
// 生成唯一的键
key ftok(const char *pathname, int proj_id)

int msgget(key_t key, int msgflg)
msgflg是一个标志参数:
* IPC_CREATE 如果内核不存在与key相等的消息队列,则创建一个
   一个消息队列,如果存在这样的消息队列,返回该消息队列的描述符
* IPC_EXCL 和IPC_CREATE一起使用,如果对应键值的消息队列已
   经存在,则出错,返回-1

int msgsnd(int msgid, struct msgbuf* msgp. size_t msgsz, int magflg)
* 特别注意第三个参数,可以设置为0或IPC_NOWAIT, 当为0时,消息
    已满的时候会阻塞,如果为IPC_NOWAIT,则不等待立即返回,常见错
    误码有EAGAIN(消息队列已满),EIDRN(消息队列已被删除)
    EACCESS(无法访问消息对列)
int msgrcv(int msgid, struct msgbuff* , size_t msgsz, long int msgtype, int msgflag)
* msgflg 操作标志位,IPC_NOWAIT(如果没有满足条件的消息立马返回) IPC_EXCEPT 
(返回队列中第一个类型不会msgtype的消息),IPC_NOERROR(如果队列中满足条件的消息
内容大于所请求的实际字节数,则把该消息截断,截断部分将被丢弃)
int msgctl(int msgid, int cmd, struct msgid_ds*)
*cmd有以下三个参数:
*IPC_STAT(获取消息队列对应的msgid_ds数据结构)
*IPC_SET(设置消息队列的属性)
*IPC_RMID(从内核中删除msgid标识的消息队列)

(2)示例

基础工具之消息队列、线程池、缓冲区抽象、事件循环和日志实现
基础工具之消息队列、线程池、缓冲区抽象、事件循环和日志实现

// linux系统的消息队列使用
#include "stdio.h"

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdint.h>
#include <errno.h>

#include <string>
#include <string.h>
#include <iostream>
using namespace std;

const int32_t BUFFSIZE = 256;
struct msgBuff
{
    long msgType;
    char buff[BUFFSIZE];
};

int main()
{
    int32_t proj_id = 32;
    key_t key = ftok("./messagekey", proj_id);
    if(-1 == key) 
    {
        cout << "ftok error" <<endl;
    }

    int msgid = msgget(key, IPC_CREAT);
    if (msgid == -1)
    {
        cout << "msgget error" << endl;
    }

    msgBuff msg;
    memset(&msg, 0, sizeof(msgBuff));
    msg.msgType = 3;
    strcpy(msg.buff, "message tset");
    cout << msg.buff << endl;
    int32_t msgLen = sizeof(msgBuff) - sizeof(long);
    cout << "msgLen:" << msgLen << endl;

    int nsize;
    if  ((nsize = msgsnd(msgid, &msg, msgLen, 0)) == -1){
        cout << strerror(errno) << endl;
    }
    
    return 0;
}

SendMsg.c

基础工具之消息队列、线程池、缓冲区抽象、事件循环和日志实现
基础工具之消息队列、线程池、缓冲区抽象、事件循环和日志实现

// linux系统的消息队列使用
#include "stdio.h"

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdint.h>
#include <errno.h>

#include <string>
#include <string.h>
#include <iostream>
using namespace std;

#include <thread>
#include <chrono>

const int32_t BUFFSIZE = 256;
struct msgBuff
{
    long msgType;
    char buff[BUFFSIZE];
};

int main()
{
    int32_t proj_id = 32;
    key_t key = ftok("./messagekey", proj_id);
    if(-1 == key) 
    {
        cout << "ftok error" <<endl;
    }

    int msgid = msgget(key, IPC_CREAT);
    if (msgid == -1)
    {
        cout << "msgget error" << endl;
    }

    msgBuff msg;
    memset(&msg, 0, sizeof(msgBuff));
    int32_t msgLen = sizeof(msgBuff) - sizeof(long);
    int nsize;
    while(1)
    {   
        if  (msgrcv(msgid, &msg, msgLen, 0, 0) == -1){
            cout << strerror(errno) << endl;
        }

        cout  << msg.msgType << "--" << msg.buff <<endl;
        std::this_thread::sleep_for(std::chrono::milliseconds (2000));
    }
    return 0;
}

RcvMsg.c

1.2 自定义消息队列实现

(1)类图

基础工具之消息队列、线程池、缓冲区抽象、事件循环和日志实现

(2)代码实现

基础工具之消息队列、线程池、缓冲区抽象、事件循环和日志实现
基础工具之消息队列、线程池、缓冲区抽象、事件循环和日志实现

#pragma once
#include <stdint.h>
class Message
{
public:
    struct Type
    {
        enum{Stop = 0};
    };
    Message()
    {
        
    }
    Message(int32_t type) :_type(type)
    {

    }

    virtual ~Message()
    {

    }

    int32_t GetType() const
    {
        return _type;
    }

    void SetType(int32_t type)
    {
        _type = type;
    }
private:
    int32_t _type;
};

Message.h

基础工具之消息队列、线程池、缓冲区抽象、事件循环和日志实现
基础工具之消息队列、线程池、缓冲区抽象、事件循环和日志实现

/*
    @线程安全的消息队列
*/
#pragma once
#include <mutex>
#include <memory>
#include <queue>
#include <condition_variable>

#include <iostream>
using namespace std;

template <class MSG>
class MessageQueue
{
public:
    MessageQueue():_mutex(), _condition(), _queue(){}
    MessageQueue(const MessageQueue&) = delete;
    const MessageQueue& operator=(const MessageQueue&) = delete;
    virtual ~MessageQueue(){};

    void Push(const MSG& msg)
    {
        std::lock_guard<std::mutex> lock(_mutex);
        _queue.push(msg);
        cout << "add msg" << endl;
        _condition.notify_one();
    }
    bool Pop(MSG& msg, bool isBlocked = true) 
    {
        std::unique_lock<std::mutex> lock(_mutex);
        if(isBlocked)
        {
            while(_queue.empty())
            {
                cout << "block state, MessageQueue is empty,please wait..." << endl;
                _condition.wait(lock);
            }
        }
        else
        {
            if(IsEmpty()) return false;
        }

        msg = std::move(_queue.front());
        _queue.pop();

        return true;
    }
    bool IsEmpty()
    {
        std::lock_guard<std::mutex> lock(_mutex);
        return _queue.empty();
    }
    int32_t Size()
    {
        std::lock_guard<std::mutex> lock(_mutex);
        return _queue.size();
    }

private:
    std::mutex _mutex;
    std::condition_variable _condition;
    std::queue<MSG> _queue;
};

MessageQueue.h

注意:unique_lock和lock_guard的使用,unique_lock不像lock_guard只能在析构时才释放锁,它可以随时释放锁,因此在wait时让unique_lock释放锁从语义上看更加准确,其次要防止死锁情况的出现,不要锁里面再等待锁

2. 线程池

  不多说,直接上代码

#pragma once

#include <thread>
#include <functional>
#include <stdint.h>
#include <vector>
#include <iostream>
using namespace std;

#include "MessageQueue.h"

#define MIN_THREADS 3

template <typename Type>
class ThreadPool{
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

public:
    ThreadPool(int32_t threads, std::function<void(Type &record)> handler);
    virtual ~ThreadPool();
    void Submit(Type record);

private:
    bool _shutdown;
    int32_t _threads;
    std::function<void(Type &record)> _handler;

    std::vector<std::thread> _workers;
    MessageQueue<Type> _tasks;
};

template <typename Type>
ThreadPool<Type>::ThreadPool(int32_t threads, std::function<void(Type &record)> handler)
    :_shutdown(false), _threads(threads), _handler(handler), _workers(), _tasks()
{
    if(_threads < MIN_THREADS)
    {
        _threads = MIN_THREADS;
    }

    for(int32_t i = 0; i < _threads; i++)
    {
        std::thread workThread([this]{
            while(!_shutdown)
            {
                Type record;
                bool ret = _tasks.Pop(record, true);
                _handler(record);
            }
        });
        workThread.detach();
        _workers.emplace_back(std::move(workThread));
    }
}

template <typename Type>
ThreadPool<Type>::~ThreadPool()
{
    for(std::thread &worker : _workers)
    {
        worker.join();
    }
}

template <typename Type>
void ThreadPool<Type>::Submit(Type record)
{
    _tasks.Push(record);
}

3. 事件循环

  以日志工具为例,为了实现高性能的日志工具,我们必须确保日志I/O全部处于一个独立线程,而不会影响后续的操作,因此,实际上日志记录就是其它线程向日志线程发送日志消息,这样一来,事件循环模型就变得非常必要。

3.1 类图设计

基础工具之消息队列、线程池、缓冲区抽象、事件循环和日志实现

3.2 关键代码实现

(1)ByteArray

基础工具之消息队列、线程池、缓冲区抽象、事件循环和日志实现
基础工具之消息队列、线程池、缓冲区抽象、事件循环和日志实现

#pragma once

#include <vector>
#include <string>
#include <cstring>

class ByteArray : public std::vector<char> {
    public:
        ByteArray() = default;

        ByteArray(int32_t size) :
                std::vector<char>(size) {
        }

        ByteArray(const char *buffer, int32_t size) :
                std::vector<char>(buffer, buffer + size) {
        }

        ByteArray(const std::string &str) :
                std::vector<char>(str.size()) {
            memcpy(data(), str.c_str(), str.size());
        }

        std::string ToStdString() const {
            std::string result(this->cbegin(), this->cend());

            return result;
        }

        ByteArray &Concat(const ByteArray &buffer2) {
            size_t oldSize = size();
            size_t newSize = oldSize + buffer2.size();
            resize(newSize);
            memcpy(this->data() + oldSize, buffer2.data(), buffer2.size());

            return *this;
        }

        ByteArray operator+(const ByteArray &buffer2) const {
            ByteArray buffer1(this->size() + buffer2.size());
            memcpy(buffer1.data(), this->data(), this->size());
            memcpy(buffer1.data() + this->size(), buffer2.data(), buffer2.size());

            return buffer1;
        }
    };

View Code

(2)IStream

#pragma once

#include "ByteArray.h"

#include <functional>

class IStream {
    public:
        typedef std::function<void(const char* buf, int64_t size)> DataIndicationHandler;

        virtual int32_t Receive(char* buffer, int32_t bufferSize, int32_t& readSize) = 0;
        virtual int32_t Send(const ByteArray& byteArray) = 0;

        virtual void OnDataIndication(DataIndicationHandler handler) = 0;
        virtual DataIndicationHandler GetDataIndication() = 0;
    };

(3) BaseEvent

#pragma once
#include "ByteArray.h"
#include "IStream.h"

class BaseEvent
    {
    public:
        BaseEvent() { }

        BaseEvent(const std::string &type, const ByteArray &data,
                  IStream *stream) :
                _type(type), _data(data), _stream(stream)
        {
        }

        void SetData(const ByteArray &data)
        {
            _data = data;
        }

        const ByteArray &GetData() const
        {
            return _data;
        }

        void SetType(const std::string &type)
        {
            _type = type;
        }

        const std::string &GetType() const
        {
            return _type;
        }

        void SetStream(IStream *stream)
        {
            _stream = stream;
        }

        IStream *GetStream() const
        {
            return _stream;
        }

    private:
        std::string _type;
        ByteArray _data;
        IStream* _stream;
    };

(4)EventQueue

#pragma once

#include "BaseEvent.h"

#include <memory>
#include <mutex>
#include <condition_variable>
#include <chrono>

class EventQueue 
{
public:
    EventQueue(int timeout = 0) : _timeout(timeout) { }

    void PostEvent(BaseEvent *event)
    {
        std::unique_lock <std::mutex> locker(_mutex);

        _events.push_back(std::shared_ptr<BaseEvent>(event));
    }

    std::shared_ptr <BaseEvent> GetEvent()
    {
        std::unique_lock <std::mutex> locker(_mutex);

        if (_events.empty())
        {
            if (_timeout == 0)
            {
                return nullptr;
            }

            _waitCondition.wait_for(locker, std::chrono::milliseconds(_timeout));
        }

        if (!_events.empty())
        {
            std::shared_ptr <BaseEvent> event = _events.front();
            _events.erase(_events.begin());

            return event;
        }

        return nullptr;
    }

private:
    std::vector <std::shared_ptr<BaseEvent>> _events;
    std::mutex _mutex;
    std::condition_variable _waitCondition;
    // ms
    int _timeout;
};

(5)Loop

#pragma once

class Loop
{
public:
    void Start()
    {
        _Run();
    }

    virtual ~Loop(){}
private:
    virtual void _Run() = 0;
}

(6)EventQueueLoop

#pragma once
#include "Loop.h"
#include "EventQueue.h"
#include <memory>

class EventQueueLoop : public Loop
    {
    public:
        EventQueueLoop(EventQueue *queue);

        
    protected:
        virtual void _Run();

        virtual void OnEvent(std::shared_ptr <BaseEvent> event) = 0;

    private:
        EventQueue *_queue;
    };
#include "EventQueueLoop.h"

EventQueueLoop::EventQueueLoop(EventQueue *queue) : _queue(queue) 
{
}

void EventQueueLoop::_Run() 
{
    while (true) {
        std::shared_ptr <BaseEvent> event = _queue->GetEvent();
        if (!event) {
            continue;
        }

        OnEvent(event);
    }
}

4. 基于消息队列的日志实现

4.1 日志优先级

const std::string PRIORITY_STRING[] =
{
     "DEBUG",
     "CONFIG",
     "INFO",
     "WARNING",
     "ERROR"
};

4.2 日志格式

void Logger::WriteLog(Priority priority, const std::string &log) {
        if (priority < _priority)
            return;

        std::stringstream stream;
        stream << HurricaneUtils::GetCurrentTimeStamp()
        << " [" << PRIORITY_STRING[priority] << "] "
        << log;

        _queue.Push(stream.str());
    }
std::string GetCurrentTimeStamp()
{
    // get current time
    auto currentTime = std::chrono::system_clock::now();
    // get milliseconds
    auto milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(currentTime.time_since_epoch()) % 1000;

    auto currentTimePoint = std::chrono::system_clock::to_time_t(currentTime);

    // output the time stamp
    std::ostringstream stream;

#if (defined(WIN32) || defined(_WIN32) || defined(__WIN32__)) && !defined(__MINGW32__)
    stream << std::put_time(std::localtime(&currentTimePoint), "%T");
#else
    char buffer[80];
    auto success = std::strftime(buffer, 80, "%T", std::localtime(&currentTimePoint));  // %T显示时分秒
    assert(0 != success);
    stream << buffer;
#endif

    stream << '.' << std::setfill('0') << std::setw(3) << milliseconds.count();

    return stream.str();
}

4.3 代码实现

基础工具之消息队列、线程池、缓冲区抽象、事件循环和日志实现
基础工具之消息队列、线程池、缓冲区抽象、事件循环和日志实现

/**
 * licensed to the apache software foundation (asf) under one
 * or more contributor license agreements.  see the notice file
 * distributed with this work for additional information
 * regarding copyright ownership.  the asf licenses this file
 * to you under the apache license, version 2.0 (the
 * "license"); you may not use this file except in compliance
 * with the license.  you may obtain a copy of the license at
 *
 * http://www.apache.org/licenses/license-2.0
 *
 * unless required by applicable law or agreed to in writing, software
 * distributed under the license is distributed on an "as is" basis,
 * without warranties or conditions of any kind, either express or implied.
 * see the license for the specific language governing permissions and
 * limitations under the license.
 */
#pragma once

#include "MessageQueue.h"
#include <memory>
#include <thread>
#include <queue>
#include <string>
#include <fstream>

enum Priority {
        DEBUG,
        STATE,
        INFO,
        WARNING,
        FAULT
    };

class Logger {
    Logger &operator=(const Logger &) = delete;

    Logger(const Logger &other) = delete;

public:
    static Logger *Get();

    void SetPriority(Priority priority);

    Priority GetPriority();

    void WriteLog(Priority priority, const std::string &log);

private:
    Logger(Priority priority);

    virtual ~Logger();

    void _InitializeFileStream();

    void _WriteThread();

    std::string GetCurrentTimeStamp();

private:
    MessageQueue <std::string> _queue;
    std::ofstream *_fileStream;
    Priority _priority;
    bool _shutdown;
};

#define TRACE_DEBUG(LOG_CONTENT) Logger::Get()->WriteLog(DEBUG, LOG_CONTENT);
#define TRACE_STATE(LOG_CONTENT) Logger::Get()->WriteLog(STATE, LOG_CONTENT);
#define TRACE_INFO(LOG_CONTENT) Logger::Get()->WriteLog(INFO, LOG_CONTENT);
#define TRACE_WARNING(LOG_CONTENT) Logger::Get()->WriteLog(WARNING, LOG_CONTENT);
#define TRACE_ERROR(LOG_CONTENT) Logger::Get()->WriteLog(FAULT, LOG_CONTENT);

Logger.h

基础工具之消息队列、线程池、缓冲区抽象、事件循环和日志实现
基础工具之消息队列、线程池、缓冲区抽象、事件循环和日志实现

/** * licensed to the apache software foundation (asf) under one * or more contributor license agreements. see the notice file * distributed with this work for additional information * regarding copyright ownership. the asf licenses this file * to you under the apache license, version 2.0 (the * "license"); you may not use this file except in compliance * with the license. you may obtain a copy of the license at * * http://www.apache.org/licenses/license-2.0 * * unless required by applicable law or agreed to in writing, software * distributed under the license is distributed on an "as is" basis, * without warranties or conditions of any kind, either express or implied. * see the license for the specific language governing permissions and * limitations under the license. */ #include "logger.h" #include <iostream> #include <sstream> #include <iomanip> const std::string PRIORITY_STRING[] = { "DEBUG", "CONFIG", "INFO", "WARNING", "ERROR" }; Logger *Logger::Get() { static Logger logger(DEBUG); return &logger; } Logger::Logger(Priority priority) : _queue(), _fileStream(nullptr), _shutdown(false) { _priority = priority; _InitializeFileStream(); auto func = std::bind(&Logger::_WriteThread, this); std::thread writeThread(func); writeThread.detach(); } Logger::~Logger() { _shutdown = true; if (nullptr != _fileStream) { _fileStream->close(); delete _fileStream; _fileStream = nullptr; } } std::string Logger::GetCurrentTimeStamp() { // get current time auto currentTime = std::chrono::system_clock::now(); // get milliseconds auto milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(currentTime.time_since_epoch()) % 1000; auto currentTimePoint = std::chrono::system_clock::to_time_t(currentTime); // output the time stamp  std::ostringstream stream; char buffer[80]; auto success = std::strftime(buffer, 80, "%T", std::localtime(&currentTimePoint)); // %T显示时分秒 stream << buffer; //assert(0 != success); //stream << '.' << std::setfill('0') << std::setw(3) << milliseconds.count(); return stream.str(); } void Logger::SetPriority(Priority priority) { _priority = priority; } Priority Logger::GetPriority() { return _priority; } void Logger::_InitializeFileStream() { // Prepare fileName std::string fileName = "./logger.log"; // Initialize file stream _fileStream = new std::ofstream(); std::ios_base::openmode mode = std::ios_base::out; mode |= std::ios_base::trunc; _fileStream->open(fileName, mode); // Error handling if (!_fileStream->is_open()) { // Print error information  std::ostringstream ss_error; ss_error << "FATAL ERROR: could not Open log file: [" << fileName << "]"; ss_error << "\n\t\t std::ios_base state = " << _fileStream->rdstate(); std::cerr << ss_error.str().c_str() << std::endl << std::flush; // Cleanup _fileStream->close(); delete _fileStream; _fileStream = nullptr; } } void Logger::WriteLog(Priority priority, const std::string &log) { if (priority < _priority) return; std::stringstream stream; stream << GetCurrentTimeStamp() << " [" << PRIORITY_STRING[priority] << "] " << log; _queue.Push(stream.str()); } void Logger::_WriteThread() { while (!_shutdown) { std::string log; _queue.Pop(log, true); //std::cout << log << std::endl; if (_fileStream) *_fileStream << log << std::endl; } }

Logger.cpp

 测试:

#include "stdio.h" #include <thread> #include <chrono> #include "../logger.h" int main() { TRACE_DEBUG("Logger Debug test"); while(1) { std::this_thread::sleep_for(std::chrono::milliseconds(1000)); } return 0; }

基础工具之消息队列、线程池、缓冲区抽象、事件循环和日志实现

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/120103.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)
blank

相关推荐

  • nginx负载均衡原理

    nginx负载均衡原理负载均衡在服务端开发中算是一个比较重要的特性。因为Nginx除了作为常规的Web服务器外,还会被大规模的用于反向代理前端,因为Nginx的异步框架可以处理很大的并发请求,把这些并发请求hold住之后就可以分发给后台服务端(backendservers,后面简称backend)来做复杂的计算、处理和响应,并且在业务量增加的时候可以方便地扩容后台服务器。负载均衡可以分为硬件负载均衡和软件负载均衡…

  • 富文本编辑器汇总_移动端富文本编辑器

    富文本编辑器汇总_移动端富文本编辑器富文本编辑器:(RichTextEditor,RTE)是一种可内嵌于浏览器,所见即所得的文本编辑器。它提供类似于OfficeWord的编辑功能,方便那些不太懂HTML用户使用,富文本编辑器的应用非常广泛,它的历史与图文网页诞生的历史几乎一样长。1.TinyMCTinyMCE是一个开源的所见即所得的HTML编辑器,界面相当清新,界面模拟本地软件的风格,顶部有菜单栏。支持图片在线处理,插件多,功能非常强大,易于集成,并且拥有可定制的主题。支持目前流行的各种浏览器,它可以达到微软…

    2022年10月29日
  • JAVA课程设计——飞机大战(个人)

    JAVA课程设计——飞机大战(个人)

  • 关于软连接的创建_linux设置软连接

    关于软连接的创建_linux设置软连接关于软连接的创建ln的功能是为某一个文件在另外一个位置建立一个同步的链接这个命令最常用的参数是-s,具体用法是:ln-s源文件目标文件。当在不同的目录,用到相同的文件时,可以不需要在每一个需要的目录下都放一个必须相同的文件,只要在某个固定的目录,放上该文件,然后在其它的目录下用ln命令链接(link)它就可以,不必重复的占用磁盘空间。例如:ln-s/bin/hello.sh…

  • “数加”斩获2017软博会金奖

    “数加”斩获2017软博会金奖

  • 二进制加,减法,23个位运算技巧[通俗易懂]

    二进制加,减法,23个位运算技巧[通俗易懂]二进制加,减法二进制最高位为1时表示负数,为0时表示正数。**原码:**一个正数,转换为二进制位就是这个正数的原码。负数的绝对值转换成二进制位然后在高位补1就是这个负数的原码。举例说明:      int类型的3的原码是11B(B表示二进制位),在32位机器上占四个字节,那么高位补零就得:      00000000000000000000000000000011    …

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号